This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch release-3.2 in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 5a80757443ecc816ff6db0cb7618aaa4d4bce9b7 Author: Hongshun Wang <[email protected]> AuthorDate: Thu Aug 22 09:58:55 2024 +0800 [FLINK-36094][cdc-runtime] Improve the Exception that SchemaRegistryRequestHandler thrown This closes #3558. (cherry picked from commit 6205a5a0f16d2cf72ff751573351c4e15ea59efb) --- .../common/exceptions/SchemaEvolveException.java | 6 +- .../UnsupportedSchemaChangeEventException.java | 28 +++++- .../cdc/connectors/values/ValuesDatabase.java | 3 +- .../flink/cdc/pipeline/tests/MysqlE2eITCase.java | 2 +- .../cdc/pipeline/tests/SchemaEvolveE2eITCase.java | 23 +++-- .../cdc/pipeline/tests/TransformE2eITCase.java | 2 +- .../flink/cdc/pipeline/tests/UdfE2eITCase.java | 2 +- .../runtime/operators/schema/SchemaOperator.java | 65 +----------- .../schema/coordinator/SchemaRegistry.java | 110 +++++++++++---------- .../coordinator/SchemaRegistryRequestHandler.java | 56 +++++++---- .../event/ApplyEvolvedSchemaChangeRequest.java | 73 -------------- .../event/ApplyEvolvedSchemaChangeResponse.java | 32 ------ .../event/ApplyOriginalSchemaChangeRequest.java | 71 ------------- .../event/ApplyOriginalSchemaChangeResponse.java | 31 ------ .../schema/event/ReleaseUpstreamResponse.java | 42 +------- .../runtime/operators/schema/SchemaEvolveTest.java | 2 + .../operators/EventOperatorTestHarness.java | 11 +-- .../schema/CollectingMetadataApplier.java | 4 +- 18 files changed, 149 insertions(+), 414 deletions(-) diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java index fc5b482f1..52950fb32 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/SchemaEvolveException.java @@ -24,9 +24,9 @@ import javax.annotation.Nullable; /** An exception occurred during schema evolution. */ public class SchemaEvolveException extends FlinkRuntimeException { - private final SchemaChangeEvent applyingEvent; - private final String exceptionMessage; - private final @Nullable Throwable cause; + protected final SchemaChangeEvent applyingEvent; + protected final String exceptionMessage; + protected final @Nullable Throwable cause; public SchemaEvolveException(SchemaChangeEvent applyingEvent, String exceptionMessage) { this(applyingEvent, exceptionMessage, null); diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java index 4be20525a..55d53e7d9 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/exceptions/UnsupportedSchemaChangeEventException.java @@ -19,10 +19,36 @@ package org.apache.flink.cdc.common.exceptions; import org.apache.flink.cdc.common.event.SchemaChangeEvent; +import javax.annotation.Nullable; + /** A special kind of {@link SchemaEvolveException} that sink doesn't support such event type. */ public class UnsupportedSchemaChangeEventException extends SchemaEvolveException { public UnsupportedSchemaChangeEventException(SchemaChangeEvent applyingEvent) { - super(applyingEvent, "Sink doesn't support such schema change event.", null); + this(applyingEvent, "Sink doesn't support such schema change event."); + } + + public UnsupportedSchemaChangeEventException( + SchemaChangeEvent applyingEvent, String exceptionMessage) { + this(applyingEvent, exceptionMessage, null); + } + + public UnsupportedSchemaChangeEventException( + SchemaChangeEvent applyingEvent, String exceptionMessage, @Nullable Throwable cause) { + super(applyingEvent, exceptionMessage, cause); + } + + @Override + public String toString() { + return "UnsupportedSchemaChangeEventException{" + + "applyingEvent=" + + applyingEvent + + ", exceptionMessage='" + + exceptionMessage + + '\'' + + ", cause='" + + cause + + '\'' + + '}'; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java index e19c4a844..bbe36a25a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java @@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.MetadataApplier; @@ -150,7 +151,7 @@ public class ValuesDatabase { tableId, ((CreateTableEvent) schemaChangeEvent).getSchema())); } } else { - throw new SchemaEvolveException( + throw new UnsupportedSchemaChangeEventException( schemaChangeEvent, "Rejected schema change event since error.on.schema.change is enabled.", null); diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java index ada384e07..917873bb8 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java @@ -252,7 +252,7 @@ public class MysqlE2eITCase extends PipelineTestEnvironment { long endTimeout = System.currentTimeMillis() + MysqlE2eITCase.EVENT_WAITING_TIMEOUT; while (System.currentTimeMillis() < endTimeout) { String stdout = taskManagerConsumer.toUtf8String(); - if (stdout.contains(event)) { + if (stdout.contains(event + "\n")) { result = true; break; } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java index d187853da..c84a8824f 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/SchemaEvolveE2eITCase.java @@ -110,9 +110,9 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { true, false, false, - Collections.singletonList( - "java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\""), - Collections.singletonList( + Collections.emptyList(), + Arrays.asList( + "java.lang.IllegalStateException: Incompatible types: \"INT\" and \"DOUBLE\"", "org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy")); } @@ -123,11 +123,10 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { false, true, false, - Collections.singletonList( - "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}"), + Collections.emptyList(), Arrays.asList( - "Failed to apply schema change event AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}.", - "SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", + "Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabl [...] + "UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}", "org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy")); } @@ -143,8 +142,8 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17], op=INSERT, meta=()}", "DataChangeEvent{tableId=%s.members, before=[], after=[1013, Fiona, null], op=INSERT, meta=()}"), Arrays.asList( - "Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members.", - "SchemaEvolveException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}")); + "Failed to apply schema change AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]} to table %s.members. Caused by: UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabl [...] + "UnsupportedSchemaChangeEventException{applyingEvent=AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}, exceptionMessage='Rejected schema change event since error.on.schema.change is enabled.', cause='null'}")); } @Test @@ -181,7 +180,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { false, false, Arrays.asList( - "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}", + "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`gender` TINYINT, position=LAST, existedColumnName=null}]}", "DataChangeEvent{tableId=%s.members, before=[], after=[1012, Eve, 17, 0], op=INSERT, meta=()}", "AlterColumnTypeEvent{tableId=%s.members, nameMapping={age=DOUBLE}}", "AddColumnEvent{tableId=%s.members, addedColumns=[ColumnWithPosition{column=`precise_age` DOUBLE, position=LAST, existedColumnName=null}]}", @@ -348,7 +347,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { List<String> expectedJmEvents = expectedJobManagerEvents.stream() - .map(s -> String.format(s, dbName, dbName)) + .map(s -> String.format(s, dbName, dbName, dbName)) .collect(Collectors.toList()); validateResult(expectedJmEvents, jobManagerConsumer); @@ -392,7 +391,7 @@ public class SchemaEvolveE2eITCase extends PipelineTestEnvironment { long endTimeout = System.currentTimeMillis() + SchemaEvolveE2eITCase.EVENT_WAITING_TIMEOUT; while (System.currentTimeMillis() < endTimeout) { String stdout = consumer.toUtf8String(); - if (stdout.contains(event)) { + if (stdout.contains(event + "\n")) { result = true; break; } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java index e27977153..6f612ca95 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java @@ -736,7 +736,7 @@ public class TransformE2eITCase extends PipelineTestEnvironment { long endTimeout = System.currentTimeMillis() + timeout; while (System.currentTimeMillis() < endTimeout) { String stdout = taskManagerConsumer.toUtf8String(); - if (stdout.contains(event)) { + if (stdout.contains(event + "\n")) { result = true; break; } diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java index c965e4dec..938e2d98e 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/UdfE2eITCase.java @@ -386,7 +386,7 @@ public class UdfE2eITCase extends PipelineTestEnvironment { long endTimeout = System.currentTimeMillis() + timeout; while (System.currentTimeMillis() < endTimeout) { String stdout = taskManagerConsumer.toUtf8String(); - if (stdout.contains(event)) { + if (stdout.contains(event + "\n")) { result = true; break; } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index d3a9e158b..a700fd39c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -17,7 +17,6 @@ package org.apache.flink.cdc.runtime.operators.schema; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.annotation.VisibleForTesting; @@ -29,7 +28,6 @@ import org.apache.flink.cdc.common.event.FlushEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.schema.Column; @@ -40,8 +38,6 @@ import org.apache.flink.cdc.common.types.DataTypeFamily; import org.apache.flink.cdc.common.types.DataTypeRoot; import org.apache.flink.cdc.common.utils.ChangeEventUtils; import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils; import org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest; import org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest; @@ -431,56 +427,9 @@ public class SchemaOperator extends AbstractStreamOperator<Event> ReleaseUpstreamResponse schemaEvolveResponse = requestReleaseUpstream(); List<SchemaChangeEvent> finishedSchemaChangeEvents = schemaEvolveResponse.getFinishedSchemaChangeEvents(); - List<Tuple2<SchemaChangeEvent, Throwable>> failedSchemaChangeEvents = - schemaEvolveResponse.getFailedSchemaChangeEvents(); - List<SchemaChangeEvent> ignoredSchemaChangeEvents = - schemaEvolveResponse.getIgnoredSchemaChangeEvents(); - - if (schemaChangeBehavior == SchemaChangeBehavior.EVOLVE - || schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION) { - if (schemaEvolveResponse.hasException()) { - throw new RuntimeException( - String.format( - "Failed to apply schema change event %s.\nExceptions: %s", - schemaChangeEvent, - schemaEvolveResponse.getPrintableFailedSchemaChangeEvents())); - } - } else if (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE - || schemaChangeBehavior == SchemaChangeBehavior.LENIENT - || schemaChangeBehavior == SchemaChangeBehavior.IGNORE) { - if (schemaEvolveResponse.hasException()) { - schemaEvolveResponse - .getFailedSchemaChangeEvents() - .forEach( - e -> - LOG.warn( - "Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}", - e.f0, - e.f1)); - } - } else { - throw new SchemaEvolveException( - schemaChangeEvent, - "Unexpected schema change behavior: " + schemaChangeBehavior); - } // Update evolved schema changes based on apply results - requestApplyEvolvedSchemaChanges(tableId, finishedSchemaChangeEvents); finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e))); - - LOG.info( - "Applied schema change event {} to downstream. Among {} total evolved events, {} succeeded, {} failed, and {} ignored.", - schemaChangeEvent, - expectedSchemaChangeEvents.size(), - finishedSchemaChangeEvents.size(), - failedSchemaChangeEvents.size(), - ignoredSchemaChangeEvents.size()); - - schemaOperatorMetrics.increaseFinishedSchemaChangeEvents( - finishedSchemaChangeEvents.size()); - schemaOperatorMetrics.increaseFailedSchemaChangeEvents(failedSchemaChangeEvents.size()); - schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents( - ignoredSchemaChangeEvents.size()); } } @@ -489,16 +438,6 @@ public class SchemaOperator extends AbstractStreamOperator<Event> return sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent)); } - private void requestApplyOriginalSchemaChanges( - TableId tableId, SchemaChangeEvent schemaChangeEvent) { - sendRequestToCoordinator(new ApplyOriginalSchemaChangeRequest(tableId, schemaChangeEvent)); - } - - private void requestApplyEvolvedSchemaChanges( - TableId tableId, List<SchemaChangeEvent> schemaChangeEvents) { - sendRequestToCoordinator(new ApplyEvolvedSchemaChangeRequest(tableId, schemaChangeEvents)); - } - private ReleaseUpstreamResponse requestReleaseUpstream() throws InterruptedException, TimeoutException { CoordinationResponse coordinationResponse = @@ -538,7 +477,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event> return optionalSchema.get(); } catch (Exception e) { throw new IllegalStateException( - String.format("Unable to get latest schema for table \"%s\"", tableId)); + String.format("Unable to get latest schema for table \"%s\"", tableId), e); } } @@ -553,7 +492,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event> return optionalSchema.get(); } catch (Exception e) { throw new IllegalStateException( - String.format("Unable to get latest schema for table \"%s\"", tableId)); + String.format("Unable to get latest schema for table \"%s\"", tableId), e); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java index cc7c68207..9087ae4b3 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java @@ -17,15 +17,13 @@ package org.apache.flink.cdc.runtime.operators.schema.coordinator; +import org.apache.flink.cdc.common.annotation.VisibleForTesting; +import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeResponse; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeResponse; import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent; import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaRequest; import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse; @@ -151,18 +149,23 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH @Override public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) throws Exception { - if (event instanceof FlushSuccessEvent) { - FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event; - LOG.info( - "Sink subtask {} succeed flushing for table {}.", - flushSuccessEvent.getSubtask(), - flushSuccessEvent.getTableId().toString()); - requestHandler.flushSuccess( - flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask()); - } else if (event instanceof SinkWriterRegisterEvent) { - requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask()); - } else { - throw new FlinkException("Unrecognized Operator Event: " + event); + try { + if (event instanceof FlushSuccessEvent) { + FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event; + LOG.info( + "Sink subtask {} succeed flushing for table {}.", + flushSuccessEvent.getSubtask(), + flushSuccessEvent.getTableId().toString()); + requestHandler.flushSuccess( + flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask()); + } else if (event instanceof SinkWriterRegisterEvent) { + requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask()); + } else { + throw new FlinkException("Unrecognized Operator Event: " + event); + } + } catch (Throwable t) { + context.failJob(t); + throw t; } } @@ -180,6 +183,9 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH // Serialize SchemaDerivation mapping SchemaDerivation.serializeDerivationMapping(schemaDerivation, out); resultFuture.complete(baos.toByteArray()); + } catch (Throwable t) { + context.failJob(t); + throw t; } } @@ -191,33 +197,29 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH @Override public CompletableFuture<CoordinationResponse> handleCoordinationRequest( CoordinationRequest request) { - if (request instanceof SchemaChangeRequest) { - SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request; - return requestHandler.handleSchemaChangeRequest(schemaChangeRequest); - } else if (request instanceof ReleaseUpstreamRequest) { - return requestHandler.handleReleaseUpstreamRequest(); - } else if (request instanceof GetEvolvedSchemaRequest) { - return CompletableFuture.completedFuture( - wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request)))); - } else if (request instanceof GetOriginalSchemaRequest) { - return CompletableFuture.completedFuture( - wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request))); - } else if (request instanceof ApplyOriginalSchemaChangeRequest) { - return CompletableFuture.completedFuture( - wrap( - handleApplyOriginalSchemaChangeRequest( - (ApplyOriginalSchemaChangeRequest) request))); - } else if (request instanceof ApplyEvolvedSchemaChangeRequest) { - return CompletableFuture.completedFuture( - wrap( - handleApplyEvolvedSchemaChangeRequest( - (ApplyEvolvedSchemaChangeRequest) request))); - } else if (request instanceof SchemaChangeResultRequest) { - return requestHandler.getSchemaChangeResult(); - } else if (request instanceof RefreshPendingListsRequest) { - return requestHandler.refreshPendingLists(); - } else { - throw new IllegalArgumentException("Unrecognized CoordinationRequest type: " + request); + try { + if (request instanceof SchemaChangeRequest) { + SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request; + return requestHandler.handleSchemaChangeRequest(schemaChangeRequest); + } else if (request instanceof ReleaseUpstreamRequest) { + return requestHandler.handleReleaseUpstreamRequest(); + } else if (request instanceof GetEvolvedSchemaRequest) { + return CompletableFuture.completedFuture( + wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request)))); + } else if (request instanceof GetOriginalSchemaRequest) { + return CompletableFuture.completedFuture( + wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request))); + } else if (request instanceof SchemaChangeResultRequest) { + return requestHandler.getSchemaChangeResult(); + } else if (request instanceof RefreshPendingListsRequest) { + return requestHandler.refreshPendingLists(); + } else { + throw new IllegalArgumentException( + "Unrecognized CoordinationRequest type: " + request); + } + } catch (Throwable t) { + context.failJob(t); + throw t; } } @@ -275,6 +277,9 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH throw new IOException( "Unrecognized serialization version " + schemaManagerSerializerVersion); } + } catch (Throwable t) { + context.failJob(t); + throw t; } } @@ -342,18 +347,15 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH } } - private ApplyOriginalSchemaChangeResponse handleApplyOriginalSchemaChangeRequest( - ApplyOriginalSchemaChangeRequest applyOriginalSchemaChangeRequest) { - schemaManager.applyOriginalSchemaChange( - applyOriginalSchemaChangeRequest.getSchemaChangeEvent()); - return new ApplyOriginalSchemaChangeResponse(); + // --------------------Only visible for test ----------------- + + @VisibleForTesting + public void handleApplyOriginalSchemaChangeEvent(SchemaChangeEvent schemaChangeEvent) { + schemaManager.applyOriginalSchemaChange(schemaChangeEvent); } - private ApplyEvolvedSchemaChangeResponse handleApplyEvolvedSchemaChangeRequest( - ApplyEvolvedSchemaChangeRequest applyEvolvedSchemaChangeRequest) { - applyEvolvedSchemaChangeRequest - .getSchemaChangeEvent() - .forEach(schemaManager::applyEvolvedSchemaChange); - return new ApplyEvolvedSchemaChangeResponse(); + @VisibleForTesting + public void handleApplyEvolvedSchemaChangeRequest(SchemaChangeEvent schemaChangeEvent) { + schemaManager.applyEvolvedSchemaChange(schemaChangeEvent); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index da88753e5..60280ce48 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -17,7 +17,6 @@ package org.apache.flink.cdc.runtime.operators.schema.coordinator; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.AddColumnEvent; import org.apache.flink.cdc.common.event.AlterColumnTypeEvent; @@ -27,7 +26,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; @@ -87,7 +86,6 @@ public class SchemaRegistryRequestHandler implements Closeable { private final List<PendingSchemaChange> pendingSchemaChanges; private final List<SchemaChangeEvent> finishedSchemaChanges; - private final List<Tuple2<SchemaChangeEvent, Throwable>> failedSchemaChanges; private final List<SchemaChangeEvent> ignoredSchemaChanges; /** Sink writers which have sent flush success events for the request. */ @@ -95,6 +93,8 @@ public class SchemaRegistryRequestHandler implements Closeable { /** Status of the execution of current schema change request. */ private volatile boolean isSchemaChangeApplying; + /** Actual exception if failed to apply schema change. */ + private volatile Throwable schemaChangeException; /** Executor service to execute schema change. */ private final ExecutorService schemaChangeThreadPool; @@ -110,7 +110,6 @@ public class SchemaRegistryRequestHandler implements Closeable { this.flushedSinkWriters = new HashSet<>(); this.pendingSchemaChanges = new LinkedList<>(); this.finishedSchemaChanges = new LinkedList<>(); - this.failedSchemaChanges = new LinkedList<>(); this.ignoredSchemaChanges = new LinkedList<>(); this.schemaManager = schemaManager; this.schemaDerivation = schemaDerivation; @@ -128,8 +127,8 @@ public class SchemaRegistryRequestHandler implements Closeable { private void applySchemaChange( TableId tableId, List<SchemaChangeEvent> derivedSchemaChangeEvents) { isSchemaChangeApplying = true; + schemaChangeException = null; finishedSchemaChanges.clear(); - failedSchemaChanges.clear(); ignoredSchemaChanges.clear(); for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) { @@ -146,17 +145,27 @@ public class SchemaRegistryRequestHandler implements Closeable { try { metadataApplier.applySchemaChange(changeEvent); LOG.debug("Applied schema change {} to table {}.", changeEvent, tableId); + schemaManager.applyEvolvedSchemaChange(changeEvent); finishedSchemaChanges.add(changeEvent); - } catch (SchemaEvolveException e) { + } catch (Throwable t) { LOG.error( "Failed to apply schema change {} to table {}. Caused by: {}", changeEvent, tableId, - e); - failedSchemaChanges.add(Tuple2.of(changeEvent, e)); + t); + if (!shouldIgnoreException(t)) { + schemaChangeException = t; + break; + } else { + LOG.warn( + "Failed to apply event {}, but keeps running in tolerant mode. Caused by: {}", + changeEvent, + t); + } } } } + PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0); if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) { startNextSchemaChangeRequest(); @@ -241,6 +250,10 @@ public class SchemaRegistryRequestHandler implements Closeable { () -> applySchemaChange(tableId, waitFlushSuccess.derivedSchemaChangeEvents)); Thread.sleep(1000); + if (schemaChangeException != null) { + throw new RuntimeException("Failed to apply schema change.", schemaChangeException); + } + if (isSchemaChangeApplying) { waitFlushSuccess .getResponseFuture() @@ -248,12 +261,7 @@ public class SchemaRegistryRequestHandler implements Closeable { } else { waitFlushSuccess .getResponseFuture() - .complete( - wrap( - new ReleaseUpstreamResponse( - finishedSchemaChanges, - failedSchemaChanges, - ignoredSchemaChanges))); + .complete(wrap(new ReleaseUpstreamResponse(finishedSchemaChanges))); } } } @@ -292,16 +300,15 @@ public class SchemaRegistryRequestHandler implements Closeable { } public CompletableFuture<CoordinationResponse> getSchemaChangeResult() { + if (schemaChangeException != null) { + throw new RuntimeException("Failed to apply schema change.", schemaChangeException); + } + if (isSchemaChangeApplying) { return CompletableFuture.supplyAsync(() -> wrap(new SchemaChangeProcessingResponse())); } else { return CompletableFuture.supplyAsync( - () -> - wrap( - new ReleaseUpstreamResponse( - finishedSchemaChanges, - failedSchemaChanges, - ignoredSchemaChanges))); + () -> wrap(new ReleaseUpstreamResponse(finishedSchemaChanges))); } } @@ -420,6 +427,15 @@ public class SchemaRegistryRequestHandler implements Closeable { } } + private boolean shouldIgnoreException(Throwable throwable) { + + // In IGNORE mode, will never try to apply schema change events + // In EVOLVE and and LENIENT mode, such failure will not be tolerated + // In EXCEPTION mode, an exception will be thrown once captured + return (throwable instanceof UnsupportedSchemaChangeEventException) + && (schemaChangeBehavior == SchemaChangeBehavior.TRY_EVOLVE); + } + private static class PendingSchemaChange { private final SchemaChangeRequest changeRequest; private List<SchemaChangeEvent> derivedSchemaChangeEvents; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeRequest.java deleted file mode 100644 index f6798af51..000000000 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeRequest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.runtime.operators.schema.event; - -import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -import org.apache.flink.runtime.operators.coordination.CoordinationRequest; - -import java.util.List; -import java.util.Objects; - -/** - * The request from {@link SchemaOperator} to {@link SchemaRegistry} to request apply evolved schema - * changes. - */ -public class ApplyEvolvedSchemaChangeRequest implements CoordinationRequest { - - private static final long serialVersionUID = 1L; - - /** The sender of the request. */ - private final TableId tableId; - /** The schema changes. */ - private final List<SchemaChangeEvent> schemaChangeEvent; - - public ApplyEvolvedSchemaChangeRequest( - TableId tableId, List<SchemaChangeEvent> schemaChangeEvent) { - this.tableId = tableId; - this.schemaChangeEvent = schemaChangeEvent; - } - - public TableId getTableId() { - return tableId; - } - - public List<SchemaChangeEvent> getSchemaChangeEvent() { - return schemaChangeEvent; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof ApplyEvolvedSchemaChangeRequest)) { - return false; - } - ApplyEvolvedSchemaChangeRequest that = (ApplyEvolvedSchemaChangeRequest) o; - return Objects.equals(tableId, that.tableId) - && Objects.equals(schemaChangeEvent, that.schemaChangeEvent); - } - - @Override - public int hashCode() { - return Objects.hash(tableId, schemaChangeEvent); - } -} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeResponse.java deleted file mode 100644 index 787adfc5e..000000000 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyEvolvedSchemaChangeResponse.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.runtime.operators.schema.event; - -import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -import org.apache.flink.runtime.operators.coordination.CoordinationResponse; - -/** - * The response from {@link SchemaRegistry} to {@link SchemaOperator} to request apply original - * schema changes, the evolved schema changes come from original schema changes with different - * schema evolution strategy. - */ -public class ApplyEvolvedSchemaChangeResponse implements CoordinationResponse { - - private static final long serialVersionUID = 1L; -} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeRequest.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeRequest.java deleted file mode 100644 index d4c5d7fee..000000000 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeRequest.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.runtime.operators.schema.event; - -import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -import org.apache.flink.runtime.operators.coordination.CoordinationRequest; - -import java.util.Objects; - -/** - * The request from {@link SchemaOperator} to {@link SchemaRegistry} to request apply original - * schema changes. - */ -public class ApplyOriginalSchemaChangeRequest implements CoordinationRequest { - - private static final long serialVersionUID = 1L; - - /** The sender of the request. */ - private final TableId tableId; - /** The schema changes. */ - private final SchemaChangeEvent schemaChangeEvent; - - public ApplyOriginalSchemaChangeRequest(TableId tableId, SchemaChangeEvent schemaChangeEvent) { - this.tableId = tableId; - this.schemaChangeEvent = schemaChangeEvent; - } - - public TableId getTableId() { - return tableId; - } - - public SchemaChangeEvent getSchemaChangeEvent() { - return schemaChangeEvent; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof ApplyOriginalSchemaChangeRequest)) { - return false; - } - ApplyOriginalSchemaChangeRequest that = (ApplyOriginalSchemaChangeRequest) o; - return Objects.equals(tableId, that.tableId) - && Objects.equals(schemaChangeEvent, that.schemaChangeEvent); - } - - @Override - public int hashCode() { - return Objects.hash(tableId, schemaChangeEvent); - } -} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeResponse.java deleted file mode 100644 index 0a92e9656..000000000 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ApplyOriginalSchemaChangeResponse.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.cdc.runtime.operators.schema.event; - -import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; -import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -import org.apache.flink.runtime.operators.coordination.CoordinationResponse; - -/** - * The response from {@link SchemaRegistry} to {@link SchemaOperator} to request apply original - * schema changes. - */ -public class ApplyOriginalSchemaChangeResponse implements CoordinationResponse { - - private static final long serialVersionUID = 1L; -} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java index f577f1120..bea880962 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/ReleaseUpstreamResponse.java @@ -17,7 +17,6 @@ package org.apache.flink.cdc.runtime.operators.schema.event; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator; import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; @@ -25,7 +24,6 @@ import org.apache.flink.runtime.operators.coordination.CoordinationResponse; import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; /** * The response for {@link ReleaseUpstreamRequest} from {@link SchemaRegistry} to {@link @@ -41,50 +39,19 @@ public class ReleaseUpstreamResponse implements CoordinationResponse { */ private final List<SchemaChangeEvent> finishedSchemaChangeEvents; - private final List<Tuple2<SchemaChangeEvent, Throwable>> failedSchemaChangeEvents; - - private final List<SchemaChangeEvent> ignoredSchemaChangeEvents; - - public ReleaseUpstreamResponse( - List<SchemaChangeEvent> finishedSchemaChangeEvents, - List<Tuple2<SchemaChangeEvent, Throwable>> failedSchemaChangeEvents, - List<SchemaChangeEvent> ignoredSchemaChangeEvents) { + public ReleaseUpstreamResponse(List<SchemaChangeEvent> finishedSchemaChangeEvents) { this.finishedSchemaChangeEvents = finishedSchemaChangeEvents; - this.failedSchemaChangeEvents = failedSchemaChangeEvents; - this.ignoredSchemaChangeEvents = ignoredSchemaChangeEvents; } public List<SchemaChangeEvent> getFinishedSchemaChangeEvents() { return finishedSchemaChangeEvents; } - public List<Tuple2<SchemaChangeEvent, Throwable>> getFailedSchemaChangeEvents() { - return failedSchemaChangeEvents; - } - - public List<SchemaChangeEvent> getIgnoredSchemaChangeEvents() { - return ignoredSchemaChangeEvents; - } - - public String getPrintableFailedSchemaChangeEvents() { - return failedSchemaChangeEvents.stream() - .map(e -> "Failed to apply " + e.f0 + ". Caused by: " + e.f1) - .collect(Collectors.joining("\n")); - } - - public boolean hasException() { - return !failedSchemaChangeEvents.isEmpty(); - } - @Override public String toString() { return "ReleaseUpstreamResponse{" + "finishedSchemaChangeEvents=" + finishedSchemaChangeEvents - + ", failedSchemaChangeEvents=" - + failedSchemaChangeEvents - + ", ignoredSchemaChangeEvents=" - + ignoredSchemaChangeEvents + '}'; } @@ -97,14 +64,11 @@ public class ReleaseUpstreamResponse implements CoordinationResponse { return false; } ReleaseUpstreamResponse that = (ReleaseUpstreamResponse) object; - return Objects.equals(finishedSchemaChangeEvents, that.finishedSchemaChangeEvents) - && Objects.equals(failedSchemaChangeEvents, that.failedSchemaChangeEvents) - && Objects.equals(ignoredSchemaChangeEvents, that.ignoredSchemaChangeEvents); + return Objects.equals(finishedSchemaChangeEvents, that.finishedSchemaChangeEvents); } @Override public int hashCode() { - return Objects.hash( - finishedSchemaChangeEvents, failedSchemaChangeEvents, ignoredSchemaChangeEvents); + return Objects.hash(finishedSchemaChangeEvents); } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java index bf61ad260..d3e0de8d0 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java @@ -1036,6 +1036,8 @@ public class SchemaEvolveTest { Column.physicalColumn( "height", DOUBLE, "Height data"))))); Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents)) + .cause() + .cause() .isExactlyInstanceOf(RuntimeException.class) .hasMessageContaining("Failed to apply schema change"); harness.close(); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java index b5262d2d5..354304599 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java @@ -25,8 +25,6 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyEvolvedSchemaChangeRequest; -import org.apache.flink.cdc.runtime.operators.schema.event.ApplyOriginalSchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent; import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaRequest; import org.apache.flink.cdc.runtime.operators.schema.event.GetEvolvedSchemaResponse; @@ -56,7 +54,6 @@ import org.apache.flink.util.SerializedValue; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedList; import java.util.Set; @@ -166,14 +163,10 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex } public void registerTableSchema(TableId tableId, Schema schema) { - schemaRegistry.handleCoordinationRequest( - new ApplyOriginalSchemaChangeRequest( - tableId, new CreateTableEvent(tableId, schema))); + schemaRegistry.handleApplyOriginalSchemaChangeEvent(new CreateTableEvent(tableId, schema)); schemaRegistry.handleCoordinationRequest( new SchemaChangeRequest(tableId, new CreateTableEvent(tableId, schema))); - schemaRegistry.handleCoordinationRequest( - new ApplyEvolvedSchemaChangeRequest( - tableId, Collections.singletonList(new CreateTableEvent(tableId, schema)))); + schemaRegistry.handleApplyEvolvedSchemaChangeRequest(new CreateTableEvent(tableId, schema)); } public Schema getLatestOriginalSchema(TableId tableId) throws Exception { diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java index 9398e1f1e..9384fa757 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.sink.MetadataApplier; import java.time.Duration; @@ -83,8 +84,7 @@ public class CollectingMetadataApplier implements MetadataApplier { try { Thread.sleep(duration.toMillis()); if (errorsOnEventTypes.contains(schemaChangeEvent.getType())) { - throw new SchemaEvolveException( - schemaChangeEvent, "Dummy metadata apply exception for test.", null); + throw new UnsupportedSchemaChangeEventException(schemaChangeEvent); } } catch (InterruptedException ignore) { // Ignores sleep interruption
