This is an automated email from the ASF dual-hosted git repository.

mgrund pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 229c570  #58 Adding `DropNa` and `df.na.*` methods
229c570 is described below

commit 229c570bff7b503e267e4072acb55bc91886a30b
Author: Martin Grund <[email protected]>
AuthorDate: Thu Jan 2 10:47:33 2025 +0100

    #58 Adding `DropNa` and `df.na.*` methods
    
    ### What changes were proposed in this pull request?
    This patch adds support for `df.DropNa()` and the corresponding 
`df.Na().Drop()`-like functions.
    
    * `df.Na().Drop()`
    * `df.Na().Fill()`
    * `df.Na().Replace()`
    
    ### Why are the changes needed?
    Compatibility
    
    ### Does this PR introduce _any_ user-facing change?
    New supported APIs
    
    ### How was this patch tested?
    Added IT
    
    Closes #99 from grundprinzip/dropna.
    
    Authored-by: Martin Grund <[email protected]>
    Signed-off-by: Martin Grund <[email protected]>
---
 internal/tests/integration/dataframe_test.go | 53 +++++++++++++++++++++++
 spark/sql/dataframe.go                       | 48 ++++++++++++++++++++-
 spark/sql/dataframenafunctions.go            | 64 ++++++++++++++++++++++++++++
 3 files changed, 164 insertions(+), 1 deletion(-)

diff --git a/internal/tests/integration/dataframe_test.go 
b/internal/tests/integration/dataframe_test.go
index 5a61d48..0afe3f2 100644
--- a/internal/tests/integration/dataframe_test.go
+++ b/internal/tests/integration/dataframe_test.go
@@ -925,3 +925,56 @@ func TestDataFrame_FillNa(t *testing.T) {
        assert.Equal(t, []any{nil, int64(12), int64(20)}, res[0].Values())
        assert.Equal(t, []any{int64(1), int64(10), int64(1)}, res[1].Values())
 }
+
+func TestDataFrame_DFNaFunctions(t *testing.T) {
+       ctx, spark := connect()
+       data := [][]any{
+               {10, 80.5, "Alice", true},
+               {5, nil, "Bob", nil},
+               {nil, nil, "Tom", nil},
+               {nil, nil, nil, nil},
+       }
+       schema := types.StructOf(
+               types.NewStructField("age", types.INTEGER),
+               types.NewStructField("height", types.DOUBLE),
+               types.NewStructField("name", types.STRING),
+               types.NewStructField("bool", types.BOOLEAN),
+       )
+       df, err := spark.CreateDataFrame(ctx, data, schema)
+       assert.NoError(t, err)
+
+       res, err := df.Na().Drop(ctx)
+       assert.NoError(t, err)
+       rows, err := res.Collect(ctx)
+       assert.NoError(t, err)
+       assert.Len(t, rows, 1)
+       assert.Equal(t, rows[0].At(2), "Alice")
+
+       res, err = df.Na().DropAll(ctx)
+       assert.NoError(t, err)
+       rows, err = res.Collect(ctx)
+       assert.NoError(t, err)
+       assert.Len(t, rows, 3)
+
+       // Fill must only use long types
+       res, err = df.Na().Fill(ctx, types.Int64(50))
+       assert.NoError(t, err)
+       rows, err = res.Collect(ctx)
+       assert.NoError(t, err)
+       assert.Len(t, rows, 4)
+
+       assert.Equal(t, int32(50), rows[2].At(0))
+       assert.Equal(t, int32(50), rows[3].At(0))
+       assert.Equal(t, float64(50), rows[2].At(1))
+       assert.Equal(t, float64(50), rows[3].At(1))
+
+       res, err = df.Na().Replace(ctx, 
[]types.PrimitiveTypeLiteral{types.String("Alice")}, 
[]types.PrimitiveTypeLiteral{
+               types.String("Bob"),
+       })
+       assert.NoError(t, err)
+       rows, err = res.Collect(ctx)
+       assert.NoError(t, err)
+       assert.Len(t, rows, 4)
+
+       assert.Equal(t, "Bob", rows[0].At(2))
+}
diff --git a/spark/sql/dataframe.go b/spark/sql/dataframe.go
index 30b5293..3979c34 100644
--- a/spark/sql/dataframe.go
+++ b/spark/sql/dataframe.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one or more
 // contributor license agreements.  See the NOTICE file distributed with
 // this work for additional information regarding copyright ownership.
@@ -104,6 +103,13 @@ type DataFrame interface {
        DropByName(ctx context.Context, columns ...string) (DataFrame, error)
        // DropDuplicates returns a new DataFrame that contains only the unique 
rows from this DataFrame.
        DropDuplicates(ctx context.Context, columns ...string) (DataFrame, 
error)
+       // Drops all rows containing any null or NaN values. This is similar to 
PySpark's dropna with how=any
+       DropNa(ctx context.Context, cols ...string) (DataFrame, error)
+       // Drops all rows containing all null or NaN values in the specified 
columns. This is
+       // similar to PySpark's dropna with how=all
+       DropNaAll(ctx context.Context, cols ...string) (DataFrame, error)
+       // Drops all rows containing null or NaN values in the specified 
columns, with a max threshold.
+       DropNaWithThreshold(ctx context.Context, threshold int32, cols 
...string) (DataFrame, error)
        // ExceptAll is similar to Substract but does not perform the distinct 
operation.
        ExceptAll(ctx context.Context, other DataFrame) DataFrame
        // Explain returns the string explain plan for the current DataFrame 
according to the explainMode.
@@ -140,6 +146,7 @@ type DataFrame interface {
        // Melt is an alias for Unpivot.
        Melt(ctx context.Context, ids []column.Convertible, values 
[]column.Convertible,
                variableColumnName string, valueColumnName string) (DataFrame, 
error)
+       Na() DataFrameNaFunctions
        // Offset returns a new DataFrame by skipping the first `offset` rows.
        Offset(ctx context.Context, offset int32) DataFrame
        // OrderBy is an alias for Sort
@@ -1534,3 +1541,42 @@ func (df *dataFrameImpl) FillNaWithValues(ctx 
context.Context,
        }
        return makeDataframeWithFillNaRelation(df, valueLiterals, columns), nil
 }
+
+func (df *dataFrameImpl) DropNa(ctx context.Context, subset ...string) 
(DataFrame, error) {
+       rel := &proto.Relation{
+               Common: &proto.RelationCommon{
+                       PlanId: newPlanId(),
+               },
+               RelType: &proto.Relation_DropNa{
+                       DropNa: &proto.NADrop{
+                               Input: df.relation,
+                               Cols:  subset,
+                       },
+               },
+       }
+       return NewDataFrame(df.session, rel), nil
+}
+
+func (df *dataFrameImpl) DropNaAll(ctx context.Context, subset ...string) 
(DataFrame, error) {
+       return df.DropNaWithThreshold(ctx, 1, subset...)
+}
+
+func (df *dataFrameImpl) DropNaWithThreshold(ctx context.Context, thresh 
int32, subset ...string) (DataFrame, error) {
+       rel := &proto.Relation{
+               Common: &proto.RelationCommon{
+                       PlanId: newPlanId(),
+               },
+               RelType: &proto.Relation_DropNa{
+                       DropNa: &proto.NADrop{
+                               Input:       df.relation,
+                               MinNonNulls: &thresh,
+                               Cols:        subset,
+                       },
+               },
+       }
+       return NewDataFrame(df.session, rel), nil
+}
+
+func (df *dataFrameImpl) Na() DataFrameNaFunctions {
+       return &dataFrameNaFunctionsImpl{dataFrame: df}
+}
diff --git a/spark/sql/dataframenafunctions.go 
b/spark/sql/dataframenafunctions.go
new file mode 100644
index 0000000..6fd6850
--- /dev/null
+++ b/spark/sql/dataframenafunctions.go
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+       "context"
+
+       "github.com/apache/spark-connect-go/v35/spark/sql/types"
+)
+
+type DataFrameNaFunctions interface {
+       Drop(ctx context.Context, cols ...string) (DataFrame, error)
+       DropAll(ctx context.Context, cols ...string) (DataFrame, error)
+       DropWithThreshold(ctx context.Context, threshold int32, cols ...string) 
(DataFrame, error)
+       Fill(ctx context.Context, value types.PrimitiveTypeLiteral, cols 
...string) (DataFrame, error)
+       FillWithValues(ctx context.Context, values 
map[string]types.PrimitiveTypeLiteral) (DataFrame, error)
+       Replace(ctx context.Context, toReplace []types.PrimitiveTypeLiteral,
+               values []types.PrimitiveTypeLiteral, cols ...string) 
(DataFrame, error)
+}
+
+type dataFrameNaFunctionsImpl struct {
+       dataFrame DataFrame
+}
+
+func (d *dataFrameNaFunctionsImpl) Drop(ctx context.Context, cols ...string) 
(DataFrame, error) {
+       return d.dataFrame.DropNa(ctx, cols...)
+}
+
+func (d *dataFrameNaFunctionsImpl) DropAll(ctx context.Context, cols 
...string) (DataFrame, error) {
+       return d.dataFrame.DropNaAll(ctx, cols...)
+}
+
+func (d *dataFrameNaFunctionsImpl) DropWithThreshold(ctx context.Context, 
threshold int32, cols ...string) (DataFrame, error) {
+       return d.dataFrame.DropNaWithThreshold(ctx, threshold, cols...)
+}
+
+func (d *dataFrameNaFunctionsImpl) Fill(ctx context.Context, value 
types.PrimitiveTypeLiteral, cols ...string) (DataFrame, error) {
+       return d.dataFrame.FillNa(ctx, value, cols...)
+}
+
+func (d *dataFrameNaFunctionsImpl) FillWithValues(ctx context.Context,
+       values map[string]types.PrimitiveTypeLiteral,
+) (DataFrame, error) {
+       return d.dataFrame.FillNaWithValues(ctx, values)
+}
+
+func (d *dataFrameNaFunctionsImpl) Replace(ctx context.Context,
+       toReplace []types.PrimitiveTypeLiteral, values 
[]types.PrimitiveTypeLiteral, cols ...string,
+) (DataFrame, error) {
+       return d.dataFrame.Replace(ctx, toReplace, values, cols...)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to