This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f200fa0e [flink] Fix creating table from Flink will make all table 
properties in custom properties (#1748)
6f200fa0e is described below

commit 6f200fa0eb730ab266b75498062868c60371bf22
Author: Rion Williams <[email protected]>
AuthorDate: Wed Dec 24 20:16:27 2025 -0600

    [flink] Fix creating table from Flink will make all table properties in 
custom properties (#1748)
---
 .../apache/fluss/flink/utils/FlinkConversions.java | 30 ++++++++++++------
 .../fluss/flink/catalog/FlinkCatalogITCase.java    | 37 ++++++++++++++++++++--
 .../fluss/flink/utils/FlinkConversionsTest.java    | 37 ++++++++++++++++++++++
 3 files changed, 92 insertions(+), 12 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index c571df496..3088c9d30 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -210,15 +210,24 @@ public class FlinkConversions {
                         .collect(Collectors.toList()));
 
         // convert some flink options to fluss table configs.
-        Map<String, String> properties = 
convertFlinkOptionsToFlussTableProperties(flinkTableConf);
+        Map<String, String> storageProperties =
+                convertFlinkOptionsToFlussTableProperties(flinkTableConf);
 
-        if (properties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) {
+        if (storageProperties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) {
             for (String autoIncrementColumn :
-                    
properties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) {
+                    
storageProperties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) {
                 schemBuilder.enableAutoIncrement(autoIncrementColumn);
             }
         }
 
+        // serialize computed column and watermark spec to custom properties
+        Map<String, String> customProperties =
+                extractCustomProperties(flinkTableConf, storageProperties);
+        CatalogPropertiesUtils.serializeComputedColumns(
+                customProperties, resolvedSchema.getColumns());
+        CatalogPropertiesUtils.serializeWatermarkSpecs(
+                customProperties, 
catalogBaseTable.getResolvedSchema().getWatermarkSpecs());
+
         Schema schema = schemBuilder.build();
 
         resolvedSchema.getColumns().stream()
@@ -236,12 +245,6 @@ public class FlinkConversions {
                         ? ((ResolvedCatalogTable) 
catalogBaseTable).getPartitionKeys()
                         : ((ResolvedCatalogMaterializedTable) 
catalogBaseTable).getPartitionKeys();
 
-        Map<String, String> customProperties = flinkTableConf.toMap();
-        CatalogPropertiesUtils.serializeComputedColumns(
-                customProperties, resolvedSchema.getColumns());
-        CatalogPropertiesUtils.serializeWatermarkSpecs(
-                customProperties, 
catalogBaseTable.getResolvedSchema().getWatermarkSpecs());
-
         // Set materialized table flags to fluss table custom properties
         if (CatalogTableAdapter.isMaterializedTable(tableKind)) {
             CatalogMaterializedTable.RefreshMode refreshMode =
@@ -283,7 +286,7 @@ public class FlinkConversions {
                 .partitionedBy(partitionKeys)
                 .distributedBy(bucketNum, bucketKey)
                 .comment(comment)
-                .properties(properties)
+                .properties(storageProperties)
                 .customProperties(customProperties)
                 .build();
     }
@@ -644,4 +647,11 @@ public class FlinkConversions {
                 .serializedRefreshHandler(refreshHandlerBytes);
         return builder.build();
     }
+
+    private static Map<String, String> extractCustomProperties(
+            Configuration allProperties, Map<String, String> 
flussTableProperties) {
+        Map<String, String> customProperties = new 
HashMap<>(allProperties.toMap());
+        customProperties.keySet().removeAll(flussTableProperties.keySet());
+        return customProperties;
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index bc04543a4..e544bfd1e 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -17,6 +17,9 @@
 
 package org.apache.fluss.flink.catalog;
 
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
@@ -25,6 +28,7 @@ import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 
@@ -586,7 +590,9 @@ abstract class FlinkCatalogITCase {
                         + "    cost AS price * quantity,\n"
                         + "    order_time TIMESTAMP(3),\n"
                         + "    WATERMARK FOR order_time AS order_time - 
INTERVAL '5' SECOND\n"
-                        + ") with ('k1' = 'v1')");
+                        + ") with ('k1' = 'v1', 'bucket.num' = '2', "
+                        + "'table.datalake.format' = 'paimon', "
+                        + "'client.connect-timeout' = '120s')");
         CatalogTable table =
                 (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, 
"expression_test"));
         Schema.Builder schemaBuilder = Schema.newBuilder();
@@ -606,9 +612,36 @@ abstract class FlinkCatalogITCase {
         Map<String, String> expectedOptions = new HashMap<>();
         expectedOptions.put("k1", "v1");
         expectedOptions.put(BUCKET_KEY.key(), "user");
-        expectedOptions.put(BUCKET_NUMBER.key(), "1");
+        expectedOptions.put(BUCKET_NUMBER.key(), "2");
         expectedOptions.put("table.datalake.format", "paimon");
+        expectedOptions.put("client.connect-timeout", "120s");
         assertOptionsEqual(table.getOptions(), expectedOptions);
+
+        // assert the stored table/custom configs
+        Configuration clientConfig = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        try (Connection conn = 
ConnectionFactory.createConnection(clientConfig)) {
+            Admin admin = conn.getAdmin();
+            TableInfo tableInfo =
+                    admin.getTableInfo(TablePath.of(DEFAULT_DB, 
"expression_test")).get();
+
+            Map<String, String> expectedTableProperties = new HashMap<>();
+            expectedTableProperties.put("table.datalake.format", "paimon");
+            expectedTableProperties.put("table.replication.factor", "1");
+            
assertThat(tableInfo.getProperties().toMap()).isEqualTo(expectedTableProperties);
+
+            Map<String, String> expectedCustomProperties = new HashMap<>();
+            expectedCustomProperties.put("k1", "v1");
+            expectedCustomProperties.put("client.connect-timeout", "120s");
+            expectedCustomProperties.put(
+                    "schema.watermark.0.strategy.expr", "`order_time` - 
INTERVAL '5' SECOND");
+            expectedCustomProperties.put("schema.watermark.0.rowtime", 
"order_time");
+            
expectedCustomProperties.put("schema.watermark.0.strategy.data-type", 
"TIMESTAMP(3)");
+            expectedCustomProperties.put("schema.4.name", "cost");
+            expectedCustomProperties.put("schema.4.expr", "`price` * 
`quantity`");
+            expectedCustomProperties.put("schema.4.data-type", "DOUBLE");
+            expectedCustomProperties.put("bucket.num", "2");
+            
assertThat(tableInfo.getCustomProperties().toMap()).isEqualTo(expectedCustomProperties);
+        }
     }
 
     @Test
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
index cafc9d8fa..b511c9050 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
@@ -258,6 +258,43 @@ public class FlinkConversionsTest {
                 
.containsEntry(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
     }
 
+    @Test
+    void testTableConversionForCustomProperties() {
+        Map<String, String> options = new HashMap<>();
+        // forward table option & enum type
+        options.put(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed");
+        // forward client memory option
+        options.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), 
"64mb");
+        // forward client duration option
+        options.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
+
+        ResolvedSchema schema =
+                new ResolvedSchema(
+                        Collections.singletonList(
+                                Column.physical(
+                                        "order_id",
+                                        
org.apache.flink.table.api.DataTypes.STRING().notNull())),
+                        Collections.emptyList(),
+                        null);
+        CatalogTable flinkTable =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(schema).build(),
+                        "test comment",
+                        Collections.emptyList(),
+                        options);
+
+        TableDescriptor flussTable =
+                FlinkConversions.toFlussTable(new 
ResolvedCatalogTable(flinkTable, schema));
+
+        assertThat(flussTable.getProperties())
+                .containsEntry(ConfigOptions.TABLE_LOG_FORMAT.key(), 
"indexed");
+
+        HashMap<String, String> customProperties = new HashMap<>();
+        
customProperties.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), 
"64mb");
+        customProperties.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), 
"32s");
+        
assertThat(flussTable.getCustomProperties()).containsExactlyEntriesOf(customProperties);
+    }
+
     @Test
     void testOptionConversions() {
         ConfigOption<?> flinkOption = 
FlinkConversions.toFlinkOption(ConfigOptions.TABLE_KV_FORMAT);

Reply via email to