yuxiqian commented on code in PR #3285:
URL: https://github.com/apache/flink-cdc/pull/3285#discussion_r1597351018


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##########
@@ -221,73 +240,67 @@ private TableInfo getTableInfoFromSchemaEvolutionClient(TableId tableId) throws
         return tableInfo;
     }
 
-    private void transformSchema(TableId tableId, Schema schema) throws Exception {
-        for (Tuple4<Selectors, Optional<TransformProjection>, Optional<TransformFilter>, Boolean>
-                transform : transforms) {
-            Selectors selectors = transform.f0;
-            if (selectors.isMatch(tableId) && transform.f1.isPresent()) {
-                TransformProjection transformProjection = transform.f1.get();
+    private Schema transformSchema(TableId tableId, Schema schema) throws Exception {
+        List<Schema> newSchemas = new ArrayList<>();
+        for (PostTransformers transform : transforms) {
+            Selectors selectors = transform.getSelectors();
+            if (selectors.isMatch(tableId) && transform.getProjection().isPresent()) {
+                TransformProjection transformProjection = transform.getProjection().get();
+                TransformFilter transformFilter = transform.getFilter().orElse(null);
                 if (transformProjection.isValid()) {
                     if (!transformProjectionProcessorMap.containsKey(transformProjection)) {
                         transformProjectionProcessorMap.put(
                                 transformProjection,
-                                TransformProjectionProcessor.of(transformProjection));
+                                PostTransformProcessor.of(transformProjection, transformFilter));
                     }
-                    TransformProjectionProcessor transformProjectionProcessor =
+                    PostTransformProcessor postTransformProcessor =
                             transformProjectionProcessorMap.get(transformProjection);
                     // update the columns of projection and add the column of projection into Schema
-                    transformProjectionProcessor.processSchemaChangeEvent(schema);
+                    newSchemas.add(postTransformProcessor.processSchemaChangeEvent(schema));
                 }
             }
         }
+        if (newSchemas.isEmpty()) {
+            return schema;
+        }
+
+        Schema firstSchema = newSchemas.get(0);
+        newSchemas.stream()
+                .skip(1)
+                .forEach(
+                        testSchema -> {
+                            if (!testSchema.equals(firstSchema)) {
+                                throw new IllegalArgumentException(
+                                        String.format(
+                                                "Incompatible transform rules result found. Inferred schema: %s and %s",
+                                                firstSchema, testSchema));
+                            }
+                        });
+        return firstSchema;

Review Comment:
   Added `SchemaUtils.mergeCompatibleUpstreamSchema` to resolve this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to