This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 6f200fa0e [flink] Fix creating a table from Flink putting all table properties into custom properties (#1748)
6f200fa0e is described below
commit 6f200fa0eb730ab266b75498062868c60371bf22
Author: Rion Williams <[email protected]>
AuthorDate: Wed Dec 24 20:16:27 2025 -0600
[flink] Fix creating a table from Flink putting all table properties into custom properties (#1748)
---
.../apache/fluss/flink/utils/FlinkConversions.java | 30 ++++++++++++------
.../fluss/flink/catalog/FlinkCatalogITCase.java | 37 ++++++++++++++++++++--
.../fluss/flink/utils/FlinkConversionsTest.java | 37 ++++++++++++++++++++++
3 files changed, 92 insertions(+), 12 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index c571df496..3088c9d30 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -210,15 +210,24 @@ public class FlinkConversions {
                         .collect(Collectors.toList()));
 
         // convert some flink options to fluss table configs.
-        Map<String, String> properties = convertFlinkOptionsToFlussTableProperties(flinkTableConf);
+        Map<String, String> storageProperties =
+                convertFlinkOptionsToFlussTableProperties(flinkTableConf);
 
-        if (properties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) {
+        if (storageProperties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) {
             for (String autoIncrementColumn :
-                    properties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) {
+                    storageProperties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) {
                 schemBuilder.enableAutoIncrement(autoIncrementColumn);
             }
         }
 
+        // serialize computed column and watermark spec to custom properties
+        Map<String, String> customProperties =
+                extractCustomProperties(flinkTableConf, storageProperties);
+        CatalogPropertiesUtils.serializeComputedColumns(
+                customProperties, resolvedSchema.getColumns());
+        CatalogPropertiesUtils.serializeWatermarkSpecs(
+                customProperties, catalogBaseTable.getResolvedSchema().getWatermarkSpecs());
+
         Schema schema = schemBuilder.build();
 
         resolvedSchema.getColumns().stream()
@@ -236,12 +245,6 @@ public class FlinkConversions {
                         ? ((ResolvedCatalogTable) catalogBaseTable).getPartitionKeys()
                         : ((ResolvedCatalogMaterializedTable) catalogBaseTable).getPartitionKeys();
 
-        Map<String, String> customProperties = flinkTableConf.toMap();
-        CatalogPropertiesUtils.serializeComputedColumns(
-                customProperties, resolvedSchema.getColumns());
-        CatalogPropertiesUtils.serializeWatermarkSpecs(
-                customProperties, catalogBaseTable.getResolvedSchema().getWatermarkSpecs());
-
         // Set materialized table flags to fluss table custom properties
         if (CatalogTableAdapter.isMaterializedTable(tableKind)) {
             CatalogMaterializedTable.RefreshMode refreshMode =
@@ -283,7 +286,7 @@ public class FlinkConversions {
                 .partitionedBy(partitionKeys)
                 .distributedBy(bucketNum, bucketKey)
                 .comment(comment)
-                .properties(properties)
+                .properties(storageProperties)
                 .customProperties(customProperties)
                 .build();
     }
@@ -644,4 +647,11 @@ public class FlinkConversions {
                 .serializedRefreshHandler(refreshHandlerBytes);
         return builder.build();
     }
+
+    private static Map<String, String> extractCustomProperties(
+            Configuration allProperties, Map<String, String> flussTableProperties) {
+        Map<String, String> customProperties = new HashMap<>(allProperties.toMap());
+        customProperties.keySet().removeAll(flussTableProperties.keySet());
+        return customProperties;
+    }
 }
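
The heart of the fix is the set difference in extractCustomProperties above:
every option the converter recognized as a Fluss storage property is removed
from the full Flink option map, and only the remainder is persisted as custom
properties. A minimal standalone sketch of that behavior, using plain
java.util maps with illustrative keys (the class name and key values here are
examples, not the actual converter):

    import java.util.HashMap;
    import java.util.Map;

    public class CustomPropertySplitSketch {
        public static void main(String[] args) {
            // Everything the user declared in the Flink WITH (...) clause.
            Map<String, String> allOptions = new HashMap<>();
            allOptions.put("table.log.format", "indexed");    // recognized Fluss table property
            allOptions.put("client.connect-timeout", "120s"); // client-side option
            allOptions.put("k1", "v1");                       // free-form user key

            // Keys the converter already claimed as storage properties.
            Map<String, String> storageProperties = new HashMap<>();
            storageProperties.put("table.log.format", "indexed");

            // Same set difference as extractCustomProperties: start from all
            // options and drop every key that became a storage property.
            Map<String, String> customProperties = new HashMap<>(allOptions);
            customProperties.keySet().removeAll(storageProperties.keySet());

            // Leaves only {k1=v1, client.connect-timeout=120s}
            // (HashMap iteration order is unspecified).
            System.out.println(customProperties);
        }
    }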
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index bc04543a4..e544bfd1e 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -17,6 +17,9 @@
 
 package org.apache.fluss.flink.catalog;
 
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
@@ -25,6 +28,7 @@ import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 
@@ -586,7 +590,9 @@ abstract class FlinkCatalogITCase {
                         + "    cost AS price * quantity,\n"
                         + "    order_time TIMESTAMP(3),\n"
                         + "    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND\n"
-                        + ") with ('k1' = 'v1')");
+                        + ") with ('k1' = 'v1', 'bucket.num' = '2', "
+                        + "'table.datalake.format' = 'paimon', "
+                        + "'client.connect-timeout' = '120s')");
         CatalogTable table =
                 (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, "expression_test"));
         Schema.Builder schemaBuilder = Schema.newBuilder();
@@ -606,9 +612,36 @@ abstract class FlinkCatalogITCase {
         Map<String, String> expectedOptions = new HashMap<>();
         expectedOptions.put("k1", "v1");
         expectedOptions.put(BUCKET_KEY.key(), "user");
-        expectedOptions.put(BUCKET_NUMBER.key(), "1");
+        expectedOptions.put(BUCKET_NUMBER.key(), "2");
         expectedOptions.put("table.datalake.format", "paimon");
+        expectedOptions.put("client.connect-timeout", "120s");
         assertOptionsEqual(table.getOptions(), expectedOptions);
+
+        // assert the stored table/custom configs
+        Configuration clientConfig = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        try (Connection conn = ConnectionFactory.createConnection(clientConfig)) {
+            Admin admin = conn.getAdmin();
+            TableInfo tableInfo =
+                    admin.getTableInfo(TablePath.of(DEFAULT_DB, "expression_test")).get();
+
+            Map<String, String> expectedTableProperties = new HashMap<>();
+            expectedTableProperties.put("table.datalake.format", "paimon");
+            expectedTableProperties.put("table.replication.factor", "1");
+            assertThat(tableInfo.getProperties().toMap()).isEqualTo(expectedTableProperties);
+
+            Map<String, String> expectedCustomProperties = new HashMap<>();
+            expectedCustomProperties.put("k1", "v1");
+            expectedCustomProperties.put("client.connect-timeout", "120s");
+            expectedCustomProperties.put(
+                    "schema.watermark.0.strategy.expr", "`order_time` - INTERVAL '5' SECOND");
+            expectedCustomProperties.put("schema.watermark.0.rowtime", "order_time");
+            expectedCustomProperties.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
+            expectedCustomProperties.put("schema.4.name", "cost");
+            expectedCustomProperties.put("schema.4.expr", "`price` * `quantity`");
+            expectedCustomProperties.put("schema.4.data-type", "DOUBLE");
+            expectedCustomProperties.put("bucket.num", "2");
+            assertThat(tableInfo.getCustomProperties().toMap()).isEqualTo(expectedCustomProperties);
+        }
     }
 
     @Test
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
index cafc9d8fa..b511c9050 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
@@ -258,6 +258,43 @@ public class FlinkConversionsTest {
                 .containsEntry(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
     }
 
+    @Test
+    void testTableConversionForCustomProperties() {
+        Map<String, String> options = new HashMap<>();
+        // forward table option & enum type
+        options.put(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed");
+        // forward client memory option
+        options.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), "64mb");
+        // forward client duration option
+        options.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
+
+        ResolvedSchema schema =
+                new ResolvedSchema(
+                        Collections.singletonList(
+                                Column.physical(
+                                        "order_id",
+                                        org.apache.flink.table.api.DataTypes.STRING().notNull())),
+                        Collections.emptyList(),
+                        null);
+        CatalogTable flinkTable =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(schema).build(),
+                        "test comment",
+                        Collections.emptyList(),
+                        options);
+
+        TableDescriptor flussTable =
+                FlinkConversions.toFlussTable(new ResolvedCatalogTable(flinkTable, schema));
+
+        assertThat(flussTable.getProperties())
+                .containsEntry(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed");
+
+        HashMap<String, String> customProperties = new HashMap<>();
+        customProperties.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), "64mb");
+        customProperties.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
+        assertThat(flussTable.getCustomProperties()).containsExactlyEntriesOf(customProperties);
+    }
+
     @Test
     void testOptionConversions() {
         ConfigOption<?> flinkOption = FlinkConversions.toFlinkOption(ConfigOptions.TABLE_KV_FORMAT);
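
Outside the IT case, the same split can be verified against a running cluster
through the Fluss admin client, exactly as the test above does. A hedged
sketch, assuming the caller already has a client Configuration pointing at the
cluster (the IT case obtains one via FLUSS_CLUSTER_EXTENSION.getClientConfig());
the class name, method name, and database/table names are placeholders:

    import org.apache.fluss.client.Connection;
    import org.apache.fluss.client.ConnectionFactory;
    import org.apache.fluss.client.admin.Admin;
    import org.apache.fluss.config.Configuration;
    import org.apache.fluss.metadata.TableInfo;
    import org.apache.fluss.metadata.TablePath;

    public class InspectStoredProperties {

        static void printStoredProperties(Configuration clientConfig) throws Exception {
            try (Connection conn = ConnectionFactory.createConnection(clientConfig)) {
                Admin admin = conn.getAdmin();
                // "fluss" / "expression_test" are placeholder names.
                TableInfo tableInfo =
                        admin.getTableInfo(TablePath.of("fluss", "expression_test")).get();

                // Storage properties: only the table.* options Fluss itself understands.
                System.out.println("table properties:  " + tableInfo.getProperties().toMap());
                // Custom properties: client overrides, free-form keys, and the
                // serialized computed-column / watermark metadata added by this patch.
                System.out.println("custom properties: " + tableInfo.getCustomProperties().toMap());
            }
        }
    }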