Copilot commented on code in PR #4310:
URL: https://github.com/apache/flink-cdc/pull/4310#discussion_r2910557590


##########
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java:
##########
@@ -92,6 +92,17 @@ class PostTransformOperatorTest {
                     .physicalColumn("__table_name__", DataTypes.STRING().notNull())
                     .primaryKey("col1")
                     .build();
+    private static final Schema METADATA_SCHEMA_UPPER_CASE =
+            Schema.newBuilder()
+                    .physicalColumn("col1".toUpperCase(), DataTypes.STRING().notNull())
+                    .primaryKey("col1".toUpperCase())
+                    .build();
+    private static final Schema METADATA_SCHEMA_DUPLICATE_COLUMN_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("col1", DataTypes.STRING().notNull())
+                    .physicalColumn("col1".toUpperCase(), DataTypes.STRING().notNull())
+                    .primaryKey("col1".toUpperCase())
+                    .build();

Review Comment:
   `METADATA_SCHEMA_DUPLICATE_COLUMN_SCHEMA` is redundant/awkward 
(“DUPLICATE_COLUMN_SCHEMA”). Consider renaming to something clearer like 
`METADATA_SCHEMA_WITH_DUPLICATE_COLUMNS` (or similar) to better convey intent.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##########
@@ -194,64 +224,65 @@ private void processElementInternal(StreamRecord<Event> element) {
     // -------------------
 
     /**
-     * Apply effective transform rules to {@link CreateTableEvent}s based on effective transformers.
+     * Apply effective transform rule to {@link CreateTableEvent}s based on effective transformer.
      */
     private Optional<Event> processCreateTableEvent(
-            CreateTableEvent event, List<PostTransformer> effectiveTransformers) {
+            CreateTableEvent event, PostTransformer effectiveTransformer) {
         TableId tableId = event.tableId();
         Schema preSchema = event.getSchema();
 
-        // Apply transform rules and verify we can get a deterministic post schema
-        List<Schema> schemas =
-                effectiveTransformers.stream()
-                        .map(trans -> transformSchema(preSchema, trans))
-                        .collect(Collectors.toList());
-
         Schema postSchema =
-                SchemaUtils.ensurePkNonNull(SchemaMergingUtils.strictlyMergeSchemas(schemas));
+                SchemaUtils.ensurePkNonNull(transformSchema(preSchema, effectiveTransformer));
 
         // Update transform info map
         postTransformInfoMap.put(
                tableId, PostTransformChangeInfo.of(tableId, preSchema, postSchema));
 
         // Update "if-table-has-been–wildcard–matched" map
         boolean wildcardMatched =
-                effectiveTransformers.stream()
-                        .map(PostTransformer::getProjection)
-                        .flatMap(this::optionalToStream)
-                        .map(TransformProjection::getProjection)
-                        .anyMatch(TransformParser::hasAsterisk);
+                effectiveTransformer.getProjection().isPresent()
+                        && TransformParser.hasAsterisk(
+                                effectiveTransformer.getProjection().get().getProjection());
         hasAsteriskMap.put(tableId, wildcardMatched);
         projectedColumnsMap.put(
                 tableId,
                 preSchema.getColumnNames().stream()
                         .filter(postSchema.getColumnNames()::contains)
                         .collect(Collectors.toList()));
 
-        return Optional.of(new CreateTableEvent(tableId, postSchema));
+        // Apply all effective post-converters
+        Optional<SchemaChangeEvent> createTableEvent =
+                Optional.of(new CreateTableEvent(tableId, postSchema));
+        return createTableEvent.map(Event.class::cast);

Review Comment:
   The comment says “Apply all effective post-converters”, but no converter 
logic is applied here (it only wraps `CreateTableEvent` in an `Optional`). 
Either implement the intended converter application for `CreateTableEvent` (if 
applicable) or remove/adjust the comment and simplify the return to avoid 
misleading future readers.
   ```suggestion
           // Create transformed CreateTableEvent based on post-transformed schema
           return Optional.of((Event) new CreateTableEvent(tableId, postSchema));
   ```



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java:
##########
@@ -194,64 +224,65 @@ private void processElementInternal(StreamRecord<Event> element) {
     // -------------------
 
     /**
-     * Apply effective transform rules to {@link CreateTableEvent}s based on effective transformers.
+     * Apply effective transform rule to {@link CreateTableEvent}s based on effective transformer.
      */
     private Optional<Event> processCreateTableEvent(
-            CreateTableEvent event, List<PostTransformer> effectiveTransformers) {
+            CreateTableEvent event, PostTransformer effectiveTransformer) {
         TableId tableId = event.tableId();
         Schema preSchema = event.getSchema();
 
-        // Apply transform rules and verify we can get a deterministic post schema
-        List<Schema> schemas =
-                effectiveTransformers.stream()
-                        .map(trans -> transformSchema(preSchema, trans))
-                        .collect(Collectors.toList());
-
         Schema postSchema =
-                SchemaUtils.ensurePkNonNull(SchemaMergingUtils.strictlyMergeSchemas(schemas));
+                SchemaUtils.ensurePkNonNull(transformSchema(preSchema, effectiveTransformer));
 
         // Update transform info map
         postTransformInfoMap.put(
                tableId, PostTransformChangeInfo.of(tableId, preSchema, postSchema));
 
         // Update "if-table-has-been–wildcard–matched" map
         boolean wildcardMatched =
-                effectiveTransformers.stream()
-                        .map(PostTransformer::getProjection)
-                        .flatMap(this::optionalToStream)
-                        .map(TransformProjection::getProjection)
-                        .anyMatch(TransformParser::hasAsterisk);
+                effectiveTransformer.getProjection().isPresent()
+                        && TransformParser.hasAsterisk(
+                                effectiveTransformer.getProjection().get().getProjection());
         hasAsteriskMap.put(tableId, wildcardMatched);
         projectedColumnsMap.put(
                 tableId,
                 preSchema.getColumnNames().stream()
                         .filter(postSchema.getColumnNames()::contains)
                         .collect(Collectors.toList()));
 
-        return Optional.of(new CreateTableEvent(tableId, postSchema));
+        // Apply all effective post-converters
+        Optional<SchemaChangeEvent> createTableEvent =
+                Optional.of(new CreateTableEvent(tableId, postSchema));
+        return createTableEvent.map(Event.class::cast);
     }
 
     /**
      * Apply effective transform rules to other {@link SchemaChangeEvent}s based on effective
      * transformers and existing {@link PostTransformChangeInfo}.
      */
     private Optional<Event> processSchemaChangeEvent(
-            SchemaChangeEvent event, List<PostTransformer> effectiveTransformers) {
+            SchemaChangeEvent event, PostTransformer transformer) {
+        // CreateTableEvents should be handled in `processCreateTableEvent` method
+        Preconditions.checkArgument(
+                !(event instanceof CreateTableEvent),
+                "Unexpected CreateTableEvents in processSchemaChangeEvent method: %s",
+                event);
         TableId tableId = event.tableId();
+        if (!postTransformInfoMap.containsKey(tableId)) {
+            LOG.warn(
+                    "Met dangling schema change event {}, Table {} might have been dropped.",

Review Comment:
   The log message is grammatically awkward (“Met dangling ...”). Consider 
rephrasing to “Encountered a dangling schema change event ..., table ... might 
have been dropped.” to improve clarity.
   ```suggestion
                    "Encountered a dangling schema change event {}, table {} might have been dropped.",
   ```



##########
docs/content.zh/docs/core-concept/transform.md:
##########
@@ -346,14 +346,14 @@ transform:
 transform:
   - source-table: mydb.web_order
     projection: id, order_id
-    filter: UPPER(province) = 'SHANGHAI'
     description: classification mapping example
-  - source-table: mydb.web_order
+  - source-table: mydb.\.*
     projection: order_id as id, id as order_id
-    filter: UPPER(province) = 'BEIJING'
     description: classification mapping example
 ```
 
+这里 `mydb.web_order` 将被第一条规则处理,而 `mydb` 里的所有其他表将被第二条规则处理。

Review Comment:
   Same issue as in the English docs: the example is still labeled "classification mapping example", but the current rules no longer demonstrate classification mapping (which was previously driven by the different filters). Consider updating the descriptions to emphasize that the first matching rule per source-table takes effect, or rewriting the example as a single rule using `CASE WHEN` to show the recommended way to express classification mapping.
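   For illustration, a single-rule `CASE WHEN` variant might look like the sketch below (the `province` column and the label values are hypothetical, not taken from this PR):

   ```yaml
   transform:
     - source-table: mydb.web_order
       projection: >
         id, order_id,
         CASE WHEN UPPER(province) = 'SHANGHAI' THEN 'shanghai'
              WHEN UPPER(province) = 'BEIJING' THEN 'beijing'
              ELSE 'other' END AS region_class
       description: classification mapping via CASE WHEN
   ```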



##########
docs/content/docs/core-concept/transform.md:
##########
@@ -349,14 +349,14 @@ For example, we may define a transform rule as follows:
 transform:
   - source-table: mydb.web_order
     projection: id, order_id
-    filter: UPPER(province) = 'SHANGHAI'
     description: classification mapping example
-  - source-table: mydb.web_order
+  - source-table: mydb.\.*
     projection: order_id as id, id as order_id
-    filter: UPPER(province) = 'BEIJING'
     description: classification mapping example
 ```
 
+Here `mydb.web_order` will be handled by the first rule, while other tables in `mydb` will fall into the second one.

Review Comment:
   This example is labeled “classification mapping example” but the 
classification behavior (previously driven by different filters) is no longer 
present—both rules are now unconditional and only differ by 
selector/projection. Consider either updating the descriptions to reflect 
“first-match routing by table pattern”, or rewriting the example to demonstrate 
classification via a single rule using `CASE WHEN` (which matches the new 
recommended approach used in tests).
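   As one possible shape for such a rewrite, a single rule could carry the classification in the projection (sketch only; the `province` column and label values are assumptions for illustration):

   ```yaml
   transform:
     - source-table: mydb.web_order
       projection: >
         id, order_id,
         CASE WHEN UPPER(province) = 'SHANGHAI' THEN 'shanghai'
              WHEN UPPER(province) = 'BEIJING' THEN 'beijing'
              ELSE 'other' END AS region_class
       description: classification mapping via CASE WHEN
   ```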



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
