loserwang1024 commented on code in PR #3563:
URL: https://github.com/apache/flink-cdc/pull/3563#discussion_r1726446769
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##########
@@ -412,36 +404,62 @@ private void handleSchemaChangeEvent(TableId tableId,
SchemaChangeEvent schemaCh
schemaChangeEvent));
}
- // The request will need to send a FlushEvent or block until flushing
finished
+ // The request will block if another schema change event is being
handled
SchemaChangeResponse response = requestSchemaChange(tableId,
schemaChangeEvent);
- if (!response.getSchemaChangeEvents().isEmpty()) {
- LOG.info(
- "Sending the FlushEvent for table {} in subtask {}.",
- tableId,
- getRuntimeContext().getIndexOfThisSubtask());
+ if (response.isAccepted()) {
+ LOG.info(">{} Sending the FlushEvent for table {}.", subTaskId,
tableId);
output.collect(new StreamRecord<>(new FlushEvent(tableId)));
List<SchemaChangeEvent> expectedSchemaChangeEvents =
response.getSchemaChangeEvents();
schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
// The request will block until flushing finished in each sink
writer
- ReleaseUpstreamResponse schemaEvolveResponse =
requestReleaseUpstream();
+ SchemaChangeResultResponse schemaEvolveResponse =
requestSchemaChangeResult();
List<SchemaChangeEvent> finishedSchemaChangeEvents =
schemaEvolveResponse.getFinishedSchemaChangeEvents();
// Update evolved schema changes based on apply results
finishedSchemaChangeEvents.forEach(e -> output.collect(new
StreamRecord<>(e)));
+ } else if (response.isDuplicate()) {
+ LOG.info(
+ ">{} Schema change event {} has been handled in another
subTask already.",
+ subTaskId,
+ schemaChangeEvent);
+ } else if (response.isIgnored()) {
+ LOG.info(
+ ">{} Schema change event {} has been ignored. No schema
evolution needed.",
+ subTaskId,
+ schemaChangeEvent);
+ } else {
+ throw new IllegalStateException("Unexpected response status " +
response);
}
}
private SchemaChangeResponse requestSchemaChange(
- TableId tableId, SchemaChangeEvent schemaChangeEvent) {
- return sendRequestToCoordinator(new SchemaChangeRequest(tableId,
schemaChangeEvent));
+ TableId tableId, SchemaChangeEvent schemaChangeEvent)
+ throws InterruptedException, TimeoutException {
+ long schemaEvolveTimeOutMillis = System.currentTimeMillis() +
rpcTimeOutInMillis;
+ while (true) {
+ SchemaChangeResponse response =
+ sendRequestToCoordinator(new SchemaChangeRequest(tableId,
schemaChangeEvent));
+ if (response.isRegistryBusy()) {
+ if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
+ LOG.info(
+ ">{} Schema Registry is busy now, waiting for next
request...",
Review Comment:
What does the "> subtask" prefix mean?
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaManager.java:
##########
@@ -93,6 +98,97 @@ public SchemaChangeBehavior getBehavior() {
return behavior;
}
+ /**
+ * This function checks if the given schema change event has been applied
already. If so, it
+ * will be ignored to avoid sending duplicate evolved schema change events
to sink metadata
+ * applier.
+ */
+ public final boolean
isOriginalSchemaChangeEventRedundant(SchemaChangeEvent event) {
+ TableId tableId = event.tableId();
+ Optional<Schema> latestSchema = getLatestOriginalSchema(tableId);
+ return Boolean.TRUE.equals(
+ SchemaChangeEventVisitor.visit(
Review Comment:
I wonder whether we should do it in the registry or in the sink. @leonardBang, WDYT?
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##########
@@ -412,36 +404,62 @@ private void handleSchemaChangeEvent(TableId tableId,
SchemaChangeEvent schemaCh
schemaChangeEvent));
}
- // The request will need to send a FlushEvent or block until flushing
finished
+ // The request will block if another schema change event is being
handled
SchemaChangeResponse response = requestSchemaChange(tableId,
schemaChangeEvent);
- if (!response.getSchemaChangeEvents().isEmpty()) {
- LOG.info(
- "Sending the FlushEvent for table {} in subtask {}.",
- tableId,
- getRuntimeContext().getIndexOfThisSubtask());
+ if (response.isAccepted()) {
+ LOG.info(">{} Sending the FlushEvent for table {}.", subTaskId,
tableId);
output.collect(new StreamRecord<>(new FlushEvent(tableId)));
List<SchemaChangeEvent> expectedSchemaChangeEvents =
response.getSchemaChangeEvents();
schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
// The request will block until flushing finished in each sink
writer
- ReleaseUpstreamResponse schemaEvolveResponse =
requestReleaseUpstream();
+ SchemaChangeResultResponse schemaEvolveResponse =
requestSchemaChangeResult();
List<SchemaChangeEvent> finishedSchemaChangeEvents =
schemaEvolveResponse.getFinishedSchemaChangeEvents();
// Update evolved schema changes based on apply results
finishedSchemaChangeEvents.forEach(e -> output.collect(new
StreamRecord<>(e)));
+ } else if (response.isDuplicate()) {
+ LOG.info(
+ ">{} Schema change event {} has been handled in another
subTask already.",
+ subTaskId,
+ schemaChangeEvent);
+ } else if (response.isIgnored()) {
+ LOG.info(
+ ">{} Schema change event {} has been ignored. No schema
evolution needed.",
+ subTaskId,
+ schemaChangeEvent);
+ } else {
+ throw new IllegalStateException("Unexpected response status " +
response);
}
}
private SchemaChangeResponse requestSchemaChange(
- TableId tableId, SchemaChangeEvent schemaChangeEvent) {
- return sendRequestToCoordinator(new SchemaChangeRequest(tableId,
schemaChangeEvent));
+ TableId tableId, SchemaChangeEvent schemaChangeEvent)
+ throws InterruptedException, TimeoutException {
+ long schemaEvolveTimeOutMillis = System.currentTimeMillis() +
rpcTimeOutInMillis;
Review Comment:
Please expose DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT as a configurable option. If a user restarts the job
in binlog mode, a lot of create table events will be sent, and some operators may
wait for a long time.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##########
@@ -412,36 +404,62 @@ private void handleSchemaChangeEvent(TableId tableId,
SchemaChangeEvent schemaCh
schemaChangeEvent));
}
- // The request will need to send a FlushEvent or block until flushing
finished
+ // The request will block if another schema change event is being
handled
SchemaChangeResponse response = requestSchemaChange(tableId,
schemaChangeEvent);
- if (!response.getSchemaChangeEvents().isEmpty()) {
- LOG.info(
- "Sending the FlushEvent for table {} in subtask {}.",
- tableId,
- getRuntimeContext().getIndexOfThisSubtask());
+ if (response.isAccepted()) {
+ LOG.info(">{} Sending the FlushEvent for table {}.", subTaskId,
tableId);
Review Comment:
Does the ">" prefix mean "larger than taskId"?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]