This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 555425e  [Fix] lookup join close executor service (#162)
555425e is described below

commit 555425e4944a0640105504538cd349a4c6fa6fc7
Author: wudi <[email protected]>
AuthorDate: Mon Jul 17 10:02:27 2023 +0800

    [Fix] lookup join close executor service (#162)
---
 .../doris/flink/connection/SimpleJdbcConnectionProvider.java     | 2 +-
 .../org/apache/doris/flink/lookup/DorisJdbcLookupReader.java     | 7 +++++++
 .../java/org/apache/doris/flink/lookup/DorisLookupReader.java    | 3 ++-
 .../main/java/org/apache/doris/flink/lookup/ExecutionPool.java   | 9 +++++++--
 .../src/main/java/org/apache/doris/flink/lookup/Worker.java      | 2 ++
 .../doris/flink/table/DorisRowDataAsyncLookupFunction.java       | 5 +++++
 .../apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java | 3 +++
 7 files changed, 27 insertions(+), 4 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
index 4d42cd6..73141a3 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/connection/SimpleJdbcConnectionProvider.java
@@ -42,7 +42,7 @@ public class SimpleJdbcConnectionProvider implements 
JdbcConnectionProvider, Ser
 
     @Override
     public Connection getOrEstablishConnection() throws 
ClassNotFoundException, SQLException {
-        if (connection != null && !connection.isClosed()) {
+        if (connection != null && !connection.isClosed() && 
connection.isValid(10000)) {
             return connection;
         }
         try {
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java
index 9090783..d935a8d 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisJdbcLookupReader.java
@@ -129,4 +129,11 @@ public class DorisJdbcLookupReader extends 
DorisLookupReader {
             throw new IOException(e);
         }
     }
+
+    @Override
+    public void close() throws IOException {
+        if(this.pool != null){
+            this.pool.close();
+        }
+    }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java
index b6c5073..7c8663e 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/DorisLookupReader.java
@@ -19,11 +19,12 @@ package org.apache.doris.flink.lookup;
 
 import org.apache.flink.table.data.RowData;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-public abstract class DorisLookupReader {
+public abstract class DorisLookupReader implements Closeable {
 
     public abstract CompletableFuture<List<RowData>> asyncGet(RowData record) 
throws IOException;
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
index c9638f5..9b930ff 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/ExecutionPool.java
@@ -103,9 +103,12 @@ public class ExecutionPool implements Closeable {
     @Override
     public void close() throws IOException {
         if (started.compareAndSet(true, false)) {
-            actionWatcherExecutorService.shutdown();
+            LOG.info("close executorService");
+            actionWatcherExecutorService.shutdownNow();
             workerStated.set(false);
-            workerExecutorService.shutdown();
+            workerExecutorService.shutdownNow();
+            this.actionWatcherExecutorService = null;
+            this.workerExecutorService = null;
             this.semaphore = null;
         }
     }
@@ -160,6 +163,7 @@ public class ExecutionPool implements Closeable {
                     if (firstGet != null) {
                         recordList.add(firstGet);
                         queue.drainTo(recordList, batchSize - 1);
+                        LOG.debug("fetch {} records from queue", 
recordList.size());
                         Map<String, List<Get>> getsByTable = new HashMap<>();
                         for (Get get : recordList) {
                             List<Get> list = 
getsByTable.computeIfAbsent(get.getRecord().getTableIdentifier(), (s) -> new 
ArrayList<>());
@@ -181,6 +185,7 @@ public class ExecutionPool implements Closeable {
                     }
                 }
             }
+            LOG.info("action watcher stop");
         }
 
         @Override
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
index e29bfd1..c015921 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/lookup/Worker.java
@@ -153,6 +153,7 @@ public class Worker implements Runnable {
         for (int retry = 0; retry <= maxRetryTimes; retry++) {
             resultRecordMap = new HashMap<>();
             try {
+                long start = System.currentTimeMillis();
                 Connection conn = 
jdbcConnectionProvider.getOrEstablishConnection();
                 try (PreparedStatement ps = conn.prepareStatement(sql)) {
                     int paramIndex = 0;
@@ -174,6 +175,7 @@ public class Worker implements Runnable {
                         }
                     }
                 }
+                LOG.debug("query cost {}ms, batch {} records, sql is {}", 
System.currentTimeMillis()-start, recordList.size(), sql);
                 return resultRecordMap;
             } catch (Exception e) {
                 LOG.error(String.format("query doris error, retry times = %d", 
retry), e);
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
index e6b2166..da168ba 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataAsyncLookupFunction.java
@@ -68,6 +68,8 @@ public class DorisRowDataAsyncLookupFunction extends 
AsyncTableFunction<RowData>
     @Override
     public void open(FunctionContext context) throws Exception {
         super.open(context);
+        LOG.info("lookup options: threadSize {}, batchSize {}, queueSize {}",
+                lookupOptions.getJdbcReadThreadSize(), 
lookupOptions.getJdbcReadBatchSize(), 
lookupOptions.getJdbcReadBatchQueueSize());
         this.cache = cacheMaxSize == -1 || cacheExpireMs == -1
                 ? null
                 : CacheBuilder.newBuilder()
@@ -119,6 +121,9 @@ public class DorisRowDataAsyncLookupFunction extends 
AsyncTableFunction<RowData>
     @Override
     public void close() throws Exception {
         super.close();
+        if(lookupReader != null){
+            lookupReader.close();
+        }
     }
 
     @VisibleForTesting
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
index 93a0059..00a6c77 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataJdbcLookupFunction.java
@@ -111,6 +111,9 @@ public class DorisRowDataJdbcLookupFunction extends 
TableFunction<RowData> {
     @Override
     public void close() throws Exception {
         super.close();
+        if(lookupReader != null){
+            lookupReader.close();
+        }
     }
 
     @VisibleForTesting


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to