godfreyhe commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r436651750



##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
##########
@@ -146,6 +152,10 @@ public CatalogManager build() {
                }
        }
 
+       public void setCatalogTableSchemaResolver(CatalogTableSchemaResolver 
schemaResolver) {

Review comment:
       I also want to pass it in the ctor, but `Parser` is constructed in 
`Planner`, while `Planner` needs `CatalogManager` to construct. we can't get 
`Parser` instance when creating `CatalogManager`. see 
`TableEnvironmentImpl#create`. 
   
   A workaround approach is: using AtomicReference to hold the parser instance, 
code looks like:
   ```
   public static TableEnvironmentImpl create(EnvironmentSettings settings) {
   
                // temporary solution until FLINK-15635 is fixed
                ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
   
                TableConfig tableConfig = new TableConfig();
   
                ModuleManager moduleManager = new ModuleManager();
   
                    // create a parser reference first 
                AtomicReference<Parser> parserRef = new AtomicReference<>();
                CatalogManager catalogManager = CatalogManager.newBuilder()
                        .classLoader(classLoader)
                        .config(tableConfig.getConfiguration())
                        .defaultCatalog(
                                settings.getBuiltInCatalogName(),
                                new GenericInMemoryCatalog(
                                        settings.getBuiltInCatalogName(),
                                        settings.getBuiltInDatabaseName()))
                             // set CatalogTableSchemaResolver Supplier instead 
of CatalogTableSchemaResolver
                        .schemaResolverSupplier(() -> new 
CatalogTableSchemaResolver(parserRef.get(), settings.isStreamingMode()))
                        .build();
   
                FunctionCatalog functionCatalog = new 
FunctionCatalog(tableConfig, catalogManager, moduleManager);
   
                Map<String, String> executorProperties = 
settings.toExecutorProperties();
                Executor executor = 
ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
                        .create(executorProperties);
   
                Map<String, String> plannerProperties = 
settings.toPlannerProperties();
                Planner planner = 
ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
                        .create(
                                plannerProperties,
                                executor,
                                tableConfig,
                                functionCatalog,
                                catalogManager);
                   // set parser reference
                parserRef.set(planner.getParser());
   
                return new TableEnvironmentImpl(
                        catalogManager,
                        moduleManager,
                        tableConfig,
                        executor,
                        functionCatalog,
                        planner,
                        settings.isStreamingMode()
                );
        }
   ```
   
   Users are very confused about how to create `StreamTableEnvironmentImpl` (I 
know some users use `StreamTableEnvironmentImpl`'s ctor to create a 
`StreamTableEnvironment` instead of use create method)
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to