KurtYoung commented on a change in pull request #8844: 
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r300322557
 
 

 ##########
 File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatements.java
 ##########
 @@ -18,62 +18,197 @@
 
 package org.apache.flink.table.sqlexec;
 
+import org.apache.flink.sql.parser.SqlProperty;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
-import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.calcite.FlinkTypeSystem;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.PlannerQueryOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.util.StringUtils;
 
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
 import org.apache.calcite.util.ReflectUtil;
 import org.apache.calcite.util.ReflectiveVisitor;
 
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * Mix-in tool class for {@code SqlNode} that allows DDL commands to be
  * executed directly.
  *
- * <p>For every kind of {@link SqlNode}, there needs a method named
- * #execute(type), the 'type' argument should be the subclass
- * type for the supported {@link SqlNode}.
+ * <p>For every kind of {@link SqlNode}, there needs to be a corresponding
+ * #execute(type) method, the 'type' argument should be the subclass
+ * of the supported {@link SqlNode}.
+ *
+ * <p>Every #execute() should return an {@link Operation} which can be used in
+ * {@link org.apache.flink.table.delegation.Planner}.
  */
 public class SqlExecutableStatements implements ReflectiveVisitor {
-       private TableEnvironment tableEnv;
+       private FlinkPlannerImpl flinkPlanner;
 
-       private final ReflectUtil.MethodDispatcher<Void> dispatcher =
-               ReflectUtil.createMethodDispatcher(Void.class,
+       private final ReflectUtil.MethodDispatcher<Operation> dispatcher =
+               ReflectUtil.createMethodDispatcher(Operation.class,
                        this,
                        "execute",
                        SqlNode.class);
 
        //~ Constructors 
-----------------------------------------------------------
 
-       private SqlExecutableStatements(TableEnvironment tableEnvironment) {
-               this.tableEnv = tableEnvironment;
+       private SqlExecutableStatements(FlinkPlannerImpl flinkPlanner) {
+               this.flinkPlanner = flinkPlanner;
        }
 
        /**
-        * This is the main entrance of executing all kinds of DDL/DML {@code 
SqlNode}s, different
+        * This is the main entrance for executing all kinds of DDL/DML {@code 
SqlNode}s, different
         * SqlNode will have its implementation in the #execute(type) method 
whose 'type' argument
         * is subclass of {@code SqlNode}.
         *
         * <p>Caution that the {@link #execute(SqlNode)} should never expect to 
be invoked.
         *
-        * @param tableEnvironment TableEnvironment to interact with
+        * @param flinkPlanner     FlinkPlannerImpl to convert sql node to rel 
node
         * @param sqlNode          SqlNode to execute on
         */
-       public static void executeSqlNode(TableEnvironment tableEnvironment, 
SqlNode sqlNode) {
-               SqlExecutableStatements statement = new 
SqlExecutableStatements(tableEnvironment);
-               statement.dispatcher.invoke(sqlNode);
+       public static Operation executeSqlNode(FlinkPlannerImpl flinkPlanner, 
SqlNode sqlNode) {
+               // validate the query
+               final SqlNode validated = flinkPlanner.validate(sqlNode);
+               SqlExecutableStatements statement = new 
SqlExecutableStatements(flinkPlanner);
+               try {
+                       return statement.dispatcher.invoke(validated);
+               } catch (RuntimeException r) {
+                       // try to make the thrown exception more friendly.
+                       if (r.getCause() instanceof InvocationTargetException) {
+                               final Throwable throwable = 
r.getCause().getCause();
+                               if (throwable instanceof ValidationException) {
+                                       throw new 
ValidationException(throwable.getMessage());
+                               } else if (throwable instanceof AssertionError) 
{
+                                       throw new 
AssertionError(throwable.getMessage());
+                               } else {
+                                       throw new RuntimeException(throwable);
+                               }
+                       } else {
+                               throw r;
+                       }
+               }
        }
 
        /**
         * Execute the {@link SqlCreateTable} node.
         */
-       public void execute(SqlCreateTable sqlCreateTable) {
-               // need to implement.
+       public Operation execute(SqlCreateTable sqlCreateTable) {
+               // set with properties
+               SqlNodeList propertyList = sqlCreateTable.getPropertyList();
+               Map<String, String> properties = new HashMap<>();
+               if (propertyList != null) {
+                       propertyList.getList().forEach(p ->
+                               properties.put(((SqlProperty) 
p).getKeyString().toLowerCase(),
+                                       ((SqlProperty) p).getValueString()));
+               }
+               String tableType = properties.get("connector");
+               if (StringUtils.isNullOrWhitespaceOnly(tableType)) {
+                       throw new SqlExecutionException("Option [connector] is 
required for"
+                               + " CREATE TABLE statement.");
+               }
+
+               TableSchema tableSchema = createTableSchema(sqlCreateTable,
+                       new FlinkTypeFactory(new FlinkTypeSystem())); // need 
to make type factory singleton ?
 
 Review comment:
   I don't think you can use `FlinkTypeFactory` anymore, once the tableEnv is 
brought up to the api module. Use another way to replace it?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to