This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ce864721 [INLONG-7291][Sort] Fix bug of dirtysink not opening for jdbc multiple sink (#7288)
3ce864721 is described below

commit 3ce864721bd78fca74b5cbbe97904ef72604aad2
Author: Yizhou Yang <[email protected]>
AuthorDate: Thu Feb 2 10:03:54 2023 +0800

    [INLONG-7291][Sort] Fix bug of dirtysink not opening for jdbc multiple sink (#7288)
---
 .../inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java    | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
index 67e9adac0..ead5b58a3 100644
--- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
+++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/JdbcMultiBatchingOutputFormat.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
 import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
 import 
org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
@@ -219,6 +220,11 @@ public class JdbcMultiBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatc
                             executionOptions.getBatchIntervalMs(),
                             executionOptions.getBatchIntervalMs(),
                             TimeUnit.MILLISECONDS);
+            try {
+                dirtySinkHelper.open(new Configuration());
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
         }
     }
 

Reply via email to