leonardBang commented on code in PR #3632:
URL: https://github.com/apache/flink-cdc/pull/3632#discussion_r1832296112


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java:
##########
@@ -265,12 +260,31 @@ private SchemaChangeEvent cacheCreateTable(CreateTableEvent event) {
     private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
         TableId tableId = event.tableId();
         PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId);
+
         Schema originalSchema =
                 SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event);
         Schema preTransformedSchema = tableChangeInfo.getPreTransformedSchema();
-        Optional<SchemaChangeEvent> schemaChangeEvent =
-                SchemaUtils.transformSchemaChangeEvent(
-                        hasAsteriskMap.get(tableId), referencedColumnsMap.get(tableId), event);
+
+        Optional<SchemaChangeEvent> schemaChangeEvent;
+        if (hasAsteriskMap.getOrDefault(tableId, true)) {
+            // If this TableId is asterisk-ful, we should use the latest upstream schema as
+            // referenced columns to perform schema evolution, not of the original ones generated
+            // when creating tables. If hasAsteriskMap has no entry for this TableId, it means that
+            // this TableId has not been captured by any transform rules, and should be regarded as

Review Comment:
   ```suggestion
               // this TableId has not been referenced by any transform rules, and should be regarded as
   ```
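
   For context, a minimal sketch of the lookup that the code comment describes: a table with no entry in the map falls back to `true` and is handled like an asterisk-ful one. This is only an illustration under assumptions, not the operator's actual code: plain `String` keys stand in for `TableId`, and the class name `AsteriskLookupSketch` and method `treatAsAsteriskful` are hypothetical.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical stand-in for the lookup in cacheChangeSchema; String keys replace TableId.
   class AsteriskLookupSketch {

       // Returns the recorded flag, or true when no transform rule references the table.
       static boolean treatAsAsteriskful(Map<String, Boolean> hasAsteriskMap, String tableId) {
           return hasAsteriskMap.getOrDefault(tableId, true);
       }

       public static void main(String[] args) {
           Map<String, Boolean> hasAsteriskMap = new HashMap<>();
           hasAsteriskMap.put("db.orders", false); // referenced by a rule without an asterisk
           hasAsteriskMap.put("db.users", true);   // referenced by a rule with an asterisk

           System.out.println(treatAsAsteriskful(hasAsteriskMap, "db.orders"));  // false
           System.out.println(treatAsAsteriskful(hasAsteriskMap, "db.users"));   // true
           System.out.println(treatAsAsteriskful(hasAsteriskMap, "db.unknown")); // true (no entry)
       }
   }
   ```

   Unlike a plain `get`, `getOrDefault` never returns `null` for a missing key, so the condition cannot fail on unboxing when a table has no entry.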



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java:
##########
@@ -265,12 +260,31 @@ private SchemaChangeEvent cacheCreateTable(CreateTableEvent event) {
     private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
         TableId tableId = event.tableId();
         PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId);
+

Review Comment:
   This added blank line is an unnecessary change; please revert it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

