(spark) branch branch-3.5 updated: [SPARK-48138][CONNECT][TESTS] Disable a flaky `SparkSessionE2ESuite.interrupt tag` test

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 36da89deccc9 [SPARK-48138][CONNECT][TESTS] Disable a flaky 
`SparkSessionE2ESuite.interrupt tag` test
36da89deccc9 is described below

commit 36da89deccc916a6f32d9bf6d6f2fd8e288da917
Author: Dongjoon Hyun 
AuthorDate: Mon May 6 13:45:54 2024 +0800

[SPARK-48138][CONNECT][TESTS] Disable a flaky 
`SparkSessionE2ESuite.interrupt tag` test

### What changes were proposed in this pull request?

This PR aims to disable a flaky test, `SparkSessionE2ESuite.interrupt 
tag`, temporarily.

To re-enable this, SPARK-48139 is created as a blocker issue for 4.0.0.

### Why are the changes needed?

This test case was added in `Apache Spark 3.5.0` but has unfortunately been 
unstable until now.
- #42009

We tried to stabilize this test case before `Apache Spark 4.0.0-preview`.
- #45173
- #46374

However, it's still flaky.

- https://github.com/apache/spark/actions/runs/8962353911/job/24611130573 
(Master, 2024-05-05)
- https://github.com/apache/spark/actions/runs/8948176536/job/24581022674 
(Master, 2024-05-04)

This PR aims to stabilize CI first; the flaky test itself will be tracked as a 
blocker-level issue for `Spark Connect GA` via SPARK-48139 before Apache 
Spark 4.0.0.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46396 from dongjoon-hyun/SPARK-48138.

Authored-by: Dongjoon Hyun 
Signed-off-by: yangjie01 
(cherry picked from commit 8294c5962febe53eebdff79f65f5f293d93a1997)
Signed-off-by: Dongjoon Hyun 
---
 .../jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
index c76dc724828e..e9c2f0c45750 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/SparkSessionE2ESuite.scala
@@ -108,7 +108,8 @@ class SparkSessionE2ESuite extends RemoteSparkSession {
 assert(interrupted.length == 2, s"Interrupted operations: $interrupted.")
   }
 
-  test("interrupt tag") {
+  // TODO(SPARK-48139): Re-enable `SparkSessionE2ESuite.interrupt tag`
+  ignore("interrupt tag") {
 val session = spark
 import session.implicits._
 





(spark) branch branch-3.5 updated: [SPARK-48037][CORE][3.5] Fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 58b71307795b [SPARK-48037][CORE][3.5] Fix SortShuffleWriter lacks 
shuffle write related metrics resulting in potentially inaccurate data
58b71307795b is described below

commit 58b71307795b6060be97431e0c5c8ab95205ea79
Author: sychen 
AuthorDate: Tue May 7 22:39:02 2024 -0700

[SPARK-48037][CORE][3.5] Fix SortShuffleWriter lacks shuffle write related 
metrics resulting in potentially inaccurate data

### What changes were proposed in this pull request?
This PR aims to fix the issue where SortShuffleWriter lacks shuffle-write-related 
metrics, resulting in potentially inaccurate data.

### Why are the changes needed?
When the shuffle writer is SortShuffleWriter, it does not use 
SQLShuffleWriteMetricsReporter to update metrics, so the runtime statistics 
that AQE obtains report a rowCount of 0.

Some optimization rules rely on rowCount statistics, such as 
`EliminateLimits`. Because rowCount is 0, the rule removes the limit operator, 
and we get results without the limit applied.


https://github.com/apache/spark/blob/59d5946cfd377e9203ccf572deb34f87fab7510c/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L168-L172


https://github.com/apache/spark/blob/59d5946cfd377e9203ccf572deb34f87fab7510c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L2067-L2070
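
As a rough illustration (not taken from the patch; whether the SortShuffleWriter path is actually chosen depends on the shuffle configuration), the affected query shape looks like the following PySpark sketch:

```
from pyspark.sql import SparkSession

# Minimal sketch, assuming AQE is enabled; app name and sizes are arbitrary.
spark = (
    SparkSession.builder
    .appName("spark-48037-sketch")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

# A shuffle feeding a LIMIT: EliminateLimits consults the runtime rowCount of
# the shuffle stage. If the shuffle writer never reports the rows it wrote
# (the bug fixed here), rowCount stays 0 and the LIMIT can be eliminated, so
# affected versions may return more than the requested 5 rows.
df = spark.range(0, 1000).repartition(10, "id").limit(5)
print(df.count())  # expected: 5
```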

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Production environment verification.

**master metrics**
https://github.com/apache/spark/assets/3898450/dc9b6e8a-93ec-4f59-a903-71aa5b11962c

**PR metrics**

https://github.com/apache/spark/assets/3898450/2d73b773-2dcc-4d23-81de-25dcadac86c1

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46459 from cxzl25/SPARK-48037-3.5.

Authored-by: sychen 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/shuffle/sort/SortShuffleManager.scala|  2 +-
 .../spark/shuffle/sort/SortShuffleWriter.scala |  6 ++--
 .../spark/util/collection/ExternalSorter.scala |  9 +++---
 .../shuffle/sort/SortShuffleWriterSuite.scala  |  3 ++
 .../sql/execution/UnsafeRowSerializerSuite.scala   |  3 +-
 .../adaptive/AdaptiveQueryExecSuite.scala  | 32 --
 6 files changed, 43 insertions(+), 12 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala 
b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 46aca07ce43f..79dff6f87534 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -176,7 +176,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) 
extends ShuffleManager
   metrics,
   shuffleExecutorComponents)
   case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
-new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)
+new SortShuffleWriter(other, mapId, context, metrics, 
shuffleExecutorComponents)
 }
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala 
b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 8613fe11a4c2..3be7d24f7e4e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -21,6 +21,7 @@ import org.apache.spark._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleWriter}
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
 import org.apache.spark.shuffle.api.ShuffleExecutorComponents
 import org.apache.spark.util.collection.ExternalSorter
 
@@ -28,6 +29,7 @@ private[spark] class SortShuffleWriter[K, V, C](
 handle: BaseShuffleHandle[K, V, C],
 mapId: Long,
 context: TaskContext,
+writeMetrics: ShuffleWriteMetricsReporter,
 shuffleExecutorComponents: ShuffleExecutorComponents)
   extends ShuffleWriter[K, V] with Logging {
 
@@ -46,8 +48,6 @@ private[spark] class SortShuffleWriter[K, V, C](
 
   private var partitionLengths: Array[Long] = _
 
-  private val writeMetrics = context.taskMetrics().shuffleWriteMetrics
-
   /** Write a bunch of records to this task's output */
   override def write(records: Iterator[Product2[K, V]]): Unit = {
 sorter = if (dep.mapSideCombine) {
@@ -67,7 +67,7 @@ private[spark] class SortShuffleWriter[K, V, C](
 // (see SPARK-3570).
 val 

(spark) branch master updated (5f883117203d -> 52a7f634e913)

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 5f883117203d [SPARK-47914][SQL] Do not display the splits parameter in 
Range
 add 52a7f634e913 [SPARK-48183][PYTHON][DOCS] Update error contribution 
guide to respect new error class file

No new revisions were added by this update.

Summary of changes:
 python/docs/source/development/contributing.rst | 4 ++--
 python/pyspark/errors/utils.py  | 8 
 2 files changed, 6 insertions(+), 6 deletions(-)





(spark) branch master updated: [SPARK-47914][SQL] Do not display the splits parameter in Range

2024-05-07 Thread yao
This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5f883117203d [SPARK-47914][SQL] Do not display the splits parameter in 
Range
5f883117203d is described below

commit 5f883117203d823cb9914f483e314633845ecaa5
Author: guihuawen 
AuthorDate: Wed May 8 12:04:35 2024 +0800

[SPARK-47914][SQL] Do not display the splits parameter in Range

### What changes were proposed in this pull request?
[SQL]

explain extended select * from range(0, 4);

Before this PR, the splits parameter was displayed as `None` in the logical 
plans even when it was not set:
plan

== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedTableValuedFunction [range], [0, 4]

== Analyzed Logical Plan ==
id: bigint
Project [id#11L]
+- Range (0, 4, step=1, splits=None)

== Optimized Logical Plan ==
Range (0, 4, step=1, splits=None)

== Physical Plan ==
*(1) Range (0, 4, step=1, splits=1)

After this PR, splits is not displayed in the logical plans when it is not 
set; it is still displayed when it is set:

plan

== Parsed Logical Plan ==
'Project [*]
+- 'UnresolvedTableValuedFunction [range], [0, 4]

== Analyzed Logical Plan ==
id: bigint
Project [id#11L]
+- Range (0, 4, step=1)

== Optimized Logical Plan ==
Range (0, 4, step=1)

== Physical Plan ==
*(1) Range (0, 4, step=1, splits=1)

### Why are the changes needed?
If splits is not set, it is still displayed as `None` in the logical plans, 
which is not very user-friendly.
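
For reference, a minimal way to print these plans from PySpark (standard API, not part of this patch):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Prints the parsed/analyzed/optimized/physical plans; after this change the
# logical plans omit "splits=None" when splits is not set.
spark.sql("SELECT * FROM range(0, 4)").explain(extended=True)
```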

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?

Closes #46136 from guixiaowen/SPARK-47914.

Authored-by: guihuawen 
Signed-off-by: Kent Yao 
---
 .../plans/logical/basicLogicalOperators.scala  |  3 +-
 .../sql-tests/analyzer-results/group-by.sql.out|  8 ++--
 .../analyzer-results/identifier-clause.sql.out |  2 +-
 .../analyzer-results/join-lateral.sql.out  |  2 +-
 .../sql-tests/analyzer-results/limit.sql.out   |  2 +-
 .../named-function-arguments.sql.out   |  4 +-
 .../analyzer-results/non-excludable-rule.sql.out   | 12 +++---
 .../postgreSQL/aggregates_part1.sql.out| 20 -
 .../analyzer-results/postgreSQL/int8.sql.out   |  4 +-
 .../analyzer-results/postgreSQL/join.sql.out   |  6 +--
 .../analyzer-results/postgreSQL/numeric.sql.out| 10 ++---
 .../analyzer-results/postgreSQL/text.sql.out   |  2 +-
 .../analyzer-results/postgreSQL/union.sql.out  | 50 +++---
 .../postgreSQL/window_part1.sql.out|  4 +-
 .../postgreSQL/window_part2.sql.out| 20 -
 .../postgreSQL/window_part3.sql.out|  8 ++--
 .../sql-compatibility-functions.sql.out|  2 +-
 .../analyzer-results/sql-session-variables.sql.out |  2 +-
 .../scalar-subquery-predicate.sql.out  |  4 +-
 .../scalar-subquery/scalar-subquery-select.sql.out |  4 +-
 .../table-valued-functions.sql.out | 14 +++---
 .../typeCoercion/native/concat.sql.out | 18 
 .../typeCoercion/native/elt.sql.out|  8 ++--
 .../udf/postgreSQL/udf-aggregates_part1.sql.out| 20 -
 .../udf/postgreSQL/udf-join.sql.out|  2 +-
 .../analyzer-results/udf/udf-group-by.sql.out  |  4 +-
 .../results/named-function-arguments.sql.out   |  2 +-
 .../scala/org/apache/spark/sql/ExplainSuite.scala  |  6 +--
 28 files changed, 122 insertions(+), 121 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 4fd640afe3b2..9242a06cf1d6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1071,7 +1071,8 @@ case class Range(
   override def newInstance(): Range = copy(output = 
output.map(_.newInstance()))
 
   override def simpleString(maxFields: Int): String = {
-s"Range ($start, $end, step=$step, splits=$numSlices)"
+val splits = if (numSlices.isDefined) { s", splits=$numSlices" 

(spark) branch branch-3.5 updated: [MINOR][PYTHON][TESTS] Remove the doc in error message tests to allow other PyArrow versions in tests

2024-05-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 704f956dcbed [MINOR][PYTHON][TESTS] Remove the doc in error message 
tests to allow other PyArrow versions in tests
704f956dcbed is described below

commit 704f956dcbeddc9067e4ec502c4fd07175171cac
Author: Hyukjin Kwon 
AuthorDate: Tue May 7 20:07:25 2024 -0700

[MINOR][PYTHON][TESTS] Remove the doc in error message tests to allow other 
PyArrow versions in tests

This PR is a minor change to support more PyArrow versions in the test.

Without this change, the test can fail with other PyArrow versions: 
(https://github.com/HyukjinKwon/spark/actions/runs/8994639538/job/24708397027)

```
Traceback (most recent call last):
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py",
 line 585, in _test_merge_error
self.__test_merge_error(
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py",
 line 606, in __test_merge_error
with self.assertRaisesRegex(error_class, error_message_regex):
AssertionError: "Return type of the user-defined function should be 
pandas.DataFrame, but is int64." does not match "
  An exception was thrown from the Python worker. Please see the stack 
trace below.
Traceback (most recent call last):
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
1834, in main
process()
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
1826, in process
serializer.dump_stream(out_iter, outfile)
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py",
 line 531, in dump_stream
return ArrowStreamSerializer.dump_stream(self, 
init_stream_yield_batches(), stream)
   

  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py",
 line 104, in dump_stream
for batch in iterator:
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py",
 line 524, in init_stream_yield_batches
for series in iterator:
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
1694, in mapper
return f(df1_keys, df1_vals, df2_keys, df2_vals)
   ^
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
370, in 
return lambda kl, vl, kr, vr: [(wrapped(kl, vl, kr, vr), 
to_arrow_type(return_type))]
^^^
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
364, in wrapped
verify_pandas_result(
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
234, in verify_pandas_result
raise PySparkTypeError(
pyspark.errors.exceptions.base.PySparkTypeError: [UDF_RETURN_TYPE] Return 
type of the user-defined function should be pandas.DataFrame, but is int.
```
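
As an illustration of why the trailing `64.` was dropped (a sketch, not part of the patch): depending on the PyArrow version, the worker reports the offending return type as `int64` or `int`, and the relaxed pattern matches both messages.

```
import re

pattern = r"should be pandas\.DataFrame, but is int"

old_msg = "Return type of the user-defined function should be pandas.DataFrame, but is int64."
new_msg = "Return type of the user-defined function should be pandas.DataFrame, but is int."

# The relaxed regex used by the tests now matches either form of the message.
assert re.search(pattern, old_msg)
assert re.search(pattern, new_msg)
```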

No, test-only.

CI should validate it.

No.

Closes #46453 from HyukjinKwon/minor-test.

Authored-by: Hyukjin Kwon 
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py | 2 +-
 python/pyspark/sql/tests/pandas/test_pandas_map.py   | 4 ++--
 python/pyspark/sql/tests/test_arrow_map.py   | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py 
b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
index c3cd0f37b103..948ef4a53f2c 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py
@@ -166,7 +166,7 @@ class CogroupedApplyInPandasTestsMixin:
 fn=lambda lft, rgt: lft.size + rgt.size,
 error_class=PythonException,
 error_message_regex="Return type of the user-defined function "
-"should be pandas.DataFrame, but is int64.",
+"should be pandas.DataFrame, but is int",
 )
 
 def test_apply_in_pandas_returning_column_names(self):
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_map.py 
b/python/pyspark/sql/tests/pandas/test_pandas_map.py
index c3ba7b3e93a0..4b2be2bcf844 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_map.py
@@ -151,14 +151,14 @@ class MapInPandasTestsMixin:
 with self.assertRaisesRegex(
 PythonException,
 

(spark) branch master updated (6588554aa4cc -> 3b1ea0fde44e)

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 6588554aa4cc [SPARK-48149][INFRA][FOLLOWUP] Use single quotation mark
 add 3b1ea0fde44e [MINOR][PYTHON][TESTS] Remove the doc in error message 
tests to allow other PyArrow versions in tests

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/pandas/test_pandas_cogrouped_map.py | 2 +-
 python/pyspark/sql/tests/pandas/test_pandas_map.py   | 4 ++--
 python/pyspark/sql/tests/test_arrow_map.py   | 4 ++--
 3 files changed, 5 insertions(+), 5 deletions(-)





(spark) branch master updated: [SPARK-48149][INFRA][FOLLOWUP] Use single quotation mark

2024-05-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6588554aa4cc [SPARK-48149][INFRA][FOLLOWUP] Use single quotation mark
6588554aa4cc is described below

commit 6588554aa4cc3c7f57d762e0b159d05dc70d75aa
Author: Dongjoon Hyun 
AuthorDate: Wed May 8 11:35:19 2024 +0900

[SPARK-48149][INFRA][FOLLOWUP] Use single quotation mark

### What changes were proposed in this pull request?

This is a follow-up of
- #46407

### Why are the changes needed?

To fix the invalid syntax error
- https://github.com/apache/spark/actions/runs/8989374763
> The workflow is not valid. .github/workflows/build_python.yml (Line: 37, 
Col: 24): Unexpected symbol: '"pypy3"'.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual review.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46454 from dongjoon-hyun/SPARK-48149-2.

Authored-by: Dongjoon Hyun 
Signed-off-by: Hyukjin Kwon 
---
 .github/workflows/build_python.yml | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/build_python.yml 
b/.github/workflows/build_python.yml
index 9195dc4af518..efa281d6a279 100644
--- a/.github/workflows/build_python.yml
+++ b/.github/workflows/build_python.yml
@@ -34,9 +34,9 @@ jobs:
   fail-fast: false
   matrix:
 include:
-  - pyversion: ${{ github.event.schedule == '0 15 * * *' && "pypy3" }}
-  - pyversion: ${{ github.event.schedule == '0 17 * * *' && 
"python3.10" }}
-  - pyversion: ${{ github.event.schedule == '0 19 * * *' && 
"python3.12" }}
+  - pyversion: ${{ github.event.schedule == '0 15 * * *' && 'pypy3' }}
+  - pyversion: ${{ github.event.schedule == '0 17 * * *' && 
'python3.10' }}
+  - pyversion: ${{ github.event.schedule == '0 19 * * *' && 
'python3.12' }}
 permissions:
   packages: write
 name: Run





(spark) branch master updated: [SPARK-48131][CORE][FOLLOWUP] Add a new configuration for the MDC key of Task Name

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 84c5b919d998 [SPARK-48131][CORE][FOLLOWUP] Add a new configuration for 
the MDC key of Task Name
84c5b919d998 is described below

commit 84c5b919d99872858d2f98db21fd3482f27dcbfc
Author: Gengliang Wang 
AuthorDate: Tue May 7 19:18:50 2024 -0700

[SPARK-48131][CORE][FOLLOWUP] Add a new configuration for the MDC key of 
Task Name

### What changes were proposed in this pull request?

Introduce a new Spark config `spark.log.legacyTaskNameMdc.enabled`:
When true, the MDC key `mdc.taskName` will be set in the logs, which is 
consistent with the behavior of Spark 3.1 to Spark 3.5 releases. When false, 
the logging framework will use `task_name` as the MDC key for consistency with 
other new MDC keys.
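
A minimal usage sketch (the config name comes from this patch; how it is supplied and the pattern-layout reference are assumptions):

```
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Keep the pre-4.0 MDC key name so an existing log4j2 pattern layout that
# references %X{mdc.taskName} keeps working; with the default value (false),
# the key is %X{task_name} instead.
conf = SparkConf().set("spark.log.legacyTaskNameMdc.enabled", "true")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
```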

### Why are the changes needed?

As discussed in 
https://github.com/apache/spark/pull/46386#issuecomment-2098985001, we should 
add a configuration and migration guide about the change in the MDC key of Task 
Name.
### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manual test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46446 from gengliangwang/addConfig.

Authored-by: Gengliang Wang 
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/scala/org/apache/spark/executor/Executor.scala  | 11 +--
 .../main/scala/org/apache/spark/internal/config/package.scala | 10 ++
 docs/core-migration-guide.md  |  2 ++
 3 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3edba45ef89f..68c38fb6179f 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -95,6 +95,13 @@ private[spark] class Executor(
 
   private[executor] val conf = env.conf
 
+  // SPARK-48131: Unify MDC key mdc.taskName and task_name in Spark 4.0 
release.
+  private[executor] val taskNameMDCKey = if 
(conf.get(LEGACY_TASK_NAME_MDC_ENABLED)) {
+"mdc.taskName"
+  } else {
+LogKeys.TASK_NAME.name
+  }
+
   // SPARK-40235: updateDependencies() uses a ReentrantLock instead of the 
`synchronized` keyword
   // so that tasks can exit quickly if they are interrupted while waiting on 
another task to
   // finish downloading dependencies.
@@ -914,7 +921,7 @@ private[spark] class Executor(
 try {
   mdc.foreach { case (key, value) => MDC.put(key, value) }
   // avoid overriding the takName by the user
-  MDC.put(LogKeys.TASK_NAME.name, taskName)
+  MDC.put(taskNameMDCKey, taskName)
 } catch {
   case _: NoSuchFieldError => logInfo("MDC is not supported.")
 }
@@ -923,7 +930,7 @@ private[spark] class Executor(
   private def cleanMDCForTask(taskName: String, mdc: Seq[(String, String)]): 
Unit = {
 try {
   mdc.foreach { case (key, _) => MDC.remove(key) }
-  MDC.remove(LogKeys.TASK_NAME.name)
+  MDC.remove(taskNameMDCKey)
 } catch {
   case _: NoSuchFieldError => logInfo("MDC is not supported.")
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a5be6084de36..87402d2cc17e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -152,6 +152,16 @@ package object config {
   .booleanConf
   .createWithDefault(true)
 
+  private[spark] val LEGACY_TASK_NAME_MDC_ENABLED =
+ConfigBuilder("spark.log.legacyTaskNameMdc.enabled")
+  .doc("When true, the MDC (Mapped Diagnostic Context) key `mdc.taskName` 
will be set in the " +
+"log output, which is the behavior of Spark version 3.1 through Spark 
3.5 releases. " +
+"When false, the logging framework will use `task_name` as the MDC 
key, " +
+"aligning it with the naming convention of newer MDC keys introduced 
in Spark 4.0 release.")
+  .version("4.0.0")
+  .booleanConf
+  .createWithDefault(false)
+
   private[spark] val DRIVER_LOG_LOCAL_DIR =
 ConfigBuilder("spark.driver.log.localDir")
   .doc("Specifies a local directory to write driver logs and enable Driver 
Log UI Tab.")
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 95c7929a6241..28a9dd0f4371 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -46,6 +46,8 @@ license: |
   - Set the Spark configuration `spark.log.structuredLogging.enabled` to 
`false`. 
   - Use a custom log4j configuration 

(spark) branch master updated: [SPARK-48126][CORE] Make `spark.log.structuredLogging.enabled` effective

2024-05-07 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 6bbf6b1eff2c [SPARK-48126][CORE] Make 
`spark.log.structuredLogging.enabled` effective
6bbf6b1eff2c is described below

commit 6bbf6b1eff2cffe8d116ebba0194fac233b42348
Author: Gengliang Wang 
AuthorDate: Tue May 7 19:10:27 2024 -0700

[SPARK-48126][CORE] Make `spark.log.structuredLogging.enabled` effective

### What changes were proposed in this pull request?

Currently, the spark conf `spark.log.structuredLogging.enabled` is not 
taking effect. The current code base checks this config in the method 
`prepareSubmitEnvironment`. However, Log4j is already initialized before that.

This PR is to fix it by checking the config 
`spark.log.structuredLogging.enabled` before the initialization of Log4j.
Also, this PR enhances the doc for this configuration.
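
A hedged usage sketch (the application file name is a placeholder; `spark-submit` is assumed to be on PATH): because the config is now read before Log4j is initialized in SparkSubmit, it should be supplied at submit time, e.g. via `--conf` or `spark-defaults.conf`.

```
import subprocess

# Disable structured (JSON) logging for a non-shell application.
subprocess.run([
    "spark-submit",
    "--conf", "spark.log.structuredLogging.enabled=false",
    "my_app.py",  # hypothetical application
], check=True)
```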

### Why are the changes needed?

Bug fix. After the fix, the Spark conf 
`spark.log.structuredLogging.enabled` takes effect.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manual test.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: GPT-4
I used GPT-4 to improve the documents.

Closes #46452 from gengliangwang/makeConfEffective.

Authored-by: Gengliang Wang 
Signed-off-by: Gengliang Wang 
---
 .../org/apache/spark/deploy/SparkSubmit.scala  | 33 --
 .../org/apache/spark/internal/config/package.scala |  9 +++---
 docs/configuration.md  |  6 +++-
 docs/core-migration-guide.md   |  4 ++-
 4 files changed, 31 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 076aa8387dc5..5a7e5542cbd0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -69,10 +69,20 @@ private[spark] class SparkSubmit extends Logging {
 
   def doSubmit(args: Array[String]): Unit = {
 val appArgs = parseArguments(args)
+val sparkConf = appArgs.toSparkConf()
+
 // For interpreters, structured logging is disabled by default to avoid 
generating mixed
 // plain text and structured logs on the same console.
 if (isShell(appArgs.primaryResource) || isSqlShell(appArgs.mainClass)) {
   Logging.disableStructuredLogging()
+} else {
+  // For non-shell applications, enable structured logging if it's not 
explicitly disabled
+  // via the configuration `spark.log.structuredLogging.enabled`.
+  if (sparkConf.getBoolean(STRUCTURED_LOGGING_ENABLED.key, defaultValue = 
true)) {
+Logging.enableStructuredLogging()
+  } else {
+Logging.disableStructuredLogging()
+  }
 }
 // Initialize logging if it hasn't been done yet. Keep track of whether 
logging needs to
 // be reset before the application starts.
@@ -82,9 +92,9 @@ private[spark] class SparkSubmit extends Logging {
   logInfo(appArgs.toString)
 }
 appArgs.action match {
-  case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
-  case SparkSubmitAction.KILL => kill(appArgs)
-  case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
+  case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog, sparkConf)
+  case SparkSubmitAction.KILL => kill(appArgs, sparkConf)
+  case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs, 
sparkConf)
   case SparkSubmitAction.PRINT_VERSION => printVersion()
 }
   }
@@ -96,12 +106,11 @@ private[spark] class SparkSubmit extends Logging {
   /**
* Kill an existing submission.
*/
-  private def kill(args: SparkSubmitArguments): Unit = {
+  private def kill(args: SparkSubmitArguments, sparkConf: SparkConf): Unit = {
 if (RestSubmissionClient.supportsRestClient(args.master)) {
   new RestSubmissionClient(args.master)
 .killSubmission(args.submissionToKill)
 } else {
-  val sparkConf = args.toSparkConf()
   sparkConf.set("spark.master", args.master)
   SparkSubmitUtils
 .getSubmitOperations(args.master)
@@ -112,12 +121,11 @@ private[spark] class SparkSubmit extends Logging {
   /**
* Request the status of an existing submission.
*/
-  private def requestStatus(args: SparkSubmitArguments): Unit = {
+  private def requestStatus(args: SparkSubmitArguments, sparkConf: SparkConf): 
Unit = {
 if (RestSubmissionClient.supportsRestClient(args.master)) {
   new RestSubmissionClient(args.master)
 .requestSubmissionStatus(args.submissionToRequestStatusFor)
 } else {
-  val sparkConf = 

(spark) branch master updated: [SPARK-48045][PYTHON] Pandas API groupby with multi-agg-relabel ignores as_index=False

2024-05-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 67ae23934b56 [SPARK-48045][PYTHON] Pandas API groupby with 
multi-agg-relabel ignores as_index=False
67ae23934b56 is described below

commit 67ae23934b56761617c2fb217ae6cf6f2d8f619b
Author: sai 
AuthorDate: Wed May 8 09:44:16 2024 +0900

[SPARK-48045][PYTHON] Pandas API groupby with multi-agg-relabel ignores 
as_index=False

### What changes were proposed in this pull request?
When using groupby in the pandas API on Spark with relabeling of aggregate 
columns and `as_index=False`, the grouping columns are not returned in the 
DataFrame. This change fixes that bug.

Example:
ps.DataFrame({"a": [0, 0], "b": [0, 1]}).groupby("a", 
as_index=False).agg(b_max=("b", "max"))

Result:
_  b_max
0  1

Required Result:
_  a  b_max
0  0  1
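
A runnable version of the example above, using the pandas API on Spark (expected output taken from this description):

```
import pyspark.pandas as ps

psdf = ps.DataFrame({"a": [0, 0], "b": [0, 1]})
result = psdf.groupby("a", as_index=False).agg(b_max=("b", "max"))
# Before the fix, the grouping column "a" was missing from the result; after it:
#    a  b_max
# 0  0      1
print(result)
```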

### Why are the changes needed?
The relabeling part of the code uses only the aggregate columns. When 
`as_index=True` this is not an issue, since the grouping columns are included 
in the index. When `as_index=False`, the grouping columns need to be appended 
in the relabeling code.

Please check the commits/PR for the code changes.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Passed GA
- Passed Build tests
- Unit Tested including scenarios in addition to the one provided in the 
Jira ticket

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46391 from sinaiamonkar-sai/SPARK-48045-2.

Authored-by: sai 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/pandas/groupby.py|  7 ++-
 python/pyspark/pandas/tests/groupby/test_groupby.py | 21 +
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index ec47ab75c43c..55627a4c740c 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -308,6 +308,7 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
 )
 
 if not self._as_index:
+index_cols = psdf._internal.column_labels
 should_drop_index = set(
 i for i, gkey in enumerate(self._groupkeys) if gkey._psdf is 
not self._psdf
 )
@@ -322,8 +323,12 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
 psdf = psdf.reset_index(level=should_drop_index, drop=drop)
 if len(should_drop_index) < len(self._groupkeys):
 psdf = psdf.reset_index()
+index_cols = [c for c in psdf._internal.column_labels if c not in 
index_cols]
+if relabeling:
+psdf = psdf[pd.Index(index_cols + list(order))]
+psdf.columns = pd.Index([c[0] for c in index_cols] + 
list(columns))
 
-if relabeling:
+if relabeling and self._as_index:
 psdf = psdf[order]
 psdf.columns = columns  # type: ignore[assignment]
 return psdf
diff --git a/python/pyspark/pandas/tests/groupby/test_groupby.py 
b/python/pyspark/pandas/tests/groupby/test_groupby.py
index 5867f7b62fa5..b58bfddb4b99 100644
--- a/python/pyspark/pandas/tests/groupby/test_groupby.py
+++ b/python/pyspark/pandas/tests/groupby/test_groupby.py
@@ -451,6 +451,27 @@ class GroupByTestsMixin:
 pdf.groupby([("x", "a"), ("x", "b")]).diff().sort_index(),
 )
 
+def test_aggregate_relabel_index_false(self):
+pdf = pd.DataFrame(
+{
+"A": [0, 0, 1, 1, 1],
+"B": ["a", "a", "b", "a", "b"],
+"C": [10, 15, 10, 20, 30],
+}
+)
+psdf = ps.from_pandas(pdf)
+
+self.assert_eq(
+pdf.groupby(["B", "A"], as_index=False)
+.agg(C_MAX=("C", "max"))
+.sort_values(["B", "A"])
+.reset_index(drop=True),
+psdf.groupby(["B", "A"], as_index=False)
+.agg(C_MAX=("C", "max"))
+.sort_values(["B", "A"])
+.reset_index(drop=True),
+)
+
 
 class GroupByTests(
 GroupByTestsMixin,





(spark) branch master updated: [SPARK-47240][CORE][PART1] Migrate logInfo with variables to structured logging framework

2024-05-07 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new a15adeb3a215 [SPARK-47240][CORE][PART1] Migrate logInfo with variables 
to structured logging framework
a15adeb3a215 is described below

commit a15adeb3a215ad2ef7222e18112d23cdffa8569a
Author: Tuan Pham 
AuthorDate: Tue May 7 17:35:35 2024 -0700

[SPARK-47240][CORE][PART1] Migrate logInfo with variables to structured 
logging framework

### What changes were proposed in this pull request?

The PR aims to migrate `logInfo` in the Core module with variables to the 
structured logging framework.

### Why are the changes needed?

To enhance Apache Spark's logging system by implementing structured logging.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

- Pass GA.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46362 from zeotuan/coreInfo.

Lead-authored-by: Tuan Pham 
Co-authored-by: Gengliang Wang 
Signed-off-by: Gengliang Wang 
---
 .../scala/org/apache/spark/internal/LogKey.scala   | 48 ++-
 .../org/apache/spark/BarrierCoordinator.scala  | 17 ---
 .../org/apache/spark/BarrierTaskContext.scala  | 35 +++---
 .../apache/spark/ExecutorAllocationManager.scala   | 13 --
 .../scala/org/apache/spark/MapOutputTracker.scala  | 48 ---
 .../main/scala/org/apache/spark/SparkContext.scala | 54 ++
 .../apache/spark/api/python/PythonHadoopUtil.scala |  2 +-
 .../org/apache/spark/api/python/PythonRDD.scala|  6 ++-
 .../org/apache/spark/api/python/PythonRunner.scala |  2 +-
 .../org/apache/spark/api/python/PythonUtils.scala  |  7 +--
 .../spark/api/python/StreamingPythonRunner.scala   | 12 +++--
 .../scala/org/apache/spark/deploy/Client.scala | 18 
 .../spark/deploy/ExternalShuffleService.scala  | 10 ++--
 .../apache/spark/rdd/ReliableCheckpointRDD.scala   |  9 ++--
 .../spark/rdd/ReliableRDDCheckpointData.scala  |  6 ++-
 .../scala/org/apache/spark/ui/JettyUtils.scala |  7 ++-
 .../main/scala/org/apache/spark/ui/SparkUI.scala   |  4 +-
 .../scala/org/apache/spark/util/ListenerBus.scala  |  7 +--
 .../scala/org/apache/spark/util/SignalUtils.scala  |  2 +-
 19 files changed, 205 insertions(+), 102 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala 
b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index c127f9c3d1f9..14e822c6349f 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -26,13 +26,15 @@ trait LogKey {
 }
 
 /**
- * Various keys used for mapped diagnostic contexts(MDC) in logging.
- * All structured logging keys should be defined here for standardization.
+ * Various keys used for mapped diagnostic contexts(MDC) in logging. All 
structured logging keys
+ * should be defined here for standardization.
  */
 object LogKeys {
   case object ACCUMULATOR_ID extends LogKey
+  case object ACTUAL_BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
   case object ACTUAL_NUM_FILES extends LogKey
   case object ACTUAL_PARTITION_COLUMN extends LogKey
+  case object ADDED_JARS extends LogKey
   case object AGGREGATE_FUNCTIONS extends LogKey
   case object ALPHA extends LogKey
   case object ANALYSIS_ERROR extends LogKey
@@ -42,7 +44,10 @@ object LogKeys {
   case object APP_NAME extends LogKey
   case object APP_STATE extends LogKey
   case object ARGS extends LogKey
+  case object AUTH_ENABLED extends LogKey
   case object BACKUP_FILE extends LogKey
+  case object BARRIER_EPOCH extends LogKey
+  case object BARRIER_ID extends LogKey
   case object BATCH_ID extends LogKey
   case object BATCH_NAME extends LogKey
   case object BATCH_TIMESTAMP extends LogKey
@@ -55,6 +60,7 @@ object LogKeys {
   case object BOOT extends LogKey
   case object BROADCAST extends LogKey
   case object BROADCAST_ID extends LogKey
+  case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
   case object BUCKET extends LogKey
   case object BYTECODE_SIZE extends LogKey
   case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey
@@ -62,6 +68,7 @@ object LogKeys {
   case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey
   case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey
   case object CALL_SITE_LONG_FORM extends LogKey
+  case object CALL_SITE_SHORT_FORM extends LogKey
   case object CATALOG_NAME extends LogKey
   case object CATEGORICAL_FEATURES extends LogKey
   case object CHECKPOINT_FILE extends LogKey
@@ -142,11 +149,13 @@ object LogKeys {
   case object DEPRECATED_KEY extends LogKey
   case object DESCRIPTION extends LogKey
   case object DESIRED_NUM_PARTITIONS extends LogKey
+  case object 

(spark) branch master updated (5e49665ac39b -> 553e1b85c42a)

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 5e49665ac39b [SPARK-47960][SS] Allow chaining other stateful operators 
after transformWithState operator
 add 553e1b85c42a [SPARK-48152][BUILD] Make `spark-profiler` as a part of 
release and publish to maven central repo

No new revisions were added by this update.

Summary of changes:
 .github/workflows/maven_test.yml| 10 +-
 connector/profiler/README.md|  2 +-
 connector/profiler/pom.xml  |  6 +-
 dev/create-release/release-build.sh |  2 +-
 dev/test-dependencies.sh|  2 +-
 docs/building-spark.md  |  7 +++
 pom.xml |  3 +++
 7 files changed, 23 insertions(+), 9 deletions(-)





(spark) branch master updated: [SPARK-47960][SS] Allow chaining other stateful operators after transformWithState operator

2024-05-07 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 5e49665ac39b [SPARK-47960][SS] Allow chaining other stateful operators 
after transformWithState operator
5e49665ac39b is described below

commit 5e49665ac39b49b875d6970f93df59aedd830fa5
Author: Bhuwan Sahni 
AuthorDate: Wed May 8 09:20:01 2024 +0900

[SPARK-47960][SS] Allow chaining other stateful operators after 
transformWithState operator

### What changes were proposed in this pull request?

This PR adds support to define event time column in the output dataset of 
`TransformWithState` operator. The new event time column will be used to 
evaluate watermark expressions in downstream operators.

1. Note that the transformWithState operator does not enforce that values 
generated by the user's computation adhere to watermark semantics (i.e., that 
no output rows have an event time less than the watermark).
2. Updated the watermark value passed in TimerInfo as evictionWatermark, 
rather than lateEventsWatermark.
3. Ensure that event time column can only be defined in output if a 
watermark has been defined previously.

### Why are the changes needed?

This change is required to support chaining of stateful operators after 
`transformWithState`. Event time column is required to evaluate watermark 
expressions in downstream stateful operators.

### Does this PR introduce _any_ user-facing change?

Yes. Adds a new version of transformWithState API which allows redefining 
the event time column.

### How was this patch tested?

Added unit test cases.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45376 from sahnib/tws-chaining-stateful-operators.

Authored-by: Bhuwan Sahni 
Signed-off-by: Jungtaek Lim 
---
 .../src/main/resources/error/error-conditions.json |  14 +
 .../spark/sql/catalyst/analysis/Analyzer.scala |   3 +
 .../ResolveUpdateEventTimeWatermarkColumn.scala|  52 +++
 .../plans/logical/EventTimeWatermark.scala |  79 +++-
 .../spark/sql/catalyst/plans/logical/object.scala  |   2 +-
 .../sql/catalyst/rules/RuleIdCollection.scala  |   1 +
 .../spark/sql/catalyst/trees/TreePatterns.scala|   1 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |   7 +
 .../spark/sql/errors/QueryExecutionErrors.scala|  11 +
 .../apache/spark/sql/KeyValueGroupedDataset.scala  | 178 -
 .../spark/sql/execution/SparkStrategies.scala  |  12 +
 .../streaming/EventTimeWatermarkExec.scala |  88 -
 .../execution/streaming/IncrementalExecution.scala |  24 +-
 .../streaming/TransformWithStateExec.scala |  32 +-
 .../execution/streaming/statefulOperators.scala|   2 +-
 .../TransformWithStateChainingSuite.scala  | 411 +
 16 files changed, 866 insertions(+), 51 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index bae94a0ab97e..8a64c4c590e8 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -125,6 +125,12 @@
 ],
 "sqlState" : "428FR"
   },
+  "CANNOT_ASSIGN_EVENT_TIME_COLUMN_WITHOUT_WATERMARK" : {
+"message" : [
+  "Watermark needs to be defined to reassign event time column. Failed to 
find watermark definition in the streaming query."
+],
+"sqlState" : "42611"
+  },
   "CANNOT_CAST_DATATYPE" : {
 "message" : [
   "Cannot cast  to ."
@@ -1057,6 +1063,14 @@
 },
 "sqlState" : "4274K"
   },
+  "EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED" : {
+"message" : [
+  "Previous node emitted a row with eventTime= which 
is older than current_watermark_value=",
+  "This can lead to correctness issues in the stateful operators 
downstream in the execution pipeline.",
+  "Please correct the operator logic to emit rows after current global 
watermark value."
+],
+"sqlState" : "42815"
+  },
   "EMPTY_JSON_FIELD_VALUE" : {
 "message" : [
   "Failed to parse an empty string for data type ."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c29432c916f9..55b6f1af7fd8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -331,6 +331,7 @@ class Analyzer(override val catalogManager: CatalogManager) 
extends RuleExecutor
   Seq(
 ResolveWithCTE,
 ExtractDistributedSequenceID) ++
+  

(spark) branch branch-3.5 updated: [SPARK-48178][INFRA][3.5] Run `build/scala-213/java-11-17` jobs of `branch-3.5` only if needed

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 15b5d2a55837 [SPARK-48178][INFRA][3.5] Run 
`build/scala-213/java-11-17` jobs of `branch-3.5` only if needed
15b5d2a55837 is described below

commit 15b5d2a558371395547461d7b37f20610432dea0
Author: Dongjoon Hyun 
AuthorDate: Tue May 7 15:54:50 2024 -0700

[SPARK-48178][INFRA][3.5] Run `build/scala-213/java-11-17` jobs of 
`branch-3.5` only if needed

### What changes were proposed in this pull request?

This PR aims to run the `build`, `scala-213`, and `java-11-17` jobs of 
`branch-3.5` only if needed, to reduce the maximum concurrency of Apache Spark 
GitHub Action usage.

### Why are the changes needed?

To meet ASF Infra GitHub Action policy, we need to reduce the maximum 
concurrency.
- https://infra.apache.org/github-actions-policy.html

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46449 from dongjoon-hyun/SPARK-48178.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 .github/workflows/build_and_test.yml | 9 -
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index fa40b2d0a390..9c3dc95d0f66 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -85,17 +85,16 @@ jobs:
   sparkr=`./dev/is-changed.py -m sparkr`
   tpcds=`./dev/is-changed.py -m sql`
   docker=`./dev/is-changed.py -m docker-integration-tests`
-  # 'build', 'scala-213', and 'java-11-17' are always true for now.
-  # It does not save significant time and most of PRs trigger the 
build.
+  build=`./dev/is-changed.py -m 
"core,unsafe,kvstore,avro,utils,network-common,network-shuffle,repl,launcher,examples,sketch,graphx,catalyst,hive-thriftserver,streaming,sql-kafka-0-10,streaming-kafka-0-10,mllib-local,mllib,yarn,mesos,kubernetes,hadoop-cloud,spark-ganglia-lgpl,sql,hive"`
   precondition="
 {
-  \"build\": \"true\",
+  \"build\": \"$build\",
   \"pyspark\": \"$pyspark\",
   \"sparkr\": \"$sparkr\",
   \"tpcds-1g\": \"$tpcds\",
   \"docker-integration-tests\": \"$docker\",
-  \"scala-213\": \"true\",
-  \"java-11-17\": \"true\",
+  \"scala-213\": \"$build\",
+  \"java-11-17\": \"$build\",
   \"lint\" : \"true\",
   \"k8s-integration-tests\" : \"true\",
   \"breaking-changes-buf\" : \"true\",





(spark) branch branch-3.5 updated: [SPARK-48173][SQL][3.5] CheckAnalysis should see the entire query plan

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 2f8e7cbe98df [SPARK-48173][SQL][3.5] CheckAnalysis should see the 
entire query plan
2f8e7cbe98df is described below

commit 2f8e7cbe98df97ee0ae51a20796192c95e750721
Author: Wenchen Fan 
AuthorDate: Tue May 7 15:25:15 2024 -0700

[SPARK-48173][SQL][3.5] CheckAnalysis should see the entire query plan

backport https://github.com/apache/spark/pull/46439 to 3.5

### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/38029 . Some 
custom check rules need to see the entire query plan tree to get some context, 
but https://github.com/apache/spark/pull/38029 breaks it as it checks the query 
plan of dangling CTE relations recursively.

This PR fixes it by putting the dangling CTE relations back in the main 
query plan and then checking the main query plan.
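
For context, a dangling CTE relation is one that is never referenced from the main query, e.g. (illustrative only):

```
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# "unused" disappears once CTEs are inlined, so CheckAnalysis must validate it
# explicitly; this change does that by putting it back into the main plan
# instead of checking it in isolation.
spark.sql("WITH unused AS (SELECT 1 AS x) SELECT 2 AS y").show()
```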

### Why are the changes needed?

Revert the breaking change to custom check rules

### Does this PR introduce _any_ user-facing change?

No for most users. This restores the behavior of Spark 3.3 and earlier for 
custom check rules.

### How was this patch tested?

existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46442 from cloud-fan/check2.

Lead-authored-by: Wenchen Fan 
Co-authored-by: Wenchen Fan 
Signed-off-by: Dongjoon Hyun 
---
 .../sql/catalyst/analysis/CheckAnalysis.scala  | 38 +++---
 1 file changed, 33 insertions(+), 5 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 7f10bdbc80ca..485015f2efab 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -141,17 +141,45 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
   errorClass, missingCol, orderedCandidates, a.origin)
   }
 
+  private def checkUnreferencedCTERelations(
+  cteMap: mutable.Map[Long, (CTERelationDef, Int, mutable.Map[Long, Int])],
+  visited: mutable.Map[Long, Boolean],
+  danglingCTERelations: mutable.ArrayBuffer[CTERelationDef],
+  cteId: Long): Unit = {
+if (visited(cteId)) {
+  return
+}
+val (cteDef, _, refMap) = cteMap(cteId)
+refMap.foreach { case (id, _) =>
+  checkUnreferencedCTERelations(cteMap, visited, danglingCTERelations, id)
+}
+danglingCTERelations.append(cteDef)
+visited(cteId) = true
+  }
+
   def checkAnalysis(plan: LogicalPlan): Unit = {
 val inlineCTE = InlineCTE(alwaysInline = true)
 val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int, 
mutable.Map[Long, Int])]
 inlineCTE.buildCTEMap(plan, cteMap)
-cteMap.values.foreach { case (relation, refCount, _) =>
-  // If a CTE relation is never used, it will disappear after inline. Here 
we explicitly check
-  // analysis for it, to make sure the entire query plan is valid.
-  if (refCount == 0) checkAnalysis0(relation.child)
+val danglingCTERelations = mutable.ArrayBuffer.empty[CTERelationDef]
+val visited: mutable.Map[Long, Boolean] = 
mutable.Map.empty.withDefaultValue(false)
+// If a CTE relation is never used, it will disappear after inline. Here 
we explicitly collect
+// these dangling CTE relations, and put them back in the main query, to 
make sure the entire
+// query plan is valid.
+cteMap.foreach { case (cteId, (_, refCount, _)) =>
+  // If a CTE relation ref count is 0, the other CTE relations that 
reference it should also be
+  // collected. This code will also guarantee the leaf relations that do 
not reference
+  // any others are collected first.
+  if (refCount == 0) {
+checkUnreferencedCTERelations(cteMap, visited, danglingCTERelations, 
cteId)
+  }
 }
 // Inline all CTEs in the plan to help check query plan structures in 
subqueries.
-checkAnalysis0(inlineCTE(plan))
+var inlinedPlan: LogicalPlan = inlineCTE(plan)
+if (danglingCTERelations.nonEmpty) {
+  inlinedPlan = WithCTE(inlinedPlan, danglingCTERelations.toSeq)
+}
+checkAnalysis0(inlinedPlan)
 plan.setAnalyzed()
   }
 





(spark) branch branch-3.5 updated: [SPARK-48179][INFRA][3.5] Pin `nbsphinx` to `0.9.3`

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new a24ec1d8f76c [SPARK-48179][INFRA][3.5] Pin `nbsphinx` to `0.9.3`
a24ec1d8f76c is described below

commit a24ec1d8f76c7bf47e491086f14ea202b6806cd8
Author: Dongjoon Hyun 
AuthorDate: Tue May 7 15:23:24 2024 -0700

[SPARK-48179][INFRA][3.5] Pin `nbsphinx` to `0.9.3`

### What changes were proposed in this pull request?

This PR aims to pin `nbsphinx` to `0.9.3` to recover `branch-3.5` CI.

### Why are the changes needed?

From yesterday, `branch-3.5` commit build is broken.
- https://github.com/apache/spark/actions/runs/8978558438/job/24659197282
```
Exception occurred:
  File "/usr/local/lib/python3.9/dist-packages/nbsphinx/__init__.py", line 
1316, in apply
for section in self.document.findall(docutils.nodes.section):
AttributeError: 'document' object has no attribute 'findall'
The full traceback has been saved in /tmp/sphinx-err-qz4y0bav.log, if you 
want to report the issue to the developers.
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs on this PR.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46448 from dongjoon-hyun/nbsphinx.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 .github/workflows/build_and_test.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index 8488540b415d..fa40b2d0a390 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -682,7 +682,7 @@ jobs:
 #   See also https://issues.apache.org/jira/browse/SPARK-35375.
 # Pin the MarkupSafe to 2.0.1 to resolve the CI error.
 #   See also https://issues.apache.org/jira/browse/SPARK-38279.
-python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme 
'sphinx-copybutton==0.5.2' nbsphinx numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 
'pyzmq<24.0.0' 'sphinxcontrib-applehelp==1.0.4' 'sphinxcontrib-devhelp==1.0.2' 
'sphinxcontrib-htmlhelp==2.0.1' 'sphinxcontrib-qthelp==1.0.3' 
'sphinxcontrib-serializinghtml==1.1.5' 'nest-asyncio==1.5.8' 'rpds-py==0.16.2' 
'alabaster==0.7.13'
+python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme 
'sphinx-copybutton==0.5.2' 'nbsphinx==0.9.3' numpydoc 'jinja2<3.0.0' 
'markupsafe==2.0.1' 'pyzmq<24.0.0' 'sphinxcontrib-applehelp==1.0.4' 
'sphinxcontrib-devhelp==1.0.2' 'sphinxcontrib-htmlhelp==2.0.1' 
'sphinxcontrib-qthelp==1.0.3' 'sphinxcontrib-serializinghtml==1.1.5' 
'nest-asyncio==1.5.8' 'rpds-py==0.16.2' 'alabaster==0.7.13'
 python3.9 -m pip install ipython_genutils # See SPARK-38517
 python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' 
'pyarrow==12.0.1' pandas 'plotly>=4.8'
 python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421





(spark) branch branch-3.5 updated: [SPARK-48167][PYTHON][TESTS][FOLLOWUP][3.5] Reformat test_readwriter.py to fix Python Linter error

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 03bc2b188d21 [SPARK-48167][PYTHON][TESTS][FOLLOWUP][3.5] Reformat 
test_readwriter.py to fix Python Linter error
03bc2b188d21 is described below

commit 03bc2b188d2111b5c4cc5bc13ebd0455602028a8
Author: Dongjoon Hyun 
AuthorDate: Tue May 7 13:38:08 2024 -0700

[SPARK-48167][PYTHON][TESTS][FOLLOWUP][3.5] Reformat test_readwriter.py to 
fix Python Linter error

### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/46430 to fix a 
Python linter failure.

### Why are the changes needed?

To recover `branch-3.5` CI,
- https://github.com/apache/spark/actions/runs/8981228745/job/24666400664

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass Python Linter in this PR builder.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46445 from dongjoon-hyun/SPARK-48167.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/sql/tests/test_readwriter.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_readwriter.py 
b/python/pyspark/sql/tests/test_readwriter.py
index e903d3383b74..7911a82c61fc 100644
--- a/python/pyspark/sql/tests/test_readwriter.py
+++ b/python/pyspark/sql/tests/test_readwriter.py
@@ -247,7 +247,8 @@ class ReadwriterV2TestsMixin:
 self.assertEqual(100, self.spark.sql("select * from 
test_table").count())
 
 @unittest.skipIf(
-"SPARK_SKIP_CONNECT_COMPAT_TESTS" in os.environ, "Known behavior 
change in 4.0")
+"SPARK_SKIP_CONNECT_COMPAT_TESTS" in os.environ, "Known behavior 
change in 4.0"
+)
 def test_create_without_provider(self):
 df = self.df
 with self.assertRaisesRegex(


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-48134][CORE] Spark core (java side): Migrate `error/warn/info` with variables to structured logging framework

2024-05-07 Thread gengliang
This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3d9d1f3dc05a [SPARK-48134][CORE] Spark core (java side): Migrate 
`error/warn/info` with variables to structured logging framework
3d9d1f3dc05a is described below

commit 3d9d1f3dc05a2825bf315c68fc4e4232354dbd00
Author: panbingkun 
AuthorDate: Tue May 7 13:08:00 2024 -0700

[SPARK-48134][CORE] Spark core (java side): Migrate `error/warn/info` with 
variables to structured logging framework

### What changes were proposed in this pull request?
The PR aims to:
1. Migrate `error/warn/info` with variables in module `core` to the `structured 
logging framework` on the Java side.
2. Convert all dependencies on `org.slf4j.Logger & org.slf4j.LoggerFactory` 
to `org.apache.spark.internal.Logger & 
org.apache.spark.internal.LoggerFactory`, in order to completely prohibit 
importing `org.slf4j.Logger & org.slf4j.LoggerFactory` in Java code later.

### Why are the changes needed?
To enhance Apache Spark's logging system by implementing structured logging.
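
As a rough, hedged sketch of what logging "with variables" as structured context 
looks like (illustrated with plain SLF4J `MDC` only; Spark's internal 
`org.apache.spark.internal.Logger`/`LogKeys` API is different, and the key names 
below are assumptions echoing the LogKeys added in this commit):

```
import org.slf4j.{LoggerFactory, MDC}

object StructuredLoggingSketch {
  private val log = LoggerFactory.getLogger(getClass)

  // Hypothetical helper: attaches the variables as key/value context that a
  // structured log backend can index, instead of baking them into the message.
  def reportSpill(consumer: String, bytes: Long): Unit = {
    MDC.put("memory_consumer", consumer)
    MDC.put("execution_memory_size", bytes.toString)
    try log.info("Spilling data because memory threshold was reached")
    finally {
      MDC.remove("memory_consumer")
      MDC.remove("execution_memory_size")
    }
  }
}
```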

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46390 from panbingkun/core_java_sl.

Authored-by: panbingkun 
Signed-off-by: Gengliang Wang 
---
 .../java/org/apache/spark/internal/Logger.java | 21 +++-
 .../scala/org/apache/spark/internal/LogKey.scala   |  9 +++
 .../org/apache/spark/io/ReadAheadInputStream.java  | 19 ---
 .../org/apache/spark/memory/TaskMemoryManager.java | 28 +++---
 .../shuffle/sort/BypassMergeSortShuffleWriter.java |  9 ---
 .../spark/shuffle/sort/ShuffleExternalSorter.java  | 25 ++-
 .../spark/shuffle/sort/UnsafeShuffleWriter.java|  9 ---
 .../sort/io/LocalDiskShuffleMapOutputWriter.java   | 10 
 .../apache/spark/unsafe/map/BytesToBytesMap.java   | 12 ++
 .../unsafe/sort/UnsafeExternalSorter.java  | 21 +---
 .../unsafe/sort/UnsafeSorterSpillReader.java   |  4 ++--
 11 files changed, 113 insertions(+), 54 deletions(-)

diff --git a/common/utils/src/main/java/org/apache/spark/internal/Logger.java 
b/common/utils/src/main/java/org/apache/spark/internal/Logger.java
index 2b4dd3bb45bc..d8ab26424bae 100644
--- a/common/utils/src/main/java/org/apache/spark/internal/Logger.java
+++ b/common/utils/src/main/java/org/apache/spark/internal/Logger.java
@@ -34,6 +34,10 @@ public class Logger {
 this.slf4jLogger = slf4jLogger;
   }
 
+  public boolean isErrorEnabled() {
+return slf4jLogger.isErrorEnabled();
+  }
+
   public void error(String msg) {
 slf4jLogger.error(msg);
   }
@@ -58,6 +62,10 @@ public class Logger {
 }
   }
 
+  public boolean isWarnEnabled() {
+return slf4jLogger.isWarnEnabled();
+  }
+
   public void warn(String msg) {
 slf4jLogger.warn(msg);
   }
@@ -82,6 +90,10 @@ public class Logger {
 }
   }
 
+  public boolean isInfoEnabled() {
+return slf4jLogger.isInfoEnabled();
+  }
+
   public void info(String msg) {
 slf4jLogger.info(msg);
   }
@@ -106,6 +118,10 @@ public class Logger {
 }
   }
 
+  public boolean isDebugEnabled() {
+return slf4jLogger.isDebugEnabled();
+  }
+
   public void debug(String msg) {
 slf4jLogger.debug(msg);
   }
@@ -126,6 +142,10 @@ public class Logger {
 slf4jLogger.debug(msg, throwable);
   }
 
+  public boolean isTraceEnabled() {
+return slf4jLogger.isTraceEnabled();
+  }
+
   public void trace(String msg) {
 slf4jLogger.trace(msg);
   }
@@ -146,7 +166,6 @@ public class Logger {
 slf4jLogger.trace(msg, throwable);
   }
 
-
   private void withLogContext(
   String pattern,
   MDC[] mdcs,
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala 
b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index d4e1d9f535af..c127f9c3d1f9 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -168,6 +168,7 @@ object LogKeys {
   case object EXCEPTION extends LogKey
   case object EXECUTE_INFO extends LogKey
   case object EXECUTE_KEY extends LogKey
+  case object EXECUTION_MEMORY_SIZE extends LogKey
   case object EXECUTION_PLAN_LEAVES extends LogKey
   case object EXECUTOR_BACKEND extends LogKey
   case object EXECUTOR_DESIRED_COUNT extends LogKey
@@ -302,6 +303,7 @@ object LogKeys {
   case object MAX_SLOTS extends LogKey
   case object MAX_SPLIT_BYTES extends LogKey
   case object MAX_TABLE_PARTITION_METADATA_SIZE extends LogKey
+  case object MEMORY_CONSUMER extends LogKey
   case object MEMORY_POOL_NAME extends LogKey
   case 

(spark) branch master updated (26c50369edb2 -> e24f8965e066)

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 26c50369edb2 [SPARK-48174][INFRA] Merge `connect` back to the original 
test pipeline
 add e24f8965e066 [SPARK-48037][CORE] Fix SortShuffleWriter lacks shuffle 
write related metrics resulting in potentially inaccurate data

No new revisions were added by this update.

Summary of changes:
 .../spark/shuffle/sort/SortShuffleManager.scala|  2 +-
 .../spark/shuffle/sort/SortShuffleWriter.scala |  6 +++---
 .../spark/util/collection/ExternalSorter.scala |  9 +
 .../shuffle/sort/SortShuffleWriterSuite.scala  |  3 +++
 .../sql/execution/UnsafeRowSerializerSuite.scala   |  3 ++-
 .../adaptive/AdaptiveQueryExecSuite.scala  | 23 ++
 6 files changed, 37 insertions(+), 9 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r69013 - /dev/spark/KEYS

2024-05-07 Thread wenchen
Author: wenchen
Date: Tue May  7 17:07:43 2024
New Revision: 69013

Log:
Update KEYS

Modified:
dev/spark/KEYS

Modified: dev/spark/KEYS
==
--- dev/spark/KEYS (original)
+++ dev/spark/KEYS Tue May  7 17:07:43 2024
@@ -704,61 +704,62 @@ kyHyHY5kPG9HfDOSahPz
 =SDAz
 -END PGP PUBLIC KEY BLOCK-
 
-pub   4096R/4F4FDC8A 2018-09-18
-uid  Wenchen Fan (CODE SIGNING KEY) 
-sub   4096R/6F3F5B0E 2018-09-18
+pub   rsa4096 2024-05-07 [SC]
+  4DC9676CEF9A83E98FCA02784D6620843CD87F5A
+uid  Wenchen Fan (CODE SIGNING KEY) 
+sub   rsa4096 2024-05-07 [E]
 
 -BEGIN PGP PUBLIC KEY BLOCK-
-Version: GnuPG v1
 
-mQINBFugiYgBEAC4DsJBWF3VjWiKEiD8XNPRTg3Bnw52fe4bTB9Jvh/q0VStJjO7
-CSHZ1/P5h60zbS5UWLP2mt+c0FaW6wv7PxafCnd1MPENGBkttZbC4UjWDSbPp0vx
-fkUfrAqflWvO1AaCveg2MlyQdLZ1HwVz+PDLWqE+Ev2p3Si4Jfx5P2O9FmWt8a/b
-Wea/4gfy/5zFWRberQjt4CkSBuNU+cOo19/n32JJJYbRqrzFAGs/DJUIxNXC1qef
-c2iB3dyff1mkLb9Vzd1RfhZaSNUElo67o4Vi6SswgvHxoE03wIcoJvBTafqLxy6p
-mt5SAzOyvvmOVcLNqP9i5+c4sBrxvQ2ZEZrZt7dKfhbh4W8ged/TNWMoNOCX2usD
-Fj17KrFAEaeqtEwRdwZMxGqKI/NxANkdPSxS4T/JQoi+N6LBJ88yzmeCquA8MT0b
-/H4ziyjgrSRugCE6jcsbuObQsDxiqPSSXeWSjPoYq876JcqAgZzSYYdlGVw2J9Vb
-46hhEqhGk+91vK6CtyuhKv5KXk1B3Rhhc5znKWcahD3cpISxwTSzN9OwQHEd8Ovv
-x0WAhY3WOexrBekH7Sy00gjaHSAHFj3ReITfffWkv6t4TGLyohEOfgdxFvq03Fhd
-p7bWDmux47jP6AUUjP0VXRsG9ev3ch+bbcbRlo15HPBtyehoPn4BellFAQARAQAB
+mQINBGY6XpcBEADBeNz3IBYriwrPzMYJJO5u1DaWAJ4Sryx6PUZgvssrcqojYVTh
+MjtlBkWRcNquAyDrVlU1vtq1yMq5KopQoAEi/l3xaEDZZ0IFAob6+GlGXEon2Jvf
+0FXQsx+Df4nMVl7KPqh68T++Z4GkvK5wyyN9uaUTWL2deGeinVxTh6qWQT8YiCd5
+wof+Dk5IIzKQ5VIBhU/U9S0jo/pqhH4okcZGTyT2Q7sfg4eXl5+Y2OR334RkvTcX
+uJjcnJ8BUbBSm1UhNg4OGBEJgi+lE1GEgw4juOfTAPh9fx8SCLhuX0m6Qc/y9bAK
+Q4zejbF5F2Um9dqrZqg6Egp+nlzydn59hq9owSnQ6JdoA/PLcgoign0sghu9xGCR
+GpgI2kS7Q8bu6dy7T0BfUerLZ1FHu7nCT2ZNSIh/Y2eOhuBhUr3llg8xa3PZZob/
+2sZE2dJ3g/qp2Nbo+s5Q5kELtuo6cZD0EISQwt68hGWIgxs0vtci2c2kQYFS0oqw
+fGynEeDFZRHV3ET5rioYaoPi70Cnibght5ocL0t6sl0RQQVp6k2i1aofJbZA480N
+ivuJ5agGaSRxmIDk6JlDsHJGxO9oC066ZLJiR6i0JUinGP7sw/nNmgup/AB+y4hW
+9WdeAFyYmuYysDRRyE6z1MPDp1R00MyGxHNFDF64/JPY/nKKFdXp+aCazwARAQAB
 tDNXZW5jaGVuIEZhbiAoQ09ERSBTSUdOSU5HIEtFWSkgPHdlbmNoZW5AYXBhY2hl
-Lm9yZz6JAjgEEwECACIFAlugiYgCGwMGCwkIBwMCBhUIAgkKCwQWAgMBAh4BAheA
-AAoJEGuscolPT9yKvqUP/i34exSQNs9NcDvjOQhrvpjYCarL4mdQZOjIn9JWxeWr
-3nkzC9ozEIrb1zt8pqhiYr6qJhmx2EJgIwZTZZ9O0qHFMmYhYn/9/KKidE0XN6t3
-dFcbtRB1PGlc9b34PZNfdhD8PWA/UB1QC0TdTRNKhrIGGIZocrkaBral6uMJZAyV
-kbb+s21cRupPLM2wmU1k3U4WxnaIq2foErhaPC9+OEDAcLH/OxwiekJTCsvZypzE
-1laxo21rX1kgYzeAuqP4BfX5ARyrfM3O31Gh8asrx1bXD4z7dHqJxdJjh7ycdJdT
-VLcqy6LVsRanubiJ7cg4LkEz7Xxm3RLC3ZLbnYr72ljV5wzQ3r9gE+5rEnacZw7P
-9fQkAY5W2fqgfSn6Zx2SwPGhXosMdlp4zAWH3aCZj+DAx8XNG0sm3RE465zGxI9w
-jIE8V9RYhUoQnmfQA+lqUIrkmEneXYPUvct6F6B5tRNOXsceVG+c4O3o+ueJKEv3
-DvI4UyGD8q5k7IT24sOKa3CZmq3dsutfpecZdbrPT5MV13vrdJ0DZufpAi/mN3vL
-obkYyKRwyObUCiG6K9REoUJbes6+oX7SDXUCE1AI6UJrSuSXHazY/FWUjZaZClPR
-yxO51LjsEbkmzKggtZMljlAnTfRi/sEt7pC0bx1SCwoBKfLUbXclRJTtr1Ac6MiM
-uQINBFugiYgBEADNXxoluY3P7UvGAjprc03pHmg+VHzDQCHZpQ21Qgg4cUgEsckd
-J6Vd7CrqLbTuYrQc7vq1UKV/5HWo3d8nK4eeM77DYo7fbmy1gMZ2okHy/EpV5i6d
-gArAQM/nbEiCB9grgiFrLdApfPzeHHGXSbrF3BWyIkc/5mJ+VuLC+FY3xWDvHNAf
-G8m7zrV5BlKVZl+WYRd9LQEvhaZG74ettro2zkT7UGagdFZtNFzUAQb3hnbYvlpY
-OTJu1zUEkeYDsAgp+nbuAbgkGcH2iohsozVx2kKPzmRLmYmVeh725AtHJCsDbyuS
-863bf1+xEz1K7k2P7FMbf8R1I8qxOjTA6tCRSkSYxPzEgcYB/ShAtX6N/J/mubsP
-ow5NKAsSfdvH/Xk66umAFu2b0V1B8gFvKifZ4GEmKkJkLJ0Pw5Bspsx56BDqQCaf
-d1lJcmm7MEQTydgC37UhfJIISdnJ/MGjT8oYyKhxfcfjQ6Gp2/nY0O9akg43chOl
-ZvDk9EmuX+oEmM2aMBSIcOwEPvOCaIN62D8m/PeVYzuYBCYEkcURLMN8NnxLhu2q
-MYjdKG1tc5tkxyaxJQLTTvxV6xYcz1qZZbk/YKp2yAIfMFJElhPkqidB0pGNIncC
-myAnbrgDzEPzRIH9KVrFMb4Gzq3l+/ZSPxSLwxAO9I0o8jmfB0QSpZAdqwARAQAB
-iQIfBBgBAgAJBQJboImIAhsMAAoJEGuscolPT9yKeQ0P/jykTh5nrHDaldxZE3eu
-Z6I7fJXJEV6i5tmve5Q827EnEaLK07XlaHfP+/sYy9fWs16sNTLHgoNNtfUn738J
-W08UtEADGFp6DntZg0+h5CksDNXr4t06ndqyniBMw1aTClrfW/9O8240VlrCrveD
-GQmFziY1DePKV9pR0A6xsOGhLtL057T5uJ64lPJewgbg7X7dD0OlHYgcNYn+XGAY
-OJYHuk5Nchm47iaciU3kDuECLWmDqyMBTT+B5/hcrwR9zUc+NjCjVVdsxxZlSixw
-TOQvYxTdqhMwMedUuWmTkJ1XDU6AJSx013ma9OC1tKqwmjaSUTQDZFKVPy6yL+YH
-SqRuC0M0p151rnMwljs2hPwuOOEm82PVsGKNytXtilt5LheGwyBPHTnX1ai2kvUC
-HDcwOSR/CVeRLsWi7JBTXsfkflEuwasCEags42R8HzSCGYZwm8cauHnBQ9BxZVCr
-iEdCcXfL2dgEIeiI5FXDlNzeXKihatHUYGBeZG0OO4oKoRKieF+pRvEJv4X7HRiL
-Tj8aAFzBcLb5RZ0I9PIILQyy16B4ZdjyDjrjeIx1hxO7J/ZRX2DA62/yxo0Qa0SF
-qmd2/zxUPGuOvX/e79ATOM8KMQFbGikLTcjW9gcUuBnLPCu0WN2KF+ykz33y1rH9
-DFZYf6DnKNwDnBzBuAx2S+P0
-=a0mL
+Lm9yZz6JAlEEEwEIADsWIQRNyWds75qD6Y/KAnhNZiCEPNh/WgUCZjpelwIbAwUL
+CQgHAgIiAgYVCgkICwIEFgIDAQIeBwIXgAAKCRBNZiCEPNh/WkofD/9sI7J3i9Ck
+NOlHpVnjAaHjyGX5cVA2dZGniJdLf5yOKOI6pu7dMW+NThsXO1Iv+BRYo7una6/Q
+vUquKKxCXIN3vNmKIB1e9lj4MaIhCRmXUSQxjkVa9JW3P/F520Ct3VjiCZ5IjPv+
+g1hF/wrkuuoAFlcC/bfGWafkaZgszavSpCdp27mUXUNbvLW0dPJ3+ay4cDPuT1DI
+6DhB8qpqN7gInDFACW2qtQ2KZh1JFGy5ZccQ9dB3t/B4BYiUie6a3eQWgKqLF1hw
+8yHY3DkCVGfnXJk4+LMWqgazQxoB6oZjBvoQYtGOPXr1ZbmtiRHCDM5KmZ+QmIXB

(spark) branch master updated: [SPARK-48174][INFRA] Merge `connect` back to the original test pipeline

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 26c50369edb2 [SPARK-48174][INFRA] Merge `connect` back to the original 
test pipeline
26c50369edb2 is described below

commit 26c50369edb21d616361a4b22a555ed7b7412a4e
Author: Dongjoon Hyun 
AuthorDate: Tue May 7 09:34:59 2024 -0700

[SPARK-48174][INFRA] Merge `connect` back to the original test pipeline

### What changes were proposed in this pull request?

This PR aims to merge `connect` back to the original test pipeline to reduce 
the maximum concurrency of GitHub Actions by one.
- https://infra.apache.org/github-actions-policy.html
  > All workflows SHOULD have a job concurrency level less than or equal to 
15.

### Why are the changes needed?

This is a partial recover from the following.
- #45107

We stabilized the root cause of #45107 via the following PRs. In addition, 
we will disable a flaky test case if one exists.

- #46395
- #46396
- #46425

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46441 from dongjoon-hyun/SPARK-48174.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 .github/workflows/build_and_test.yml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/.github/workflows/build_and_test.yml 
b/.github/workflows/build_and_test.yml
index 286f8e1193d9..00ba16265dce 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -156,9 +156,8 @@ jobs:
 mllib-local, mllib, graphx
   - >-
 streaming, sql-kafka-0-10, streaming-kafka-0-10, 
streaming-kinesis-asl,
-kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf
+kubernetes, hadoop-cloud, spark-ganglia-lgpl, protobuf, connect
   - yarn
-  - connect
 # Here, we split Hive and SQL tests into some of slow ones and the 
rest of them.
 included-tags: [""]
 excluded-tags: [""]


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-48035][SQL][FOLLOWUP] Fix try_add/try_multiply being semantically equal to add/multiply

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 808186835077 [SPARK-48035][SQL][FOLLOWUP] Fix try_add/try_multiply 
being semantically equal to add/multiply
808186835077 is described below

commit 808186835077cf50f10262c633f19de4ccc09d9d
Author: Supun Nakandala 
AuthorDate: Tue May 7 09:17:01 2024 -0700

[SPARK-48035][SQL][FOLLOWUP] Fix try_add/try_multiply being semantically equal 
to add/multiply

### What changes were proposed in this pull request?
- This is a follow-up to the previous PR: 
https://github.com/apache/spark/pull/46307.
- With the new changes we do the evalMode check in the `collectOperands` 
function instead of introducing a new function.
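
As a hedged, spark-shell-level illustration (not part of this change) of why 
`try_add` and ANSI `+` must not share a canonical form: they disagree on 
overflow, so treating them as semantically equal could let one be substituted 
for the other during plan reuse.

```
// Assumes a spark-shell session with ANSI mode enabled.
spark.conf.set("spark.sql.ansi.enabled", "true")

spark.sql("SELECT try_add(2147483647, 1)").show() // NULL: overflow is suppressed
spark.sql("SELECT 2147483647 + 1").show()         // fails with an arithmetic overflow error
```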

### Why are the changes needed?
- Better code quality and readability.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?
- No

Closes #46414 from db-scnakandala/db-scnakandala/master.

Authored-by: Supun Nakandala 
Signed-off-by: Dongjoon Hyun 
---
 .../sql/catalyst/expressions/Expression.scala  | 14 -
 .../sql/catalyst/expressions/arithmetic.scala  | 23 --
 2 files changed, 8 insertions(+), 29 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 2759f5a29c79..de15ec43c4f3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -1378,20 +1378,6 @@ trait CommutativeExpression extends Expression {
   }
 reorderResult
   }
-
-  /**
-   * Helper method to collect the evaluation mode of the commutative 
expressions. This is
-   * used by the canonicalized methods of [[Add]] and [[Multiply]] operators 
to ensure that
-   * all operands have the same evaluation mode before reordering the operands.
-   */
-  protected def collectEvalModes(
-  e: Expression,
-  f: PartialFunction[CommutativeExpression, Seq[EvalMode.Value]]
-  ): Seq[EvalMode.Value] = e match {
-case c: CommutativeExpression if f.isDefinedAt(c) =>
-  f(c) ++ c.children.flatMap(collectEvalModes(_, f))
-case _ => Nil
-  }
 }
 
 /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
index 91c10a53af8a..a085a4e3a8a3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala
@@ -452,14 +452,12 @@ case class Add(
 copy(left = newLeft, right = newRight)
 
   override lazy val canonicalized: Expression = {
-val evalModes = collectEvalModes(this, {case Add(_, _, evalMode) => 
Seq(evalMode)})
-lazy val reorderResult = buildCanonicalizedPlan(
-  { case Add(l, r, _) => Seq(l, r) },
+val reorderResult = buildCanonicalizedPlan(
+  { case Add(l, r, em) if em == evalMode => Seq(l, r) },
   { case (l: Expression, r: Expression) => Add(l, r, evalMode)},
   Some(evalMode)
 )
-if (resolved && evalModes.forall(_ == evalMode) && reorderResult.resolved 
&&
-  reorderResult.dataType == dataType) {
+if (resolved && reorderResult.resolved && reorderResult.dataType == 
dataType) {
   reorderResult
 } else {
   // SPARK-40903: Avoid reordering decimal Add for canonicalization if the 
result data type is
@@ -609,16 +607,11 @@ case class Multiply(
 newLeft: Expression, newRight: Expression): Multiply = copy(left = 
newLeft, right = newRight)
 
   override lazy val canonicalized: Expression = {
-val evalModes = collectEvalModes(this, {case Multiply(_, _, evalMode) => 
Seq(evalMode)})
-if (evalModes.forall(_ == evalMode)) {
-  buildCanonicalizedPlan(
-{ case Multiply(l, r, _) => Seq(l, r) },
-{ case (l: Expression, r: Expression) => Multiply(l, r, evalMode)},
-Some(evalMode)
-  )
-} else {
-  withCanonicalizedChildren
-}
+buildCanonicalizedPlan(
+  { case Multiply(l, r, em) if em == evalMode => Seq(l, r) },
+  { case (l: Expression, r: Expression) => Multiply(l, r, evalMode) },
+  Some(evalMode)
+)
   }
 }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-41547][CONNECT][TESTS] Re-enable Spark Connect function tests with ANSI mode

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8f719adcf556 [SPARK-41547][CONNECT][TESTS] Re-enable Spark Connect 
function tests with ANSI mode
8f719adcf556 is described below

commit 8f719adcf556f23ba66d3742266f4ca2e4875530
Author: Martin Grund 
AuthorDate: Tue May 7 09:14:06 2024 -0700

[SPARK-41547][CONNECT][TESTS] Re-enable Spark Connect function tests with 
ANSI mode

### What changes were proposed in this pull request?
This patch re-enables the previously failing tests after enablement of ANSI 
SQL.

### Why are the changes needed?
Spark 4 / ANSI SQL
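
As a hedged aside (a spark-shell sketch, not taken from this patch): the kind 
of behavior change involved is that, under ANSI mode, functions such as 
`to_timestamp` raise an error on a value/format mismatch instead of returning 
NULL, which is why the tests need stricter values and formats.

```
// Assumes a spark-shell session.
spark.conf.set("spark.sql.ansi.enabled", "true")

// Value matches the format: parses fine.
spark.sql("SELECT to_timestamp('1997-02-28 10:30:00', 'yyyy-MM-dd HH:mm:ss')").show()

// Trailing time component not covered by the format: error under ANSI,
// NULL under the legacy (non-ANSI) behavior.
spark.sql("SELECT to_timestamp('1997-02-28 10:30:00', 'yyyy-MM-dd')").show()
```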

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Re-enabled tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46432 from grundprinzip/grundprinzip/SPARK-41547.

Authored-by: Martin Grund 
Signed-off-by: Dongjoon Hyun 
---
 .../sql/tests/connect/test_connect_function.py | 33 ++
 1 file changed, 21 insertions(+), 12 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_connect_function.py 
b/python/pyspark/sql/tests/connect/test_connect_function.py
index 2f21dd5a7d3a..9d4db8cf7d15 100644
--- a/python/pyspark/sql/tests/connect/test_connect_function.py
+++ b/python/pyspark/sql/tests/connect/test_connect_function.py
@@ -2030,7 +2030,6 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, 
PandasOnSparkTestUtils, S
 (CF.sentences, SF.sentences),
 (CF.initcap, SF.initcap),
 (CF.soundex, SF.soundex),
-(CF.bin, SF.bin),
 (CF.hex, SF.hex),
 (CF.unhex, SF.unhex),
 (CF.length, SF.length),
@@ -2043,6 +2042,19 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, 
PandasOnSparkTestUtils, S
 sdf.select(sfunc("a"), sfunc(sdf.b)).toPandas(),
 )
 
+query = """
+SELECT * FROM VALUES
+('   1   ', '2   ', NULL), ('   3', NULL, '4')
+AS tab(a, b, c)
+"""
+cdf = self.connect.sql(query)
+sdf = self.spark.sql(query)
+
+self.assert_eq(
+cdf.select(CF.bin(cdf.a), CF.bin(cdf.b)).toPandas(),
+sdf.select(SF.bin(sdf.a), SF.bin(sdf.b)).toPandas(),
+)
+
 def test_string_functions_multi_args(self):
 query = """
 SELECT * FROM VALUES
@@ -2149,15 +2161,15 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, 
PandasOnSparkTestUtils, S
 def test_date_ts_functions(self):
 query = """
 SELECT * FROM VALUES
-('1997/02/28 10:30:00', '2023/03/01 06:00:00', 'JST', 1428476400, 
2020, 12, 6),
-('2000/01/01 04:30:05', '2020/05/01 12:15:00', 'PST', 1403892395, 
2022, 12, 6)
+('1997-02-28 10:30:00', '2023-03-01 06:00:00', 'JST', 1428476400, 
2020, 12, 6),
+('2000-01-01 04:30:05', '2020-05-01 12:15:00', 'PST', 1403892395, 
2022, 12, 6)
 AS tab(ts1, ts2, tz, seconds, Y, M, D)
 """
 # +---+---+---+--++---+---+
 # |ts1|ts2| tz|   seconds|   Y|  M|  D|
 # +---+---+---+--++---+---+
-# |1997/02/28 10:30:00|2023/03/01 06:00:00|JST|1428476400|2020| 12|  6|
-# |2000/01/01 04:30:05|2020/05/01 12:15:00|PST|1403892395|2022| 12|  6|
+# |1997-02-28 10:30:00|2023-03-01 06:00:00|JST|1428476400|2020| 12|  6|
+# |2000-01-01 04:30:05|2020-05-01 12:15:00|PST|1403892395|2022| 12|  6|
 # +---+---+---+--++---+---+
 
 cdf = self.connect.sql(query)
@@ -2213,14 +2225,14 @@ class SparkConnectFunctionTests(ReusedConnectTestCase, 
PandasOnSparkTestUtils, S
 (CF.to_date, SF.to_date),
 ]:
 self.assert_eq(
-cdf.select(cfunc(cdf.ts1, format="-MM-dd")).toPandas(),
-sdf.select(sfunc(sdf.ts1, format="-MM-dd")).toPandas(),
+cdf.select(cfunc(cdf.ts1, format="-MM-dd 
HH:mm:ss")).toPandas(),
+sdf.select(sfunc(sdf.ts1, format="-MM-dd 
HH:mm:ss")).toPandas(),
 )
 self.compare_by_show(
 # [left]:  datetime64[ns, America/Los_Angeles]
 # [right]: datetime64[ns]
-cdf.select(CF.to_timestamp(cdf.ts1, format="-MM-dd")),
-sdf.select(SF.to_timestamp(sdf.ts1, format="-MM-dd")),
+cdf.select(CF.to_timestamp(cdf.ts1, format="-MM-dd HH:mm:ss")),
+sdf.select(SF.to_timestamp(sdf.ts1, format="-MM-dd HH:mm:ss")),
 )
 
 # With tz 

(spark) branch master updated (925457cadd22 -> a3eebcf39687)

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 925457cadd22 [SPARK-48169][SQL] Use lazy BadRecordException cause in 
all parsers and remove the old constructor, which was meant for the migration
 add a3eebcf39687 [SPARK-48170][PYTHON][CONNECT][TESTS] Enable 
`ArrowPythonUDFParityTests.test_err_return_type`

No new revisions were added by this update.

Summary of changes:
 python/pyspark/sql/tests/connect/test_parity_arrow_python_udf.py | 4 
 1 file changed, 4 deletions(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-48169][SQL] Use lazy BadRecordException cause in all parsers and remove the old constructor, which was meant for the migration

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 925457cadd22 [SPARK-48169][SQL] Use lazy BadRecordException cause in 
all parsers and remove the old constructor, which was meant for the migration
925457cadd22 is described below

commit 925457cadd229673323e91a82d0b504145f509e0
Author: Vladimir Golubev 
AuthorDate: Tue May 7 09:09:00 2024 -0700

[SPARK-48169][SQL] Use lazy BadRecordException cause in all parsers and 
remove the old constructor, which was meant for the migration

### What changes were proposed in this pull request?
Use a factory function for the exception cause in `BadRecordException` to 
avoid constructing heavy exceptions in the underlying parser. Now they are 
constructed on demand in `FailureSafeParser`. This is a follow-up to 
https://github.com/apache/spark/pull/46400

### Why are the changes needed?
- Speed up `JacksonParser` and `StaxXmlParser`, since they throw 
user-facing exceptions to `FailureSafeParser`
- Refactoring: leave only one constructor in `BadRecordException`

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- `testOnly org.apache.spark.sql.catalyst.json.JacksonParserSuite`
- `testOnly org.apache.spark.sql.catalyst.csv.UnivocityParserSuite`

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46438 from 
vladimirg-db/vladimirg-db/use-lazy-exception-cause-in-all-bad-record-exception-invocations.

Authored-by: Vladimir Golubev 
Signed-off-by: Dongjoon Hyun 
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   |  2 +-
 .../spark/sql/catalyst/json/JacksonParser.scala| 12 ++--
 .../sql/catalyst/util/BadRecordException.scala | 10 +-
 .../spark/sql/catalyst/xml/StaxXmlParser.scala | 22 --
 4 files changed, 20 insertions(+), 26 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index 37d9143e5b5a..8d06789a7512 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -359,7 +359,7 @@ class UnivocityParser(
 } else {
   if (badRecordException.isDefined) {
 throw BadRecordException(
-  () => currentInput, () => Array[InternalRow](requiredRow.get), 
badRecordException.get)
+  () => currentInput, () => Array(requiredRow.get), 
badRecordException.get)
   } else {
 requiredRow
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index d1093a3b1be1..3c42f72fa6b6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -613,7 +613,7 @@ class JacksonParser(
 // JSON parser currently doesn't support partial results for corrupted 
records.
 // For such records, all fields other than the field configured by
 // `columnNameOfCorruptRecord` are set to `null`.
-throw BadRecordException(() => recordLiteral(record), cause = e)
+throw BadRecordException(() => recordLiteral(record), cause = () => e)
   case e: CharConversionException if options.encoding.isEmpty =>
 val msg =
   """JSON parser cannot handle a character in its input.
@@ -621,17 +621,17 @@ class JacksonParser(
 |""".stripMargin + e.getMessage
 val wrappedCharException = new CharConversionException(msg)
 wrappedCharException.initCause(e)
-throw BadRecordException(() => recordLiteral(record), cause = 
wrappedCharException)
+throw BadRecordException(() => recordLiteral(record), cause = () => 
wrappedCharException)
   case PartialResultException(row, cause) =>
 throw BadRecordException(
   record = () => recordLiteral(record),
   partialResults = () => Array(row),
-  convertCauseForPartialResult(cause))
+  cause = () => convertCauseForPartialResult(cause))
   case PartialResultArrayException(rows, cause) =>
 throw BadRecordException(
   record = () => recordLiteral(record),
   partialResults = () => rows,
-  cause)
+  cause = () => cause)
   // These exceptions should never be thrown outside of JacksonParser.
   // They are used for the control flow in the parser. We add them here 
for completeness
   // since they also indicate a bad record.
@@ 

(spark) branch master updated (493493d6c5bb -> 9e0a87eb4cf2)

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 493493d6c5bb [SPARK-48173][SQL] CheckAnalysis should see the entire 
query plan
 add 9e0a87eb4cf2 [SPARK-48165][BUILD] Update `ap-loader` to 3.0-9

No new revisions were added by this update.

Summary of changes:
 connector/profiler/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-48173][SQL] CheckAnalysis should see the entire query plan

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 493493d6c5bb [SPARK-48173][SQL] CheckAnalysis should see the entire 
query plan
493493d6c5bb is described below

commit 493493d6c5bbbaa0b04f5548ac1ccd9502e8b8fa
Author: Wenchen Fan 
AuthorDate: Tue May 7 08:02:25 2024 -0700

[SPARK-48173][SQL] CheckAnalysis should see the entire query plan

### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/38029 . Some 
custom check rules need to see the entire query plan tree to get some context, 
but https://github.com/apache/spark/pull/38029 breaks it as it checks the query 
plan of dangling CTE relations recursively.

This PR fixes it by putting the dangling CTE relations back into the main 
query plan and then checking the main query plan.
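
As a hedged sketch of what a "dangling" CTE relation is (spark-shell, not part 
of the patch): the CTE below is never referenced by the main query, so it 
disappears after inlining, yet analysis must still validate it.

```
// Assumes a spark-shell session; the table name is deliberately bogus.
spark.sql(
  """WITH unused AS (SELECT * FROM definitely_missing_table)
    |SELECT 1 AS col
    |""".stripMargin)
// Analysis should still fail (table or view not found) even though the main
// query never reads `unused`; the fix keeps that check while letting custom
// check rules see the whole plan.
```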

### Why are the changes needed?

Revert the breaking change to custom check rules

### Does this PR introduce _any_ user-facing change?

No for most users. This restores the behavior of Spark 3.3 and earlier for 
custom check rules.

### How was this patch tested?

existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46439 from cloud-fan/check.

Authored-by: Wenchen Fan 
Signed-off-by: Dongjoon Hyun 
---
 .../sql/catalyst/analysis/CheckAnalysis.scala  | 39 +++---
 1 file changed, 20 insertions(+), 19 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index d1b336b08955..e55f23b6aa86 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -145,15 +145,16 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
   private def checkUnreferencedCTERelations(
   cteMap: mutable.Map[Long, (CTERelationDef, Int, mutable.Map[Long, Int])],
   visited: mutable.Map[Long, Boolean],
+  danglingCTERelations: mutable.ArrayBuffer[CTERelationDef],
   cteId: Long): Unit = {
 if (visited(cteId)) {
   return
 }
 val (cteDef, _, refMap) = cteMap(cteId)
 refMap.foreach { case (id, _) =>
-  checkUnreferencedCTERelations(cteMap, visited, id)
+  checkUnreferencedCTERelations(cteMap, visited, danglingCTERelations, id)
 }
-checkAnalysis0(cteDef.child)
+danglingCTERelations.append(cteDef)
 visited(cteId) = true
   }
 
@@ -161,35 +162,35 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
 val inlineCTE = InlineCTE(alwaysInline = true)
 val cteMap = mutable.HashMap.empty[Long, (CTERelationDef, Int, 
mutable.Map[Long, Int])]
 inlineCTE.buildCTEMap(plan, cteMap)
+val danglingCTERelations = mutable.ArrayBuffer.empty[CTERelationDef]
 val visited: mutable.Map[Long, Boolean] = 
mutable.Map.empty.withDefaultValue(false)
-cteMap.foreach { case (cteId, (relation, refCount, _)) =>
-  // If a CTE relation is never used, it will disappear after inline. Here 
we explicitly check
-  // analysis for it, to make sure the entire query plan is valid.
-  try {
-// If a CTE relation ref count is 0, the other CTE relations that 
reference it
-// should also be checked by checkAnalysis0. This code will also 
guarantee the leaf
-// relations that do not reference any others are checked first.
-if (refCount == 0) {
-  checkUnreferencedCTERelations(cteMap, visited, cteId)
-}
-  } catch {
-case e: AnalysisException =>
-  throw new ExtendedAnalysisException(e, relation.child)
+// If a CTE relation is never used, it will disappear after inline. Here 
we explicitly collect
+// these dangling CTE relations, and put them back in the main query, to 
make sure the entire
+// query plan is valid.
+cteMap.foreach { case (cteId, (_, refCount, _)) =>
+  // If a CTE relation ref count is 0, the other CTE relations that 
reference it should also be
+  // collected. This code will also guarantee the leaf relations that do 
not reference
+  // any others are collected first.
+  if (refCount == 0) {
+checkUnreferencedCTERelations(cteMap, visited, danglingCTERelations, 
cteId)
   }
 }
 // Inline all CTEs in the plan to help check query plan structures in 
subqueries.
-var inlinedPlan: Option[LogicalPlan] = None
+var inlinedPlan: LogicalPlan = plan
 try {
-  inlinedPlan = Some(inlineCTE(plan))
+  inlinedPlan = inlineCTE(plan)
 } catch 

(spark) branch master updated: [SPARK-47297][SQL] Add collation support for format expressions

2024-05-07 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 148f5335427c [SPARK-47297][SQL] Add collation support for format 
expressions
148f5335427c is described below

commit 148f5335427c3aea39cbcce967e18a3b35a88687
Author: Uros Bojanic <157381213+uros...@users.noreply.github.com>
AuthorDate: Tue May 7 23:00:30 2024 +0800

[SPARK-47297][SQL] Add collation support for format expressions

### What changes were proposed in this pull request?
Introduce collation awareness for format expressions: to_number, 
try_to_number, to_char, space.

### Why are the changes needed?
Add collation support for format expressions in Spark.

### Does this PR introduce _any_ user-facing change?
Yes, users should now be able to use collated strings within arguments for 
format functions: to_number, try_to_number, to_char, space.

### How was this patch tested?
E2e sql tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46423 from uros-db/format-expressions.

Authored-by: Uros Bojanic <157381213+uros...@users.noreply.github.com>
Signed-off-by: Wenchen Fan 
---
 .../expressions/numberFormatExpressions.scala  |  14 ++-
 .../catalyst/expressions/stringExpressions.scala   |   2 +-
 .../spark/sql/CollationSQLExpressionsSuite.scala   | 132 -
 3 files changed, 141 insertions(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala
index 6d95d7e620a2..e914190c0645 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/numberFormatExpressions.scala
@@ -26,6 +26,8 @@ import 
org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.expressions.codegen.Block.BlockHelper
 import org.apache.spark.sql.catalyst.util.ToNumberParser
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.types.StringTypeAnyCollation
 import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, 
DatetimeType, Decimal, DecimalType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -47,7 +49,8 @@ abstract class ToNumberBase(left: Expression, right: 
Expression, errorOnFail: Bo
 DecimalType.USER_DEFAULT
   }
 
-  override def inputTypes: Seq[DataType] = Seq(StringType, StringType)
+  override def inputTypes: Seq[AbstractDataType] =
+Seq(StringTypeAnyCollation, StringTypeAnyCollation)
 
   override def checkInputDataTypes(): TypeCheckResult = {
 val inputTypeCheck = super.checkInputDataTypes()
@@ -247,8 +250,9 @@ object ToCharacterBuilder extends ExpressionBuilder {
   inputExpr.dataType match {
 case _: DatetimeType => DateFormatClass(inputExpr, format)
 case _: BinaryType =>
-  if (!(format.dataType == StringType && format.foldable)) {
-throw QueryCompilationErrors.nonFoldableArgumentError(funcName, 
"format", StringType)
+  if (!(format.dataType.isInstanceOf[StringType] && format.foldable)) {
+throw QueryCompilationErrors.nonFoldableArgumentError(funcName, 
"format",
+  format.dataType)
   }
   val fmt = format.eval()
   if (fmt == null) {
@@ -279,8 +283,8 @@ case class ToCharacter(left: Expression, right: Expression)
 }
   }
 
-  override def dataType: DataType = StringType
-  override def inputTypes: Seq[AbstractDataType] = Seq(DecimalType, StringType)
+  override def dataType: DataType = SQLConf.get.defaultStringType
+  override def inputTypes: Seq[AbstractDataType] = Seq(DecimalType, 
StringTypeAnyCollation)
   override def checkInputDataTypes(): TypeCheckResult = {
 val inputTypeCheck = super.checkInputDataTypes()
 if (inputTypeCheck.isSuccess) {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index 0769c8e609ec..c2ea17de1953 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -1906,7 +1906,7 @@ case class StringRepeat(str: Expression, times: 
Expression)
 case class StringSpace(child: Expression)
   extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant {
 
-  override def dataType: DataType = 

(spark) branch master updated: [SPARK-48171][CORE] Clean up the use of deprecated constructors of `o.rocksdb.Logger`

2024-05-07 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c326f3c143ff [SPARK-48171][CORE] Clean up the use of deprecated 
constructors of `o.rocksdb.Logger`
c326f3c143ff is described below

commit c326f3c143ffdd56954706aeb4e0b82ac819bf03
Author: yangjie01 
AuthorDate: Tue May 7 07:33:38 2024 -0700

[SPARK-48171][CORE] Clean up the use of deprecated constructors of 
`o.rocksdb.Logger`

### What changes were proposed in this pull request?
This PR aims to clean up the use of deprecated constructors of 
`o.rocksdb.Logger`; the change refers to


https://github.com/facebook/rocksdb/blob/5c2be544f5509465957706c955b6d623e889ac4e/java/src/main/java/org/rocksdb/Logger.java#L39-L54

```
/**
   * AbstractLogger constructor.
   *
   * Important: the log level set within
   * the {@link org.rocksdb.Options} instance will be used as
   * maximum log level of RocksDB.
   *
   * @param options {@link org.rocksdb.Options} instance.
   *
   * @deprecated Use {@link Logger#Logger(InfoLogLevel)} instead, e.g. {@code new
   * Logger(options.infoLogLevel())}.
   */
  @Deprecated
  public Logger(final Options options) {
    this(options.infoLogLevel());
  }
```

### Why are the changes needed?
Clean up deprecated API usage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46436 from LuciferYang/rocksdb-deprecation.

Authored-by: yangjie01 
Signed-off-by: Dongjoon Hyun 
---
 .../src/main/java/org/apache/spark/network/util/RocksDBProvider.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java
index f3b7b48355a0..2b5ea01d94c9 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/RocksDBProvider.java
@@ -136,7 +136,7 @@ public class RocksDBProvider {
 private static final Logger LOG = 
LoggerFactory.getLogger(RocksDBLogger.class);
 
 RocksDBLogger(Options options) {
-  super(options);
+  super(options.infoLogLevel());
 }
 
 @Override


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-48143][SQL] Use lightweight exceptions for control-flow between UnivocityParser and FailureSafeParser

2024-05-07 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 326dbb447873 [SPARK-48143][SQL] Use lightweight exceptions for 
control-flow between UnivocityParser and FailureSafeParser
326dbb447873 is described below

commit 326dbb4478732eb9b7a683511e69206f2b21bd37
Author: Vladimir Golubev 
AuthorDate: Tue May 7 20:28:50 2024 +0800

[SPARK-48143][SQL] Use lightweight exceptions for control-flow between 
UnivocityParser and FailureSafeParser

### What changes were proposed in this pull request?
A new lightweight exception for control-flow between UnivocityParser and 
FailureSafeParser to speed up malformed CSV parsing

### Why are the changes needed?
Parsing in `PermissiveMode` is slow due to heavy exception construction 
(stacktrace filling + string template substitution in `SparkRuntimeException`)
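
A minimal, self-contained sketch of the pattern (these are not Spark's actual 
classes): the record and the user-facing cause are passed as thunks, so the 
expensive exception is only materialized if the failure actually has to be 
reported.

```
// Cheap wrapper: no message, no stack trace captured for the wrapper itself.
final class LazyCauseException(
    val record: () => String,
    val cause: () => Throwable)
  extends Exception(null, null, false, false)

object LazyCauseExample {
  private def parseIntStrict(line: String): Int =
    try line.trim.toInt
    catch {
      case e: NumberFormatException =>
        // Hand back factories instead of eagerly building a heavy error.
        throw new LazyCauseException(
          () => line,
          () => new IllegalArgumentException(s"Malformed record: $line", e))
    }

  def main(args: Array[String]): Unit = {
    try println(parseIntStrict("oops"))
    catch {
      case b: LazyCauseException =>
        // Only now is the user-facing exception built (stack trace and all).
        println(b.cause().getMessage)
    }
  }
}
```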

### Does this PR introduce _any_ user-facing change?
No, since `FailureSafeParser` unwraps `BadRecordException` and correctly 
rethrows user-facing exceptions in `FailFastMode`

### How was this patch tested?
- `testOnly org.apache.spark.sql.catalyst.csv.UnivocityParserSuite`
- Manually ran the CSV benchmark on the DB benchmark workspace
- Manually checked correct and malformed CSV in spark-shell 
(org.apache.spark.SparkException is thrown with the stacktrace)

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46400 from vladimirg-db/vladimirg-db/speed-up-csv-parser.

Authored-by: Vladimir Golubev 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/csv/UnivocityParser.scala   | 10 +-
 .../spark/sql/catalyst/json/JacksonParser.scala|  5 ++---
 .../sql/catalyst/util/BadRecordException.scala | 23 ++
 .../sql/catalyst/util/FailureSafeParser.scala  |  2 +-
 .../spark/sql/catalyst/xml/StaxXmlParser.scala |  5 ++---
 5 files changed, 29 insertions(+), 16 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
index a5158d8a22c6..37d9143e5b5a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -316,17 +316,17 @@ class UnivocityParser(
   throw BadRecordException(
 () => getCurrentInput,
 () => Array.empty,
-QueryExecutionErrors.malformedCSVRecordError(""))
+() => QueryExecutionErrors.malformedCSVRecordError(""))
 }
 
 val currentInput = getCurrentInput
 
-var badRecordException: Option[Throwable] = if (tokens.length != 
parsedSchema.length) {
+var badRecordException: Option[() => Throwable] = if (tokens.length != 
parsedSchema.length) {
   // If the number of tokens doesn't match the schema, we should treat it 
as a malformed record.
   // However, we still have chance to parse some of the tokens. It 
continues to parses the
   // tokens normally and sets null when `ArrayIndexOutOfBoundsException` 
occurs for missing
   // tokens.
-  Some(QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
+  Some(() => 
QueryExecutionErrors.malformedCSVRecordError(currentInput.toString))
 } else None
 // When the length of the returned tokens is identical to the length of 
the parsed schema,
 // we just need to:
@@ -348,7 +348,7 @@ class UnivocityParser(
   } catch {
 case e: SparkUpgradeException => throw e
 case NonFatal(e) =>
-  badRecordException = badRecordException.orElse(Some(e))
+  badRecordException = badRecordException.orElse(Some(() => e))
   // Use the corresponding DEFAULT value associated with the column, 
if any.
   row.update(i, 
ResolveDefaultColumns.existenceDefaultValues(requiredSchema)(i))
   }
@@ -359,7 +359,7 @@ class UnivocityParser(
 } else {
   if (badRecordException.isDefined) {
 throw BadRecordException(
-  () => currentInput, () => Array(requiredRow.get), 
badRecordException.get)
+  () => currentInput, () => Array[InternalRow](requiredRow.get), 
badRecordException.get)
   } else {
 requiredRow
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index eadd0a4f8ab9..d1093a3b1be1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -613,7 +613,7 @@ class JacksonParser(
 // JSON parser currently doesn't 

(spark) branch branch-3.5 updated: [SPARK-48086][PYTHON][TESTS][3.5] Remove obsolete comment

2024-05-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 1735d7d3fd66 [SPARK-48086][PYTHON][TESTS][3.5] Remove obsolete comment
1735d7d3fd66 is described below

commit 1735d7d3fd660560e15457793ccd9b758bb360f8
Author: Hyukjin Kwon 
AuthorDate: Tue May 7 18:50:47 2024 +0900

[SPARK-48086][PYTHON][TESTS][3.5] Remove obsolete comment

### What changes were proposed in this pull request?

This PR proposes to remove those comments for skipped tests related to a 
different Arrow version in the JVM.

### Why are the changes needed?

Arrow version incompatibility is something we cannot avoid. We should just 
skip those tests for now.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Linters.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46431 from HyukjinKwon/SPARK-48086.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py | 1 -
 python/pyspark/sql/tests/pandas/test_pandas_udf.py| 2 --
 2 files changed, 3 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py 
b/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py
index 960f7f11e873..a508fe1059ed 100644
--- a/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py
@@ -32,7 +32,6 @@ class PandasUDFScalarParityTests(ScalarPandasUDFTestsMixin, 
ReusedConnectTestCas
 def test_vectorized_udf_struct_with_empty_partition(self):
 super().test_vectorized_udf_struct_with_empty_partition()
 
-# TODO(SPARK-48086): Reenable this test case
 @unittest.skipIf(
 "SPARK_SKIP_CONNECT_COMPAT_TESTS" in os.environ, "Failed with 
different Client <> Server"
 )
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf.py 
b/python/pyspark/sql/tests/pandas/test_pandas_udf.py
index 4673375ccf69..b54e5608f3d0 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf.py
@@ -262,7 +262,6 @@ class PandasUDFTestsMixin:
 .collect,
 )
 
-# TODO(SPARK-48086): Reenable this test case
 @unittest.skipIf(
 "SPARK_SKIP_CONNECT_COMPAT_TESTS" in os.environ, "Failed with 
different Client <> Server"
 )
@@ -289,7 +288,6 @@ class PandasUDFTestsMixin:
 with 
self.sql_conf({"spark.sql.execution.pandas.convertToArrowArraySafely": False}):
 df.select(["A"]).withColumn("udf", udf("A")).collect()
 
-# TODO(SPARK-48086): Reenable this test case
 @unittest.skipIf(
 "SPARK_SKIP_CONNECT_COMPAT_TESTS" in os.environ, "Failed with 
different Client <> Server"
 )


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-47267][SQL] Add collation support for hash expressions

2024-05-07 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 08c6bb9bf32f [SPARK-47267][SQL] Add collation support for hash 
expressions
08c6bb9bf32f is described below

commit 08c6bb9bf32f31b5b9870d56cc4c16ab97616da6
Author: Uros Bojanic <157381213+uros...@users.noreply.github.com>
AuthorDate: Tue May 7 17:13:34 2024 +0800

[SPARK-47267][SQL] Add collation support for hash expressions

### What changes were proposed in this pull request?
Introduce collation awareness for hash expressions: MD5, SHA2, SHA1, CRC32, 
MURMUR3, XXHASH64.

### Why are the changes needed?
Add collation support for hash expressions in Spark.

### Does this PR introduce _any_ user-facing change?
Yes, users should now be able to use collated strings within arguments for 
hash functions: md5, sha2, sha1, crc32, hash, xxhash64.

### How was this patch tested?
E2e sql tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46422 from uros-db/hash-expressions.

Authored-by: Uros Bojanic <157381213+uros...@users.noreply.github.com>
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/expressions/hash.scala  |   6 +-
 .../spark/sql/CollationSQLExpressionsSuite.scala   | 179 +
 2 files changed, 182 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
index 5089cea136a8..fa342f641509 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala
@@ -63,7 +63,7 @@ import org.apache.spark.util.ArrayImplicits._
 case class Md5(child: Expression)
   extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant {
 
-  override def dataType: DataType = StringType
+  override def dataType: DataType = SQLConf.get.defaultStringType
 
   override def inputTypes: Seq[DataType] = Seq(BinaryType)
 
@@ -103,7 +103,7 @@ case class Md5(child: Expression)
 case class Sha2(left: Expression, right: Expression)
   extends BinaryExpression with ImplicitCastInputTypes with NullIntolerant 
with Serializable {
 
-  override def dataType: DataType = StringType
+  override def dataType: DataType = SQLConf.get.defaultStringType
   override def nullable: Boolean = true
 
   override def inputTypes: Seq[DataType] = Seq(BinaryType, IntegerType)
@@ -169,7 +169,7 @@ case class Sha2(left: Expression, right: Expression)
 case class Sha1(child: Expression)
   extends UnaryExpression with ImplicitCastInputTypes with NullIntolerant {
 
-  override def dataType: DataType = StringType
+  override def dataType: DataType = SQLConf.get.defaultStringType
 
   override def inputTypes: Seq[DataType] = Seq(BinaryType)
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala
index fa82405109f1..596923d975a5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/CollationSQLExpressionsSuite.scala
@@ -28,6 +28,185 @@ class CollationSQLExpressionsSuite
   extends QueryTest
   with SharedSparkSession {
 
+  test("Support Md5 hash expression with collation") {
+case class Md5TestCase(
+  input: String,
+  collationName: String,
+  result: String
+)
+
+val testCases = Seq(
+  Md5TestCase("Spark", "UTF8_BINARY", "8cde774d6f7333752ed72cacddb05126"),
+  Md5TestCase("Spark", "UTF8_BINARY_LCASE", 
"8cde774d6f7333752ed72cacddb05126"),
+  Md5TestCase("SQL", "UNICODE", "9778840a0100cb30c982876741b0b5a2"),
+  Md5TestCase("SQL", "UNICODE_CI", "9778840a0100cb30c982876741b0b5a2")
+)
+
+// Supported collations
+testCases.foreach(t => {
+  val query =
+s"""
+   |select md5('${t.input}')
+   |""".stripMargin
+  // Result & data type
+  withSQLConf(SqlApiConf.DEFAULT_COLLATION -> t.collationName) {
+val testQuery = sql(query)
+checkAnswer(testQuery, Row(t.result))
+val dataType = StringType(t.collationName)
+assert(testQuery.schema.fields.head.dataType.sameType(dataType))
+  }
+})
+  }
+
+  test("Support Sha2 hash expression with collation") {
+case class Sha2TestCase(
+  input: String,
+  collationName: String,
+  bitLength: Int,
+  result: String
+)
+
+val testCases = Seq(
+  Sha2TestCase("Spark", "UTF8_BINARY", 256,
+"529bc3b07127ecb7e53a4dcf1991d9152c24537d919178022b2c42657f79a26b"),
+  Sha2TestCase("Spark", 

(spark) branch master updated (7f8ef96cea27 -> c4df12cc884c)

2024-05-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 7f8ef96cea27 [SPARK-48166][SQL] Avoid using BadRecordException as 
user-facing error in VariantExpressionEvalUtils
 add c4df12cc884c [SPARK-48113][CONNECT] Allow Plugins to integrate with 
Spark Connect

No new revisions were added by this update.

Summary of changes:
 .../main/scala/org/apache/spark/sql/Column.scala   | 28 -
 .../scala/org/apache/spark/sql/SparkSession.scala  | 59 ---
 .../org/apache/spark/sql/ClientDatasetSuite.scala  | 24 
 .../apache/spark/sql/PlanGenerationTestSuite.scala | 26 +
 .../CheckConnectJvmClientCompatibility.scala   |  9 +++
 .../spark/sql/connect/ConnectProtoUtils.scala  | 55 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  | 23 ++--
 .../connect/planner/SparkConnectPlannerSuite.scala | 40 +
 .../planner/SparkConnectPlannerTestUtils.scala | 67 ++
 .../plugin/SparkConnectPluginRegistrySuite.scala   | 14 +++--
 10 files changed, 169 insertions(+), 176 deletions(-)
 create mode 100644 
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/ConnectProtoUtils.scala
 create mode 100644 
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectPlannerTestUtils.scala


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-48166][SQL] Avoid using BadRecordException as user-facing error in VariantExpressionEvalUtils

2024-05-07 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 7f8ef96cea27 [SPARK-48166][SQL] Avoid using BadRecordException as 
user-facing error in VariantExpressionEvalUtils
7f8ef96cea27 is described below

commit 7f8ef96cea27d52d0bdda3808c6c48534dcd8567
Author: Vladimir Golubev 
AuthorDate: Tue May 7 16:13:46 2024 +0800

[SPARK-48166][SQL] Avoid using BadRecordException as user-facing error in 
VariantExpressionEvalUtils

### What changes were proposed in this pull request?
Stop using `BadRecordException` in a user-facing context. Currently it is 
thrown when the `parse_json` input is malformed.

### Why are the changes needed?
`BadRecordException` is an internal exception designed for 
`FailureSafeParser`.

### Does this PR introduce _any_ user-facing change?
Yes, `parse_json` will no longer expose `BadRecordException` in its error.

### How was this patch tested?
`testOnly 
org.apache.spark.sql.catalyst.expressions.variant.VariantExpressionEvalUtilsSuite`

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46428 from 
vladimirg-db/vladimirg-db/get-rid-of-bad-record-exception-in-variant-expression-eval-utils.

Authored-by: Vladimir Golubev 
Signed-off-by: Wenchen Fan 
---
 .../sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala
index f468e9745605..eb235eb854e0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/variant/VariantExpressionEvalUtils.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions.variant
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{ArrayData, BadRecordException, 
MapData}
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.types._
 import org.apache.spark.types.variant.{Variant, VariantBuilder, 
VariantSizeLimitException, VariantUtil}
@@ -48,7 +48,7 @@ object VariantExpressionEvalUtils {
   .variantSizeLimitError(VariantUtil.SIZE_LIMIT, "parse_json"))
   case NonFatal(e) =>
 
parseJsonFailure(QueryExecutionErrors.malformedRecordsDetectedInRecordParsingError(
-  input.toString, BadRecordException(() => input, cause = e)))
+  input.toString, e))
 }
   }
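
The one-line change above drops the internal wrapper; below is a self-contained sketch (plain Scala, not the actual Spark code paths) of the general pattern the fix follows: surface the original cause in the user-facing error instead of a component-internal exception type.

```scala
import scala.util.control.NonFatal

// Internal exception type, analogous to BadRecordException: meant only for one
// component (here, a parser) and not something users should see in an error chain.
final case class InternalRecordException(record: () => String, cause: Throwable)
  extends Exception(cause)

// User-facing error, analogous to the malformed-record parsing error.
final class MalformedRecordError(input: String, cause: Throwable)
  extends Exception(s"Malformed record detected while parsing: $input", cause)

def parseNumericRecord(input: String): Int =
  try input.trim.toInt // stand-in for the real variant parser
  catch {
    // Before the fix: the cause was InternalRecordException(() => input, e),
    // leaking the internal type to users.
    // After the fix: the original cause is attached directly.
    case NonFatal(e) => throw new MalformedRecordError(input, e)
  }
```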
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-48058][SPARK-43727][PYTHON][CONNECT][TESTS][FOLLOWUP] Code clean up

2024-05-07 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 98d4ab734de0 
[SPARK-48058][SPARK-43727][PYTHON][CONNECT][TESTS][FOLLOWUP] Code clean up
98d4ab734de0 is described below

commit 98d4ab734de05eb5eec83011ed965cfb5b51e4b5
Author: Ruifeng Zheng 
AuthorDate: Tue May 7 15:35:54 2024 +0800

[SPARK-48058][SPARK-43727][PYTHON][CONNECT][TESTS][FOLLOWUP] Code clean up

### What changes were proposed in this pull request?
After https://github.com/apache/spark/pull/46300, the two overridden tests are 
actually the same as

https://github.com/apache/spark/blob/678aeb7ef7086bd962df7ac6d1c5f39151a0515b/python/pyspark/sql/tests/pandas/test_pandas_udf.py#L110-L125

and

https://github.com/apache/spark/blob/678aeb7ef7086bd962df7ac6d1c5f39151a0515b/python/pyspark/sql/tests/pandas/test_pandas_udf.py#L55-L70

so there is no need to override them in the parity tests.

### Why are the changes needed?
clean up

### Does this PR introduce _any_ user-facing change?
no, test only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #46429 from zhengruifeng/return_type_followup.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 .../sql/tests/connect/test_parity_pandas_udf.py| 35 +-
 1 file changed, 1 insertion(+), 34 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_pandas_udf.py 
b/python/pyspark/sql/tests/connect/test_parity_pandas_udf.py
index b732a875fb0a..7f280a009f78 100644
--- a/python/pyspark/sql/tests/connect/test_parity_pandas_udf.py
+++ b/python/pyspark/sql/tests/connect/test_parity_pandas_udf.py
@@ -14,8 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-from pyspark.sql.functions import pandas_udf, PandasUDFType
-from pyspark.sql.types import DoubleType, StructType, StructField
+
 from pyspark.sql.tests.pandas.test_pandas_udf import PandasUDFTestsMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
 
@@ -24,38 +23,6 @@ class PandasUDFParityTests(PandasUDFTestsMixin, 
ReusedConnectTestCase):
 def test_udf_wrong_arg(self):
 self.check_udf_wrong_arg()
 
-def test_pandas_udf_decorator_with_return_type_string(self):
-@pandas_udf("v double", PandasUDFType.GROUPED_MAP)
-def foo(x):
-return x
-
-self.assertEqual(foo.returnType, StructType([StructField("v", 
DoubleType(), True)]))
-self.assertEqual(foo.evalType, PandasUDFType.GROUPED_MAP)
-
-@pandas_udf(returnType="double", functionType=PandasUDFType.SCALAR)
-def foo(x):
-return x
-
-self.assertEqual(foo.returnType, DoubleType())
-self.assertEqual(foo.evalType, PandasUDFType.SCALAR)
-
-def test_pandas_udf_basic_with_return_type_string(self):
-udf = pandas_udf(lambda x: x, "double", PandasUDFType.SCALAR)
-self.assertEqual(udf.returnType, DoubleType())
-self.assertEqual(udf.evalType, PandasUDFType.SCALAR)
-
-udf = pandas_udf(lambda x: x, "v double", PandasUDFType.GROUPED_MAP)
-self.assertEqual(udf.returnType, StructType([StructField("v", 
DoubleType(), True)]))
-self.assertEqual(udf.evalType, PandasUDFType.GROUPED_MAP)
-
-udf = pandas_udf(lambda x: x, "v double", 
functionType=PandasUDFType.GROUPED_MAP)
-self.assertEqual(udf.returnType, StructType([StructField("v", 
DoubleType(), True)]))
-self.assertEqual(udf.evalType, PandasUDFType.GROUPED_MAP)
-
-udf = pandas_udf(lambda x: x, returnType="v double", 
functionType=PandasUDFType.GROUPED_MAP)
-self.assertEqual(udf.returnType, StructType([StructField("v", 
DoubleType(), True)]))
-self.assertEqual(udf.evalType, PandasUDFType.GROUPED_MAP)
-
 
 if __name__ == "__main__":
 import unittest


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch branch-3.5 updated: [SPARK-48167][CONNECT][TESTS] Skip known behaviour change by SPARK-46122

2024-05-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new ec7e888d7082 [SPARK-48167][CONNECT][TESTS] Skip known behaviour change 
by SPARK-46122
ec7e888d7082 is described below

commit ec7e888d70822796146ffb2769ccae759baf24f4
Author: Hyukjin Kwon 
AuthorDate: Tue May 7 16:16:02 2024 +0900

[SPARK-48167][CONNECT][TESTS] Skip known behaviour change by SPARK-46122

### What changes were proposed in this pull request?

This PR proposes to skip `test_create_without_provider` in the backward 
compatibility test at 
https://github.com/apache/spark/actions/workflows/build_python_connect35.yml

### Why are the changes needed?

This is an intentional behaviour change (SPARK-46122).

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Tested in my fork

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46430 from HyukjinKwon/SPARK-48167.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/sql/tests/test_readwriter.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/python/pyspark/sql/tests/test_readwriter.py 
b/python/pyspark/sql/tests/test_readwriter.py
index 921d2eba5ac7..e903d3383b74 100644
--- a/python/pyspark/sql/tests/test_readwriter.py
+++ b/python/pyspark/sql/tests/test_readwriter.py
@@ -16,6 +16,7 @@
 #
 
 import os
+import unittest
 import shutil
 import tempfile
 
@@ -245,6 +246,8 @@ class ReadwriterV2TestsMixin:
 df.writeTo("test_table").using("parquet").create()
 self.assertEqual(100, self.spark.sql("select * from 
test_table").count())
 
+@unittest.skipIf(
+"SPARK_SKIP_CONNECT_COMPAT_TESTS" in os.environ, "Known behavior 
change in 4.0")
 def test_create_without_provider(self):
 df = self.df
 with self.assertRaisesRegex(


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch branch-3.5 updated: [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking error message in UDF

2024-05-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new f92580aa111f [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in 
the test checking error message in UDF
f92580aa111f is described below

commit f92580aa111f8531cdb229a2d1bb0234764d7262
Author: Hyukjin Kwon 
AuthorDate: Tue May 7 15:59:20 2024 +0900

[SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking 
error message in UDF

This PR reduces the traceback so the actual error `ZeroDivisionError` can be 
tested in 
`pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception`

This way the long traceback doesn't affect the test case. Without the change it can fail as below:

```
==
FAIL [1.883s]: test_stream_exception 
(pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception)
--
Traceback (most recent call last):
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py",
 line 287, in test_stream_exception
sq.processAllAvailable()
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py",
 line 129, in processAllAvailable
self._execute_streaming_query_cmd(cmd)
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py",
 line 177, in _execute_streaming_query_cmd
(_, properties) = self._session.client.execute_command(exec_cmd)
  ^^
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", 
line 982, in execute_command
data, _, _, _, properties = self._execute_and_fetch(req)

  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", 
line 1283, in _execute_and_fetch
for response in self._execute_and_fetch_as_iterator(req):
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", 
line 1264, in _execute_and_fetch_as_iterator
self._handle_error(error)
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", 
line 1503, in _handle_error
self._handle_rpc_error(error)
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", 
line 1539, in _handle_rpc_error
raise convert_exception(info, status.message) from None
pyspark.errors.exceptions.connect.StreamingQueryException: [STREAM_FAILED] 
Query [id = 1c0c440d-0b48-41b1-9a03-071e7e13de82, runId = 
692ec338-963a-43b1-89cb-2a8b7cb1e21a] terminated with exception: Job aborted 
due to stage failure: Task 0 in stage 39.0 failed 1 times, most recent failure: 
Lost task 0.0 in stage 39.0 (TID 58) 
(fv-az714-234.22nzjvkrszmuhkvqy55p1tioig.phxx.internal.cloudapp.net executor 
driver): org.apache.spark.api.python.PythonException: Traceback (most recent 
call last):
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
1834, in main
process()
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
1826, in process
serializer.dump_stream(out_iter, outfile)
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", 
line 224, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", 
line 145, in dump_stream
for obj in iterator:
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", 
line 213, in _batched
for item in iterator:
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
1734, in mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in 
udfs)
 
^
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
1734, in 
result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in 
udfs)
   ^^^
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
112, in 
return args_kwargs_offsets, lambda *a: func(*a)
   
  File 

(spark) branch master updated: [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking error message in UDF

2024-05-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new eee179135ed2 [SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in 
the test checking error message in UDF
eee179135ed2 is described below

commit eee179135ed21dbdd8b342d053c9eda849e2de77
Author: Hyukjin Kwon 
AuthorDate: Tue May 7 15:59:20 2024 +0900

[SPARK-48090][SS][PYTHON][TESTS] Shorten the traceback in the test checking 
error message in UDF

### What changes were proposed in this pull request?

This PR reduces the traceback so the actual error `ZeroDivisionError` can be 
tested in 
`pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception`

### Why are the changes needed?

So that the long traceback doesn't affect the test case. It can fail as below:

```
==
FAIL [1.883s]: test_stream_exception 
(pyspark.sql.tests.connect.streaming.test_parity_streaming.StreamingParityTests.test_stream_exception)
--
Traceback (most recent call last):
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/streaming/test_streaming.py",
 line 287, in test_stream_exception
sq.processAllAvailable()
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py",
 line 129, in processAllAvailable
self._execute_streaming_query_cmd(cmd)
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/streaming/query.py",
 line 177, in _execute_streaming_query_cmd
(_, properties) = self._session.client.execute_command(exec_cmd)
  ^^
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", 
line 982, in execute_command
data, _, _, _, properties = self._execute_and_fetch(req)

  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", 
line 1283, in _execute_and_fetch
for response in self._execute_and_fetch_as_iterator(req):
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", 
line 1264, in _execute_and_fetch_as_iterator
self._handle_error(error)
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", 
line 1503, in _handle_error
self._handle_rpc_error(error)
  File 
"/home/runner/work/spark/spark-3.5/python/pyspark/sql/connect/client/core.py", 
line 1539, in _handle_rpc_error
raise convert_exception(info, status.message) from None
pyspark.errors.exceptions.connect.StreamingQueryException: [STREAM_FAILED] 
Query [id = 1c0c440d-0b48-41b1-9a03-071e7e13de82, runId = 
692ec338-963a-43b1-89cb-2a8b7cb1e21a] terminated with exception: Job aborted 
due to stage failure: Task 0 in stage 39.0 failed 1 times, most recent failure: 
Lost task 0.0 in stage 39.0 (TID 58) 
(fv-az714-234.22nzjvkrszmuhkvqy55p1tioig.phxx.internal.cloudapp.net executor 
driver): org.apache.spark.api.python.PythonException: Traceback (most recent 
call last):
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
1834, in main
process()
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
1826, in process
serializer.dump_stream(out_iter, outfile)
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", 
line 224, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", 
line 145, in dump_stream
for obj in iterator:
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/serializers.py", 
line 213, in _batched
for item in iterator:
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
1734, in mapper
result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in 
udfs)
 
^
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
1734, in 
result = tuple(f(*[a[o] for o in arg_offsets]) for arg_offsets, f in 
udfs)
   ^^^
  File 
"/home/runner/work/spark/spark/python/lib/pyspark.zip/pyspark/worker.py", line 
112, in 
return args_kwargs_offsets, lambda *a: func(*a)
 

(spark) branch branch-3.5 updated: [SPARK-48083][SPARK-48084][ML][TESTS] Remove JIRA comments for reenabling ML compatibility tests

2024-05-07 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
 new 997100119e6c [SPARK-48083][SPARK-48084][ML][TESTS] Remove JIRA 
comments for reenabling ML compatibility tests
997100119e6c is described below

commit 997100119e6c893c46b12cc32a4f96721c8f3a22
Author: Weichen Xu 
AuthorDate: Tue May 7 15:58:01 2024 +0900

[SPARK-48083][SPARK-48084][ML][TESTS] Remove JIRA comments for reenabling 
ML compatibility tests

### What changes were proposed in this pull request?

Enable the Spark ML tests.

### Why are the changes needed?

For test_connect_classification.py:
the session.copyFromLocalToFs failure with a 3.5 client and a 4.0 server is not 
an issue; copyFromLocalToFs requires the Spark server to set 
spark.connect.copyFromLocalToFs.allowDestLocal to False, because the test can 
only use the local filesystem.

For test_connect_evaluation.py:
the failure of pyspark.ml.connect.evaluation with a 3.5 client and a 4.0 server 
is caused by a cloudpickle forward incompatibility; it is not related 
to the ML code.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46419 from WeichenXu123/reenable-test-3.5.

Authored-by: Weichen Xu 
Signed-off-by: Hyukjin Kwon 
---
 python/pyspark/ml/tests/connect/test_connect_classification.py | 1 -
 python/pyspark/ml/tests/connect/test_connect_evaluation.py | 1 -
 2 files changed, 2 deletions(-)

diff --git a/python/pyspark/ml/tests/connect/test_connect_classification.py 
b/python/pyspark/ml/tests/connect/test_connect_classification.py
index 2763d3f613ae..f0d60a117e12 100644
--- a/python/pyspark/ml/tests/connect/test_connect_classification.py
+++ b/python/pyspark/ml/tests/connect/test_connect_classification.py
@@ -21,7 +21,6 @@ import os
 from pyspark.sql import SparkSession
 from pyspark.ml.tests.connect.test_legacy_mode_classification import 
ClassificationTestsMixin
 
-# TODO(SPARK-48083): Reenable this test case
 have_torch = "SPARK_SKIP_CONNECT_COMPAT_TESTS" not in os.environ
 try:
 import torch  # noqa: F401
diff --git a/python/pyspark/ml/tests/connect/test_connect_evaluation.py 
b/python/pyspark/ml/tests/connect/test_connect_evaluation.py
index 35af54605ca8..58076dfe0bbe 100644
--- a/python/pyspark/ml/tests/connect/test_connect_evaluation.py
+++ b/python/pyspark/ml/tests/connect/test_connect_evaluation.py
@@ -20,7 +20,6 @@ import unittest
 from pyspark.sql import SparkSession
 from pyspark.ml.tests.connect.test_legacy_mode_evaluation import 
EvaluationTestsMixin
 
-# TODO(SPARK-48084): Reenable this test case
 have_torcheval = "SPARK_SKIP_CONNECT_COMPAT_TESTS" not in os.environ
 try:
 import torcheval  # noqa: F401


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-48027][SQL] InjectRuntimeFilter for multi-level join should check child join type

2024-05-07 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new b5e39bedab14 [SPARK-48027][SQL] InjectRuntimeFilter for multi-level 
join should check child join type
b5e39bedab14 is described below

commit b5e39bedab14a7fd800597ee0114b07448c1b0f9
Author: Angerszh 
AuthorDate: Tue May 7 14:47:40 2024 +0800

[SPARK-48027][SQL] InjectRuntimeFilter for multi-level join should check 
child join type

### What changes were proposed in this pull request?
In our production environment we hit a case like the following:
```
with
refund_info as (
  select
    loan_id,
    1 as b_type
  from default.table_b
),
next_month_time as (
  select /*+ broadcast(b, c) */
    loan_id,
    1 as c_time
  from default.table_c
)
select
  a.loan_id,
  c.c_time,
  b.b_type
from (
  select loan_id from default.table_a2
  union
  select loan_id from default.table_a1
) a
left join refund_info b on a.loan_id = b.loan_id
left join next_month_time c on a.loan_id = c.loan_id
;
```
In this query, the rule injects table_b as table_c's runtime bloom filter, but 
table_b's join condition is LEFT OUTER, so table_c loses data.

Caused by

![image](https://github.com/apache/spark/assets/46485123/be45e211-23e4-4105-98b4-aa571c87665f)

In InjectRuntimeFilter.extractSelectiveFilterOverScan(), when handling the join, 
the left plan (the `a` in `a left outer join b`) is a UNION, so the extraction 
result is None; the rule then zips the left/right keys and extracts from the 
join's right side, which finally causes this issue.
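
A self-contained illustration of the data loss, using plain Scala collections rather than the optimizer code (table names follow the query above; the single-element `bKeys` stands in for a selective filter on table_b):

```scala
// Keys reduced to plain Scala collections.
val aKeys = Seq(1, 2, 3)   // keys coming out of the UNION side `a`
val bKeys = Set(2)         // keys of table_b, the right side of `a LEFT OUTER JOIN b`
val cKeys = Set(1, 2, 3)   // keys of table_c

// `a LEFT OUTER JOIN b` preserves every key of `a`, matched or not, so bKeys
// does not bound the keys that later join with table_c.
def leftJoinWithC(cSide: Set[Int]): Seq[(Int, Option[Int])] =
  aKeys.map(k => k -> (if (cSide(k)) Some(k) else None))

val correct = leftJoinWithC(cKeys)        // every key still finds its table_c row
val prunedC = cKeys.filter(bKeys)         // bloom-filter stand-in built from table_b: Set(2)
val buggy   = leftJoinWithC(prunedC)      // keys 1 and 3 now lose their table_c values
```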

### Why are the changes needed?
Fix data correctness issue

### Does this PR introduce _any_ user-facing change?
Yes, it fixes a data correctness issue.

### How was this patch tested?
For the existing test, this PR fixes the wrong case:

Before: it extracted the LEFT_ANTI_JOIN's right child as the runtime bloom 
filter for the outer bf3, which is not correct:
```
Join Inner, (c3#45926 = c1#45914)
:- Join LeftAnti, (c1#45914 = c2#45920)
:  :- Filter isnotnull(c1#45914)
:  :  +- Relation 
default.bf1[a1#45912,b1#45913,c1#45914,d1#45915,e1#45916,f1#45917] parquet
:  +- Project [c2#45920]
: +- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND 
isnotnull(c2#45920))
:+- Relation 
default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet
+- Filter (isnotnull(c3#45926) AND might_contain(scalar-subquery#48719 [], 
xxhash64(c3#45926, 42)))
   :  +- Aggregate [bloom_filter_agg(xxhash64(c2#45920, 42), 100, 
8388608, 0, 0) AS bloomFilter#48718]
   : +- Project [c2#45920]
   :+- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND 
isnotnull(c2#45920))
   :   +- Relation 
default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet
   +- Relation 
default.bf3[a3#45924,b3#45925,c3#45926,d3#45927,e3#45928,f3#45929] parquet
```

After:
```
Join Inner, (c3#45926 = c1#45914)
:- Join LeftAnti, (c1#45914 = c2#45920)
:  :- Filter isnotnull(c1#45914)
:  :  +- Relation 
default.bf1[a1#45912,b1#45913,c1#45914,d1#45915,e1#45916,f1#45917] parquet
:  +- Project [c2#45920]
: +- Filter ((isnotnull(a2#45918) AND (a2#45918 = 5)) AND 
isnotnull(c2#45920))
:+- Relation 
default.bf2[a2#45918,b2#45919,c2#45920,d2#45921,e2#45922,f2#45923] parquet
+- Filter (isnotnull(c3#45926))
   +- Relation 
default.bf3[a3#45924,b3#45925,c3#45926,d3#45927,e3#45928,f3#45929] parquet
```

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes #46263 from AngersZh/SPARK-48027.

Lead-authored-by: Angerszh 
Co-authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 .../catalyst/optimizer/InjectRuntimeFilter.scala   | 44 +++---
 .../spark/sql/InjectRuntimeFilterSuite.scala   |  4 +-
 2 files changed, 32 insertions(+), 16 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
index 9c150f1f3308..3bb7c4d1ceca 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala
@@ -120,7 +120,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
   hasHitSelectiveFilter = hasHitSelectiveFilter || 
isLikelySelective(condition),
   currentPlan,
   targetKey)
-  case ExtractEquiJoinKeys(_, lkeys, rkeys, _,