(spark) branch master updated: [SPARK-48016][SQL][TESTS][FOLLOWUP] Update Java 21 golden file

2024-04-30 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 65cf5b18648a [SPARK-48016][SQL][TESTS][FOLLOWUP] Update Java 21 golden 
file
65cf5b18648a is described below

commit 65cf5b18648a81fc9b0787d03f23f7465c20f3ec
Author: Dongjoon Hyun 
AuthorDate: Tue Apr 30 22:42:02 2024 -0700

[SPARK-48016][SQL][TESTS][FOLLOWUP] Update Java 21 golden file

### What changes were proposed in this pull request?

This is a follow-up of SPARK-48016 to update the missed Java 21 golden file.
- #46286

### Why are the changes needed?

To recover Java 21 CIs:
- https://github.com/apache/spark/actions/workflows/build_java21.yml
- https://github.com/apache/spark/actions/workflows/build_maven_java21.yml
- 
https://github.com/apache/spark/actions/workflows/build_maven_java21_macos14.yml

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual tests. I regenerated all golden files in Java 21, and this was the only one 
affected.
```
$ SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly org.apache.spark.sql.SQLQueryTestSuite"
```
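
For reference, the new golden entries in the diff below exercise implicit casts in the `try_*` functions. A minimal PySpark sketch of the same queries (not part of this commit; the expected outputs are taken from the golden file):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Decimal operand: result stays exact (matches the try_add entry in the diff below).
spark.sql("SELECT try_add(2147483647, CAST(1 AS DECIMAL))").show()  # 2147483648
# String operand: coerced to double (matches the try_add entry in the diff below).
spark.sql("SELECT try_add(2147483647, '1')").show()                 # 2.147483648E9
# Division by a zero obtained from a string returns NULL instead of failing.
spark.sql("SELECT try_divide(1, '0')").show()                       # NULL
```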

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46313 from dongjoon-hyun/SPARK-48016.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 .../results/try_arithmetic.sql.out.java21  | 64 ++
 1 file changed, 64 insertions(+)

diff --git 
a/sql/core/src/test/resources/sql-tests/results/try_arithmetic.sql.out.java21 
b/sql/core/src/test/resources/sql-tests/results/try_arithmetic.sql.out.java21
index dcdb9d0dcb19..002a0dfcf37e 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/try_arithmetic.sql.out.java21
+++ 
b/sql/core/src/test/resources/sql-tests/results/try_arithmetic.sql.out.java21
@@ -15,6 +15,22 @@ struct
 NULL
 
 
+-- !query
+SELECT try_add(2147483647, decimal(1))
+-- !query schema
+struct
+-- !query output
+2147483648
+
+
+-- !query
+SELECT try_add(2147483647, "1")
+-- !query schema
+struct
+-- !query output
+2.147483648E9
+
+
 -- !query
 SELECT try_add(-2147483648, -1)
 -- !query schema
@@ -249,6 +265,22 @@ struct
 NULL
 
 
+-- !query
+SELECT try_divide(1, decimal(0))
+-- !query schema
+struct
+-- !query output
+NULL
+
+
+-- !query
+SELECT try_divide(1, "0")
+-- !query schema
+struct
+-- !query output
+NULL
+
+
 -- !query
 SELECT try_divide(interval 2 year, 2)
 -- !query schema
@@ -313,6 +345,22 @@ struct
 NULL
 
 
+-- !query
+SELECT try_subtract(2147483647, decimal(-1))
+-- !query schema
+struct
+-- !query output
+2147483648
+
+
+-- !query
+SELECT try_subtract(2147483647, "-1")
+-- !query schema
+struct
+-- !query output
+2.147483648E9
+
+
 -- !query
 SELECT try_subtract(-2147483648, 1)
 -- !query schema
@@ -409,6 +457,22 @@ struct
 NULL
 
 
+-- !query
+SELECT try_multiply(2147483647, decimal(-2))
+-- !query schema
+struct
+-- !query output
+-4294967294
+
+
+-- !query
+SELECT try_multiply(2147483647, "-2")
+-- !query schema
+struct
+-- !query output
+-4.294967294E9
+
+
 -- !query
 SELECT try_multiply(-2147483648, 2)
 -- !query schema


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



(spark) branch master updated: [SPARK-48047][SQL] Reduce memory pressure of empty TreeNode tags

2024-04-30 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 02206cd66dbf [SPARK-48047][SQL] Reduce memory pressure of empty 
TreeNode tags
02206cd66dbf is described below

commit 02206cd66dbfc8de602a685b032f1805bcf8e36f
Author: Nick Young 
AuthorDate: Tue Apr 30 22:07:20 2024 -0700

[SPARK-48047][SQL] Reduce memory pressure of empty TreeNode tags

### What changes were proposed in this pull request?

- Changed the `tags` variable of the `TreeNode` class to initialize lazily. 
This will reduce unnecessary driver memory pressure.

### Why are the changes needed?

- Plans with large expression or operator trees are known to cause driver 
memory pressure; this is one step in alleviating that issue.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs cover the behavior; outward-facing behavior does not change.
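
The change described above allocates the tag map only when a tag is actually set. A generic sketch of that lazy-allocation pattern (illustrative Python; the real change is the Scala `TreeNode` code in the diff below):

```python
class Node:
    """Sketch of lazy allocation, mirroring the `_tags eq null` check in the diff below."""

    __slots__ = ("_tags",)

    def __init__(self):
        self._tags = None  # no map is allocated until a tag is actually set

    def set_tag(self, key, value):
        if self._tags is None:
            self._tags = {}  # allocate lazily, only for nodes that really carry tags
        self._tags[key] = value

    def get_tag(self, key):
        return None if self._tags is None else self._tags.get(key)
```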

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46285 from n-young-db/treenode-tags.

Authored-by: Nick Young 
Signed-off-by: Dongjoon Hyun 
---
 .../apache/spark/sql/catalyst/trees/TreeNode.scala | 24 ++
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 94e893d468b3..dd39f3182bfb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -78,8 +78,16 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
   /**
* A mutable map for holding auxiliary information of this tree node. It 
will be carried over
* when this node is copied via `makeCopy`, or transformed via 
`transformUp`/`transformDown`.
+   * We lazily evaluate the `tags` since the default size of a `mutable.Map` 
is nonzero. This
+   * will reduce unnecessary memory pressure.
*/
-  private val tags: mutable.Map[TreeNodeTag[_], Any] = mutable.Map.empty
+  private[this] var _tags: mutable.Map[TreeNodeTag[_], Any] = null
+  private def tags: mutable.Map[TreeNodeTag[_], Any] = {
+if (_tags eq null) {
+  _tags = mutable.Map.empty
+}
+_tags
+  }
 
   /**
* Default tree pattern [[BitSet] for a [[TreeNode]].
@@ -147,11 +155,13 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
 ineffectiveRules.get(ruleId.id)
   }
 
+  def isTagsEmpty: Boolean = (_tags eq null) || _tags.isEmpty
+
   def copyTagsFrom(other: BaseType): Unit = {
 // SPARK-32753: it only makes sense to copy tags to a new node
 // but it's too expensive to detect other cases likes node removal
 // so we make a compromise here to copy tags to node with no tags
-if (tags.isEmpty) {
+if (isTagsEmpty && !other.isTagsEmpty) {
   tags ++= other.tags
 }
   }
@@ -161,11 +171,17 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
   }
 
   def getTagValue[T](tag: TreeNodeTag[T]): Option[T] = {
-tags.get(tag).map(_.asInstanceOf[T])
+if (isTagsEmpty) {
+  None
+} else {
+  tags.get(tag).map(_.asInstanceOf[T])
+}
   }
 
   def unsetTagValue[T](tag: TreeNodeTag[T]): Unit = {
-tags -= tag
+if (!isTagsEmpty) {
+  tags -= tag
+}
   }
 
   /**





(spark) branch master updated (c71d02ab7c80 -> 991763c2cdf8)

2024-04-30 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from c71d02ab7c80 [SPARK-48028][TESTS] Regenerate benchmark results after 
turning ANSI on
 add 991763c2cdf8 [SPARK-46894][PYTHON] Move PySpark error conditions into 
standalone JSON file

No new revisions were added by this update.

Summary of changes:
 python/MANIFEST.in |9 +-
 python/docs/source/getting_started/install.rst |4 +-
 .../{error_classes.py => error-conditions.json}|   28 -
 python/pyspark/errors/error_classes.py | 1165 +---
 python/pyspark/errors/exceptions/__init__.py   |   40 +-
 python/pyspark/errors_doc_gen.py   |2 +-
 6 files changed, 29 insertions(+), 1219 deletions(-)
 copy python/pyspark/errors/{error_classes.py => error-conditions.json} (97%)





(spark) branch master updated: [SPARK-48063][CORE] Enable `spark.stage.ignoreDecommissionFetchFailure` by default

2024-04-30 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new f3cc8f930383 [SPARK-48063][CORE] Enable 
`spark.stage.ignoreDecommissionFetchFailure` by default
f3cc8f930383 is described below

commit f3cc8f930383659b9f99e56b38de4b97d588e20b
Author: Dongjoon Hyun 
AuthorDate: Tue Apr 30 15:19:00 2024 -0700

[SPARK-48063][CORE] Enable `spark.stage.ignoreDecommissionFetchFailure` by 
default

### What changes were proposed in this pull request?

This PR aims to **enable `spark.stage.ignoreDecommissionFetchFailure` by 
default** while keeping 
`spark.scheduler.maxRetainedRemovedDecommissionExecutors=0` without any change 
for Apache Spark 4.0.0 in order to help a user use this feature more easily by 
setting only one configuration, 
`spark.scheduler.maxRetainedRemovedDecommissionExecutors`.
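
As a hedged illustration of the intended user experience (both configuration keys appear above; the value `5` is only an example):

```python
from pyspark.sql import SparkSession

# With spark.stage.ignoreDecommissionFetchFailure now true by default, opting into the
# full decommission handling only requires raising the retention count.
spark = (
    SparkSession.builder
    .config("spark.scheduler.maxRetainedRemovedDecommissionExecutors", "5")
    .getOrCreate()
)
```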

### Why are the changes needed?

This feature was added in Apache Spark 3.4.0 via SPARK-40481 and SPARK-40979 and has 
been used for two years to support executor decommissioning features in production.
- #37924
- #38441

### Does this PR introduce _any_ user-facing change?

No because `spark.scheduler.maxRetainedRemovedDecommissionExecutors` is 
still `0`.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46308 from dongjoon-hyun/SPARK-48063.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 core/src/main/scala/org/apache/spark/internal/config/package.scala | 2 +-
 docs/configuration.md  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index b2cbb6f6deb6..2e207422ae06 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2403,7 +2403,7 @@ package object config {
 s"count ${STAGE_MAX_CONSECUTIVE_ATTEMPTS.key}")
   .version("3.4.0")
   .booleanConf
-  .createWithDefault(false)
+  .createWithDefault(true)
 
   private[spark] val SCHEDULER_MAX_RETAINED_REMOVED_EXECUTORS =
 ConfigBuilder("spark.scheduler.maxRetainedRemovedDecommissionExecutors")
diff --git a/docs/configuration.md b/docs/configuration.md
index d5e2a569fdea..2e612ffd9ab9 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3072,7 +3072,7 @@ Apart from these, the following properties are also 
available, and may be useful
 
 
   spark.stage.ignoreDecommissionFetchFailure
-  false
+  true
   
 Whether ignore stage fetch failure caused by executor decommission when
 count spark.stage.maxConsecutiveAttempts





(spark) branch master updated: [SPARK-48060][SS][TESTS] Fix `StreamingQueryHashPartitionVerifySuite` to update golden files correctly

2024-04-30 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new faab553cac70 [SPARK-48060][SS][TESTS] Fix 
`StreamingQueryHashPartitionVerifySuite` to update golden files correctly
faab553cac70 is described below

commit faab553cac70eefeec286b1823b70ad62bed87f8
Author: Dongjoon Hyun 
AuthorDate: Tue Apr 30 12:50:07 2024 -0700

[SPARK-48060][SS][TESTS] Fix `StreamingQueryHashPartitionVerifySuite` to 
update golden files correctly

### What changes were proposed in this pull request?

This PR aims to fix `StreamingQueryHashPartitionVerifySuite` to update 
golden files correctly.
- The documentation is added.
- Newly generated files are updated.

### Why are the changes needed?

Previously, `SPARK_GENERATE_GOLDEN_FILES` did not work as expected because it updated 
the files under the `target` directory; the `src/test` files are the ones that need to 
be updated.

**BEFORE**
```
$ SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly *StreamingQueryHashPartitionVerifySuite"

$ git status
On branch master
Your branch is up to date with 'apache/master'.

nothing to commit, working tree clean
```

**AFTER**
```
$ SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly *StreamingQueryHashPartitionVerifySuite" \
-Dspark.sql.test.randomDataGenerator.maxStrLen=100 \
-Dspark.sql.test.randomDataGenerator.maxArraySize=4

$ git status
On branch SPARK-48060
Your branch is up to date with 'dongjoon/SPARK-48060'.

Changes not staged for commit:
  (use "git add ..." to update what will be committed)
  (use "git restore ..." to discard changes in working directory)
modified:   sql/core/src/test/resources/structured-streaming/partition-tests/randomSchemas
modified:   sql/core/src/test/resources/structured-streaming/partition-tests/rowsAndPartIds

no changes added to commit (use "git add" and/or "git commit -a")
```
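
The commands above rely on the convention the fix restores: when `SPARK_GENERATE_GOLDEN_FILES=1`, the suite must write back into the `src/test` resources rather than the copy under `target`. A generic sketch of that directory selection (paths are illustrative, not the suite's actual code):

```python
import os

regenerate = os.environ.get("SPARK_GENERATE_GOLDEN_FILES") == "1"
if regenerate:
    # Write back into the source tree so the regenerated files can be committed.
    base = "sql/core/src/test/resources/structured-streaming/partition-tests"
else:
    # Read the resources copied into the build output during a normal test run.
    base = "sql/core/target/scala-2.13/test-classes/structured-streaming/partition-tests"
```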

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs. I regenerated the data as follows.

```
$ SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "sql/testOnly *StreamingQueryHashPartitionVerifySuite" \
-Dspark.sql.test.randomDataGenerator.maxStrLen=100 \
-Dspark.sql.test.randomDataGenerator.maxArraySize=4
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46304 from dongjoon-hyun/SPARK-48060.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 .../partition-tests/randomSchemas  |   2 +-
 .../partition-tests/rowsAndPartIds | Bin 4862115 -> 13341426 
bytes
 .../StreamingQueryHashPartitionVerifySuite.scala   |  22 +++--
 3 files changed, 17 insertions(+), 7 deletions(-)

diff --git 
a/sql/core/src/test/resources/structured-streaming/partition-tests/randomSchemas
 
b/sql/core/src/test/resources/structured-streaming/partition-tests/randomSchemas
index 8d6ff942610c..f6eadd776cc6 100644
--- 
a/sql/core/src/test/resources/structured-streaming/partition-tests/randomSchemas
+++ 
b/sql/core/src/test/resources/structured-streaming/partition-tests/randomSchemas
@@ -1 +1 @@
-col_0 STRUCT NOT 
NULL, col_3: FLOAT NOT NULL, col_4: INT NOT NULL>,col_1 STRUCT, col_3: 
ARRAY NOT NULL, col_4: ARRAY, col_5: TIMESTAMP NOT NULL, col_6: 
STRUCT, col_1: BIGINT NOT NULL> NOT NULL, col_7: 
ARRAY NOT NULL, col_8: ARRAY, col_9: BIGINT NOT NULL> NOT 
NULL,col_2 BIGINT NOT NULL,col_3 STRUCT,col_1 STRUCT NOT NULL,col_2 STRING NOT 
NULL,col_3 STRUCT, col_2: ARRAY NOT 
NULL> NOT NULL,col_4 BINARY NOT NULL,col_5 ARRAY NOT NULL,col_6 
ARRAY,col_7 DOUBLE NOT NULL,col_8 ARRAY NOT NULL,col_9 
ARRAY,col_10 FLOAT NOT NULL,col_11 STRUCT NOT NULL>, col_1: STRUCT NOT NULL, col_1: 
INT, col_2: STRUCT

(spark) branch master updated: [SPARK-48057][PYTHON][CONNECT][TESTS] Enable `GroupedApplyInPandasTests.test_grouped_with_empty_partition`

2024-04-30 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new dab20b31388b [SPARK-48057][PYTHON][CONNECT][TESTS] Enable 
`GroupedApplyInPandasTests.test_grouped_with_empty_partition`
dab20b31388b is described below

commit dab20b31388ba7bcd2ab4d4424cbbd072bf84c30
Author: Ruifeng Zheng 
AuthorDate: Tue Apr 30 12:19:18 2024 -0700

[SPARK-48057][PYTHON][CONNECT][TESTS] Enable 
`GroupedApplyInPandasTests.test_grouped_with_empty_partition`

### What changes were proposed in this pull request?
Enable `GroupedApplyInPandasTests.test_grouped_with_empty_partition`

### Why are the changes needed?
test coverage

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #46299 from zhengruifeng/fix_test_grouped_with_empty_partition.

Authored-by: Ruifeng Zheng 
Signed-off-by: Dongjoon Hyun 
---
 python/pyspark/sql/tests/connect/test_parity_pandas_grouped_map.py | 4 
 python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py | 4 ++--
 2 files changed, 2 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_pandas_grouped_map.py 
b/python/pyspark/sql/tests/connect/test_parity_pandas_grouped_map.py
index 1cc4ce012623..8a1da440c799 100644
--- a/python/pyspark/sql/tests/connect/test_parity_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/connect/test_parity_pandas_grouped_map.py
@@ -38,10 +38,6 @@ class 
GroupedApplyInPandasTests(GroupedApplyInPandasTestsMixin, ReusedConnectTes
 def test_apply_in_pandas_returning_incompatible_type(self):
 super().test_apply_in_pandas_returning_incompatible_type()
 
-@unittest.skip("Spark Connect doesn't support RDD but the test depends on 
it.")
-def test_grouped_with_empty_partition(self):
-super().test_grouped_with_empty_partition()
-
 
 if __name__ == "__main__":
 from pyspark.sql.tests.connect.test_parity_pandas_grouped_map import *  # 
noqa: F401
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py 
b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
index f43dafc0a4a1..1e86e12eb74f 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_grouped_map.py
@@ -680,13 +680,13 @@ class GroupedApplyInPandasTestsMixin:
 data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
 expected = [Row(id=1, x=5), Row(id=1, x=5), Row(id=2, x=4)]
 num_parts = len(data) + 1
-df = self.spark.createDataFrame(self.sc.parallelize(data, 
numSlices=num_parts))
+df = self.spark.createDataFrame(data).repartition(num_parts)
 
 f = pandas_udf(
 lambda pdf: pdf.assign(x=pdf["x"].sum()), "id long, x int", 
PandasUDFType.GROUPED_MAP
 )
 
-result = df.groupBy("id").apply(f).collect()
+result = df.groupBy("id").apply(f).sort("id").collect()
 self.assertEqual(result, expected)
 
 def test_grouped_over_window(self):





(spark) branch master updated (0329479acb67 -> 9caa6f7f8b8e)

2024-04-30 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 0329479acb67 [SPARK-47359][SQL] Support TRANSLATE function to work 
with collated strings
 add 9caa6f7f8b8e [SPARK-48061][SQL][TESTS] Parameterize max limits of 
`spark.sql.test.randomDataGenerator`

No new revisions were added by this update.

Summary of changes:
 .../test/scala/org/apache/spark/sql/RandomDataGenerator.scala| 9 ++---
 1 file changed, 6 insertions(+), 3 deletions(-)





(spark) branch master updated: [SPARK-47359][SQL] Support TRANSLATE function to work with collated strings

2024-04-30 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 0329479acb67 [SPARK-47359][SQL] Support TRANSLATE function to work 
with collated strings
0329479acb67 is described below

commit 0329479acb6758c4d3e53d514ea832a181d31065
Author: Milan Dankovic 
AuthorDate: Tue Apr 30 22:28:56 2024 +0800

[SPARK-47359][SQL] Support TRANSLATE function to work with collated strings

### What changes were proposed in this pull request?
Extend built-in string functions to support non-binary, non-lowercase 
collation for: `translate`

### Why are the changes needed?
Update collation support for built-in string functions in Spark.

### Does this PR introduce _any_ user-facing change?
Yes, users should now be able to use COLLATE within arguments for built-in 
string function TRANSLATE in Spark SQL queries, using non-binary collations 
such as UNICODE_CI.
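
A hedged PySpark sketch of the kind of query this enables (syntax follows the description above; the exact semantics are pinned by the tests mentioned below):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Under UNICODE_CI, matching against the from-string is case-insensitive,
# so both 'r' and 'R' would be translated here.
spark.sql("SELECT translate('TRanslate' COLLATE UNICODE_CI, 'rnlt', '123')").show()
```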

### How was this patch tested?
Unit tests for queries using StringTranslate 
(CollationStringExpressionsSuite.scala).

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45820 from miland-db/miland-db/string-translate.

Authored-by: Milan Dankovic 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/util/CollationSupport.java  | 85 ++
 .../sql/catalyst/analysis/CollationTypeCasts.scala |  3 +-
 .../catalyst/expressions/stringExpressions.scala   | 28 ---
 .../sql/CollationStringExpressionsSuite.scala  | 74 +++
 4 files changed, 180 insertions(+), 10 deletions(-)

diff --git 
a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java
 
b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java
index 9778ca31209e..b77671cee90b 100644
--- 
a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java
+++ 
b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java
@@ -25,7 +25,9 @@ import org.apache.spark.unsafe.UTF8StringBuilder;
 import org.apache.spark.unsafe.types.UTF8String;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Pattern;
 
 import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
@@ -483,6 +485,56 @@ public final class CollationSupport {
 }
   }
 
+  public static class StringTranslate {
+public static UTF8String exec(final UTF8String source, Map 
dict,
+final int collationId) {
+  CollationFactory.Collation collation = 
CollationFactory.fetchCollation(collationId);
+  if (collation.supportsBinaryEquality) {
+return execBinary(source, dict);
+  } else if (collation.supportsLowercaseEquality) {
+return execLowercase(source, dict);
+  } else {
+return execICU(source, dict, collationId);
+  }
+}
+public static String genCode(final String source, final String dict, final 
int collationId) {
+  CollationFactory.Collation collation = 
CollationFactory.fetchCollation(collationId);
+  String expr = "CollationSupport.EndsWith.exec";
+  if (collation.supportsBinaryEquality) {
+return String.format(expr + "Binary(%s, %s)", source, dict);
+  } else if (collation.supportsLowercaseEquality) {
+return String.format(expr + "Lowercase(%s, %s)", source, dict);
+  } else {
+return String.format(expr + "ICU(%s, %s, %d)", source, dict, 
collationId);
+  }
+}
+public static UTF8String execBinary(final UTF8String source, Map dict) {
+  return source.translate(dict);
+}
+public static UTF8String execLowercase(final UTF8String source, 
Map dict) {
+  String srcStr = source.toString();
+  StringBuilder sb = new StringBuilder();
+  int charCount = 0;
+  for (int k = 0; k < srcStr.length(); k += charCount) {
+int codePoint = srcStr.codePointAt(k);
+charCount = Character.charCount(codePoint);
+String subStr = srcStr.substring(k, k + charCount);
+String translated = dict.get(subStr.toLowerCase());
+if (null == translated) {
+  sb.append(subStr);
+} else if (!"\0".equals(translated)) {
+  sb.append(translated);
+}
+  }
+  return UTF8String.fromString(sb.toString());
+}
+public static UTF8String execICU(final UTF8String source, Map dict,
+final int collationId) {
+  return source.translate(CollationAwareUTF8String.getCollationAwareDict(
+source, dict, collationId));
+}
+  }
+
   // TODO: Add more collation-aware string expressions.
 
   /**
@@ -808,6 +860,39 @@ public final class CollationSupport {
   }
 }
 
+private static Map getCollationAwareDict(UTF8String string,
+   

(spark) branch master updated: [SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python streaming data source

2024-04-30 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new c8c249204178 [SPARK-47793][SS][PYTHON] Implement 
SimpleDataSourceStreamReader for python streaming data source
c8c249204178 is described below

commit c8c2492041782b9be7f10647191dcd0d5f6a5a8a
Author: Chaoqin Li 
AuthorDate: Tue Apr 30 22:08:32 2024 +0900

[SPARK-47793][SS][PYTHON] Implement SimpleDataSourceStreamReader for python 
streaming data source

### What changes were proposed in this pull request?
SimpleDataSourceStreamReader is a simplified version of the 
DataSourceStreamReader interface.

There are three functions that need to be defined:

1. Read data and return the end offset.
_def read(self, start: Offset) -> (Iterator[Tuple], Offset)_

2. Read data between the start and end offsets; this is required for exactly-once reads.
_def readBetweenOffset(self, start: Offset, end: Offset) -> Iterator[Tuple]_

3. Return the initial start offset of the streaming query.
_def initialOffset() -> dict_

The implementation wraps the SimpleDataSourceStreamReader instance in a 
DataSourceStreamReader that prefetches and caches data in latestOffset. The records 
prefetched in the Python process are sent to the JVM as Arrow record batches in 
planInputPartitions(), cached by the block manager, and later read by the partition 
reader on the executor.
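
A standalone sketch of the three methods listed above (method names and signatures follow the commit message; the dict-based offsets, the data, and the class name are illustrative, and the wiring into a `DataSource` via `simpleStreamReader` is omitted):

```python
from typing import Iterator, Tuple


class CounterSimpleStreamReader:
    """Sketch only: emits two integer rows per batch, tracked by a dict offset."""

    def initialOffset(self) -> dict:
        return {"index": 0}

    def read(self, start: dict) -> Tuple[Iterator[tuple], dict]:
        # Read whatever is available and report the new end offset.
        end = {"index": start["index"] + 2}
        rows = iter([(i,) for i in range(start["index"], end["index"])])
        return rows, end

    def readBetweenOffset(self, start: dict, end: dict) -> Iterator[tuple]:
        # Deterministic re-read of a committed range, needed for exactly-once reads.
        return iter([(i,) for i in range(start["index"], end["index"])])
```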

### Why are the changes needed?
Compared to the DataSourceStreamReader interface, the simplified interface has some 
advantages: it doesn't require developers to reason about data partitioning, and it 
doesn't require getting the latest offset before reading data.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Add unit test and integration test.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #45977 from chaoqin-li1123/simple_reader_impl.

Lead-authored-by: Chaoqin Li 
Co-authored-by: chaoqin-li1123 
<55518381+chaoqin-li1...@users.noreply.github.com>
Signed-off-by: Jungtaek Lim 
---
 .../scala/org/apache/spark/storage/BlockId.scala   |   8 +
 python/pyspark/sql/datasource.py   | 129 -
 python/pyspark/sql/datasource_internal.py  | 146 ++
 .../streaming/python_streaming_source_runner.py|  58 +++-
 python/pyspark/sql/worker/plan_data_source_read.py | 142 +-
 .../v2/python/PythonMicroBatchStream.scala |  61 +++-
 .../datasources/v2/python/PythonScan.scala |   3 +
 .../PythonStreamingPartitionReaderFactory.scala|  89 ++
 .../python/PythonStreamingSourceRunner.scala   |  57 +++-
 .../python/PythonStreamingDataSourceSuite.scala| 307 -
 10 files changed, 911 insertions(+), 89 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockId.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
index 585d9a886b47..6eb015d56b2c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockId.scala
@@ -170,6 +170,11 @@ case class StreamBlockId(streamId: Int, uniqueId: Long) 
extends BlockId {
   override def name: String = "input-" + streamId + "-" + uniqueId
 }
 
+@DeveloperApi
+case class PythonStreamBlockId(streamId: Int, uniqueId: Long) extends BlockId {
+  override def name: String = "python-stream-" + streamId + "-" + uniqueId
+}
+
 /** Id associated with temporary local data managed as blocks. Not 
serializable. */
 private[spark] case class TempLocalBlockId(id: UUID) extends BlockId {
   override def name: String = "temp_local_" + id
@@ -213,6 +218,7 @@ object BlockId {
   val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
   val TASKRESULT = "taskresult_([0-9]+)".r
   val STREAM = "input-([0-9]+)-([0-9]+)".r
+  val PYTHON_STREAM = "python-stream-([0-9]+)-([0-9]+)".r
   val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
   val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
   val TEST = "test_(.*)".r
@@ -250,6 +256,8 @@ object BlockId {
   TaskResultBlockId(taskId.toLong)
 case STREAM(streamId, uniqueId) =>
   StreamBlockId(streamId.toInt, uniqueId.toLong)
+case PYTHON_STREAM(streamId, uniqueId) =>
+  PythonStreamBlockId(streamId.toInt, uniqueId.toLong)
 case TEMP_LOCAL(uuid) =>
   TempLocalBlockId(UUID.fromString(uuid))
 case TEMP_SHUFFLE(uuid) =>
diff --git a/python/pyspark/sql/datasource.py b/python/pyspark/sql/datasource.py
index c08b5b7af77f..6cac7e35ff41 100644
--- a/python/pyspark/sql/datasource.py
+++ b/python/pyspark/sql/datasource.py
@@ -183,11 +183,36 @@ class DataSource(ABC):
 message_parameters={"feature": "streamWriter"},
 )
 
+def simpleStreamReader(self, schema: StructType) 

(spark) branch master updated: [SPARK-48003][SQL] Add collation support for hll sketch aggregate

2024-04-30 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 3aea6c258bf3 [SPARK-48003][SQL] Add collation support for hll sketch 
aggregate
3aea6c258bf3 is described below

commit 3aea6c258bf3541d7f53cd3914244f817ed36ff6
Author: Uros Bojanic <157381213+uros...@users.noreply.github.com>
AuthorDate: Tue Apr 30 20:58:43 2024 +0800

[SPARK-48003][SQL] Add collation support for hll sketch aggregate

### What changes were proposed in this pull request?
Introduce collation awareness for hll sketch aggregate.

### Why are the changes needed?
Add collation support for hyperloglog expressions in Spark.

### Does this PR introduce _any_ user-facing change?
Yes, users should now be able to use collated strings within arguments for 
hyperloglog function: hll_sketch_agg.
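
A hedged PySpark sketch of what collation awareness means here (`hll_sketch_agg` and `hll_sketch_estimate` are existing Spark SQL functions; the query and the expected estimate are illustrative):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Under a case-insensitive collation, 'a' and 'A' map to the same collation key
# (see getCollationKey in the diff below), so the estimate should be 1 rather than 2.
spark.sql(
    "SELECT hll_sketch_estimate(hll_sketch_agg(c COLLATE UNICODE_CI)) "
    "FROM VALUES ('a'), ('A') AS t(c)"
).show()
```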

### How was this patch tested?
E2e sql tests.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46241 from uros-db/hll-agg.

Authored-by: Uros Bojanic <157381213+uros...@users.noreply.github.com>
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/util/CollationFactory.java | 14 ++
 .../aggregate/datasketchesAggregates.scala|  8 ++--
 .../scala/org/apache/spark/sql/CollationSuite.scala   | 19 +++
 3 files changed, 39 insertions(+), 2 deletions(-)

diff --git 
a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
 
b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
index 93691e28c692..863445b6 100644
--- 
a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
+++ 
b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationFactory.java
@@ -25,6 +25,7 @@ import java.util.function.ToLongFunction;
 import com.ibm.icu.text.RuleBasedCollator;
 import com.ibm.icu.text.StringSearch;
 import com.ibm.icu.util.ULocale;
+import com.ibm.icu.text.CollationKey;
 import com.ibm.icu.text.Collator;
 
 import org.apache.spark.SparkException;
@@ -270,4 +271,17 @@ public final class CollationFactory {
 int collationId = collationNameToId(collationName);
 return collationTable[collationId];
   }
+
+  public static UTF8String getCollationKey(UTF8String input, int collationId) {
+Collation collation = fetchCollation(collationId);
+if (collation.supportsBinaryEquality) {
+  return input;
+} else if (collation.supportsLowercaseEquality) {
+  return input.toLowerCase();
+} else {
+  CollationKey collationKey = 
collation.collator.getCollationKey(input.toString());
+  return UTF8String.fromBytes(collationKey.toByteArray());
+}
+  }
+
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
index 02925f3625d2..2102428131f6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/datasketchesAggregates.scala
@@ -25,7 +25,9 @@ import org.apache.spark.SparkUnsupportedOperationException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, 
Expression, ExpressionDescription, Literal}
 import org.apache.spark.sql.catalyst.trees.BinaryLike
+import org.apache.spark.sql.catalyst.util.CollationFactory
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.internal.types.StringTypeAnyCollation
 import org.apache.spark.sql.types.{AbstractDataType, BinaryType, BooleanType, 
DataType, IntegerType, LongType, StringType, TypeCollection}
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -103,7 +105,7 @@ case class HllSketchAgg(
   override def prettyName: String = "hll_sketch_agg"
 
   override def inputTypes: Seq[AbstractDataType] =
-Seq(TypeCollection(IntegerType, LongType, StringType, BinaryType), 
IntegerType)
+Seq(TypeCollection(IntegerType, LongType, StringTypeAnyCollation, 
BinaryType), IntegerType)
 
   override def dataType: DataType = BinaryType
 
@@ -137,7 +139,9 @@ case class HllSketchAgg(
 // TODO: implement support for decimal/datetime/interval types
 case IntegerType => sketch.update(v.asInstanceOf[Int])
 case LongType => sketch.update(v.asInstanceOf[Long])
-case StringType => sketch.update(v.asInstanceOf[UTF8String].toString)
+case st: StringType =>
+  val cKey = 
CollationFactory.getCollationKey(v.asInstanceOf[UTF8String], st.collationId)
+  sketch.update(cKey.toString)
   

(spark) branch master updated (332570f42203 -> 94763438943e)

2024-04-30 Thread kabhwan
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 332570f42203 [SPARK-48052][PYTHON][CONNECT] Recover `pyspark-connect` 
CI by parent classes
 add 94763438943e [SPARK-48050][SS] Log logical plan at query start

No new revisions were added by this update.

Summary of changes:
 .../org/apache/spark/sql/execution/streaming/StreamExecution.scala| 4 
 1 file changed, 4 insertions(+)





(spark) branch master updated (12a507464f10 -> 332570f42203)

2024-04-30 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


from 12a507464f10 [SPARK-47566][SQL] Support SubstringIndex function to 
work with collated strings
 add 332570f42203 [SPARK-48052][PYTHON][CONNECT] Recover `pyspark-connect` 
CI by parent classes

No new revisions were added by this update.

Summary of changes:
 python/pyspark/ml/functions.py  |   4 +-
 python/pyspark/sql/connect/dataframe.py |  48 +
 python/pyspark/sql/dataframe.py | 115 
 3 files changed, 97 insertions(+), 70 deletions(-)





(spark) branch master updated: [SPARK-47566][SQL] Support SubstringIndex function to work with collated strings

2024-04-30 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 12a507464f10 [SPARK-47566][SQL] Support SubstringIndex function to 
work with collated strings
12a507464f10 is described below

commit 12a507464f106d299511d16c2a436cbc0257bc8a
Author: Milan Dankovic 
AuthorDate: Tue Apr 30 17:19:01 2024 +0800

[SPARK-47566][SQL] Support SubstringIndex function to work with collated 
strings

### What changes were proposed in this pull request?
Extend built-in string functions to support non-binary, non-lowercase 
collation for: substring_index.

### Why are the changes needed?
Update collation support for built-in string functions in Spark.

### Does this PR introduce _any_ user-facing change?
Yes, users should now be able to use COLLATE within arguments for built-in 
string function SUBSTRING_INDEX in Spark SQL queries, using non-binary 
collations such as UNICODE_CI.
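
A hedged PySpark sketch of such a query (per the description above; under UNICODE_CI the delimiter match is case-insensitive, and the exact semantics are pinned by the suite mentioned below):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# With UNICODE_CI the delimiter 'x' should also match 'X' when splitting the string.
spark.sql(
    "SELECT substring_index('wwwXapacheXorg' COLLATE UNICODE_CI, 'x', 2)"
).show()
```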

### How was this patch tested?
Unit tests for queries using SubstringIndex 
(`CollationStringExpressionsSuite.scala`).

### Was this patch authored or co-authored using generative AI tooling?
No

### To consider:
There is no check for a collation match between the string and the delimiter; it will 
be introduced with implicit casting.

We can remove the original `public UTF8String subStringIndex(UTF8String 
delim, int count)` method, and get the existing behavior using 
`subStringIndex(delim, count, 0)`.

Closes #45725 from miland-db/miland-db/substringIndex-stringLocate.

Authored-by: Milan Dankovic 
Signed-off-by: Wenchen Fan 
---
 .../spark/sql/catalyst/util/CollationSupport.java  | 169 +
 .../org/apache/spark/unsafe/types/UTF8String.java  |  28 +++-
 .../spark/unsafe/types/CollationSupportSuite.java  |  83 ++
 .../sql/catalyst/analysis/CollationTypeCasts.scala |   5 +
 .../catalyst/expressions/stringExpressions.scala   |  15 +-
 .../sql/CollationStringExpressionsSuite.scala  |  31 
 6 files changed, 323 insertions(+), 8 deletions(-)

diff --git 
a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java
 
b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java
index 0c81b99de916..9778ca31209e 100644
--- 
a/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java
+++ 
b/common/unsafe/src/main/java/org/apache/spark/sql/catalyst/util/CollationSupport.java
@@ -28,6 +28,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.regex.Pattern;
 
+import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
+import static org.apache.spark.unsafe.Platform.copyMemory;
+
 /**
  * Static entry point for collation-aware expressions (StringExpressions, 
RegexpExpressions, and
  * other expressions that require custom collation support), as well as 
private utility methods for
@@ -441,6 +444,45 @@ public final class CollationSupport {
 }
   }
 
+  public static class SubstringIndex {
+public static UTF8String exec(final UTF8String string, final UTF8String 
delimiter,
+final int count, final int collationId) {
+  CollationFactory.Collation collation = 
CollationFactory.fetchCollation(collationId);
+  if (collation.supportsBinaryEquality) {
+return execBinary(string, delimiter, count);
+  } else if (collation.supportsLowercaseEquality) {
+return execLowercase(string, delimiter, count);
+  } else {
+return execICU(string, delimiter, count, collationId);
+  }
+}
+public static String genCode(final String string, final String delimiter,
+final int count, final int collationId) {
+  CollationFactory.Collation collation = 
CollationFactory.fetchCollation(collationId);
+  String expr = "CollationSupport.SubstringIndex.exec";
+  if (collation.supportsBinaryEquality) {
+return String.format(expr + "Binary(%s, %s, %d)", string, delimiter, 
count);
+  } else if (collation.supportsLowercaseEquality) {
+return String.format(expr + "Lowercase(%s, %s, %d)", string, 
delimiter, count);
+  } else {
+return String.format(expr + "ICU(%s, %s, %d, %d)", string, delimiter, 
count, collationId);
+  }
+}
+public static UTF8String execBinary(final UTF8String string, final 
UTF8String delimiter,
+final int count) {
+  return string.subStringIndex(delimiter, count);
+}
+public static UTF8String execLowercase(final UTF8String string, final 
UTF8String delimiter,
+final int count) {
+  return CollationAwareUTF8String.lowercaseSubStringIndex(string, 
delimiter, count);
+}
+public static UTF8String execICU(final UTF8String string, final UTF8String 
delimiter,
+final int count, final int 

(spark) branch master updated: [SPARK-46122][SQL] Set `spark.sql.legacy.createHiveTableByDefault` to `false` by default

2024-04-30 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 9e8c4aa3f43a [SPARK-46122][SQL] Set 
`spark.sql.legacy.createHiveTableByDefault` to `false` by default
9e8c4aa3f43a is described below

commit 9e8c4aa3f43a3d99bff56cca319db623abc473ee
Author: Dongjoon Hyun 
AuthorDate: Tue Apr 30 01:44:37 2024 -0700

[SPARK-46122][SQL] Set `spark.sql.legacy.createHiveTableByDefault` to 
`false` by default

### What changes were proposed in this pull request?

This PR aims to switch `spark.sql.legacy.createHiveTableByDefault` to 
`false` by default in order to move away from this legacy behavior from `Apache 
Spark 4.0.0` while the legacy functionality will be preserved during Apache 
Spark 4.x period by setting `spark.sql.legacy.createHiveTableByDefault=true`.
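
In user-facing terms (a hedged sketch; the configuration key comes from this PR, and `spark.sql.sources.default` is the standard data-source default, normally `parquet`):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# With the new default, CREATE TABLE without USING/STORED AS picks up
# spark.sql.sources.default instead of creating a Hive table.
spark.sql("CREATE TABLE plain_table AS SELECT 1 AS id")
# Restoring the pre-4.0 behavior, as noted in the migration guide entry below:
spark.conf.set("spark.sql.legacy.createHiveTableByDefault", "true")
```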

### Why are the changes needed?

Historically, this behavior change was merged during the `Apache Spark 3.0.0` 
development as part of SPARK-30098 and was officially reverted during the `3.0.0 RC` period.

- 2019-12-06: #26736 (58be82a)
- 2019-12-06: 
https://lists.apache.org/thread/g90dz1og1zt4rr5h091rn1zqo50y759j
- 2020-05-16: #28517

At `Apache Spark 3.1.0`, we had another discussion and defined it as 
`Legacy` behavior via a new configuration by reusing the JIRA ID, SPARK-30098.
- 2020-12-01: 
https://lists.apache.org/thread/8c8k1jk61pzlcosz3mxo4rkj5l23r204
- 2020-12-03: #30554

Last year, this was proposed again twice, and `Apache Spark 4.0.0` is a good 
time to make a decision for Apache Spark's future direction.
- SPARK-42603 on 2023-02-27 as an independent idea.
- SPARK-46122 on 2023-11-27 as a part of Apache Spark 4.0.0 idea

### Does this PR introduce _any_ user-facing change?

Yes, the migration document is updated.

### How was this patch tested?

Pass the CIs with the adjusted test cases.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #46207 from dongjoon-hyun/SPARK-46122.

Authored-by: Dongjoon Hyun 
Signed-off-by: Dongjoon Hyun 
---
 docs/sql-migration-guide.md   | 1 +
 python/pyspark/sql/tests/test_readwriter.py   | 5 ++---
 .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala| 2 +-
 .../apache/spark/sql/execution/command/PlanResolutionSuite.scala  | 8 +++-
 4 files changed, 7 insertions(+), 9 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 1e0fdadde1e3..07562babc87d 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -25,6 +25,7 @@ license: |
 ## Upgrading from Spark SQL 3.5 to 4.0
 
 - Since Spark 4.0, `spark.sql.ansi.enabled` is on by default. To restore the 
previous behavior, set `spark.sql.ansi.enabled` to `false` or 
`SPARK_ANSI_SQL_MODE` to `false`.
+- Since Spark 4.0, `CREATE TABLE` syntax without `USING` and `STORED AS` will 
use the value of `spark.sql.sources.default` as the table provider instead of 
`Hive`. To restore the previous behavior, set 
`spark.sql.legacy.createHiveTableByDefault` to `true`.
 - Since Spark 4.0, the default behaviour when inserting elements in a map is 
changed to first normalize keys -0.0 to 0.0. The affected SQL functions are 
`create_map`, `map_from_arrays`, `map_from_entries`, and `map_concat`. To 
restore the previous behaviour, set 
`spark.sql.legacy.disableMapKeyNormalization` to `true`.
 - Since Spark 4.0, the default value of `spark.sql.maxSinglePartitionBytes` is 
changed from `Long.MaxValue` to `128m`. To restore the previous behavior, set 
`spark.sql.maxSinglePartitionBytes` to `9223372036854775807`(`Long.MaxValue`).
 - Since Spark 4.0, any read of SQL tables takes into consideration the SQL 
configs 
`spark.sql.files.ignoreCorruptFiles`/`spark.sql.files.ignoreMissingFiles` 
instead of the core config 
`spark.files.ignoreCorruptFiles`/`spark.files.ignoreMissingFiles`.
diff --git a/python/pyspark/sql/tests/test_readwriter.py 
b/python/pyspark/sql/tests/test_readwriter.py
index 5784d2c72973..e752856d0316 100644
--- a/python/pyspark/sql/tests/test_readwriter.py
+++ b/python/pyspark/sql/tests/test_readwriter.py
@@ -247,10 +247,9 @@ class ReadwriterV2TestsMixin:
 
 def test_create_without_provider(self):
 df = self.df
-with self.assertRaisesRegex(
-AnalysisException, "NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT"
-):
+with self.table("test_table"):
 df.writeTo("test_table").create()
+self.assertEqual(100, self.spark.sql("select * from 
test_table").count())
 
 def test_table_overwrite(self):
 df = self.df
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 

(spark) branch master updated: [SPARK-48055][PYTHON][CONNECT][TESTS] Enable `PandasUDFScalarParityTests.{test_vectorized_udf_empty_partition, test_vectorized_udf_struct_with_empty_partition}`

2024-04-30 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new ed5aa56f1200 [SPARK-48055][PYTHON][CONNECT][TESTS] Enable 
`PandasUDFScalarParityTests.{test_vectorized_udf_empty_partition, 
test_vectorized_udf_struct_with_empty_partition}`
ed5aa56f1200 is described below

commit ed5aa56f1200bc1b0a455269eeb57863b2043fa1
Author: Ruifeng Zheng 
AuthorDate: Tue Apr 30 14:37:30 2024 +0800

[SPARK-48055][PYTHON][CONNECT][TESTS] Enable 
`PandasUDFScalarParityTests.{test_vectorized_udf_empty_partition, 
test_vectorized_udf_struct_with_empty_partition}`

### What changes were proposed in this pull request?
Enable two tests in `PandasUDFScalarParityTests`

### Why are the changes needed?
test coverage

### Does this PR introduce _any_ user-facing change?
no, test only

### How was this patch tested?
ci

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #46296 from zhengruifeng/enable_test_vectorized_udf_empty_partition.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 .../sql/tests/connect/test_parity_pandas_udf_scalar.py| 11 ---
 python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py |  8 +---
 2 files changed, 5 insertions(+), 14 deletions(-)

diff --git a/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py 
b/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py
index b42bfaf0f58d..590ab695ee07 100644
--- a/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/connect/test_parity_pandas_udf_scalar.py
@@ -21,17 +21,6 @@ from pyspark.testing.connectutils import 
ReusedConnectTestCase
 
 
 class PandasUDFScalarParityTests(ScalarPandasUDFTestsMixin, 
ReusedConnectTestCase):
-def test_nondeterministic_vectorized_udf_in_aggregate(self):
-self.check_nondeterministic_analysis_exception()
-
-@unittest.skip("Spark Connect doesn't support RDD but the test depends on 
it.")
-def test_vectorized_udf_empty_partition(self):
-super().test_vectorized_udf_empty_partition()
-
-@unittest.skip("Spark Connect doesn't support RDD but the test depends on 
it.")
-def test_vectorized_udf_struct_with_empty_partition(self):
-super().test_vectorized_udf_struct_with_empty_partition()
-
 # TODO(SPARK-43727): Parity returnType check in Spark Connect
 @unittest.skip("Fails in Spark Connect, should enable.")
 def test_vectorized_udf_wrong_return_type(self):
diff --git a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py 
b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
index 9edd585da6a0..38bc633cd1ed 100644
--- a/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
+++ b/python/pyspark/sql/tests/pandas/test_pandas_udf_scalar.py
@@ -764,15 +764,17 @@ class ScalarPandasUDFTestsMixin:
 self.assertEqual(df.collect(), res.collect())
 
 def test_vectorized_udf_empty_partition(self):
-df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 2))
+df = self.spark.createDataFrame([Row(id=1)]).repartition(2)
 for udf_type in [PandasUDFType.SCALAR, PandasUDFType.SCALAR_ITER]:
 f = pandas_udf(lambda x: x, LongType(), udf_type)
 res = df.select(f(col("id")))
 self.assertEqual(df.collect(), res.collect())
 
 def test_vectorized_udf_struct_with_empty_partition(self):
-df = self.spark.createDataFrame(self.sc.parallelize([Row(id=1)], 
2)).withColumn(
-"name", lit("John Doe")
+df = (
+self.spark.createDataFrame([Row(id=1)])
+.repartition(2)
+.withColumn("name", lit("John Doe"))
 )
 
 @pandas_udf("first string, last string")





(spark) branch branch-3.4 updated: [SPARK-47129][CONNECT][SQL][3.4] Make ResolveRelations cache connect plan properly

2024-04-30 Thread ruifengz
This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
 new 5f58fa7738eb [SPARK-47129][CONNECT][SQL][3.4] Make ResolveRelations 
cache connect plan properly
5f58fa7738eb is described below

commit 5f58fa7738eb51d6319fdd6e95ced69f40241cb4
Author: Ruifeng Zheng 
AuthorDate: Tue Apr 30 14:32:52 2024 +0800

[SPARK-47129][CONNECT][SQL][3.4] Make ResolveRelations cache connect plan 
properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

cherry-pick bugfix https://github.com/apache/spark/pull/45214 to 3.4

### Why are the changes needed?
Bug fix for Spark Connect; it won't affect classic Spark SQL.

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve 
dataframe column "id". It's probably because of illegal references like 
`df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is because the existing plan caching in `ResolveRelations` doesn't work 
with Spark Connect.

```
=== Applying Rule 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[#12]Join LeftOuter, '`==`('index, 'id) '[#12]Join 
LeftOuter, '`==`('index, 'id)
!:- '[#9]UnresolvedRelation [test_table_1], [], false :- 
'[#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[#11]Project ['index, 'value_2]  :  +- 
'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[#10]Join Inner, '`==`('id, 'index)   +- 
'[#11]Project ['index, 'value_2]
!  :- '[#7]UnresolvedRelation [test_table_1], [], false  +- 
'[#10]Join Inner, '`==`('id, 'index)
!  +- '[#8]UnresolvedRelation [test_table_2], [], false :- 
'[#9]SubqueryAlias spark_catalog.default.test_table_1
!   :  +- 
'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- 
'[#8]SubqueryAlias spark_catalog.default.test_table_2
!  +- 
'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to 
the cached one
```
:- '[#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, 
[], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes #46290 from zhengruifeng/connect_fix_read_join_34.

Authored-by: Ruifeng Zheng 
Signed-off-by: Ruifeng Zheng 
---
 python/pyspark/sql/tests/test_readwriter.py| 21 +
 .../spark/sql/catalyst/analysis/Analyzer.scala | 27 --
 2 files changed, 41 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/tests/test_readwriter.py 
b/python/pyspark/sql/tests/test_readwriter.py
index f51b0ef06208..9113fb350f63 100644
--- a/python/pyspark/sql/tests/test_readwriter.py
+++ b/python/pyspark/sql/tests/test_readwriter.py
@@ -181,6 +181,27 @@ class ReadwriterTestsMixin:
 df.write.mode("overwrite").insertInto("test_table", False)
 self.assertEqual(6, self.spark.sql("select * from 
test_table").count())
 
+def test_cached_table(self):
+with self.table("test_cached_table_1"):
+self.spark.range(10).withColumn(
+"value_1",
+lit(1),
+).write.saveAsTable("test_cached_table_1")
+
+with self.table("test_cached_table_2"):
+self.spark.range(10).withColumnRenamed("id", 
"index").withColumn(
+"value_2", lit(2)
+).write.saveAsTable("test_cached_table_2")
+
+df1 = self.spark.read.table("test_cached_table_1")
+