Shawn-Hx commented on code in PR #3680:
URL: https://github.com/apache/flink-cdc/pull/3680#discussion_r1881526319
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqSourceTestUtils.java:
##########
@@ -64,4 +67,18 @@ public static String getServerId(int parallelism) {
}
private MySqSourceTestUtils() {}
+
+ public static void loopCheck(
Review Comment:
nit: move this method before constructor
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java:
##########
@@ -131,53 +134,45 @@ public SchemaRegistryRequestHandler(
public void handleSchemaChangeRequest(
SchemaChangeRequest request,
CompletableFuture<CoordinationResponse> response) {
- // We use requester subTask ID as the pending ticket, because there
will be at most 1 schema
- // change requests simultaneously from each subTask
- int requestSubTaskId = request.getSubTaskId();
+ // We use nonce to identify each schema change request
+ long nonce = request.getNonce();
synchronized (schemaChangeRequestLock) {
// Make sure we handle the first request in the pending list to
avoid out-of-order
// waiting and blocks checkpointing mechanism.
if (schemaChangeStatus == RequestStatus.IDLE) {
- if (pendingSubTaskIds.isEmpty()) {
- LOG.info(
- "Received schema change event request {} from
table {} from subTask {}. Pending list is empty, handling this.",
- request.getSchemaChangeEvent(),
- request.getTableId().toString(),
- requestSubTaskId);
- } else if (pendingSubTaskIds.get(0) == requestSubTaskId) {
- LOG.info(
- "Received schema change event request {} from
table {} from subTask {}. It is on the first of the pending list, handling
this.",
- request.getSchemaChangeEvent(),
- request.getTableId().toString(),
- requestSubTaskId);
- pendingSubTaskIds.remove(0);
- } else {
- LOG.info(
- "Received schema change event request {} from
table {} from subTask {}. It is not the first of the pending list ({}).",
- request.getSchemaChangeEvent(),
- request.getTableId().toString(),
- requestSubTaskId,
- pendingSubTaskIds);
- if (!pendingSubTaskIds.contains(requestSubTaskId)) {
- pendingSubTaskIds.add(requestSubTaskId);
- }
- response.complete(wrap(SchemaChangeResponse.busy()));
- return;
- }
-
SchemaChangeEvent event = request.getSchemaChangeEvent();
// If this schema change event has been requested by another
subTask, ignore it.
if (schemaManager.isOriginalSchemaChangeEventRedundant(event))
{
LOG.info("Event {} has been addressed before, ignoring
it.", event);
- clearCurrentSchemaChangeRequest();
+ clearCurrentSchemaChangeRequest(nonce);
LOG.info(
"SchemaChangeStatus switched from
WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
Review Comment:
I think the log message needs to be updated because there is no
WAITING_FOR_FLUSH status anymore.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java:
##########
@@ -473,37 +485,44 @@ private void handleSchemaChangeEvent(TableId tableId,
SchemaChangeEvent schemaCh
}
private SchemaChangeResponse requestSchemaChange(
- TableId tableId, SchemaChangeEvent schemaChangeEvent)
+ TableId tableId, SchemaChangeEvent schemaChangeEvent, long nonce)
throws InterruptedException, TimeoutException {
long schemaEvolveTimeOutMillis = System.currentTimeMillis() +
rpcTimeOutInMillis;
while (true) {
SchemaChangeResponse response =
sendRequestToCoordinator(
- new SchemaChangeRequest(tableId,
schemaChangeEvent, subTaskId));
- if (response.isRegistryBusy()) {
- if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
+ new SchemaChangeRequest(tableId,
schemaChangeEvent, subTaskId, nonce));
+ if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
+ if (response.isRegistryBusy()) {
LOG.info(
"{}> Schema Registry is busy now, waiting for next
request...",
subTaskId);
Thread.sleep(1000);
+ } else if (response.isWaitingForFlush()) {
+ LOG.info(
+ "{}> Schema change event (with once {}) has not
collected enough flush success events from writers, waiting...",
Review Comment:
once -> nonce
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java:
##########
@@ -186,7 +181,7 @@ public void handleSchemaChangeRequest(
// route strategies, ignore it.
if (derivedSchemaChangeEvents.isEmpty()) {
LOG.info("Event {} is omitted from sending to downstream,
ignoring it.", event);
- clearCurrentSchemaChangeRequest();
+ clearCurrentSchemaChangeRequest(nonce);
LOG.info(
"SchemaChangeStatus switched from
WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
Review Comment:
As mentioned above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]