This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 559b58a2183 Enhance repair data file scan util (#14167)
559b58a2183 is described below

commit 559b58a2183f4bc0b9079573214a3b44e8141292
Author: shuwenwei <[email protected]>
AuthorDate: Tue Nov 26 10:11:39 2024 +0800

    Enhance repair data file scan util (#14167)
    
    * enhance repair data file scan util
    
    * add ut
    
    * add log
    
    * remove duplicate code
---
 .../CompactionStatisticsCheckFailedException.java  |  87 ++++++
 .../task/RepairUnsortedFileCompactionTask.java     |   4 +-
 .../compaction/repair/RepairDataFileScanUtil.java  | 338 +++++++++++++++------
 .../repair/RepairTimePartitionScanTask.java        |   4 +-
 .../repair/RepairDataFileScanUtilTest.java         | 128 +++++++-
 5 files changed, 462 insertions(+), 99 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionStatisticsCheckFailedException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionStatisticsCheckFailedException.java
new file mode 100644
index 00000000000..9fe58c76042
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionStatisticsCheckFailedException.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception;
+
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.read.common.TimeRange;
+
+/**
+ * Signals that recorded time statistics (device time range, timeseries metadata, chunk metadata or
+ * page header) do not match the time range actually observed in the underlying data.
+ *
+ * <p>{@link #fillInStackTrace()} is overridden so that no stack trace is captured; the message is
+ * the only diagnostic this exception carries.
+ */
+public class CompactionStatisticsCheckFailedException extends RuntimeException {
+
+  /** Creates the exception with a free-form message. */
+  public CompactionStatisticsCheckFailedException(String msg) {
+    super(msg);
+  }
+
+  /** The recorded time range of {@code deviceID} differs from the actual device time range. */
+  public CompactionStatisticsCheckFailedException(
+      IDeviceID deviceID, TimeRange deviceTimeRange, TimeRange actualDeviceTimeRange) {
+    super(
+        getExceptionMsg(
+            deviceID,
+            "The time range of current device is "
+                + deviceTimeRange
+                + ", which should equal actual device time range "
+                + actualDeviceTimeRange));
+  }
+
+  /** The statistics in {@code timeseriesMetadata} differ from the actual series time range. */
+  public CompactionStatisticsCheckFailedException(
+      IDeviceID deviceID, TimeseriesMetadata timeseriesMetadata, TimeRange actualTimeRange) {
+    super(
+        getExceptionMsg(
+            deviceID,
+            "Current timeseriesMetadata is "
+                + timeseriesMetadata
+                + ", which should equal actual time range "
+                + actualTimeRange));
+  }
+
+  /** The statistics in {@code chunkMetadata} differ from the actual chunk time range. */
+  public CompactionStatisticsCheckFailedException(
+      IDeviceID deviceID, ChunkMetadata chunkMetadata, TimeRange actualChunkTimeRange) {
+    super(
+        getExceptionMsg(
+            deviceID,
+            "Current chunkMetadata is "
+                + chunkMetadata
+                + ", which should equal actual chunk time range "
+                + actualChunkTimeRange));
+  }
+
+  /** The start/end time in {@code pageHeader} differ from the time range of the page data. */
+  public CompactionStatisticsCheckFailedException(
+      IDeviceID deviceID, PageHeader pageHeader, TimeRange pageDataTimeRange) {
+    super(
+        getExceptionMsg(
+            deviceID,
+            "Current page is "
+                + pageHeader
+                + ", which should equal actual page data time range "
+                + pageDataTimeRange));
+  }
+
+  /** Prefixes {@code detail} with the device whose time range verification failed. */
+  private static String getExceptionMsg(IDeviceID deviceID, String detail) {
+    return "The device(" + deviceID + ")'s time range verification failed. " + detail;
+  }
+
+  /** Returns this instance without capturing a stack trace. */
+  @Override
+  @SuppressWarnings("java:S3551")
+  public Throwable fillInStackTrace() {
+    return this;
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
index 48df5c4e295..3f76b6a538f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
@@ -198,12 +198,12 @@ public class RepairUnsortedFileCompactionTask extends InnerSpaceCompactionTask {
       return;
     }
     RepairDataFileScanUtil repairDataFileScanUtil = new RepairDataFileScanUtil(sourceFile, true);
-    repairDataFileScanUtil.scanTsFile();
+    repairDataFileScanUtil.scanTsFile(true);
     if (repairDataFileScanUtil.isBrokenFile()) {
       sourceFile.setTsFileRepairStatus(TsFileRepairStatus.CAN_NOT_REPAIR);
       return;
     }
-    if (repairDataFileScanUtil.hasUnsortedData()) {
+    if (repairDataFileScanUtil.hasUnsortedDataOrWrongStatistics()) {
       sourceFile.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
       return;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
index c11a7468a71..cffe4979316 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.compaction.repair;
 
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionStatisticsCheckFailedException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionChunkReader;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
@@ -38,15 +39,16 @@ import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.MetaMarker;
 import org.apache.tsfile.file.header.ChunkHeader;
 import org.apache.tsfile.file.header.PageHeader;
-import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.TsFileDeviceIterator;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.common.TimeRange;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
 import org.slf4j.Logger;
@@ -56,8 +58,9 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -67,7 +70,9 @@ import static org.apache.tsfile.read.reader.chunk.ChunkReader.decryptAndUncompre
 public class RepairDataFileScanUtil {
   private static final Logger logger = LoggerFactory.getLogger(RepairDataFileScanUtil.class);
   private final TsFileResource resource;
-  private boolean hasUnsortedData;
+  // device time index loaded from the resource file by scanTsFile(true); null otherwise
+  private ArrayDeviceTimeIndex timeIndex;
+  private boolean hasUnsortedDataOrWrongStatistics;
   private boolean isBrokenFile;
   private long previousTime;
   private boolean printLog;
@@ -78,13 +83,34 @@ public class RepairDataFileScanUtil {
 
   public RepairDataFileScanUtil(TsFileResource resource, boolean printLog) {
     this.resource = resource;
-    this.hasUnsortedData = false;
+    this.hasUnsortedDataOrWrongStatistics = false;
     this.previousTime = Long.MIN_VALUE;
     this.printLog = printLog;
   }
 
+  /** Equivalent to {@code scanTsFile(false)}. */
   public void scanTsFile() {
+    scanTsFile(false);
+  }
+
+  /**
+   * Scans the TsFile, marking it as broken or as having unsorted data or wrong time statistics.
+   *
+   * @param checkTsFileResource whether to also compare the device time ranges in the resource file
+   *     with the TsFile content
+   */
+  public void scanTsFile(boolean checkTsFileResource) {
     File tsfile = resource.getTsFile();
+    try {
+      timeIndex = checkTsFileResource ? getDeviceTimeIndex(resource) : null;
+    } catch (IOException e) {
+      logger.warn(
+          "Meet error when read tsfile resource file {}, it may be repaired after reboot",
+          tsfile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX,
+          e);
+      isBrokenFile = true;
+      return;
+    }
     try (TsFileSequenceReader reader =
         new CompactionTsFileReader(
             tsfile.getPath(),
@@ -92,26 +118,54 @@ public class RepairDataFileScanUtil {
                 ? CompactionType.INNER_SEQ_COMPACTION
                 : CompactionType.INNER_UNSEQ_COMPACTION)) {
       TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned();
+      // devices recorded in the resource file that have not been found in the TsFile yet
+      Set<IDeviceID> deviceIdsInTimeIndex =
+          checkTsFileResource ? new HashSet<>(timeIndex.getDevices()) : Collections.emptySet();
       while (deviceIterator.hasNext()) {
         Pair<IDeviceID, Boolean> deviceIsAlignedPair = deviceIterator.next();
         IDeviceID device = deviceIsAlignedPair.getLeft();
+        if (checkTsFileResource) {
+          if (!deviceIdsInTimeIndex.contains(device)) {
+            throw new CompactionStatisticsCheckFailedException(
+                device + " does not exist in the resource file");
+          }
+          deviceIdsInTimeIndex.remove(device);
+        }
         MetadataIndexNode metadataIndexNode =
             deviceIterator.getFirstMeasurementNodeOfCurrentDevice();
+        TimeRange deviceTimeRangeInResource =
+            checkTsFileResource
+                ? new TimeRange(timeIndex.getStartTime(device), timeIndex.getEndTime(device))
+                : null;
         boolean isAligned = deviceIsAlignedPair.getRight();
         if (isAligned) {
-          checkAlignedDeviceSeries(reader, device, metadataIndexNode);
+          checkAlignedDeviceSeries(
+              reader, device, metadataIndexNode, deviceTimeRangeInResource, checkTsFileResource);
         } else {
-          checkNonAlignedDeviceSeries(reader, device, metadataIndexNode);
+          checkNonAlignedDeviceSeries(
+              reader, device, metadataIndexNode, deviceTimeRangeInResource, checkTsFileResource);
         }
       }
+      if (!deviceIdsInTimeIndex.isEmpty()) {
+        throw new CompactionStatisticsCheckFailedException(
+            "These devices (" + deviceIdsInTimeIndex + ") do not exist in the tsfile");
+      }
     } catch (CompactionLastTimeCheckFailedException lastTimeCheckFailedException) {
-      this.hasUnsortedData = true;
+      this.hasUnsortedDataOrWrongStatistics = true;
       if (printLog) {
         logger.error(
             "File {} has unsorted data: ",
             resource.getTsFile().getPath(),
             lastTimeCheckFailedException);
       }
+    } catch (CompactionStatisticsCheckFailedException compactionStatisticsCheckFailedException) {
+      this.hasUnsortedDataOrWrongStatistics = true;
+      if (printLog) {
+        logger.error(
+            "File {} has wrong time statistics: ",
+            resource.getTsFile().getPath(),
+            compactionStatisticsCheckFailedException);
+      }
     } catch (Exception e) {
       // ignored the exception caused by thread interrupt
       if (Thread.currentThread().isInterrupted()) {
@@ -127,102 +181,209 @@ public class RepairDataFileScanUtil {
   }
 
+  /**
+   * Checks the time column of an aligned device: its statistics against the resource file (when
+   * {@code checkTsFileResource} is set) and against its chunks, then every chunk and page.
+   */
   private void checkAlignedDeviceSeries(
-      TsFileSequenceReader reader, IDeviceID device, MetadataIndexNode metadataIndexNode)
+      TsFileSequenceReader reader,
+      IDeviceID device,
+      MetadataIndexNode metadataIndexNode,
+      TimeRange deviceTimeRangeInResource,
+      boolean checkTsFileResource)
       throws IOException {
-    List<AlignedChunkMetadata> chunkMetadataList =
-        reader.getAlignedChunkMetadataByMetadataIndexNode(device, metadataIndexNode, false);
-    for (AlignedChunkMetadata alignedChunkMetadata : chunkMetadataList) {
-      IChunkMetadata timeChunkMetadata = alignedChunkMetadata.getTimeChunkMetadata();
-      Chunk timeChunk = reader.readMemChunk((ChunkMetadata) timeChunkMetadata);
+    List<TimeseriesMetadata> timeColumnTimeseriesMetadata = new ArrayList<>(1);
+    reader.readITimeseriesMetadata(timeColumnTimeseriesMetadata, metadataIndexNode, "");
+    TimeseriesMetadata timeseriesMetadata = timeColumnTimeseriesMetadata.get(0);
 
-      CompactionChunkReader chunkReader = new CompactionChunkReader(timeChunk);
-      ByteBuffer chunkDataBuffer = timeChunk.getData();
-      ChunkHeader chunkHeader = timeChunk.getHeader();
-      while (chunkDataBuffer.hasRemaining()) {
-        // deserialize a PageHeader from chunkDataBuffer
-        PageHeader pageHeader = null;
-        if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
-          pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, timeChunk.getChunkStatistic());
-        } else {
-          pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
-        }
-        ByteBuffer pageData = chunkReader.readPageDataWithoutUncompressing(pageHeader);
-        IDecryptor decryptor = IDecryptor.getDecryptor(timeChunk.getEncryptParam());
-        ByteBuffer uncompressedPageData =
-            decryptAndUncompressPageData(
-                pageHeader,
-                IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
-                pageData,
-                decryptor);
-        Decoder decoder =
-            Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
-        while (decoder.hasNext(uncompressedPageData)) {
-          long currentTime = decoder.readLong(uncompressedPageData);
-          checkPreviousTimeAndUpdate(device, currentTime);
-        }
-      }
+    // check device time range
+    TimeRange timeseriesTimeRange =
+        new TimeRange(
+            timeseriesMetadata.getStatistics().getStartTime(),
+            timeseriesMetadata.getStatistics().getEndTime());
+    if (checkTsFileResource && !timeseriesTimeRange.equals(deviceTimeRangeInResource)) {
+      throw new CompactionStatisticsCheckFailedException(
+          device, deviceTimeRangeInResource, timeseriesTimeRange);
+    }
+
+    long actualTimeseriesStartTime = Long.MAX_VALUE;
+    long actualTimeseriesEndTime = Long.MIN_VALUE;
+    List<ChunkMetadata> timeChunkMetadataList = reader.readChunkMetaDataList(timeseriesMetadata);
+    for (ChunkMetadata timeChunkMetadata : timeChunkMetadataList) {
+      actualTimeseriesStartTime =
+          Math.min(actualTimeseriesStartTime, timeChunkMetadata.getStartTime());
+      actualTimeseriesEndTime = Math.max(actualTimeseriesEndTime, timeChunkMetadata.getEndTime());
+      checkTimeChunkInAlignedSeries(reader, device, timeChunkMetadata);
     }
+
+    // reset previousTime so that the next device is checked independently
     previousTime = Long.MIN_VALUE;
+
+    // check timeseries time range
+    if (actualTimeseriesStartTime > actualTimeseriesEndTime) {
+      return;
+    }
+    TimeRange actualTimeseriesTimeRange =
+        new TimeRange(actualTimeseriesStartTime, actualTimeseriesEndTime);
+    if (!actualTimeseriesTimeRange.equals(timeseriesTimeRange)) {
+      throw new CompactionStatisticsCheckFailedException(
+          device, timeseriesMetadata, actualTimeseriesTimeRange);
+    }
   }
 
-  private void checkNonAlignedDeviceSeries(
-      TsFileSequenceReader reader, IDeviceID device, MetadataIndexNode metadataIndexNode)
+  /**
+   * Decodes every page of an aligned time chunk and checks page headers and chunk metadata against
+   * the timestamps actually stored.
+   */
+  private void checkTimeChunkInAlignedSeries(
+      TsFileSequenceReader reader, IDeviceID device, ChunkMetadata timeChunkMetadata)
       throws IOException {
-    Iterator<Map<String, List<ChunkMetadata>>> measurementChunkMetadataListMapIterator =
-        reader.getMeasurementChunkMetadataListMapIterator(metadataIndexNode);
-    while (measurementChunkMetadataListMapIterator.hasNext()) {
-      Map<String, List<ChunkMetadata>> measurementChunkMetadataListMap =
-          measurementChunkMetadataListMapIterator.next();
-      for (Map.Entry<String, List<ChunkMetadata>> measurementChunkMetadataListEntry :
-          measurementChunkMetadataListMap.entrySet()) {
-        List<ChunkMetadata> chunkMetadataList = measurementChunkMetadataListEntry.getValue();
-        checkSingleNonAlignedSeries(
-            reader, device, measurementChunkMetadataListEntry.getKey(), chunkMetadataList);
-        previousTime = Long.MIN_VALUE;
+    Chunk timeChunk = reader.readMemChunk(timeChunkMetadata);
+
+    CompactionChunkReader chunkReader = new CompactionChunkReader(timeChunk);
+    ByteBuffer chunkDataBuffer = timeChunk.getData();
+    ChunkHeader chunkHeader = timeChunk.getHeader();
+    long actualChunkStartTime = Long.MAX_VALUE;
+    long actualChunkEndTime = Long.MIN_VALUE;
+    while (chunkDataBuffer.hasRemaining()) {
+      // deserialize a PageHeader from chunkDataBuffer
+      PageHeader pageHeader = null;
+      if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
+        pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, timeChunk.getChunkStatistic());
+      } else {
+        pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
       }
+      actualChunkStartTime = Math.min(actualChunkStartTime, pageHeader.getStartTime());
+      actualChunkEndTime = Math.max(actualChunkEndTime, pageHeader.getEndTime());
+      ByteBuffer pageData = chunkReader.readPageDataWithoutUncompressing(pageHeader);
+      IDecryptor decryptor = IDecryptor.getDecryptor(timeChunk.getEncryptParam());
+      ByteBuffer uncompressedPageData =
+          decryptAndUncompressPageData(
+              pageHeader,
+              IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
+              pageData,
+              decryptor);
+      // the time column of an aligned chunk is decoded with the encoding in its chunk header
+      Decoder timeDecoder =
+          Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
+      validateTimeData(device, uncompressedPageData, pageHeader, timeDecoder);
+    }
+    if (actualChunkStartTime > actualChunkEndTime) {
+      return;
+    }
+    TimeRange actualChunkTimeRange = new TimeRange(actualChunkStartTime, actualChunkEndTime);
+    if (!actualChunkTimeRange.equals(
+        new TimeRange(timeChunkMetadata.getStartTime(), timeChunkMetadata.getEndTime()))) {
+      throw new CompactionStatisticsCheckFailedException(
+          device, timeChunkMetadata, actualChunkTimeRange);
     }
   }
 
-  private void checkSingleNonAlignedSeries(
+  /**
+   * Checks every non-aligned series of a device and, when {@code checkTsFileResource} is set,
+   * compares the device time range derived from the series statistics with the resource file.
+   */
+  private void checkNonAlignedDeviceSeries(
       TsFileSequenceReader reader,
-      IDeviceID deviceID,
-      String measurementId,
-      List<ChunkMetadata> chunkMetadataList)
+      IDeviceID device,
+      MetadataIndexNode metadataIndexNode,
+      TimeRange deviceTimeRangeInResource,
+      boolean checkTsFileResource)
       throws IOException {
-    for (ChunkMetadata chunkMetadata : chunkMetadataList) {
-      if (chunkMetadata == null || chunkMetadata.getStatistics().getCount() == 0) {
-        continue;
-      }
-      Chunk chunk = reader.readMemChunk(chunkMetadata);
-      ChunkHeader chunkHeader = chunk.getHeader();
-      CompactionChunkReader chunkReader = new CompactionChunkReader(chunk);
-      ByteBuffer chunkDataBuffer = chunk.getData();
-      while (chunkDataBuffer.hasRemaining()) {
-        // deserialize a PageHeader from chunkDataBuffer
-        PageHeader pageHeader = null;
-        if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
-          pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunk.getChunkStatistic());
-        } else {
-          pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
-        }
-        ByteBuffer pageData = chunkReader.readPageDataWithoutUncompressing(pageHeader);
-        IDecryptor decryptor = IDecryptor.getDecryptor(chunk.getEncryptParam());
-        ByteBuffer uncompressedPageData =
-            decryptAndUncompressPageData(
-                pageHeader,
-                IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
-                pageData,
-                decryptor);
-        ByteBuffer timeBuffer = getTimeBufferFromNonAlignedPage(uncompressedPageData);
-        Decoder timeDecoder =
-            Decoder.getDecoderByType(
-                TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
-                TSDataType.INT64);
-        while (timeDecoder.hasNext(timeBuffer)) {
-          long currentTime = timeDecoder.readLong(timeBuffer);
-          checkPreviousTimeAndUpdate(deviceID, measurementId, currentTime);
-        }
+    List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+    reader.getDeviceTimeseriesMetadata(
+        timeseriesMetadataList, metadataIndexNode, Collections.emptySet(), true);
+    long actualDeviceStartTime = Long.MAX_VALUE;
+    long actualDeviceEndTime = Long.MIN_VALUE;
+    for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+      actualDeviceStartTime =
+          Math.min(actualDeviceStartTime, timeseriesMetadata.getStatistics().getStartTime());
+      actualDeviceEndTime =
+          Math.max(actualDeviceEndTime, timeseriesMetadata.getStatistics().getEndTime());
+      checkSingleNonAlignedSeries(reader, device, timeseriesMetadata);
+      previousTime = Long.MIN_VALUE;
+    }
+
+    if (!checkTsFileResource || actualDeviceStartTime > actualDeviceEndTime) {
+      return;
+    }
+    TimeRange actualDeviceTimeRange = new TimeRange(actualDeviceStartTime, actualDeviceEndTime);
+    if (!actualDeviceTimeRange.equals(deviceTimeRangeInResource)) {
+      throw new CompactionStatisticsCheckFailedException(
+          device, deviceTimeRangeInResource, actualDeviceTimeRange);
+    }
+  }
+
+  /** Checks a non-aligned series' statistics against its chunks, then checks every chunk. */
+  private void checkSingleNonAlignedSeries(
+      TsFileSequenceReader reader, IDeviceID deviceID, TimeseriesMetadata timeseriesMetadata)
+      throws IOException {
+    TimeRange timeseriesTimeRange =
+        new TimeRange(
+            timeseriesMetadata.getStatistics().getStartTime(),
+            timeseriesMetadata.getStatistics().getEndTime());
+    long actualTimeseriesStartTime = Long.MAX_VALUE;
+    long actualTimeseriesEndTime = Long.MIN_VALUE;
+    for (IChunkMetadata iChunkMetadata : timeseriesMetadata.getChunkMetadataList()) {
+      ChunkMetadata chunkMetadata = (ChunkMetadata) iChunkMetadata;
+      actualTimeseriesStartTime = Math.min(actualTimeseriesStartTime, chunkMetadata.getStartTime());
+      actualTimeseriesEndTime = Math.max(actualTimeseriesEndTime, chunkMetadata.getEndTime());
+      checkChunkOfNonAlignedSeries(reader, deviceID, chunkMetadata);
+    }
+    if (actualTimeseriesStartTime > actualTimeseriesEndTime) {
+      return;
+    }
+    TimeRange actualTimeseriesTimeRange =
+        new TimeRange(actualTimeseriesStartTime, actualTimeseriesEndTime);
+    if (!actualTimeseriesTimeRange.equals(timeseriesTimeRange)) {
+      throw new CompactionStatisticsCheckFailedException(
+          deviceID, timeseriesMetadata, actualTimeseriesTimeRange);
+    }
+  }
+
+  /** Decodes every page of a non-aligned chunk and checks page headers and chunk metadata. */
+  private void checkChunkOfNonAlignedSeries(
+      TsFileSequenceReader reader, IDeviceID deviceID, ChunkMetadata chunkMetadata)
+      throws IOException {
+    Chunk chunk = reader.readMemChunk(chunkMetadata);
+    ChunkHeader chunkHeader = chunk.getHeader();
+    CompactionChunkReader chunkReader = new CompactionChunkReader(chunk);
+    ByteBuffer chunkDataBuffer = chunk.getData();
+    long actualChunkStartTime = Long.MAX_VALUE;
+    long actualChunkEndTime = Long.MIN_VALUE;
+    while (chunkDataBuffer.hasRemaining()) {
+      // deserialize a PageHeader from chunkDataBuffer
+      PageHeader pageHeader = null;
+      if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
+        pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunk.getChunkStatistic());
+      } else {
+        pageHeader = PageHeader.deserializeFrom(chunkDataBuffer, chunkHeader.getDataType());
       }
+      actualChunkStartTime = Math.min(actualChunkStartTime, pageHeader.getStartTime());
+      actualChunkEndTime = Math.max(actualChunkEndTime, pageHeader.getEndTime());
+      ByteBuffer pageData = chunkReader.readPageDataWithoutUncompressing(pageHeader);
+      IDecryptor decryptor = IDecryptor.getDecryptor(chunk.getEncryptParam());
+      ByteBuffer uncompressedPageData =
+          decryptAndUncompressPageData(
+              pageHeader,
+              IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
+              pageData,
+              decryptor);
+      ByteBuffer timeBuffer = getTimeBufferFromNonAlignedPage(uncompressedPageData);
+      // non-aligned pages are decoded with the time encoder from the TsFile configuration
+      Decoder timeDecoder =
+          Decoder.getDecoderByType(
+              TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+              TSDataType.INT64);
+      validateTimeData(deviceID, timeBuffer, pageHeader, timeDecoder);
+    }
+    if (actualChunkStartTime > actualChunkEndTime) {
+      return;
+    }
+    TimeRange actualChunkTimeRange = new TimeRange(actualChunkStartTime, actualChunkEndTime);
+    if (!actualChunkTimeRange.equals(
+        new TimeRange(chunkMetadata.getStartTime(), chunkMetadata.getEndTime()))) {
+      throw new CompactionStatisticsCheckFailedException(
+          deviceID, chunkMetadata, actualChunkTimeRange);
     }
   }
 
@@ -234,6 +395,36 @@ public class RepairDataFileScanUtil {
     return timeBuffer;
   }
 
+  /**
+   * Decodes the timestamps of one page, checks their ordering via checkPreviousTimeAndUpdate, and
+   * verifies their min/max against the start/end time recorded in the page header.
+   *
+   * @param timeDecoder decoder matching the encoding of the page's time column; callers create one
+   *     per page because decoders may keep state between reads
+   */
+  private void validateTimeData(
+      IDeviceID device, ByteBuffer uncompressedTimeData, PageHeader pageHeader, Decoder timeDecoder)
+      throws IOException {
+    TimeRange pageHeaderTimeRange =
+        new TimeRange(pageHeader.getStartTime(), pageHeader.getEndTime());
+    long actualStartTime = Long.MAX_VALUE;
+    long actualEndTime = Long.MIN_VALUE;
+    while (timeDecoder.hasNext(uncompressedTimeData)) {
+      long currentTime = timeDecoder.readLong(uncompressedTimeData);
+      actualStartTime = Math.min(actualStartTime, currentTime);
+      actualEndTime = Math.max(actualEndTime, currentTime);
+      checkPreviousTimeAndUpdate(device, currentTime);
+    }
+    if (actualStartTime > actualEndTime) {
+      // no timestamp was decoded, so there is nothing to compare with the page header
+      return;
+    }
+    TimeRange actualPageTimeRange = new TimeRange(actualStartTime, actualEndTime);
+    if (!actualPageTimeRange.equals(pageHeaderTimeRange)) {
+      throw new CompactionStatisticsCheckFailedException(device, pageHeader, actualPageTimeRange);
+    }
+  }
+
   private void checkPreviousTimeAndUpdate(IDeviceID deviceID, String measurementId, long time) {
     if (previousTime >= time) {
       throw new CompactionLastTimeCheckFailedException(
@@ -249,8 +440,9 @@ public class RepairDataFileScanUtil {
     previousTime = time;
   }
 
-  public boolean hasUnsortedData() {
-    return hasUnsortedData;
+  /** Whether the scan found unsorted timestamps or time statistics that do not match the data. */
+  public boolean hasUnsortedDataOrWrongStatistics() {
+    return hasUnsortedDataOrWrongStatistics;
   }
 
   public boolean isBrokenFile() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
index d044824a3f2..a73d237705e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
@@ -103,7 +103,7 @@ public class RepairTimePartitionScanTask implements Callable<Void> {
         }
         LOGGER.info("[RepairScheduler] start check tsfile: {}", sourceFile);
         RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(sourceFile);
-        scanUtil.scanTsFile();
+        scanUtil.scanTsFile(true);
         checkTaskStatusAndMayStop();
         if (scanUtil.isBrokenFile()) {
           LOGGER.warn("[RepairScheduler] {} is skipped because it is broken", sourceFile);
@@ -111,7 +111,7 @@ public class RepairTimePartitionScanTask implements Callable<Void> {
           latch.countDown();
           continue;
         }
-        if (!scanUtil.hasUnsortedData()) {
+        if (!scanUtil.hasUnsortedDataOrWrongStatistics()) {
           latch.countDown();
           continue;
         }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
index 70c7813a26d..a4fe8e20e13 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
@@ -24,8 +24,11 @@ import org.apache.iotdb.db.exception.StorageEngineException;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
 
 import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.read.common.TimeRange;
@@ -36,6 +39,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 
 public class RepairDataFileScanUtilTest extends AbstractCompactionTest {
 
@@ -76,7 +80,123 @@ public class RepairDataFileScanUtilTest extends AbstractCompactionTest {
     RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
     scanUtil.scanTsFile();
     Assert.assertFalse(scanUtil.isBrokenFile());
-    Assert.assertFalse(scanUtil.hasUnsortedData());
+    Assert.assertFalse(scanUtil.hasUnsortedDataOrWrongStatistics());
+  }
+
+  @Test
+  public void testWrongChunkStatisticsWithNonAlignedSeries() throws IOException {
+    TsFileResource resource = createEmptyFileAndResource(true);
+    try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
+      writer.startChunkGroup("d2");
+      writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+          "s0", new TimeRange[] {new TimeRange(10, 40)}, TSEncoding.PLAIN, CompressionType.LZ4);
+      writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+          "s1",
+          new TimeRange[] {new TimeRange(40, 40), new TimeRange(50, 70)},
+          TSEncoding.PLAIN,
+          CompressionType.LZ4);
+      List<ChunkMetadata> chunkMetadataListInMemory =
+          writer.getFileWriter().getChunkMetadataListOfCurrentDeviceInMemory();
+      ChunkMetadata originChunkMetadata = chunkMetadataListInMemory.get(0);
+      originChunkMetadata.getStatistics().setStartTime(4);
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
+    scanUtil.scanTsFile();
+    Assert.assertFalse(scanUtil.isBrokenFile());
+    Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics());
+  }
+
+  @Test
+  public void testWrongChunkStatisticsWithAlignedSeries() throws IOException {
+    TsFileResource resource = createEmptyFileAndResource(true);
+    try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
+      writer.startChunkGroup("d1");
+      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+          Arrays.asList("s0", "s1", "s2"),
+          new TimeRange[] {new TimeRange(10, 40)},
+          TSEncoding.PLAIN,
+          CompressionType.LZ4,
+          Arrays.asList(false, false, true));
+      List<ChunkMetadata> chunkMetadataListInMemory =
+          writer.getFileWriter().getChunkMetadataListOfCurrentDeviceInMemory();
+      ChunkMetadata originChunkMetadata = chunkMetadataListInMemory.get(0);
+      originChunkMetadata.getStatistics().setStartTime(20);
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
+    scanUtil.scanTsFile();
+    Assert.assertFalse(scanUtil.isBrokenFile());
+    Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics());
+  }
+
+  @Test
+  public void testWrongResourceStatistics() throws IOException {
+    TsFileResource resource = createEmptyFileAndResource(true);
+    try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
+      writer.startChunkGroup("d1");
+      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+          Arrays.asList("s0", "s1", "s2"),
+          new TimeRange[] {new TimeRange(10, 40)},
+          TSEncoding.PLAIN,
+          CompressionType.LZ4,
+          Arrays.asList(false, false, true));
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    resource
+        .getTimeIndex()
+        .updateStartTime(IDeviceID.Factory.DEFAULT_FACTORY.create("root.testsg.d1"), 1);
+    RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
+    scanUtil.scanTsFile(true);
+    Assert.assertFalse(scanUtil.isBrokenFile());
+    Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics());
+  }
+
+  @Test
+  public void testDeviceNotExistsInTsFile() throws IOException {
+    TsFileResource resource = createEmptyFileAndResource(true);
+    try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
+      writer.startChunkGroup("d1");
+      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+          Arrays.asList("s0", "s1", "s2"),
+          new TimeRange[] {new TimeRange(10, 40)},
+          TSEncoding.PLAIN,
+          CompressionType.LZ4,
+          Arrays.asList(false, false, true));
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    resource
+        .getTimeIndex()
+        .updateStartTime(IDeviceID.Factory.DEFAULT_FACTORY.create("root.testsg.d2"), 1);
+    RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
+    scanUtil.scanTsFile(true);
+    Assert.assertFalse(scanUtil.isBrokenFile());
+    Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics());
+  }
+
+  @Test
+  public void testDeviceDoesNotExistInResourceFile() throws IOException {
+    TsFileResource resource = createEmptyFileAndResource(true);
+    try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource)) {
+      writer.startChunkGroup("d1");
+      writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+          Arrays.asList("s0", "s1", "s2"),
+          new TimeRange[] {new TimeRange(10, 40)},
+          TSEncoding.PLAIN,
+          CompressionType.LZ4,
+          Arrays.asList(false, false, true));
+      writer.endChunkGroup();
+      writer.endFile();
+    }
+    resource.setTimeIndex(new ArrayDeviceTimeIndex());
+    RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
+    scanUtil.scanTsFile(true);
+    Assert.assertFalse(scanUtil.isBrokenFile());
+    Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics());
   }
 
   @Test
@@ -104,7 +224,7 @@ public class RepairDataFileScanUtilTest extends AbstractCompactionTest {
     RepairDataFileScanUtil scanUtil1 = new RepairDataFileScanUtil(resource1);
     scanUtil1.scanTsFile();
     Assert.assertFalse(scanUtil1.isBrokenFile());
-    Assert.assertTrue(scanUtil1.hasUnsortedData());
+    Assert.assertTrue(scanUtil1.hasUnsortedDataOrWrongStatistics());
 
     TsFileResource resource2 = createEmptyFileAndResource(true);
     try (CompactionTestFileWriter writer = new CompactionTestFileWriter(resource2)) {
@@ -133,7 +253,7 @@ public class RepairDataFileScanUtilTest extends AbstractCompactionTest {
     RepairDataFileScanUtil scanUtil2 = new RepairDataFileScanUtil(resource2);
     scanUtil2.scanTsFile();
     Assert.assertFalse(scanUtil2.isBrokenFile());
-    Assert.assertTrue(scanUtil2.hasUnsortedData());
+    Assert.assertTrue(scanUtil2.hasUnsortedDataOrWrongStatistics());
   }
 
   @Test
@@ -162,6 +282,6 @@ public class RepairDataFileScanUtilTest extends AbstractCompactionTest {
     RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
     scanUtil.scanTsFile();
     Assert.assertFalse(scanUtil.isBrokenFile());
-    Assert.assertTrue(scanUtil.hasUnsortedData());
+    Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics());
   }
 }


Reply via email to