This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 75ce7711162 Pipe: fixed the bug that region follower tsfiles can not
record the IoTProgressIndex from region leader (#11980)
75ce7711162 is described below
commit 75ce7711162ce4a14a4e90463cc0f24846e03560
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jan 26 16:49:34 2024 +0800
Pipe: fixed the bug that region follower tsfiles can not record the
IoTProgressIndex from region leader (#11980)
---
.../dataregion/IoTConsensusDataRegionStateMachine.java | 2 +-
.../historical/PipeHistoricalDataRegionTsFileExtractor.java | 10 ++++++++--
.../db/storageengine/dataregion/tsfile/TsFileResource.java | 1 +
3 files changed, 10 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
index fb7ddd181b1..a1901031ac3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/IoTConsensusDataRegionStateMachine.java
@@ -90,7 +90,7 @@ public class IoTConsensusDataRegionStateMachine extends
DataRegionStateMachine {
final PlanNode planNode = grabInsertNode(indexedRequest);
if (planNode instanceof ComparableConsensusRequest) {
final IoTProgressIndex ioTProgressIndex =
- new IoTProgressIndex(batchRequest.getSourcePeerId(),
indexedRequest.getSearchIndex());
+ new IoTProgressIndex(batchRequest.getSourcePeerId(),
indexedRequest.getSyncIndex());
((ComparableConsensusRequest)
planNode).setProgressIndex(ioTProgressIndex);
}
deserializedRequest.add(planNode);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
index a64d2a4ae7f..26ac4da4d3f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -329,9 +329,12 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
tsFileManager.getTsFileList(true).stream()
.filter(
resource ->
- // Some resource may be not closed due to the control
of
+ // Some resource may not be closed due to the control
of
// PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore
them.
resource.isClosed()
+ // Some different tsFiles may share the same max
progressIndex, thus
+ // tsFiles with an "equals" max progressIndex must
be transmitted to
+ // avoid data loss
&&
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&&
isTsFileResourceOverlappedWithTimeRange(resource)
&&
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
@@ -342,9 +345,12 @@ public class PipeHistoricalDataRegionTsFileExtractor
implements PipeHistoricalDa
tsFileManager.getTsFileList(false).stream()
.filter(
resource ->
- // Some resource may be not closed due to the control
of
+ // Some resource may not be closed due to the control
of
// PIPE_MIN_FLUSH_INTERVAL_IN_MS. We simply ignore
them.
resource.isClosed()
+ // Some different tsFiles may share the same max
progressIndex, thus
+ // tsFiles with an "equals" max progressIndex must
be transmitted to
+ // avoid data loss
&&
!startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
&&
isTsFileResourceOverlappedWithTimeRange(resource)
&&
isTsFileGeneratedAfterExtractionTimeLowerBound(resource))
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index fac186a1c9f..ecd06a68241 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.storageengine.dataregion.tsfile;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;