This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 360254be7c7 Fix recover duplicate (#12272)
360254be7c7 is described below

commit 360254be7c79af6e7a3472f15992baa5221596ba
Author: shuwenwei <[email protected]>
AuthorDate: Wed Apr 3 09:38:47 2024 +0800

    Fix recover duplicate (#12272)
---
 .../wal/recover/file/TsFilePlanRedoer.java         |  8 +-
 .../file/UnsealedTsFileRecoverPerformer.java       | 12 ++-
 .../wal/recover/file/TsFilePlanRedoerTest.java     | 36 +++------
 .../file/UnsealedTsFileRecoverPerformerTest.java   | 87 ++++++++++++++++++++++
 4 files changed, 110 insertions(+), 33 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
index 2ecb64e28d2..1859b32fcc6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
@@ -41,15 +41,11 @@ import java.util.List;
  */
 public class TsFilePlanRedoer {
   private final TsFileResource tsFileResource;
-  // only unsequence file tolerates duplicated data
-  private final boolean sequence;
-
   // store data when redoing logs
   private IMemTable recoveryMemTable;
 
-  public TsFilePlanRedoer(TsFileResource tsFileResource, boolean sequence) {
+  public TsFilePlanRedoer(TsFileResource tsFileResource) {
     this.tsFileResource = tsFileResource;
-    this.sequence = sequence;
     this.recoveryMemTable =
         new PrimitiveMemTable(tsFileResource.getDatabaseName(), 
tsFileResource.getDataRegionId());
     
WritingMetrics.getInstance().recordActiveMemTableCount(tsFileResource.getDataRegionId(),
 1);
@@ -89,7 +85,7 @@ public class TsFilePlanRedoer {
       } else {
         minTimeInNode = ((InsertTabletNode) node).getTimes()[0];
       }
-      if (lastEndTime != Long.MIN_VALUE && lastEndTime >= minTimeInNode && 
sequence) {
+      if (lastEndTime != Long.MIN_VALUE && lastEndTime >= minTimeInNode) {
         return;
       }
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
index d2b3fd07d9a..f22e4b30022 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformer.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.wal.recover.file;
 
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
@@ -80,7 +81,7 @@ public class UnsealedTsFileRecoverPerformer extends 
AbstractTsFileRecoverPerform
     this.dataRegionId = tsFileResource.getDataRegionId();
     this.sequence = sequence;
     this.callbackAfterUnsealedTsFileRecovered = 
callbackAfterUnsealedTsFileRecovered;
-    this.walRedoer = new TsFilePlanRedoer(tsFileResource, sequence);
+    this.walRedoer = new TsFilePlanRedoer(tsFileResource);
     this.recoverListener = new 
WALRecoverListener(tsFileResource.getTsFilePath());
   }
 
@@ -188,6 +189,15 @@ public class UnsealedTsFileRecoverPerformer extends 
AbstractTsFileRecoverPerform
         case MEMORY_TABLE_SNAPSHOT:
           IMemTable memTable = (IMemTable) walEntry.getValue();
           if (!memTable.isSignalMemTable()) {
+            if (tsFileResource != null) {
+              for (IDeviceID device : tsFileResource.getDevices()) {
+                memTable.delete(
+                    new PartialPath(device, "*"),
+                    new PartialPath(device),
+                    tsFileResource.getStartTime(device),
+                    tsFileResource.getEndTime(device));
+              }
+            }
             walRedoer.resetRecoveryMemTable(memTable);
           }
           // update memtable's database and dataRegionId
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoerTest.java
index 8b9e0034aa9..de1662f303b 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoerTest.java
@@ -146,7 +146,7 @@ public class TsFilePlanRedoerTest {
         });
 
     // redo InsertTabletPlan, vsg processor is used to test IdTable, don't 
test IdTable here
-    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true);
+    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
     planRedoer.redoInsert(insertRowNode);
 
     // check data in memTable
@@ -238,7 +238,7 @@ public class TsFilePlanRedoerTest {
         });
 
     // redo InsertTabletPlan, vsg processor is used to test IdTable, don't 
test IdTable here
-    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true);
+    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
     planRedoer.redoInsert(insertRowNode1);
     planRedoer.redoInsert(insertRowNode2);
 
@@ -326,7 +326,7 @@ public class TsFilePlanRedoerTest {
         });
 
     // redo InsertTabletPlan, vsg processor is used to test IdTable, don't 
test IdTable here
-    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true);
+    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
     planRedoer.redoInsert(insertTabletNode);
 
     // check data in memTable
@@ -432,7 +432,7 @@ public class TsFilePlanRedoerTest {
         });
 
     // redo InsertTabletPlan, vsg processor is used to test IdTable, don't 
test IdTable here
-    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true);
+    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
     planRedoer.redoInsert(insertTabletNode);
 
     // check data in memTable
@@ -510,7 +510,7 @@ public class TsFilePlanRedoerTest {
             times.length);
 
     // redo InsertTabletPlan, vsg processor is used to test IdTable, don't 
test IdTable here
-    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true);
+    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
     planRedoer.redoInsert(insertTabletNode);
 
     // check data in memTable
@@ -561,7 +561,7 @@ public class TsFilePlanRedoerTest {
             times.length);
 
     // redo InsertTabletPlan, vsg processor is used to test IdTable, don't 
test IdTable here
-    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, false);
+    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
     planRedoer.redoInsert(insertTabletNode);
 
     // check data in memTable
@@ -572,29 +572,13 @@ public class TsFilePlanRedoerTest {
             DEVICE1_NAME, "s1", new MeasurementSchema("s1", TSDataType.INT32, 
TSEncoding.RLE));
     ReadOnlyMemChunk memChunk =
         recoveryMemTable.query(new QueryContext(), fullPath, Long.MIN_VALUE, 
null);
-    IPointReader iterator = memChunk.getPointReader();
-    int time = 1;
-    while (iterator.hasNextTimeValuePair()) {
-      TimeValuePair timeValuePair = iterator.nextTimeValuePair();
-      assertEquals(time, timeValuePair.getTimestamp());
-      assertEquals(100, timeValuePair.getValue().getInt());
-      ++time;
-    }
-    assertEquals(3, time);
+    assertTrue(memChunk == null || memChunk.isEmpty());
     // check d1.s2
     fullPath =
         new MeasurementPath(
             DEVICE1_NAME, "s2", new MeasurementSchema("s2", TSDataType.INT64, 
TSEncoding.RLE));
     memChunk = recoveryMemTable.query(new QueryContext(), fullPath, 
Long.MIN_VALUE, null);
-    iterator = memChunk.getPointReader();
-    time = 1;
-    while (iterator.hasNextTimeValuePair()) {
-      TimeValuePair timeValuePair = iterator.nextTimeValuePair();
-      assertEquals(time, timeValuePair.getTimestamp());
-      assertEquals(10000, timeValuePair.getValue().getLong());
-      ++time;
-    }
-    assertEquals(3, time);
+    assertTrue(memChunk == null || memChunk.isEmpty());
   }
 
   @Test
@@ -619,7 +603,7 @@ public class TsFilePlanRedoerTest {
     // redo DeleteDataNode, vsg processor is used to test IdTable, don't test 
IdTable here
     File modsFile = new File(FILE_NAME.concat(ModificationFile.FILE_SUFFIX));
     assertFalse(modsFile.exists());
-    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, false);
+    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
     planRedoer.redoDelete(deleteDataNode);
     assertTrue(modsFile.exists());
   }
@@ -692,7 +676,7 @@ public class TsFilePlanRedoerTest {
             columns,
             times.length);
     // redo InsertTabletPlan, data region is used to test IdTable, don't test 
IdTable here
-    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource, true);
+    TsFilePlanRedoer planRedoer = new TsFilePlanRedoer(tsFileResource);
     insertTabletNode.setMeasurementSchemas(schemas);
     planRedoer.redoInsert(insertTabletNode);
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
index cf679e62fe5..e92bc7a46d9 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/UnsealedTsFileRecoverPerformerTest.java
@@ -18,17 +18,22 @@
  */
 package org.apache.iotdb.db.storageengine.dataregion.wal.recover.file;
 
+import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.DataRegionException;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntry;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALInfoEntry;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALRecoverException;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.TsFileUtilsForRecoverTest;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
@@ -38,17 +43,21 @@ import org.apache.iotdb.tsfile.file.metadata.PlainDeviceID;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.BatchData;
 import org.apache.iotdb.tsfile.read.common.Chunk;
 import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.reader.chunk.ChunkReader;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DoubleDataPoint;
 import org.apache.iotdb.tsfile.write.record.datapoint.FloatDataPoint;
 import org.apache.iotdb.tsfile.write.record.datapoint.IntDataPoint;
 import org.apache.iotdb.tsfile.write.record.datapoint.LongDataPoint;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -57,6 +66,7 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
@@ -318,4 +328,81 @@ public class UnsealedTsFileRecoverPerformerTest {
       recoverPerformer.endRecovery();
     }
   }
+
+  @Test
+  public void testRecoverDuplicate()
+      throws IllegalPathException, IOException, WriteProcessException, 
DataRegionException,
+          WALRecoverException {
+    // generate crashed .tsfile
+    File file = new File(FILE_NAME);
+    generateCrashedFile(file);
+    assertTrue(file.exists());
+    assertFalse(new 
File(FILE_NAME.concat(TsFileResource.RESOURCE_SUFFIX)).exists());
+    assertFalse(new 
File(FILE_NAME.concat(ModificationFile.FILE_SUFFIX)).exists());
+    tsFileResource = new TsFileResource(file);
+
+    int fakeMemTableId = 1;
+
+    IMemTable memTable = new PrimitiveMemTable();
+    memTable.setDatabaseAndDataRegionId(SG_NAME, "0");
+    List<IMeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT32));
+    memTable.write(DEVICE1_NAME, schemaList, 1, new Object[] {100000});
+    WALEntry duplicateMemTableSnapshotWalEntry = new 
WALInfoEntry(fakeMemTableId++, memTable);
+
+    InsertRowNode insertRowNode =
+        new InsertRowNode(
+            new PlanNodeId("plannode 1"),
+            new PartialPath(DEVICE1_NAME),
+            false,
+            new String[] {"s1"},
+            new TSDataType[] {TSDataType.INT32},
+            2,
+            new Integer[] {20},
+            false);
+    insertRowNode.setMeasurementSchemas(
+        new MeasurementSchema[] {new MeasurementSchema("s1", 
TSDataType.INT32)});
+    WALEntry duplicateWalEntry = new WALInfoEntry(fakeMemTableId++, 
insertRowNode);
+
+    InsertRowNode insertRowNode2 =
+        new InsertRowNode(
+            new PlanNodeId("plannode 2"),
+            new PartialPath(DEVICE1_NAME),
+            false,
+            new String[] {"s1"},
+            new TSDataType[] {TSDataType.INT32},
+            10,
+            new Integer[] {10},
+            false);
+    insertRowNode2.setMeasurementSchemas(
+        new MeasurementSchema[] {new MeasurementSchema("s1", 
TSDataType.INT32)});
+
+    WALEntry normalWalEntry = new WALInfoEntry(fakeMemTableId++, 
insertRowNode2);
+
+    try (UnsealedTsFileRecoverPerformer performer =
+        new UnsealedTsFileRecoverPerformer(tsFileResource, true, p -> 
assertFalse(p.canWrite()))) {
+      performer.startRecovery();
+      performer.redoLog(duplicateMemTableSnapshotWalEntry);
+      performer.redoLog(duplicateWalEntry);
+      performer.redoLog(normalWalEntry);
+      performer.endRecovery();
+      performer.getTsFileResource();
+    }
+
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_NAME)) {
+      List<ChunkMetadata> chunkMetadataList =
+          reader.getChunkMetadataList(new Path(DEVICE1_NAME, "s1", true));
+      for (ChunkMetadata chunkMetadata : chunkMetadataList) {
+        Chunk chunk = reader.readMemChunk(chunkMetadata);
+        ChunkReader chunkReader = new ChunkReader(chunk);
+        while (chunkReader.hasNextSatisfiedPage()) {
+          BatchData batchData = chunkReader.nextPageData();
+          while (batchData.hasCurrent()) {
+            Assert.assertEquals((int) batchData.currentTime(), 
batchData.currentValue());
+            batchData.next();
+          }
+        }
+      }
+    }
+  }
 }

Reply via email to