Repository: parquet-mr
Updated Branches:
  refs/heads/master 898f3d0f6 -> 255f10834
PARQUET-460: merge multi parquet files to one file

A merge command for parquet-tools based on https://issues.apache.org/jira/browse/PARQUET-382.

Author: flykobe <[email protected]>

Closes #327 from flykobe/merge_tool and squashes the following commits:

b031c18 [flykobe] check input files
da28832 [flykobe] merge multi parquet files to one file

Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/255f1083
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/255f1083
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/255f1083

Branch: refs/heads/master
Commit: 255f10834a67cf13518316de0e2c8a345677ebbf
Parents: 898f3d0
Author: flykobe <[email protected]>
Authored: Tue Aug 16 10:40:52 2016 -0700
Committer: Julien Le Dem <[email protected]>
Committed: Tue Aug 16 10:40:52 2016 -0700

----------------------------------------------------------------------
 .../parquet/tools/command/MergeCommand.java     | 157 +++++++++++++++++++
 .../apache/parquet/tools/command/Registry.java  |   1 +
 parquet-tools/src/main/scripts/parquet-merge    |  28 ++++
 3 files changed, 186 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/255f1083/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
----------------------------------------------------------------------
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
new file mode 100644
index 0000000..e6d9747
--- /dev/null
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.tools.command; + +import org.apache.commons.cli.CommandLine; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.hadoop.util.HiddenFileFilter; +import org.slf4j.Logger; +import org.apache.parquet.hadoop.ParquetFileWriter; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class MergeCommand extends ArgsOnlyCommand { + public static final String[] USAGE = new String[] { + "<input> [<input> ...] <output>", + "where <input> is the source parquet files/directory to be merged", + " <output> is the destination parquet file" + }; + + /** + * Biggest number of input files we can merge. 
+ */ + private static final int MAX_FILE_NUM = 100; + + private Configuration conf; + + public MergeCommand() { + super(2, MAX_FILE_NUM + 1); + + conf = new Configuration(); + } + + @Override + public String[] getUsageDescription() { + return USAGE; + } + + @Override + public void execute(CommandLine options) throws Exception { + // Prepare arguments + List<String> args = options.getArgList(); + List<Path> inputFiles = getInputFiles(args.subList(0, args.size() - 1)); + Path outputFile = new Path(args.get(args.size() - 1)); + + // Merge schema and extraMeta + FileMetaData mergedMeta = mergedMetadata(inputFiles); + + // Merge data + ParquetFileWriter writer = new ParquetFileWriter(conf, + mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE); + writer.start(); + for (Path input: inputFiles) { + writer.appendFile(conf, input); + } + writer.end(mergedMeta.getKeyValueMetaData()); + } + + private FileMetaData mergedMetadata(List<Path> inputFiles) throws IOException { + return ParquetFileWriter.mergeMetadataFiles(inputFiles, conf).getFileMetaData(); + } + + /** + * Get all input files. + * @param input input files or directory. + * @return ordered input files. + */ + private List<Path> getInputFiles(List<String> input) throws IOException { + List<Path> inputFiles = null; + + if (input.size() == 1) { + Path p = new Path(input.get(0)); + FileSystem fs = p.getFileSystem(conf); + FileStatus status = fs.getFileStatus(p); + + if (status.isDir()) { + inputFiles = getInputFilesFromDirectory(status); + } + } else { + inputFiles = parseInputFiles(input); + } + + checkParquetFiles(inputFiles); + + return inputFiles; + } + + /** + * Check input files basically. + * ParquetFileReader will throw exception when reading an illegal parquet file. + * + * @param inputFiles files to be merged. 
+ * @throws IOException + */ + private void checkParquetFiles(List<Path> inputFiles) throws IOException { + if (inputFiles == null || inputFiles.size() <= 1) { + throw new IllegalArgumentException("Not enough files to merge"); + } + + for (Path inputFile: inputFiles) { + FileSystem fs = inputFile.getFileSystem(conf); + FileStatus status = fs.getFileStatus(inputFile); + + if (status.isDir()) { + throw new IllegalArgumentException("Illegal parquet file: " + inputFile.toUri()); + } + } + } + + /** + * Get all parquet files under partition directory. + * @param partitionDir partition directory. + * @return parquet files to be merged. + */ + private List<Path> getInputFilesFromDirectory(FileStatus partitionDir) throws IOException { + FileSystem fs = partitionDir.getPath().getFileSystem(conf); + FileStatus[] inputFiles = fs.listStatus(partitionDir.getPath(), HiddenFileFilter.INSTANCE); + + List<Path> input = new ArrayList<Path>(); + for (FileStatus f: inputFiles) { + input.add(f.getPath()); + } + return input; + } + + private List<Path> parseInputFiles(List<String> input) { + List<Path> inputFiles = new ArrayList<Path>(); + + for (String name: input) { + inputFiles.add(new Path(name)); + } + + return inputFiles; + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/255f1083/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java ---------------------------------------------------------------------- diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java index d9c59cc..a722408 100644 --- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java +++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java @@ -31,6 +31,7 @@ public final class Registry { registry.put("schema", ShowSchemaCommand.class); registry.put("meta", ShowMetaCommand.class); registry.put("dump", DumpCommand.class); + 
registry.put("merge", MergeCommand.class); } public static Map<String,Command> allCommands() { http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/255f1083/parquet-tools/src/main/scripts/parquet-merge ---------------------------------------------------------------------- diff --git a/parquet-tools/src/main/scripts/parquet-merge b/parquet-tools/src/main/scripts/parquet-merge new file mode 100755 index 0000000..995a105 --- /dev/null +++ b/parquet-tools/src/main/scripts/parquet-merge @@ -0,0 +1,28 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# The name of the top-level script +TOPSCRIPT="parquet-tools" + +# Determine the path to the script's directory +APPPATH=$( cd "$(dirname "$0")" ; pwd -P ) + +# Run the application +exec "${APPPATH}/${TOPSCRIPT}" merge "$@"
