This is an automated email from the ASF dual-hosted git repository.
mgrund pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git
The following commit(s) were added to refs/heads/master by this push:
new 57211c6 Add column.getItem to Columns
57211c6 is described below
commit 57211c6be15e8af7afb9a33f2ac819ec6f14175a
Author: Magnus Pierre <[email protected]>
AuthorDate: Wed Jan 1 13:05:45 2025 +0100
Add column.getItem to Columns
Added the GetItem method to Column, and added the unresolvedExtractValue
expression type. The implementation stays as close as possible to the existing
Python implementation to ensure the same functionality.
Also added a simple integration test case to exercise the new functionality.
Closes #79 from magpierre/Add-Column.GetItem-to-Columns.
Authored-by: Magnus Pierre <[email protected]>
Signed-off-by: Martin Grund <[email protected]>
---
internal/tests/integration/functions_test.go | 15 ++++++++++++
spark/sql/column/column.go | 4 ++++
spark/sql/column/expressions.go | 35 ++++++++++++++++++++++++++++
3 files changed, 54 insertions(+)
diff --git a/internal/tests/integration/functions_test.go
b/internal/tests/integration/functions_test.go
index 33f3352..c620fd3 100644
--- a/internal/tests/integration/functions_test.go
+++ b/internal/tests/integration/functions_test.go
@@ -38,3 +38,18 @@ func TestIntegration_BuiltinFunctions(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 10, len(res))
}
+
+func TestIntegration_ColumnGetItem(t *testing.T) {
+ ctx := context.Background()
+ spark, err :=
sql.NewSessionBuilder().Remote("sc://localhost").Build(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ df, _ := spark.Sql(ctx, "select sequence(1,10) as s")
+ df, err = df.Select(ctx, functions.Col("s").GetItem(2))
+ assert.NoError(t, err)
+ res, err := df.Collect(ctx)
+ assert.NoError(t, err)
+ assert.Equal(t, int32(3), res[0].Values()[0])
+}
diff --git a/spark/sql/column/column.go b/spark/sql/column/column.go
index ce70189..b0b3b87 100644
--- a/spark/sql/column/column.go
+++ b/spark/sql/column/column.go
@@ -77,6 +77,10 @@ func (c Column) Desc() Column {
})
}
+func (c Column) GetItem(key any) Column {
+ return NewColumn(NewUnresolvedExtractValue("getItem", c.expr,
NewLiteral(key)))
+}
+
func (c Column) Asc() Column {
return NewColumn(&sortExpression{
child: c.expr,
diff --git a/spark/sql/column/expressions.go b/spark/sql/column/expressions.go
index a2ffa24..198127d 100644
--- a/spark/sql/column/expressions.go
+++ b/spark/sql/column/expressions.go
@@ -166,6 +166,37 @@ func (c *caseWhenExpression) ToProto(ctx context.Context)
(*proto.Expression, er
return fun.ToProto(ctx)
}
+type unresolvedExtractValue struct {
+ name string
+ child expression
+ extraction expression
+}
+
+func (u *unresolvedExtractValue) DebugString() string {
+ return fmt.Sprintf("%s(%s, %s)", u.name, u.child.DebugString(),
u.extraction.DebugString())
+}
+
+func (u *unresolvedExtractValue) ToProto(ctx context.Context)
(*proto.Expression, error) {
+ expr := newProtoExpression()
+ child, err := u.child.ToProto(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ extraction, err := u.extraction.ToProto(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ expr.ExprType = &proto.Expression_UnresolvedExtractValue_{
+ UnresolvedExtractValue:
&proto.Expression_UnresolvedExtractValue{
+ Child: child,
+ Extraction: extraction,
+ },
+ }
+ return expr, nil
+}
+
type unresolvedFunction struct {
name string
args []expression
@@ -211,6 +242,10 @@ func (u *unresolvedFunction) ToProto(ctx context.Context)
(*proto.Expression, er
return expr, nil
}
+func NewUnresolvedExtractValue(name string, child expression, extraction
expression) expression {
+ return &unresolvedExtractValue{name: name, child: child, extraction:
extraction}
+}
+
func NewUnresolvedFunction(name string, args []expression, isDistinct bool)
expression {
return &unresolvedFunction{name: name, args: args, isDistinct:
isDistinct}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]