This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9e92b76ff [spark] Fix partition transform converter (#3876)
9e92b76ff is described below
commit 9e92b76ff2cfe130e6766e40fc421644e33e3464
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Aug 2 19:14:02 2024 +0800
[spark] Fix partition transform converter (#3876)
---
.../java/org/apache/paimon/spark/SparkCatalog.java | 32 ++++++++++++++--------
.../org/apache/paimon/spark/sql/DDLTestBase.scala | 10 +++++++
2 files changed, 30 insertions(+), 12 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index dd3efc724..a2ea6d0fa 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -37,6 +37,7 @@ import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.expressions.FieldReference;
+import org.apache.spark.sql.connector.expressions.IdentityTransform;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.internal.SessionState;
@@ -46,6 +47,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -387,14 +389,6 @@ public class SparkCatalog extends SparkBaseCatalog {
private Schema toInitialSchema(
StructType schema, Transform[] partitions, Map<String, String>
properties) {
- checkArgument(
- Arrays.stream(partitions)
- .allMatch(
- partition -> {
- NamedReference[] references =
partition.references();
- return references.length == 1
- && references[0] instanceof
FieldReference;
- }));
Map<String, String> normalizedProperties = mergeSQLConf(properties);
normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
normalizedProperties.remove(TableCatalog.PROP_COMMENT);
@@ -409,10 +403,7 @@ public class SparkCatalog extends SparkBaseCatalog {
Schema.newBuilder()
.options(normalizedProperties)
.primaryKey(primaryKeys)
- .partitionKeys(
- Arrays.stream(partitions)
- .map(partition ->
partition.references()[0].describe())
- .collect(Collectors.toList()))
+ .partitionKeys(convertPartitionTransforms(partitions))
.comment(properties.getOrDefault(TableCatalog.PROP_COMMENT, null));
for (StructField field : schema.fields()) {
@@ -474,6 +465,23 @@ public class SparkCatalog extends SparkBaseCatalog {
}
}
+    /**
+     * Converts Spark partition transforms into Paimon partition column names.
+     *
+     * <p>Only identity transforms over a single top-level column (e.g. {@code PARTITIONED BY
+     * (pt)}) are accepted; anything else is rejected rather than silently approximated.
+     *
+     * @param transforms the partition transforms passed to table creation
+     * @return the partition column names, in the same order as {@code transforms}
+     * @throws UnsupportedOperationException if a transform is not an {@link IdentityTransform}, or
+     *     its reference is not a {@link FieldReference} naming exactly one top-level column
+     */
+    protected List<String> convertPartitionTransforms(Transform[] transforms) {
+        List<String> partitionColNames = new ArrayList<>(transforms.length);
+        for (Transform transform : transforms) {
+            if (!(transform instanceof IdentityTransform)) {
+                throw new UnsupportedOperationException(
+                        "Unsupported partition transform: " + transform);
+            }
+            NamedReference ref = ((IdentityTransform) transform).ref();
+            // Reject anything but a single-part field reference: for a nested reference such as
+            // a.b, taking fieldNames()[0] would silently partition by the parent column "a",
+            // and a reference with no names would fail with an index error below.
+            if (!(ref instanceof FieldReference) || ref.fieldNames().length != 1) {
+                throw new UnsupportedOperationException(
+                        "Unsupported partition transform: " + transform);
+            }
+            partitionColNames.add(ref.fieldNames()[0]);
+        }
+        return partitionColNames;
+    }
+
// --------------------- unsupported methods ----------------------------
@Override
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index db749a636..e1113add9 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -446,4 +446,14 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
}
}
}
+
+  test("Paimon DDL: create table with unsupported partitioned by") {
+    // substr(...) is not a plain column reference, so creating the table is expected to fail
+    // with an "Unsupported partition transform" error instead of picking some partition key.
+    val ddl =
+      """
+        |CREATE TABLE T (id STRING, name STRING, pt STRING)
+        |PARTITIONED BY (substr(pt, 1, 2))
+        |""".stripMargin
+    val thrown = intercept[RuntimeException](sql(ddl))
+    assert(thrown.getMessage.contains("Unsupported partition transform"))
+  }
}