[ 
https://issues.apache.org/jira/browse/FLINK-31494?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jark Wu updated FLINK-31494:
----------------------------
    Description: 
Introduce {{SqlNodeConverter}} for {{SqlToOperationConverter}}, following
Timo's idea in FLINK-31368. The classes would look like:

{code:java}
public interface SqlNodeConverter<S extends SqlNode> {

    Operation convertSqlNode(S node, ConvertContext context);

}


/** Registry of SqlNode converters. */
public class SqlNodeConverters {

    private static final Map<Class<?>, SqlNodeConverter<?>> CONVERTERS = new HashMap<>();

    static {
        // register all the converters here
        register(new SqlCreateCatalogConverter());
    }
}
{code}
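As a sanity check of the dispatch idea, here is a minimal, self-contained sketch in plain Java. Note that {{SqlNode}}, {{Operation}}, {{ConvertContext}}, and {{SqlCreateCatalog}} are simplified stand-ins for the real Calcite/Flink classes, and the lookup by {{validated.getClass()}} is just one possible registry strategy, not necessarily the final implementation:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ConverterSketch {

    // Stand-ins for the real Calcite/Flink types (simplified assumptions).
    interface SqlNode {}

    interface Operation {
        String asSummaryString();
    }

    static class ConvertContext {}

    static class SqlCreateCatalog implements SqlNode {}

    /** Converter from a specific SqlNode subclass to an Operation. */
    interface SqlNodeConverter<S extends SqlNode> {
        Operation convertSqlNode(S node, ConvertContext context);
    }

    /** Example converter; the real one would read the catalog name etc. */
    static class SqlCreateCatalogConverter implements SqlNodeConverter<SqlCreateCatalog> {
        @Override
        public Operation convertSqlNode(SqlCreateCatalog node, ConvertContext context) {
            return () -> "CREATE CATALOG";
        }
    }

    // Registry keyed by the SqlNode subclass each converter handles.
    private static final Map<Class<?>, SqlNodeConverter<?>> CONVERTERS = new HashMap<>();

    static {
        CONVERTERS.put(SqlCreateCatalog.class, new SqlCreateCatalogConverter());
    }

    /** Dispatches on the runtime class of the validated SqlNode. */
    @SuppressWarnings("unchecked")
    static Optional<Operation> convert(SqlNode validated, ConvertContext context) {
        SqlNodeConverter<SqlNode> converter =
                (SqlNodeConverter<SqlNode>) CONVERTERS.get(validated.getClass());
        return converter == null
                ? Optional.empty()
                : Optional.of(converter.convertSqlNode(validated, context));
    }

    public static void main(String[] args) {
        Optional<Operation> op = convert(new SqlCreateCatalog(), new ConvertContext());
        System.out.println(op.map(Operation::asSummaryString).orElse("unsupported"));
    }
}
{code}

Looking up by the exact runtime class keeps dispatch O(1), but it also means subclasses of a registered SqlNode type are not matched automatically; that trade-off would need to be weighed in the real registry.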

  was:
Introduce an OperationExecutor for SqlToOperationConverter, following Timo's
idea in FLINK-31368. The interface would look like:
{code:java}
public interface OperationExecutor {

    // The Operation type supported by the current OperationExecutor
    Class<? extends Operation> supportedOperation();

    // The SqlNode type supported by the current OperationExecutor
    Class<? extends SqlNode> supportedSqlNode();

    // Converts the validated SqlNode into an Operation
    Operation execute(SqlNode validated);
}
{code}
 

Add an OperationExecutors registry to manage all OperationExecutor instances
and dispatch the conversion of a validated SqlNode.

 
{code:java}
public class OperationExecutors {

    private static final Map<Class<? extends SqlNode>, OperationExecutor> EXECUTORS =
            new HashMap<>();

    static {
        addExecutor(SqlCreateCatalog.class, new CreateCatalogExecutor());
        // .....
    }

    private static void addExecutor(
            Class<? extends SqlNode> sqlNodeClass, OperationExecutor executor) {
        EXECUTORS.put(sqlNodeClass, executor);
    }

    public static OperationExecutor getExecutor(SqlNode validated) {
        return EXECUTORS.get(validated.getClass());
    }

    public static Operation execute(SqlNode validated) {
        OperationExecutor executor = getExecutor(validated);
        return executor == null ? null : executor.execute(validated);
    }
}
{code}
 

This can then be used in SqlToOperationConverter.java:
{code:java}
private static Optional<Operation> convertValidatedSqlNode(
        FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode validated) {
    OperationExecutor executor = OperationExecutors.getExecutor(validated);
    if (executor == null) {
        return Optional.empty();
    }
    return Optional.ofNullable(executor.execute(validated));
}
 {code}


> Introduce SqlNodeConverter for SqlToOperationConverter
> ------------------------------------------------------
>
>                 Key: FLINK-31494
>                 URL: https://issues.apache.org/jira/browse/FLINK-31494
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>            Reporter: xuzhiwen
>            Assignee: Jark Wu
>            Priority: Major
>
> Introduce {{SqlNodeConverter}} for {{SqlToOperationConverter}}, following
> Timo's idea in FLINK-31368. The classes would look like:
> {code:java}
> public interface SqlNodeConverter<S extends SqlNode> {
>     Operation convertSqlNode(S node, ConvertContext context);
> }
> /** Registry of SqlNode converters. */
> public class SqlNodeConverters {
>     private static final Map<Class<?>, SqlNodeConverter<?>> CONVERTERS = new HashMap<>();
>     static {
>         // register all the converters here
>         register(new SqlCreateCatalogConverter());
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
