Shawn-Hx commented on code in PR #3912:
URL: https://github.com/apache/flink-cdc/pull/3912#discussion_r1951890862
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java:
##########
@@ -219,73 +227,14 @@ protected void handleUnrecoverableError(String taskDescription, Throwable t) {
*/
public void handleSchemaChangeRequest(
SchemaChangeRequest request,
CompletableFuture<CoordinationResponse> responseFuture) {
-
- // We use subTaskId to identify each schema change request
- int subTaskId = request.getSubTaskId();
-
- if (schemaChangeStatus == RequestStatus.IDLE) {
- if (activeSinkWriters.size() < currentParallelism) {
- LOG.info(
- "Not all active sink writers have been registered. Current {}, expected {}.",
- activeSinkWriters.size(),
- currentParallelism);
- responseFuture.complete(wrap(SchemaChangeResponse.waitingForFlush()));
- return;
- }
-
- if (!activeSinkWriters.equals(flushedSinkWriters.get(subTaskId))) {
- LOG.info(
- "Not all active sink writers have completed flush. Flushed writers: {}, expected: {}.",
- flushedSinkWriters.get(subTaskId),
- activeSinkWriters);
- responseFuture.complete(wrap(SchemaChangeResponse.waitingForFlush()));
- return;
- }
-
- LOG.info(
- "All sink writers have flushed for subTaskId {}. Switching to APPLYING state and starting schema evolution...",
- subTaskId);
- flushedSinkWriters.remove(subTaskId);
- schemaChangeStatus = RequestStatus.APPLYING;
- pendingResponseFuture = responseFuture;
- startSchemaChangesEvolve(request, responseFuture);
- } else {
- responseFuture.complete(wrap(SchemaChangeResponse.busy()));
Review Comment:
The method `SchemaChangeResponse.busy()` can be removed, along with
`ResponseCode.BUSY`.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java:
##########
@@ -219,73 +227,14 @@ protected void handleUnrecoverableError(String taskDescription, Throwable t) {
*/
public void handleSchemaChangeRequest(
SchemaChangeRequest request,
CompletableFuture<CoordinationResponse> responseFuture) {
-
- // We use subTaskId to identify each schema change request
- int subTaskId = request.getSubTaskId();
-
- if (schemaChangeStatus == RequestStatus.IDLE) {
- if (activeSinkWriters.size() < currentParallelism) {
- LOG.info(
- "Not all active sink writers have been registered. Current {}, expected {}.",
- activeSinkWriters.size(),
- currentParallelism);
- responseFuture.complete(wrap(SchemaChangeResponse.waitingForFlush()));
- return;
- }
-
- if (!activeSinkWriters.equals(flushedSinkWriters.get(subTaskId))) {
- LOG.info(
- "Not all active sink writers have completed flush. Flushed writers: {}, expected: {}.",
- flushedSinkWriters.get(subTaskId),
- activeSinkWriters);
- responseFuture.complete(wrap(SchemaChangeResponse.waitingForFlush()));
Review Comment:
The method `SchemaChangeResponse.waitingForFlush()` can be removed, along
with `ResponseCode.WAITING_FOR_FLUSH`.