This is an automated email from the ASF dual-hosted git repository.

eyal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafu.git


The following commit(s) were added to refs/heads/main by this push:
     new 6fd6fc4  DATAFU-177 Add dedupByAllExcept (#46)
6fd6fc4 is described below

commit 6fd6fc4cb5e8156291600ee5f6ef3591dd74541e
Author: Eyal Allweil <e...@apache.org>
AuthorDate: Mon Dec 9 11:07:43 2024 +0200

    DATAFU-177 Add dedupByAllExcept (#46)
    
    @uzadude has approved in the JIRA, so I'm merging this in.
---
 .../src/main/scala/datafu/spark/DataFrameOps.scala |  3 ++
 .../src/main/scala/datafu/spark/SparkDFUtils.scala | 30 +++++++++++++++++++
 .../test/scala/datafu/spark/TestSparkDFUtils.scala | 34 ++++++++++++++++++++++
 3 files changed, 67 insertions(+)

diff --git a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala 
b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
index daad469..4b204fc 100644
--- a/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/DataFrameOps.scala
@@ -56,6 +56,9 @@ object DataFrameOps {
                           columnsFilter,
                           columnsFilterKeep)
 
+    /**
+     * Deduplicates on every column except `ignoredColumn` by delegating to
+     * SparkDFUtils.dedupByAllExcept, passing `df` from the enclosing scope
+     * (its declaration is outside this hunk).
+     *
+     * @param ignoredColumn the one column whose value you need only one of
+     * @param aggFunction   maps the ignored column's name to the aggregate Column used to pick
+     *                      the surviving value; defaults to org.apache.spark.sql.functions.max
+     * @return the DataFrame returned by SparkDFUtils.dedupByAllExcept
+     */
+    def dedupByAllExcept(ignoredColumn: String, aggFunction : String => Column 
= org.apache.spark.sql.functions.max) : DataFrame =
+      SparkDFUtils.dedupByAllExcept(df, ignoredColumn, aggFunction)
+      
     def flatten(colName: String): DataFrame = SparkDFUtils.flatten(df, colName)
 
     def changeSchema(newScheme: String*): DataFrame =
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala 
b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
index 2732460..2b347c3 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -245,6 +245,36 @@ object SparkDFUtils {
     df2.sparkSession.createDataFrame(df2.rdd,ns)
   }
 
+  /**
+   * Collapses rows that are identical in every column except `ignoredColumn`.
+   *
+   * Useful when one column's value is not important, but you don't want to lose any actual
+   * data from the other columns. For example, a server may create events with an autogenerated
+   * event id and sometimes emit the same event twice. You don't want double rows just for the
+   * event ids, but if any of the other fields are distinct you want to keep the rows (with
+   * their event ids).
+   *
+   * Rows are grouped by all columns other than `ignoredColumn`; `aggFunction` produces the
+   * single value of `ignoredColumn` that is kept for each group.
+   *
+   * @param df            the DataFrame to deduplicate
+   * @param ignoredColumn the one column whose value you need only one of
+   * @param aggFunction   maps the ignored column's name to an aggregate Column (the
+   *                      DataFrameOps wrapper passes max by default); the aggregated column
+   *                      must come back named like `fn(ignoredColumn)`
+   * @return a DataFrame with the same columns, in the same order, as `df`
+   * @throws IllegalArgumentException if `df` has no column other than `ignoredColumn`
+   * @throws Exception if zero or several result columns match the aggregated-column pattern
+   */
+  def dedupByAllExcept(df: DataFrame, ignoredColumn: String, aggFunction : String => Column): DataFrame = {
+    val cols = df.schema.fields.filter(_.name != ignoredColumn).map(i => i.name)
+    // groupBy(first, rest) needs at least one key column; fail clearly instead of with an opaque .head error
+    require(cols.nonEmpty, s"Cannot dedup by all columns except '$ignoredColumn': no other columns to group by")
+    val grouped = df.groupBy(cols.head, cols.tail: _*)
+    val resultWithAggregateFieldName = grouped.agg(aggFunction(ignoredColumn))
+
+    // Quote the column name so regex metacharacters in it (., $, parentheses, ...) are matched literally
+    val pattern = s".*\\(${java.util.regex.Pattern.quote(ignoredColumn)}\\)"
+    val candidateFields = resultWithAggregateFieldName.schema.fields.filter(_.name.matches(pattern))
+
+    // Cleanly notify users if something unexpected with the aggregate function name has happened
+    if (candidateFields.isEmpty) {
+      throw new Exception(s"Aggregated field name not found after applying aggregate function to '$ignoredColumn'")
+    } else if (candidateFields.size > 1) {
+      throw new Exception(s"Multiple fields with names that match aggregate function pattern found for '$ignoredColumn'")
+    }
+
+    // rename the aggregated field back to its original name and restore the original order
+    resultWithAggregateFieldName.withColumnRenamed(candidateFields(0).name, ignoredColumn).
+      select(df.columns.head, df.columns.tail:_*)
+  }
+  
   /**
     * Returns a DataFrame with the given column (should be a StructType)
     * replaced by its inner fields.
diff --git a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala 
b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
index 75fa8da..ae5873c 100644
--- a/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
+++ b/datafu-spark/src/test/scala/datafu/spark/TestSparkDFUtils.scala
@@ -455,4 +455,38 @@ class DataFrameOpsTests extends FunSuite with 
DataFrameSuiteBase {
 
     assertDataFrameEquals(expected, actual)
   }
+  
+  test("test_dedup_by-all_except") {
+
+    // Scenario 1: even with col_grp1 ignored, every row still differs in some other column
+    // (col_grp5 is unique per row), so nothing may be collapsed.
+    val allDistinct = spark.createDataFrame(
+      Seq(
+        ("a", "a", 1, 2, "aa12", "a"),
+        ("a", "a", 1, 1, "aa11", "a"),
+        ("a", "a", 2, 1, "aa21", "a"),
+        ("a", "b", 3, 2, "ab32", "a"),
+        ("b", "a", 1, 1, "ba11", "a")
+      )
+    ).toDF("col_grp1", "col_grp2", "col_grp3", "col_grp4", "col_grp5", "col_grp6")
+
+    assertDataFrameNoOrderEquals(allDistinct, allDistinct.dedupByAllExcept("col_grp1"))
+
+    // Scenario 2: the first two rows share col_grp2 = "a", so they collapse into one row
+    // that keeps the larger col_grp1 value ("b") under the default max aggregate.
+    val twoColumns = Seq("col_grp1", "col_grp2")
+
+    val withDuplicate = spark.createDataFrame(
+      Seq(
+        ("a", "a"),
+        ("b", "a"),
+        ("c", "c"),
+        ("d", "d"),
+        ("e", "e")
+      )
+    ).toDF(twoColumns: _*)
+
+    val deduplicated = spark.createDataFrame(
+      Seq(
+        ("b", "a"),
+        ("c", "c"),
+        ("d", "d"),
+        ("e", "e")
+      )
+    ).toDF(twoColumns: _*)
+
+    assertDataFrameNoOrderEquals(deduplicated, withDuplicate.dedupByAllExcept("col_grp1"))
+  }
 }

Reply via email to