yuxiqian commented on code in PR #3572:
URL: https://github.com/apache/flink-cdc/pull/3572#discussion_r1836206210


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java:
##########
@@ -342,27 +342,77 @@ private CreateTableEvent 
transformCreateTableEvent(CreateTableEvent createTableE
             }
         }
 
+        cachePreTransformProcessor(tableId, createTableEvent.getSchema());
+        if (preTransformProcessorMap.containsKey(tableId)) {
+            return preTransformProcessorMap
+                    .get(tableId)
+                    .preTransformCreateTableEvent(createTableEvent);
+        }
+        return createTableEvent;
+    }
+
+    private void cachePreTransformProcessor(TableId tableId, Schema 
tableSchema) {
+        LinkedHashSet<Column> referencedColumnsSet = new LinkedHashSet<>();
+        boolean hasMatchTransform = false;
         for (PreTransformer transform : transforms) {
-            Selectors selectors = transform.getSelectors();
-            if (selectors.isMatch(tableId) && 
transform.getProjection().isPresent()) {
+            if (!transform.getSelectors().isMatch(tableId)) {
+                continue;
+            }
+            if (!transform.getProjection().isPresent()) {
+                processProjectionTransform(tableId, tableSchema, 
referencedColumnsSet, null);
+                hasMatchTransform = true;
+            } else {

Review Comment:
   Is it correct to omit this branch and let it fall back to the 
`if (!hasMatchTransform)` condition? The handling logic seems to be the same.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java:
##########
@@ -342,27 +342,77 @@ private CreateTableEvent 
transformCreateTableEvent(CreateTableEvent createTableE
             }
         }
 
+        cachePreTransformProcessor(tableId, createTableEvent.getSchema());
+        if (preTransformProcessorMap.containsKey(tableId)) {
+            return preTransformProcessorMap
+                    .get(tableId)
+                    .preTransformCreateTableEvent(createTableEvent);
+        }
+        return createTableEvent;
+    }
+
+    private void cachePreTransformProcessor(TableId tableId, Schema 
tableSchema) {
+        LinkedHashSet<Column> referencedColumnsSet = new LinkedHashSet<>();
+        boolean hasMatchTransform = false;
         for (PreTransformer transform : transforms) {
-            Selectors selectors = transform.getSelectors();
-            if (selectors.isMatch(tableId) && 
transform.getProjection().isPresent()) {
+            if (!transform.getSelectors().isMatch(tableId)) {
+                continue;
+            }
+            if (!transform.getProjection().isPresent()) {
+                processProjectionTransform(tableId, tableSchema, 
referencedColumnsSet, null);
+                hasMatchTransform = true;
+            } else {
                 TransformProjection transformProjection = 
transform.getProjection().get();
-                TransformFilter transformFilter = 
transform.getFilter().orElse(null);
                 if (transformProjection.isValid()) {
-                    if (!preTransformProcessorMap.containsKey(tableId)) {
-                        preTransformProcessorMap.put(
-                                tableId,
-                                new PreTransformProcessor(
-                                        tableChangeInfo, transformProjection, 
transformFilter));
-                    }
-                    PreTransformProcessor preTransformProcessor =
-                            preTransformProcessorMap.get(tableId);
-                    // TODO: Currently this code wrongly filters out rows that 
weren't referenced in
-                    // the first matching transform rule but in the following 
transform rules.
-                    return 
preTransformProcessor.preTransformCreateTableEvent(createTableEvent);
+                    processProjectionTransform(
+                            tableId, tableSchema, referencedColumnsSet, 
transform);
+                    hasMatchTransform = true;
                 }
             }
         }
-        return createTableEvent;
+        if (!hasMatchTransform) {
+            processProjectionTransform(tableId, tableSchema, 
referencedColumnsSet, null);
+        }
+    }
+
+    public void processProjectionTransform(
+            TableId tableId,
+            Schema tableSchema,
+            LinkedHashSet<Column> referencedColumnsSet,
+            @Nullable PreTransformer transform) {
+        // If this TableId isn't presented in any transform block, it should 
behave like a "*"
+        // projection and should be regarded as asterisk-ful.
+        if (transform == null) {
+            referencedColumnsSet.addAll(tableSchema.getColumns());
+            hasAsteriskMap.put(tableId, true);
+        } else {
+            TransformProjection transformProjection = 
transform.getProjection().get();
+            boolean hasAsterisk = 
TransformParser.hasAsterisk(transformProjection.getProjection());
+            if (hasAsterisk) {
+                referencedColumnsSet.addAll(tableSchema.getColumns());
+                hasAsteriskMap.put(tableId, true);
+            } else {
+                TransformFilter transformFilter = 
transform.getFilter().orElse(null);
+                List<Column> referencedColumns =
+                        TransformParser.generateReferencedColumns(
+                                transformProjection.getProjection(),
+                                transformFilter != null ? 
transformFilter.getExpression() : null,
+                                tableSchema.getColumns());
+                // update referenced columns of other projections of the same 
tableId, if any
+                referencedColumnsSet.addAll(referencedColumns);
+                hasAsteriskMap.put(tableId, false);

Review Comment:
   Should we check whether `hasAsteriskMap[tableId]` is already `true`, perhaps 
set by a previous transform rule?
   
   Addendum: I think this should not be a problem, since if someone puts 
asterisk-ful and non-asterisk-ful rules together, it will fail when the 
upstream column count changes anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to