wuchong commented on code in PR #22208:
URL: https://github.com/apache/flink/pull/22208#discussion_r1141037935


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableLikeConverter.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTableLike;
+import org.apache.flink.table.operations.Operation;
+
+import static 
org.apache.flink.table.planner.operations.CreateTableConverterUtils.convertCreateTable;
+
+/** A converter for {@link SqlCreateTableLike}. */
+public class SqlCreateTableLikeConverter implements 
SqlNodeConverter<SqlCreateTableLike> {
+
+    @Override
+    public Operation convertSqlNode(SqlCreateTableLike node, ConvertContext 
context) {
+        return convertCreateTable(context, node);

Review Comment:
   You can simply call `return new 
SqlCreateTableConverter().convertSqlNode(node, context);` to reuse the 
conversion logic if we would move the util there. 



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableConverter.java:
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.table.operations.Operation;
+
+import static 
org.apache.flink.table.planner.operations.CreateTableConverterUtils.convertCreateTable;
+
+/** A converter for {@link SqlCreateTable}. */
+public class SqlCreateTableConverter implements 
SqlNodeConverter<SqlCreateTable> {
+
+    @Override
+    public Operation convertSqlNode(SqlCreateTable node, ConvertContext 
context) {
+        return convertCreateTable(context, node);

Review Comment:
   This makes the design of `SqlNodeConverter` less useful because the core 
logic is out of it. Please move the implementation of `convertCreateTable` here.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlCreateTableAsConverter.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlCreateTableAs;
+import org.apache.flink.table.operations.Operation;
+
+import static 
org.apache.flink.table.planner.operations.CreateTableConverterUtils.convertCreateTableAs;
+
+/** A converter for {@link SqlCreateTableAs}. */
+public class SqlCreateTableAsConverter implements 
SqlNodeConverter<SqlCreateTableAs> {
+    @Override
+    public Operation convertSqlNode(SqlCreateTableAs node, ConvertContext 
context) {
+        return convertCreateTableAs(context, node);

Review Comment:
   Please move the implementation of `convertCreateTableAs` here.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/CreateTableConverterUtils.java:
##########
@@ -85,8 +74,10 @@ Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
     }
 
     /** Convert the {@link SqlCreateTableAs} node. */
-    Operation convertCreateTableAS(
-            FlinkPlannerImpl flinkPlanner, SqlCreateTableAs sqlCreateTableAs) {
+    public static Operation convertCreateTableAs(
+            SqlNodeConverter.ConvertContext context, SqlCreateTableAs 
sqlCreateTableAs) {
+        CatalogManager catalogManager = context.getCatalogManager();
+        FlinkPlannerImpl flinkPlanner = context.getFlinkPlannerImpl();

Review Comment:
   I'm trying not to expose `FlinkPlannerImpl` to the converters because it 
pulls out many complex objects. I think we can simply call 
`SqlNodeConverters#convertSqlNode(sqlCreateTableAs.getAsQuery(), context)` once 
we migrate query conversion. 



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/CreateTableConverterUtils.java:
##########
@@ -50,32 +49,22 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
-/** Helper class for converting {@link SqlCreateTable} to {@link 
CreateTableOperation}. */
-class SqlCreateTableConverter {
+/** A utility class for {@link SqlCreateTable} conversion. */
+public class CreateTableConverterUtils {
 
-    private final MergeTableLikeUtil mergeTableLikeUtil;
-    private final CatalogManager catalogManager;
+    private CreateTableConverterUtils() {}
 
-    SqlCreateTableConverter(
-            FlinkCalciteSqlValidator sqlValidator,
-            CatalogManager catalogManager,
-            Function<SqlNode, String> escapeExpression) {
-        this.mergeTableLikeUtil =
-                new MergeTableLikeUtil(
-                        sqlValidator, escapeExpression, 
catalogManager.getDataTypeFactory());
-        this.catalogManager = catalogManager;
-    }
-
-    /** Convert the {@link SqlCreateTable} node. */
-    Operation convertCreateTable(SqlCreateTable sqlCreateTable) {
-        CatalogTable catalogTable = createCatalogTable(sqlCreateTable);
+    /** Convert {@link SqlCreateTable} or {@link SqlCreateTableAs} node. */

Review Comment:
   ```suggestion
       /** Convert {@link SqlCreateTable} or {@link SqlCreateTableLike} node. */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to