This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new abc2d64cf [flink] Don't serialize physical column and primary keys to
TableSchema options when table has non-physical columns and watermark (#995)
abc2d64cf is described below
commit abc2d64cf62ab712d03ba1b16d439b0b022101b7
Author: yuzelin <[email protected]>
AuthorDate: Wed Apr 26 17:09:08 2023 +0800
[flink] Don't serialize physical column and primary keys to TableSchema
options when table has non-physical columns and watermark (#995)
---
.../java/org/apache/paimon/flink/FlinkCatalog.java | 113 +++++++++-----
.../flink/utils/FlinkCatalogPropertiesUtil.java | 163 +++++++++++++++++++++
.../flink/FlinkCatalogPropertiesUtilTest.java | 153 +++++++++++++++++++
.../apache/paimon/flink/ReadWriteTableITCase.java | 110 ++++++++++++++
4 files changed, 503 insertions(+), 36 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 800f8b235..40ebb18fa 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -26,9 +26,12 @@ import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Preconditions;
import org.apache.flink.table.api.Schema.UnresolvedColumn;
+import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
@@ -63,12 +66,20 @@ import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
+import static
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
+import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn;
+import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
+import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount;
+import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeNonPhysicalColumns;
+import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeWatermarkSpec;
/** Catalog for paimon. */
public class FlinkCatalog extends AbstractCatalog {
@@ -201,13 +212,11 @@ public class FlinkCatalog extends AbstractCatalog {
Map<String, String> options = table.getOptions();
if (options.containsKey(CONNECTOR.key())) {
throw new CatalogException(
- String.format(
- "Paimon Catalog only supports paimon tables ,"
- + " and you don't need to specify
'connector'= '"
- + FlinkCatalogFactory.IDENTIFIER
- + "' when using Paimon Catalog\n"
- + " You can create TEMPORARY table instead
if you want to create the table of other connector.",
- options.get(CONNECTOR.key())));
+ "Paimon Catalog only supports paimon tables ,"
+ + " and you don't need to specify 'connector'= '"
+ + FlinkCatalogFactory.IDENTIFIER
+ + "' when using Paimon Catalog\n"
+ + " You can create TEMPORARY table instead if you
want to create the table of other connector.");
}
// remove table path
@@ -316,35 +325,46 @@ public class FlinkCatalog extends AbstractCatalog {
}
private CatalogTableImpl toCatalogTable(Table table) {
- TableSchema schema;
Map<String, String> newOptions = new HashMap<>(table.options());
- // try to read schema from options
- // in the case of virtual columns and watermark
- DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
- tableSchemaProps.putProperties(newOptions);
- Optional<TableSchema> optional =
- tableSchemaProps.getOptionalTableSchema(
- org.apache.flink.table.descriptors.Schema.SCHEMA);
- if (optional.isPresent()) {
- schema = optional.get();
-
- // remove schema from options
- DescriptorProperties removeProperties = new
DescriptorProperties(false);
- removeProperties.putTableSchema(SCHEMA, schema);
- removeProperties.asMap().keySet().forEach(newOptions::remove);
- } else {
- TableSchema.Builder builder = TableSchema.builder();
- for (RowType.RowField field :
toLogicalType(table.rowType()).getFields()) {
+ TableSchema.Builder builder = TableSchema.builder();
+
+ // add columns
+ List<RowType.RowField> rowFields =
toLogicalType(table.rowType()).getFields();
+ int count = nonPhysicalColumnsCount(newOptions,
table.rowType().getFieldNames());
+ int physicalColumnIndex = 0;
+ for (int i = 0; i < rowFields.size() + count; i++) {
+ String optionalName = newOptions.get(compoundKey(SCHEMA, i, NAME));
+ // old-style options also list physical columns, so compare with the row field name
+ RowType.RowField field = rowFields.get(physicalColumnIndex);
+ if (optionalName == null || optionalName.equals(field.getName())) {
+ // build physical column from table row
builder.field(field.getName(),
fromLogicalToDataType(field.getType()));
+ physicalColumnIndex++;
+ } else {
+ // build non-physical column from options
+ builder.add(deserializeNonPhysicalColumn(newOptions, i));
}
- if (table.primaryKeys().size() > 0) {
- builder.primaryKey(table.primaryKeys().toArray(new String[0]));
- }
+ }
+
+ // extract watermark information
+ if (newOptions.keySet().stream()
+ .anyMatch(key -> key.startsWith(compoundKey(SCHEMA,
WATERMARK)))) {
+ builder.watermark(deserializeWatermarkSpec(newOptions));
+ }
- schema = builder.build();
+ // add primary keys
+ if (table.primaryKeys().size() > 0) {
+ builder.primaryKey(table.primaryKeys().toArray(new String[0]));
}
+ TableSchema schema = builder.build();
+
+ // remove schema from options
+ DescriptorProperties removeProperties = new
DescriptorProperties(false);
+ removeProperties.putTableSchema(SCHEMA, schema);
+ removeProperties.asMap().keySet().forEach(newOptions::remove);
+
return new DataCatalogTable(
table, schema, table.partitionKeys(), newOptions,
table.comment().orElse(""));
}
@@ -361,13 +381,7 @@ public class FlinkCatalog extends AbstractCatalog {
// Serialize virtual columns and watermark to the options
// This is what Flink SQL needs, the storage itself does not need them
- if (schema.getTableColumns().stream().anyMatch(c -> !c.isPhysical())
- || schema.getWatermarkSpecs().size() > 0) {
- DescriptorProperties tableSchemaProps = new
DescriptorProperties(true);
- tableSchemaProps.putTableSchema(
- org.apache.flink.table.descriptors.Schema.SCHEMA, schema);
- options.putAll(tableSchemaProps.asMap());
- }
+ options.putAll(columnOptions(schema));
return new Schema(
addColumnComments(toDataType(rowType).getFields(),
getColumnComments(catalogTable)),
@@ -394,6 +408,33 @@ public class FlinkCatalog extends AbstractCatalog {
.collect(Collectors.toList());
}
+ /** Only keep the necessary options: non-physical columns and the watermark. */
+ private static Map<String, String> columnOptions(TableSchema schema) {
+ Map<String, String> columnOptions = new HashMap<>();
+ // field name -> index
+ final Map<String, Integer> indexMap = new HashMap<>();
+ List<TableColumn> tableColumns = schema.getTableColumns();
+ for (int i = 0; i < tableColumns.size(); i++) {
+ indexMap.put(tableColumns.get(i).getName(), i);
+ }
+
+ // non-physical columns
+ List<TableColumn> nonPhysicalColumns =
+ tableColumns.stream().filter(c ->
!c.isPhysical()).collect(Collectors.toList());
+ if (!nonPhysicalColumns.isEmpty()) {
+ columnOptions.putAll(serializeNonPhysicalColumns(indexMap,
nonPhysicalColumns));
+ }
+
+ // watermark
+ List<WatermarkSpec> watermarkSpecs = schema.getWatermarkSpecs();
+ if (!watermarkSpecs.isEmpty()) {
+ Preconditions.checkArgument(watermarkSpecs.size() == 1);
+
columnOptions.putAll(serializeWatermarkSpec(watermarkSpecs.get(0)));
+ }
+
+ return columnOptions;
+ }
+
public static Identifier toIdentifier(ObjectPath path) {
return new Identifier(path.getDatabaseName(), path.getObjectName());
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
new file mode 100644
index 000000000..011a1a4e0
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.WatermarkSpec;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA;
+import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
+import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL;
+import static
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
+import static
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
+import static
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * Utilities for ser/deserializing non-physical columns and watermark
into/from a map of string
+ * properties.
+ */
+public class FlinkCatalogPropertiesUtil {
+
+ public static Map<String, String> serializeNonPhysicalColumns(
+ Map<String, Integer> indexMap, List<TableColumn>
nonPhysicalColumns) {
+ Map<String, String> serialized = new HashMap<>();
+ for (TableColumn c : nonPhysicalColumns) {
+ int index = indexMap.get(c.getName());
+ serialized.put(compoundKey(SCHEMA, index, NAME), c.getName());
+ serialized.put(
+ compoundKey(SCHEMA, index, DATA_TYPE),
+ c.getType().getLogicalType().asSerializableString());
+ if (c instanceof TableColumn.ComputedColumn) {
+ TableColumn.ComputedColumn computedColumn =
(TableColumn.ComputedColumn) c;
+ serialized.put(compoundKey(SCHEMA, index, EXPR),
computedColumn.getExpression());
+ } else {
+ TableColumn.MetadataColumn metadataColumn =
(TableColumn.MetadataColumn) c;
+ serialized.put(
+ compoundKey(SCHEMA, index, METADATA),
+
metadataColumn.getMetadataAlias().orElse(metadataColumn.getName()));
+ serialized.put(
+ compoundKey(SCHEMA, index, VIRTUAL),
+ Boolean.toString(metadataColumn.isVirtual()));
+ }
+ }
+ return serialized;
+ }
+
+ public static Map<String, String> serializeWatermarkSpec(WatermarkSpec
watermarkSpec) {
+ Map<String, String> serializedWatermarkSpec = new HashMap<>();
+ String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0);
+ serializedWatermarkSpec.put(
+ compoundKey(watermarkPrefix, WATERMARK_ROWTIME),
+ watermarkSpec.getRowtimeAttribute());
+ serializedWatermarkSpec.put(
+ compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR),
+ watermarkSpec.getWatermarkExpr());
+ serializedWatermarkSpec.put(
+ compoundKey(watermarkPrefix, WATERMARK_STRATEGY_DATA_TYPE),
+
watermarkSpec.getWatermarkExprOutputType().getLogicalType().asSerializableString());
+
+ return serializedWatermarkSpec;
+ }
+
+ private static final Pattern SCHEMA_COLUMN_NAME_SUFFIX =
Pattern.compile("\\d+\\.name");
+
+ public static int nonPhysicalColumnsCount(
+ Map<String, String> tableOptions, List<String> physicalColumns) {
+ int count = 0;
+ for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
+ if (isColumnNameKey(entry.getKey()) &&
!physicalColumns.contains(entry.getValue())) {
+ count++;
+ }
+ }
+
+ return count;
+ }
+
+ private static boolean isColumnNameKey(String key) {
+ return key.startsWith(SCHEMA)
+ &&
SCHEMA_COLUMN_NAME_SUFFIX.matcher(key.substring(SCHEMA.length() + 1)).matches();
+ }
+
+ public static TableColumn deserializeNonPhysicalColumn(Map<String, String>
options, int index) {
+ String nameKey = compoundKey(SCHEMA, index, NAME);
+ String dataTypeKey = compoundKey(SCHEMA, index, DATA_TYPE);
+ String exprKey = compoundKey(SCHEMA, index, EXPR);
+ String metadataKey = compoundKey(SCHEMA, index, METADATA);
+ String virtualKey = compoundKey(SCHEMA, index, VIRTUAL);
+
+ String name = options.get(nameKey);
+ DataType dataType =
+ TypeConversions.fromLogicalToDataType(
+ LogicalTypeParser.parse(options.get(dataTypeKey)));
+
+ TableColumn column;
+ if (options.containsKey(exprKey)) {
+ column = TableColumn.computed(name, dataType,
options.get(exprKey));
+ } else if (options.containsKey(metadataKey)) {
+ String metadataAlias = options.get(metadataKey);
+ boolean isVirtual = Boolean.parseBoolean(options.get(virtualKey));
+ column =
+ metadataAlias.equals(name)
+ ? TableColumn.metadata(name, dataType, isVirtual)
+ : TableColumn.metadata(name, dataType,
metadataAlias, isVirtual);
+ } else {
+ throw new RuntimeException(
+ String.format(
+ "Failed to build non-physical column. Current
index is %s, options are %s",
+ index, options));
+ }
+
+ return column;
+ }
+
+ public static WatermarkSpec deserializeWatermarkSpec(Map<String, String>
options) {
+ String watermarkPrefixKey = compoundKey(SCHEMA, WATERMARK);
+
+ String rowtimeKey = compoundKey(watermarkPrefixKey, 0,
WATERMARK_ROWTIME);
+ String exprKey = compoundKey(watermarkPrefixKey, 0,
WATERMARK_STRATEGY_EXPR);
+ String dataTypeKey = compoundKey(watermarkPrefixKey, 0,
WATERMARK_STRATEGY_DATA_TYPE);
+
+ String rowtimeAttribute = options.get(rowtimeKey);
+ String watermarkExpressionString = options.get(exprKey);
+ DataType watermarkExprOutputType =
+ TypeConversions.fromLogicalToDataType(
+ LogicalTypeParser.parse(options.get(dataTypeKey)));
+
+ return new WatermarkSpec(
+ rowtimeAttribute, watermarkExpressionString,
watermarkExprOutputType);
+ }
+
+ public static String compoundKey(Object... components) {
+ return
Stream.of(components).map(Object::toString).collect(Collectors.joining("."));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
new file mode 100644
index 000000000..9268a236b
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.WatermarkSpec;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA;
+import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
+import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL;
+import static
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
+import static
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
+import static
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FlinkCatalogPropertiesUtil}. */
+public class FlinkCatalogPropertiesUtilTest {
+
+ @Test
+ public void testSerDeNonPhysicalColumns() {
+ Map<String, Integer> indexMap = new HashMap<>();
+ indexMap.put("comp", 2);
+ indexMap.put("meta1", 3);
+ indexMap.put("meta2", 5);
+ List<TableColumn> columns = new ArrayList<>();
+ columns.add(TableColumn.computed("comp", DataTypes.INT(), "`k` * 2"));
+ columns.add(TableColumn.metadata("meta1", DataTypes.VARCHAR(10)));
+ columns.add(TableColumn.metadata("meta2",
DataTypes.BIGINT().notNull(), "price", true));
+
+ // validate serialization
+ Map<String, String> serialized =
+
FlinkCatalogPropertiesUtil.serializeNonPhysicalColumns(indexMap, columns);
+
+ Map<String, String> expected = new HashMap<>();
+ expected.put(compoundKey(SCHEMA, 2, NAME), "comp");
+ expected.put(compoundKey(SCHEMA, 2, DATA_TYPE), "INT");
+ expected.put(compoundKey(SCHEMA, 2, EXPR), "`k` * 2");
+
+ expected.put(compoundKey(SCHEMA, 3, NAME), "meta1");
+ expected.put(compoundKey(SCHEMA, 3, DATA_TYPE), "VARCHAR(10)");
+ expected.put(compoundKey(SCHEMA, 3, METADATA), "meta1");
+ expected.put(compoundKey(SCHEMA, 3, VIRTUAL), "false");
+
+ expected.put(compoundKey(SCHEMA, 5, NAME), "meta2");
+ expected.put(compoundKey(SCHEMA, 5, DATA_TYPE), "BIGINT NOT NULL");
+ expected.put(compoundKey(SCHEMA, 5, METADATA), "price");
+ expected.put(compoundKey(SCHEMA, 5, VIRTUAL), "true");
+
+ assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected);
+
+ // validate deserialization
+ List<TableColumn> deserialized = new ArrayList<>();
+
deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized,
2));
+
deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized,
3));
+
deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized,
5));
+
+ assertThat(deserialized).isEqualTo(columns);
+
+ // validate that
+ }
+
+ @Test
+ public void testSerDeWatermarkSpec() {
+ WatermarkSpec watermarkSpec =
+ new WatermarkSpec(
+ "test_time",
+ "`test_time` - INTERVAL '0.001' SECOND",
+ DataTypes.TIMESTAMP(3));
+
+ // validate serialization
+ Map<String, String> serialized =
+
FlinkCatalogPropertiesUtil.serializeWatermarkSpec(watermarkSpec);
+
+ Map<String, String> expected = new HashMap<>();
+ String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0);
+ expected.put(compoundKey(watermarkPrefix, WATERMARK_ROWTIME),
"test_time");
+ expected.put(
+ compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR),
+ "`test_time` - INTERVAL '0.001' SECOND");
+ expected.put(compoundKey(watermarkPrefix,
WATERMARK_STRATEGY_DATA_TYPE), "TIMESTAMP(3)");
+
+ assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected);
+
+ // validate deserialization
+ WatermarkSpec deserialized =
+
FlinkCatalogPropertiesUtil.deserializeWatermarkSpec(serialized);
+ assertThat(deserialized).isEqualTo(watermarkSpec);
+ }
+
+ @Test
+ public void testNonPhysicalColumnsCount() {
+ Map<String, String> oldStyleOptions = new HashMap<>();
+ // physical
+ oldStyleOptions.put(compoundKey(SCHEMA, 0, NAME), "phy1");
+ oldStyleOptions.put(compoundKey(SCHEMA, 0, DATA_TYPE), "INT");
+ oldStyleOptions.put(compoundKey(SCHEMA, 1, NAME), "phy2");
+ oldStyleOptions.put(compoundKey(SCHEMA, 1, DATA_TYPE), "INT NOT NULL");
+
+ // non-physical
+ oldStyleOptions.put(compoundKey(SCHEMA, 2, NAME), "comp");
+ oldStyleOptions.put(compoundKey(SCHEMA, 2, DATA_TYPE), "INT");
+ oldStyleOptions.put(compoundKey(SCHEMA, 2, EXPR), "`k` * 2");
+
+ oldStyleOptions.put(compoundKey(SCHEMA, 3, NAME), "meta1");
+ oldStyleOptions.put(compoundKey(SCHEMA, 3, DATA_TYPE), "VARCHAR(10)");
+ oldStyleOptions.put(compoundKey(SCHEMA, 3, METADATA), "meta1");
+ oldStyleOptions.put(compoundKey(SCHEMA, 3, VIRTUAL), "false");
+
+ oldStyleOptions.put(compoundKey(SCHEMA, 4, NAME), "meta2");
+ oldStyleOptions.put(compoundKey(SCHEMA, 4, DATA_TYPE), "BIGINT NOT
NULL");
+ oldStyleOptions.put(compoundKey(SCHEMA, 4, METADATA), "price");
+ oldStyleOptions.put(compoundKey(SCHEMA, 4, VIRTUAL), "true");
+
+ // other options
+ oldStyleOptions.put("schema.unknown.name", "test");
+
+ assertThat(
+ FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount(
+ oldStyleOptions, Arrays.asList("phy1",
"phy2")))
+ .isEqualTo(3);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index df09ad98e..dbc12ffe2 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -23,8 +23,11 @@ import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.testutils.assertj.AssertionUtils;
+import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.api.dag.Transformation;
@@ -1366,6 +1369,90 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
"+I[c, INT, true, null, AS `a` + `b`, null]");
}
+ @Test
+ public void testCleanedSchemaOptions() {
+ String ddl =
+ "CREATE TABLE T (\n"
+ + "id INT,\n"
+ + "price INT,\n"
+ + "record_time TIMESTAMP_LTZ(3) METADATA FROM
'timestamp' VIRTUAL,\n"
+ + "comp AS price * 2,\n"
+ + "order_time TIMESTAMP(3),\n"
+ + "WATERMARK FOR order_time AS order_time - INTERVAL
'5' SECOND,\n"
+ + "PRIMARY KEY (id) NOT ENFORCED\n"
+ + ");";
+ bEnv.executeSql(ddl);
+
+ // validate schema options
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new Path(warehouse,
"default.db/T"));
+ TableSchema schema = schemaManager.latest().get();
+ Map<String, String> expected = new HashMap<>();
+ // metadata column
+ expected.put("schema.2.name", "record_time");
+ expected.put("schema.2.data-type", "TIMESTAMP(3) WITH LOCAL TIME
ZONE");
+ expected.put("schema.2.metadata", "timestamp");
+ expected.put("schema.2.virtual", "true");
+ // computed column
+ expected.put("schema.3.name", "comp");
+ expected.put("schema.3.data-type", "INT");
+ expected.put("schema.3.expr", "`price` * 2");
+ // watermark
+ expected.put("schema.watermark.0.rowtime", "order_time");
+ expected.put("schema.watermark.0.strategy.expr", "`order_time` -
INTERVAL '5' SECOND");
+ expected.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
+
+
assertThat(schema.options()).containsExactlyInAnyOrderEntriesOf(expected);
+
+ validateSchemaOptionResult();
+ }
+
+ @Test
+ public void testReadFromOldStyleSchemaOptions() throws Exception {
+ Map<String, String> oldStyleOptions = new HashMap<>();
+ oldStyleOptions.put("schema.0.name", "id");
+ oldStyleOptions.put("schema.0.data-type", "INT NOT NULL");
+
+ oldStyleOptions.put("schema.1.name", "price");
+ oldStyleOptions.put("schema.1.data-type", "INT");
+
+ oldStyleOptions.put("schema.2.name", "record_time");
+ oldStyleOptions.put("schema.2.data-type", "TIMESTAMP(3) WITH LOCAL
TIME ZONE");
+ oldStyleOptions.put("schema.2.metadata", "timestamp");
+ oldStyleOptions.put("schema.2.virtual", "true");
+
+ oldStyleOptions.put("schema.3.name", "comp");
+ oldStyleOptions.put("schema.3.data-type", "INT");
+ oldStyleOptions.put("schema.3.expr", "`price` * 2");
+
+ oldStyleOptions.put("schema.4.name", "order_time");
+ oldStyleOptions.put("schema.4.data-type", "TIMESTAMP(3)");
+
+ oldStyleOptions.put("schema.watermark.0.rowtime", "order_time");
+ oldStyleOptions.put(
+ "schema.watermark.0.strategy.expr", "`order_time` - INTERVAL
'5' SECOND");
+ oldStyleOptions.put("schema.watermark.0.strategy.data-type",
"TIMESTAMP(3)");
+
+ oldStyleOptions.put("schema.primary-key.name", "constrain_pk");
+ oldStyleOptions.put("schema.primary-key.columns", "id");
+
+ // create corresponding table
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT().notNull())
+ .column("price", DataTypes.INT())
+ .column("order_time", DataTypes.TIMESTAMP(3))
+ .options(oldStyleOptions)
+ .primaryKey("id")
+ .build();
+
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new Path(warehouse,
"default.db/T"));
+ schemaManager.createTable(schema);
+
+ validateSchemaOptionResult();
+ }
+
//
----------------------------------------------------------------------------------------------------------------
// Tools
//
----------------------------------------------------------------------------------------------------------------
@@ -1483,4 +1570,27 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
+ "Please switch to batch mode, and
perform INSERT OVERWRITE to rescale current data layout first.",
bucketNum));
}
+
+ private void validateSchemaOptionResult() {
+ // validate columns
+ List<String> descResults =
+ CollectionUtil.iteratorToList(bEnv.executeSql("DESC
T").collect()).stream()
+ .map(Object::toString)
+ .collect(Collectors.toList());
+ assertThat(descResults)
+ .isEqualTo(
+ Arrays.asList(
+ "+I[id, INT, false, PRI(id), null, null]",
+ "+I[price, INT, true, null, null, null]",
+ "+I[record_time, TIMESTAMP_LTZ(3), true, null,
METADATA FROM 'timestamp' VIRTUAL, null]",
+ "+I[comp, INT, true, null, AS `price` * 2,
null]",
+ "+I[order_time, TIMESTAMP(3), true, null,
null, `order_time` - INTERVAL '5' SECOND]"));
+
+ // validate that the WITH options don't contain 'schema.'
+ String showResult =
+ CollectionUtil.iteratorToList(bEnv.executeSql("SHOW CREATE
TABLE T").collect())
+ .get(0)
+ .toString();
+ assertThat(showResult.contains("schema.")).isFalse();
+ }
}