This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0a1185766 [spark] Support write with composite partition key (#3985)
0a1185766 is described below
commit 0a1185766e2c1d63ff1e6b05588ac8b05be16473
Author: xuzifu666 <[email protected]>
AuthorDate: Sun Aug 18 22:35:41 2024 +0800
[spark] Support write with composite partition key (#3985)
---
.../java/org/apache/paimon/utils/StringUtils.java | 4 +
.../sql/PaimonCompositePartitionKeyTest.scala | 21 +++++
.../sql/PaimonCompositePartitionKeyTest.scala | 21 +++++
.../sql/PaimonCompositePartitionKeyTest.scala | 21 +++++
.../sql/PaimonCompositePartitionKeyTest.scala | 21 +++++
.../scala/org/apache/paimon/spark/SparkTable.scala | 3 +-
.../scala/org/apache/spark/sql/PaimonUtils.scala | 2 +-
.../sql/PaimonCompositePartitionKeyTestBase.scala | 97 ++++++++++++++++++++++
8 files changed, 188 insertions(+), 2 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index 4d0d5b672..3dbc9779e 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -551,6 +551,10 @@ public class StringUtils {
return true;
}
+ public static String quote(String str) {
+ return "`" + str + "`";
+ }
+
public static String caseSensitiveConversion(String str, boolean
allowUpperCase) {
return allowUpperCase ? str : str.toLowerCase();
}
diff --git
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
new file mode 100644
index 000000000..635185a9e
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonCompositePartitionKeyTest extends
PaimonCompositePartitionKeyTestBase {}
diff --git
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
new file mode 100644
index 000000000..635185a9e
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonCompositePartitionKeyTest extends
PaimonCompositePartitionKeyTestBase {}
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
new file mode 100644
index 000000000..635185a9e
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonCompositePartitionKeyTest extends
PaimonCompositePartitionKeyTestBase {}
diff --git
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
new file mode 100644
index 000000000..635185a9e
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class PaimonCompositePartitionKeyTest extends
PaimonCompositePartitionKeyTestBase {}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 902619f05..dd24fc6b1 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
import org.apache.paimon.options.Options
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.table.{DataTable, FileStoreTable, Table}
+import org.apache.paimon.utils.StringUtils
import org.apache.spark.sql.connector.catalog.{MetadataColumn,
SupportsMetadataColumns, SupportsRead, SupportsWrite, TableCapability,
TableCatalog}
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
@@ -49,7 +50,7 @@ case class SparkTable(table: Table)
override lazy val schema: StructType =
SparkTypeUtils.fromPaimonRowType(table.rowType)
override def partitioning: Array[Transform] = {
- table.partitionKeys().asScala.map(p => Expressions.identity(p)).toArray
+ table.partitionKeys().asScala.map(p =>
Expressions.identity(StringUtils.quote(p))).toArray
}
override def properties: JMap[String, String] = {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index f4e551d83..4492d856a 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -65,7 +65,7 @@ object PaimonUtils {
}
def fieldReference(name: String): NamedReference = {
- FieldReference(name)
+ FieldReference(Seq(name))
}
def bytesToString(size: Long): String = {
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTestBase.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTestBase.scala
new file mode 100644
index 000000000..d03533aa4
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonCompositePartitionKeyTestBase.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.trees.TreePattern.DYNAMIC_PRUNING_SUBQUERY
+import org.junit.jupiter.api.Assertions
+
+abstract class PaimonCompositePartitionKeyTestBase extends PaimonSparkTestBase
{
+
+ import testImplicits._
+
+ test("PaimonCompositePartitionKeyTest") {
+ withTable("source", "t", "t1") {
+ Seq((1L, "x1", "2023"), (2L, "x2", "2023"), (5L, "x5", "2025"), (6L,
"x6", "2026"))
+ .toDF("a", "b", "pt t")
+ .createOrReplaceTempView("source")
+
+ // test single composite partition key
+ spark.sql("""
+ |CREATE TABLE t (id INT, name STRING, `pt t` STRING)
PARTITIONED BY (`pt t`)
+ |""".stripMargin)
+
+ spark.sql(
+ """
+ |INSERT INTO t(`id`, `name`, `pt t`) VALUES (1, "a", "2023"), (3,
"c", "2023"), (5, "e", "2025"), (7, "g", "2027")
+ |""".stripMargin)
+
+ val df1 = spark.sql("""
+ |SELECT t.id, t.name, source.b FROM source join t
+ |ON source.`pt t` = t.`pt t` AND source.`pt t` =
'2023'
+ |ORDER BY t.id, source.b
+ |""".stripMargin)
+ val qe1 = df1.queryExecution
+
Assertions.assertFalse(qe1.analyzed.containsPattern(DYNAMIC_PRUNING_SUBQUERY))
+ checkAnswer(
+ df1,
+ Row(1, "a", "x1") :: Row(1, "a", "x2") :: Row(3, "c", "x1") :: Row(3,
"c", "x2") :: Nil)
+
+ val df2 = spark.sql("""
+ |SELECT t.*, source.b FROM source join t
+ |ON source.a = t.id AND source.`pt t` = t.`pt t`
AND source.a > 3
+ |""".stripMargin)
+ val qe2 = df2.queryExecution
+
Assertions.assertFalse(qe2.analyzed.containsPattern(DYNAMIC_PRUNING_SUBQUERY))
+ checkAnswer(df2, Row(5, "e", "2025", "x5") :: Nil)
+
+ // test normal and composite partition keys
+ spark.sql(
+ """
+ |CREATE TABLE t1 (id INT, name STRING, `pt t` STRING, v STRING)
PARTITIONED BY (`pt t`, v)
+ |""".stripMargin)
+
+ spark.sql(
+ """
+ |INSERT INTO t1(`id`, `name`, `pt t`, v) VALUES (1, "a", "2023",
"2222"), (3, "c", "2023", "2223"), (5, "e", "2025", "2224"), (7, "g", "2027",
"2225")
+ |""".stripMargin)
+
+ val df3 =
+ spark.sql("""
+ |SELECT t1.id, t1.name, source.b FROM source join t1
+ |ON source.`pt t` = t1.`pt t` AND source.`pt t` = '2023'
AND t1.v = '2223'
+ |ORDER BY t1.id, source.b
+ |""".stripMargin)
+ val qe3 = df3.queryExecution
+
Assertions.assertFalse(qe3.analyzed.containsPattern(DYNAMIC_PRUNING_SUBQUERY))
+ checkAnswer(df3, Row(3, "c", "x1") :: Row(3, "c", "x2") :: Nil)
+
+ val df4 = spark.sql("""
+ |SELECT t1.*, source.b FROM source join t1
+ |ON source.a = t1.id AND source.`pt t` = t1.`pt t`
AND source.a > 3
+ |""".stripMargin)
+ val qe4 = df4.queryExecution
+
Assertions.assertFalse(qe4.analyzed.containsPattern(DYNAMIC_PRUNING_SUBQUERY))
+ checkAnswer(df4, Row(5, "e", "2025", "2224", "x5") :: Nil)
+
+ }
+ }
+}