This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 360254be7c7 Fix recover duplicate (#12272)
360254be7c7 is described below
commit 360254be7c79af6e7a3472f15992baa5221596ba
Author: shuwenwei <[email protected]>
AuthorDate: Wed Apr 3 09:38:47 2024 +0800
Fix recover duplicate (#12272)
---
.../wal/recover/file/TsFilePlanRedoer.java | 8 +-
.../file/UnsealedTsFileRecoverPerformer.java | 12 ++-
.../wal/recover/file/TsFilePlanRedoerTest.java | 36 +++------
.../file/UnsealedTsFileRecoverPerformerTest.java | 87 ++++++++++++++++++++++
4 files changed, 110 insertions(+), 33 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
index 2ecb64e28d2..1859b32fcc6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
@@ -41,15 +41,11 @@ import java.util.List;
*/
public class TsFilePlanRedoer {
private final TsFileResource tsFileResource;
- // only unsequence file tolerates duplicated data
- private final boolean sequence;
-
// store data when redoing logs
private IMemTable recoveryMemTable;
- public TsFilePlanRedoer(TsFileResource tsFileResource, boolean sequence) {
+ public TsFilePlanRedoer(TsFileResource tsFileResource) {
this.tsFileResource = tsFileResource;
- this.sequence = sequence;
this.recoveryMemTable =
new PrimitiveMemTable(tsFileResource.getDatabaseName(),
tsFileResource.getDataRegionId());
WritingMetrics.getInstance().recordActiveMemTableCount(tsFileResource.getDataRegionId(),
1);
@@ -89,7 +85,7 @@ public class TsFilePlanRedoer {
} else {
minTimeInNode = ((InsertTabletNode) node).getTimes()[0];
}
- if (lastEndTime != Long.MIN_VALUE && lastEndTime >= minTimeInNode &&
sequence) {
+ if (lastEndTime != Long.MIN_VALUE && lastEndTime >= minTimeInNode) {
return;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index d2b3fd07d9a..f22e4b30022 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.storageengine.dataregion.wal.recover.file;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
@@ -80,7 +81,7 @@ public class UnsealedTsFileRecoverPerformer extends
AbstractTsFileRecoverPerform
this.dataRegionId = tsFileResource.getDataRegionId();
this.sequence = sequence;
this.callbackAfterUnsealedTsFileRecovered =
callbackAfterUnsealedTsFileRecovered;
- this.walRedoer = new TsFilePlanRedoer(tsFileResource, sequence);
+ this.walRedoer = new TsFilePlanRedoer(tsFileResource);
this.recoverListener = new
WALRecoverListener(tsFileResource.getTsFilePath());
}
@@ -188,6 +189,15 @@ public class UnsealedTsFileRecoverPerformer extends
AbstractTsFileRecoverPerform
case MEMORY_TABLE_SNAPSHOT:
IMemTable memTable = (IMemTable) walEntry.getValue();
if (!memTable.isSignalMemTable()) {
+ if (tsFileResource != null) {
+ for (IDeviceID device : tsFileResource.getDevices()) {
+ memTable.delete(
+ new PartialPath(device, "*"),
+ new PartialPath(device),
+ tsFileResource.getStartTime(device),
+ tsFileResource.getEndTime(device));
+ }
+ }
walRedoer.resetRecoveryMemTable(memTable);
}
// update memtable's database and dataRegionId
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoerTest.java
index 8b9e0034aa9..de1662f303b 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoerTest.java
@@ -146,7 +146,7 @@ public class TsFilePlanRedoerTest {
});
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't
test IdTable here
- TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true);
+ TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
planRedoer.redoInsert(insertRowNode);
// check data in memTable
@@ -238,7 +238,7 @@ public class TsFilePlanRedoerTest {
});
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't
test IdTable here
- TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true);
+ TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
planRedoer.redoInsert(insertRowNode1);
planRedoer.redoInsert(insertRowNode2);
@@ -326,7 +326,7 @@ public class TsFilePlanRedoerTest {
});
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't
test IdTable here
- TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true);
+ TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
planRedoer.redoInsert(insertTabletNode);
// check data in memTable
@@ -432,7 +432,7 @@ public class TsFilePlanRedoerTest {
});
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't
test IdTable here
- TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true);
+ TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
planRedoer.redoInsert(insertTabletNode);
// check data in memTable
@@ -510,7 +510,7 @@ public class TsFilePlanRedoerTest {
times.length);
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't
test IdTable here
- TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true);
+ TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
planRedoer.redoInsert(insertTabletNode);
// check data in memTable
@@ -561,7 +561,7 @@ public class TsFilePlanRedoerTest {
times.length);
// redo InsertTabletPlan, vsg processor is used to test IdTable, don't
test IdTable here
- TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, false);
+ TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
planRedoer.redoInsert(insertTabletNode);
// check data in memTable
@@ -572,29 +572,13 @@ public class TsFilePlanRedoerTest {
DEVICE1_NAME, "s1", new MeasurementSchema("s1", TSDataType.INT32,
TSEncoding.RLE));
ReadOnlyMemChunk memChunk =
recoveryMemTable.query(new QueryContext(), fullPath, Long.MIN_VALUE,
null);
- IPointReader iterator = memChunk.getPointReader();
- int time = 1;
- while (iterator.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair = iterator.nextTimeValuePair();
- assertEquals(time, timeValuePair.getTimestamp());
- assertEquals(100, timeValuePair.getValue().getInt());
- ++time;
- }
- assertEquals(3, time);
+ assertTrue(memChunk == null || memChunk.isEmpty());
// check d1.s2
fullPath =
new MeasurementPath(
DEVICE1_NAME, "s2", new MeasurementSchema("s2", TSDataType.INT64,
TSEncoding.RLE));
memChunk = recoveryMemTable.query(new QueryContext(), fullPath,
Long.MIN_VALUE, null);
- iterator = memChunk.getPointReader();
- time = 1;
- while (iterator.hasNextTimeValuePair()) {
- TimeValuePair timeValuePair = iterator.nextTimeValuePair();
- assertEquals(time, timeValuePair.getTimestamp());
- assertEquals(10000, timeValuePair.getValue().getLong());
- ++time;
- }
- assertEquals(3, time);
+ assertTrue(memChunk == null || memChunk.isEmpty());
}
@Test
@@ -619,7 +603,7 @@ public class TsFilePlanRedoerTest {
// redo DeleteDataNode, vsg processor is used to test IdTable, don't test
IdTable here
File modsFile = new File(FILE_NAME.concat(ModificationFile.FILE_SUFFIX));
assertFalse(modsFile.exists());
- TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, false);
+ TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
planRedoer.redoDelete(deleteDataNode);
assertTrue(modsFile.exists());
}
@@ -692,7 +676,7 @@ public class TsFilePlanRedoerTest {
columns,
times.length);
// redo InsertTabletPlan, data region is used to test IdTable, don't test
IdTable here
- TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true);
+ TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
insertTabletNode.setMeasurementSchemas(schemas);
planRedoer.redoInsert(insertTabletNode);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
index cf679e62fe5..e92bc7a46d9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
@@ -18,17 +18,22 @@
*/
package org.apache.iotdb.db.storageengine.dataregion.wal.recover.file;
+import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.DataRegionException;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALInfoEntry;
+import
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALRecoverException;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.TsFileUtilsForRecoverTest;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
@@ -38,17 +43,21 @@ import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
import org.apache.iotdb.tsfile.read.common.Chunk;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
import org.apache.iotdb.tsfile.write.TsFileWriter;
import org.apache.iotdb.tsfile.write.record.TSRecord;
import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -57,6 +66,7 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -318,4 +328,81 @@ public class UnsealedTsFileRecoverPerformerTest {
recoverPerformer.endRecovery();
}
}
+
+ @Test
+ public void testRecoverDuplicate()
+ throws IllegalPathException, IOException, WriteProcessException,
DataRegionException,
+ WALRecoverException {
+ // generate crashed .tsfile
+ File file = new File(FILE_NAME);
+ generateCrashedFile(file);
+ assertTrue(file.exists());
+ assertFalse(new
File(FILE_NAME.concat(TsFileResource.RESOURCE_SUFFIX)).exists());
+ assertFalse(new
File(FILE_NAME.concat(ModificationFile.FILE_SUFFIX)).exists());
+ tsFileResource = new TsFileResource(file);
+
+ int fakeMemTableId = 1;
+
+ IMemTable memTable = new PrimitiveMemTable();
+ memTable.setDatabaseAndDataRegionId(SG_NAME, "0");
+ List<IMeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema("s1", TSDataType.INT32));
+ memTable.write(DEVICE1_NAME, schemaList, 1, new Object[] {100000});
+ WALEntry duplicateMemTableSnapshotWalEntry = new
WALInfoEntry(fakeMemTableId++, memTable);
+
+ InsertRowNode insertRowNode =
+ new InsertRowNode(
+ new PlanNodeId("plannode 1"),
+ new PartialPath(DEVICE1_NAME),
+ false,
+ new String[] {"s1"},
+ new TSDataType[] {TSDataType.INT32},
+ 2,
+ new Integer[] {20},
+ false);
+ insertRowNode.setMeasurementSchemas(
+ new MeasurementSchema[] {new MeasurementSchema("s1",
TSDataType.INT32)});
+ WALEntry duplicateWalEntry = new WALInfoEntry(fakeMemTableId++,
insertRowNode);
+
+ InsertRowNode insertRowNode2 =
+ new InsertRowNode(
+ new PlanNodeId("plannode 2"),
+ new PartialPath(DEVICE1_NAME),
+ false,
+ new String[] {"s1"},
+ new TSDataType[] {TSDataType.INT32},
+ 10,
+ new Integer[] {10},
+ false);
+ insertRowNode2.setMeasurementSchemas(
+ new MeasurementSchema[] {new MeasurementSchema("s1",
TSDataType.INT32)});
+
+ WALEntry normalWalEntry = new WALInfoEntry(fakeMemTableId++,
insertRowNode2);
+
+ try (UnsealedTsFileRecoverPerformer performer =
+ new UnsealedTsFileRecoverPerformer(tsFileResource, true, p ->
assertFalse(p.canWrite()))) {
+ performer.startRecovery();
+ performer.redoLog(duplicateMemTableSnapshotWalEntry);
+ performer.redoLog(duplicateWalEntry);
+ performer.redoLog(normalWalEntry);
+ performer.endRecovery();
+ performer.getTsFileResource();
+ }
+
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME)) {
+ List<ChunkMetadata> chunkMetadataList =
+ reader.getChunkMetadataList(new Path(DEVICE1_NAME, "s1", true));
+ for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+ Chunk chunk = reader.readMemChunk(chunkMetadata);
+ ChunkReader chunkReader = new ChunkReader(chunk);
+ while (chunkReader.hasNextSatisfiedPage()) {
+ BatchData batchData = chunkReader.nextPageData();
+ while (batchData.hasCurrent()) {
+ Assert.assertEquals((int) batchData.currentTime(),
batchData.currentValue());
+ batchData.next();
+ }
+ }
+ }
+ }
+ }
}