This is an automated email from the ASF dual-hosted git repository.
william pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 72a886ed4 ORC-1644: Add `merge` tool to merge multiple ORC files into a single ORC file
72a886ed4 is described below
commit 72a886ed48501f066d8077b691355c5ea20d1097
Author: sychen <[email protected]>
AuthorDate: Sun Mar 10 15:00:50 2024 -0700
ORC-1644: Add `merge` tool to merge multiple ORC files into a single ORC file
### What changes were proposed in this pull request?
This PR aims to add a `merge` tool that merges multiple ORC files into a single ORC file.
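For reference, the intended command-line invocation (a sketch matching the documentation added by this patch; `X.Y.Z` is the tool version placeholder):

~~~ shell
% java -jar orc-tools-X.Y.Z-uber.jar merge --output /path/to/merged.orc /path/to/input_orc/
~~~

Input directories are scanned recursively; by default only files ending in `.orc` are considered, and `--ignoreExtension` lifts that restriction.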
### Why are the changes needed?
In ORC 1.3.0, the `OrcFile#mergeFiles` method was introduced by
[ORC-132](https://issues.apache.org/jira/browse/ORC-132), which supports
merging multiple ORC files into one ORC file. However, using it requires
writing Java code; there is no simple command that can be invoked directly.
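For illustration, here is a minimal sketch of the Java code that was previously required (the `OrcFile.mergeFiles(Path, WriterOptions, List<Path>)` signature is the one this patch wraps; the file names are hypothetical):

~~~ java
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;

public class MergeOrcFiles {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical inputs; all files must share the same schema.
    List<Path> inputs = Arrays.asList(
        new Path("part-00000.orc"),
        new Path("part-00001.orc"));
    // Merges the stripes of the inputs into the output file and returns
    // the subset of inputs that could actually be merged.
    List<Path> merged = OrcFile.mergeFiles(
        new Path("merged.orc"), OrcFile.writerOptions(conf), inputs);
    System.out.println("Merged " + merged.size() + " of " + inputs.size());
  }
}
~~~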
### How was this patch tested?
Added a unit test, `TestMergeFiles`.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #1834 from cxzl25/ORC-1644.
Authored-by: sychen <[email protected]>
Signed-off-by: William Hyun <[email protected]>
(cherry picked from commit b031d3731d008844e3d0756aadbd493ed781f622)
Signed-off-by: William Hyun <[email protected]>
---
.../src/java/org/apache/orc/tools/Driver.java | 4 +
.../src/java/org/apache/orc/tools/MergeFiles.java | 122 +++++++++++++++++++++
.../org/apache/orc/tools/merge/TestMergeFiles.java | 112 +++++++++++++++++++
site/_docs/java-tools.md | 9 ++
4 files changed, 247 insertions(+)
diff --git a/java/tools/src/java/org/apache/orc/tools/Driver.java b/java/tools/src/java/org/apache/orc/tools/Driver.java
index c846e875b..95e2d8728 100644
--- a/java/tools/src/java/org/apache/orc/tools/Driver.java
+++ b/java/tools/src/java/org/apache/orc/tools/Driver.java
@@ -91,6 +91,7 @@ public class Driver {
System.err.println(" data - print the data from the ORC file");
System.err.println(" json-schema - scan JSON files to determine their
schema");
System.err.println(" key - print information about the keys");
+ System.err.println(" merge - merge multiple ORC files into a single
ORC file");
System.err.println(" meta - print the metadata about the ORC file");
System.err.println(" scan - scan the ORC file");
System.err.println(" sizes - list size on disk of each column");
@@ -120,6 +121,9 @@ public class Driver {
case "key":
KeyTool.main(conf, options.commandArgs);
break;
+ case "merge":
+ MergeFiles.main(conf, options.commandArgs);
+ break;
case "meta":
FileDump.main(conf, options.commandArgs);
break;
diff --git a/java/tools/src/java/org/apache/orc/tools/MergeFiles.java b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java
new file mode 100644
index 000000000..8de15aaf4
--- /dev/null
+++ b/java/tools/src/java/org/apache/orc/tools/MergeFiles.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.tools;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.orc.OrcFile;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Merge multiple ORC files that all have the same schema into a single ORC file.
+ */
+public class MergeFiles {
+
+ public static void main(Configuration conf, String[] args) throws Exception {
+ Options opts = createOptions();
+ CommandLine cli = new DefaultParser().parse(opts, args);
+ HelpFormatter formatter = new HelpFormatter();
+ if (cli.hasOption('h')) {
+ formatter.printHelp("merge", opts);
+ return;
+ }
+ String outputFilename = cli.getOptionValue("output");
+ if (outputFilename == null || outputFilename.isEmpty()) {
+ System.err.println("output filename is null");
+ formatter.printHelp("merge", opts);
+ return;
+ }
+ boolean ignoreExtension = cli.hasOption("ignoreExtension");
+
+ List<Path> inputFiles = new ArrayList<>();
+ OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(conf);
+
+ String[] files = cli.getArgs();
+ for (String root : files) {
+ Path rootPath = new Path(root);
+ FileSystem fs = rootPath.getFileSystem(conf);
+ for (RemoteIterator<LocatedFileStatus> itr = fs.listFiles(rootPath, true); itr.hasNext(); ) {
+ LocatedFileStatus status = itr.next();
+ if (status.isFile() && (ignoreExtension || status.getPath().getName().endsWith(".orc"))) {
+ inputFiles.add(status.getPath());
+ }
+ }
+ }
+ if (inputFiles.isEmpty()) {
+ System.err.println("No files found.");
+ System.exit(1);
+ }
+
+ List<Path> mergedFiles = OrcFile.mergeFiles(
+ new Path(outputFilename), writerOptions, inputFiles);
+
+ List<Path> unSuccessMergedFiles = new ArrayList<>();
+ if (mergedFiles.size() != inputFiles.size()) {
+ Set<Path> mergedFilesSet = new HashSet<>(mergedFiles);
+ for (Path inputFile : inputFiles) {
+ if (!mergedFilesSet.contains(inputFile)) {
+ unSuccessMergedFiles.add(inputFile);
+ }
+ }
+ }
+
+ if (!unSuccessMergedFiles.isEmpty()) {
+ System.err.println("List of files that could not be merged:");
+ unSuccessMergedFiles.forEach(path -> System.err.println(path.toString()));
+ }
+
+ System.out.printf("Output path: %s, Input files size: %d, Merge files size: %d%n",
+ outputFilename, inputFiles.size(), mergedFiles.size());
+ if (!unSuccessMergedFiles.isEmpty()) {
+ System.exit(1);
+ }
+ }
+
+ private static Options createOptions() {
+ Options result = new Options();
+ result.addOption(Option.builder("o")
+ .longOpt("output")
+ .desc("Output filename")
+ .hasArg()
+ .build());
+
+ result.addOption(Option.builder("i")
+ .longOpt("ignoreExtension")
+ .desc("Ignore ORC file extension")
+ .build());
+
+ result.addOption(Option.builder("h")
+ .longOpt("help")
+ .desc("Print help message")
+ .build());
+ return result;
+ }
+}
diff --git a/java/tools/src/test/org/apache/orc/tools/merge/TestMergeFiles.java b/java/tools/src/test/org/apache/orc/tools/merge/TestMergeFiles.java
new file mode 100644
index 000000000..2088b90ba
--- /dev/null
+++ b/java/tools/src/test/org/apache/orc/tools/merge/TestMergeFiles.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.tools.merge;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.CompressionKind;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.tools.MergeFiles;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestMergeFiles {
+ private Path workDir = new Path(
+ Paths.get(System.getProperty("test.tmp.dir"), "orc-test-merge").toString());
+ private Configuration conf;
+ private FileSystem fs;
+ private Path testFilePath;
+
+ @BeforeEach
+ public void openFileSystem() throws Exception {
+ conf = new Configuration();
+ fs = FileSystem.getLocal(conf);
+ fs.setWorkingDirectory(workDir);
+ fs.mkdirs(workDir);
+ fs.deleteOnExit(workDir);
+ testFilePath = new Path("TestMergeFiles.testMerge.orc");
+ fs.delete(testFilePath, false);
+ }
+
+ @Test
+ public void testMerge() throws Exception {
+ TypeDescription schema = TypeDescription.fromString("struct<x:int,y:string>");
+ Map<String, Integer> fileToRowCountMap = new LinkedHashMap<>();
+ fileToRowCountMap.put("test-merge-1.orc", 10000);
+ fileToRowCountMap.put("test-merge-2.orc", 20000);
+ for (Map.Entry<String, Integer> fileToRowCount : fileToRowCountMap.entrySet()) {
+ Writer writer = OrcFile.createWriter(new Path(fileToRowCount.getKey()),
+ OrcFile.writerOptions(conf)
+ .setSchema(schema));
+ VectorizedRowBatch batch = schema.createRowBatch();
+ LongColumnVector x = (LongColumnVector) batch.cols[0];
+ BytesColumnVector y = (BytesColumnVector) batch.cols[1];
+ for (int r = 0; r < fileToRowCount.getValue(); ++r) {
+ int row = batch.size++;
+ x.vector[row] = r;
+ byte[] buffer = ("byte-" + r).getBytes();
+ y.setRef(row, buffer, 0, buffer.length);
+ if (batch.size == batch.getMaxSize()) {
+ writer.addRowBatch(batch);
+ batch.reset();
+ }
+ }
+ if (batch.size != 0) {
+ writer.addRowBatch(batch);
+ }
+ writer.close();
+ }
+
+ PrintStream origOut = System.out;
+ ByteArrayOutputStream myOut = new ByteArrayOutputStream();
+ // replace stdout and run command
+ System.setOut(new PrintStream(myOut, false, StandardCharsets.UTF_8));
+ MergeFiles.main(conf, new String[]{workDir.toString(),
+ "--output", testFilePath.toString()});
+ System.out.flush();
+ System.setOut(origOut);
+ String output = myOut.toString(StandardCharsets.UTF_8);
+ System.out.println(output);
+ assertTrue(output.contains("Input files size: 2, Merge files size: 2"));
+
+ try (Reader reader = OrcFile.createReader(testFilePath, OrcFile.readerOptions(conf))) {
+ assertEquals(schema, reader.getSchema());
+ assertEquals(CompressionKind.ZSTD, reader.getCompressionKind());
+ assertEquals(2, reader.getStripes().size());
+ assertEquals(10000 + 20000, reader.getNumberOfRows());
+ }
+ }
+}
diff --git a/site/_docs/java-tools.md b/site/_docs/java-tools.md
index 7b1069ea1..87c30af36 100644
--- a/site/_docs/java-tools.md
+++ b/site/_docs/java-tools.md
@@ -331,6 +331,15 @@ Percent Bytes/Row Name
______________________________________________________________________
~~~
+## Java Merge
+
+The merge command can merge multiple ORC files that all have the same schema into a single ORC file.
+
+~~~ shell
+% java -jar orc-tools-X.Y.Z-uber.jar merge --output /path/to/merged.orc /path/to/input_orc/
+______________________________________________________________________
+~~~
+
## Java Version
The version command prints the version of this ORC tool.