This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 2c6ad830d0d Pipe: Fixed the bug that an insertion for a newer tsFile in
one region may report progress beyond an older tsFile that has not been flushed
(#15515) (#15596)
2c6ad830d0d is described below

commit 2c6ad830d0dee5dbef679d1fb107596c231fa5eb
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 29 18:15:13 2025 +0800

    Pipe: Fixed the bug that an insertion for a newer tsFile in one region may
report progress beyond an older tsFile that has not been flushed (#15515) (#15596)
---
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  9 ++--
 .../db/pipe/event/realtime/PipeRealtimeEvent.java  |  5 ++
 .../PipeRealtimeDataRegionHybridExtractor.java     |  3 ++
 .../realtime/assigner/PipeDataRegionAssigner.java  | 34 ++++++++++----
 ...ava => PipeTsFileEpochProgressIndexKeeper.java} | 53 ++++++++--------------
 .../dataregion/realtime/epoch/TsFileEpoch.java     | 14 ++++--
 .../dataregion/tsfile/TsFileResource.java          | 10 ++--
 7 files changed, 67 insertions(+), 61 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 20dee1bf567..b648af30e6f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider;
-import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper;
+import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
 import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
@@ -342,11 +342,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
 
   public void eliminateProgressIndex() {
     if (Objects.isNull(overridingProgressIndex)) {
-      PipeTimePartitionProgressIndexKeeper.getInstance()
-          .eliminateProgressIndex(
-              resource.getDataRegionId(),
-              resource.getTimePartition(),
-              resource.getMaxProgressIndexAfterClose());
+      PipeTsFileEpochProgressIndexKeeper.getInstance()
+          .eliminateProgressIndex(resource.getDataRegionId(), 
resource.getTsFilePath());
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 3818d0ac53b..d4a39193467 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -130,6 +130,11 @@ public class PipeRealtimeEvent extends EnrichedEvent {
     return event.internallyDecreaseResourceReferenceCount(holderMessage);
   }
 
+  @Override
+  public void bindProgressIndex(final ProgressIndex progressIndex) {
+    event.bindProgressIndex(progressIndex);
+  }
+
   @Override
   public ProgressIndex getProgressIndex() {
     return event.getProgressIndex();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index a4cebff2806..825dfd11345 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEve
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
+import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
 import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
 import 
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
@@ -471,6 +472,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends 
PipeRealtimeDataRegio
     switch (state) {
       case USING_TSFILE:
         // If the state is USING_TSFILE, discard the event and poll the next 
one.
+        PipeTsFileEpochProgressIndexKeeper.getInstance()
+            .eliminateProgressIndex(dataRegionId, 
event.getTsFileEpoch().getFilePath());
         return null;
       case EMPTY:
       case USING_TABLET:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 401cf9d4eb6..ed91f636ac1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
@@ -61,7 +62,7 @@ public class PipeDataRegionAssigner implements Closeable {
 
   private int counter = 0;
 
-  private final AtomicReference<ProgressIndex> 
maxProgressIndexForTsFileInsertionEvent =
+  private final AtomicReference<ProgressIndex> 
maxProgressIndexForRealtimeEvent =
       new AtomicReference<>(MinimumProgressIndex.INSTANCE);
 
   private final PipeEventCounter eventCounter = new 
PipeDataRegionEventCounter();
@@ -170,7 +171,11 @@ public class PipeDataRegionAssigner implements Closeable {
                     (PipeTsFileInsertionEvent) innerEvent;
                 tsFileInsertionEvent.disableMod4NonTransferPipes(
                     extractor.isShouldTransferModFile());
-                
bindOrUpdateProgressIndexForTsFileInsertionEvent(tsFileInsertionEvent);
+              }
+
+              if (innerEvent instanceof PipeTsFileInsertionEvent
+                  || innerEvent instanceof PipeInsertNodeTabletInsertionEvent) 
{
+                bindOrUpdateProgressIndexForRealtimeEvent(copiedEvent);
               }
 
               if 
(!copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
@@ -183,25 +188,34 @@ public class PipeDataRegionAssigner implements Closeable {
             });
   }
 
-  private void bindOrUpdateProgressIndexForTsFileInsertionEvent(
-      final PipeTsFileInsertionEvent event) {
-    if (PipeTimePartitionProgressIndexKeeper.getInstance()
+  private void bindOrUpdateProgressIndexForRealtimeEvent(final 
PipeRealtimeEvent event) {
+    if (PipeTsFileEpochProgressIndexKeeper.getInstance()
         .isProgressIndexAfterOrEquals(
-            dataRegionId, event.getTimePartitionId(), 
event.forceGetProgressIndex())) {
-      event.bindProgressIndex(maxProgressIndexForTsFileInsertionEvent.get());
+            dataRegionId,
+            event.getTsFileEpoch().getFilePath(),
+            getProgressIndex4RealtimeEvent(event))) {
+      event.bindProgressIndex(maxProgressIndexForRealtimeEvent.get());
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug(
             "Data region {} bind {} to event {} because it was flushed 
prematurely.",
             dataRegionId,
-            maxProgressIndexForTsFileInsertionEvent,
+            maxProgressIndexForRealtimeEvent,
             event.coreReportMessage());
       }
     } else {
-      maxProgressIndexForTsFileInsertionEvent.updateAndGet(
-          index -> 
index.updateToMinimumEqualOrIsAfterProgressIndex(event.forceGetProgressIndex()));
+      maxProgressIndexForRealtimeEvent.updateAndGet(
+          index ->
+              index.updateToMinimumEqualOrIsAfterProgressIndex(
+                  getProgressIndex4RealtimeEvent(event)));
     }
   }
 
+  private ProgressIndex getProgressIndex4RealtimeEvent(final PipeRealtimeEvent 
event) {
+    return event.getEvent() instanceof PipeTsFileInsertionEvent
+        ? ((PipeTsFileInsertionEvent) event.getEvent()).forceGetProgressIndex()
+        : event.getProgressIndex();
+  }
+
   public void startAssignTo(final PipeRealtimeDataRegionExtractor extractor) {
     matcher.register(extractor);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTimePartitionProgressIndexKeeper.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
similarity index 55%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTimePartitionProgressIndexKeeper.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
index 893b832b4cb..da2cde90667 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTimePartitionProgressIndexKeeper.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
@@ -21,61 +21,44 @@ package 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 
-import org.apache.tsfile.utils.Pair;
-
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 
-public class PipeTimePartitionProgressIndexKeeper {
+public class PipeTsFileEpochProgressIndexKeeper {
 
-  // data region id -> (time partition id, <max progress index, is valid>)
-  private final Map<String, Map<Long, Pair<ProgressIndex, Boolean>>> 
progressIndexKeeper =
+  // data region id -> (tsFile path, max progress index)
+  private final Map<String, Map<String, ProgressIndex>> progressIndexKeeper =
       new ConcurrentHashMap<>();
 
   public synchronized void updateProgressIndex(
-      final String dataRegionId, final long timePartitionId, final 
ProgressIndex progressIndex) {
+      final String dataRegionId, final String tsFileName, final ProgressIndex 
progressIndex) {
     progressIndexKeeper
         .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
         .compute(
-            timePartitionId,
-            (k, v) -> {
-              if (v == null) {
-                return new Pair<>(progressIndex, true);
-              }
-              return new Pair<>(
-                  
v.getLeft().updateToMinimumEqualOrIsAfterProgressIndex(progressIndex), true);
-            });
+            tsFileName,
+            (k, v) ->
+                v == null
+                    ? progressIndex
+                    : 
v.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex));
   }
 
   public synchronized void eliminateProgressIndex(
-      final String dataRegionId, final long timePartitionId, final 
ProgressIndex progressIndex) {
+      final String dataRegionId, final String filePath) {
     progressIndexKeeper
         .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
-        .compute(
-            timePartitionId,
-            (k, v) -> {
-              if (v == null) {
-                return null;
-              }
-              if (v.getRight() && !v.getLeft().isAfter(progressIndex)) {
-                return new Pair<>(v.getLeft(), false);
-              }
-              return v;
-            });
+        .remove(filePath);
   }
 
   public synchronized boolean isProgressIndexAfterOrEquals(
-      final String dataRegionId, final long timePartitionId, final 
ProgressIndex progressIndex) {
+      final String dataRegionId, final String tsFilePath, final ProgressIndex 
progressIndex) {
     return progressIndexKeeper
         .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
         .entrySet()
         .stream()
-        .filter(entry -> entry.getKey() != timePartitionId)
+        .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
         .map(Entry::getValue)
-        .filter(pair -> pair.right)
-        .map(Pair::getLeft)
         .filter(Objects::nonNull)
         .anyMatch(index -> !index.isAfter(progressIndex));
   }
@@ -84,19 +67,19 @@ public class PipeTimePartitionProgressIndexKeeper {
 
   private static class PipeTimePartitionProgressIndexKeeperHolder {
 
-    private static final PipeTimePartitionProgressIndexKeeper INSTANCE =
-        new PipeTimePartitionProgressIndexKeeper();
+    private static final PipeTsFileEpochProgressIndexKeeper INSTANCE =
+        new PipeTsFileEpochProgressIndexKeeper();
 
     private PipeTimePartitionProgressIndexKeeperHolder() {
       // empty constructor
     }
   }
 
-  public static PipeTimePartitionProgressIndexKeeper getInstance() {
-    return 
PipeTimePartitionProgressIndexKeeper.PipeTimePartitionProgressIndexKeeperHolder.INSTANCE;
+  public static PipeTsFileEpochProgressIndexKeeper getInstance() {
+    return 
PipeTsFileEpochProgressIndexKeeper.PipeTimePartitionProgressIndexKeeperHolder.INSTANCE;
   }
 
-  private PipeTimePartitionProgressIndexKeeper() {
+  private PipeTsFileEpochProgressIndexKeeper() {
     // empty constructor
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpoch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpoch.java
index 919ed20483b..c2db4c77c86 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpoch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpoch.java
@@ -34,20 +34,20 @@ public class TsFileEpoch {
       dataRegionExtractor2State;
   private final AtomicLong insertNodeMinTime;
 
-  public TsFileEpoch(String filePath) {
+  public TsFileEpoch(final String filePath) {
     this.filePath = filePath;
     this.dataRegionExtractor2State = new ConcurrentHashMap<>();
     this.insertNodeMinTime = new AtomicLong(Long.MAX_VALUE);
   }
 
-  public TsFileEpoch.State getState(PipeRealtimeDataRegionExtractor extractor) 
{
+  public TsFileEpoch.State getState(final PipeRealtimeDataRegionExtractor 
extractor) {
     return dataRegionExtractor2State
         .computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
         .get();
   }
 
   public void migrateState(
-      PipeRealtimeDataRegionExtractor extractor, TsFileEpochStateMigrator 
visitor) {
+      final PipeRealtimeDataRegionExtractor extractor, final 
TsFileEpochStateMigrator visitor) {
     dataRegionExtractor2State
         .computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
         .getAndUpdate(visitor::migrate);
@@ -60,7 +60,7 @@ public class TsFileEpoch {
                 .setRecentProcessedTsFileEpochState(extractor.getTaskID(), 
state.get()));
   }
 
-  public void updateInsertNodeMinTime(long newComingMinTime) {
+  public void updateInsertNodeMinTime(final long newComingMinTime) {
     insertNodeMinTime.updateAndGet(recordedMinTime -> 
Math.min(recordedMinTime, newComingMinTime));
   }
 
@@ -68,6 +68,10 @@ public class TsFileEpoch {
     return insertNodeMinTime.get();
   }
 
+  public String getFilePath() {
+    return filePath;
+  }
+
   @Override
   public String toString() {
     return "TsFileEpoch{"
@@ -90,7 +94,7 @@ public class TsFileEpoch {
 
     private final int id;
 
-    State(int id) {
+    State(final int id) {
       this.id = id;
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 9525b57e75b..87ae50e872f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.load.PartitionViolationException;
-import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper;
+import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.InsertionCompactionCandidateStatus;
@@ -1200,8 +1200,8 @@ public class TsFileResource {
             ? progressIndex
             : 
maxProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex));
 
-    PipeTimePartitionProgressIndexKeeper.getInstance()
-        .updateProgressIndex(getDataRegionId(), getTimePartition(), 
maxProgressIndex);
+    PipeTsFileEpochProgressIndexKeeper.getInstance()
+        .updateProgressIndex(getDataRegionId(), getTsFilePath(), 
maxProgressIndex);
   }
 
   public void setProgressIndex(ProgressIndex progressIndex) {
@@ -1211,8 +1211,8 @@ public class TsFileResource {
 
     maxProgressIndex = progressIndex;
 
-    PipeTimePartitionProgressIndexKeeper.getInstance()
-        .updateProgressIndex(getDataRegionId(), getTimePartition(), 
maxProgressIndex);
+    PipeTsFileEpochProgressIndexKeeper.getInstance()
+        .updateProgressIndex(getDataRegionId(), getTsFilePath(), 
maxProgressIndex);
   }
 
   public ProgressIndex getMaxProgressIndexAfterClose() throws 
IllegalStateException {

Reply via email to