This is an automated email from the ASF dual-hosted git repository.

uzadude pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafu.git


The following commit(s) were added to refs/heads/main by this push:
     new 601ab12  DATAFU-179 Support Spark 3.3.x and 3.4.x (#47)
601ab12 is described below

commit 601ab124d547bdbf54165f0e598bb28dbf29ff50
Author: Eyal Allweil <e...@apache.org>
AuthorDate: Mon Jan 6 09:16:27 2025 +0200

    DATAFU-179 Support Spark 3.3.x and 3.4.x (#47)
---
 datafu-spark/README.md                                        | 4 ++--
 datafu-spark/build.gradle                                     | 2 +-
 datafu-spark/build_and_test_spark.sh                          | 5 +++--
 datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py | 6 +++---
 datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala   | 2 +-
 5 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/datafu-spark/README.md b/datafu-spark/README.md
index 152677b..d201463 100644
--- a/datafu-spark/README.md
+++ b/datafu-spark/README.md
@@ -11,7 +11,7 @@ This matrix represents versions of Spark that DataFu has been compiled and teste
 | 1.7.0 | 2.2.0 to 2.2.2, 2.3.0 to 2.3.2 and 2.4.0 to 2.4.3|
 | 1.8.0 | 2.2.3, 2.3.3, and 2.4.4 to 2.4.5|
 | 2.0.0 | 3.0.x - 3.1.x |
-| 2.1.0 (not released yet) | 3.0.x - 3.2.x |
+| 2.1.0 (not released yet) | 3.0.x - 3.4.x |
 
 # Examples
 
@@ -25,7 +25,7 @@ Here are some examples of things you can do with it:
 
 * [Count distinct up to](https://github.com/apache/datafu/blob/main/datafu-spark/src/main/scala/datafu/spark/Aggregators.scala#L187) - an efficient implementation when you just want to verify that a certain minimum of distinct rows appear in a table
 
-It has been tested on Spark releases from 3.0.0 to 3.1.3 using Scala 2.12. You can check if your Spark/Scala version combination has been tested by looking [here.](https://github.com/apache/datafu/blob/main/datafu-spark/build_and_test_spark.sh#L20)
+It has been tested on Spark releases from 3.0.0 to 3.4.2 using Scala 2.12. You can check if your Spark/Scala version combination has been tested by looking [here.](https://github.com/apache/datafu/blob/main/datafu-spark/build_and_test_spark.sh#L20)
 
 -----------
 
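
A quick way to check the compatibility claim in the README hunk above is to print the running Spark version and compare it against the matrix (DataFu 2.1.0 targets Spark 3.0.x - 3.4.x with Scala 2.12). A minimal PySpark sketch, using only standard Spark APIs (the app name and local master are arbitrary):

    from pyspark.sql import SparkSession

    # Start a throwaway local session and report the Spark version, so it can be
    # compared against the README's compatibility matrix before adding datafu-spark.
    spark = (SparkSession.builder
             .master("local[*]")
             .appName("datafu-spark-version-check")
             .getOrCreate())

    print("Spark version:", spark.version)

    spark.stop()
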
diff --git a/datafu-spark/build.gradle b/datafu-spark/build.gradle
index e7011dd..9cd6f90 100644
--- a/datafu-spark/build.gradle
+++ b/datafu-spark/build.gradle
@@ -72,7 +72,7 @@ dependencies {
       testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.1.2_" + sparkTestingBaseVersion
     } else if (sparkVersion > "3.2" && sparkVersion < "3.3") {
       testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.2.1_" + sparkTestingBaseVersion
-    } else if (sparkVersion > "3.3" && sparkVersion < "3.4") {
+    } else if (sparkVersion >= "3.3") {
       testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.3.0_" + sparkTestingBaseVersion
     } else {
       testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":" + sparkVersion + "_" + sparkTestingBaseVersion
diff --git a/datafu-spark/build_and_test_spark.sh b/datafu-spark/build_and_test_spark.sh
index 5add676..dbc86fc 100755
--- a/datafu-spark/build_and_test_spark.sh
+++ b/datafu-spark/build_and_test_spark.sh
@@ -17,8 +17,9 @@
 
 #!/bin/bash
 
-export SPARK_VERSIONS_FOR_SCALA_212="3.0.1 3.0.2 3.0.3 3.1.1 3.1.2 3.1.3 3.2.0 3.2.1 3.2.2 3.2.3 3.2.4"
-export LATEST_SPARK_VERSIONS_FOR_SCALA_212="3.0.3 3.1.3 3.2.4"
+# we skip 3.0.0 because it has a bug which fails our Aggregator tests
+export SPARK_VERSIONS_FOR_SCALA_212="3.0.1 3.0.2 3.0.3 3.1.1 3.1.2 3.1.3 3.2.0 3.2.1 3.2.2 3.2.3 3.2.4 3.3.0 3.3.1 3.3.2 3.3.3 3.3.4 3.4.0 3.4.1 3.4.2"
+export LATEST_SPARK_VERSIONS_FOR_SCALA_212="3.0.3 3.1.3 3.2.4 3.3.4 3.4.2"
 
 
 STARTTIME=$(date +%s)
diff --git a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
index 00bcd90..4f88cfd 100644
--- a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
+++ b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
@@ -20,6 +20,7 @@ import os
 from py4j.java_gateway import JavaGateway, GatewayParameters
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
+from pyspark.sql import SQLContext
 from pyspark.sql import SparkSession
 
 # use jvm gateway to create a java class instance by full-qualified class name
@@ -63,10 +64,9 @@ class Context(object):
         conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
         self.sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
 
-        # Spark 2
+        # Spark 3
         self.sparkSession = SparkSession(self.sc, jSparkSession)
-        self.sqlContext = self.sparkSession._wrapped
-
+        self.sqlContext = SQLContext(sparkContext=self.sc, sparkSession=self.sparkSession)
 ctx = None
 
 
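
The hunk above replaces the old SparkSession._wrapped shortcut, which is not available on the newer Spark 3.x releases this patch targets, with an explicitly constructed SQLContext. The same construction can be tried outside the py4j bridge; a standalone sketch (plain PySpark, session and app names arbitrary, and note that SQLContext is deprecated in Spark 3 but still usable):

    from pyspark.sql import SparkSession, SQLContext

    spark = SparkSession.builder.master("local[*]").appName("sqlcontext-demo").getOrCreate()
    sc = spark.sparkContext

    # Same shape as the patched bridge_utils.py: build the legacy SQLContext
    # explicitly from the SparkContext and SparkSession instead of _wrapped.
    sql_context = SQLContext(sparkContext=sc, sparkSession=spark)
    sql_context.range(3).show()

    spark.stop()
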
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
index 2b347c3..4d571c7 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -191,7 +191,7 @@ object SparkDFUtils {
     * the same functionality as {@link #dedupWithOrder} but implemented using UDAF to utilize
     * map side aggregation.
     * this function should be used in cases when you expect a large number of rows to get combined,
-    * as they share the same group column.
+    * as they share the same group column, or if you have some groups with extreme skew.
     *
     * @param df DataFrame to operate on
     * @param groupCol column to group by the records
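
The scaladoc being amended above describes a dedup variant that keeps the top row of each group while aggregating on the map side, which is what makes it useful for the skewed groups the new sentence mentions. The snippet below is not DataFu's implementation, just a plain PySpark sketch of the same idea with made-up column names: reduce each group to the row with the maximum ordering value via max over a struct, so partial results are combined before the shuffle instead of shipping every row of a hot group.

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.master("local[*]").appName("dedup-combiner-sketch").getOrCreate()

    # Hypothetical data: several events per user, keep only the latest per user.
    df = spark.createDataFrame(
        [("a", 1, "x"), ("a", 3, "y"), ("b", 2, "z")],
        ["user", "ts", "payload"],
    )

    # The ordering column goes first inside the struct so max() orders by it;
    # this aggregation is combined map-side, unlike a window/row_number approach.
    latest = (df.groupBy("user")
                .agg(F.max(F.struct("ts", "payload")).alias("best"))
                .select("user", "best.ts", "best.payload"))

    latest.show()
    spark.stop()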
