wuchong commented on a change in pull request #8844:
[FLINK-12951][table-planner] Add logic to bridge DDL to table source(…
URL: https://github.com/apache/flink/pull/8844#discussion_r300253609
##########
File path:
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatements.java
##########
@@ -18,62 +18,197 @@
package org.apache.flink.table.sqlexec;
+import org.apache.flink.sql.parser.SqlProperty;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
-import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.calcite.FlinkTypeSystem;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.PlannerQueryOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.util.StringUtils;
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.util.ReflectUtil;
import org.apache.calcite.util.ReflectiveVisitor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
/**
* Mix-in tool class for {@code SqlNode} that allows DDL commands to be
* executed directly.
*
- * <p>For every kind of {@link SqlNode}, there needs a method named
- * #execute(type), the 'type' argument should be the subclass
- * type for the supported {@link SqlNode}.
+ * <p>For every kind of {@link SqlNode}, there needs to be a corresponding
+ * #execute(type) method, where the 'type' argument is the subclass
+ * of the supported {@link SqlNode}.
+ *
+ * <p>Every #execute() method should return an {@link Operation} which can be
+ * used in the {@link org.apache.flink.table.delegation.Planner}.
*/
public class SqlExecutableStatements implements ReflectiveVisitor {
- private TableEnvironment tableEnv;
+ private FlinkPlannerImpl flinkPlanner;
- private final ReflectUtil.MethodDispatcher<Void> dispatcher =
- ReflectUtil.createMethodDispatcher(Void.class,
+ private final ReflectUtil.MethodDispatcher<Operation> dispatcher =
+ ReflectUtil.createMethodDispatcher(Operation.class,
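For context, the reflective dispatch the diff sets up can be sketched without Calcite: a dispatcher looks up an `execute(SpecificNode)` overload by the runtime class of the node. The class and type names below are illustrative stand-ins, not the actual Flink or Calcite API.

```java
import java.lang.reflect.Method;

// Stand-ins for SqlNode subclasses (in Flink these come from the SQL parser).
interface SqlNode {}
class SqlCreateTable implements SqlNode {}
class SqlDropTable implements SqlNode {}

public class ReflectiveDispatchSketch {
    // Mimics what ReflectUtil.createMethodDispatcher does: pick the
    // execute(..) overload whose parameter type matches the node's
    // concrete runtime class, then invoke it reflectively.
    String dispatch(SqlNode node) {
        try {
            Method m = getClass().getDeclaredMethod("execute", node.getClass());
            return (String) m.invoke(this, node);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Unsupported node: " + node.getClass(), e);
        }
    }

    // One overload per supported SqlNode subclass.
    String execute(SqlCreateTable create) { return "create"; }
    String execute(SqlDropTable drop) { return "drop"; }

    public static void main(String[] args) {
        ReflectiveDispatchSketch d = new ReflectiveDispatchSketch();
        System.out.println(d.dispatch(new SqlCreateTable())); // prints "create"
        System.out.println(d.dispatch(new SqlDropTable()));   // prints "drop"
    }
}
```

The reviewer's objection below is that this indirection buys little, since the caller already knows which subclass it parsed.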
Review comment:
1. I don't see the necessity of having a visitor here, because we already know the
specific SqlNode type (DDL, DML or Query) before invoking these methods. I think
some static utility methods would be clearer and sufficient here. For example:
`QueryOperation convertToQueryOperation(..)`, `CreateOperation
convertToCreateOperation(..)`, `ModifyOperation convertToModifyOperation(..)`
2. The method names and the class name are not appropriate, because the class
doesn't **execute** the statement (e.g. register a table in the catalog, insert
data into a table); it only converts `SqlNode`s to `Operation`s.
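The static-utility shape suggested in point 1 could look roughly like the sketch below. The interfaces here are local stand-ins for the real Flink/Calcite types, and `SqlToOperationConverter` plus its method names are hypothetical, following the reviewer's examples rather than any existing Flink class.

```java
// Stand-ins for org.apache.calcite.sql.SqlNode and
// org.apache.flink.table.operations.Operation and their subtypes.
interface SqlNode {}
interface Operation {}
class SqlQuery implements SqlNode {}
class SqlCreateTable implements SqlNode {}
class QueryOperation implements Operation {}
class CreateTableOperation implements Operation {}

// Hypothetical converter utility: one static method per statement kind
// instead of a reflective visitor. The caller already knows whether it
// parsed DDL, DML or a query, so it calls the matching method directly.
final class SqlToOperationConverter {
    private SqlToOperationConverter() {}

    static QueryOperation convertToQueryOperation(SqlQuery query) {
        // A real implementation would validate the node and plan it
        // into a RelRoot via the FlinkPlannerImpl.
        return new QueryOperation();
    }

    static CreateTableOperation convertToCreateOperation(SqlCreateTable create) {
        // A real implementation would derive a TableSchema and build a
        // CatalogTable from the parsed DDL.
        return new CreateTableOperation();
    }
}

public class ConverterSketch {
    public static void main(String[] args) {
        Operation q = SqlToOperationConverter.convertToQueryOperation(new SqlQuery());
        Operation c = SqlToOperationConverter.convertToCreateOperation(new SqlCreateTable());
        System.out.println(q instanceof QueryOperation);        // prints "true"
        System.out.println(c instanceof CreateTableOperation);  // prints "true"
    }
}
```

The compiler then enforces the node-to-operation mapping statically, which also addresses point 2: a name like `convertTo...` says what the method does, where `execute` does not.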
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services