wuchong commented on code in PR #22208:
URL: https://github.com/apache/flink/pull/22208#discussion_r1141288126


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableConverter.java:
##########
@@ -50,51 +49,30 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
-/** Helper class for converting {@link SqlCreateTable} to {@link CreateTableOperation}. */
-class SqlCreateTableConverter {
+/** A converter for {@link SqlCreateTable}. */
+public class SqlCreateTableConverter implements SqlNodeConverter<SqlCreateTable> {
 
-    private final MergeTableLikeUtil mergeTableLikeUtil;
-    private final CatalogManager catalogManager;
-
-    SqlCreateTableConverter(
-            FlinkCalciteSqlValidator sqlValidator,
-            CatalogManager catalogManager,
-            Function<SqlNode, String> escapeExpression) {
-        this.mergeTableLikeUtil =
-                new MergeTableLikeUtil(
-                        sqlValidator, escapeExpression, catalogManager.getDataTypeFactory());
-        this.catalogManager = catalogManager;
-    }
-
-    /** Convert the {@link SqlCreateTable} node. */
-    Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
-        CatalogTable catalogTable = createCatalogTable(sqlCreateTable);
-
-        UnresolvedIdentifier unresolvedIdentifier =
-                UnresolvedIdentifier.of(sqlCreateTable.fullTableName());
-        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
-
-        return new CreateTableOperation(
-                identifier,
-                catalogTable,
-                sqlCreateTable.isIfNotExists(),
-                sqlCreateTable.isTemporary());
+    @Override
+    public Operation convertSqlNode(SqlCreateTable node, ConvertContext context) {
+        if (node instanceof SqlCreateTableAs) {
+            return convertCreateTableAs(context, (SqlCreateTableAs) node);
+        }
+        return convertCreateTable(context, node);
     }
 
     /** Convert the {@link SqlCreateTableAs} node. */
-    Operation convertCreateTableAS(
-            FlinkPlannerImpl flinkPlanner, SqlCreateTableAs sqlCreateTableAs) {
+    public Operation convertCreateTableAs(
+            SqlNodeConverter.ConvertContext context, SqlCreateTableAs sqlCreateTableAs) {
+        CatalogManager catalogManager = context.getCatalogManager();
         UnresolvedIdentifier unresolvedIdentifier =
                 UnresolvedIdentifier.of(sqlCreateTableAs.fullTableName());
         ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
 
         PlannerQueryOperation query =
                 (PlannerQueryOperation)
-                        SqlNodeToOperationConversion.convert(
-                                        flinkPlanner, catalogManager, sqlCreateTableAs.getAsQuery())
+                        SqlNodeConverters.convertSqlNode(sqlCreateTableAs.getAsQuery(), context)

Review Comment:
   This doesn't work yet because the query SqlNode hasn't been migrated, so the build failed with ["CTAS unsupported node type SqlSelect"](https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=47331&view=logs&j=0c940707-2659-5648-cbe6-a1ad63045f0a&t=075c2716-8010-5565-fe08-3c4bb45824a4).
   
   I have submitted pull request #22213 to migrate the query SqlNode.
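   To illustrate the failure mode: a registry-based dispatch like `SqlNodeConverters` only succeeds for node types whose converters have been registered, so a not-yet-migrated node class (here, `SqlSelect`) yields no converter and the caller surfaces an "unsupported node type" error. A minimal sketch of that pattern, with hypothetical stand-in names rather than Flink's actual classes:

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Optional;

   // Hypothetical sketch of registry-based converter dispatch: converters are
   // keyed by the node's concrete class, and an unregistered (unmigrated) node
   // type produces no converter at all.
   public class ConverterRegistrySketch {
       interface Converter { String convert(Object node); }

       static final Map<Class<?>, Converter> REGISTRY = new HashMap<>();

       static void register(Class<?> nodeClass, Converter c) {
           REGISTRY.put(nodeClass, c);
       }

       static Optional<String> convertSqlNode(Object node) {
           // Empty result means this node type has not been migrated yet.
           return Optional.ofNullable(REGISTRY.get(node.getClass()))
                   .map(c -> c.convert(node));
       }

       // Stand-ins for node classes like SqlCreateTable and SqlSelect.
       static class CreateTableNode {}
       static class SelectNode {}

       public static void main(String[] args) {
           register(CreateTableNode.class, n -> "CreateTableOperation");

           if (!convertSqlNode(new CreateTableNode()).isPresent()) {
               throw new AssertionError("migrated node type should convert");
           }
           // Analogous to the CTAS case: the query node has no registered
           // converter, so conversion cannot proceed.
           if (convertSqlNode(new SelectNode()).isPresent()) {
               throw new AssertionError("unmigrated node type should have no converter");
           }
           System.out.println("ok");
       }
   }
   ```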


