This is an automated email from the ASF dual-hosted git repository.

uzadude pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafu.git
The following commit(s) were added to refs/heads/main by this push:
     new 601ab12  DATAFU-179 Support Spark 3.3.x and 3.4.x (#47)
601ab12 is described below

commit 601ab124d547bdbf54165f0e598bb28dbf29ff50
Author: Eyal Allweil <e...@apache.org>
AuthorDate: Mon Jan 6 09:16:27 2025 +0200

    DATAFU-179 Support Spark 3.3.x and 3.4.x (#47)
---
 datafu-spark/README.md                                        | 4 ++--
 datafu-spark/build.gradle                                     | 2 +-
 datafu-spark/build_and_test_spark.sh                          | 5 +++--
 datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py | 6 +++---
 datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala   | 2 +-
 5 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/datafu-spark/README.md b/datafu-spark/README.md
index 152677b..d201463 100644
--- a/datafu-spark/README.md
+++ b/datafu-spark/README.md
@@ -11,7 +11,7 @@ This matrix represents versions of Spark that DataFu has been compiled and teste
 | 1.7.0 | 2.2.0 to 2.2.2, 2.3.0 to 2.3.2 and 2.4.0 to 2.4.3|
 | 1.8.0 | 2.2.3, 2.3.3, and 2.4.4 to 2.4.5|
 | 2.0.0 | 3.0.x - 3.1.x |
-| 2.1.0 (not released yet) | 3.0.x - 3.2.x |
+| 2.1.0 (not released yet) | 3.0.x - 3.4.x |
 
 # Examples
 
@@ -25,7 +25,7 @@ Here are some examples of things you can do with it:
 
 * [Count distinct up to](https://github.com/apache/datafu/blob/main/datafu-spark/src/main/scala/datafu/spark/Aggregators.scala#L187) - an efficient implementation when you just want to verify that a certain minimum of distinct rows appear in a table
 
-It has been tested on Spark releases from 3.0.0 to 3.1.3 using Scala 2.12. You can check if your Spark/Scala version combination has been tested by looking [here.](https://github.com/apache/datafu/blob/main/datafu-spark/build_and_test_spark.sh#L20)
+It has been tested on Spark releases from 3.0.0 to 3.4.2 using Scala 2.12. You can check if your Spark/Scala version combination has been tested by looking [here.](https://github.com/apache/datafu/blob/main/datafu-spark/build_and_test_spark.sh#L20)
 
 -----------
 
diff --git a/datafu-spark/build.gradle b/datafu-spark/build.gradle
index e7011dd..9cd6f90 100644
--- a/datafu-spark/build.gradle
+++ b/datafu-spark/build.gradle
@@ -72,7 +72,7 @@ dependencies {
         testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.1.2_" + sparkTestingBaseVersion
     } else if (sparkVersion > "3.2" && sparkVersion < "3.3") {
         testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.2.1_" + sparkTestingBaseVersion
-    } else if (sparkVersion > "3.3" && sparkVersion < "3.4") {
+    } else if (sparkVersion >= "3.3") {
         testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":3.3.0_" + sparkTestingBaseVersion
     } else {
         testCompile "com.holdenkarau:spark-testing-base_" + scalaVersion + ":" + sparkVersion + "_" + sparkTestingBaseVersion
diff --git a/datafu-spark/build_and_test_spark.sh b/datafu-spark/build_and_test_spark.sh
index 5add676..dbc86fc 100755
--- a/datafu-spark/build_and_test_spark.sh
+++ b/datafu-spark/build_and_test_spark.sh
@@ -17,8 +17,9 @@
 
 #!/bin/bash
 
-export SPARK_VERSIONS_FOR_SCALA_212="3.0.1 3.0.2 3.0.3 3.1.1 3.1.2 3.1.3 3.2.0 3.2.1 3.2.2 3.2.3 3.2.4"
-export LATEST_SPARK_VERSIONS_FOR_SCALA_212="3.0.3 3.1.3 3.2.4"
+# we skip 3.0.0 because it has a bug which fails our Aggregator tests
+export SPARK_VERSIONS_FOR_SCALA_212="3.0.1 3.0.2 3.0.3 3.1.1 3.1.2 3.1.3 3.2.0 3.2.1 3.2.2 3.2.3 3.2.4 3.3.0 3.3.1 3.3.2 3.3.3 3.3.4 3.4.0 3.4.1 3.4.2"
+export LATEST_SPARK_VERSIONS_FOR_SCALA_212="3.0.3 3.1.3 3.2.4 3.3.4 3.4.2"
 
 STARTTIME=$(date +%s)
 
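A note on the build.gradle hunk above: Groovy compares the version strings lexicographically, so the new condition sparkVersion >= "3.3" matches the 3.3.x and 3.4.x lines alike, and both now resolve to the 3.3.0_ spark-testing-base test artifact. A minimal Scala sketch of the same string comparison (the version values below are illustrative, not taken from the build):

    // Lexicographic string comparison, the same semantics Groovy applies to the
    // sparkVersion condition in build.gradle. The version values are illustrative.
    object VersionBranchSketch extends App {
      val threshold = "3.3"
      val versions = Seq("3.2.4", "3.3.0", "3.3.4", "3.4.2")
      versions.foreach { v =>
        // "3.4.2" >= "3.3" holds because '4' > '3' at the third character,
        // so Spark 3.4.x falls into the same branch as 3.3.x.
        println(s"$v picks the 3.3.0_ spark-testing-base artifact: ${v >= threshold}")
      }
    }
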
diff --git a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
index 00bcd90..4f88cfd 100644
--- a/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
+++ b/datafu-spark/src/main/resources/pyspark_utils/bridge_utils.py
@@ -20,6 +20,7 @@ import os
 from py4j.java_gateway import JavaGateway, GatewayParameters
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
+from pyspark.sql import SQLContext
 from pyspark.sql import SparkSession
 
 # use jvm gateway to create a java class instance by full-qualified class name
@@ -63,10 +64,9 @@ class Context(object):
         conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf)
 
         self.sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf)
 
-        # Spark 2
+        # Spark 3
         self.sparkSession = SparkSession(self.sc, jSparkSession)
-        self.sqlContext = self.sparkSession._wrapped
-
+        self.sqlContext = SQLContext(sparkContext=self.sc, sparkSession=self.sparkSession)
 
 ctx = None
diff --git a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
index 2b347c3..4d571c7 100644
--- a/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
+++ b/datafu-spark/src/main/scala/datafu/spark/SparkDFUtils.scala
@@ -191,7 +191,7 @@ object SparkDFUtils {
    * the same functionality as {@link #dedupWithOrder} but implemented using UDAF to utilize
    * map side aggregation.
    * this function should be used in cases when you expect a large number of rows to get combined,
-   * as they share the same group column.
+   * as they share the same group column, or if you have some groups with extreme skew.
    *
    * @param df DataFrame to operate on
    * @param groupCol column to group by the records
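
The scaladoc touched in the last hunk documents the combiner/UDAF-based dedup, which does the same job as dedupWithOrder but aggregates on the map side. A rough usage sketch contrasting the two is below; the input path, the column names and the dedupWithCombiner method name are assumptions for illustration, so check SparkDFUtils.scala for the actual signatures:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    // Rough sketch, not taken from the repository. The input path, column names and
    // the dedupWithCombiner name are assumptions; see SparkDFUtils.scala for the API.
    object DedupSketch extends App {
      val spark = SparkSession.builder().master("local[*]").getOrCreate()
      val events = spark.read.parquet("/tmp/events") // hypothetical input

      // Sort-based dedup: keeps the row with the latest event_time per user.
      // Fine when each group holds a modest number of rows.
      val latest = datafu.spark.SparkDFUtils.dedupWithOrder(
        events, col("user_id"), col("event_time").desc)

      // Combiner/UDAF-based variant described in the hunk above: rows are combined on
      // the map side before the shuffle, which helps with very large or skewed groups.
      val latestViaCombiner = datafu.spark.SparkDFUtils.dedupWithCombiner(
        events, col("user_id"), col("event_time"))

      latest.show()
      latestViaCombiner.show()
    }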