This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 82139d433eb Pipe: fix losing points when enable batch mode introduced 
by progress commit framework refactor (#11722)
82139d433eb is described below

commit 82139d433eb386e8da9f61325b89d11e533c1a7a
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Dec 19 10:01:39 2023 +0800

    Pipe: fix losing points when enable batch mode introduced by progress 
commit framework refactor (#11722)
    
    With the introduction of `PipeEventCommitter`, each DR assigns commit id 
independently to events. Events with the same commit id in different DRs may be 
lost if using commit id as a condition for duplication check.
---
 .../payload/evolvable/builder/PipeTransferBatchReqBuilder.java       | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
index 9367ec2c184..a4e43f84516 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/builder/PipeTransferBatchReqBuilder.java
@@ -114,8 +114,9 @@ public abstract class PipeTransferBatchReqBuilder 
implements AutoCloseable {
     final TPipeTransferReq req = buildTabletInsertionReq(event);
     final long requestCommitId = ((EnrichedEvent) event).getCommitId();
 
-    if (requestCommitIds.isEmpty()
-        || !requestCommitIds.get(requestCommitIds.size() - 
1).equals(requestCommitId)) {
+    // The deduplication logic here is to avoid the accumulation of the same 
event in a batch when
+    // retrying.
+    if ((events.isEmpty() || !events.get(events.size() - 1).equals(event))) {
       reqs.add(req);
       events.add(event);
       requestCommitIds.add(requestCommitId);

Reply via email to