godfreyhe commented on a change in pull request #12335:
URL: https://github.com/apache/flink/pull/12335#discussion_r436651750
##########
File path:
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
##########
@@ -146,6 +152,10 @@ public CatalogManager build() {
}
}
+ public void setCatalogTableSchemaResolver(CatalogTableSchemaResolver
schemaResolver) {
Review comment:
I also want to pass it in the ctor, but `Parser` is constructed in
`Planner`, while `Planner` needs `CatalogManager` to construct. we can't get
`Parser` instance when creating `CatalogManager`. see
`TableEnvironmentImpl#create`.
A workaround approach is: using AtomicReference to hold the parser instance,
code looks like:
```
public static TableEnvironmentImpl create(EnvironmentSettings settings) {
// temporary solution until FLINK-15635 is fixed
ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
TableConfig tableConfig = new TableConfig();
ModuleManager moduleManager = new ModuleManager();
// create a parser reference first
AtomicReference<Parser> parserRef = new AtomicReference<>();
CatalogManager catalogManager = CatalogManager.newBuilder()
.classLoader(classLoader)
.config(tableConfig.getConfiguration())
.defaultCatalog(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName()))
// set CatalogTableSchemaResolver Supplier instead
of CatalogTableSchemaResolver
.schemaResolverSupplier(() -> new
CatalogTableSchemaResolver(parserRef.get(), settings.isStreamingMode()))
.build();
FunctionCatalog functionCatalog = new
FunctionCatalog(tableConfig, catalogManager, moduleManager);
Map<String, String> executorProperties =
settings.toExecutorProperties();
Executor executor =
ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
.create(executorProperties);
Map<String, String> plannerProperties =
settings.toPlannerProperties();
Planner planner =
ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
.create(
plannerProperties,
executor,
tableConfig,
functionCatalog,
catalogManager);
// set parser reference
parserRef.set(planner.getParser());
return new TableEnvironmentImpl(
catalogManager,
moduleManager,
tableConfig,
executor,
functionCatalog,
planner,
settings.isStreamingMode()
);
}
```
Users are very confused about how to create `StreamTableEnvironmentImpl` (I
know some users use `StreamTableEnvironmentImpl`'s ctor to create a
`StreamTableEnvironment` instead of use create method)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]