(spark) branch master updated: [SPARK-48016][SQL][TESTS][FOLLOWUP] Update Java 21 golden file
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 65cf5b18648a [SPARK-48016][SQL][TESTS][FOLLOWUP] Update Java 21 golden file
65cf5b18648a is described below

commit 65cf5b18648a81fc9b0787d03f23f7465c20f3ec
Author: Dongjoon Hyun
AuthorDate: Tue Apr 30 22:42:02 2024 -0700

    [SPARK-48016][SQL][TESTS][FOLLOWUP] Update Java 21 golden file

    ### What changes were proposed in this pull request?

    This is a follow-up of SPARK-48016 to update the missed Java 21 golden file.
    - #46286

    ### Why are the changes needed?

    To recover the Java 21 CIs:
    - https://github.com/apache/spark/actions/workflows/build_java21.yml
    - https://github.com/apache/spark/actions/workflows/build_maven_java21.yml
    - https://github.com/apache/spark/actions/workflows/build_maven_java21_macos14.yml

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Manual tests. I regenerated all golden files in Java 21 and this was the only one affected.

    ```
    $ SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
    ```

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #46313 from dongjoon-hyun/SPARK-48016.
Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .../results/try_arithmetic.sql.out.java21 | 64 ++ 1 file changed, 64 insertions(+) diff --git a/sql/core/src/test/resources/sql-tests/results/try_arithmetic.sql.out.java21 b/sql/core/src/test/resources/sql-tests/results/try_arithmetic.sql.out.java21 index dcdb9d0dcb19..002a0dfcf37e 100644 --- a/sql/core/src/test/resources/sql-tests/results/try_arithmetic.sql.out.java21 +++ b/sql/core/src/test/resources/sql-tests/results/try_arithmetic.sql.out.java21 @@ -15,6 +15,22 @@ struct NULL +-- !query +SELECT try_add(2147483647, decimal(1)) +-- !query schema +struct +-- !query output +2147483648 + + +-- !query +SELECT try_add(2147483647, "1") +-- !query schema +struct +-- !query output +2.147483648E9 + + -- !query SELECT try_add(-2147483648, -1) -- !query schema @@ -249,6 +265,22 @@ struct NULL +-- !query +SELECT try_divide(1, decimal(0)) +-- !query schema +struct +-- !query output +NULL + + +-- !query +SELECT try_divide(1, "0") +-- !query schema +struct +-- !query output +NULL + + -- !query SELECT try_divide(interval 2 year, 2) -- !query schema @@ -313,6 +345,22 @@ struct NULL +-- !query +SELECT try_subtract(2147483647, decimal(-1)) +-- !query schema +struct +-- !query output +2147483648 + + +-- !query +SELECT try_subtract(2147483647, "-1") +-- !query schema +struct +-- !query output +2.147483648E9 + + -- !query SELECT try_subtract(-2147483648, 1) -- !query schema @@ -409,6 +457,22 @@ struct NULL +-- !query +SELECT try_multiply(2147483647, decimal(-2)) +-- !query schema +struct +-- !query output +-4294967294 + + +-- !query +SELECT try_multiply(2147483647, "-2") +-- !query schema +struct +-- !query output +-4.294967294E9 + + -- !query SELECT try_multiply(-2147483648, 2) -- !query schema - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
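The golden-file entries above encode the null-on-error semantics of the `try_*` functions (note that `try_add(2147483647, decimal(1))` widens to a decimal result type, which is why it yields 2147483648 rather than NULL). A minimal pure-Python sketch of the 32-bit integer overflow and divide-by-zero behavior — an illustration of the semantics only, not Spark's implementation:

```python
INT_MIN, INT_MAX = -2**31, 2**31 - 1

def try_add_int(a, b):
    # Add two 32-bit ints; return None (standing in for SQL NULL) on overflow.
    result = a + b
    return result if INT_MIN <= result <= INT_MAX else None

def try_divide(a, b):
    # Division that returns None instead of raising on divide-by-zero.
    return None if b == 0 else a / b
```

With a wider result type (decimal or double, as in the golden file above), the same addition would not overflow and no NULL would be produced.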
(spark) branch master updated: [SPARK-48047][SQL] Reduce memory pressure of empty TreeNode tags
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 02206cd66dbf [SPARK-48047][SQL] Reduce memory pressure of empty TreeNode tags
02206cd66dbf is described below

commit 02206cd66dbfc8de602a685b032f1805bcf8e36f
Author: Nick Young
AuthorDate: Tue Apr 30 22:07:20 2024 -0700

    [SPARK-48047][SQL] Reduce memory pressure of empty TreeNode tags

    ### What changes were proposed in this pull request?

    - Changed the `tags` variable of the `TreeNode` class to initialize lazily. This will reduce unnecessary driver memory pressure.

    ### Why are the changes needed?

    - Plans with large expression or operator trees are known to cause driver memory pressure; this is one step in alleviating that issue.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing UTs cover the behavior. Outward-facing behavior does not change.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #46285 from n-young-db/treenode-tags.

    Authored-by: Nick Young
    Signed-off-by: Dongjoon Hyun
---
 .../apache/spark/sql/catalyst/trees/TreeNode.scala | 24 ++
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 94e893d468b3..dd39f3182bfb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -78,8 +78,16 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
   /**
    * A mutable map for holding auxiliary information of this tree node. It will be carried over
    * when this node is copied via `makeCopy`, or transformed via `transformUp`/`transformDown`.
+ * We lazily evaluate the `tags` since the default size of a `mutable.Map` is nonzero. This
+ * will reduce unnecessary memory pressure.
   */
-  private val tags: mutable.Map[TreeNodeTag[_], Any] = mutable.Map.empty
+  private[this] var _tags: mutable.Map[TreeNodeTag[_], Any] = null
+  private def tags: mutable.Map[TreeNodeTag[_], Any] = {
+    if (_tags eq null) {
+      _tags = mutable.Map.empty
+    }
+    _tags
+  }

   /**
    * Default tree pattern [[BitSet] for a [[TreeNode]].
@@ -147,11 +155,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
     ineffectiveRules.get(ruleId.id)
   }

+  def isTagsEmpty: Boolean = (_tags eq null) || _tags.isEmpty
+
   def copyTagsFrom(other: BaseType): Unit = {
     // SPARK-32753: it only makes sense to copy tags to a new node
     // but it's too expensive to detect other cases likes node removal
     // so we make a compromise here to copy tags to node with no tags
-    if (tags.isEmpty) {
+    if (isTagsEmpty && !other.isTagsEmpty) {
       tags ++= other.tags
     }
   }
@@ -161,11 +171,17 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
   }

   def getTagValue[T](tag: TreeNodeTag[T]): Option[T] = {
-    tags.get(tag).map(_.asInstanceOf[T])
+    if (isTagsEmpty) {
+      None
+    } else {
+      tags.get(tag).map(_.asInstanceOf[T])
+    }
   }

   def unsetTagValue[T](tag: TreeNodeTag[T]): Unit = {
-    tags -= tag
+    if (!isTagsEmpty) {
+      tags -= tag
+    }
   }

   /**
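The lazy-initialization pattern in the diff above can be sketched in pure Python (an illustrative stand-in for the Scala change, with hypothetical names): no map is allocated until the first tag is written, so the many tag-free nodes in a large plan cost nothing.

```python
class TreeNode:
    """Sketch of the lazy-tags idea: don't allocate the map until first use."""
    __slots__ = ("_tags",)

    def __init__(self):
        self._tags = None  # no dict allocated for tag-free nodes

    @property
    def tags(self):
        # Allocate on first access, mirroring the `_tags eq null` check.
        if self._tags is None:
            self._tags = {}
        return self._tags

    @property
    def is_tags_empty(self):
        return self._tags is None or len(self._tags) == 0

    def copy_tags_from(self, other):
        # Only copy into a tag-free node, and skip entirely when there is
        # nothing to copy, so neither side allocates unnecessarily.
        if self.is_tags_empty and not other.is_tags_empty:
            self.tags.update(other.tags)
```

The extra `not other.is_tags_empty` guard matters: without it, copying from an empty node would still force an allocation on the destination.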
(spark) branch master updated (c71d02ab7c80 -> 991763c2cdf8)
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

    from c71d02ab7c80 [SPARK-48028][TESTS] Regenerate benchmark results after turning ANSI on
     add 991763c2cdf8 [SPARK-46894][PYTHON] Move PySpark error conditions into standalone JSON file

No new revisions were added by this update.

Summary of changes:
 python/MANIFEST.in                                 |    9 +-
 python/docs/source/getting_started/install.rst     |    4 +-
 .../{error_classes.py => error-conditions.json}    |   28 -
 python/pyspark/errors/error_classes.py             | 1165 +---
 python/pyspark/errors/exceptions/__init__.py       |   40 +-
 python/pyspark/errors_doc_gen.py                   |    2 +-
 6 files changed, 29 insertions(+), 1219 deletions(-)
 copy python/pyspark/errors/{error_classes.py => error-conditions.json} (97%)
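The commit above moves PySpark's error conditions out of a Python dict literal into a standalone JSON file. A hedged sketch of what loading and formatting such a file could look like — the JSON content, condition name, and helper functions here are illustrative assumptions, not PySpark's actual API:

```python
import json

# Illustrative stand-in for the contents of an error-conditions.json file.
ERROR_CONDITIONS_JSON = """
{
  "COLUMN_IN_LIST": {
    "message": ["<func_name> does not allow a Column in a list."]
  }
}
"""

def load_error_conditions(text):
    # In the PR's layout, the JSON would be read from the packaged file at import time.
    return json.loads(text)

def get_message(conditions, error_class, params):
    # Message templates are lists of lines; parameters use <name> placeholders.
    template = " ".join(conditions[error_class]["message"])
    for key, value in params.items():
        template = template.replace(f"<{key}>", str(value))
    return template
```

Keeping the conditions in JSON makes them consumable by non-Python tooling (docs generation, linters) without importing the package.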
(spark) branch master updated: [SPARK-48063][CORE] Enable `spark.stage.ignoreDecommissionFetchFailure` by default
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new f3cc8f930383 [SPARK-48063][CORE] Enable `spark.stage.ignoreDecommissionFetchFailure` by default
f3cc8f930383 is described below

commit f3cc8f930383659b9f99e56b38de4b97d588e20b
Author: Dongjoon Hyun
AuthorDate: Tue Apr 30 15:19:00 2024 -0700

    [SPARK-48063][CORE] Enable `spark.stage.ignoreDecommissionFetchFailure` by default

    ### What changes were proposed in this pull request?

    This PR aims to **enable `spark.stage.ignoreDecommissionFetchFailure` by default** for Apache Spark 4.0.0, while keeping `spark.scheduler.maxRetainedRemovedDecommissionExecutors=0` unchanged, so that a user can enable this feature by setting only one configuration, `spark.scheduler.maxRetainedRemovedDecommissionExecutors`.

    ### Why are the changes needed?

    This feature was added in Apache Spark 3.4.0 via SPARK-40481 and SPARK-40979 and has been used for two years to support executor decommissioning features in production.
    - #37924
    - #38441

    ### Does this PR introduce _any_ user-facing change?

    No, because `spark.scheduler.maxRetainedRemovedDecommissionExecutors` is still `0`.

    ### How was this patch tested?

    Pass the CIs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #46308 from dongjoon-hyun/SPARK-48063.
Authored-by: Dongjoon Hyun
Signed-off-by: Dongjoon Hyun
---
 core/src/main/scala/org/apache/spark/internal/config/package.scala | 2 +-
 docs/configuration.md                                              | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index b2cbb6f6deb6..2e207422ae06 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2403,7 +2403,7 @@ package object config {
         s"count ${STAGE_MAX_CONSECUTIVE_ATTEMPTS.key}")
       .version("3.4.0")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)

   private[spark] val SCHEDULER_MAX_RETAINED_REMOVED_EXECUTORS =
     ConfigBuilder("spark.scheduler.maxRetainedRemovedDecommissionExecutors")

diff --git a/docs/configuration.md b/docs/configuration.md
index d5e2a569fdea..2e612ffd9ab9 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3072,7 +3072,7 @@ Apart from these, the following properties are also available, and may be useful
   spark.stage.ignoreDecommissionFetchFailure
-  false
+  true
   Whether ignore stage fetch failure caused by executor decommission when count spark.stage.maxConsecutiveAttempts
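The change itself is a single `createWithDefault` flip. A toy Python sketch of the builder-with-default pattern involved (names modeled loosely on Spark's internal `ConfigBuilder`; this is an illustrative sketch, not Spark code):

```python
class ConfigBuilder:
    """Minimal sketch of a typed config entry with a default value."""

    def __init__(self, key):
        self.key = key
        self._default = None

    def boolean_conf(self):
        # In the real builder this fixes the parse/serialize type.
        return self

    def create_with_default(self, default):
        self._default = default
        return self

    def resolve(self, conf):
        # Fall back to the built-in default when the user has not set the key.
        raw = conf.get(self.key)
        if raw is None:
            return self._default
        return str(raw).lower() == "true"

IGNORE_DECOMMISSION_FETCH_FAILURE = (
    ConfigBuilder("spark.stage.ignoreDecommissionFetchFailure")
    .boolean_conf()
    .create_with_default(True)  # the commit flips this default from False to True
)
```

Because users who never set the key inherit the new default, flipping it changes behavior for everyone who relied on the implicit `false`, which is why the PR argues the companion config still gating the feature stays at `0`.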
(spark) branch master updated: [SPARK-48060][SS][TESTS] Fix `StreamingQueryHashPartitionVerifySuite` to update golden files correctly
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new faab553cac70 [SPARK-48060][SS][TESTS] Fix `StreamingQueryHashPartitionVerifySuite` to update golden files correctly
faab553cac70 is described below

commit faab553cac70eefeec286b1823b70ad62bed87f8
Author: Dongjoon Hyun
AuthorDate: Tue Apr 30 12:50:07 2024 -0700

    [SPARK-48060][SS][TESTS] Fix `StreamingQueryHashPartitionVerifySuite` to update golden files correctly

    ### What changes were proposed in this pull request?

    This PR aims to fix `StreamingQueryHashPartitionVerifySuite` to update golden files correctly.
    - The documentation is added.
    - Newly generated files are updated.

    ### Why are the changes needed?

    Previously, `SPARK_GENERATE_GOLDEN_FILES` didn't work as expected because it updated the files under the `target` directory. We need to update the `src/test` files.

    **BEFORE**
    ```
    $ SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly *StreamingQueryHashPartitionVerifySuite"
    $ git status
    On branch master
    Your branch is up to date with 'apache/master'.

    nothing to commit, working tree clean
    ```

    **AFTER**
    ```
    $ SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly *StreamingQueryHashPartitionVerifySuite" \
        -Dspark.sql.test.randomDataGenerator.maxStrLen=100 \
        -Dspark.sql.test.randomDataGenerator.maxArraySize=4
    $ git status
    On branch SPARK-48060
    Your branch is up to date with 'dongjoon/SPARK-48060'.

    Changes not staged for commit:
      (use "git add ..." to update what will be committed)
      (use "git restore ..."
to discard changes in working directory) modified: sql/core/src/test/resources/structured-streaming/partition-tests/randomSchemas modified: sql/core/src/test/resources/structured-streaming/partition-tests/rowsAndPartIds no changes added to commit (use "git add" and/or "git commit -a") ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. I regenerate the data like the following. ``` $ SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly *StreamingQueryHashPartitionVerifySuite" \ -Dspark.sql.test.randomDataGenerator.maxStrLen=100 \ -Dspark.sql.test.randomDataGenerator.maxArraySize=4 ``` ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46304 from dongjoon-hyun/SPARK-48060. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .../partition-tests/randomSchemas | 2 +- .../partition-tests/rowsAndPartIds | Bin 4862115 -> 13341426 bytes .../StreamingQueryHashPartitionVerifySuite.scala | 22 +++-- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/sql/core/src/test/resources/structured-streaming/partition-tests/randomSchemas b/sql/core/src/test/resources/structured-streaming/partition-tests/randomSchemas index 8d6ff942610c..f6eadd776cc6 100644 --- a/sql/core/src/test/resources/structured-streaming/partition-tests/randomSchemas +++ b/sql/core/src/test/resources/structured-streaming/partition-tests/randomSchemas @@ -1 +1 @@ -col_0 STRUCT NOT NULL, col_3: FLOAT NOT NULL, col_4: INT NOT NULL>,col_1 STRUCT, col_3: ARRAY NOT NULL, col_4: ARRAY, col_5: TIMESTAMP NOT NULL, col_6: STRUCT, col_1: BIGINT NOT NULL> NOT NULL, col_7: ARRAY NOT NULL, col_8: ARRAY, col_9: BIGINT NOT NULL> NOT NULL,col_2 BIGINT NOT NULL,col_3 STRUCT,col_1 STRUCT NOT NULL,col_2 STRING NOT NULL,col_3 STRUCT, col_2: ARRAY NOT NULL> NOT NULL,col_4 BINARY NOT NULL,col_5 ARRAY NOT NULL,col_6 ARRAY,col_7 DOUBLE NOT NULL,col_8 ARRAY NOT NULL,col_9 ARRAY,col_10 FLOAT NOT NULL,col_11 STRUCT NOT NULL>, col_1: 
STRUCT NOT NULL, col_1: INT, col_2: STRUCT
(spark) branch master updated: [SPARK-48057][PYTHON][CONNECT][TESTS] Enable `GroupedApplyInPandasTests.test_grouped_with_empty_partition`
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new dab20b31388b [SPARK-48057][PYTHON][CONNECT][TESTS] Enable `GroupedApplyInPandasTests.test_grouped_with_empty_partition`
dab20b31388b is described below

commit dab20b31388ba7bcd2ab4d4424cbbd072bf84c30
Author: Ruifeng Zheng
AuthorDate: Tue Apr 30 12:19:18 2024 -0700

    [SPARK-48057][PYTHON][CONNECT][TESTS] Enable `GroupedApplyInPandasTests.test_grouped_with_empty_partition`

    ### What changes were proposed in this pull request?

    Enable `GroupedApplyInPandasTests.test_grouped_with_empty_partition`.

    ### Why are the changes needed?

    Test coverage.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    CI.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #46299 from zhengruifeng/fix_test_grouped_with_empty_partition.
Authored-by: Ruifeng Zheng Signed-off-by: Dongjoon Hyun --- python/pyspark/sql/tests/connect/test_parity_pandas_grouped_map.py | 4 python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/tests/connect/test_parity_pandas_grouped_map.py b/python/pyspark/sql/tests/connect/test_parity_pandas_grouped_map.py index 1cc4ce012623..8a1da440c799 100644 --- a/python/pyspark/sql/tests/connect/test_parity_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/connect/test_parity_pandas_grouped_map.py @@ -38,10 +38,6 @@ class GroupedApplyInPandasTests(GroupedApplyInPandasTestsMixin, ReusedConnectTes def test_apply_in_pandas_returning_incompatible_type(self): super().test_apply_in_pandas_returning_incompatible_type() -@unittest.skip("Spark Connect doesn't support RDD but the test depends on it.") -def test_grouped_with_empty_partition(self): -super().test_grouped_with_empty_partition() - if __name__ == "__main__": from pyspark.sql.tests.connect.test_parity_pandas_grouped_map import * # noqa: F401 diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py index f43dafc0a4a1..1e86e12eb74f 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py @@ -680,13 +680,13 @@ class GroupedApplyInPandasTestsMixin: data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)] expected = [Row(id=1, x=5), Row(id=1, x=5), Row(id=2, x=4)] num_parts = len(data) + 1 -df = self.spark.createDataFrame(self.sc.parallelize(data, numSlices=num_parts)) +df = self.spark.createDataFrame(data).repartition(num_parts) f = pandas_udf( lambda pdf: pdf.assign(x=pdf["x"].sum()), "id long, x int", PandasUDFType.GROUPED_MAP ) -result = df.groupBy("id").apply(f).collect() +result = df.groupBy("id").apply(f).sort("id").collect() self.assertEqual(result, expected) def 
test_grouped_over_window(self):
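The rewritten test above groups rows by `id`, replaces each row's `x` with the group's sum, and sorts by `id` for a deterministic comparison. A pure-Python stand-in for that grouped-map computation (illustrative only; no Spark or pandas required):

```python
from collections import defaultdict

def grouped_apply_sum(rows):
    """For each id group, replace x in every row with the group's sum of x,
    then emit groups in sorted id order (mirroring the test's sort("id"))."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["id"]].append(row)
    out = []
    for gid in sorted(groups):
        total = sum(r["x"] for r in groups[gid])
        out.extend({"id": gid, "x": total} for _ in groups[gid])
    return out
```

Using the same data as the test (`id=1` rows with x=2 and x=3, one `id=2` row with x=4), both `id=1` rows come back with x=5.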
(spark) branch master updated (0329479acb67 -> 9caa6f7f8b8e)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

    from 0329479acb67 [SPARK-47359][SQL] Support TRANSLATE function to work with collated strings
     add 9caa6f7f8b8e [SPARK-48061][SQL][TESTS] Parameterize max limits of `spark.sql.test.randomDataGenerator`

No new revisions were added by this update.

Summary of changes:
 .../test/scala/org/apache/spark/sql/RandomDataGenerator.scala | 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)
(spark) branch master updated: [SPARK-47359][SQL] Support TRANSLATE function to work with collated strings
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 0329479acb67 [SPARK-47359][SQL] Support TRANSLATE function to work with collated strings
0329479acb67 is described below

commit 0329479acb6758c4d3e53d514ea832a181d31065
Author: Milan Dankovic
AuthorDate: Tue Apr 30 22:28:56 2024 +0800

    [SPARK-47359][SQL] Support TRANSLATE function to work with collated strings

    ### What changes were proposed in this pull request?

    Extend built-in string functions to support non-binary, non-lowercase collation for: `translate`.

    ### Why are the changes needed?

    Update collation support for built-in string functions in Spark.

    ### Does this PR introduce _any_ user-facing change?

    Yes, users should now be able to use COLLATE within arguments for the built-in string function TRANSLATE in Spark SQL queries, using non-binary collations such as UNICODE_CI.

    ### How was this patch tested?

    Unit tests for queries using StringTranslate (CollationStringExpressionsSuite.scala).

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #45820 from miland-db/miland-db/string-translate.
Authored-by: Milan Dankovic Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/util/CollationSupport.java | 85 ++ .../sql/catalyst/analysis/CollationTypeCasts.scala | 3 +- .../catalyst/expressions/stringExpressions.scala | 28 --- .../sql/CollationStringExpressionsSuite.scala | 74 +++ 4 files changed, 180 insertions(+), 10 deletions(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java index 9778ca31209e..b77671cee90b 100644 --- a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java +++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java @@ -25,7 +25,9 @@ import org.apache.spark.unsafe.UTF8StringBuilder; import org.apache.spark.unsafe.types.UTF8String; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.regex.Pattern; import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET; @@ -483,6 +485,56 @@ public final class CollationSupport { } } + public static class StringTranslate { +public static UTF8String exec(final UTF8String source, Map dict, +final int collationId) { + CollationFactory.Collation collation = CollationFactory.fetchCollation(collationId); + if (collation.supportsBinaryEquality) { +return execBinary(source, dict); + } else if (collation.supportsLowercaseEquality) { +return execLowercase(source, dict); + } else { +return execICU(source, dict, collationId); + } +} +public static String genCode(final String source, final String dict, final int collationId) { + CollationFactory.Collation collation = CollationFactory.fetchCollation(collationId); + String expr = "CollationSupport.EndsWith.exec"; + if (collation.supportsBinaryEquality) { +return String.format(expr + "Binary(%s, %s)", source, dict); + } else if (collation.supportsLowercaseEquality) { +return String.format(expr + 
"Lowercase(%s, %s)", source, dict); + } else { +return String.format(expr + "ICU(%s, %s, %d)", source, dict, collationId); + } +} +public static UTF8String execBinary(final UTF8String source, Map dict) { + return source.translate(dict); +} +public static UTF8String execLowercase(final UTF8String source, Map dict) { + String srcStr = source.toString(); + StringBuilder sb = new StringBuilder(); + int charCount = 0; + for (int k = 0; k < srcStr.length(); k += charCount) { +int codePoint = srcStr.codePointAt(k); +charCount = Character.charCount(codePoint); +String subStr = srcStr.substring(k, k + charCount); +String translated = dict.get(subStr.toLowerCase()); +if (null == translated) { + sb.append(subStr); +} else if (!"\0".equals(translated)) { + sb.append(translated); +} + } + return UTF8String.fromString(sb.toString()); +} +public static UTF8String execICU(final UTF8String source, Map dict, +final int collationId) { + return source.translate(CollationAwareUTF8String.getCollationAwareDict( +source, dict, collationId)); +} + } + // TODO: Add more collation-aware string expressions. /** @@ -808,6 +860,39 @@ public final class CollationSupport { } } +private static Map getCollationAwareDict(UTF8String string, +
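The `execLowercase` path quoted above looks up each source character by its lowercase form and treats a `"\0"` mapping as "delete this character". A pure-Python approximation of that logic (Spark's Java version walks Unicode code points; this simplified sketch iterates Python characters and is illustrative only):

```python
def translate_lowercase(source, mapping):
    """Collation-naive sketch of the lowercase translate path:
    look up each character by its lowercase form; a '\\0' value
    means the character is removed from the output."""
    out = []
    for ch in source:
        translated = mapping.get(ch.lower())
        if translated is None:
            out.append(ch)          # no mapping: keep the original character
        elif translated != "\0":
            out.append(translated)  # '\0' mapping: drop the character
    return "".join(out)
```

Because the lookup key is lowercased, `'T'` and `'t'` hit the same dictionary entry, which is the lowercase-collation behavior the commit adds.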
(spark) branch master updated: [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new c8c249204178 [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source
c8c249204178 is described below

commit c8c2492041782b9be7f10647191dcd0d5f6a5a8a
Author: Chaoqin Li
AuthorDate: Tue Apr 30 22:08:32 2024 +0900

    [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source

    ### What changes were proposed in this pull request?

    SimpleDataSourceStreamReader is a simplified version of the DataSourceStreamReader interface. There are 3 functions that need to be defined:

    1. Read data and return the end offset.
       _def read(self, start: Offset) -> (Iterator[Tuple], Offset)_
    2. Read data between start and end offset; this is required for exactly-once reads.
       _def readBetweenOffset(self, start: Offset, end: Offset) -> Iterator[Tuple]_
    3. The initial start offset of the streaming query.
       _def initialOffset() -> dict_

    The implementation wraps the SimpleDataSourceStreamReader instance in a DataSourceStreamReader that prefetches and caches data in latestOffset. Records prefetched in the Python process are sent to the JVM as Arrow record batches in planInputPartitions(), cached by the block manager, and read by the partition reader on the executor later.

    ### Why are the changes needed?

    Compared to the DataSourceStreamReader interface, the simplified interface has some advantages: it doesn't require developers to reason about data partitioning, and it doesn't require getting the latest offset before reading data.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Added unit tests and an integration test.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #45977 from chaoqin-li1123/simple_reader_impl.
Lead-authored-by: Chaoqin Li Co-authored-by: chaoqin-li1123 <55518381+chaoqin-li1...@users.noreply.github.com> Signed-off-by: Jungtaek Lim --- .../scala/org/apache/spark/storage/BlockId.scala | 8 + python/pyspark/sql/datasource.py | 129 - python/pyspark/sql/datasource_internal.py | 146 ++ .../streaming/python_streaming_source_runner.py| 58 +++- python/pyspark/sql/worker/plan_data_source_read.py | 142 +- .../v2/python/PythonMicroBatchStream.scala | 61 +++- .../datasources/v2/python/PythonScan.scala | 3 + .../PythonStreamingPartitionReaderFactory.scala| 89 ++ .../python/PythonStreamingSourceRunner.scala | 57 +++- .../python/PythonStreamingDataSourceSuite.scala| 307 - 10 files changed, 911 insertions(+), 89 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala b/core/src/main/scala/org/apache/spark/storage/BlockId.scala index 585d9a886b47..6eb015d56b2c 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala @@ -170,6 +170,11 @@ case class StreamBlockId(streamId: Int, uniqueId: Long) extends BlockId { override def name: String = "input-" + streamId + "-" + uniqueId } +@DeveloperApi +case class PythonStreamBlockId(streamId: Int, uniqueId: Long) extends BlockId { + override def name: String = "python-stream-" + streamId + "-" + uniqueId +} + /** Id associated with temporary local data managed as blocks. Not serializable. 
*/ private[spark] case class TempLocalBlockId(id: UUID) extends BlockId { override def name: String = "temp_local_" + id @@ -213,6 +218,7 @@ object BlockId { val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r val TASKRESULT = "taskresult_([0-9]+)".r val STREAM = "input-([0-9]+)-([0-9]+)".r + val PYTHON_STREAM = "python-stream-([0-9]+)-([0-9]+)".r val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r val TEST = "test_(.*)".r @@ -250,6 +256,8 @@ object BlockId { TaskResultBlockId(taskId.toLong) case STREAM(streamId, uniqueId) => StreamBlockId(streamId.toInt, uniqueId.toLong) +case PYTHON_STREAM(streamId, uniqueId) => + PythonStreamBlockId(streamId.toInt, uniqueId.toLong) case TEMP_LOCAL(uuid) => TempLocalBlockId(UUID.fromString(uuid)) case TEMP_SHUFFLE(uuid) => diff --git a/python/pyspark/sql/datasource.py b/python/pyspark/sql/datasource.py index c08b5b7af77f..6cac7e35ff41 100644 --- a/python/pyspark/sql/datasource.py +++ b/python/pyspark/sql/datasource.py @@ -183,11 +183,36 @@ class DataSource(ABC): message_parameters={"feature": "streamWriter"}, ) +def simpleStreamReader(self, schema: StructType)
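The simplified reader contract described in the commit message — `initialOffset()`, `read(start)` returning data plus an end offset, and a between-offsets replay method for exactly-once semantics — can be illustrated with a toy counter source. The counter source itself is made up for illustration; the method names follow the PR's description:

```python
class SimpleCounterStreamReader:
    """Toy source that emits consecutive integers, 3 rows per microbatch,
    following the simplified stream-reader contract sketched in the PR."""

    def initialOffset(self):
        # Starting offset of the streaming query, as a JSON-friendly dict.
        return {"offset": 0}

    def read(self, start):
        # Read available data from `start` and report where we stopped.
        begin = start["offset"]
        end = {"offset": begin + 3}  # pretend exactly 3 new rows arrived
        return (iter([(i,) for i in range(begin, begin + 3)]), end)

    def readBetweenOffsets(self, start, end):
        # Deterministic replay of a committed range, enabling exactly-once
        # recovery after a failure.
        return iter([(i,) for i in range(start["offset"], end["offset"])])
```

Note how the developer never reasons about partitions or calls a separate `latestOffset()`: the engine derives the end offset from what `read` returns, which is the simplification the PR is after.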
(spark) branch master updated: [SPARK-48003][SQL] Add collation support for hll sketch aggregate
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 3aea6c258bf3 [SPARK-48003][SQL] Add collation support for hll sketch aggregate
3aea6c258bf3 is described below

commit 3aea6c258bf3541d7f53cd3914244f817ed36ff6
Author: Uros Bojanic <157381213+uros...@users.noreply.github.com>
AuthorDate: Tue Apr 30 20:58:43 2024 +0800

    [SPARK-48003][SQL] Add collation support for hll sketch aggregate

    ### What changes were proposed in this pull request?

    Introduce collation awareness for the hll sketch aggregate.

    ### Why are the changes needed?

    Add collation support for HyperLogLog expressions in Spark.

    ### Does this PR introduce _any_ user-facing change?

    Yes, users should now be able to use collated strings within arguments for the HyperLogLog function: hll_sketch_agg.

    ### How was this patch tested?

    E2E SQL tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #46241 from uros-db/hll-agg.
Authored-by: Uros Bojanic <157381213+uros...@users.noreply.github.com> Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/util/CollationFactory.java | 14 ++ .../aggregate/datasketchesAggregates.scala| 8 ++-- .../scala/org/apache/spark/sql/CollationSuite.scala | 19 +++ 3 files changed, 39 insertions(+), 2 deletions(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java index 93691e28c692..863445b6 100644 --- a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java +++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java @@ -25,6 +25,7 @@ import java.util.function.ToLongFunction; import com.ibm.icu.text.RuleBasedCollator; import com.ibm.icu.text.StringSearch; import com.ibm.icu.util.ULocale; +import com.ibm.icu.text.CollationKey; import com.ibm.icu.text.Collator; import org.apache.spark.SparkException; @@ -270,4 +271,17 @@ public final class CollationFactory { int collationId = collationNameToId(collationName); return collationTable[collationId]; } + + public static UTF8String getCollationKey(UTF8String input, int collationId) { +Collation collation = fetchCollation(collationId); +if (collation.supportsBinaryEquality) { + return input; +} else if (collation.supportsLowercaseEquality) { + return input.toLowerCase(); +} else { + CollationKey collationKey = collation.collator.getCollationKey(input.toString()); + return UTF8String.fromBytes(collationKey.toByteArray()); +} + } + } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala index 02925f3625d2..2102428131f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala @@ -25,7 +25,9 @@ import org.apache.spark.SparkUnsupportedOperationException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionDescription, Literal} import org.apache.spark.sql.catalyst.trees.BinaryLike +import org.apache.spark.sql.catalyst.util.CollationFactory import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.internal.types.StringTypeAnyCollation import org.apache.spark.sql.types.{AbstractDataType, BinaryType, BooleanType, DataType, IntegerType, LongType, StringType, TypeCollection} import org.apache.spark.unsafe.types.UTF8String @@ -103,7 +105,7 @@ case class HllSketchAgg( override def prettyName: String = "hll_sketch_agg" override def inputTypes: Seq[AbstractDataType] = -Seq(TypeCollection(IntegerType, LongType, StringType, BinaryType), IntegerType) +Seq(TypeCollection(IntegerType, LongType, StringTypeAnyCollation, BinaryType), IntegerType) override def dataType: DataType = BinaryType @@ -137,7 +139,9 @@ case class HllSketchAgg( // TODO: implement support for decimal/datetime/interval types case IntegerType => sketch.update(v.asInstanceOf[Int]) case LongType => sketch.update(v.asInstanceOf[Long]) -case StringType => sketch.update(v.asInstanceOf[UTF8String].toString) +case st: StringType => + val cKey = CollationFactory.getCollationKey(v.asInstanceOf[UTF8String], st.collationId) + sketch.update(cKey.toString)
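The `getCollationKey` logic in the diff above picks the key by collation capability: the string itself for binary collations, its lowercased form for lowercase collations, and an ICU collator key otherwise. A rough Python sketch of the same branching (the stdlib has no ICU collator, so the last branch is approximated with Unicode normalization; this is an illustration of the dispatch, not a faithful collator):

```python
import unicodedata

def get_collation_key(s, supports_binary_equality=False,
                      supports_lowercase_equality=False):
    """Sketch of collation-key dispatch: strings that should compare equal
    under the collation must map to the same key, so the HLL sketch hashes
    the key rather than the raw string."""
    if supports_binary_equality:
        return s                      # UTF8_BINARY: the string is its own key
    if supports_lowercase_equality:
        return s.lower()              # lowercase collations: case-folded key
    # ICU collations would use Collator.getCollationKey; NFKD normalization
    # is only a crude stand-in for that behavior here.
    return unicodedata.normalize("NFKD", s)
```

Hashing the collation key is what makes `hll_sketch_agg` count `'Abc'` and `'aBC'` as one distinct value under a case-insensitive collation while still counting them separately under a binary one.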
(spark) branch master updated (332570f42203 -> 94763438943e)
This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 332570f42203 [SPARK-48052][PYTHON][CONNECT] Recover `pyspark-connect` CI by parent classes add 94763438943e [SPARK-48050][SS] Log logical plan at query start No new revisions were added by this update. Summary of changes: .../org/apache/spark/sql/execution/streaming/StreamExecution.scala| 4 1 file changed, 4 insertions(+)
(spark) branch master updated (12a507464f10 -> 332570f42203)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 12a507464f10 [SPARK-47566][SQL] Support SubstringIndex function to work with collated strings add 332570f42203 [SPARK-48052][PYTHON][CONNECT] Recover `pyspark-connect` CI by parent classes No new revisions were added by this update. Summary of changes: python/pyspark/ml/functions.py | 4 +- python/pyspark/sql/connect/dataframe.py | 48 + python/pyspark/sql/dataframe.py | 115 3 files changed, 97 insertions(+), 70 deletions(-)
(spark) branch master updated: [SPARK-47566][SQL] Support SubstringIndex function to work with collated strings
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 12a507464f10 [SPARK-47566][SQL] Support SubstringIndex function to work with collated strings 12a507464f10 is described below commit 12a507464f106d299511d16c2a436cbc0257bc8a Author: Milan Dankovic AuthorDate: Tue Apr 30 17:19:01 2024 +0800 [SPARK-47566][SQL] Support SubstringIndex function to work with collated strings ### What changes were proposed in this pull request? Extend built-in string functions to support non-binary, non-lowercase collation for: substring_index. ### Why are the changes needed? Update collation support for built-in string functions in Spark. ### Does this PR introduce _any_ user-facing change? Yes, users should now be able to use COLLATE within arguments for built-in string function SUBSTRING_INDEX in Spark SQL queries, using non-binary collations such as UNICODE_CI. ### How was this patch tested? Unit tests for queries using SubstringIndex (`CollationStringExpressionsSuite.scala`). ### Was this patch authored or co-authored using generative AI tooling? No ### To consider: There is no check for collation match between string and delimiter, it will be introduced with Implicit Casting. We can remove the original `public UTF8String subStringIndex(UTF8String delim, int count)` method, and get the existing behavior using `subStringIndex(delim, count, 0)`. Closes #45725 from miland-db/miland-db/substringIndex-stringLocate. 
Authored-by: Milan Dankovic Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/util/CollationSupport.java | 169 + .../org/apache/spark/unsafe/types/UTF8String.java | 28 +++- .../spark/unsafe/types/CollationSupportSuite.java | 83 ++ .../sql/catalyst/analysis/CollationTypeCasts.scala | 5 + .../catalyst/expressions/stringExpressions.scala | 15 +- .../sql/CollationStringExpressionsSuite.scala | 31 6 files changed, 323 insertions(+), 8 deletions(-) diff --git a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java index 0c81b99de916..9778ca31209e 100644 --- a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java +++ b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java @@ -28,6 +28,9 @@ import java.util.ArrayList; import java.util.List; import java.util.regex.Pattern; +import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET; +import static org.apache.spark.unsafe.Platform.copyMemory; + /** * Static entry point for collation-aware expressions (StringExpressions, RegexpExpressions, and * other expressions that require custom collation support), as well as private utility methods for @@ -441,6 +444,45 @@ public final class CollationSupport { } } + public static class SubstringIndex { +public static UTF8String exec(final UTF8String string, final UTF8String delimiter, +final int count, final int collationId) { + CollationFactory.Collation collation = CollationFactory.fetchCollation(collationId); + if (collation.supportsBinaryEquality) { +return execBinary(string, delimiter, count); + } else if (collation.supportsLowercaseEquality) { +return execLowercase(string, delimiter, count); + } else { +return execICU(string, delimiter, count, collationId); + } +} +public static String genCode(final String string, final String delimiter, +final int count, final int collationId) { + 
CollationFactory.Collation collation = CollationFactory.fetchCollation(collationId); + String expr = "CollationSupport.SubstringIndex.exec"; + if (collation.supportsBinaryEquality) { +return String.format(expr + "Binary(%s, %s, %d)", string, delimiter, count); + } else if (collation.supportsLowercaseEquality) { +return String.format(expr + "Lowercase(%s, %s, %d)", string, delimiter, count); + } else { +return String.format(expr + "ICU(%s, %s, %d, %d)", string, delimiter, count, collationId); + } +} +public static UTF8String execBinary(final UTF8String string, final UTF8String delimiter, +final int count) { + return string.subStringIndex(delimiter, count); +} +public static UTF8String execLowercase(final UTF8String string, final UTF8String delimiter, +final int count) { + return CollationAwareUTF8String.lowercaseSubStringIndex(string, delimiter, count); +} +public static UTF8String execICU(final UTF8String string, final UTF8String delimiter, +final int count, final int
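The commit above routes `SUBSTRING_INDEX` through binary, lowercase, or ICU execution paths depending on the collation. The base (binary-collation) semantics that all three paths implement can be sketched in Python — a hedged illustration of the SQL function's contract, not Spark's actual `UTF8String.subStringIndex` code, and it omits the collation-aware delimiter matching the PR adds:

```python
def substring_index(s: str, delim: str, count: int) -> str:
    """Return the substring of `s` before `count` occurrences of `delim`:
    counted from the left if count > 0, from the right if count < 0.
    If `delim` occurs fewer than |count| times, return `s` unchanged."""
    if count == 0 or not delim:
        return ""
    parts = s.split(delim)
    if count > 0:
        return delim.join(parts[:count])
    return delim.join(parts[count:])

print(substring_index("www.apache.org", ".", 2))   # www.apache
print(substring_index("www.apache.org", ".", -1))  # org
```

Under a case-insensitive collation such as `UNICODE_CI`, the delimiter match itself becomes collation-aware, e.g. a delimiter `"X"` would also match `"x"` in the input — which is what the `execLowercase`/`execICU` paths handle.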
(spark) branch master updated: [SPARK-46122][SQL] Set `spark.sql.legacy.createHiveTableByDefault` to `false` by default
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 9e8c4aa3f43a [SPARK-46122][SQL] Set `spark.sql.legacy.createHiveTableByDefault` to `false` by default 9e8c4aa3f43a is described below commit 9e8c4aa3f43a3d99bff56cca319db623abc473ee Author: Dongjoon Hyun AuthorDate: Tue Apr 30 01:44:37 2024 -0700 [SPARK-46122][SQL] Set `spark.sql.legacy.createHiveTableByDefault` to `false` by default ### What changes were proposed in this pull request? This PR aims to switch `spark.sql.legacy.createHiveTableByDefault` to `false` by default in order to move away from this legacy behavior from `Apache Spark 4.0.0` while the legacy functionality will be preserved during Apache Spark 4.x period by setting `spark.sql.legacy.createHiveTableByDefault=true`. ### Why are the changes needed? Historically, this behavior change was merged at `Apache Spark 3.0.0` activity in SPARK-30098 and reverted officially during the `3.0.0 RC` period. - 2019-12-06: #26736 (58be82a) - 2019-12-06: https://lists.apache.org/thread/g90dz1og1zt4rr5h091rn1zqo50y759j - 2020-05-16: #28517 At `Apache Spark 3.1.0`, we had another discussion and defined it as `Legacy` behavior via a new configuration by reusing the JIRA ID, SPARK-30098. - 2020-12-01: https://lists.apache.org/thread/8c8k1jk61pzlcosz3mxo4rkj5l23r204 - 2020-12-03: #30554 Last year, this was proposed again twice and `Apache Spark 4.0.0` is a good time to make a decision for Apache Spark future direction. - SPARK-42603 on 2023-02-27 as an independent idea. - SPARK-46122 on 2023-11-27 as a part of Apache Spark 4.0.0 idea ### Does this PR introduce _any_ user-facing change? Yes, the migration document is updated. ### How was this patch tested? Pass the CIs with the adjusted test cases. ### Was this patch authored or co-authored using generative AI tooling? No. 
Closes #46207 from dongjoon-hyun/SPARK-46122. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- docs/sql-migration-guide.md | 1 + python/pyspark/sql/tests/test_readwriter.py | 5 ++--- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala| 2 +- .../apache/spark/sql/execution/command/PlanResolutionSuite.scala | 8 +++- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md index 1e0fdadde1e3..07562babc87d 100644 --- a/docs/sql-migration-guide.md +++ b/docs/sql-migration-guide.md @@ -25,6 +25,7 @@ license: | ## Upgrading from Spark SQL 3.5 to 4.0 - Since Spark 4.0, `spark.sql.ansi.enabled` is on by default. To restore the previous behavior, set `spark.sql.ansi.enabled` to `false` or `SPARK_ANSI_SQL_MODE` to `false`. +- Since Spark 4.0, `CREATE TABLE` syntax without `USING` and `STORED AS` will use the value of `spark.sql.sources.default` as the table provider instead of `Hive`. To restore the previous behavior, set `spark.sql.legacy.createHiveTableByDefault` to `true`. - Since Spark 4.0, the default behaviour when inserting elements in a map is changed to first normalize keys -0.0 to 0.0. The affected SQL functions are `create_map`, `map_from_arrays`, `map_from_entries`, and `map_concat`. To restore the previous behaviour, set `spark.sql.legacy.disableMapKeyNormalization` to `true`. - Since Spark 4.0, the default value of `spark.sql.maxSinglePartitionBytes` is changed from `Long.MaxValue` to `128m`. To restore the previous behavior, set `spark.sql.maxSinglePartitionBytes` to `9223372036854775807`(`Long.MaxValue`). - Since Spark 4.0, any read of SQL tables takes into consideration the SQL configs `spark.sql.files.ignoreCorruptFiles`/`spark.sql.files.ignoreMissingFiles` instead of the core config `spark.files.ignoreCorruptFiles`/`spark.files.ignoreMissingFiles`. 
diff --git a/python/pyspark/sql/tests/test_readwriter.py b/python/pyspark/sql/tests/test_readwriter.py index 5784d2c72973..e752856d0316 100644 --- a/python/pyspark/sql/tests/test_readwriter.py +++ b/python/pyspark/sql/tests/test_readwriter.py @@ -247,10 +247,9 @@ class ReadwriterV2TestsMixin: def test_create_without_provider(self): df = self.df -with self.assertRaisesRegex( -AnalysisException, "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT" -): +with self.table("test_table"): df.writeTo("test_table").create() +self.assertEqual(100, self.spark.sql("select * from test_table").count()) def test_table_overwrite(self): df = self.df diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
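The migration-guide line above can be illustrated with a short SQL session. This is a hedged sketch: the resulting table format depends on `spark.sql.sources.default` (Parquet unless overridden), and the table name `t` is hypothetical.

```sql
-- With the new default (spark.sql.legacy.createHiveTableByDefault=false),
-- CREATE TABLE without USING or STORED AS resolves to the
-- spark.sql.sources.default provider instead of a Hive SerDe table:
CREATE TABLE t (id INT);

-- Restore the pre-4.0 behavior for the current session:
SET spark.sql.legacy.createHiveTableByDefault = true;
```

This also explains the `test_readwriter.py` change in the diff: `writeTo(...).create()` without a provider no longer raises `NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT`, so the test now asserts the write succeeds.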
(spark) branch master updated: [SPARK-48055][PYTHON][CONNECT][TESTS] Enable `PandasUDFScalarParityTests.{test_vectorized_udf_empty_partition, test_vectorized_udf_struct_with_empty_partition}`
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new ed5aa56f1200 [SPARK-48055][PYTHON][CONNECT][TESTS] Enable `PandasUDFScalarParityTests.{test_vectorized_udf_empty_partition, test_vectorized_udf_struct_with_empty_partition}` ed5aa56f1200 is described below commit ed5aa56f1200bc1b0a455269eeb57863b2043fa1 Author: Ruifeng Zheng AuthorDate: Tue Apr 30 14:37:30 2024 +0800 [SPARK-48055][PYTHON][CONNECT][TESTS] Enable `PandasUDFScalarParityTests.{test_vectorized_udf_empty_partition, test_vectorized_udf_struct_with_empty_partition}` ### What changes were proposed in this pull request? enable two test in `PandasUDFScalarParityTests` ### Why are the changes needed? test coverage ### Does this PR introduce _any_ user-facing change? no, test only ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes #46296 from zhengruifeng/enable_test_vectorized_udf_empty_partition. 
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../sql/tests/connect/test_parity_pandas_udf_scalar.py| 11 --- python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py | 8 +--- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py b/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py index b42bfaf0f58d..590ab695ee07 100644 --- a/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py +++ b/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py @@ -21,17 +21,6 @@ from pyspark.testing.connectutils import ReusedConnectTestCase class PandasUDFScalarParityTests(ScalarPandasUDFTestsMixin, ReusedConnectTestCase): -def test_nondeterministic_vectorized_udf_in_aggregate(self): -self.check_nondeterministic_analysis_exception() - -@unittest.skip("Spark Connect doesn't support RDD but the test depends on it.") -def test_vectorized_udf_empty_partition(self): -super().test_vectorized_udf_empty_partition() - -@unittest.skip("Spark Connect doesn't support RDD but the test depends on it.") -def test_vectorized_udf_struct_with_empty_partition(self): -super().test_vectorized_udf_struct_with_empty_partition() - # TODO(SPARK-43727): Parity returnType check in Spark Connect @unittest.skip("Fails in Spark Connect, should enable.") def test_vectorized_udf_wrong_return_type(self): diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py index 9edd585da6a0..38bc633cd1ed 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py @@ -764,15 +764,17 @@ class ScalarPandasUDFTestsMixin: self.assertEqual(df.collect(), res.collect()) def test_vectorized_udf_empty_partition(self): -df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)) +df = self.spark.createDataFrame([Row(id=1)]).repartition(2) for udf_type in 
[PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]: f = pandas_udf(lambda x: x, LongType(), udf_type) res = df.select(f(col("id"))) self.assertEqual(df.collect(), res.collect()) def test_vectorized_udf_struct_with_empty_partition(self): -df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2)).withColumn( -"name", lit("John Doe") +df = ( +self.spark.createDataFrame([Row(id=1)]) +.repartition(2) +.withColumn("name", lit("John Doe")) ) @pandas_udf("first string, last string")
(spark) branch branch-3.4 updated: [SPARK-47129][CONNECT][SQL][3.4] Make ResolveRelations cache connect plan properly
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.4 by this push: new 5f58fa7738eb [SPARK-47129][CONNECT][SQL][3.4] Make ResolveRelations cache connect plan properly 5f58fa7738eb is described below commit 5f58fa7738eb51d6319fdd6e95ced69f40241cb4 Author: Ruifeng Zheng AuthorDate: Tue Apr 30 14:32:52 2024 +0800 [SPARK-47129][CONNECT][SQL][3.4] Make ResolveRelations cache connect plan properly ### What changes were proposed in this pull request? Make `ResolveRelations` handle plan id properly cherry-pick bugfix https://github.com/apache/spark/pull/45214 to 3.4 ### Why are the changes needed? bug fix for Spark Connect, it won't affect classic Spark SQL before this PR: ``` from pyspark.sql import functions as sf spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1") spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2") df1 = spark.read.table("test_table_1") df2 = spark.read.table("test_table_2") df3 = spark.read.table("test_table_1") join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2) join2 = df3.join(join1, how="left", on=join1.index==df3.id) join2.schema ``` fails with ``` AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. 
SQLSTATE: 42704 ``` That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect ``` === Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations === '[#12]Join LeftOuter, '`==`('index, 'id) '[#12]Join LeftOuter, '`==`('index, 'id) !:- '[#9]UnresolvedRelation [test_table_1], [], false :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 !+- '[#11]Project ['index, 'value_2] : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ! +- '[#10]Join Inner, '`==`('id, 'index) +- '[#11]Project ['index, 'value_2] ! :- '[#7]UnresolvedRelation [test_table_1], [], false +- '[#10]Join Inner, '`==`('id, 'index) ! +- '[#8]UnresolvedRelation [test_table_2], [], false :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 ! : +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ! +- '[#8]SubqueryAlias spark_catalog.default.test_table_2 ! +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false Can not resolve 'id with plan 7 ``` `[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one ``` :- '[#9]SubqueryAlias spark_catalog.default.test_table_1 +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false ``` ### Does this PR introduce _any_ user-facing change? yes, bug fix ### How was this patch tested? added ut ### Was this patch authored or co-authored using generative AI tooling? ci Closes #46290 from zhengruifeng/connect_fix_read_join_34. 
Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- python/pyspark/sql/tests/test_readwriter.py| 21 + .../spark/sql/catalyst/analysis/Analyzer.scala | 27 -- 2 files changed, 41 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/tests/test_readwriter.py b/python/pyspark/sql/tests/test_readwriter.py index f51b0ef06208..9113fb350f63 100644 --- a/python/pyspark/sql/tests/test_readwriter.py +++ b/python/pyspark/sql/tests/test_readwriter.py @@ -181,6 +181,27 @@ class ReadwriterTestsMixin: df.write.mode("overwrite").insertInto("test_table", False) self.assertEqual(6, self.spark.sql("select * from test_table").count()) +def test_cached_table(self): +with self.table("test_cached_table_1"): +self.spark.range(10).withColumn( +"value_1", +lit(1), +).write.saveAsTable("test_cached_table_1") + +with self.table("test_cached_table_2"): +self.spark.range(10).withColumnRenamed("id", "index").withColumn( +"value_2", lit(2) +).write.saveAsTable("test_cached_table_2") + +df1 = self.spark.read.table("test_cached_table_1") +