lvyanquan commented on code in PR #4056:
URL: https://github.com/apache/flink-cdc/pull/4056#discussion_r2277107272
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java:
##########
@@ -144,54 +129,17 @@ public void setup(
@Override
public void initializeState(StateInitializationContext context) throws
Exception {
super.initializeState(context);
- if (!shouldStoreSchemasInState) {
- // Skip schema persistency if we're in the distributed schema mode
or the batch
- // execution mode.
- return;
- }
- OperatorStateStore stateStore = context.getOperatorStateStore();
- ListStateDescriptor<byte[]> descriptor =
- new ListStateDescriptor<>("originalSchemaState", byte[].class);
- state = stateStore.getUnionListState(descriptor);
- if (context.isRestored()) {
- for (byte[] serializedTableInfo : state.get()) {
- PreTransformChangeInfo stateTableChangeInfo =
- PreTransformChangeInfo.SERIALIZER.deserialize(
- PreTransformChangeInfo.SERIALIZER.getVersion(),
- serializedTableInfo);
- preTransformChangeInfoMap.put(
- stateTableChangeInfo.getTableId(),
stateTableChangeInfo);
-
- CreateTableEvent restoredCreateTableEvent =
- new CreateTableEvent(
- stateTableChangeInfo.getTableId(),
-
stateTableChangeInfo.getPreTransformedSchema());
- // hasAsteriskMap needs to be recalculated after restoring
from a checkpoint.
- cacheTransformRuleInfo(restoredCreateTableEvent);
- }
- }
+ // Historically, transform operator maintains internal state to help
transforming schemas
+ // correctly after restart.
+ // However, it is not required after
+ //
https://code.alibaba-inc.com/ververica/flink-cdc/codereview/21946168 got
merged, since
Review Comment:
Avoid using non-public links: the `code.alibaba-inc.com` code-review URL in this comment is not publicly accessible.
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java:
##########
@@ -377,7 +379,7 @@ private void reportFinishedSnapshotSplitsIfNeed() {
FinishedSnapshotSplitsReportEvent reportEvent =
new FinishedSnapshotSplitsReportEvent(finishedOffsets);
context.sendSourceEventToCoordinator(reportEvent);
- LOG.debug(
+ LOG.info(
Review Comment:
Avoid unnecessary changes (here, `LOG.debug` → `LOG.info`).
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java:
##########
@@ -766,7 +766,7 @@ public void emitRecord(
}
}
- private void processElement(
+ protected void processElement(
Review Comment:
Avoid unnecessary changes (here, `private` → `protected` on `processElement`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]