This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 61475a826 [lake/iceberg] throw Exception when partition columns are
of non-String type (#1831)
61475a826 is described below
commit 61475a8266896e9e48096e66e2ecd8d0575fc0b4
Author: Junbo Wang <[email protected]>
AuthorDate: Mon Oct 20 11:04:05 2025 +0800
[lake/iceberg] throw Exception when partition columns are of non-String
type (#1831)
---
.../fluss/lake/iceberg/IcebergLakeCatalog.java | 9 ++++
.../fluss/lake/iceberg/IcebergLakeCatalogTest.java | 61 ++++++++++++++++++----
2 files changed, 60 insertions(+), 10 deletions(-)
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
index 7b3a913b8..3b1e4bd0d 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -19,6 +19,7 @@ package org.apache.fluss.lake.iceberg;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.iceberg.utils.IcebergCatalogUtils;
@@ -51,6 +52,7 @@ import java.util.Set;
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static org.apache.iceberg.types.Type.TypeID.STRING;
/** An Iceberg implementation of {@link LakeCatalog}. */
public class IcebergLakeCatalog implements LakeCatalog {
@@ -214,6 +216,13 @@ public class IcebergLakeCatalog implements LakeCatalog {
List<String> partitionKeys = tableDescriptor.getPartitionKeys();
// always set identity partition with partition key
for (String partitionKey : partitionKeys) {
+ if (!icebergSchema.findType(partitionKey).typeId().equals(STRING))
{
+ // TODO: Support other types of partition keys
+ throw new InvalidTableException(
+ String.format(
+ "Partition key only support string type for
iceberg currently. Column `%s` is not string type.",
+ partitionKey));
+ }
builder.identity(partitionKey);
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
index ef6bdb45e..725fa4fe5 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
@@ -17,7 +17,9 @@
package org.apache.fluss.lake.iceberg;
+import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
@@ -29,14 +31,19 @@ import org.apache.iceberg.SortField;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
@@ -149,18 +156,19 @@ class IcebergLakeCatalogTest {
Schema flussSchema =
Schema.newBuilder()
- .column("shop_id", DataTypes.BIGINT())
+ .column("dt", DataTypes.STRING())
.column("user_id", DataTypes.BIGINT())
+ .column("shop_id", DataTypes.BIGINT())
.column("num_orders", DataTypes.INT())
.column("total_amount", DataTypes.INT().copy(false))
- .primaryKey("shop_id", "user_id")
+ .primaryKey("dt", "user_id")
.build();
TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(flussSchema)
.distributedBy(10)
- .partitionedBy("shop_id")
+ .partitionedBy("dt")
.property("iceberg.write.format.default", "orc")
.property("fluss_k1", "fluss_v1")
.build();
@@ -178,18 +186,19 @@ class IcebergLakeCatalogTest {
org.apache.iceberg.Schema expectIcebergSchema =
new org.apache.iceberg.Schema(
Arrays.asList(
- Types.NestedField.required(1, "shop_id",
Types.LongType.get()),
+ Types.NestedField.required(1, "dt",
Types.StringType.get()),
Types.NestedField.required(2, "user_id",
Types.LongType.get()),
+ Types.NestedField.optional(3, "shop_id",
Types.LongType.get()),
Types.NestedField.optional(
- 3, "num_orders",
Types.IntegerType.get()),
+ 4, "num_orders",
Types.IntegerType.get()),
Types.NestedField.required(
- 4, "total_amount",
Types.IntegerType.get()),
+ 5, "total_amount",
Types.IntegerType.get()),
Types.NestedField.required(
- 5, BUCKET_COLUMN_NAME,
Types.IntegerType.get()),
+ 6, BUCKET_COLUMN_NAME,
Types.IntegerType.get()),
Types.NestedField.required(
- 6, OFFSET_COLUMN_NAME,
Types.LongType.get()),
+ 7, OFFSET_COLUMN_NAME,
Types.LongType.get()),
Types.NestedField.required(
- 7, TIMESTAMP_COLUMN_NAME,
Types.TimestampType.withZone())),
+ 8, TIMESTAMP_COLUMN_NAME,
Types.TimestampType.withZone())),
identifierFieldIds);
assertThat(createdTable.schema().toString()).isEqualTo(expectIcebergSchema.toString());
@@ -197,7 +206,7 @@ class IcebergLakeCatalogTest {
assertThat(createdTable.spec().fields()).hasSize(2);
// first should be partitioned by the fluss partition key
PartitionField partitionField1 = createdTable.spec().fields().get(0);
- assertThat(partitionField1.name()).isEqualTo("shop_id");
+ assertThat(partitionField1.name()).isEqualTo("dt");
assertThat(partitionField1.transform().toString()).isEqualTo("identity");
assertThat(partitionField1.sourceId()).isEqualTo(1);
@@ -396,4 +405,36 @@ class IcebergLakeCatalogTest {
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("Only one bucket key is supported for
Iceberg");
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testIllegalPartitionKeyType(boolean isPrimaryKeyTable) throws
Exception {
+ TablePath t1 =
+ TablePath.of(
+ "test_db",
+ isPrimaryKeyTable
+ ? "pkIllegalPartitionKeyType"
+ : "logIllegalPartitionKeyType");
+ Schema.Builder builder =
+ Schema.newBuilder()
+ .column("c0", DataTypes.STRING())
+ .column("c1", DataTypes.BOOLEAN());
+ if (isPrimaryKeyTable) {
+ builder.primaryKey("c0", "c1");
+ }
+ List<String> partitionKeys = List.of("c1");
+ TableDescriptor.Builder tableDescriptor =
+ TableDescriptor.builder()
+ .schema(builder.build())
+ .distributedBy(1, "c0")
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(),
"true")
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS,
Duration.ofMillis(500));
+ tableDescriptor.partitionedBy(partitionKeys);
+
+ Assertions.assertThatThrownBy(
+ () -> flussIcebergCatalog.createTable(t1,
tableDescriptor.build()))
+ .isInstanceOf(InvalidTableException.class)
+ .hasMessage(
+ "Partition key only support string type for iceberg
currently. Column `c1` is not string type.");
+ }
}