This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new 77785c1c0 [hotfix[cdc-runtime] Close MetadataApplier in SchemaRegistry when the job stops
77785c1c0 is described below

commit 77785c1c085e9ad981a0df14056bd525e3368c5b
Author: Leonard Xu <xbjt...@gmail.com>
AuthorDate: Thu Jan 16 20:07:30 2025 +0800

    [hotfix[cdc-runtime] Close MetadataApplier in SchemaRegistry when the job stops

    This closes #3864
---
 .../flink/cdc/runtime/operators/schema/common/SchemaRegistry.java | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
index 566dffa93..149049177 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
@@ -130,6 +131,12 @@ public abstract class SchemaRegistry implements OperatorCoordinator, Coordinatio
     public void close() throws Exception {
         LOG.info("Closing SchemaRegistry - {}.", operatorName);
         coordinatorExecutor.shutdown();
+        try {
+            metadataApplier.close();
+        } catch (Exception e) {
+            LOG.error("Failed to close metadata applier.", e);
+            throw new IOException("Failed to close metadata applier.", e);
+        }
     }
 
     // ------------------------------