This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch refactor_tsfile_validation_tool in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e69a36213bd0cd1aa00980e80404ca6c587ae461 Author: Tian Jiang <[email protected]> AuthorDate: Wed Jul 31 10:29:05 2024 +0800 Abstract with TsFileSequenceScan --- .../iotdb/db/tools/utils/TsFileSequenceScan.java | 163 +++++++++ .../iotdb/db/tools/utils/TsFileValidationScan.java | 374 +++++++++++++++++++ .../db/tools/validate/TsFileValidationTool.java | 399 +-------------------- .../compaction/AbstractCompactionTest.java | 2 +- .../FastCrossCompactionPerformerTest.java | 2 +- ...eCompactionWithFastPerformerValidationTest.java | 2 +- ...actionWithReadPointPerformerValidationTest.java | 2 +- 7 files changed, 557 insertions(+), 387 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileSequenceScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileSequenceScan.java new file mode 100644 index 00000000000..bc9bcad6306 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileSequenceScan.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.tools.utils; + +import org.apache.tsfile.common.conf.TSFileConfig; +import org.apache.tsfile.file.MetaMarker; +import org.apache.tsfile.file.header.ChunkGroupHeader; +import org.apache.tsfile.file.header.ChunkHeader; +import org.apache.tsfile.file.header.PageHeader; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Pair; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.nio.ByteBuffer; + +public abstract class TsFileSequenceScan { + + protected boolean printToFile; + protected PrintWriter pw = null; + + protected TsFileSequenceReader reader; + protected byte marker; + protected File file; + + protected IDeviceID currDeviceID; + protected String currMeasurementID; + protected Pair<IDeviceID, String> currTimeseriesID; + protected boolean currChunkOnePage; + + /** + * @return true if the file should be scanned + */ + protected boolean onFileOpen(File file) throws IOException { + // skip head magic string + reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1); + this.file = file; + return true; + } + + protected void onFileEnd() throws IOException {} + + protected void onChunk(PageVisitor pageVisitor) throws IOException { + ChunkHeader chunkHeader = reader.readChunkHeader(marker); + if (chunkHeader.getDataSize() == 0) { + // empty value chunk + return; + } + currMeasurementID = chunkHeader.getMeasurementID(); + currTimeseriesID = new Pair<>(currDeviceID, currMeasurementID); + + int dataSize = chunkHeader.getDataSize(); + + while (dataSize > 0) { + PageHeader pageHeader = + reader.readPageHeader( + chunkHeader.getDataType(), + (chunkHeader.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER); + ByteBuffer pageData = reader.readPage(pageHeader, chunkHeader.getCompressionType()); + pageVisitor.onPage(pageHeader, pageData, chunkHeader); + dataSize -= pageHeader.getSerializedPageSize(); + } + } + + protected void onChunkGroup() throws IOException { + ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader(); + currDeviceID = chunkGroupHeader.getDeviceID(); + } + + protected abstract void onTimePage( + PageHeader pageHeader, ByteBuffer pageData, ChunkHeader chunkHeader) throws IOException; + + protected abstract void onValuePage( + PageHeader pageHeader, ByteBuffer pageData, ChunkHeader chunkHeader) throws IOException; + + protected abstract void onNonAlignedPage( + PageHeader pageHeader, ByteBuffer pageData, ChunkHeader chunkHeader) throws IOException; + + protected interface PageVisitor { + void onPage(PageHeader pageHeader, ByteBuffer pageData, ChunkHeader chunkHeader) + throws IOException; + } + + @SuppressWarnings("java:S106") + protected void printBoth(String msg) { + System.out.println(msg); + if (printToFile && pw != null) { + pw.println(msg); + } + } + + protected abstract void onException(Throwable t); + + @SuppressWarnings("java:S1181") + public void scanTsFile(File tsFile) { + try (TsFileSequenceReader r = new TsFileSequenceReader(tsFile.getAbsolutePath())) { + this.reader = r; + boolean shouldScan = onFileOpen(tsFile); + if (!shouldScan) { + return; + } + // start reading data points in sequence + while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { + switch (marker) { + case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER: + currChunkOnePage = true; + onChunk(this::onNonAlignedPage); + break; + case MetaMarker.CHUNK_HEADER: + currChunkOnePage = false; + onChunk(this::onNonAlignedPage); + break; + case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER: + currChunkOnePage = true; + onChunk(this::onTimePage); + break; + case MetaMarker.TIME_CHUNK_HEADER: + currChunkOnePage = false; + onChunk(this::onTimePage); + break; + case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: + currChunkOnePage = true; + onChunk(this::onValuePage); + break; + case MetaMarker.VALUE_CHUNK_HEADER: + currChunkOnePage = false; + onChunk(this::onValuePage); + break; + case MetaMarker.CHUNK_GROUP_HEADER: + onChunkGroup(); + break; + case MetaMarker.OPERATION_INDEX_RANGE: + reader.readPlanIndex(); + break; + default: + MetaMarker.handleUnexpectedMarker(marker); + } + } + + onFileEnd(); + } catch (Throwable e) { + onException(e); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileValidationScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileValidationScan.java new file mode 100644 index 00000000000..a3dfc8cdc5c --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/utils/TsFileValidationScan.java @@ -0,0 +1,374 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.tools.utils; + +import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.apache.tsfile.common.conf.TSFileDescriptor; +import org.apache.tsfile.encoding.decoder.Decoder; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.header.ChunkHeader; +import org.apache.tsfile.file.header.PageHeader; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.PlainDeviceID; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.read.common.BatchData; +import org.apache.tsfile.read.reader.page.PageReader; +import org.apache.tsfile.read.reader.page.TimePageReader; +import org.apache.tsfile.utils.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@SuppressWarnings("java:S2175") +public class TsFileValidationScan extends TsFileSequenceScan { + + private static final Logger LOGGER = LoggerFactory.getLogger(TsFileValidationScan.class); + private static final String STR_FIND_BAD_FILE = "-- Find the bad file "; + private static final String STR_OVERLAP_LATTER_FILE = ", overlap with latter files."; + private static final String STR_OVERLAP_BETWEEN_FILE = + " overlap between files, with previous file "; + private static final String STR_TIMESERIES = "-------- Timeseries "; + private static final String FILE_NON_EXIST = ""; + private static final IDeviceID EMPTY_DEVICE_ID = new PlainDeviceID(""); + + protected boolean printDetails; + protected int badFileNum; + + // deviceID -> has checked overlap or not + protected Map<IDeviceID, Boolean> hasCheckedDeviceOverlap; + protected Map<Pair<IDeviceID, String>, SeriesOverlapPrintInfo> hasSeriesPrintedDetails; + // measurementId -> lastChunkEndTime in current file + protected Map<Pair<IDeviceID, String>, Long> lashChunkEndTime; + // measurementID -> <fileName, [lastTime, endTimeInLastFile]> + protected Map<Pair<IDeviceID, String>, FileLastTimeInfo> timeseriesLastTimeMap = new HashMap<>(); + // deviceID -> <fileName, endTime>, the endTime of device in the last seq file + protected Map<IDeviceID, FileLastTimeInfo> deviceEndTime = new HashMap<>(); + // fileName -> isBadFile + protected Map<String, Boolean> isBadFileMap = new HashMap<>(); + protected List<String> previousBadFileMsgs = new ArrayList<>(); + protected TsFileResource resource; + + protected long currentChunkEndTime; + protected long currentPageEndTime; + protected long lastPageEndTime; + + @Override + protected boolean onFileOpen(File file) throws IOException { + super.onFileOpen(file); + + hasCheckedDeviceOverlap = new HashMap<>(); + currDeviceID = EMPTY_DEVICE_ID; + hasSeriesPrintedDetails = new HashMap<>(); + lashChunkEndTime = new HashMap<>(); + + resource = new TsFileResource(file); + if (!new File(file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists()) { + // resource file does not exist, tsfile may not be flushed yet + LOGGER.warn( + "{} does not exist ,skip it.", file.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX); + return false; + } else { + resource.deserialize(); + } + isBadFileMap.put(file.getName(), false); + return true; + } + + @Override + protected void onFileEnd() { + // record the end time of each timeseries in current file + for (Map.Entry<Pair<IDeviceID, String>, Long> entry : lashChunkEndTime.entrySet()) { + FileLastTimeInfo fileNameLastTime = + timeseriesLastTimeMap.computeIfAbsent(currTimeseriesID, id -> new FileLastTimeInfo()); + if (fileNameLastTime.endTimeInLastFile <= entry.getValue()) { + fileNameLastTime.endTimeInLastFile = entry.getValue(); + fileNameLastTime.lastFileName = file.getName(); + } + } + } + + @Override + protected void onChunkGroup() throws IOException { + FileLastTimeInfo fileNameLastTimePair = + deviceEndTime.computeIfAbsent(currDeviceID, k -> new FileLastTimeInfo()); + if (!currDeviceID.equals(EMPTY_DEVICE_ID)) { + long endTime = resource.getEndTime(currDeviceID); + // record the end time of last device in current file + if (endTime > fileNameLastTimePair.lastTime) { + fileNameLastTimePair.lastFileName = file.getName(); + fileNameLastTimePair.endTimeInLastFile = endTime; + } + } + + super.onChunkGroup(); + + fileNameLastTimePair = deviceEndTime.computeIfAbsent(currDeviceID, k -> new FileLastTimeInfo()); + if (!Boolean.TRUE.equals(hasCheckedDeviceOverlap.getOrDefault(currDeviceID, false)) + && resource.getStartTime(currDeviceID) <= fileNameLastTimePair.endTimeInLastFile) { + // device overlap, find bad file + recordDeviceOverlap(fileNameLastTimePair.lastFileName); + } + hasCheckedDeviceOverlap.put(currDeviceID, true); + } + + private void recordDeviceOverlap(String badFileName) { + // add previous bad file msg to list + if (!Boolean.TRUE.equals(isBadFileMap.get(badFileName))) { + if (printDetails) { + previousBadFileMsgs.add( + STR_FIND_BAD_FILE + + file.getParentFile().getAbsolutePath() + + File.separator + + deviceEndTime.get(currDeviceID).lastFileName + + STR_OVERLAP_LATTER_FILE); + } else { + previousBadFileMsgs.add( + file.getParentFile().getAbsolutePath() + + File.separator + + deviceEndTime.get(currDeviceID).lastFileName); + } + isBadFileMap.put(badFileName, true); + badFileNum++; + } + // print current file + if (!Boolean.TRUE.equals(isBadFileMap.get(file.getName()))) { + if (printDetails) { + printBoth(STR_FIND_BAD_FILE + file.getAbsolutePath()); + } else { + printBoth(file.getAbsolutePath()); + } + isBadFileMap.put(file.getName(), true); + badFileNum++; + } + if (printDetails) { + printBoth( + "---- Device " + + currDeviceID + + STR_OVERLAP_BETWEEN_FILE + + deviceEndTime.get(currDeviceID).lastFileName); + } + } + + @Override + protected void onTimePage(PageHeader pageHeader, ByteBuffer pageData, ChunkHeader chunkHeader) + throws IOException { + // Time Chunk + Decoder defaultTimeDecoder = + Decoder.getDecoderByType( + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), + TSDataType.INT64); + TimePageReader timePageReader = new TimePageReader(pageHeader, pageData, defaultTimeDecoder); + long[] timeBatch = timePageReader.getNextTimeBatch(); + FileLastTimeInfo fileNameLastTimePair = + timeseriesLastTimeMap.computeIfAbsent(currTimeseriesID, id -> new FileLastTimeInfo()); + for (long timestamp : timeBatch) { + onTimeStamp(timestamp, fileNameLastTimePair); + } + } + + protected void markInFileOverlap() { + if (!Boolean.TRUE.equals(isBadFileMap.get(file.getName()))) { + if (printDetails) { + printBoth(STR_FIND_BAD_FILE + file.getAbsolutePath()); + } else { + printBoth(file.getAbsolutePath()); + } + isBadFileMap.put(file.getName(), true); + badFileNum++; + } + } + + private void markBetweenFileOverlap(String overLapFile) { + // overlap between file, then add previous bad file path to list + if (!Boolean.TRUE.equals(isBadFileMap.getOrDefault(overLapFile, false))) { + if (printDetails) { + previousBadFileMsgs.add( + STR_FIND_BAD_FILE + + file.getParentFile().getAbsolutePath() + + File.separator + + overLapFile + + STR_OVERLAP_LATTER_FILE); + } else { + previousBadFileMsgs.add( + file.getParentFile().getAbsolutePath() + File.separator + overLapFile); + } + isBadFileMap.put(overLapFile, true); + badFileNum++; + } + } + + protected void printOverlapDetails(long timestamp, FileLastTimeInfo seriesLastTime) { + SeriesOverlapPrintInfo seriesOverlapPrintInfo = + hasSeriesPrintedDetails.computeIfAbsent( + currTimeseriesID, k -> new SeriesOverlapPrintInfo()); + if (timestamp <= seriesLastTime.endTimeInLastFile) { + if (!seriesOverlapPrintInfo.betweenFileOverlapPrinted) { + printBoth( + STR_TIMESERIES + + currTimeseriesID + + STR_OVERLAP_BETWEEN_FILE + + seriesLastTime.lastFileName); + seriesOverlapPrintInfo.betweenFileOverlapPrinted = true; + } + } else if (timestamp <= lashChunkEndTime.getOrDefault(currTimeseriesID, Long.MIN_VALUE)) { + if (!seriesOverlapPrintInfo.chunkOverlapPrinted) { + printBoth(STR_TIMESERIES + currTimeseriesID + " overlap between chunks"); + seriesOverlapPrintInfo.chunkOverlapPrinted = true; + } + } else if (timestamp <= lastPageEndTime) { + if (!seriesOverlapPrintInfo.crossPageOverlapPrinted) { + printBoth(STR_TIMESERIES + currTimeseriesID + " overlap between pages"); + seriesOverlapPrintInfo.crossPageOverlapPrinted = true; + } + } else { + if (!seriesOverlapPrintInfo.inPagePOverlapPrinted) { + printBoth(STR_TIMESERIES + currTimeseriesID + " overlap within one page"); + seriesOverlapPrintInfo.inPagePOverlapPrinted = true; + } + } + } + + @Override + protected void onValuePage(PageHeader pageHeader, ByteBuffer pageData, ChunkHeader chunkHeader) { + // Value Page, skip it + } + + @Override + protected void onNonAlignedPage( + PageHeader pageHeader, ByteBuffer pageData, ChunkHeader chunkHeader) throws IOException { + // NonAligned Chunk + Decoder defaultTimeDecoder = + Decoder.getDecoderByType( + TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), + TSDataType.INT64); + Decoder valueDecoder = + Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()); + PageReader pageReader = + new PageReader(pageData, chunkHeader.getDataType(), valueDecoder, defaultTimeDecoder); + BatchData batchData = pageReader.getAllSatisfiedPageData(); + FileLastTimeInfo fileNameLastTimePair = + timeseriesLastTimeMap.computeIfAbsent(currTimeseriesID, id -> new FileLastTimeInfo()); + while (batchData.hasCurrent()) { + long timestamp = batchData.currentTime(); + onTimeStamp(timestamp, fileNameLastTimePair); + batchData.next(); + } + } + + private void onTimeStamp(long timestamp, FileLastTimeInfo fileNameLastTimePair) { + if (timestamp <= fileNameLastTimePair.lastTime) { + // find bad file + markInFileOverlap(); + + if (timestamp <= fileNameLastTimePair.endTimeInLastFile) { + markBetweenFileOverlap(fileNameLastTimePair.lastFileName); + } + + if (printDetails) { + printOverlapDetails(timestamp, fileNameLastTimePair); + } + } else { + fileNameLastTimePair.lastTime = timestamp; + currentPageEndTime = timestamp; + currentChunkEndTime = timestamp; + } + } + + @Override + protected void onChunk(PageVisitor pageVisitor) throws IOException { + currentChunkEndTime = Long.MIN_VALUE; + lastPageEndTime = Long.MIN_VALUE; + + super.onChunk(pageVisitor); + + lashChunkEndTime.put( + currTimeseriesID, + Math.max( + lashChunkEndTime.getOrDefault(currTimeseriesID, Long.MIN_VALUE), currentChunkEndTime)); + } + + public List<String> getPreviousBadFileMsgs() { + return previousBadFileMsgs; + } + + @Override + protected void onException(Throwable t) { + LOGGER.error("Meet errors in reading file {} , skip it.", file.getAbsolutePath(), t); + if (!Boolean.TRUE.equals(isBadFileMap.get(file.getName()))) { + if (printDetails) { + printBoth( + "-- Meet errors in reading file " + + file.getAbsolutePath() + + ", tsfile may be corrupted."); + } else { + printBoth(file.getAbsolutePath()); + } + isBadFileMap.put(file.getName(), true); + badFileNum++; + } + } + + public int getBadFileNum() { + return badFileNum; + } + + @TestOnly + public void setBadFileNum(int badFileNum) { + this.badFileNum = badFileNum; + } + + public void reset(boolean resetBadFileNum) { + if (resetBadFileNum) { + badFileNum = 0; + } + timeseriesLastTimeMap.clear(); + deviceEndTime.clear(); + isBadFileMap.clear(); + } + + protected static class FileLastTimeInfo { + + public FileLastTimeInfo() { + lastFileName = FILE_NON_EXIST; + lastTime = Long.MIN_VALUE; + endTimeInLastFile = Long.MIN_VALUE; + } + + private String lastFileName; + private long lastTime; + private long endTimeInLastFile; + } + + protected static class SeriesOverlapPrintInfo { + + private boolean betweenFileOverlapPrinted; + private boolean chunkOverlapPrinted; + private boolean crossPageOverlapPrinted; + private boolean inPagePOverlapPrinted; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileValidationTool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileValidationTool.java index 2b88553b6b8..3e0b5169fc9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileValidationTool.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileValidationTool.java @@ -18,25 +18,8 @@ */ package org.apache.iotdb.db.tools.validate; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.tools.utils.TsFileValidationScan; -import org.apache.tsfile.common.conf.TSFileConfig; -import org.apache.tsfile.common.conf.TSFileDescriptor; -import org.apache.tsfile.common.constant.TsFileConstant; -import org.apache.tsfile.encoding.decoder.Decoder; -import org.apache.tsfile.enums.TSDataType; -import org.apache.tsfile.file.MetaMarker; -import org.apache.tsfile.file.header.ChunkGroupHeader; -import org.apache.tsfile.file.header.ChunkHeader; -import org.apache.tsfile.file.header.PageHeader; -import org.apache.tsfile.file.metadata.IDeviceID; -import org.apache.tsfile.file.metadata.PlainDeviceID; -import org.apache.tsfile.file.metadata.enums.TSEncoding; -import org.apache.tsfile.read.TsFileSequenceReader; -import org.apache.tsfile.read.common.BatchData; -import org.apache.tsfile.read.reader.page.PageReader; -import org.apache.tsfile.read.reader.page.TimePageReader; -import org.apache.tsfile.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,18 +27,14 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.regex.Pattern; import java.util.stream.Collectors; -import static org.apache.iotdb.commons.conf.IoTDBConstant.PATH_SEPARATOR; import static org.apache.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; /** @@ -86,16 +65,8 @@ public class TsFileValidationTool { private static final Logger logger = LoggerFactory.getLogger(TsFileValidationTool.class); private static final List<File> seqDataDirList = new ArrayList<>(); private static final List<File> fileList = new ArrayList<>(); - public static int badFileNum = 0; - // measurementID -> <fileName, [lastTime, endTimeInLastFile]> - private static final Map<String, Pair<String, long[]>> measurementLastTime = new HashMap<>(); - - // deviceID -> <fileName, endTime>, the endTime of device in the last seq file - private static final Map<IDeviceID, Pair<String, Long>> deviceEndTime = new HashMap<>(); - - // fileName -> isBadFile - private static final Map<String, Boolean> isBadFileMap = new HashMap<>(); + private static final TsFileValidationScan validationScan = new TsFileValidationScan(); /** * The form of param is: [path of data dir or tsfile] [-pd = print details or not] [-f = path of @@ -180,7 +151,7 @@ public class TsFileValidationTool { } } if (printDetails) { - printBoth("Finish checking successfully, totally find " + badFileNum + " bad files."); + printBoth("Finish checking successfully, totally find " + getBadFileNum() + " bad files."); } if (printToFile) { pw.close(); @@ -189,351 +160,10 @@ public class TsFileValidationTool { public static void findUncorrectFiles(List<File> tsFiles) { for (File tsFile : tsFiles) { - List<String> previousBadFileMsgs = new ArrayList<>(); - try { - TsFileResource resource = new TsFileResource(tsFile); - if (!new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).exists()) { - // resource file does not exist, tsfile may not be flushed yet - logger.warn( - "{} does not exist ,skip it.", - tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX); - continue; - } else { - resource.deserialize(); - } - isBadFileMap.put(tsFile.getName(), false); - - try (TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { - // deviceID -> has checked overlap or not - Map<IDeviceID, Boolean> hasCheckedDeviceOverlap = new HashMap<>(); - reader.position((long) TSFileConfig.MAGIC_STRING.getBytes().length + 1); - byte marker; - IDeviceID deviceID = new PlainDeviceID(""); - Map<String, boolean[]> hasMeasurementPrintedDetails = new HashMap<>(); - // measurementId -> lastChunkEndTime in current file - Map<String, Long> lashChunkEndTime = new HashMap<>(); - - // start reading data points in sequence - while ((marker = reader.readMarker()) != MetaMarker.SEPARATOR) { - switch (marker) { - case MetaMarker.CHUNK_HEADER: - case MetaMarker.TIME_CHUNK_HEADER: - case MetaMarker.VALUE_CHUNK_HEADER: - case MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER: - case MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER: - case MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER: - ChunkHeader header = reader.readChunkHeader(marker); - if (header.getDataSize() == 0) { - // empty value chunk - break; - } - long currentChunkEndTime = Long.MIN_VALUE; - String measurementID = - ((PlainDeviceID) deviceID).toStringID() - + PATH_SEPARATOR - + header.getMeasurementID(); - hasMeasurementPrintedDetails.computeIfAbsent(measurementID, k -> new boolean[4]); - measurementLastTime.computeIfAbsent( - measurementID, - k -> { - long[] arr = new long[2]; - Arrays.fill(arr, Long.MIN_VALUE); - return new Pair<>("", arr); - }); - Decoder defaultTimeDecoder = - Decoder.getDecoderByType( - TSEncoding.valueOf( - TSFileDescriptor.getInstance().getConfig().getTimeEncoder()), - TSDataType.INT64); - Decoder valueDecoder = - Decoder.getDecoderByType(header.getEncodingType(), header.getDataType()); - int dataSize = header.getDataSize(); - long lastPageEndTime = Long.MIN_VALUE; - while (dataSize > 0) { - valueDecoder.reset(); - PageHeader pageHeader = - reader.readPageHeader( - header.getDataType(), - (header.getChunkType() & 0x3F) == MetaMarker.CHUNK_HEADER); - ByteBuffer pageData = reader.readPage(pageHeader, header.getCompressionType()); - long currentPageEndTime = Long.MIN_VALUE; - if ((header.getChunkType() & (byte) TsFileConstant.TIME_COLUMN_MASK) - == (byte) TsFileConstant.TIME_COLUMN_MASK) { - // Time Chunk - TimePageReader timePageReader = - new TimePageReader(pageHeader, pageData, defaultTimeDecoder); - long[] timeBatch = timePageReader.getNextTimeBatch(); - for (int i = 0; i < timeBatch.length; i++) { - long timestamp = timeBatch[i]; - if (timestamp <= measurementLastTime.get(measurementID).right[0]) { - // find bad file - if (timestamp <= measurementLastTime.get(measurementID).right[1]) { - // overlap between file, then add previous bad file path to list - String lastBadFile = measurementLastTime.get(measurementID).left; - if (!isBadFileMap.get(lastBadFile)) { - if (printDetails) { - previousBadFileMsgs.add( - "-- Find the bad file " - + tsFile.getParentFile().getAbsolutePath() - + File.separator - + lastBadFile - + ", overlap with later files."); - } else { - previousBadFileMsgs.add( - tsFile.getParentFile().getAbsolutePath() - + File.separator - + lastBadFile); - } - isBadFileMap.put(lastBadFile, true); - badFileNum++; - } - } - - if (!isBadFileMap.get(tsFile.getName())) { - if (printDetails) { - printBoth("-- Find the bad file " + tsFile.getAbsolutePath()); - } else { - printBoth(tsFile.getAbsolutePath()); - } - isBadFileMap.put(tsFile.getName(), true); - badFileNum++; - } - if (printDetails) { - if (timestamp <= measurementLastTime.get(measurementID).right[1]) { - if (!hasMeasurementPrintedDetails.get(measurementID)[0]) { - printBoth( - "-------- Timeseries " - + measurementID - + " overlap between files, with previous file " - + measurementLastTime.get(measurementID).left); - hasMeasurementPrintedDetails.get(measurementID)[0] = true; - } - } else if (timestamp - <= lashChunkEndTime.getOrDefault(measurementID, Long.MIN_VALUE)) { - if (!hasMeasurementPrintedDetails.get(measurementID)[1]) { - printBoth( - "-------- Timeseries " - + measurementID - + " overlap between chunks"); - hasMeasurementPrintedDetails.get(measurementID)[1] = true; - } - } else if (timestamp <= lastPageEndTime) { - if (!hasMeasurementPrintedDetails.get(measurementID)[2]) { - printBoth( - "-------- Timeseries " - + measurementID - + " overlap between pages"); - hasMeasurementPrintedDetails.get(measurementID)[2] = true; - } - } else { - if (!hasMeasurementPrintedDetails.get(measurementID)[3]) { - printBoth( - "-------- Timeseries " - + measurementID - + " overlap within one page"); - hasMeasurementPrintedDetails.get(measurementID)[3] = true; - } - } - } - } else { - measurementLastTime.get(measurementID).right[0] = timestamp; - currentPageEndTime = timestamp; - currentChunkEndTime = timestamp; - } - } - } else if ((header.getChunkType() & (byte) TsFileConstant.VALUE_COLUMN_MASK) - == (byte) TsFileConstant.VALUE_COLUMN_MASK) { - // Value Chunk, skip it - } else { - // NonAligned Chunk - PageReader pageReader = - new PageReader( - pageData, header.getDataType(), valueDecoder, defaultTimeDecoder); - BatchData batchData = pageReader.getAllSatisfiedPageData(); - while (batchData.hasCurrent()) { - long timestamp = batchData.currentTime(); - if (timestamp <= measurementLastTime.get(measurementID).right[0]) { - // find bad file - if (timestamp <= measurementLastTime.get(measurementID).right[1]) { - // overlap between file, then add previous bad file path to list - if (!isBadFileMap.get(measurementLastTime.get(measurementID).left)) { - if (printDetails) { - previousBadFileMsgs.add( - "-- Find the bad file " - + tsFile.getParentFile().getAbsolutePath() - + File.separator - + measurementLastTime.get(measurementID).left - + ", overlap with later files."); - } else { - previousBadFileMsgs.add( - tsFile.getParentFile().getAbsolutePath() - + File.separator - + measurementLastTime.get(measurementID).left); - } - badFileNum++; - isBadFileMap.put(measurementLastTime.get(measurementID).left, true); - } - } - if (!isBadFileMap.get(tsFile.getName())) { - if (printDetails) { - printBoth("-- Find the bad file " + tsFile.getAbsolutePath()); - } else { - printBoth(tsFile.getAbsolutePath()); - } - isBadFileMap.put(tsFile.getName(), true); - badFileNum++; - } - if (printDetails) { - if (timestamp <= measurementLastTime.get(measurementID).right[1]) { - if (!hasMeasurementPrintedDetails.get(measurementID)[0]) { - printBoth( - "-------- Timeseries " - + measurementID - + " overlap between files, with previous file " - + measurementLastTime.get(measurementID).left); - hasMeasurementPrintedDetails.get(measurementID)[0] = true; - } - } else if (timestamp - <= lashChunkEndTime.getOrDefault(measurementID, Long.MIN_VALUE)) { - if (!hasMeasurementPrintedDetails.get(measurementID)[1]) { - printBoth( - "-------- Timeseries " - + measurementID - + " overlap between chunks"); - hasMeasurementPrintedDetails.get(measurementID)[1] = true; - } - } else if (timestamp <= lastPageEndTime) { - if (!hasMeasurementPrintedDetails.get(measurementID)[2]) { - printBoth( - "-------- Timeseries " - + measurementID - + " overlap between pages"); - hasMeasurementPrintedDetails.get(measurementID)[2] = true; - } - } else { - if (!hasMeasurementPrintedDetails.get(measurementID)[3]) { - printBoth( - "-------- Timeseries " - + measurementID - + " overlap within one page"); - hasMeasurementPrintedDetails.get(measurementID)[3] = true; - } - } - } - } else { - measurementLastTime.get(measurementID).right[0] = timestamp; - currentPageEndTime = timestamp; - currentChunkEndTime = timestamp; - } - batchData.next(); - } - } - dataSize -= pageHeader.getSerializedPageSize(); - lastPageEndTime = Math.max(lastPageEndTime, currentPageEndTime); - } - lashChunkEndTime.put( - measurementID, - Math.max( - lashChunkEndTime.getOrDefault(measurementID, Long.MIN_VALUE), - currentChunkEndTime)); - break; - case MetaMarker.CHUNK_GROUP_HEADER: - if (!deviceID.equals(new PlainDeviceID(""))) { - // record the end time of last device in current file - if (resource.getEndTime(deviceID) - > deviceEndTime.computeIfAbsent( - deviceID, - k -> { - return new Pair<>("", Long.MIN_VALUE); - }) - .right) { - deviceEndTime.get(deviceID).left = tsFile.getName(); - deviceEndTime.get(deviceID).right = resource.getEndTime(deviceID); - } - } - ChunkGroupHeader chunkGroupHeader = reader.readChunkGroupHeader(); - deviceID = chunkGroupHeader.getDeviceID(); - if (!hasCheckedDeviceOverlap.getOrDefault(deviceID, false) - && resource.getStartTime(deviceID) - <= deviceEndTime.computeIfAbsent( - deviceID, - k -> { - return new Pair<>("", Long.MIN_VALUE); - }) - .right) { - // find bad file - // add prevous bad file msg to list - if (!isBadFileMap.get(deviceEndTime.get(deviceID).left)) { - if (printDetails) { - previousBadFileMsgs.add( - "-- Find the bad file " - + tsFile.getParentFile().getAbsolutePath() - + File.separator - + deviceEndTime.get(deviceID).left - + ", overlap with later files."); - } else { - previousBadFileMsgs.add( - tsFile.getParentFile().getAbsolutePath() - + File.separator - + deviceEndTime.get(deviceID).left); - } - isBadFileMap.put(deviceEndTime.get(deviceID).left, true); - badFileNum++; - } - // print current file - if (!isBadFileMap.get(tsFile.getName())) { - if (printDetails) { - printBoth("-- Find the bad file " + tsFile.getAbsolutePath()); - } else { - printBoth(tsFile.getAbsolutePath()); - } - isBadFileMap.put(tsFile.getName(), true); - badFileNum++; - } - if (printDetails) { - printBoth( - "---- Device " - + deviceID - + " overlap between files, with previous file " - + deviceEndTime.get(deviceID).left); - } - } - hasCheckedDeviceOverlap.put(deviceID, true); - break; - case MetaMarker.OPERATION_INDEX_RANGE: - reader.readPlanIndex(); - break; - default: - MetaMarker.handleUnexpectedMarker(marker); - } - } - - // record the end time of each timeseries in current file - for (Map.Entry<String, Long> entry : lashChunkEndTime.entrySet()) { - if (measurementLastTime.get(entry.getKey()).right[1] <= entry.getValue()) { - measurementLastTime.get(entry.getKey()).right[1] = entry.getValue(); - measurementLastTime.get(entry.getKey()).left = tsFile.getName(); - } - } - } - } catch (Throwable e) { - logger.error("Meet errors in reading file {} , skip it.", tsFile.getAbsolutePath(), e); - if (!isBadFileMap.get(tsFile.getName())) { - if (printDetails) { - printBoth( - "-- Meet errors in reading file " - + tsFile.getAbsolutePath() - + ", tsfile may be corrupted."); - } else { - printBoth(tsFile.getAbsolutePath()); - } - isBadFileMap.put(tsFile.getName(), true); - badFileNum++; - } - } finally { - for (String msg : previousBadFileMsgs) { - printBoth(msg); - } + validationScan.getPreviousBadFileMsgs().clear(); + validationScan.scanTsFile(tsFile); + for (String msg : validationScan.getPreviousBadFileMsgs()) { + printBoth(msg); } } } @@ -579,12 +209,7 @@ public class TsFileValidationTool { } public static void clearMap(boolean resetBadFileNum) { - if (resetBadFileNum) { - badFileNum = 0; - } - measurementLastTime.clear(); - deviceEndTime.clear(); - isBadFileMap.clear(); + validationScan.reset(resetBadFileNum); } private static boolean checkIsDirectory(File dir) { @@ -602,4 +227,12 @@ public class TsFileValidationTool { pw.println(msg); } } + + public static int getBadFileNum() { + return validationScan.getBadFileNum(); + } + + public static void setBadFileNum(int badFileNum) { + validationScan.setBadFileNum(badFileNum); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java index 8bd47bbd917..49bd5593cc0 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java @@ -526,7 +526,7 @@ public class AbstractCompactionTest { files.add(resource.getTsFile()); } TsFileValidationTool.findUncorrectFiles(files); - Assert.assertEquals(0, TsFileValidationTool.badFileNum); + Assert.assertEquals(0, TsFileValidationTool.getBadFileNum()); } protected Map<PartialPath, List<TimeValuePair>> readSourceFiles( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java index 483363f6ec5..9cf7b2b488b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/FastCrossCompactionPerformerTest.java @@ -4397,6 +4397,6 @@ public class FastCrossCompactionPerformerTest extends AbstractCompactionTest { files.add(resource.getTsFile()); } TsFileValidationTool.findUncorrectFiles(files); - Assert.assertEquals(0, TsFileValidationTool.badFileNum); + Assert.assertEquals(0, TsFileValidationTool.getBadFileNum()); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithFastPerformerValidationTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithFastPerformerValidationTest.java index 6808c8d23d5..c895f95f9de 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithFastPerformerValidationTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithFastPerformerValidationTest.java @@ -100,7 +100,7 @@ public class CrossSpaceCompactionWithFastPerformerValidationTest extends Abstrac Thread.currentThread().setName(oldThreadName); FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders(); SystemInfo.getInstance().setMemorySizeForCompaction(compactionMemory); - TsFileValidationTool.badFileNum = 0; + TsFileValidationTool.setBadFileNum(0); } /** diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithReadPointPerformerValidationTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithReadPointPerformerValidationTest.java index ff41c9a9902..48ad6d366d0 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithReadPointPerformerValidationTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/cross/CrossSpaceCompactionWithReadPointPerformerValidationTest.java @@ -98,7 +98,7 @@ public class CrossSpaceCompactionWithReadPointPerformerValidationTest Thread.currentThread().setName(oldThreadName); FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders(); SystemInfo.getInstance().setMemorySizeForCompaction(compactionMemory); - TsFileValidationTool.badFileNum = 0; + TsFileValidationTool.setBadFileNum(0); } /**
