This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 567b28e26b65601759c51908c10fe362e48e1a89 Author: Marios Trivyzas <mat...@gmail.com> AuthorDate: Wed Mar 9 11:10:48 2022 +0200 [FLINK-26421] Use only EnvironmentSettings to configure the environment Use `EnvironmentSettings` with the new method `withConfiguration` in its `Builder` to specify configuration options on top of the one inherited by the environment (flink-conf.yml, CLI params). The `TableConfig` contains all of the config options specified by the env (flink-conf.yml, CLI params) + all the extra configuration defined by the user app through the `EnvironmentSettings`. --- .../generated/table_config_configuration.html | 12 +++ .../flink/connectors/hive/HiveDialectITCase.java | 16 ++-- .../table/catalog/hive/HiveCatalogITCase.java | 13 +-- flink-python/pyflink/java_gateway.py | 1 + flink-python/pyflink/table/environment_settings.py | 23 +++++ .../table/tests/test_environment_settings.py | 4 +- .../table/tests/test_table_config_completeness.py | 3 +- .../client/gateway/context/ExecutionContext.java | 28 +++--- .../client/gateway/context/SessionContext.java | 3 +- .../api/bridge/java/StreamTableEnvironment.java | 9 +- .../java/internal/StreamTableEnvironmentImpl.java | 14 +-- .../flink/table/api/EnvironmentSettings.java | 104 +++++++++------------ .../org/apache/flink/table/api/TableConfig.java | 50 ++++++++-- .../flink/table/api/config/TableConfigOptions.java | 18 ++++ .../table/api/internal/TableEnvironmentImpl.java | 20 ++-- .../flink/table/api/EnvironmentSettingsTest.java | 7 +- .../flink/table/catalog/FunctionCatalogTest.java | 4 +- .../flink/table/utils/CatalogManagerMocks.java | 8 +- .../flink/table/utils/TableEnvironmentMock.java | 14 ++- .../api/bridge/scala/StreamTableEnvironment.scala | 10 +- .../internal/StreamTableEnvironmentImpl.scala | 16 ++-- .../planner/delegation/DefaultPlannerFactory.java | 9 +- .../apache/flink/table/api/EnvironmentTest.java | 28 ++++++ .../plan/nodes/exec/TransformationsTest.java | 2 +- 
.../exec/serde/DynamicTableSinkSpecSerdeTest.java | 20 ++-- .../serde/DynamicTableSourceSpecSerdeTest.java | 16 ++-- .../expressions/utils/ExpressionTestBase.scala | 16 ++-- .../harness/GroupAggregateHarnessTest.scala | 3 +- .../planner/runtime/harness/HarnessTestBase.scala | 21 +---- .../runtime/harness/OverAggregateHarnessTest.scala | 18 ++-- .../planner/runtime/harness/RankHarnessTest.scala | 3 +- .../harness/TableAggregateHarnessTest.scala | 3 +- .../stream/sql/StreamTableEnvironmentITCase.scala | 25 +++++ .../flink/table/planner/utils/TableTestBase.scala | 20 ++-- 34 files changed, 331 insertions(+), 230 deletions(-) diff --git a/docs/layouts/shortcodes/generated/table_config_configuration.html b/docs/layouts/shortcodes/generated/table_config_configuration.html index 53e5dbc..fe9784a 100644 --- a/docs/layouts/shortcodes/generated/table_config_configuration.html +++ b/docs/layouts/shortcodes/generated/table_config_configuration.html @@ -9,6 +9,18 @@ </thead> <tbody> <tr> + <td><h5>table.builtin-catalog-name</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">"default_catalog"</td> + <td>String</td> + <td>The name of the initial catalog to be created when instantiating a TableEnvironment.</td> + </tr> + <tr> + <td><h5>table.builtin-database-name</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">"default_database"</td> + <td>String</td> + <td>The name of the default database in the initial catalog to be created when instantiating TableEnvironment.</td> + </tr> + <tr> <td><h5>table.dml-sync</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">false</td> <td>Boolean</td> diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java index 24c6904..1aa6f50 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java @@ -25,7 +25,6 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; -import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentInternal; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogPartitionSpec; @@ -43,6 +42,7 @@ import org.apache.flink.table.operations.command.QuitOperation; import org.apache.flink.table.operations.command.ResetOperation; import org.apache.flink.table.operations.command.SetOperation; import org.apache.flink.table.planner.delegation.hive.HiveParser; +import org.apache.flink.table.utils.CatalogManagerMocks; import org.apache.flink.types.Row; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FileUtils; @@ -89,11 +89,6 @@ import static org.junit.Assert.fail; /** Test Hive syntax when Hive dialect is used. 
*/ public class HiveDialectITCase { - private static final String DEFAULT_BUILTIN_CATALOG = - TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(); - private static final String DEFAULT_BUILTIN_DATABASE = - TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(); - private TableEnvironment tableEnv; private HiveCatalog hiveCatalog; private String warehouse; @@ -674,17 +669,18 @@ public class HiveDialectITCase { List<Row> catalogs = CollectionUtil.iteratorToList(tableEnv.executeSql("show catalogs").collect()); assertEquals(2, catalogs.size()); - tableEnv.executeSql("use catalog " + DEFAULT_BUILTIN_CATALOG); + tableEnv.executeSql("use catalog " + CatalogManagerMocks.DEFAULT_CATALOG); List<Row> databases = CollectionUtil.iteratorToList(tableEnv.executeSql("show databases").collect()); assertEquals(1, databases.size()); - assertEquals("+I[" + DEFAULT_BUILTIN_DATABASE + "]", databases.get(0).toString()); + assertEquals( + "+I[" + CatalogManagerMocks.DEFAULT_DATABASE + "]", databases.get(0).toString()); String catalogName = tableEnv.executeSql("show current catalog").collect().next().toString(); - assertEquals("+I[" + DEFAULT_BUILTIN_CATALOG + "]", catalogName); + assertEquals("+I[" + CatalogManagerMocks.DEFAULT_CATALOG + "]", catalogName); String databaseName = tableEnv.executeSql("show current database").collect().next().toString(); - assertEquals("+I[" + DEFAULT_BUILTIN_DATABASE + "]", databaseName); + assertEquals("+I[" + CatalogManagerMocks.DEFAULT_DATABASE + "]", databaseName); } @Test diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java index ad84ede..b18e11d 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java +++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java @@ -27,7 +27,6 @@ import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.constraints.UniqueConstraint; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; @@ -42,6 +41,7 @@ import org.apache.flink.table.factories.ManagedTableFactory; import org.apache.flink.table.factories.TestManagedTableFactory; import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory; import org.apache.flink.table.types.AbstractDataType; +import org.apache.flink.table.utils.CatalogManagerMocks; import org.apache.flink.types.Row; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FileUtils; @@ -87,11 +87,6 @@ import static org.assertj.core.api.Assertions.assertThat; */ public class HiveCatalogITCase { - private static final String DEFAULT_BUILTIN_CATALOG = - TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(); - private static final String DEFAULT_BUILTIN_DATABASE = - TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(); - @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); private static HiveCatalog hiveCatalog; @@ -478,14 +473,14 @@ public class HiveCatalogITCase { tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog); tableEnv.useCatalog(hiveCatalog.getName()); tableEnv.executeSql("create table generic_table (x int) with ('connector'='COLLECTION')"); - tableEnv.useCatalog(DEFAULT_BUILTIN_CATALOG); + tableEnv.useCatalog(CatalogManagerMocks.DEFAULT_CATALOG); tableEnv.executeSql( String.format( "create table copy like `%s`.`default`.generic_table", hiveCatalog.getName())); - Catalog builtInCat = 
tableEnv.getCatalog(DEFAULT_BUILTIN_CATALOG).get(); + Catalog builtInCat = tableEnv.getCatalog(CatalogManagerMocks.DEFAULT_CATALOG).get(); CatalogBaseTable catalogTable = - builtInCat.getTable(new ObjectPath(DEFAULT_BUILTIN_DATABASE, "copy")); + builtInCat.getTable(new ObjectPath(CatalogManagerMocks.DEFAULT_DATABASE, "copy")); assertThat(catalogTable.getOptions()).hasSize(1); assertThat(catalogTable.getOptions()) .containsEntry(FactoryUtil.CONNECTOR.key(), "COLLECTION"); diff --git a/flink-python/pyflink/java_gateway.py b/flink-python/pyflink/java_gateway.py index b127240..fdb47a8 100644 --- a/flink-python/pyflink/java_gateway.py +++ b/flink-python/pyflink/java_gateway.py @@ -132,6 +132,7 @@ def import_flink_view(gateway): """ # Import the classes used by PyFlink java_import(gateway.jvm, "org.apache.flink.table.api.*") + java_import(gateway.jvm, "org.apache.flink.table.api.config.*") java_import(gateway.jvm, "org.apache.flink.table.api.java.*") java_import(gateway.jvm, "org.apache.flink.table.api.bridge.java.*") java_import(gateway.jvm, "org.apache.flink.table.api.dataview.*") diff --git a/flink-python/pyflink/table/environment_settings.py b/flink-python/pyflink/table/environment_settings.py index 578567d..6819c4b 100644 --- a/flink-python/pyflink/table/environment_settings.py +++ b/flink-python/pyflink/table/environment_settings.py @@ -50,6 +50,15 @@ class EnvironmentSettings(object): gateway = get_gateway() self._j_builder = gateway.jvm.EnvironmentSettings.Builder() + def with_configuration(self, config: Configuration) -> 'EnvironmentSettings.Builder': + """ + Creates the EnvironmentSetting with specified Configuration. + + :return: EnvironmentSettings. + """ + self._j_builder = self._j_builder.withConfiguration(config._j_configuration) + return self + def in_batch_mode(self) -> 'EnvironmentSettings.Builder': """ Sets that the components should work in a batch mode. Streaming mode by default. 
@@ -155,9 +164,20 @@ class EnvironmentSettings(object): Convert to `pyflink.common.Configuration`. :return: Configuration with specified value. + + .. note:: Deprecated in 1.15. Please use + :func:`EnvironmentSettings.get_configuration` instead. """ return Configuration(j_configuration=self._j_environment_settings.toConfiguration()) + def get_configuration(self) -> Configuration: + """ + Get the underlying `pyflink.common.Configuration`. + + :return: Configuration with specified value. + """ + return Configuration(j_configuration=self._j_environment_settings.getConfiguration()) + @staticmethod def new_instance() -> 'EnvironmentSettings.Builder': """ @@ -173,6 +193,9 @@ class EnvironmentSettings(object): Creates the EnvironmentSetting with specified Configuration. :return: EnvironmentSettings. + + .. note:: Deprecated in 1.15. Please use + :func:`EnvironmentSettings.Builder.with_configuration` instead. """ return EnvironmentSettings( get_gateway().jvm.EnvironmentSettings.fromConfiguration(config._j_configuration)) diff --git a/flink-python/pyflink/table/tests/test_environment_settings.py b/flink-python/pyflink/table/tests/test_environment_settings.py index 8722904..1a2f1ed6 100644 --- a/flink-python/pyflink/table/tests/test_environment_settings.py +++ b/flink-python/pyflink/table/tests/test_environment_settings.py @@ -50,7 +50,7 @@ class EnvironmentSettingsTests(PyFlinkTestCase): gateway = get_gateway() - DEFAULT_BUILTIN_CATALOG = gateway.jvm.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG + DEFAULT_BUILTIN_CATALOG = gateway.jvm.TableConfigOptions.TABLE_CATALOG_NAME.defaultValue() builder = EnvironmentSettings.new_instance() @@ -67,7 +67,7 @@ class EnvironmentSettingsTests(PyFlinkTestCase): gateway = get_gateway() - DEFAULT_BUILTIN_DATABASE = gateway.jvm.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE + DEFAULT_BUILTIN_DATABASE = gateway.jvm.TableConfigOptions.TABLE_DATABASE_NAME.defaultValue() builder = EnvironmentSettings.new_instance() diff --git 
a/flink-python/pyflink/table/tests/test_table_config_completeness.py b/flink-python/pyflink/table/tests/test_table_config_completeness.py index 396f5d7..b2dc4c6 100644 --- a/flink-python/pyflink/table/tests/test_table_config_completeness.py +++ b/flink-python/pyflink/table/tests/test_table_config_completeness.py @@ -37,7 +37,8 @@ class TableConfigCompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCas @classmethod def excluded_methods(cls): # internal interfaces, no need to expose to users. - return {'getPlannerConfig', 'setPlannerConfig', 'addJobParameter'} + return {'getPlannerConfig', 'setPlannerConfig', 'addJobParameter', + 'setRootConfiguration', 'get', 'getOptional'} @classmethod def java_method_name(cls, python_method_name): diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java index ae1a829..4c0f7f4 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java @@ -98,19 +98,20 @@ public class ExecutionContext { // ------------------------------------------------------------------------------------------------------------------ private StreamTableEnvironment createTableEnvironment() { - // checks the value of RUNTIME_MODE - EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(flinkConfig); + EnvironmentSettings settings = + EnvironmentSettings.newInstance().withConfiguration(flinkConfig).build(); - TableConfig tableConfig = new TableConfig(); - tableConfig.addConfiguration(flinkConfig); - - StreamExecutionEnvironment streamExecEnv = createStreamExecutionEnvironment(); + // We need not different StreamExecutionEnvironments to build and submit flink job, + // instead we just use 
StreamExecutionEnvironment#executeAsync(StreamGraph) method + // to execute existing StreamGraph. + // This requires StreamExecutionEnvironment to have a full flink configuration. + StreamExecutionEnvironment streamExecEnv = + new StreamExecutionEnvironment(new Configuration(flinkConfig), classLoader); final Executor executor = lookupExecutor(streamExecEnv); return createStreamTableEnvironment( streamExecEnv, settings, - tableConfig, executor, sessionState.catalogManager, sessionState.moduleManager, @@ -121,13 +122,16 @@ public class ExecutionContext { private StreamTableEnvironment createStreamTableEnvironment( StreamExecutionEnvironment env, EnvironmentSettings settings, - TableConfig tableConfig, Executor executor, CatalogManager catalogManager, ModuleManager moduleManager, FunctionCatalog functionCatalog, ClassLoader userClassLoader) { + TableConfig tableConfig = TableConfig.getDefault(); + tableConfig.setRootConfiguration(executor.getConfiguration()); + tableConfig.addConfiguration(settings.getConfiguration()); + final Planner planner = PlannerFactoryUtil.createPlanner( executor, tableConfig, moduleManager, catalogManager, functionCatalog); @@ -161,12 +165,4 @@ public class ExecutionContext { e); } } - - private StreamExecutionEnvironment createStreamExecutionEnvironment() { - // We need not different StreamExecutionEnvironments to build and submit flink job, - // instead we just use StreamExecutionEnvironment#executeAsync(StreamGraph) method - // to execute existing StreamGraph. - // This requires StreamExecutionEnvironment to have a full flink configuration. 
- return new StreamExecutionEnvironment(new Configuration(flinkConfig), classLoader); - } } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java index c45d5af..d392460 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java @@ -221,7 +221,8 @@ public class SessionContext { ModuleManager moduleManager = new ModuleManager(); - final EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(configuration); + final EnvironmentSettings settings = + EnvironmentSettings.newInstance().withConfiguration(configuration).build(); CatalogManager catalogManager = CatalogManager.newBuilder() diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java index 09e6519..6c96858 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java @@ -31,7 +31,6 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.Table; -import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; import org.apache.flink.table.connector.ChangelogMode; @@ -92,9 +91,7 @@ public interface StreamTableEnvironment 
extends TableEnvironment { * TableEnvironment}. */ static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) { - return create( - executionEnvironment, - EnvironmentSettings.fromConfiguration(executionEnvironment.getConfiguration())); + return create(executionEnvironment, EnvironmentSettings.newInstance().build()); } /** @@ -122,9 +119,7 @@ public interface StreamTableEnvironment extends TableEnvironment { */ static StreamTableEnvironment create( StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) { - TableConfig tableConfig = new TableConfig(); - tableConfig.addConfiguration(settings.toConfiguration()); - return StreamTableEnvironmentImpl.create(executionEnvironment, settings, tableConfig); + return StreamTableEnvironmentImpl.create(executionEnvironment, settings); } /** diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java index 5d66cfa..cb8d38b 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java @@ -92,19 +92,23 @@ public final class StreamTableEnvironmentImpl extends AbstractStreamTableEnviron } public static StreamTableEnvironment create( - StreamExecutionEnvironment executionEnvironment, - EnvironmentSettings settings, - TableConfig tableConfig) { + StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) { // temporary solution until FLINK-15635 is fixed final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + final Executor executor = lookupExecutor(classLoader, executionEnvironment); + + final 
TableConfig tableConfig = TableConfig.getDefault(); + tableConfig.setRootConfiguration(executor.getConfiguration()); + tableConfig.addConfiguration(settings.getConfiguration()); + final ModuleManager moduleManager = new ModuleManager(); final CatalogManager catalogManager = CatalogManager.newBuilder() .classLoader(classLoader) - .config(tableConfig.getConfiguration()) + .config(tableConfig) .defaultCatalog( settings.getBuiltInCatalogName(), new GenericInMemoryCatalog( @@ -116,8 +120,6 @@ public final class StreamTableEnvironmentImpl extends AbstractStreamTableEnviron final FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager); - final Executor executor = lookupExecutor(classLoader, executionEnvironment); - final Planner planner = PlannerFactoryUtil.createPlanner( executor, tableConfig, moduleManager, catalogManager, functionCatalog); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java index d8a2464..258fcb8 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java @@ -21,11 +21,14 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.functions.UserDefinedFunction; import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH; import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING; import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; +import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_CATALOG_NAME; +import 
static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DATABASE_NAME; /** * Defines all parameters that initialize a table environment. Those parameters are used only during @@ -53,32 +56,14 @@ public class EnvironmentSettings { private static final EnvironmentSettings DEFAULT_BATCH_MODE_SETTINGS = EnvironmentSettings.newInstance().inBatchMode().build(); - public static final String DEFAULT_BUILTIN_CATALOG = "default_catalog"; - public static final String DEFAULT_BUILTIN_DATABASE = "default_database"; - - /** - * Specifies the name of the initial catalog to be created when instantiating {@link - * TableEnvironment}. - */ - private final String builtInCatalogName; - - /** - * Specifies the name of the default database in the initial catalog to be created when - * instantiating {@link TableEnvironment}. - */ - private final String builtInDatabaseName; - /** - * Determines if the table environment should work in a batch ({@code false}) or streaming - * ({@code true}) mode. + * Holds all the configuration generated by the builder, together with any required additional + * configuration. */ - private final boolean isStreamingMode; + private final Configuration configuration; - private EnvironmentSettings( - String builtInCatalogName, String builtInDatabaseName, boolean isStreamingMode) { - this.builtInCatalogName = builtInCatalogName; - this.builtInDatabaseName = builtInDatabaseName; - this.isStreamingMode = isStreamingMode; + private EnvironmentSettings(Configuration configuration) { + this.configuration = configuration; } /** @@ -111,32 +96,28 @@ public class EnvironmentSettings { return new Builder(); } - /** Creates an instance of {@link EnvironmentSettings} from configuration. */ + /** + * Creates an instance of {@link EnvironmentSettings} from configuration. + * + * @deprecated use {@link Builder#withConfiguration(Configuration)} instead. 
+ */ + @Deprecated public static EnvironmentSettings fromConfiguration(ReadableConfig configuration) { - final Builder builder = new Builder(); - switch (configuration.get(RUNTIME_MODE)) { - case STREAMING: - builder.inStreamingMode(); - break; - case BATCH: - builder.inBatchMode(); - break; - case AUTOMATIC: - default: - throw new TableException( - String.format( - "Unsupported mode '%s' for '%s'. " - + "Only an explicit BATCH or STREAMING mode is supported in Table API.", - configuration.get(RUNTIME_MODE), RUNTIME_MODE.key())); - } - - return builder.build(); + return new EnvironmentSettings((Configuration) configuration); } - /** Convert the environment setting to the {@link Configuration}. */ + /** + * Convert the environment setting to the {@link Configuration}. + * + * @deprecated use {@link #getConfiguration} instead. + */ + @Deprecated public Configuration toConfiguration() { - Configuration configuration = new Configuration(); - configuration.set(RUNTIME_MODE, isStreamingMode() ? STREAMING : BATCH); + return configuration; + } + + /** Get the underlying {@link Configuration}. */ + public Configuration getConfiguration() { return configuration; } @@ -145,7 +126,7 @@ public class EnvironmentSettings { * TableEnvironment}. */ public String getBuiltInCatalogName() { - return builtInCatalogName; + return configuration.get(TABLE_CATALOG_NAME); } /** @@ -153,31 +134,31 @@ public class EnvironmentSettings { * instantiating a {@link TableEnvironment}. */ public String getBuiltInDatabaseName() { - return builtInDatabaseName; + return configuration.get(TABLE_DATABASE_NAME); } /** Tells if the {@link TableEnvironment} should work in a batch or streaming mode. */ public boolean isStreamingMode() { - return isStreamingMode; + return configuration.get(RUNTIME_MODE) == STREAMING; } /** A builder for {@link EnvironmentSettings}. 
*/ @PublicEvolving public static class Builder { - private String builtInCatalogName = DEFAULT_BUILTIN_CATALOG; - private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE; - private boolean isStreamingMode = true; + private final Configuration configuration = new Configuration(); + + public Builder() {} /** Sets that the components should work in a batch mode. Streaming mode by default. */ public Builder inBatchMode() { - this.isStreamingMode = false; + configuration.set(RUNTIME_MODE, BATCH); return this; } /** Sets that the components should work in a streaming mode. Enabled by default. */ public Builder inStreamingMode() { - this.isStreamingMode = true; + configuration.set(RUNTIME_MODE, STREAMING); return this; } @@ -193,10 +174,10 @@ public class EnvironmentSettings { * <p>It will also be the initial value for the current catalog which can be altered via * {@link TableEnvironment#useCatalog(String)}. * - * <p>Default: "default_catalog". + * <p>Default: {@link TableConfigOptions#TABLE_CATALOG_NAME}{@code .defaultValue()}. */ public Builder withBuiltInCatalogName(String builtInCatalogName) { - this.builtInCatalogName = builtInCatalogName; + configuration.set(TABLE_CATALOG_NAME, builtInCatalogName); return this; } @@ -212,17 +193,22 @@ public class EnvironmentSettings { * <p>It will also be the initial value for the current database which can be altered via * {@link TableEnvironment#useDatabase(String)}. * - * <p>Default: "default_database". + * <p>Default: {@link TableConfigOptions#TABLE_DATABASE_NAME}{@code .defaultValue()}. */ public Builder withBuiltInDatabaseName(String builtInDatabaseName) { - this.builtInDatabaseName = builtInDatabaseName; + configuration.set(TABLE_DATABASE_NAME, builtInDatabaseName); + return this; + } + + /** Add extra configuration to {@link EnvironmentSettings}.
*/ + public Builder withConfiguration(Configuration configuration) { + this.configuration.addAll(configuration); return this; } /** Returns an immutable instance of {@link EnvironmentSettings}. */ public EnvironmentSettings build() { - return new EnvironmentSettings( - builtInCatalogName, builtInDatabaseName, isStreamingMode); + return new EnvironmentSettings(configuration); } } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java index 2565bb4..141ad78 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java @@ -19,11 +19,13 @@ package org.apache.flink.table.api; import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.PipelineOptions; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.configuration.WritableConfig; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.OptimizerConfigOptions; @@ -35,6 +37,7 @@ import java.time.Duration; import java.time.ZoneId; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import static java.time.ZoneId.SHORT_IDS; @@ -68,7 +71,15 @@ import static java.time.ZoneId.SHORT_IDS; * @see OptimizerConfigOptions */ @PublicEvolving -public class TableConfig implements WritableConfig { +public final class TableConfig implements WritableConfig, ReadableConfig { + // + // TableConfig is also a ReadableConfig which is built once the TableEnvironment is created and + // contains 
both the configuration defined in the environment (flink-conf.yaml + CLI params), + // stored in rootConfiguration, but also any extra configuration defined by the user in the + // application, which has precedence over the environment configuration. This way, any consumer + // of TableConfig can get the complete view of the configuration (environment + user defined) by + // calling the get() and getOptional() methods. + /** Defines if all fields need to be checked for NULL first. */ private Boolean nullCheck = true; @@ -84,6 +95,8 @@ public class TableConfig implements WritableConfig { /** A configuration object to hold all key/value configuration. */ private final Configuration configuration = new Configuration(); + private ReadableConfig rootConfiguration = new Configuration(); + /** * Sets a value for the given {@link ConfigOption}. * @@ -121,12 +134,37 @@ public class TableConfig implements WritableConfig { return this; } + @Override + public <T> T get(ConfigOption<T> option) { + return configuration.getOptional(option).orElseGet(() -> rootConfiguration.get(option)); + } + + @Override + public <T> Optional<T> getOptional(ConfigOption<T> option) { + final Optional<T> tableValue = configuration.getOptional(option); + if (tableValue.isPresent()) { + return tableValue; + } + return rootConfiguration.getOptional(option); + } + /** Gives direct access to the underlying key-value map for advanced configuration. */ public Configuration getConfiguration() { return configuration; } /** + * Sets the given key-value configuration as {@link #rootConfiguration}, which contains any + * configuration set in the environment ({@code flink-conf.yaml} + {@code CLI} parameters). + * + * @param rootConfiguration key-value root configuration to be set + */ + @Internal + public void setRootConfiguration(ReadableConfig rootConfiguration) { + this.rootConfiguration = rootConfiguration; + } + + /** * Adds the given key-value configuration to the underlying configuration.
It overwrites * existing keys. * @@ -139,8 +177,7 @@ public class TableConfig implements WritableConfig { /** Returns the current SQL dialect. */ public SqlDialect getSqlDialect() { - return SqlDialect.valueOf( - getConfiguration().getString(TableConfigOptions.TABLE_SQL_DIALECT).toUpperCase()); + return SqlDialect.valueOf(get(TableConfigOptions.TABLE_SQL_DIALECT).toUpperCase()); } /** Sets the current SQL dialect to parse a SQL query. Flink's SQL behavior by default. */ @@ -320,9 +357,9 @@ public class TableConfig implements WritableConfig { && !(maxTime.toMilliseconds() == 0 && minTime.toMilliseconds() == 0)) { throw new IllegalArgumentException( "Difference between minTime: " - + minTime.toString() + + minTime + " and maxTime: " - + maxTime.toString() + + maxTime + " should be at least 5 minutes."); } setIdleStateRetention(Duration.ofMillis(minTime.toMilliseconds())); @@ -398,8 +435,7 @@ public class TableConfig implements WritableConfig { @Experimental public void addJobParameter(String key, String value) { Map<String, String> params = - getConfiguration() - .getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS) + getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS) .map(HashMap::new) .orElseGet(HashMap::new); params.put(key, value); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java index 8601f38..d4afa20 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java @@ -41,6 +41,24 @@ public class TableConfigOptions { private TableConfigOptions() {} @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<String> TABLE_CATALOG_NAME = + key("table.builtin-catalog-name") + 
.stringType() + .defaultValue("default_catalog") + .withDescription( + "The name of the initial catalog to be created when " + + "instantiating a TableEnvironment."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<String> TABLE_DATABASE_NAME = + key("table.builtin-database-name") + .stringType() + .defaultValue("default_database") + .withDescription( + "The name of the default database in the initial catalog to be created " + + "when instantiating TableEnvironment."); + + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) public static final ConfigOption<Boolean> TABLE_DML_SYNC = key("table.dml-sync") .booleanType() diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 8d6a5a9..a692e55 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -265,21 +265,22 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { } public static TableEnvironmentImpl create(Configuration configuration) { - return create(EnvironmentSettings.fromConfiguration(configuration), configuration); + return create(EnvironmentSettings.newInstance().withConfiguration(configuration).build()); } public static TableEnvironmentImpl create(EnvironmentSettings settings) { - return create(settings, settings.toConfiguration()); - } - - private static TableEnvironmentImpl create( - EnvironmentSettings settings, Configuration configuration) { // temporary solution until FLINK-15635 is fixed final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + final ExecutorFactory executorFactory = + 
FactoryUtil.discoverFactory( + classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER); + final Executor executor = executorFactory.create(settings.getConfiguration()); + // use configuration to init table config final TableConfig tableConfig = new TableConfig(); - tableConfig.addConfiguration(configuration); + tableConfig.setRootConfiguration(executor.getConfiguration()); + tableConfig.addConfiguration(settings.getConfiguration()); final ModuleManager moduleManager = new ModuleManager(); @@ -297,11 +298,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal { final FunctionCatalog functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager); - final ExecutorFactory executorFactory = - FactoryUtil.discoverFactory( - classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER); - final Executor executor = executorFactory.create(configuration); - final Planner planner = PlannerFactoryUtil.createPlanner( executor, tableConfig, moduleManager, catalogManager, functionCatalog); diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/EnvironmentSettingsTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/EnvironmentSettingsTest.java index bc24286..ad4630b 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/EnvironmentSettingsTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/EnvironmentSettingsTest.java @@ -34,15 +34,16 @@ public class EnvironmentSettingsTest { public void testFromConfiguration() { Configuration configuration = new Configuration(); configuration.setString("execution.runtime-mode", "batch"); - EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(configuration); + EnvironmentSettings settings = + EnvironmentSettings.newInstance().withConfiguration(configuration).build(); assertFalse("Expect batch mode.", settings.isStreamingMode()); } 
@Test - public void testToConfiguration() { + public void testGetConfiguration() { EnvironmentSettings settings = new EnvironmentSettings.Builder().inBatchMode().build(); - Configuration configuration = settings.toConfiguration(); + Configuration configuration = settings.getConfiguration(); assertEquals(RuntimeExecutionMode.BATCH, configuration.get(RUNTIME_MODE)); } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java index c94dee9..b30f806 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java @@ -18,7 +18,7 @@ package org.apache.flink.table.catalog; -import org.apache.flink.table.api.TableConfig; +import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.FunctionDefinition; @@ -85,7 +85,7 @@ public class FunctionCatalogTest { functionCatalog = new FunctionCatalog( - TableConfig.getDefault(), + new Configuration(), CatalogManagerMocks.preparedCatalogManager() .defaultCatalog(DEFAULT_CATALOG, catalog) .build(), diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java index 94bc182..c3fb2e4 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java @@ -20,7 +20,7 @@ package org.apache.flink.table.utils; import org.apache.flink.api.common.ExecutionConfig; import 
org.apache.flink.configuration.Configuration; -import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogManager; import org.apache.flink.table.catalog.GenericInMemoryCatalog; @@ -30,9 +30,11 @@ import javax.annotation.Nullable; /** Mock implementations of {@link CatalogManager} for testing purposes. */ public final class CatalogManagerMocks { - public static final String DEFAULT_CATALOG = EnvironmentSettings.DEFAULT_BUILTIN_CATALOG; + public static final String DEFAULT_CATALOG = + TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(); - public static final String DEFAULT_DATABASE = EnvironmentSettings.DEFAULT_BUILTIN_DATABASE; + public static final String DEFAULT_DATABASE = + TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(); public static CatalogManager createEmptyCatalogManager() { return createCatalogManager(null); diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java index cb09387..f917132 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java @@ -18,6 +18,7 @@ package org.apache.flink.table.utils; +import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.internal.TableEnvironmentImpl; @@ -69,7 +70,7 @@ public class TableEnvironmentMock extends TableEnvironmentImpl { } private static TableEnvironmentMock getInstance(boolean isStreamingMode) { - final TableConfig tableConfig = createTableConfig(); + final TableConfig tableConfig = TableConfig.getDefault(); 
final CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager(); final ModuleManager moduleManager = new ModuleManager(); return new TableEnvironmentMock( @@ -77,22 +78,19 @@ public class TableEnvironmentMock extends TableEnvironmentImpl { moduleManager, tableConfig, createExecutor(), - createFunctionCatalog(tableConfig, catalogManager, moduleManager), + createFunctionCatalog( + tableConfig.getConfiguration(), catalogManager, moduleManager), createPlanner(), isStreamingMode); } - private static TableConfig createTableConfig() { - return TableConfig.getDefault(); - } - private static ExecutorMock createExecutor() { return new ExecutorMock(); } private static FunctionCatalog createFunctionCatalog( - TableConfig tableConfig, CatalogManager catalogManager, ModuleManager moduleManager) { - return new FunctionCatalog(tableConfig, catalogManager, moduleManager); + ReadableConfig config, CatalogManager catalogManager, ModuleManager moduleManager) { + return new FunctionCatalog(config, catalogManager, moduleManager); } private static PlannerMock createPlanner() { diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala index e8463ee..345e2f9 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala @@ -18,7 +18,6 @@ package org.apache.flink.table.api.bridge.scala import org.apache.flink.annotation.PublicEvolving -import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment} @@ -869,9 +868,7 @@ object StreamTableEnvironment { * [[TableEnvironment]]. */ def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment = { - create( - executionEnvironment, - EnvironmentSettings.fromConfiguration(executionEnvironment.getConfiguration)) + create(executionEnvironment, EnvironmentSettings.newInstance().build) } /** @@ -899,9 +896,6 @@ object StreamTableEnvironment { executionEnvironment: StreamExecutionEnvironment, settings: EnvironmentSettings) : StreamTableEnvironment = { - val config = new TableConfig() - config.addConfiguration(settings.toConfiguration) - StreamTableEnvironmentImpl - .create(executionEnvironment, settings, config) + StreamTableEnvironmentImpl.create(executionEnvironment, settings) } } diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala index a968e49..cbc4f38 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala @@ -17,7 +17,7 @@ */ package org.apache.flink.table.api.bridge.scala.internal -import org.apache.flink.annotation.Internal +import org.apache.flink.annotation.{Internal, VisibleForTesting} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic @@ -40,6 +40,7 @@ import org.apache.flink.types.Row import org.apache.flink.util.Preconditions import java.util.Optional + import scala.collection.JavaConverters._ /** @@ -294,13 +295,19 @@ object StreamTableEnvironmentImpl { def create( executionEnvironment: 
StreamExecutionEnvironment, - settings: EnvironmentSettings, - tableConfig: TableConfig) + settings: EnvironmentSettings) : StreamTableEnvironmentImpl = { // temporary solution until FLINK-15635 is fixed val classLoader = Thread.currentThread.getContextClassLoader + val executor = AbstractStreamTableEnvironmentImpl.lookupExecutor( + classLoader, executionEnvironment.getWrappedStreamExecutionEnvironment) + + val tableConfig = TableConfig.getDefault + tableConfig.setRootConfiguration(executor.getConfiguration) + tableConfig.addConfiguration(settings.getConfiguration) + val moduleManager = new ModuleManager val catalogManager = CatalogManager.newBuilder @@ -316,9 +323,6 @@ object StreamTableEnvironmentImpl { val functionCatalog = new FunctionCatalog(tableConfig, catalogManager, moduleManager) - val executor = AbstractStreamTableEnvironmentImpl.lookupExecutor( - classLoader, executionEnvironment.getWrappedStreamExecutionEnvironment) - val planner = PlannerFactoryUtil.createPlanner( executor, tableConfig, moduleManager, catalogManager, functionCatalog) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultPlannerFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultPlannerFactory.java index 4476a7a..b8659ba 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultPlannerFactory.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultPlannerFactory.java @@ -29,6 +29,8 @@ import org.apache.flink.table.delegation.PlannerFactory; import java.util.Collections; import java.util.Set; +import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; + /** Factory for the default {@link Planner}. 
*/ @Internal public final class DefaultPlannerFactory implements PlannerFactory { @@ -51,7 +53,7 @@ public final class DefaultPlannerFactory implements PlannerFactory { @Override public Planner create(Context context) { final RuntimeExecutionMode runtimeExecutionMode = - context.getTableConfig().getConfiguration().get(ExecutionOptions.RUNTIME_MODE); + context.getTableConfig().get(ExecutionOptions.RUNTIME_MODE); switch (runtimeExecutionMode) { case STREAMING: return new StreamPlanner( @@ -70,8 +72,9 @@ public final class DefaultPlannerFactory implements PlannerFactory { default: throw new TableException( String.format( - "Unknown runtime mode '%s'. This is a bug. Please consider filing an issue.", - runtimeExecutionMode)); + "Unsupported mode '%s' for '%s'. Only an explicit BATCH or " + + "STREAMING mode is supported in Table API.", + runtimeExecutionMode, RUNTIME_MODE.key())); } } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/EnvironmentTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/EnvironmentTest.java index ff34953..eaa2582 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/EnvironmentTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/EnvironmentTest.java @@ -24,12 +24,15 @@ import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.types.Row; import org.junit.Test; import java.time.Duration; +import java.util.concurrent.ExecutionException; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; /** Tests for {@link TableEnvironment} that require a planner. 
*/ @@ -61,4 +64,29 @@ public class EnvironmentTest { assertEquals(800, env.getConfig().getAutoWatermarkInterval()); assertEquals(30000, env.getCheckpointConfig().getCheckpointInterval()); } + + @Test + public void testEnvironmentSettings() throws ExecutionException, InterruptedException { + Configuration conf = new Configuration(); + conf.set(TableConfigOptions.TABLE_CATALOG_NAME, "myCatalog"); + EnvironmentSettings settings = + EnvironmentSettings.newInstance().withConfiguration(conf).build(); + + TableEnvironment tEnv = TableEnvironment.create(settings); + assertThat(tEnv.getConfig().get(TableConfigOptions.TABLE_CATALOG_NAME)) + .isEqualTo("myCatalog"); + assertThat(tEnv.getCurrentCatalog()).isEqualTo("myCatalog"); + + StreamTableEnvironment stEnv = + StreamTableEnvironment.create( + StreamExecutionEnvironment.getExecutionEnvironment(), settings); + assertThat(stEnv.getConfig().get(TableConfigOptions.TABLE_CATALOG_NAME)) + .isEqualTo("myCatalog"); + + stEnv.getConfig() + .set( + TableConfigOptions.TABLE_CATALOG_NAME, + TableConfigOptions.TABLE_CATALOG_NAME.defaultValue()); + assertThat(stEnv.getCurrentCatalog()).isEqualTo("myCatalog"); + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java index 31ac8b5..b0667b1 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java @@ -110,7 +110,7 @@ class TransformationsTest { @Test public void testLegacyUid() { final TableEnvironment env = - TableEnvironment.create(EnvironmentSettings.inStreamingMode().toConfiguration()); + TableEnvironment.create(EnvironmentSettings.inStreamingMode().getConfiguration()); 
env.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true); env.createTemporaryTable( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java index 57a712f..d6857b5 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java @@ -44,6 +44,7 @@ import org.apache.flink.table.planner.utils.PlannerMocks; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.utils.CatalogManagerMocks; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; @@ -57,8 +58,6 @@ import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; -import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG; -import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE; import static org.apache.flink.table.api.config.TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS; import static org.apache.flink.table.api.config.TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; @@ -99,8 +98,8 @@ class DynamicTableSinkSpecSerdeTest { new DynamicTableSinkSpec( ContextResolvedTable.temporary( ObjectIdentifier.of( - DEFAULT_BUILTIN_CATALOG, - DEFAULT_BUILTIN_DATABASE, + CatalogManagerMocks.DEFAULT_CATALOG, + CatalogManagerMocks.DEFAULT_DATABASE, "MyTable"), new ResolvedCatalogTable(catalogTable1, resolvedSchema1)), null); @@ -129,8 +128,8 @@ class 
DynamicTableSinkSpecSerdeTest { new DynamicTableSinkSpec( ContextResolvedTable.temporary( ObjectIdentifier.of( - DEFAULT_BUILTIN_CATALOG, - DEFAULT_BUILTIN_DATABASE, + CatalogManagerMocks.DEFAULT_CATALOG, + CatalogManagerMocks.DEFAULT_DATABASE, "MyTable"), new ResolvedCatalogTable(catalogTable2, resolvedSchema2)), Arrays.asList( @@ -165,8 +164,8 @@ class DynamicTableSinkSpecSerdeTest { new DynamicTableSinkSpec( ContextResolvedTable.temporary( ObjectIdentifier.of( - DEFAULT_BUILTIN_CATALOG, - DEFAULT_BUILTIN_DATABASE, + CatalogManagerMocks.DEFAULT_CATALOG, + CatalogManagerMocks.DEFAULT_DATABASE, "MyTable"), new ResolvedCatalogTable(catalogTable3, resolvedSchema3)), Collections.singletonList( @@ -214,7 +213,10 @@ class DynamicTableSinkSpecSerdeTest { void testDynamicTableSinkSpecSerdeWithEnrichmentOptions() throws Exception { // Test model ObjectIdentifier identifier = - ObjectIdentifier.of(DEFAULT_BUILTIN_CATALOG, DEFAULT_BUILTIN_DATABASE, "my_table"); + ObjectIdentifier.of( + CatalogManagerMocks.DEFAULT_CATALOG, + CatalogManagerMocks.DEFAULT_DATABASE, + "my_table"); String formatPrefix = FactoryUtil.getFormatPrefix(FORMAT, TestFormatFactory.IDENTIFIER); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java index a15acb7..3764f6b 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java @@ -23,6 +23,7 @@ import org.apache.flink.connector.file.table.FileSystemTableFactory; import org.apache.flink.formats.testcsv.TestCsvFormatFactory; import org.apache.flink.table.api.DataTypes; import 
org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation; import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore; import org.apache.flink.table.catalog.CatalogManager; @@ -72,8 +73,6 @@ import java.util.HashMap; import java.util.Map; import java.util.stream.Stream; -import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG; -import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE; import static org.apache.flink.table.api.config.TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS; import static org.apache.flink.table.api.config.TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS; import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; @@ -114,8 +113,8 @@ public class DynamicTableSourceSpecSerdeTest { new DynamicTableSourceSpec( ContextResolvedTable.temporary( ObjectIdentifier.of( - DEFAULT_BUILTIN_CATALOG, - DEFAULT_BUILTIN_DATABASE, + TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(), + TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(), "MyTable"), new ResolvedCatalogTable(catalogTable1, resolvedSchema1)), null); @@ -154,8 +153,8 @@ public class DynamicTableSourceSpecSerdeTest { new DynamicTableSourceSpec( ContextResolvedTable.temporary( ObjectIdentifier.of( - DEFAULT_BUILTIN_CATALOG, - DEFAULT_BUILTIN_DATABASE, + TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(), + TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(), "MyTable"), new ResolvedCatalogTable(catalogTable2, resolvedSchema2)), Arrays.asList( @@ -274,7 +273,10 @@ public class DynamicTableSourceSpecSerdeTest { void testDynamicTableSourceSpecSerdeWithEnrichmentOptions() throws Exception { // Test model ObjectIdentifier identifier = - ObjectIdentifier.of(DEFAULT_BUILTIN_CATALOG, DEFAULT_BUILTIN_DATABASE, "my_table"); + ObjectIdentifier.of( + 
TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(), + TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(), + "my_table"); String formatPrefix = FactoryUtil.getFormatPrefix(FORMAT, TestFormatFactory.IDENTIFIER); diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala index 07dff77..52fc78b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala @@ -22,11 +22,11 @@ import org.apache.flink.api.common.TaskInfo import org.apache.flink.api.common.functions.util.RuntimeUDFContext import org.apache.flink.api.common.functions.{MapFunction, RichFunction, RichMapFunction} import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.configuration.Configuration +import org.apache.flink.configuration.{ConfigOption, Configuration} import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.api import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl -import org.apache.flink.table.api.config.ExecutionConfigOptions +import org.apache.flink.table.api.config.{ExecutionConfigOptions, TableConfigOptions} import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableException, ValidationException} import org.apache.flink.table.data.RowData import org.apache.flink.table.data.binary.BinaryRowData @@ -38,6 +38,7 @@ import org.apache.flink.table.expressions.Expression import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator} import 
org.apache.flink.table.planner.delegation.PlannerBase +import org.apache.flink.table.planner.utils.TestingTableEnvironment import org.apache.flink.table.runtime.generated.GeneratedFunction import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType import org.apache.flink.table.types.AbstractDataType @@ -51,10 +52,12 @@ import org.apache.calcite.rel.logical.LogicalCalc import org.apache.calcite.rel.rules._ import org.apache.calcite.rex.RexNode import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR + import org.junit.Assert.{assertEquals, assertTrue, fail} import org.junit.rules.ExpectedException import org.junit.{After, Before, Rule} +import java.time.ZoneId import java.util.Collections import scala.collection.JavaConverters._ @@ -62,8 +65,6 @@ import scala.collection.mutable abstract class ExpressionTestBase { - val config = new TableConfig() - // (originalExpr, optimizedExpr, expectedResult) private val validExprs = mutable.ArrayBuffer[(String, RexNode, String)]() // (originalSqlExpr, keywords, exceptionClass) @@ -73,11 +74,14 @@ abstract class ExpressionTestBase { .ArrayBuffer[(Expression, String, Class[_ <: Throwable])]() private val env = StreamExecutionEnvironment.createLocalEnvironment(4) - private val setting = EnvironmentSettings.newInstance().inStreamingMode().build() + private val settings = EnvironmentSettings.newInstance().inStreamingMode().build() // use impl class instead of interface class to avoid // "Static methods in interface require -target:jvm-1.8" - private val tEnv = StreamTableEnvironmentImpl.create(env, setting, config) + private val tEnv = StreamTableEnvironmentImpl.create(env, settings) .asInstanceOf[StreamTableEnvironmentImpl] + + val config = tEnv.getConfig + private val resolvedDataType = if (containsLegacyTypes) { TypeConversions.fromLegacyInfoToDataType(typeInfo) } else { diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala index 39b73ef..8a81fc6 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala @@ -56,8 +56,7 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode override def before(): Unit = { super.before() val setting = EnvironmentSettings.newInstance().inStreamingMode().build() - val config = new TestTableConfig - this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config) + this.tEnv = StreamTableEnvironmentImpl.create(env, setting) // set mini batch val tableConfig = tEnv.getConfig miniBatch match { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala index 6aff43d..f634b15 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala @@ -17,7 +17,6 @@ */ package org.apache.flink.table.planner.runtime.harness -import org.apache.flink.api.common.time.Time import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.dag.Transformation import org.apache.flink.api.java.functions.KeySelector @@ -30,14 +29,13 @@ import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.streaming.api.transformations.{OneInputTransformation, PartitionTransformation} import 
org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, OneInputStreamOperatorTestHarness} -import org.apache.flink.table.api.TableConfig import org.apache.flink.table.data.RowData import org.apache.flink.table.planner.JLong import org.apache.flink.table.planner.runtime.utils.StreamingTestBase import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode} + import org.junit.runners.Parameterized -import java.time.Duration import java.util import scala.collection.JavaConversions._ @@ -120,23 +118,6 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase { def dropWatermarks(elements: Array[AnyRef]): util.Collection[AnyRef] = { elements.filter(e => !e.isInstanceOf[Watermark]).toList } - - class TestTableConfig extends TableConfig { - - private var minIdleStateRetentionTime = 0L - - private var maxIdleStateRetentionTime = 0L - - override def getMinIdleStateRetentionTime: Long = minIdleStateRetentionTime - - override def getMaxIdleStateRetentionTime: Long = maxIdleStateRetentionTime - - override def setIdleStateRetentionTime(minTime: Time, maxTime: Time): Unit = { - super.setIdleStateRetention(Duration.ofMillis(minTime.toMilliseconds)) - minIdleStateRetentionTime = minTime.toMilliseconds - maxIdleStateRetentionTime = maxTime.toMilliseconds - } - } } object HarnessTestBase { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala index 7c1f566..e2d28043 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala +++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala @@ -38,6 +38,7 @@ import org.junit.runners.Parameterized import org.junit.{Before, Test} import java.lang.{Long => JLong} +import java.time.Duration import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.mutable @@ -49,8 +50,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m override def before(): Unit = { super.before() val setting = EnvironmentSettings.newInstance().inStreamingMode().build() - val config = new TestTableConfig - this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config) + this.tEnv = StreamTableEnvironmentImpl.create(env, setting) } @Test @@ -127,9 +127,9 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m expectedOutput.add(new StreamRecord( row(2L: JLong, "aaa", 8L: JLong, null, 7L: JLong, 8L: JLong))) expectedOutput.add(new StreamRecord( - row(2L: JLong, "aaa", 9L: JLong, null, 8L: JLong, 9L: JLong))) + row(2L: JLong, "aaa", 10L: JLong, null, 10L: JLong, 10L: JLong))) expectedOutput.add(new StreamRecord( - row(2L: JLong, "aaa", 10L: JLong, null, 9L: JLong, 10L: JLong))) + row(2L: JLong, "aaa", 9L: JLong, null, 8L: JLong, 9L: JLong))) expectedOutput.add(new StreamRecord( row(2L: JLong, "bbb", 40L: JLong, null, 40L: JLong, 40L: JLong))) @@ -155,7 +155,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m """.stripMargin val t1 = tEnv.sqlQuery(sql) - tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(4)) + tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2)) val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate") val outputType = Array( DataTypes.BIGINT().getLogicalType, @@ -306,7 +306,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m """.stripMargin val t1 = tEnv.sqlQuery(sql) - 
tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(4)) + tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2)) val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate") val assertor = new RowDataHarnessAssertor( Array( @@ -533,7 +533,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m """.stripMargin val t1 = tEnv.sqlQuery(sql) - tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(1), Time.seconds(2)) + tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1)) val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate") val assertor = new RowDataHarnessAssertor( Array( @@ -685,7 +685,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m """.stripMargin val t1 = tEnv.sqlQuery(sql) - tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(1), Time.seconds(2)) + tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1)) val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate") val assertor = new RowDataHarnessAssertor( Array( @@ -827,7 +827,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m """.stripMargin val t1 = tEnv.sqlQuery(sql) - tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(1), Time.seconds(2)) + tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1)) val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate") val assertor = new RowDataHarnessAssertor( Array( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala index 3c97370..14724a6 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala +++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala @@ -49,8 +49,7 @@ class RankHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode) { override def before(): Unit = { super.before() val setting = EnvironmentSettings.newInstance().inStreamingMode().build() - val config = new TestTableConfig - this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config) + this.tEnv = StreamTableEnvironmentImpl.create(env, setting) } @Test diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala index d9dcf7f..8a4378d 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala @@ -49,8 +49,7 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase( override def before(): Unit = { super.before() val setting = EnvironmentSettings.newInstance().inStreamingMode().build() - val config = new TestTableConfig - this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config) + this.tEnv = StreamTableEnvironmentImpl.create(env, setting) } val data = new mutable.MutableList[(Int, Int)] diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala index 1282aba..939d2c3 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala +++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala @@ -19,11 +19,20 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.scala._ +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStreamSource +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.DataStream import org.apache.flink.table.api._ +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment +import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl +import org.apache.flink.table.api.bridge.scala import org.apache.flink.table.api.bridge.scala._ +import org.apache.flink.table.api.config.TableConfigOptions import org.apache.flink.table.planner.runtime.utils.JavaPojos.{Device, Order, Person, ProductItem} import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, StringSink} +import org.assertj.core.api.Assertions.assertThat + import org.junit.Assert.assertEquals import org.junit.Test @@ -137,4 +146,20 @@ class StreamTableEnvironmentITCase extends StreamingTestBase { "(true,Order{user=1, product='Product{name='beer', id=10}', amount=3})") assertEquals(expected.sorted, sink.getResults.sorted) } + + @Test + def testTableConfigInheritsEnvironmentSettings(): Unit = { + val config = new Configuration + config.setString(TableConfigOptions.TABLE_CATALOG_NAME, "myCatalog") + val env = StreamExecutionEnvironment.getExecutionEnvironment(config) + val tEnv = StreamTableEnvironment.create(env) + assertThat(tEnv.getConfig.get(TableConfigOptions.TABLE_CATALOG_NAME)).isEqualTo("myCatalog") + + val scalaEnv = org.apache.flink.streaming.api.scala.StreamExecutionEnvironment + .getExecutionEnvironment + val scalaTEnv = scala.StreamTableEnvironment.create( + scalaEnv, EnvironmentSettings.newInstance.withConfiguration(config).build) + 
assertThat(scalaTEnv.getConfig.get(TableConfigOptions.TABLE_CATALOG_NAME)) + .isEqualTo("myCatalog") + } } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 0d088d6..cb56447 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -1526,11 +1526,18 @@ object TestingTableEnvironment { catalogManager: Option[CatalogManager] = None, tableConfig: TableConfig): TestingTableEnvironment = { - tableConfig.addConfiguration(settings.toConfiguration) - // temporary solution until FLINK-15635 is fixed val classLoader = Thread.currentThread.getContextClassLoader + val executorFactory = FactoryUtil.discoverFactory( + classLoader, classOf[ExecutorFactory], ExecutorFactory.DEFAULT_IDENTIFIER) + + val executor = executorFactory.create(settings.getConfiguration) + + tableConfig.setRootConfiguration(executor.getConfiguration) + tableConfig.addConfiguration(settings.getConfiguration) + + val moduleManager = new ModuleManager val catalogMgr = catalogManager match { @@ -1538,7 +1545,7 @@ object TestingTableEnvironment { case _ => CatalogManager.newBuilder .classLoader(classLoader) - .config(tableConfig.getConfiguration) + .config(tableConfig) .defaultCatalog( settings.getBuiltInCatalogName, new GenericInMemoryCatalog( @@ -1547,12 +1554,7 @@ object TestingTableEnvironment { .build } - val functionCatalog = new FunctionCatalog(tableConfig, catalogMgr, moduleManager) - - val executorFactory = FactoryUtil.discoverFactory( - classLoader, classOf[ExecutorFactory], ExecutorFactory.DEFAULT_IDENTIFIER) - - val executor = executorFactory.create(tableConfig.getConfiguration) + val functionCatalog = new FunctionCatalog(settings.getConfiguration, 
catalogMgr, moduleManager) val planner = PlannerFactoryUtil.createPlanner( executor, tableConfig, moduleManager, catalogMgr, functionCatalog).asInstanceOf[PlannerBase]