This is an automated email from the ASF dual-hosted git repository.

mgrund pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f295b6  #53 Adding support for UserAgent and UserId
6f295b6 is described below

commit 6f295b602ce40cf5a116d3d4aec7b771f60a5f3c
Author: Martin Grund <[email protected]>
AuthorDate: Fri Jan 3 07:40:41 2025 +0100

    #53 Adding support for UserAgent and UserId
    
    ### What changes were proposed in this pull request?
    This patch adds proper support for the `user_agent` and `user_id` property 
of the connection string. In addition, it provides sensible default values for 
these parameters when connecting from a Go application.
    
    ### Why are the changes needed?
    Compatibility
    
    ### Does this PR introduce _any_ user-facing change?
    Compatibility
    
    ### How was this patch tested?
    Added tests
    
    Closes #105 from grundprinzip/53.
    
    Authored-by: Martin Grund <[email protected]>
    Signed-off-by: Martin Grund <[email protected]>
---
 spark/client/channel/channel.go                 |  36 +++++++
 spark/client/channel/channel_test.go            |  16 +++
 spark/client/client.go                          | 138 +++++++++---------------
 spark/client/options/options.go                 |   8 ++
 spark/sql/sparksession.go                       |   8 +-
 spark/{client/options/options.go => version.go} |  12 +--
 6 files changed, 123 insertions(+), 95 deletions(-)

diff --git a/spark/client/channel/channel.go b/spark/client/channel/channel.go
index 4696db7..a31287e 100644
--- a/spark/client/channel/channel.go
+++ b/spark/client/channel/channel.go
@@ -24,9 +24,13 @@ import (
        "fmt"
        "net"
        "net/url"
+       "os"
+       "runtime"
        "strconv"
        "strings"
 
+       "github.com/apache/spark-connect-go/v35/spark"
+
        "github.com/google/uuid"
 
        "google.golang.org/grpc/credentials/insecure"
@@ -56,6 +60,9 @@ type Builder interface {
        // SessionId identifies the client side session identifier. This value must be a UUID formatted
        // as a string.
        SessionId() string
+       // UserAgent identifies the user agent string that is passed as part of the request. It contains
+       // information about the operating system, Go version etc.
+       UserAgent() string
 }
 
 // BaseBuilder is used to parse the different parameters of the connection
@@ -69,6 +76,7 @@ type BaseBuilder struct {
        user      string
        headers   map[string]string
        sessionId string
+       userAgent string
 }
 
 func (cb *BaseBuilder) Host() string {
@@ -95,6 +103,10 @@ func (cb *BaseBuilder) SessionId() string {
        return cb.sessionId
 }
 
+func (cb *BaseBuilder) UserAgent() string {
+       return cb.userAgent
+}
+
 // Build finalizes the creation of the gprc.ClientConn by creating a GRPC channel
 // with the necessary options extracted from the connection string. For
 // TLS connections, this function will load the system certificates.
@@ -178,6 +190,7 @@ func NewBuilder(connection string) (*BaseBuilder, error) {
                port:      port,
                headers:   map[string]string{},
                sessionId: uuid.NewString(),
+               userAgent: "",
        }
 
        elements := strings.Split(u.Path, ";")
@@ -190,10 +203,33 @@ func NewBuilder(connection string) (*BaseBuilder, error) {
                                cb.user = props[1]
                        } else if props[0] == "session_id" {
                                cb.sessionId = props[1]
+                       } else if props[0] == "user_agent" {
+                               cb.userAgent = props[1]
                        } else {
                                cb.headers[props[0]] = props[1]
                        }
                }
        }
+
+       // Set default user ID if not set.
+       if cb.user == "" {
+               cb.user = os.Getenv("USER")
+               if cb.user == "" {
+                       cb.user = "na"
+               }
+       }
+
+       // Update the user agent if it is not set or set to a custom value.
+       val := os.Getenv("SPARK_CONNECT_USER_AGENT")
+       if cb.userAgent == "" && val != "" {
+               cb.userAgent = os.Getenv("SPARK_CONNECT_USER_AGENT")
+       } else if cb.userAgent == "" {
+               cb.userAgent = "_SPARK_CONNECT_GO"
+       }
+
+       // In addition, to the specified user agent, we need to append information about the
+       // host encoded as user agent components.
+       cb.userAgent = fmt.Sprintf("%s spark/%s os/%s go/%s", cb.userAgent, spark.Version(), runtime.GOOS, runtime.Version())
+
        return cb, nil
 }
diff --git a/spark/client/channel/channel_test.go b/spark/client/channel/channel_test.go
index 5099949..d5fb834 100644
--- a/spark/client/channel/channel_test.go
+++ b/spark/client/channel/channel_test.go
@@ -96,3 +96,19 @@ func TestChannelBuildConnect(t *testing.T) {
        assert.Nil(t, err, "no error for proper connection")
        assert.NotNil(t, conn)
 }
+
+func TestChannelBulder_UserAgent(t *testing.T) {
+       cb, err := channel.NewBuilder("sc://localhost")
+       assert.NoError(t, err)
+       assert.True(t, strings.Contains(cb.UserAgent(), "_SPARK_CONNECT_GO"))
+       assert.True(t, strings.Contains(cb.UserAgent(), "go/"))
+       assert.True(t, strings.Contains(cb.UserAgent(), "spark/"))
+       assert.True(t, strings.Contains(cb.UserAgent(), "os/"))
+
+       cb, err = channel.NewBuilder("sc://localhost/;user_agent=custom")
+       assert.NoError(t, err)
+       assert.True(t, strings.Contains(cb.UserAgent(), "custom"))
+       assert.True(t, strings.Contains(cb.UserAgent(), "go/"))
+       assert.True(t, strings.Contains(cb.UserAgent(), "spark/"))
+       assert.True(t, strings.Contains(cb.UserAgent(), "os/"))
+}
diff --git a/spark/client/client.go b/spark/client/client.go
index 7c827da..e71daea 100644
--- a/spark/client/client.go
+++ b/spark/client/client.go
@@ -56,8 +56,9 @@ func (s *sparkConnectClientImpl) newExecutePlanRequest(plan *proto.Plan) *proto.
                SessionId: s.sessionId,
                Plan:      plan,
                UserContext: &proto.UserContext{
-                       UserId: "na",
+                       UserId: s.opts.UserId,
                },
+               ClientType: &s.opts.UserAgent,
                // Operation ID is needed for being able to reattach.
                OperationId: &operationId,
                RequestOptions: []*proto.ExecutePlanRequest_RequestOption{
@@ -109,16 +110,22 @@ func (s *sparkConnectClientImpl) ExecutePlan(ctx context.Context, plan *proto.Pl
        return NewExecuteResponseStream(c, s.sessionId, *request.OperationId, s.opts), nil
 }
 
-func (s *sparkConnectClientImpl) AnalyzePlan(ctx context.Context, plan *proto.Plan) (*proto.AnalyzePlanResponse, error) {
-       request := proto.AnalyzePlanRequest{
+// Creates a new AnalyzePlanRequest with the necessary metadata.
+func (s *sparkConnectClientImpl) newAnalyzePlanStub() proto.AnalyzePlanRequest {
+       return proto.AnalyzePlanRequest{
                SessionId: s.sessionId,
-               Analyze: &proto.AnalyzePlanRequest_Schema_{
-                       Schema: &proto.AnalyzePlanRequest_Schema{
-                               Plan: plan,
-                       },
-               },
                UserContext: &proto.UserContext{
-                       UserId: "na",
+                       UserId: s.opts.UserId,
+               },
+               ClientType: &s.opts.UserAgent,
+       }
+}
+
+func (s *sparkConnectClientImpl) AnalyzePlan(ctx context.Context, plan *proto.Plan) (*proto.AnalyzePlanResponse, error) {
+       request := s.newAnalyzePlanStub()
+       request.Analyze = &proto.AnalyzePlanRequest_Schema_{
+               Schema: &proto.AnalyzePlanRequest_Schema{
+                       Plan: plan,
                },
        }
        // Append the other items to the request.
@@ -149,17 +156,11 @@ func (s *sparkConnectClientImpl) Explain(ctx context.Context, plan *proto.Plan,
                return nil, sparkerrors.WithType(fmt.Errorf("unsupported explain mode %v",
                        explainMode), sparkerrors.InvalidArgumentError)
        }
-
-       request := proto.AnalyzePlanRequest{
-               SessionId: s.sessionId,
-               Analyze: &proto.AnalyzePlanRequest_Explain_{
-                       Explain: &proto.AnalyzePlanRequest_Explain{
-                               Plan:        plan,
-                               ExplainMode: mode,
-                       },
-               },
-               UserContext: &proto.UserContext{
-                       UserId: "na",
+       request := s.newAnalyzePlanStub()
+       request.Analyze = &proto.AnalyzePlanRequest_Explain_{
+               Explain: &proto.AnalyzePlanRequest_Explain{
+                       Plan:        plan,
+                       ExplainMode: mode,
                },
        }
        // Append the other items to the request.
@@ -174,17 +175,11 @@ func (s *sparkConnectClientImpl) Explain(ctx context.Context, plan *proto.Plan,
 
 func (s *sparkConnectClientImpl) Persist(ctx context.Context, plan *proto.Plan, storageLevel utils.StorageLevel) error {
        protoLevel := utils.ToProtoStorageLevel(storageLevel)
-
-       request := proto.AnalyzePlanRequest{
-               SessionId: s.sessionId,
-               Analyze: &proto.AnalyzePlanRequest_Persist_{
-                       Persist: &proto.AnalyzePlanRequest_Persist{
-                               Relation:     plan.GetRoot(),
-                               StorageLevel: protoLevel,
-                       },
-               },
-               UserContext: &proto.UserContext{
-                       UserId: "na",
+       request := s.newAnalyzePlanStub()
+       request.Analyze = &proto.AnalyzePlanRequest_Persist_{
+               Persist: &proto.AnalyzePlanRequest_Persist{
+                       Relation:     plan.GetRoot(),
+                       StorageLevel: protoLevel,
                },
        }
        // Append the other items to the request.
@@ -198,15 +193,10 @@ func (s *sparkConnectClientImpl) Persist(ctx context.Context, plan *proto.Plan,
 }
 
 func (s *sparkConnectClientImpl) Unpersist(ctx context.Context, plan *proto.Plan) error {
-       request := proto.AnalyzePlanRequest{
-               SessionId: s.sessionId,
-               Analyze: &proto.AnalyzePlanRequest_Unpersist_{
-                       Unpersist: &proto.AnalyzePlanRequest_Unpersist{
-                               Relation: plan.GetRoot(),
-                       },
-               },
-               UserContext: &proto.UserContext{
-                       UserId: "na",
+       request := s.newAnalyzePlanStub()
+       request.Analyze = &proto.AnalyzePlanRequest_Unpersist_{
+               Unpersist: &proto.AnalyzePlanRequest_Unpersist{
+                       Relation: plan.GetRoot(),
                },
        }
        // Append the other items to the request.
@@ -220,15 +210,10 @@ func (s *sparkConnectClientImpl) Unpersist(ctx context.Context, plan *proto.Plan
 }
 
 func (s *sparkConnectClientImpl) GetStorageLevel(ctx context.Context, plan *proto.Plan) (*utils.StorageLevel, error) {
-       request := proto.AnalyzePlanRequest{
-               SessionId: s.sessionId,
-               Analyze: &proto.AnalyzePlanRequest_GetStorageLevel_{
-                       GetStorageLevel: &proto.AnalyzePlanRequest_GetStorageLevel{
-                               Relation: plan.GetRoot(),
-                       },
-               },
-               UserContext: &proto.UserContext{
-                       UserId: "na",
+       request := s.newAnalyzePlanStub()
+       request.Analyze = &proto.AnalyzePlanRequest_GetStorageLevel_{
+               GetStorageLevel: &proto.AnalyzePlanRequest_GetStorageLevel{
+                       Relation: plan.GetRoot(),
                },
        }
        // Append the other items to the request.
@@ -245,14 +230,9 @@ func (s *sparkConnectClientImpl) GetStorageLevel(ctx context.Context, plan *prot
 }
 
 func (s *sparkConnectClientImpl) SparkVersion(ctx context.Context) (string, error) {
-       request := proto.AnalyzePlanRequest{
-               SessionId: s.sessionId,
-               Analyze: &proto.AnalyzePlanRequest_SparkVersion_{
-                       SparkVersion: &proto.AnalyzePlanRequest_SparkVersion{},
-               },
-               UserContext: &proto.UserContext{
-                       UserId: "na",
-               },
+       request := s.newAnalyzePlanStub()
+       request.Analyze = &proto.AnalyzePlanRequest_SparkVersion_{
+               SparkVersion: &proto.AnalyzePlanRequest_SparkVersion{},
        }
        // Append the other items to the request.
        ctx = metadata.NewOutgoingContext(ctx, s.metadata)
@@ -265,15 +245,10 @@ func (s *sparkConnectClientImpl) SparkVersion(ctx context.Context) (string, erro
 }
 
 func (s *sparkConnectClientImpl) DDLParse(ctx context.Context, sql string) (*types.StructType, error) {
-       request := proto.AnalyzePlanRequest{
-               SessionId: s.sessionId,
-               Analyze: &proto.AnalyzePlanRequest_DdlParse{
-                       DdlParse: &proto.AnalyzePlanRequest_DDLParse{
-                               DdlString: sql,
-                       },
-               },
-               UserContext: &proto.UserContext{
-                       UserId: "na",
+       request := s.newAnalyzePlanStub()
+       request.Analyze = &proto.AnalyzePlanRequest_DdlParse{
+               DdlParse: &proto.AnalyzePlanRequest_DDLParse{
+                       DdlString: sql,
                },
        }
        // Append the other items to the request.
@@ -287,16 +262,11 @@ func (s *sparkConnectClientImpl) DDLParse(ctx context.Context, sql string) (*typ
 }
 
 func (s *sparkConnectClientImpl) SameSemantics(ctx context.Context, plan1 *proto.Plan, plan2 *proto.Plan) (bool, error) {
-       request := proto.AnalyzePlanRequest{
-               SessionId: s.sessionId,
-               Analyze: &proto.AnalyzePlanRequest_SameSemantics_{
-                       SameSemantics: &proto.AnalyzePlanRequest_SameSemantics{
-                               TargetPlan: plan1,
-                               OtherPlan:  plan2,
-                       },
-               },
-               UserContext: &proto.UserContext{
-                       UserId: "na",
+       request := s.newAnalyzePlanStub()
+       request.Analyze = &proto.AnalyzePlanRequest_SameSemantics_{
+               SameSemantics: &proto.AnalyzePlanRequest_SameSemantics{
+                       TargetPlan: plan1,
+                       OtherPlan:  plan2,
                },
        }
        // Append the other items to the request.
@@ -310,15 +280,10 @@ func (s *sparkConnectClientImpl) SameSemantics(ctx context.Context, plan1 *proto
 }
 
 func (s *sparkConnectClientImpl) SemanticHash(ctx context.Context, plan *proto.Plan) (int32, error) {
-       request := proto.AnalyzePlanRequest{
-               SessionId: s.sessionId,
-               Analyze: &proto.AnalyzePlanRequest_SemanticHash_{
-                       SemanticHash: &proto.AnalyzePlanRequest_SemanticHash{
-                               Plan: plan,
-                       },
-               },
-               UserContext: &proto.UserContext{
-                       UserId: "na",
+       request := s.newAnalyzePlanStub()
+       request.Analyze = &proto.AnalyzePlanRequest_SemanticHash_{
+               SemanticHash: &proto.AnalyzePlanRequest_SemanticHash{
+                       Plan: plan,
                },
        }
        // Append the other items to the request.
@@ -337,8 +302,9 @@ func (s *sparkConnectClientImpl) Config(ctx context.Context,
        request := &proto.ConfigRequest{
                Operation: operation,
                UserContext: &proto.UserContext{
-                       UserId: "na",
+                       UserId: s.opts.UserId,
                },
+               ClientType: &s.opts.UserAgent,
        }
        request.SessionId = s.sessionId
        resp, err := s.client.Config(ctx, request)
diff --git a/spark/client/options/options.go b/spark/client/options/options.go
index e88b802..35dbeca 100644
--- a/spark/client/options/options.go
+++ b/spark/client/options/options.go
@@ -17,8 +17,16 @@ package options
 
 type SparkClientOptions struct {
        ReattachExecution bool
+       UserAgent         string
+       UserId            string
 }
 
 var DefaultSparkClientOptions = SparkClientOptions{
        ReattachExecution: false,
 }
+
+func NewSparkClientOptions(reattach bool) SparkClientOptions {
+       return SparkClientOptions{
+               ReattachExecution: reattach,
+       }
+}
diff --git a/spark/sql/sparksession.go b/spark/sql/sparksession.go
index 4bf94cc..fbaf5a5 100644
--- a/spark/sql/sparksession.go
+++ b/spark/sql/sparksession.go
@@ -93,9 +93,15 @@ func (s *SparkSessionBuilder) Build(ctx context.Context) (SparkSession, error) {
        }
 
        sessionId := uuid.NewString()
+
+       // Update the options according to the configuration.
+       opts := options.NewSparkClientOptions(options.DefaultSparkClientOptions.ReattachExecution)
+       opts.UserAgent = s.channelBuilder.UserAgent()
+       opts.UserId = s.channelBuilder.User()
+
        return &sparkSessionImpl{
                sessionId: sessionId,
-               client:    client.NewSparkExecutor(conn, meta, sessionId, opts),
+               client:    client.NewSparkExecutor(conn, meta, sessionId, opts),
        }, nil
 }
 
diff --git a/spark/client/options/options.go b/spark/version.go
similarity index 78%
copy from spark/client/options/options.go
copy to spark/version.go
index e88b802..8386b5e 100644
--- a/spark/client/options/options.go
+++ b/spark/version.go
@@ -5,7 +5,7 @@
 // (the "License"); you may not use this file except in compliance with
 // the License.  You may obtain a copy of the License at
 //
-//     http://www.apache.org/licenses/LICENSE-2.0
+//    http://www.apache.org/licenses/LICENSE-2.0
 //
 // Unless required by applicable law or agreed to in writing, software
 // distributed under the License is distributed on an "AS IS" BASIS,
@@ -13,12 +13,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package options
+package spark
 
-type SparkClientOptions struct {
-       ReattachExecution bool
-}
-
-var DefaultSparkClientOptions = SparkClientOptions{
-       ReattachExecution: false,
+func Version() string {
+       return "3.5.x"
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to