This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 799f00816 [spark] Fix update append table with spark3.3- (#3566)
799f00816 is described below

commit 799f00816d940384ce855b4c9ddf2830144ce0b9
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Jun 21 10:52:18 2024 +0800

    [spark] Fix update append table with spark3.3- (#3566)
---
 .../apache/paimon/spark/sql/UpdateTableTest.scala  | 21 ++++++++++
 .../apache/paimon/spark/sql/UpdateTableTest.scala  | 21 ++++++++++
 .../apache/paimon/spark/sql/UpdateTableTest.scala  | 21 ++++++++++
 .../apache/paimon/spark/sql/UpdateTableTest.scala  | 21 ++++++++++
 .../spark/commands/UpdatePaimonTableCommand.scala  |  3 +-
 ...teTableTest.scala => UpdateTableTestBase.scala} | 45 ++++++++++++----------
 6 files changed, 111 insertions(+), 21 deletions(-)

diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
new file mode 100644
index 000000000..194aab278
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class UpdateTableTest extends UpdateTableTestBase {}
diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
new file mode 100644
index 000000000..194aab278
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class UpdateTableTest extends UpdateTableTestBase {}
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
new file mode 100644
index 000000000..194aab278
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class UpdateTableTest extends UpdateTableTestBase {}
diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
new file mode 100644
index 000000000..194aab278
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class UpdateTableTest extends UpdateTableTestBase {}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index ee41c5ebb..7ca51bc96 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.commands
 
 import org.apache.paimon.spark.PaimonSplitScan
+import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
@@ -107,7 +108,7 @@ case class UpdatePaimonTableCommand(
         touchedFiles,
         rawConvertible = true,
         table.store().pathFactory())
-      val toUpdateScanRelation = DataSourceV2ScanRelation(
+      val toUpdateScanRelation = Compatibility.createDataSourceV2ScanRelation(
         relation,
         PaimonSplitScan(table, touchedDataSplits),
         relation.output)
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
similarity index 92%
rename from paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
rename to paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
index c438c03de..d7222a197 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
@@ -24,7 +24,7 @@ import org.apache.paimon.spark.catalyst.analysis.Update
 
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
 
-class UpdateTableTest extends PaimonSparkTestBase {
+abstract class UpdateTableTestBase extends PaimonSparkTestBase {
 
   import testImplicits._
 
@@ -133,25 +133,30 @@ class UpdateTableTest extends PaimonSparkTestBase {
       Seq((1, "a3", "2024"), (2, "b3", "2024"), (3, "c3", "2025"), (4, "d3", "2025")).toDF()
     )
 
-    // IN
-    spark.sql("""
-                |UPDATE T
-                |SET name = concat(substring(name, 0, 1), '5')
-                |WHERE id IN (SELECT key FROM source)""".stripMargin)
-    checkAnswer(
-      spark.sql("SELECT * FROM T ORDER BY id"),
-      Seq((1, "a3", "2024"), (2, "b5", "2024"), (3, "c3", "2025"), (4, "d5", "2025")).toDF()
-    )
-
-    // NOT IN
-    spark.sql("""
-                |UPDATE T
-                |SET name = concat(substring(name, 0, 1), '6')
-                |WHERE id NOT IN (SELECT key FROM source)""".stripMargin)
-    checkAnswer(
-      spark.sql("SELECT * FROM T ORDER BY id"),
-      Seq((1, "a6", "2024"), (2, "b5", "2024"), (3, "c6", "2025"), (4, "d5", "2025")).toDF()
-    )
+    // Spark support using Exists/In subqueries in Project node since Spark34 which is needed
+    // in [[UpdatePaimonTableCommand#performUpdateForNonPkTable]].
+    // So this case can only be passed since Spark34, todo: support it with Spark33-
+    if (gteqSpark3_4) {
+      // IN
+      spark.sql("""
+                  |UPDATE T
+                  |SET name = concat(substring(name, 0, 1), '5')
+                  |WHERE id IN (SELECT key FROM source)""".stripMargin)
+      checkAnswer(
+        spark.sql("SELECT * FROM T ORDER BY id"),
+        Seq((1, "a3", "2024"), (2, "b5", "2024"), (3, "c3", "2025"), (4, "d5", "2025")).toDF()
+      )
+
+      // NOT IN
+      spark.sql("""
+                  |UPDATE T
+                  |SET name = concat(substring(name, 0, 1), '6')
+                  |WHERE id NOT IN (SELECT key FROM source)""".stripMargin)
+      checkAnswer(
+        spark.sql("SELECT * FROM T ORDER BY id"),
+        Seq((1, "a6", "2024"), (2, "b5", "2024"), (3, "c6", "2025"), (4, "d5", "2025")).toDF()
+      )
+    }
   }
 
   CoreOptions.MergeEngine.values().foreach {

Reply via email to