This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 5449864 [Bug] fix stopload bug when restore from checkpoint (#232)
5449864 is described below
commit 544986419c70b755b62ea1435d8694410ab0a49a
Author: wudi <[email protected]>
AuthorDate: Tue Nov 21 21:42:23 2023 +0800
[Bug] fix stopload bug when restore from checkpoint (#232)
When a job is restored from a checkpoint whose state contains table tb1, but
the newly imported tables do not include tb1, the abort completes normally,
but an error is reported during stopload.
---
.../main/java/org/apache/doris/flink/sink/writer/DorisWriter.java | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 8550a21..c5ce847 100644
---
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -209,13 +209,15 @@ public class DorisWriter<IN> implements
StatefulSink.StatefulSinkWriter<IN, Dori
}
// disable exception checker before stop load.
globalLoading = false;
- // clean loadingMap
- loadingMap.clear();
// submit stream load http request
List<DorisCommittable> committableList = new ArrayList<>();
for(Map.Entry<String, DorisStreamLoad> streamLoader :
dorisStreamLoadMap.entrySet()){
String tableIdentifier = streamLoader.getKey();
+ if(!loadingMap.getOrDefault(tableIdentifier, false)){
+ LOG.debug("skip table {}, no data need to load.",
tableIdentifier);
+ continue;
+ }
DorisStreamLoad dorisStreamLoad = streamLoader.getValue();
LabelGenerator labelGenerator = getLabelGenerator(tableIdentifier);
String currentLabel =
labelGenerator.generateTableLabel(curCheckpointId);
@@ -229,6 +231,8 @@ public class DorisWriter<IN> implements
StatefulSink.StatefulSinkWriter<IN, Dori
committableList.add(new
DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(),
txnId));
}
}
+ // clean loadingMap
+ loadingMap.clear();
return committableList;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]