This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new e9d6173688e [to dev/1.3] Fix repair performerWhen the device is 
missing from the resource, this part of the data may be lost during file repair 
(#14422)
e9d6173688e is described below

commit e9d6173688eb45cd66b97f383b6db8a231d5b51e
Author: shuwenwei <[email protected]>
AuthorDate: Tue Dec 17 08:10:44 2024 +0800

    [to dev/1.3] Fix repair performerWhen the device is missing from the 
resource, this part of the data may be lost during file repair (#14422)
    
    * When the device is missing from the resource, this part of the data may 
be lost during file repair
    
    * fix compile
    
    * fix compile
---
 .../impl/ReadPointCompactionPerformer.java         |  6 ++-
 .../RepairUnsortedFileCompactionPerformer.java     | 13 +++++++
 .../rescon/memory/TsFileResourceManager.java       | 17 ++++++++
 .../repair/RepairUnsortedFileCompactionTest.java   | 45 ++++++++++++++++++++++
 .../rescon/memory/ResourceManagerTest.java         | 40 +++++++++++++++++++
 5 files changed, 120 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
index 5efb50d7b53..064cdecf841 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/ReadPointCompactionPerformer.java
@@ -103,7 +103,7 @@ public class ReadPointCompactionPerformer
     long queryId = 
QueryResourceManager.getInstance().assignCompactionQueryId();
     FragmentInstanceContext fragmentInstanceContext =
         
FragmentInstanceContext.createFragmentInstanceContextForCompaction(queryId);
-    QueryDataSource queryDataSource = new QueryDataSource(seqFiles, 
unseqFiles);
+    QueryDataSource queryDataSource = initQueryDataSource();
     QueryResourceManager.getInstance()
         .getQueryFileManager()
         .addUsedFilesForQuery(queryId, queryDataSource);
@@ -138,6 +138,10 @@ public class ReadPointCompactionPerformer
     }
   }
 
+  protected QueryDataSource initQueryDataSource() {
+    return new QueryDataSource(seqFiles, unseqFiles);
+  }
+
   @Override
   public void setTargetFiles(List<TsFileResource> targetFiles) {
     this.targetFiles = targetFiles;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
index 79a3b9fb157..18e9ef24df2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/performer/impl/RepairUnsortedFileCompactionPerformer.java
@@ -22,14 +22,17 @@ package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performe
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.AbstractCompactionWriter;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.RepairUnsortedFileCompactionWriter;
+import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileRepairStatus;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
+import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.Collections;
 import java.util.List;
 
 /** Used for fixing files which contains internal unsorted data */
@@ -52,12 +55,22 @@ public class RepairUnsortedFileCompactionPerformer extends 
ReadPointCompactionPe
   public void perform() throws Exception {
     TsFileResource resource = !seqFiles.isEmpty() ? seqFiles.get(0) : 
unseqFiles.get(0);
     if (resource.getTsFileRepairStatus() == 
TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE) {
+      TsFileResourceManager.getInstance().forceDegradeTsFileResource(resource);
       super.perform();
     } else {
       prepareTargetFile();
     }
   }
 
+  @Override
+  protected QueryDataSource initQueryDataSource() {
+    // Only a single file is involved here. Regardless of whether this file is 
sequence
+    // or not, it is passed in as sequence file, which allows us to bypass 
some logic in
+    // SeriesScanUtils that might fail.
+    return new QueryDataSource(
+        !seqFiles.isEmpty() ? seqFiles : unseqFiles, Collections.emptyList());
+  }
+
   private void prepareTargetFile() throws IOException {
     TsFileResource seqSourceFile = seqFiles.get(0);
     TsFileResource targetFile = targetFiles.get(0);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
index 9da89447c72..b0d26b14913 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/TsFileResourceManager.java
@@ -82,6 +82,23 @@ public class TsFileResourceManager {
     }
   }
 
+  public void forceDegradeTsFileResource(TsFileResource resource) {
+    if (TimeIndexLevel.valueOf(resource.getTimeIndexType()) == 
TimeIndexLevel.FILE_TIME_INDEX) {
+      return;
+    }
+    logger.debug("Force degrade tsfile resource {}", resource.getTsFilePath());
+    synchronized (this) {
+      if (!sealedTsFileResources.remove(resource)) {
+        resource.degradeTimeIndex();
+        return;
+      }
+      long memoryReduce = resource.degradeTimeIndex();
+      degradedTimeIndexNum++;
+      releaseTimeIndexMemCost(memoryReduce);
+      sealedTsFileResources.add(resource);
+    }
+  }
+
   /** once degradation is triggered, the total memory for timeIndex should 
reduce */
   private void releaseTimeIndexMemCost(long memCost) {
     totalTimeIndexMemCost -= memCost;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
index 3122aaed50a..28f862f9371 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairUnsortedFileCompactionTest.java
@@ -38,12 +38,15 @@ import 
org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileRepairStatus;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.DeviceTimeIndex;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
 import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
 
 import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -796,4 +799,46 @@ public class RepairUnsortedFileCompactionTest extends 
AbstractRepairDataTest {
       Assert.assertEquals(3, chunkMetadataList.size());
     }
   }
+
+  @Test
+  public void testResourceFileLostDevices() throws IOException {
+    IDeviceID d1 = new PlainDeviceID("root.testsg.d1");
+    IDeviceID d2 = new PlainDeviceID("root.testsg.d2");
+    TsFileResource resource = createEmptyFileAndResource(true);
+    try (CompactionTestFileWriter writer = new 
CompactionTestFileWriter(resource)) {
+      writer.startChunkGroup("d1");
+      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+          Arrays.asList("s0", "s1", "s2"),
+          new TimeRange[] {new TimeRange(10, 40)},
+          TSEncoding.PLAIN,
+          CompressionType.LZ4,
+          Arrays.asList(false, false, true));
+      writer.endChunkGroup();
+
+      writer.startChunkGroup("d2");
+      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+          Arrays.asList("s0", "s1", "s2"),
+          new TimeRange[] {new TimeRange(10, 40)},
+          TSEncoding.PLAIN,
+          CompressionType.LZ4,
+          Arrays.asList(false, false, true));
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    // lost d1
+    ITimeIndex timeIndex = new DeviceTimeIndex();
+    timeIndex.updateStartTime(d2, 10);
+    timeIndex.updateEndTime(d2, 40);
+    resource.setTimeIndex(timeIndex);
+    resource.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_CHECK);
+
+    RepairUnsortedFileCompactionTask task =
+        new RepairUnsortedFileCompactionTask(0, tsFileManager, resource, true, 
0);
+    Assert.assertTrue(task.start());
+    TsFileResource target = tsFileManager.getTsFileList(false).get(0);
+    try (TsFileSequenceReader reader = new 
TsFileSequenceReader(target.getTsFilePath())) {
+      List<IDeviceID> devicesInTargetFile = reader.getAllDevices();
+      Assert.assertEquals(Arrays.asList(d1, d2), devicesInTargetFile);
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/ResourceManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/ResourceManagerTest.java
index efa285f7082..7bb27d1caee 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/ResourceManagerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/memory/ResourceManagerTest.java
@@ -29,12 +29,14 @@ import 
org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
 import 
org.apache.iotdb.db.storageengine.dataregion.read.control.FileReaderManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.TimeIndexLevel;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.constant.TestConstant;
 
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.PlainDeviceID;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -45,6 +47,7 @@ import org.apache.tsfile.write.record.TSRecord;
 import org.apache.tsfile.write.record.datapoint.DataPoint;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -368,4 +371,41 @@ public class ResourceManagerTest {
           TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
     }
   }
+
+  @Test
+  public void testForceDegradeTimeIndex() {
+    File file =
+        new File(
+            TestConstant.BASE_OUTPUT_PATH.concat(
+                1
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 1
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + IoTDBConstant.FILE_NAME_SEPARATOR
+                    + 0
+                    + ".tsfile"));
+    TsFileResource tsFileResource = new TsFileResource(file);
+    IDeviceID device1 = new PlainDeviceID("root.test.d1");
+    IDeviceID device2 = new PlainDeviceID("root.test.d2");
+    tsFileResource.updateStartTime(device1, 1);
+    tsFileResource.updateStartTime(device1, 2);
+    tsFileResource.updateStartTime(device2, 1);
+    tsFileResource.updateStartTime(device2, 2);
+
+    assertEquals(
+        TimeIndexLevel.DEVICE_TIME_INDEX,
+        TimeIndexLevel.valueOf(tsFileResource.getTimeIndexType()));
+    tsFileResourceManager.registerSealedTsFileResource(tsFileResource);
+    long resourceRamSizeBeforeDegrade = tsFileResource.calculateRamSize();
+    Assert.assertEquals(
+        resourceRamSizeBeforeDegrade, 
tsFileResourceManager.getTotalTimeIndexMemCost());
+    
TsFileResourceManager.getInstance().forceDegradeTsFileResource(tsFileResource);
+    Assert.assertEquals(
+        tsFileResource.calculateRamSize(), 
tsFileResourceManager.getTotalTimeIndexMemCost());
+    Assert.assertTrue(
+        resourceRamSizeBeforeDegrade > 
tsFileResourceManager.getTotalTimeIndexMemCost());
+    Assert.assertTrue(tsFileResource.getTimeIndexType() == 
ITimeIndex.FILE_TIME_INDEX_TYPE);
+    Assert.assertEquals(1, tsFileResourceManager.getDegradedTimeIndexNum());
+  }
 }

Reply via email to