This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
     new 720aa3d3c  PARQUET-2228: ParquetRewriter supports more than one input file (#1026)
720aa3d3c is described below

commit 720aa3d3c1ce1ce5c5e2b7715cd8c5f11bdc8ede
Author: Gang Wu <ust...@gmail.com>
AuthorDate: Tue Feb 21 18:43:09 2023 +0800

    PARQUET-2228: ParquetRewriter supports more than one input file (#1026)
---
 .../parquet/hadoop/rewrite/ParquetRewriter.java    | 112 ++++++--
 .../parquet/hadoop/rewrite/RewriteOptions.java     |  88 +++++-
 .../hadoop/rewrite/ParquetRewriterTest.java        | 319 +++++++++++++++++----
 3 files changed, 433 insertions(+), 86 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index e5befa99d..834de8340 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -45,7 +45,6 @@ import org.apache.parquet.format.PageHeader;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
-import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -63,6 +62,7 @@ import org.apache.parquet.io.api.Converter;
 import org.apache.parquet.io.api.GroupConverter;
 import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -74,15 +74,16 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Set;
 import java.util.stream.Collectors;

 import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
 import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
 import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
 import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
 import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;

@@ -93,41 +94,49 @@ public class ParquetRewriter implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(ParquetRewriter.class);
   private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
   private final byte[] pageBuffer = new byte[pageBufferSize];
-  private TransParquetFileReader reader;
-  private ParquetFileWriter writer;
-  private ParquetMetadata meta;
-  private MessageType schema;
-  private String originalCreatedBy;
+  // Configurations for the new file
   private CompressionCodecName newCodecName = null;
   private List<String> pruneColumns = null;
   private Map<ColumnPath, MaskMode> maskColumns = null;
   private Set<ColumnPath> encryptColumns = null;
   private boolean encryptMode = false;
   private Map<String, String> extraMetaData = new HashMap<>();
+  // Writer to rewrite the input files
+  private ParquetFileWriter writer;
+  // Number of blocks written, which is used to keep track of the actual row group ordinal
+  private int numBlocksRewritten = 0;
+  // Reader and relevant states of the in-processing input file
+  private Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
+  // Schema of input files (should be the same) and to write to the output file
+  private MessageType schema = null;
+  // The reader for the current input file
+  private TransParquetFileReader reader = null;
+  // The metadata of the current reader being processed
+  private ParquetMetadata meta = null;
+  // created_by information of the current reader being processed
+  private String originalCreatedBy = "";
+  // Unique created_by information from all input files
+  private Set<String> allOriginalCreatedBys = new HashSet<>();

   public ParquetRewriter(RewriteOptions options) throws IOException {
-    Preconditions.checkArgument(options.getInputFiles().size() == 1, "Only support one input file");
-    Path inPath = options.getInputFiles().get(0);
-    Path outPath = options.getOutputFile();
     Configuration conf = options.getConf();
+    Path outPath = options.getOutputFile();
+    openInputFiles(options.getInputFiles(), conf);
+    LOG.info("Start rewriting {} input file(s) to {}", inputFiles.size(), outPath);
+
+    // Init reader of the first input file
+    initNextReader();

     newCodecName = options.getNewCodecName();
     pruneColumns = options.getPruneColumns();

-    // Get file metadata and full schema from the input file
-    meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
-    schema = meta.getFileMetaData().getSchema();
-    originalCreatedBy = meta.getFileMetaData().getCreatedBy();
-    extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
-    extraMetaData.put(ORIGINAL_CREATED_BY_KEY, originalCreatedBy);
-
     // Prune columns if specified
     if (pruneColumns != null && !pruneColumns.isEmpty()) {
       List<String> paths = new ArrayList<>();
       getPaths(schema, paths, null);
       for (String col : pruneColumns) {
         if (!paths.contains(col)) {
-          LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, inPath.getName());
+          LOG.warn("Input column name {} doesn't show up in the schema of file {}", col, reader.getFile());
         }
       }
@@ -147,9 +156,6 @@ public class ParquetRewriter implements Closeable {
       this.encryptMode = true;
     }

-    reader = new TransParquetFileReader(
-        HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build());
-
     ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
     writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
             DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
@@ -183,12 +189,69 @@ public class ParquetRewriter implements Closeable {
     }
   }

+  // Open all input files to validate their schemas are compatible to merge
+  private void openInputFiles(List<Path> inputFiles, Configuration conf) {
+    Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), "No input files");
+
+    for (Path inputFile : inputFiles) {
+      try {
+        TransParquetFileReader reader = new TransParquetFileReader(
+                HadoopInputFile.fromPath(inputFile, conf), HadoopReadOptions.builder(conf).build());
+        MessageType inputFileSchema = reader.getFooter().getFileMetaData().getSchema();
+        if (this.schema == null) {
+          this.schema = inputFileSchema;
+        } else {
+          // Now we enforce equality of schemas from input files for simplicity.
+          if (!this.schema.equals(inputFileSchema)) {
+            LOG.error("Input files have different schemas, expect: {}, input: {}, current file: {}",
+                    this.schema, inputFileSchema, inputFile);
+            throw new InvalidSchemaException("Input files have different schemas, current file: " + inputFile);
+          }
+        }
+        this.allOriginalCreatedBys.add(reader.getFooter().getFileMetaData().getCreatedBy());
+        this.inputFiles.add(reader);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Failed to open input file: " + inputFile, e);
+      }
+    }
+
+    extraMetaData.put(ORIGINAL_CREATED_BY_KEY, String.join("\n", allOriginalCreatedBys));
+  }
+
+  // Routines to get reader of next input file and set up relevant states
+  private void initNextReader() {
+    if (reader != null) {
+      LOG.info("Finish rewriting input file: {}", reader.getFile());
+    }
+
+    if (inputFiles.isEmpty()) {
+      reader = null;
+      meta = null;
+      originalCreatedBy = null;
+      return;
+    }
+
+    reader = inputFiles.poll();
+    meta = reader.getFooter();
+    originalCreatedBy = meta.getFileMetaData().getCreatedBy();
+    extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
+
+    LOG.info("Rewriting input file: {}, remaining files: {}", reader.getFile(), inputFiles.size());
+  }
+
   @Override
   public void close() throws IOException {
     writer.end(extraMetaData);
   }

   public void processBlocks() throws IOException {
+    while (reader != null) {
+      processBlocksFromReader();
+      initNextReader();
+    }
+  }
+
+  private void processBlocksFromReader() throws IOException {
     PageReadStore store = reader.readNextRowGroup();
     ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
     Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
@@ -236,7 +299,6 @@ public class ParquetRewriter implements Closeable {
               writer,
               schema,
               newCodecName,
-              blockId,
               encryptColumn);
         } else {
           throw new UnsupportedOperationException("Only nullify is supported for now");
@@ -246,7 +308,7 @@ public class ParquetRewriter implements Closeable {
         ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
         if (encryptMode) {
           columnChunkEncryptorRunTime =
-                  new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+                  new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, numBlocksRewritten, columnId);
         }

         // Translate compression and/or encryption
@@ -267,6 +329,7 @@ public class ParquetRewriter implements Closeable {
       writer.endBlock();
       store = reader.readNextRowGroup();
       blockId++;
+      numBlocksRewritten++;
     }
   }

@@ -571,7 +634,6 @@ public class ParquetRewriter implements Closeable {
                             ParquetFileWriter writer,
                             MessageType schema,
                             CompressionCodecName newCodecName,
-                            int rowGroupOrdinal,
                             boolean encryptColumn) throws IOException {
     if (encryptColumn) {
       Preconditions.checkArgument(writer.getEncryptor() != null, "Missing encryptor");
@@ -593,7 +655,7 @@ public class ParquetRewriter implements Closeable {
     MessageType newSchema = newSchema(schema, descriptor);
     ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
             compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength(),
-            props.getPageWriteChecksumEnabled(), writer.getEncryptor(), rowGroupOrdinal);
+            props.getPageWriteChecksumEnabled(), writer.getEncryptor(), numBlocksRewritten);
     ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
     ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
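For reference, a minimal sketch (not part of the patch) of driving the multi-file rewrite added above. The paths are hypothetical; all inputs must share the same schema, otherwise the ParquetRewriter constructor throws InvalidSchemaException:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    public class MergeParquetFiles {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical input files; both must have identical schemas.
        List<Path> inputs = Arrays.asList(
            new Path("/tmp/part-00000.parquet"),
            new Path("/tmp/part-00001.parquet"));
        Path output = new Path("/tmp/merged.parquet");

        RewriteOptions options = new RewriteOptions.Builder(conf, inputs, output).build();
        ParquetRewriter rewriter = new ParquetRewriter(options);
        rewriter.processBlocks(); // copies the row groups of every input, in order
        rewriter.close();         // writes the footer, including original.created.by
      }
    }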
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index e15ba43c7..cc1280921 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -28,7 +28,9 @@ import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;

-// A set of options to create a ParquetRewriter.
+/**
+ * A set of options to create a ParquetRewriter.
+ */
 public class RewriteOptions {

   final Configuration conf;
@@ -101,37 +103,121 @@ public class RewriteOptions {
     private List<String> encryptColumns;
     private FileEncryptionProperties fileEncryptionProperties;

+    /**
+     * Create a builder to create a RewriteOptions.
+     *
+     * @param conf       configuration for reading from input files and writing to output file
+     * @param inputFile  input file path to read from
+     * @param outputFile output file path to rewrite to
+     */
     public Builder(Configuration conf, Path inputFile, Path outputFile) {
       this.conf = conf;
-      this.inputFiles = Arrays.asList(inputFile);
+      // Use a mutable list so that addInputFile() can append more inputs later;
+      // Arrays.asList() returns a fixed-size list that does not support add().
+      this.inputFiles = new ArrayList<>(Arrays.asList(inputFile));
       this.outputFile = outputFile;
     }

+    /**
+     * Create a builder to create a RewriteOptions.
+     * <p>
+     * Please note that when merging more than one file, all files must share the same schema;
+     * otherwise, the rewrite will fail.
+     * <p>
+     * The rewrite keeps the original row groups from all input files. This may not be optimal
+     * if the row groups are very small, and it does not solve the small-file problem; on the
+     * contrary, it may make it worse by producing a large footer in the output file.
+     * TODO: support rewrite by record to break the original row groups into reasonable ones.
+     *
+     * @param conf       configuration for reading from input files and writing to output file
+     * @param inputFiles list of input file paths to read from
+     * @param outputFile output file path to rewrite to
+     */
+    public Builder(Configuration conf, List<Path> inputFiles, Path outputFile) {
+      this.conf = conf;
+      this.inputFiles = inputFiles;
+      this.outputFile = outputFile;
+    }
+
+    /**
+     * Set the columns to prune.
+     * <p>
+     * By default, all columns are kept.
+     *
+     * @param columns list of columns to prune
+     * @return self
+     */
     public Builder prune(List<String> columns) {
       this.pruneColumns = columns;
       return this;
     }

+    /**
+     * Set the compression codec to use for the output file.
+     * <p>
+     * By default, the codec is the same as in the input files.
+     *
+     * @param newCodecName compression codec to use
+     * @return self
+     */
     public Builder transform(CompressionCodecName newCodecName) {
       this.newCodecName = newCodecName;
       return this;
     }

+    /**
+     * Set the columns to mask.
+     * <p>
+     * By default, no columns are masked.
+     *
+     * @param maskColumns map from the columns to mask to their masking mode
+     * @return self
+     */
     public Builder mask(Map<String, MaskMode> maskColumns) {
       this.maskColumns = maskColumns;
       return this;
     }

+    /**
+     * Set the columns to encrypt.
+     * <p>
+     * By default, no columns are encrypted.
+     *
+     * @param encryptColumns list of columns to encrypt
+     * @return self
+     */
     public Builder encrypt(List<String> encryptColumns) {
       this.encryptColumns = encryptColumns;
       return this;
     }

+    /**
+     * Set the encryption properties to use for the output file.
+     * <p>
+     * This is required if the list of columns to encrypt is not empty.
+     *
+     * @param fileEncryptionProperties encryption properties to use
+     * @return self
+     */
     public Builder encryptionProperties(FileEncryptionProperties fileEncryptionProperties) {
       this.fileEncryptionProperties = fileEncryptionProperties;
       return this;
     }

+    /**
+     * Add an input file to read from.
+     *
+     * @param path input file path to read from
+     * @return self
+     */
+    public Builder addInputFile(Path path) {
+      this.inputFiles.add(path);
+      return this;
+    }
+
+    /**
+     * Build the RewriteOptions.
+     *
+     * @return a RewriteOptions
+     */
     public RewriteOptions build() {
       Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), "Input file is required");
       Preconditions.checkArgument(outputFile != null, "Output file is required");
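A sketch (not from the patch) that combines the builder options documented above; the paths are hypothetical and the column names are the ones used by the tests below:

    import java.util.Arrays;
    import java.util.Collections;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.hadoop.rewrite.MaskMode;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    public class RewriteOptionsExample {
      public static RewriteOptions buildOptions(Configuration conf) {
        // Hypothetical paths; column names match the test schema below.
        return new RewriteOptions.Builder(
                conf, new Path("/tmp/a.parquet"), new Path("/tmp/out.parquet"))
            .addInputFile(new Path("/tmp/b.parquet"))  // merge a second input file
            .prune(Arrays.asList("Gender"))            // drop a column from the output
            .mask(Collections.singletonMap("Links.Forward", MaskMode.NULLIFY))  // nullify a column
            .transform(CompressionCodecName.ZSTD)      // re-compress all column chunks
            .build();
      }
    }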
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 25b4f3564..043261f77 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.hadoop.rewrite;

+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.HadoopReadOptions;
@@ -50,6 +51,7 @@ import org.apache.parquet.internal.column.columnindex.OffsetIndex;
 import org.apache.parquet.io.InputFile;
 import org.apache.parquet.io.SeekableInputStream;
 import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -82,19 +84,15 @@ public class ParquetRewriterTest {

   private final int numRecord = 100000;
   private Configuration conf = new Configuration();
-  private EncryptionTestFile inputFile = null;
+  private List<EncryptionTestFile> inputFiles = null;
   private String outputFile = null;
   private ParquetRewriter rewriter = null;

-  @Test
-  public void testPruneSingleColumnAndTranslateCodec() throws Exception {
-    testSetup("GZIP");
-
-    Path inputPath = new Path(inputFile.getFileName());
+  private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws Exception {
     Path outputPath = new Path(outputFile);
     List<String> pruneColumns = Arrays.asList("Gender");
     CompressionCodecName newCodec = CompressionCodecName.ZSTD;
-    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPath, outputPath);
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
     RewriteOptions options = builder.prune(pruneColumns).transform(newCodec).build();

     rewriter = new ParquetRewriter(options);
@@ -115,10 +113,12 @@ public class ParquetRewriterTest {
     assertEquals(subFields.get(1).getName(), "Forward");

     // Verify codec has been translated
-    verifyCodec(outputFile, CompressionCodecName.ZSTD, null);
+    verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+      add(CompressionCodecName.ZSTD);
+    }}, null);

     // Verify the data are not changed for the columns not pruned
-    validateColumnData(outputFile, inputFile.getFileContent(), new HashSet<>(pruneColumns), Collections.emptySet(), null);
+    validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null);

     // Verify the page index
     validatePageIndex(new HashMap<Integer, Integer>() {{
@@ -133,16 +133,31 @@ public class ParquetRewriterTest {
   }

   @Test
-  public void testPruneNullifyAndTranslateCodec() throws Exception {
-    testSetup("UNCOMPRESSED");
+  public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception {
+    testSingleInputFileSetup("GZIP");
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+    }};
+    testPruneSingleColumnTranslateCodec(inputPaths);
+  }
+
+  @Test
+  public void testPruneSingleColumnTranslateCodecTwoFiles() throws Exception {
+    testMultipleInputFilesSetup();
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+      add(new Path(inputFiles.get(1).getFileName()));
+    }};
+    testPruneSingleColumnTranslateCodec(inputPaths);
+  }

-    Path inputPath = new Path(inputFile.getFileName());
+  private void testPruneNullifyTranslateCodec(List<Path> inputPaths) throws Exception {
     Path outputPath = new Path(outputFile);
     List<String> pruneColumns = Arrays.asList("Gender");
     Map<String, MaskMode> maskColumns = new HashMap<>();
     maskColumns.put("Links.Forward", MaskMode.NULLIFY);
-    CompressionCodecName newCodec = CompressionCodecName.GZIP;
-    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPath, outputPath);
+    CompressionCodecName newCodec = CompressionCodecName.ZSTD;
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
     RewriteOptions options = builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).build();

     rewriter = new ParquetRewriter(options);
@@ -163,10 +178,12 @@ public class ParquetRewriterTest {
     assertEquals(subFields.get(1).getName(), "Forward");

     // Verify codec has been translated
-    verifyCodec(outputFile, newCodec, null);
+    verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+      add(newCodec);
+    }}, null);

     // Verify the data are not changed for the columns not pruned
-    validateColumnData(outputFile, inputFile.getFileContent(), new HashSet<>(pruneColumns), maskColumns.keySet(), null);
+    validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), null);

     // Verify the page index
     validatePageIndex(new HashMap<Integer, Integer>() {{
@@ -180,12 +197,27 @@ public class ParquetRewriterTest {
   }

   @Test
-  public void testPruneEncryptAndTranslateCodec() throws Exception {
-    testSetup("GZIP");
+  public void testPruneNullifyTranslateCodecSingleFile() throws Exception {
+    testSingleInputFileSetup("GZIP");
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+    }};
+    testPruneNullifyTranslateCodec(inputPaths);
+  }
+
+  @Test
+  public void testPruneNullifyTranslateCodecTwoFiles() throws Exception {
+    testMultipleInputFilesSetup();
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+      add(new Path(inputFiles.get(1).getFileName()));
+    }};
+    testPruneNullifyTranslateCodec(inputPaths);
+  }

-    Path inputPath = new Path(inputFile.getFileName());
+  private void testPruneEncryptTranslateCodec(List<Path> inputPaths) throws Exception {
     Path outputPath = new Path(outputFile);
-    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPath, outputPath);
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);

     // Prune
     List<String> pruneColumns = Arrays.asList("Gender");
@@ -221,11 +253,12 @@ public class ParquetRewriterTest {

     // Verify codec has been translated
     FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties();
-    verifyCodec(outputFile, newCodec, fileDecryptionProperties);
+    verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+      add(newCodec);
+    }}, fileDecryptionProperties);

     // Verify the data are not changed for the columns not pruned
-    validateColumnData(outputFile,
-            inputFile.getFileContent(), new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties);
+    validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties);

     // Verify column encryption
     ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties);
@@ -245,9 +278,25 @@ public class ParquetRewriterTest {
   }

   @Test
-  public void testNullifyAndEncryptColumn() throws Exception {
-    testSetup("GZIP");
+  public void testPruneEncryptTranslateCodecSingleFile() throws Exception {
+    testSingleInputFileSetup("GZIP");
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+    }};
+    testPruneEncryptTranslateCodec(inputPaths);
+  }
+
+  @Test
+  public void testPruneEncryptTranslateCodecTwoFiles() throws Exception {
+    testMultipleInputFilesSetup();
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+      add(new Path(inputFiles.get(1).getFileName()));
+    }};
+    testPruneEncryptTranslateCodec(inputPaths);
+  }

+  private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws Exception {
     Map<String, MaskMode> maskColumns = new HashMap<>();
     maskColumns.put("DocId", MaskMode.NULLIFY);

@@ -255,9 +304,9 @@ public class ParquetRewriterTest {
     FileEncryptionProperties fileEncryptionProperties = EncDecProperties.getFileEncryptionProperties(
             encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);

-    Path inputPath = new Path(inputFile.getFileName());
     Path outputPath = new Path(outputFile);
-    RewriteOptions options = new RewriteOptions.Builder(conf, inputPath, outputPath).mask(maskColumns)
+    RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, outputPath).mask(maskColumns)
+            .transform(CompressionCodecName.ZSTD)
             .encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties).build();

     rewriter = new ParquetRewriter(options);
@@ -267,12 +316,13 @@ public class ParquetRewriterTest {
     FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties();

-    // Verify codec has not been changed
-    verifyCodec(outputFile, CompressionCodecName.GZIP, fileDecryptionProperties);
+    // Verify codec has been translated
+    verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+      add(CompressionCodecName.ZSTD);
+    }}, fileDecryptionProperties);

     // Verify the data are not changed for non-encrypted and non-masked columns.
     // Also make sure the masked column is nullified.
-    validateColumnData(outputFile,
-            inputFile.getFileContent(), Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties);
+    validateColumnData(Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties);

     // Verify the page index
     validatePageIndex(new HashMap<Integer, Integer>() {{
@@ -298,13 +348,132 @@ public class ParquetRewriterTest {
     }
   }

-  private void testSetup(String compression) throws IOException {
+  @Test
+  public void testNullifyEncryptSingleFile() throws Exception {
+    testSingleInputFileSetup("GZIP");
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+    }};
+    testNullifyAndEncryptColumn(inputPaths);
+  }
+
+  @Test
+  public void testNullifyEncryptTwoFiles() throws Exception {
+    testMultipleInputFilesSetup();
+    List<Path> inputPaths = new ArrayList<Path>() {{
+      add(new Path(inputFiles.get(0).getFileName()));
+      add(new Path(inputFiles.get(1).getFileName()));
+    }};
+    testNullifyAndEncryptColumn(inputPaths);
+  }
+
+  @Test
+  public void testMergeTwoFilesOnly() throws Exception {
+    testMultipleInputFilesSetup();
+
+    // Only merge two files but do not change anything.
+    List<Path> inputPaths = new ArrayList<>();
+    for (EncryptionTestFile inputFile : inputFiles) {
+      inputPaths.add(new Path(inputFile.getFileName()));
+    }
+    Path outputPath = new Path(outputFile);
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+    RewriteOptions options = builder.build();
+
+    rewriter = new ParquetRewriter(options);
+    rewriter.processBlocks();
+    rewriter.close();
+
+    // Verify the schema is not changed
+    ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+    MessageType schema = pmd.getFileMetaData().getSchema();
+    MessageType expectSchema = createSchema();
+    assertEquals(expectSchema, schema);
+
+    // Verify the codecs have not been translated
+    verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+      add(CompressionCodecName.GZIP);
+      add(CompressionCodecName.UNCOMPRESSED);
+    }}, null);
+
+    // Verify the merged data are not changed
+    validateColumnData(Collections.emptySet(), Collections.emptySet(), null);
+
+    // Verify the page index
+    validatePageIndex(new HashMap<Integer, Integer>() {{
+      put(0, 0);
+      put(1, 1);
+      put(2, 2);
+      put(3, 3);
+      put(4, 4);
+    }});
+
+    // Verify original.created.by is preserved
+    validateCreatedBy();
+  }
+
+  @Test(expected = InvalidSchemaException.class)
+  public void testMergeTwoFilesWithDifferentSchema() throws Exception {
+    MessageType schema1 = new MessageType("schema",
+            new PrimitiveType(OPTIONAL, INT64, "DocId"),
+            new PrimitiveType(REQUIRED, BINARY, "Name"),
+            new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+            new GroupType(OPTIONAL, "Links",
+                    new PrimitiveType(REPEATED, BINARY, "Backward"),
+                    new PrimitiveType(REPEATED, BINARY, "Forward")));
+    MessageType schema2 = new MessageType("schema",
+            new PrimitiveType(OPTIONAL, INT64, "DocId"),
+            new PrimitiveType(REQUIRED, BINARY, "Name"),
+            new PrimitiveType(OPTIONAL, BINARY, "Gender"));
+    inputFiles = Lists.newArrayList();
+    inputFiles.add(new TestFileBuilder(conf, schema1)
+            .withNumRecord(numRecord)
+            .withCodec("UNCOMPRESSED")
+            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+            .build());
+    inputFiles.add(new TestFileBuilder(conf, schema2)
+            .withNumRecord(numRecord)
+            .withCodec("UNCOMPRESSED")
+            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+            .build());
+    outputFile = TestFileBuilder.createTempFile("test");
+
+    List<Path> inputPaths = new ArrayList<>();
+    for (EncryptionTestFile inputFile : inputFiles) {
+      inputPaths.add(new Path(inputFile.getFileName()));
+    }
+    Path outputPath = new Path(outputFile);
+    RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+    RewriteOptions options = builder.build();
+
+    // This should throw an exception because the schemas are different
+    rewriter = new ParquetRewriter(options);
+  }
+
+  private void testSingleInputFileSetup(String compression) throws IOException {
     MessageType schema = createSchema();
-    inputFile = new TestFileBuilder(conf, schema)
+    inputFiles = Lists.newArrayList();
+    inputFiles.add(new TestFileBuilder(conf, schema)
             .withNumRecord(numRecord)
             .withCodec(compression)
             .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
-            .build();
+            .build());
+    outputFile = TestFileBuilder.createTempFile("test");
+  }
+
+  private void testMultipleInputFilesSetup() throws IOException {
+    MessageType schema = createSchema();
+    inputFiles = Lists.newArrayList();
+    inputFiles.add(new TestFileBuilder(conf, schema)
+            .withNumRecord(numRecord)
+            .withCodec("GZIP")
+            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+            .build());
+    inputFiles.add(new TestFileBuilder(conf, schema)
+            .withNumRecord(numRecord)
+            .withCodec("UNCOMPRESSED")
+            .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+            .build());
     outputFile = TestFileBuilder.createTempFile("test");
   }

@@ -318,39 +487,47 @@ public class ParquetRewriterTest {
             new PrimitiveType(REPEATED, BINARY, "Forward")));
   }

-  private void validateColumnData(String file,
-                                  SimpleGroup[] fileContent,
-                                  Set<String> prunePaths,
+  private void validateColumnData(Set<String> prunePaths,
                                   Set<String> nullifiedPaths,
                                   FileDecryptionProperties fileDecryptionProperties) throws IOException {
-    ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(file))
+    ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile))
             .withConf(conf).withDecryption(fileDecryptionProperties).build();
-    for (int i = 0; i < numRecord; i++) {
+
+    // Get the total number of rows from the input files
+    int totalRows = 0;
+    for (EncryptionTestFile inputFile : inputFiles) {
+      totalRows += inputFile.getFileContent().length;
+    }
+
+    for (int i = 0; i < totalRows; i++) {
       Group group = reader.read();
+      assertNotNull(group);
+
+      SimpleGroup expectGroup = inputFiles.get(i / numRecord).getFileContent()[i % numRecord];

       if (!prunePaths.contains("DocId")) {
         if (nullifiedPaths.contains("DocId")) {
           assertThrows(RuntimeException.class, () -> group.getLong("DocId", 0));
         } else {
-          assertEquals(group.getLong("DocId", 0), fileContent[i].getLong("DocId", 0));
+          assertEquals(group.getLong("DocId", 0), expectGroup.getLong("DocId", 0));
         }
       }

       if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) {
         assertArrayEquals(group.getBinary("Name", 0).getBytes(),
-                fileContent[i].getBinary("Name", 0).getBytes());
+                expectGroup.getBinary("Name", 0).getBytes());
       }

       if (!prunePaths.contains("Gender") && !nullifiedPaths.contains("Gender")) {
         assertArrayEquals(group.getBinary("Gender", 0).getBytes(),
-                fileContent[i].getBinary("Gender", 0).getBytes());
+                expectGroup.getBinary("Gender", 0).getBytes());
       }

       Group subGroup = group.getGroup("Links", 0);
       if (!prunePaths.contains("Links.Backward") && !nullifiedPaths.contains("Links.Backward")) {
         assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(),
-                fileContent[i].getGroup("Links", 0).getBinary("Backward", 0).getBytes());
+                expectGroup.getGroup("Links", 0).getBinary("Backward", 0).getBytes());
       }

       if (!prunePaths.contains("Links.Forward")) {
@@ -358,7 +535,7 @@ public class ParquetRewriterTest {
           assertThrows(RuntimeException.class, () -> subGroup.getBinary("Forward", 0));
         } else {
           assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(),
-                  fileContent[i].getGroup("Links", 0).getBinary("Forward", 0).getBytes());
+                  expectGroup.getGroup("Links", 0).getBinary("Forward", 0).getBytes());
         }
       }
     }
@@ -380,16 +557,18 @@ public class ParquetRewriterTest {
   }

   private void verifyCodec(String file,
-                           CompressionCodecName codec,
+                           Set<CompressionCodecName> expectedCodecs,
                            FileDecryptionProperties fileDecryptionProperties) throws IOException {
+    Set<CompressionCodecName> codecs = new HashSet<>();
     ParquetMetadata pmd = getFileMetaData(file, fileDecryptionProperties);
     for (int i = 0; i < pmd.getBlocks().size(); i++) {
       BlockMetaData block = pmd.getBlocks().get(i);
       for (int j = 0; j < block.getColumns().size(); ++j) {
         ColumnChunkMetaData columnChunkMetaData = block.getColumns().get(j);
-        assertEquals(columnChunkMetaData.getCodec(), codec);
+        codecs.add(columnChunkMetaData.getCodec());
       }
     }
+    assertEquals(expectedCodecs, codecs);
   }

@@ -398,18 +577,28 @@ public class ParquetRewriterTest {
   /**
    * Verify the page index is correct.
    *
    * @param outFileColumnMapping the column mapping from the output file to the input file.
    */
   private void validatePageIndex(Map<Integer, Integer> outFileColumnMapping) throws Exception {
-    ParquetMetadata inMetaData = getFileMetaData(inputFile.getFileName(), null);
     ParquetMetadata outMetaData = getFileMetaData(outputFile, null);
-    assertEquals(inMetaData.getBlocks().size(), outMetaData.getBlocks().size());

-    try (TransParquetFileReader inReader = new TransParquetFileReader(
-            HadoopInputFile.fromPath(new Path(inputFile.getFileName()), conf), HadoopReadOptions.builder(conf).build());
-         TransParquetFileReader outReader = new TransParquetFileReader(
-            HadoopInputFile.fromPath(new Path(outputFile), conf), HadoopReadOptions.builder(conf).build())) {
+    int inputFileIndex = 0;
+    TransParquetFileReader inReader = new TransParquetFileReader(HadoopInputFile.fromPath(
+            new Path(inputFiles.get(inputFileIndex).getFileName()), conf), HadoopReadOptions.builder(conf).build());
+    ParquetMetadata inMetaData = inReader.getFooter();
+
+    try (TransParquetFileReader outReader = new TransParquetFileReader(
+            HadoopInputFile.fromPath(new Path(outputFile), conf), HadoopReadOptions.builder(conf).build())) {
+
+      for (int outBlockId = 0, inBlockId = 0; outBlockId < outMetaData.getBlocks().size(); ++outBlockId, ++inBlockId) {
+        // Refresh the reader of the input file
+        if (inBlockId == inMetaData.getBlocks().size()) {
+          inReader = new TransParquetFileReader(
+                  HadoopInputFile.fromPath(new Path(inputFiles.get(++inputFileIndex).getFileName()), conf),
+                  HadoopReadOptions.builder(conf).build());
+          inMetaData = inReader.getFooter();
+          inBlockId = 0;
+        }

-      for (int i = 0; i < inMetaData.getBlocks().size(); i++) {
-        BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(i);
-        BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(i);
+        BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(inBlockId);
+        BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(outBlockId);

         for (int j = 0; j < outBlockMetaData.getColumns().size(); j++) {
           if (!outFileColumnMapping.containsKey(j)) {
@@ -484,15 +673,25 @@ public class ParquetRewriterTest {
   }

   private void validateCreatedBy() throws Exception {
-    FileMetaData inFMD = getFileMetaData(inputFile.getFileName(), null).getFileMetaData();
-    FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
+    Set<String> createdBySet = new HashSet<>();
+    for (EncryptionTestFile inputFile : inputFiles) {
+      ParquetMetadata pmd = getFileMetaData(inputFile.getFileName(), null);
+      createdBySet.add(pmd.getFileMetaData().getCreatedBy());
+      assertNull(pmd.getFileMetaData().getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY));
+    }

-    assertEquals(inFMD.getCreatedBy(), outFMD.getCreatedBy());
-    assertNull(inFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY));
+    // Verify created_by values from the input files have been deduplicated
+    Object[] inputCreatedBys = createdBySet.toArray();
+    assertEquals(1, inputCreatedBys.length);
+
+    // Verify created_by has been set
+    FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
+    String inputCreatedBy = (String) inputCreatedBys[0];
+    assertEquals(inputCreatedBy, outFMD.getCreatedBy());

+    // Verify original.created.by has been set
     String originalCreatedBy = outFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY);
-    assertNotNull(originalCreatedBy);
-    assertEquals(inFMD.getCreatedBy(), originalCreatedBy);
+    assertEquals(inputCreatedBy, originalCreatedBy);
   }
 }
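Finally, a sketch (not part of the patch) of inspecting a rewritten file's footer along the lines of validateCreatedBy() above; the output path is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.format.converter.ParquetMetadataConverter;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;

    public class InspectRewrittenFooter {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        ParquetMetadata footer = ParquetFileReader.readFooter(
            conf, new Path("/tmp/merged.parquet"), ParquetMetadataConverter.NO_FILTER);
        // created_by of the rewritten file itself
        System.out.println(footer.getFileMetaData().getCreatedBy());
        // Deduplicated created_by values of all inputs, joined with '\n' by the
        // rewriter under the key ParquetRewriter.ORIGINAL_CREATED_BY_KEY
        System.out.println(footer.getFileMetaData()
            .getKeyValueMetaData().get("original.created.by"));
      }
    }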