This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch fix-resource-pipe-mark
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 2d8aa300bff5740c0561f495172f8045cad2125a
Author: Steve Yurong Su <[email protected]>
AuthorDate: Mon Jul 14 15:36:36 2025 +0800

    Create TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java
---
 ...GeneratedByPipeMarkValidationAndRepairTool.java | 286 +++++++++++++++++++++
 1 file changed, 286 insertions(+)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java
new file mode 100644
index 00000000000..50d82bb5a0d
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/validate/TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.tools.validate;
+
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.log.CompactionLogger;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool {
+
+  private static final String USAGE =
+      "Usage: --expected true|false --dirs <dir1> <dir2> ...\n"
+          + "  --expected: whether the TsFileResource is expected to be 
generated by pipe\n"
+          + "  --dirs: list of data directories to validate and repair";
+
+  private static final Set<File> dataDirs = new ConcurrentSkipListSet<>();
+  private static final AtomicBoolean expectedMark = new AtomicBoolean(true);
+
+  private static final AtomicLong runtime = new 
AtomicLong(System.currentTimeMillis());
+
+  private static final AtomicInteger totalTsFileNum = new AtomicInteger(0);
+  private static final AtomicInteger toRepairTsFileNum = new AtomicInteger(0);
+
+  // Usage: --expected true|false --dirs <dir1> <dir2> ...
+  // TODO: support validating and repairing specific time partition directories
+  public static void main(String[] args) throws IOException {
+    parseCommandLineArgs(args);
+    final Map<String, List<File>> partitionDirs = findAllPartitionDirs();
+    partitionDirs.entrySet().parallelStream()
+        .forEach(
+            TsFileResourceIsGeneratedByPipeMarkValidationAndRepairTool
+                ::validateAndRepairTsFileResourcesInPartition);
+    printStatistics();
+  }
+
+  private static void parseCommandLineArgs(final String[] args) {
+    for (int i = 0; i < args.length; i++) {
+      if ("--expected".equals(args[i]) && i + 1 < args.length) {
+        expectedMark.set(Boolean.parseBoolean(args[++i]));
+      } else if ("--dirs".equals(args[i]) && i + 1 < args.length) {
+        i++;
+        while (i < args.length && !args[i].startsWith("--")) {
+          dataDirs.add(new File(args[i++]));
+        }
+        i--;
+      } else {
+        System.out.println("Unknown argument: " + args[i]);
+        System.out.println(USAGE);
+        // Exit if an unknown argument is encountered
+        System.exit(1);
+      }
+    }
+
+    if (dataDirs.isEmpty()) {
+      System.out.println(
+          "No data directories provided. Please specify with --dirs <dir1> 
<dir2> ...");
+      System.exit(1);
+    }
+
+    System.out.println("Expected mark: " + expectedMark.get());
+    System.out.println("Data directories: ");
+    for (File dir : dataDirs) {
+      System.out.println("  " + dir.getAbsolutePath());
+      if (!dir.exists() || !dir.isDirectory()) {
+        System.out.println("Invalid directory: " + dir.getAbsolutePath());
+        System.exit(1);
+      }
+    }
+  }
+
+  private static Map<String, List<File>> findAllPartitionDirs() {
+    final Map<String, List<File>> partitionMap = new HashMap<>();
+
+    for (final File dataDir : dataDirs) {
+      if (!dataDir.exists() || !dataDir.isDirectory()) {
+        System.out.println(dataDir.getAbsolutePath() + " is not a valid 
directory");
+        continue;
+      }
+
+      for (final File seqOrUnseqDataDir : 
Objects.requireNonNull(dataDir.listFiles())) {
+        if (!(seqOrUnseqDataDir.isDirectory()
+            && (seqOrUnseqDataDir.getName().equals("sequence")
+                || seqOrUnseqDataDir.getName().equals("unsequence")))) {
+          System.out.println(
+              seqOrUnseqDataDir.getAbsolutePath() + " is not a sequence or 
unsequence directory");
+          continue;
+        }
+
+        for (final File sg : 
Objects.requireNonNull(seqOrUnseqDataDir.listFiles())) {
+          if (!sg.isDirectory()) {
+            System.out.println(sg.getAbsolutePath() + " is not a valid 
directory");
+            continue;
+          }
+
+          for (final File dataRegionDir : 
Objects.requireNonNull(sg.listFiles())) {
+            if (!dataRegionDir.isDirectory()) {
+              System.out.println(dataRegionDir.getAbsolutePath() + " is not a 
valid directory");
+              continue;
+            }
+
+            for (final File timePartitionDir : 
Objects.requireNonNull(dataRegionDir.listFiles())) {
+              if (!timePartitionDir.isDirectory()) {
+                System.out.println(
+                    timePartitionDir.getAbsolutePath() + " is not a valid 
directory");
+                continue;
+              }
+
+              final String partitionKey =
+                  calculateTimePartitionKey(
+                      sg.getName(), dataRegionDir.getName(), 
timePartitionDir.getName());
+              final List<File> partitionDirs =
+                  partitionMap.computeIfAbsent(partitionKey, v -> new 
ArrayList<>());
+              partitionDirs.add(timePartitionDir);
+            }
+          }
+        }
+      }
+    }
+
+    return partitionMap;
+  }
+
+  private static String calculateTimePartitionKey(
+      final String storageGroup, final String dataRegion, final String 
timePartition) {
+    return storageGroup + "-" + dataRegion + "-" + timePartition;
+  }
+
+  private static void validateAndRepairTsFileResourcesInPartition(
+      final Map.Entry<String, List<File>> partitionEntry) {
+    final String partitionName = partitionEntry.getKey();
+    final List<File> partitionDirs = partitionEntry.getValue();
+
+    final AtomicInteger totalResources = new AtomicInteger();
+    final AtomicInteger toRepairResources = new AtomicInteger();
+
+    for (final File partitionDir : partitionDirs) {
+      try {
+        final List<TsFileResource> resources =
+            loadAllTsFileResources(Collections.singletonList(partitionDir));
+        totalResources.addAndGet(resources.size());
+
+        for (final TsFileResource resource : resources) {
+          if (validateAndRepairSingleTsFileResource(resource)) {
+            toRepairResources.incrementAndGet();
+          }
+        }
+      } catch (final Exception e) {
+        System.err.printf(
+            "Error loading resources from partition %s: %s%n",
+            partitionDir.getAbsolutePath(), e.getMessage());
+      }
+    }
+
+    totalTsFileNum.addAndGet(totalResources.get());
+    toRepairTsFileNum.addAndGet(toRepairResources.get());
+    System.out.printf(
+        "TimePartition %s has %d total resources, %d to repair resources. 
Process completed.\n",
+        partitionName, totalResources.get(), toRepairResources.get());
+  }
+
+  private static List<TsFileResource> loadAllTsFileResources(List<File> 
timePartitionDirs)
+      throws IOException {
+    final List<TsFileResource> resources = new ArrayList<>();
+
+    for (File timePartitionDir : timePartitionDirs) {
+      for (File tsfile : Objects.requireNonNull(timePartitionDir.listFiles())) 
{
+        String filePath = tsfile.getAbsolutePath();
+        // has compaction log
+        if 
(filePath.endsWith(CompactionLogger.INNER_COMPACTION_LOG_NAME_SUFFIX)
+            || 
filePath.endsWith(CompactionLogger.CROSS_COMPACTION_LOG_NAME_SUFFIX)) {
+          System.out.println(
+              "Time partition "
+                  + timePartitionDir.getName()
+                  + " is skipped because a compaction is not finished");
+          return Collections.emptyList();
+        }
+
+        if (!filePath.endsWith(TsFileConstant.TSFILE_SUFFIX) || 
!tsfile.isFile()) {
+          continue;
+        }
+        String resourcePath = tsfile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX;
+
+        if (!new File(resourcePath).exists()) {
+          System.out.println(
+              tsfile.getAbsolutePath() + " is skipped because resource file is 
not exist.");
+          continue;
+        }
+
+        TsFileResource resource = new TsFileResource(tsfile);
+        resource.deserialize();
+        resource.close();
+        resources.add(resource);
+      }
+    }
+
+    return resources;
+  }
+
+  /**
+   * Validates and repairs a single TsFileResource.
+   *
+   * @param resource the TsFileResource to validate and repair
+   * @return true if the resource needs to be repaired and false if it is valid
+   */
+  private static boolean validateAndRepairSingleTsFileResource(TsFileResource 
resource) {
+    if (resource.isGeneratedByPipe() == expectedMark.get()) {
+      // The resource is valid, no need to repair
+      return false;
+    }
+
+    System.out.println(
+        "Repairing TsFileResource: "
+            + resource.getTsFile().getAbsolutePath()
+            + ", expected mark: "
+            + expectedMark.get()
+            + ", actual mark: "
+            + resource.isGeneratedByPipe());
+
+    try {
+      repairSingleTsFileResource(resource);
+
+      System.out.println(
+          "Marked TsFileResource as"
+              + expectedMark.get()
+              + " in resource: "
+              + resource.getTsFile().getAbsolutePath());
+    } catch (final Exception e) {
+      System.out.println(
+          "ERROR: Failed to repair TsFileResource: "
+              + resource.getTsFile().getAbsolutePath()
+              + ", error: "
+              + e.getMessage());
+    }
+
+    return true;
+  }
+
+  private static void repairSingleTsFileResource(TsFileResource resource) 
throws IOException {
+    resource.setGeneratedByPipe(expectedMark.get());
+    resource.serialize();
+  }
+
+  private static void printStatistics() {
+    
System.out.println("\n------------------------------------------------------");
+    System.out.println("Validation and repair completed. Statistics:");
+    System.out.println(
+        "Total time taken: "
+            + (System.currentTimeMillis() - runtime.get())
+            + " ms, total TsFile resources: "
+            + totalTsFileNum.get()
+            + ", to repair TsFile resources: "
+            + toRepairTsFileNum.get());
+  }
+}

Reply via email to