This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 82139d433eb Pipe: fix losing points when enable batch mode introduced
by progress commit framework refactor (#11722)
82139d433eb is described below
commit 82139d433eb386e8da9f61325b89d11e533c1a7a
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Dec 19 10:01:39 2023 +0800
Pipe: fix losing points when enable batch mode introduced by progress
commit framework refactor (#11722)
With the introduction of `PipeEventCommitter`, each DR assigns commit ids to
its events independently. As a result, events from different DRs can share the
same commit id, and such events may be lost if the commit id is used as the
condition for the duplication check.
---
.../payload/evolvable/builder/PipeTransferBatchReqBuilder.java | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index 9367ec2c184..a4e43f84516 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -114,8 +114,9 @@ public abstract class PipeTransferBatchReqBuilder
implements AutoCloseable {
final TPipeTransferReq req = buildTabletInsertionReq(event);
final long requestCommitId = ((EnrichedEvent) event).getCommitId();
- if (requestCommitIds.isEmpty()
- || !requestCommitIds.get(requestCommitIds.size() -
1).equals(requestCommitId)) {
+ // The deduplication logic here is to avoid the accumulation of the same
event in a batch when
+ // retrying.
+ if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) {
reqs.add(req);
events.add(event);
requestCommitIds.add(requestCommitId);