This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a1185766 [spark] Support write with composite partition key (#3985)
0a1185766 is described below

commit 0a1185766e2c1d63ff1e6b05588ac8b05be16473
Author: xuzifu666 <[email protected]>
AuthorDate: Sun Aug 18 22:35:41 2024 +0800

    [spark] Support write with composite partition key (#3985)
---
 .../java/org/apache/paimon/utils/StringUtils.java  |  4 +
 .../sql/PaimonCompositePartitionKeyTest.scala      | 21 +++++
 .../sql/PaimonCompositePartitionKeyTest.scala      | 21 +++++
 .../sql/PaimonCompositePartitionKeyTest.scala      | 21 +++++
 .../sql/PaimonCompositePartitionKeyTest.scala      | 21 +++++
 .../scala/org/apache/paimon/spark/SparkTable.scala |  3 +-
 .../scala/org/apache/spark/sql/PaimonUtils.scala   |  2 +-
 .../sql/PaimonCompositePartitionKeyTestBase.scala  | 97 ++++++++++++++++++++++
 8 files changed, 188 insertions(+), 2 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index 4d0d5b672..3dbc9779e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -551,6 +551,10 @@ public class StringUtils {
         return true;
     }
 
+    public static String quote(String str) {
+        return "`" + str + "`";
+    }
+
     public static String caseSensitiveConversion(String str, boolean 
allowUpperCase) {
         return allowUpperCase ? str : str.toLowerCase();
     }
diff --git 
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
 
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
new file mode 100644
index 000000000..635185a9e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonCompositePartitionKeyTest extends 
PaimonCompositePartitionKeyTestBase {}
diff --git 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
new file mode 100644
index 000000000..635185a9e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonCompositePartitionKeyTest extends 
PaimonCompositePartitionKeyTestBase {}
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
new file mode 100644
index 000000000..635185a9e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonCompositePartitionKeyTest extends 
PaimonCompositePartitionKeyTestBase {}
diff --git 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
new file mode 100644
index 000000000..635185a9e
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonCompositePartitionKeyTest extends 
PaimonCompositePartitionKeyTestBase {}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 902619f05..dd24fc6b1 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.table.{DataTable, FileStoreTable, Table}
+import org.apache.paimon.utils.StringUtils
 
 import org.apache.spark.sql.connector.catalog.{MetadataColumn, 
SupportsMetadataColumns, SupportsRead, SupportsWrite, TableCapability, 
TableCatalog}
 import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
@@ -49,7 +50,7 @@ case class SparkTable(table: Table)
   override lazy val schema: StructType = 
SparkTypeUtils.fromPaimonRowType(table.rowType)
 
   override def partitioning: Array[Transform] = {
-    table.partitionKeys().asScala.map(p => Expressions.identity(p)).toArray
+    table.partitionKeys().asScala.map(p => 
Expressions.identity(StringUtils.quote(p))).toArray
   }
 
   override def properties: JMap[String, String] = {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index f4e551d83..4492d856a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -65,7 +65,7 @@ object PaimonUtils {
   }
 
   def fieldReference(name: String): NamedReference = {
-    FieldReference(name)
+    FieldReference(Seq(name))
   }
 
   def bytesToString(size: Long): String = {
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTestBase.scala
new file mode 100644
index 000000000..d03533aa4
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTestBase.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.trees.TreePattern.DYNAMIC_PRUNING_SUBQUERY
+import org.junit.jupiter.api.Assertions
+
+abstract class PaimonCompositePartitionKeyTestBase extends PaimonSparkTestBase 
{
+
+  import testImplicits._
+
+  test("PaimonCompositePartitionKeyTest") {
+    withTable("source", "t", "t1") {
+      Seq((1L, "x1", "2023"), (2L, "x2", "2023"), (5L, "x5", "2025"), (6L, 
"x6", "2026"))
+        .toDF("a", "b", "pt t")
+        .createOrReplaceTempView("source")
+
+      // test single composite partition key
+      spark.sql("""
+                  |CREATE TABLE t (id INT, name STRING, `pt t` STRING) 
PARTITIONED BY (`pt t`)
+                  |""".stripMargin)
+
+      spark.sql(
+        """
+          |INSERT INTO t(`id`, `name`, `pt t`) VALUES (1, "a", "2023"), (3, 
"c", "2023"), (5, "e", "2025"), (7, "g", "2027")
+          |""".stripMargin)
+
+      val df1 = spark.sql("""
+                            |SELECT t.id, t.name, source.b FROM source join t
+                            |ON source.`pt t` = t.`pt t` AND source.`pt t` = 
'2023'
+                            |ORDER BY t.id, source.b
+                            |""".stripMargin)
+      val qe1 = df1.queryExecution
+      
Assertions.assertFalse(qe1.analyzed.containsPattern(DYNAMIC_PRUNING_SUBQUERY))
+      checkAnswer(
+        df1,
+        Row(1, "a", "x1") :: Row(1, "a", "x2") :: Row(3, "c", "x1") :: Row(3, 
"c", "x2") :: Nil)
+
+      val df2 = spark.sql("""
+                            |SELECT t.*, source.b FROM source join t
+                            |ON source.a = t.id AND source.`pt t` = t.`pt t` 
AND source.a > 3
+                            |""".stripMargin)
+      val qe2 = df1.queryExecution
+      
Assertions.assertFalse(qe2.analyzed.containsPattern(DYNAMIC_PRUNING_SUBQUERY))
+      checkAnswer(df2, Row(5, "e", "2025", "x5") :: Nil)
+
+      // test normal and composite partitions key
+      spark.sql(
+        """
+          |CREATE TABLE t1 (id INT, name STRING, `pt t` STRING, v STRING) 
PARTITIONED BY (`pt t`, v)
+          |""".stripMargin)
+
+      spark.sql(
+        """
+          |INSERT INTO t1(`id`, `name`, `pt t`, v) VALUES (1, "a", "2023", 
"2222"), (3, "c", "2023", "2223"), (5, "e", "2025", "2224"), (7, "g", "2027", 
"2225")
+          |""".stripMargin)
+
+      val df3 =
+        spark.sql("""
+                    |SELECT t1.id, t1.name, source.b FROM source join t1
+                    |ON source.`pt t` = t1.`pt t` AND source.`pt t` = '2023' 
AND t1.v = '2223'
+                    |ORDER BY t1.id, source.b
+                    |""".stripMargin)
+      val qe3 = df3.queryExecution
+      
Assertions.assertFalse(qe3.analyzed.containsPattern(DYNAMIC_PRUNING_SUBQUERY))
+      checkAnswer(df3, Row(3, "c", "x1") :: Row(3, "c", "x2") :: Nil)
+
+      val df4 = spark.sql("""
+                            |SELECT t1.*, source.b FROM source join t1
+                            |ON source.a = t1.id AND source.`pt t` = t1.`pt t` 
AND source.a > 3
+                            |""".stripMargin)
+      val qe4 = df4.queryExecution
+      
Assertions.assertFalse(qe4.analyzed.containsPattern(DYNAMIC_PRUNING_SUBQUERY))
+      checkAnswer(df4, Row(5, "e", "2025", "2224", "x5") :: Nil)
+
+    }
+  }
+}

Reply via email to