(spark) branch branch-3.5 updated: [SPARK-48138][CONNECT][TESTS] Disable a flaky `SparkSessionE2ESuite.interrupt tag` test
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 36da89deccc9 [SPARK-48138][CONNECT][TESTS] Disable a flaky `SparkSessionE2ESuite.interrupt tag` test
36da89deccc9 is described below

commit 36da89deccc916a6f32d9bf6d6f2fd8e288da917
Author: Dongjoon Hyun
AuthorDate: Mon May 6 13:45:54 2024 +0800

[SPARK-48138][CONNECT][TESTS] Disable a flaky `SparkSessionE2ESuite.interrupt tag` test

### What changes were proposed in this pull request?

This PR aims to disable a flaky test, `SparkSessionE2ESuite.interrupt tag`, temporarily.

To re-enable this, SPARK-48139 is created as a blocker issue for 4.0.0.

### Why are the changes needed?

This test case was added in `Apache Spark 3.5.0` but has unfortunately been unstable ever since.
- #42009

We tried to stabilize this test case before `Apache Spark 4.0.0-preview`.
- #45173
- #46374

However, it's still flaky.
- https://github.com/apache/spark/actions/runs/8962353911/job/24611130573 (Master, 2024-05-05)
- https://github.com/apache/spark/actions/runs/8948176536/job/24581022674 (Master, 2024-05-04)

This PR aims to stabilize CI first and to track this flaky test as a blocker-level issue in SPARK-48139, to be resolved before `Spark Connect GA` in Apache Spark 4.0.0.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46396 from dongjoon-hyun/SPARK-48138.
Authored-by: Dongjoon Hyun
Signed-off-by: yangjie01
(cherry picked from commit 8294c5962febe53eebdff79f65f5f293d93a1997)
Signed-off-by: Dongjoon Hyun
---
 .../jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
index c76dc724828e..e9c2f0c45750 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
@@ -108,7 +108,8 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
     assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
   }
 
-  test("interrupt tag") {
+  // TODO(SPARK-48139): Re-enable `SparkSessionE2ESuite.interrupt tag`
+  ignore("interrupt tag") {
     val session = spark
     import session.implicits._
 

To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch branch-3.5 updated: [SPARK-48037][CORE][3.5] Fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 58b71307795b [SPARK-48037][CORE][3.5] Fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data 58b71307795b is described below commit 58b71307795b6060be97431e0c5c8ab95205ea79 Author: sychen AuthorDate: Tue May 7 22:39:02 2024 -0700 [SPARK-48037][CORE][3.5] Fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data ### What changes were proposed in this pull request? This PR aims to fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data. ### Why are the changes needed? When the shuffle writer is SortShuffleWriter, it does not use SQLShuffleWriteMetricsReporter to update metrics, which causes AQE to obtain runtime statistics and the rowCount obtained is 0. Some optimization rules rely on rowCount statistics, such as `EliminateLimits`. Because rowCount is 0, it removes the limit operator. At this time, we get data results without limit. https://github.com/apache/spark/blob/59d5946cfd377e9203ccf572deb34f87fab7510c/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L168-L172 https://github.com/apache/spark/blob/59d5946cfd377e9203ccf572deb34f87fab7510c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L2067-L2070 ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? Production environment verification. **master metrics** https://github.com/apache/spark/assets/3898450/dc9b6e8a-93ec-4f59-a903-71aa5b11962c;> **PR metrics** https://github.com/apache/spark/assets/3898450/2d73b773-2dcc-4d23-81de-25dcadac86c1;> ### Was this patch authored or co-authored using generative AI tooling? 
No Closes #46459 from cxzl25/SPARK-48037-3.5. Authored-by: sychen Signed-off-by: Dongjoon Hyun --- .../spark/shuffle/sort/SortShuffleManager.scala| 2 +- .../spark/shuffle/sort/SortShuffleWriter.scala | 6 ++-- .../spark/util/collection/ExternalSorter.scala | 9 +++--- .../shuffle/sort/SortShuffleWriterSuite.scala | 3 ++ .../sql/execution/UnsafeRowSerializerSuite.scala | 3 +- .../adaptive/AdaptiveQueryExecSuite.scala | 32 -- 6 files changed, 43 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 46aca07ce43f..79dff6f87534 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -176,7 +176,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager metrics, shuffleExecutorComponents) case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] => -new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents) +new SortShuffleWriter(other, mapId, context, metrics, shuffleExecutorComponents) } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 8613fe11a4c2..3be7d24f7e4e 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -21,6 +21,7 @@ import org.apache.spark._ import org.apache.spark.internal.{config, Logging} import org.apache.spark.scheduler.MapStatus import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleWriter} +import org.apache.spark.shuffle.ShuffleWriteMetricsReporter import org.apache.spark.shuffle.api.ShuffleExecutorComponents import org.apache.spark.util.collection.ExternalSorter @@ -28,6 +29,7 @@ private[spark] class 
SortShuffleWriter[K, V, C]( handle: BaseShuffleHandle[K, V, C], mapId: Long, context: TaskContext, +writeMetrics: ShuffleWriteMetricsReporter, shuffleExecutorComponents: ShuffleExecutorComponents) extends ShuffleWriter[K, V] with Logging { @@ -46,8 +48,6 @@ private[spark] class SortShuffleWriter[K, V, C]( private var partitionLengths: Array[Long] = _ - private val writeMetrics = context.taskMetrics().shuffleWriteMetrics - /** Write a bunch of records to this task's output */ override def write(records: Iterator[Product2[K, V]]): Unit = { sorter = if (dep.mapSideCombine) { @@ -67,7 +67,7 @@ private[spark] class SortShuffleWriter[K, V, C]( // (see SPARK-3570). val
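The failure mode described in this commit message can be illustrated with a toy model. The sketch below is not Spark code: `apply_limit` is a hypothetical stand-in for an optimizer rule such as `EliminateLimits`, under the assumption that the rule drops a limit whenever the reported row-count statistic does not exceed the limit. A statistic wrongly reported as 0 then makes the limit disappear.

```python
from typing import List, Optional


def apply_limit(rows: List[int], limit: int, reported_row_count: Optional[int]) -> List[int]:
    """Toy optimizer plus executor (hypothetical, not Spark's implementation)."""
    if reported_row_count is not None and reported_row_count <= limit:
        # The statistic claims the child already fits within the limit,
        # so the limit operator is considered redundant and is skipped.
        return rows
    # Otherwise the limit is kept and applied.
    return rows[:limit]


data = list(range(10))

# Accurate statistic: the limit is kept and three rows come back.
correct = apply_limit(data, limit=3, reported_row_count=len(data))

# Statistic wrongly reported as 0: the limit is dropped and all rows come back.
wrong = apply_limit(data, limit=3, reported_row_count=0)
```

In this model the second call returns every row, which mirrors the "data results without limit" symptom in the description above.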
(spark) branch master updated (5f883117203d -> 52a7f634e913)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 5f883117203d [SPARK-47914][SQL] Do not display the splits parameter in Range
     add 52a7f634e913 [SPARK-48183][PYTHON][DOCS] Update error contribution guide to respect new error class file

No new revisions were added by this update.

Summary of changes:
 python/docs/source/development/contributing.rst | 4 ++--
 python/pyspark/errors/utils.py | 8
 2 files changed, 6 insertions(+), 6 deletions(-)
(spark) branch master updated: [SPARK-47914][SQL] Do not display the splits parameter in Range
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 5f883117203d [SPARK-47914][SQL] Do not display the splits parameter in Range
5f883117203d is described below

commit 5f883117203d823cb9914f483e314633845ecaa5
Author: guihuawen
AuthorDate: Wed May 8 12:04:35 2024 +0800

[SPARK-47914][SQL] Do not display the splits parameter in Range

### What changes were proposed in this pull request?

[SQL] explain extended select * from range(0, 4);

Before this PR, the splits parameter is displayed as None in the logical plans even when it is not set.

    plan
    == Parsed Logical Plan ==
    'Project [*]
    +- 'UnresolvedTableValuedFunction [range], [0, 4]

    == Analyzed Logical Plan ==
    id: bigint
    Project [id#11L]
    +- Range (0, 4, step=1, splits=None)

    == Optimized Logical Plan ==
    Range (0, 4, step=1, splits=None)

    == Physical Plan ==
    *(1) Range (0, 4, step=1, splits=1)

After this PR, the splits parameter is not displayed in the logical plans if it is not set. It is still displayed when it is set.

    plan
    == Parsed Logical Plan ==
    'Project [*]
    +- 'UnresolvedTableValuedFunction [range], [0, 4]

    == Analyzed Logical Plan ==
    id: bigint
    Project [id#11L]
    +- Range (0, 4, step=1)

    == Optimized Logical Plan ==
    Range (0, 4, step=1)

    == Physical Plan ==
    *(1) Range (0, 4, step=1, splits=1)

### Why are the changes needed?

If the splits parameter is not set, it is still displayed as None in the logical plans, which is not very user-friendly.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?
Closes #46136 from guixiaowen/SPARK-47914. Authored-by: guihuawen Signed-off-by: Kent Yao --- .../plans/logical/basicLogicalOperators.scala | 3 +- .../sql-tests/analyzer-results/group-by.sql.out| 8 ++-- .../analyzer-results/identifier-clause.sql.out | 2 +- .../analyzer-results/join-lateral.sql.out | 2 +- .../sql-tests/analyzer-results/limit.sql.out | 2 +- .../named-function-arguments.sql.out | 4 +- .../analyzer-results/non-excludable-rule.sql.out | 12 +++--- .../postgreSQL/aggregates_part1.sql.out| 20 - .../analyzer-results/postgreSQL/int8.sql.out | 4 +- .../analyzer-results/postgreSQL/join.sql.out | 6 +-- .../analyzer-results/postgreSQL/numeric.sql.out| 10 ++--- .../analyzer-results/postgreSQL/text.sql.out | 2 +- .../analyzer-results/postgreSQL/union.sql.out | 50 +++--- .../postgreSQL/window_part1.sql.out| 4 +- .../postgreSQL/window_part2.sql.out| 20 - .../postgreSQL/window_part3.sql.out| 8 ++-- .../sql-compatibility-functions.sql.out| 2 +- .../analyzer-results/sql-session-variables.sql.out | 2 +- .../scalar-subquery-predicate.sql.out | 4 +- .../scalar-subquery/scalar-subquery-select.sql.out | 4 +- .../table-valued-functions.sql.out | 14 +++--- .../typeCoercion/native/concat.sql.out | 18 .../typeCoercion/native/elt.sql.out| 8 ++-- .../udf/postgreSQL/udf-aggregates_part1.sql.out| 20 - .../udf/postgreSQL/udf-join.sql.out| 2 +- .../analyzer-results/udf/udf-group-by.sql.out | 4 +- .../results/named-function-arguments.sql.out | 2 +- .../scala/org/apache/spark/sql/ExplainSuite.scala | 6 +-- 28 files changed, 122 insertions(+), 121 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 4fd640afe3b2..9242a06cf1d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -1071,7 +1071,8 @@ case class Range( override def newInstance(): Range = copy(output = output.map(_.newInstance())) override def simpleString(maxFields: Int): String = { -s"Range ($start, $end, step=$step, splits=$numSlices)" +val splits = if (numSlices.isDefined) { s", splits=$numSlices"
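The diff above is cut off, so the exact Scala is not reproduced here. As a rough illustration of the behaviour the commit message describes (omit `splits` when it is unset, show it otherwise), here is a minimal Python sketch. The function name `range_simple_string` and the plain-integer rendering of the split count are assumptions made for this example and may differ from how the Scala code prints an `Option`.

```python
from typing import Optional


def range_simple_string(start: int, end: int, step: int = 1,
                        num_slices: Optional[int] = None) -> str:
    """Render a Range node, mentioning splits only when a value was given."""
    # An unset value contributes nothing to the string instead of "splits=None".
    splits = f", splits={num_slices}" if num_slices is not None else ""
    return f"Range ({start}, {end}, step={step}{splits})"


unset = range_simple_string(0, 4)
explicit = range_simple_string(0, 4, num_slices=2)
```

Building the optional suffix first keeps the main format string free of branching, which is the same shape as the `val splits = if (...)` line visible in the truncated diff.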
(spark) branch branch-3.5 updated: [MINOR][PYTHON][TESTS] Remove the doc in error message tests to allow other PyArrow versions in tests
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 704f956dcbed [MINOR][PYTHON][TESTS] Remove the doc in error message tests to allow other PyArrow versions in tests 704f956dcbed is described below commit 704f956dcbeddc9067e4ec502c4fd07175171cac Author: Hyukjin Kwon AuthorDate: Tue May 7 20:07:25 2024 -0700 [MINOR][PYTHON][TESTS] Remove the doc in error message tests to allow other PyArrow versions in tests This PR is a minor change to support more PyArrow versions in the test. To support more PyArrow versions in the test. it can fail: (https://github.com/HyukjinKwon/spark/actions/runs/8994639538/job/24708397027) ``` Traceback (most recent call last): File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py", line 585, in _test_merge_error self.__test_merge_error( File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py", line 606, in __test_merge_error with self.assertRaisesRegex(error_class, error_message_regex): AssertionError: "Return type of the user-defined function should be pandas.DataFrame, but is int64." does not match " An exception was thrown from the Python worker. Please see the stack trace below. 
Traceback (most recent call last): File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main process() File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process serializer.dump_stream(out_iter, outfile) File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 531, in dump_stream return ArrowStreamSerializer.dump_stream(self, init_stream_yield_batches(), stream) File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 104, in dump_stream for batch in iterator: File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py", line 524, in init_stream_yield_batches for series in iterator: File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1694, in mapper return f(df1_keys, df1_vals, df2_keys, df2_vals) ^ File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 370, in return lambda kl, vl, kr, vr: [(wrapped(kl, vl, kr, vr), to_arrow_type(return_type))] ^^^ File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 364, in wrapped verify_pandas_result( File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 234, in verify_pandas_result raise PySparkTypeError( pyspark.errors.exceptions.base.PySparkTypeError: [UDF_RETURN_TYPE] Return type of the user-defined function should be pandas.DataFrame, but is int. ``` No, test-only. Ci should validate it. No. Closes #46453 from HyukjinKwon/minor-test. 
Authored-by: Hyukjin Kwon Signed-off-by: Dongjoon Hyun --- python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py | 2 +- python/pyspark/sql/tests/pandas/test_pandas_map.py | 4 ++-- python/pyspark/sql/tests/test_arrow_map.py | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py index c3cd0f37b103..948ef4a53f2c 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py @@ -166,7 +166,7 @@ class CogroupedApplyInPandasTestsMixin: fn=lambda lft, rgt: lft.size + rgt.size, error_class=PythonException, error_message_regex="Return type of the user-defined function " -"should be pandas.DataFrame, but is int64.", +"should be pandas.DataFrame, but is int", ) def test_apply_in_pandas_returning_column_names(self): diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py b/python/pyspark/sql/tests/pandas/test_pandas_map.py index c3ba7b3e93a0..4b2be2bcf844 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_map.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py @@ -151,14 +151,14 @@ class MapInPandasTestsMixin: with self.assertRaisesRegex( PythonException,
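The reason the shortened pattern tolerates more environments can be checked with the standard library alone, since `assertRaisesRegex` performs a regular-expression search on the exception text. The sketch below compares the old and new patterns against the two message endings that appear in the traceback above (`int.` from the worker, `int64.` as expected by the old test). The variable names are illustrative only.

```python
import re

# Pattern the test used before this commit, and the relaxed one after it.
OLD_PATTERN = ("Return type of the user-defined function "
               "should be pandas.DataFrame, but is int64.")
NEW_PATTERN = ("Return type of the user-defined function "
               "should be pandas.DataFrame, but is int")

# Two wordings of the same error, differing only in the reported type name.
messages = [
    "[UDF_RETURN_TYPE] Return type of the user-defined function "
    "should be pandas.DataFrame, but is int.",
    "[UDF_RETURN_TYPE] Return type of the user-defined function "
    "should be pandas.DataFrame, but is int64.",
]

old_matches = [bool(re.search(OLD_PATTERN, m)) for m in messages]
new_matches = [bool(re.search(NEW_PATTERN, m)) for m in messages]
```

The old pattern only matches the `int64` wording, while the new prefix-style pattern matches both.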
(spark) branch master updated (6588554aa4cc -> 3b1ea0fde44e)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 6588554aa4cc [SPARK-48149][INFRA][FOLLOWUP] Use single quotation mark
     add 3b1ea0fde44e [MINOR][PYTHON][TESTS] Remove the doc in error message tests to allow other PyArrow versions in tests

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py | 2 +-
 python/pyspark/sql/tests/pandas/test_pandas_map.py | 4 ++--
 python/pyspark/sql/tests/test_arrow_map.py | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)
(spark) branch master updated: [SPARK-48149][INFRA][FOLLOWUP] Use single quotation mark
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6588554aa4cc [SPARK-48149][INFRA][FOLLOWUP] Use single quotation mark 6588554aa4cc is described below commit 6588554aa4cc3c7f57d762e0b159d05dc70d75aa Author: Dongjoon Hyun AuthorDate: Wed May 8 11:35:19 2024 +0900 [SPARK-48149][INFRA][FOLLOWUP] Use single quotation mark ### What changes were proposed in this pull request? This is a follow-up of - #46407 ### Why are the changes needed? To fix the invalid syntax error - https://github.com/apache/spark/actions/runs/8989374763 > The workflow is not valid. .github/workflows/build_python.yml (Line: 37, Col: 24): Unexpected symbol: '"pypy3"'. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual review. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46454 from dongjoon-hyun/SPARK-48149-2. 
Authored-by: Dongjoon Hyun
Signed-off-by: Hyukjin Kwon
---
 .github/workflows/build_python.yml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/build_python.yml b/.github/workflows/build_python.yml
index 9195dc4af518..efa281d6a279 100644
--- a/.github/workflows/build_python.yml
+++ b/.github/workflows/build_python.yml
@@ -34,9 +34,9 @@ jobs:
       fail-fast: false
       matrix:
         include:
-          - pyversion: ${{ github.event.schedule == '0 15 * * *' && "pypy3" }}
-          - pyversion: ${{ github.event.schedule == '0 17 * * *' && "python3.10" }}
-          - pyversion: ${{ github.event.schedule == '0 19 * * *' && "python3.12" }}
+          - pyversion: ${{ github.event.schedule == '0 15 * * *' && 'pypy3' }}
+          - pyversion: ${{ github.event.schedule == '0 17 * * *' && 'python3.10' }}
+          - pyversion: ${{ github.event.schedule == '0 19 * * *' && 'python3.12' }}
     permissions:
       packages: write
     name: Run
(spark) branch master updated: [SPARK-48131][CORE][FOLLOWUP] Add a new configuration for the MDC key of Task Name
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 84c5b919d998 [SPARK-48131][CORE][FOLLOWUP] Add a new configuration for the MDC key of Task Name 84c5b919d998 is described below commit 84c5b919d99872858d2f98db21fd3482f27dcbfc Author: Gengliang Wang AuthorDate: Tue May 7 19:18:50 2024 -0700 [SPARK-48131][CORE][FOLLOWUP] Add a new configuration for the MDC key of Task Name ### What changes were proposed in this pull request? Introduce a new Spark config `spark.log.legacyTaskNameMdc.enabled`: When true, the MDC key `mdc.taskName` will be set in the logs, which is consistent with the behavior of Spark 3.1 to Spark 3.5 releases. When false, the logging framework will use `task_name` as the MDC key for consistency with other new MDC keys. ### Why are the changes needed? As discussed in https://github.com/apache/spark/pull/46386#issuecomment-2098985001, we should add a configuration and migration guide about the change in the MDC key of Task Name. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual test ### Was this patch authored or co-authored using generative AI tooling? No Closes #46446 from gengliangwang/addConfig. 
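The key selection described in this commit message can be sketched in a few lines. The Python below is only an illustration, not the Scala in `Executor.scala`: the function name `task_name_mdc_key` and the use of a plain dict of string settings are assumptions for the example, while the config name, the two key strings, and the default of `false` come from the message and diff.

```python
LEGACY_KEY = "mdc.taskName"   # key used by Spark 3.1 through 3.5, per the commit message
NEW_KEY = "task_name"         # key used when the legacy flag is off


def task_name_mdc_key(conf: dict) -> str:
    """Pick the MDC key for the task name from a dict of string settings."""
    # The flag defaults to false, so the new key is used unless the
    # legacy behaviour is explicitly requested.
    raw = conf.get("spark.log.legacyTaskNameMdc.enabled", "false")
    return LEGACY_KEY if raw.strip().lower() == "true" else NEW_KEY


default_key = task_name_mdc_key({})
legacy_key = task_name_mdc_key({"spark.log.legacyTaskNameMdc.enabled": "true"})
```

Resolving the key once and reusing it for both `put` and `remove` keeps the two operations symmetric, which matches how the diff introduces a single `taskNameMDCKey` value.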
Authored-by: Gengliang Wang Signed-off-by: Dongjoon Hyun --- core/src/main/scala/org/apache/spark/executor/Executor.scala | 11 +-- .../main/scala/org/apache/spark/internal/config/package.scala | 10 ++ docs/core-migration-guide.md | 2 ++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 3edba45ef89f..68c38fb6179f 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -95,6 +95,13 @@ private[spark] class Executor( private[executor] val conf = env.conf + // SPARK-48131: Unify MDC key mdc.taskName and task_name in Spark 4.0 release. + private[executor] val taskNameMDCKey = if (conf.get(LEGACY_TASK_NAME_MDC_ENABLED)) { +"mdc.taskName" + } else { +LogKeys.TASK_NAME.name + } + // SPARK-40235: updateDependencies() uses a ReentrantLock instead of the `synchronized` keyword // so that tasks can exit quickly if they are interrupted while waiting on another task to // finish downloading dependencies. 
@@ -914,7 +921,7 @@ private[spark] class Executor( try { mdc.foreach { case (key, value) => MDC.put(key, value) } // avoid overriding the takName by the user - MDC.put(LogKeys.TASK_NAME.name, taskName) + MDC.put(taskNameMDCKey, taskName) } catch { case _: NoSuchFieldError => logInfo("MDC is not supported.") } @@ -923,7 +930,7 @@ private[spark] class Executor( private def cleanMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = { try { mdc.foreach { case (key, _) => MDC.remove(key) } - MDC.remove(LogKeys.TASK_NAME.name) + MDC.remove(taskNameMDCKey) } catch { case _: NoSuchFieldError => logInfo("MDC is not supported.") } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index a5be6084de36..87402d2cc17e 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -152,6 +152,16 @@ package object config { .booleanConf .createWithDefault(true) + private[spark] val LEGACY_TASK_NAME_MDC_ENABLED = +ConfigBuilder("spark.log.legacyTaskNameMdc.enabled") + .doc("When true, the MDC (Mapped Diagnostic Context) key `mdc.taskName` will be set in the " + +"log output, which is the behavior of Spark version 3.1 through Spark 3.5 releases. 
" + +"When false, the logging framework will use `task_name` as the MDC key, " + +"aligning it with the naming convention of newer MDC keys introduced in Spark 4.0 release.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) + private[spark] val DRIVER_LOG_LOCAL_DIR = ConfigBuilder("spark.driver.log.localDir") .doc("Specifies a local directory to write driver logs and enable Driver Log UI Tab.") diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md index 95c7929a6241..28a9dd0f4371 100644 --- a/docs/core-migration-guide.md +++ b/docs/core-migration-guide.md @@ -46,6 +46,8 @@ license: | - Set the Spark configuration `spark.log.structuredLogging.enabled` to `false`. - Use a custom log4j configuration
(spark) branch master updated: [SPARK-48126][CORE] Make `spark.log.structuredLogging.enabled` effective
This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 6bbf6b1eff2c [SPARK-48126][CORE] Make `spark.log.structuredLogging.enabled` effective 6bbf6b1eff2c is described below commit 6bbf6b1eff2cffe8d116ebba0194fac233b42348 Author: Gengliang Wang AuthorDate: Tue May 7 19:10:27 2024 -0700 [SPARK-48126][CORE] Make `spark.log.structuredLogging.enabled` effective ### What changes were proposed in this pull request? Currently, the spark conf `spark.log.structuredLogging.enabled` is not taking effect. The current code base checks this config in the method `prepareSubmitEnvironment`. However, Log4j is already initialized before that. This PR is to fix it by checking the config `spark.log.structuredLogging.enabled` before the initialization of Log4j. Also, this PR enhances the doc for this configuration. ### Why are the changes needed? Bug fix. After the fix, the Spark conf `spark.log.structuredLogging.enabled` takes effect. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Manual test. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: GPT-4 I used GPT-4 to improve the documents. Closes #46452 from gengliangwang/makeConfEffective. 
Authored-by: Gengliang Wang Signed-off-by: Gengliang Wang --- .../org/apache/spark/deploy/SparkSubmit.scala | 33 -- .../org/apache/spark/internal/config/package.scala | 9 +++--- docs/configuration.md | 6 +++- docs/core-migration-guide.md | 4 ++- 4 files changed, 31 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 076aa8387dc5..5a7e5542cbd0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -69,10 +69,20 @@ private[spark] class SparkSubmit extends Logging { def doSubmit(args: Array[String]): Unit = { val appArgs = parseArguments(args) +val sparkConf = appArgs.toSparkConf() + // For interpreters, structured logging is disabled by default to avoid generating mixed // plain text and structured logs on the same console. if (isShell(appArgs.primaryResource) || isSqlShell(appArgs.mainClass)) { Logging.disableStructuredLogging() +} else { + // For non-shell applications, enable structured logging if it's not explicitly disabled + // via the configuration `spark.log.structuredLogging.enabled`. + if (sparkConf.getBoolean(STRUCTURED_LOGGING_ENABLED.key, defaultValue = true)) { +Logging.enableStructuredLogging() + } else { +Logging.disableStructuredLogging() + } } // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to // be reset before the application starts. 
@@ -82,9 +92,9 @@ private[spark] class SparkSubmit extends Logging { logInfo(appArgs.toString) } appArgs.action match { - case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog) - case SparkSubmitAction.KILL => kill(appArgs) - case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) + case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog, sparkConf) + case SparkSubmitAction.KILL => kill(appArgs, sparkConf) + case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs, sparkConf) case SparkSubmitAction.PRINT_VERSION => printVersion() } } @@ -96,12 +106,11 @@ private[spark] class SparkSubmit extends Logging { /** * Kill an existing submission. */ - private def kill(args: SparkSubmitArguments): Unit = { + private def kill(args: SparkSubmitArguments, sparkConf: SparkConf): Unit = { if (RestSubmissionClient.supportsRestClient(args.master)) { new RestSubmissionClient(args.master) .killSubmission(args.submissionToKill) } else { - val sparkConf = args.toSparkConf() sparkConf.set("spark.master", args.master) SparkSubmitUtils .getSubmitOperations(args.master) @@ -112,12 +121,11 @@ private[spark] class SparkSubmit extends Logging { /** * Request the status of an existing submission. */ - private def requestStatus(args: SparkSubmitArguments): Unit = { + private def requestStatus(args: SparkSubmitArguments, sparkConf: SparkConf): Unit = { if (RestSubmissionClient.supportsRestClient(args.master)) { new RestSubmissionClient(args.master) .requestSubmissionStatus(args.submissionToRequestStatusFor) } else { - val sparkConf =
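The decision made at the top of `doSubmit` in the diff above can be summarised as a small pure function. This is a hedged Python sketch rather than the Scala itself: `use_structured_logging` and the dict-based configuration are hypothetical, and only the config key and the default of `true` are taken from the diff.

```python
def use_structured_logging(is_shell: bool, conf: dict) -> bool:
    """Decide the logging mode before the logging system is initialised."""
    if is_shell:
        # Interpreters keep plain-text logs so the console is not a mix
        # of plain text and structured output.
        return False
    # Non-shell applications use structured logging unless it is
    # explicitly disabled through the configuration.
    raw = conf.get("spark.log.structuredLogging.enabled", "true")
    return raw.strip().lower() == "true"


shell_mode = use_structured_logging(True, {})
app_default = use_structured_logging(False, {})
app_disabled = use_structured_logging(
    False, {"spark.log.structuredLogging.enabled": "false"}
)
```

The point of the fix is ordering: a function like this has to run before logging is set up, otherwise the setting is read too late to have any effect.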
(spark) branch master updated: [SPARK-48045][PYTHON] Pandas API groupby with multi-agg-relabel ignores as_index=False
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 67ae23934b56 [SPARK-48045][PYTHON] Pandas API groupby with multi-agg-relabel ignores as_index=False
67ae23934b56 is described below

commit 67ae23934b56761617c2fb217ae6cf6f2d8f619b
Author: sai
AuthorDate: Wed May 8 09:44:16 2024 +0900

[SPARK-48045][PYTHON] Pandas API groupby with multi-agg-relabel ignores as_index=False

### What changes were proposed in this pull request?

When GroupBy in the pandas API on Spark is used with relabeled aggregate columns and as_index=False, the group-by columns are not returned in the DataFrame. This change fixes the bug.

Example:

    ps.DataFrame({"a": [0, 0], "b": [0, 1]}).groupby("a", as_index=False).agg(b_max=("b", "max"))

Result:

    _ b_max
    0 1

Required Result:

    _ a b_max
    0 0 1

### Why are the changes needed?

The relabeling part of the code uses only the aggregate columns. When as_index=True, this is not an issue because the group-by columns are included in the index. When as_index=False, the group-by columns need to be added to the columns handled by the relabeling code. Please check the commits/PR for the code changes.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

- Passed GA
- Passed Build tests
- Unit tested, including scenarios in addition to the one provided in the Jira ticket

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46391 from sinaiamonkar-sai/SPARK-48045-2.
Authored-by: sai Signed-off-by: Hyukjin Kwon --- python/pyspark/pandas/groupby.py| 7 ++- python/pyspark/pandas/tests/groupby/test_groupby.py | 21 + 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py index ec47ab75c43c..55627a4c740c 100644 --- a/python/pyspark/pandas/groupby.py +++ b/python/pyspark/pandas/groupby.py @@ -308,6 +308,7 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): ) if not self._as_index: +index_cols = psdf._internal.column_labels should_drop_index = set( i for i, gkey in enumerate(self._groupkeys) if gkey._psdf is not self._psdf ) @@ -322,8 +323,12 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta): psdf = psdf.reset_index(level=should_drop_index, drop=drop) if len(should_drop_index) < len(self._groupkeys): psdf = psdf.reset_index() +index_cols = [c for c in psdf._internal.column_labels if c not in index_cols] +if relabeling: +psdf = psdf[pd.Index(index_cols + list(order))] +psdf.columns = pd.Index([c[0] for c in index_cols] + list(columns)) -if relabeling: +if relabeling and self._as_index: psdf = psdf[order] psdf.columns = columns # type: ignore[assignment] return psdf diff --git a/python/pyspark/pandas/tests/groupby/test_groupby.py b/python/pyspark/pandas/tests/groupby/test_groupby.py index 5867f7b62fa5..b58bfddb4b99 100644 --- a/python/pyspark/pandas/tests/groupby/test_groupby.py +++ b/python/pyspark/pandas/tests/groupby/test_groupby.py @@ -451,6 +451,27 @@ class GroupByTestsMixin: pdf.groupby([("x", "a"), ("x", "b")]).diff().sort_index(), ) +def test_aggregate_relabel_index_false(self): +pdf = pd.DataFrame( +{ +"A": [0, 0, 1, 1, 1], +"B": ["a", "a", "b", "a", "b"], +"C": [10, 15, 10, 20, 30], +} +) +psdf = ps.from_pandas(pdf) + +self.assert_eq( +pdf.groupby(["B", "A"], as_index=False) +.agg(C_MAX=("C", "max")) +.sort_values(["B", "A"]) +.reset_index(drop=True), +psdf.groupby(["B", "A"], as_index=False) +.agg(C_MAX=("C", "max")) +.sort_values(["B", "A"]) 
+.reset_index(drop=True), +) + class GroupByTests( GroupByTestsMixin, - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-47240][CORE][PART1] Migrate logInfo with variables to structured logging framework
This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new a15adeb3a215 [SPARK-47240][CORE][PART1] Migrate logInfo with variables to structured logging framework a15adeb3a215 is described below commit a15adeb3a215ad2ef7222e18112d23cdffa8569a Author: Tuan Pham AuthorDate: Tue May 7 17:35:35 2024 -0700 [SPARK-47240][CORE][PART1] Migrate logInfo with variables to structured logging framework The PR aims to migrate `logInfo` in Core module with variables to structured logging framework. ### Why are the changes needed? To enhance Apache Spark's logging system by implementing structured logging. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? - Pass GA. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46362 from zeotuan/coreInfo. Lead-authored-by: Tuan Pham Co-authored-by: Gengliang Wang Signed-off-by: Gengliang Wang --- .../scala/org/apache/spark/internal/LogKey.scala | 48 ++- .../org/apache/spark/BarrierCoordinator.scala | 17 --- .../org/apache/spark/BarrierTaskContext.scala | 35 +++--- .../apache/spark/ExecutorAllocationManager.scala | 13 -- .../scala/org/apache/spark/MapOutputTracker.scala | 48 --- .../main/scala/org/apache/spark/SparkContext.scala | 54 ++ .../apache/spark/api/python/PythonHadoopUtil.scala | 2 +- .../org/apache/spark/api/python/PythonRDD.scala| 6 ++- .../org/apache/spark/api/python/PythonRunner.scala | 2 +- .../org/apache/spark/api/python/PythonUtils.scala | 7 +-- .../spark/api/python/StreamingPythonRunner.scala | 12 +++-- .../scala/org/apache/spark/deploy/Client.scala | 18 .../spark/deploy/ExternalShuffleService.scala | 10 ++-- .../apache/spark/rdd/ReliableCheckpointRDD.scala | 9 ++-- .../spark/rdd/ReliableRDDCheckpointData.scala | 6 ++- .../scala/org/apache/spark/ui/JettyUtils.scala | 7 ++- 
.../main/scala/org/apache/spark/ui/SparkUI.scala | 4 +- .../scala/org/apache/spark/util/ListenerBus.scala | 7 +-- .../scala/org/apache/spark/util/SignalUtils.scala | 2 +- 19 files changed, 205 insertions(+), 102 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index c127f9c3d1f9..14e822c6349f 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -26,13 +26,15 @@ trait LogKey { } /** - * Various keys used for mapped diagnostic contexts(MDC) in logging. - * All structured logging keys should be defined here for standardization. + * Various keys used for mapped diagnostic contexts(MDC) in logging. All structured logging keys + * should be defined here for standardization. */ object LogKeys { case object ACCUMULATOR_ID extends LogKey + case object ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE extends LogKey case object ACTUAL_NUM_FILES extends LogKey case object ACTUAL_PARTITION_COLUMN extends LogKey + case object ADDED_JARS extends LogKey case object AGGREGATE_FUNCTIONS extends LogKey case object ALPHA extends LogKey case object ANALYSIS_ERROR extends LogKey @@ -42,7 +44,10 @@ object LogKeys { case object APP_NAME extends LogKey case object APP_STATE extends LogKey case object ARGS extends LogKey + case object AUTH_ENABLED extends LogKey case object BACKUP_FILE extends LogKey + case object BARRIER_EPOCH extends LogKey + case object BARRIER_ID extends LogKey case object BATCH_ID extends LogKey case object BATCH_NAME extends LogKey case object BATCH_TIMESTAMP extends LogKey @@ -55,6 +60,7 @@ object LogKeys { case object BOOT extends LogKey case object BROADCAST extends LogKey case object BROADCAST_ID extends LogKey + case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey case object BUCKET extends LogKey case object BYTECODE_SIZE extends LogKey case object 
CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey @@ -62,6 +68,7 @@ object LogKeys { case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey case object CALL_SITE_LONG_FORM extends LogKey + case object CALL_SITE_SHORT_FORM extends LogKey case object CATALOG_NAME extends LogKey case object CATEGORICAL_FEATURES extends LogKey case object CHECKPOINT_FILE extends LogKey @@ -142,11 +149,13 @@ object LogKeys { case object DEPRECATED_KEY extends LogKey case object DESCRIPTION extends LogKey case object DESIRED_NUM_PARTITIONS extends LogKey + case object
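The idea behind the `LogKeys` additions above can be pictured with a small sketch: each variable in a log message is tagged with a named key, so the logger can emit both the readable text and a key/value context. This is a plain-Python analogy rather than Spark's API; the `LogKey` enum members reuse two key names from the diff, and `render` is a hypothetical helper.

```python
from enum import Enum


class LogKey(Enum):
    # Hypothetical subset of keys, named after entries added in the diff above.
    BARRIER_ID = "barrier_id"
    BARRIER_EPOCH = "barrier_epoch"


def render(*parts):
    """Build (message, context) from plain strings and (LogKey, value) pairs."""
    text = []
    context = {}
    for part in parts:
        if isinstance(part, tuple):
            key, value = part
            # A keyed variable goes into both the text and the context map.
            context[key.value] = str(value)
            text.append(str(value))
        else:
            text.append(part)
    return "".join(text), context


message, context = render(
    "Barrier ", (LogKey.BARRIER_ID, 7), " reached epoch ", (LogKey.BARRIER_EPOCH, 3), "."
)
print(message)
print(context)
```

Keeping the keys in one place, as the `LogKeys` comment asks, means the same variable always appears under the same name in the structured output.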
(spark) branch master updated (5e49665ac39b -> 553e1b85c42a)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 5e49665ac39b [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator
     add 553e1b85c42a [SPARK-48152][BUILD] Make `spark-profiler` as a part of release and publish to maven central repo

No new revisions were added by this update.

Summary of changes:
 .github/workflows/maven_test.yml    | 10 +-
 connector/profiler/README.md        |  2 +-
 connector/profiler/pom.xml          |  6 +-
 dev/create-release/release-build.sh |  2 +-
 dev/test-dependencies.sh            |  2 +-
 docs/building-spark.md              |  7 +++
 pom.xml                             |  3 +++
 7 files changed, 23 insertions(+), 9 deletions(-)
(spark) branch master updated: [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 5e49665ac39b [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator
5e49665ac39b is described below

commit 5e49665ac39b49b875d6970f93df59aedd830fa5
Author: Bhuwan Sahni
AuthorDate: Wed May 8 09:20:01 2024 +0900

    [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator

    ### What changes were proposed in this pull request?

    This PR adds support for defining an event time column in the output dataset of the `TransformWithState` operator. The new event time column is used to evaluate watermark expressions in downstream operators.

    1. Note that the transformWithState operator does not enforce that values generated by the user's computation adhere to watermark semantics (that is, that no output rows have an event time earlier than the watermark).
    2. The watermark value passed in TimerInfo is now evictionWatermark rather than lateEventsWatermark.
    3. The event time column can be defined in the output only if a watermark has been defined previously.

    ### Why are the changes needed?

    This change is required to support chaining of stateful operators after `transformWithState`. An event time column is required to evaluate watermark expressions in downstream stateful operators.

    ### Does this PR introduce _any_ user-facing change?

    Yes. It adds a new version of the transformWithState API which allows redefining the event time column.

    ### How was this patch tested?

    Added unit test cases.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #45376 from sahnib/tws-chaining-stateful-operators.
Authored-by: Bhuwan Sahni Signed-off-by: Jungtaek Lim --- .../src/main/resources/error/error-conditions.json | 14 + .../spark/sql/catalyst/analysis/Analyzer.scala | 3 + .../ResolveUpdateEventTimeWatermarkColumn.scala| 52 +++ .../plans/logical/EventTimeWatermark.scala | 79 +++- .../spark/sql/catalyst/plans/logical/object.scala | 2 +- .../sql/catalyst/rules/RuleIdCollection.scala | 1 + .../spark/sql/catalyst/trees/TreePatterns.scala| 1 + .../spark/sql/errors/QueryCompilationErrors.scala | 7 + .../spark/sql/errors/QueryExecutionErrors.scala| 11 + .../apache/spark/sql/KeyValueGroupedDataset.scala | 178 - .../spark/sql/execution/SparkStrategies.scala | 12 + .../streaming/EventTimeWatermarkExec.scala | 88 - .../execution/streaming/IncrementalExecution.scala | 24 +- .../streaming/TransformWithStateExec.scala | 32 +- .../execution/streaming/statefulOperators.scala| 2 +- .../TransformWithStateChainingSuite.scala | 411 + 16 files changed, 866 insertions(+), 51 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index bae94a0ab97e..8a64c4c590e8 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -125,6 +125,12 @@ ], "sqlState" : "428FR" }, + "CANNOT_ASSIGN_EVENT_TIME_COLUMN_WITHOUT_WATERMARK" : { +"message" : [ + "Watermark needs to be defined to reassign event time column. Failed to find watermark definition in the streaming query." +], +"sqlState" : "42611" + }, "CANNOT_CAST_DATATYPE" : { "message" : [ "Cannot cast to ." 
@@ -1057,6 +1063,14 @@ }, "sqlState" : "4274K" }, + "EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED" : { +"message" : [ + "Previous node emitted a row with eventTime= which is older than current_watermark_value=", + "This can lead to correctness issues in the stateful operators downstream in the execution pipeline.", + "Please correct the operator logic to emit rows after current global watermark value." +], +"sqlState" : "42815" + }, "EMPTY_JSON_FIELD_VALUE" : { "message" : [ "Failed to parse an empty string for data type ." diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c29432c916f9..55b6f1af7fd8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -331,6 +331,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor Seq( ResolveWithCTE, ExtractDistributedSequenceID) ++ +
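The `EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED` condition added above describes a row-level check: a row whose event time is older than the current watermark is rejected. A minimal sketch of that rule follows, in plain Python rather than Spark code. The exception class, the `event_time_ms` field name, and the strict `<` comparison are assumptions made for illustration.

```python
class RowOlderThanWatermarkError(ValueError):
    """Hypothetical stand-in for EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED."""


def check_event_times(rows, watermark_ms):
    """Yield rows unchanged; fail on the first row older than the watermark."""
    for row in rows:
        if row["event_time_ms"] < watermark_ms:
            raise RowOlderThanWatermarkError(
                f"row eventTime={row['event_time_ms']} is older than "
                f"current watermark={watermark_ms}"
            )
        yield row


on_time = [{"event_time_ms": 1500}, {"event_time_ms": 1000}]
late = [{"event_time_ms": 999}]

passed = list(check_event_times(on_time, watermark_ms=1000))

try:
    list(check_event_times(late, watermark_ms=1000))
    rejected = False
except RowOlderThanWatermarkError:
    rejected = True

print(passed, rejected)
```

A downstream stateful operator may already have closed the state for anything older than the watermark, which is why the error message warns about correctness issues.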
(spark) branch branch-3.5 updated: [SPARK-48178][INFRA][3.5] Run `build/scala-213/java-11-17` jobs of `branch-3.5` only if needed
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 15b5d2a55837 [SPARK-48178][INFRA][3.5] Run `build/scala-213/java-11-17` jobs of `branch-3.5` only if needed
15b5d2a55837 is described below

commit 15b5d2a558371395547461d7b37f20610432dea0
Author: Dongjoon Hyun
AuthorDate: Tue May 7 15:54:50 2024 -0700

    [SPARK-48178][INFRA][3.5] Run `build/scala-213/java-11-17` jobs of `branch-3.5` only if needed

    ### What changes were proposed in this pull request?

    This PR aims to run the `build`, `scala-213`, and `java-11-17` jobs of `branch-3.5` only if needed, to reduce the maximum concurrency of Apache Spark GitHub Action usage.

    ### Why are the changes needed?

    To meet the ASF Infra GitHub Action policy, we need to reduce the maximum concurrency.
    - https://infra.apache.org/github-actions-policy.html

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass the CIs.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #46449 from dongjoon-hyun/SPARK-48178.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
---
 .github/workflows/build_and_test.yml | 9 -
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index fa40b2d0a390..9c3dc95d0f66 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -85,17 +85,16 @@ jobs:
  sparkr=`./dev/is-changed.py -m sparkr`
  tpcds=`./dev/is-changed.py -m sql`
  docker=`./dev/is-changed.py -m docker-integration-tests`
- # 'build', 'scala-213', and 'java-11-17' are always true for now.
- # It does not save significant time and most of PRs trigger the build.
+ build=`./dev/is-changed.py -m "core,unsafe,kvstore,avro,utils,network-common,network-shuffle,repl,launcher,examples,sketch,graphx,catalyst,hive-thriftserver,streaming,sql-kafka-0-10,streaming-kafka-0-10,mllib-local,mllib,yarn,mesos,kubernetes,hadoop-cloud,spark-ganglia-lgpl,sql,hive"`
  precondition="
  {
- \"build\": \"true\",
+ \"build\": \"$build\",
  \"pyspark\": \"$pyspark\",
  \"sparkr\": \"$sparkr\",
  \"tpcds-1g\": \"$tpcds\",
  \"docker-integration-tests\": \"$docker\",
- \"scala-213\": \"true\",
- \"java-11-17\": \"true\",
+ \"scala-213\": \"$build\",
+ \"java-11-17\": \"$build\",
  \"lint\" : \"true\",
  \"k8s-integration-tests\" : \"true\",
  \"breaking-changes-buf\" : \"true\",
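The effect of the hunk above can be sketched as follows: one `build` flag is computed from a module list and then reused for the `build`, `scala-213`, and `java-11-17` entries, while `lint` stays unconditional. This is an illustrative Python model, not the workflow itself. `BUILD_MODULES` is a shortened, hypothetical subset of the real list, `is_changed` is a simplified stand-in for `./dev/is-changed.py` (assumed here to yield the strings "true" or "false", and ignoring module dependencies), and the `pyspark` entry is left out because its derivation is not shown in the hunk.

```python
import json

# Hypothetical subset of the module list passed to `-m` in the hunk above.
BUILD_MODULES = {"core", "sql", "hive", "streaming", "mllib"}


def is_changed(changed, watched):
    """Return "true" or "false" as a string, the way the workflow stores the flag."""
    return "true" if set(changed) & set(watched) else "false"


def precondition(changed):
    """Build the precondition mapping for a list of changed module names."""
    build = is_changed(changed, BUILD_MODULES)
    return {
        "build": build,
        "sparkr": is_changed(changed, {"sparkr"}),
        "tpcds-1g": is_changed(changed, {"sql"}),
        "docker-integration-tests": is_changed(changed, {"docker-integration-tests"}),
        # Both extra build jobs now follow the same flag as `build`.
        "scala-213": build,
        "java-11-17": build,
        "lint": "true",
    }


docs_only = precondition(["docs"])
sql_change = precondition(["sql"])
print(json.dumps(sql_change, indent=2))
```

With this shape, a documentation-only change no longer occupies three build slots, which is the concurrency saving the commit message is after.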
(spark) branch branch-3.5 updated: [SPARK-48173][SQL][3.5] CheckAnalysis should see the entire query plan
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 2f8e7cbe98df [SPARK-48173][SQL][3.5] CheckAnalysis should see the entire query plan
2f8e7cbe98df is described below

commit 2f8e7cbe98df97ee0ae51a20796192c95e750721
Author: Wenchen Fan
AuthorDate: Tue May 7 15:25:15 2024 -0700

    [SPARK-48173][SQL][3.5] CheckAnalysis should see the entire query plan

    Backport https://github.com/apache/spark/pull/46439 to 3.5.

    ### What changes were proposed in this pull request?

    This is a follow-up of https://github.com/apache/spark/pull/38029. Some custom check rules need to see the entire query plan tree to get some context, but https://github.com/apache/spark/pull/38029 breaks this because it checks the query plans of dangling CTE relations recursively. This PR fixes it by putting the dangling CTE relations back into the main query plan and then checking the main query plan.

    ### Why are the changes needed?

    To revert the breaking change to custom check rules.

    ### Does this PR introduce _any_ user-facing change?

    No for most users. This restores the behavior of Spark 3.3 and earlier for custom check rules.

    ### How was this patch tested?

    Existing tests.

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #46442 from cloud-fan/check2.
Lead-authored-by: Wenchen Fan Co-authored-by: Wenchen Fan Signed-off-by: Dongjoon Hyun --- .../sql/catalyst/analysis/CheckAnalysis.scala | 38 +++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 7f10bdbc80ca..485015f2efab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -141,17 +141,45 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB errorClass, missingCol, orderedCandidates, a.origin) } + private def checkUnreferencedCTERelations( + cteMap: mutable.Map[Long, (CTERelationDef, Int, mutable.Map[Long, Int])], + visited: mutable.Map[Long, Boolean], + danglingCTERelations: mutable.ArrayBuffer[CTERelationDef], + cteId: Long): Unit = { +if (visited(cteId)) { + return +} +val (cteDef, _, refMap) = cteMap(cteId) +refMap.foreach { case (id, _) => + checkUnreferencedCTERelations(cteMap, visited, danglingCTERelations, id) +} +danglingCTERelations.append(cteDef) +visited(cteId) = true + } + def checkAnalysis(plan: LogicalPlan): Unit = { val inlineCTE = InlineCTE(alwaysInline = true) val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int, mutable.Map[Long, Int])] inlineCTE.buildCTEMap(plan, cteMap) -cteMap.values.foreach { case (relation, refCount, _) => - // If a CTE relation is never used, it will disappear after inline. Here we explicitly check - // analysis for it, to make sure the entire query plan is valid. - if (refCount == 0) checkAnalysis0(relation.child) +val danglingCTERelations = mutable.ArrayBuffer.empty[CTERelationDef] +val visited: mutable.Map[Long, Boolean] = mutable.Map.empty.withDefaultValue(false) +// If a CTE relation is never used, it will disappear after inline. 
Here we explicitly collect +// these dangling CTE relations, and put them back in the main query, to make sure the entire +// query plan is valid. +cteMap.foreach { case (cteId, (_, refCount, _)) => + // If a CTE relation ref count is 0, the other CTE relations that reference it should also be + // collected. This code will also guarantee the leaf relations that do not reference + // any others are collected first. + if (refCount == 0) { +checkUnreferencedCTERelations(cteMap, visited, danglingCTERelations, cteId) + } } // Inline all CTEs in the plan to help check query plan structures in subqueries. -checkAnalysis0(inlineCTE(plan)) +var inlinedPlan: LogicalPlan = inlineCTE(plan) +if (danglingCTERelations.nonEmpty) { + inlinedPlan = WithCTE(inlinedPlan, danglingCTERelations.toSeq) +} +checkAnalysis0(inlinedPlan) plan.setAnalyzed() } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
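The traversal in `checkUnreferencedCTERelations` above is a depth-first, post-order walk: starting from each CTE whose reference count is 0, it first visits whatever that CTE's `refMap` points to, and only then appends the CTE itself, so leaf relations come first. A minimal Python sketch of the same shape follows. It assumes `refMap` holds the ids of the CTEs a relation refers to, and it uses bare ids where the Scala code stores `CTERelationDef` objects.

```python
def collect_dangling(cte_map):
    """cte_map maps cte_id -> (ref_count, ids listed in that CTE's refMap)."""
    visited = set()
    dangling = []

    def visit(cte_id):
        if cte_id in visited:
            return
        _, refs = cte_map[cte_id]
        # Visit referenced relations first so that leaves are collected first.
        for ref in refs:
            visit(ref)
        dangling.append(cte_id)
        visited.add(cte_id)

    for cte_id, (ref_count, _) in cte_map.items():
        # Only CTEs that nothing references start a walk.
        if ref_count == 0:
            visit(cte_id)
    return dangling


# CTE 1 is unused; it refers to 2, which refers to 3. CTE 4 is used by the main query.
example = {1: (0, [2]), 2: (1, [3]), 3: (1, []), 4: (2, [])}
order = collect_dangling(example)
print(order)  # [3, 2, 1]
```

The collected relations are what the patch wraps around the inlined plan with `WithCTE`, so that a single `checkAnalysis0` call sees the whole tree.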
(spark) branch branch-3.5 updated: [SPARK-48179][INFRA][3.5] Pin `nbsphinx` to `0.9.3`
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new a24ec1d8f76c [SPARK-48179][INFRA][3.5] Pin `nbsphinx` to `0.9.3` a24ec1d8f76c is described below commit a24ec1d8f76c7bf47e491086f14ea202b6806cd8 Author: Dongjoon Hyun AuthorDate: Tue May 7 15:23:24 2024 -0700 [SPARK-48179][INFRA][3.5] Pin `nbsphinx` to `0.9.3` ### What changes were proposed in this pull request? This PR aims to pin `nbsphinx` to `0.9.3` to recover `branch-3.5` CI. ### Why are the changes needed? From yesterday, `branch-3.5` commit build is broken. - https://github.com/apache/spark/actions/runs/8978558438/job/24659197282 ``` Exception occurred: File "/usr/local/lib/python3.9/dist-packages/nbsphinx/__init__.py", line 1316, in apply for section in self.document.findall(docutils.nodes.section): AttributeError: 'document' object has no attribute 'findall' The full traceback has been saved in /tmp/sphinx-err-qz4y0bav.log, if you want to report the issue to the developers. ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs on this PR. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46448 from dongjoon-hyun/nbsphinx. Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .github/workflows/build_and_test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 8488540b415d..fa40b2d0a390 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -682,7 +682,7 @@ jobs: # See also https://issues.apache.org/jira/browse/SPARK-35375. # Pin the MarkupSafe to 2.0.1 to resolve the CI error. # See also https://issues.apache.org/jira/browse/SPARK-38279. 
-python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme 'sphinx-copybutton==0.5.2' nbsphinx numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 'pyzmq<24.0.0' 'sphinxcontrib-applehelp==1.0.4' 'sphinxcontrib-devhelp==1.0.2' 'sphinxcontrib-htmlhelp==2.0.1' 'sphinxcontrib-qthelp==1.0.3' 'sphinxcontrib-serializinghtml==1.1.5' 'nest-asyncio==1.5.8' 'rpds-py==0.16.2' 'alabaster==0.7.13' +python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme 'sphinx-copybutton==0.5.2' 'nbsphinx==0.9.3' numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 'pyzmq<24.0.0' 'sphinxcontrib-applehelp==1.0.4' 'sphinxcontrib-devhelp==1.0.2' 'sphinxcontrib-htmlhelp==2.0.1' 'sphinxcontrib-qthelp==1.0.3' 'sphinxcontrib-serializinghtml==1.1.5' 'nest-asyncio==1.5.8' 'rpds-py==0.16.2' 'alabaster==0.7.13' python3.9 -m pip install ipython_genutils # See SPARK-38517 python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' 'pyarrow==12.0.1' pandas 'plotly>=4.8' python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch branch-3.5 updated: [SPARK-48167][PYTHON][TESTS][FOLLOWUP][3.5] Reformat test_readwriter.py to fix Python Linter error
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 03bc2b188d21 [SPARK-48167][PYTHON][TESTS][FOLLOWUP][3.5] Reformat test_readwriter.py to fix Python Linter error
03bc2b188d21 is described below

commit 03bc2b188d2111b5c4cc5bc13ebd0455602028a8
Author: Dongjoon Hyun
AuthorDate: Tue May 7 13:38:08 2024 -0700

    [SPARK-48167][PYTHON][TESTS][FOLLOWUP][3.5] Reformat test_readwriter.py to fix Python Linter error

    ### What changes were proposed in this pull request?

    This is a follow-up of https://github.com/apache/spark/pull/46430 to fix a Python linter failure.

    ### Why are the changes needed?

    To recover `branch-3.5` CI:
    - https://github.com/apache/spark/actions/runs/8981228745/job/24666400664

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Pass the Python linter in this PR builder.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #46445 from dongjoon-hyun/SPARK-48167.

    Authored-by: Dongjoon Hyun
    Signed-off-by: Dongjoon Hyun
---
 python/pyspark/sql/tests/test_readwriter.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_readwriter.py b/python/pyspark/sql/tests/test_readwriter.py
index e903d3383b74..7911a82c61fc 100644
--- a/python/pyspark/sql/tests/test_readwriter.py
+++ b/python/pyspark/sql/tests/test_readwriter.py
@@ -247,7 +247,8 @@ class ReadwriterV2TestsMixin:
  self.assertEqual(100, self.spark.sql("select * from test_table").count())
 
  @unittest.skipIf(
- "SPARK_SKIP_CONNECT_COMPAT_TESTS" in os.environ, "Known behavior change in 4.0")
+ "SPARK_SKIP_CONNECT_COMPAT_TESTS" in os.environ, "Known behavior change in 4.0"
+ )
  def test_create_without_provider(self):
  df = self.df
  with self.assertRaisesRegex(
(spark) branch master updated: [SPARK-48134][CORE] Spark core (java side): Migrate `error/warn/info` with variables to structured logging framework
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 3d9d1f3dc05a [SPARK-48134][CORE] Spark core (java side): Migrate `error/warn/info` with variables to structured logging framework
3d9d1f3dc05a is described below

commit 3d9d1f3dc05a2825bf315c68fc4e4232354dbd00
Author: panbingkun
AuthorDate: Tue May 7 13:08:00 2024 -0700

    [SPARK-48134][CORE] Spark core (java side): Migrate `error/warn/info` with variables to structured logging framework

    ### What changes were proposed in this pull request?

    This PR aims to:
    1. Migrate `error/warn/info` calls with variables in the `core` module to the structured logging framework on the Java side.
    2. Convert all dependencies on `org.slf4j.Logger` and `org.slf4j.LoggerFactory` to `org.apache.spark.internal.Logger` and `org.apache.spark.internal.LoggerFactory`, so that importing `org.slf4j.Logger` and `org.slf4j.LoggerFactory` in Java code can be prohibited entirely later.

    ### Why are the changes needed?

    To enhance Apache Spark's logging system by implementing structured logging.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    - Pass GA.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #46390 from panbingkun/core_java_sl.
Authored-by: panbingkun Signed-off-by: Gengliang Wang --- .../java/org/apache/spark/internal/Logger.java | 21 +++- .../scala/org/apache/spark/internal/LogKey.scala | 9 +++ .../org/apache/spark/io/ReadAheadInputStream.java | 19 --- .../org/apache/spark/memory/TaskMemoryManager.java | 28 +++--- .../shuffle/sort/BypassMergeSortShuffleWriter.java | 9 --- .../spark/shuffle/sort/ShuffleExternalSorter.java | 25 ++- .../spark/shuffle/sort/UnsafeShuffleWriter.java| 9 --- .../sort/io/LocalDiskShuffleMapOutputWriter.java | 10 .../apache/spark/unsafe/map/BytesToBytesMap.java | 12 ++ .../unsafe/sort/UnsafeExternalSorter.java | 21 +--- .../unsafe/sort/UnsafeSorterSpillReader.java | 4 ++-- 11 files changed, 113 insertions(+), 54 deletions(-) diff --git a/common/utils/src/main/java/org/apache/spark/internal/Logger.java b/common/utils/src/main/java/org/apache/spark/internal/Logger.java index 2b4dd3bb45bc..d8ab26424bae 100644 --- a/common/utils/src/main/java/org/apache/spark/internal/Logger.java +++ b/common/utils/src/main/java/org/apache/spark/internal/Logger.java @@ -34,6 +34,10 @@ public class Logger { this.slf4jLogger = slf4jLogger; } + public boolean isErrorEnabled() { +return slf4jLogger.isErrorEnabled(); + } + public void error(String msg) { slf4jLogger.error(msg); } @@ -58,6 +62,10 @@ public class Logger { } } + public boolean isWarnEnabled() { +return slf4jLogger.isWarnEnabled(); + } + public void warn(String msg) { slf4jLogger.warn(msg); } @@ -82,6 +90,10 @@ public class Logger { } } + public boolean isInfoEnabled() { +return slf4jLogger.isInfoEnabled(); + } + public void info(String msg) { slf4jLogger.info(msg); } @@ -106,6 +118,10 @@ public class Logger { } } + public boolean isDebugEnabled() { +return slf4jLogger.isDebugEnabled(); + } + public void debug(String msg) { slf4jLogger.debug(msg); } @@ -126,6 +142,10 @@ public class Logger { slf4jLogger.debug(msg, throwable); } + public boolean isTraceEnabled() { +return slf4jLogger.isTraceEnabled(); + } + public void 
trace(String msg) { slf4jLogger.trace(msg); } @@ -146,7 +166,6 @@ public class Logger { slf4jLogger.trace(msg, throwable); } - private void withLogContext( String pattern, MDC[] mdcs, diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala index d4e1d9f535af..c127f9c3d1f9 100644 --- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala +++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala @@ -168,6 +168,7 @@ object LogKeys { case object EXCEPTION extends LogKey case object EXECUTE_INFO extends LogKey case object EXECUTE_KEY extends LogKey + case object EXECUTION_MEMORY_SIZE extends LogKey case object EXECUTION_PLAN_LEAVES extends LogKey case object EXECUTOR_BACKEND extends LogKey case object EXECUTOR_DESIRED_COUNT extends LogKey @@ -302,6 +303,7 @@ object LogKeys { case object MAX_SLOTS extends LogKey case object MAX_SPLIT_BYTES extends LogKey case object MAX_TABLE_PARTITION_METADATA_SIZE extends LogKey + case object MEMORY_CONSUMER extends LogKey case object MEMORY_POOL_NAME extends LogKey case
(spark) branch master updated (26c50369edb2 -> e24f8965e066)
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

    from 26c50369edb2 [SPARK-48174][INFRA] Merge `connect` back to the original test pipeline
     add e24f8965e066 [SPARK-48037][CORE] Fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data

No new revisions were added by this update.

Summary of changes:
 .../spark/shuffle/sort/SortShuffleManager.scala    |  2 +-
 .../spark/shuffle/sort/SortShuffleWriter.scala     |  6 +++---
 .../spark/util/collection/ExternalSorter.scala     |  9 +
 .../shuffle/sort/SortShuffleWriterSuite.scala      |  3 +++
 .../sql/execution/UnsafeRowSerializerSuite.scala   |  3 ++-
 .../adaptive/AdaptiveQueryExecSuite.scala          | 23 ++
 6 files changed, 37 insertions(+), 9 deletions(-)
svn commit: r69013 - /dev/spark/KEYS
Author: wenchen Date: Tue May 7 17:07:43 2024 New Revision: 69013 Log: Update KEYS Modified: dev/spark/KEYS Modified: dev/spark/KEYS == --- dev/spark/KEYS (original) +++ dev/spark/KEYS Tue May 7 17:07:43 2024 @@ -704,61 +704,62 @@ kyHyHY5kPG9HfDOSahPz =SDAz -END PGP PUBLIC KEY BLOCK- -pub 4096R/4F4FDC8A 2018-09-18 -uid Wenchen Fan (CODE SIGNING KEY) -sub 4096R/6F3F5B0E 2018-09-18 +pub rsa4096 2024-05-07 [SC] + 4DC9676CEF9A83E98FCA02784D6620843CD87F5A +uid Wenchen Fan (CODE SIGNING KEY) +sub rsa4096 2024-05-07 [E] -BEGIN PGP PUBLIC KEY BLOCK- -Version: GnuPG v1 -mQINBFugiYgBEAC4DsJBWF3VjWiKEiD8XNPRTg3Bnw52fe4bTB9Jvh/q0VStJjO7 -CSHZ1/P5h60zbS5UWLP2mt+c0FaW6wv7PxafCnd1MPENGBkttZbC4UjWDSbPp0vx -fkUfrAqflWvO1AaCveg2MlyQdLZ1HwVz+PDLWqE+Ev2p3Si4Jfx5P2O9FmWt8a/b -Wea/4gfy/5zFWRberQjt4CkSBuNU+cOo19/n32JJJYbRqrzFAGs/DJUIxNXC1qef -c2iB3dyff1mkLb9Vzd1RfhZaSNUElo67o4Vi6SswgvHxoE03wIcoJvBTafqLxy6p -mt5SAzOyvvmOVcLNqP9i5+c4sBrxvQ2ZEZrZt7dKfhbh4W8ged/TNWMoNOCX2usD -Fj17KrFAEaeqtEwRdwZMxGqKI/NxANkdPSxS4T/JQoi+N6LBJ88yzmeCquA8MT0b -/H4ziyjgrSRugCE6jcsbuObQsDxiqPSSXeWSjPoYq876JcqAgZzSYYdlGVw2J9Vb -46hhEqhGk+91vK6CtyuhKv5KXk1B3Rhhc5znKWcahD3cpISxwTSzN9OwQHEd8Ovv -x0WAhY3WOexrBekH7Sy00gjaHSAHFj3ReITfffWkv6t4TGLyohEOfgdxFvq03Fhd -p7bWDmux47jP6AUUjP0VXRsG9ev3ch+bbcbRlo15HPBtyehoPn4BellFAQARAQAB +mQINBGY6XpcBEADBeNz3IBYriwrPzMYJJO5u1DaWAJ4Sryx6PUZgvssrcqojYVTh +MjtlBkWRcNquAyDrVlU1vtq1yMq5KopQoAEi/l3xaEDZZ0IFAob6+GlGXEon2Jvf +0FXQsx+Df4nMVl7KPqh68T++Z4GkvK5wyyN9uaUTWL2deGeinVxTh6qWQT8YiCd5 +wof+Dk5IIzKQ5VIBhU/U9S0jo/pqhH4okcZGTyT2Q7sfg4eXl5+Y2OR334RkvTcX +uJjcnJ8BUbBSm1UhNg4OGBEJgi+lE1GEgw4juOfTAPh9fx8SCLhuX0m6Qc/y9bAK +Q4zejbF5F2Um9dqrZqg6Egp+nlzydn59hq9owSnQ6JdoA/PLcgoign0sghu9xGCR +GpgI2kS7Q8bu6dy7T0BfUerLZ1FHu7nCT2ZNSIh/Y2eOhuBhUr3llg8xa3PZZob/ +2sZE2dJ3g/qp2Nbo+s5Q5kELtuo6cZD0EISQwt68hGWIgxs0vtci2c2kQYFS0oqw +fGynEeDFZRHV3ET5rioYaoPi70Cnibght5ocL0t6sl0RQQVp6k2i1aofJbZA480N +ivuJ5agGaSRxmIDk6JlDsHJGxO9oC066ZLJiR6i0JUinGP7sw/nNmgup/AB+y4hW 
+9WdeAFyYmuYysDRRyE6z1MPDp1R00MyGxHNFDF64/JPY/nKKFdXp+aCazwARAQAB tDNXZW5jaGVuIEZhbiAoQ09ERSBTSUdOSU5HIEtFWSkgPHdlbmNoZW5AYXBhY2hl -Lm9yZz6JAjgEEwECACIFAlugiYgCGwMGCwkIBwMCBhUIAgkKCwQWAgMBAh4BAheA -AAoJEGuscolPT9yKvqUP/i34exSQNs9NcDvjOQhrvpjYCarL4mdQZOjIn9JWxeWr -3nkzC9ozEIrb1zt8pqhiYr6qJhmx2EJgIwZTZZ9O0qHFMmYhYn/9/KKidE0XN6t3 -dFcbtRB1PGlc9b34PZNfdhD8PWA/UB1QC0TdTRNKhrIGGIZocrkaBral6uMJZAyV -kbb+s21cRupPLM2wmU1k3U4WxnaIq2foErhaPC9+OEDAcLH/OxwiekJTCsvZypzE -1laxo21rX1kgYzeAuqP4BfX5ARyrfM3O31Gh8asrx1bXD4z7dHqJxdJjh7ycdJdT -VLcqy6LVsRanubiJ7cg4LkEz7Xxm3RLC3ZLbnYr72ljV5wzQ3r9gE+5rEnacZw7P -9fQkAY5W2fqgfSn6Zx2SwPGhXosMdlp4zAWH3aCZj+DAx8XNG0sm3RE465zGxI9w -jIE8V9RYhUoQnmfQA+lqUIrkmEneXYPUvct6F6B5tRNOXsceVG+c4O3o+ueJKEv3 -DvI4UyGD8q5k7IT24sOKa3CZmq3dsutfpecZdbrPT5MV13vrdJ0DZufpAi/mN3vL -obkYyKRwyObUCiG6K9REoUJbes6+oX7SDXUCE1AI6UJrSuSXHazY/FWUjZaZClPR -yxO51LjsEbkmzKggtZMljlAnTfRi/sEt7pC0bx1SCwoBKfLUbXclRJTtr1Ac6MiM -uQINBFugiYgBEADNXxoluY3P7UvGAjprc03pHmg+VHzDQCHZpQ21Qgg4cUgEsckd -J6Vd7CrqLbTuYrQc7vq1UKV/5HWo3d8nK4eeM77DYo7fbmy1gMZ2okHy/EpV5i6d -gArAQM/nbEiCB9grgiFrLdApfPzeHHGXSbrF3BWyIkc/5mJ+VuLC+FY3xWDvHNAf -G8m7zrV5BlKVZl+WYRd9LQEvhaZG74ettro2zkT7UGagdFZtNFzUAQb3hnbYvlpY -OTJu1zUEkeYDsAgp+nbuAbgkGcH2iohsozVx2kKPzmRLmYmVeh725AtHJCsDbyuS -863bf1+xEz1K7k2P7FMbf8R1I8qxOjTA6tCRSkSYxPzEgcYB/ShAtX6N/J/mubsP -ow5NKAsSfdvH/Xk66umAFu2b0V1B8gFvKifZ4GEmKkJkLJ0Pw5Bspsx56BDqQCaf -d1lJcmm7MEQTydgC37UhfJIISdnJ/MGjT8oYyKhxfcfjQ6Gp2/nY0O9akg43chOl -ZvDk9EmuX+oEmM2aMBSIcOwEPvOCaIN62D8m/PeVYzuYBCYEkcURLMN8NnxLhu2q -MYjdKG1tc5tkxyaxJQLTTvxV6xYcz1qZZbk/YKp2yAIfMFJElhPkqidB0pGNIncC -myAnbrgDzEPzRIH9KVrFMb4Gzq3l+/ZSPxSLwxAO9I0o8jmfB0QSpZAdqwARAQAB -iQIfBBgBAgAJBQJboImIAhsMAAoJEGuscolPT9yKeQ0P/jykTh5nrHDaldxZE3eu -Z6I7fJXJEV6i5tmve5Q827EnEaLK07XlaHfP+/sYy9fWs16sNTLHgoNNtfUn738J -W08UtEADGFp6DntZg0+h5CksDNXr4t06ndqyniBMw1aTClrfW/9O8240VlrCrveD -GQmFziY1DePKV9pR0A6xsOGhLtL057T5uJ64lPJewgbg7X7dD0OlHYgcNYn+XGAY -OJYHuk5Nchm47iaciU3kDuECLWmDqyMBTT+B5/hcrwR9zUc+NjCjVVdsxxZlSixw 
-TOQvYxTdqhMwMedUuWmTkJ1XDU6AJSx013ma9OC1tKqwmjaSUTQDZFKVPy6yL+YH -SqRuC0M0p151rnMwljs2hPwuOOEm82PVsGKNytXtilt5LheGwyBPHTnX1ai2kvUC -HDcwOSR/CVeRLsWi7JBTXsfkflEuwasCEags42R8HzSCGYZwm8cauHnBQ9BxZVCr -iEdCcXfL2dgEIeiI5FXDlNzeXKihatHUYGBeZG0OO4oKoRKieF+pRvEJv4X7HRiL -Tj8aAFzBcLb5RZ0I9PIILQyy16B4ZdjyDjrjeIx1hxO7J/ZRX2DA62/yxo0Qa0SF -qmd2/zxUPGuOvX/e79ATOM8KMQFbGikLTcjW9gcUuBnLPCu0WN2KF+ykz33y1rH9 -DFZYf6DnKNwDnBzBuAx2S+P0 -=a0mL +Lm9yZz6JAlEEEwEIADsWIQRNyWds75qD6Y/KAnhNZiCEPNh/WgUCZjpelwIbAwUL +CQgHAgIiAgYVCgkICwIEFgIDAQIeBwIXgAAKCRBNZiCEPNh/WkofD/9sI7J3i9Ck +NOlHpVnjAaHjyGX5cVA2dZGniJdLf5yOKOI6pu7dMW+NThsXO1Iv+BRYo7una6/Q +vUquKKxCXIN3vNmKIB1e9lj4MaIhCRmXUSQxjkVa9JW3P/F520Ct3VjiCZ5IjPv+ +g1hF/wrkuuoAFlcC/bfGWafkaZgszavSpCdp27mUXUNbvLW0dPJ3+ay4cDPuT1DI +6DhB8qpqN7gInDFACW2qtQ2KZh1JFGy5ZccQ9dB3t/B4BYiUie6a3eQWgKqLF1hw +8yHY3DkCVGfnXJk4+LMWqgazQxoB6oZjBvoQYtGOPXr1ZbmtiRHCDM5KmZ+QmIXB
(spark) branch master updated: [SPARK-48174][INFRA] Merge `connect` back to the original test pipeline
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 26c50369edb2 [SPARK-48174][INFRA] Merge `connect` back to the original test pipeline 26c50369edb2 is described below commit 26c50369edb21d616361a4b22a555ed7b7412a4e Author: Dongjoon Hyun AuthorDate: Tue May 7 09:34:59 2024 -0700 [SPARK-48174][INFRA] Merge `connect` back to the original test pipeline ### What changes were proposed in this pull request? This PR aims to merge `connect` back to the original test pipeline to reduce the maximum concurrency of GitHub Actions by one. - https://infra.apache.org/github-actions-policy.html > All workflows SHOULD have a job concurrency level less than or equal to 15. ### Why are the changes needed? This is a partial recovery from the following. - #45107 We stabilized the root cause of #45107 via the following PRs. In addition, we will disable a flaky test case if one exists. - #46395 - #46396 - #46425 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46441 from dongjoon-hyun/SPARK-48174. 
Authored-by: Dongjoon Hyun Signed-off-by: Dongjoon Hyun --- .github/workflows/build_and_test.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 286f8e1193d9..00ba16265dce 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -156,9 +156,8 @@ jobs: mllib-local, mllib, graphx - >- streaming, sql-kafka-0-10, streaming-kafka-0-10, streaming-kinesis-asl, -kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf +kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf, connect - yarn - - connect # Here, we split Hive and SQL tests into some of slow ones and the rest of them. included-tags: [""] excluded-tags: [""] - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-48035][SQL][FOLLOWUP] Fix try_add/try_multiply being semantic equal to add/multiply
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 808186835077 [SPARK-48035][SQL][FOLLOWUP] Fix try_add/try_multiply being semantic equal to add/multiply 808186835077 is described below commit 808186835077cf50f10262c633f19de4ccc09d9d Author: Supun Nakandala AuthorDate: Tue May 7 09:17:01 2024 -0700 [SPARK-48035][SQL][FOLLOWUP] Fix try_add/try_multiply being semantic equal to add/multiply ### What changes were proposed in this pull request? - This is a follow-up to the previous PR: https://github.com/apache/spark/pull/46307. - With the new changes we do the evalMode check in the `collectOperands` function instead of introducing a new function. ### Why are the changes needed? - Better code quality and readability. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Existing unit tests. ### Was this patch authored or co-authored using generative AI tooling? - No Closes #46414 from db-scnakandala/db-scnakandala/master. Authored-by: Supun Nakandala Signed-off-by: Dongjoon Hyun --- .../sql/catalyst/expressions/Expression.scala | 14 - .../sql/catalyst/expressions/arithmetic.scala | 23 -- 2 files changed, 8 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 2759f5a29c79..de15ec43c4f3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -1378,20 +1378,6 @@ trait CommutativeExpression extends Expression { } reorderResult } - - /** - * Helper method to collect the evaluation mode of the commutative expressions. 
This is - * used by the canonicalized methods of [[Add]] and [[Multiply]] operators to ensure that - * all operands have the same evaluation mode before reordering the operands. - */ - protected def collectEvalModes( - e: Expression, - f: PartialFunction[CommutativeExpression, Seq[EvalMode.Value]] - ): Seq[EvalMode.Value] = e match { -case c: CommutativeExpression if f.isDefinedAt(c) => - f(c) ++ c.children.flatMap(collectEvalModes(_, f)) -case _ => Nil - } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala index 91c10a53af8a..a085a4e3a8a3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala @@ -452,14 +452,12 @@ case class Add( copy(left = newLeft, right = newRight) override lazy val canonicalized: Expression = { -val evalModes = collectEvalModes(this, {case Add(_, _, evalMode) => Seq(evalMode)}) -lazy val reorderResult = buildCanonicalizedPlan( - { case Add(l, r, _) => Seq(l, r) }, +val reorderResult = buildCanonicalizedPlan( + { case Add(l, r, em) if em == evalMode => Seq(l, r) }, { case (l: Expression, r: Expression) => Add(l, r, evalMode)}, Some(evalMode) ) -if (resolved && evalModes.forall(_ == evalMode) && reorderResult.resolved && - reorderResult.dataType == dataType) { +if (resolved && reorderResult.resolved && reorderResult.dataType == dataType) { reorderResult } else { // SPARK-40903: Avoid reordering decimal Add for canonicalization if the result data type is @@ -609,16 +607,11 @@ case class Multiply( newLeft: Expression, newRight: Expression): Multiply = copy(left = newLeft, right = newRight) override lazy val canonicalized: Expression = { -val evalModes = collectEvalModes(this, {case Multiply(_, _, evalMode) => Seq(evalMode)}) -if (evalModes.forall(_ == evalMode)) { - 
buildCanonicalizedPlan( -{ case Multiply(l, r, _) => Seq(l, r) }, -{ case (l: Expression, r: Expression) => Multiply(l, r, evalMode)}, -Some(evalMode) - ) -} else { - withCanonicalizedChildren -} +buildCanonicalizedPlan( + { case Multiply(l, r, em) if em == evalMode => Seq(l, r) }, + { case (l: Expression, r: Expression) => Multiply(l, r, evalMode) }, + Some(evalMode) +) } }
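The effect of the `if em == evalMode` guard in the diff above can be illustrated outside Catalyst. The following is a minimal Python sketch, not Spark code: `Leaf`, `Add`, `collect_operands`, `canonical_key`, and the `"ANSI"`/`"TRY"` labels are hypothetical stand-ins. It assumes the intent of the fix is that operand collection only descends into nested nodes whose eval mode matches the root's, so a nested `try_add` stays a single opaque operand instead of being reordered into a plain `add` chain.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass(frozen=True)
class Leaf:
    name: str


@dataclass(frozen=True)
class Add:
    left: "Node"
    right: "Node"
    eval_mode: str  # hypothetical labels, e.g. "ANSI" or "TRY"


Node = Union[Leaf, Add]


def collect_operands(node: Node, eval_mode: str) -> List[Node]:
    """Flatten nested Add nodes, but only through nodes with the same eval mode."""
    if isinstance(node, Add) and node.eval_mode == eval_mode:
        return (collect_operands(node.left, eval_mode)
                + collect_operands(node.right, eval_mode))
    # A leaf, or an Add with a different eval mode, counts as one operand.
    return [node]


def canonical_key(node: Node) -> str:
    """Order-insensitive key for a commutative Add tree (illustration only)."""
    if isinstance(node, Leaf):
        return node.name
    operands = sorted(canonical_key(op)
                      for op in collect_operands(node, node.eval_mode))
    return f"{node.eval_mode}(" + " + ".join(operands) + ")"


a, b, c = Leaf("a"), Leaf("b"), Leaf("c")
same_mode_1 = Add(Add(a, b, "ANSI"), c, "ANSI")
same_mode_2 = Add(c, Add(b, a, "ANSI"), "ANSI")
mixed_mode = Add(Add(a, b, "TRY"), c, "ANSI")
```

In this sketch `same_mode_1` and `same_mode_2` produce the same key, while `mixed_mode` keeps the inner `TRY` node intact and therefore produces a different key.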
(spark) branch master updated: [SPARK-41547][CONNECT][TESTS] Re-eneable Spark Connect function tests with ANSI mode
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 8f719adcf556 [SPARK-41547][CONNECT][TESTS] Re-eneable Spark Connect function tests with ANSI mode 8f719adcf556 is described below commit 8f719adcf556f23ba66d3742266f4ca2e4875530 Author: Martin Grund AuthorDate: Tue May 7 09:14:06 2024 -0700 [SPARK-41547][CONNECT][TESTS] Re-eneable Spark Connect function tests with ANSI mode ### What changes were proposed in this pull request? This patch re-enables the previously failing tests after enablement of ANSI SQL. ### Why are the changes needed? Spark 4 / ANSI SQL ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Re-enabled tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #46432 from grundprinzip/grundprinzip/SPARK-41547. Authored-by: Martin Grund Signed-off-by: Dongjoon Hyun --- .../sql/tests/connect/test_connect_function.py | 33 ++ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py b/python/pyspark/sql/tests/connect/test_connect_function.py index 2f21dd5a7d3a..9d4db8cf7d15 100644 --- a/python/pyspark/sql/tests/connect/test_connect_function.py +++ b/python/pyspark/sql/tests/connect/test_connect_function.py @@ -2030,7 +2030,6 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, PandasOnSparkTestUtils, S (CF.sentences, SF.sentences), (CF.initcap, SF.initcap), (CF.soundex, SF.soundex), -(CF.bin, SF.bin), (CF.hex, SF.hex), (CF.unhex, SF.unhex), (CF.length, SF.length), @@ -2043,6 +2042,19 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, PandasOnSparkTestUtils, S sdf.select(sfunc("a"), sfunc(sdf.b)).toPandas(), ) +query = """ +SELECT * FROM VALUES +(' 1 ', '2 ', NULL), (' 3', NULL, '4') +AS tab(a, b, c) +""" +cdf = 
self.connect.sql(query) +sdf = self.spark.sql(query) + +self.assert_eq( +cdf.select(CF.bin(cdf.a), CF.bin(cdf.b)).toPandas(), +sdf.select(SF.bin(sdf.a), SF.bin(sdf.b)).toPandas(), +) + def test_string_functions_multi_args(self): query = """ SELECT * FROM VALUES @@ -2149,15 +2161,15 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, PandasOnSparkTestUtils, S def test_date_ts_functions(self): query = """ SELECT * FROM VALUES -('1997/02/28 10:30:00', '2023/03/01 06:00:00', 'JST', 1428476400, 2020, 12, 6), -('2000/01/01 04:30:05', '2020/05/01 12:15:00', 'PST', 1403892395, 2022, 12, 6) +('1997-02-28 10:30:00', '2023-03-01 06:00:00', 'JST', 1428476400, 2020, 12, 6), +('2000-01-01 04:30:05', '2020-05-01 12:15:00', 'PST', 1403892395, 2022, 12, 6) AS tab(ts1, ts2, tz, seconds, Y, M, D) """ # +---+---+---+--++---+---+ # |ts1|ts2| tz| seconds| Y| M| D| # +---+---+---+--++---+---+ -# |1997/02/28 10:30:00|2023/03/01 06:00:00|JST|1428476400|2020| 12| 6| -# |2000/01/01 04:30:05|2020/05/01 12:15:00|PST|1403892395|2022| 12| 6| +# |1997-02-28 10:30:00|2023-03-01 06:00:00|JST|1428476400|2020| 12| 6| +# |2000-01-01 04:30:05|2020-05-01 12:15:00|PST|1403892395|2022| 12| 6| # +---+---+---+--++---+---+ cdf = self.connect.sql(query) @@ -2213,14 +2225,14 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, PandasOnSparkTestUtils, S (CF.to_date, SF.to_date), ]: self.assert_eq( -cdf.select(cfunc(cdf.ts1, format="-MM-dd")).toPandas(), -sdf.select(sfunc(sdf.ts1, format="-MM-dd")).toPandas(), +cdf.select(cfunc(cdf.ts1, format="-MM-dd HH:mm:ss")).toPandas(), +sdf.select(sfunc(sdf.ts1, format="-MM-dd HH:mm:ss")).toPandas(), ) self.compare_by_show( # [left]: datetime64[ns, America/Los_Angeles] # [right]: datetime64[ns] -cdf.select(CF.to_timestamp(cdf.ts1, format="-MM-dd")), -sdf.select(SF.to_timestamp(sdf.ts1, format="-MM-dd")), +cdf.select(CF.to_timestamp(cdf.ts1, format="-MM-dd HH:mm:ss")), +sdf.select(SF.to_timestamp(sdf.ts1, format="-MM-dd HH:mm:ss")), ) # With tz
(spark) branch master updated (925457cadd22 -> a3eebcf39687)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 925457cadd22 [SPARK-48169][SQL] Use lazy BadRecordException cause in all parsers and remove the old constructor, which was meant for the migration add a3eebcf39687 [SPARK-48170][PYTHON][CONNECT][TESTS] Enable `ArrowPythonUDFParityTests.test_err_return_type` No new revisions were added by this update. Summary of changes: python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py | 4 1 file changed, 4 deletions(-)
(spark) branch master updated: [SPARK-48169][SQL] Use lazy BadRecordException cause in all parsers and remove the old constructor, which was meant for the migration
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 925457cadd22 [SPARK-48169][SQL] Use lazy BadRecordException cause in all parsers and remove the old constructor, which was meant for the migration 925457cadd22 is described below commit 925457cadd229673323e91a82d0b504145f509e0 Author: Vladimir Golubev AuthorDate: Tue May 7 09:09:00 2024 -0700 [SPARK-48169][SQL] Use lazy BadRecordException cause in all parsers and remove the old constructor, which was meant for the migration ### What changes were proposed in this pull request? Use factory function for the exception cause in `BadRecordException` to avoid constructing heavy exceptions in the underlying parser. Now they are constructed on-demand in `FailureSafeParser`. A follow-up for https://github.com/apache/spark/pull/46400 ### Why are the changes needed? - Speed-up `JacksonParser` and `StaxXmlParser`, since they throw user-facing exceptions to `FailureSafeParser` - Refactoring - leave only one constructor in `BadRecordException` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - `testOnly org.apache.spark.sql.catalyst.json.JacksonParserSuite` - `testOnly org.apache.spark.sql.catalyst.csv.UnivocityParserSuite` ### Was this patch authored or co-authored using generative AI tooling? No Closes #46438 from vladimirg-db/vladimirg-db/use-lazy-exception-cause-in-all-bad-record-exception-invocations. 
Authored-by: Vladimir Golubev Signed-off-by: Dongjoon Hyun --- .../spark/sql/catalyst/csv/UnivocityParser.scala | 2 +- .../spark/sql/catalyst/json/JacksonParser.scala| 12 ++-- .../sql/catalyst/util/BadRecordException.scala | 10 +- .../spark/sql/catalyst/xml/StaxXmlParser.scala | 22 -- 4 files changed, 20 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index 37d9143e5b5a..8d06789a7512 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -359,7 +359,7 @@ class UnivocityParser( } else { if (badRecordException.isDefined) { throw BadRecordException( - () => currentInput, () => Array[InternalRow](requiredRow.get), badRecordException.get) + () => currentInput, () => Array(requiredRow.get), badRecordException.get) } else { requiredRow } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index d1093a3b1be1..3c42f72fa6b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -613,7 +613,7 @@ class JacksonParser( // JSON parser currently doesn't support partial results for corrupted records. // For such records, all fields other than the field configured by // `columnNameOfCorruptRecord` are set to `null`. -throw BadRecordException(() => recordLiteral(record), cause = e) +throw BadRecordException(() => recordLiteral(record), cause = () => e) case e: CharConversionException if options.encoding.isEmpty => val msg = """JSON parser cannot handle a character in its input. 
@@ -621,17 +621,17 @@ class JacksonParser( |""".stripMargin + e.getMessage val wrappedCharException = new CharConversionException(msg) wrappedCharException.initCause(e) -throw BadRecordException(() => recordLiteral(record), cause = wrappedCharException) +throw BadRecordException(() => recordLiteral(record), cause = () => wrappedCharException) case PartialResultException(row, cause) => throw BadRecordException( record = () => recordLiteral(record), partialResults = () => Array(row), - convertCauseForPartialResult(cause)) + cause = () => convertCauseForPartialResult(cause)) case PartialResultArrayException(rows, cause) => throw BadRecordException( record = () => recordLiteral(record), partialResults = () => rows, - cause) + cause = () => cause) // These exceptions should never be thrown outside of JacksonParser. // They are used for the control flow in the parser. We add them here for completeness // since they also indicate a bad record. @@
(spark) branch master updated (493493d6c5bb -> 9e0a87eb4cf2)
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 493493d6c5bb [SPARK-48173][SQL] CheckAnalysis should see the entire query plan add 9e0a87eb4cf2 [SPARK-48165][BUILD] Update `ap-loader` to 3.0-9 No new revisions were added by this update. Summary of changes: connector/profiler/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
(spark) branch master updated: [SPARK-48173][SQL] CheckAnalysis should see the entire query plan
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 493493d6c5bb [SPARK-48173][SQL] CheckAnalysis should see the entire query plan 493493d6c5bb is described below commit 493493d6c5bbbaa0b04f5548ac1ccd9502e8b8fa Author: Wenchen Fan AuthorDate: Tue May 7 08:02:25 2024 -0700 [SPARK-48173][SQL] CheckAnalysis should see the entire query plan ### What changes were proposed in this pull request? This is a follow-up of https://github.com/apache/spark/pull/38029 . Some custom check rules need to see the entire query plan tree to get some context, but https://github.com/apache/spark/pull/38029 breaks this because it checks the query plans of dangling CTE relations recursively. This PR fixes it by putting the dangling CTE relations back into the main query plan and then checking the main query plan. ### Why are the changes needed? Revert the breaking change to custom check rules. ### Does this PR introduce _any_ user-facing change? No for most users. This restores the behavior of Spark 3.3 and earlier for custom check rules. ### How was this patch tested? Existing tests. ### Was this patch authored or co-authored using generative AI tooling? No Closes #46439 from cloud-fan/check. 
Authored-by: Wenchen Fan Signed-off-by: Dongjoon Hyun --- .../sql/catalyst/analysis/CheckAnalysis.scala | 39 +++--- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index d1b336b08955..e55f23b6aa86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -145,15 +145,16 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB private def checkUnreferencedCTERelations( cteMap: mutable.Map[Long, (CTERelationDef, Int, mutable.Map[Long, Int])], visited: mutable.Map[Long, Boolean], + danglingCTERelations: mutable.ArrayBuffer[CTERelationDef], cteId: Long): Unit = { if (visited(cteId)) { return } val (cteDef, _, refMap) = cteMap(cteId) refMap.foreach { case (id, _) => - checkUnreferencedCTERelations(cteMap, visited, id) + checkUnreferencedCTERelations(cteMap, visited, danglingCTERelations, id) } -checkAnalysis0(cteDef.child) +danglingCTERelations.append(cteDef) visited(cteId) = true } @@ -161,35 +162,35 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB val inlineCTE = InlineCTE(alwaysInline = true) val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int, mutable.Map[Long, Int])] inlineCTE.buildCTEMap(plan, cteMap) +val danglingCTERelations = mutable.ArrayBuffer.empty[CTERelationDef] val visited: mutable.Map[Long, Boolean] = mutable.Map.empty.withDefaultValue(false) -cteMap.foreach { case (cteId, (relation, refCount, _)) => - // If a CTE relation is never used, it will disappear after inline. Here we explicitly check - // analysis for it, to make sure the entire query plan is valid. 
- try { -// If a CTE relation ref count is 0, the other CTE relations that reference it -// should also be checked by checkAnalysis0. This code will also guarantee the leaf -// relations that do not reference any others are checked first. -if (refCount == 0) { - checkUnreferencedCTERelations(cteMap, visited, cteId) -} - } catch { -case e: AnalysisException => - throw new ExtendedAnalysisException(e, relation.child) +// If a CTE relation is never used, it will disappear after inline. Here we explicitly collect +// these dangling CTE relations, and put them back in the main query, to make sure the entire +// query plan is valid. +cteMap.foreach { case (cteId, (_, refCount, _)) => + // If a CTE relation ref count is 0, the other CTE relations that reference it should also be + // collected. This code will also guarantee the leaf relations that do not reference + // any others are collected first. + if (refCount == 0) { +checkUnreferencedCTERelations(cteMap, visited, danglingCTERelations, cteId) } } // Inline all CTEs in the plan to help check query plan structures in subqueries. -var inlinedPlan: Option[LogicalPlan] = None +var inlinedPlan: LogicalPlan = plan try { - inlinedPlan = Some(inlineCTE(plan)) + inlinedPlan = inlineCTE(plan) } catch
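The collection step in the diff above (visit every CTE whose reference count is 0, recurse into the relations it references, then append the definition) can be sketched as follows. This is a simplified Python illustration under stated assumptions, not the Catalyst implementation: `CteMap`, `collect_dangling`, and the string labels are hypothetical, and the per-CTE reference map is reduced to a plain list of referenced ids.

```python
from typing import Dict, List, Tuple

# cte_id -> (definition label, reference count, ids of CTEs this one references)
CteMap = Dict[int, Tuple[str, int, List[int]]]


def collect_dangling(cte_map: CteMap) -> List[str]:
    """Collect unreferenced CTE definitions, dependencies first."""
    visited: Dict[int, bool] = {}
    dangling: List[str] = []

    def visit(cte_id: int) -> None:
        if visited.get(cte_id, False):
            return
        definition, _, refs = cte_map[cte_id]
        # Relations referenced by a dangling CTE are collected before it,
        # so leaf relations end up first in the result.
        for ref_id in refs:
            visit(ref_id)
        dangling.append(definition)
        visited[cte_id] = True

    for cte_id, (_, ref_count, _) in cte_map.items():
        if ref_count == 0:
            visit(cte_id)
    return dangling


example: CteMap = {
    1: ("t1", 1, []),    # referenced only by the unused t2
    2: ("t2", 0, [1]),   # never referenced: dangling
    3: ("t3", 1, []),    # referenced by the main query
}
```

With this input the sketch collects `t1` before `t2` and leaves `t3` alone, since `t3` is reachable from the main query and is not dangling.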
(spark) branch master updated: [SPARK-47297][SQL] Add collation support for format expressions
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 148f5335427c [SPARK-47297][SQL] Add collation support for format expressions 148f5335427c is described below commit 148f5335427c3aea39cbcce967e18a3b35a88687 Author: Uros Bojanic <157381213+uros...@users.noreply.github.com> AuthorDate: Tue May 7 23:00:30 2024 +0800 [SPARK-47297][SQL] Add collation support for format expressions ### What changes were proposed in this pull request? Introduce collation awareness for format expressions: to_number, try_to_number, to_char, space. ### Why are the changes needed? Add collation support for format expressions in Spark. ### Does this PR introduce _any_ user-facing change? Yes, users should now be able to use collated strings within arguments for format functions: to_number, try_to_number, to_char, space. ### How was this patch tested? E2e sql tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46423 from uros-db/format-expressions. 
Authored-by: Uros Bojanic <157381213+uros...@users.noreply.github.com> Signed-off-by: Wenchen Fan --- .../expressions/numberFormatExpressions.scala | 14 ++- .../catalyst/expressions/stringExpressions.scala | 2 +- .../spark/sql/CollationSQLExpressionsSuite.scala | 132 - 3 files changed, 141 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala index 6d95d7e620a2..e914190c0645 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala @@ -26,6 +26,8 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper import org.apache.spark.sql.catalyst.util.ToNumberParser import org.apache.spark.sql.errors.QueryCompilationErrors +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.types.StringTypeAnyCollation import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, DatetimeType, Decimal, DecimalType, StringType} import org.apache.spark.unsafe.types.UTF8String @@ -47,7 +49,8 @@ abstract class ToNumberBase(left: Expression, right: Expression, errorOnFail: Bo DecimalType.USER_DEFAULT } - override def inputTypes: Seq[DataType] = Seq(StringType, StringType) + override def inputTypes: Seq[AbstractDataType] = +Seq(StringTypeAnyCollation, StringTypeAnyCollation) override def checkInputDataTypes(): TypeCheckResult = { val inputTypeCheck = super.checkInputDataTypes() @@ -247,8 +250,9 @@ object ToCharacterBuilder extends ExpressionBuilder { inputExpr.dataType match { case _: DatetimeType => DateFormatClass(inputExpr, format) case _: BinaryType => - if (!(format.dataType == StringType && 
format.foldable)) { -throw QueryCompilationErrors.nonFoldableArgumentError(funcName, "format", StringType) + if (!(format.dataType.isInstanceOf[StringType] && format.foldable)) { +throw QueryCompilationErrors.nonFoldableArgumentError(funcName, "format", + format.dataType) } val fmt = format.eval() if (fmt == null) { @@ -279,8 +283,8 @@ case class ToCharacter(left: Expression, right: Expression) } } - override def dataType: DataType = StringType - override def inputTypes: Seq[AbstractDataType] = Seq(DecimalType, StringType) + override def dataType: DataType = SQLConf.get.defaultStringType + override def inputTypes: Seq[AbstractDataType] = Seq(DecimalType, StringTypeAnyCollation) override def checkInputDataTypes(): TypeCheckResult = { val inputTypeCheck = super.checkInputDataTypes() if (inputTypeCheck.isSuccess) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index 0769c8e609ec..c2ea17de1953 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -1906,7 +1906,7 @@ case class StringRepeat(str: Expression, times: Expression) case class StringSpace(child: Expression) extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant { - override def dataType: DataType =
(spark) branch master updated: [SPARK-48171][CORE] Clean up the use of deprecated constructors of `o.rocksdb.Logger`
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new c326f3c143ff [SPARK-48171][CORE] Clean up the use of deprecated constructors of `o.rocksdb.Logger` c326f3c143ff is described below commit c326f3c143ffdd56954706aeb4e0b82ac819bf03 Author: yangjie01 AuthorDate: Tue May 7 07:33:38 2024 -0700 [SPARK-48171][CORE] Clean up the use of deprecated constructors of `o.rocksdb.Logger` ### What changes were proposed in this pull request? This PR aims to clean up the use of deprecated constructors of `o.rocksdb.Logger`. For the deprecation, see https://github.com/facebook/rocksdb/blob/5c2be544f5509465957706c955b6d623e889ac4e/java/src/main/java/org/rocksdb/Logger.java#L39-L54 ``` /** * AbstractLogger constructor. * * Important: the log level set within * the {@link org.rocksdb.Options} instance will be used as * maximum log level of RocksDB. * * @param options {@link org.rocksdb.Options} instance. * * @deprecated Use {@link Logger#Logger(InfoLogLevel)} instead, e.g. {@code new * Logger(options.infoLogLevel())}. */ @Deprecated public Logger(final Options options) { this(options.infoLogLevel()); } ``` ### Why are the changes needed? Clean up deprecated API usage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #46436 from LuciferYang/rocksdb-deprecation. 
Authored-by: yangjie01 Signed-off-by: Dongjoon Hyun --- .../src/main/java/org/apache/spark/network/util/RocksDBProvider.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java b/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java index f3b7b48355a0..2b5ea01d94c9 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java @@ -136,7 +136,7 @@ public class RocksDBProvider { private static final Logger LOG = LoggerFactory.getLogger(RocksDBLogger.class); RocksDBLogger(Options options) { - super(options); + super(options.infoLogLevel()); } @Override
(spark) branch master updated: [SPARK-48143][SQL] Use lightweight exceptions for control-flow between UnivocityParser and FailureSafeParser
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 326dbb447873 [SPARK-48143][SQL] Use lightweight exceptions for control-flow between UnivocityParser and FailureSafeParser 326dbb447873 is described below commit 326dbb4478732eb9b7a683511e69206f2b21bd37 Author: Vladimir Golubev AuthorDate: Tue May 7 20:28:50 2024 +0800 [SPARK-48143][SQL] Use lightweight exceptions for control-flow between UnivocityParser and FailureSafeParser ### What changes were proposed in this pull request? Add a new lightweight exception for control flow between `UnivocityParser` and `FailureSafeParser` to speed up malformed CSV parsing. ### Why are the changes needed? Parsing in `PermissiveMode` is slow due to heavy exception construction (stack trace filling + string template substitution in `SparkRuntimeException`). ### Does this PR introduce _any_ user-facing change? No, since `FailureSafeParser` unwraps `BadRecordException` and correctly rethrows user-facing exceptions in `FailFastMode`. ### How was this patch tested? - `testOnly org.apache.spark.sql.catalyst.csv.UnivocityParserSuite` - Manually ran the CSV benchmark on DB benchmark workspace - Manually checked correct and malformed CSV in spark-shell (`org.apache.spark.SparkException` is thrown with the stack trace) ### Was this patch authored or co-authored using generative AI tooling? No Closes #46400 from vladimirg-db/vladimirg-db/speed-up-csv-parser. 
Authored-by: Vladimir Golubev Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/csv/UnivocityParser.scala | 10 +- .../spark/sql/catalyst/json/JacksonParser.scala| 5 ++--- .../sql/catalyst/util/BadRecordException.scala | 23 ++ .../sql/catalyst/util/FailureSafeParser.scala | 2 +- .../spark/sql/catalyst/xml/StaxXmlParser.scala | 5 ++--- 5 files changed, 29 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index a5158d8a22c6..37d9143e5b5a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -316,17 +316,17 @@ class UnivocityParser( throw BadRecordException( () => getCurrentInput, () => Array.empty, -QueryExecutionErrors.malformedCSVRecordError("")) +() => QueryExecutionErrors.malformedCSVRecordError("")) } val currentInput = getCurrentInput -var badRecordException: Option[Throwable] = if (tokens.length != parsedSchema.length) { +var badRecordException: Option[() => Throwable] = if (tokens.length != parsedSchema.length) { // If the number of tokens doesn't match the schema, we should treat it as a malformed record. // However, we still have chance to parse some of the tokens. It continues to parses the // tokens normally and sets null when `ArrayIndexOutOfBoundsException` occurs for missing // tokens. 
- Some(QueryExecutionErrors.malformedCSVRecordError(currentInput.toString)) + Some(() => QueryExecutionErrors.malformedCSVRecordError(currentInput.toString)) } else None // When the length of the returned tokens is identical to the length of the parsed schema, // we just need to: @@ -348,7 +348,7 @@ class UnivocityParser( } catch { case e: SparkUpgradeException => throw e case NonFatal(e) => - badRecordException = badRecordException.orElse(Some(e)) + badRecordException = badRecordException.orElse(Some(() => e)) // Use the corresponding DEFAULT value associated with the column, if any. row.update(i, ResolveDefaultColumns.existenceDefaultValues(requiredSchema)(i)) } @@ -359,7 +359,7 @@ class UnivocityParser( } else { if (badRecordException.isDefined) { throw BadRecordException( - () => currentInput, () => Array(requiredRow.get), badRecordException.get) + () => currentInput, () => Array[InternalRow](requiredRow.get), badRecordException.get) } else { requiredRow } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index eadd0a4f8ab9..d1093a3b1be1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -613,7 +613,7 @@ class JacksonParser( // JSON parser currently doesn't
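The deferred-construction idea behind SPARK-48143 can be illustrated outside Spark. The following is a minimal Python sketch under stated assumptions, not Spark's Scala implementation: a cheap control-flow exception carries a zero-argument callable, and the expensive user-facing error is only built when a fail-fast caller asks for it. All names here (`BadRecord`, `expensive_error`, `parse_permissive`, `parse_fail_fast`, `stats`) are hypothetical. Python still records a traceback when raising, so the sketch only models the lazy error construction, not the stack-trace suppression mentioned in the commit message.

```python
from typing import Callable, List, Optional

# Counts how many expensive errors were actually built (illustration only).
stats = {"built": 0}


def expensive_error(record: str) -> Exception:
    """Stand-in for a costly user-facing error (message templating, etc.)."""
    stats["built"] += 1
    return ValueError(f"Malformed record: {record!r}")


class BadRecord(Exception):
    """Cheap control-flow signal; holds a thunk instead of a built error."""

    def __init__(self, record: str, cause: Callable[[], Exception]):
        super().__init__()
        self.record = record
        self.cause = cause  # not evaluated until someone calls it


def parse(record: str) -> List[int]:
    """Parse a comma-separated list of integers or signal a bad record."""
    try:
        return [int(tok) for tok in record.split(",")]
    except ValueError:
        # Only a lambda is allocated here; the error itself is not built yet.
        raise BadRecord(record, lambda: expensive_error(record)) from None


def parse_permissive(record: str) -> Optional[List[int]]:
    """Permissive mode: swallow the signal and never build the error."""
    try:
        return parse(record)
    except BadRecord:
        return None


def parse_fail_fast(record: str) -> List[int]:
    """Fail-fast mode: evaluate the thunk and raise the user-facing error."""
    try:
        return parse(record)
    except BadRecord as e:
        raise e.cause() from None
```

In this sketch, calling `parse_permissive` on a malformed record leaves `stats["built"]` at zero, while `parse_fail_fast` builds exactly one error per failure, which mirrors the `Option[() => Throwable]` change in the diff above.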
(spark) branch branch-3.5 updated: [SPARK-48086][PYTHON][TESTS][3.5] Remove obsolete comment
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 1735d7d3fd66 [SPARK-48086][PYTHON][TESTS][3.5] Remove obsolete comment 1735d7d3fd66 is described below commit 1735d7d3fd660560e15457793ccd9b758bb360f8 Author: Hyukjin Kwon AuthorDate: Tue May 7 18:50:47 2024 +0900 [SPARK-48086][PYTHON][TESTS][3.5] Remove obsolete comment ### What changes were proposed in this pull request? This PR proposes to remove those comments for skipped tests related to different Arrow version in JVM. ### Why are the changes needed? Arrow version incompatibility is something we cannot avoid. We should just skip those tests for now. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Linters. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46431 from HyukjinKwon/SPARK-48086. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py | 1 - python/pyspark/sql/tests/pandas/test_pandas_udf.py| 2 -- 2 files changed, 3 deletions(-) diff --git a/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py b/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py index 960f7f11e873..a508fe1059ed 100644 --- a/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py +++ b/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py @@ -32,7 +32,6 @@ class PandasUDFScalarParityTests(ScalarPandasUDFTestsMixin, ReusedConnectTestCas def test_vectorized_udf_struct_with_empty_partition(self): super().test_vectorized_udf_struct_with_empty_partition() -# TODO(SPARK-48086): Reenable this test case @unittest.skipIf( "SPARK_SKIP_CONNECT_COMPAT_TESTS" in os.environ, "Failed with different Client <> Server" ) diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf.py b/python/pyspark/sql/tests/pandas/test_pandas_udf.py index 4673375ccf69..b54e5608f3d0 100644 --- a/python/pyspark/sql/tests/pandas/test_pandas_udf.py +++ b/python/pyspark/sql/tests/pandas/test_pandas_udf.py @@ -262,7 +262,6 @@ class PandasUDFTestsMixin: .collect, ) -# TODO(SPARK-48086): Reenable this test case @unittest.skipIf( "SPARK_SKIP_CONNECT_COMPAT_TESTS" in os.environ, "Failed with different Client <> Server" ) @@ -289,7 +288,6 @@ class PandasUDFTestsMixin: with self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}): df.select(["A"]).withColumn("udf", udf("A")).collect() -# TODO(SPARK-48086): Reenable this test case @unittest.skipIf( "SPARK_SKIP_CONNECT_COMPAT_TESTS" in os.environ, "Failed with different Client <> Server" ) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-47267][SQL] Add collation support for hash expressions
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 08c6bb9bf32f [SPARK-47267][SQL] Add collation support for hash expressions 08c6bb9bf32f is described below commit 08c6bb9bf32f31b5b9870d56cc4c16ab97616da6 Author: Uros Bojanic <157381213+uros...@users.noreply.github.com> AuthorDate: Tue May 7 17:13:34 2024 +0800 [SPARK-47267][SQL] Add collation support for hash expressions ### What changes were proposed in this pull request? Introduce collation awareness for hash expressions: MD5, SHA2, SHA1, CRC32, MURMUR3, XXHASH64. ### Why are the changes needed? Add collation support for hash expressions in Spark. ### Does this PR introduce _any_ user-facing change? Yes, users should now be able to use collated strings within arguments for hash functions: md5, sha2, sha1, crc32, hash, xxhash64. ### How was this patch tested? E2e sql tests. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46422 from uros-db/hash-expressions. 
Authored-by: Uros Bojanic <157381213+uros...@users.noreply.github.com> Signed-off-by: Wenchen Fan --- .../spark/sql/catalyst/expressions/hash.scala | 6 +- .../spark/sql/CollationSQLExpressionsSuite.scala | 179 + 2 files changed, 182 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala index 5089cea136a8..fa342f641509 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala @@ -63,7 +63,7 @@ import org.apache.spark.util.ArrayImplicits._ case class Md5(child: Expression) extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant { - override def dataType: DataType = StringType + override def dataType: DataType = SQLConf.get.defaultStringType override def inputTypes: Seq[DataType] = Seq(BinaryType) @@ -103,7 +103,7 @@ case class Md5(child: Expression) case class Sha2(left: Expression, right: Expression) extends BinaryExpression with ImplicitCastInputTypes with NullIntolerant with Serializable { - override def dataType: DataType = StringType + override def dataType: DataType = SQLConf.get.defaultStringType override def nullable: Boolean = true override def inputTypes: Seq[DataType] = Seq(BinaryType, IntegerType) @@ -169,7 +169,7 @@ case class Sha2(left: Expression, right: Expression) case class Sha1(child: Expression) extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant { - override def dataType: DataType = StringType + override def dataType: DataType = SQLConf.get.defaultStringType override def inputTypes: Seq[DataType] = Seq(BinaryType) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala index fa82405109f1..596923d975a5 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala @@ -28,6 +28,185 @@ class CollationSQLExpressionsSuite extends QueryTest with SharedSparkSession { + test("Support Md5 hash expression with collation") { +case class Md5TestCase( + input: String, + collationName: String, + result: String +) + +val testCases = Seq( + Md5TestCase("Spark", "UTF8_BINARY", "8cde774d6f7333752ed72cacddb05126"), + Md5TestCase("Spark", "UTF8_BINARY_LCASE", "8cde774d6f7333752ed72cacddb05126"), + Md5TestCase("SQL", "UNICODE", "9778840a0100cb30c982876741b0b5a2"), + Md5TestCase("SQL", "UNICODE_CI", "9778840a0100cb30c982876741b0b5a2") +) + +// Supported collations +testCases.foreach(t => { + val query = +s""" + |select md5('${t.input}') + |""".stripMargin + // Result & data type + withSQLConf(SqlApiConf.DEFAULT_COLLATION -> t.collationName) { +val testQuery = sql(query) +checkAnswer(testQuery, Row(t.result)) +val dataType = StringType(t.collationName) +assert(testQuery.schema.fields.head.dataType.sameType(dataType)) + } +}) + } + + test("Support Sha2 hash expression with collation") { +case class Sha2TestCase( + input: String, + collationName: String, + bitLength: Int, + result: String +) + +val testCases = Seq( + Sha2TestCase("Spark", "UTF8_BINARY", 256, +"529bc3b07127ecb7e53a4dcf1991d9152c24537d919178022b2c42657f79a26b"), + Sha2TestCase("Spark",
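The test table above lists the same digest for the same input under different collations, which fits the diff: the hash expressions keep `BinaryType` inputs and only their declared result type changes. A small Python sketch of that reading follows; `CollatedString` and `md5_hex` are hypothetical stand-ins, not Spark APIs, and the sketch assumes the digest is taken over the UTF-8 bytes of the input.

```python
import hashlib
from dataclasses import dataclass


@dataclass(frozen=True)
class CollatedString:
    """Hypothetical stand-in for a string value tagged with a collation name."""
    value: str
    collation: str


def md5_hex(s: str, default_collation: str) -> CollatedString:
    # The digest is computed over the UTF-8 bytes only; the collation name
    # affects the declared type of the result, not the hex digits.
    digest = hashlib.md5(s.encode("utf-8")).hexdigest()
    return CollatedString(digest, default_collation)


collations = ["UTF8_BINARY", "UTF8_BINARY_LCASE", "UNICODE", "UNICODE_CI"]
results = [md5_hex("Spark", c) for c in collations]
```

Every entry in `results` carries the same hex string and differs only in its collation tag, which is the property the commit's end-to-end tests check through `checkAnswer` and `sameType`.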
(spark) branch master updated (7f8ef96cea27 -> c4df12cc884c)
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/spark.git from 7f8ef96cea27 [SPARK-48166][SQL] Avoid using BadRecordException as user-facing error in VariantExpressionEvalUtils add c4df12cc884c [SPARK-48113][CONNECT] Allow Plugins to integrate with Spark Connect No new revisions were added by this update. Summary of changes: .../main/scala/org/apache/spark/sql/Column.scala | 28 - .../scala/org/apache/spark/sql/SparkSession.scala | 59 --- .../org/apache/spark/sql/ClientDatasetSuite.scala | 24 .../apache/spark/sql/PlanGenerationTestSuite.scala | 26 + .../CheckConnectJvmClientCompatibility.scala | 9 +++ .../spark/sql/connect/ConnectProtoUtils.scala | 55 ++ .../sql/connect/planner/SparkConnectPlanner.scala | 23 ++-- .../connect/planner/SparkConnectPlannerSuite.scala | 40 + .../planner/SparkConnectPlannerTestUtils.scala | 67 ++ .../plugin/SparkConnectPluginRegistrySuite.scala | 14 +++-- 10 files changed, 169 insertions(+), 176 deletions(-) create mode 100644 connector/connect/common/src/main/scala/org/apache/spark/sql/connect/ConnectProtoUtils.scala create mode 100644 connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerTestUtils.scala - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
(spark) branch master updated: [SPARK-48166][SQL] Avoid using BadRecordException as user-facing error in VariantExpressionEvalUtils
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 7f8ef96cea27 [SPARK-48166][SQL] Avoid using BadRecordException as user-facing error in VariantExpressionEvalUtils 7f8ef96cea27 is described below commit 7f8ef96cea27d52d0bdda3808c6c48534dcd8567 Author: Vladimir Golubev AuthorDate: Tue May 7 16:13:46 2024 +0800 [SPARK-48166][SQL] Avoid using BadRecordException as user-facing error in VariantExpressionEvalUtils ### What changes were proposed in this pull request? Stop using `BadRecordException` in a user-facing context. Currently it is thrown when the `parse_json` input is malformed. ### Why are the changes needed? `BadRecordException` is an internal exception designed for `FailureSafeParser`. ### Does this PR introduce _any_ user-facing change? Yes, `parse_json` will no longer expose `BadRecordException` in the error. ### How was this patch tested? `testOnly org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtilsSuite` ### Was this patch authored or co-authored using generative AI tooling? No Closes #46428 from vladimirg-db/vladimirg-db/get-rid-of-bad-record-exception-in-variant-expression-eval-utils. 
Authored-by: Vladimir Golubev Signed-off-by: Wenchen Fan --- .../sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala index f468e9745605..eb235eb854e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.variant import scala.util.control.NonFatal import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.util.{ArrayData, BadRecordException, MapData} +import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.types._ import org.apache.spark.types.variant.{Variant, VariantBuilder, VariantSizeLimitException, VariantUtil} @@ -48,7 +48,7 @@ object VariantExpressionEvalUtils { .variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json")) case NonFatal(e) => parseJsonFailure(QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError( - input.toString, BadRecordException(() => input, cause = e))) + input.toString, e)) } } - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
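The effect of SPARK-48166 on the exception chain can be sketched in Python. This is an illustration under assumptions, not Spark code: `MalformedRecordError` and `InternalBadRecord` are hypothetical classes standing in for the user-facing error and the internal `BadRecordException`, and `json.loads` stands in for the variant parser.

```python
import json


class MalformedRecordError(Exception):
    """Hypothetical user-facing error."""


class InternalBadRecord(Exception):
    """Hypothetical internal-only signal that should not leak to users."""


def parse_json_leaky(text):
    """Old shape: the internal wrapper sits between the error and its cause."""
    try:
        return json.loads(text)
    except ValueError as e:
        wrapper = InternalBadRecord(text)
        wrapper.__cause__ = e
        raise MalformedRecordError(text) from wrapper


def parse_json_clean(text):
    """New shape: the original parser failure is the direct cause."""
    try:
        return json.loads(text)
    except ValueError as e:
        raise MalformedRecordError(text) from e
```

With `parse_json_clean`, a caller inspecting `__cause__` sees the parser's own `json.JSONDecodeError` directly, rather than an internal type it has no reason to know about.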
(spark) branch master updated: [SPARK-48058][SPARK-43727][PYTHON][CONNECT][TESTS][FOLLOWUP] Code clean up
This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new 98d4ab734de0 [SPARK-48058][SPARK-43727][PYTHON][CONNECT][TESTS][FOLLOWUP] Code clean up 98d4ab734de0 is described below commit 98d4ab734de05eb5eec83011ed965cfb5b51e4b5 Author: Ruifeng Zheng AuthorDate: Tue May 7 15:35:54 2024 +0800 [SPARK-48058][SPARK-43727][PYTHON][CONNECT][TESTS][FOLLOWUP] Code clean up ### What changes were proposed in this pull request? after https://github.com/apache/spark/pull/46300, the two tests are actually the same as https://github.com/apache/spark/blob/678aeb7ef7086bd962df7ac6d1c5f39151a0515b/python/pyspark/sql/tests/pandas/test_pandas_udf.py#L110-L125 and https://github.com/apache/spark/blob/678aeb7ef7086bd962df7ac6d1c5f39151a0515b/python/pyspark/sql/tests/pandas/test_pandas_udf.py#L55-L70 So no need to override them in the parity tests ### Why are the changes needed? clean up ### Does this PR introduce _any_ user-facing change? no, test only ### How was this patch tested? ci ### Was this patch authored or co-authored using generative AI tooling? no Closes #46429 from zhengruifeng/return_type_followup. Authored-by: Ruifeng Zheng Signed-off-by: Ruifeng Zheng --- .../sql/tests/connect/test_parity_pandas_udf.py| 35 +- 1 file changed, 1 insertion(+), 34 deletions(-) diff --git a/python/pyspark/sql/tests/connect/test_parity_pandas_udf.py b/python/pyspark/sql/tests/connect/test_parity_pandas_udf.py index b732a875fb0a..7f280a009f78 100644 --- a/python/pyspark/sql/tests/connect/test_parity_pandas_udf.py +++ b/python/pyspark/sql/tests/connect/test_parity_pandas_udf.py @@ -14,8 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -from pyspark.sql.functions import pandas_udf, PandasUDFType -from pyspark.sql.types import DoubleType, StructType, StructField + from pyspark.sql.tests.pandas.test_pandas_udf import PandasUDFTestsMixin from pyspark.testing.connectutils import ReusedConnectTestCase @@ -24,38 +23,6 @@ class PandasUDFParityTests(PandasUDFTestsMixin, ReusedConnectTestCase): def test_udf_wrong_arg(self): self.check_udf_wrong_arg() -def test_pandas_udf_decorator_with_return_type_string(self): -@pandas_udf("v double", PandasUDFType.GROUPED_MAP) -def foo(x): -return x - -self.assertEqual(foo.returnType, StructType([StructField("v", DoubleType(), True)])) -self.assertEqual(foo.evalType, PandasUDFType.GROUPED_MAP) - -@pandas_udf(returnType="double", functionType=PandasUDFType.SCALAR) -def foo(x): -return x - -self.assertEqual(foo.returnType, DoubleType()) -self.assertEqual(foo.evalType, PandasUDFType.SCALAR) - -def test_pandas_udf_basic_with_return_type_string(self): -udf = pandas_udf(lambda x: x, "double", PandasUDFType.SCALAR) -self.assertEqual(udf.returnType, DoubleType()) -self.assertEqual(udf.evalType, PandasUDFType.SCALAR) - -udf = pandas_udf(lambda x: x, "v double", PandasUDFType.GROUPED_MAP) -self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType(), True)])) -self.assertEqual(udf.evalType, PandasUDFType.GROUPED_MAP) - -udf = pandas_udf(lambda x: x, "v double", functionType=PandasUDFType.GROUPED_MAP) -self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType(), True)])) -self.assertEqual(udf.evalType, PandasUDFType.GROUPED_MAP) - -udf = pandas_udf(lambda x: x, returnType="v double", functionType=PandasUDFType.GROUPED_MAP) -self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType(), True)])) -self.assertEqual(udf.evalType, PandasUDFType.GROUPED_MAP) - if __name__ == "__main__": import unittest - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
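The reason the overrides can be deleted is ordinary mixin inheritance in `unittest`: a test method defined once on a mixin runs in every concrete test case that inherits it. A minimal sketch follows, with hypothetical class names that are not the real `PandasUDFTestsMixin` or `ReusedConnectTestCase`.

```python
import unittest


class UDFTestsMixin:
    """Hypothetical shared tests; not the real PandasUDFTestsMixin."""

    def make_value(self):
        return 21 * 2

    def test_value(self):
        self.assertEqual(self.make_value(), 42)


class ClassicTests(UDFTestsMixin, unittest.TestCase):
    pass


class ParityTests(UDFTestsMixin, unittest.TestCase):
    # No override needed: test_value is inherited from the mixin.
    pass


def run(case):
    """Run all tests of one TestCase class and return the TestResult."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(case)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

Calling `run(ParityTests)` executes the inherited `test_value` even though the class body is empty, so a parity class only needs an override when its behaviour is meant to differ from the mixin's.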
(spark) branch branch-3.5 updated: [SPARK-48167][CONNECT][TESTS] Skip known behaviour change by SPARK-46122
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new ec7e888d7082 [SPARK-48167][CONNECT][TESTS] Skip known behaviour change by SPARK-46122 ec7e888d7082 is described below commit ec7e888d70822796146ffb2769ccae759baf24f4 Author: Hyukjin Kwon AuthorDate: Tue May 7 16:16:02 2024 +0900 [SPARK-48167][CONNECT][TESTS] Skip known behaviour change by SPARK-46122 ### What changes were proposed in this pull request? This PR proposes to skip `test_create_without_provider` in the backward compatibility test at https://github.com/apache/spark/actions/workflows/build_python_connect35.yml ### Why are the changes needed? This is an intentional behaviour change (SPARK-46122) ### Does this PR introduce _any_ user-facing change? No, test-only. ### How was this patch tested? Tested in my fork ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46430 from HyukjinKwon/SPARK-48167. 
Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/tests/test_readwriter.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/sql/tests/test_readwriter.py b/python/pyspark/sql/tests/test_readwriter.py index 921d2eba5ac7..e903d3383b74 100644 --- a/python/pyspark/sql/tests/test_readwriter.py +++ b/python/pyspark/sql/tests/test_readwriter.py @@ -16,6 +16,7 @@ # import os +import unittest import shutil import tempfile @@ -245,6 +246,8 @@ class ReadwriterV2TestsMixin: df.writeTo("test_table").using("parquet").create() self.assertEqual(100, self.spark.sql("select * from test_table").count()) +@unittest.skipIf( +"SPARK_SKIP_CONNECT_COMPAT_TESTS" in os.environ, "Known behavior change in 4.0") def test_create_without_provider(self): df = self.df with self.assertRaisesRegex( - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
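The decorator added in this diff follows the standard `unittest.skipIf` pattern keyed on an environment variable. A self-contained sketch of how it behaves is given below; `build_case` and `run` are hypothetical helpers added only so the effect can be observed with and without the variable, and the test body is a placeholder.

```python
import unittest

SKIP_VAR = "SPARK_SKIP_CONNECT_COMPAT_TESTS"


def build_case(environ):
    """Build a test class as if the module were imported under `environ`."""

    class ReadwriterSketch(unittest.TestCase):
        # The condition is evaluated once, when the decorator is applied,
        # not each time the test runs.
        @unittest.skipIf(SKIP_VAR in environ, "Known behavior change in 4.0")
        def test_create_without_provider(self):
            self.assertTrue(True)  # placeholder body

    return ReadwriterSketch


def run(case):
    """Run all tests of one TestCase class and return the TestResult."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(case)
    result = unittest.TestResult()
    suite.run(result)
    return result
```

Running `run(build_case({}))` executes the test, while `run(build_case({SKIP_VAR: "1"}))` records it under `result.skipped` with the given reason. In the real test module the condition reads `os.environ` at import time, so the variable has to be set before the test process starts.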
(spark) branch branch-3.5 updated: [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking error message in UDF
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new f92580aa111f [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking error message in UDF f92580aa111f is described below commit f92580aa111f8531cdb229a2d1bb0234764d7262 Author: Hyukjin Kwon AuthorDate: Tue May 7 15:59:20 2024 +0900 [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking error message in UDF This PR shortens the traceback so the actual error `ZeroDivisionError` can be tested in `pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception` This way, the long traceback does not affect the test case, which can otherwise fail as below: ``` == FAIL [1.883s]: test_stream_exception (pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception) -- Traceback (most recent call last): File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 287, in test_stream_exception sq.processAllAvailable() File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 129, in processAllAvailable self._execute_streaming_query_cmd(cmd) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 177, in _execute_streaming_query_cmd (_, properties) = self._session.client.execute_command(exec_cmd) ^^ File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 982, in execute_command data, _, _, _, properties = self._execute_and_fetch(req) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch for response in self._execute_and_fetch_as_iterator(req): File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1264, in 
_execute_and_fetch_as_iterator self._handle_error(error) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line [150](https://github.com/HyukjinKwon/spark/actions/runs/8978991632/job/24660689666#step:9:151)3, in _handle_error self._handle_rpc_error(error) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error raise convert_exception(info, status.message) from None pyspark.errors.exceptions.connect.StreamingQueryException: [STREAM_FAILED] Query [id = 1c0c440d-0b48-41b1-9a03-071e7e13de82, runId = 692ec338-963a-43b1-89cb-2a8b7cb1e21a] terminated with exception: Job aborted due to stage failure: Task 0 in stage 39.0 failed 1 times, most recent failure: Lost task 0.0 in stage 39.0 (TID 58) (fv-az714-234.22nzjvkrszmuhkvqy55p1tioig.phxx.internal.cloudapp.net executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main process() File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process serializer.dump_stream(out_iter, outfile) File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 224, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 145, in dump_stream for obj in iterator: File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 213, in _batched for item in iterator: File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs) ^ File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs) ^^^ File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 112, in return args_kwargs_offsets, lambda *a: func(*a) File
(spark) branch master updated: [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking error message in UDF
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new eee179135ed2 [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking error message in UDF eee179135ed2 is described below commit eee179135ed21dbdd8b342d053c9eda849e2de77 Author: Hyukjin Kwon AuthorDate: Tue May 7 15:59:20 2024 +0900 [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking error message in UDF ### What changes were proposed in this pull request? This PR shortens the traceback so the actual error `ZeroDivisionError` can be tested in `pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception` ### Why are the changes needed? This way, the long traceback does not affect the test case, which can otherwise fail as below: ``` == FAIL [1.883s]: test_stream_exception (pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception) -- Traceback (most recent call last): File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py", line 287, in test_stream_exception sq.processAllAvailable() File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 129, in processAllAvailable self._execute_streaming_query_cmd(cmd) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py", line 177, in _execute_streaming_query_cmd (_, properties) = self._session.client.execute_command(exec_cmd) ^^ File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 982, in execute_command data, _, _, _, properties = self._execute_and_fetch(req) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch for response in self._execute_and_fetch_as_iterator(req): File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1264, in _execute_and_fetch_as_iterator self._handle_error(error) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line [150](https://github.com/HyukjinKwon/spark/actions/runs/8978991632/job/24660689666#step:9:151)3, in _handle_error self._handle_rpc_error(error) File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error raise convert_exception(info, status.message) from None pyspark.errors.exceptions.connect.StreamingQueryException: [STREAM_FAILED] Query [id = 1c0c440d-0b48-41b1-9a03-071e7e13de82, runId = 692ec338-963a-43b1-89cb-2a8b7cb1e21a] terminated with exception: Job aborted due to stage failure: Task 0 in stage 39.0 failed 1 times, most recent failure: Lost task 0.0 in stage 39.0 (TID 58) (fv-az714-234.22nzjvkrszmuhkvqy55p1tioig.phxx.internal.cloudapp.net executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1834, in main process() File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1826, in process serializer.dump_stream(out_iter, outfile) File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 224, in dump_stream self.serializer.dump_stream(self._batched(iterator), stream) File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 145, in dump_stream for obj in iterator: File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 213, in _batched for item in iterator: File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in mapper result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs) ^ File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 1734, in 
result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in udfs) ^^^ File "/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 112, in return args_kwargs_offsets, lambda *a: func(*a)
(spark) branch branch-3.5 updated: [SPARK-48083][SPARK-48084][ML][TESTS] Remove JIRA comments for reenabling ML compatibility tests
This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.5 by this push: new 997100119e6c [SPARK-48083][SPARK-48084][ML][TESTS] Remove JIRA comments for reenabling ML compatibility tests 997100119e6c is described below commit 997100119e6c893c46b12cc32a4f96721c8f3a22 Author: Weichen Xu AuthorDate: Tue May 7 15:58:01 2024 +0900 [SPARK-48083][SPARK-48084][ML][TESTS] Remove JIRA comments for reenabling ML compatibility tests ### What changes were proposed in this pull request? Enable Spark ML tests. ### Why are the changes needed? For test_connect_classification.py, the session.copyFromLocalToFs failure with a 3.5 client <> 4.0 server is not an issue: copyFromLocalToFs requires the Spark server to configure spark.connect.copyFromLocalToFs.allowDestLocal to False, because the test can only use the local fs. For test_connect_evaluation.py, the error of pyspark.ml.connect.evaluation not working with a 3.5 client <> 4.0 server is caused by a cloudpickle forward incompatibility; it is not related to ML code. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? CI. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #46419 from WeichenXu123/reenable-test-3.5. 
Authored-by: Weichen Xu Signed-off-by: Hyukjin Kwon --- python/pyspark/ml/tests/connect/test_connect_classification.py | 1 - python/pyspark/ml/tests/connect/test_connect_evaluation.py | 1 - 2 files changed, 2 deletions(-) diff --git a/python/pyspark/ml/tests/connect/test_connect_classification.py b/python/pyspark/ml/tests/connect/test_connect_classification.py index 2763d3f613ae..f0d60a117e12 100644 --- a/python/pyspark/ml/tests/connect/test_connect_classification.py +++ b/python/pyspark/ml/tests/connect/test_connect_classification.py @@ -21,7 +21,6 @@ import os from pyspark.sql import SparkSession from pyspark.ml.tests.connect.test_legacy_mode_classification import ClassificationTestsMixin -# TODO(SPARK-48083): Reenable this test case have_torch = "SPARK_SKIP_CONNECT_COMPAT_TESTS" not in os.environ try: import torch # noqa: F401 diff --git a/python/pyspark/ml/tests/connect/test_connect_evaluation.py b/python/pyspark/ml/tests/connect/test_connect_evaluation.py index 35af54605ca8..58076dfe0bbe 100644 --- a/python/pyspark/ml/tests/connect/test_connect_evaluation.py +++ b/python/pyspark/ml/tests/connect/test_connect_evaluation.py @@ -20,7 +20,6 @@ import unittest from pyspark.sql import SparkSession from pyspark.ml.tests.connect.test_legacy_mode_evaluation import EvaluationTestsMixin -# TODO(SPARK-48084): Reenable this test case have_torcheval = "SPARK_SKIP_CONNECT_COMPAT_TESTS" not in os.environ try: import torcheval # noqa: F401 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
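The `have_torch` and `have_torcheval` flags in the diff combine two conditions: the compatibility-skip variable must be unset, and the optional package must import. The diff context is cut off after the `import` line, so the `except ImportError` branch in the sketch below is an assumption about the usual shape of this pattern, and `have_module` is a hypothetical helper rather than anything in PySpark.

```python
import importlib
import os


def have_module(name, environ=os.environ):
    """Return True only if compat tests are not skipped and `name` imports."""
    if "SPARK_SKIP_CONNECT_COMPAT_TESTS" in environ:
        return False
    try:
        importlib.import_module(name)
    except ImportError:
        # Assumed fallback: a missing optional dependency disables the tests.
        return False
    return True
```

A test class can then be guarded with something like `@unittest.skipIf(not have_module("torch"), "torch is required")`, so the suite is skipped both in the 3.5 client <> 4.0 server job and on machines without the dependency.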
(spark) branch master updated: [SPARK-48027][SQL] InjectRuntimeFilter for multi-level join should check child join type
This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/master by this push: new b5e39bedab14 [SPARK-48027][SQL] InjectRuntimeFilter for multi-level join should check child join type b5e39bedab14 is described below commit b5e39bedab14a7fd800597ee0114b07448c1b0f9 Author: Angerszh AuthorDate: Tue May 7 14:47:40 2024 +0800 [SPARK-48027][SQL] InjectRuntimeFilter for multi-level join should check child join type ### What changes were proposed in this pull request? In production we hit the following case: ``` with refund_info as ( select b_key, 1 as b_type from default.table_b ), next_month_time as ( select /*+ broadcast(b, c) */ c_key ,1 as c_time FROM default.table_c ) select a.loan_id ,c.c_time ,b.type from ( select a_key from default.table_a2 union select a_key from default.table_a1 ) a left join refund_info b on a.loan_id = b.loan_id left join next_month_time c on a.loan_id = c.loan_id ; ``` In this query, the rule injects table_b as table_c's runtime bloom filter, but table_b's join type is LEFT OUTER, causing table_c to miss data. Caused by ![image](https://github.com/apache/spark/assets/46485123/be45e211-23e4-4105-98b4-aa571c87665f) In InjectRuntimeFilter.extractSelectiveFilterOverScan(), when handling a join, the left plan (the `a` side of `a left outer join b`) is a UNION, so the extraction result is None; the rule then zips the left/right keys to extract from the join's right side, which finally causes this issue. ### Why are the changes needed? To fix a data correctness issue. ### Does this PR introduce _any_ user-facing change? Yes, it fixes a data correctness issue. ### How was this patch tested? 
This fixes the wrong case in the existing test.

Before: it extracts a LEFT_ANTI_JOIN's right child as the runtime filter for the outer bf3, which is not correct.
```
Join Inner, (c3#45926 = c1#45914)
:- Join LeftAnti, (c1#45914 = c2#45920)
:  :- Filter isnotnull(c1#45914)
:  :  +- Relation default.bf1[a1#45912,b1#45913,c1#45914,d1#45915,e1#45916,f1#45917] parquet
:  +- Project [c2#45920]
:     +- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND isnotnull(c2#45920))
:        +- Relation default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet
+- Filter (isnotnull(c3#45926) AND might_contain(scalar-subquery#48719 [], xxhash64(c3#45926, 42)))
   :  +- Aggregate [bloom_filter_agg(xxhash64(c2#45920, 42), 100, 8388608, 0, 0) AS bloomFilter#48718]
   :     +- Project [c2#45920]
   :        +- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND isnotnull(c2#45920))
   :           +- Relation default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet
   +- Relation default.bf3[a3#45924,b3#45925,c3#45926,d3#45927,e3#45928,f3#45929] parquet
```
After:
```
Join Inner, (c3#45926 = c1#45914)
:- Join LeftAnti, (c1#45914 = c2#45920)
:  :- Filter isnotnull(c1#45914)
:  :  +- Relation default.bf1[a1#45912,b1#45913,c1#45914,d1#45915,e1#45916,f1#45917] parquet
:  +- Project [c2#45920]
:     +- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND isnotnull(c2#45920))
:        +- Relation default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet
+- Filter (isnotnull(c3#45926))
   +- Relation default.bf3[a3#45924,b3#45925,c3#45926,d3#45927,e3#45928,f3#45929] parquet
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46263 from AngersZh/SPARK-48027.
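The correctness problem described above can be illustrated with a toy model in plain Python. This is only a sketch, not Spark code: the table contents are invented, the helper names (`left_outer`, `inner`, `run`) are hypothetical, and an exact set of keys stands in for the bloom filter. It shows that pruning `table_c` with keys taken from `table_b` changes the result when `a` and `b` are joined with LEFT OUTER, while the same pruning is harmless when they are joined with INNER.

```python
# Toy tables keyed by loan_id. All values are invented for illustration.
table_a = [1, 2, 3]                # stands in for the UNION of table_a1 and table_a2
table_b = {1: 1}                   # loan_id -> b_type; only loan 1 has a row
table_c = {1: 10, 2: 20, 3: 30}    # loan_id -> c_time


def left_outer(rows, right, col):
    """Keep every left row; fill `col` with None when `right` has no match."""
    return [{**row, col: right.get(row["loan_id"])} for row in rows]


def inner(rows, right, col):
    """Keep only the left rows that have a match in `right`."""
    return [{**row, col: right[row["loan_id"]]}
            for row in rows if row["loan_id"] in right]


def run(join_b, c_side):
    """Evaluate (a JOIN b) LEFT OUTER JOIN c; the a-b join type is passed in."""
    rows = [{"loan_id": key} for key in table_a]
    return left_outer(join_b(rows, table_b, "b_type"), c_side, "c_time")


# A set of b's keys stands in for the bloom filter (exact, no false positives).
b_keys = set(table_b)
pruned_c = {k: v for k, v in table_c.items() if k in b_keys}

# a LEFT OUTER JOIN b: pruning c by b's keys changes the result.
correct = run(left_outer, table_c)
wrong = run(left_outer, pruned_c)
print([r["c_time"] for r in correct])  # [10, 20, 30]
print([r["c_time"] for r in wrong])    # [10, None, None]

# a INNER JOIN b: the same pruning does not change the result.
print(run(inner, table_c) == run(inner, pruned_c))  # True
```

The LEFT OUTER join keeps loans 2 and 3 even though they have no row in `table_b`, so their `table_c` rows are still needed. That is why the child join type has to be checked before its right side is used as the filter creation side.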
Lead-authored-by: Angerszh Co-authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- .../catalyst/optimizer/InjectRuntimeFilter.scala | 44 +++--- .../spark/sql/InjectRuntimeFilterSuite.scala | 4 +- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala index 9c150f1f3308..3bb7c4d1ceca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala @@ -120,7 +120,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J hasHitSelectiveFilter = hasHitSelectiveFilter || isLikelySelective(condition), currentPlan, targetKey) - case ExtractEquiJoinKeys(_, lkeys, rkeys, _,