This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch revert-995-schema-option
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 26ac9b213af0d69b6b3e637a6365f3801542dcf9
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Apr 26 18:23:25 2023 +0800

    Revert "[flink] Don't serialize physical column and primary keys to TableSchema options when table has non-physical columns and watermark (#995)"
    
    This reverts commit abc2d64cf62ab712d03ba1b16d439b0b022101b7.
---
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 113 +++++---------
 .../flink/utils/FlinkCatalogPropertiesUtil.java    | 163 ---------------------
 .../flink/FlinkCatalogPropertiesUtilTest.java      | 153 -------------------
 .../apache/paimon/flink/ReadWriteTableITCase.java  | 110 --------------
 4 files changed, 36 insertions(+), 503 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 40ebb18fa..800f8b235 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -26,12 +26,9 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
-import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.table.api.Schema.UnresolvedColumn;
-import org.apache.flink.table.api.TableColumn;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.WatermarkSpec;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
@@ -66,20 +63,12 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
 import static org.apache.flink.table.descriptors.Schema.SCHEMA;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
 import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.flink.LogicalTypeConversion.toDataType;
 import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType;
-import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
-import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn;
-import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec;
-import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount;
-import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeNonPhysicalColumns;
-import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeWatermarkSpec;
 
 /** Catalog for paimon. */
 public class FlinkCatalog extends AbstractCatalog {
@@ -212,11 +201,13 @@ public class FlinkCatalog extends AbstractCatalog {
         Map<String, String> options = table.getOptions();
         if (options.containsKey(CONNECTOR.key())) {
             throw new CatalogException(
-                    "Paimon Catalog only supports paimon tables ,"
-                            + " and you don't need to specify  'connector'= '"
-                            + FlinkCatalogFactory.IDENTIFIER
-                            + "' when using Paimon Catalog\n"
-                            + " You can create TEMPORARY table instead if you 
want to create the table of other connector.");
+                    String.format(
+                            "Paimon Catalog only supports paimon tables ,"
+                                    + " and you don't need to specify  
'connector'= '"
+                                    + FlinkCatalogFactory.IDENTIFIER
+                                    + "' when using Paimon Catalog\n"
+                                    + " You can create TEMPORARY table instead 
if you want to create the table of other connector.",
+                            options.get(CONNECTOR.key())));
         }
 
         // remove table path
@@ -325,46 +316,35 @@ public class FlinkCatalog extends AbstractCatalog {
     }
 
     private CatalogTableImpl toCatalogTable(Table table) {
+        TableSchema schema;
         Map<String, String> newOptions = new HashMap<>(table.options());
 
-        TableSchema.Builder builder = TableSchema.builder();
-
-        // add columns
-        List<RowType.RowField> rowFields = 
toLogicalType(table.rowType()).getFields();
-        int count = nonPhysicalColumnsCount(newOptions, 
table.rowType().getFieldNames());
-        int physicalColumnIndex = 0;
-        for (int i = 0; i < rowFields.size() + count; i++) {
-            String optionalName = newOptions.get(compoundKey(SCHEMA, i, NAME));
-            // to check old style physical column option
-            RowType.RowField field = rowFields.get(physicalColumnIndex);
-            if (optionalName == null || optionalName.equals(field.getName())) {
-                // build physical column from table row
+        // try to read schema from options
+        // in the case of virtual columns and watermark
+        DescriptorProperties tableSchemaProps = new DescriptorProperties(true);
+        tableSchemaProps.putProperties(newOptions);
+        Optional<TableSchema> optional =
+                tableSchemaProps.getOptionalTableSchema(
+                        org.apache.flink.table.descriptors.Schema.SCHEMA);
+        if (optional.isPresent()) {
+            schema = optional.get();
+
+            // remove schema from options
+            DescriptorProperties removeProperties = new 
DescriptorProperties(false);
+            removeProperties.putTableSchema(SCHEMA, schema);
+            removeProperties.asMap().keySet().forEach(newOptions::remove);
+        } else {
+            TableSchema.Builder builder = TableSchema.builder();
+            for (RowType.RowField field : 
toLogicalType(table.rowType()).getFields()) {
                 builder.field(field.getName(), 
fromLogicalToDataType(field.getType()));
-                physicalColumnIndex++;
-            } else {
-                // build non-physical column from options
-                builder.add(deserializeNonPhysicalColumn(newOptions, i));
             }
-        }
-
-        // extract watermark information
-        if (newOptions.keySet().stream()
-                .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, 
WATERMARK)))) {
-            builder.watermark(deserializeWatermarkSpec(newOptions));
-        }
+            if (table.primaryKeys().size() > 0) {
+                builder.primaryKey(table.primaryKeys().toArray(new String[0]));
+            }
 
-        // add primary keys
-        if (table.primaryKeys().size() > 0) {
-            builder.primaryKey(table.primaryKeys().toArray(new String[0]));
+            schema = builder.build();
         }
 
-        TableSchema schema = builder.build();
-
-        // remove schema from options
-        DescriptorProperties removeProperties = new 
DescriptorProperties(false);
-        removeProperties.putTableSchema(SCHEMA, schema);
-        removeProperties.asMap().keySet().forEach(newOptions::remove);
-
         return new DataCatalogTable(
                 table, schema, table.partitionKeys(), newOptions, 
table.comment().orElse(""));
     }
@@ -381,7 +361,13 @@ public class FlinkCatalog extends AbstractCatalog {
 
         // Serialize virtual columns and watermark to the options
         // This is what Flink SQL needs, the storage itself does not need them
-        options.putAll(columnOptions(schema));
+        if (schema.getTableColumns().stream().anyMatch(c -> !c.isPhysical())
+                || schema.getWatermarkSpecs().size() > 0) {
+            DescriptorProperties tableSchemaProps = new 
DescriptorProperties(true);
+            tableSchemaProps.putTableSchema(
+                    org.apache.flink.table.descriptors.Schema.SCHEMA, schema);
+            options.putAll(tableSchemaProps.asMap());
+        }
 
         return new Schema(
                 addColumnComments(toDataType(rowType).getFields(), 
getColumnComments(catalogTable)),
@@ -408,33 +394,6 @@ public class FlinkCatalog extends AbstractCatalog {
                 .collect(Collectors.toList());
     }
 
-    /** Only reserve necessary options. */
-    private static Map<String, String> columnOptions(TableSchema schema) {
-        Map<String, String> columnOptions = new HashMap<>();
-        // field name -> index
-        final Map<String, Integer> indexMap = new HashMap<>();
-        List<TableColumn> tableColumns = schema.getTableColumns();
-        for (int i = 0; i < tableColumns.size(); i++) {
-            indexMap.put(tableColumns.get(i).getName(), i);
-        }
-
-        // non-physical columns
-        List<TableColumn> nonPhysicalColumns =
-                tableColumns.stream().filter(c -> 
!c.isPhysical()).collect(Collectors.toList());
-        if (!nonPhysicalColumns.isEmpty()) {
-            columnOptions.putAll(serializeNonPhysicalColumns(indexMap, 
nonPhysicalColumns));
-        }
-
-        // watermark
-        List<WatermarkSpec> watermarkSpecs = schema.getWatermarkSpecs();
-        if (!watermarkSpecs.isEmpty()) {
-            Preconditions.checkArgument(watermarkSpecs.size() == 1);
-            
columnOptions.putAll(serializeWatermarkSpec(watermarkSpecs.get(0)));
-        }
-
-        return columnOptions;
-    }
-
     public static Identifier toIdentifier(ObjectPath path) {
         return new Identifier(path.getDatabaseName(), path.getObjectName());
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
deleted file mode 100644
index 011a1a4e0..000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.utils;
-
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.WatermarkSpec;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
-import org.apache.flink.table.types.utils.TypeConversions;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA;
-import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-
-/**
- * Utilities for ser/deserializing non-physical columns and watermark 
into/from a map of string
- * properties.
- */
-public class FlinkCatalogPropertiesUtil {
-
-    public static Map<String, String> serializeNonPhysicalColumns(
-            Map<String, Integer> indexMap, List<TableColumn> 
nonPhysicalColumns) {
-        Map<String, String> serialized = new HashMap<>();
-        for (TableColumn c : nonPhysicalColumns) {
-            int index = indexMap.get(c.getName());
-            serialized.put(compoundKey(SCHEMA, index, NAME), c.getName());
-            serialized.put(
-                    compoundKey(SCHEMA, index, DATA_TYPE),
-                    c.getType().getLogicalType().asSerializableString());
-            if (c instanceof TableColumn.ComputedColumn) {
-                TableColumn.ComputedColumn computedColumn = 
(TableColumn.ComputedColumn) c;
-                serialized.put(compoundKey(SCHEMA, index, EXPR), 
computedColumn.getExpression());
-            } else {
-                TableColumn.MetadataColumn metadataColumn = 
(TableColumn.MetadataColumn) c;
-                serialized.put(
-                        compoundKey(SCHEMA, index, METADATA),
-                        
metadataColumn.getMetadataAlias().orElse(metadataColumn.getName()));
-                serialized.put(
-                        compoundKey(SCHEMA, index, VIRTUAL),
-                        Boolean.toString(metadataColumn.isVirtual()));
-            }
-        }
-        return serialized;
-    }
-
-    public static Map<String, String> serializeWatermarkSpec(WatermarkSpec 
watermarkSpec) {
-        Map<String, String> serializedWatermarkSpec = new HashMap<>();
-        String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0);
-        serializedWatermarkSpec.put(
-                compoundKey(watermarkPrefix, WATERMARK_ROWTIME),
-                watermarkSpec.getRowtimeAttribute());
-        serializedWatermarkSpec.put(
-                compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR),
-                watermarkSpec.getWatermarkExpr());
-        serializedWatermarkSpec.put(
-                compoundKey(watermarkPrefix, WATERMARK_STRATEGY_DATA_TYPE),
-                
watermarkSpec.getWatermarkExprOutputType().getLogicalType().asSerializableString());
-
-        return serializedWatermarkSpec;
-    }
-
-    private static final Pattern SCHEMA_COLUMN_NAME_SUFFIX = 
Pattern.compile("\\d+\\.name");
-
-    public static int nonPhysicalColumnsCount(
-            Map<String, String> tableOptions, List<String> physicalColumns) {
-        int count = 0;
-        for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
-            if (isColumnNameKey(entry.getKey()) && 
!physicalColumns.contains(entry.getValue())) {
-                count++;
-            }
-        }
-
-        return count;
-    }
-
-    private static boolean isColumnNameKey(String key) {
-        return key.startsWith(SCHEMA)
-                && 
SCHEMA_COLUMN_NAME_SUFFIX.matcher(key.substring(SCHEMA.length() + 1)).matches();
-    }
-
-    public static TableColumn deserializeNonPhysicalColumn(Map<String, String> 
options, int index) {
-        String nameKey = compoundKey(SCHEMA, index, NAME);
-        String dataTypeKey = compoundKey(SCHEMA, index, DATA_TYPE);
-        String exprKey = compoundKey(SCHEMA, index, EXPR);
-        String metadataKey = compoundKey(SCHEMA, index, METADATA);
-        String virtualKey = compoundKey(SCHEMA, index, VIRTUAL);
-
-        String name = options.get(nameKey);
-        DataType dataType =
-                TypeConversions.fromLogicalToDataType(
-                        LogicalTypeParser.parse(options.get(dataTypeKey)));
-
-        TableColumn column;
-        if (options.containsKey(exprKey)) {
-            column = TableColumn.computed(name, dataType, 
options.get(exprKey));
-        } else if (options.containsKey(metadataKey)) {
-            String metadataAlias = options.get(metadataKey);
-            boolean isVirtual = Boolean.parseBoolean(options.get(virtualKey));
-            column =
-                    metadataAlias.equals(name)
-                            ? TableColumn.metadata(name, dataType, isVirtual)
-                            : TableColumn.metadata(name, dataType, 
metadataAlias, isVirtual);
-        } else {
-            throw new RuntimeException(
-                    String.format(
-                            "Failed to build non-physical column. Current 
index is %s, options are %s",
-                            index, options));
-        }
-
-        return column;
-    }
-
-    public static WatermarkSpec deserializeWatermarkSpec(Map<String, String> 
options) {
-        String watermarkPrefixKey = compoundKey(SCHEMA, WATERMARK);
-
-        String rowtimeKey = compoundKey(watermarkPrefixKey, 0, 
WATERMARK_ROWTIME);
-        String exprKey = compoundKey(watermarkPrefixKey, 0, 
WATERMARK_STRATEGY_EXPR);
-        String dataTypeKey = compoundKey(watermarkPrefixKey, 0, 
WATERMARK_STRATEGY_DATA_TYPE);
-
-        String rowtimeAttribute = options.get(rowtimeKey);
-        String watermarkExpressionString = options.get(exprKey);
-        DataType watermarkExprOutputType =
-                TypeConversions.fromLogicalToDataType(
-                        LogicalTypeParser.parse(options.get(dataTypeKey)));
-
-        return new WatermarkSpec(
-                rowtimeAttribute, watermarkExpressionString, 
watermarkExprOutputType);
-    }
-
-    public static String compoundKey(Object... components) {
-        return 
Stream.of(components).map(Object::toString).collect(Collectors.joining("."));
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
deleted file mode 100644
index 9268a236b..000000000
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink;
-
-import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
-
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.api.TableColumn;
-import org.apache.flink.table.api.WatermarkSpec;
-import org.junit.jupiter.api.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE;
-import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
-import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA;
-import static org.apache.flink.table.descriptors.DescriptorProperties.NAME;
-import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
-import static 
org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
-import static org.apache.flink.table.descriptors.Schema.SCHEMA;
-import static 
org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link FlinkCatalogPropertiesUtil}. */
-public class FlinkCatalogPropertiesUtilTest {
-
-    @Test
-    public void testSerDeNonPhysicalColumns() {
-        Map<String, Integer> indexMap = new HashMap<>();
-        indexMap.put("comp", 2);
-        indexMap.put("meta1", 3);
-        indexMap.put("meta2", 5);
-        List<TableColumn> columns = new ArrayList<>();
-        columns.add(TableColumn.computed("comp", DataTypes.INT(), "`k` * 2"));
-        columns.add(TableColumn.metadata("meta1", DataTypes.VARCHAR(10)));
-        columns.add(TableColumn.metadata("meta2", 
DataTypes.BIGINT().notNull(), "price", true));
-
-        // validate serialization
-        Map<String, String> serialized =
-                
FlinkCatalogPropertiesUtil.serializeNonPhysicalColumns(indexMap, columns);
-
-        Map<String, String> expected = new HashMap<>();
-        expected.put(compoundKey(SCHEMA, 2, NAME), "comp");
-        expected.put(compoundKey(SCHEMA, 2, DATA_TYPE), "INT");
-        expected.put(compoundKey(SCHEMA, 2, EXPR), "`k` * 2");
-
-        expected.put(compoundKey(SCHEMA, 3, NAME), "meta1");
-        expected.put(compoundKey(SCHEMA, 3, DATA_TYPE), "VARCHAR(10)");
-        expected.put(compoundKey(SCHEMA, 3, METADATA), "meta1");
-        expected.put(compoundKey(SCHEMA, 3, VIRTUAL), "false");
-
-        expected.put(compoundKey(SCHEMA, 5, NAME), "meta2");
-        expected.put(compoundKey(SCHEMA, 5, DATA_TYPE), "BIGINT NOT NULL");
-        expected.put(compoundKey(SCHEMA, 5, METADATA), "price");
-        expected.put(compoundKey(SCHEMA, 5, VIRTUAL), "true");
-
-        assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected);
-
-        // validate deserialization
-        List<TableColumn> deserialized = new ArrayList<>();
-        
deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized,
 2));
-        
deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized,
 3));
-        
deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized,
 5));
-
-        assertThat(deserialized).isEqualTo(columns);
-
-        // validate that
-    }
-
-    @Test
-    public void testSerDeWatermarkSpec() {
-        WatermarkSpec watermarkSpec =
-                new WatermarkSpec(
-                        "test_time",
-                        "`test_time` - INTERVAL '0.001' SECOND",
-                        DataTypes.TIMESTAMP(3));
-
-        // validate serialization
-        Map<String, String> serialized =
-                
FlinkCatalogPropertiesUtil.serializeWatermarkSpec(watermarkSpec);
-
-        Map<String, String> expected = new HashMap<>();
-        String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0);
-        expected.put(compoundKey(watermarkPrefix, WATERMARK_ROWTIME), 
"test_time");
-        expected.put(
-                compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR),
-                "`test_time` - INTERVAL '0.001' SECOND");
-        expected.put(compoundKey(watermarkPrefix, 
WATERMARK_STRATEGY_DATA_TYPE), "TIMESTAMP(3)");
-
-        assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected);
-
-        // validate serialization
-        WatermarkSpec deserialized =
-                
FlinkCatalogPropertiesUtil.deserializeWatermarkSpec(serialized);
-        assertThat(deserialized).isEqualTo(watermarkSpec);
-    }
-
-    @Test
-    public void testNonPhysicalColumnsCount() {
-        Map<String, String> oldStyleOptions = new HashMap<>();
-        // physical
-        oldStyleOptions.put(compoundKey(SCHEMA, 0, NAME), "phy1");
-        oldStyleOptions.put(compoundKey(SCHEMA, 0, DATA_TYPE), "INT");
-        oldStyleOptions.put(compoundKey(SCHEMA, 1, NAME), "phy2");
-        oldStyleOptions.put(compoundKey(SCHEMA, 1, DATA_TYPE), "INT NOT NULL");
-
-        // non-physical
-        oldStyleOptions.put(compoundKey(SCHEMA, 2, NAME), "comp");
-        oldStyleOptions.put(compoundKey(SCHEMA, 2, DATA_TYPE), "INT");
-        oldStyleOptions.put(compoundKey(SCHEMA, 2, EXPR), "`k` * 2");
-
-        oldStyleOptions.put(compoundKey(SCHEMA, 3, NAME), "meta1");
-        oldStyleOptions.put(compoundKey(SCHEMA, 3, DATA_TYPE), "VARCHAR(10)");
-        oldStyleOptions.put(compoundKey(SCHEMA, 3, METADATA), "meta1");
-        oldStyleOptions.put(compoundKey(SCHEMA, 3, VIRTUAL), "false");
-
-        oldStyleOptions.put(compoundKey(SCHEMA, 4, NAME), "meta2");
-        oldStyleOptions.put(compoundKey(SCHEMA, 4, DATA_TYPE), "BIGINT NOT 
NULL");
-        oldStyleOptions.put(compoundKey(SCHEMA, 4, METADATA), "price");
-        oldStyleOptions.put(compoundKey(SCHEMA, 4, VIRTUAL), "true");
-
-        // other options
-        oldStyleOptions.put("schema.unknown.name", "test");
-
-        assertThat(
-                        FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount(
-                                oldStyleOptions, Arrays.asList("phy1", 
"phy2")))
-                .isEqualTo(3);
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index dbc12ffe2..df09ad98e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -23,11 +23,8 @@ import org.apache.paimon.flink.sink.FlinkTableSink;
 import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.testutils.assertj.AssertionUtils;
-import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.BlockingIterator;
 
 import org.apache.flink.api.dag.Transformation;
@@ -1369,90 +1366,6 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                         "+I[c, INT, true, null, AS `a` + `b`, null]");
     }
 
-    @Test
-    public void testCleanedSchemaOptions() {
-        String ddl =
-                "CREATE TABLE T (\n"
-                        + "id INT,\n"
-                        + "price INT,\n"
-                        + "record_time TIMESTAMP_LTZ(3) METADATA FROM 
'timestamp' VIRTUAL,\n"
-                        + "comp AS price * 2,\n"
-                        + "order_time TIMESTAMP(3),\n"
-                        + "WATERMARK FOR order_time AS order_time - INTERVAL 
'5' SECOND,\n"
-                        + "PRIMARY KEY (id) NOT ENFORCED\n"
-                        + ");";
-        bEnv.executeSql(ddl);
-
-        // validate schema options
-        SchemaManager schemaManager =
-                new SchemaManager(LocalFileIO.create(), new Path(warehouse, 
"default.db/T"));
-        TableSchema schema = schemaManager.latest().get();
-        Map<String, String> expected = new HashMap<>();
-        // metadata column
-        expected.put("schema.2.name", "record_time");
-        expected.put("schema.2.data-type", "TIMESTAMP(3) WITH LOCAL TIME 
ZONE");
-        expected.put("schema.2.metadata", "timestamp");
-        expected.put("schema.2.virtual", "true");
-        // computed column
-        expected.put("schema.3.name", "comp");
-        expected.put("schema.3.data-type", "INT");
-        expected.put("schema.3.expr", "`price` * 2");
-        // watermark
-        expected.put("schema.watermark.0.rowtime", "order_time");
-        expected.put("schema.watermark.0.strategy.expr", "`order_time` - 
INTERVAL '5' SECOND");
-        expected.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
-
-        
assertThat(schema.options()).containsExactlyInAnyOrderEntriesOf(expected);
-
-        validateSchemaOptionResult();
-    }
-
-    @Test
-    public void testReadFromOldStyleSchemaOptions() throws Exception {
-        Map<String, String> oldStyleOptions = new HashMap<>();
-        oldStyleOptions.put("schema.0.name", "id");
-        oldStyleOptions.put("schema.0.data-type", "INT NOT NULL");
-
-        oldStyleOptions.put("schema.1.name", "price");
-        oldStyleOptions.put("schema.1.data-type", "INT");
-
-        oldStyleOptions.put("schema.2.name", "record_time");
-        oldStyleOptions.put("schema.2.data-type", "TIMESTAMP(3) WITH LOCAL 
TIME ZONE");
-        oldStyleOptions.put("schema.2.metadata", "timestamp");
-        oldStyleOptions.put("schema.2.virtual", "true");
-
-        oldStyleOptions.put("schema.3.name", "comp");
-        oldStyleOptions.put("schema.3.data-type", "INT");
-        oldStyleOptions.put("schema.3.expr", "`price` * 2");
-
-        oldStyleOptions.put("schema.4.name", "order_time");
-        oldStyleOptions.put("schema.4.data-type", "TIMESTAMP(3)");
-
-        oldStyleOptions.put("schema.watermark.0.rowtime", "order_time");
-        oldStyleOptions.put(
-                "schema.watermark.0.strategy.expr", "`order_time` - INTERVAL 
'5' SECOND");
-        oldStyleOptions.put("schema.watermark.0.strategy.data-type", 
"TIMESTAMP(3)");
-
-        oldStyleOptions.put("schema.primary-key.name", "constrain_pk");
-        oldStyleOptions.put("schema.primary-key.columns", "id");
-
-        // create corresponding table
-        Schema schema =
-                Schema.newBuilder()
-                        .column("id", DataTypes.INT().notNull())
-                        .column("price", DataTypes.INT())
-                        .column("order_time", DataTypes.TIMESTAMP(3))
-                        .options(oldStyleOptions)
-                        .primaryKey("id")
-                        .build();
-
-        SchemaManager schemaManager =
-                new SchemaManager(LocalFileIO.create(), new Path(warehouse, 
"default.db/T"));
-        schemaManager.createTable(schema);
-
-        validateSchemaOptionResult();
-    }
-
     // 
----------------------------------------------------------------------------------------------------------------
     // Tools
     // 
----------------------------------------------------------------------------------------------------------------
@@ -1570,27 +1483,4 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
                                         + "Please switch to batch mode, and 
perform INSERT OVERWRITE to rescale current data layout first.",
                                 bucketNum));
     }
-
-    private void validateSchemaOptionResult() {
-        // validate columns
-        List<String> descResults =
-                CollectionUtil.iteratorToList(bEnv.executeSql("DESC 
T").collect()).stream()
-                        .map(Object::toString)
-                        .collect(Collectors.toList());
-        assertThat(descResults)
-                .isEqualTo(
-                        Arrays.asList(
-                                "+I[id, INT, false, PRI(id), null, null]",
-                                "+I[price, INT, true, null, null, null]",
-                                "+I[record_time, TIMESTAMP_LTZ(3), true, null, 
METADATA FROM 'timestamp' VIRTUAL, null]",
-                                "+I[comp, INT, true, null, AS `price` * 2, 
null]",
-                                "+I[order_time, TIMESTAMP(3), true, null, 
null, `order_time` - INTERVAL '5' SECOND]"));
-
-        // validate WITH options doesn't contains 'schema.'
-        String showResult =
-                CollectionUtil.iteratorToList(bEnv.executeSql("SHOW CREATE 
TABLE T").collect())
-                        .get(0)
-                        .toString();
-        assertThat(showResult.contains("schema.")).isFalse();
-    }
 }

Reply via email to