This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 567b28e26b65601759c51908c10fe362e48e1a89
Author: Marios Trivyzas <mat...@gmail.com>
AuthorDate: Wed Mar 9 11:10:48 2022 +0200

    [FLINK-26421] Use only EnvironmentSettings to configure the environment
    
    Use `EnvironmentSettings` with the new `withConfiguration` method on its
    `Builder` to specify configuration options on top of the configuration
    inherited from the environment (flink-conf.yaml, CLI params).
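    
    For illustration, here is a minimal sketch of the new builder usage. It
    mirrors the `testEnvironmentSettings` test added by this commit; the
    catalog name "myCatalog" is just an example value:
    
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.table.api.EnvironmentSettings;
        import org.apache.flink.table.api.TableEnvironment;
        import org.apache.flink.table.api.config.TableConfigOptions;
    
        public class EnvironmentSettingsExample {
            public static void main(String[] args) {
                Configuration conf = new Configuration();
                conf.set(TableConfigOptions.TABLE_CATALOG_NAME, "myCatalog");
    
                // Options set here are applied on top of the configuration
                // inherited from the environment (flink-conf.yaml, CLI params).
                EnvironmentSettings settings =
                        EnvironmentSettings.newInstance()
                                .inStreamingMode()
                                .withConfiguration(conf)
                                .build();
    
                TableEnvironment tEnv = TableEnvironment.create(settings);
                System.out.println(tEnv.getCurrentCatalog()); // prints "myCatalog"
            }
        }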
    
    The `TableConfig` contains all of the config options specified by the
    environment (flink-conf.yaml, CLI params) plus all the extra configuration
    defined by the user application through the `EnvironmentSettings`.
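    
    Continuing the sketch above, a lookup through the new `TableConfig#get`
    first consults the user-defined options and then falls back to the root
    (environment) configuration; the return values shown are illustrative:
    
        // User-defined option set via EnvironmentSettings wins:
        tEnv.getConfig().get(TableConfigOptions.TABLE_CATALOG_NAME);  // "myCatalog"
    
        // An option never set in the application falls back to the
        // rootConfiguration (flink-conf.yaml + CLI params) or its default:
        tEnv.getConfig().get(TableConfigOptions.TABLE_DATABASE_NAME); // "default_database"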
---
 .../generated/table_config_configuration.html      |  12 +++
 .../flink/connectors/hive/HiveDialectITCase.java   |  16 ++--
 .../table/catalog/hive/HiveCatalogITCase.java      |  13 +--
 flink-python/pyflink/java_gateway.py               |   1 +
 flink-python/pyflink/table/environment_settings.py |  23 +++++
 .../table/tests/test_environment_settings.py       |   4 +-
 .../table/tests/test_table_config_completeness.py  |   3 +-
 .../client/gateway/context/ExecutionContext.java   |  28 +++---
 .../client/gateway/context/SessionContext.java     |   3 +-
 .../api/bridge/java/StreamTableEnvironment.java    |   9 +-
 .../java/internal/StreamTableEnvironmentImpl.java  |  14 +--
 .../flink/table/api/EnvironmentSettings.java       | 104 +++++++++------------
 .../org/apache/flink/table/api/TableConfig.java    |  50 ++++++++--
 .../flink/table/api/config/TableConfigOptions.java |  18 ++++
 .../table/api/internal/TableEnvironmentImpl.java   |  20 ++--
 .../flink/table/api/EnvironmentSettingsTest.java   |   7 +-
 .../flink/table/catalog/FunctionCatalogTest.java   |   4 +-
 .../flink/table/utils/CatalogManagerMocks.java     |   8 +-
 .../flink/table/utils/TableEnvironmentMock.java    |  14 ++-
 .../api/bridge/scala/StreamTableEnvironment.scala  |  10 +-
 .../internal/StreamTableEnvironmentImpl.scala      |  16 ++--
 .../planner/delegation/DefaultPlannerFactory.java  |   9 +-
 .../apache/flink/table/api/EnvironmentTest.java    |  28 ++++++
 .../plan/nodes/exec/TransformationsTest.java       |   2 +-
 .../exec/serde/DynamicTableSinkSpecSerdeTest.java  |  20 ++--
 .../serde/DynamicTableSourceSpecSerdeTest.java     |  16 ++--
 .../expressions/utils/ExpressionTestBase.scala     |  16 ++--
 .../harness/GroupAggregateHarnessTest.scala        |   3 +-
 .../planner/runtime/harness/HarnessTestBase.scala  |  21 +----
 .../runtime/harness/OverAggregateHarnessTest.scala |  18 ++--
 .../planner/runtime/harness/RankHarnessTest.scala  |   3 +-
 .../harness/TableAggregateHarnessTest.scala        |   3 +-
 .../stream/sql/StreamTableEnvironmentITCase.scala  |  25 +++++
 .../flink/table/planner/utils/TableTestBase.scala  |  20 ++--
 34 files changed, 331 insertions(+), 230 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/table_config_configuration.html 
b/docs/layouts/shortcodes/generated/table_config_configuration.html
index 53e5dbc..fe9784a 100644
--- a/docs/layouts/shortcodes/generated/table_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/table_config_configuration.html
@@ -9,6 +9,18 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>table.builtin-catalog-name</h5><br> <span class="label 
label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">"default_catalog"</td>
+            <td>String</td>
+            <td>The name of the initial catalog to be created when 
instantiating a TableEnvironment.</td>
+        </tr>
+        <tr>
+            <td><h5>table.builtin-database-name</h5><br> <span class="label 
label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">"default_database"</td>
+            <td>String</td>
+            <td>The name of the default database in the initial catalog to be 
created when instantiating a TableEnvironment.</td>
+        </tr>
+        <tr>
             <td><h5>table.dml-sync</h5><br> <span class="label 
label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index 24c6904..1aa6f50 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -25,7 +25,6 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
@@ -43,6 +42,7 @@ import 
org.apache.flink.table.operations.command.QuitOperation;
 import org.apache.flink.table.operations.command.ResetOperation;
 import org.apache.flink.table.operations.command.SetOperation;
 import org.apache.flink.table.planner.delegation.hive.HiveParser;
+import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FileUtils;
@@ -89,11 +89,6 @@ import static org.junit.Assert.fail;
 /** Test Hive syntax when Hive dialect is used. */
 public class HiveDialectITCase {
 
-    private static final String DEFAULT_BUILTIN_CATALOG =
-            TableConfigOptions.TABLE_CATALOG_NAME.defaultValue();
-    private static final String DEFAULT_BUILTIN_DATABASE =
-            TableConfigOptions.TABLE_DATABASE_NAME.defaultValue();
-
     private TableEnvironment tableEnv;
     private HiveCatalog hiveCatalog;
     private String warehouse;
@@ -674,17 +669,18 @@ public class HiveDialectITCase {
         List<Row> catalogs =
                 CollectionUtil.iteratorToList(tableEnv.executeSql("show 
catalogs").collect());
         assertEquals(2, catalogs.size());
-        tableEnv.executeSql("use catalog " + DEFAULT_BUILTIN_CATALOG);
+        tableEnv.executeSql("use catalog " + 
CatalogManagerMocks.DEFAULT_CATALOG);
         List<Row> databases =
                 CollectionUtil.iteratorToList(tableEnv.executeSql("show 
databases").collect());
         assertEquals(1, databases.size());
-        assertEquals("+I[" + DEFAULT_BUILTIN_DATABASE + "]", 
databases.get(0).toString());
+        assertEquals(
+                "+I[" + CatalogManagerMocks.DEFAULT_DATABASE + "]", 
databases.get(0).toString());
         String catalogName =
                 tableEnv.executeSql("show current 
catalog").collect().next().toString();
-        assertEquals("+I[" + DEFAULT_BUILTIN_CATALOG + "]", catalogName);
+        assertEquals("+I[" + CatalogManagerMocks.DEFAULT_CATALOG + "]", 
catalogName);
         String databaseName =
                 tableEnv.executeSql("show current 
database").collect().next().toString();
-        assertEquals("+I[" + DEFAULT_BUILTIN_DATABASE + "]", databaseName);
+        assertEquals("+I[" + CatalogManagerMocks.DEFAULT_DATABASE + "]", 
databaseName);
     }
 
     @Test
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
index ad84ede..b18e11d 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/catalog/hive/HiveCatalogITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
@@ -42,6 +41,7 @@ import org.apache.flink.table.factories.ManagedTableFactory;
 import org.apache.flink.table.factories.TestManagedTableFactory;
 import 
org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
 import org.apache.flink.table.types.AbstractDataType;
+import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FileUtils;
@@ -87,11 +87,6 @@ import static org.assertj.core.api.Assertions.assertThat;
  */
 public class HiveCatalogITCase {
 
-    private static final String DEFAULT_BUILTIN_CATALOG =
-            TableConfigOptions.TABLE_CATALOG_NAME.defaultValue();
-    private static final String DEFAULT_BUILTIN_DATABASE =
-            TableConfigOptions.TABLE_DATABASE_NAME.defaultValue();
-
     @Rule public TemporaryFolder tempFolder = new TemporaryFolder();
 
     private static HiveCatalog hiveCatalog;
@@ -478,14 +473,14 @@ public class HiveCatalogITCase {
         tableEnv.registerCatalog(hiveCatalog.getName(), hiveCatalog);
         tableEnv.useCatalog(hiveCatalog.getName());
         tableEnv.executeSql("create table generic_table (x int) with 
('connector'='COLLECTION')");
-        tableEnv.useCatalog(DEFAULT_BUILTIN_CATALOG);
+        tableEnv.useCatalog(CatalogManagerMocks.DEFAULT_CATALOG);
         tableEnv.executeSql(
                 String.format(
                         "create table copy like `%s`.`default`.generic_table",
                         hiveCatalog.getName()));
-        Catalog builtInCat = 
tableEnv.getCatalog(DEFAULT_BUILTIN_CATALOG).get();
+        Catalog builtInCat = 
tableEnv.getCatalog(CatalogManagerMocks.DEFAULT_CATALOG).get();
         CatalogBaseTable catalogTable =
-                builtInCat.getTable(new ObjectPath(DEFAULT_BUILTIN_DATABASE, 
"copy"));
+                builtInCat.getTable(new 
ObjectPath(CatalogManagerMocks.DEFAULT_DATABASE, "copy"));
         assertThat(catalogTable.getOptions()).hasSize(1);
         assertThat(catalogTable.getOptions())
                 .containsEntry(FactoryUtil.CONNECTOR.key(), "COLLECTION");
diff --git a/flink-python/pyflink/java_gateway.py 
b/flink-python/pyflink/java_gateway.py
index b127240..fdb47a8 100644
--- a/flink-python/pyflink/java_gateway.py
+++ b/flink-python/pyflink/java_gateway.py
@@ -132,6 +132,7 @@ def import_flink_view(gateway):
     """
     # Import the classes used by PyFlink
     java_import(gateway.jvm, "org.apache.flink.table.api.*")
+    java_import(gateway.jvm, "org.apache.flink.table.api.config.*")
     java_import(gateway.jvm, "org.apache.flink.table.api.java.*")
     java_import(gateway.jvm, "org.apache.flink.table.api.bridge.java.*")
     java_import(gateway.jvm, "org.apache.flink.table.api.dataview.*")
diff --git a/flink-python/pyflink/table/environment_settings.py 
b/flink-python/pyflink/table/environment_settings.py
index 578567d..6819c4b 100644
--- a/flink-python/pyflink/table/environment_settings.py
+++ b/flink-python/pyflink/table/environment_settings.py
@@ -50,6 +50,15 @@ class EnvironmentSettings(object):
             gateway = get_gateway()
             self._j_builder = gateway.jvm.EnvironmentSettings.Builder()
 
+        def with_configuration(self, config: Configuration) -> 
'EnvironmentSettings.Builder':
+            """
+            Adds the given configuration to the environment settings.
+
+            :return: This builder.
+            """
+            self._j_builder = 
self._j_builder.withConfiguration(config._j_configuration)
+            return self
+
         def in_batch_mode(self) -> 'EnvironmentSettings.Builder':
             """
             Sets that the components should work in a batch mode. Streaming 
mode by default.
@@ -155,9 +164,20 @@ class EnvironmentSettings(object):
         Convert to `pyflink.common.Configuration`.
 
         :return: Configuration with specified value.
+
+        .. note:: Deprecated in 1.15. Please use
+                :func:`EnvironmentSettings.get_configuration` instead.
         """
         return 
Configuration(j_configuration=self._j_environment_settings.toConfiguration())
 
+    def get_configuration(self) -> Configuration:
+        """
+        Get the underlying `pyflink.common.Configuration`.
+
+        :return: Configuration with specified value.
+        """
+        return 
Configuration(j_configuration=self._j_environment_settings.getConfiguration())
+
     @staticmethod
     def new_instance() -> 'EnvironmentSettings.Builder':
         """
@@ -173,6 +193,9 @@ class EnvironmentSettings(object):
         Creates the EnvironmentSetting with specified Configuration.
 
         :return: EnvironmentSettings.
+
+        .. note:: Deprecated in 1.15. Please use
+                :func:`EnvironmentSettings.Builder.with_configuration` instead.
         """
         return EnvironmentSettings(
             
get_gateway().jvm.EnvironmentSettings.fromConfiguration(config._j_configuration))
diff --git a/flink-python/pyflink/table/tests/test_environment_settings.py 
b/flink-python/pyflink/table/tests/test_environment_settings.py
index 8722904..1a2f1ed6 100644
--- a/flink-python/pyflink/table/tests/test_environment_settings.py
+++ b/flink-python/pyflink/table/tests/test_environment_settings.py
@@ -50,7 +50,7 @@ class EnvironmentSettingsTests(PyFlinkTestCase):
 
         gateway = get_gateway()
 
-        DEFAULT_BUILTIN_CATALOG = 
gateway.jvm.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG
+        DEFAULT_BUILTIN_CATALOG = 
gateway.jvm.TableConfigOptions.TABLE_CATALOG_NAME.defaultValue()
 
         builder = EnvironmentSettings.new_instance()
 
@@ -67,7 +67,7 @@ class EnvironmentSettingsTests(PyFlinkTestCase):
 
         gateway = get_gateway()
 
-        DEFAULT_BUILTIN_DATABASE = 
gateway.jvm.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE
+        DEFAULT_BUILTIN_DATABASE = 
gateway.jvm.TableConfigOptions.TABLE_DATABASE_NAME.defaultValue()
 
         builder = EnvironmentSettings.new_instance()
 
diff --git a/flink-python/pyflink/table/tests/test_table_config_completeness.py 
b/flink-python/pyflink/table/tests/test_table_config_completeness.py
index 396f5d7..b2dc4c6 100644
--- a/flink-python/pyflink/table/tests/test_table_config_completeness.py
+++ b/flink-python/pyflink/table/tests/test_table_config_completeness.py
@@ -37,7 +37,8 @@ class 
TableConfigCompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCas
     @classmethod
     def excluded_methods(cls):
         # internal interfaces, no need to expose to users.
-        return {'getPlannerConfig', 'setPlannerConfig', 'addJobParameter'}
+        return {'getPlannerConfig', 'setPlannerConfig', 'addJobParameter',
+                'setRootConfiguration', 'get', 'getOptional'}
 
     @classmethod
     def java_method_name(cls, python_method_name):
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
index ae1a829..4c0f7f4 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
@@ -98,19 +98,20 @@ public class ExecutionContext {
     // 
------------------------------------------------------------------------------------------------------------------
 
     private StreamTableEnvironment createTableEnvironment() {
-        // checks the value of RUNTIME_MODE
-        EnvironmentSettings settings = 
EnvironmentSettings.fromConfiguration(flinkConfig);
+        EnvironmentSettings settings =
+                
EnvironmentSettings.newInstance().withConfiguration(flinkConfig).build();
 
-        TableConfig tableConfig = new TableConfig();
-        tableConfig.addConfiguration(flinkConfig);
-
-        StreamExecutionEnvironment streamExecEnv = 
createStreamExecutionEnvironment();
+        // We do not need different StreamExecutionEnvironments to build and 
submit a Flink job;
+        // instead we just use the 
StreamExecutionEnvironment#executeAsync(StreamGraph) method
+        // to execute an existing StreamGraph.
+        // This requires the StreamExecutionEnvironment to have a full Flink 
configuration.
+        StreamExecutionEnvironment streamExecEnv =
+                new StreamExecutionEnvironment(new Configuration(flinkConfig), 
classLoader);
 
         final Executor executor = lookupExecutor(streamExecEnv);
         return createStreamTableEnvironment(
                 streamExecEnv,
                 settings,
-                tableConfig,
                 executor,
                 sessionState.catalogManager,
                 sessionState.moduleManager,
@@ -121,13 +122,16 @@ public class ExecutionContext {
     private StreamTableEnvironment createStreamTableEnvironment(
             StreamExecutionEnvironment env,
             EnvironmentSettings settings,
-            TableConfig tableConfig,
             Executor executor,
             CatalogManager catalogManager,
             ModuleManager moduleManager,
             FunctionCatalog functionCatalog,
             ClassLoader userClassLoader) {
 
+        TableConfig tableConfig = TableConfig.getDefault();
+        tableConfig.setRootConfiguration(executor.getConfiguration());
+        tableConfig.addConfiguration(settings.getConfiguration());
+
         final Planner planner =
                 PlannerFactoryUtil.createPlanner(
                         executor, tableConfig, moduleManager, catalogManager, 
functionCatalog);
@@ -161,12 +165,4 @@ public class ExecutionContext {
                     e);
         }
     }
-
-    private StreamExecutionEnvironment createStreamExecutionEnvironment() {
-        // We need not different StreamExecutionEnvironments to build and 
submit flink job,
-        // instead we just use 
StreamExecutionEnvironment#executeAsync(StreamGraph) method
-        // to execute existing StreamGraph.
-        // This requires StreamExecutionEnvironment to have a full flink 
configuration.
-        return new StreamExecutionEnvironment(new Configuration(flinkConfig), 
classLoader);
-    }
 }
diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
index c45d5af..d392460 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
@@ -221,7 +221,8 @@ public class SessionContext {
 
         ModuleManager moduleManager = new ModuleManager();
 
-        final EnvironmentSettings settings = 
EnvironmentSettings.fromConfiguration(configuration);
+        final EnvironmentSettings settings =
+                
EnvironmentSettings.newInstance().withConfiguration(configuration).build();
 
         CatalogManager catalogManager =
                 CatalogManager.newBuilder()
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
index 09e6519..6c96858 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.java
@@ -31,7 +31,6 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import 
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -92,9 +91,7 @@ public interface StreamTableEnvironment extends 
TableEnvironment {
      *     TableEnvironment}.
      */
     static StreamTableEnvironment create(StreamExecutionEnvironment 
executionEnvironment) {
-        return create(
-                executionEnvironment,
-                
EnvironmentSettings.fromConfiguration(executionEnvironment.getConfiguration()));
+        return create(executionEnvironment, 
EnvironmentSettings.newInstance().build());
     }
 
     /**
@@ -122,9 +119,7 @@ public interface StreamTableEnvironment extends 
TableEnvironment {
      */
     static StreamTableEnvironment create(
             StreamExecutionEnvironment executionEnvironment, 
EnvironmentSettings settings) {
-        TableConfig tableConfig = new TableConfig();
-        tableConfig.addConfiguration(settings.toConfiguration());
-        return StreamTableEnvironmentImpl.create(executionEnvironment, 
settings, tableConfig);
+        return StreamTableEnvironmentImpl.create(executionEnvironment, 
settings);
     }
 
     /**
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
index 5d66cfa..cb8d38b 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
@@ -92,19 +92,23 @@ public final class StreamTableEnvironmentImpl extends 
AbstractStreamTableEnviron
     }
 
     public static StreamTableEnvironment create(
-            StreamExecutionEnvironment executionEnvironment,
-            EnvironmentSettings settings,
-            TableConfig tableConfig) {
+            StreamExecutionEnvironment executionEnvironment, 
EnvironmentSettings settings) {
 
         // temporary solution until FLINK-15635 is fixed
         final ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
 
+        final Executor executor = lookupExecutor(classLoader, 
executionEnvironment);
+
+        final TableConfig tableConfig = TableConfig.getDefault();
+        tableConfig.setRootConfiguration(executor.getConfiguration());
+        tableConfig.addConfiguration(settings.getConfiguration());
+
         final ModuleManager moduleManager = new ModuleManager();
 
         final CatalogManager catalogManager =
                 CatalogManager.newBuilder()
                         .classLoader(classLoader)
-                        .config(tableConfig.getConfiguration())
+                        .config(tableConfig)
                         .defaultCatalog(
                                 settings.getBuiltInCatalogName(),
                                 new GenericInMemoryCatalog(
@@ -116,8 +120,6 @@ public final class StreamTableEnvironmentImpl extends 
AbstractStreamTableEnviron
         final FunctionCatalog functionCatalog =
                 new FunctionCatalog(tableConfig, catalogManager, 
moduleManager);
 
-        final Executor executor = lookupExecutor(classLoader, 
executionEnvironment);
-
         final Planner planner =
                 PlannerFactoryUtil.createPlanner(
                         executor, tableConfig, moduleManager, catalogManager, 
functionCatalog);
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index d8a2464..258fcb8 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -21,11 +21,14 @@ package org.apache.flink.table.api;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.functions.UserDefinedFunction;
 
 import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
 import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_CATALOG_NAME;
+import static 
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DATABASE_NAME;
 
 /**
  * Defines all parameters that initialize a table environment. Those 
parameters are used only during
@@ -53,32 +56,14 @@ public class EnvironmentSettings {
     private static final EnvironmentSettings DEFAULT_BATCH_MODE_SETTINGS =
             EnvironmentSettings.newInstance().inBatchMode().build();
 
-    public static final String DEFAULT_BUILTIN_CATALOG = "default_catalog";
-    public static final String DEFAULT_BUILTIN_DATABASE = "default_database";
-
-    /**
-     * Specifies the name of the initial catalog to be created when 
instantiating {@link
-     * TableEnvironment}.
-     */
-    private final String builtInCatalogName;
-
-    /**
-     * Specifies the name of the default database in the initial catalog to be 
created when
-     * instantiating {@link TableEnvironment}.
-     */
-    private final String builtInDatabaseName;
-
     /**
-     * Determines if the table environment should work in a batch ({@code 
false}) or streaming
-     * ({@code true}) mode.
+     * Holds all the configuration generated by the builder, together with any 
required additional
+     * configuration.
      */
-    private final boolean isStreamingMode;
+    private final Configuration configuration;
 
-    private EnvironmentSettings(
-            String builtInCatalogName, String builtInDatabaseName, boolean 
isStreamingMode) {
-        this.builtInCatalogName = builtInCatalogName;
-        this.builtInDatabaseName = builtInDatabaseName;
-        this.isStreamingMode = isStreamingMode;
+    private EnvironmentSettings(Configuration configuration) {
+        this.configuration = configuration;
     }
 
     /**
@@ -111,32 +96,28 @@ public class EnvironmentSettings {
         return new Builder();
     }
 
-    /** Creates an instance of {@link EnvironmentSettings} from configuration. 
*/
+    /**
+     * Creates an instance of {@link EnvironmentSettings} from configuration.
+     *
+     * @deprecated use {@link Builder#withConfiguration(Configuration)} 
instead.
+     */
+    @Deprecated
     public static EnvironmentSettings fromConfiguration(ReadableConfig 
configuration) {
-        final Builder builder = new Builder();
-        switch (configuration.get(RUNTIME_MODE)) {
-            case STREAMING:
-                builder.inStreamingMode();
-                break;
-            case BATCH:
-                builder.inBatchMode();
-                break;
-            case AUTOMATIC:
-            default:
-                throw new TableException(
-                        String.format(
-                                "Unsupported mode '%s' for '%s'. "
-                                        + "Only an explicit BATCH or STREAMING 
mode is supported in Table API.",
-                                configuration.get(RUNTIME_MODE), 
RUNTIME_MODE.key()));
-        }
-
-        return builder.build();
+        return new EnvironmentSettings((Configuration) configuration);
     }
 
-    /** Convert the environment setting to the {@link Configuration}. */
+    /**
+     * Convert the environment setting to the {@link Configuration}.
+     *
+     * @deprecated use {@link #getConfiguration} instead.
+     */
+    @Deprecated
     public Configuration toConfiguration() {
-        Configuration configuration = new Configuration();
-        configuration.set(RUNTIME_MODE, isStreamingMode() ? STREAMING : BATCH);
+        return configuration;
+    }
+
+    /** Get the underlying {@link Configuration}. */
+    public Configuration getConfiguration() {
         return configuration;
     }
 
@@ -145,7 +126,7 @@ public class EnvironmentSettings {
      * TableEnvironment}.
      */
     public String getBuiltInCatalogName() {
-        return builtInCatalogName;
+        return configuration.get(TABLE_CATALOG_NAME);
     }
 
     /**
@@ -153,31 +134,31 @@ public class EnvironmentSettings {
      * instantiating a {@link TableEnvironment}.
      */
     public String getBuiltInDatabaseName() {
-        return builtInDatabaseName;
+        return configuration.get(TABLE_DATABASE_NAME);
     }
 
     /** Tells if the {@link TableEnvironment} should work in a batch or 
streaming mode. */
     public boolean isStreamingMode() {
-        return isStreamingMode;
+        return configuration.get(RUNTIME_MODE) == STREAMING;
     }
 
     /** A builder for {@link EnvironmentSettings}. */
     @PublicEvolving
     public static class Builder {
 
-        private String builtInCatalogName = DEFAULT_BUILTIN_CATALOG;
-        private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE;
-        private boolean isStreamingMode = true;
+        private final Configuration configuration = new Configuration();
+
+        public Builder() {}
 
         /** Sets that the components should work in a batch mode. Streaming 
mode by default. */
         public Builder inBatchMode() {
-            this.isStreamingMode = false;
+            configuration.set(RUNTIME_MODE, BATCH);
             return this;
         }
 
         /** Sets that the components should work in a streaming mode. Enabled 
by default. */
         public Builder inStreamingMode() {
-            this.isStreamingMode = true;
+            configuration.set(RUNTIME_MODE, STREAMING);
             return this;
         }
 
@@ -193,10 +174,10 @@ public class EnvironmentSettings {
          * <p>It will also be the initial value for the current catalog which 
can be altered via
          * {@link TableEnvironment#useCatalog(String)}.
          *
-         * <p>Default: "default_catalog".
+         * <p>Default: {@link TableConfigOptions#TABLE_CATALOG_NAME}{@code 
.defaultValue()}.
          */
         public Builder withBuiltInCatalogName(String builtInCatalogName) {
-            this.builtInCatalogName = builtInCatalogName;
+            configuration.set(TABLE_CATALOG_NAME, builtInCatalogName);
             return this;
         }
 
@@ -212,17 +193,22 @@ public class EnvironmentSettings {
          * <p>It will also be the initial value for the current database which 
can be altered via
          * {@link TableEnvironment#useDatabase(String)}.
          *
-         * <p>Default: "default_database".
+         * <p>Default: {@link TableConfigOptions#TABLE_DATABASE_NAME}{@code 
.defaultValue()}.
          */
         public Builder withBuiltInDatabaseName(String builtInDatabaseName) {
-            this.builtInDatabaseName = builtInDatabaseName;
+            configuration.set(TABLE_DATABASE_NAME, builtInDatabaseName);
+            return this;
+        }
+
+        /** Add extra configuration to {@link EnvironmentSettings}. */
+        public Builder withConfiguration(Configuration configuration) {
+            this.configuration.addAll(configuration);
             return this;
         }
 
         /** Returns an immutable instance of {@link EnvironmentSettings}. */
         public EnvironmentSettings build() {
-            return new EnvironmentSettings(
-                    builtInCatalogName, builtInDatabaseName, isStreamingMode);
+            return new EnvironmentSettings(configuration);
         }
     }
 }
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index 2565bb4..141ad78 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -19,11 +19,13 @@
 package org.apache.flink.table.api;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.configuration.WritableConfig;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.api.config.OptimizerConfigOptions;
@@ -35,6 +37,7 @@ import java.time.Duration;
 import java.time.ZoneId;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import static java.time.ZoneId.SHORT_IDS;
 
@@ -68,7 +71,15 @@ import static java.time.ZoneId.SHORT_IDS;
  * @see OptimizerConfigOptions
  */
 @PublicEvolving
-public class TableConfig implements WritableConfig {
+public final class TableConfig implements WritableConfig, ReadableConfig {
+    //
+    // TableConfig is also a ReadableConfig which is built once the 
TableEnvironment is created and
+    // contains both the configuration defined in the environment 
(flink-conf.yaml + CLI params),
+    // stored in rootConfiguration, and any extra configuration defined 
by the user in the
+    // application, which has precedence over the environment configuration. 
This way, any consumer
+    // of TableConfig can get the complete view of the configuration 
(environment + user defined) by
+    // calling the get() and getOptional() methods.
+
     /** Defines if all fields need to be checked for NULL first. */
     private Boolean nullCheck = true;
 
@@ -84,6 +95,8 @@ public class TableConfig implements WritableConfig {
     /** A configuration object to hold all key/value configuration. */
     private final Configuration configuration = new Configuration();
 
+    private ReadableConfig rootConfiguration = new Configuration();
+
     /**
      * Sets a value for the given {@link ConfigOption}.
      *
@@ -121,12 +134,37 @@ public class TableConfig implements WritableConfig {
         return this;
     }
 
+    @Override
+    public <T> T get(ConfigOption<T> option) {
+        return configuration.getOptional(option).orElseGet(() -> 
rootConfiguration.get(option));
+    }
+
+    @Override
+    public <T> Optional<T> getOptional(ConfigOption<T> option) {
+        final Optional<T> tableValue = configuration.getOptional(option);
+        if (tableValue.isPresent()) {
+            return tableValue;
+        }
+        return rootConfiguration.getOptional(option);
+    }
+
     /** Gives direct access to the underlying key-value map for advanced 
configuration. */
     public Configuration getConfiguration() {
         return configuration;
     }
 
     /**
+     * Sets the given key-value configuration as {@link #rootConfiguration}, 
which contains any
+     * configuration set in the environment ({@code flink-conf.yaml} + {@code 
CLI} parameters).
+     *
+     * @param rootConfiguration key-value root configuration to be set
+     */
+    @Internal
+    public void setRootConfiguration(ReadableConfig rootConfiguration) {
+        this.rootConfiguration = rootConfiguration;
+    }
+
+    /**
      * Adds the given key-value configuration to the underlying configuration. 
It overwrites
      * existing keys.
      *
@@ -139,8 +177,7 @@ public class TableConfig implements WritableConfig {
 
     /** Returns the current SQL dialect. */
     public SqlDialect getSqlDialect() {
-        return SqlDialect.valueOf(
-                
getConfiguration().getString(TableConfigOptions.TABLE_SQL_DIALECT).toUpperCase());
+        return 
SqlDialect.valueOf(get(TableConfigOptions.TABLE_SQL_DIALECT).toUpperCase());
     }
 
     /** Sets the current SQL dialect to parse a SQL query. Flink's SQL 
behavior by default. */
@@ -320,9 +357,9 @@ public class TableConfig implements WritableConfig {
                 && !(maxTime.toMilliseconds() == 0 && minTime.toMilliseconds() 
== 0)) {
             throw new IllegalArgumentException(
                     "Difference between minTime: "
-                            + minTime.toString()
+                            + minTime
                             + " and maxTime: "
-                            + maxTime.toString()
+                            + maxTime
                             + " should be at least 5 minutes.");
         }
         setIdleStateRetention(Duration.ofMillis(minTime.toMilliseconds()));
@@ -398,8 +435,7 @@ public class TableConfig implements WritableConfig {
     @Experimental
     public void addJobParameter(String key, String value) {
         Map<String, String> params =
-                getConfiguration()
-                        .getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS)
+                getOptional(PipelineOptions.GLOBAL_JOB_PARAMETERS)
                         .map(HashMap::new)
                         .orElseGet(HashMap::new);
         params.put(key, value);
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
index 8601f38..d4afa20 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
@@ -41,6 +41,24 @@ public class TableConfigOptions {
     private TableConfigOptions() {}
 
     @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<String> TABLE_CATALOG_NAME =
+            key("table.builtin-catalog-name")
+                    .stringType()
+                    .defaultValue("default_catalog")
+                    .withDescription(
+                            "The name of the initial catalog to be created 
when "
+                                    + "instantiating a TableEnvironment.");
+
+    @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<String> TABLE_DATABASE_NAME =
+            key("table.builtin-database-name")
+                    .stringType()
+                    .defaultValue("default_database")
+                    .withDescription(
+                            "The name of the default database in the initial 
catalog to be created "
+                                    + "when instantiating TableEnvironment.");
+
+    @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
     public static final ConfigOption<Boolean> TABLE_DML_SYNC =
             key("table.dml-sync")
                     .booleanType()
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 8d6a5a9..a692e55 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -265,21 +265,22 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
     }
 
     public static TableEnvironmentImpl create(Configuration configuration) {
-        return create(EnvironmentSettings.fromConfiguration(configuration), 
configuration);
+        return 
create(EnvironmentSettings.newInstance().withConfiguration(configuration).build());
     }
 
     public static TableEnvironmentImpl create(EnvironmentSettings settings) {
-        return create(settings, settings.toConfiguration());
-    }
-
-    private static TableEnvironmentImpl create(
-            EnvironmentSettings settings, Configuration configuration) {
         // temporary solution until FLINK-15635 is fixed
         final ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
 
+        final ExecutorFactory executorFactory =
+                FactoryUtil.discoverFactory(
+                        classLoader, ExecutorFactory.class, 
ExecutorFactory.DEFAULT_IDENTIFIER);
+        final Executor executor = 
executorFactory.create(settings.getConfiguration());
+
         // use configuration to init table config
         final TableConfig tableConfig = new TableConfig();
-        tableConfig.addConfiguration(configuration);
+        tableConfig.setRootConfiguration(executor.getConfiguration());
+        tableConfig.addConfiguration(settings.getConfiguration());
 
         final ModuleManager moduleManager = new ModuleManager();
 
@@ -297,11 +298,6 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
         final FunctionCatalog functionCatalog =
                 new FunctionCatalog(tableConfig, catalogManager, 
moduleManager);
 
-        final ExecutorFactory executorFactory =
-                FactoryUtil.discoverFactory(
-                        classLoader, ExecutorFactory.class, 
ExecutorFactory.DEFAULT_IDENTIFIER);
-        final Executor executor = executorFactory.create(configuration);
-
         final Planner planner =
                 PlannerFactoryUtil.createPlanner(
                         executor, tableConfig, moduleManager, catalogManager, 
functionCatalog);
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/EnvironmentSettingsTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/EnvironmentSettingsTest.java
index bc24286..ad4630b 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/EnvironmentSettingsTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/api/EnvironmentSettingsTest.java
@@ -34,15 +34,16 @@ public class EnvironmentSettingsTest {
     public void testFromConfiguration() {
         Configuration configuration = new Configuration();
         configuration.setString("execution.runtime-mode", "batch");
-        EnvironmentSettings settings = 
EnvironmentSettings.fromConfiguration(configuration);
+        EnvironmentSettings settings =
+                
EnvironmentSettings.newInstance().withConfiguration(configuration).build();
 
         assertFalse("Expect batch mode.", settings.isStreamingMode());
     }
 
     @Test
-    public void testToConfiguration() {
+    public void testGetConfiguration() {
         EnvironmentSettings settings = new 
EnvironmentSettings.Builder().inBatchMode().build();
-        Configuration configuration = settings.toConfiguration();
+        Configuration configuration = settings.getConfiguration();
 
         assertEquals(RuntimeExecutionMode.BATCH, 
configuration.get(RUNTIME_MODE));
     }
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
index c94dee9..b30f806 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.catalog;
 
-import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionDefinition;
@@ -85,7 +85,7 @@ public class FunctionCatalogTest {
 
         functionCatalog =
                 new FunctionCatalog(
-                        TableConfig.getDefault(),
+                        new Configuration(),
                         CatalogManagerMocks.preparedCatalogManager()
                                 .defaultCatalog(DEFAULT_CATALOG, catalog)
                                 .build(),
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java
index 94bc182..c3fb2e4 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/CatalogManagerMocks.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.utils;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
@@ -30,9 +30,11 @@ import javax.annotation.Nullable;
 /** Mock implementations of {@link CatalogManager} for testing purposes. */
 public final class CatalogManagerMocks {
 
-    public static final String DEFAULT_CATALOG = 
EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
+    public static final String DEFAULT_CATALOG =
+            TableConfigOptions.TABLE_CATALOG_NAME.defaultValue();
 
-    public static final String DEFAULT_DATABASE = 
EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;
+    public static final String DEFAULT_DATABASE =
+            TableConfigOptions.TABLE_DATABASE_NAME.defaultValue();
 
     public static CatalogManager createEmptyCatalogManager() {
         return createCatalogManager(null);
diff --git 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
index cb09387..f917132 100644
--- 
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
+++ 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.utils;
 
+import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
@@ -69,7 +70,7 @@ public class TableEnvironmentMock extends 
TableEnvironmentImpl {
     }
 
     private static TableEnvironmentMock getInstance(boolean isStreamingMode) {
-        final TableConfig tableConfig = createTableConfig();
+        final TableConfig tableConfig = TableConfig.getDefault();
         final CatalogManager catalogManager = 
CatalogManagerMocks.createEmptyCatalogManager();
         final ModuleManager moduleManager = new ModuleManager();
         return new TableEnvironmentMock(
@@ -77,22 +78,19 @@ public class TableEnvironmentMock extends 
TableEnvironmentImpl {
                 moduleManager,
                 tableConfig,
                 createExecutor(),
-                createFunctionCatalog(tableConfig, catalogManager, 
moduleManager),
+                createFunctionCatalog(
+                        tableConfig.getConfiguration(), catalogManager, 
moduleManager),
                 createPlanner(),
                 isStreamingMode);
     }
 
-    private static TableConfig createTableConfig() {
-        return TableConfig.getDefault();
-    }
-
     private static ExecutorMock createExecutor() {
         return new ExecutorMock();
     }
 
     private static FunctionCatalog createFunctionCatalog(
-            TableConfig tableConfig, CatalogManager catalogManager, 
ModuleManager moduleManager) {
-        return new FunctionCatalog(tableConfig, catalogManager, moduleManager);
+            ReadableConfig config, CatalogManager catalogManager, 
ModuleManager moduleManager) {
+        return new FunctionCatalog(config, catalogManager, moduleManager);
     }
 
     private static PlannerMock createPlanner() {
diff --git 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala
 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala
index e8463ee..345e2f9 100644
--- 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala
+++ 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/StreamTableEnvironment.scala
@@ -18,7 +18,6 @@
 package org.apache.flink.table.api.bridge.scala
 
 import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.streaming.api.scala.{DataStream, 
StreamExecutionEnvironment}
@@ -869,9 +868,7 @@ object StreamTableEnvironment {
     *                             [[TableEnvironment]].
     */
   def create(executionEnvironment: StreamExecutionEnvironment): 
StreamTableEnvironment = {
-    create(
-      executionEnvironment,
-      
EnvironmentSettings.fromConfiguration(executionEnvironment.getConfiguration))
+    create(executionEnvironment, EnvironmentSettings.newInstance().build)
   }
 
   /**
@@ -899,9 +896,6 @@ object StreamTableEnvironment {
       executionEnvironment: StreamExecutionEnvironment,
       settings: EnvironmentSettings)
     : StreamTableEnvironment = {
-    val config = new TableConfig()
-    config.addConfiguration(settings.toConfiguration)
-    StreamTableEnvironmentImpl
-      .create(executionEnvironment, settings, config)
+    StreamTableEnvironmentImpl.create(executionEnvironment, settings)
   }
 }
diff --git 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index a968e49..cbc4f38 100644
--- 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++ 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.api.bridge.scala.internal
 
-import org.apache.flink.annotation.Internal
+import org.apache.flink.annotation.{Internal, VisibleForTesting}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.TimeCharacteristic
@@ -40,6 +40,7 @@ import org.apache.flink.types.Row
 import org.apache.flink.util.Preconditions
 
 import java.util.Optional
+
 import scala.collection.JavaConverters._
 
 /**
@@ -294,13 +295,19 @@ object StreamTableEnvironmentImpl {
 
   def create(
       executionEnvironment: StreamExecutionEnvironment,
-      settings: EnvironmentSettings,
-      tableConfig: TableConfig)
+      settings: EnvironmentSettings)
     : StreamTableEnvironmentImpl = {
 
     // temporary solution until FLINK-15635 is fixed
     val classLoader = Thread.currentThread.getContextClassLoader
 
+    val executor = AbstractStreamTableEnvironmentImpl.lookupExecutor(
+      classLoader, executionEnvironment.getWrappedStreamExecutionEnvironment)
+
+    val tableConfig = TableConfig.getDefault
+    tableConfig.setRootConfiguration(executor.getConfiguration)
+    tableConfig.addConfiguration(settings.getConfiguration)
+
     val moduleManager = new ModuleManager
 
     val catalogManager = CatalogManager.newBuilder
@@ -316,9 +323,6 @@ object StreamTableEnvironmentImpl {
 
     val functionCatalog = new FunctionCatalog(tableConfig, catalogManager, 
moduleManager)
 
-    val executor = AbstractStreamTableEnvironmentImpl.lookupExecutor(
-      classLoader, executionEnvironment.getWrappedStreamExecutionEnvironment)
-
     val planner = PlannerFactoryUtil.createPlanner(
       executor, tableConfig, moduleManager, catalogManager, functionCatalog)
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultPlannerFactory.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultPlannerFactory.java
index 4476a7a..b8659ba 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultPlannerFactory.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultPlannerFactory.java
@@ -29,6 +29,8 @@ import org.apache.flink.table.delegation.PlannerFactory;
 import java.util.Collections;
 import java.util.Set;
 
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+
 /** Factory for the default {@link Planner}. */
 @Internal
 public final class DefaultPlannerFactory implements PlannerFactory {
@@ -51,7 +53,7 @@ public final class DefaultPlannerFactory implements 
PlannerFactory {
     @Override
     public Planner create(Context context) {
         final RuntimeExecutionMode runtimeExecutionMode =
-                
context.getTableConfig().getConfiguration().get(ExecutionOptions.RUNTIME_MODE);
+                context.getTableConfig().get(ExecutionOptions.RUNTIME_MODE);
         switch (runtimeExecutionMode) {
             case STREAMING:
                 return new StreamPlanner(
@@ -70,8 +72,9 @@ public final class DefaultPlannerFactory implements 
PlannerFactory {
             default:
                 throw new TableException(
                         String.format(
-                                "Unknown runtime mode '%s'. This is a bug. 
Please consider filing an issue.",
-                                runtimeExecutionMode));
+                                "Unsupported mode '%s' for '%s'. Only an 
explicit BATCH or "
+                                        + "STREAMING mode is supported in 
Table API.",
+                                runtimeExecutionMode, RUNTIME_MODE.key()));
         }
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/EnvironmentTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/EnvironmentTest.java
index ff34953..eaa2582 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/EnvironmentTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/api/EnvironmentTest.java
@@ -24,12 +24,15 @@ import org.apache.flink.configuration.PipelineOptions;
 import 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.types.Row;
 
 import org.junit.Test;
 
 import java.time.Duration;
+import java.util.concurrent.ExecutionException;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 
 /** Tests for {@link TableEnvironment} that require a planner. */
@@ -61,4 +64,29 @@ public class EnvironmentTest {
         assertEquals(800, env.getConfig().getAutoWatermarkInterval());
         assertEquals(30000, env.getCheckpointConfig().getCheckpointInterval());
     }
+
+    @Test
+    public void testEnvironmentSettings() throws ExecutionException, InterruptedException {
+        Configuration conf = new Configuration();
+        conf.set(TableConfigOptions.TABLE_CATALOG_NAME, "myCatalog");
+        EnvironmentSettings settings =
+                EnvironmentSettings.newInstance().withConfiguration(conf).build();
+
+        TableEnvironment tEnv = TableEnvironment.create(settings);
+        assertThat(tEnv.getConfig().get(TableConfigOptions.TABLE_CATALOG_NAME))
+                .isEqualTo("myCatalog");
+        assertThat(tEnv.getCurrentCatalog()).isEqualTo("myCatalog");
+
+        StreamTableEnvironment stEnv =
+                StreamTableEnvironment.create(
+                        StreamExecutionEnvironment.getExecutionEnvironment(), settings);
+        assertThat(stEnv.getConfig().get(TableConfigOptions.TABLE_CATALOG_NAME))
+                .isEqualTo("myCatalog");
+
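+        // Resetting the option afterwards must not change the already-initialized current catalog.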
+        stEnv.getConfig()
+                .set(
+                        TableConfigOptions.TABLE_CATALOG_NAME,
+                        TableConfigOptions.TABLE_CATALOG_NAME.defaultValue());
+        assertThat(stEnv.getCurrentCatalog()).isEqualTo("myCatalog");
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
index 31ac8b5..b0667b1 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java
@@ -110,7 +110,7 @@ class TransformationsTest {
     @Test
     public void testLegacyUid() {
         final TableEnvironment env =
-                TableEnvironment.create(EnvironmentSettings.inStreamingMode().toConfiguration());
+                TableEnvironment.create(EnvironmentSettings.inStreamingMode().getConfiguration());
         env.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS, true);
 
         env.createTemporaryTable(
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
index 57a712f..d6857b5 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSinkSpecSerdeTest.java
@@ -44,6 +44,7 @@ import org.apache.flink.table.planner.utils.PlannerMocks;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.utils.CatalogManagerMocks;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.parallel.Execution;
@@ -57,8 +58,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Stream;
 
-import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
-import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;
 import static org.apache.flink.table.api.config.TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS;
 import static org.apache.flink.table.api.config.TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
@@ -99,8 +98,8 @@ class DynamicTableSinkSpecSerdeTest {
                 new DynamicTableSinkSpec(
                         ContextResolvedTable.temporary(
                                 ObjectIdentifier.of(
-                                        DEFAULT_BUILTIN_CATALOG,
-                                        DEFAULT_BUILTIN_DATABASE,
+                                        CatalogManagerMocks.DEFAULT_CATALOG,
+                                        CatalogManagerMocks.DEFAULT_DATABASE,
                                         "MyTable"),
                                 new ResolvedCatalogTable(catalogTable1, resolvedSchema1)),
                         null);
@@ -129,8 +128,8 @@ class DynamicTableSinkSpecSerdeTest {
                 new DynamicTableSinkSpec(
                         ContextResolvedTable.temporary(
                                 ObjectIdentifier.of(
-                                        DEFAULT_BUILTIN_CATALOG,
-                                        DEFAULT_BUILTIN_DATABASE,
+                                        CatalogManagerMocks.DEFAULT_CATALOG,
+                                        CatalogManagerMocks.DEFAULT_DATABASE,
                                         "MyTable"),
                                 new ResolvedCatalogTable(catalogTable2, resolvedSchema2)),
                         Arrays.asList(
@@ -165,8 +164,8 @@ class DynamicTableSinkSpecSerdeTest {
                 new DynamicTableSinkSpec(
                         ContextResolvedTable.temporary(
                                 ObjectIdentifier.of(
-                                        DEFAULT_BUILTIN_CATALOG,
-                                        DEFAULT_BUILTIN_DATABASE,
+                                        CatalogManagerMocks.DEFAULT_CATALOG,
+                                        CatalogManagerMocks.DEFAULT_DATABASE,
                                         "MyTable"),
                                 new ResolvedCatalogTable(catalogTable3, resolvedSchema3)),
                         Collections.singletonList(
@@ -214,7 +213,10 @@ class DynamicTableSinkSpecSerdeTest {
     void testDynamicTableSinkSpecSerdeWithEnrichmentOptions() throws Exception {
         // Test model
         ObjectIdentifier identifier =
-                ObjectIdentifier.of(DEFAULT_BUILTIN_CATALOG, DEFAULT_BUILTIN_DATABASE, "my_table");
+                ObjectIdentifier.of(
+                        CatalogManagerMocks.DEFAULT_CATALOG,
+                        CatalogManagerMocks.DEFAULT_DATABASE,
+                        "my_table");
 
         String formatPrefix = FactoryUtil.getFormatPrefix(FORMAT, TestFormatFactory.IDENTIFIER);
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
index a15acb7..3764f6b 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/DynamicTableSourceSpecSerdeTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.connector.file.table.FileSystemTableFactory;
 import org.apache.flink.formats.testcsv.TestCsvFormatFactory;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanCompilation;
 import org.apache.flink.table.api.config.TableConfigOptions.CatalogPlanRestore;
 import org.apache.flink.table.catalog.CatalogManager;
@@ -72,8 +73,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Stream;
 
-import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_CATALOG;
-import static org.apache.flink.table.api.EnvironmentSettings.DEFAULT_BUILTIN_DATABASE;
 import static org.apache.flink.table.api.config.TableConfigOptions.PLAN_COMPILE_CATALOG_OBJECTS;
 import static org.apache.flink.table.api.config.TableConfigOptions.PLAN_RESTORE_CATALOG_OBJECTS;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
@@ -114,8 +113,8 @@ public class DynamicTableSourceSpecSerdeTest {
                 new DynamicTableSourceSpec(
                         ContextResolvedTable.temporary(
                                 ObjectIdentifier.of(
-                                        DEFAULT_BUILTIN_CATALOG,
-                                        DEFAULT_BUILTIN_DATABASE,
+                                        TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(),
+                                        TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(),
                                         "MyTable"),
                                 new ResolvedCatalogTable(catalogTable1, resolvedSchema1)),
                         null);
@@ -154,8 +153,8 @@ public class DynamicTableSourceSpecSerdeTest {
                 new DynamicTableSourceSpec(
                         ContextResolvedTable.temporary(
                                 ObjectIdentifier.of(
-                                        DEFAULT_BUILTIN_CATALOG,
-                                        DEFAULT_BUILTIN_DATABASE,
+                                        TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(),
+                                        TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(),
                                         "MyTable"),
                                 new ResolvedCatalogTable(catalogTable2, resolvedSchema2)),
                         Arrays.asList(
@@ -274,7 +273,10 @@ public class DynamicTableSourceSpecSerdeTest {
     void testDynamicTableSourceSpecSerdeWithEnrichmentOptions() throws Exception {
         // Test model
         ObjectIdentifier identifier =
-                ObjectIdentifier.of(DEFAULT_BUILTIN_CATALOG, DEFAULT_BUILTIN_DATABASE, "my_table");
+                ObjectIdentifier.of(
+                        TableConfigOptions.TABLE_CATALOG_NAME.defaultValue(),
+                        TableConfigOptions.TABLE_DATABASE_NAME.defaultValue(),
+                        "my_table");
 
         String formatPrefix = FactoryUtil.getFormatPrefix(FORMAT, TestFormatFactory.IDENTIFIER);
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
index 07dff77..52fc78b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/utils/ExpressionTestBase.scala
@@ -22,11 +22,11 @@ import org.apache.flink.api.common.TaskInfo
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext
 import org.apache.flink.api.common.functions.{MapFunction, RichFunction, RichMapFunction}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.{ConfigOption, Configuration}
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
-import org.apache.flink.table.api.config.ExecutionConfigOptions
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, TableConfigOptions}
 import org.apache.flink.table.api.{EnvironmentSettings, TableConfig, TableException, ValidationException}
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.data.binary.BinaryRowData
@@ -38,6 +38,7 @@ import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, FunctionCodeGenerator}
 import org.apache.flink.table.planner.delegation.PlannerBase
+import org.apache.flink.table.planner.utils.TestingTableEnvironment
 import org.apache.flink.table.runtime.generated.GeneratedFunction
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.types.AbstractDataType
@@ -51,10 +52,12 @@ import org.apache.calcite.rel.logical.LogicalCalc
 import org.apache.calcite.rel.rules._
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
+
 import org.junit.Assert.{assertEquals, assertTrue, fail}
 import org.junit.rules.ExpectedException
 import org.junit.{After, Before, Rule}
 
+import java.time.ZoneId
 import java.util.Collections
 
 import scala.collection.JavaConverters._
@@ -62,8 +65,6 @@ import scala.collection.mutable
 
 abstract class ExpressionTestBase {
 
-  val config = new TableConfig()
-
   // (originalExpr, optimizedExpr, expectedResult)
   private val validExprs = mutable.ArrayBuffer[(String, RexNode, String)]()
   // (originalSqlExpr, keywords, exceptionClass)
@@ -73,11 +74,14 @@ abstract class ExpressionTestBase {
     .ArrayBuffer[(Expression, String, Class[_ <: Throwable])]()
 
   private val env = StreamExecutionEnvironment.createLocalEnvironment(4)
-  private val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
+  private val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
   // use impl class instead of interface class to avoid
   // "Static methods in interface require -target:jvm-1.8"
-  private val tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
+  private val tEnv = StreamTableEnvironmentImpl.create(env, settings)
     .asInstanceOf[StreamTableEnvironmentImpl]
+
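+  // The environment now owns the TableConfig; the tests read it back instead of passing one in.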
+  val config = tEnv.getConfig
+
   private val resolvedDataType = if (containsLegacyTypes) {
     TypeConversions.fromLegacyInfoToDataType(typeInfo)
   } else {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
index 39b73ef..8a81fc6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/GroupAggregateHarnessTest.scala
@@ -56,8 +56,7 @@ class GroupAggregateHarnessTest(mode: StateBackendMode, miniBatch: MiniBatchMode
   override def before(): Unit = {
     super.before()
     val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
-    val config = new TestTableConfig
-    this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
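+    // The environment creates its own TableConfig now; the custom TestTableConfig is no longer needed.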
+    this.tEnv = StreamTableEnvironmentImpl.create(env, setting)
     // set mini batch
     val tableConfig = tEnv.getConfig
     miniBatch match {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
index 6aff43d..f634b15 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/HarnessTestBase.scala
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.table.planner.runtime.harness
 
-import org.apache.flink.api.common.time.Time
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.dag.Transformation
 import org.apache.flink.api.java.functions.KeySelector
@@ -30,14 +29,13 @@ import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.streaming.api.transformations.{OneInputTransformation, PartitionTransformation}
 import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.util.{KeyedOneInputStreamOperatorTestHarness, OneInputStreamOperatorTestHarness}
-import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.data.RowData
 import org.apache.flink.table.planner.JLong
 import org.apache.flink.table.planner.runtime.utils.StreamingTestBase
 import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase.{HEAP_BACKEND, ROCKSDB_BACKEND, StateBackendMode}
+
 import org.junit.runners.Parameterized
 
-import java.time.Duration
 import java.util
 
 import scala.collection.JavaConversions._
@@ -120,23 +118,6 @@ class HarnessTestBase(mode: StateBackendMode) extends StreamingTestBase {
   def dropWatermarks(elements: Array[AnyRef]): util.Collection[AnyRef] = {
     elements.filter(e => !e.isInstanceOf[Watermark]).toList
   }
-
-  class TestTableConfig extends TableConfig {
-
-    private var minIdleStateRetentionTime = 0L
-
-    private var maxIdleStateRetentionTime = 0L
-
-    override def getMinIdleStateRetentionTime: Long = minIdleStateRetentionTime
-
-    override def getMaxIdleStateRetentionTime: Long = maxIdleStateRetentionTime
-
-    override def setIdleStateRetentionTime(minTime: Time, maxTime: Time): Unit = {
-      super.setIdleStateRetention(Duration.ofMillis(minTime.toMilliseconds))
-      minIdleStateRetentionTime = minTime.toMilliseconds
-      maxIdleStateRetentionTime = maxTime.toMilliseconds
-    }
-  }
 }
 
 object HarnessTestBase {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
index 7c1f566..e2d28043 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/OverAggregateHarnessTest.scala
@@ -38,6 +38,7 @@ import org.junit.runners.Parameterized
 import org.junit.{Before, Test}
 
 import java.lang.{Long => JLong}
+import java.time.Duration
 import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.mutable
@@ -49,8 +50,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
   override def before(): Unit = {
     super.before()
     val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
-    val config = new TestTableConfig
-    this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
+    this.tEnv = StreamTableEnvironmentImpl.create(env, setting)
   }
 
   @Test
@@ -127,9 +127,9 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
     expectedOutput.add(new StreamRecord(
       row(2L: JLong, "aaa", 8L: JLong, null, 7L: JLong, 8L: JLong)))
     expectedOutput.add(new StreamRecord(
-      row(2L: JLong, "aaa", 9L: JLong, null, 8L: JLong, 9L: JLong)))
+      row(2L: JLong, "aaa", 10L: JLong, null, 10L: JLong, 10L: JLong)))
     expectedOutput.add(new StreamRecord(
-      row(2L: JLong, "aaa", 10L: JLong, null, 9L: JLong, 10L: JLong)))
+      row(2L: JLong, "aaa", 9L: JLong, null, 8L: JLong, 9L: JLong)))
     expectedOutput.add(new StreamRecord(
       row(2L: JLong, "bbb", 40L: JLong, null, 40L: JLong, 40L: JLong)))
 
@@ -155,7 +155,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
       """.stripMargin
     val t1 = tEnv.sqlQuery(sql)
 
-    tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(4))
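+    // setIdleStateRetention replaces the deprecated (min, max) pair; only the minimum is configured.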
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
     val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
     val outputType = Array(
       DataTypes.BIGINT().getLogicalType,
@@ -306,7 +306,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
       """.stripMargin
     val t1 = tEnv.sqlQuery(sql)
 
-    tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(2), Time.seconds(4))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(2))
     val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
     val assertor = new RowDataHarnessAssertor(
       Array(
@@ -533,7 +533,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
       """.stripMargin
     val t1 = tEnv.sqlQuery(sql)
 
-    tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
     val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
     val assertor = new RowDataHarnessAssertor(
       Array(
@@ -685,7 +685,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
       """.stripMargin
     val t1 = tEnv.sqlQuery(sql)
 
-    tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
     val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
     val assertor = new RowDataHarnessAssertor(
       Array(
@@ -827,7 +827,7 @@ class OverAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(m
       """.stripMargin
     val t1 = tEnv.sqlQuery(sql)
 
-    tEnv.getConfig.setIdleStateRetentionTime(Time.seconds(1), Time.seconds(2))
+    tEnv.getConfig.setIdleStateRetention(Duration.ofSeconds(1))
     val testHarness = createHarnessTester(t1.toAppendStream[Row], "OverAggregate")
     val assertor = new RowDataHarnessAssertor(
       Array(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
index 3c97370..14724a6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/RankHarnessTest.scala
@@ -49,8 +49,7 @@ class RankHarnessTest(mode: StateBackendMode) extends HarnessTestBase(mode) {
   override def before(): Unit = {
     super.before()
     val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
-    val config = new TestTableConfig
-    this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
+    this.tEnv = StreamTableEnvironmentImpl.create(env, setting)
   }
 
   @Test
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
index d9dcf7f..8a4378d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/harness/TableAggregateHarnessTest.scala
@@ -49,8 +49,7 @@ class TableAggregateHarnessTest(mode: StateBackendMode) extends HarnessTestBase(
   override def before(): Unit = {
     super.before()
     val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
-    val config = new TestTableConfig
-    this.tEnv = StreamTableEnvironmentImpl.create(env, setting, config)
+    this.tEnv = StreamTableEnvironmentImpl.create(env, setting)
   }
 
   val data = new mutable.MutableList[(Int, Int)]
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
index 1282aba..939d2c3 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamTableEnvironmentITCase.scala
@@ -19,11 +19,20 @@
 package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.DataStreamSource
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.flink.table.api._
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
+import org.apache.flink.table.api.bridge.scala
 import org.apache.flink.table.api.bridge.scala._
+import org.apache.flink.table.api.config.TableConfigOptions
 import org.apache.flink.table.planner.runtime.utils.JavaPojos.{Device, Order, Person, ProductItem}
 import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, StringSink}
+import org.assertj.core.api.Assertions.assertThat
+
 import org.junit.Assert.assertEquals
 import org.junit.Test
 
@@ -137,4 +146,20 @@ class StreamTableEnvironmentITCase extends StreamingTestBase {
       "(true,Order{user=1, product='Product{name='beer', id=10}', amount=3})")
     assertEquals(expected.sorted, sink.getResults.sorted)
   }
+
+  @Test
+  def testTableConfigInheritsEnvironmentSettings(): Unit = {
+    val config = new Configuration
+    config.setString(TableConfigOptions.TABLE_CATALOG_NAME, "myCatalog")
+    val env = StreamExecutionEnvironment.getExecutionEnvironment(config)
+    val tEnv = StreamTableEnvironment.create(env)
+    assertThat(tEnv.getConfig.get(TableConfigOptions.TABLE_CATALOG_NAME)).isEqualTo("myCatalog")
+
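+    // The Scala bridge receives the same option through EnvironmentSettings.withConfiguration.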
+    val scalaEnv = org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+      .getExecutionEnvironment
+    val scalaTEnv = scala.StreamTableEnvironment.create(
+      scalaEnv, EnvironmentSettings.newInstance.withConfiguration(config).build)
+    assertThat(scalaTEnv.getConfig.get(TableConfigOptions.TABLE_CATALOG_NAME))
+      .isEqualTo("myCatalog")
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 0d088d6..cb56447 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -1526,11 +1526,18 @@ object TestingTableEnvironment {
       catalogManager: Option[CatalogManager] = None,
       tableConfig: TableConfig): TestingTableEnvironment = {
 
-    tableConfig.addConfiguration(settings.toConfiguration)
-
     // temporary solution until FLINK-15635 is fixed
     val classLoader = Thread.currentThread.getContextClassLoader
 
+    val executorFactory = FactoryUtil.discoverFactory(
+      classLoader, classOf[ExecutorFactory], ExecutorFactory.DEFAULT_IDENTIFIER)
+
+    val executor = executorFactory.create(settings.getConfiguration)
+
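+    // The executor's configuration (environment + CLI) becomes the root; the settings are layered on top.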
+    tableConfig.setRootConfiguration(executor.getConfiguration)
+    tableConfig.addConfiguration(settings.getConfiguration)
+
     val moduleManager = new ModuleManager
 
     val catalogMgr = catalogManager match {
@@ -1538,7 +1545,7 @@ object TestingTableEnvironment {
       case _ =>
         CatalogManager.newBuilder
           .classLoader(classLoader)
-          .config(tableConfig.getConfiguration)
+          .config(tableConfig)
           .defaultCatalog(
             settings.getBuiltInCatalogName,
             new GenericInMemoryCatalog(
@@ -1547,12 +1554,7 @@ object TestingTableEnvironment {
           .build
     }
 
-    val functionCatalog = new FunctionCatalog(tableConfig, catalogMgr, moduleManager)
-
-    val executorFactory = FactoryUtil.discoverFactory(
-      classLoader, classOf[ExecutorFactory], ExecutorFactory.DEFAULT_IDENTIFIER)
-
-    val executor = executorFactory.create(tableConfig.getConfiguration)
+    val functionCatalog = new FunctionCatalog(settings.getConfiguration, catalogMgr, moduleManager)
 
     val planner = PlannerFactoryUtil.createPlanner(
       executor, tableConfig, moduleManager, catalogMgr, functionCatalog).asInstanceOf[PlannerBase]
