This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 060d2032e [hotfix][cdc-runtime] Fix schema registry hanging in multiple parallelism
060d2032e is described below
commit 060d2032e839c817f9b139e7699685290a68e2dc
Author: yuxiqian <[email protected]>
AuthorDate: Fri Aug 23 13:47:08 2024 +0800
[hotfix][cdc-runtime] Fix schema registry hanging in multiple parallelism
This closes #3567.
---
.../schema/coordinator/SchemaRegistry.java | 14 ++++++--
.../coordinator/SchemaRegistryRequestHandler.java | 42 +++++++++++++++++-----
2 files changed, 45 insertions(+), 11 deletions(-)
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index 8ea3a1f93..617daed92 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -104,6 +104,12 @@ public class SchemaRegistry implements
OperatorCoordinator, CoordinationRequestH
private SchemaChangeBehavior schemaChangeBehavior;
+ /**
+ * Current parallelism. Use this to verify if Schema Registry has collected enough flush success
+ * events from sink operators.
+ */
+ private int currentParallelism;
+
public SchemaRegistry(
String operatorName,
OperatorCoordinator.Context context,
@@ -135,7 +141,9 @@ public class SchemaRegistry implements OperatorCoordinator,
CoordinationRequestH
public void start() throws Exception {
LOG.info("Starting SchemaRegistry for {}.", operatorName);
this.failedReasons.clear();
- LOG.info("Started SchemaRegistry for {}.", operatorName);
+ this.currentParallelism = context.currentParallelism();
+ LOG.info(
+ "Started SchemaRegistry for {}. Parallelism: {}",
operatorName, currentParallelism);
}
@Override
@@ -155,7 +163,9 @@ public class SchemaRegistry implements OperatorCoordinator,
CoordinationRequestH
flushSuccessEvent.getSubtask(),
flushSuccessEvent.getTableId().toString());
requestHandler.flushSuccess(
- flushSuccessEvent.getTableId(),
flushSuccessEvent.getSubtask());
+ flushSuccessEvent.getTableId(),
+ flushSuccessEvent.getSubtask(),
+ currentParallelism);
} else if (event instanceof SinkWriterRegisterEvent) {
requestHandler.registerSinkWriter(((SinkWriterRegisterEvent)
event).getSubtask());
} else {
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index 99019a6b4..444fb41d2 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -48,11 +48,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
@@ -103,8 +103,8 @@ public class SchemaRegistryRequestHandler implements
Closeable {
this.schemaDerivation = schemaDerivation;
this.schemaChangeBehavior = schemaChangeBehavior;
- this.activeSinkWriters = new HashSet<>();
- this.flushedSinkWriters = new HashSet<>();
+ this.activeSinkWriters = ConcurrentHashMap.newKeySet();
+ this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
this.currentDerivedSchemaChangeEvents = new ArrayList<>();
@@ -122,7 +122,7 @@ public class SchemaRegistryRequestHandler implements
Closeable {
SchemaChangeRequest request) {
if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE,
RequestStatus.WAITING_FOR_FLUSH)) {
LOG.info(
- "Received schema change event request {} from table {}.
Start to buffer requests for others.",
+ "Received schema change event request {} from table {}.
SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will
be blocked.",
request.getSchemaChangeEvent(),
request.getTableId().toString());
SchemaChangeEvent event = request.getSchemaChangeEvent();
@@ -134,7 +134,11 @@ public class SchemaRegistryRequestHandler implements
Closeable {
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH,
RequestStatus.IDLE),
- "Illegal schemaChangeStatus state: should still in
WAITING_FOR_FLUSH state if event was duplicated.");
+ "Illegal schemaChangeStatus state: should still in
WAITING_FOR_FLUSH state if event was duplicated, not "
+ + schemaChangeStatus.get());
+ LOG.info(
+ "SchemaChangeStatus switched from WAITING_FOR_FLUSH to
IDLE for request {} due to duplicated request.",
+ request);
return
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
}
schemaManager.applyOriginalSchemaChange(event);
@@ -149,7 +153,11 @@ public class SchemaRegistryRequestHandler implements
Closeable {
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH,
RequestStatus.IDLE),
- "Illegal schemaChangeStatus state: should still in
WAITING_FOR_FLUSH state if event was ignored.");
+ "Illegal schemaChangeStatus state: should still in
WAITING_FOR_FLUSH state if event was ignored, not "
+ + schemaChangeStatus.get());
+ LOG.info(
+ "SchemaChangeStatus switched from WAITING_FOR_FLUSH to
IDLE for request {} due to ignored request.",
+ request);
return
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
}
@@ -220,7 +228,11 @@ public class SchemaRegistryRequestHandler implements
Closeable {
}
Preconditions.checkState(
schemaChangeStatus.compareAndSet(RequestStatus.APPLYING,
RequestStatus.FINISHED),
- "Illegal schemaChangeStatus state: should be APPLYING before
applySchemaChange finishes");
+ "Illegal schemaChangeStatus state: should be APPLYING before
applySchemaChange finishes, not "
+ + schemaChangeStatus.get());
+ LOG.info(
+ "SchemaChangeStatus switched from APPLYING to FINISHED for
request {}.",
+ currentDerivedSchemaChangeEvents);
}
/**
@@ -239,13 +251,21 @@ public class SchemaRegistryRequestHandler implements
Closeable {
* @param tableId the subtask in SchemaOperator and table that the FlushEvent is about
* @param sinkSubtask the sink subtask succeed flushing
*/
- public void flushSuccess(TableId tableId, int sinkSubtask) {
+ public void flushSuccess(TableId tableId, int sinkSubtask, int
parallelism) {
flushedSinkWriters.add(sinkSubtask);
+ if (activeSinkWriters.size() < parallelism) {
+ LOG.info(
+ "Not all active sink writers have been registered. Current
{}, expected {}.",
+ activeSinkWriters.size(),
+ parallelism);
+ return;
+ }
if (flushedSinkWriters.equals(activeSinkWriters)) {
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH,
RequestStatus.APPLYING),
- "Illegal schemaChangeStatus state: should be
WAITING_FOR_FLUSH before collecting enough FlushEvents");
+ "Illegal schemaChangeStatus state: should be
WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
+ + schemaChangeStatus);
LOG.info(
"All sink subtask have flushed for table {}. Start to
apply schema change.",
tableId.toString());
@@ -259,6 +279,10 @@ public class SchemaRegistryRequestHandler implements
Closeable {
!schemaChangeStatus.get().equals(RequestStatus.IDLE),
"Illegal schemaChangeStatus: should not be IDLE before getting
schema change request results.");
if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED,
RequestStatus.IDLE)) {
+ LOG.info(
+ "SchemaChangeStatus switched from FINISHED to IDLE for
request {}",
+ currentDerivedSchemaChangeEvents);
+
// This request has been finished, return it and prepare for the
next request
List<SchemaChangeEvent> finishedEvents =
clearCurrentSchemaChangeRequest();
return CompletableFuture.supplyAsync(