xzw0223 created FLINK-31494:
-------------------------------

Summary: Introduce OperationExecutor for SqlToOperationConverter
Key: FLINK-31494
URL: https://issues.apache.org/jira/browse/FLINK-31494
Project: Flink
Issue Type: Sub-task
Components: Table SQL / Planner
Reporter: xzw0223

Introduce an OperationExecutor for SqlToOperationConverter, following Timo's idea in FLINK-31368. The interface would look like:

```java
public interface OperationExecutor {

    // The Operation type supported by this OperationExecutor
    Class<? extends Operation> supportedOperation();

    // The SqlNode type supported by this OperationExecutor
    Class<? extends SqlNode> supportedSqlNode();

    // Converts the validated SqlNode into an Operation
    Operation execute(SqlNode validated);
}
```

Add an OperationExecutors class that manages all OperationExecutor instances and is responsible for converting a SqlNode:

```java
import java.util.HashMap;
import java.util.Map;

public class OperationExecutors {

    private static final Map<Class<? extends SqlNode>, OperationExecutor> executors =
            new HashMap<>();

    static {
        addExecutor(SqlCreateCatalog.class, CreateCatalogExecutor.class);
        // .....
    }

    // Assumes every executor class has a public no-argument constructor
    private static void addExecutor(
            Class<? extends SqlNode> sqlNodeClass,
            Class<? extends OperationExecutor> executorClass) {
        try {
            executors.put(sqlNodeClass, executorClass.getDeclaredConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + executorClass, e);
        }
    }

    // Returns the executor registered for the node's class, or null if there is none
    public static OperationExecutor getExecutor(SqlNode validated) {
        return executors.get(validated.getClass());
    }

    public static Operation execute(SqlNode validated) {
        return getExecutor(validated).execute(validated);
    }
}
```

It can then be used in SqlToOperationConverter.java:

```java
private static Optional<Operation> convertValidatedSqlNode(
        FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode validated) {
    OperationExecutor executor = OperationExecutors.getExecutor(validated);
    if (executor == null) {
        return Optional.empty();
    }
    // ofNullable covers the case where the executor produces no Operation
    return Optional.ofNullable(executor.execute(validated));
}
```
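
To try the registry-and-dispatch idea outside of Flink, here is a minimal, self-contained sketch. It is not the planner code. `SqlNode`, `Operation`, `CreateCatalogNode` and `UnknownNode` are hypothetical stand-ins for the real Calcite/Flink types. The class name `ExecutorRegistrySketch` and the methods `register` and `convert` are also invented for illustration. Unlike the proposal, the sketch registers executor instances keyed by `supportedSqlNode()` rather than instantiating classes reflectively.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ExecutorRegistrySketch {

    // Hypothetical stand-ins for Calcite's SqlNode and Flink's Operation.
    interface SqlNode {}

    interface Operation {
        String summary();
    }

    static final class CreateCatalogNode implements SqlNode {
        final String name;

        CreateCatalogNode(String name) {
            this.name = name;
        }
    }

    // A node type with no registered executor.
    static final class UnknownNode implements SqlNode {}

    interface OperationExecutor {
        Class<? extends SqlNode> supportedSqlNode();

        Operation execute(SqlNode validated);
    }

    static final class CreateCatalogExecutor implements OperationExecutor {
        @Override
        public Class<? extends SqlNode> supportedSqlNode() {
            return CreateCatalogNode.class;
        }

        @Override
        public Operation execute(SqlNode validated) {
            CreateCatalogNode node = (CreateCatalogNode) validated;
            return () -> "CREATE CATALOG " + node.name;
        }
    }

    private static final Map<Class<? extends SqlNode>, OperationExecutor> EXECUTORS =
            new HashMap<>();

    // Each executor declares its own key, so registration cannot mismatch node and executor.
    static void register(OperationExecutor executor) {
        EXECUTORS.put(executor.supportedSqlNode(), executor);
    }

    static {
        register(new CreateCatalogExecutor());
    }

    // Dispatches on the exact runtime class; empty if no executor is registered.
    static Optional<Operation> convert(SqlNode validated) {
        OperationExecutor executor = EXECUTORS.get(validated.getClass());
        return executor == null
                ? Optional.empty()
                : Optional.ofNullable(executor.execute(validated));
    }

    public static void main(String[] args) {
        // prints "CREATE CATALOG my_catalog"
        System.out.println(
                convert(new CreateCatalogNode("my_catalog"))
                        .map(Operation::summary)
                        .orElse("<unsupported>"));
        // prints "false"
        System.out.println(convert(new UnknownNode()).isPresent());
    }
}
```

Note that a lookup by `validated.getClass()` matches only the exact class, so a subclass of a registered node type would not find its parent's executor. Whether that matters for the real SqlNode hierarchy is something the actual implementation would need to check.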