This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 3a9597339a0 [FLINK-36494][table-common] Remove deprecated method Catalog#getTableFactory (#25948) 3a9597339a0 is described below commit 3a9597339a085cf2ff47b4475bfc08814f0c206c Author: Xuyang <xyzhong...@163.com> AuthorDate: Mon Jan 13 09:50:35 2025 +0800 [FLINK-36494][table-common] Remove deprecated method Catalog#getTableFactory (#25948) --- .../table/api/internal/TableEnvironmentImpl.java | 1 - .../flink/table/factories/TableFactoryUtil.java | 43 +-------- .../table/operations/TruncateTableOperation.java | 1 - .../org/apache/flink/table/catalog/Catalog.java | 15 --- .../planner/operations/DeletePushDownUtils.java | 8 +- .../operations/SqlNodeToOperationConversion.java | 3 +- .../planner/plan/FlinkCalciteCatalogReader.java | 1 - .../table/planner/delegation/PlannerBase.scala | 2 - .../plan/schema/LegacyCatalogSourceTable.scala | 1 - .../operations/DeletePushDownUtilsTest.java | 4 +- .../planner/plan/common/TableFactoryTest.scala | 101 --------------------- 11 files changed, 6 insertions(+), 174 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 3b205333963..4b60d880777 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -1016,7 +1016,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { ResolvedCatalogTable catalogTable) { if (tableConfig.get(TableConfigOptions.TABLE_RTAS_CTAS_ATOMICITY_ENABLED)) { if (!TableFactoryUtil.isLegacyConnectorOptions( - catalog, tableConfig, isStreamingMode, createTableOperation.getTableIdentifier(), diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java index 613d0822bd2..c27adbd12d4 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java @@ -23,7 +23,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.TableException; -import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogTable; @@ -39,12 +38,9 @@ import org.apache.flink.table.legacy.factories.TableSourceFactory; import org.apache.flink.table.legacy.sinks.TableSink; import org.apache.flink.table.legacy.sources.TableSource; -import javax.annotation.Nullable; - import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.stream.Collectors; /** Utility for dealing with {@link TableFactory} using the {@link TableFactoryService}. */ @@ -70,7 +66,6 @@ public class TableFactoryUtil { */ @SuppressWarnings("unchecked") public static <T> TableSource<T> findAndCreateTableSource( - @Nullable Catalog catalog, ObjectIdentifier objectIdentifier, CatalogTable catalogTable, ReadableConfig configuration, @@ -78,20 +73,7 @@ public class TableFactoryUtil { TableSourceFactory.Context context = new TableSourceFactoryContextImpl( objectIdentifier, catalogTable, configuration, isTemporary); - Optional<TableFactory> factoryOptional = - catalog == null ? 
Optional.empty() : catalog.getTableFactory(); - if (factoryOptional.isPresent()) { - TableFactory factory = factoryOptional.get(); - if (factory instanceof TableSourceFactory) { - return ((TableSourceFactory<T>) factory).createTableSource(context); - } else { - throw new ValidationException( - "Cannot query a sink-only table. " - + "TableFactory provided by catalog must implement TableSourceFactory"); - } - } else { - return findAndCreateTableSource(context); - } + return findAndCreateTableSource(context); } /** Returns a table sink matching the context. */ @@ -113,7 +95,6 @@ public class TableFactoryUtil { */ @SuppressWarnings("unchecked") public static <T> TableSink<T> findAndCreateTableSink( - @Nullable Catalog catalog, ObjectIdentifier objectIdentifier, CatalogTable catalogTable, ReadableConfig configuration, @@ -126,30 +107,11 @@ public class TableFactoryUtil { configuration, !isStreamingMode, isTemporary); - if (catalog == null) { - return findAndCreateTableSink(context); - } else { - return createTableSinkForCatalogTable(catalog, context) - .orElseGet(() -> findAndCreateTableSink(context)); - } - } - - /** - * Creates a table sink for a {@link CatalogTable} using table factory associated with the - * catalog. - */ - public static Optional<TableSink> createTableSinkForCatalogTable( - Catalog catalog, TableSinkFactory.Context context) { - TableFactory tableFactory = catalog.getTableFactory().orElse(null); - if (tableFactory instanceof TableSinkFactory) { - return Optional.ofNullable(((TableSinkFactory) tableFactory).createTableSink(context)); - } - return Optional.empty(); + return findAndCreateTableSink(context); } /** Checks whether the {@link CatalogTable} uses legacy connector sink options. 
*/ public static boolean isLegacyConnectorOptions( - @Nullable Catalog catalog, ReadableConfig configuration, boolean isStreamingMode, ObjectIdentifier objectIdentifier, @@ -165,7 +127,6 @@ public class TableFactoryUtil { // try to create legacy table source using the options, // some legacy factories may use the 'type' key TableFactoryUtil.findAndCreateTableSink( - catalog, objectIdentifier, catalogTable, configuration, diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java index 64c070851be..186f704b511 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/TruncateTableOperation.java @@ -67,7 +67,6 @@ public class TruncateTableOperation implements ExecutableOperation { } if (TableFactoryUtil.isLegacyConnectorOptions( - catalogManager.getCatalog(objectIdentifier.getCatalogName()).orElse(null), ctx.getTableConfig(), ctx.isStreamingMode(), tableIdentifier, diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java index 84edc3679e0..f0dcc90bbc0 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java @@ -43,7 +43,6 @@ import org.apache.flink.table.expressions.Expression; import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.factories.Factory; import org.apache.flink.table.factories.FunctionDefinitionFactory; -import org.apache.flink.table.legacy.factories.TableFactory; import org.apache.flink.table.procedures.Procedure; import 
javax.annotation.Nullable; @@ -87,20 +86,6 @@ public interface Catalog { return Optional.empty(); } - /** - * Get an optional {@link TableFactory} instance that's responsible for generating table-related - * instances stored in this catalog, instances such as source/sink. - * - * @return an optional TableFactory instance - * @deprecated Use {@link #getFactory()} for the new factory stack. The new factory stack uses - * the new table sources and sinks defined in FLIP-95 and a slightly different discovery - * mechanism. - */ - @Deprecated - default Optional<TableFactory> getTableFactory() { - return Optional.empty(); - } - /** * Get an optional {@link FunctionDefinitionFactory} instance that's responsible for * instantiating function definitions. diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java index 6c4f72e6b0d..a7323b3fa69 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java @@ -21,7 +21,6 @@ package org.apache.flink.table.planner.operations; import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.ContextResolvedTable; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -68,9 +67,7 @@ public class DeletePushDownUtils { * can't get the {@link DynamicTableSink}. 
*/ public static Optional<DynamicTableSink> getDynamicTableSink( - ContextResolvedTable contextResolvedTable, - LogicalTableModify tableModify, - CatalogManager catalogManager) { + ContextResolvedTable contextResolvedTable, LogicalTableModify tableModify) { final FlinkContext context = ShortcutUtils.unwrapContext(tableModify.getCluster()); CatalogBaseTable catalogBaseTable = contextResolvedTable.getTable(); @@ -83,9 +80,6 @@ public class DeletePushDownUtils { // only consider the CatalogTable that doesn't use legacy connector sink option if (!contextResolvedTable.isAnonymous() && !TableFactoryUtil.isLegacyConnectorOptions( - catalogManager - .getCatalog(objectIdentifier.getCatalogName()) - .orElse(null), context.getTableConfig(), !context.isBatchMode(), objectIdentifier, diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java index 04602b91c15..2947aa504f5 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConversion.java @@ -1223,8 +1223,7 @@ public class SqlNodeToOperationConversion { catalogManager.qualifyIdentifier(unresolvedTableIdentifier)); // try push down delete Optional<DynamicTableSink> optionalDynamicTableSink = - DeletePushDownUtils.getDynamicTableSink( - contextResolvedTable, tableModify, catalogManager); + DeletePushDownUtils.getDynamicTableSink(contextResolvedTable, tableModify); if (optionalDynamicTableSink.isPresent()) { DynamicTableSink dynamicTableSink = optionalDynamicTableSink.get(); // if the table sink supports delete push down diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java index 35733aedd89..b8770d2ed90 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/FlinkCalciteCatalogReader.java @@ -235,7 +235,6 @@ public class FlinkCalciteCatalogReader extends CalciteCatalogReader { TableSchemaUtils.removeTimeAttributeFromResolvedSchema( originTable.getResolvedSchema()); TableFactoryUtil.findAndCreateTableSource( - schemaTable.getContextResolvedTable().getCatalog().orElse(null), schemaTable.getContextResolvedTable().getIdentifier(), new ResolvedCatalogTable( CatalogTable.of( diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index cea31d7f594..2254776c9a1 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -470,7 +470,6 @@ abstract class PlannerBase( if ( !contextResolvedTable.isAnonymous && TableFactoryUtil.isLegacyConnectorOptions( - catalogManager.getCatalog(objectIdentifier.getCatalogName).orElse(null), tableConfig, isStreamingMode, objectIdentifier, @@ -479,7 +478,6 @@ abstract class PlannerBase( ) ) { val tableSink = TableFactoryUtil.findAndCreateTableSink( - catalog.orNull, objectIdentifier, tableToFind, getTableConfig, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala index 8537dd47f89..b811f21cd27 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala @@ -180,7 +180,6 @@ class LegacyCatalogSourceTable[T]( TableSchemaUtils.removeTimeAttributeFromResolvedSchema( schemaTable.getContextResolvedTable.getResolvedSchema) val tableSource = TableFactoryUtil.findAndCreateTableSource( - schemaTable.getContextResolvedTable.getCatalog.orElse(null), identifier, new ResolvedCatalogTable( CatalogTable.of( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java index 09870eb9735..7aca5b69f3b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java @@ -99,7 +99,7 @@ public class DeletePushDownUtilsTest { tableId, catalog, catalogManager.resolveCatalogTable(catalogTable)); LogicalTableModify tableModify = getTableModifyFromSql("DELETE FROM t"); Optional<DynamicTableSink> optionalDynamicTableSink = - DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify, catalogManager); + DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify); // verify we can get the dynamic table sink assertThat(optionalDynamicTableSink).isPresent(); assertThat(optionalDynamicTableSink.get()) @@ -115,7 +115,7 @@ public class DeletePushDownUtilsTest { tableId, catalog, catalogManager.resolveCatalogTable(catalogTable)); tableModify = 
getTableModifyFromSql("DELETE FROM t1"); optionalDynamicTableSink = - DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify, catalogManager); + DeletePushDownUtils.getDynamicTableSink(resolvedTable, tableModify); // verify it should be empty since it's not an instance of DynamicTableSink but is legacy // TableSink assertThat(optionalDynamicTableSink).isEmpty(); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala deleted file mode 100644 index bd2125e975a..00000000000 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/common/TableFactoryTest.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.table.planner.plan.common - -import org.apache.flink.table.catalog.{GenericInMemoryCatalog, ObjectIdentifier} -import org.apache.flink.table.legacy.factories.TableFactory -import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory -import org.apache.flink.table.planner.plan.utils.TestContextTableFactory -import org.apache.flink.table.planner.utils.TableTestBase -import org.apache.flink.testutils.junit.extensions.parameterized.{ParameterizedTestExtension, Parameters} - -import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.{BeforeEach, TestTemplate} -import org.junit.jupiter.api.extension.ExtendWith - -import java.util.Optional - -@ExtendWith(Array(classOf[ParameterizedTestExtension])) -class TableFactoryTest(isBatch: Boolean) extends TableTestBase { - - private val util = if (isBatch) batchTestUtil() else streamTestUtil() - - @BeforeEach - def before(): Unit = { - // we should clean the data to avoid serialization exception due to dirty data - TestCollectionTableFactory.reset() - } - - @TestTemplate - def testTableSourceSinkFactory(): Unit = { - val factory = new TestContextTableFactory( - ObjectIdentifier.of("cat", "default", "t1"), - ObjectIdentifier.of("cat", "default", "t2"), - isBatch) - util.tableEnv.getConfig.set(TestContextTableFactory.REQUIRED_KEY, Boolean.box(true)) - util.tableEnv.registerCatalog( - "cat", - new GenericInMemoryCatalog("default") { - override def getTableFactory: Optional[TableFactory] = Optional.of(factory) - }) - util.tableEnv.useCatalog("cat") - - val sourceDDL = - """ - |create table t1( - | a int, - | b varchar, - | c as a + 1 - |) with ( - | 'connector.type' = 'filesystem', - | 'connector.path' = '/to/my/path1', - | 'format.type' = 'csv' - |) - """.stripMargin - val sinkDDL = - """ - |create table t2( - | a int, - | b as c - 1, - | c int - |) with ( - | 'connector.type' = 'filesystem', - | 'connector.path' = '/to/my/path2', - | 'format.type' = 'csv' - |) 
- """.stripMargin - val query = - """ - |insert into t2 - |select t1.a, t1.c from t1 - """.stripMargin - util.tableEnv.executeSql(sourceDDL) - util.tableEnv.executeSql(sinkDDL) - - util.tableEnv.explainSql(query) - assertThat(factory.hasInvokedSource).isTrue - assertThat(factory.hasInvokedSink).isTrue - } -} - -object TableFactoryTest { - @Parameters(name = "isBatch: {0}") - def parameters(): java.util.Collection[Boolean] = { - java.util.Arrays.asList(true, false) - } -}