This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new a876af258 [minor][cdc-runtime] Run schema coordinator logic asynchronously to avoid blocking the main thread
a876af258 is described below
commit a876af2589ef61bfb13bc2c01a258d5eeaa7afac
Author: yuxiqian <[email protected]>
AuthorDate: Wed Aug 28 00:38:02 2024 +0800
[minor][cdc-runtime] Run schema coordinator logic asynchronously to avoid blocking the main thread
This closes #3557
---
.../tests/SchemaRegistryMigrationMock.java | 2 +
.../schema/coordinator/SchemaRegistry.java | 232 ++++++++++++++-------
.../schema/coordinator/SchemaRegistryProvider.java | 54 ++++-
.../coordinator/SchemaRegistryRequestHandler.java | 40 ++--
.../runtime/operators/schema/SchemaEvolveTest.java | 14 +-
.../operators/EventOperatorTestHarness.java | 32 ++-
.../MockedOperatorCoordinatorContext.java | 44 ++++
7 files changed, 314 insertions(+), 104 deletions(-)
diff --git
a/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
b/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
index bb19223e6..34cd1ea03 100644
---
a/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
+++
b/flink-cdc-migration-tests/flink-cdc-release-snapshot/src/main/java/org/apache/flink/cdc/migration/tests/SchemaRegistryMigrationMock.java
@@ -40,6 +40,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/** Dummy classes for migration test. Called via reflection. */
@@ -69,6 +70,7 @@ public class SchemaRegistryMigrationMock implements
MigrationMockBase {
return new SchemaRegistry(
"Dummy Name",
null,
+ Executors.newFixedThreadPool(1),
new MetadataApplier() {
@Override
public boolean acceptsSchemaEvolutionType(
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index 617daed92..64d4fa1bc 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -37,7 +37,9 @@ import
org.apache.flink.runtime.operators.coordination.CoordinationRequestHandle
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +57,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import static
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;
@@ -83,6 +86,9 @@ public class SchemaRegistry implements OperatorCoordinator,
CoordinationRequestH
/** The name of the operator this SchemaOperatorCoordinator is associated
with. */
private final String operatorName;
+ /** A single-thread executor to handle async execution of the coordinator.
*/
+ private final ExecutorService coordinatorExecutor;
+
/**
* Tracks the subtask failed reason to throw a more meaningful exception
in {@link
* #subtaskReset}.
@@ -113,18 +119,27 @@ public class SchemaRegistry implements
OperatorCoordinator, CoordinationRequestH
public SchemaRegistry(
String operatorName,
OperatorCoordinator.Context context,
+ ExecutorService executorService,
MetadataApplier metadataApplier,
List<RouteRule> routes) {
- this(operatorName, context, metadataApplier, routes,
SchemaChangeBehavior.EVOLVE);
+ this(
+ operatorName,
+ context,
+ executorService,
+ metadataApplier,
+ routes,
+ SchemaChangeBehavior.LENIENT);
}
public SchemaRegistry(
String operatorName,
OperatorCoordinator.Context context,
+ ExecutorService coordinatorExecutor,
MetadataApplier metadataApplier,
List<RouteRule> routes,
SchemaChangeBehavior schemaChangeBehavior) {
this.context = context;
+ this.coordinatorExecutor = coordinatorExecutor;
this.operatorName = operatorName;
this.failedReasons = new HashMap<>();
this.metadataApplier = metadataApplier;
@@ -133,7 +148,11 @@ public class SchemaRegistry implements
OperatorCoordinator, CoordinationRequestH
this.schemaDerivation = new SchemaDerivation(schemaManager, routes,
new HashMap<>());
this.requestHandler =
new SchemaRegistryRequestHandler(
- metadataApplier, schemaManager, schemaDerivation,
schemaChangeBehavior);
+ metadataApplier,
+ schemaManager,
+ schemaDerivation,
+ schemaChangeBehavior,
+ context);
this.schemaChangeBehavior = schemaChangeBehavior;
}
@@ -153,48 +172,87 @@ public class SchemaRegistry implements
OperatorCoordinator, CoordinationRequestH
}
@Override
- public void handleEventFromOperator(int subtask, int attemptNumber,
OperatorEvent event)
- throws Exception {
- try {
- if (event instanceof FlushSuccessEvent) {
- FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent)
event;
- LOG.info(
- "Sink subtask {} succeed flushing for table {}.",
- flushSuccessEvent.getSubtask(),
- flushSuccessEvent.getTableId().toString());
- requestHandler.flushSuccess(
- flushSuccessEvent.getTableId(),
- flushSuccessEvent.getSubtask(),
- currentParallelism);
- } else if (event instanceof SinkWriterRegisterEvent) {
- requestHandler.registerSinkWriter(((SinkWriterRegisterEvent)
event).getSubtask());
- } else {
- throw new FlinkException("Unrecognized Operator Event: " +
event);
- }
- } catch (Throwable t) {
- context.failJob(t);
- throw t;
- }
+    public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
+        // Any throwable escaping this action is caught by runInEventLoop, which logs it and
+        // calls context.failJob. Catching and failing the job here as well would report the
+        // same failure twice.
+        runInEventLoop(
+                () -> {
+                    if (event instanceof FlushSuccessEvent) {
+                        FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
+                        LOG.info(
+                                "Sink subtask {} succeed flushing for table {}.",
+                                flushSuccessEvent.getSubtask(),
+                                flushSuccessEvent.getTableId().toString());
+                        requestHandler.flushSuccess(
+                                flushSuccessEvent.getTableId(),
+                                flushSuccessEvent.getSubtask(),
+                                currentParallelism);
+                    } else if (event instanceof SinkWriterRegisterEvent) {
+                        requestHandler.registerSinkWriter(
+                                ((SinkWriterRegisterEvent) event).getSubtask());
+                    } else {
+                        throw new FlinkException("Unrecognized Operator Event: " + event);
+                    }
+                },
+                "handling event %s from subTask %d",
+                event,
+                subtask);
}
@Override
- public void checkpointCoordinator(long checkpointId,
CompletableFuture<byte[]> resultFuture)
- throws Exception {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(baos)) {
- // Serialize SchemaManager
- int schemaManagerSerializerVersion =
SchemaManager.SERIALIZER.getVersion();
- out.writeInt(schemaManagerSerializerVersion);
- byte[] serializedSchemaManager =
SchemaManager.SERIALIZER.serialize(schemaManager);
- out.writeInt(serializedSchemaManager.length);
- out.write(serializedSchemaManager);
- // Serialize SchemaDerivation mapping
- SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
- resultFuture.complete(baos.toByteArray());
- } catch (Throwable t) {
- context.failJob(t);
- throw t;
- }
+    public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
+        // we generate checkpoint in an async thread to not block the JobManager's main thread, the
+        // coordinator state might be large if there are many schema changes and monitor many
+        // tables.
+        runInEventLoop(
+                () -> {
+                    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                            DataOutputStream out = new DataOutputStream(baos)) {
+                        // Serialize SchemaManager
+                        int schemaManagerSerializerVersion =
+                                SchemaManager.SERIALIZER.getVersion();
+                        out.writeInt(schemaManagerSerializerVersion);
+                        byte[] serializedSchemaManager =
+                                SchemaManager.SERIALIZER.serialize(schemaManager);
+                        out.writeInt(serializedSchemaManager.length);
+                        out.write(serializedSchemaManager);
+                        // Serialize SchemaDerivation mapping
+                        SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
+                        resultFuture.complete(baos.toByteArray());
+                    } catch (Throwable t) {
+                        // Without this, resultFuture would never be completed on failure and
+                        // the checkpoint would be left pending. Rethrowing lets runInEventLoop
+                        // log the error and fail the job (once).
+                        resultFuture.completeExceptionally(t);
+                        throw t;
+                    }
+                },
+                "taking checkpoint %d",
+                checkpointId);
+
+ private void runInEventLoop(
+ final ThrowingRunnable<Throwable> action,
+ final String actionName,
+ final Object... actionNameFormatParameters) {
+ coordinatorExecutor.execute(
+ () -> {
+ try {
+ action.run();
+ } catch (Throwable t) {
+ // if we have a JVM critical error, promote it
immediately, there is a good
+ // chance the logging or job failing will not succeed
anymore
+ ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
+
+ final String actionString =
+ String.format(actionName,
actionNameFormatParameters);
+ LOG.error(
+ "Uncaught exception in the
SchemaEvolutionCoordinator for {} while {}. Triggering job failover.",
+ operatorName,
+ actionString,
+ t);
+ context.failJob(t);
+ }
+ });
}
@Override
@@ -205,26 +263,34 @@ public class SchemaRegistry implements
OperatorCoordinator, CoordinationRequestH
@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
CoordinationRequest request) {
- try {
- if (request instanceof SchemaChangeRequest) {
- SchemaChangeRequest schemaChangeRequest =
(SchemaChangeRequest) request;
- return
requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
- } else if (request instanceof SchemaChangeResultRequest) {
- return requestHandler.getSchemaChangeResult();
- } else if (request instanceof GetEvolvedSchemaRequest) {
- return CompletableFuture.completedFuture(
-
wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request))));
- } else if (request instanceof GetOriginalSchemaRequest) {
- return CompletableFuture.completedFuture(
-
wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
- } else {
- throw new IllegalArgumentException(
- "Unrecognized CoordinationRequest type: " + request);
- }
- } catch (Throwable t) {
- context.failJob(t);
- throw t;
- }
+        // The response is produced on the coordinator executor; the caller receives this future
+        // immediately and it is completed by the action below (or by the request handler).
+        CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>();
+        runInEventLoop(
+                () -> {
+                    try {
+                        if (request instanceof SchemaChangeRequest) {
+                            SchemaChangeRequest schemaChangeRequest =
+                                    (SchemaChangeRequest) request;
+                            requestHandler.handleSchemaChangeRequest(
+                                    schemaChangeRequest, responseFuture);
+                        } else if (request instanceof SchemaChangeResultRequest) {
+                            requestHandler.getSchemaChangeResult(responseFuture);
+                        } else if (request instanceof GetEvolvedSchemaRequest) {
+                            handleGetEvolvedSchemaRequest(
+                                    ((GetEvolvedSchemaRequest) request), responseFuture);
+                        } else if (request instanceof GetOriginalSchemaRequest) {
+                            handleGetOriginalSchemaRequest(
+                                    (GetOriginalSchemaRequest) request, responseFuture);
+                        } else {
+                            throw new IllegalArgumentException(
+                                    "Unrecognized CoordinationRequest type: " + request);
+                        }
+                    } catch (Throwable t) {
+                        // Complete the caller's future exceptionally; otherwise the requester
+                        // would wait on a response that is never delivered. Rethrowing lets
+                        // runInEventLoop log the error and fail the job (once).
+                        responseFuture.completeExceptionally(t);
+                        throw t;
+                    }
+                },
+                "handling coordination request %s",
+                request);
+        return responseFuture;
}
@Override
@@ -253,7 +319,8 @@ public class SchemaRegistry implements OperatorCoordinator,
CoordinationRequestH
metadataApplier,
schemaManager,
schemaDerivation,
- schemaManager.getBehavior());
+ schemaManager.getBehavior(),
+ context);
break;
}
case 1:
@@ -274,7 +341,8 @@ public class SchemaRegistry implements OperatorCoordinator,
CoordinationRequestH
metadataApplier,
schemaManager,
schemaDerivation,
- schemaChangeBehavior);
+ schemaChangeBehavior,
+ context);
break;
}
default:
@@ -307,46 +375,56 @@ public class SchemaRegistry implements
OperatorCoordinator, CoordinationRequestH
// do nothing
}
- private GetEvolvedSchemaResponse handleGetEvolvedSchemaRequest(
- GetEvolvedSchemaRequest getEvolvedSchemaRequest) {
+ private void handleGetEvolvedSchemaRequest(
+ GetEvolvedSchemaRequest getEvolvedSchemaRequest,
+ CompletableFuture<CoordinationResponse> response) {
LOG.info("Handling evolved schema request: {}",
getEvolvedSchemaRequest);
int schemaVersion = getEvolvedSchemaRequest.getSchemaVersion();
TableId tableId = getEvolvedSchemaRequest.getTableId();
if (schemaVersion == GetEvolvedSchemaRequest.LATEST_SCHEMA_VERSION) {
- return new GetEvolvedSchemaResponse(
-
schemaManager.getLatestEvolvedSchema(tableId).orElse(null));
+ response.complete(
+ wrap(
+ new GetEvolvedSchemaResponse(
+
schemaManager.getLatestEvolvedSchema(tableId).orElse(null))));
} else {
try {
- return new GetEvolvedSchemaResponse(
- schemaManager.getEvolvedSchema(tableId,
schemaVersion));
+ response.complete(
+ wrap(
+ new GetEvolvedSchemaResponse(
+
schemaManager.getEvolvedSchema(tableId, schemaVersion))));
} catch (IllegalArgumentException iae) {
LOG.warn(
"Some client is requesting an non-existed evolved
schema for table {} with version {}",
tableId,
schemaVersion);
- return new GetEvolvedSchemaResponse(null);
+ response.complete(wrap(new GetEvolvedSchemaResponse(null)));
}
}
}
- private GetOriginalSchemaResponse handleGetOriginalSchemaRequest(
- GetOriginalSchemaRequest getOriginalSchemaRequest) {
+ private void handleGetOriginalSchemaRequest(
+ GetOriginalSchemaRequest getOriginalSchemaRequest,
+ CompletableFuture<CoordinationResponse> response) {
LOG.info("Handling original schema request: {}",
getOriginalSchemaRequest);
int schemaVersion = getOriginalSchemaRequest.getSchemaVersion();
TableId tableId = getOriginalSchemaRequest.getTableId();
if (schemaVersion == GetOriginalSchemaRequest.LATEST_SCHEMA_VERSION) {
- return new GetOriginalSchemaResponse(
-
schemaManager.getLatestOriginalSchema(tableId).orElse(null));
+ response.complete(
+ wrap(
+ new GetOriginalSchemaResponse(
+
schemaManager.getLatestOriginalSchema(tableId).orElse(null))));
} else {
try {
- return new GetOriginalSchemaResponse(
- schemaManager.getOriginalSchema(tableId,
schemaVersion));
+ response.complete(
+ wrap(
+ new GetOriginalSchemaResponse(
+
schemaManager.getOriginalSchema(tableId, schemaVersion))));
} catch (IllegalArgumentException iae) {
LOG.warn(
"Some client is requesting an non-existed original
schema for table {} with version {}",
tableId,
schemaVersion);
- return new GetOriginalSchemaResponse(null);
+ response.complete(wrap(new GetOriginalSchemaResponse(null)));
}
}
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
index dd7f2dc36..bc261e40f 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryProvider.java
@@ -23,8 +23,12 @@ import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.util.FatalExitExceptionHandler;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
/** Provider of {@link SchemaRegistry}. */
@Internal
@@ -57,7 +61,55 @@ public class SchemaRegistryProvider implements
OperatorCoordinator.Provider {
@Override
public OperatorCoordinator create(OperatorCoordinator.Context context)
throws Exception {
+ CoordinatorExecutorThreadFactory coordinatorThreadFactory =
+ new CoordinatorExecutorThreadFactory(
+ "schema-evolution-coordinator",
context.getUserCodeClassloader());
+ ExecutorService coordinatorExecutor =
+ Executors.newSingleThreadExecutor(coordinatorThreadFactory);
return new SchemaRegistry(
- operatorName, context, metadataApplier, routingRules,
schemaChangeBehavior);
+ operatorName,
+ context,
+ coordinatorExecutor,
+ metadataApplier,
+ routingRules,
+ schemaChangeBehavior);
+ }
+
+    /**
+     * Thread factory for the schema coordinator executor. It creates at most one thread, named
+     * {@code coordinatorThreadName}, with the given context class loader and uncaught-exception
+     * handler. A second call to {@link #newThread} means the previous coordinator thread died,
+     * which is treated as a fatal condition.
+     */
+    public static class CoordinatorExecutorThreadFactory implements ThreadFactory {
+
+        private final String coordinatorThreadName;
+        /** Context class loader installed on the created thread. */
+        private final ClassLoader cl;
+        /** Handler invoked for throwables that escape the coordinator thread. */
+        private final Thread.UncaughtExceptionHandler errorHandler;
+
+        /** The single thread created by this factory, or {@code null} before the first call. */
+        private Thread t;
+
+        CoordinatorExecutorThreadFactory(
+                final String coordinatorThreadName, final ClassLoader contextClassLoader) {
+            this(coordinatorThreadName, contextClassLoader, FatalExitExceptionHandler.INSTANCE);
+        }
+
+        CoordinatorExecutorThreadFactory(
+                final String coordinatorThreadName,
+                final ClassLoader contextClassLoader,
+                final Thread.UncaughtExceptionHandler errorHandler) {
+            this.coordinatorThreadName = coordinatorThreadName;
+            this.cl = contextClassLoader;
+            this.errorHandler = errorHandler;
+        }
+
+        /**
+         * Creates the coordinator thread.
+         *
+         * @throws Error if a thread was already created by this factory
+         */
+        @Override
+        public synchronized Thread newThread(Runnable r) {
+            if (t != null) {
+                throw new Error(
+                        "This indicates that a fatal error has happened and caused the "
+                                + "coordinator executor thread to exit. Check the earlier logs "
+                                + "to see the root cause of the problem.");
+            }
+            t = new Thread(r, coordinatorThreadName);
+            t.setContextClassLoader(cl);
+            t.setUncaughtExceptionHandler(errorHandler);
+            return t;
+        }
+    }
}
diff --git
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index 1310fb6be..847e343f2 100644
---
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -39,6 +39,7 @@ import
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
import
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
import
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,15 +96,19 @@ public class SchemaRegistryRequestHandler implements
Closeable {
private final SchemaChangeBehavior schemaChangeBehavior;
+ private final OperatorCoordinator.Context context;
+
public SchemaRegistryRequestHandler(
MetadataApplier metadataApplier,
SchemaManager schemaManager,
SchemaDerivation schemaDerivation,
- SchemaChangeBehavior schemaChangeBehavior) {
+ SchemaChangeBehavior schemaChangeBehavior,
+ OperatorCoordinator.Context context) {
this.metadataApplier = metadataApplier;
this.schemaManager = schemaManager;
this.schemaDerivation = schemaDerivation;
this.schemaChangeBehavior = schemaChangeBehavior;
+ this.context = context;
this.activeSinkWriters = ConcurrentHashMap.newKeySet();
this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
@@ -123,8 +128,8 @@ public class SchemaRegistryRequestHandler implements
Closeable {
*
* @param request the received SchemaChangeRequest
*/
- public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
- SchemaChangeRequest request) {
+ public void handleSchemaChangeRequest(
+ SchemaChangeRequest request,
CompletableFuture<CoordinationResponse> response) {
// We use requester subTask ID as the pending ticket, because there
will be at most 1 schema
// change requests simultaneously from each subTask
@@ -157,7 +162,8 @@ public class SchemaRegistryRequestHandler implements
Closeable {
if (!pendingSubTaskIds.contains(requestSubTaskId)) {
pendingSubTaskIds.add(requestSubTaskId);
}
- return
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
+ response.complete(wrap(SchemaChangeResponse.busy()));
+ return;
}
SchemaChangeEvent event = request.getSchemaChangeEvent();
@@ -169,8 +175,8 @@ public class SchemaRegistryRequestHandler implements
Closeable {
LOG.info(
"SchemaChangeStatus switched from
WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
request);
- return CompletableFuture.completedFuture(
- wrap(SchemaChangeResponse.duplicate()));
+ response.complete(wrap(SchemaChangeResponse.duplicate()));
+ return;
}
schemaManager.applyOriginalSchemaChange(event);
List<SchemaChangeEvent> derivedSchemaChangeEvents =
@@ -184,7 +190,9 @@ public class SchemaRegistryRequestHandler implements
Closeable {
LOG.info(
"SchemaChangeStatus switched from
WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
request);
- return
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
+
+ response.complete(wrap(SchemaChangeResponse.ignored()));
+ return;
}
LOG.info(
@@ -206,8 +214,8 @@ public class SchemaRegistryRequestHandler implements
Closeable {
}
});
currentDerivedSchemaChangeEvents = new
ArrayList<>(derivedSchemaChangeEvents);
- return CompletableFuture.completedFuture(
-
wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
+
+
response.complete(wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents)));
} else {
LOG.info(
"Schema Registry is busy processing a schema change
request, could not handle request {} for now. Added {} to pending list ({}).",
@@ -217,7 +225,7 @@ public class SchemaRegistryRequestHandler implements
Closeable {
if (!pendingSubTaskIds.contains(requestSubTaskId)) {
pendingSubTaskIds.add(requestSubTaskId);
}
- return
CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy()));
+ response.complete(wrap(SchemaChangeResponse.busy()));
}
}
}
@@ -314,7 +322,7 @@ public class SchemaRegistryRequestHandler implements
Closeable {
}
}
- public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
+ public void getSchemaChangeResult(CompletableFuture<CoordinationResponse>
response) {
Preconditions.checkState(
schemaChangeStatus != RequestStatus.IDLE,
"Illegal schemaChangeStatus: should not be IDLE before getting
schema change request results.");
@@ -326,11 +334,12 @@ public class SchemaRegistryRequestHandler implements
Closeable {
// This request has been finished, return it and prepare for the
next request
List<SchemaChangeEvent> finishedEvents =
clearCurrentSchemaChangeRequest();
- return CompletableFuture.supplyAsync(
- () -> wrap(new
SchemaChangeResultResponse(finishedEvents)));
+ SchemaChangeResultResponse resultResponse =
+ new SchemaChangeResultResponse(finishedEvents);
+ response.complete(wrap(resultResponse));
} else {
// Still working on schema change request, waiting it
- return CompletableFuture.supplyAsync(() -> wrap(new
SchemaChangeProcessingResponse()));
+ response.complete(wrap(new SchemaChangeProcessingResponse()));
}
}
@@ -459,7 +468,8 @@ public class SchemaRegistryRequestHandler implements
Closeable {
private List<SchemaChangeEvent> clearCurrentSchemaChangeRequest() {
if (currentChangeException != null) {
- throw new RuntimeException("Failed to apply schema change.",
currentChangeException);
+ context.failJob(
+ new RuntimeException("Failed to apply schema change.",
currentChangeException));
}
List<SchemaChangeEvent> finishedSchemaChanges =
new ArrayList<>(currentFinishedSchemaChanges);
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
index 51acbd536..e1f6a94c9 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
+import
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
@@ -1039,11 +1040,16 @@ public class SchemaEvolveTest {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"height", DOUBLE,
"Height data")))));
- Assertions.assertThatThrownBy(() -> processEvent(schemaOperator,
addColumnEvents))
+ processEvent(schemaOperator, addColumnEvents);
+ Assertions.assertThat(harness.isJobFailed()).isEqualTo(true);
+ Assertions.assertThat(harness.getJobFailureCause())
.cause()
- .cause()
- .isExactlyInstanceOf(RuntimeException.class)
- .hasMessageContaining("Failed to apply schema change");
+
.isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class)
+ .matches(
+ e ->
+ ((UnsupportedSchemaChangeEventException) e)
+ .getExceptionMessage()
+ .equals("Sink doesn't support such
schema change event."));
harness.close();
}
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
index a469a3a91..d8b977029 100644
---
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
@@ -38,7 +38,6 @@ import
org.apache.flink.cdc.runtime.testutils.schema.TestingSchemaRegistryGatewa
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
-import
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
@@ -56,6 +55,7 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Set;
+import java.util.concurrent.Executors;
import static
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.unwrap;
@@ -81,6 +81,7 @@ public class EventOperatorTestHarness<OP extends
AbstractStreamOperator<E>, E ex
private final SchemaRegistry schemaRegistry;
private final TestingSchemaRegistryGateway schemaRegistryGateway;
private final LinkedList<StreamRecord<E>> outputRecords = new
LinkedList<>();
+ private final MockedOperatorCoordinatorContext mockedContext;
public EventOperatorTestHarness(OP operator, int numOutputs) {
this(operator, numOutputs, null, SchemaChangeBehavior.EVOLVE);
@@ -94,11 +95,14 @@ public class EventOperatorTestHarness<OP extends
AbstractStreamOperator<E>, E ex
OP operator, int numOutputs, Duration duration,
SchemaChangeBehavior behavior) {
this.operator = operator;
this.numOutputs = numOutputs;
+ this.mockedContext =
+ new MockedOperatorCoordinatorContext(
+ SCHEMA_OPERATOR_ID,
Thread.currentThread().getContextClassLoader());
schemaRegistry =
new SchemaRegistry(
"SchemaOperator",
- new MockOperatorCoordinatorContext(
- SCHEMA_OPERATOR_ID,
Thread.currentThread().getContextClassLoader()),
+ mockedContext,
+ Executors.newFixedThreadPool(1),
new CollectingMetadataApplier(duration),
new ArrayList<>(),
behavior);
@@ -113,11 +117,14 @@ public class EventOperatorTestHarness<OP extends
AbstractStreamOperator<E>, E ex
Set<SchemaChangeEventType> enabledEventTypes) {
this.operator = operator;
this.numOutputs = numOutputs;
+ this.mockedContext =
+ new MockedOperatorCoordinatorContext(
+ SCHEMA_OPERATOR_ID,
Thread.currentThread().getContextClassLoader());
schemaRegistry =
new SchemaRegistry(
"SchemaOperator",
- new MockOperatorCoordinatorContext(
- SCHEMA_OPERATOR_ID,
Thread.currentThread().getContextClassLoader()),
+ mockedContext,
+ Executors.newFixedThreadPool(1),
new CollectingMetadataApplier(duration,
enabledEventTypes),
new ArrayList<>(),
behavior);
@@ -133,11 +140,14 @@ public class EventOperatorTestHarness<OP extends
AbstractStreamOperator<E>, E ex
Set<SchemaChangeEventType> errorsOnEventTypes) {
this.operator = operator;
this.numOutputs = numOutputs;
+ this.mockedContext =
+ new MockedOperatorCoordinatorContext(
+ SCHEMA_OPERATOR_ID,
Thread.currentThread().getContextClassLoader());
schemaRegistry =
new SchemaRegistry(
"SchemaOperator",
- new MockOperatorCoordinatorContext(
- SCHEMA_OPERATOR_ID,
Thread.currentThread().getContextClassLoader()),
+ mockedContext,
+ Executors.newFixedThreadPool(1),
new CollectingMetadataApplier(
duration, enabledEventTypes,
errorsOnEventTypes),
new ArrayList<>(),
@@ -196,6 +206,14 @@ public class EventOperatorTestHarness<OP extends
AbstractStreamOperator<E>, E ex
.orElse(null);
}
+ public boolean isJobFailed() {
+ return mockedContext.isJobFailed();
+ }
+
+ public Throwable getJobFailureCause() {
+ return mockedContext.getFailureCause();
+ }
+
@Override
public void close() throws Exception {
operator.close();
diff --git
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/MockedOperatorCoordinatorContext.java
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/MockedOperatorCoordinatorContext.java
new file mode 100644
index 000000000..19ab961ee
--- /dev/null
+++
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/MockedOperatorCoordinatorContext.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.testutils.operators;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import
org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
+
+/**
+ * This is a mocked version of Operator coordinator context that stores failure cause for testing
+ * purposes only.
+ */
+public class MockedOperatorCoordinatorContext extends MockOperatorCoordinatorContext {
+
+    /**
+     * The most recent cause passed to {@link #failJob}. Volatile because the schema registry
+     * under test may call {@link #failJob} from its coordinator executor thread while the test
+     * reads the cause from a different thread.
+     */
+    private volatile Throwable failureCause;
+
+    public MockedOperatorCoordinatorContext(
+            OperatorID operatorID, ClassLoader userCodeClassLoader) {
+        super(operatorID, userCodeClassLoader);
+    }
+
+    @Override
+    public void failJob(Throwable cause) {
+        // Record the cause before delegating, so the cause is already stored by the time
+        // the parent context records the failure.
+        failureCause = cause;
+        super.failJob(cause);
+    }
+
+    /** Returns the most recent failure cause, or {@code null} if {@link #failJob} was never called. */
+    public Throwable getFailureCause() {
+        return failureCause;
+    }
+}