This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 9cc4cbfe175 [To dev/1.3] Pipe: Added progressIndex reset function to
the resource resetting script (#15957)
9cc4cbfe175 is described below
commit 9cc4cbfe1752822d7152cc73d194a6fb0736dafb
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jul 16 19:00:21 2025 +0800
[To dev/1.3] Pipe: Added progressIndex reset function to the resource
resetting script (#15957)
* Pipe: Added progressIndex reset function to the resource resetting script
* bat-fix
---
...pipe.bat => reset-resource-pipe-statistics.bat} | 4 +-
...y-pipe.sh => reset-resource-pipe-statistics.sh} | 4 +-
...va => TsFileResourcePipeStatisticsSetTool.java} | 125 ++++++++++++++-------
3 files changed, 86 insertions(+), 47 deletions(-)
diff --git
a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.bat
b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.bat
similarity index 92%
rename from
iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.bat
rename to
iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.bat
index 7d2b867bba9..098072a4b4f 100644
---
a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.bat
+++
b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.bat
@@ -19,7 +19,7 @@
@echo off
echo ````````````````````````````````````````````````````````````````````````
-echo Starting Validating the isGeneratedByPipe Mark in TsFile Resources
+echo Starting Resetting the Pipe Related Statistics in TsFile Resources
echo ````````````````````````````````````````````````````````````````````````
if "%OS%" == "Windows_NT" setlocal
@@ -28,7 +28,7 @@ pushd %~dp0..\..
if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
popd
-if NOT DEFINED MAIN_CLASS set
MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool
+if NOT DEFINED MAIN_CLASS set
MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourcePipeStatisticsSetTool
if NOT DEFINED JAVA_HOME goto :err
@REM
-----------------------------------------------------------------------------
diff --git
a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.sh
b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.sh
similarity index 90%
rename from
iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.sh
rename to
iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.sh
index daabae671f3..5dc3fab8699 100644
---
a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.sh
+++
b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.sh
@@ -19,7 +19,7 @@
#
echo
------------------------------------------------------------------------------------
-echo Starting Validating the isGeneratedByPipe Mark in TsFile Resources
+echo Starting Resetting the Pipe Related Statistics in TsFile Resources
echo
------------------------------------------------------------------------------------
source "$(dirname "$0")/../../sbin/iotdb-common.sh"
@@ -45,7 +45,7 @@ for f in ${IOTDB_HOME}/lib/*.jar; do
CLASSPATH=${CLASSPATH}":"$f
done
-MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool
+MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourcePipeStatisticsSetTool
"$JAVA" -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
exit $?
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourcePipeStatisticsSetTool.java
similarity index 60%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourcePipeStatisticsSetTool.java
index e14f7e81d05..9c4c73361a7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourcePipeStatisticsSetTool.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.tools.validate;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.tsfile.common.constant.TsFileConstant;
@@ -37,33 +38,34 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-public class TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
+public class TsFileResourcePipeStatisticsSetTool {
private static final Logger LOGGER =
- org.slf4j.LoggerFactory.getLogger(
- TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.class);
+
org.slf4j.LoggerFactory.getLogger(TsFileResourcePipeStatisticsSetTool.class);
private static final String USAGE =
- "Usage: --expected true|false --dirs <dir1> <dir2> ...\n"
- + " --expected: whether the TsFileResource is expected to be
generated by pipe\n"
+ "Usage: [--isGeneratedByPipe true|false] [--resetProgressIndex] --dirs
<dir1> <dir2> ...\n"
+ + " --isGeneratedByPipe: whether the TsFileResource is
isGeneratedByPipe to be generated by pipe\n"
+ + " --resetProgressIndex: whether to reset the TsFileResources'
progressIndexes\n"
+ " --dirs: list of data directories to validate and repair";
private static final Set<File> dataDirs = new ConcurrentSkipListSet<>();
- private static final AtomicBoolean expectedMark = new AtomicBoolean(true);
+ private static AtomicBoolean isGeneratedByPipeMark = null;
+ private static boolean resetProgressIndex = false;
private static final AtomicLong runtime = new
AtomicLong(System.currentTimeMillis());
private static final AtomicInteger totalTsFileNum = new AtomicInteger(0);
- private static final AtomicInteger toRepairTsFileNum = new AtomicInteger(0);
+ private static final AtomicInteger toResetFlagNum = new AtomicInteger(0);
+ private static final AtomicInteger toResetProgressIndexNum = new
AtomicInteger(0);
+ private static final AtomicInteger changedNum = new AtomicInteger(0);
- // Usage: --expected true|false --dirs <dir1> <dir2> ...
+ // Usage: [--isGeneratedByPipe true|false] [--resetProgressIndex] --dirs
<dir1> <dir2> ...
public static void main(String[] args) throws IOException {
parseCommandLineArgs(args);
final List<File> partitionDirs = findAllPartitionDirs();
partitionDirs.parallelStream()
- .forEach(
- TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool
- ::validateAndRepairTsFileResourcesInPartition);
+
.forEach(TsFileResourcePipeStatisticsSetTool::validateAndRepairTsFileResourcesInPartition);
printStatistics();
}
@@ -74,14 +76,17 @@ public class
TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
if (args.length == 0
|| argSet.contains("--help")
|| argSet.contains("-h")
- || !(argSet.contains("--expected") && argSet.contains("--dirs"))) {
+ || !((argSet.contains("--isGeneratedByPipe") ||
argSet.contains("--resetProgressIndex"))
+ && argSet.contains("--dirs"))) {
LOGGER.info(USAGE);
System.exit(1);
}
for (int i = 0; i < args.length; i++) {
- if ("--expected".equals(args[i]) && i + 1 < args.length) {
- expectedMark.set(Boolean.parseBoolean(args[++i]));
+ if ("--isGeneratedByPipe".equals(args[i]) && i + 1 < args.length) {
+ isGeneratedByPipeMark = new
AtomicBoolean(Boolean.parseBoolean(args[++i]));
+ } else if ("--resetProgressIndex".equals(args[i])) {
+ resetProgressIndex = true;
} else if ("--dirs".equals(args[i]) && i + 1 < args.length) {
i++;
while (i < args.length && !args[i].startsWith("--")) {
@@ -102,7 +107,8 @@ public class
TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
}
LOGGER.info("------------------------------------------------------");
- LOGGER.info("Expected mark: {}", expectedMark.get());
+ LOGGER.info("isGeneratedByPipe mark: {}", isGeneratedByPipeMark);
+ LOGGER.info("resetProgressIndex: {}", resetProgressIndex);
LOGGER.info("Data directories: ");
for (File dir : dataDirs) {
LOGGER.info(" {}", dir.getAbsolutePath());
@@ -144,19 +150,20 @@ public class
TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
}
private static void validateAndRepairTsFileResourcesInPartition(final File
partitionDir) {
- final AtomicInteger totalResources = new AtomicInteger();
- final AtomicInteger toRepairResources = new AtomicInteger();
+ final AtomicInteger totalTsFileResource = new AtomicInteger(0);
+ final AtomicInteger toResetFlagResource = new AtomicInteger(0);
+ final AtomicInteger toResetProgressIndexResource = new AtomicInteger(0);
+ final AtomicInteger changedResource = new AtomicInteger(0);
try {
final List<TsFileResource> resources =
loadAllTsFileResources(Collections.singletonList(partitionDir));
- totalResources.addAndGet(resources.size());
+ totalTsFileResource.addAndGet(resources.size());
for (final TsFileResource resource : resources) {
try {
- if (validateAndRepairSingleTsFileResource(resource)) {
- toRepairResources.incrementAndGet();
- }
+ validateAndRepairSingleTsFileResource(
+ resource, toResetFlagResource, toResetProgressIndexResource,
changedResource);
} catch (final Exception e) {
// Continue processing other resources even if one fails
LOGGER.warn(
@@ -174,13 +181,17 @@ public class
TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
e);
}
- totalTsFileNum.addAndGet(totalResources.get());
- toRepairTsFileNum.addAndGet(toRepairResources.get());
+ totalTsFileNum.addAndGet(totalTsFileResource.get());
+ toResetFlagNum.addAndGet(toResetFlagResource.get());
+ toResetProgressIndexNum.addAndGet(toResetProgressIndexResource.get());
+ changedNum.addAndGet(changedResource.get());
LOGGER.info(
- "TimePartition {} has {} total resources, {} to repair resources.
Process completed.",
+ "TimePartition {} has {} total resources, {} to set isGeneratedByPipe
resources, {} to reset progressIndex resources, {} changed resources. Process
completed.",
partitionDir,
- totalResources.get(),
- toRepairResources.get());
+ totalTsFileResource.get(),
+ toResetFlagResource.get(),
+ toResetProgressIndexResource.get(),
+ changedResource.get());
}
private static List<TsFileResource> loadAllTsFileResources(List<File>
timePartitionDirs)
@@ -217,37 +228,63 @@ public class
TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
* @param resource the TsFileResource to validate and repair
* @return true if the resource needs to be repaired and false if it is valid
*/
- private static boolean validateAndRepairSingleTsFileResource(TsFileResource
resource) {
- if (resource.isGeneratedByPipe() == expectedMark.get()) {
+ private static void validateAndRepairSingleTsFileResource(
+ final TsFileResource resource,
+ final AtomicInteger toResetFlagResource,
+ final AtomicInteger toResetProgressIndexResource,
+ final AtomicInteger changedResource) {
+ boolean skip = true;
+ if (Objects.nonNull(isGeneratedByPipeMark)
+ && resource.isGeneratedByPipe() != isGeneratedByPipeMark.get()) {
// The resource is valid, no need to repair
- return false;
+ LOGGER.info(
+ "Repairing TsFileResource: {}, isGeneratedByPipe mark: {}, actual
mark: {}",
+ resource.getTsFile().getAbsolutePath(),
+ isGeneratedByPipeMark.get(),
+ resource.isGeneratedByPipe());
+
+ toResetFlagResource.getAndIncrement();
+ skip = false;
}
- LOGGER.info(
- "Repairing TsFileResource: {}, expected mark: {}, actual mark: {}",
- resource.getTsFile().getAbsolutePath(),
- expectedMark.get(),
- resource.isGeneratedByPipe());
+ if (resetProgressIndex && resource.getMaxProgressIndex() !=
MinimumProgressIndex.INSTANCE) {
+ // The resource is valid, no need to repair
+ LOGGER.info(
+ "Resetting TsFileResource:{} 's progressIndex to minimum, original
progressIndex: {}",
+ resource.getTsFile().getAbsolutePath(),
+ resource.getMaxProgressIndex());
+
+ toResetProgressIndexResource.getAndIncrement();
+ skip = false;
+ }
+ if (skip) {
+ return;
+ }
+ changedResource.getAndIncrement();
try {
repairSingleTsFileResource(resource);
LOGGER.info(
"Marked TsFileResource as {} in resource: {}",
- expectedMark.get(),
+ isGeneratedByPipeMark.get(),
+ resource.getTsFile().getAbsolutePath());
+ LOGGER.info(
+ "Reset TsFileResource:{} 's progressIndex to minimum.",
resource.getTsFile().getAbsolutePath());
} catch (final Exception e) {
LOGGER.warn(
- "ERROR: Failed to repair TsFileResource: {}, error: {}",
- resource.getTsFile().getAbsolutePath(),
- e.getMessage());
+ "ERROR: Failed to repair TsFileResource: {}",
resource.getTsFile().getAbsolutePath(), e);
}
-
- return true;
}
private static void repairSingleTsFileResource(TsFileResource resource)
throws IOException {
- resource.setGeneratedByPipe(expectedMark.get());
+ if (Objects.nonNull(isGeneratedByPipeMark)) {
+ resource.setGeneratedByPipe(isGeneratedByPipeMark.get());
+ }
+ if (resetProgressIndex) {
+ resource.setProgressIndex(MinimumProgressIndex.INSTANCE);
+ }
resource.serialize();
}
@@ -255,9 +292,11 @@ public class
TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
LOGGER.info("------------------------------------------------------");
LOGGER.info("Validation and repair completed. Statistics:");
LOGGER.info(
- "Total time taken: {} ms, total TsFile resources: {}, repaired TsFile
resources: {}",
+ "Total time taken: {} ms, total TsFile resources: {}, set
isGeneratedByPipe resources: {}, reset progressIndex resources: {}, changed
resources: {}",
System.currentTimeMillis() - runtime.get(),
totalTsFileNum.get(),
- toRepairTsFileNum.get());
+ toResetFlagNum.get(),
+ toResetProgressIndexNum.get(),
+ changedNum.get());
}
}