This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new ee94f552 [fix] Fixed the issue that batchwriter may be blocked when 
writing to multiple tables (#511)
ee94f552 is described below

commit ee94f552e8f56900d375cac30804ac6e94f90bae
Author: wudi <[email protected]>
AuthorDate: Wed Nov 20 09:58:55 2024 +0800

    [fix] Fixed the issue that batchwriter may be blocked when writing to 
multiple tables (#511)
---
 .../doris/flink/sink/batch/DorisBatchStreamLoad.java | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 3cfda604..3747257d 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -178,7 +178,7 @@ public class DorisBatchStreamLoad implements Serializable {
      * @param record
      * @throws IOException
      */
-    public synchronized void writeRecord(String database, String table, byte[] 
record) {
+    public void writeRecord(String database, String table, byte[] record) {
         checkFlushException();
         String bufferKey = getTableIdentifier(database, table);
 
@@ -228,15 +228,15 @@ public class DorisBatchStreamLoad implements Serializable 
{
         }
     }
 
-    public synchronized boolean bufferFullFlush(String bufferKey) {
+    public boolean bufferFullFlush(String bufferKey) {
         return doFlush(bufferKey, false, true);
     }
 
-    public synchronized boolean intervalFlush() {
+    public boolean intervalFlush() {
         return doFlush(null, false, false);
     }
 
-    public synchronized boolean checkpointFlush() {
+    public boolean checkpointFlush() {
         return doFlush(null, true, false);
     }
 
@@ -254,6 +254,10 @@ public class DorisBatchStreamLoad implements Serializable {
     }
 
     private synchronized boolean flush(String bufferKey, boolean waitUtilDone) 
{
+        if (bufferMap.isEmpty()) {
+            // bufferMap may have been flushed by other threads
+            return false;
+        }
         if (null == bufferKey) {
             boolean flush = false;
             for (String key : bufferMap.keySet()) {
@@ -270,7 +274,7 @@ public class DorisBatchStreamLoad implements Serializable {
         } else if (bufferMap.containsKey(bufferKey)) {
             flushBuffer(bufferKey);
         } else {
-            throw new DorisBatchLoadException("buffer not found for key: " + 
bufferKey);
+            LOG.warn("buffer not found for key: {}, may be already flushed.", 
bufferKey);
         }
         if (waitUtilDone) {
             waitAsyncLoadFinish();
@@ -281,6 +285,7 @@ public class DorisBatchStreamLoad implements Serializable {
     private synchronized void flushBuffer(String bufferKey) {
         BatchRecordBuffer buffer = bufferMap.get(bufferKey);
         
buffer.setLabelName(labelGenerator.generateBatchLabel(buffer.getTable()));
+        LOG.debug("flush buffer for key {} with label {}", bufferKey, 
buffer.getLabelName());
         putRecordToFlushQueue(buffer);
         bufferMap.remove(bufferKey);
     }
@@ -408,11 +413,6 @@ public class DorisBatchStreamLoad implements Serializable {
                             load(bf.getLabelName(), bf);
                         }
                     }
-
-                    if (flushQueue.size() < flushQueueSize) {
-                        // Avoid waiting for 2 rounds of intervalMs
-                        doFlush(null, false, false);
-                    }
                 } catch (Exception e) {
                     LOG.error("worker running error", e);
                     exception.set(e);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to