This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e92b76ff [spark] Fix partition transform converter (#3876)
9e92b76ff is described below

commit 9e92b76ff2cfe130e6766e40fc421644e33e3464
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Aug 2 19:14:02 2024 +0800

    [spark] Fix partition transform converter (#3876)
---
 .../java/org/apache/paimon/spark/SparkCatalog.java | 32 ++++++++++++++--------
 .../org/apache/paimon/spark/sql/DDLTestBase.scala  | 10 +++++++
 2 files changed, 30 insertions(+), 12 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index dd3efc724..a2ea6d0fa 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -37,6 +37,7 @@ import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.connector.catalog.TableChange;
 import org.apache.spark.sql.connector.expressions.FieldReference;
+import org.apache.spark.sql.connector.expressions.IdentityTransform;
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.internal.SessionState;
@@ -46,6 +47,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -387,14 +389,6 @@ public class SparkCatalog extends SparkBaseCatalog {
 
     private Schema toInitialSchema(
             StructType schema, Transform[] partitions, Map<String, String> 
properties) {
-        checkArgument(
-                Arrays.stream(partitions)
-                        .allMatch(
-                                partition -> {
-                                    NamedReference[] references = 
partition.references();
-                                    return references.length == 1
-                                            && references[0] instanceof 
FieldReference;
-                                }));
         Map<String, String> normalizedProperties = mergeSQLConf(properties);
         normalizedProperties.remove(PRIMARY_KEY_IDENTIFIER);
         normalizedProperties.remove(TableCatalog.PROP_COMMENT);
@@ -409,10 +403,7 @@ public class SparkCatalog extends SparkBaseCatalog {
                 Schema.newBuilder()
                         .options(normalizedProperties)
                         .primaryKey(primaryKeys)
-                        .partitionKeys(
-                                Arrays.stream(partitions)
-                                        .map(partition -> 
partition.references()[0].describe())
-                                        .collect(Collectors.toList()))
+                        .partitionKeys(convertPartitionTransforms(partitions))
                         
.comment(properties.getOrDefault(TableCatalog.PROP_COMMENT, null));
 
         for (StructField field : schema.fields()) {
@@ -474,6 +465,23 @@ public class SparkCatalog extends SparkBaseCatalog {
         }
     }
 
+    /**
+     * Converts Spark partition transforms into a list of partition column names.
+     *
+     * <p>Only identity transforms that reference a single, non-nested column are accepted.
+     *
+     * @param transforms partition transforms to convert
+     * @return the referenced column names, in the same order as {@code transforms}
+     * @throws UnsupportedOperationException if a transform is not an {@link IdentityTransform},
+     *     or if its reference is not a {@link FieldReference} with exactly one name part
+     */
+    protected List<String> convertPartitionTransforms(Transform[] transforms) {
+        List<String> partitionColNames = new ArrayList<>(transforms.length);
+        for (Transform transform : transforms) {
+            if (!(transform instanceof IdentityTransform)) {
+                throw new UnsupportedOperationException(
+                        "Unsupported partition transform: " + transform);
+            }
+            NamedReference ref = ((IdentityTransform) transform).ref();
+            // Reject anything that is not a plain field reference, and also nested references
+            // (more than one name part): only fieldNames()[0] is used below, so accepting a
+            // nested reference would silently partition by its first name part.
+            if (!(ref instanceof FieldReference) || ref.fieldNames().length != 1) {
+                throw new UnsupportedOperationException(
+                        "Unsupported partition transform: " + transform);
+            }
+            partitionColNames.add(ref.fieldNames()[0]);
+        }
+        return partitionColNames;
+    }
+
     // --------------------- unsupported methods ----------------------------
 
     @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
index db749a636..e1113add9 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DDLTestBase.scala
@@ -446,4 +446,14 @@ abstract class DDLTestBase extends PaimonSparkTestBase {
         }
     }
   }
+
+  // CREATE TABLE partitioned by an expression (substr(pt, 1, 2)) rather than a plain column
+  // must fail with a RuntimeException whose message mentions the unsupported transform.
+  test("Paimon DDL: create table with unsupported partitioned by") {
+    val error = intercept[RuntimeException] {
+      sql(s"""
+             |CREATE TABLE T (id STRING, name STRING, pt STRING)
+             |PARTITIONED BY (substr(pt, 1, 2))
+             |""".stripMargin)
+    }.getMessage
+    assert(error.contains("Unsupported partition transform"))
+  }
 }

Reply via email to