This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 1547dabe56d Pipe: rebind progress index to prematurely flushed tsfile insertion event to avoid data loss when syncing data between clusters (#13519)
1547dabe56d is described below

commit 1547dabe56d3933c74e9dbe3bd41ef20214ea34b
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Sep 18 10:14:11 2024 +0800

    Pipe: rebind progress index to prematurely flushed tsfile insertion event to avoid data loss when syncing data between clusters (#13519)
---
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  29 +++++-
 .../realtime/assigner/PipeDataRegionAssigner.java  |  29 +++++-
 .../PipeTimePartitionProgressIndexKeeper.java      | 100 +++++++++++++++++++++
 .../dataregion/tsfile/TsFileResource.java          |   7 ++
 .../db/pipe/extractor/PipeRealtimeExtractTest.java |   7 ++
 .../TsFileResourceProgressIndexTest.java           |   5 +-
 6 files changed, 173 insertions(+), 4 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 9fab25ee38b..f951cb23d72 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
 import 
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider;
+import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper;
 import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
 import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
@@ -72,6 +73,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
   // May be updated after it is flushed. Should be negative if not set.
   private long flushPointCount = TsFileProcessor.FLUSH_POINT_COUNT_NOT_SET;
 
+  private ProgressIndex overridingProgressIndex;
+
   public PipeTsFileInsertionEvent(
       final TsFileResource resource,
       final boolean isLoaded,
@@ -228,6 +231,10 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
     return flushPointCount;
   }
 
+  public long getTimePartitionId() {
+    return resource.getTimePartition();
+  }
+
   /////////////////////////// EnrichedEvent ///////////////////////////
 
   @Override
@@ -266,6 +273,11 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
     }
   }
 
+  @Override
+  public void bindProgressIndex(final ProgressIndex overridingProgressIndex) {
+    this.overridingProgressIndex = overridingProgressIndex;
+  }
+
   @Override
   public ProgressIndex getProgressIndex() {
     try {
@@ -275,6 +287,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
             tsFile);
         return MinimumProgressIndex.INSTANCE;
       }
+      if (Objects.nonNull(overridingProgressIndex)) {
+        return overridingProgressIndex;
+      }
       return resource.getMaxProgressIndexAfterClose();
     } catch (final InterruptedException e) {
       LOGGER.warn(
@@ -285,6 +300,18 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
     }
   }
 
+  @Override
+  protected void reportProgress() {
+    super.reportProgress();
+    if (Objects.isNull(overridingProgressIndex)) {
+      PipeTimePartitionProgressIndexKeeper.getInstance()
+          .eliminateProgressIndex(
+              resource.getDataRegionId(),
+              resource.getTimePartition(),
+              resource.getMaxProgressIndexAfterClose());
+    }
+  }
+
   @Override
   public PipeTsFileInsertionEvent 
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
       final String pipeName,
@@ -340,7 +367,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent implements TsFileIns
               // TODO: use IDeviceID
               deviceID ->
                   pipePattern.mayOverlapWithDevice(((PlainDeviceID) 
deviceID).toStringID()));
-    } catch (final IOException e) {
+    } catch (final Exception e) {
       LOGGER.warn(
           "Pipe {}: failed to get devices from TsFile {}, extract it anyway",
           pipeName,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index c36a8877dff..849c2eecf56 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
 
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
@@ -57,6 +59,8 @@ public class PipeDataRegionAssigner implements Closeable {
 
   private int counter = 0;
 
+  private ProgressIndex maxProgressIndexForTsFileInsertionEvent = 
MinimumProgressIndex.INSTANCE;
+
   public String getDataRegionId() {
     return dataRegionId;
   }
@@ -128,8 +132,11 @@ public class PipeDataRegionAssigner implements Closeable {
                       extractor.getRealtimeDataExtractionEndTime());
               final EnrichedEvent innerEvent = copiedEvent.getEvent();
               if (innerEvent instanceof PipeTsFileInsertionEvent) {
-                ((PipeTsFileInsertionEvent) innerEvent)
-                    
.disableMod4NonTransferPipes(extractor.isShouldTransferModFile());
+                final PipeTsFileInsertionEvent tsFileInsertionEvent =
+                    (PipeTsFileInsertionEvent) innerEvent;
+                tsFileInsertionEvent.disableMod4NonTransferPipes(
+                    extractor.isShouldTransferModFile());
+                
bindOrUpdateProgressIndexForTsFileInsertionEvent(tsFileInsertionEvent);
               }
 
               if 
(!copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
@@ -148,6 +155,24 @@ public class PipeDataRegionAssigner implements Closeable {
     event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName(), 
false);
   }
 
+  private void bindOrUpdateProgressIndexForTsFileInsertionEvent(
+      final PipeTsFileInsertionEvent event) {
+    if (PipeTimePartitionProgressIndexKeeper.getInstance()
+        .isProgressIndexAfterOrEquals(
+            dataRegionId, event.getTimePartitionId(), 
event.getProgressIndex())) {
+      
event.bindProgressIndex(maxProgressIndexForTsFileInsertionEvent.deepCopy());
+      LOGGER.warn(
+          "Data region {} bind {} to event {} because it was flushed 
prematurely.",
+          dataRegionId,
+          maxProgressIndexForTsFileInsertionEvent,
+          event);
+    } else {
+      maxProgressIndexForTsFileInsertionEvent =
+          
maxProgressIndexForTsFileInsertionEvent.updateToMinimumEqualOrIsAfterProgressIndex(
+              event.getProgressIndex());
+    }
+  }
+
   public void startAssignTo(final PipeRealtimeDataRegionExtractor extractor) {
     matcher.register(extractor);
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTimePartitionProgressIndexKeeper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTimePartitionProgressIndexKeeper.java
new file mode 100644
index 00000000000..f4f81e48fe1
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTimePartitionProgressIndexKeeper.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PipeTimePartitionProgressIndexKeeper {
+
+  // data region id -> (time partition id, <max progress index, is valid>)
+  private final Map<String, Map<Long, Pair<ProgressIndex, Boolean>>> 
progressIndexKeeper =
+      new ConcurrentHashMap<>();
+
+  public synchronized void updateProgressIndex(
+      final String dataRegionId, final long timePartitionId, final 
ProgressIndex progressIndex) {
+    progressIndexKeeper
+        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
+        .compute(
+            timePartitionId,
+            (k, v) -> {
+              if (v == null) {
+                return new Pair<>(progressIndex.deepCopy(), true);
+              }
+              return new Pair<>(
+                  
v.getLeft().updateToMinimumEqualOrIsAfterProgressIndex(progressIndex), true);
+            });
+  }
+
+  public synchronized void eliminateProgressIndex(
+      final String dataRegionId, final long timePartitionId, final 
ProgressIndex progressIndex) {
+    progressIndexKeeper
+        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
+        .compute(
+            timePartitionId,
+            (k, v) -> {
+              if (v == null) {
+                return null;
+              }
+              if (v.getRight() && v.getLeft().equals(progressIndex)) {
+                return new Pair<>(v.getLeft(), false);
+              }
+              return v;
+            });
+  }
+
+  public synchronized boolean isProgressIndexAfterOrEquals(
+      final String dataRegionId, final long timePartitionId, final 
ProgressIndex progressIndex) {
+    return progressIndexKeeper
+        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
+        .entrySet()
+        .stream()
+        .filter(entry -> entry.getKey() != timePartitionId)
+        .map(Entry::getValue)
+        .filter(pair -> pair.right)
+        .map(Pair::getLeft)
+        .anyMatch(index -> progressIndex.isAfter(index) || 
progressIndex.equals(index));
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeTimePartitionProgressIndexKeeperHolder {
+
+    private static final PipeTimePartitionProgressIndexKeeper INSTANCE =
+        new PipeTimePartitionProgressIndexKeeper();
+
+    private PipeTimePartitionProgressIndexKeeperHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeTimePartitionProgressIndexKeeper getInstance() {
+    return 
PipeTimePartitionProgressIndexKeeper.PipeTimePartitionProgressIndexKeeperHolder.INSTANCE;
+  }
+
+  private PipeTimePartitionProgressIndexKeeper() {
+    // empty constructor
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 10033eb7381..2d08f7d4a68 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.PartitionViolationException;
+import 
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.InsertionCompactionCandidateStatus;
@@ -1137,6 +1138,9 @@ public class TsFileResource {
         (maxProgressIndex == null
             ? progressIndex.deepCopy()
             : 
maxProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex));
+
+    PipeTimePartitionProgressIndexKeeper.getInstance()
+        .updateProgressIndex(getDataRegionId(), getTimePartition(), 
maxProgressIndex);
   }
 
   public void setProgressIndex(ProgressIndex progressIndex) {
@@ -1145,6 +1149,9 @@ public class TsFileResource {
     }
 
     maxProgressIndex = progressIndex.deepCopy();
+
+    PipeTimePartitionProgressIndexKeeper.getInstance()
+        .updateProgressIndex(getDataRegionId(), getTimePartition(), 
maxProgressIndex);
   }
 
   public ProgressIndex getMaxProgressIndexAfterClose() throws 
IllegalStateException {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
index cec7a5f29e1..46e95791706 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
@@ -272,6 +272,13 @@ public class PipeRealtimeExtractTest {
             resource.updateStartTime(
                 new PlainDeviceID(String.join(TsFileConstant.PATH_SEPARATOR, 
device)), 0);
 
+            try {
+              resource.close();
+            } catch (IOException e) {
+              e.printStackTrace();
+              throw new RuntimeException(e);
+            }
+
             PipeInsertionDataNodeListener.getInstance()
                 .listenToInsertNode(
                     dataRegionId,
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
index 3c5fd714d8e..bb36674a7bb 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
@@ -40,6 +40,8 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.Spy;
 
 import javax.annotation.Nonnull;
 
@@ -59,7 +61,7 @@ public class TsFileResourceProgressIndexTest {
   private final File file =
       new File(
           
TsFileNameGenerator.generateNewTsFilePath(TestConstant.BASE_OUTPUT_PATH, 1, 1, 
1, 1));
-  private final TsFileResource tsFileResource = new TsFileResource(file);
+  @Spy private TsFileResource tsFileResource = Mockito.spy(new 
TsFileResource(file));
   private final Map<IDeviceID, Integer> deviceToIndex = new HashMap<>();
   private final long[] startTimes = new long[DEVICE_NUM];
   private final long[] endTimes = new long[DEVICE_NUM];
@@ -81,6 +83,7 @@ public class TsFileResourceProgressIndexTest {
             });
     tsFileResource.setTimeIndex(deviceTimeIndex);
     tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+    Mockito.doReturn("1").when(tsFileResource).getDataRegionId();
 
     IntStream.range(0, INDEX_NUM).forEach(i -> indexList.add(new 
MockProgressIndex(i)));
   }

Reply via email to