This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 559b58a2183 Enhance repair data file scan util (#14167)
559b58a2183 is described below
commit 559b58a2183f4bc0b9079573214a3b44e8141292
Author: shuwenwei <[email protected]>
AuthorDate: Tue Nov 26 10:11:39 2024 +0800
Enhance repair data file scan util (#14167)
* Enhance RepairDataFileScanUtil to also verify time statistics (resource, timeseries, chunk and page time ranges)
* Add unit tests
* Add logging
* Remove duplicate code
---
.../CompactionStatisticsCheckFailedException.java | 87 ++++++
.../task/RepairUnsortedFileCompactionTask.java | 4 +-
.../compaction/repair/RepairDataFileScanUtil.java | 338 +++++++++++++++------
.../repair/RepairTimePartitionScanTask.java | 4 +-
.../repair/RepairDataFileScanUtilTest.java | 128 +++++++-
5 files changed, 462 insertions(+), 99 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionStatisticsCheckFailedException.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionStatisticsCheckFailedException.java
new file mode 100644
index 00000000000..9fe58c76042
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/exception/CompactionStatisticsCheckFailedException.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception;
+
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.read.common.TimeRange;
+
+/**
+ * Thrown when a time range recorded in metadata (a device time range, a timeseries metadata, a
+ * chunk metadata or a page header) does not agree with the time range observed in the data.
+ */
+public class CompactionStatisticsCheckFailedException extends RuntimeException
{
+
+ /** Creates the exception with a caller-supplied message. */
+ public CompactionStatisticsCheckFailedException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Reports a device whose recorded time range differs from the time range computed from data.
+ *
+ * @param deviceID the device being verified
+ * @param deviceTimeRange the recorded device time range
+ * @param actualDeviceTimeRange the time range computed from the file content
+ */
+ public CompactionStatisticsCheckFailedException(
+ IDeviceID deviceID, TimeRange deviceTimeRange, TimeRange
actualDeviceTimeRange) {
+ super(
+ getExceptionMsg(
+ deviceID,
+ "The time range of current device is "
+ + deviceTimeRange
+ + ", which should equals actual device time range "
+ + actualDeviceTimeRange));
+ }
+
+ /**
+ * Reports a timeseries metadata whose statistics differ from the time range of its chunks.
+ *
+ * @param deviceID the device the series belongs to
+ * @param timeseriesMetadata the metadata whose recorded time range is wrong
+ * @param actualTimeRange the time range computed from the chunks
+ */
+ public CompactionStatisticsCheckFailedException(
+ IDeviceID deviceID, TimeseriesMetadata timeseriesMetadata, TimeRange
actualTimeRange) {
+ super(
+ getExceptionMsg(
+ deviceID,
+ "Current timeseriesMetadata is "
+ + timeseriesMetadata
+ + ", which should equals actual time range "
+ + actualTimeRange));
+ }
+
+ /**
+ * Reports a chunk metadata whose start/end time differs from the time range of its pages.
+ *
+ * @param deviceID the device the chunk belongs to
+ * @param chunkMetadata the metadata whose recorded time range is wrong
+ * @param actualChunkTimeRange the time range computed from the chunk's pages
+ */
+ public CompactionStatisticsCheckFailedException(
+ IDeviceID deviceID, ChunkMetadata chunkMetadata, TimeRange
actualChunkTimeRange) {
+ super(
+ getExceptionMsg(
+ deviceID,
+ "Current chunkMetadata is "
+ + chunkMetadata
+ + ", which should equals actual chunk time range "
+ + actualChunkTimeRange));
+ }
+
+ /**
+ * Reports a page whose header does not agree with the time range of its decoded data.
+ *
+ * @param deviceID the device the page belongs to
+ * @param pageHeader the page header whose recorded time range is wrong
+ * @param pageDataTimeRange the time range of the decoded timestamps in the page
+ */
+ public CompactionStatisticsCheckFailedException(
+ IDeviceID deviceID, PageHeader pageHeader, TimeRange pageDataTimeRange) {
+ super(
+ getExceptionMsg(
+ deviceID,
+ "Current page is "
+ + pageHeader
+ + ", which should contains actual page data time range "
+ + pageDataTimeRange));
+ }
+
+ // Prefixes every detail message with the device that failed verification.
+ private static String getExceptionMsg(IDeviceID deviceID, String detail) {
+ return "The device(" + deviceID + ")'s time range verification failed. " +
detail;
+ }
+
+ // Returning this without calling super skips stack-trace capture for this exception.
+ // java:S3551 is Sonar's "overrides should match synchronization" rule: Throwable's
+ // fillInStackTrace is synchronized, this override intentionally is not.
+ @Override
+ @SuppressWarnings("java:S3551")
+ public Throwable fillInStackTrace() {
+ return this;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
index 48df5c4e295..3f76b6a538f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/task/RepairUnsortedFileCompactionTask.java
@@ -198,12 +198,12 @@ public class RepairUnsortedFileCompactionTask extends
InnerSpaceCompactionTask {
return;
}
RepairDataFileScanUtil repairDataFileScanUtil = new
RepairDataFileScanUtil(sourceFile, true);
- repairDataFileScanUtil.scanTsFile();
+ repairDataFileScanUtil.scanTsFile(true);
if (repairDataFileScanUtil.isBrokenFile()) {
sourceFile.setTsFileRepairStatus(TsFileRepairStatus.CAN_NOT_REPAIR);
return;
}
- if (repairDataFileScanUtil.hasUnsortedData()) {
+ if (repairDataFileScanUtil.hasUnsortedDataOrWrongStatistics()) {
sourceFile.setTsFileRepairStatus(TsFileRepairStatus.NEED_TO_REPAIR_BY_REWRITE);
return;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
index c11a7468a71..cffe4979316 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtil.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.storageengine.dataregion.compaction.repair;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionStatisticsCheckFailedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.reader.CompactionChunkReader;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
@@ -38,15 +39,16 @@ import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.header.ChunkHeader;
import org.apache.tsfile.file.header.PageHeader;
-import org.apache.tsfile.file.metadata.AlignedChunkMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TsFileDeviceIterator;
import org.apache.tsfile.read.TsFileSequenceReader;
import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
import org.slf4j.Logger;
@@ -56,8 +58,9 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -67,7 +70,8 @@ import static
org.apache.tsfile.read.reader.chunk.ChunkReader.decryptAndUncompre
public class RepairDataFileScanUtil {
private static final Logger logger =
LoggerFactory.getLogger(RepairDataFileScanUtil.class);
private final TsFileResource resource;
- private boolean hasUnsortedData;
+ private ArrayDeviceTimeIndex timeIndex;
+ private boolean hasUnsortedDataOrWrongStatistics;
private boolean isBrokenFile;
private long previousTime;
private boolean printLog;
@@ -78,13 +82,27 @@ public class RepairDataFileScanUtil {
+ /**
+ * Creates a scanner for the TsFile behind the given resource.
+ *
+ * @param resource the resource whose TsFile is scanned
+ * @param printLog whether unsorted-data / wrong-statistics findings are logged at error level
+ */
public RepairDataFileScanUtil(TsFileResource resource, boolean printLog) {
this.resource = resource;
- this.hasUnsortedData = false;
+ this.hasUnsortedDataOrWrongStatistics = false;
+ // Long.MIN_VALUE so any later timestamp passes the strictly-increasing check.
this.previousTime = Long.MIN_VALUE;
this.printLog = printLog;
}
+ /**
+ * Scans the TsFile for unsorted data and wrong time statistics without comparing it against the
+ * device time index of the resource file (equivalent to {@code scanTsFile(false)}).
+ */
public void scanTsFile() {
+ scanTsFile(false);
+ }
+
+ public void scanTsFile(boolean checkTsFileResource) {
File tsfile = resource.getTsFile();
+ try {
+ timeIndex = checkTsFileResource ? getDeviceTimeIndex(resource) : null;
+ } catch (IOException e) {
+ logger.warn(
+ "Meet error when read tsfile resource file {}, it may be repaired
after reboot",
+ tsfile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX,
+ e);
+ isBrokenFile = true;
+ return;
+ }
try (TsFileSequenceReader reader =
new CompactionTsFileReader(
tsfile.getPath(),
@@ -92,26 +110,53 @@ public class RepairDataFileScanUtil {
? CompactionType.INNER_SEQ_COMPACTION
: CompactionType.INNER_UNSEQ_COMPACTION)) {
TsFileDeviceIterator deviceIterator =
reader.getAllDevicesIteratorWithIsAligned();
+ Set<IDeviceID> deviceIdsInTimeIndex =
+ checkTsFileResource ? new HashSet<>(timeIndex.getDevices()) :
Collections.emptySet();
while (deviceIterator.hasNext()) {
Pair<IDeviceID, Boolean> deviceIsAlignedPair = deviceIterator.next();
IDeviceID device = deviceIsAlignedPair.getLeft();
+ if (checkTsFileResource) {
+ if (!deviceIdsInTimeIndex.contains(device)) {
+ throw new CompactionStatisticsCheckFailedException(
+ device + " does not exist in the resource file");
+ }
+ deviceIdsInTimeIndex.remove(device);
+ }
MetadataIndexNode metadataIndexNode =
deviceIterator.getFirstMeasurementNodeOfCurrentDevice();
+ TimeRange deviceTimeRangeInResource =
+ checkTsFileResource
+ ? new TimeRange(timeIndex.getStartTime(device),
timeIndex.getEndTime(device))
+ : null;
boolean isAligned = deviceIsAlignedPair.getRight();
if (isAligned) {
- checkAlignedDeviceSeries(reader, device, metadataIndexNode);
+ checkAlignedDeviceSeries(
+ reader, device, metadataIndexNode, deviceTimeRangeInResource,
checkTsFileResource);
} else {
- checkNonAlignedDeviceSeries(reader, device, metadataIndexNode);
+ checkNonAlignedDeviceSeries(
+ reader, device, metadataIndexNode, deviceTimeRangeInResource,
checkTsFileResource);
}
}
+ if (!deviceIdsInTimeIndex.isEmpty()) {
+ throw new CompactionStatisticsCheckFailedException(
+ "These devices (" + deviceIdsInTimeIndex + ") do not exist in the
tsfile");
+ }
} catch (CompactionLastTimeCheckFailedException
lastTimeCheckFailedException) {
- this.hasUnsortedData = true;
+ this.hasUnsortedDataOrWrongStatistics = true;
if (printLog) {
logger.error(
"File {} has unsorted data: ",
resource.getTsFile().getPath(),
lastTimeCheckFailedException);
}
+ } catch (CompactionStatisticsCheckFailedException
compactionStatisticsCheckFailedException) {
+ this.hasUnsortedDataOrWrongStatistics = true;
+ if (printLog) {
+ logger.error(
+ "File {} has wrong time statistics: ",
+ resource.getTsFile().getPath(),
+ compactionStatisticsCheckFailedException);
+ }
} catch (Exception e) {
// ignored the exception caused by thread interrupt
if (Thread.currentThread().isInterrupted()) {
@@ -127,102 +172,188 @@ public class RepairDataFileScanUtil {
}
+ /**
+ * Verifies an aligned device using only its time column. When checkTsFileResource is true, the
+ * time column's timeseries statistics must equal the device time range from the resource. Each
+ * time chunk is then scanned, and the minimum start / maximum end time over the time chunks must
+ * equal the timeseries statistics. previousTime is reset once the device is done.
+ *
+ * @throws CompactionStatisticsCheckFailedException if a recorded time range mismatches the data
+ */
private void checkAlignedDeviceSeries(
- TsFileSequenceReader reader, IDeviceID device, MetadataIndexNode
metadataIndexNode)
+ TsFileSequenceReader reader,
+ IDeviceID device,
+ MetadataIndexNode metadataIndexNode,
+ TimeRange deviceTimeRangeInResource,
+ boolean checkTsFileResource)
throws IOException {
- List<AlignedChunkMetadata> chunkMetadataList =
- reader.getAlignedChunkMetadataByMetadataIndexNode(device,
metadataIndexNode, false);
- for (AlignedChunkMetadata alignedChunkMetadata : chunkMetadataList) {
- IChunkMetadata timeChunkMetadata =
alignedChunkMetadata.getTimeChunkMetadata();
- Chunk timeChunk = reader.readMemChunk((ChunkMetadata) timeChunkMetadata);
+ // NOTE(review): assumes reading with the empty measurement id "" yields the time column's
+ // metadata and that the list is never empty (get(0) below is unchecked) — confirm.
+ List<TimeseriesMetadata> timeColumnTimeseriesMetadata = new ArrayList<>(1);
+ reader.readITimeseriesMetadata(timeColumnTimeseriesMetadata,
metadataIndexNode, "");
+ TimeseriesMetadata timeseriesMetadata =
timeColumnTimeseriesMetadata.get(0);
- CompactionChunkReader chunkReader = new CompactionChunkReader(timeChunk);
- ByteBuffer chunkDataBuffer = timeChunk.getData();
- ChunkHeader chunkHeader = timeChunk.getHeader();
- while (chunkDataBuffer.hasRemaining()) {
- // deserialize a PageHeader from chunkDataBuffer
- PageHeader pageHeader = null;
- if (((byte) (chunkHeader.getChunkType() & 0x3F)) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
- pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
timeChunk.getChunkStatistic());
- } else {
- pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
chunkHeader.getDataType());
- }
- ByteBuffer pageData =
chunkReader.readPageDataWithoutUncompressing(pageHeader);
- IDecryptor decryptor =
IDecryptor.getDecryptor(timeChunk.getEncryptParam());
- ByteBuffer uncompressedPageData =
- decryptAndUncompressPageData(
- pageHeader,
-
IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
- pageData,
- decryptor);
- Decoder decoder =
- Decoder.getDecoderByType(chunkHeader.getEncodingType(),
chunkHeader.getDataType());
- while (decoder.hasNext(uncompressedPageData)) {
- long currentTime = decoder.readLong(uncompressedPageData);
- checkPreviousTimeAndUpdate(device, currentTime);
- }
- }
+ // check device time range
+ TimeRange timeseriesTimeRange =
+ new TimeRange(
+ timeseriesMetadata.getStatistics().getStartTime(),
+ timeseriesMetadata.getStatistics().getEndTime());
+ if (checkTsFileResource &&
!timeseriesTimeRange.equals(deviceTimeRangeInResource)) {
+ throw new CompactionStatisticsCheckFailedException(
+ device, deviceTimeRangeInResource, timeseriesTimeRange);
+ }
+
+ long actualTimeseriesStartTime = Long.MAX_VALUE;
+ long actualTimeseriesEndTime = Long.MIN_VALUE;
+ List<ChunkMetadata> timeChunkMetadataList =
+ reader.readChunkMetaDataList(timeColumnTimeseriesMetadata.get(0));
+ for (ChunkMetadata timeChunkMetadata : timeChunkMetadataList) {
+ actualTimeseriesStartTime =
+ Math.min(actualTimeseriesStartTime,
timeChunkMetadata.getStartTime());
+ actualTimeseriesEndTime = Math.max(actualTimeseriesEndTime,
timeChunkMetadata.getEndTime());
+ checkTimeChunkInAlignedSeries(reader, device, timeChunkMetadata);
}
+
+ // reset previousTime
previousTime = Long.MIN_VALUE;
+
+ // check timeseries time range
+ // (no time chunks means there is nothing to compare against)
+ if (actualTimeseriesStartTime > actualTimeseriesEndTime) {
+ return;
+ }
+ TimeRange actualTimeseriesTimeRange =
+ new TimeRange(actualTimeseriesStartTime, actualTimeseriesEndTime);
+ if (!actualTimeseriesTimeRange.equals(timeseriesTimeRange)) {
+ throw new CompactionStatisticsCheckFailedException(
+ device, timeseriesMetadata, actualTimeseriesTimeRange);
+ }
}
- private void checkNonAlignedDeviceSeries(
- TsFileSequenceReader reader, IDeviceID device, MetadataIndexNode
metadataIndexNode)
+ /**
+ * Scans one time chunk of an aligned series. Timestamps must be strictly increasing. Each page
+ * header's start/end time must equal the range of that page's decoded timestamps. The chunk
+ * metadata's start/end time must equal the minimum start / maximum end over its page headers.
+ *
+ * @throws CompactionStatisticsCheckFailedException if a recorded time range mismatches the data
+ */
+ private void checkTimeChunkInAlignedSeries(
+ TsFileSequenceReader reader, IDeviceID device, ChunkMetadata
timeChunkMetadata)
throws IOException {
- Iterator<Map<String, List<ChunkMetadata>>>
measurementChunkMetadataListMapIterator =
- reader.getMeasurementChunkMetadataListMapIterator(metadataIndexNode);
- while (measurementChunkMetadataListMapIterator.hasNext()) {
- Map<String, List<ChunkMetadata>> measurementChunkMetadataListMap =
- measurementChunkMetadataListMapIterator.next();
- for (Map.Entry<String, List<ChunkMetadata>>
measurementChunkMetadataListEntry :
- measurementChunkMetadataListMap.entrySet()) {
- List<ChunkMetadata> chunkMetadataList =
measurementChunkMetadataListEntry.getValue();
- checkSingleNonAlignedSeries(
- reader, device, measurementChunkMetadataListEntry.getKey(),
chunkMetadataList);
- previousTime = Long.MIN_VALUE;
+ Chunk timeChunk = reader.readMemChunk(timeChunkMetadata);
+
+ CompactionChunkReader chunkReader = new CompactionChunkReader(timeChunk);
+ ByteBuffer chunkDataBuffer = timeChunk.getData();
+ ChunkHeader chunkHeader = timeChunk.getHeader();
+ long actualChunkStartTime = Long.MAX_VALUE;
+ long actualChunkEndTime = Long.MIN_VALUE;
+ while (chunkDataBuffer.hasRemaining()) {
+ // deserialize a PageHeader from chunkDataBuffer
+ PageHeader pageHeader = null;
+ if (((byte) (chunkHeader.getChunkType() & 0x3F)) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
+ pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
timeChunk.getChunkStatistic());
+ } else {
+ pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
chunkHeader.getDataType());
}
+ actualChunkStartTime = Math.min(actualChunkStartTime,
pageHeader.getStartTime());
+ actualChunkEndTime = Math.max(actualChunkEndTime,
pageHeader.getEndTime());
+ ByteBuffer pageData =
chunkReader.readPageDataWithoutUncompressing(pageHeader);
+ IDecryptor decryptor =
IDecryptor.getDecryptor(timeChunk.getEncryptParam());
+ ByteBuffer uncompressedPageData =
+ decryptAndUncompressPageData(
+ pageHeader,
+ IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
+ pageData,
+ decryptor);
+ // Decode the time column with the encoding and data type recorded in this chunk's own header,
+ // as the code before this change did. validateTimeData() always uses the currently configured
+ // time encoder, so the check would otherwise depend on today's configuration rather than on
+ // how this chunk was actually written.
+ Decoder timeDecoder =
+ Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType());
+ long actualPageStartTime = Long.MAX_VALUE;
+ long actualPageEndTime = Long.MIN_VALUE;
+ while (timeDecoder.hasNext(uncompressedPageData)) {
+ long currentTime = timeDecoder.readLong(uncompressedPageData);
+ actualPageStartTime = Math.min(actualPageStartTime, currentTime);
+ actualPageEndTime = Math.max(actualPageEndTime, currentTime);
+ checkPreviousTimeAndUpdate(device, currentTime);
+ }
+ // An empty page has no decoded range to compare against its header.
+ if (actualPageStartTime <= actualPageEndTime) {
+ TimeRange actualPageTimeRange = new TimeRange(actualPageStartTime, actualPageEndTime);
+ if (!actualPageTimeRange.equals(
+ new TimeRange(pageHeader.getStartTime(), pageHeader.getEndTime()))) {
+ throw new CompactionStatisticsCheckFailedException(
+ device, pageHeader, actualPageTimeRange);
+ }
+ }
+ }
+ if (actualChunkStartTime > actualChunkEndTime) {
+ return;
+ }
+ TimeRange actualChunkTimeRange = new TimeRange(actualChunkStartTime,
actualChunkEndTime);
+ if (!actualChunkTimeRange.equals(
+ new TimeRange(timeChunkMetadata.getStartTime(),
timeChunkMetadata.getEndTime()))) {
+ throw new CompactionStatisticsCheckFailedException(
+ device, timeChunkMetadata, actualChunkTimeRange);
}
}
- private void checkSingleNonAlignedSeries(
+ /**
+ * Verifies a non-aligned device. Every series of the device is checked, and the
+ * timestamp-ordering state is reset after each series. When checkTsFileResource is true, the
+ * device time range in the resource must equal the minimum start time / maximum end time over
+ * all series statistics.
+ *
+ * @throws CompactionStatisticsCheckFailedException if a recorded time range mismatches the data
+ */
+ private void checkNonAlignedDeviceSeries(
TsFileSequenceReader reader,
- IDeviceID deviceID,
- String measurementId,
- List<ChunkMetadata> chunkMetadataList)
+ IDeviceID device,
+ MetadataIndexNode metadataIndexNode,
+ TimeRange deviceTimeRangeInResource,
+ boolean checkTsFileResource)
throws IOException {
- for (ChunkMetadata chunkMetadata : chunkMetadataList) {
- if (chunkMetadata == null || chunkMetadata.getStatistics().getCount() ==
0) {
- continue;
- }
- Chunk chunk = reader.readMemChunk(chunkMetadata);
- ChunkHeader chunkHeader = chunk.getHeader();
- CompactionChunkReader chunkReader = new CompactionChunkReader(chunk);
- ByteBuffer chunkDataBuffer = chunk.getData();
- while (chunkDataBuffer.hasRemaining()) {
- // deserialize a PageHeader from chunkDataBuffer
- PageHeader pageHeader = null;
- if (((byte) (chunkHeader.getChunkType() & 0x3F)) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
- pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
chunk.getChunkStatistic());
- } else {
- pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
chunkHeader.getDataType());
- }
- ByteBuffer pageData =
chunkReader.readPageDataWithoutUncompressing(pageHeader);
- IDecryptor decryptor =
IDecryptor.getDecryptor(chunk.getEncryptParam());
- ByteBuffer uncompressedPageData =
- decryptAndUncompressPageData(
- pageHeader,
-
IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
- pageData,
- decryptor);
- ByteBuffer timeBuffer =
getTimeBufferFromNonAlignedPage(uncompressedPageData);
- Decoder timeDecoder =
- Decoder.getDecoderByType(
-
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
- TSDataType.INT64);
- while (timeDecoder.hasNext(timeBuffer)) {
- long currentTime = timeDecoder.readLong(timeBuffer);
- checkPreviousTimeAndUpdate(deviceID, measurementId, currentTime);
- }
+ List<TimeseriesMetadata> timeseriesMetadataList = new ArrayList<>();
+ reader.getDeviceTimeseriesMetadata(
+ timeseriesMetadataList, metadataIndexNode, Collections.emptySet(),
true);
+ long actualDeviceStartTime = Long.MAX_VALUE;
+ long actualDeviceEndTime = Long.MIN_VALUE;
+ for (TimeseriesMetadata timeseriesMetadata : timeseriesMetadataList) {
+ actualDeviceStartTime =
+ Math.min(actualDeviceStartTime,
timeseriesMetadata.getStatistics().getStartTime());
+ // Fold this series' end time into the running maximum END time. Comparing it against the
+ // running start time would keep only the last series' end time, so a correct file whose last
+ // series ends early would be reported as having wrong statistics.
+ actualDeviceEndTime =
+ Math.max(actualDeviceEndTime,
timeseriesMetadata.getStatistics().getEndTime());
+ checkSingleNonAlignedSeries(reader, device, timeseriesMetadata);
+ previousTime = Long.MIN_VALUE;
+ }
+
+ if (!checkTsFileResource || actualDeviceStartTime > actualDeviceEndTime) {
+ return;
+ }
+ TimeRange actualDeviceTimeRange = new TimeRange(actualDeviceStartTime,
actualDeviceEndTime);
+ if (!actualDeviceTimeRange.equals(deviceTimeRangeInResource)) {
+ throw new CompactionStatisticsCheckFailedException(
+ device, deviceTimeRangeInResource, actualDeviceTimeRange);
+ }
+ }
+
+ /**
+ * Verifies one non-aligned series. Every chunk is scanned, and the minimum start / maximum end
+ * time over the chunk metadata must equal the series' timeseries statistics.
+ *
+ * @throws CompactionStatisticsCheckFailedException if a recorded time range mismatches the data
+ */
+ private void checkSingleNonAlignedSeries(
+ TsFileSequenceReader reader, IDeviceID deviceID, TimeseriesMetadata
timeseriesMetadata)
+ throws IOException {
+ TimeRange timeseriesTimeRange =
+ new TimeRange(
+ timeseriesMetadata.getStatistics().getStartTime(),
+ timeseriesMetadata.getStatistics().getEndTime());
+ long actualTimeseriesStartTime = Long.MAX_VALUE;
+ long actualTimeseriesEndTime = Long.MIN_VALUE;
+ // NOTE(review): the replaced implementation skipped chunk metadata that was null or had a zero
+ // count; this loop no longer does — confirm such entries cannot appear here.
+ for (IChunkMetadata iChunkMetadata :
timeseriesMetadata.getChunkMetadataList()) {
+ ChunkMetadata chunkMetadata = (ChunkMetadata) iChunkMetadata;
+ actualTimeseriesStartTime = Math.min(actualTimeseriesStartTime,
chunkMetadata.getStartTime());
+ actualTimeseriesEndTime = Math.max(actualTimeseriesEndTime,
chunkMetadata.getEndTime());
+ checkChunkOfNonAlignedSeries(reader, deviceID, chunkMetadata);
+ }
+ // No chunks: nothing to compare the statistics against.
+ if (actualTimeseriesStartTime > actualTimeseriesEndTime) {
+ return;
+ }
+ TimeRange actualTimeseriesTimeRange =
+ new TimeRange(actualTimeseriesStartTime, actualTimeseriesEndTime);
+ if (!actualTimeseriesTimeRange.equals(timeseriesTimeRange)) {
+ throw new CompactionStatisticsCheckFailedException(
+ deviceID, timeseriesMetadata, actualTimeseriesTimeRange);
+ }
+ }
+
+ /**
+ * Scans one chunk of a non-aligned series. Each page's time buffer is validated by
+ * validateTimeData, and the chunk metadata's start/end time must equal the minimum start /
+ * maximum end over its page headers.
+ *
+ * @throws CompactionStatisticsCheckFailedException if a recorded time range mismatches the data
+ */
+ private void checkChunkOfNonAlignedSeries(
+ TsFileSequenceReader reader, IDeviceID deviceID, ChunkMetadata
chunkMetadata)
+ throws IOException {
+ Chunk chunk = reader.readMemChunk(chunkMetadata);
+ ChunkHeader chunkHeader = chunk.getHeader();
+ CompactionChunkReader chunkReader = new CompactionChunkReader(chunk);
+ ByteBuffer chunkDataBuffer = chunk.getData();
+ long actualChunkStartTime = Long.MAX_VALUE;
+ long actualChunkEndTime = Long.MIN_VALUE;
+ while (chunkDataBuffer.hasRemaining()) {
+ // deserialize a PageHeader from chunkDataBuffer
+ PageHeader pageHeader = null;
+ // A single-page chunk stores no page statistics; the chunk statistics are used instead.
+ if (((byte) (chunkHeader.getChunkType() & 0x3F)) ==
MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER) {
+ pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
chunk.getChunkStatistic());
+ } else {
+ pageHeader = PageHeader.deserializeFrom(chunkDataBuffer,
chunkHeader.getDataType());
}
+ actualChunkStartTime = Math.min(actualChunkStartTime,
pageHeader.getStartTime());
+ actualChunkEndTime = Math.max(actualChunkEndTime,
pageHeader.getEndTime());
+ ByteBuffer pageData =
chunkReader.readPageDataWithoutUncompressing(pageHeader);
+ IDecryptor decryptor = IDecryptor.getDecryptor(chunk.getEncryptParam());
+ ByteBuffer uncompressedPageData =
+ decryptAndUncompressPageData(
+ pageHeader,
+ IUnCompressor.getUnCompressor(chunkHeader.getCompressionType()),
+ pageData,
+ decryptor);
+ // Only the time portion of the non-aligned page is validated.
+ ByteBuffer timeBuffer =
getTimeBufferFromNonAlignedPage(uncompressedPageData);
+ validateTimeData(deviceID, timeBuffer, pageHeader);
+ }
+ if (actualChunkStartTime > actualChunkEndTime) {
+ return;
+ }
+ TimeRange actualChunkTimeRange = new TimeRange(actualChunkStartTime,
actualChunkEndTime);
+ if (!actualChunkTimeRange.equals(
+ new TimeRange(chunkMetadata.getStartTime(),
chunkMetadata.getEndTime()))) {
+ throw new CompactionStatisticsCheckFailedException(
+ deviceID, chunkMetadata, actualChunkTimeRange);
}
}
@@ -234,6 +365,31 @@ public class RepairDataFileScanUtil {
return timeBuffer;
}
+ /**
+ * Decodes every timestamp in uncompressedTimeData as INT64, using the time encoder from the
+ * current TSFileDescriptor configuration. Timestamps must be strictly increasing (via
+ * checkPreviousTimeAndUpdate). The decoded min/max must equal the page header's start/end time.
+ *
+ * @throws CompactionStatisticsCheckFailedException if the page header range mismatches the data
+ */
+ // NOTE(review): the decoder comes from the current configuration, not from a chunk header —
+ // confirm this matches how every buffer passed in was encoded.
+ private void validateTimeData(
+ IDeviceID device, ByteBuffer uncompressedTimeData, PageHeader
pageHeader) throws IOException {
+ Decoder decoder =
+ Decoder.getDecoderByType(
+
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()),
+ TSDataType.INT64);
+ TimeRange pageHeaderTimeRange =
+ new TimeRange(pageHeader.getStartTime(), pageHeader.getEndTime());
+ long actualStartTime = Long.MAX_VALUE;
+ long actualEndTime = Long.MIN_VALUE;
+ while (decoder.hasNext(uncompressedTimeData)) {
+ long currentTime = decoder.readLong(uncompressedTimeData);
+ actualStartTime = Math.min(actualStartTime, currentTime);
+ actualEndTime = Math.max(actualEndTime, currentTime);
+ checkPreviousTimeAndUpdate(device, currentTime);
+ }
+ // Empty page: nothing to compare against the header.
+ if (actualStartTime > actualEndTime) {
+ return;
+ }
+ TimeRange actualPageTimeRange = new TimeRange(actualStartTime,
actualEndTime);
+ if (!actualPageTimeRange.equals(pageHeaderTimeRange)) {
+ throw new CompactionStatisticsCheckFailedException(device, pageHeader,
actualPageTimeRange);
+ }
+ }
+
private void checkPreviousTimeAndUpdate(IDeviceID deviceID, String
measurementId, long time) {
if (previousTime >= time) {
throw new CompactionLastTimeCheckFailedException(
@@ -249,8 +405,8 @@ public class RepairDataFileScanUtil {
previousTime = time;
}
- public boolean hasUnsortedData() {
- return hasUnsortedData;
+ /**
+ * Returns whether the last scan found out-of-order timestamps or a recorded time range that
+ * does not match the data.
+ */
+ public boolean hasUnsortedDataOrWrongStatistics() {
+ return hasUnsortedDataOrWrongStatistics;
}
public boolean isBrokenFile() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
index d044824a3f2..a73d237705e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairTimePartitionScanTask.java
@@ -103,7 +103,7 @@ public class RepairTimePartitionScanTask implements
Callable<Void> {
}
LOGGER.info("[RepairScheduler] start check tsfile: {}", sourceFile);
RepairDataFileScanUtil scanUtil = new
RepairDataFileScanUtil(sourceFile);
- scanUtil.scanTsFile();
+ scanUtil.scanTsFile(true);
checkTaskStatusAndMayStop();
if (scanUtil.isBrokenFile()) {
LOGGER.warn("[RepairScheduler] {} is skipped because it is broken",
sourceFile);
@@ -111,7 +111,7 @@ public class RepairTimePartitionScanTask implements
Callable<Void> {
latch.countDown();
continue;
}
- if (!scanUtil.hasUnsortedData()) {
+ if (!scanUtil.hasUnsortedDataOrWrongStatistics()) {
latch.countDown();
continue;
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
index 70c7813a26d..a4fe8e20e13 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/repair/RepairDataFileScanUtilTest.java
@@ -24,8 +24,11 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex;
import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.TimeRange;
@@ -36,6 +39,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
public class RepairDataFileScanUtilTest extends AbstractCompactionTest {
@@ -76,7 +80,123 @@ public class RepairDataFileScanUtilTest extends
AbstractCompactionTest {
RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
scanUtil.scanTsFile();
Assert.assertFalse(scanUtil.isBrokenFile());
- Assert.assertFalse(scanUtil.hasUnsortedData());
+ Assert.assertFalse(scanUtil.hasUnsortedDataOrWrongStatistics());
+ }
+
+ /**
+ * Corrupts the start time of the first in-memory chunk metadata of non-aligned device d2 before
+ * the file is finished. A scan without the resource check must then report wrong statistics
+ * without treating the file as broken.
+ */
+ @Test
+ public void testWrongChunkStatisticsWithNonAlignedSeries() throws
IOException {
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
+ writer.startChunkGroup("d2");
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s0", new TimeRange[] {new TimeRange(10, 40)}, TSEncoding.PLAIN,
CompressionType.LZ4);
+ writer.generateSimpleNonAlignedSeriesToCurrentDevice(
+ "s1",
+ new TimeRange[] {new TimeRange(40, 40), new TimeRange(50, 70)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4);
+ List<ChunkMetadata> chunkMetadataListInMemory =
+ writer.getFileWriter().getChunkMetadataListOfCurrentDeviceInMemory();
+ ChunkMetadata originChunkMetadata = chunkMetadataListInMemory.get(0);
+ originChunkMetadata.getStatistics().setStartTime(4);
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
+ scanUtil.scanTsFile();
+ Assert.assertFalse(scanUtil.isBrokenFile());
+ Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics());
+ }
+
+ /**
+ * Corrupts the start time of the first in-memory chunk metadata of aligned device d1 (written for
+ * [10, 40]) to 20. A scan without the resource check must then report wrong statistics without
+ * treating the file as broken.
+ */
+ @Test
+ public void testWrongChunkStatisticsWithAlignedSeries() throws IOException {
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s0", "s1", "s2"),
+ new TimeRange[] {new TimeRange(10, 40)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, false, true));
+ List<ChunkMetadata> chunkMetadataListInMemory =
+ writer.getFileWriter().getChunkMetadataListOfCurrentDeviceInMemory();
+ ChunkMetadata originChunkMetadata = chunkMetadataListInMemory.get(0);
+ originChunkMetadata.getStatistics().setStartTime(20);
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
+ scanUtil.scanTsFile();
+ Assert.assertFalse(scanUtil.isBrokenFile());
+ Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics());
+ }
+
+ /**
+ * Writes a correct file for d1 ([10, 40]), then updates the resource's start time for
+ * root.testsg.d1 with 1. A scan with the resource check must report wrong statistics.
+ */
+ // NOTE(review): assumes updateStartTime(…, 1) lowers the recorded start below 10 — confirm.
+ @Test
+ public void testWrongResourceStatistics() throws IOException {
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s0", "s1", "s2"),
+ new TimeRange[] {new TimeRange(10, 40)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, false, true));
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ resource
+ .getTimeIndex()
+
.updateStartTime(IDeviceID.Factory.DEFAULT_FACTORY.create("root.testsg.d1"), 1);
+ RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
+ scanUtil.scanTsFile(true);
+ Assert.assertFalse(scanUtil.isBrokenFile());
+ Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics());
+ }
+
+ /**
+ * Adds root.testsg.d2 to the resource's time index although the file only contains d1. A scan
+ * with the resource check must report wrong statistics.
+ */
+ @Test
+ public void testDeviceNotExistsInTsFile() throws IOException {
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s0", "s1", "s2"),
+ new TimeRange[] {new TimeRange(10, 40)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, false, true));
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ resource
+ .getTimeIndex()
+
.updateStartTime(IDeviceID.Factory.DEFAULT_FACTORY.create("root.testsg.d2"), 1);
+ RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
+ scanUtil.scanTsFile(true);
+ Assert.assertFalse(scanUtil.isBrokenFile());
+ Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics());
+ }
+
+ /**
+ * Replaces the resource's time index with an empty ArrayDeviceTimeIndex, so device d1 in the
+ * file is missing from the resource. A scan with the resource check must report wrong statistics.
+ */
+ @Test
+ public void testDeviceDoesNotExistInResourceFile() throws IOException {
+ TsFileResource resource = createEmptyFileAndResource(true);
+ try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource)) {
+ writer.startChunkGroup("d1");
+ writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+ Arrays.asList("s0", "s1", "s2"),
+ new TimeRange[] {new TimeRange(10, 40)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, false, true));
+ writer.endChunkGroup();
+ writer.endFile();
+ }
+ resource.setTimeIndex(new ArrayDeviceTimeIndex());
+ RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
+ scanUtil.scanTsFile(true);
+ Assert.assertFalse(scanUtil.isBrokenFile());
+ Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics());
}
@Test
@@ -104,7 +224,7 @@ public class RepairDataFileScanUtilTest extends
AbstractCompactionTest {
RepairDataFileScanUtil scanUtil1 = new RepairDataFileScanUtil(resource1);
scanUtil1.scanTsFile();
Assert.assertFalse(scanUtil1.isBrokenFile());
- Assert.assertTrue(scanUtil1.hasUnsortedData());
+ Assert.assertTrue(scanUtil1.hasUnsortedDataOrWrongStatistics());
TsFileResource resource2 = createEmptyFileAndResource(true);
try (CompactionTestFileWriter writer = new
CompactionTestFileWriter(resource2)) {
@@ -133,7 +253,7 @@ public class RepairDataFileScanUtilTest extends
AbstractCompactionTest {
RepairDataFileScanUtil scanUtil2 = new RepairDataFileScanUtil(resource2);
scanUtil2.scanTsFile();
Assert.assertFalse(scanUtil2.isBrokenFile());
- Assert.assertTrue(scanUtil2.hasUnsortedData());
+ Assert.assertTrue(scanUtil2.hasUnsortedDataOrWrongStatistics());
}
@Test
@@ -162,6 +282,6 @@ public class RepairDataFileScanUtilTest extends
AbstractCompactionTest {
RepairDataFileScanUtil scanUtil = new RepairDataFileScanUtil(resource);
scanUtil.scanTsFile();
Assert.assertFalse(scanUtil.isBrokenFile());
- Assert.assertTrue(scanUtil.hasUnsortedData());
+ Assert.assertTrue(scanUtil.hasUnsortedDataOrWrongStatistics());
}
}