This is an automated email from the ASF dual-hosted git repository.
mgrund pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git
The following commit(s) were added to refs/heads/master by this push:
new 15b481b [#58] Adding Unpivot/Melt
15b481b is described below
commit 15b481bb3f023cb494719141d1bd44b497802b93
Author: Martin Grund <[email protected]>
AuthorDate: Mon Dec 30 09:55:10 2024 +0100
[#58] Adding Unpivot/Melt
### What changes were proposed in this pull request?
Adding support for Unpivot/Melt
### Why are the changes needed?
Compatibility
### Does this PR introduce _any_ user-facing change?
New functionality.
### How was this patch tested?
Integration test.
Closes #93 from grundprinzip/unpivot.
Authored-by: Martin Grund <[email protected]>
Signed-off-by: Martin Grund <[email protected]>
---
internal/tests/integration/dataframe_test.go | 21 ++++++++
spark/client/client.go | 4 +-
spark/sql/dataframe.go | 77 ++++++++++++++++++++++++++++
3 files changed, 101 insertions(+), 1 deletion(-)
diff --git a/internal/tests/integration/dataframe_test.go
b/internal/tests/integration/dataframe_test.go
index 5f67be8..f2265f5 100644
--- a/internal/tests/integration/dataframe_test.go
+++ b/internal/tests/integration/dataframe_test.go
@@ -781,3 +781,24 @@ func TestDataFrame_WithOption(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, int64(10), c)
}
+
+func TestDataFrame_Unpivot(t *testing.T) {
+ ctx, spark := connect()
+ data := [][]any{{1, 11, 1.1}, {2, 12, 1.2}}
+ schema := types.StructOf(
+ types.NewStructField("id", types.INTEGER),
+ types.NewStructField("int", types.INTEGER),
+ types.NewStructField("double", types.DOUBLE),
+ )
+ df, err := spark.CreateDataFrame(ctx, data, schema)
+ assert.NoError(t, err)
+
+ udf, err := df.Unpivot(ctx, []column.Convertible{functions.Col("id")},
+ []column.Convertible{functions.Col("int"),
functions.Col("double")},
+ "type", "value")
+
+ assert.NoError(t, err)
+ cnt, err := udf.Count(ctx)
+ assert.NoError(t, err)
+ assert.Equal(t, int64(4), cnt)
+}
diff --git a/spark/client/client.go b/spark/client/client.go
index 37b1a55..7c827da 100644
--- a/spark/client/client.go
+++ b/spark/client/client.go
@@ -331,7 +331,9 @@ func (s *sparkConnectClientImpl) SemanticHash(ctx
context.Context, plan *proto.P
return response.GetSemanticHash().GetResult(), nil
}
-func (s *sparkConnectClientImpl) Config(ctx context.Context, operation
*proto.ConfigRequest_Operation) (*generated.ConfigResponse, error) {
+func (s *sparkConnectClientImpl) Config(ctx context.Context,
+ operation *proto.ConfigRequest_Operation,
+) (*generated.ConfigResponse, error) {
request := &proto.ConfigRequest{
Operation: operation,
UserContext: &proto.UserContext{
diff --git a/spark/sql/dataframe.go b/spark/sql/dataframe.go
index 742a5c0..65f9484 100644
--- a/spark/sql/dataframe.go
+++ b/spark/sql/dataframe.go
@@ -133,6 +133,9 @@ type DataFrame interface {
Join(ctx context.Context, other DataFrame, on column.Convertible,
joinType utils.JoinType) (DataFrame, error)
// Limit applies a limit on the DataFrame
Limit(ctx context.Context, limit int32) DataFrame
+ // Melt is an alias for Unpivot.
+ Melt(ctx context.Context, ids []column.Convertible, values
[]column.Convertible,
+ variableColumnName string, valueColumnName string) (DataFrame,
error)
// Offset returns a new DataFrame by skipping the first `offset` rows.
Offset(ctx context.Context, offset int32) DataFrame
// OrderBy is an alias for Sort
@@ -188,6 +191,27 @@ type DataFrame interface {
// Unpersist resets the storage level for this data frame, and if
necessary removes it
// from server-side caches.
Unpersist(ctx context.Context) error
+ // Unpivot a DataFrame from wide format to long format, optionally
leaving
+ // identifier columns set. This is the reverse to
`groupBy(...).pivot(...).agg(...)`,
+ // except for the aggregation, which cannot be reversed.
+ //
+ // This function is useful to massage a DataFrame into a format where
some
+ // columns are identifier columns ("ids"), while all other columns
("values")
+ // are "unpivoted" to the rows, leaving just two non-id columns, named
as given
+ // by `variableColumnName` and `valueColumnName`.
+ //
+ // When no "id" columns are given, the unpivoted DataFrame consists of
only the
+ // "variable" and "value" columns.
+ //
+ // The `values` columns must not be empty so at least one value must be
given to be unpivoted.
+ // When `values` is `None`, all non-id columns will be unpivoted.
+ //
+ // All "value" columns must share a least common data type. Unless they
are the same data type,
+ // all "value" columns are cast to the nearest common data type. For
instance, types
+ // `IntegerType` and `LongType` are cast to `LongType`, while
`IntegerType` and `StringType`
+ // do not have a common data type and `unpivot` fails.
+ Unpivot(ctx context.Context, ids []column.Convertible, values
[]column.Convertible,
+ variableColumnName string, valueColumnName string) (DataFrame,
error)
// WithColumn returns a new DataFrame by adding a column or replacing
the
// existing column that has the same name. The column expression must
be an
// expression over this DataFrame; attempting to add a column from some
other
@@ -1349,3 +1373,56 @@ func (df *dataFrameImpl) Summary(ctx context.Context,
statistics ...string) Data
}
return NewDataFrame(df.session, rel)
}
+
+func (df *dataFrameImpl) Melt(ctx context.Context,
+ ids []column.Convertible,
+ values []column.Convertible,
+ variableColumnName string,
+ valueColumnName string,
+) (DataFrame, error) {
+ return df.Unpivot(ctx, ids, values, variableColumnName, valueColumnName)
+}
+
+func (df *dataFrameImpl) Unpivot(ctx context.Context,
+ ids []column.Convertible,
+ values []column.Convertible,
+ variableColumnName string,
+ valueColumnName string,
+) (DataFrame, error) {
+ idExprs := make([]*proto.Expression, 0, len(ids))
+ for _, id := range ids {
+ expr, err := id.ToProto(ctx)
+ if err != nil {
+ return nil, err
+ }
+ idExprs = append(idExprs, expr)
+ }
+
+ valueExprs := make([]*proto.Expression, 0, len(values))
+ for _, value := range values {
+ expr, err := value.ToProto(ctx)
+ if err != nil {
+ return nil, err
+ }
+ valueExprs = append(valueExprs, expr)
+ }
+
+ rel := &proto.Relation{
+ Common: &proto.RelationCommon{
+ PlanId: newPlanId(),
+ },
+
+ RelType: &proto.Relation_Unpivot{
+ Unpivot: &proto.Unpivot{
+ Input: df.relation,
+ Ids: idExprs,
+ Values: &proto.Unpivot_Values{
+ Values: valueExprs,
+ },
+ VariableColumnName: variableColumnName,
+ ValueColumnName: valueColumnName,
+ },
+ },
+ }
+ return NewDataFrame(df.session, rel), nil
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]