This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 75ce7711162 Pipe: fixed the bug that region follower tsfiles can not record the IoTProgressIndex from region leader (#11980)
75ce7711162 is described below

commit 75ce7711162ce4a14a4e90463cc0f24846e03560
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jan 26 16:49:34 2024 +0800

    Pipe: fixed the bug that region follower tsfiles can not record the IoTProgressIndex from region leader (#11980)
---
 .../dataregion/IoTConsensusDataRegionStateMachine.java         |  2 +-
 .../historical/PipeHistoricalDataRegionTsFileExtractor.java    | 10 ++++++++--
 .../db/storageengine/dataregion/tsfile/TsFileResource.java     |  1 +
 3 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
index fb7ddd181b1..a1901031ac3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
@@ -90,7 +90,7 @@ public class IoTConsensusDataRegionStateMachine extends DataRegionStateMachine {
         final PlanNode planNode = grabInsertNode(indexedRequest);
         if (planNode instanceof ComparableConsensusRequest) {
           final IoTProgressIndex ioTProgressIndex =
-              new IoTProgressIndex(batchRequest.getSourcePeerId(), indexedRequest.getSearchIndex());
+              new IoTProgressIndex(batchRequest.getSourcePeerId(), indexedRequest.getSyncIndex());
           ((ComparableConsensusRequest) planNode).setProgressIndex(ioTProgressIndex);
         }
         deserializedRequest.add(planNode);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index a64d2a4ae7f..26ac4da4d3f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -329,9 +329,12 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
             tsFileManager.getTsFileList(true).stream()
                 .filter(
                     resource ->
-                        // Some resource may be not closed due to the control of
+                        // Some resource may not be closed due to the control of
                         // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
                         resource.isClosed()
+                            // Some different tsFiles may share the same max progressIndex, thus
+                            // tsFiles with an "equals" max progressIndex must be transmitted to
+                            // avoid data loss
                             && !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && isTsFileResourceOverlappedWithTimeRange(resource)
                             && isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
@@ -342,9 +345,12 @@ public class PipeHistoricalDataRegionTsFileExtractor implements PipeHistoricalDa
             tsFileManager.getTsFileList(false).stream()
                 .filter(
                     resource ->
-                        // Some resource may be not closed due to the control of
+                        // Some resource may not be closed due to the control of
                         // PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore them.
                         resource.isClosed()
+                            // Some different tsFiles may share the same max progressIndex, thus
+                            // tsFiles with an "equals" max progressIndex must be transmitted to
+                            // avoid data loss
                             && !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
                             && isTsFileResourceOverlappedWithTimeRange(resource)
                             && isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index fac186a1c9f..ecd06a68241 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.storageengine.dataregion.tsfile;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;

Reply via email to