superdiaodiao commented on code in PR #25247:
URL: https://github.com/apache/flink/pull/25247#discussion_r1730233268


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/MergeTableAsUtil.java:
##########
@@ -63,9 +79,90 @@ public class MergeTableAsUtil {
     }
 
     /**
-     * Merges the schema part of the {@code sqlCreateTableAs} with the {@code sourceSchema}.
+     * Rewrites the query operation to include only the fields that may be persisted in the sink.
+     */
+    public PlannerQueryOperation maybeRewriteQuery(
+            CatalogManager catalogManager,
+            FlinkPlannerImpl flinkPlanner,
+            PlannerQueryOperation origQueryOperation,
+            SqlNode origQueryNode,
+            ResolvedCatalogTable sinkTable) {
+        FlinkCalciteSqlValidator sqlValidator = flinkPlanner.getOrCreateSqlValidator();
+        SqlRewriterUtils rewriterUtils = new SqlRewriterUtils(sqlValidator);
+        FlinkTypeFactory typeFactory = (FlinkTypeFactory) sqlValidator.getTypeFactory();
+
+        // Only fields that may be persisted will be included in the select query
+        RowType sinkRowType =
+                ((RowType) sinkTable.getResolvedSchema().toSinkRowDataType().getLogicalType());
+
+        Map<String, Integer> sourceFields =
+                IntStream.range(0, origQueryOperation.getResolvedSchema().getColumnNames().size())
+                        .boxed()
+                        .collect(
+                                Collectors.toMap(
+                                        origQueryOperation.getResolvedSchema().getColumnNames()
+                                                ::get,
+                                        Function.identity()));
+
+        // assignedFields contains the new sink fields that are not present in the source
+        // and that will be included in the select query
+        LinkedHashMap<Integer, SqlNode> assignedFields = new LinkedHashMap<>();
+
+        // targetPositions contains the positions of the source fields that will be
+        // included in the select query
+        List<Object> targetPositions = new ArrayList<>();

Review Comment:
   Since `sourceFields` is a `Map<String, Integer>`, would it be clearer to declare this as `List<Integer>`?
   ```suggestion
           List<Integer> targetPositions = new ArrayList<>();
   ```
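
To illustrate the suggestion, here is a minimal standalone sketch, not the PR's actual code. The class `TargetPositionsSketch` and both helper methods are hypothetical names introduced only for this example. It builds the name-to-index map the same way the hunk does and collects the matching positions into a `List<Integer>`. How `targetPositions` is filled and consumed in `MergeTableAsUtil` is not shown in the hunk, so whether a downstream call requires `List<Object>` is left open here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class TargetPositionsSketch {

    // Maps each column name to its index, mirroring the sourceFields construction in the hunk.
    // Assumes column names are unique; Collectors.toMap throws IllegalStateException on duplicates.
    static Map<String, Integer> indexByName(List<String> columnNames) {
        return IntStream.range(0, columnNames.size())
                .boxed()
                .collect(Collectors.toMap(columnNames::get, Function.identity()));
    }

    // Hypothetical helper: for each sink column that also exists in the source,
    // record the source position. The element type is Integer because the
    // values come straight out of a Map<String, Integer>.
    static List<Integer> targetPositions(List<String> sourceColumns, List<String> sinkColumns) {
        Map<String, Integer> sourceFields = indexByName(sourceColumns);
        List<Integer> targetPositions = new ArrayList<>();
        for (String sinkColumn : sinkColumns) {
            Integer position = sourceFields.get(sinkColumn);
            if (position != null) {
                targetPositions.add(position);
            }
        }
        return targetPositions;
    }

    public static void main(String[] args) {
        List<String> source = List.of("id", "name", "ts");
        List<String> sink = List.of("ts", "id", "extra");
        System.out.println(targetPositions(source, sink)); // prints [2, 0]
    }
}
```

With the typed list, a caller can read positions without casting, and the compiler rejects accidental insertion of non-integer values.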



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.