This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch revert-995-schema-option in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 26ac9b213af0d69b6b3e637a6365f3801542dcf9 Author: Jingsong Lee <[email protected]> AuthorDate: Wed Apr 26 18:23:25 2023 +0800 Revert "[flink] Don't serialize physical column and primary keys to TableSchema options when table has non-physical columns and watermark (#995)" This reverts commit abc2d64cf62ab712d03ba1b16d439b0b022101b7. --- .../java/org/apache/paimon/flink/FlinkCatalog.java | 113 +++++--------- .../flink/utils/FlinkCatalogPropertiesUtil.java | 163 --------------------- .../flink/FlinkCatalogPropertiesUtilTest.java | 153 ------------------- .../apache/paimon/flink/ReadWriteTableITCase.java | 110 -------------- 4 files changed, 36 insertions(+), 503 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java index 40ebb18fa..800f8b235 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java @@ -26,12 +26,9 @@ import org.apache.paimon.schema.SchemaChange; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.types.DataField; -import org.apache.paimon.utils.Preconditions; import org.apache.flink.table.api.Schema.UnresolvedColumn; -import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.WatermarkSpec; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; @@ -66,20 +63,12 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import static org.apache.flink.table.descriptors.DescriptorProperties.NAME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; import static org.apache.flink.table.descriptors.Schema.SCHEMA; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; import static org.apache.paimon.CoreOptions.PATH; import static org.apache.paimon.flink.LogicalTypeConversion.toDataType; import static org.apache.paimon.flink.LogicalTypeConversion.toLogicalType; -import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey; -import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn; -import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.deserializeWatermarkSpec; -import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount; -import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeNonPhysicalColumns; -import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.serializeWatermarkSpec; /** Catalog for paimon. */ public class FlinkCatalog extends AbstractCatalog { @@ -212,11 +201,13 @@ public class FlinkCatalog extends AbstractCatalog { Map<String, String> options = table.getOptions(); if (options.containsKey(CONNECTOR.key())) { throw new CatalogException( - "Paimon Catalog only supports paimon tables ," - + " and you don't need to specify 'connector'= '" - + FlinkCatalogFactory.IDENTIFIER - + "' when using Paimon Catalog\n" - + " You can create TEMPORARY table instead if you want to create the table of other connector."); + String.format( + "Paimon Catalog only supports paimon tables ," + + " and you don't need to specify 'connector'= '" + + FlinkCatalogFactory.IDENTIFIER + + "' when using Paimon Catalog\n" + + " You can create TEMPORARY table instead if you want to create the table of other connector.", + options.get(CONNECTOR.key()))); } // remove table path @@ -325,46 +316,35 @@ public class FlinkCatalog extends AbstractCatalog { } private CatalogTableImpl toCatalogTable(Table table) { + TableSchema schema; Map<String, String> newOptions = new HashMap<>(table.options()); - TableSchema.Builder builder = TableSchema.builder(); - - // add columns - List<RowType.RowField> rowFields = toLogicalType(table.rowType()).getFields(); - int count = nonPhysicalColumnsCount(newOptions, table.rowType().getFieldNames()); - int physicalColumnIndex = 0; - for (int i = 0; i < rowFields.size() + count; i++) { - String optionalName = newOptions.get(compoundKey(SCHEMA, i, NAME)); - // to check old style physical column option - RowType.RowField field = rowFields.get(physicalColumnIndex); - if (optionalName == null || optionalName.equals(field.getName())) { - // build physical column from table row + // try to read schema from options + // in the case of virtual columns and watermark + DescriptorProperties tableSchemaProps = new DescriptorProperties(true); + tableSchemaProps.putProperties(newOptions); + Optional<TableSchema> optional = + tableSchemaProps.getOptionalTableSchema( + org.apache.flink.table.descriptors.Schema.SCHEMA); + if (optional.isPresent()) { + schema = optional.get(); + + // remove schema from options + DescriptorProperties removeProperties = new DescriptorProperties(false); + removeProperties.putTableSchema(SCHEMA, schema); + removeProperties.asMap().keySet().forEach(newOptions::remove); + } else { + TableSchema.Builder builder = TableSchema.builder(); + for (RowType.RowField field : toLogicalType(table.rowType()).getFields()) { builder.field(field.getName(), fromLogicalToDataType(field.getType())); - physicalColumnIndex++; - } else { - // build non-physical column from options - builder.add(deserializeNonPhysicalColumn(newOptions, i)); } - } - - // extract watermark information - if (newOptions.keySet().stream() - .anyMatch(key -> key.startsWith(compoundKey(SCHEMA, WATERMARK)))) { - builder.watermark(deserializeWatermarkSpec(newOptions)); - } + if (table.primaryKeys().size() > 0) { + builder.primaryKey(table.primaryKeys().toArray(new String[0])); + } - // add primary keys - if (table.primaryKeys().size() > 0) { - builder.primaryKey(table.primaryKeys().toArray(new String[0])); + schema = builder.build(); } - TableSchema schema = builder.build(); - - // remove schema from options - DescriptorProperties removeProperties = new DescriptorProperties(false); - removeProperties.putTableSchema(SCHEMA, schema); - removeProperties.asMap().keySet().forEach(newOptions::remove); - return new DataCatalogTable( table, schema, table.partitionKeys(), newOptions, table.comment().orElse("")); } @@ -381,7 +361,13 @@ public class FlinkCatalog extends AbstractCatalog { // Serialize virtual columns and watermark to the options // This is what Flink SQL needs, the storage itself does not need them - options.putAll(columnOptions(schema)); + if (schema.getTableColumns().stream().anyMatch(c -> !c.isPhysical()) + || schema.getWatermarkSpecs().size() > 0) { + DescriptorProperties tableSchemaProps = new DescriptorProperties(true); + tableSchemaProps.putTableSchema( + org.apache.flink.table.descriptors.Schema.SCHEMA, schema); + options.putAll(tableSchemaProps.asMap()); + } return new Schema( addColumnComments(toDataType(rowType).getFields(), getColumnComments(catalogTable)), @@ -408,33 +394,6 @@ public class FlinkCatalog extends AbstractCatalog { .collect(Collectors.toList()); } - /** Only reserve necessary options. */ - private static Map<String, String> columnOptions(TableSchema schema) { - Map<String, String> columnOptions = new HashMap<>(); - // field name -> index - final Map<String, Integer> indexMap = new HashMap<>(); - List<TableColumn> tableColumns = schema.getTableColumns(); - for (int i = 0; i < tableColumns.size(); i++) { - indexMap.put(tableColumns.get(i).getName(), i); - } - - // non-physical columns - List<TableColumn> nonPhysicalColumns = - tableColumns.stream().filter(c -> !c.isPhysical()).collect(Collectors.toList()); - if (!nonPhysicalColumns.isEmpty()) { - columnOptions.putAll(serializeNonPhysicalColumns(indexMap, nonPhysicalColumns)); - } - - // watermark - List<WatermarkSpec> watermarkSpecs = schema.getWatermarkSpecs(); - if (!watermarkSpecs.isEmpty()) { - Preconditions.checkArgument(watermarkSpecs.size() == 1); - columnOptions.putAll(serializeWatermarkSpec(watermarkSpecs.get(0))); - } - - return columnOptions; - } - public static Identifier toIdentifier(ObjectPath path) { return new Identifier(path.getDatabaseName(), path.getObjectName()); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java deleted file mode 100644 index 011a1a4e0..000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/FlinkCatalogPropertiesUtil.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.utils; - -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.WatermarkSpec; -import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.utils.LogicalTypeParser; -import org.apache.flink.table.types.utils.TypeConversions; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import static org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR; -import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA; -import static org.apache.flink.table.descriptors.DescriptorProperties.NAME; -import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; - -/** - * Utilities for ser/deserializing non-physical columns and watermark into/from a map of string - * properties. - */ -public class FlinkCatalogPropertiesUtil { - - public static Map<String, String> serializeNonPhysicalColumns( - Map<String, Integer> indexMap, List<TableColumn> nonPhysicalColumns) { - Map<String, String> serialized = new HashMap<>(); - for (TableColumn c : nonPhysicalColumns) { - int index = indexMap.get(c.getName()); - serialized.put(compoundKey(SCHEMA, index, NAME), c.getName()); - serialized.put( - compoundKey(SCHEMA, index, DATA_TYPE), - c.getType().getLogicalType().asSerializableString()); - if (c instanceof TableColumn.ComputedColumn) { - TableColumn.ComputedColumn computedColumn = (TableColumn.ComputedColumn) c; - serialized.put(compoundKey(SCHEMA, index, EXPR), computedColumn.getExpression()); - } else { - TableColumn.MetadataColumn metadataColumn = (TableColumn.MetadataColumn) c; - serialized.put( - compoundKey(SCHEMA, index, METADATA), - metadataColumn.getMetadataAlias().orElse(metadataColumn.getName())); - serialized.put( - compoundKey(SCHEMA, index, VIRTUAL), - Boolean.toString(metadataColumn.isVirtual())); - } - } - return serialized; - } - - public static Map<String, String> serializeWatermarkSpec(WatermarkSpec watermarkSpec) { - Map<String, String> serializedWatermarkSpec = new HashMap<>(); - String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0); - serializedWatermarkSpec.put( - compoundKey(watermarkPrefix, WATERMARK_ROWTIME), - watermarkSpec.getRowtimeAttribute()); - serializedWatermarkSpec.put( - compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR), - watermarkSpec.getWatermarkExpr()); - serializedWatermarkSpec.put( - compoundKey(watermarkPrefix, WATERMARK_STRATEGY_DATA_TYPE), - watermarkSpec.getWatermarkExprOutputType().getLogicalType().asSerializableString()); - - return serializedWatermarkSpec; - } - - private static final Pattern SCHEMA_COLUMN_NAME_SUFFIX = Pattern.compile("\\d+\\.name"); - - public static int nonPhysicalColumnsCount( - Map<String, String> tableOptions, List<String> physicalColumns) { - int count = 0; - for (Map.Entry<String, String> entry : tableOptions.entrySet()) { - if (isColumnNameKey(entry.getKey()) && !physicalColumns.contains(entry.getValue())) { - count++; - } - } - - return count; - } - - private static boolean isColumnNameKey(String key) { - return key.startsWith(SCHEMA) - && SCHEMA_COLUMN_NAME_SUFFIX.matcher(key.substring(SCHEMA.length() + 1)).matches(); - } - - public static TableColumn deserializeNonPhysicalColumn(Map<String, String> options, int index) { - String nameKey = compoundKey(SCHEMA, index, NAME); - String dataTypeKey = compoundKey(SCHEMA, index, DATA_TYPE); - String exprKey = compoundKey(SCHEMA, index, EXPR); - String metadataKey = compoundKey(SCHEMA, index, METADATA); - String virtualKey = compoundKey(SCHEMA, index, VIRTUAL); - - String name = options.get(nameKey); - DataType dataType = - TypeConversions.fromLogicalToDataType( - LogicalTypeParser.parse(options.get(dataTypeKey))); - - TableColumn column; - if (options.containsKey(exprKey)) { - column = TableColumn.computed(name, dataType, options.get(exprKey)); - } else if (options.containsKey(metadataKey)) { - String metadataAlias = options.get(metadataKey); - boolean isVirtual = Boolean.parseBoolean(options.get(virtualKey)); - column = - metadataAlias.equals(name) - ? TableColumn.metadata(name, dataType, isVirtual) - : TableColumn.metadata(name, dataType, metadataAlias, isVirtual); - } else { - throw new RuntimeException( - String.format( - "Failed to build non-physical column. Current index is %s, options are %s", - index, options)); - } - - return column; - } - - public static WatermarkSpec deserializeWatermarkSpec(Map<String, String> options) { - String watermarkPrefixKey = compoundKey(SCHEMA, WATERMARK); - - String rowtimeKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_ROWTIME); - String exprKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_STRATEGY_EXPR); - String dataTypeKey = compoundKey(watermarkPrefixKey, 0, WATERMARK_STRATEGY_DATA_TYPE); - - String rowtimeAttribute = options.get(rowtimeKey); - String watermarkExpressionString = options.get(exprKey); - DataType watermarkExprOutputType = - TypeConversions.fromLogicalToDataType( - LogicalTypeParser.parse(options.get(dataTypeKey))); - - return new WatermarkSpec( - rowtimeAttribute, watermarkExpressionString, watermarkExprOutputType); - } - - public static String compoundKey(Object... components) { - return Stream.of(components).map(Object::toString).collect(Collectors.joining(".")); - } -} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java deleted file mode 100644 index 9268a236b..000000000 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogPropertiesUtilTest.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink; - -import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil; - -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.api.TableColumn; -import org.apache.flink.table.api.WatermarkSpec; -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.apache.flink.table.descriptors.DescriptorProperties.DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR; -import static org.apache.flink.table.descriptors.DescriptorProperties.METADATA; -import static org.apache.flink.table.descriptors.DescriptorProperties.NAME; -import static org.apache.flink.table.descriptors.DescriptorProperties.VIRTUAL; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE; -import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR; -import static org.apache.flink.table.descriptors.Schema.SCHEMA; -import static org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil.compoundKey; -import static org.assertj.core.api.Assertions.assertThat; - -/** Test for {@link FlinkCatalogPropertiesUtil}. */ -public class FlinkCatalogPropertiesUtilTest { - - @Test - public void testSerDeNonPhysicalColumns() { - Map<String, Integer> indexMap = new HashMap<>(); - indexMap.put("comp", 2); - indexMap.put("meta1", 3); - indexMap.put("meta2", 5); - List<TableColumn> columns = new ArrayList<>(); - columns.add(TableColumn.computed("comp", DataTypes.INT(), "`k` * 2")); - columns.add(TableColumn.metadata("meta1", DataTypes.VARCHAR(10))); - columns.add(TableColumn.metadata("meta2", DataTypes.BIGINT().notNull(), "price", true)); - - // validate serialization - Map<String, String> serialized = - FlinkCatalogPropertiesUtil.serializeNonPhysicalColumns(indexMap, columns); - - Map<String, String> expected = new HashMap<>(); - expected.put(compoundKey(SCHEMA, 2, NAME), "comp"); - expected.put(compoundKey(SCHEMA, 2, DATA_TYPE), "INT"); - expected.put(compoundKey(SCHEMA, 2, EXPR), "`k` * 2"); - - expected.put(compoundKey(SCHEMA, 3, NAME), "meta1"); - expected.put(compoundKey(SCHEMA, 3, DATA_TYPE), "VARCHAR(10)"); - expected.put(compoundKey(SCHEMA, 3, METADATA), "meta1"); - expected.put(compoundKey(SCHEMA, 3, VIRTUAL), "false"); - - expected.put(compoundKey(SCHEMA, 5, NAME), "meta2"); - expected.put(compoundKey(SCHEMA, 5, DATA_TYPE), "BIGINT NOT NULL"); - expected.put(compoundKey(SCHEMA, 5, METADATA), "price"); - expected.put(compoundKey(SCHEMA, 5, VIRTUAL), "true"); - - assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected); - - // validate deserialization - List<TableColumn> deserialized = new ArrayList<>(); - deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 2)); - deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 3)); - deserialized.add(FlinkCatalogPropertiesUtil.deserializeNonPhysicalColumn(serialized, 5)); - - assertThat(deserialized).isEqualTo(columns); - - // validate that - } - - @Test - public void testSerDeWatermarkSpec() { - WatermarkSpec watermarkSpec = - new WatermarkSpec( - "test_time", - "`test_time` - INTERVAL '0.001' SECOND", - DataTypes.TIMESTAMP(3)); - - // validate serialization - Map<String, String> serialized = - FlinkCatalogPropertiesUtil.serializeWatermarkSpec(watermarkSpec); - - Map<String, String> expected = new HashMap<>(); - String watermarkPrefix = compoundKey(SCHEMA, WATERMARK, 0); - expected.put(compoundKey(watermarkPrefix, WATERMARK_ROWTIME), "test_time"); - expected.put( - compoundKey(watermarkPrefix, WATERMARK_STRATEGY_EXPR), - "`test_time` - INTERVAL '0.001' SECOND"); - expected.put(compoundKey(watermarkPrefix, WATERMARK_STRATEGY_DATA_TYPE), "TIMESTAMP(3)"); - - assertThat(serialized).containsExactlyInAnyOrderEntriesOf(expected); - - // validate serialization - WatermarkSpec deserialized = - FlinkCatalogPropertiesUtil.deserializeWatermarkSpec(serialized); - assertThat(deserialized).isEqualTo(watermarkSpec); - } - - @Test - public void testNonPhysicalColumnsCount() { - Map<String, String> oldStyleOptions = new HashMap<>(); - // physical - oldStyleOptions.put(compoundKey(SCHEMA, 0, NAME), "phy1"); - oldStyleOptions.put(compoundKey(SCHEMA, 0, DATA_TYPE), "INT"); - oldStyleOptions.put(compoundKey(SCHEMA, 1, NAME), "phy2"); - oldStyleOptions.put(compoundKey(SCHEMA, 1, DATA_TYPE), "INT NOT NULL"); - - // non-physical - oldStyleOptions.put(compoundKey(SCHEMA, 2, NAME), "comp"); - oldStyleOptions.put(compoundKey(SCHEMA, 2, DATA_TYPE), "INT"); - oldStyleOptions.put(compoundKey(SCHEMA, 2, EXPR), "`k` * 2"); - - oldStyleOptions.put(compoundKey(SCHEMA, 3, NAME), "meta1"); - oldStyleOptions.put(compoundKey(SCHEMA, 3, DATA_TYPE), "VARCHAR(10)"); - oldStyleOptions.put(compoundKey(SCHEMA, 3, METADATA), "meta1"); - oldStyleOptions.put(compoundKey(SCHEMA, 3, VIRTUAL), "false"); - - oldStyleOptions.put(compoundKey(SCHEMA, 4, NAME), "meta2"); - oldStyleOptions.put(compoundKey(SCHEMA, 4, DATA_TYPE), "BIGINT NOT NULL"); - oldStyleOptions.put(compoundKey(SCHEMA, 4, METADATA), "price"); - oldStyleOptions.put(compoundKey(SCHEMA, 4, VIRTUAL), "true"); - - // other options - oldStyleOptions.put("schema.unknown.name", "test"); - - assertThat( - FlinkCatalogPropertiesUtil.nonPhysicalColumnsCount( - oldStyleOptions, Arrays.asList("phy1", "phy2"))) - .isEqualTo(3); - } -} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java index dbc12ffe2..df09ad98e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java @@ -23,11 +23,8 @@ import org.apache.paimon.flink.sink.FlinkTableSink; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; -import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; -import org.apache.paimon.schema.TableSchema; import org.apache.paimon.testutils.assertj.AssertionUtils; -import org.apache.paimon.types.DataTypes; import org.apache.paimon.utils.BlockingIterator; import org.apache.flink.api.dag.Transformation; @@ -1369,90 +1366,6 @@ public class ReadWriteTableITCase extends AbstractTestBase { "+I[c, INT, true, null, AS `a` + `b`, null]"); } - @Test - public void testCleanedSchemaOptions() { - String ddl = - "CREATE TABLE T (\n" - + "id INT,\n" - + "price INT,\n" - + "record_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,\n" - + "comp AS price * 2,\n" - + "order_time TIMESTAMP(3),\n" - + "WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND,\n" - + "PRIMARY KEY (id) NOT ENFORCED\n" - + ");"; - bEnv.executeSql(ddl); - - // validate schema options - SchemaManager schemaManager = - new SchemaManager(LocalFileIO.create(), new Path(warehouse, "default.db/T")); - TableSchema schema = schemaManager.latest().get(); - Map<String, String> expected = new HashMap<>(); - // metadata column - expected.put("schema.2.name", "record_time"); - expected.put("schema.2.data-type", "TIMESTAMP(3) WITH LOCAL TIME ZONE"); - expected.put("schema.2.metadata", "timestamp"); - expected.put("schema.2.virtual", "true"); - // computed column - expected.put("schema.3.name", "comp"); - expected.put("schema.3.data-type", "INT"); - expected.put("schema.3.expr", "`price` * 2"); - // watermark - expected.put("schema.watermark.0.rowtime", "order_time"); - expected.put("schema.watermark.0.strategy.expr", "`order_time` - INTERVAL '5' SECOND"); - expected.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)"); - - assertThat(schema.options()).containsExactlyInAnyOrderEntriesOf(expected); - - validateSchemaOptionResult(); - } - - @Test - public void testReadFromOldStyleSchemaOptions() throws Exception { - Map<String, String> oldStyleOptions = new HashMap<>(); - oldStyleOptions.put("schema.0.name", "id"); - oldStyleOptions.put("schema.0.data-type", "INT NOT NULL"); - - oldStyleOptions.put("schema.1.name", "price"); - oldStyleOptions.put("schema.1.data-type", "INT"); - - oldStyleOptions.put("schema.2.name", "record_time"); - oldStyleOptions.put("schema.2.data-type", "TIMESTAMP(3) WITH LOCAL TIME ZONE"); - oldStyleOptions.put("schema.2.metadata", "timestamp"); - oldStyleOptions.put("schema.2.virtual", "true"); - - oldStyleOptions.put("schema.3.name", "comp"); - oldStyleOptions.put("schema.3.data-type", "INT"); - oldStyleOptions.put("schema.3.expr", "`price` * 2"); - - oldStyleOptions.put("schema.4.name", "order_time"); - oldStyleOptions.put("schema.4.data-type", "TIMESTAMP(3)"); - - oldStyleOptions.put("schema.watermark.0.rowtime", "order_time"); - oldStyleOptions.put( - "schema.watermark.0.strategy.expr", "`order_time` - INTERVAL '5' SECOND"); - oldStyleOptions.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)"); - - oldStyleOptions.put("schema.primary-key.name", "constrain_pk"); - oldStyleOptions.put("schema.primary-key.columns", "id"); - - // create corresponding table - Schema schema = - Schema.newBuilder() - .column("id", DataTypes.INT().notNull()) - .column("price", DataTypes.INT()) - .column("order_time", DataTypes.TIMESTAMP(3)) - .options(oldStyleOptions) - .primaryKey("id") - .build(); - - SchemaManager schemaManager = - new SchemaManager(LocalFileIO.create(), new Path(warehouse, "default.db/T")); - schemaManager.createTable(schema); - - validateSchemaOptionResult(); - } - // ---------------------------------------------------------------------------------------------------------------- // Tools // ---------------------------------------------------------------------------------------------------------------- @@ -1570,27 +1483,4 @@ public class ReadWriteTableITCase extends AbstractTestBase { + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.", bucketNum)); } - - private void validateSchemaOptionResult() { - // validate columns - List<String> descResults = - CollectionUtil.iteratorToList(bEnv.executeSql("DESC T").collect()).stream() - .map(Object::toString) - .collect(Collectors.toList()); - assertThat(descResults) - .isEqualTo( - Arrays.asList( - "+I[id, INT, false, PRI(id), null, null]", - "+I[price, INT, true, null, null, null]", - "+I[record_time, TIMESTAMP_LTZ(3), true, null, METADATA FROM 'timestamp' VIRTUAL, null]", - "+I[comp, INT, true, null, AS `price` * 2, null]", - "+I[order_time, TIMESTAMP(3), true, null, null, `order_time` - INTERVAL '5' SECOND]")); - - // validate WITH options doesn't contains 'schema.' - String showResult = - CollectionUtil.iteratorToList(bEnv.executeSql("SHOW CREATE TABLE T").collect()) - .get(0) - .toString(); - assertThat(showResult.contains("schema.")).isFalse(); - } }
