This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 799f00816 [spark] Fix update append table with spark3.3- (#3566)
799f00816 is described below
commit 799f00816d940384ce855b4c9ddf2830144ce0b9
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Jun 21 10:52:18 2024 +0800
[spark] Fix update append table with spark3.3- (#3566)
---
.../apache/paimon/spark/sql/UpdateTableTest.scala | 21 ++++++++++
.../apache/paimon/spark/sql/UpdateTableTest.scala | 21 ++++++++++
.../apache/paimon/spark/sql/UpdateTableTest.scala | 21 ++++++++++
.../apache/paimon/spark/sql/UpdateTableTest.scala | 21 ++++++++++
.../spark/commands/UpdatePaimonTableCommand.scala | 3 +-
...teTableTest.scala => UpdateTableTestBase.scala} | 45 ++++++++++++----------
6 files changed, 111 insertions(+), 21 deletions(-)
diff --git
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
new file mode 100644
index 000000000..194aab278
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class UpdateTableTest extends UpdateTableTestBase {}
diff --git
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
new file mode 100644
index 000000000..194aab278
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class UpdateTableTest extends UpdateTableTestBase {}
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
new file mode 100644
index 000000000..194aab278
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class UpdateTableTest extends UpdateTableTestBase {}
diff --git
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
new file mode 100644
index 000000000..194aab278
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class UpdateTableTest extends UpdateTableTestBase {}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index ee41c5ebb..7ca51bc96 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -19,6 +19,7 @@
package org.apache.paimon.spark.commands
import org.apache.paimon.spark.PaimonSplitScan
+import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
@@ -107,7 +108,7 @@ case class UpdatePaimonTableCommand(
touchedFiles,
rawConvertible = true,
table.store().pathFactory())
- val toUpdateScanRelation = DataSourceV2ScanRelation(
+ val toUpdateScanRelation = Compatibility.createDataSourceV2ScanRelation(
relation,
PaimonSplitScan(table, touchedDataSplits),
relation.output)
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
similarity index 92%
rename from
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
rename to
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
index c438c03de..d7222a197 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
@@ -24,7 +24,7 @@ import org.apache.paimon.spark.catalyst.analysis.Update
import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
-class UpdateTableTest extends PaimonSparkTestBase {
+abstract class UpdateTableTestBase extends PaimonSparkTestBase {
import testImplicits._
@@ -133,25 +133,30 @@ class UpdateTableTest extends PaimonSparkTestBase {
Seq((1, "a3", "2024"), (2, "b3", "2024"), (3, "c3", "2025"), (4, "d3",
"2025")).toDF()
)
- // IN
- spark.sql("""
- |UPDATE T
- |SET name = concat(substring(name, 0, 1), '5')
- |WHERE id IN (SELECT key FROM source)""".stripMargin)
- checkAnswer(
- spark.sql("SELECT * FROM T ORDER BY id"),
- Seq((1, "a3", "2024"), (2, "b5", "2024"), (3, "c3", "2025"), (4, "d5",
"2025")).toDF()
- )
-
- // NOT IN
- spark.sql("""
- |UPDATE T
- |SET name = concat(substring(name, 0, 1), '6')
- |WHERE id NOT IN (SELECT key FROM source)""".stripMargin)
- checkAnswer(
- spark.sql("SELECT * FROM T ORDER BY id"),
- Seq((1, "a6", "2024"), (2, "b5", "2024"), (3, "c6", "2025"), (4, "d5",
"2025")).toDF()
- )
+ // Spark supports Exists/In subqueries in a Project node only since Spark 3.4,
which is needed
+ // in [[UpdatePaimonTableCommand#performUpdateForNonPkTable]].
+ // So this case can only pass on Spark 3.4+. TODO: support it on Spark 3.3 and
earlier.
+ if (gteqSpark3_4) {
+ // IN
+ spark.sql("""
+ |UPDATE T
+ |SET name = concat(substring(name, 0, 1), '5')
+ |WHERE id IN (SELECT key FROM source)""".stripMargin)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a3", "2024"), (2, "b5", "2024"), (3, "c3", "2025"), (4, "d5",
"2025")).toDF()
+ )
+
+ // NOT IN
+ spark.sql("""
+ |UPDATE T
+ |SET name = concat(substring(name, 0, 1), '6')
+ |WHERE id NOT IN (SELECT key FROM source)""".stripMargin)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a6", "2024"), (2, "b5", "2024"), (3, "c6", "2025"), (4, "d5",
"2025")).toDF()
+ )
+ }
}
CoreOptions.MergeEngine.values().foreach {