Copilot commented on code in PR #2601:
URL: https://github.com/apache/orc/pull/2601#discussion_r3128086156
##########
java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java:
##########
@@ -107,4 +110,348 @@ public void testMerge() throws Exception {
assertEquals(10000 + 20000, reader.getNumberOfRows());
}
}
+
+ /**
+ * Verifies that --maxSize splits input files into multiple part files under
the output
+ * directory. Three source files are created; a tight size threshold forces
them to be
+ * written into at least two part files.
+ */
+ @Test
+ public void testMergeWithMaxSize() throws Exception {
+ TypeDescription schema =
TypeDescription.fromString("struct<x:int,y:string>");
+
+ // Create 3 source ORC files.
+ String[] sourceNames = {
+ workDir + File.separator + "ms-1.orc",
+ workDir + File.separator + "ms-2.orc",
+ workDir + File.separator + "ms-3.orc"
+ };
+ int[] rowCounts = {5000, 5000, 5000};
+ for (int f = 0; f < sourceNames.length; f++) {
+ Writer writer = OrcFile.createWriter(new Path(sourceNames[f]),
+ OrcFile.writerOptions(conf).setSchema(schema));
+ VectorizedRowBatch batch = schema.createRowBatch();
+ LongColumnVector x = (LongColumnVector) batch.cols[0];
+ BytesColumnVector y = (BytesColumnVector) batch.cols[1];
+ for (int r = 0; r < rowCounts[f]; ++r) {
+ int row = batch.size++;
+ x.vector[row] = r;
+ byte[] buffer = ("val-" + r).getBytes();
+ y.setRef(row, buffer, 0, buffer.length);
+ if (batch.size == batch.getMaxSize()) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
+ }
+ if (batch.size != 0) {
+ writer.addRowBatch(batch);
+ }
+ writer.close();
+ }
+
+ long firstTwo = fs.getFileStatus(new Path(sourceNames[0])).getLen()
+ + fs.getFileStatus(new Path(sourceNames[1])).getLen();
+ long maxSize = firstTwo + 1;
+
+ Path outputDir = new Path(workDir + File.separator + "merge-multi-out");
+ fs.delete(outputDir, true);
+
+ PrintStream origOut = System.out;
+ ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8));
+ try {
+ MergeFiles.main(conf, new String[]{workDir.toString(),
+ "--output", outputDir.toString(),
+ "--maxSize", String.valueOf(maxSize)});
+ System.out.flush();
+ } finally {
+ System.setOut(origOut);
+ }
+ String output = myOut.toString(StandardCharsets.UTF_8);
+ System.out.println(output);
+
+ assertTrue(output.contains("Input files size: 3"), "Should report 3 input
files");
+ assertTrue(output.contains("Merge files size: 3"), "All 3 files should be
merged");
+ assertTrue(fs.isDirectory(outputDir), "Output directory should be
created");
+
+ // Verify that multiple part files were created and total row count is
correct.
+ long totalRows = 0;
+ int partCount = 0;
+ for (int i = 0; ; i++) {
+ Path part = new Path(outputDir,
String.format(MergeFiles.PART_FILE_FORMAT, i));
+ if (!fs.exists(part)) {
+ break;
+ }
+ partCount++;
+ try (Reader reader = OrcFile.createReader(part,
OrcFile.readerOptions(conf))) {
+ totalRows += reader.getNumberOfRows();
+ }
+ }
+ assertEquals(2, partCount, "Expected exactly two output part files, got: "
+ partCount);
+ assertEquals(5000 + 5000 + 5000, totalRows, "Total row count across all
parts should match");
+ }
+
+ /**
+ * A single input file that is larger than --maxSize must still be emitted
as its
+ * own part file (we never split an input).
+ */
+ @Test
+ public void testMergeWithMaxSizeSingleGiantFile() throws Exception {
+ TypeDescription schema =
TypeDescription.fromString("struct<x:int,y:string>");
+
+ Path inputDir = new Path(workDir, "giant-in");
+ fs.mkdirs(inputDir);
+ Path giant = new Path(inputDir, "giant.orc");
+ writeOrcFile(giant, schema, 20000);
+ long giantSize = fs.getFileStatus(giant).getLen();
+ assertTrue(giantSize > 0, "Giant source file should have non-zero size");
+
+ Path outputDir = new Path(workDir, "giant-out");
+ fs.delete(outputDir, true);
+
+ long maxSize = Math.max(1L, giantSize / 2);
+
+ PrintStream origOut = System.out;
+ ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8));
+ try {
+ MergeFiles.main(conf, new String[]{inputDir.toString(),
+ "--output", outputDir.toString(),
+ "--maxSize", String.valueOf(maxSize)});
+ System.out.flush();
+ } finally {
+ System.setOut(origOut);
+ }
+ String output = myOut.toString(StandardCharsets.UTF_8);
+ System.out.println(output);
Review Comment:
These tests unconditionally print the captured tool output to stdout, which
can make CI logs noisy. Consider removing this println (or printing only on
assertion failure). If the output is useful for debugging, include it in the
assertion message instead, e.g. `"Should report 1 input file, got:\n" + output`.
```suggestion
```
##########
java/tools/src/java/org/apache/orc/tools/MergeFiles.java:
##########
@@ -55,28 +62,76 @@ public static void main(Configuration conf, String[] args)
throws Exception {
return;
Review Comment:
The message "output filename is null" is misleading: the condition also
matches an empty string, and in the multi-file modes (`--maxSize`,
`--preserveStructure`) the value is an output *path* that may be a directory
rather than a single file. Consider changing it to something like
"--output is required" or "output path is missing" to better guide CLI users.
##########
java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java:
##########
@@ -107,4 +110,348 @@ public void testMerge() throws Exception {
assertEquals(10000 + 20000, reader.getNumberOfRows());
}
}
+
+ /**
+ * Verifies that --maxSize splits input files into multiple part files under
the output
+ * directory. Three source files are created; a tight size threshold forces
them to be
+ * written into at least two part files.
+ */
+ @Test
+ public void testMergeWithMaxSize() throws Exception {
+ TypeDescription schema =
TypeDescription.fromString("struct<x:int,y:string>");
+
+ // Create 3 source ORC files.
+ String[] sourceNames = {
+ workDir + File.separator + "ms-1.orc",
+ workDir + File.separator + "ms-2.orc",
+ workDir + File.separator + "ms-3.orc"
+ };
+ int[] rowCounts = {5000, 5000, 5000};
+ for (int f = 0; f < sourceNames.length; f++) {
+ Writer writer = OrcFile.createWriter(new Path(sourceNames[f]),
+ OrcFile.writerOptions(conf).setSchema(schema));
+ VectorizedRowBatch batch = schema.createRowBatch();
+ LongColumnVector x = (LongColumnVector) batch.cols[0];
+ BytesColumnVector y = (BytesColumnVector) batch.cols[1];
+ for (int r = 0; r < rowCounts[f]; ++r) {
+ int row = batch.size++;
+ x.vector[row] = r;
+ byte[] buffer = ("val-" + r).getBytes();
+ y.setRef(row, buffer, 0, buffer.length);
+ if (batch.size == batch.getMaxSize()) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
+ }
+ if (batch.size != 0) {
+ writer.addRowBatch(batch);
+ }
+ writer.close();
+ }
+
+ long firstTwo = fs.getFileStatus(new Path(sourceNames[0])).getLen()
+ + fs.getFileStatus(new Path(sourceNames[1])).getLen();
+ long maxSize = firstTwo + 1;
+
+ Path outputDir = new Path(workDir + File.separator + "merge-multi-out");
+ fs.delete(outputDir, true);
+
+ PrintStream origOut = System.out;
+ ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8));
+ try {
+ MergeFiles.main(conf, new String[]{workDir.toString(),
+ "--output", outputDir.toString(),
+ "--maxSize", String.valueOf(maxSize)});
+ System.out.flush();
+ } finally {
+ System.setOut(origOut);
+ }
+ String output = myOut.toString(StandardCharsets.UTF_8);
+ System.out.println(output);
+
+ assertTrue(output.contains("Input files size: 3"), "Should report 3 input
files");
+ assertTrue(output.contains("Merge files size: 3"), "All 3 files should be
merged");
+ assertTrue(fs.isDirectory(outputDir), "Output directory should be
created");
+
+ // Verify that multiple part files were created and total row count is
correct.
+ long totalRows = 0;
+ int partCount = 0;
+ for (int i = 0; ; i++) {
+ Path part = new Path(outputDir,
String.format(MergeFiles.PART_FILE_FORMAT, i));
+ if (!fs.exists(part)) {
+ break;
+ }
+ partCount++;
+ try (Reader reader = OrcFile.createReader(part,
OrcFile.readerOptions(conf))) {
+ totalRows += reader.getNumberOfRows();
+ }
+ }
+ assertEquals(2, partCount, "Expected exactly two output part files, got: "
+ partCount);
+ assertEquals(5000 + 5000 + 5000, totalRows, "Total row count across all
parts should match");
+ }
+
+ /**
+ * A single input file that is larger than --maxSize must still be emitted
as its
+ * own part file (we never split an input).
+ */
+ @Test
+ public void testMergeWithMaxSizeSingleGiantFile() throws Exception {
+ TypeDescription schema =
TypeDescription.fromString("struct<x:int,y:string>");
+
+ Path inputDir = new Path(workDir, "giant-in");
+ fs.mkdirs(inputDir);
+ Path giant = new Path(inputDir, "giant.orc");
+ writeOrcFile(giant, schema, 20000);
+ long giantSize = fs.getFileStatus(giant).getLen();
+ assertTrue(giantSize > 0, "Giant source file should have non-zero size");
+
+ Path outputDir = new Path(workDir, "giant-out");
+ fs.delete(outputDir, true);
+
+ long maxSize = Math.max(1L, giantSize / 2);
+
+ PrintStream origOut = System.out;
+ ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8));
+ try {
+ MergeFiles.main(conf, new String[]{inputDir.toString(),
+ "--output", outputDir.toString(),
+ "--maxSize", String.valueOf(maxSize)});
+ System.out.flush();
+ } finally {
+ System.setOut(origOut);
+ }
+ String output = myOut.toString(StandardCharsets.UTF_8);
+ System.out.println(output);
+
+ assertTrue(output.contains("Input files size: 1"), "Should report 1 input
file");
+ assertTrue(output.contains("Merge files size: 1"), "The giant file should
be merged");
+
+ Path part0 = new Path(outputDir,
String.format(MergeFiles.PART_FILE_FORMAT, 0));
+ Path part1 = new Path(outputDir,
String.format(MergeFiles.PART_FILE_FORMAT, 1));
+ assertTrue(fs.exists(part0), "Expected part-00000.orc");
+ assertFalse(fs.exists(part1), "Expected exactly one part file for a single
input");
+ try (Reader reader = OrcFile.createReader(part0,
OrcFile.readerOptions(conf))) {
+ assertEquals(20000, reader.getNumberOfRows(),
+ "Single giant file must be preserved intact in its own part");
+ }
+ }
+
+ /**
+ * By default running the merge against a non-empty output directory must
fail so
+ * that existing data is not silently destroyed. With --overwrite the
directory's
+ * contents are deleted before new part files are written.
+ */
+ @Test
+ public void testMergeWithMaxSizeOverwriteBehavior() throws Exception {
+ TypeDescription schema =
TypeDescription.fromString("struct<x:int,y:string>");
+
+ Path inputDir = new Path(workDir, "ow-in");
+ fs.mkdirs(inputDir);
+ writeOrcFile(new Path(inputDir, "a.orc"), schema, 100);
+
+ Path outputDir = new Path(workDir, "ow-out");
+ fs.delete(outputDir, true);
+ fs.mkdirs(outputDir);
+ Path stale = new Path(outputDir, "stale.txt");
+ fs.create(stale).close();
+
+ String[] baseArgs = {inputDir.toString(),
+ "--output", outputDir.toString(),
+ "--maxSize", "1048576"};
+
+ // Without --overwrite: should reject non-empty output directory and leave
it untouched.
+ IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
+ () -> MergeFiles.main(conf, baseArgs));
+ assertTrue(ex.getMessage().contains("not empty") &&
+ ex.getMessage().contains("--overwrite"),
+ "Error should mention --overwrite: " + ex.getMessage());
+ assertTrue(fs.exists(stale),
+ "Existing content must be preserved when overwrite is refused");
+
+ // With --overwrite: existing content is cleared and fresh part files are
written.
+ String[] owArgs = new String[baseArgs.length + 1];
+ System.arraycopy(baseArgs, 0, owArgs, 0, baseArgs.length);
+ owArgs[baseArgs.length] = "--overwrite";
+
+ PrintStream origErr = System.err;
+ ByteArrayOutputStream myErr = new ByteArrayOutputStream();
+ System.setErr(new PrintStream(myErr, false, StandardCharsets.UTF_8));
+ try {
+ MergeFiles.main(conf, owArgs);
+ System.err.flush();
+ } finally {
+ System.setErr(origErr);
+ }
+ String errOut = myErr.toString(StandardCharsets.UTF_8);
+ assertTrue(errOut.contains("Overwriting existing non-empty output
directory"),
+ "Expected an overwrite warning on stderr, got:\n" + errOut);
+ assertFalse(fs.exists(stale), "Pre-existing file should be cleared by
--overwrite");
+
+ Path part0 = new Path(outputDir,
String.format(MergeFiles.PART_FILE_FORMAT, 0));
+ assertTrue(fs.exists(part0), "Expected part-00000.orc after overwrite");
+ try (Reader reader = OrcFile.createReader(part0,
OrcFile.readerOptions(conf))) {
+ assertEquals(100, reader.getNumberOfRows());
+ }
+ }
+
+ /**
+ * Creates an ORC file at the given path with {@code rowCount} rows of the
fixed
+ * {@code struct<x:int,y:string>} schema used across these tests.
+ */
+ private void writeOrcFile(Path path, TypeDescription schema, int rowCount)
throws Exception {
+ Writer writer = OrcFile.createWriter(path,
+ OrcFile.writerOptions(conf).setSchema(schema));
+ VectorizedRowBatch batch = schema.createRowBatch();
+ LongColumnVector x = (LongColumnVector) batch.cols[0];
+ BytesColumnVector y = (BytesColumnVector) batch.cols[1];
+ for (int r = 0; r < rowCount; ++r) {
+ int row = batch.size++;
+ x.vector[row] = r;
+ byte[] buffer = ("val-" + r).getBytes();
+ y.setRef(row, buffer, 0, buffer.length);
+ if (batch.size == batch.getMaxSize()) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
+ }
+ if (batch.size != 0) {
+ writer.addRowBatch(batch);
+ }
+ writer.close();
+ }
+
+ /**
+ * Two-level nested partitions (d=.../h=01, h=02, h=03). With
--preserveStructure,
+ * each leaf partition must be merged independently and its relative path
mirrored
+ * under the output root. Data MUST NOT be cross-mixed between leaves.
+ */
+ @Test
+ public void testPreserveStructureTwoLevelPartitions() throws Exception {
+ TypeDescription schema =
TypeDescription.fromString("struct<x:int,y:string>");
+
+ Path inputRoot = new Path(workDir, "ps-in");
+ Path partitionBase = new Path(inputRoot, "d=2025-04-25");
+
+ int[] hRows = {100, 200, 300};
+ for (int h = 0; h < hRows.length; h++) {
+ Path leaf = new Path(partitionBase, String.format("h=%02d", h + 1));
+ fs.mkdirs(leaf);
+ writeOrcFile(new Path(leaf, "a.orc"), schema, hRows[h]);
+ writeOrcFile(new Path(leaf, "b.orc"), schema, hRows[h] / 2);
+ }
+
+ Path outputRoot = new Path(workDir, "ps-out");
+ fs.delete(outputRoot, true);
+
+ PrintStream origOut = System.out;
+ ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8));
+ try {
+ MergeFiles.main(conf, new String[]{inputRoot.toString(),
+ "--output", outputRoot.toString(),
+ "--preserveStructure"});
+ System.out.flush();
+ } finally {
+ System.setOut(origOut);
+ }
+ String output = myOut.toString(StandardCharsets.UTF_8);
+ System.out.println(output);
Review Comment:
These tests unconditionally print the captured tool output to stdout, which
can make test runs noisy. Consider removing this println (or printing only on
failure) and relying on assertion messages to surface the output when needed,
as the "Leaves: 3" assertion in this test already does.
```suggestion
```
##########
java/tools/src/java/org/apache/orc/tools/MergeFiles.java:
##########
@@ -55,28 +62,76 @@ public static void main(Configuration conf, String[] args)
throws Exception {
return;
}
boolean ignoreExtension = cli.hasOption("ignoreExtension");
+ boolean preserveStructure = cli.hasOption("preserveStructure");
+ boolean overwrite = cli.hasOption("overwrite");
- List<Path> inputFiles = new ArrayList<>();
- OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf);
+ long maxSizeBytes = 0;
+ if (cli.hasOption("maxSize")) {
+ try {
+ maxSizeBytes = Long.parseLong(cli.getOptionValue("maxSize"));
+ } catch (NumberFormatException e) {
+ System.err.println("--maxSize requires a numeric value in bytes.");
+ System.exit(1);
+ }
+ if (maxSizeBytes <= 0) {
+ System.err.println("--maxSize must be a positive number of bytes.");
+ System.exit(1);
+ }
+ }
String[] files = cli.getArgs();
+ OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf);
+
+ if (preserveStructure) {
+ if (files.length != 1) {
+ System.err.println(
+ "--preserveStructure requires exactly one input directory.");
+ System.exit(1);
+ }
+ mergePreserveStructure(conf, writerOptions, new Path(files[0]),
+ new Path(outputFilename), maxSizeBytes, ignoreExtension, overwrite);
+ return;
+ }
+
+ List<LocatedFileStatus> inputStatuses = new ArrayList<>();
for (String root : files) {
Path rootPath = new Path(root);
FileSystem fs = rootPath.getFileSystem(conf);
for (RemoteIterator<LocatedFileStatus> itr = fs.listFiles(rootPath,
true); itr.hasNext(); ) {
LocatedFileStatus status = itr.next();
if (status.isFile() && (ignoreExtension ||
status.getPath().getName().endsWith(".orc"))) {
- inputFiles.add(status.getPath());
+ inputStatuses.add(status);
}
}
}
- if (inputFiles.isEmpty()) {
+ if (inputStatuses.isEmpty()) {
System.err.println("No files found.");
System.exit(1);
}
- List<Path> mergedFiles = OrcFile.mergeFiles(
- new Path(outputFilename), writerOptions, inputFiles);
+ inputStatuses.sort(Comparator.comparing(FileStatus::getPath));
+
+ List<Path> inputFiles = new ArrayList<>(inputStatuses.size());
+ for (LocatedFileStatus s : inputStatuses) {
+ inputFiles.add(s.getPath());
+ }
+
+ if (maxSizeBytes > 0) {
+ mergeIntoMultipleFiles(conf, writerOptions, inputStatuses, inputFiles,
+ new Path(outputFilename), maxSizeBytes, overwrite);
+ } else {
+ mergeIntoSingleFile(writerOptions, inputFiles, new Path(outputFilename),
outputFilename);
Review Comment:
With --maxSize, inputs are enumerated before the output directory is
validated/cleaned (prepareOutputDir runs later). If the output directory lies
inside an input root and --overwrite is used, files already added to the input
list (including stale outputs from a previous run) can be deleted before they
are read. This causes missing inputs or incorrect merges. This layout is not
hypothetical: testMergeWithMaxSize scans `workDir` and writes to
`workDir/merge-multi-out`. Consider validating/cleaning the output directory
before scanning inputs and/or explicitly excluding the output directory subtree
from enumeration.
##########
java/tools/src/java/org/apache/orc/tools/MergeFiles.java:
##########
@@ -100,11 +155,346 @@ public static void main(Configuration conf, String[]
args) throws Exception {
}
}
+ /**
+ * Multi-output behavior when --maxSize is set.
+ * Input files are grouped by cumulative raw file size; each group is merged
into
+ * a separate part file (part-00000.orc, part-00001.orc, ...) under
outputDir.
+ * A single file whose size already exceeds maxSizeBytes is placed in its
own part.
+ */
+ private static void mergeIntoMultipleFiles(Configuration conf,
+ OrcFile.WriterOptions
writerOptions,
+ List<LocatedFileStatus>
inputStatuses,
+ List<Path> inputFiles,
+ Path outputDir,
+ long maxSizeBytes,
Review Comment:
`mergeIntoMultipleFiles` takes both `inputStatuses` and `inputFiles`, but
`inputFiles` is just a projection of `inputStatuses` and is only used for
`inputFiles.size()` in the summary. Passing the same data twice makes the API
easy to misuse, because a caller could pass lists that do not correspond.
Consider dropping the `inputFiles` parameter and using `inputStatuses.size()`
(or deriving the paths locally) so the method has a single source of truth.
##########
java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java:
##########
@@ -107,4 +110,348 @@ public void testMerge() throws Exception {
assertEquals(10000 + 20000, reader.getNumberOfRows());
}
}
+
+ /**
+ * Verifies that --maxSize splits input files into multiple part files under
the output
+ * directory. Three source files are created; a tight size threshold forces
them to be
+ * written into at least two part files.
+ */
+ @Test
+ public void testMergeWithMaxSize() throws Exception {
+ TypeDescription schema =
TypeDescription.fromString("struct<x:int,y:string>");
+
+ // Create 3 source ORC files.
+ String[] sourceNames = {
+ workDir + File.separator + "ms-1.orc",
+ workDir + File.separator + "ms-2.orc",
+ workDir + File.separator + "ms-3.orc"
+ };
+ int[] rowCounts = {5000, 5000, 5000};
+ for (int f = 0; f < sourceNames.length; f++) {
+ Writer writer = OrcFile.createWriter(new Path(sourceNames[f]),
+ OrcFile.writerOptions(conf).setSchema(schema));
+ VectorizedRowBatch batch = schema.createRowBatch();
+ LongColumnVector x = (LongColumnVector) batch.cols[0];
+ BytesColumnVector y = (BytesColumnVector) batch.cols[1];
+ for (int r = 0; r < rowCounts[f]; ++r) {
+ int row = batch.size++;
+ x.vector[row] = r;
+ byte[] buffer = ("val-" + r).getBytes();
+ y.setRef(row, buffer, 0, buffer.length);
+ if (batch.size == batch.getMaxSize()) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
+ }
+ if (batch.size != 0) {
+ writer.addRowBatch(batch);
+ }
+ writer.close();
+ }
+
+ long firstTwo = fs.getFileStatus(new Path(sourceNames[0])).getLen()
+ + fs.getFileStatus(new Path(sourceNames[1])).getLen();
+ long maxSize = firstTwo + 1;
+
+ Path outputDir = new Path(workDir + File.separator + "merge-multi-out");
+ fs.delete(outputDir, true);
+
+ PrintStream origOut = System.out;
+ ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8));
+ try {
+ MergeFiles.main(conf, new String[]{workDir.toString(),
+ "--output", outputDir.toString(),
+ "--maxSize", String.valueOf(maxSize)});
+ System.out.flush();
+ } finally {
+ System.setOut(origOut);
+ }
+ String output = myOut.toString(StandardCharsets.UTF_8);
+ System.out.println(output);
Review Comment:
These tests unconditionally print the captured tool output to stdout, which
can make CI logs noisy and harder to read. Consider removing this println (or
printing only on assertion failure). The assertions already carry descriptive
messages.
```suggestion
```
##########
java/tools/src/test/org/apache/orc/tools/TestMergeFiles.java:
##########
@@ -107,4 +110,348 @@ public void testMerge() throws Exception {
assertEquals(10000 + 20000, reader.getNumberOfRows());
}
}
+
+ /**
+ * Verifies that --maxSize splits input files into multiple part files under
the output
+ * directory. Three source files are created; a tight size threshold forces
them to be
+ * written into at least two part files.
+ */
+ @Test
+ public void testMergeWithMaxSize() throws Exception {
+ TypeDescription schema =
TypeDescription.fromString("struct<x:int,y:string>");
+
+ // Create 3 source ORC files.
+ String[] sourceNames = {
+ workDir + File.separator + "ms-1.orc",
+ workDir + File.separator + "ms-2.orc",
+ workDir + File.separator + "ms-3.orc"
+ };
+ int[] rowCounts = {5000, 5000, 5000};
+ for (int f = 0; f < sourceNames.length; f++) {
+ Writer writer = OrcFile.createWriter(new Path(sourceNames[f]),
+ OrcFile.writerOptions(conf).setSchema(schema));
+ VectorizedRowBatch batch = schema.createRowBatch();
+ LongColumnVector x = (LongColumnVector) batch.cols[0];
+ BytesColumnVector y = (BytesColumnVector) batch.cols[1];
+ for (int r = 0; r < rowCounts[f]; ++r) {
+ int row = batch.size++;
+ x.vector[row] = r;
+ byte[] buffer = ("val-" + r).getBytes();
+ y.setRef(row, buffer, 0, buffer.length);
+ if (batch.size == batch.getMaxSize()) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
+ }
+ if (batch.size != 0) {
+ writer.addRowBatch(batch);
+ }
+ writer.close();
+ }
+
+ long firstTwo = fs.getFileStatus(new Path(sourceNames[0])).getLen()
+ + fs.getFileStatus(new Path(sourceNames[1])).getLen();
+ long maxSize = firstTwo + 1;
+
+ Path outputDir = new Path(workDir + File.separator + "merge-multi-out");
+ fs.delete(outputDir, true);
+
+ PrintStream origOut = System.out;
+ ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8));
+ try {
+ MergeFiles.main(conf, new String[]{workDir.toString(),
+ "--output", outputDir.toString(),
+ "--maxSize", String.valueOf(maxSize)});
+ System.out.flush();
+ } finally {
+ System.setOut(origOut);
+ }
+ String output = myOut.toString(StandardCharsets.UTF_8);
+ System.out.println(output);
+
+ assertTrue(output.contains("Input files size: 3"), "Should report 3 input
files");
+ assertTrue(output.contains("Merge files size: 3"), "All 3 files should be
merged");
+ assertTrue(fs.isDirectory(outputDir), "Output directory should be
created");
+
+ // Verify that multiple part files were created and total row count is
correct.
+ long totalRows = 0;
+ int partCount = 0;
+ for (int i = 0; ; i++) {
+ Path part = new Path(outputDir,
String.format(MergeFiles.PART_FILE_FORMAT, i));
+ if (!fs.exists(part)) {
+ break;
+ }
+ partCount++;
+ try (Reader reader = OrcFile.createReader(part,
OrcFile.readerOptions(conf))) {
+ totalRows += reader.getNumberOfRows();
+ }
+ }
+ assertEquals(2, partCount, "Expected exactly two output part files, got: "
+ partCount);
+ assertEquals(5000 + 5000 + 5000, totalRows, "Total row count across all
parts should match");
+ }
+
+ /**
+ * A single input file that is larger than --maxSize must still be emitted
as its
+ * own part file (we never split an input).
+ */
+ @Test
+ public void testMergeWithMaxSizeSingleGiantFile() throws Exception {
+ TypeDescription schema =
TypeDescription.fromString("struct<x:int,y:string>");
+
+ Path inputDir = new Path(workDir, "giant-in");
+ fs.mkdirs(inputDir);
+ Path giant = new Path(inputDir, "giant.orc");
+ writeOrcFile(giant, schema, 20000);
+ long giantSize = fs.getFileStatus(giant).getLen();
+ assertTrue(giantSize > 0, "Giant source file should have non-zero size");
+
+ Path outputDir = new Path(workDir, "giant-out");
+ fs.delete(outputDir, true);
+
+ long maxSize = Math.max(1L, giantSize / 2);
+
+ PrintStream origOut = System.out;
+ ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8));
+ try {
+ MergeFiles.main(conf, new String[]{inputDir.toString(),
+ "--output", outputDir.toString(),
+ "--maxSize", String.valueOf(maxSize)});
+ System.out.flush();
+ } finally {
+ System.setOut(origOut);
+ }
+ String output = myOut.toString(StandardCharsets.UTF_8);
+ System.out.println(output);
+
+ assertTrue(output.contains("Input files size: 1"), "Should report 1 input
file");
+ assertTrue(output.contains("Merge files size: 1"), "The giant file should
be merged");
+
+ Path part0 = new Path(outputDir,
String.format(MergeFiles.PART_FILE_FORMAT, 0));
+ Path part1 = new Path(outputDir,
String.format(MergeFiles.PART_FILE_FORMAT, 1));
+ assertTrue(fs.exists(part0), "Expected part-00000.orc");
+ assertFalse(fs.exists(part1), "Expected exactly one part file for a single
input");
+ try (Reader reader = OrcFile.createReader(part0,
OrcFile.readerOptions(conf))) {
+ assertEquals(20000, reader.getNumberOfRows(),
+ "Single giant file must be preserved intact in its own part");
+ }
+ }
+
+ /**
+ * By default running the merge against a non-empty output directory must
fail so
+ * that existing data is not silently destroyed. With --overwrite the
directory's
+ * contents are deleted before new part files are written.
+ */
+ @Test
+ public void testMergeWithMaxSizeOverwriteBehavior() throws Exception {
+ TypeDescription schema =
TypeDescription.fromString("struct<x:int,y:string>");
+
+ Path inputDir = new Path(workDir, "ow-in");
+ fs.mkdirs(inputDir);
+ writeOrcFile(new Path(inputDir, "a.orc"), schema, 100);
+
+ Path outputDir = new Path(workDir, "ow-out");
+ fs.delete(outputDir, true);
+ fs.mkdirs(outputDir);
+ Path stale = new Path(outputDir, "stale.txt");
+ fs.create(stale).close();
+
+ String[] baseArgs = {inputDir.toString(),
+ "--output", outputDir.toString(),
+ "--maxSize", "1048576"};
+
+ // Without --overwrite: should reject non-empty output directory and leave
it untouched.
+ IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
+ () -> MergeFiles.main(conf, baseArgs));
+ assertTrue(ex.getMessage().contains("not empty") &&
+ ex.getMessage().contains("--overwrite"),
+ "Error should mention --overwrite: " + ex.getMessage());
+ assertTrue(fs.exists(stale),
+ "Existing content must be preserved when overwrite is refused");
+
+ // With --overwrite: existing content is cleared and fresh part files are
written.
+ String[] owArgs = new String[baseArgs.length + 1];
+ System.arraycopy(baseArgs, 0, owArgs, 0, baseArgs.length);
+ owArgs[baseArgs.length] = "--overwrite";
+
+ PrintStream origErr = System.err;
+ ByteArrayOutputStream myErr = new ByteArrayOutputStream();
+ System.setErr(new PrintStream(myErr, false, StandardCharsets.UTF_8));
+ try {
+ MergeFiles.main(conf, owArgs);
+ System.err.flush();
+ } finally {
+ System.setErr(origErr);
+ }
+ String errOut = myErr.toString(StandardCharsets.UTF_8);
+ assertTrue(errOut.contains("Overwriting existing non-empty output
directory"),
+ "Expected an overwrite warning on stderr, got:\n" + errOut);
+ assertFalse(fs.exists(stale), "Pre-existing file should be cleared by
--overwrite");
+
+ Path part0 = new Path(outputDir,
String.format(MergeFiles.PART_FILE_FORMAT, 0));
+ assertTrue(fs.exists(part0), "Expected part-00000.orc after overwrite");
+ try (Reader reader = OrcFile.createReader(part0,
OrcFile.readerOptions(conf))) {
+ assertEquals(100, reader.getNumberOfRows());
+ }
+ }
+
+ /**
+ * Creates an ORC file at the given path with {@code rowCount} rows of the
fixed
+ * {@code struct<x:int,y:string>} schema used across these tests.
+ */
+ private void writeOrcFile(Path path, TypeDescription schema, int rowCount)
throws Exception {
+ Writer writer = OrcFile.createWriter(path,
+ OrcFile.writerOptions(conf).setSchema(schema));
+ VectorizedRowBatch batch = schema.createRowBatch();
+ LongColumnVector x = (LongColumnVector) batch.cols[0];
+ BytesColumnVector y = (BytesColumnVector) batch.cols[1];
+ for (int r = 0; r < rowCount; ++r) {
+ int row = batch.size++;
+ x.vector[row] = r;
+ byte[] buffer = ("val-" + r).getBytes();
+ y.setRef(row, buffer, 0, buffer.length);
+ if (batch.size == batch.getMaxSize()) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
+ }
+ if (batch.size != 0) {
+ writer.addRowBatch(batch);
+ }
+ writer.close();
+ }
+
+ /**
+ * Two-level nested partitions (d=.../h=01, h=02, h=03). With
--preserveStructure,
+ * each leaf partition must be merged independently and its relative path
mirrored
+ * under the output root. Data MUST NOT be cross-mixed between leaves.
+ */
+ @Test
+ public void testPreserveStructureTwoLevelPartitions() throws Exception {
+ TypeDescription schema = TypeDescription.fromString("struct<x:int,y:string>");
+
+ Path inputRoot = new Path(workDir, "ps-in");
+ Path partitionBase = new Path(inputRoot, "d=2025-04-25");
+
+ // Each leaf h=01..h=03 gets two files with a distinct total row count, so any
+ // cross-mixing of data between leaves shows up in the per-leaf row-count checks below.
+ int[] hRows = {100, 200, 300};
+ for (int h = 0; h < hRows.length; h++) {
+ Path leaf = new Path(partitionBase, String.format("h=%02d", h + 1));
+ fs.mkdirs(leaf);
+ writeOrcFile(new Path(leaf, "a.orc"), schema, hRows[h]);
+ writeOrcFile(new Path(leaf, "b.orc"), schema, hRows[h] / 2);
+ }
+
+ Path outputRoot = new Path(workDir, "ps-out");
+ fs.delete(outputRoot, true);
+
+ // Capture the tool's stdout so the summary can be asserted on. The captured text is
+ // attached to assertion failure messages rather than echoed, to keep CI logs quiet.
+ PrintStream origOut = System.out;
+ ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8));
+ try {
+ MergeFiles.main(conf, new String[]{inputRoot.toString(),
+ "--output", outputRoot.toString(),
+ "--preserveStructure"});
+ System.out.flush();
+ } finally {
+ System.setOut(origOut);
+ }
+ String output = myOut.toString(StandardCharsets.UTF_8);
+
+ assertTrue(output.contains("Leaves: 3"),
+ "Expected 3 leaves in summary, got:\n" + output);
+
+ for (int h = 0; h < hRows.length; h++) {
+ // The leaf's path relative to inputRoot must be mirrored under outputRoot.
+ Path outLeaf = new Path(outputRoot, "d=2025-04-25/" + String.format("h=%02d", h + 1));
+ assertTrue(fs.isDirectory(outLeaf),
+ "Expected mirrored leaf directory to exist: " + outLeaf + "\nTool output:\n" + output);
+ Path part0 = new Path(outLeaf, String.format(MergeFiles.PART_FILE_FORMAT, 0));
+ assertTrue(fs.exists(part0),
+ "Expected " + part0.getName() + " under leaf: " + outLeaf + "\nTool output:\n" + output);
+ long expectedRows = hRows[h] + hRows[h] / 2;
+ try (Reader reader = OrcFile.createReader(part0, OrcFile.readerOptions(conf))) {
+ assertEquals(expectedRows, reader.getNumberOfRows(),
+ "Row count mismatch for " + outLeaf + "\nTool output:\n" + output);
+ assertEquals(schema, reader.getSchema(), "Schema mismatch for " + outLeaf);
+ }
+ }
+ }
+
+ /**
+ * A directory containing BOTH ORC files and subdirectories is ambiguous under
+ * --preserveStructure and must be rejected before any output is written.
+ */
+ @Test
+ public void testPreserveStructureRejectsMixedDirectory() throws Exception {
+ TypeDescription schema = TypeDescription.fromString("struct<x:int,y:string>");
+
+ // inputRoot holds a stray ORC file AND a subdirectory with data: the ambiguous layout.
+ Path inputRoot = new Path(workDir, "psmix-in");
+ fs.mkdirs(inputRoot);
+ writeOrcFile(new Path(inputRoot, "stray.orc"), schema, 10);
+ Path subLeaf = new Path(inputRoot, "h=01");
+ fs.mkdirs(subLeaf);
+ writeOrcFile(new Path(subLeaf, "a.orc"), schema, 20);
+
+ Path outputRoot = new Path(workDir, "psmix-out");
+ fs.delete(outputRoot, true);
+
+ IllegalArgumentException ex = assertThrows(IllegalArgumentException.class,
+ () -> MergeFiles.main(conf, new String[]{inputRoot.toString(),
+ "--output", outputRoot.toString(),
+ "--preserveStructure"}));
+ // String.valueOf guards against a null message turning into an NPE instead of a clear failure.
+ String message = String.valueOf(ex.getMessage());
+ assertTrue(message.contains("both ORC files and subdirectories"),
+ "Unexpected error message: " + message);
+ // Rejection must happen before anything is written, so the output root
+ // (deleted above) must still be absent.
+ assertFalse(fs.exists(outputRoot),
+ "Output must not be created when the input layout is rejected: " + outputRoot);
+ }
+
+ /**
+ * Hidden files/directories (names starting with '_' or '.') must always be
+ * ignored, so files like _SUCCESS or _temporary/*.orc don't pollute the
merged
+ * output.
+ */
+ @Test
+ public void testPreserveStructureSkipsHidden() throws Exception {
+ TypeDescription schema =
TypeDescription.fromString("struct<x:int,y:string>");
+
+ Path inputRoot = new Path(workDir, "pshidden-in");
+ Path leaf = new Path(inputRoot, "h=01");
+ fs.mkdirs(leaf);
+ writeOrcFile(new Path(leaf, "data.orc"), schema, 30);
+ // _SUCCESS-style hidden file next to the real data — should be ignored by
default.
+ writeOrcFile(new Path(leaf, "_hidden.orc"), schema, 999);
+ // Hidden sibling directory (e.g. _temporary) whose contents would
otherwise
+ // appear as a second leaf — should be skipped entirely.
+ Path hiddenDir = new Path(inputRoot, "_temporary");
+ fs.mkdirs(hiddenDir);
+ writeOrcFile(new Path(hiddenDir, "a.orc"), schema, 7);
+
+ Path outputRoot = new Path(workDir, "pshidden-out");
+ fs.delete(outputRoot, true);
+
+ PrintStream origOut = System.out;
+ ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8));
+ try {
+ MergeFiles.main(conf, new String[]{inputRoot.toString(),
+ "--output", outputRoot.toString(),
+ "--preserveStructure"});
+ System.out.flush();
+ } finally {
+ System.setOut(origOut);
+ }
+ String output = myOut.toString(StandardCharsets.UTF_8);
+ System.out.println(output);
+
+ assertTrue(output.contains("Leaves: 1"),
+ "Expected exactly 1 leaf (hidden dir ignored), got:\n" + output);
+ assertFalse(fs.exists(new Path(outputRoot, "_temporary")),
+ "Hidden input directory should not be mirrored to output");
+
+ Path part0 = new Path(outputRoot, "h=01/" +
String.format(MergeFiles.PART_FILE_FORMAT, 0));
+ assertTrue(fs.exists(part0));
+ try (Reader reader = OrcFile.createReader(part0,
OrcFile.readerOptions(conf))) {
+ assertEquals(30, reader.getNumberOfRows(),
+ "Hidden _hidden.orc rows must not be merged in");
Review Comment:
These tests unconditionally print the captured tool output to stdout, which
can add noise to CI logs. Consider removing this println (or printing only when
a test fails) and including the output in assertion failure messages instead.
```suggestion
assertTrue(output.contains("Leaves: 1"),
"Expected exactly 1 leaf (hidden dir ignored), got:\n" + output);
assertFalse(fs.exists(new Path(outputRoot, "_temporary")),
"Hidden input directory should not be mirrored to output.\nTool
output:\n" + output);
Path part0 = new Path(outputRoot, "h=01/" +
String.format(MergeFiles.PART_FILE_FORMAT, 0));
assertTrue(fs.exists(part0),
"Expected merged output file to exist: " + part0 + "\nTool
output:\n" + output);
try (Reader reader = OrcFile.createReader(part0,
OrcFile.readerOptions(conf))) {
assertEquals(30, reader.getNumberOfRows(),
"Hidden _hidden.orc rows must not be merged in.\nTool output:\n" +
output);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]