This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 9816848c4b46ed303423411c354b92fcdb10c2a5 Author: yuxiqian <[email protected]> AuthorDate: Mon Aug 26 14:07:54 2024 +0800 [hotfix][cdc-runtime] Keep upstream pending requests in order to avoid checkpoint hanging --- .../cdc/pipeline/tests/SchemaEvolveE2eITCase.java | 1 + .../tests/SchemaEvolvingTransformE2eITCase.java | 1 + .../runtime/operators/schema/SchemaOperator.java | 10 +- .../coordinator/SchemaRegistryRequestHandler.java | 148 +++++++++++++-------- .../schema/event/SchemaChangeRequest.java | 15 ++- .../runtime/partitioning/PrePartitionOperator.java | 7 + .../operators/EventOperatorTestHarness.java | 2 +- 7 files changed, 125 insertions(+), 59 deletions(-) diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java index 0a8d7c483..64e3e9c57 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java @@ -338,6 +338,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { // triggers DropColumnEvent stmt.execute("ALTER TABLE members DROP COLUMN biological_sex"); stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);"); + stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);"); } List<String> expectedTmEvents = diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java index 8b306d1e7..edcac1bbe 100644 --- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolvingTransformE2eITCase.java @@ -349,6 +349,7 @@ public class SchemaEvolvingTransformE2eITCase extends PipelineTestEnvironment { // triggers DropColumnEvent stmt.execute("ALTER TABLE members DROP COLUMN biological_sex"); stmt.execute("INSERT INTO members VALUES (1013, 'Fiona', 16);"); + stmt.execute("INSERT INTO members VALUES (1014, 'Gem', 17);"); } List<String> expectedTmEvents = diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index 1b4f50e89..a1bdd7885 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -50,6 +50,7 @@ import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; import org.apache.flink.runtime.operators.coordination.CoordinationRequest; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -440,7 +441,8 @@ public class SchemaOperator extends AbstractStreamOperator<Event> long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis; while (true) { SchemaChangeResponse response = - sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent)); + sendRequestToCoordinator( + new 
SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId)); if (response.isRegistryBusy()) { if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) { LOG.info( @@ -609,4 +611,10 @@ public class SchemaOperator extends AbstractStreamOperator<Event> } } } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + // Nothing needs to be done here, since AbstractStreamOperator#snapshotState and + // #processElement are guaranteed not to be interleaved. + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 9262a6a90..de5bde4e6 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -54,7 +54,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -77,7 +76,10 @@ public class SchemaRegistryRequestHandler implements Closeable { /** * Atomic flag indicating if current RequestHandler could accept more schema changes for now.
*/ - private final AtomicReference<RequestStatus> schemaChangeStatus; + private volatile RequestStatus schemaChangeStatus; + + private final List<Integer> pendingSubTaskIds; + private final Object schemaChangeRequestLock; private volatile Throwable currentChangeException; private volatile List<SchemaChangeEvent> currentDerivedSchemaChangeEvents; @@ -109,7 +111,10 @@ public class SchemaRegistryRequestHandler implements Closeable { this.currentDerivedSchemaChangeEvents = new ArrayList<>(); this.currentFinishedSchemaChanges = new ArrayList<>(); this.currentIgnoredSchemaChanges = new ArrayList<>(); - this.schemaChangeStatus = new AtomicReference<>(RequestStatus.IDLE); + + this.schemaChangeStatus = RequestStatus.IDLE; + this.pendingSubTaskIds = new ArrayList<>(); + this.schemaChangeRequestLock = new Object(); } /** @@ -119,54 +124,86 @@ public class SchemaRegistryRequestHandler implements Closeable { */ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest( SchemaChangeRequest request) { - if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, RequestStatus.WAITING_FOR_FLUSH)) { - LOG.info( - "Received schema change event request {} from table {}. SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.", - request.getSchemaChangeEvent(), - request.getTableId().toString()); - SchemaChangeEvent event = request.getSchemaChangeEvent(); - - // If this schema change event has been requested by another subTask, ignore it. 
- if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) { - LOG.info("Event {} has been addressed before, ignoring it.", event); - clearCurrentSchemaChangeRequest(); - Preconditions.checkState( - schemaChangeStatus.compareAndSet( - RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE), - "Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated, not " - + schemaChangeStatus.get()); + + // The requester subTask ID serves as the pending ticket, since each subTask has at most + // one schema change request in flight at a time. + int requestSubTaskId = request.getSubTaskId(); + + synchronized (schemaChangeRequestLock) { + // Make sure we handle the first request in the pending list, to avoid out-of-order + // waiting that would block the checkpointing mechanism. + if (schemaChangeStatus == RequestStatus.IDLE) { + if (pendingSubTaskIds.isEmpty()) { + LOG.info( + "Received schema change event request {} from table {} from subTask {}. Pending list is empty, handling this.", + request.getSchemaChangeEvent(), + request.getTableId().toString(), + requestSubTaskId); + } else if (pendingSubTaskIds.get(0) == requestSubTaskId) { + LOG.info( + "Received schema change event request {} from table {} from subTask {}. It is on the first of the pending list, handling this.", + request.getSchemaChangeEvent(), + request.getTableId().toString(), + requestSubTaskId); + pendingSubTaskIds.remove(0); + } else { + LOG.info( + "Received schema change event request {} from table {} from subTask {}.
It is not the first of the pending list ({}).", + request.getSchemaChangeEvent(), + request.getTableId().toString(), + requestSubTaskId, + pendingSubTaskIds); + if (!pendingSubTaskIds.contains(requestSubTaskId)) { + pendingSubTaskIds.add(requestSubTaskId); + } + return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy())); + } + + SchemaChangeEvent event = request.getSchemaChangeEvent(); + + // If this schema change event has been requested by another subTask, ignore it. + if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) { + LOG.info("Event {} has been addressed before, ignoring it.", event); + clearCurrentSchemaChangeRequest(); + LOG.info( + "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.", + request); + return CompletableFuture.completedFuture( + wrap(SchemaChangeResponse.duplicate())); + } + schemaManager.applyOriginalSchemaChange(event); + List<SchemaChangeEvent> derivedSchemaChangeEvents = + calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); + + // If this schema change event is filtered out by LENIENT mode or merging table + // route strategies, ignore it. 
+ if (derivedSchemaChangeEvents.isEmpty()) { + LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event); + clearCurrentSchemaChangeRequest(); + LOG.info( + "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.", + request); + return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored())); + } + LOG.info( - "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.", - request); - return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate())); - } - schemaManager.applyOriginalSchemaChange(event); - List<SchemaChangeEvent> derivedSchemaChangeEvents = - calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); - - // If this schema change event is filtered out by LENIENT mode or merging table route - // strategies, ignore it. - if (derivedSchemaChangeEvents.isEmpty()) { - LOG.info("Event {} is omitted from sending to downstream, ignoring it.", event); - clearCurrentSchemaChangeRequest(); - Preconditions.checkState( - schemaChangeStatus.compareAndSet( - RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE), - "Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored, not " - + schemaChangeStatus.get()); + "SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked."); + // This request has been accepted. 
+ schemaChangeStatus = RequestStatus.WAITING_FOR_FLUSH; + currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents); + return CompletableFuture.completedFuture( + wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents))); + } else { LOG.info( - "SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.", - request); - return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored())); + "Schema Registry is busy processing a schema change request, could not handle request {} for now. Added {} to pending list ({}).", + request, + requestSubTaskId, + pendingSubTaskIds); + if (!pendingSubTaskIds.contains(requestSubTaskId)) { + pendingSubTaskIds.add(requestSubTaskId); + } + return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy())); } - currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents); - return CompletableFuture.completedFuture( - wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents))); - } else { - LOG.info( - "Schema Registry is busy processing a schema change request, could not handle request {} for now.", - request); - return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy())); } } @@ -213,9 +250,10 @@ public class SchemaRegistryRequestHandler implements Closeable { } } Preconditions.checkState( - schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, RequestStatus.FINISHED), + schemaChangeStatus == RequestStatus.APPLYING, "Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not " - + schemaChangeStatus.get()); + + schemaChangeStatus); + schemaChangeStatus = RequestStatus.FINISHED; LOG.info( "SchemaChangeStatus switched from APPLYING to FINISHED for request {}.", currentDerivedSchemaChangeEvents); @@ -248,10 +286,11 @@ public class SchemaRegistryRequestHandler implements Closeable { } if (flushedSinkWriters.equals(activeSinkWriters)) { Preconditions.checkState( - 
schemaChangeStatus.compareAndSet( - RequestStatus.WAITING_FOR_FLUSH, RequestStatus.APPLYING), + schemaChangeStatus == RequestStatus.WAITING_FOR_FLUSH, "Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not " + schemaChangeStatus); + + schemaChangeStatus = RequestStatus.APPLYING; LOG.info( "All sink subtask have flushed for table {}. Start to apply schema change.", tableId.toString()); @@ -262,9 +301,10 @@ public class SchemaRegistryRequestHandler implements Closeable { public CompletableFuture<CoordinationResponse> getSchemaChangeResult() { Preconditions.checkState( - !schemaChangeStatus.get().equals(RequestStatus.IDLE), + schemaChangeStatus != RequestStatus.IDLE, "Illegal schemaChangeStatus: should not be IDLE before getting schema change request results."); - if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, RequestStatus.IDLE)) { + if (schemaChangeStatus == RequestStatus.FINISHED) { + schemaChangeStatus = RequestStatus.IDLE; LOG.info( "SchemaChangeStatus switched from FINISHED to IDLE for request {}", currentDerivedSchemaChangeEvents); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java index fda6b02f1..fbc5e9c40 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java @@ -36,10 +36,14 @@ public class SchemaChangeRequest implements CoordinationRequest { private final TableId tableId; /** The schema changes. */ private final SchemaChangeEvent schemaChangeEvent; + /** The ID of subTask that initiated the request. 
*/ + private final int subTaskId; - public SchemaChangeRequest(TableId tableId, SchemaChangeEvent schemaChangeEvent) { + public SchemaChangeRequest( + TableId tableId, SchemaChangeEvent schemaChangeEvent, int subTaskId) { this.tableId = tableId; this.schemaChangeEvent = schemaChangeEvent; + this.subTaskId = subTaskId; } public TableId getTableId() { @@ -50,6 +54,10 @@ public class SchemaChangeRequest implements CoordinationRequest { return schemaChangeEvent; } + public int getSubTaskId() { + return subTaskId; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -60,11 +68,12 @@ public class SchemaChangeRequest implements CoordinationRequest { } SchemaChangeRequest that = (SchemaChangeRequest) o; return Objects.equals(tableId, that.tableId) - && Objects.equals(schemaChangeEvent, that.schemaChangeEvent); + && Objects.equals(schemaChangeEvent, that.schemaChangeEvent) + && subTaskId == that.subTaskId; } @Override public int hashCode() { - return Objects.hash(tableId, schemaChangeEvent); + return Objects.hash(tableId, schemaChangeEvent, subTaskId); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java index 1171ad07b..938e6950d 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/partitioning/PrePartitionOperator.java @@ -31,6 +31,7 @@ import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; import org.apache.flink.cdc.runtime.serializer.event.EventSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import 
org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -147,4 +148,10 @@ public class PrePartitionOperator extends AbstractStreamOperator<PartitioningEve } }); } + + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + // Nothing needs to be done here, since AbstractStreamOperator#snapshotState and + // #processElement are guaranteed not to be interleaved. + } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java index 60952b424..a469a3a91 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java @@ -164,7 +164,7 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex public void registerTableSchema(TableId tableId, Schema schema) { schemaRegistry.handleCoordinationRequest( - new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, schema))); + new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, schema), 0)); schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new CreateTableEvent(tableId, schema)); }
