yuxiqian commented on code in PR #3563:
URL: https://github.com/apache/flink-cdc/pull/3563#discussion_r1726507995
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##########
@@ -412,36 +404,62 @@ private void handleSchemaChangeEvent(TableId tableId,
SchemaChangeEvent schemaCh
schemaChangeEvent));
}
- // The request will need to send a FlushEvent or block until flushing
finished
+ // The request will block if another schema change event is being
handled
SchemaChangeResponse response = requestSchemaChange(tableId,
schemaChangeEvent);
- if (!response.getSchemaChangeEvents().isEmpty()) {
- LOG.info(
- "Sending the FlushEvent for table {} in subtask {}.",
- tableId,
- getRuntimeContext().getIndexOfThisSubtask());
+ if (response.isAccepted()) {
+ LOG.info(">{} Sending the FlushEvent for table {}.", subTaskId,
tableId);
output.collect(new StreamRecord<>(new FlushEvent(tableId)));
List<SchemaChangeEvent> expectedSchemaChangeEvents =
response.getSchemaChangeEvents();
schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
// The request will block until flushing finished in each sink
writer
- ReleaseUpstreamResponse schemaEvolveResponse =
requestReleaseUpstream();
+ SchemaChangeResultResponse schemaEvolveResponse =
requestSchemaChangeResult();
List<SchemaChangeEvent> finishedSchemaChangeEvents =
schemaEvolveResponse.getFinishedSchemaChangeEvents();
// Update evolved schema changes based on apply results
finishedSchemaChangeEvents.forEach(e -> output.collect(new
StreamRecord<>(e)));
+ } else if (response.isDuplicate()) {
+ LOG.info(
+ ">{} Schema change event {} has been handled in another
subTask already.",
+ subTaskId,
+ schemaChangeEvent);
+ } else if (response.isIgnored()) {
+ LOG.info(
+ ">{} Schema change event {} has been ignored. No schema
evolution needed.",
+ subTaskId,
+ schemaChangeEvent);
+ } else {
+ throw new IllegalStateException("Unexpected response status " +
response);
}
}
private SchemaChangeResponse requestSchemaChange(
- TableId tableId, SchemaChangeEvent schemaChangeEvent) {
- return sendRequestToCoordinator(new SchemaChangeRequest(tableId,
schemaChangeEvent));
+ TableId tableId, SchemaChangeEvent schemaChangeEvent)
+ throws InterruptedException, TimeoutException {
+ long schemaEvolveTimeOutMillis = System.currentTimeMillis() +
rpcTimeOutInMillis;
Review Comment:
This timeout has been exposed as a pipeline configuration option,
`schema-operator.rpc-timeout`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]