This is an automated email from the ASF dual-hosted git repository.

mgrund pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 15b481b  [#58] Adding Unpivot/Melt
15b481b is described below

commit 15b481bb3f023cb494719141d1bd44b497802b93
Author: Martin Grund <[email protected]>
AuthorDate: Mon Dec 30 09:55:10 2024 +0100

    [#58] Adding Unpivot/Melt
    
    ### What changes were proposed in this pull request?
    Adding support for Unpivot/Melt
    
    ### Why are the changes needed?
    Compatibility
    
    ### Does this PR introduce _any_ user-facing change?
    New functionality.
    
    ### How was this patch tested?
    Integration test.
    
    Closes #93 from grundprinzip/unpivot.
    
    Authored-by: Martin Grund <[email protected]>
    Signed-off-by: Martin Grund <[email protected]>
---
 internal/tests/integration/dataframe_test.go | 21 ++++++++
 spark/client/client.go                       |  4 +-
 spark/sql/dataframe.go                       | 77 ++++++++++++++++++++++++++++
 3 files changed, 101 insertions(+), 1 deletion(-)

diff --git a/internal/tests/integration/dataframe_test.go 
b/internal/tests/integration/dataframe_test.go
index 5f67be8..f2265f5 100644
--- a/internal/tests/integration/dataframe_test.go
+++ b/internal/tests/integration/dataframe_test.go
@@ -781,3 +781,24 @@ func TestDataFrame_WithOption(t *testing.T) {
        assert.NoError(t, err)
        assert.Equal(t, int64(10), c)
 }
+
+func TestDataFrame_Unpivot(t *testing.T) {
+       ctx, spark := connect()
+       data := [][]any{{1, 11, 1.1}, {2, 12, 1.2}}
+       schema := types.StructOf(
+               types.NewStructField("id", types.INTEGER),
+               types.NewStructField("int", types.INTEGER),
+               types.NewStructField("double", types.DOUBLE),
+       )
+       df, err := spark.CreateDataFrame(ctx, data, schema)
+       assert.NoError(t, err)
+
+       udf, err := df.Unpivot(ctx, []column.Convertible{functions.Col("id")},
+               []column.Convertible{functions.Col("int"), 
functions.Col("double")},
+               "type", "value")
+
+       assert.NoError(t, err)
+       cnt, err := udf.Count(ctx)
+       assert.NoError(t, err)
+       assert.Equal(t, int64(4), cnt)
+}
diff --git a/spark/client/client.go b/spark/client/client.go
index 37b1a55..7c827da 100644
--- a/spark/client/client.go
+++ b/spark/client/client.go
@@ -331,7 +331,9 @@ func (s *sparkConnectClientImpl) SemanticHash(ctx 
context.Context, plan *proto.P
        return response.GetSemanticHash().GetResult(), nil
 }
 
-func (s *sparkConnectClientImpl) Config(ctx context.Context, operation 
*proto.ConfigRequest_Operation) (*generated.ConfigResponse, error) {
+func (s *sparkConnectClientImpl) Config(ctx context.Context,
+       operation *proto.ConfigRequest_Operation,
+) (*generated.ConfigResponse, error) {
        request := &proto.ConfigRequest{
                Operation: operation,
                UserContext: &proto.UserContext{
diff --git a/spark/sql/dataframe.go b/spark/sql/dataframe.go
index 742a5c0..65f9484 100644
--- a/spark/sql/dataframe.go
+++ b/spark/sql/dataframe.go
@@ -133,6 +133,9 @@ type DataFrame interface {
        Join(ctx context.Context, other DataFrame, on column.Convertible, 
joinType utils.JoinType) (DataFrame, error)
        // Limit applies a limit on the DataFrame
        Limit(ctx context.Context, limit int32) DataFrame
+       // Melt is an alias for Unpivot.
+       Melt(ctx context.Context, ids []column.Convertible, values 
[]column.Convertible,
+               variableColumnName string, valueColumnName string) (DataFrame, 
error)
        // Offset returns a new DataFrame by skipping the first `offset` rows.
        Offset(ctx context.Context, offset int32) DataFrame
        // OrderBy is an alias for Sort
@@ -188,6 +191,27 @@ type DataFrame interface {
        // Unpersist resets the storage level for this data frame, and if 
necessary removes it
        // from server-side caches.
        Unpersist(ctx context.Context) error
+       // Unpivot a DataFrame from wide format to long format, optionally 
leaving
+       // identifier columns set. This is the reverse to 
`groupBy(...).pivot(...).agg(...)`,
+       // except for the aggregation, which cannot be reversed.
+       //
+       // This function is useful to massage a DataFrame into a format where 
some
+       // columns are identifier columns ("ids"), while all other columns 
("values")
+       // are "unpivoted" to the rows, leaving just two non-id columns, named 
as given
+       // by `variableColumnName` and `valueColumnName`.
+       //
+       // When no "id" columns are given, the unpivoted DataFrame consists of 
only the
+       // "variable" and "value" columns.
+       //
+       // The `values` columns must not be empty so at least one value must be 
given to be unpivoted.
+       // When `values` is `None`, all non-id columns will be unpivoted.
+       //
+       // All "value" columns must share a least common data type. Unless they 
are the same data type,
+       // all "value" columns are cast to the nearest common data type. For 
instance, types
+       // `IntegerType` and `LongType` are cast to `LongType`, while 
`IntegerType` and `StringType`
+       // do not have a common data type and `unpivot` fails.
+       Unpivot(ctx context.Context, ids []column.Convertible, values 
[]column.Convertible,
+               variableColumnName string, valueColumnName string) (DataFrame, 
error)
        // WithColumn returns a new DataFrame by adding a column or replacing 
the
        // existing column that has the same name. The column expression must 
be an
        // expression over this DataFrame; attempting to add a column from some 
other
@@ -1349,3 +1373,56 @@ func (df *dataFrameImpl) Summary(ctx context.Context, 
statistics ...string) Data
        }
        return NewDataFrame(df.session, rel)
 }
+
+func (df *dataFrameImpl) Melt(ctx context.Context,
+       ids []column.Convertible,
+       values []column.Convertible,
+       variableColumnName string,
+       valueColumnName string,
+) (DataFrame, error) {
+       return df.Unpivot(ctx, ids, values, variableColumnName, valueColumnName)
+}
+
+func (df *dataFrameImpl) Unpivot(ctx context.Context,
+       ids []column.Convertible,
+       values []column.Convertible,
+       variableColumnName string,
+       valueColumnName string,
+) (DataFrame, error) {
+       idExprs := make([]*proto.Expression, 0, len(ids))
+       for _, id := range ids {
+               expr, err := id.ToProto(ctx)
+               if err != nil {
+                       return nil, err
+               }
+               idExprs = append(idExprs, expr)
+       }
+
+       valueExprs := make([]*proto.Expression, 0, len(values))
+       for _, value := range values {
+               expr, err := value.ToProto(ctx)
+               if err != nil {
+                       return nil, err
+               }
+               valueExprs = append(valueExprs, expr)
+       }
+
+       rel := &proto.Relation{
+               Common: &proto.RelationCommon{
+                       PlanId: newPlanId(),
+               },
+
+               RelType: &proto.Relation_Unpivot{
+                       Unpivot: &proto.Unpivot{
+                               Input: df.relation,
+                               Ids:   idExprs,
+                               Values: &proto.Unpivot_Values{
+                                       Values: valueExprs,
+                               },
+                               VariableColumnName: variableColumnName,
+                               ValueColumnName:    valueColumnName,
+                       },
+               },
+       }
+       return NewDataFrame(df.session, rel), nil
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to