This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 60497e7dfddf27b25b80b913ef259010e97569aa Author: xuyang <xyzhong...@163.com> AuthorDate: Thu Jan 9 10:29:49 2025 +0800 [FLINK-36497][table] Remove deprecated method CatalogTable#toProperties Co-authored-by: shiwei10 <shiwe...@staff.weibo.com> --- docs/content.zh/docs/dev/table/catalogs.md | 4 +- docs/content/docs/dev/table/catalogs.md | 4 +- flink-python/pyflink/table/catalog.py | 8 +- .../table/api/internal/TableEnvironmentImpl.java | 6 +- .../flink/table/catalog/CatalogTableImpl.java | 118 --------------------- .../flink/table/catalog/ConnectorCatalogTable.java | 7 -- .../flink/table/factories/TableFactoryUtil.java | 7 +- .../table/operations/ddl/CreateTableOperation.java | 8 +- .../flink/table/catalog/CatalogTableImpTest.java | 97 ----------------- .../flink/table/catalog/CatalogPropertiesUtil.java | 2 +- .../apache/flink/table/catalog/CatalogTable.java | 13 --- .../flink/table/catalog/DefaultCatalogTable.java | 6 -- .../flink/table/catalog/ResolvedCatalogTable.java | 1 - .../table/legacy/factories/TableSinkFactory.java | 3 +- .../table/legacy/factories/TableSourceFactory.java | 3 +- .../table/planner/connectors/DynamicSinkUtils.java | 3 +- .../operations/SqlCreateTableConverter.java | 4 +- .../table/planner/delegation/PlannerBase.scala | 2 +- .../planner/catalog/JavaCatalogTableTest.java | 5 - .../operations/SqlDdlToOperationConverterTest.java | 8 +- .../table/planner/utils/testTableSourceSinks.scala | 4 +- 21 files changed, 35 insertions(+), 278 deletions(-) diff --git a/docs/content.zh/docs/dev/table/catalogs.md b/docs/content.zh/docs/dev/table/catalogs.md index 0c68ca85cd7..dfbad59df7b 100644 --- a/docs/content.zh/docs/dev/table/catalogs.md +++ b/docs/content.zh/docs/dev/table/catalogs.md @@ -361,13 +361,13 @@ catalog.list_databases() {{< tab "Java/Scala" >}} ```java // create table -catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false); +catalog.createTable(new ObjectPath("mydb", "mytable"), CatalogTable.newBuilder()...build(), false); // drop table catalog.dropTable(new ObjectPath("mydb", "mytable"), false); // alter table -catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false); +catalog.alterTable(new ObjectPath("mydb", "mytable"), CatalogTable.newBuilder()...build(), false); // rename table catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table"); diff --git a/docs/content/docs/dev/table/catalogs.md b/docs/content/docs/dev/table/catalogs.md index 98e324b34f5..851d4f2884a 100644 --- a/docs/content/docs/dev/table/catalogs.md +++ b/docs/content/docs/dev/table/catalogs.md @@ -365,13 +365,13 @@ catalog.list_databases() {{< tab "Java/Scala" >}} ```java // create table -catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false); +catalog.createTable(new ObjectPath("mydb", "mytable"), CatalogTable.newBuilder()...build(), false); // drop table catalog.dropTable(new ObjectPath("mydb", "mytable"), false); // alter table -catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false); +catalog.alterTable(new ObjectPath("mydb", "mytable"), CatalogTable.newBuilder()...build(), false); // rename table catalog.renameTable(new ObjectPath("mydb", "mytable"), "my_new_table"); diff --git a/flink-python/pyflink/table/catalog.py b/flink-python/pyflink/table/catalog.py index f6069a60a1b..4046650025b 100644 --- a/flink-python/pyflink/table/catalog.py +++ b/flink-python/pyflink/table/catalog.py @@ -830,8 +830,12 @@ class CatalogBaseTable(object): gateway = get_gateway() return CatalogBaseTable( - gateway.jvm.org.apache.flink.table.catalog.CatalogTableImpl( - schema._j_table_schema, partition_keys, properties, comment)) + gateway.jvm.org.apache.flink.table.catalog.CatalogTable.newBuilder() + .schema(schema._j_table_schema.toSchema()) + .comment(comment) + .partitionKeys(partition_keys) + .options(properties) + .build()) @staticmethod def create_view( diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 4b60d880777..3c807a8a777 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -950,8 +950,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } Catalog catalog = catalogManager.getCatalogOrThrowException(tableIdentifier.getCatalogName()); - ResolvedCatalogTable catalogTable = - catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable()); + ResolvedCatalogTable catalogTable = createTableOperation.getCatalogTable(); Optional<DynamicTableSink> stagingDynamicTableSink = getSupportsStagingDynamicTableSink(createTableOperation, catalog, catalogTable); if (stagingDynamicTableSink.isPresent()) { @@ -985,8 +984,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier(); Catalog catalog = catalogManager.getCatalogOrThrowException(tableIdentifier.getCatalogName()); - ResolvedCatalogTable catalogTable = - catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable()); + ResolvedCatalogTable catalogTable = createTableOperation.getCatalogTable(); Optional<DynamicTableSink> stagingDynamicTableSink = getSupportsStagingDynamicTableSink(createTableOperation, catalog, catalogTable); if (stagingDynamicTableSink.isPresent()) { diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java deleted file mode 100644 index 61c3ef06cab..00000000000 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.catalog; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.legacy.api.TableSchema; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import static org.apache.flink.table.legacy.descriptors.Schema.SCHEMA; - -/** - * A catalog table implementation. - * - * @deprecated Use {@link CatalogTable#newBuilder()} or a custom implementation instead. Don't - * implement against this internal class. It can lead to unintended side effects if code checks - * against this class instead of the common interface. - */ -@Deprecated -@Internal -public class CatalogTableImpl extends AbstractCatalogTable { - - public CatalogTableImpl( - TableSchema tableSchema, Map<String, String> properties, String comment) { - this(tableSchema, new ArrayList<>(), properties, comment); - } - - public CatalogTableImpl( - TableSchema tableSchema, - List<String> partitionKeys, - Map<String, String> properties, - String comment) { - super(tableSchema, partitionKeys, properties, comment); - } - - @Override - public CatalogBaseTable copy() { - return new CatalogTableImpl( - getSchema().copy(), - new ArrayList<>(getPartitionKeys()), - new HashMap<>(getOptions()), - getComment()); - } - - @Override - public Optional<String> getDescription() { - return Optional.of(getComment()); - } - - @Override - public Optional<String> getDetailedDescription() { - return Optional.of("This is a catalog table in an im-memory catalog"); - } - - @Override - public Map<String, String> toProperties() { - DescriptorProperties descriptor = new DescriptorProperties(false); - - descriptor.putTableSchema(SCHEMA, getSchema()); - descriptor.putPartitionKeys(getPartitionKeys()); - - Map<String, String> properties = new HashMap<>(getOptions()); - - descriptor.putProperties(properties); - - return descriptor.asMap(); - } - - @Override - public CatalogTable copy(Map<String, String> options) { - return new CatalogTableImpl(getSchema(), getPartitionKeys(), options, getComment()); - } - - /** Construct a {@link CatalogTableImpl} from complete properties that contains table schema. */ - public static CatalogTableImpl fromProperties(Map<String, String> properties) { - DescriptorProperties descriptorProperties = new DescriptorProperties(false); - descriptorProperties.putProperties(properties); - TableSchema tableSchema = descriptorProperties.getTableSchema(SCHEMA); - List<String> partitionKeys = descriptorProperties.getPartitionKeys(); - return new CatalogTableImpl( - tableSchema, - partitionKeys, - removeRedundant(properties, tableSchema, partitionKeys), - ""); - } - - /** Construct catalog table properties from {@link #toProperties()}. */ - public static Map<String, String> removeRedundant( - Map<String, String> properties, TableSchema schema, List<String> partitionKeys) { - Map<String, String> ret = new HashMap<>(properties); - DescriptorProperties descriptorProperties = new DescriptorProperties(false); - descriptorProperties.putTableSchema(SCHEMA, schema); - descriptorProperties.putPartitionKeys(partitionKeys); - descriptorProperties.asMap().keySet().forEach(ret::remove); - return ret; - } -} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java index 07e48035c0a..352a0dd3c95 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java @@ -93,13 +93,6 @@ public class ConnectorCatalogTable<T1, T2> extends AbstractCatalogTable { return isBatch; } - @Override - public Map<String, String> toProperties() { - // This effectively makes sure the table cannot be persisted in a catalog. - throw new UnsupportedOperationException( - "ConnectorCatalogTable cannot be converted to properties"); - } - @Override public CatalogTable copy(Map<String, String> options) { throw new UnsupportedOperationException( diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java index c27adbd12d4..3b16348a5a6 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java @@ -28,6 +28,7 @@ import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.listener.CatalogModificationListener; import org.apache.flink.table.catalog.listener.CatalogModificationListenerFactory; import org.apache.flink.table.descriptors.ConnectorDescriptorValidator; @@ -52,7 +53,8 @@ public class TableFactoryUtil { public static <T> TableSource<T> findAndCreateTableSource(TableSourceFactory.Context context) { try { return TableFactoryService.find( - TableSourceFactory.class, context.getTable().toProperties()) + TableSourceFactory.class, + ((ResolvedCatalogTable) context.getTable()).toProperties()) .createTableSource(context); } catch (Throwable t) { throw new TableException("findAndCreateTableSource failed.", t); @@ -81,7 +83,8 @@ public class TableFactoryUtil { public static <T> TableSink<T> findAndCreateTableSink(TableSinkFactory.Context context) { try { return TableFactoryService.find( - TableSinkFactory.class, context.getTable().toProperties()) + TableSinkFactory.class, + ((ResolvedCatalogTable) context.getTable()).toProperties()) .createTableSink(context); } catch (Throwable t) { throw new TableException("findAndCreateTableSink failed.", t); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java index 1e0641e711c..2d3f91710e3 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java @@ -21,8 +21,8 @@ package org.apache.flink.table.operations.ddl; import org.apache.flink.annotation.Internal; import org.apache.flink.table.api.internal.TableResultImpl; import org.apache.flink.table.api.internal.TableResultInternal; -import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.OperationUtils; @@ -34,13 +34,13 @@ import java.util.Map; @Internal public class CreateTableOperation implements CreateOperation { private final ObjectIdentifier tableIdentifier; - private final CatalogTable catalogTable; + private final ResolvedCatalogTable catalogTable; private final boolean ignoreIfExists; private final boolean isTemporary; public CreateTableOperation( ObjectIdentifier tableIdentifier, - CatalogTable catalogTable, + ResolvedCatalogTable catalogTable, boolean ignoreIfExists, boolean isTemporary) { this.tableIdentifier = tableIdentifier; @@ -49,7 +49,7 @@ public class CreateTableOperation implements CreateOperation { this.isTemporary = isTemporary; } - public CatalogTable getCatalogTable() { + public ResolvedCatalogTable getCatalogTable() { return catalogTable; } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTableImpTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTableImpTest.java deleted file mode 100644 index 1310e5d292e..00000000000 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogTableImpTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.catalog; - -import org.apache.flink.table.api.DataTypes; -import org.apache.flink.table.descriptors.DescriptorProperties; -import org.apache.flink.table.legacy.api.TableSchema; -import org.apache.flink.table.legacy.descriptors.Schema; - -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import static org.assertj.core.api.Assertions.assertThat; - -/** Test for {@link CatalogTableImpl}. */ -class CatalogTableImpTest { - private static final String TEST = "test"; - - @Test - void testToProperties() { - TableSchema schema = createTableSchema(); - Map<String, String> prop = createProperties(); - CatalogTable table = new CatalogTableImpl(schema, createPartitionKeys(), prop, TEST); - - DescriptorProperties descriptorProperties = new DescriptorProperties(false); - descriptorProperties.putProperties(table.toProperties()); - - assertThat(descriptorProperties.getTableSchema(Schema.SCHEMA)).isEqualTo(schema); - } - - @Test - void testFromProperties() { - TableSchema schema = createTableSchema(); - Map<String, String> prop = createProperties(); - CatalogTable table = new CatalogTableImpl(schema, createPartitionKeys(), prop, TEST); - - CatalogTableImpl tableFromProperties = - CatalogTableImpl.fromProperties(table.toProperties()); - - assertThat(table.getOptions()).isEqualTo(tableFromProperties.getOptions()); - assertThat(table.getPartitionKeys()).isEqualTo(tableFromProperties.getPartitionKeys()); - assertThat(table.getSchema()).isEqualTo(tableFromProperties.getSchema()); - } - - @Test - void testNullComment() { - TableSchema schema = createTableSchema(); - Map<String, String> prop = createProperties(); - CatalogTable table = new CatalogTableImpl(schema, createPartitionKeys(), prop, null); - - assertThat(table.getComment()).isEmpty(); - assertThat(table.getDescription()).isEqualTo(Optional.of("")); - } - - private static Map<String, String> createProperties() { - return new HashMap<String, String>() { - { - put("k", "v"); - put("K1", "V1"); // for test case-sensitive - } - }; - } - - private static TableSchema createTableSchema() { - return TableSchema.builder() - .field("first", DataTypes.STRING()) - .field("second", DataTypes.INT()) - .field("third", DataTypes.DOUBLE()) - .field("Fourth", DataTypes.BOOLEAN()) // for test case-sensitive - .build(); - } - - private static List<String> createPartitionKeys() { - return Arrays.asList("second", "third"); - } -} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java index fe2807ce8ee..088870a6db1 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPropertiesUtil.java @@ -236,7 +236,7 @@ public final class CatalogPropertiesUtil { .schema(schema) .comment(comment) .partitionKeys(partitionKeys) - .options(properties) + .options(options) .snapshot(snapshot) .build(); } catch (Exception e) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java index 11712d03c18..5777a1755be 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java @@ -98,19 +98,6 @@ public interface CatalogTable extends CatalogBaseTable { */ CatalogTable copy(Map<String, String> options); - /** - * Serializes this instance into a map of string-based properties. - * - * <p>Compared to the pure table options in {@link #getOptions()}, the map includes schema, - * partitioning, and other characteristics in a serialized form. - * - * @deprecated Only a {@link ResolvedCatalogTable} is serializable to properties. - */ - @Deprecated - default Map<String, String> toProperties() { - return Collections.emptyMap(); - } - /** Return the snapshot specified for the table. Return Optional.empty() if not specified. */ default Optional<Long> getSnapshot() { return Optional.empty(); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java index d2952c48030..b06e6acee26 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/DefaultCatalogTable.java @@ -127,12 +127,6 @@ public class DefaultCatalogTable implements CatalogTable { return Optional.empty(); } - @Override - public Map<String, String> toProperties() { - throw new UnsupportedOperationException( - "Only a resolved catalog table can be serialized into a map of string properties."); - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java index e52f6aa5a13..7f6aba71209 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogTable.java @@ -71,7 +71,6 @@ public final class ResolvedCatalogTable * symmetric. The framework will resolve functions and perform other validation tasks. A catalog * implementation must not deal with this during a read operation. */ - @Override public Map<String, String> toProperties() { return CatalogPropertiesUtil.serializeCatalogTable(this); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSinkFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSinkFactory.java index e3294a551d5..9a22bd50e25 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSinkFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSinkFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.legacy.sinks.TableSink; @@ -65,7 +66,7 @@ public interface TableSinkFactory<T> extends TableFactory { */ @Deprecated default TableSink<T> createTableSink(ObjectPath tablePath, CatalogTable table) { - return createTableSink(table.toProperties()); + return createTableSink(((ResolvedCatalogTable) table).toProperties()); } /** diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSourceFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSourceFactory.java index 73073f17258..b85fb6eddd5 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSourceFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/legacy/factories/TableSourceFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.legacy.sources.TableSource; @@ -65,7 +66,7 @@ public interface TableSourceFactory<T> extends TableFactory { */ @Deprecated default TableSource<T> createTableSource(ObjectPath tablePath, CatalogTable table) { - return createTableSource(table.toProperties()); + return createTableSource(((ResolvedCatalogTable) table).toProperties()); } /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java index 285ec39bc52..5af511c83f3 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java @@ -224,8 +224,7 @@ public final class DynamicSinkUtils { Map<String, String> staticPartitions, boolean isOverwrite, DynamicTableSink sink) { - final ResolvedCatalogTable catalogTable = - (ResolvedCatalogTable) createTableOperation.getCatalogTable(); + final ResolvedCatalogTable catalogTable = createTableOperation.getCatalogTable(); final ObjectIdentifier identifier = createTableOperation.getTableIdentifier(); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java index b45e93f7b85..56129869fc8 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java @@ -84,7 +84,7 @@ class SqlCreateTableConverter { /** Convert the {@link SqlCreateTable} node. */ Operation convertCreateTable(SqlCreateTable sqlCreateTable) { - CatalogTable catalogTable = createCatalogTable(sqlCreateTable); + ResolvedCatalogTable catalogTable = createCatalogTable(sqlCreateTable); UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(sqlCreateTable.fullTableName()); @@ -188,7 +188,7 @@ class SqlCreateTableConverter { return catalogManager.resolveCatalogTable(catalogTable); } - private CatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) { + private ResolvedCatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) { final Schema sourceTableSchema; final Optional<TableDistribution> sourceTableDistribution; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index 2254776c9a1..19f50472cb6 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -528,7 +528,7 @@ abstract class PlannerBase( staticPartitions: JMap[String, String], isOverwrite: Boolean): RelNode = { val input = createRelBuilder.queryOperation(queryOperation).build() - val table = catalogManager.resolveCatalogTable(createTableOperation.getCatalogTable) + val table = createTableOperation.getCatalogTable val identifier = createTableOperation.getTableIdentifier val catalog = toScala(catalogManager.getCatalog(identifier.getCatalogName)) diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java index ea36fe66fea..d4c8d1f1aa6 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/JavaCatalogTableTest.java @@ -285,11 +285,6 @@ class JavaCatalogTableTest extends TableTestBase { return this; } - @Override - public Map<String, String> toProperties() { - return Collections.emptyMap(); - } - @Override public Map<String, String> getOptions() { Map<String, String> map = new HashMap<>(); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java index 8a359d60aed..16c10a4b229 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java @@ -263,7 +263,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion Operation operation = parse(sql, planner, parser); assertThat(operation).isInstanceOf(CreateTableOperation.class); CreateTableOperation op = (CreateTableOperation) operation; - CatalogTable catalogTable = op.getCatalogTable(); + ResolvedCatalogTable catalogTable = op.getCatalogTable(); assertThat(catalogTable.getPartitionKeys()).hasSameElementsAs(Arrays.asList("a", "d")); assertThat(catalogTable.getSchema().getFieldNames()) .isEqualTo(new String[] {"a", "b", "c", "d"}); @@ -275,9 +275,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion DataTypes.INT(), DataTypes.VARCHAR(Integer.MAX_VALUE) }); - assertThat(catalogTable).isInstanceOf(ResolvedCatalogTable.class); - ResolvedCatalogTable resolvedCatalogTable = (ResolvedCatalogTable) catalogTable; - resolvedCatalogTable + catalogTable .getResolvedSchema() .getColumn(0) .ifPresent( @@ -418,7 +416,7 @@ public class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversion SqlNodeToOperationConversion.convert(planner, catalogManager, node).get(); assertThat(operation).isInstanceOf(CreateTableOperation.class); CreateTableOperation op = (CreateTableOperation) operation; - CatalogTable catalogTable = op.getCatalogTable(); + ResolvedCatalogTable catalogTable = op.getCatalogTable(); Map<String, String> properties = catalogTable.toProperties(); Map<String, String> expected = new HashMap<>(); expected.put("schema.0.name", "a"); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala index 5bcbf0b9a54..48f61d221a7 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/testTableSourceSinks.scala @@ -462,11 +462,11 @@ class TestOptionsTableFactory extends TableSourceFactory[Row] with TableSinkFact } override def createTableSource(context: TableSourceFactory.Context): TableSource[Row] = { - createPropertiesSource(context.getTable.toProperties) + createPropertiesSource(context.getTable.asInstanceOf[ResolvedCatalogTable].toProperties) } override def createTableSink(context: TableSinkFactory.Context): TableSink[Row] = { - createPropertiesSink(context.getTable.toProperties) + createPropertiesSink(context.getTable.asInstanceOf[ResolvedCatalogTable].toProperties) } }