[
https://issues.apache.org/jira/browse/FLINK-31494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jark Wu updated FLINK-31494:
----------------------------
Description:
Introduce {{SqlNodeConverter}} for {{SqlToOperationConverter}}, following
Timo's idea in FLINK-31368.
The classes would look like:
{code:java}
public interface SqlNodeConverter<S extends SqlNode> {
    Operation convertSqlNode(S node, ConvertContext context);
}

/** Registry of SqlNode converters. */
public class SqlNodeConverters {
    private static final Map<Class<?>, SqlNodeConverter<?>> CONVERTERS = new HashMap<>();

    static {
        // register all the converters here
        register(new SqlCreateCatalogConverter());
    }
}
{code}
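To make the lookup side of the registry concrete, here is a minimal, self-contained sketch of how such a registry might dispatch on the node class. It is only an illustration under stated assumptions: {{SqlNode}}, {{Operation}}, {{ConvertContext}}, {{SqlCreateCatalog}} and {{SqlUnknown}} are simplified stand-ins rather than the real Calcite/Flink types, and {{register}} takes the node class explicitly because the description does not say how the key is derived from the converter.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for Calcite's SqlNode and Flink's Operation / ConvertContext,
// so that the sketch compiles without Flink on the classpath.
interface SqlNode {}

interface Operation {
    String asSummaryString();
}

final class ConvertContext {}

/** Converts one concrete SqlNode type into an Operation. */
interface SqlNodeConverter<S extends SqlNode> {
    Operation convertSqlNode(S node, ConvertContext context);
}

/** Stand-in node for a CREATE CATALOG statement. */
final class SqlCreateCatalog implements SqlNode {
    final String catalogName;

    SqlCreateCatalog(String catalogName) {
        this.catalogName = catalogName;
    }
}

/** Stand-in node type that has no registered converter. */
final class SqlUnknown implements SqlNode {}

final class SqlCreateCatalogConverter implements SqlNodeConverter<SqlCreateCatalog> {
    @Override
    public Operation convertSqlNode(SqlCreateCatalog node, ConvertContext context) {
        return () -> "CREATE CATALOG " + node.catalogName;
    }
}

/** Registry of SqlNode converters, keyed by the exact SqlNode class. */
final class SqlNodeConverters {
    private static final Map<Class<?>, SqlNodeConverter<?>> CONVERTERS = new HashMap<>();

    static {
        // register all the converters here
        register(SqlCreateCatalog.class, new SqlCreateCatalogConverter());
    }

    private SqlNodeConverters() {}

    // Assumption: the key is passed explicitly; the issue does not show how it is derived.
    private static <S extends SqlNode> void register(
            Class<S> nodeClass, SqlNodeConverter<S> converter) {
        CONVERTERS.put(nodeClass, converter);
    }

    /** Returns the converted Operation, or empty if no converter is registered. */
    @SuppressWarnings("unchecked")
    static Optional<Operation> convertSqlNode(SqlNode validated, ConvertContext context) {
        SqlNodeConverter<SqlNode> converter =
                (SqlNodeConverter<SqlNode>) CONVERTERS.get(validated.getClass());
        if (converter == null) {
            return Optional.empty();
        }
        return Optional.of(converter.convertSqlNode(validated, context));
    }
}
```

Returning an empty {{Optional}} for unregistered node types would let {{SqlToOperationConverter}} fall back to its existing conversion logic while converters are migrated one at a time; whether the real implementation does this is not stated here.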
was:
Introduce {{OperationExecutor}} for {{SqlToOperationConverter}}, following
Timo's idea in FLINK-31368.
The interface would look like:
{code:java}
public interface OperationExecutor {
    // The Operation type produced by this OperationExecutor
    Class<? extends Operation> supportedOperation();

    // The SqlNode type supported by this OperationExecutor
    Class<? extends SqlNode> supportedSqlNode();

    // Converts the validated SqlNode into an Operation
    Operation execute(SqlNode validated);
} {code}
Add an {{OperationExecutors}} registry that manages all {{OperationExecutor}}
instances and is responsible for converting a validated SqlNode into an Operation.
{code:java}
public class OperationExecutors {
    private static final Map<Class<? extends SqlNode>, OperationExecutor> executors =
            new HashMap<>();

    static {
        addExecutor(SqlCreateCatalog.class, new CreateCatalogExecutor());
        // .....
    }

    private static void addExecutor(
            Class<? extends SqlNode> sqlNodeClass, OperationExecutor executor) {
        executors.put(sqlNodeClass, executor);
    }

    // Returns null if no executor is registered for the node's class
    public static OperationExecutor getExecutor(SqlNode validated) {
        return executors.get(validated.getClass());
    }

    public static Operation execute(SqlNode validated) {
        return getExecutor(validated).execute(validated);
    }
} {code}
This can then be used in SqlToOperationConverter.java:
{code:java}
private static Optional<Operation> convertValidatedSqlNode(
        FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode validated) {
    OperationExecutor executor = OperationExecutors.getExecutor(validated);
    if (executor == null) {
        return Optional.empty();
    }
    // ofNullable yields an empty Optional if the executor returns null
    return Optional.ofNullable(executor.execute(validated));
}
{code}
> Introduce SqlNodeConverter for SqlToOperationConverter
> ------------------------------------------------------
>
> Key: FLINK-31494
> URL: https://issues.apache.org/jira/browse/FLINK-31494
> Project: Flink
> Issue Type: Sub-task
> Components: Table SQL / Planner
> Reporter: xuzhiwen
> Assignee: Jark Wu
> Priority: Major