Copilot commented on code in PR #2601:
URL: https://github.com/apache/orc/pull/2601#discussion_r3123026909
##########
java/tools/src/java/org/apache/orc/tools/MergeFiles.java:
##########
@@ -100,11 +155,333 @@ public static void main(Configuration conf, String[]
args) throws Exception {
}
}
+ /**
+ * Multi-output behavior when --maxSize is set.
+ * Input files are grouped by cumulative raw file size; each group is merged
into
+ * a separate part file (part-00000.orc, part-00001.orc, ...) under
outputDir.
+ * A single file whose size already exceeds maxSizeBytes is placed in its
own part.
+ */
+ private static void mergeIntoMultipleFiles(Configuration conf,
+ OrcFile.WriterOptions
writerOptions,
+ List<LocatedFileStatus>
inputStatuses,
+ List<Path> inputFiles,
+ Path outputDir,
+ long maxSizeBytes,
+ boolean overwrite) throws
Exception {
+ DirMergeResult r = mergeBatchedIntoDir(conf, writerOptions, inputStatuses,
+ outputDir, maxSizeBytes, overwrite);
+
+ if (!r.unmergedFiles.isEmpty()) {
+ System.err.println("List of files that could not be merged:");
+ r.unmergedFiles.forEach(path -> System.err.println(path.toString()));
+ }
+
+ System.out.printf(
+ "Output path: %s, Input files size: %d, Merge files size: %d, Output
files: %d%n",
+ outputDir, inputFiles.size(), r.mergedFileCount, r.partFileCount);
+ if (!r.unmergedFiles.isEmpty()) {
+ System.exit(1);
+ }
+ }
+
+ /**
+ * Preserve the relative directory structure of {@code inputRoot} under
+ * {@code outputRoot}: every directory containing ORC files (a "leaf") is
merged
+ * independently, and its relative path is mirrored beneath {@code
outputRoot}.
+ *
+ * <p>Leaves are located by a depth-first walk of {@code inputRoot}. A
directory
+ * that contains both ORC files and subdirectories is considered ambiguous
and
+ * raises an error. Hidden entries (names starting with {@code '_'} or
+ * {@code '.'}) are always skipped, matching Hive/Spark conventions for
markers
+ * like {@code _SUCCESS}, {@code _committed_*} or {@code _temporary}.
+ *
+ * <p>If {@code --maxSize} was not supplied ({@code maxSizeBytes == 0}), each
+ * leaf is merged into a single {@code part-00000.orc} file; otherwise
+ * ({@code maxSizeBytes > 0}, already validated in {@link #main}) the files
+ * under a leaf are split into size-bounded part files exactly as in the flat
+ * multi-file mode.
+ */
+ private static void mergePreserveStructure(Configuration conf,
+ OrcFile.WriterOptions
writerOptions,
+ Path inputRoot,
+ Path outputRoot,
+ long maxSizeBytes,
+ boolean ignoreExtension,
+ boolean overwrite) throws
Exception {
+ FileSystem inFs = inputRoot.getFileSystem(conf);
+ if (!inFs.exists(inputRoot) ||
!inFs.getFileStatus(inputRoot).isDirectory()) {
+ throw new IllegalArgumentException(
+ "Input path must be an existing directory with --preserveStructure:
" + inputRoot);
+ }
+
+ FileSystem outFs = outputRoot.getFileSystem(conf);
+ prepareOutputDir(outFs, outputRoot, overwrite, "--preserveStructure");
+
+ Path qualifiedInputRoot = inFs.makeQualified(inputRoot);
+ List<Path> leaves = new ArrayList<>();
+ collectLeafDirs(inFs, qualifiedInputRoot, ignoreExtension, leaves);
Review Comment:
In `--preserveStructure` mode, `prepareOutputDir` is invoked before the leaf
directories are collected and validated. Since `prepareOutputDir` may delete an
existing non-empty output directory when `--overwrite` is set, an invalid input
tree (e.g., a directory that contains both ORC files and subdirectories) can
still trigger destructive deletion before the tool fails. Consider
collecting/validating leaves first (or at least delaying deletion until after
validation succeeds), and only then preparing/overwriting the output directory.
##########
java/tools/src/java/org/apache/orc/tools/MergeFiles.java:
##########
@@ -100,11 +155,333 @@ public static void main(Configuration conf, String[]
args) throws Exception {
}
}
+ /**
+ * Multi-output behavior when --maxSize is set.
+ * Input files are grouped by cumulative raw file size; each group is merged
into
+ * a separate part file (part-00000.orc, part-00001.orc, ...) under
outputDir.
+ * A single file whose size already exceeds maxSizeBytes is placed in its
own part.
+ */
+ private static void mergeIntoMultipleFiles(Configuration conf,
+ OrcFile.WriterOptions
writerOptions,
+ List<LocatedFileStatus>
inputStatuses,
+ List<Path> inputFiles,
+ Path outputDir,
+ long maxSizeBytes,
+ boolean overwrite) throws
Exception {
+ DirMergeResult r = mergeBatchedIntoDir(conf, writerOptions, inputStatuses,
+ outputDir, maxSizeBytes, overwrite);
+
+ if (!r.unmergedFiles.isEmpty()) {
+ System.err.println("List of files that could not be merged:");
+ r.unmergedFiles.forEach(path -> System.err.println(path.toString()));
+ }
+
+ System.out.printf(
+ "Output path: %s, Input files size: %d, Merge files size: %d, Output
files: %d%n",
+ outputDir, inputFiles.size(), r.mergedFileCount, r.partFileCount);
+ if (!r.unmergedFiles.isEmpty()) {
+ System.exit(1);
+ }
+ }
+
+ /**
+ * Preserve the relative directory structure of {@code inputRoot} under
+ * {@code outputRoot}: every directory containing ORC files (a "leaf") is
merged
+ * independently, and its relative path is mirrored beneath {@code
outputRoot}.
+ *
+ * <p>Leaves are located by a depth-first walk of {@code inputRoot}. A
directory
+ * that contains both ORC files and subdirectories is considered ambiguous
and
+ * raises an error. Hidden entries (names starting with {@code '_'} or
+ * {@code '.'}) are always skipped, matching Hive/Spark conventions for
markers
+ * like {@code _SUCCESS}, {@code _committed_*} or {@code _temporary}.
+ *
+ * <p>If {@code --maxSize} was not supplied ({@code maxSizeBytes == 0}), each
+ * leaf is merged into a single {@code part-00000.orc} file; otherwise
+ * ({@code maxSizeBytes > 0}, already validated in {@link #main}) the files
+ * under a leaf are split into size-bounded part files exactly as in the flat
+ * multi-file mode.
+ */
+ private static void mergePreserveStructure(Configuration conf,
+ OrcFile.WriterOptions
writerOptions,
+ Path inputRoot,
+ Path outputRoot,
+ long maxSizeBytes,
+ boolean ignoreExtension,
+ boolean overwrite) throws
Exception {
+ FileSystem inFs = inputRoot.getFileSystem(conf);
+ if (!inFs.exists(inputRoot) ||
!inFs.getFileStatus(inputRoot).isDirectory()) {
+ throw new IllegalArgumentException(
+ "Input path must be an existing directory with --preserveStructure:
" + inputRoot);
+ }
+
+ FileSystem outFs = outputRoot.getFileSystem(conf);
+ prepareOutputDir(outFs, outputRoot, overwrite, "--preserveStructure");
+
+ Path qualifiedInputRoot = inFs.makeQualified(inputRoot);
+ List<Path> leaves = new ArrayList<>();
+ collectLeafDirs(inFs, qualifiedInputRoot, ignoreExtension, leaves);
+
+ if (leaves.isEmpty()) {
+ System.err.println("No leaf directories containing ORC files found
under: " + inputRoot);
+ System.exit(1);
+ }
+
+ long effectiveMax = maxSizeBytes > 0 ? maxSizeBytes : Long.MAX_VALUE;
+ int totalInputFiles = 0;
+ int totalMerged = 0;
+ int totalPartFiles = 0;
+ List<Path> allUnmerged = new ArrayList<>();
+
+ for (Path leaf : leaves) {
+ String relative = relativize(qualifiedInputRoot,
inFs.makeQualified(leaf));
+ Path outputLeaf = relative.isEmpty() ? outputRoot : new Path(outputRoot,
relative);
+
+ List<LocatedFileStatus> leafStatuses =
+ listLeafFiles(inFs, leaf, ignoreExtension);
+ if (leafStatuses.isEmpty()) {
+ continue;
+ }
+
+ // Each leaf's output directory lives under a freshly-prepared
outputRoot,
+ // so we don't need (and shouldn't force) overwrite semantics here.
+ DirMergeResult r = mergeBatchedIntoDir(
+ conf, writerOptions, leafStatuses, outputLeaf, effectiveMax, false);
+
+ totalInputFiles += leafStatuses.size();
+ totalMerged += r.mergedFileCount;
+ totalPartFiles += r.partFileCount;
+ allUnmerged.addAll(r.unmergedFiles);
+
+ System.out.printf(
+ "Leaf: %s -> %s, Input files: %d, Merge files: %d, Output files:
%d%n",
+ leaf, outputLeaf, leafStatuses.size(), r.mergedFileCount,
r.partFileCount);
+ }
+
+ if (!allUnmerged.isEmpty()) {
+ System.err.println("List of files that could not be merged:");
+ allUnmerged.forEach(path -> System.err.println(path.toString()));
+ }
+
+ System.out.printf(
+ "Output root: %s, Leaves: %d, Total input files: %d, "
+ + "Total merge files: %d, Total output files: %d%n",
+ outputRoot, leaves.size(), totalInputFiles, totalMerged,
totalPartFiles);
+ if (!allUnmerged.isEmpty()) {
+ System.exit(1);
+ }
+ }
+
+ /**
+ * Core per-directory merge: groups {@code inputStatuses} into size-bounded
batches
+ * and writes each batch as {@code part-NNNNN.orc} under {@code outputDir}.
This
+ * helper contains no I/O side effects on stdout/stderr or process exit;
callers
+ * aggregate/report results themselves.
+ */
+ private static DirMergeResult mergeBatchedIntoDir(Configuration conf,
+ OrcFile.WriterOptions
writerOptions,
+ List<LocatedFileStatus>
inputStatuses,
+ Path outputDir,
+ long maxSizeBytes,
+ boolean overwrite) throws
Exception {
+ FileSystem outFs = outputDir.getFileSystem(conf);
+ prepareOutputDir(outFs, outputDir, overwrite, "multi-file merge");
+
+ List<List<Path>> batches = new ArrayList<>();
+ List<Path> currentBatch = new ArrayList<>();
+ long currentBatchSize = 0;
+
+ for (LocatedFileStatus status : inputStatuses) {
+ long fileSize = status.getLen();
+ if (!currentBatch.isEmpty() && currentBatchSize > maxSizeBytes -
fileSize) {
+ batches.add(currentBatch);
+ currentBatch = new ArrayList<>();
+ currentBatchSize = 0;
+ }
+ currentBatch.add(status.getPath());
+ currentBatchSize += fileSize;
+ }
+ if (!currentBatch.isEmpty()) {
+ batches.add(currentBatch);
+ }
+
+ int totalMerged = 0;
+ List<Path> allUnmerged = new ArrayList<>();
+
+ for (int i = 0; i < batches.size(); i++) {
+ List<Path> batch = batches.get(i);
+ Path partOutput = new Path(outputDir, String.format(PART_FILE_FORMAT,
i));
+ List<Path> merged = OrcFile.mergeFiles(partOutput,
writerOptions.clone(), batch);
+ totalMerged += merged.size();
+
+ if (merged.size() != batch.size()) {
+ Set<Path> mergedSet = new HashSet<>(merged);
+ for (Path p : batch) {
+ if (!mergedSet.contains(p)) {
+ allUnmerged.add(p);
+ }
+ }
+ }
+ }
+
+ return new DirMergeResult(batches.size(), totalMerged, allUnmerged);
+ }
Review Comment:
`mergeBatchedIntoDir` reports `partFileCount` as `batches.size()` and uses
the batch index for the part filename. If `OrcFile.mergeFiles(...)` returns an
empty list for a batch (e.g., all inputs are skipped as non-ORC/corrupt when
`--ignoreExtension` is used), no part file is created, causing gaps such as a
missing `part-00000.orc` and an incorrect output-file count in the summary.
Track the number of part files actually written (and use a separate part index
that only increments when at least one file is merged) so that the part
numbering and count match the filesystem state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]