This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 9cc4cbfe175 [To dev/1.3] Pipe: Added progressIndex reset function to the resource resetting script (#15957)
9cc4cbfe175 is described below

commit 9cc4cbfe1752822d7152cc73d194a6fb0736dafb
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jul 16 19:00:21 2025 +0800

    [To dev/1.3] Pipe: Added progressIndex reset function to the resource resetting script (#15957)
    
    * Pipe: Added progressIndex reset function to the resource resetting script
    
    * bat-fix
---
 ...pipe.bat => reset-resource-pipe-statistics.bat} |   4 +-
 ...y-pipe.sh => reset-resource-pipe-statistics.sh} |   4 +-
 ...va => TsFileResourcePipeStatisticsSetTool.java} | 125 ++++++++++++++-------
 3 files changed, 86 insertions(+), 47 deletions(-)

diff --git a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.bat b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.bat
similarity index 92%
rename from iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.bat
rename to iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.bat
index 7d2b867bba9..098072a4b4f 100644
--- a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.bat
+++ b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.bat
@@ -19,7 +19,7 @@
 
 @echo off
 echo ````````````````````````````````````````````````````````````````````````
-echo Starting Validating the isGeneratedByPipe Mark in TsFile Resources
+echo Starting Resetting the Pipe Related Statistics in TsFile Resources
 echo ````````````````````````````````````````````````````````````````````````
 
 if "%OS%" == "Windows_NT" setlocal
@@ -28,7 +28,7 @@ pushd %~dp0..\..
 if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
 popd
 
-if NOT DEFINED MAIN_CLASS set 
MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool
+if NOT DEFINED MAIN_CLASS set 
MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourcePipeStatisticsSetTool
 if NOT DEFINED JAVA_HOME goto :err
 
 @REM 
-----------------------------------------------------------------------------
diff --git a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.sh b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.sh
similarity index 90%
rename from iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.sh
rename to iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.sh
index daabae671f3..5dc3fab8699 100644
--- a/iotdb-core/datanode/src/assembly/resources/tools/tsfile/mark-is-generated-by-pipe.sh
+++ b/iotdb-core/datanode/src/assembly/resources/tools/tsfile/reset-resource-pipe-statistics.sh
@@ -19,7 +19,7 @@
 #
 
 echo 
------------------------------------------------------------------------------------
-echo Starting Validating the isGeneratedByPipe Mark in TsFile Resources
+echo Starting Resetting the Pipe Related Statistics in TsFile Resources
 echo 
------------------------------------------------------------------------------------
 
 source "$(dirname "$0")/../../sbin/iotdb-common.sh"
@@ -45,7 +45,7 @@ for f in ${IOTDB_HOME}/lib/*.jar; do
   CLASSPATH=${CLASSPATH}":"$f
 done
 
-MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool
+MAIN_CLASS=org.apache.iotdb.db.tools.validate.TsFileResourcePipeStatisticsSetTool
 
 "$JAVA" -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
 exit $?
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourcePipeStatisticsSetTool.java
similarity index 60%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourcePipeStatisticsSetTool.java
index e14f7e81d05..9c4c73361a7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourcePipeStatisticsSetTool.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.tools.validate;
 
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
@@ -37,33 +38,34 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
+public class TsFileResourcePipeStatisticsSetTool {
 
   private static final Logger LOGGER =
-      org.slf4j.LoggerFactory.getLogger(
-          TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.class);
+      
org.slf4j.LoggerFactory.getLogger(TsFileResourcePipeStatisticsSetTool.class);
 
   private static final String USAGE =
-      "Usage: --expected true|false --dirs <dir1> <dir2> ...\n"
-          + "  --expected: whether the TsFileResource is expected to be 
generated by pipe\n"
+      "Usage: [--isGeneratedByPipe true|false] [--resetProgressIndex] --dirs 
<dir1> <dir2> ...\n"
+          + "  --isGeneratedByPipe: whether the TsFileResource is 
isGeneratedByPipe to be generated by pipe\n"
+          + "  --resetProgressIndex: whether to reset the TsFileResources' 
progressIndexes\n"
           + "  --dirs: list of data directories to validate and repair";
 
   private static final Set<File> dataDirs = new ConcurrentSkipListSet<>();
-  private static final AtomicBoolean expectedMark = new AtomicBoolean(true);
+  private static AtomicBoolean isGeneratedByPipeMark = null;
+  private static boolean resetProgressIndex = false;
 
   private static final AtomicLong runtime = new 
AtomicLong(System.currentTimeMillis());
 
   private static final AtomicInteger totalTsFileNum = new AtomicInteger(0);
-  private static final AtomicInteger toRepairTsFileNum = new AtomicInteger(0);
+  private static final AtomicInteger toResetFlagNum = new AtomicInteger(0);
+  private static final AtomicInteger toResetProgressIndexNum = new 
AtomicInteger(0);
+  private static final AtomicInteger changedNum = new AtomicInteger(0);
 
-  // Usage: --expected true|false --dirs <dir1> <dir2> ...
+  // Usage: [--isGeneratedByPipe true|false] [--resetProgressIndex] --dirs 
<dir1> <dir2> ...
   public static void main(String[] args) throws IOException {
     parseCommandLineArgs(args);
     final List<File> partitionDirs = findAllPartitionDirs();
     partitionDirs.parallelStream()
-        .forEach(
-            TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool
-                ::validateAndRepairTsFileResourcesInPartition);
+        
.forEach(TsFileResourcePipeStatisticsSetTool::validateAndRepairTsFileResourcesInPartition);
     printStatistics();
   }
 
@@ -74,14 +76,17 @@ public class 
TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
     if (args.length == 0
         || argSet.contains("--help")
         || argSet.contains("-h")
-        || !(argSet.contains("--expected") && argSet.contains("--dirs"))) {
+        || !((argSet.contains("--isGeneratedByPipe") || 
argSet.contains("--resetProgressIndex"))
+            && argSet.contains("--dirs"))) {
       LOGGER.info(USAGE);
       System.exit(1);
     }
 
     for (int i = 0; i < args.length; i++) {
-      if ("--expected".equals(args[i]) && i + 1 < args.length) {
-        expectedMark.set(Boolean.parseBoolean(args[++i]));
+      if ("--isGeneratedByPipe".equals(args[i]) && i + 1 < args.length) {
+        isGeneratedByPipeMark = new 
AtomicBoolean(Boolean.parseBoolean(args[++i]));
+      } else if ("--resetProgressIndex".equals(args[i])) {
+        resetProgressIndex = true;
       } else if ("--dirs".equals(args[i]) && i + 1 < args.length) {
         i++;
         while (i < args.length && !args[i].startsWith("--")) {
@@ -102,7 +107,8 @@ public class 
TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
     }
 
     LOGGER.info("------------------------------------------------------");
-    LOGGER.info("Expected mark: {}", expectedMark.get());
+    LOGGER.info("isGeneratedByPipe mark: {}", isGeneratedByPipeMark);
+    LOGGER.info("resetProgressIndex: {}", resetProgressIndex);
     LOGGER.info("Data directories: ");
     for (File dir : dataDirs) {
       LOGGER.info("  {}", dir.getAbsolutePath());
@@ -144,19 +150,20 @@ public class 
TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
   }
 
   private static void validateAndRepairTsFileResourcesInPartition(final File 
partitionDir) {
-    final AtomicInteger totalResources = new AtomicInteger();
-    final AtomicInteger toRepairResources = new AtomicInteger();
+    final AtomicInteger totalTsFileResource = new AtomicInteger(0);
+    final AtomicInteger toResetFlagResource = new AtomicInteger(0);
+    final AtomicInteger toResetProgressIndexResource = new AtomicInteger(0);
+    final AtomicInteger changedResource = new AtomicInteger(0);
 
     try {
       final List<TsFileResource> resources =
           loadAllTsFileResources(Collections.singletonList(partitionDir));
-      totalResources.addAndGet(resources.size());
+      totalTsFileResource.addAndGet(resources.size());
 
       for (final TsFileResource resource : resources) {
         try {
-          if (validateAndRepairSingleTsFileResource(resource)) {
-            toRepairResources.incrementAndGet();
-          }
+          validateAndRepairSingleTsFileResource(
+              resource, toResetFlagResource, toResetProgressIndexResource, 
changedResource);
         } catch (final Exception e) {
           // Continue processing other resources even if one fails
           LOGGER.warn(
@@ -174,13 +181,17 @@ public class 
TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
           e);
     }
 
-    totalTsFileNum.addAndGet(totalResources.get());
-    toRepairTsFileNum.addAndGet(toRepairResources.get());
+    totalTsFileNum.addAndGet(totalTsFileResource.get());
+    toResetFlagNum.addAndGet(toResetFlagResource.get());
+    toResetProgressIndexNum.addAndGet(toResetProgressIndexResource.get());
+    changedNum.addAndGet(changedResource.get());
     LOGGER.info(
-        "TimePartition {} has {} total resources, {} to repair resources. 
Process completed.",
+        "TimePartition {} has {} total resources, {} to set isGeneratedByPipe 
resources, {} to reset progressIndex resources, {} changed resources. Process 
completed.",
         partitionDir,
-        totalResources.get(),
-        toRepairResources.get());
+        totalTsFileResource.get(),
+        toResetFlagResource.get(),
+        toResetProgressIndexResource.get(),
+        changedResource.get());
   }
 
   private static List<TsFileResource> loadAllTsFileResources(List<File> 
timePartitionDirs)
@@ -217,37 +228,63 @@ public class 
TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
    * @param resource the TsFileResource to validate and repair
    * @return true if the resource needs to be repaired and false if it is valid
    */
-  private static boolean validateAndRepairSingleTsFileResource(TsFileResource 
resource) {
-    if (resource.isGeneratedByPipe() == expectedMark.get()) {
+  private static void validateAndRepairSingleTsFileResource(
+      final TsFileResource resource,
+      final AtomicInteger toResetFlagResource,
+      final AtomicInteger toResetProgressIndexResource,
+      final AtomicInteger changedResource) {
+    boolean skip = true;
+    if (Objects.nonNull(isGeneratedByPipeMark)
+        && resource.isGeneratedByPipe() != isGeneratedByPipeMark.get()) {
       // The resource is valid, no need to repair
-      return false;
+      LOGGER.info(
+          "Repairing TsFileResource: {}, isGeneratedByPipe mark: {}, actual 
mark: {}",
+          resource.getTsFile().getAbsolutePath(),
+          isGeneratedByPipeMark.get(),
+          resource.isGeneratedByPipe());
+
+      toResetFlagResource.getAndIncrement();
+      skip = false;
     }
 
-    LOGGER.info(
-        "Repairing TsFileResource: {}, expected mark: {}, actual mark: {}",
-        resource.getTsFile().getAbsolutePath(),
-        expectedMark.get(),
-        resource.isGeneratedByPipe());
+    if (resetProgressIndex && resource.getMaxProgressIndex() != 
MinimumProgressIndex.INSTANCE) {
+      // The resource is valid, no need to repair
+      LOGGER.info(
+          "Resetting TsFileResource:{} 's progressIndex to minimum, original 
progressIndex: {}",
+          resource.getTsFile().getAbsolutePath(),
+          resource.getMaxProgressIndex());
+
+      toResetProgressIndexResource.getAndIncrement();
+      skip = false;
+    }
+    if (skip) {
+      return;
+    }
+    changedResource.getAndIncrement();
 
     try {
       repairSingleTsFileResource(resource);
 
       LOGGER.info(
           "Marked TsFileResource as {} in resource: {}",
-          expectedMark.get(),
+          isGeneratedByPipeMark.get(),
+          resource.getTsFile().getAbsolutePath());
+      LOGGER.info(
+          "Reset TsFileResource:{} 's progressIndex to minimum.",
           resource.getTsFile().getAbsolutePath());
     } catch (final Exception e) {
       LOGGER.warn(
-          "ERROR: Failed to repair TsFileResource: {}, error: {}",
-          resource.getTsFile().getAbsolutePath(),
-          e.getMessage());
+          "ERROR: Failed to repair TsFileResource: {}", 
resource.getTsFile().getAbsolutePath(), e);
     }
-
-    return true;
   }
 
   private static void repairSingleTsFileResource(TsFileResource resource) 
throws IOException {
-    resource.setGeneratedByPipe(expectedMark.get());
+    if (Objects.nonNull(isGeneratedByPipeMark)) {
+      resource.setGeneratedByPipe(isGeneratedByPipeMark.get());
+    }
+    if (resetProgressIndex) {
+      resource.setProgressIndex(MinimumProgressIndex.INSTANCE);
+    }
     resource.serialize();
   }
 
@@ -255,9 +292,11 @@ public class 
TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
     LOGGER.info("------------------------------------------------------");
     LOGGER.info("Validation and repair completed. Statistics:");
     LOGGER.info(
-        "Total time taken: {} ms, total TsFile resources: {}, repaired TsFile 
resources: {}",
+        "Total time taken: {} ms, total TsFile resources: {}, set 
isGeneratedByPipe resources: {}, reset progressIndex resources: {}, changed 
resources: {}",
         System.currentTimeMillis() - runtime.get(),
         totalTsFileNum.get(),
-        toRepairTsFileNum.get());
+        toResetFlagNum.get(),
+        toResetProgressIndexNum.get(),
+        changedNum.get());
   }
 }

Reply via email to