This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 1547dabe56d Pipe: rebind progress index to prematurely flushed tsfile
insertion event to avoid data loss when syncing data between clusters (#13519)
1547dabe56d is described below
commit 1547dabe56d3933c74e9dbe3bd41ef20214ea34b
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Sep 18 10:14:11 2024 +0800
Pipe: rebind progress index to prematurely flushed tsfile insertion event
to avoid data loss when syncing data between clusters (#13519)
---
.../common/tsfile/PipeTsFileInsertionEvent.java | 29 +++++-
.../realtime/assigner/PipeDataRegionAssigner.java | 29 +++++-
.../PipeTimePartitionProgressIndexKeeper.java | 100 +++++++++++++++++++++
.../dataregion/tsfile/TsFileResource.java | 7 ++
.../db/pipe/extractor/PipeRealtimeExtractTest.java | 7 ++
.../TsFileResourceProgressIndexTest.java | 5 +-
6 files changed, 173 insertions(+), 4 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 9fab25ee38b..f951cb23d72 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider;
+import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
@@ -72,6 +73,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
// May be updated after it is flushed. Should be negative if not set.
private long flushPointCount = TsFileProcessor.FLUSH_POINT_COUNT_NOT_SET;
+ private ProgressIndex overridingProgressIndex;
+
public PipeTsFileInsertionEvent(
final TsFileResource resource,
final boolean isLoaded,
@@ -228,6 +231,10 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
return flushPointCount;
}
+  /**
+   * Returns the time partition of the TsFile resource backing this event.
+   *
+   * @return the resource's time partition id
+   */
+  public long getTimePartitionId() {
+    final long timePartitionId = resource.getTimePartition();
+    return timePartitionId;
+  }
+
/////////////////////////// EnrichedEvent ///////////////////////////
@Override
@@ -266,6 +273,11 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
}
}
+  /**
+   * Binds a progress index that {@link #getProgressIndex()} returns instead of the resource's max
+   * progress index. The reference is stored as-is, so callers should pass an index they will not
+   * mutate afterwards.
+   *
+   * @param progressIndexToBind the progress index this event should report
+   */
+  @Override
+  public void bindProgressIndex(final ProgressIndex progressIndexToBind) {
+    overridingProgressIndex = progressIndexToBind;
+  }
+
@Override
public ProgressIndex getProgressIndex() {
try {
@@ -275,6 +287,9 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
tsFile);
return MinimumProgressIndex.INSTANCE;
}
+ if (Objects.nonNull(overridingProgressIndex)) {
+ return overridingProgressIndex;
+ }
return resource.getMaxProgressIndexAfterClose();
} catch (final InterruptedException e) {
LOGGER.warn(
@@ -285,6 +300,18 @@ public class PipeTsFileInsertionEvent extends
EnrichedEvent implements TsFileIns
}
}
+ @Override
+ protected void reportProgress() {
+ super.reportProgress();
+ if (Objects.isNull(overridingProgressIndex)) {
+ PipeTimePartitionProgressIndexKeeper.getInstance()
+ .eliminateProgressIndex(
+ resource.getDataRegionId(),
+ resource.getTimePartition(),
+ resource.getMaxProgressIndexAfterClose());
+ }
+ }
+
@Override
public PipeTsFileInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
@@ -340,7 +367,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
implements TsFileIns
// TODO: use IDeviceID
deviceID ->
pipePattern.mayOverlapWithDevice(((PlainDeviceID)
deviceID).toStringID()));
- } catch (final IOException e) {
+ } catch (final Exception e) {
LOGGER.warn(
"Pipe {}: failed to get devices from TsFile {}, extract it anyway",
pipeName,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index c36a8877dff..849c2eecf56 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
@@ -57,6 +59,8 @@ public class PipeDataRegionAssigner implements Closeable {
private int counter = 0;
+ private ProgressIndex maxProgressIndexForTsFileInsertionEvent =
MinimumProgressIndex.INSTANCE;
+
public String getDataRegionId() {
return dataRegionId;
}
@@ -128,8 +132,11 @@ public class PipeDataRegionAssigner implements Closeable {
extractor.getRealtimeDataExtractionEndTime());
final EnrichedEvent innerEvent = copiedEvent.getEvent();
if (innerEvent instanceof PipeTsFileInsertionEvent) {
- ((PipeTsFileInsertionEvent) innerEvent)
-
.disableMod4NonTransferPipes(extractor.isShouldTransferModFile());
+ final PipeTsFileInsertionEvent tsFileInsertionEvent =
+ (PipeTsFileInsertionEvent) innerEvent;
+ tsFileInsertionEvent.disableMod4NonTransferPipes(
+ extractor.isShouldTransferModFile());
+
bindOrUpdateProgressIndexForTsFileInsertionEvent(tsFileInsertionEvent);
}
if
(!copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
@@ -148,6 +155,24 @@ public class PipeDataRegionAssigner implements Closeable {
event.decreaseReferenceCount(PipeDataRegionAssigner.class.getName(),
false);
}
+ private void bindOrUpdateProgressIndexForTsFileInsertionEvent(
+ final PipeTsFileInsertionEvent event) {
+ if (PipeTimePartitionProgressIndexKeeper.getInstance()
+ .isProgressIndexAfterOrEquals(
+ dataRegionId, event.getTimePartitionId(),
event.getProgressIndex())) {
+
event.bindProgressIndex(maxProgressIndexForTsFileInsertionEvent.deepCopy());
+ LOGGER.warn(
+ "Data region {} bind {} to event {} because it was flushed
prematurely.",
+ dataRegionId,
+ maxProgressIndexForTsFileInsertionEvent,
+ event);
+ } else {
+ maxProgressIndexForTsFileInsertionEvent =
+
maxProgressIndexForTsFileInsertionEvent.updateToMinimumEqualOrIsAfterProgressIndex(
+ event.getProgressIndex());
+ }
+ }
+
public void startAssignTo(final PipeRealtimeDataRegionExtractor extractor) {
matcher.register(extractor);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTimePartitionProgressIndexKeeper.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTimePartitionProgressIndexKeeper.java
new file mode 100644
index 00000000000..f4f81e48fe1
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTimePartitionProgressIndexKeeper.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
+
+import org.apache.iotdb.commons.consensus.index.ProgressIndex;
+
+import org.apache.tsfile.utils.Pair;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Keeps, for every data region and time partition, the max progress index reported through {@link
+ * #updateProgressIndex} together with a validity flag.
+ *
+ * <p>{@link #eliminateProgressIndex} marks an entry invalid once exactly its current max progress
+ * index is eliminated. {@link #isProgressIndexAfterOrEquals} only consults valid entries of
+ * <em>other</em> time partitions. Entries are never removed by this class.
+ *
+ * <p>All public methods are {@code synchronized} on the singleton instance.
+ */
+public class PipeTimePartitionProgressIndexKeeper {
+
+  // data region id -> (time partition id -> <max progress index, is valid>)
+  private final Map<String, Map<Long, Pair<ProgressIndex, Boolean>>> progressIndexKeeper =
+      new ConcurrentHashMap<>();
+
+  /**
+   * Folds {@code progressIndex} into the max progress index kept for the given time partition and
+   * marks the entry valid (again).
+   *
+   * @param dataRegionId id of the data region the progress belongs to
+   * @param timePartitionId time partition the progress belongs to
+   * @param progressIndex progress index to merge; the argument object itself is not retained
+   */
+  public synchronized void updateProgressIndex(
+      final String dataRegionId, final long timePartitionId, final ProgressIndex progressIndex) {
+    progressIndexKeeper
+        .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
+        .compute(
+            timePartitionId,
+            (k, v) -> {
+              if (v == null) {
+                return new Pair<>(progressIndex.deepCopy(), true);
+              }
+              final ProgressIndex merged =
+                  v.getLeft().updateToMinimumEqualOrIsAfterProgressIndex(progressIndex);
+              // TsFileResource passes its own max progress index object here. A merge may hand
+              // back the argument itself (NOTE(review): e.g. when the kept index is
+              // MinimumProgressIndex; confirm). Copy only in that case, to avoid aliasing without
+              // deep-copying on every progress-index update.
+              return new Pair<>(merged == progressIndex ? progressIndex.deepCopy() : merged, true);
+            });
+  }
+
+  /**
+   * Marks the entry of the given time partition invalid if it is valid and its max progress index
+   * equals {@code progressIndex}. Unknown data regions or time partitions are left untouched.
+   *
+   * @param dataRegionId id of the data region the progress belongs to
+   * @param timePartitionId time partition the progress belongs to
+   * @param progressIndex progress index that has been fully reported
+   */
+  public synchronized void eliminateProgressIndex(
+      final String dataRegionId, final long timePartitionId, final ProgressIndex progressIndex) {
+    // Plain lookup: eliminating must not create bookkeeping for unknown data regions.
+    final Map<Long, Pair<ProgressIndex, Boolean>> timePartitionToProgressIndex =
+        progressIndexKeeper.get(dataRegionId);
+    if (timePartitionToProgressIndex == null) {
+      return;
+    }
+    timePartitionToProgressIndex.computeIfPresent(
+        timePartitionId,
+        (k, v) -> {
+          if (v.getRight() && v.getLeft().equals(progressIndex)) {
+            return new Pair<>(v.getLeft(), false);
+          }
+          return v;
+        });
+  }
+
+  /**
+   * Returns whether {@code progressIndex} is after or equal to the valid max progress index of any
+   * time partition of {@code dataRegionId} other than {@code timePartitionId}.
+   *
+   * @param dataRegionId id of the data region to inspect
+   * @param timePartitionId time partition to exclude from the comparison
+   * @param progressIndex progress index to compare
+   * @return {@code true} if some other valid time partition entry is covered by {@code
+   *     progressIndex}; {@code false} otherwise, including for unknown data regions
+   */
+  public synchronized boolean isProgressIndexAfterOrEquals(
+      final String dataRegionId, final long timePartitionId, final ProgressIndex progressIndex) {
+    // A query must not mutate state: look up instead of computeIfAbsent.
+    final Map<Long, Pair<ProgressIndex, Boolean>> timePartitionToProgressIndex =
+        progressIndexKeeper.get(dataRegionId);
+    if (timePartitionToProgressIndex == null) {
+      return false;
+    }
+    return timePartitionToProgressIndex.entrySet().stream()
+        .filter(entry -> entry.getKey() != timePartitionId)
+        .map(Entry::getValue)
+        .filter(Pair::getRight)
+        .map(Pair::getLeft)
+        .anyMatch(index -> progressIndex.isAfter(index) || progressIndex.equals(index));
+  }
+
+  //////////////////////////// singleton ////////////////////////////
+
+  private static class PipeTimePartitionProgressIndexKeeperHolder {
+
+    private static final PipeTimePartitionProgressIndexKeeper INSTANCE =
+        new PipeTimePartitionProgressIndexKeeper();
+
+    private PipeTimePartitionProgressIndexKeeperHolder() {
+      // empty constructor
+    }
+  }
+
+  public static PipeTimePartitionProgressIndexKeeper getInstance() {
+    return PipeTimePartitionProgressIndexKeeperHolder.INSTANCE;
+  }
+
+  private PipeTimePartitionProgressIndexKeeper() {
+    // empty constructor
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 10033eb7381..2d08f7d4a68 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.PartitionViolationException;
+import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.InsertionCompactionCandidateStatus;
@@ -1137,6 +1138,9 @@ public class TsFileResource {
(maxProgressIndex == null
? progressIndex.deepCopy()
:
maxProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex));
+
+ PipeTimePartitionProgressIndexKeeper.getInstance()
+ .updateProgressIndex(getDataRegionId(), getTimePartition(),
maxProgressIndex);
}
public void setProgressIndex(ProgressIndex progressIndex) {
@@ -1145,6 +1149,9 @@ public class TsFileResource {
}
maxProgressIndex = progressIndex.deepCopy();
+
+ PipeTimePartitionProgressIndexKeeper.getInstance()
+ .updateProgressIndex(getDataRegionId(), getTimePartition(),
maxProgressIndex);
}
public ProgressIndex getMaxProgressIndexAfterClose() throws
IllegalStateException {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
index cec7a5f29e1..46e95791706 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/extractor/PipeRealtimeExtractTest.java
@@ -272,6 +272,13 @@ public class PipeRealtimeExtractTest {
resource.updateStartTime(
new PlainDeviceID(String.join(TsFileConstant.PATH_SEPARATOR,
device)), 0);
+ try {
+ resource.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+
PipeInsertionDataNodeListener.getInstance()
.listenToInsertNode(
dataRegionId,
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
index 3c5fd714d8e..bb36674a7bb 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
@@ -40,6 +40,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.Spy;
import javax.annotation.Nonnull;
@@ -59,7 +61,7 @@ public class TsFileResourceProgressIndexTest {
private final File file =
new File(
TsFileNameGenerator.generateNewTsFilePath(TestConstant.BASE_OUTPUT_PATH, 1, 1,
1, 1));
- private final TsFileResource tsFileResource = new TsFileResource(file);
+ @Spy private TsFileResource tsFileResource = Mockito.spy(new
TsFileResource(file));
private final Map<IDeviceID, Integer> deviceToIndex = new HashMap<>();
private final long[] startTimes = new long[DEVICE_NUM];
private final long[] endTimes = new long[DEVICE_NUM];
@@ -81,6 +83,7 @@ public class TsFileResourceProgressIndexTest {
});
tsFileResource.setTimeIndex(deviceTimeIndex);
tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
+ Mockito.doReturn("1").when(tsFileResource).getDataRegionId();
IntStream.range(0, INDEX_NUM).forEach(i -> indexList.add(new
MockProgressIndex(i)));
}