This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e2d7a4  [SPARK-43351] Add init Spark Connect Go files
2e2d7a4 is described below

commit 2e2d7a48f4a9626034ca11c691bcfc042d42e953
Author: Bo Yang <[email protected]>
AuthorDate: Fri Jun 2 20:15:35 2023 +0900

    [SPARK-43351] Add init Spark Connect Go files
    
    ### What changes were proposed in this pull request?
    
    This pull request adds a small Spark Connect Go client example and prototype.
    
    JIRA: https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-43351
    
    ### Why are the changes needed?
    
    Spark Connect was released in Spark 3.4.0, but there is no Go client yet. A Go client would let Go programmers use Spark Connect.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Users will be able to write Spark Connect applications in Go. A very simple example looks like the following:
    ```
    func main() {
            remote := "localhost:15002"
            spark, _ := sql.SparkSession.Builder.Remote(remote).Build()
            defer spark.Stop()
    
            df, _ := spark.Sql("select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
            df.Show(100, false)
    }
    ```
    
    ### How was this patch tested?
    
    Manually tested by running the example Go code.
    
    Closes #6 from hiboyang/bo-dev-01.
    
    Lead-authored-by: Bo Yang <[email protected]>
    Co-authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .gitignore                                        |   3 +
 README.md                                         |  68 ++++++
 client/sql/dataframe.go                           | 282 ++++++++++++++++++++++
 client/sql/dataframe_test.go                      |  57 +++++
 client/sql/datatype.go                            |  63 +++++
 client/sql/plan.go                                |  26 ++
 client/sql/row.go                                 |  35 +++
 client/sql/sparksession.go                        | 134 ++++++++++
 client/sql/structtype.go                          |  28 +++
 cmd/spark-connect-example-raw-grpc-client/main.go |  64 +++++
 cmd/spark-connect-example-spark-session/main.go   |  73 ++++++
 go.sum                                            |  31 +++
 12 files changed, 864 insertions(+)

diff --git a/.gitignore b/.gitignore
index 0bca2cf..f61e556 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,6 @@ internal/generated.out
 
 # Ignore Coverage Files
 coverage*
+
+# Ignore IDE files
+.idea/
diff --git a/README.md b/README.md
index b2be4d3..4422b9a 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,74 @@ git submodule init --depth 1
 make gen && make check && make test
 ```
 
+## Spark Connect Go Application Example
+
+A very simple example in Go looks like the following:
+
+```
+func main() {
+       remote := "localhost:15002"
+       spark, _ := sql.SparkSession.Builder.Remote(remote).Build()
+       defer spark.Stop()
+
+       df, _ := spark.Sql("select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
+       df.Show(100, false)
+}
+```
+
+## High Level Design
+
+The following [diagram](https://textik.com/#ac299c8f32c4c342) shows the main components of the current prototype:
+
+```
+    +-------------------+
+    |                   |
+    |   dataFrameImpl   |
+    |                   |
+    +-------------------+
+              |
+              |
+              +
+    +-------------------+
+    |                   |
+    | sparkSessionImpl  |
+    |                   |
+    +-------------------+
+              |
+              |
+              +
++---------------------------+               +----------------+
+|                           |               |                |
+| SparkConnectServiceClient |--------------+|  Spark Driver  |
+|                           |               |                |
++---------------------------+               +----------------+
+
+```
+
+`SparkConnectServiceClient` is the gRPC client that talks to the Spark driver. `sparkSessionImpl` creates `dataFrameImpl`
+instances, and each `dataFrameImpl` uses the gRPC client held by its `sparkSessionImpl` to communicate with the Spark driver.
+
+We will mirror the logic of the Spark Connect Scala implementation while adopting common Go practices, e.g. returning an `error` value for
+error handling.
+
+## How to Run Spark Connect Go Application
+
+1. Install Golang: https://go.dev/doc/install.
+
+2. Download a Spark distribution (3.4.0 or later) and unpack it.
+
+3. Start the Spark Connect server by running:
+
+```
+sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.4.0
+```
+
+4. From the root of this repository, run the Go application:
+
+```
+go run cmd/spark-connect-example-spark-session/main.go
+```
+
 ## Contributing
 
 Please review the [Contribution to Spark guide](https://spark.apache.org/contributing.html)
diff --git a/client/sql/dataframe.go b/client/sql/dataframe.go
new file mode 100644
index 0000000..64e7646
--- /dev/null
+++ b/client/sql/dataframe.go
@@ -0,0 +1,282 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+       "bytes"
+       "errors"
+       "fmt"
+       "github.com/apache/arrow/go/v12/arrow"
+       "github.com/apache/arrow/go/v12/arrow/array"
+       "github.com/apache/arrow/go/v12/arrow/ipc"
+       proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+       "io"
+)
+
+type DataFrame interface {
+       Show(numRows int, truncate bool) error
+       Schema() (*StructType, error)
+       Collect() ([]Row, error)
+}
+
+type dataFrameImpl struct {
+       sparkSession *sparkSessionImpl
+       relation     *proto.Relation // TODO change to proto.Plan?
+}
+
+func (df *dataFrameImpl) Show(numRows int, truncate bool) error {
+       truncateValue := 0
+       if truncate {
+               truncateValue = 20
+       }
+       vertical := false
+
+       plan := &proto.Plan{
+               OpType: &proto.Plan_Root{
+                       Root: &proto.Relation{
+                               Common: &proto.RelationCommon{
+                                       PlanId: newPlanId(),
+                               },
+                               RelType: &proto.Relation_ShowString{
+                                       ShowString: &proto.ShowString{
+                                               Input:    df.relation,
+                                               NumRows:  int32(numRows),
+                                               Truncate: int32(truncateValue),
+                                               Vertical: vertical,
+                                       },
+                               },
+                       },
+               },
+       }
+
+       responseClient, err := df.sparkSession.executePlan(plan)
+       if err != nil {
+               return fmt.Errorf("failed to show dataframe: %w", err)
+       }
+
+       for {
+               response, err := responseClient.Recv()
+               if err != nil {
+                       return fmt.Errorf("failed to receive show response: %w", err)
+               }
+               arrowBatch := response.GetArrowBatch()
+               if arrowBatch == nil {
+                       continue
+               }
+               err = showArrowBatch(arrowBatch)
+               if err != nil {
+                       return err
+               }
+               return nil
+       }
+
+       return fmt.Errorf("did not get arrow batch in response")
+}
+
+func (df *dataFrameImpl) Schema() (*StructType, error) {
+       response, err := df.sparkSession.analyzePlan(df.createPlan())
+       if err != nil {
+               return nil, fmt.Errorf("failed to analyze plan: %w", err)
+       }
+
+       responseSchema := response.GetSchema().Schema
+       result := convertProtoDataTypeToStructType(responseSchema)
+       return result, nil
+}
+
+func (df *dataFrameImpl) Collect() ([]Row, error) {
+       responseClient, err := df.sparkSession.executePlan(df.createPlan())
+       if err != nil {
+               return nil, fmt.Errorf("failed to execute plan: %w", err)
+       }
+
+       var schema *StructType
+       var allRows []Row
+
+       for {
+               response, err := responseClient.Recv()
+               if err != nil {
+                       if errors.Is(err, io.EOF) {
+                               return allRows, nil
+                       } else {
+                               return nil, fmt.Errorf("failed to receive plan execution response: %w", err)
+                       }
+               }
+
+               dataType := response.GetSchema()
+               if dataType != nil {
+                       schema = convertProtoDataTypeToStructType(dataType)
+                       continue
+               }
+
+               arrowBatch := response.GetArrowBatch()
+               if arrowBatch == nil {
+                       continue
+               }
+
+               rowBatch, err := readArrowBatchData(arrowBatch.Data, schema)
+               if err != nil {
+                       return nil, err
+               }
+
+               if allRows == nil {
+                       allRows = make([]Row, 0, len(rowBatch))
+               }
+               allRows = append(allRows, rowBatch...)
+       }
+
+       return allRows, nil
+}
+
+func (df *dataFrameImpl) createPlan() *proto.Plan {
+       return &proto.Plan{
+               OpType: &proto.Plan_Root{
+                       Root: &proto.Relation{
+                               Common: &proto.RelationCommon{
+                                       PlanId: newPlanId(),
+                               },
+                               RelType: df.relation.RelType,
+                       },
+               },
+       }
+}
+
+func showArrowBatch(arrowBatch *proto.ExecutePlanResponse_ArrowBatch) error {
+       return showArrowBatchData(arrowBatch.Data)
+}
+
+func showArrowBatchData(data []byte) error {
+       rows, err := readArrowBatchData(data, nil)
+       if err != nil {
+               return err
+       }
+       for _, row := range rows {
+               values, err := row.Values()
+               if err != nil {
+                       return fmt.Errorf("failed to get values in the row: %w", err)
+               }
+               fmt.Println(values...)
+       }
+       return nil
+}
+
+func readArrowBatchData(data []byte, schema *StructType) ([]Row, error) {
+       reader := bytes.NewReader(data)
+       arrowReader, err := ipc.NewReader(reader)
+       if err != nil {
+               return nil, fmt.Errorf("failed to create arrow reader: %w", err)
+       }
+       defer arrowReader.Release()
+
+       var rows []Row
+
+       for {
+               record, err := arrowReader.Read()
+               if err != nil {
+                       if errors.Is(err, io.EOF) {
+                               return rows, nil
+                       } else {
+                               return nil, fmt.Errorf("failed to read arrow: %w", err)
+                       }
+               }
+               numColumns := len(arrowReader.Schema().Fields())
+               numRows := int(record.NumRows())
+               if rows == nil {
+                       rows = make([]Row, 0, numRows)
+               }
+               values := make([][]any, numRows)
+               for i := range values {
+                       values[i] = make([]any, numColumns)
+               }
+               for columnIndex := 0; columnIndex < numColumns; columnIndex++ {
+                       columnData := record.Column(columnIndex).Data()
+                       dataTypeId := columnData.DataType().ID()
+                       switch dataTypeId {
+                       case arrow.STRING:
+                               vector := array.NewStringData(columnData)
+                               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+                               }
+                       case arrow.INT32:
+                               vector := array.NewInt32Data(columnData)
+                               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+                               }
+                       case arrow.INT64:
+                               vector := array.NewInt64Data(columnData)
+                               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+                               }
+                       default:
+                               return nil, fmt.Errorf("unsupported arrow data type %s in column %d", dataTypeId.String(), columnIndex)
+                       }
+               }
+
+               for _, v := range values {
+                       row := &GenericRowWithSchema{
+                               schema: schema,
+                               values: v,
+                       }
+                       rows = append(rows, row)
+               }
+
+               hasNext := arrowReader.Next()
+               if !hasNext {
+                       break
+               }
+       }
+
+       return rows, nil
+}
+
+func convertProtoDataTypeToStructType(input *proto.DataType) *StructType {
+       dataTypeStruct := input.GetStruct()
+       if dataTypeStruct == nil {
+               panic("dataType.GetStruct() is nil")
+       }
+       return &StructType{
+               Fields: convertProtoStructFields(dataTypeStruct.Fields),
+       }
+}
+
+func convertProtoStructFields(input []*proto.DataType_StructField) []StructField {
+       result := make([]StructField, len(input))
+       for i, f := range input {
+               result[i] = convertProtoStructField(f)
+       }
+       return result
+}
+
+func convertProtoStructField(field *proto.DataType_StructField) StructField {
+       return StructField{
+               Name:     field.Name,
+               DataType: convertProtoDataTypeToDataType(field.DataType),
+       }
+}
+
+func convertProtoDataTypeToDataType(input *proto.DataType) DataType {
+       switch v := input.GetKind().(type) {
+       case *proto.DataType_Integer_:
+               return IntegerType{}
+       case *proto.DataType_String_:
+               return StringType{}
+       default:
+               return UnsupportedType{
+                       TypeInfo: v,
+               }
+       }
+}
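The `Recv` loops in `Show`, `Collect`, and `Sql` above all consume a gRPC server stream. The conventional way to drain such a stream is to call `Recv` until it returns `io.EOF`, treating that as a normal end and any other error as a failure (`Collect` already handles `io.EOF` this way). The standalone sketch below illustrates the pattern; `response`, `responseStream`, `fakeStream`, and `drain` are hypothetical names standing in for the generated `SparkConnectService_ExecutePlanClient` and its messages, so it runs without a Spark Connect server.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// response is a hypothetical stand-in for a streamed ExecutePlanResponse:
// a message either carries some rows or nothing of interest.
type response struct {
	rows []string
}

// responseStream mimics the Recv method of a generated gRPC
// server-streaming client (hypothetical interface for this sketch).
type responseStream interface {
	Recv() (*response, error)
}

// fakeStream replays canned responses, then reports io.EOF,
// which is how a gRPC stream signals a clean end.
type fakeStream struct {
	responses []*response
}

func (s *fakeStream) Recv() (*response, error) {
	if len(s.responses) == 0 {
		return nil, io.EOF
	}
	r := s.responses[0]
	s.responses = s.responses[1:]
	return r, nil
}

// drain reads every message until io.EOF and concatenates the rows.
// Any other error aborts the read and is wrapped for context.
func drain(stream responseStream) ([]string, error) {
	var all []string
	for {
		resp, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			return all, nil
		}
		if err != nil {
			return nil, fmt.Errorf("failed to receive response: %w", err)
		}
		all = append(all, resp.rows...)
	}
}

func main() {
	stream := &fakeStream{responses: []*response{
		{rows: []string{"apple 123"}},
		{}, // e.g. a message that carries no Arrow batch
		{rows: []string{"orange 456"}},
	}}
	rows, err := drain(stream)
	fmt.Println(rows, err) // [apple 123 orange 456] <nil>
}
```

Messages without rows are simply skipped, which corresponds to the `continue` on responses that carry no Arrow batch.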
diff --git a/client/sql/dataframe_test.go b/client/sql/dataframe_test.go
new file mode 100644
index 0000000..d8c3c80
--- /dev/null
+++ b/client/sql/dataframe_test.go
@@ -0,0 +1,57 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+       "bytes"
+       "github.com/apache/arrow/go/v12/arrow"
+       "github.com/apache/arrow/go/v12/arrow/array"
+       "github.com/apache/arrow/go/v12/arrow/ipc"
+       "github.com/apache/arrow/go/v12/arrow/memory"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "testing"
+)
+
+func TestShowArrowBatchData(t *testing.T) {
+       arrowFields := []arrow.Field{
+               {
+                       Name: "show_string",
+                       Type: &arrow.StringType{},
+               },
+       }
+       arrowSchema := arrow.NewSchema(arrowFields, nil)
+       var buf bytes.Buffer
+       arrowWriter := ipc.NewWriter(&buf, ipc.WithSchema(arrowSchema))
+       defer arrowWriter.Close()
+
+       alloc := memory.NewGoAllocator()
+       recordBuilder := array.NewRecordBuilder(alloc, arrowSchema)
+       defer recordBuilder.Release()
+
+       recordBuilder.Field(0).(*array.StringBuilder).Append("str1a\nstr1b")
+       recordBuilder.Field(0).(*array.StringBuilder).Append("str2")
+
+       record := recordBuilder.NewRecord()
+       defer record.Release()
+
+       err := arrowWriter.Write(record)
+       require.Nil(t, err)
+
+       err = showArrowBatchData(buf.Bytes())
+       assert.Nil(t, err)
+}
diff --git a/client/sql/datatype.go b/client/sql/datatype.go
new file mode 100644
index 0000000..05ab02e
--- /dev/null
+++ b/client/sql/datatype.go
@@ -0,0 +1,63 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+       "reflect"
+       "strings"
+)
+
+type DataType interface {
+       TypeName() string
+}
+
+type IntegerType struct {
+}
+
+func (t IntegerType) TypeName() string {
+       return getDataTypeName(t)
+}
+
+type StringType struct {
+}
+
+func (t StringType) TypeName() string {
+       return getDataTypeName(t)
+}
+
+type UnsupportedType struct {
+       TypeInfo any
+}
+
+func (t UnsupportedType) TypeName() string {
+       return getDataTypeName(t)
+}
+
+func getDataTypeName(dataType DataType) string {
+       t := reflect.TypeOf(dataType)
+       if t == nil {
+               return "(nil)"
+       }
+       var name string
+       if t.Kind() == reflect.Ptr {
+               name = t.Elem().Name()
+       } else {
+               name = t.Name()
+       }
+       name = strings.TrimSuffix(name, "Type")
+       return name
+}
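`getDataTypeName` derives a display name from the Go type name via reflection, dereferencing pointer types and stripping the `Type` suffix. The condensed standalone copy below (trimmed to two types; not the committed file) makes the resulting names concrete: `IntegerType` yields `Integer`, a `*StringType` yields `String`, and a nil interface yields `(nil)`.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

type DataType interface{ TypeName() string }

type IntegerType struct{}
type StringType struct{}

func (t IntegerType) TypeName() string { return getDataTypeName(t) }
func (t StringType) TypeName() string  { return getDataTypeName(t) }

// getDataTypeName returns the Go type name without its "Type" suffix,
// looking through a pointer to its element type.
func getDataTypeName(dataType DataType) string {
	t := reflect.TypeOf(dataType)
	if t == nil {
		return "(nil)" // a nil interface has no dynamic type
	}
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return strings.TrimSuffix(t.Name(), "Type")
}

func main() {
	fmt.Println(IntegerType{}.TypeName())        // Integer
	fmt.Println(getDataTypeName(&StringType{})) // String
	var nilType DataType
	fmt.Println(getDataTypeName(nilType)) // (nil)
}
```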
diff --git a/client/sql/plan.go b/client/sql/plan.go
new file mode 100644
index 0000000..66b9e05
--- /dev/null
+++ b/client/sql/plan.go
@@ -0,0 +1,26 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import "sync/atomic"
+
+var atomicInt64 atomic.Int64
+
+func newPlanId() *int64 {
+       v := atomicInt64.Add(1)
+       return &v
+}
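`newPlanId` hands out plan IDs from a package-level `atomic.Int64`, so concurrent callers never receive the same value, and because it returns the address of a fresh local copy, each caller gets its own `*int64` rather than a pointer into shared state. The sketch below exercises that property from several goroutines; it assumes Go 1.19 or later (where `atomic.Int64` was introduced), and the worker counts are arbitrary.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var planIdCounter atomic.Int64

// newPlanId mirrors plan.go: increment atomically, return a pointer to a copy.
func newPlanId() *int64 {
	v := planIdCounter.Add(1)
	return &v
}

func main() {
	const workers, perWorker = 8, 1000

	var mu sync.Mutex
	seen := make(map[int64]bool)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				id := *newPlanId()
				mu.Lock() // the map, not the counter, needs the lock
				seen[id] = true
				mu.Unlock()
			}
		}()
	}
	wg.Wait()

	// Every call produced a distinct ID.
	fmt.Println(len(seen)) // 8000
}
```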
diff --git a/client/sql/row.go b/client/sql/row.go
new file mode 100644
index 0000000..3bee2ac
--- /dev/null
+++ b/client/sql/row.go
@@ -0,0 +1,35 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+type Row interface {
+       Schema() (*StructType, error)
+       Values() ([]any, error)
+}
+
+type GenericRowWithSchema struct {
+       values []any
+       schema *StructType
+}
+
+func (r *GenericRowWithSchema) Schema() (*StructType, error) {
+       return r.schema, nil
+}
+
+func (r *GenericRowWithSchema) Values() ([]any, error) {
+       return r.values, nil
+}
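`GenericRowWithSchema` keeps the row values and their schema side by side, so a by-name lookup only needs the column's index in `schema.Fields`. The sketch below adds such a lookup as a hypothetical `Get` method (not part of this commit), using trimmed copies of the row and schema types. Rows created on the `Show` path are built with a nil schema, so the sketch returns an error in that case instead of panicking.

```go
package main

import "fmt"

type StructField struct {
	Name string
}

type StructType struct {
	Fields []StructField
}

type GenericRowWithSchema struct {
	values []any
	schema *StructType
}

// Get is a hypothetical helper: it finds the column by name in the schema
// and returns the value stored at the same index.
func (r *GenericRowWithSchema) Get(name string) (any, error) {
	if r.schema == nil {
		return nil, fmt.Errorf("row has no schema")
	}
	for i, f := range r.schema.Fields {
		if f.Name == name {
			if i >= len(r.values) {
				return nil, fmt.Errorf("no value for column %q", name)
			}
			return r.values[i], nil
		}
	}
	return nil, fmt.Errorf("column %q not found", name)
}

func main() {
	schema := &StructType{Fields: []StructField{{Name: "word"}, {Name: "count"}}}
	row := &GenericRowWithSchema{values: []any{"apple", int32(123)}, schema: schema}
	v, err := row.Get("count")
	fmt.Println(v, err) // 123 <nil>
}
```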
diff --git a/client/sql/sparksession.go b/client/sql/sparksession.go
new file mode 100644
index 0000000..86f4cd1
--- /dev/null
+++ b/client/sql/sparksession.go
@@ -0,0 +1,134 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+       "context"
+       "fmt"
+       proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+       "github.com/google/uuid"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+)
+
+var SparkSession sparkSessionBuilderEntrypoint
+
+type sparkSession interface {
+       Sql(query string) (DataFrame, error)
+       Stop() error
+}
+
+type sparkSessionBuilderEntrypoint struct {
+       Builder SparkSessionBuilder
+}
+
+type SparkSessionBuilder struct {
+       connectionString string
+}
+
+func (s SparkSessionBuilder) Remote(connectionString string) SparkSessionBuilder {
+       copy := s
+       copy.connectionString = connectionString
+       return copy
+}
+
+func (s SparkSessionBuilder) Build() (sparkSession, error) {
+       opts := []grpc.DialOption{
+               grpc.WithTransportCredentials(insecure.NewCredentials()),
+       }
+
+       conn, err := grpc.Dial(s.connectionString, opts...)
+       if err != nil {
+               return nil, fmt.Errorf("failed to connect to remote %s: %w", s.connectionString, err)
+       }
+
+       client := proto.NewSparkConnectServiceClient(conn)
+       return &sparkSessionImpl{
+               sessionId: uuid.NewString(),
+               client:    client,
+       }, nil
+}
+
+type sparkSessionImpl struct {
+       sessionId string
+       client    proto.SparkConnectServiceClient
+}
+
+func (s *sparkSessionImpl) Sql(query string) (DataFrame, error) {
+       plan := &proto.Plan{
+               OpType: &proto.Plan_Command{
+                       Command: &proto.Command{
+                               CommandType: &proto.Command_SqlCommand{
+                                       SqlCommand: &proto.SqlCommand{
+                                               Sql: query,
+                                       },
+                               },
+                       },
+               },
+       }
+       responseClient, err := s.executePlan(plan)
+       if err != nil {
+               return nil, fmt.Errorf("failed to execute sql: %s: %w", query, err)
+       }
+       for {
+               response, err := responseClient.Recv()
+               if err != nil {
+                       return nil, fmt.Errorf("failed to receive ExecutePlan response: %w", err)
+               }
+               sqlCommandResult := response.GetSqlCommandResult()
+               if sqlCommandResult == nil {
+                       continue
+               }
+               return &dataFrameImpl{
+                       sparkSession: s,
+                       relation:     sqlCommandResult.GetRelation(),
+               }, nil
+       }
+       return nil, fmt.Errorf("failed to get SqlCommandResult in ExecutePlan response")
+}
+
+func (s *sparkSessionImpl) Stop() error {
+       return nil
+}
+
+func (s *sparkSessionImpl) executePlan(plan *proto.Plan) (proto.SparkConnectService_ExecutePlanClient, error) {
+       request := proto.ExecutePlanRequest{
+               SessionId: s.sessionId,
+               Plan:      plan,
+       }
+       executePlanClient, err := s.client.ExecutePlan(context.TODO(), &request)
+       if err != nil {
+               return nil, fmt.Errorf("failed to call ExecutePlan in session %s: %w", s.sessionId, err)
+       }
+       return executePlanClient, nil
+}
+
+func (s *sparkSessionImpl) analyzePlan(plan *proto.Plan) (*proto.AnalyzePlanResponse, error) {
+       request := proto.AnalyzePlanRequest{
+               SessionId: s.sessionId,
+               Analyze: &proto.AnalyzePlanRequest_Schema_{
+                       Schema: &proto.AnalyzePlanRequest_Schema{
+                               Plan: plan,
+                       },
+               },
+       }
+       response, err := s.client.AnalyzePlan(context.TODO(), &request)
+       if err != nil {
+               return nil, fmt.Errorf("failed to call AnalyzePlan in session %s: %w", s.sessionId, err)
+       }
+       return response, nil
+}
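`SparkSessionBuilder.Remote` has a value receiver and returns a modified copy, so calling it on the shared `SparkSession.Builder` entry point never mutates that global. Because the receiver is already a copy, the explicit `copy := s` in the committed code is not strictly needed (and it shadows Go's built-in `copy`). A trimmed-down sketch of the same pattern, with a hypothetical `entrypoint` type and host name:

```go
package main

import "fmt"

// SparkSessionBuilder mirrors the builder in the diff: Remote has a value
// receiver, so it works on (and returns) a copy of the builder.
type SparkSessionBuilder struct {
	connectionString string
}

func (s SparkSessionBuilder) Remote(connectionString string) SparkSessionBuilder {
	s.connectionString = connectionString // s is already a copy of the caller's value
	return s
}

// entrypoint is a hypothetical stand-in for sparkSessionBuilderEntrypoint.
type entrypoint struct {
	Builder SparkSessionBuilder
}

// SparkSession is the shared, package-level entry point.
var SparkSession entrypoint

func main() {
	a := SparkSession.Builder.Remote("localhost:15002")
	b := SparkSession.Builder.Remote("spark-host:15002") // hypothetical host name
	fmt.Println(a.connectionString)                           // localhost:15002
	fmt.Println(b.connectionString)                           // spark-host:15002
	fmt.Printf("%q\n", SparkSession.Builder.connectionString) // ""
}
```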
diff --git a/client/sql/structtype.go b/client/sql/structtype.go
new file mode 100644
index 0000000..2a59c30
--- /dev/null
+++ b/client/sql/structtype.go
@@ -0,0 +1,28 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+type StructField struct {
+       Name     string
+       DataType DataType
+       Nullable bool // default should be true
+}
+
+type StructType struct {
+       TypeName string
+       Fields   []StructField
+}
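`StructField.Nullable` is commented as defaulting to true, matching Spark's `StructField`, but Go's zero value for `bool` is `false`, so a struct literal that omits the field produces a non-nullable field. One way to honor the intended default is a constructor; `NewStructField` below is a hypothetical helper, not part of this commit, and the types are trimmed copies of those in the diff.

```go
package main

import "fmt"

type DataType interface{ TypeName() string }

type StringType struct{}

func (StringType) TypeName() string { return "String" }

// StructField is a trimmed copy of the type in structtype.go.
type StructField struct {
	Name     string
	DataType DataType
	Nullable bool
}

// NewStructField is a hypothetical constructor that applies
// Spark's default of nullable = true.
func NewStructField(name string, dataType DataType) StructField {
	return StructField{Name: name, DataType: dataType, Nullable: true}
}

func main() {
	literal := StructField{Name: "word", DataType: StringType{}} // Nullable left at zero value
	built := NewStructField("word", StringType{})
	fmt.Println(literal.Nullable, built.Nullable) // false true
}
```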
diff --git a/cmd/spark-connect-example-raw-grpc-client/main.go b/cmd/spark-connect-example-raw-grpc-client/main.go
new file mode 100644
index 0000000..8398292
--- /dev/null
+++ b/cmd/spark-connect-example-raw-grpc-client/main.go
@@ -0,0 +1,64 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "context"
+       "flag"
+       proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
+       "github.com/google/uuid"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+       "log"
+       "time"
+)
+
+var (
+       remote = flag.String("remote", "localhost:15002", "the remote address of Spark Connect server to connect to")
+)
+
+func main() {
+       opts := []grpc.DialOption{
+               grpc.WithTransportCredentials(insecure.NewCredentials()),
+       }
+
+       conn, err := grpc.Dial(*remote, opts...)
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+       defer conn.Close()
+
+       client := proto.NewSparkConnectServiceClient(conn)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+       defer cancel()
+
+       configRequest := proto.ConfigRequest{
+               SessionId: uuid.NewString(),
+               Operation: &proto.ConfigRequest_Operation{
+                       OpType: &proto.ConfigRequest_Operation_GetAll{
+                               GetAll: &proto.ConfigRequest_GetAll{},
+                       },
+               },
+       }
+       configResponse, err := client.Config(ctx, &configRequest)
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       log.Printf("configResponse: %v", configResponse)
+}
diff --git a/cmd/spark-connect-example-spark-session/main.go b/cmd/spark-connect-example-spark-session/main.go
new file mode 100644
index 0000000..0f4a6cc
--- /dev/null
+++ b/cmd/spark-connect-example-spark-session/main.go
@@ -0,0 +1,73 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+       "flag"
+       "github.com/apache/spark-connect-go/v_3_4/client/sql"
+       "log"
+)
+
+var (
+       remote = flag.String("remote", "localhost:15002",
+               "the remote address of Spark Connect server to connect to")
+)
+
+func main() {
+       spark, err := sql.SparkSession.Builder.Remote(*remote).Build()
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+       defer spark.Stop()
+
+       df, err := spark.Sql("select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       err = df.Show(100, false)
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       schema, err := df.Schema()
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       for _, f := range schema.Fields {
+               log.Printf("Field in dataframe schema: %s - %s", f.Name, f.DataType.TypeName())
+       }
+
+       rows, err := df.Collect()
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       schema, err = rows[0].Schema()
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       for _, f := range schema.Fields {
+               log.Printf("Field in row: %s - %s", f.Name, f.DataType.TypeName())
+       }
+
+       for _, row := range rows {
+               log.Printf("Row: %v", row)
+       }
+}
diff --git a/go.sum b/go.sum
index 7fb61d6..f8c434e 100644
--- a/go.sum
+++ b/go.sum
@@ -1,40 +1,66 @@
+github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
+github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY=
 github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
+github.com/apache/arrow/go/v12 v12.0.0 h1:xtZE63VWl7qLdB0JObIXvvhGjoVNrQ9ciIHG2OK5cmc=
 github.com/apache/arrow/go/v12 v12.0.0/go.mod h1:d+tV/eHZZ7Dz7RPrFKtPK02tpr+c9/PEd/zm8mDS9Vg=
+github.com/apache/thrift v0.16.0 h1:qEy6UW60iVOlUy+b9ZR0d5WzUWYGOo4HfopoyBaNmoY=
 github.com/apache/thrift v0.16.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU=
 github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/goccy/go-json v0.9.11 h1:/pAaQDLHEoCq/5FFmSKBswWmK6H0e8g4159Kc/X/nqk=
 github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
 github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
 github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
 github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
 github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/google/flatbuffers v2.0.8+incompatible h1:ivUb1cGomAB101ZM1T0nOiWz9pSrTMoa9+EiY7igmkM=
 github.com/google/flatbuffers v2.0.8+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
 github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
 github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
+github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
 github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
+github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
 github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
+github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
+github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs=
 github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
+github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
 github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
+github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
 github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
+github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
 github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/exp v0.0.0-20220827204233-334a2380cb91 h1:tnebWN09GYg9OLPss1KXj8txwZc6X6uMr6VFdcGNbHw=
 golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
+golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
 golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ=
 golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
@@ -43,11 +69,14 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
 golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
 golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f h1:uF6paiQQebLeSXkrTqHqz0MXhXXS1KgF41eUdBNvxK0=
 golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
+gonum.org/v1/gonum v0.11.0 h1:f1IJhK4Km5tBJmaiJXtk/PkL4cdVX6J+tGiM187uT5E=
 google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f h1:BWUVssLB0HVOSY78gIdvk1dTVYtT1y8SBWtPYuTJ/6w=
 google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM=
 google.golang.org/grpc v1.54.0 h1:EhTqbhiYeixwWQtAEZAxmV9MGqcjEU2mFx52xCzNyag=
@@ -57,5 +86,7 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
 google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng=
 google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

