This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 5449864  [Bug] fix stopload bug when restore from checkpoint (#232)
5449864 is described below

commit 544986419c70b755b62ea1435d8694410ab0a49a
Author: wudi <[email protected]>
AuthorDate: Tue Nov 21 21:42:23 2023 +0800

    [Bug] fix stopload bug when restore from checkpoint (#232)
    
    If a job is restored from a checkpoint whose state includes table tb1, but 
the tables imported by the new job do not include tb1, the abort completes 
normally, but an error is reported during stopload.
---
 .../main/java/org/apache/doris/flink/sink/writer/DorisWriter.java | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 8550a21..c5ce847 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -209,13 +209,15 @@ public class DorisWriter<IN> implements 
StatefulSink.StatefulSinkWriter<IN, Dori
         }
         // disable exception checker before stop load.
         globalLoading = false;
-        // clean loadingMap
-        loadingMap.clear();
 
         // submit stream load http request
         List<DorisCommittable> committableList = new ArrayList<>();
         for(Map.Entry<String, DorisStreamLoad> streamLoader : 
dorisStreamLoadMap.entrySet()){
             String tableIdentifier = streamLoader.getKey();
+            if(!loadingMap.getOrDefault(tableIdentifier, false)){
+                LOG.debug("skip table {}, no data need to load.", 
tableIdentifier);
+                continue;
+            }
             DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
             LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier);
             String currentLabel = 
labelGenerator.generateTableLabel(curCheckpointId);
@@ -229,6 +231,8 @@ public class DorisWriter<IN> implements 
StatefulSink.StatefulSinkWriter<IN, Dori
                 committableList.add(new 
DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), 
txnId));
             }
         }
+        // clean loadingMap
+        loadingMap.clear();
         return committableList;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to