lvyanquan commented on code in PR #4056:
URL: https://github.com/apache/flink-cdc/pull/4056#discussion_r2277107272


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java:
##########
@@ -144,54 +129,17 @@ public void setup(
     @Override
     public void initializeState(StateInitializationContext context) throws Exception {
         super.initializeState(context);
-        if (!shouldStoreSchemasInState) {
-            // Skip schema persistency if we're in the distributed schema mode or the batch
-            // execution mode.
-            return;
-        }
-        OperatorStateStore stateStore = context.getOperatorStateStore();
-        ListStateDescriptor<byte[]> descriptor =
-                new ListStateDescriptor<>("originalSchemaState", byte[].class);
-        state = stateStore.getUnionListState(descriptor);
-        if (context.isRestored()) {
-            for (byte[] serializedTableInfo : state.get()) {
-                PreTransformChangeInfo stateTableChangeInfo =
-                        PreTransformChangeInfo.SERIALIZER.deserialize(
-                                PreTransformChangeInfo.SERIALIZER.getVersion(),
-                                serializedTableInfo);
-                preTransformChangeInfoMap.put(
-                        stateTableChangeInfo.getTableId(), stateTableChangeInfo);
-
-                CreateTableEvent restoredCreateTableEvent =
-                        new CreateTableEvent(
-                                stateTableChangeInfo.getTableId(),
-                                stateTableChangeInfo.getPreTransformedSchema());
-                // hasAsteriskMap needs to be recalculated after restoring from a checkpoint.
-                cacheTransformRuleInfo(restoredCreateTableEvent);
-            }
-        }
+        // Historically, transform operator maintains internal state to help transforming schemas
+        // correctly after restart.
+        // However, it is not required after
+        // https://code.alibaba-inc.com/ververica/flink-cdc/codereview/21946168 got merged, since

Review Comment:
   Avoid using non-public links (such as the `code.alibaba-inc.com` URL in this comment).



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java:
##########
@@ -377,7 +379,7 @@ private void reportFinishedSnapshotSplitsIfNeed() {
             FinishedSnapshotSplitsReportEvent reportEvent =
                     new FinishedSnapshotSplitsReportEvent(finishedOffsets);
             context.sendSourceEventToCoordinator(reportEvent);
-            LOG.debug(
+            LOG.info(

Review Comment:
   Avoid unnecessary changes (here, `LOG.debug` → `LOG.info`).



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java:
##########
@@ -766,7 +766,7 @@ public void emitRecord(
             }
         }
 
-        private void processElement(
+        protected void processElement(

Review Comment:
   Avoid unnecessary changes (here, `private` → `protected` on `processElement`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to