This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit ebdf275a3509be03471abd73c5473bb88b87169a Author: Caideyipi <[email protected]> AuthorDate: Wed Jul 16 19:00:21 2025 +0800 [To dev/1.3] Pipe: Added progressIndex reset function to the resource resetting script (#15957) * Pipe: Added progressIndex reset function to the resource resetting script * bat-fix --- ...pipe.bat => reset-resource-pipe-statistics.bat} | 4 +- ...y-pipe.sh => reset-resource-pipe-statistics.sh} | 4 +- ...va => TsFileResourcePipeStatisticsSetTool.java} | 125 ++++++++++++++------- 3 files changed, 86 insertions(+), 47 deletions(-) diff --git a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.bat b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.bat similarity index 92% rename from iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.bat rename to iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.bat index 7d2b867bba9..098072a4b4f 100644 --- a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.bat +++ b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.bat @@ -19,7 +19,7 @@ @echo off echo ```````````````````````````````````````````````````````````````````````` -echo Starting Validating the isGeneratedByPipe Mark in TsFile Resources +echo Starting Resetting the Pipe Related Statistics in TsFile Resources echo ```````````````````````````````````````````````````````````````````````` if "%OS%" == "Windows_NT" setlocal @@ -28,7 +28,7 @@ pushd %~dp0..\.. if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD% popd -if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool +if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourcePipeStatisticsSetTool if NOT DEFINED JAVA_HOME goto :err @REM ----------------------------------------------------------------------------- diff --git a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.sh b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.sh similarity index 90% rename from iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.sh rename to iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.sh index daabae671f3..5dc3fab8699 100644 --- a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.sh +++ b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.sh @@ -19,7 +19,7 @@ # echo ------------------------------------------------------------------------------------ -echo Starting Validating the isGeneratedByPipe Mark in TsFile Resources +echo Starting Resetting the Pipe Related Statistics in TsFile Resources echo ------------------------------------------------------------------------------------ source "$(dirname "$0")/../../sbin/iotdb-common.sh" @@ -45,7 +45,7 @@ for f in ${IOTDB_HOME}/lib/*.jar; do CLASSPATH=${CLASSPATH}":"$f done -MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool +MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourcePipeStatisticsSetTool "$JAVA" -cp "$CLASSPATH" "$MAIN_CLASS" "$@" exit $? diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourcePipeStatisticsSetTool.java similarity index 60% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourcePipeStatisticsSetTool.java index e14f7e81d05..9c4c73361a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourcePipeStatisticsSetTool.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.tools.validate; +import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.common.constant.TsFileConstant; @@ -37,33 +38,34 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -public class TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool { +public class TsFileResourcePipeStatisticsSetTool { private static final Logger LOGGER = - org.slf4j.LoggerFactory.getLogger( - TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.class); + org.slf4j.LoggerFactory.getLogger(TsFileResourcePipeStatisticsSetTool.class); private static final String USAGE = - "Usage: --expected true|false --dirs <dir1> <dir2> ...\n" - + " --expected: whether the TsFileResource is expected to be generated by pipe\n" + "Usage: [--isGeneratedByPipe true|false] [--resetProgressIndex] --dirs <dir1> <dir2> ...\n" + + " --isGeneratedByPipe: whether the TsFileResource is isGeneratedByPipe to be generated by pipe\n" + + " --resetProgressIndex: whether to reset the TsFileResources' progressIndexes\n" + " --dirs: list of data directories to validate and repair"; private static final Set<File> dataDirs = new ConcurrentSkipListSet<>(); - private static final AtomicBoolean expectedMark = new AtomicBoolean(true); + private static AtomicBoolean isGeneratedByPipeMark = null; + private static boolean resetProgressIndex = false; private static final AtomicLong runtime = new AtomicLong(System.currentTimeMillis()); private static final AtomicInteger totalTsFileNum = new AtomicInteger(0); - private static final AtomicInteger toRepairTsFileNum = new AtomicInteger(0); + private static final AtomicInteger toResetFlagNum = new AtomicInteger(0); + private static final AtomicInteger toResetProgressIndexNum = new AtomicInteger(0); + private static final AtomicInteger changedNum = new AtomicInteger(0); - // Usage: --expected true|false --dirs <dir1> <dir2> ... + // Usage: [--isGeneratedByPipe true|false] [--resetProgressIndex] --dirs <dir1> <dir2> ... public static void main(String[] args) throws IOException { parseCommandLineArgs(args); final List<File> partitionDirs = findAllPartitionDirs(); partitionDirs.parallelStream() - .forEach( - TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool - ::validateAndRepairTsFileResourcesInPartition); + .forEach(TsFileResourcePipeStatisticsSetTool::validateAndRepairTsFileResourcesInPartition); printStatistics(); } @@ -74,14 +76,17 @@ public class TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool { if (args.length == 0 || argSet.contains("--help") || argSet.contains("-h") - || !(argSet.contains("--expected") && argSet.contains("--dirs"))) { + || !((argSet.contains("--isGeneratedByPipe") || argSet.contains("--resetProgressIndex")) + && argSet.contains("--dirs"))) { LOGGER.info(USAGE); System.exit(1); } for (int i = 0; i < args.length; i++) { - if ("--expected".equals(args[i]) && i + 1 < args.length) { - expectedMark.set(Boolean.parseBoolean(args[++i])); + if ("--isGeneratedByPipe".equals(args[i]) && i + 1 < args.length) { + isGeneratedByPipeMark = new AtomicBoolean(Boolean.parseBoolean(args[++i])); + } else if ("--resetProgressIndex".equals(args[i])) { + resetProgressIndex = true; } else if ("--dirs".equals(args[i]) && i + 1 < args.length) { i++; while (i < args.length && !args[i].startsWith("--")) { @@ -102,7 +107,8 @@ public class TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool { } LOGGER.info("------------------------------------------------------"); - LOGGER.info("Expected mark: {}", expectedMark.get()); + LOGGER.info("isGeneratedByPipe mark: {}", isGeneratedByPipeMark); + LOGGER.info("resetProgressIndex: {}", resetProgressIndex); LOGGER.info("Data directories: "); for (File dir : dataDirs) { LOGGER.info(" {}", dir.getAbsolutePath()); @@ -144,19 +150,20 @@ public class TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool { } private static void validateAndRepairTsFileResourcesInPartition(final File partitionDir) { - final AtomicInteger totalResources = new AtomicInteger(); - final AtomicInteger toRepairResources = new AtomicInteger(); + final AtomicInteger totalTsFileResource = new AtomicInteger(0); + final AtomicInteger toResetFlagResource = new AtomicInteger(0); + final AtomicInteger toResetProgressIndexResource = new AtomicInteger(0); + final AtomicInteger changedResource = new AtomicInteger(0); try { final List<TsFileResource> resources = loadAllTsFileResources(Collections.singletonList(partitionDir)); - totalResources.addAndGet(resources.size()); + totalTsFileResource.addAndGet(resources.size()); for (final TsFileResource resource : resources) { try { - if (validateAndRepairSingleTsFileResource(resource)) { - toRepairResources.incrementAndGet(); - } + validateAndRepairSingleTsFileResource( + resource, toResetFlagResource, toResetProgressIndexResource, changedResource); } catch (final Exception e) { // Continue processing other resources even if one fails LOGGER.warn( @@ -174,13 +181,17 @@ public class TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool { e); } - totalTsFileNum.addAndGet(totalResources.get()); - toRepairTsFileNum.addAndGet(toRepairResources.get()); + totalTsFileNum.addAndGet(totalTsFileResource.get()); + toResetFlagNum.addAndGet(toResetFlagResource.get()); + toResetProgressIndexNum.addAndGet(toResetProgressIndexResource.get()); + changedNum.addAndGet(changedResource.get()); LOGGER.info( - "TimePartition {} has {} total resources, {} to repair resources. Process completed.", + "TimePartition {} has {} total resources, {} to set isGeneratedByPipe resources, {} to reset progressIndex resources, {} changed resources. Process completed.", partitionDir, - totalResources.get(), - toRepairResources.get()); + totalTsFileResource.get(), + toResetFlagResource.get(), + toResetProgressIndexResource.get(), + changedResource.get()); } private static List<TsFileResource> loadAllTsFileResources(List<File> timePartitionDirs) @@ -217,37 +228,63 @@ public class TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool { * @param resource the TsFileResource to validate and repair * @return true if the resource needs to be repaired and false if it is valid */ - private static boolean validateAndRepairSingleTsFileResource(TsFileResource resource) { - if (resource.isGeneratedByPipe() == expectedMark.get()) { + private static void validateAndRepairSingleTsFileResource( + final TsFileResource resource, + final AtomicInteger toResetFlagResource, + final AtomicInteger toResetProgressIndexResource, + final AtomicInteger changedResource) { + boolean skip = true; + if (Objects.nonNull(isGeneratedByPipeMark) + && resource.isGeneratedByPipe() != isGeneratedByPipeMark.get()) { // The resource is valid, no need to repair - return false; + LOGGER.info( + "Repairing TsFileResource: {}, isGeneratedByPipe mark: {}, actual mark: {}", + resource.getTsFile().getAbsolutePath(), + isGeneratedByPipeMark.get(), + resource.isGeneratedByPipe()); + + toResetFlagResource.getAndIncrement(); + skip = false; } - LOGGER.info( - "Repairing TsFileResource: {}, expected mark: {}, actual mark: {}", - resource.getTsFile().getAbsolutePath(), - expectedMark.get(), - resource.isGeneratedByPipe()); + if (resetProgressIndex && resource.getMaxProgressIndex() != MinimumProgressIndex.INSTANCE) { + // The resource is valid, no need to repair + LOGGER.info( + "Resetting TsFileResource:{} 's progressIndex to minimum, original progressIndex: {}", + resource.getTsFile().getAbsolutePath(), + resource.getMaxProgressIndex()); + + toResetProgressIndexResource.getAndIncrement(); + skip = false; + } + if (skip) { + return; + } + changedResource.getAndIncrement(); try { repairSingleTsFileResource(resource); LOGGER.info( "Marked TsFileResource as {} in resource: {}", - expectedMark.get(), + isGeneratedByPipeMark.get(), + resource.getTsFile().getAbsolutePath()); + LOGGER.info( + "Reset TsFileResource:{} 's progressIndex to minimum.", resource.getTsFile().getAbsolutePath()); } catch (final Exception e) { LOGGER.warn( - "ERROR: Failed to repair TsFileResource: {}, error: {}", - resource.getTsFile().getAbsolutePath(), - e.getMessage()); + "ERROR: Failed to repair TsFileResource: {}", resource.getTsFile().getAbsolutePath(), e); } - - return true; } private static void repairSingleTsFileResource(TsFileResource resource) throws IOException { - resource.setGeneratedByPipe(expectedMark.get()); + if (Objects.nonNull(isGeneratedByPipeMark)) { + resource.setGeneratedByPipe(isGeneratedByPipeMark.get()); + } + if (resetProgressIndex) { + resource.setProgressIndex(MinimumProgressIndex.INSTANCE); + } resource.serialize(); } @@ -255,9 +292,11 @@ public class TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool { LOGGER.info("------------------------------------------------------"); LOGGER.info("Validation and repair completed. Statistics:"); LOGGER.info( - "Total time taken: {} ms, total TsFile resources: {}, repaired TsFile resources: {}", + "Total time taken: {} ms, total TsFile resources: {}, set isGeneratedByPipe resources: {}, reset progressIndex resources: {}, changed resources: {}", System.currentTimeMillis() - runtime.get(), totalTsFileNum.get(), - toRepairTsFileNum.get()); + toResetFlagNum.get(), + toResetProgressIndexNum.get(), + changedNum.get()); } }
