Repository: parquet-mr Updated Branches: refs/heads/master c3819688c -> b1ea059a6
PARQUET-381: Add feature to merge metadata (summary) files, and control which files are generated 1) Add helper to merge 2 summary files, useful for merging 2 directories of data into 1 2) Add more control over whether _common_metadata, _metadata, or both is written Author: Alex Levenson <[email protected]> Closes #277 from isnotinvain/alexlevenson/merge-summary-files and squashes the following commits: 86232f5 [Alex Levenson] Address comments 96b9495 [Alex Levenson] Fix null extraMetaData 099c913 [Alex Levenson] Make deprecated method delegate to new method 7a98957 [Alex Levenson] Merge branch 'master' into alexlevenson/merge-summary-files ddaf4ff [Alex Levenson] Introduce job summary levels for controlling which metadata files are generated 87a2ebc [Alex Levenson] Update comments 9d2b8da [Alex Levenson] Add helper method for merging metadata files Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/b1ea059a Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/b1ea059a Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/b1ea059a Branch: refs/heads/master Commit: b1ea059a66c7d6d6bb4cb53d2005a9b7bb599ada Parents: c381968 Author: Alex Levenson <[email protected]> Authored: Tue Oct 13 15:54:03 2015 -0700 Committer: Alex Levenson <[email protected]> Committed: Tue Oct 13 15:54:03 2015 -0700 ---------------------------------------------------------------------- .../parquet/hadoop/ParquetFileReader.java | 13 +- .../parquet/hadoop/ParquetFileWriter.java | 65 +++++- .../parquet/hadoop/ParquetOutputCommitter.java | 66 ++++-- .../parquet/hadoop/ParquetOutputFormat.java | 51 ++++- .../hadoop/example/ExampleParquetWriter.java | 11 +- .../hadoop/example/GroupWriteSupport.java | 12 +- .../parquet/hadoop/TestMergeMetadataFiles.java | 215 +++++++++++++++++++ .../parquet/hadoop/TestParquetFileWriter.java | 5 +- .../TestParquetOutputFormatJobSummaryLevel.java | 69 ++++++ 9 files changed, 478 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index ea7a672..f43e692 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -252,6 +252,15 @@ public class ParquetFileReader implements Closeable { /** * Read the footers of all the files under that path (recursively) * not using summary files. + */ + public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus, boolean skipRowGroups) throws IOException { + List<FileStatus> statuses = listFiles(configuration, fileStatus); + return readAllFootersInParallel(configuration, statuses, skipRowGroups); + } + + /** + * Read the footers of all the files under that path (recursively) + * not using summary files. * rowGroups are not skipped * @param configuration the configuration to access the FS * @param fileStatus the root dir @@ -259,10 +268,10 @@ public class ParquetFileReader implements Closeable { * @throws IOException */ public static List<Footer> readAllFootersInParallel(Configuration configuration, FileStatus fileStatus) throws IOException { - List<FileStatus> statuses = listFiles(configuration, fileStatus); - return readAllFootersInParallel(configuration, statuses, false); + return readAllFootersInParallel(configuration, fileStatus, false); } + @Deprecated public static List<Footer> readFooters(Configuration configuration, Path path) throws IOException { return readFooters(configuration, status(configuration, path)); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index d6d8369..664ee9d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.parquet.Log; +import org.apache.parquet.Preconditions; import org.apache.parquet.Version; import org.apache.parquet.bytes.BytesInput; import org.apache.parquet.bytes.BytesUtils; @@ -46,6 +47,7 @@ import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Encoding; import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; import org.apache.parquet.hadoop.metadata.ColumnPath; import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.metadata.BlockMetaData; @@ -474,30 +476,83 @@ public class ParquetFileWriter { org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer); writeFileMetaData(parquetMetadata, out); if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex)); - BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex)); + BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex)); out.write(MAGIC); } /** + * Given a list of metadata files, merge them into a single ParquetMetadata + * Requires that the schemas be compatible, and the extraMetadata be exactly equal. + */ + public static ParquetMetadata mergeMetadataFiles(List<Path> files, Configuration conf) throws IOException { + Preconditions.checkArgument(!files.isEmpty(), "Cannot merge an empty list of metadata"); + + GlobalMetaData globalMetaData = null; + List<BlockMetaData> blocks = new ArrayList<BlockMetaData>(); + + for (Path p : files) { + ParquetMetadata pmd = ParquetFileReader.readFooter(conf, p, ParquetMetadataConverter.NO_FILTER); + FileMetaData fmd = pmd.getFileMetaData(); + globalMetaData = mergeInto(fmd, globalMetaData, true); + blocks.addAll(pmd.getBlocks()); + } + + // collapse GlobalMetaData into a single FileMetaData, which will throw if they are not compatible + return new ParquetMetadata(globalMetaData.merge(), blocks); + } + + /** + * Given a list of metadata files, merge them into a single metadata file. + * Requires that the schemas be compatible, and the extraMetaData be exactly equal. + * This is useful when merging 2 directories of parquet files into a single directory, as long + * as both directories were written with compatible schemas and equal extraMetaData. + */ + public static void writeMergedMetadataFile(List<Path> files, Path outputPath, Configuration conf) throws IOException { + ParquetMetadata merged = mergeMetadataFiles(files, conf); + writeMetadataFile(outputPath, merged, outputPath.getFileSystem(conf)); + } + + /** * writes a _metadata and _common_metadata file * @param configuration the configuration to use to get the FileSystem * @param outputPath the directory to write the _metadata file to * @param footers the list of footers to merge + * @deprecated use the variant of writeMetadataFile that takes a {@link JobSummaryLevel} as an argument. * @throws IOException */ + @Deprecated public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException { + writeMetadataFile(configuration, outputPath, footers, JobSummaryLevel.ALL); + } + + /** + * writes _common_metadata file, and optionally a _metadata file depending on the {@link JobSummaryLevel} provided + */ + public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers, JobSummaryLevel level) throws IOException { + Preconditions.checkArgument(level == JobSummaryLevel.ALL || level == JobSummaryLevel.COMMON_ONLY, + "Unsupported level: " + level); + FileSystem fs = outputPath.getFileSystem(configuration); outputPath = outputPath.makeQualified(fs); ParquetMetadata metadataFooter = mergeFooters(outputPath, footers); - writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_METADATA_FILE); + + if (level == JobSummaryLevel.ALL) { + writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_METADATA_FILE); + } + metadataFooter.getBlocks().clear(); writeMetadataFile(outputPath, metadataFooter, fs, PARQUET_COMMON_METADATA_FILE); } - private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile) + private static void writeMetadataFile(Path outputPathRoot, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile) + throws IOException { + Path metaDataPath = new Path(outputPathRoot, parquetMetadataFile); + writeMetadataFile(metaDataPath, metadataFooter, fs); + } + + private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs) throws IOException { - Path metaDataPath = new Path(outputPath, parquetMetadataFile); - FSDataOutputStream metadata = fs.create(metaDataPath); + FSDataOutputStream metadata = fs.create(outputPath); metadata.write(MAGIC); serializeFooter(metadataFooter, metadata); metadata.close(); http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputCommitter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputCommitter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputCommitter.java index 9a0930a..45455ef 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputCommitter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputCommitter.java @@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.parquet.Log; +import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; import org.apache.parquet.hadoop.util.ContextUtil; public class ParquetOutputCommitter extends FileOutputCommitter { @@ -48,30 +49,63 @@ public class ParquetOutputCommitter extends FileOutputCommitter { writeMetaDataFile(configuration,outputPath); } + // TODO: This method should propagate errors, and we should clean up + // TODO: all the catching of Exceptions below -- see PARQUET-383 public static void writeMetaDataFile(Configuration configuration, Path outputPath) { - if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) { + JobSummaryLevel level = ParquetOutputFormat.getJobSummaryLevel(configuration); + if (level == JobSummaryLevel.NONE) { + return; + } + + try { + final FileSystem fileSystem = outputPath.getFileSystem(configuration); + FileStatus outputStatus = fileSystem.getFileStatus(outputPath); + List<Footer> footers; + + switch (level) { + case ALL: + footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus, false); // don't skip row groups + break; + case COMMON_ONLY: + footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus, true); // skip row groups + break; + default: + throw new IllegalArgumentException("Unrecognized job summary level: " + level); + } + + // If there are no footers, _metadata file cannot be written since there is no way to determine schema! + // Onus of writing any summary files lies with the caller in this case. + if (footers.isEmpty()) { + return; + } + try { - final FileSystem fileSystem = outputPath.getFileSystem(configuration); - FileStatus outputStatus = fileSystem.getFileStatus(outputPath); - List<Footer> footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus); - // If there are no footers, _metadata file cannot be written since there is no way to determine schema! - // Onus of writing any summary files lies with the caller in this case. - if (footers.isEmpty()) { - return; - } + ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers, level); + } catch (Exception e) { + LOG.warn("could not write summary file(s) for " + outputPath, e); + + final Path metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE); + try { - ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers); - } catch (Exception e) { - LOG.warn("could not write summary file for " + outputPath, e); - final Path metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE); if (fileSystem.exists(metadataPath)) { fileSystem.delete(metadataPath, true); } + } catch (Exception e2) { + LOG.warn("could not delete metadata file" + outputPath, e2); } - } catch (Exception e) { - LOG.warn("could not write summary file for " + outputPath, e); + + try { + final Path commonMetadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE); + if (fileSystem.exists(commonMetadataPath)) { + fileSystem.delete(commonMetadataPath, true); + } + } catch (Exception e2) { + LOG.warn("could not delete metadata file" + outputPath, e2); + } + } + } catch (Exception e) { + LOG.warn("could not write summary file for " + outputPath, e); } } - } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java index e075db3..ad6c034 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java @@ -103,6 +103,33 @@ import org.apache.parquet.hadoop.util.ConfigurationUtil; public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> { private static final Log LOG = Log.getLog(ParquetOutputFormat.class); + public static enum JobSummaryLevel { + /** + * Write no summary files + */ + NONE, + /** + * Write both summary file with row group info and summary file without + * (both _metadata and _common_metadata) + */ + ALL, + /** + * Write only the summary file without the row group info + * (_common_metadata only) + */ + COMMON_ONLY + } + + /** + * An alias for JOB_SUMMARY_LEVEL, where true means ALL and false means NONE + */ + @Deprecated + public static final String ENABLE_JOB_SUMMARY = "parquet.enable.summary-metadata"; + + /** + * Must be one of the values in {@link JobSummaryLevel} (case insensitive) + */ + public static final String JOB_SUMMARY_LEVEL = "parquet.summary.metadata.level"; public static final String BLOCK_SIZE = "parquet.block.size"; public static final String PAGE_SIZE = "parquet.page.size"; public static final String COMPRESSION = "parquet.compression"; @@ -111,7 +138,6 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> { public static final String ENABLE_DICTIONARY = "parquet.enable.dictionary"; public static final String VALIDATION = "parquet.validation"; public static final String WRITER_VERSION = "parquet.writer.version"; - public static final String ENABLE_JOB_SUMMARY = "parquet.enable.summary-metadata"; public static final String MEMORY_POOL_RATIO = "parquet.memory.pool.ratio"; public static final String MIN_MEMORY_ALLOCATION = "parquet.memory.min.chunk.size"; public static final String MAX_PADDING_BYTES = "parquet.writer.max-padding"; @@ -119,6 +145,29 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> { // default to no padding for now private static final int DEFAULT_MAX_PADDING_SIZE = 0; + public static JobSummaryLevel getJobSummaryLevel(Configuration conf) { + String level = conf.get(JOB_SUMMARY_LEVEL); + String deprecatedFlag = conf.get(ENABLE_JOB_SUMMARY); + + if (deprecatedFlag != null) { + LOG.warn("Setting " + ENABLE_JOB_SUMMARY + " is deprecated, please use " + JOB_SUMMARY_LEVEL); + } + + if (level != null && deprecatedFlag != null) { + LOG.warn("Both " + JOB_SUMMARY_LEVEL + " and " + ENABLE_JOB_SUMMARY + " are set! " + ENABLE_JOB_SUMMARY + " will be ignored."); + } + + if (level != null) { + return JobSummaryLevel.valueOf(level.toUpperCase()); + } + + if (deprecatedFlag != null) { + return Boolean.valueOf(deprecatedFlag) ? JobSummaryLevel.ALL : JobSummaryLevel.NONE; + } + + return JobSummaryLevel.ALL; + } + public static void setWriteSupportClass(Job job, Class<?> writeSupportClass) { getConfiguration(job).set(WRITE_SUPPORT_CLASS, writeSupportClass.getName()); } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java index c63be91..88879c2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/ExampleParquetWriter.java @@ -27,6 +27,8 @@ import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; /** * An example file writer class. @@ -70,6 +72,7 @@ public class ExampleParquetWriter extends ParquetWriter<Group> { public static class Builder extends ParquetWriter.Builder<Group, Builder> { private MessageType type = null; + private Map<String, String> extraMetaData = new HashMap<String, String>(); private Builder(Path file) { super(file); @@ -80,6 +83,11 @@ public class ExampleParquetWriter extends ParquetWriter<Group> { return this; } + public Builder withExtraMetaData(Map<String, String> extraMetaData) { + this.extraMetaData = extraMetaData; + return this; + } + @Override protected Builder self() { return this; @@ -87,7 +95,8 @@ public class ExampleParquetWriter extends ParquetWriter<Group> { @Override protected WriteSupport<Group> getWriteSupport(Configuration conf) { - return new GroupWriteSupport(type); + return new GroupWriteSupport(type, extraMetaData); } + } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java index 25f8fe5..ee59a6e 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/example/GroupWriteSupport.java @@ -22,6 +22,7 @@ import static org.apache.parquet.Preconditions.checkNotNull; import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -45,14 +46,21 @@ public class GroupWriteSupport extends WriteSupport<Group> { return parseMessageType(checkNotNull(configuration.get(PARQUET_EXAMPLE_SCHEMA), PARQUET_EXAMPLE_SCHEMA)); } - private MessageType schema = null; + private MessageType schema; private GroupWriter groupWriter; + private Map<String, String> extraMetaData; public GroupWriteSupport() { + this(null, new HashMap<String, String>()); } GroupWriteSupport(MessageType schema) { + this(schema, new HashMap<String, String>()); + } + + GroupWriteSupport(MessageType schema, Map<String, String> extraMetaData) { this.schema = schema; + this.extraMetaData = extraMetaData; } @Override @@ -61,7 +69,7 @@ public class GroupWriteSupport extends WriteSupport<Group> { if (schema == null) { schema = getSchema(configuration); } - return new WriteContext(schema, new HashMap<String, String>()); + return new WriteContext(schema, this.extraMetaData); } @Override http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java new file mode 100644 index 0000000..6f86062 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestMergeMetadataFiles.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.schema.MessageType; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestMergeMetadataFiles { + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private static final MessageType schema = parseMessageType( + "message test { " + + "required binary binary_field; " + + "required int32 int32_field; " + + "required int64 int64_field; " + + "required boolean boolean_field; " + + "required float float_field; " + + "required double double_field; " + + "required fixed_len_byte_array(3) flba_field; " + + "required int96 int96_field; " + + "} "); + + // schema1 with a field removed + private static final MessageType schema2 = parseMessageType( + "message test { " + + "required binary binary_field; " + + "required int32 int32_field; " + + "required int64 int64_field; " + + "required boolean boolean_field; " + + "required float float_field; " + + "required double double_field; " + + "required fixed_len_byte_array(3) flba_field; " + + "} "); + + private static void writeFile(File out, Configuration conf, boolean useSchema2) throws IOException { + if (!useSchema2) { + GroupWriteSupport.setSchema(schema, conf); + } else { + GroupWriteSupport.setSchema(schema2, conf); + } + SimpleGroupFactory f = new SimpleGroupFactory(schema); + + Map<String, String> extraMetaData = new HashMap<String, String>(); + extraMetaData.put("schema_num", useSchema2 ? "2" : "1" ); + + ParquetWriter<Group> writer = ExampleParquetWriter + .builder(new Path(out.getAbsolutePath())) + .withConf(conf) + .withExtraMetaData(extraMetaData) + .build(); + + for (int i = 0; i < 1000; i++) { + Group g = f.newGroup() + .append("binary_field", "test" + i) + .append("int32_field", i) + .append("int64_field", (long) i) + .append("boolean_field", i % 2 == 0) + .append("float_field", (float) i) + .append("double_field", (double)i) + .append("flba_field", "foo"); + + if (!useSchema2) { + g = g.append("int96_field", Binary.fromConstantByteArray(new byte[12])); + } + + writer.write(g); + } + writer.close(); + } + + private static class WrittenFileInfo { + public Configuration conf; + public Path metaPath1; + public Path metaPath2; + public Path commonMetaPath1; + public Path commonMetaPath2; + } + + private WrittenFileInfo writeFiles(boolean mixedSchemas) throws Exception { + WrittenFileInfo info = new WrittenFileInfo(); + Configuration conf = new Configuration(); + info.conf = conf; + + File root1 = new File(temp.getRoot(), "out1"); + File root2 = new File(temp.getRoot(), "out2"); + Path rootPath1 = new Path(root1.getAbsolutePath()); + Path rootPath2 = new Path(root2.getAbsolutePath()); + + for (int i = 0; i < 10; i++) { + writeFile(new File(root1, i + ".parquet"), conf, true); + } + + List<Footer> footers = ParquetFileReader.readFooters(conf, rootPath1.getFileSystem(conf).getFileStatus(rootPath1), false); + ParquetFileWriter.writeMetadataFile(conf, rootPath1, footers, JobSummaryLevel.ALL); + + for (int i = 0; i < 7; i++) { + writeFile(new File(root2, i + ".parquet"), conf, !mixedSchemas); + } + + footers = ParquetFileReader.readFooters(conf, rootPath2.getFileSystem(conf).getFileStatus(rootPath2), false); + ParquetFileWriter.writeMetadataFile(conf, rootPath2, footers, JobSummaryLevel.ALL); + + info.commonMetaPath1 = new Path(new File(root1, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE).getAbsolutePath()); + info.commonMetaPath2 = new Path(new File(root2, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE).getAbsolutePath()); + info.metaPath1 = new Path(new File(root1, ParquetFileWriter.PARQUET_METADATA_FILE).getAbsolutePath()); + info.metaPath2 = new Path(new File(root2, ParquetFileWriter.PARQUET_METADATA_FILE).getAbsolutePath()); + + return info; + } + + @Test + public void testMergeMetadataFiles() throws Exception { + WrittenFileInfo info = writeFiles(false); + + ParquetMetadata commonMeta1 = ParquetFileReader.readFooter(info.conf, info.commonMetaPath1, ParquetMetadataConverter.NO_FILTER); + ParquetMetadata commonMeta2 = ParquetFileReader.readFooter(info.conf, info.commonMetaPath2, ParquetMetadataConverter.NO_FILTER); + ParquetMetadata meta1 = ParquetFileReader.readFooter(info.conf, info.metaPath1, ParquetMetadataConverter.NO_FILTER); + ParquetMetadata meta2 = ParquetFileReader.readFooter(info.conf, info.metaPath2, ParquetMetadataConverter.NO_FILTER); + + assertTrue(commonMeta1.getBlocks().isEmpty()); + assertTrue(commonMeta2.getBlocks().isEmpty()); + assertEquals(commonMeta1.getFileMetaData().getSchema(), commonMeta2.getFileMetaData().getSchema()); + + assertFalse(meta1.getBlocks().isEmpty()); + assertFalse(meta2.getBlocks().isEmpty()); + assertEquals(meta1.getFileMetaData().getSchema(), meta2.getFileMetaData().getSchema()); + + + assertEquals(commonMeta1.getFileMetaData().getKeyValueMetaData(), commonMeta2.getFileMetaData().getKeyValueMetaData()); + assertEquals(meta1.getFileMetaData().getKeyValueMetaData(), meta2.getFileMetaData().getKeyValueMetaData()); + + // test file serialization + Path mergedOut = new Path(new File(temp.getRoot(), "merged_meta").getAbsolutePath()); + Path mergedCommonOut = new Path(new File(temp.getRoot(), "merged_common_meta").getAbsolutePath()); + ParquetFileWriter.writeMergedMetadataFile(Arrays.asList(info.metaPath1, info.metaPath2), mergedOut, info.conf); + ParquetFileWriter.writeMergedMetadataFile(Arrays.asList(info.commonMetaPath1, info.commonMetaPath2), mergedCommonOut, info.conf); + + ParquetMetadata mergedMeta = ParquetFileReader.readFooter(info.conf, mergedOut, ParquetMetadataConverter.NO_FILTER); + ParquetMetadata mergedCommonMeta = ParquetFileReader.readFooter(info.conf, mergedCommonOut, ParquetMetadataConverter.NO_FILTER); + + // ideally we'd assert equality here, but BlockMetaData and it's references don't implement equals + assertEquals(meta1.getBlocks().size() + meta2.getBlocks().size(), mergedMeta.getBlocks().size()); + assertTrue(mergedCommonMeta.getBlocks().isEmpty()); + + assertEquals(meta1.getFileMetaData().getSchema(), mergedMeta.getFileMetaData().getSchema()); + assertEquals(commonMeta1.getFileMetaData().getSchema(), mergedCommonMeta.getFileMetaData().getSchema()); + + assertEquals(meta1.getFileMetaData().getKeyValueMetaData(), mergedMeta.getFileMetaData().getKeyValueMetaData()); + assertEquals(commonMeta1.getFileMetaData().getKeyValueMetaData(), mergedCommonMeta.getFileMetaData().getKeyValueMetaData()); + } + + @Test + public void testThrowsWhenIncompatible() throws Exception { + WrittenFileInfo info = writeFiles(true); + + Path mergedOut = new Path(new File(temp.getRoot(), "merged_meta").getAbsolutePath()); + Path mergedCommonOut = new Path(new File(temp.getRoot(), "merged_common_meta").getAbsolutePath()); + + try { + ParquetFileWriter.writeMergedMetadataFile(Arrays.asList(info.metaPath1, info.metaPath2), mergedOut, info.conf); + fail("this should throw"); + } catch (RuntimeException e) { + assertEquals("could not merge metadata: key schema_num has conflicting values: [2, 1]", e.getMessage()); + } + + try { + ParquetFileWriter.writeMergedMetadataFile(Arrays.asList(info.commonMetaPath1, info.commonMetaPath2), mergedCommonOut, info.conf); + fail("this should throw"); + } catch (RuntimeException e) { + assertEquals("could not merge metadata: key schema_num has conflicting values: [2, 1]", e.getMessage()); + } + } +} http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index d22b657..597daa8 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -28,6 +28,7 @@ import org.apache.parquet.CorruptStatistics; import org.apache.parquet.Version; import org.apache.parquet.VersionParser; import org.apache.parquet.bytes.BytesUtils; +import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; import org.junit.Assume; import org.junit.Rule; import org.junit.Test; @@ -550,7 +551,7 @@ public class TestParquetFileWriter { FileStatus outputStatus = fs.getFileStatus(testDirPath); List<Footer> footers = ParquetFileReader.readFooters(configuration, outputStatus, false); validateFooters(footers); - ParquetFileWriter.writeMetadataFile(configuration, testDirPath, footers); + ParquetFileWriter.writeMetadataFile(configuration, testDirPath, footers, JobSummaryLevel.ALL); footers = ParquetFileReader.readFooters(configuration, outputStatus, false); validateFooters(footers); @@ -759,7 +760,7 @@ public class TestParquetFileWriter { footers.add(footer); // This should not throw an exception - ParquetFileWriter.writeMetadataFile(conf, relativeRoot, footers); + ParquetFileWriter.writeMetadataFile(conf, relativeRoot, footers, JobSummaryLevel.ALL); } } http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/b1ea059a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetOutputFormatJobSummaryLevel.java ---------------------------------------------------------------------- diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetOutputFormatJobSummaryLevel.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetOutputFormatJobSummaryLevel.java new file mode 100644 index 0000000..b81dd0c --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetOutputFormatJobSummaryLevel.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestParquetOutputFormatJobSummaryLevel { + @Test + public void testDefault() throws Exception { + Configuration conf = new Configuration(); + // default should be ALL + assertEquals(JobSummaryLevel.ALL, ParquetOutputFormat.getJobSummaryLevel(conf)); + } + + @Test + public void testDeprecatedStillWorks() throws Exception { + Configuration conf = new Configuration(); + + conf.set(ParquetOutputFormat.ENABLE_JOB_SUMMARY, "true"); + assertEquals(JobSummaryLevel.ALL, ParquetOutputFormat.getJobSummaryLevel(conf)); + + conf.set(ParquetOutputFormat.ENABLE_JOB_SUMMARY, "false"); + assertEquals(JobSummaryLevel.NONE, ParquetOutputFormat.getJobSummaryLevel(conf)); + } + + @Test + public void testLevelParses() throws Exception { + Configuration conf = new Configuration(); + + conf.set(ParquetOutputFormat.JOB_SUMMARY_LEVEL, "all"); + assertEquals(JobSummaryLevel.ALL, ParquetOutputFormat.getJobSummaryLevel(conf)); + + conf.set(ParquetOutputFormat.JOB_SUMMARY_LEVEL, "common_only"); + assertEquals(JobSummaryLevel.COMMON_ONLY, ParquetOutputFormat.getJobSummaryLevel(conf)); + + conf.set(ParquetOutputFormat.JOB_SUMMARY_LEVEL, "none"); + assertEquals(JobSummaryLevel.NONE, ParquetOutputFormat.getJobSummaryLevel(conf)); + } + + @Test + public void testLevelTakesPrecedence() throws Exception { + Configuration conf = new Configuration(); + + conf.set(ParquetOutputFormat.JOB_SUMMARY_LEVEL, "common_only"); + conf.set(ParquetOutputFormat.ENABLE_JOB_SUMMARY, "false"); + assertEquals(JobSummaryLevel.COMMON_ONLY, ParquetOutputFormat.getJobSummaryLevel(conf)); + } + +}
