This is an automated email from the ASF dual-hosted git repository.
mgrund pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark-connect-go.git
The following commit(s) were added to refs/heads/master by this push:
new 229c570 #58 Adding `DropNa` and `df.na.*` methods
229c570 is described below
commit 229c570bff7b503e267e4072acb55bc91886a30b
Author: Martin Grund <[email protected]>
AuthorDate: Thu Jan 2 10:47:33 2025 +0100
#58 Adding `DropNa` and `df.na.*` methods
### What changes were proposed in this pull request?
This patch adds support for `df.DropNa()` and the corresponding
`df.Na().drop()`-like functions.
* `df.Na().Drop()`
* `df.Na().Fill()`
* `df.Na().Replace()`
### Why are the changes needed?
Compatibility
### Does this PR introduce _any_ user-facing change?
New supported APIs
### How was this patch tested?
Added IT
Closes #99 from grundprinzip/dropna.
Authored-by: Martin Grund <[email protected]>
Signed-off-by: Martin Grund <[email protected]>
---
internal/tests/integration/dataframe_test.go | 53 +++++++++++++++++++++++
spark/sql/dataframe.go | 48 ++++++++++++++++++++-
spark/sql/dataframenafunctions.go | 64 ++++++++++++++++++++++++++++
3 files changed, 164 insertions(+), 1 deletion(-)
diff --git a/internal/tests/integration/dataframe_test.go
b/internal/tests/integration/dataframe_test.go
index 5a61d48..0afe3f2 100644
--- a/internal/tests/integration/dataframe_test.go
+++ b/internal/tests/integration/dataframe_test.go
@@ -925,3 +925,56 @@ func TestDataFrame_FillNa(t *testing.T) {
assert.Equal(t, []any{nil, int64(12), int64(20)}, res[0].Values())
assert.Equal(t, []any{int64(1), int64(10), int64(1)}, res[1].Values())
}
+
+func TestDataFrame_DFNaFunctions(t *testing.T) {
+ ctx, spark := connect()
+ data := [][]any{
+ {10, 80.5, "Alice", true},
+ {5, nil, "Bob", nil},
+ {nil, nil, "Tom", nil},
+ {nil, nil, nil, nil},
+ }
+ schema := types.StructOf(
+ types.NewStructField("age", types.INTEGER),
+ types.NewStructField("height", types.DOUBLE),
+ types.NewStructField("name", types.STRING),
+ types.NewStructField("bool", types.BOOLEAN),
+ )
+ df, err := spark.CreateDataFrame(ctx, data, schema)
+ assert.NoError(t, err)
+
+ res, err := df.Na().Drop(ctx)
+ assert.NoError(t, err)
+ rows, err := res.Collect(ctx)
+ assert.NoError(t, err)
+ assert.Len(t, rows, 1)
+ assert.Equal(t, rows[0].At(2), "Alice")
+
+ res, err = df.Na().DropAll(ctx)
+ assert.NoError(t, err)
+ rows, err = res.Collect(ctx)
+ assert.NoError(t, err)
+ assert.Len(t, rows, 3)
+
+ // Fill must only use long types
+ res, err = df.Na().Fill(ctx, types.Int64(50))
+ assert.NoError(t, err)
+ rows, err = res.Collect(ctx)
+ assert.NoError(t, err)
+ assert.Len(t, rows, 4)
+
+ assert.Equal(t, int32(50), rows[2].At(0))
+ assert.Equal(t, int32(50), rows[3].At(0))
+ assert.Equal(t, float64(50), rows[2].At(1))
+ assert.Equal(t, float64(50), rows[3].At(1))
+
+ res, err = df.Na().Replace(ctx,
[]types.PrimitiveTypeLiteral{types.String("Alice")},
[]types.PrimitiveTypeLiteral{
+ types.String("Bob"),
+ })
+ assert.NoError(t, err)
+ rows, err = res.Collect(ctx)
+ assert.NoError(t, err)
+ assert.Len(t, rows, 4)
+
+ assert.Equal(t, "Bob", rows[0].At(2))
+}
diff --git a/spark/sql/dataframe.go b/spark/sql/dataframe.go
index 30b5293..3979c34 100644
--- a/spark/sql/dataframe.go
+++ b/spark/sql/dataframe.go
@@ -1,4 +1,3 @@
-//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
@@ -104,6 +103,13 @@ type DataFrame interface {
DropByName(ctx context.Context, columns ...string) (DataFrame, error)
// DropDuplicates returns a new DataFrame that contains only the unique
rows from this DataFrame.
DropDuplicates(ctx context.Context, columns ...string) (DataFrame,
error)
+	// Drops all rows containing any null or NaN values. This is similar to
PySpark's dropna with how=any
+ DropNa(ctx context.Context, cols ...string) (DataFrame, error)
+ // Drops all rows containing all null or NaN values in the specified
columns. This is
+	// similar to PySpark's dropna with how=all
+ DropNaAll(ctx context.Context, cols ...string) (DataFrame, error)
+	// Drops rows containing null or NaN values in the specified
columns, keeping only rows with at least `threshold` non-null values.
+ DropNaWithThreshold(ctx context.Context, threshold int32, cols
...string) (DataFrame, error)
// ExceptAll is similar to Substract but does not perform the distinct
operation.
ExceptAll(ctx context.Context, other DataFrame) DataFrame
// Explain returns the string explain plan for the current DataFrame
according to the explainMode.
@@ -140,6 +146,7 @@ type DataFrame interface {
// Melt is an alias for Unpivot.
Melt(ctx context.Context, ids []column.Convertible, values
[]column.Convertible,
variableColumnName string, valueColumnName string) (DataFrame,
error)
+ Na() DataFrameNaFunctions
// Offset returns a new DataFrame by skipping the first `offset` rows.
Offset(ctx context.Context, offset int32) DataFrame
// OrderBy is an alias for Sort
@@ -1534,3 +1541,42 @@ func (df *dataFrameImpl) FillNaWithValues(ctx
context.Context,
}
return makeDataframeWithFillNaRelation(df, valueLiterals, columns), nil
}
+
+func (df *dataFrameImpl) DropNa(ctx context.Context, subset ...string)
(DataFrame, error) {
+ rel := &proto.Relation{
+ Common: &proto.RelationCommon{
+ PlanId: newPlanId(),
+ },
+ RelType: &proto.Relation_DropNa{
+ DropNa: &proto.NADrop{
+ Input: df.relation,
+ Cols: subset,
+ },
+ },
+ }
+ return NewDataFrame(df.session, rel), nil
+}
+
+func (df *dataFrameImpl) DropNaAll(ctx context.Context, subset ...string)
(DataFrame, error) {
+ return df.DropNaWithThreshold(ctx, 1, subset...)
+}
+
+func (df *dataFrameImpl) DropNaWithThreshold(ctx context.Context, thresh
int32, subset ...string) (DataFrame, error) {
+ rel := &proto.Relation{
+ Common: &proto.RelationCommon{
+ PlanId: newPlanId(),
+ },
+ RelType: &proto.Relation_DropNa{
+ DropNa: &proto.NADrop{
+ Input: df.relation,
+ MinNonNulls: &thresh,
+ Cols: subset,
+ },
+ },
+ }
+ return NewDataFrame(df.session, rel), nil
+}
+
+func (df *dataFrameImpl) Na() DataFrameNaFunctions {
+ return &dataFrameNaFunctionsImpl{dataFrame: df}
+}
diff --git a/spark/sql/dataframenafunctions.go
b/spark/sql/dataframenafunctions.go
new file mode 100644
index 0000000..6fd6850
--- /dev/null
+++ b/spark/sql/dataframenafunctions.go
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package sql
+
+import (
+ "context"
+
+ "github.com/apache/spark-connect-go/v35/spark/sql/types"
+)
+
+type DataFrameNaFunctions interface {
+ Drop(ctx context.Context, cols ...string) (DataFrame, error)
+ DropAll(ctx context.Context, cols ...string) (DataFrame, error)
+ DropWithThreshold(ctx context.Context, threshold int32, cols ...string)
(DataFrame, error)
+ Fill(ctx context.Context, value types.PrimitiveTypeLiteral, cols
...string) (DataFrame, error)
+ FillWithValues(ctx context.Context, values
map[string]types.PrimitiveTypeLiteral) (DataFrame, error)
+ Replace(ctx context.Context, toReplace []types.PrimitiveTypeLiteral,
+ values []types.PrimitiveTypeLiteral, cols ...string)
(DataFrame, error)
+}
+
+type dataFrameNaFunctionsImpl struct {
+ dataFrame DataFrame
+}
+
+func (d *dataFrameNaFunctionsImpl) Drop(ctx context.Context, cols ...string)
(DataFrame, error) {
+ return d.dataFrame.DropNa(ctx, cols...)
+}
+
+func (d *dataFrameNaFunctionsImpl) DropAll(ctx context.Context, cols
...string) (DataFrame, error) {
+ return d.dataFrame.DropNaAll(ctx, cols...)
+}
+
+func (d *dataFrameNaFunctionsImpl) DropWithThreshold(ctx context.Context,
threshold int32, cols ...string) (DataFrame, error) {
+ return d.dataFrame.DropNaWithThreshold(ctx, threshold, cols...)
+}
+
+func (d *dataFrameNaFunctionsImpl) Fill(ctx context.Context, value
types.PrimitiveTypeLiteral, cols ...string) (DataFrame, error) {
+ return d.dataFrame.FillNa(ctx, value, cols...)
+}
+
+func (d *dataFrameNaFunctionsImpl) FillWithValues(ctx context.Context,
+ values map[string]types.PrimitiveTypeLiteral,
+) (DataFrame, error) {
+ return d.dataFrame.FillNaWithValues(ctx, values)
+}
+
+func (d *dataFrameNaFunctionsImpl) Replace(ctx context.Context,
+ toReplace []types.PrimitiveTypeLiteral, values
[]types.PrimitiveTypeLiteral, cols ...string,
+) (DataFrame, error) {
+ return d.dataFrame.Replace(ctx, toReplace, values, cols...)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]