Copilot commented on code in PR #2601:
URL: https://github.com/apache/orc/pull/2601#discussion_r3123026909


##########
java/tools/src/java/org/apache/orc/tools/MergeFiles.java:
##########
@@ -100,11 +155,333 @@ public static void main(Configuration conf, String[] 
args) throws Exception {
     }
   }
 
+  /**
+   * Multi-output behavior when --maxSize is set.
+   * Input files are grouped by cumulative raw file size; each group is merged 
into
+   * a separate part file (part-00000.orc, part-00001.orc, ...) under 
outputDir.
+   * A single file whose size already exceeds maxSizeBytes is placed in its 
own part.
+   */
+  private static void mergeIntoMultipleFiles(Configuration conf,
+                                              OrcFile.WriterOptions 
writerOptions,
+                                              List<LocatedFileStatus> 
inputStatuses,
+                                              List<Path> inputFiles,
+                                              Path outputDir,
+                                              long maxSizeBytes,
+                                              boolean overwrite) throws 
Exception {
+    DirMergeResult r = mergeBatchedIntoDir(conf, writerOptions, inputStatuses,
+        outputDir, maxSizeBytes, overwrite);
+
+    if (!r.unmergedFiles.isEmpty()) {
+      System.err.println("List of files that could not be merged:");
+      r.unmergedFiles.forEach(path -> System.err.println(path.toString()));
+    }
+
+    System.out.printf(
+        "Output path: %s, Input files size: %d, Merge files size: %d, Output 
files: %d%n",
+        outputDir, inputFiles.size(), r.mergedFileCount, r.partFileCount);
+    if (!r.unmergedFiles.isEmpty()) {
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Preserve the relative directory structure of {@code inputRoot} under
+   * {@code outputRoot}: every directory containing ORC files (a "leaf") is 
merged
+   * independently, and its relative path is mirrored beneath {@code 
outputRoot}.
+   *
+   * <p>Leaves are located by a depth-first walk of {@code inputRoot}. A 
directory
+   * that contains both ORC files and subdirectories is considered ambiguous 
and
+   * raises an error. Hidden entries (names starting with {@code '_'} or
+   * {@code '.'}) are always skipped, matching Hive/Spark conventions for 
markers
+   * like {@code _SUCCESS}, {@code _committed_*} or {@code _temporary}.
+   *
+   * <p>If {@code --maxSize} was not supplied ({@code maxSizeBytes == 0}), each
+   * leaf is merged into a single {@code part-00000.orc} file; otherwise
+   * ({@code maxSizeBytes > 0}, already validated in {@link #main}) the files
+   * under a leaf are split into size-bounded part files exactly as in the flat
+   * multi-file mode.
+   */
+  private static void mergePreserveStructure(Configuration conf,
+                                             OrcFile.WriterOptions 
writerOptions,
+                                             Path inputRoot,
+                                             Path outputRoot,
+                                             long maxSizeBytes,
+                                             boolean ignoreExtension,
+                                             boolean overwrite) throws 
Exception {
+    FileSystem inFs = inputRoot.getFileSystem(conf);
+    if (!inFs.exists(inputRoot) || 
!inFs.getFileStatus(inputRoot).isDirectory()) {
+      throw new IllegalArgumentException(
+          "Input path must be an existing directory with --preserveStructure: 
" + inputRoot);
+    }
+
+    FileSystem outFs = outputRoot.getFileSystem(conf);
+    prepareOutputDir(outFs, outputRoot, overwrite, "--preserveStructure");
+
+    Path qualifiedInputRoot = inFs.makeQualified(inputRoot);
+    List<Path> leaves = new ArrayList<>();
+    collectLeafDirs(inFs, qualifiedInputRoot, ignoreExtension, leaves);

Review Comment:
   In `--preserveStructure` mode, `prepareOutputDir` is invoked before 
validating/collecting leaf directories. Since `prepareOutputDir` may delete an 
existing non-empty output directory when `--overwrite` is set, an invalid input 
tree (eg a mixed directory) can still trigger destructive deletion before the 
tool fails. Consider collecting/validating leaves first (or at least delaying 
deletion until after validation succeeds), and only then preparing/overwriting 
the output directory.



##########
java/tools/src/java/org/apache/orc/tools/MergeFiles.java:
##########
@@ -100,11 +155,333 @@ public static void main(Configuration conf, String[] 
args) throws Exception {
     }
   }
 
+  /**
+   * Multi-output behavior when --maxSize is set.
+   * Input files are grouped by cumulative raw file size; each group is merged 
into
+   * a separate part file (part-00000.orc, part-00001.orc, ...) under 
outputDir.
+   * A single file whose size already exceeds maxSizeBytes is placed in its 
own part.
+   */
+  private static void mergeIntoMultipleFiles(Configuration conf,
+                                              OrcFile.WriterOptions 
writerOptions,
+                                              List<LocatedFileStatus> 
inputStatuses,
+                                              List<Path> inputFiles,
+                                              Path outputDir,
+                                              long maxSizeBytes,
+                                              boolean overwrite) throws 
Exception {
+    DirMergeResult r = mergeBatchedIntoDir(conf, writerOptions, inputStatuses,
+        outputDir, maxSizeBytes, overwrite);
+
+    if (!r.unmergedFiles.isEmpty()) {
+      System.err.println("List of files that could not be merged:");
+      r.unmergedFiles.forEach(path -> System.err.println(path.toString()));
+    }
+
+    System.out.printf(
+        "Output path: %s, Input files size: %d, Merge files size: %d, Output 
files: %d%n",
+        outputDir, inputFiles.size(), r.mergedFileCount, r.partFileCount);
+    if (!r.unmergedFiles.isEmpty()) {
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Preserve the relative directory structure of {@code inputRoot} under
+   * {@code outputRoot}: every directory containing ORC files (a "leaf") is 
merged
+   * independently, and its relative path is mirrored beneath {@code 
outputRoot}.
+   *
+   * <p>Leaves are located by a depth-first walk of {@code inputRoot}. A 
directory
+   * that contains both ORC files and subdirectories is considered ambiguous 
and
+   * raises an error. Hidden entries (names starting with {@code '_'} or
+   * {@code '.'}) are always skipped, matching Hive/Spark conventions for 
markers
+   * like {@code _SUCCESS}, {@code _committed_*} or {@code _temporary}.
+   *
+   * <p>If {@code --maxSize} was not supplied ({@code maxSizeBytes == 0}), each
+   * leaf is merged into a single {@code part-00000.orc} file; otherwise
+   * ({@code maxSizeBytes > 0}, already validated in {@link #main}) the files
+   * under a leaf are split into size-bounded part files exactly as in the flat
+   * multi-file mode.
+   */
+  private static void mergePreserveStructure(Configuration conf,
+                                             OrcFile.WriterOptions 
writerOptions,
+                                             Path inputRoot,
+                                             Path outputRoot,
+                                             long maxSizeBytes,
+                                             boolean ignoreExtension,
+                                             boolean overwrite) throws 
Exception {
+    FileSystem inFs = inputRoot.getFileSystem(conf);
+    if (!inFs.exists(inputRoot) || 
!inFs.getFileStatus(inputRoot).isDirectory()) {
+      throw new IllegalArgumentException(
+          "Input path must be an existing directory with --preserveStructure: 
" + inputRoot);
+    }
+
+    FileSystem outFs = outputRoot.getFileSystem(conf);
+    prepareOutputDir(outFs, outputRoot, overwrite, "--preserveStructure");
+
+    Path qualifiedInputRoot = inFs.makeQualified(inputRoot);
+    List<Path> leaves = new ArrayList<>();
+    collectLeafDirs(inFs, qualifiedInputRoot, ignoreExtension, leaves);
+
+    if (leaves.isEmpty()) {
+      System.err.println("No leaf directories containing ORC files found 
under: " + inputRoot);
+      System.exit(1);
+    }
+
+    long effectiveMax = maxSizeBytes > 0 ? maxSizeBytes : Long.MAX_VALUE;
+    int totalInputFiles = 0;
+    int totalMerged = 0;
+    int totalPartFiles = 0;
+    List<Path> allUnmerged = new ArrayList<>();
+
+    for (Path leaf : leaves) {
+      String relative = relativize(qualifiedInputRoot, 
inFs.makeQualified(leaf));
+      Path outputLeaf = relative.isEmpty() ? outputRoot : new Path(outputRoot, 
relative);
+
+      List<LocatedFileStatus> leafStatuses =
+          listLeafFiles(inFs, leaf, ignoreExtension);
+      if (leafStatuses.isEmpty()) {
+        continue;
+      }
+
+      // Each leaf's output directory lives under a freshly-prepared 
outputRoot,
+      // so we don't need (and shouldn't force) overwrite semantics here.
+      DirMergeResult r = mergeBatchedIntoDir(
+          conf, writerOptions, leafStatuses, outputLeaf, effectiveMax, false);
+
+      totalInputFiles += leafStatuses.size();
+      totalMerged += r.mergedFileCount;
+      totalPartFiles += r.partFileCount;
+      allUnmerged.addAll(r.unmergedFiles);
+
+      System.out.printf(
+          "Leaf: %s -> %s, Input files: %d, Merge files: %d, Output files: 
%d%n",
+          leaf, outputLeaf, leafStatuses.size(), r.mergedFileCount, 
r.partFileCount);
+    }
+
+    if (!allUnmerged.isEmpty()) {
+      System.err.println("List of files that could not be merged:");
+      allUnmerged.forEach(path -> System.err.println(path.toString()));
+    }
+
+    System.out.printf(
+        "Output root: %s, Leaves: %d, Total input files: %d, "
+            + "Total merge files: %d, Total output files: %d%n",
+        outputRoot, leaves.size(), totalInputFiles, totalMerged, 
totalPartFiles);
+    if (!allUnmerged.isEmpty()) {
+      System.exit(1);
+    }
+  }
+
+  /**
+   * Core per-directory merge: groups {@code inputStatuses} into size-bounded 
batches
+   * and writes each batch as {@code part-NNNNN.orc} under {@code outputDir}. 
This
+   * helper contains no I/O side effects on stdout/stderr or process exit; 
callers
+   * aggregate/report results themselves.
+   */
+  private static DirMergeResult mergeBatchedIntoDir(Configuration conf,
+                                                    OrcFile.WriterOptions 
writerOptions,
+                                                    List<LocatedFileStatus> 
inputStatuses,
+                                                    Path outputDir,
+                                                    long maxSizeBytes,
+                                                    boolean overwrite) throws 
Exception {
+    FileSystem outFs = outputDir.getFileSystem(conf);
+    prepareOutputDir(outFs, outputDir, overwrite, "multi-file merge");
+
+    List<List<Path>> batches = new ArrayList<>();
+    List<Path> currentBatch = new ArrayList<>();
+    long currentBatchSize = 0;
+
+    for (LocatedFileStatus status : inputStatuses) {
+      long fileSize = status.getLen();
+      if (!currentBatch.isEmpty() && currentBatchSize > maxSizeBytes - 
fileSize) {
+        batches.add(currentBatch);
+        currentBatch = new ArrayList<>();
+        currentBatchSize = 0;
+      }
+      currentBatch.add(status.getPath());
+      currentBatchSize += fileSize;
+    }
+    if (!currentBatch.isEmpty()) {
+      batches.add(currentBatch);
+    }
+
+    int totalMerged = 0;
+    List<Path> allUnmerged = new ArrayList<>();
+
+    for (int i = 0; i < batches.size(); i++) {
+      List<Path> batch = batches.get(i);
+      Path partOutput = new Path(outputDir, String.format(PART_FILE_FORMAT, 
i));
+      List<Path> merged = OrcFile.mergeFiles(partOutput, 
writerOptions.clone(), batch);
+      totalMerged += merged.size();
+
+      if (merged.size() != batch.size()) {
+        Set<Path> mergedSet = new HashSet<>(merged);
+        for (Path p : batch) {
+          if (!mergedSet.contains(p)) {
+            allUnmerged.add(p);
+          }
+        }
+      }
+    }
+
+    return new DirMergeResult(batches.size(), totalMerged, allUnmerged);
+  }

Review Comment:
   `mergeBatchedIntoDir` reports `partFileCount` as `batches.size()` and uses 
the batch index for the part filename. If `OrcFile.mergeFiles(...)` returns an 
empty list for a batch (eg all inputs are skipped as non-ORC/corrupt when 
`--ignoreExtension` is used), no part file is created, causing gaps like 
missing `part-00000.orc` and an incorrect output-file count in the summary. 
Track the number of actually written part files (and use a separate part index 
that only increments when at least one file is merged) so the part 
numbering/count matches the filesystem state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to