This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git


The following commit(s) were added to refs/heads/master by this push:
     new e9001d2  [SPARK-43351] Support more data types when reading from spark connect arrow dataset to data frame; Also implement CreateTempView
e9001d2 is described below

commit e9001d2edbc2dd9ba83b7e721d79103bbc3bc598
Author: hiboyang <[email protected]>
AuthorDate: Tue Jun 27 09:46:20 2023 -0700

    [SPARK-43351] Support more data types when reading from spark connect arrow dataset to data frame; Also implement CreateTempView
    
    ### What changes were proposed in this pull request?
    
    Support more data types (boolean, byte, short, integer, long, float, double, decimal, string, binary, timestamp and date) when reading from a Spark Connect Arrow dataset into a data frame; also implement CreateTempView.
    
    ### Why are the changes needed?
    
    Previously the Go client could only convert string, int32 and int64 Arrow columns into data frame rows, and there was no way to register a data frame as a temporary view.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users can now create a temp view, e.g.
    ```
    dataframe.CreateTempView(...)
    ```
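
    For reference, a minimal usage sketch based on the example program updated in this commit (assumes a connected spark session and the log package, as in cmd/spark-connect-example-spark-session/main.go):

    ```
    df, err := spark.Sql("select 'apple' as word, 123 as count")
    if err != nil {
        log.Fatalf("Failed: %s", err.Error())
    }

    // Replace any existing local (non-global) temp view named "view1".
    err = df.CreateTempView("view1", true, false)
    if err != nil {
        log.Fatalf("Failed: %s", err.Error())
    }

    // The view can now be queried with SQL in the same session.
    df, err = spark.Sql("select count, word from view1 order by count")
    if err != nil {
        log.Fatalf("Failed: %s", err.Error())
    }
    df.Show(100, false)
    ```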
    
    ### How was this patch tested?
    
    Unit tests, plus a manual test by running the example code.
    
    Closes #11 from hiboyang/bo-dev-03.
    
    Authored-by: hiboyang <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 client/sql/dataframe.go                         | 184 ++++++++++++++---
 client/sql/dataframe_test.go                    | 249 ++++++++++++++++++++++++
 client/sql/datatype.go                          |  77 ++++++++
 cmd/spark-connect-example-spark-session/main.go |  15 ++
 4 files changed, 497 insertions(+), 28 deletions(-)

diff --git a/client/sql/dataframe.go b/client/sql/dataframe.go
index eb1718a..f2a0747 100644
--- a/client/sql/dataframe.go
+++ b/client/sql/dataframe.go
@@ -37,6 +37,8 @@ type DataFrame interface {
        Collect() ([]Row, error)
        // Write returns a data frame writer, which can be used to save the data frame to supported storage.
        Write() DataFrameWriter
+       // CreateTempView creates a local or global temporary view from this data frame, optionally replacing an existing view of the same name.
+       CreateTempView(viewName string, replace bool, global bool) error
 }
 
 // dataFrameImpl is an implementation of DataFrame interface.
@@ -157,6 +159,30 @@ func (df *dataFrameImpl) Write() DataFrameWriter {
        return &writer
 }
 
+func (df *dataFrameImpl) CreateTempView(viewName string, replace bool, global bool) error {
+       plan := &proto.Plan{
+               OpType: &proto.Plan_Command{
+                       Command: &proto.Command{
+                               CommandType: &proto.Command_CreateDataframeView{
+                                       CreateDataframeView: &proto.CreateDataFrameViewCommand{
+                                               Input:    df.relation,
+                                               Name:     viewName,
+                                               Replace:  replace,
+                                               IsGlobal: global,
+                                       },
+                               },
+                       },
+               },
+       }
+
+       responseClient, err := df.sparkSession.executePlan(plan)
+       if err != nil {
+               return fmt.Errorf("failed to create temp view %s: %w", viewName, err)
+       }
+
+       return consumeExecutePlanClient(responseClient)
+}
+
 func (df *dataFrameImpl) createPlan() *proto.Plan {
        return &proto.Plan{
                OpType: &proto.Plan_Root{
@@ -208,38 +234,16 @@ func readArrowBatchData(data []byte, schema *StructType) ([]Row, error) {
                                return nil, fmt.Errorf("failed to read arrow: %w", err)
                        }
                }
-               numColumns := len(arrowReader.Schema().Fields())
+
+               values, err := readArrowRecord(record)
+               if err != nil {
+                       return nil, err
+               }
+
                numRows := int(record.NumRows())
                if rows == nil {
                        rows = make([]Row, 0, numRows)
                }
-               values := make([][]any, numRows)
-               for i := range values {
-                       values[i] = make([]any, numColumns)
-               }
-               for columnIndex := 0; columnIndex < numColumns; columnIndex++ {
-                       columnData := record.Column(columnIndex).Data()
-                       dataTypeId := columnData.DataType().ID()
-                       switch dataTypeId {
-                       case arrow.STRING:
-                               vector := array.NewStringData(columnData)
-                               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
-                                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
-                               }
-                       case arrow.INT32:
-                               vector := array.NewInt32Data(columnData)
-                               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
-                                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
-                               }
-                       case arrow.INT64:
-                               vector := array.NewInt64Data(columnData)
-                               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
-                                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
-                               }
-                       default:
-                               return nil, fmt.Errorf("unsupported arrow data type %s in column %d", dataTypeId.String(), columnIndex)
-                       }
-               }
 
                for _, v := range values {
                        row := &GenericRowWithSchema{
@@ -258,6 +262,107 @@ func readArrowBatchData(data []byte, schema *StructType) ([]Row, error) {
        return rows, nil
 }
 
+// readArrowRecord reads all values from an arrow record and returns [][]any
+func readArrowRecord(record arrow.Record) ([][]any, error) {
+       numRows := record.NumRows()
+       numColumns := int(record.NumCols())
+
+       values := make([][]any, numRows)
+       for i := range values {
+               values[i] = make([]any, numColumns)
+       }
+
+       for columnIndex := 0; columnIndex < numColumns; columnIndex++ {
+               err := readArrowRecordColumn(record, columnIndex, values)
+               if err != nil {
+                       return nil, err
+               }
+       }
+       return values, nil
+}
+
+// readArrowRecordColumn reads all values in a column and stores them in values
+func readArrowRecordColumn(record arrow.Record, columnIndex int, values [][]any) error {
+       numRows := int(record.NumRows())
+       columnData := record.Column(columnIndex).Data()
+       dataTypeId := columnData.DataType().ID()
+       switch dataTypeId {
+       case arrow.BOOL:
+               vector := array.NewBooleanData(columnData)
+               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+               }
+       case arrow.INT8:
+               vector := array.NewInt8Data(columnData)
+               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+               }
+       case arrow.INT16:
+               vector := array.NewInt16Data(columnData)
+               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+               }
+       case arrow.INT32:
+               vector := array.NewInt32Data(columnData)
+               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+               }
+       case arrow.INT64:
+               vector := array.NewInt64Data(columnData)
+               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+               }
+       case arrow.FLOAT16:
+               vector := array.NewFloat16Data(columnData)
+               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+               }
+       case arrow.FLOAT32:
+               vector := array.NewFloat32Data(columnData)
+               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+               }
+       case arrow.FLOAT64:
+               vector := array.NewFloat64Data(columnData)
+               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+               }
+       case arrow.DECIMAL128: // arrow.DECIMAL is an alias for arrow.DECIMAL128
+               vector := array.NewDecimal128Data(columnData)
+               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+               }
+       case arrow.DECIMAL256:
+               vector := array.NewDecimal256Data(columnData)
+               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+               }
+       case arrow.STRING:
+               vector := array.NewStringData(columnData)
+               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+               }
+       case arrow.BINARY:
+               vector := array.NewBinaryData(columnData)
+               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+               }
+       case arrow.TIMESTAMP:
+               vector := array.NewTimestampData(columnData)
+               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+               }
+       case arrow.DATE64:
+               vector := array.NewDate64Data(columnData)
+               for rowIndex := 0; rowIndex < numRows; rowIndex++ {
+                       values[rowIndex][columnIndex] = vector.Value(rowIndex)
+               }
+       default:
+               return fmt.Errorf("unsupported arrow data type %s in column %d", dataTypeId.String(), columnIndex)
+       }
+       return nil
+}
+
 func convertProtoDataTypeToStructType(input *proto.DataType) *StructType {
        dataTypeStruct := input.GetStruct()
        if dataTypeStruct == nil {
@@ -283,12 +388,35 @@ func convertProtoStructField(field *proto.DataType_StructField) StructField {
        }
 }
 
+// convertProtoDataTypeToDataType converts a protobuf data type to a Spark Connect SQL data type
 func convertProtoDataTypeToDataType(input *proto.DataType) DataType {
        switch v := input.GetKind().(type) {
+       case *proto.DataType_Boolean_:
+               return BooleanType{}
+       case *proto.DataType_Byte_:
+               return ByteType{}
+       case *proto.DataType_Short_:
+               return ShortType{}
        case *proto.DataType_Integer_:
                return IntegerType{}
+       case *proto.DataType_Long_:
+               return LongType{}
+       case *proto.DataType_Float_:
+               return FloatType{}
+       case *proto.DataType_Double_:
+               return DoubleType{}
+       case *proto.DataType_Decimal_:
+               return DecimalType{}
        case *proto.DataType_String_:
                return StringType{}
+       case *proto.DataType_Binary_:
+               return BinaryType{}
+       case *proto.DataType_Timestamp_:
+               return TimestampType{}
+       case *proto.DataType_TimestampNtz:
+               return TimestampNtzType{}
+       case *proto.DataType_Date_:
+               return DateType{}
        default:
                return UnsupportedType{
                        TypeInfo: v,
diff --git a/client/sql/dataframe_test.go b/client/sql/dataframe_test.go
index d8c3c80..7775b74 100644
--- a/client/sql/dataframe_test.go
+++ b/client/sql/dataframe_test.go
@@ -20,8 +20,12 @@ import (
        "bytes"
        "github.com/apache/arrow/go/v12/arrow"
        "github.com/apache/arrow/go/v12/arrow/array"
+       "github.com/apache/arrow/go/v12/arrow/decimal128"
+       "github.com/apache/arrow/go/v12/arrow/decimal256"
+       "github.com/apache/arrow/go/v12/arrow/float16"
        "github.com/apache/arrow/go/v12/arrow/ipc"
        "github.com/apache/arrow/go/v12/arrow/memory"
+       proto "github.com/apache/spark-connect-go/v_3_4/internal/generated"
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
        "testing"
@@ -55,3 +59,248 @@ func TestShowArrowBatchData(t *testing.T) {
        err = showArrowBatchData(buf.Bytes())
        assert.Nil(t, err)
 }
+
+func TestReadArrowRecord(t *testing.T) {
+       arrowFields := []arrow.Field{
+               {
+                       Name: "boolean_column",
+                       Type: &arrow.BooleanType{},
+               },
+               {
+                       Name: "int8_column",
+                       Type: &arrow.Int8Type{},
+               },
+               {
+                       Name: "int16_column",
+                       Type: &arrow.Int16Type{},
+               },
+               {
+                       Name: "int32_column",
+                       Type: &arrow.Int32Type{},
+               },
+               {
+                       Name: "int64_column",
+                       Type: &arrow.Int64Type{},
+               },
+               {
+                       Name: "float16_column",
+                       Type: &arrow.Float16Type{},
+               },
+               {
+                       Name: "float32_column",
+                       Type: &arrow.Float32Type{},
+               },
+               {
+                       Name: "float64_column",
+                       Type: &arrow.Float64Type{},
+               },
+               {
+                       Name: "decimal128_column",
+                       Type: &arrow.Decimal128Type{},
+               },
+               {
+                       Name: "decimal256_column",
+                       Type: &arrow.Decimal256Type{},
+               },
+               {
+                       Name: "string_column",
+                       Type: &arrow.StringType{},
+               },
+               {
+                       Name: "binary_column",
+                       Type: &arrow.BinaryType{},
+               },
+               {
+                       Name: "timestamp_column",
+                       Type: &arrow.TimestampType{},
+               },
+               {
+                       Name: "date64_column",
+                       Type: &arrow.Date64Type{},
+               },
+       }
+       arrowSchema := arrow.NewSchema(arrowFields, nil)
+       var buf bytes.Buffer
+       arrowWriter := ipc.NewWriter(&buf, ipc.WithSchema(arrowSchema))
+       defer arrowWriter.Close()
+
+       alloc := memory.NewGoAllocator()
+       recordBuilder := array.NewRecordBuilder(alloc, arrowSchema)
+       defer recordBuilder.Release()
+
+       i := 0
+       recordBuilder.Field(i).(*array.BooleanBuilder).Append(false)
+       recordBuilder.Field(i).(*array.BooleanBuilder).Append(true)
+
+       i++
+       recordBuilder.Field(i).(*array.Int8Builder).Append(1)
+       recordBuilder.Field(i).(*array.Int8Builder).Append(2)
+
+       i++
+       recordBuilder.Field(i).(*array.Int16Builder).Append(10)
+       recordBuilder.Field(i).(*array.Int16Builder).Append(20)
+
+       i++
+       recordBuilder.Field(i).(*array.Int32Builder).Append(100)
+       recordBuilder.Field(i).(*array.Int32Builder).Append(200)
+
+       i++
+       recordBuilder.Field(i).(*array.Int64Builder).Append(1000)
+       recordBuilder.Field(i).(*array.Int64Builder).Append(2000)
+
+       i++
+       recordBuilder.Field(i).(*array.Float16Builder).Append(float16.New(10000.1))
+       recordBuilder.Field(i).(*array.Float16Builder).Append(float16.New(20000.1))
+
+       i++
+       recordBuilder.Field(i).(*array.Float32Builder).Append(100000.1)
+       recordBuilder.Field(i).(*array.Float32Builder).Append(200000.1)
+
+       i++
+       recordBuilder.Field(i).(*array.Float64Builder).Append(1000000.1)
+       recordBuilder.Field(i).(*array.Float64Builder).Append(2000000.1)
+
+       i++
+       recordBuilder.Field(i).(*array.Decimal128Builder).Append(decimal128.FromI64(10000000))
+       recordBuilder.Field(i).(*array.Decimal128Builder).Append(decimal128.FromI64(20000000))
+
+       i++
+       recordBuilder.Field(i).(*array.Decimal256Builder).Append(decimal256.FromI64(100000000))
+       recordBuilder.Field(i).(*array.Decimal256Builder).Append(decimal256.FromI64(200000000))
+
+       i++
+       recordBuilder.Field(i).(*array.StringBuilder).Append("str1")
+       recordBuilder.Field(i).(*array.StringBuilder).Append("str2")
+
+       i++
+       recordBuilder.Field(i).(*array.BinaryBuilder).Append([]byte("bytes1"))
+       recordBuilder.Field(i).(*array.BinaryBuilder).Append([]byte("bytes2"))
+
+       i++
+       recordBuilder.Field(i).(*array.TimestampBuilder).Append(arrow.Timestamp(1686981953115000))
+       recordBuilder.Field(i).(*array.TimestampBuilder).Append(arrow.Timestamp(1686981953116000))
+
+       i++
+       recordBuilder.Field(i).(*array.Date64Builder).Append(arrow.Date64(1686981953117000))
+       recordBuilder.Field(i).(*array.Date64Builder).Append(arrow.Date64(1686981953118000))
+
+       record := recordBuilder.NewRecord()
+       defer record.Release()
+
+       values, err := readArrowRecord(record)
+       require.Nil(t, err)
+       assert.Equal(t, 2, len(values))
+       assert.Equal(t, []any{
+               false, int8(1), int16(10), int32(100), int64(1000),
+               float16.New(10000.1), float32(100000.1), 1000000.1,
+               decimal128.FromI64(10000000), decimal256.FromI64(100000000),
+               "str1", []byte("bytes1"),
+               arrow.Timestamp(1686981953115000), arrow.Date64(1686981953117000)},
+               values[0])
+       assert.Equal(t, []any{
+               true, int8(2), int16(20), int32(200), int64(2000),
+               float16.New(20000.1), float32(200000.1), 2000000.1,
+               decimal128.FromI64(20000000), decimal256.FromI64(200000000),
+               "str2", []byte("bytes2"),
+               arrow.Timestamp(1686981953116000), arrow.Date64(1686981953118000)},
+               values[1])
+}
+
+func TestReadArrowRecord_UnsupportedType(t *testing.T) {
+       arrowFields := []arrow.Field{
+               {
+                       Name: "unsupported_type_column",
+                       Type: &arrow.MonthIntervalType{},
+               },
+       }
+       arrowSchema := arrow.NewSchema(arrowFields, nil)
+       var buf bytes.Buffer
+       arrowWriter := ipc.NewWriter(&buf, ipc.WithSchema(arrowSchema))
+       defer arrowWriter.Close()
+
+       alloc := memory.NewGoAllocator()
+       recordBuilder := array.NewRecordBuilder(alloc, arrowSchema)
+       defer recordBuilder.Release()
+
+       recordBuilder.Field(0).(*array.MonthIntervalBuilder).Append(1)
+
+       record := recordBuilder.NewRecord()
+       defer record.Release()
+
+       _, err := readArrowRecord(record)
+       require.NotNil(t, err)
+}
+
+func TestConvertProtoDataTypeToDataType(t *testing.T) {
+       booleanDataType := &proto.DataType{
+               Kind: &proto.DataType_Boolean_{},
+       }
+       assert.Equal(t, "Boolean", convertProtoDataTypeToDataType(booleanDataType).TypeName())
+
+       byteDataType := &proto.DataType{
+               Kind: &proto.DataType_Byte_{},
+       }
+       assert.Equal(t, "Byte", convertProtoDataTypeToDataType(byteDataType).TypeName())
+
+       shortDataType := &proto.DataType{
+               Kind: &proto.DataType_Short_{},
+       }
+       assert.Equal(t, "Short", convertProtoDataTypeToDataType(shortDataType).TypeName())
+
+       integerDataType := &proto.DataType{
+               Kind: &proto.DataType_Integer_{},
+       }
+       assert.Equal(t, "Integer", convertProtoDataTypeToDataType(integerDataType).TypeName())
+
+       longDataType := &proto.DataType{
+               Kind: &proto.DataType_Long_{},
+       }
+       assert.Equal(t, "Long", convertProtoDataTypeToDataType(longDataType).TypeName())
+
+       floatDataType := &proto.DataType{
+               Kind: &proto.DataType_Float_{},
+       }
+       assert.Equal(t, "Float", convertProtoDataTypeToDataType(floatDataType).TypeName())
+
+       doubleDataType := &proto.DataType{
+               Kind: &proto.DataType_Double_{},
+       }
+       assert.Equal(t, "Double", convertProtoDataTypeToDataType(doubleDataType).TypeName())
+
+       decimalDataType := &proto.DataType{
+               Kind: &proto.DataType_Decimal_{},
+       }
+       assert.Equal(t, "Decimal", convertProtoDataTypeToDataType(decimalDataType).TypeName())
+
+       stringDataType := &proto.DataType{
+               Kind: &proto.DataType_String_{},
+       }
+       assert.Equal(t, "String", convertProtoDataTypeToDataType(stringDataType).TypeName())
+
+       binaryDataType := &proto.DataType{
+               Kind: &proto.DataType_Binary_{},
+       }
+       assert.Equal(t, "Binary", convertProtoDataTypeToDataType(binaryDataType).TypeName())
+
+       timestampDataType := &proto.DataType{
+               Kind: &proto.DataType_Timestamp_{},
+       }
+       assert.Equal(t, "Timestamp", convertProtoDataTypeToDataType(timestampDataType).TypeName())
+
+       timestampNtzDataType := &proto.DataType{
+               Kind: &proto.DataType_TimestampNtz{},
+       }
+       assert.Equal(t, "TimestampNtz", convertProtoDataTypeToDataType(timestampNtzDataType).TypeName())
+
+       dateDataType := &proto.DataType{
+               Kind: &proto.DataType_Date_{},
+       }
+       assert.Equal(t, "Date", convertProtoDataTypeToDataType(dateDataType).TypeName())
+}
+
+func TestConvertProtoDataTypeToDataType_UnsupportedType(t *testing.T) {
+       unsupportedDataType := &proto.DataType{
+               Kind: &proto.DataType_YearMonthInterval_{},
+       }
+       assert.Equal(t, "Unsupported", convertProtoDataTypeToDataType(unsupportedDataType).TypeName())
+}
diff --git a/client/sql/datatype.go b/client/sql/datatype.go
index 05ab02e..e201114 100644
--- a/client/sql/datatype.go
+++ b/client/sql/datatype.go
@@ -25,6 +25,27 @@ type DataType interface {
        TypeName() string
 }
 
+type BooleanType struct {
+}
+
+func (t BooleanType) TypeName() string {
+       return getDataTypeName(t)
+}
+
+type ByteType struct {
+}
+
+func (t ByteType) TypeName() string {
+       return getDataTypeName(t)
+}
+
+type ShortType struct {
+}
+
+func (t ShortType) TypeName() string {
+       return getDataTypeName(t)
+}
+
 type IntegerType struct {
 }
 
@@ -32,6 +53,34 @@ func (t IntegerType) TypeName() string {
        return getDataTypeName(t)
 }
 
+type LongType struct {
+}
+
+func (t LongType) TypeName() string {
+       return getDataTypeName(t)
+}
+
+type FloatType struct {
+}
+
+func (t FloatType) TypeName() string {
+       return getDataTypeName(t)
+}
+
+type DoubleType struct {
+}
+
+func (t DoubleType) TypeName() string {
+       return getDataTypeName(t)
+}
+
+type DecimalType struct {
+}
+
+func (t DecimalType) TypeName() string {
+       return getDataTypeName(t)
+}
+
 type StringType struct {
 }
 
@@ -39,6 +88,34 @@ func (t StringType) TypeName() string {
        return getDataTypeName(t)
 }
 
+type BinaryType struct {
+}
+
+func (t BinaryType) TypeName() string {
+       return getDataTypeName(t)
+}
+
+type TimestampType struct {
+}
+
+func (t TimestampType) TypeName() string {
+       return getDataTypeName(t)
+}
+
+type TimestampNtzType struct {
+}
+
+func (t TimestampNtzType) TypeName() string {
+       return getDataTypeName(t)
+}
+
+type DateType struct {
+}
+
+func (t DateType) TypeName() string {
+       return getDataTypeName(t)
+}
+
 type UnsupportedType struct {
        TypeInfo any
 }
diff --git a/cmd/spark-connect-example-spark-session/main.go b/cmd/spark-connect-example-spark-session/main.go
index 6c0b5db..c35bbeb 100644
--- a/cmd/spark-connect-example-spark-session/main.go
+++ b/cmd/spark-connect-example-spark-session/main.go
@@ -41,6 +41,7 @@ func main() {
                log.Fatalf("Failed: %s", err.Error())
        }
 
+       log.Printf("DataFrame from sql: select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
        err = df.Show(100, false)
        if err != nil {
                log.Fatalf("Failed: %s", err.Error())
@@ -86,5 +87,19 @@ func main() {
                log.Fatalf("Failed: %s", err.Error())
        }
 
+       log.Printf("DataFrame from reading parquet")
+       df.Show(100, false)
+
+       err = df.CreateTempView("view1", true, false)
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       df, err = spark.Sql("select count, word from view1 order by count")
+       if err != nil {
+               log.Fatalf("Failed: %s", err.Error())
+       }
+
+       log.Printf("DataFrame from sql: select count, word from view1 order by count")
        df.Show(100, false)
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
