This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 61475a826 [lake/iceberg] thrown Exception when partition columns are 
of non-String type (#1831)
61475a826 is described below

commit 61475a8266896e9e48096e66e2ecd8d0575fc0b4
Author: Junbo Wang <[email protected]>
AuthorDate: Mon Oct 20 11:04:05 2025 +0800

    [lake/iceberg] thrown Exception when partition columns are of non-String 
type (#1831)
---
 .../fluss/lake/iceberg/IcebergLakeCatalog.java     |  9 ++++
 .../fluss/lake/iceberg/IcebergLakeCatalogTest.java | 61 ++++++++++++++++++----
 2 files changed, 60 insertions(+), 10 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
index 7b3a913b8..3b1e4bd0d 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -19,6 +19,7 @@ package org.apache.fluss.lake.iceberg;
 
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
@@ -51,6 +52,7 @@ import java.util.Set;
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static org.apache.iceberg.types.Type.TypeID.STRING;
 
 /** An Iceberg implementation of {@link LakeCatalog}. */
 public class IcebergLakeCatalog implements LakeCatalog {
@@ -214,6 +216,13 @@ public class IcebergLakeCatalog implements LakeCatalog {
         List<String> partitionKeys = tableDescriptor.getPartitionKeys();
         // always set identity partition with partition key
         for (String partitionKey : partitionKeys) {
+            if (!icebergSchema.findType(partitionKey).typeId().equals(STRING)) 
{
+                // TODO: Support other types of partition keys
+                throw new InvalidTableException(
+                        String.format(
+                                "Partition key only support string type for 
iceberg currently. Column `%s` is not string type.",
+                                partitionKey));
+            }
             builder.identity(partitionKey);
         }
 
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
index ef6bdb45e..725fa4fe5 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
@@ -17,7 +17,9 @@
 
 package org.apache.fluss.lake.iceberg;
 
+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
@@ -29,14 +31,19 @@ import org.apache.iceberg.SortField;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
@@ -149,18 +156,19 @@ class IcebergLakeCatalogTest {
 
         Schema flussSchema =
                 Schema.newBuilder()
-                        .column("shop_id", DataTypes.BIGINT())
+                        .column("dt", DataTypes.STRING())
                         .column("user_id", DataTypes.BIGINT())
+                        .column("shop_id", DataTypes.BIGINT())
                         .column("num_orders", DataTypes.INT())
                         .column("total_amount", DataTypes.INT().copy(false))
-                        .primaryKey("shop_id", "user_id")
+                        .primaryKey("dt", "user_id")
                         .build();
 
         TableDescriptor tableDescriptor =
                 TableDescriptor.builder()
                         .schema(flussSchema)
                         .distributedBy(10)
-                        .partitionedBy("shop_id")
+                        .partitionedBy("dt")
                         .property("iceberg.write.format.default", "orc")
                         .property("fluss_k1", "fluss_v1")
                         .build();
@@ -178,18 +186,19 @@ class IcebergLakeCatalogTest {
         org.apache.iceberg.Schema expectIcebergSchema =
                 new org.apache.iceberg.Schema(
                         Arrays.asList(
-                                Types.NestedField.required(1, "shop_id", 
Types.LongType.get()),
+                                Types.NestedField.required(1, "dt", 
Types.StringType.get()),
                                 Types.NestedField.required(2, "user_id", 
Types.LongType.get()),
+                                Types.NestedField.optional(3, "shop_id", 
Types.LongType.get()),
                                 Types.NestedField.optional(
-                                        3, "num_orders", 
Types.IntegerType.get()),
+                                        4, "num_orders", 
Types.IntegerType.get()),
                                 Types.NestedField.required(
-                                        4, "total_amount", 
Types.IntegerType.get()),
+                                        5, "total_amount", 
Types.IntegerType.get()),
                                 Types.NestedField.required(
-                                        5, BUCKET_COLUMN_NAME, 
Types.IntegerType.get()),
+                                        6, BUCKET_COLUMN_NAME, 
Types.IntegerType.get()),
                                 Types.NestedField.required(
-                                        6, OFFSET_COLUMN_NAME, 
Types.LongType.get()),
+                                        7, OFFSET_COLUMN_NAME, 
Types.LongType.get()),
                                 Types.NestedField.required(
-                                        7, TIMESTAMP_COLUMN_NAME, 
Types.TimestampType.withZone())),
+                                        8, TIMESTAMP_COLUMN_NAME, 
Types.TimestampType.withZone())),
                         identifierFieldIds);
         
assertThat(createdTable.schema().toString()).isEqualTo(expectIcebergSchema.toString());
 
@@ -197,7 +206,7 @@ class IcebergLakeCatalogTest {
         assertThat(createdTable.spec().fields()).hasSize(2);
         // first should be partitioned by the fluss partition key
         PartitionField partitionField1 = createdTable.spec().fields().get(0);
-        assertThat(partitionField1.name()).isEqualTo("shop_id");
+        assertThat(partitionField1.name()).isEqualTo("dt");
         
assertThat(partitionField1.transform().toString()).isEqualTo("identity");
         assertThat(partitionField1.sourceId()).isEqualTo(1);
 
@@ -396,4 +405,36 @@ class IcebergLakeCatalogTest {
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessageContaining("Only one bucket key is supported for 
Iceberg");
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    void testIllegalPartitionKeyType(boolean isPrimaryKeyTable) throws 
Exception {
+        TablePath t1 =
+                TablePath.of(
+                        "test_db",
+                        isPrimaryKeyTable
+                                ? "pkIllegalPartitionKeyType"
+                                : "logIllegalPartitionKeyType");
+        Schema.Builder builder =
+                Schema.newBuilder()
+                        .column("c0", DataTypes.STRING())
+                        .column("c1", DataTypes.BOOLEAN());
+        if (isPrimaryKeyTable) {
+            builder.primaryKey("c0", "c1");
+        }
+        List<String> partitionKeys = List.of("c1");
+        TableDescriptor.Builder tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(builder.build())
+                        .distributedBy(1, "c0")
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), 
"true")
+                        .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, 
Duration.ofMillis(500));
+        tableDescriptor.partitionedBy(partitionKeys);
+
+        Assertions.assertThatThrownBy(
+                        () -> flussIcebergCatalog.createTable(t1, 
tableDescriptor.build()))
+                .isInstanceOf(InvalidTableException.class)
+                .hasMessage(
+                        "Partition key only support string type for iceberg 
currently. Column `c1` is not string type.");
+    }
 }

Reply via email to