This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new abc2d64cf [flink] Don't serialize physical column and primary keys to 
TableSchema options when table has non-physical columns and watermark (#995)
abc2d64cf is described below

commit abc2d64cf62ab712d03ba1b16d439b0b022101b7
Author: yuzelin <[email protected]>
AuthorDate: Wed Apr 26 17:09:08 2023 +0800

    [flink] Don't serialize physical column and primary keys to TableSchema 
options when table has non-physical columns and watermark (#995)
---
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 113 +++++++++-----
 .../flink/utils/FlinkCatalogPropertiesUtil.java    | 163 +++++++++++++++++++++
 .../flink/FlinkCatalogPropertiesUtilTest.java      | 153 +++++++++++++++++++
 .../apache/paimon/flink/ReadWriteTableITCase.java  | 110 ++++++++++++++
 4 files changed, 503 insertions(+), 36 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 800f8b235..40ebb18fa 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -26,9 +26,12 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.table.api.Schema.UnresolvedColumn;
+import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.WatermarkSpec;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
@@ -63,12 +66,20 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
+import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
 import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
+import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
+import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn;
+import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
+import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount;
+import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeNonPhysicalColumns;
+import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeWatermarkSpec;
 
 /** Catalog for paimon. */
 public class FlinkCatalog extends AbstractCatalog {
@@ -201,13 +212,11 @@ public class FlinkCatalog extends AbstractCatalog {
         Map<String, String> options = table.getOptions();
         if (options.containsKey(CONNECTOR.key())) {
             throw new CatalogException(
-                    String.format(
-                            "Paimon Catalog only supports paimon tables ,"
-                                    + " and you don't need to specify  
'connector'= '"
-                                    + FlinkCatalogFactory.IDENTIFIER
-                                    + "' when using Paimon Catalog\n"
-                                    + " You can create TEMPORARY table instead 
if you want to create the table of other connector.",
-                            options.get(CONNECTOR.key())));
+                    "Paimon Catalog only supports paimon tables ,"
+                            + " and you don't need to specify  'connector'= '"
+                            + FlinkCatalogFactory.IDENTIFIER
+                            + "' when using Paimon Catalog\n"
+                            + " You can create TEMPORARY table instead if you 
want to create the table of other connector.");
         }
 
         // remove table path
@@ -316,35 +325,46 @@ public class FlinkCatalog extends AbstractCatalog {
     }
 
     private CatalogTableImpl toCatalogTable(Table table) {
-        TableSchema schema;
         Map<String, String> newOptions = new HashMap<>(table.options());
 
-        // try to read schema from options
-        // in the case of virtual columns and watermark
-        DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
-        tableSchemaProps.putProperties(newOptions);
-        Optional<TableSchema> optional =
-                tableSchemaProps.getOptionalTableSchema(
-                        org.apache.flink.table.descriptors.Schema.SCHEMA);
-        if (optional.isPresent()) {
-            schema = optional.get();
-
-            // remove schema from options
-            DescriptorProperties removeProperties = new DescriptorProperties(false);
-            removeProperties.putTableSchema(SCHEMA, schema);
-            removeProperties.asMap().keySet().forEach(newOptions::remove);
-        } else {
-            TableSchema.Builder builder = TableSchema.builder();
-            for (RowType.RowField field : toLogicalType(table.rowType()).getFields()) {
+        TableSchema.Builder builder = TableSchema.builder();
+
+        // Add columns. Physical columns come from the table row type, in order; non-physical
+        // (computed / metadata) columns are deserialized from the "schema.<i>.*" options, where
+        // <i> is the column position in the Flink schema.
+        List<RowType.RowField> rowFields = toLogicalType(table.rowType()).getFields();
+        int count = nonPhysicalColumnsCount(newOptions, table.rowType().getFieldNames());
+        int physicalColumnIndex = 0;
+        for (int i = 0; i < rowFields.size() + count; i++) {
+            String optionalName = newOptions.get(compoundKey(SCHEMA, i, NAME));
+            // Old-style options also serialize physical columns, so a name equal to the next
+            // physical field still denotes a physical column. Once every physical field has been
+            // consumed (e.g. a computed column declared last) the position must be non-physical;
+            // checking the bound first avoids an IndexOutOfBoundsException.
+            boolean isPhysical =
+                    physicalColumnIndex < rowFields.size()
+                            && (optionalName == null
+                                    || optionalName.equals(
+                                            rowFields.get(physicalColumnIndex).getName()));
+            if (isPhysical) {
+                // build physical column from table row
+                RowType.RowField field = rowFields.get(physicalColumnIndex++);
                 builder.field(field.getName(), fromLogicalToDataType(field.getType()));
+            } else {
+                // build non-physical column from options
+                builder.add(deserializeNonPhysicalColumn(newOptions, i));
             }
-            if (table.primaryKeys().size() > 0) {
-                builder.primaryKey(table.primaryKeys().toArray(new String[0]));
-            }
+        }
+
+        // extract watermark information
+        if (newOptions.keySet().stream()
+                .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) {
+            builder.watermark(deserializeWatermarkSpec(newOptions));
+        }
 
-            schema = builder.build();
+        // add primary keys
+        if (table.primaryKeys().size() > 0) {
+            builder.primaryKey(table.primaryKeys().toArray(new String[0]));
         }
 
+        TableSchema schema = builder.build();
+
+        // Remove the serialized schema from options so that it is not exposed as table options
+        // (e.g. in SHOW CREATE TABLE).
+        DescriptorProperties removeProperties = new DescriptorProperties(false);
+        removeProperties.putTableSchema(SCHEMA, schema);
+        removeProperties.asMap().keySet().forEach(newOptions::remove);
+
         return new DataCatalogTable(
                 table, schema, table.partitionKeys(), newOptions, table.comment().orElse(""));
     }
@@ -361,13 +381,7 @@ public class FlinkCatalog extends AbstractCatalog {
 
         // Serialize virtual columns and watermark to the options
         // This is what Flink SQL needs, the storage itself does not need them
-        if (schema.getTableColumns().stream().anyMatch(c -> !c.isPhysical())
-                || schema.getWatermarkSpecs().size() > 0) {
-            DescriptorProperties tableSchemaProps = new 
DescriptorProperties(true);
-            tableSchemaProps.putTableSchema(
-                    org.apache.flink.table.descriptors.Schema.SCHEMA, schema);
-            options.putAll(tableSchemaProps.asMap());
-        }
+        options.putAll(columnOptions(schema));
 
         return new Schema(
                 addColumnComments(toDataType(rowType).getFields(), 
getColumnComments(catalogTable)),
@@ -394,6 +408,33 @@ public class FlinkCatalog extends AbstractCatalog {
                 .collect(Collectors.toList());
     }
 
+    /**
+     * Serializes the parts of a Flink schema that the Paimon schema cannot store, namely the
+     * non-physical columns and the watermark. Physical columns and primary keys are not
+     * serialized because they are already part of the Paimon schema.
+     */
+    private static Map<String, String> columnOptions(TableSchema schema) {
+        Map<String, String> columnOptions = new HashMap<>();
+        // field name -> index in the full Flink schema, used as the "schema.<index>" key part
+        final Map<String, Integer> indexMap = new HashMap<>();
+        List<TableColumn> tableColumns = schema.getTableColumns();
+        for (int i = 0; i < tableColumns.size(); i++) {
+            indexMap.put(tableColumns.get(i).getName(), i);
+        }
+
+        // non-physical columns
+        List<TableColumn> nonPhysicalColumns =
+                tableColumns.stream().filter(c -> !c.isPhysical()).collect(Collectors.toList());
+        if (!nonPhysicalColumns.isEmpty()) {
+            columnOptions.putAll(serializeNonPhysicalColumns(indexMap, nonPhysicalColumns));
+        }
+
+        // watermark
+        List<WatermarkSpec> watermarkSpecs = schema.getWatermarkSpecs();
+        if (!watermarkSpecs.isEmpty()) {
+            Preconditions.checkArgument(
+                    watermarkSpecs.size() == 1,
+                    "Only one watermark is supported, but found %s.",
+                    watermarkSpecs.size());
+            columnOptions.putAll(serializeWatermarkSpec(watermarkSpecs.get(0)));
+        }
+
+        return columnOptions;
+    }
+
     public static Identifier toIdentifier(ObjectPath path) {
         return new Identifier(path.getDatabaseName(), path.getObjectName());
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
new file mode 100644
index 000000000..011a1a4e0
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.WatermarkSpec;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA;
+import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
+import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL;
+import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
+import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
+import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+
+/**
+ * Utilities for ser/deserializing non-physical columns and watermark into/from a map of string
+ * properties.
+ *
+ * <p>Keys follow the legacy {@code DescriptorProperties} layout (e.g. {@code schema.<index>.name})
+ * so that options written by older versions, which also contain the physical columns, can still
+ * be read.
+ */
+public class FlinkCatalogPropertiesUtil {
+
+    /** Matches the {@code <index>.name} part of a {@code schema.<index>.name} key. */
+    private static final Pattern SCHEMA_COLUMN_NAME_SUFFIX = Pattern.compile("\\d+\\.name");
+
+    private FlinkCatalogPropertiesUtil() {}
+
+    /**
+     * Serializes non-physical columns into {@code schema.<index>.*} properties.
+     *
+     * @param indexMap column name to the column's position in the complete Flink schema; must
+     *     contain every column of {@code nonPhysicalColumns}
+     * @param nonPhysicalColumns computed or metadata columns to serialize
+     * @return the serialized properties
+     */
+    public static Map<String, String> serializeNonPhysicalColumns(
+            Map<String, Integer> indexMap, List<TableColumn> nonPhysicalColumns) {
+        Map<String, String> serialized = new HashMap<>();
+        for (TableColumn c : nonPhysicalColumns) {
+            int index = indexMap.get(c.getName());
+            serialized.put(compoundKey(SCHEMA, index, NAME), c.getName());
+            serialized.put(
+                    compoundKey(SCHEMA, index, DATA_TYPE),
+                    c.getType().getLogicalType().asSerializableString());
+            if (c instanceof TableColumn.ComputedColumn) {
+                TableColumn.ComputedColumn computedColumn = (TableColumn.ComputedColumn) c;
+                serialized.put(compoundKey(SCHEMA, index, EXPR), computedColumn.getExpression());
+            } else {
+                // a non-physical column that is not computed must be a metadata column
+                TableColumn.MetadataColumn metadataColumn = (TableColumn.MetadataColumn) c;
+                serialized.put(
+                        compoundKey(SCHEMA, index, METADATA),
+                        metadataColumn.getMetadataAlias().orElse(metadataColumn.getName()));
+                serialized.put(
+                        compoundKey(SCHEMA, index, VIRTUAL),
+                        Boolean.toString(metadataColumn.isVirtual()));
+            }
+        }
+        return serialized;
+    }
+
+    /**
+     * Serializes a watermark spec into {@code schema.watermark.0.*} properties.
+     *
+     * @param watermarkSpec the watermark to serialize
+     * @return the serialized properties
+     */
+    public static Map<String, String> serializeWatermarkSpec(WatermarkSpec watermarkSpec) {
+        Map<String, String> serializedWatermarkSpec = new HashMap<>();
+        String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0);
+        serializedWatermarkSpec.put(
+                compoundKey(watermarkPrefix, WATERMARK_ROWTIME),
+                watermarkSpec.getRowtimeAttribute());
+        serializedWatermarkSpec.put(
+                compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR),
+                watermarkSpec.getWatermarkExpr());
+        serializedWatermarkSpec.put(
+                compoundKey(watermarkPrefix, WATERMARK_STRATEGY_DATA_TYPE),
+                watermarkSpec.getWatermarkExprOutputType().getLogicalType().asSerializableString());
+
+        return serializedWatermarkSpec;
+    }
+
+    /**
+     * Counts the serialized columns in {@code tableOptions} that are not physical columns.
+     *
+     * <p>Old-style options also contain the physical columns, so column name keys whose value is
+     * one of {@code physicalColumns} are not counted.
+     *
+     * @param tableOptions the table options
+     * @param physicalColumns names of the physical columns of the table
+     * @return the number of non-physical columns
+     */
+    public static int nonPhysicalColumnsCount(
+            Map<String, String> tableOptions, List<String> physicalColumns) {
+        int count = 0;
+        for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
+            if (isColumnNameKey(entry.getKey()) && !physicalColumns.contains(entry.getValue())) {
+                count++;
+            }
+        }
+
+        return count;
+    }
+
+    /** Returns whether {@code key} has the form {@code schema.<index>.name}. */
+    private static boolean isColumnNameKey(String key) {
+        // Match "schema." rather than just "schema": otherwise a key equal to "schema" makes
+        // substring() throw, and keys such as "schema_1.name" would be counted as columns.
+        String prefix = SCHEMA + ".";
+        return key.startsWith(prefix)
+                && SCHEMA_COLUMN_NAME_SUFFIX.matcher(key.substring(prefix.length())).matches();
+    }
+
+    /**
+     * Deserializes the non-physical column stored under {@code schema.<index>.*}.
+     *
+     * @param options properties containing the serialized column
+     * @param index position of the column in the Flink schema
+     * @return a computed column if an expression is present, otherwise a metadata column
+     * @throws RuntimeException if neither an expression nor a metadata key is present
+     */
+    public static TableColumn deserializeNonPhysicalColumn(Map<String, String> options, int index) {
+        String nameKey = compoundKey(SCHEMA, index, NAME);
+        String dataTypeKey = compoundKey(SCHEMA, index, DATA_TYPE);
+        String exprKey = compoundKey(SCHEMA, index, EXPR);
+        String metadataKey = compoundKey(SCHEMA, index, METADATA);
+        String virtualKey = compoundKey(SCHEMA, index, VIRTUAL);
+
+        String name = options.get(nameKey);
+        DataType dataType =
+                TypeConversions.fromLogicalToDataType(
+                        LogicalTypeParser.parse(options.get(dataTypeKey)));
+
+        TableColumn column;
+        if (options.containsKey(exprKey)) {
+            column = TableColumn.computed(name, dataType, options.get(exprKey));
+        } else if (options.containsKey(metadataKey)) {
+            String metadataAlias = options.get(metadataKey);
+            boolean isVirtual = Boolean.parseBoolean(options.get(virtualKey));
+            // the column name is written as the alias when no explicit alias was given
+            column =
+                    metadataAlias.equals(name)
+                            ? TableColumn.metadata(name, dataType, isVirtual)
+                            : TableColumn.metadata(name, dataType, metadataAlias, isVirtual);
+        } else {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to build non-physical column. Current index is %s, options are %s",
+                            index, options));
+        }
+
+        return column;
+    }
+
+    /**
+     * Deserializes the watermark spec stored under {@code schema.watermark.0.*}.
+     *
+     * @param options properties containing the serialized watermark
+     * @return the watermark spec
+     */
+    public static WatermarkSpec deserializeWatermarkSpec(Map<String, String> options) {
+        String watermarkPrefixKey = compoundKey(SCHEMA, WATERMARK);
+
+        String rowtimeKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_ROWTIME);
+        String exprKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_STRATEGY_EXPR);
+        String dataTypeKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_STRATEGY_DATA_TYPE);
+
+        String rowtimeAttribute = options.get(rowtimeKey);
+        String watermarkExpressionString = options.get(exprKey);
+        DataType watermarkExprOutputType =
+                TypeConversions.fromLogicalToDataType(
+                        LogicalTypeParser.parse(options.get(dataTypeKey)));
+
+        return new WatermarkSpec(
+                rowtimeAttribute, watermarkExpressionString, watermarkExprOutputType);
+    }
+
+    /**
+     * Joins the string forms of {@code components} with {@code "."}, e.g. {@code
+     * compoundKey("schema", 0, "name")} returns {@code "schema.0.name"}.
+     */
+    public static String compoundKey(Object... components) {
+        return Stream.of(components).map(Object::toString).collect(Collectors.joining("."));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
new file mode 100644
index 000000000..9268a236b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.api.WatermarkSpec;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE;
+import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
+import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA;
+import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
+import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL;
+import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
+import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
+import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
+import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link FlinkCatalogPropertiesUtil}. */
+public class FlinkCatalogPropertiesUtilTest {
+
+    @Test
+    public void testSerDeNonPhysicalColumns() {
+        Map<String, Integer> indexMap = new HashMap<>();
+        indexMap.put("comp", 2);
+        indexMap.put("meta1", 3);
+        indexMap.put("meta2", 5);
+        List<TableColumn> columns = new ArrayList<>();
+        columns.add(TableColumn.computed("comp", DataTypes.INT(), "`k` * 2"));
+        columns.add(TableColumn.metadata("meta1", DataTypes.VARCHAR(10)));
+        columns.add(TableColumn.metadata("meta2", DataTypes.BIGINT().notNull(), "price", true));
+
+        // validate serialization
+        Map<String, String> serialized =
+                FlinkCatalogPropertiesUtil.serializeNonPhysicalColumns(indexMap, columns);
+
+        Map<String, String> expected = new HashMap<>();
+        expected.put(compoundKey(SCHEMA, 2, NAME), "comp");
+        expected.put(compoundKey(SCHEMA, 2, DATA_TYPE), "INT");
+        expected.put(compoundKey(SCHEMA, 2, EXPR), "`k` * 2");
+
+        expected.put(compoundKey(SCHEMA, 3, NAME), "meta1");
+        expected.put(compoundKey(SCHEMA, 3, DATA_TYPE), "VARCHAR(10)");
+        expected.put(compoundKey(SCHEMA, 3, METADATA), "meta1");
+        expected.put(compoundKey(SCHEMA, 3, VIRTUAL), "false");
+
+        expected.put(compoundKey(SCHEMA, 5, NAME), "meta2");
+        expected.put(compoundKey(SCHEMA, 5, DATA_TYPE), "BIGINT NOT NULL");
+        expected.put(compoundKey(SCHEMA, 5, METADATA), "price");
+        expected.put(compoundKey(SCHEMA, 5, VIRTUAL), "true");
+
+        assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected);
+
+        // validate deserialization
+        List<TableColumn> deserialized = new ArrayList<>();
+        deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 2));
+        deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 3));
+        deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 5));
+
+        assertThat(deserialized).isEqualTo(columns);
+    }
+
+    @Test
+    public void testSerDeWatermarkSpec() {
+        WatermarkSpec watermarkSpec =
+                new WatermarkSpec(
+                        "test_time",
+                        "`test_time` - INTERVAL '0.001' SECOND",
+                        DataTypes.TIMESTAMP(3));
+
+        // validate serialization
+        Map<String, String> serialized =
+                FlinkCatalogPropertiesUtil.serializeWatermarkSpec(watermarkSpec);
+
+        Map<String, String> expected = new HashMap<>();
+        String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0);
+        expected.put(compoundKey(watermarkPrefix, WATERMARK_ROWTIME), "test_time");
+        expected.put(
+                compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR),
+                "`test_time` - INTERVAL '0.001' SECOND");
+        expected.put(compoundKey(watermarkPrefix, WATERMARK_STRATEGY_DATA_TYPE), "TIMESTAMP(3)");
+
+        assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected);
+
+        // validate deserialization
+        WatermarkSpec deserialized =
+                FlinkCatalogPropertiesUtil.deserializeWatermarkSpec(serialized);
+        assertThat(deserialized).isEqualTo(watermarkSpec);
+    }
+
+    @Test
+    public void testNonPhysicalColumnsCount() {
+        Map<String, String> oldStyleOptions = new HashMap<>();
+        // physical
+        oldStyleOptions.put(compoundKey(SCHEMA, 0, NAME), "phy1");
+        oldStyleOptions.put(compoundKey(SCHEMA, 0, DATA_TYPE), "INT");
+        oldStyleOptions.put(compoundKey(SCHEMA, 1, NAME), "phy2");
+        oldStyleOptions.put(compoundKey(SCHEMA, 1, DATA_TYPE), "INT NOT NULL");
+
+        // non-physical
+        oldStyleOptions.put(compoundKey(SCHEMA, 2, NAME), "comp");
+        oldStyleOptions.put(compoundKey(SCHEMA, 2, DATA_TYPE), "INT");
+        oldStyleOptions.put(compoundKey(SCHEMA, 2, EXPR), "`k` * 2");
+
+        oldStyleOptions.put(compoundKey(SCHEMA, 3, NAME), "meta1");
+        oldStyleOptions.put(compoundKey(SCHEMA, 3, DATA_TYPE), "VARCHAR(10)");
+        oldStyleOptions.put(compoundKey(SCHEMA, 3, METADATA), "meta1");
+        oldStyleOptions.put(compoundKey(SCHEMA, 3, VIRTUAL), "false");
+
+        oldStyleOptions.put(compoundKey(SCHEMA, 4, NAME), "meta2");
+        oldStyleOptions.put(compoundKey(SCHEMA, 4, DATA_TYPE), "BIGINT NOT NULL");
+        oldStyleOptions.put(compoundKey(SCHEMA, 4, METADATA), "price");
+        oldStyleOptions.put(compoundKey(SCHEMA, 4, VIRTUAL), "true");
+
+        // other options: not a "schema.<index>.name" key, so it must not be counted
+        oldStyleOptions.put("schema.unknown.name", "test");
+
+        assertThat(
+                        FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount(
+                                oldStyleOptions, Arrays.asList("phy1", "phy2")))
+                .isEqualTo(3);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index df09ad98e..dbc12ffe2 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -23,8 +23,11 @@ import org.apache.paimon.flink.sink.FlinkTableSink;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.testutils.assertj.AssertionUtils;
+import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.api.dag.Transformation;
@@ -1366,6 +1369,90 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                         "+I[c, INT, true, null, AS `a` + `b`, null]");
     }
 
+    @Test
+    public void testCleanedSchemaOptions() {
+        // A table mixing physical, metadata and computed columns with a watermark and a primary
+        // key: only the non-physical columns and the watermark should be kept in the options.
+        String createTableDdl =
+                String.join(
+                        "\n",
+                        "CREATE TABLE T (",
+                        "id INT,",
+                        "price INT,",
+                        "record_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,",
+                        "comp AS price * 2,",
+                        "order_time TIMESTAMP(3),",
+                        "WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,",
+                        "PRIMARY KEY (id) NOT ENFORCED",
+                        ");");
+        bEnv.executeSql(createTableDdl);
+
+        TableSchema latestSchema =
+                new SchemaManager(LocalFileIO.create(), new Path(warehouse, "default.db/T"))
+                        .latest()
+                        .get();
+
+        // metadata column at position 2, computed column at position 3, then the watermark
+        Map<String, String> expectedOptions = new HashMap<>();
+        expectedOptions.put("schema.2.name", "record_time");
+        expectedOptions.put("schema.2.data-type", "TIMESTAMP(3) WITH LOCAL TIME ZONE");
+        expectedOptions.put("schema.2.metadata", "timestamp");
+        expectedOptions.put("schema.2.virtual", "true");
+        expectedOptions.put("schema.3.name", "comp");
+        expectedOptions.put("schema.3.data-type", "INT");
+        expectedOptions.put("schema.3.expr", "`price` * 2");
+        expectedOptions.put("schema.watermark.0.rowtime", "order_time");
+        expectedOptions.put(
+                "schema.watermark.0.strategy.expr", "`order_time` - INTERVAL '5' SECOND");
+        expectedOptions.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
+
+        assertThat(latestSchema.options()).containsExactlyInAnyOrderEntriesOf(expectedOptions);
+
+        validateSchemaOptionResult();
+    }
+
+    @Test
+    public void testReadFromOldStyleSchemaOptions() throws Exception {
+        Map<String, String> oldStyleOptions = new HashMap<>();
+        oldStyleOptions.put("schema.0.name", "id");
+        oldStyleOptions.put("schema.0.data-type", "INT NOT NULL");
+
+        oldStyleOptions.put("schema.1.name", "price");
+        oldStyleOptions.put("schema.1.data-type", "INT");
+
+        oldStyleOptions.put("schema.2.name", "record_time");
+        oldStyleOptions.put("schema.2.data-type", "TIMESTAMP(3) WITH LOCAL 
TIME ZONE");
+        oldStyleOptions.put("schema.2.metadata", "timestamp");
+        oldStyleOptions.put("schema.2.virtual", "true");
+
+        oldStyleOptions.put("schema.3.name", "comp");
+        oldStyleOptions.put("schema.3.data-type", "INT");
+        oldStyleOptions.put("schema.3.expr", "`price` * 2");
+
+        oldStyleOptions.put("schema.4.name", "order_time");
+        oldStyleOptions.put("schema.4.data-type", "TIMESTAMP(3)");
+
+        oldStyleOptions.put("schema.watermark.0.rowtime", "order_time");
+        oldStyleOptions.put(
+                "schema.watermark.0.strategy.expr", "`order_time` - INTERVAL 
'5' SECOND");
+        oldStyleOptions.put("schema.watermark.0.strategy.data-type", 
"TIMESTAMP(3)");
+
+        oldStyleOptions.put("schema.primary-key.name", "constrain_pk");
+        oldStyleOptions.put("schema.primary-key.columns", "id");
+
+        // create corresponding table
+        Schema schema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT().notNull())
+                        .column("price", DataTypes.INT())
+                        .column("order_time", DataTypes.TIMESTAMP(3))
+                        .options(oldStyleOptions)
+                        .primaryKey("id")
+                        .build();
+
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new Path(warehouse, 
"default.db/T"));
+        schemaManager.createTable(schema);
+
+        validateSchemaOptionResult();
+    }
+
     // 
----------------------------------------------------------------------------------------------------------------
     // Tools
     // 
----------------------------------------------------------------------------------------------------------------
@@ -1483,4 +1570,27 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                                         + "Please switch to batch mode, and 
perform INSERT OVERWRITE to rescale current data layout first.",
                                 bucketNum));
     }
+
+    /**
+     * Checks, via {@code DESC T} and {@code SHOW CREATE TABLE T}, that table {@code T} exposes the
+     * expected physical, metadata and computed columns with primary key and watermark, and that
+     * no serialized {@code schema.} option shows up in the table definition.
+     */
+    private void validateSchemaOptionResult() {
+        // validate columns
+        List<String> descResults =
+                CollectionUtil.iteratorToList(bEnv.executeSql("DESC T").collect()).stream()
+                        .map(Object::toString)
+                        .collect(Collectors.toList());
+        assertThat(descResults)
+                .isEqualTo(
+                        Arrays.asList(
+                                "+I[id, INT, false, PRI(id), null, null]",
+                                "+I[price, INT, true, null, null, null]",
+                                "+I[record_time, TIMESTAMP_LTZ(3), true, null, METADATA FROM 'timestamp' VIRTUAL, null]",
+                                "+I[comp, INT, true, null, AS `price` * 2, null]",
+                                "+I[order_time, TIMESTAMP(3), true, null, null, `order_time` - INTERVAL '5' SECOND]"));
+
+        // validate that the SHOW CREATE TABLE output (including WITH options) doesn't contain
+        // 'schema.'; doesNotContain reports the offending text on failure
+        String showResult =
+                CollectionUtil.iteratorToList(bEnv.executeSql("SHOW CREATE TABLE T").collect())
+                        .get(0)
+                        .toString();
+        assertThat(showResult).doesNotContain("schema.");
+    }
 }

Reply via email to