This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 77785c1c0 [hotfix[cdc-runtime] Close MetadataApplier in SchemaRegistry 
when the job stops
77785c1c0 is described below

commit 77785c1c085e9ad981a0df14056bd525e3368c5b
Author: Leonard Xu <xbjt...@gmail.com>
AuthorDate: Thu Jan 16 20:07:30 2025 +0800

    [hotfix[cdc-runtime] Close MetadataApplier in SchemaRegistry when the job 
stops
    
     This closes #3864
---
 .../flink/cdc/runtime/operators/schema/common/SchemaRegistry.java  | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
index 566dffa93..149049177 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
@@ -130,6 +131,12 @@ public abstract class SchemaRegistry implements 
OperatorCoordinator, Coordinatio
     public void close() throws Exception {
         LOG.info("Closing SchemaRegistry - {}.", operatorName);
         coordinatorExecutor.shutdown();
+        try {
+            metadataApplier.close();
+        } catch (Exception e) {
+            LOG.error("Failed to close metadata applier.", e);
+            throw new IOException("Failed to close metadata applier.", e);
+        }
     }
 
     // ------------------------------

Reply via email to