This is an automated email from the ASF dual-hosted git repository.
mgrund pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git
The following commit(s) were added to refs/heads/master by this push:
new 6f295b6 #53 Adding support for UserAgent and UserId
6f295b6 is described below
commit 6f295b602ce40cf5a116d3d4aec7b771f60a5f3c
Author: Martin Grund <[email protected]>
AuthorDate: Fri Jan 3 07:40:41 2025 +0100
#53 Adding support for UserAgent and UserId
### What changes were proposed in this pull request?
This patch adds proper support for the `user_agent` and `user_id` property
of the connection string. In addition, it provides sensible default values for
these parameters when connecting from a Go application.
### Why are the changes needed?
Compatibility
### Does this PR introduce _any_ user-facing change?
Compatibility
### How was this patch tested?
Added tests
Closes #105 from grundprinzip/53.
Authored-by: Martin Grund <[email protected]>
Signed-off-by: Martin Grund <[email protected]>
---
spark/client/channel/channel.go | 36 +++++++
spark/client/channel/channel_test.go | 16 +++
spark/client/client.go | 138 +++++++++---------------
spark/client/options/options.go | 8 ++
spark/sql/sparksession.go | 8 +-
spark/{client/options/options.go => version.go} | 12 +--
6 files changed, 123 insertions(+), 95 deletions(-)
diff --git a/spark/client/channel/channel.go b/spark/client/channel/channel.go
index 4696db7..a31287e 100644
--- a/spark/client/channel/channel.go
+++ b/spark/client/channel/channel.go
@@ -24,9 +24,13 @@ import (
"fmt"
"net"
"net/url"
+ "os"
+ "runtime"
"strconv"
"strings"
+ "github.com/apache/spark-connect-go/v35/spark"
+
"github.com/google/uuid"
"google.golang.org/grpc/credentials/insecure"
@@ -56,6 +60,9 @@ type Builder interface {
// SessionId identifies the client side session identifier. This value
// must be a UUID formatted as a string.
SessionId() string
+	// UserAgent identifies the user agent string that is passed as part of
+	// the request. It contains information about the operating system, Go
+	// version etc.
+	UserAgent() string
}
// BaseBuilder is used to parse the different parameters of the connection
@@ -69,6 +76,7 @@ type BaseBuilder struct {
user string
headers map[string]string
sessionId string
+ userAgent string
}
func (cb *BaseBuilder) Host() string {
@@ -95,6 +103,10 @@ func (cb *BaseBuilder) SessionId() string {
return cb.sessionId
}
+func (cb *BaseBuilder) UserAgent() string {
+ return cb.userAgent
+}
+
// Build finalizes the creation of the grpc.ClientConn by creating a GRPC channel
// with the necessary options extracted from the connection string. For
// TLS connections, this function will load the system certificates.
@@ -178,6 +190,7 @@ func NewBuilder(connection string) (*BaseBuilder, error) {
port: port,
headers: map[string]string{},
sessionId: uuid.NewString(),
+ userAgent: "",
}
elements := strings.Split(u.Path, ";")
@@ -190,10 +203,33 @@ func NewBuilder(connection string) (*BaseBuilder, error) {
cb.user = props[1]
} else if props[0] == "session_id" {
cb.sessionId = props[1]
+ } else if props[0] == "user_agent" {
+ cb.userAgent = props[1]
} else {
cb.headers[props[0]] = props[1]
}
}
}
+
+ // Set default user ID if not set.
+ if cb.user == "" {
+ cb.user = os.Getenv("USER")
+ if cb.user == "" {
+ cb.user = "na"
+ }
+ }
+
+	// If the user agent was not explicitly configured, prefer the
+	// SPARK_CONNECT_USER_AGENT environment variable, falling back to a default.
+ val := os.Getenv("SPARK_CONNECT_USER_AGENT")
+ if cb.userAgent == "" && val != "" {
+ cb.userAgent = os.Getenv("SPARK_CONNECT_USER_AGENT")
+ } else if cb.userAgent == "" {
+ cb.userAgent = "_SPARK_CONNECT_GO"
+ }
+
+	// In addition to the specified user agent, we append information about
+	// the host encoded as user agent components.
+	cb.userAgent = fmt.Sprintf("%s spark/%s os/%s go/%s", cb.userAgent, spark.Version(), runtime.GOOS, runtime.Version())
+
return cb, nil
}
diff --git a/spark/client/channel/channel_test.go
b/spark/client/channel/channel_test.go
index 5099949..d5fb834 100644
--- a/spark/client/channel/channel_test.go
+++ b/spark/client/channel/channel_test.go
@@ -96,3 +96,19 @@ func TestChannelBuildConnect(t *testing.T) {
assert.Nil(t, err, "no error for proper connection")
assert.NotNil(t, conn)
}
+
+func TestChannelBuilder_UserAgent(t *testing.T) {
+ cb, err := channel.NewBuilder("sc://localhost")
+ assert.NoError(t, err)
+ assert.True(t, strings.Contains(cb.UserAgent(), "_SPARK_CONNECT_GO"))
+ assert.True(t, strings.Contains(cb.UserAgent(), "go/"))
+ assert.True(t, strings.Contains(cb.UserAgent(), "spark/"))
+ assert.True(t, strings.Contains(cb.UserAgent(), "os/"))
+
+ cb, err = channel.NewBuilder("sc://localhost/;user_agent=custom")
+ assert.NoError(t, err)
+ assert.True(t, strings.Contains(cb.UserAgent(), "custom"))
+ assert.True(t, strings.Contains(cb.UserAgent(), "go/"))
+ assert.True(t, strings.Contains(cb.UserAgent(), "spark/"))
+ assert.True(t, strings.Contains(cb.UserAgent(), "os/"))
+}
diff --git a/spark/client/client.go b/spark/client/client.go
index 7c827da..e71daea 100644
--- a/spark/client/client.go
+++ b/spark/client/client.go
@@ -56,8 +56,9 @@ func (s *sparkConnectClientImpl) newExecutePlanRequest(plan
*proto.Plan) *proto.
SessionId: s.sessionId,
Plan: plan,
UserContext: &proto.UserContext{
- UserId: "na",
+ UserId: s.opts.UserId,
},
+ ClientType: &s.opts.UserAgent,
// Operation ID is needed for being able to reattach.
OperationId: &operationId,
RequestOptions: []*proto.ExecutePlanRequest_RequestOption{
@@ -109,16 +110,22 @@ func (s *sparkConnectClientImpl) ExecutePlan(ctx
context.Context, plan *proto.Pl
return NewExecuteResponseStream(c, s.sessionId, *request.OperationId,
s.opts), nil
}
-func (s *sparkConnectClientImpl) AnalyzePlan(ctx context.Context, plan
*proto.Plan) (*proto.AnalyzePlanResponse, error) {
- request := proto.AnalyzePlanRequest{
+// Creates a new AnalyzePlanRequest with the necessary metadata.
+func (s *sparkConnectClientImpl) newAnalyzePlanStub() proto.AnalyzePlanRequest
{
+ return proto.AnalyzePlanRequest{
SessionId: s.sessionId,
- Analyze: &proto.AnalyzePlanRequest_Schema_{
- Schema: &proto.AnalyzePlanRequest_Schema{
- Plan: plan,
- },
- },
UserContext: &proto.UserContext{
- UserId: "na",
+ UserId: s.opts.UserId,
+ },
+ ClientType: &s.opts.UserAgent,
+ }
+}
+
+func (s *sparkConnectClientImpl) AnalyzePlan(ctx context.Context, plan
*proto.Plan) (*proto.AnalyzePlanResponse, error) {
+ request := s.newAnalyzePlanStub()
+ request.Analyze = &proto.AnalyzePlanRequest_Schema_{
+ Schema: &proto.AnalyzePlanRequest_Schema{
+ Plan: plan,
},
}
// Append the other items to the request.
@@ -149,17 +156,11 @@ func (s *sparkConnectClientImpl) Explain(ctx
context.Context, plan *proto.Plan,
return nil, sparkerrors.WithType(fmt.Errorf("unsupported
explain mode %v",
explainMode), sparkerrors.InvalidArgumentError)
}
-
- request := proto.AnalyzePlanRequest{
- SessionId: s.sessionId,
- Analyze: &proto.AnalyzePlanRequest_Explain_{
- Explain: &proto.AnalyzePlanRequest_Explain{
- Plan: plan,
- ExplainMode: mode,
- },
- },
- UserContext: &proto.UserContext{
- UserId: "na",
+ request := s.newAnalyzePlanStub()
+ request.Analyze = &proto.AnalyzePlanRequest_Explain_{
+ Explain: &proto.AnalyzePlanRequest_Explain{
+ Plan: plan,
+ ExplainMode: mode,
},
}
// Append the other items to the request.
@@ -174,17 +175,11 @@ func (s *sparkConnectClientImpl) Explain(ctx
context.Context, plan *proto.Plan,
func (s *sparkConnectClientImpl) Persist(ctx context.Context, plan
*proto.Plan, storageLevel utils.StorageLevel) error {
protoLevel := utils.ToProtoStorageLevel(storageLevel)
-
- request := proto.AnalyzePlanRequest{
- SessionId: s.sessionId,
- Analyze: &proto.AnalyzePlanRequest_Persist_{
- Persist: &proto.AnalyzePlanRequest_Persist{
- Relation: plan.GetRoot(),
- StorageLevel: protoLevel,
- },
- },
- UserContext: &proto.UserContext{
- UserId: "na",
+ request := s.newAnalyzePlanStub()
+ request.Analyze = &proto.AnalyzePlanRequest_Persist_{
+ Persist: &proto.AnalyzePlanRequest_Persist{
+ Relation: plan.GetRoot(),
+ StorageLevel: protoLevel,
},
}
// Append the other items to the request.
@@ -198,15 +193,10 @@ func (s *sparkConnectClientImpl) Persist(ctx
context.Context, plan *proto.Plan,
}
func (s *sparkConnectClientImpl) Unpersist(ctx context.Context, plan
*proto.Plan) error {
- request := proto.AnalyzePlanRequest{
- SessionId: s.sessionId,
- Analyze: &proto.AnalyzePlanRequest_Unpersist_{
- Unpersist: &proto.AnalyzePlanRequest_Unpersist{
- Relation: plan.GetRoot(),
- },
- },
- UserContext: &proto.UserContext{
- UserId: "na",
+ request := s.newAnalyzePlanStub()
+ request.Analyze = &proto.AnalyzePlanRequest_Unpersist_{
+ Unpersist: &proto.AnalyzePlanRequest_Unpersist{
+ Relation: plan.GetRoot(),
},
}
// Append the other items to the request.
@@ -220,15 +210,10 @@ func (s *sparkConnectClientImpl) Unpersist(ctx
context.Context, plan *proto.Plan
}
func (s *sparkConnectClientImpl) GetStorageLevel(ctx context.Context, plan
*proto.Plan) (*utils.StorageLevel, error) {
- request := proto.AnalyzePlanRequest{
- SessionId: s.sessionId,
- Analyze: &proto.AnalyzePlanRequest_GetStorageLevel_{
- GetStorageLevel:
&proto.AnalyzePlanRequest_GetStorageLevel{
- Relation: plan.GetRoot(),
- },
- },
- UserContext: &proto.UserContext{
- UserId: "na",
+ request := s.newAnalyzePlanStub()
+ request.Analyze = &proto.AnalyzePlanRequest_GetStorageLevel_{
+ GetStorageLevel: &proto.AnalyzePlanRequest_GetStorageLevel{
+ Relation: plan.GetRoot(),
},
}
// Append the other items to the request.
@@ -245,14 +230,9 @@ func (s *sparkConnectClientImpl) GetStorageLevel(ctx
context.Context, plan *prot
}
func (s *sparkConnectClientImpl) SparkVersion(ctx context.Context) (string,
error) {
- request := proto.AnalyzePlanRequest{
- SessionId: s.sessionId,
- Analyze: &proto.AnalyzePlanRequest_SparkVersion_{
- SparkVersion: &proto.AnalyzePlanRequest_SparkVersion{},
- },
- UserContext: &proto.UserContext{
- UserId: "na",
- },
+ request := s.newAnalyzePlanStub()
+ request.Analyze = &proto.AnalyzePlanRequest_SparkVersion_{
+ SparkVersion: &proto.AnalyzePlanRequest_SparkVersion{},
}
// Append the other items to the request.
ctx = metadata.NewOutgoingContext(ctx, s.metadata)
@@ -265,15 +245,10 @@ func (s *sparkConnectClientImpl) SparkVersion(ctx
context.Context) (string, erro
}
func (s *sparkConnectClientImpl) DDLParse(ctx context.Context, sql string)
(*types.StructType, error) {
- request := proto.AnalyzePlanRequest{
- SessionId: s.sessionId,
- Analyze: &proto.AnalyzePlanRequest_DdlParse{
- DdlParse: &proto.AnalyzePlanRequest_DDLParse{
- DdlString: sql,
- },
- },
- UserContext: &proto.UserContext{
- UserId: "na",
+ request := s.newAnalyzePlanStub()
+ request.Analyze = &proto.AnalyzePlanRequest_DdlParse{
+ DdlParse: &proto.AnalyzePlanRequest_DDLParse{
+ DdlString: sql,
},
}
// Append the other items to the request.
@@ -287,16 +262,11 @@ func (s *sparkConnectClientImpl) DDLParse(ctx
context.Context, sql string) (*typ
}
func (s *sparkConnectClientImpl) SameSemantics(ctx context.Context, plan1
*proto.Plan, plan2 *proto.Plan) (bool, error) {
- request := proto.AnalyzePlanRequest{
- SessionId: s.sessionId,
- Analyze: &proto.AnalyzePlanRequest_SameSemantics_{
- SameSemantics: &proto.AnalyzePlanRequest_SameSemantics{
- TargetPlan: plan1,
- OtherPlan: plan2,
- },
- },
- UserContext: &proto.UserContext{
- UserId: "na",
+ request := s.newAnalyzePlanStub()
+ request.Analyze = &proto.AnalyzePlanRequest_SameSemantics_{
+ SameSemantics: &proto.AnalyzePlanRequest_SameSemantics{
+ TargetPlan: plan1,
+ OtherPlan: plan2,
},
}
// Append the other items to the request.
@@ -310,15 +280,10 @@ func (s *sparkConnectClientImpl) SameSemantics(ctx
context.Context, plan1 *proto
}
func (s *sparkConnectClientImpl) SemanticHash(ctx context.Context, plan
*proto.Plan) (int32, error) {
- request := proto.AnalyzePlanRequest{
- SessionId: s.sessionId,
- Analyze: &proto.AnalyzePlanRequest_SemanticHash_{
- SemanticHash: &proto.AnalyzePlanRequest_SemanticHash{
- Plan: plan,
- },
- },
- UserContext: &proto.UserContext{
- UserId: "na",
+ request := s.newAnalyzePlanStub()
+ request.Analyze = &proto.AnalyzePlanRequest_SemanticHash_{
+ SemanticHash: &proto.AnalyzePlanRequest_SemanticHash{
+ Plan: plan,
},
}
// Append the other items to the request.
@@ -337,8 +302,9 @@ func (s *sparkConnectClientImpl) Config(ctx context.Context,
request := &proto.ConfigRequest{
Operation: operation,
UserContext: &proto.UserContext{
- UserId: "na",
+ UserId: s.opts.UserId,
},
+ ClientType: &s.opts.UserAgent,
}
request.SessionId = s.sessionId
resp, err := s.client.Config(ctx, request)
diff --git a/spark/client/options/options.go b/spark/client/options/options.go
index e88b802..35dbeca 100644
--- a/spark/client/options/options.go
+++ b/spark/client/options/options.go
@@ -17,8 +17,16 @@ package options
type SparkClientOptions struct {
ReattachExecution bool
+ UserAgent string
+ UserId string
}
var DefaultSparkClientOptions = SparkClientOptions{
ReattachExecution: false,
}
+
+func NewSparkClientOptions(reattach bool) SparkClientOptions {
+ return SparkClientOptions{
+ ReattachExecution: reattach,
+ }
+}
diff --git a/spark/sql/sparksession.go b/spark/sql/sparksession.go
index 4bf94cc..fbaf5a5 100644
--- a/spark/sql/sparksession.go
+++ b/spark/sql/sparksession.go
@@ -93,9 +93,15 @@ func (s *SparkSessionBuilder) Build(ctx context.Context)
(SparkSession, error) {
}
sessionId := uuid.NewString()
+
+ // Update the options according to the configuration.
+ opts :=
options.NewSparkClientOptions(options.DefaultSparkClientOptions.ReattachExecution)
+ opts.UserAgent = s.channelBuilder.UserAgent()
+ opts.UserId = s.channelBuilder.User()
+
return &sparkSessionImpl{
sessionId: sessionId,
- client: client.NewSparkExecutor(conn, meta, sessionId,
options.DefaultSparkClientOptions),
+ client: client.NewSparkExecutor(conn, meta, sessionId, opts),
}, nil
}
diff --git a/spark/client/options/options.go b/spark/version.go
similarity index 78%
copy from spark/client/options/options.go
copy to spark/version.go
index e88b802..8386b5e 100644
--- a/spark/client/options/options.go
+++ b/spark/version.go
@@ -5,7 +5,7 @@
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
-// http://www.apache.org/licenses/LICENSE-2.0
+// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
@@ -13,12 +13,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package options
+package spark
-type SparkClientOptions struct {
- ReattachExecution bool
-}
-
-var DefaultSparkClientOptions = SparkClientOptions{
- ReattachExecution: false,
+func Version() string {
+ return "3.5.x"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]