This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 720aa3d3c PARQUET-2228: ParquetRewriter supports more than one input file (#1026)
720aa3d3c is described below
commit 720aa3d3c1ce1ce5c5e2b7715cd8c5f11bdc8ede
Author: Gang Wu <[email protected]>
AuthorDate: Tue Feb 21 18:43:09 2023 +0800
PARQUET-2228: ParquetRewriter supports more than one input file (#1026)
---
.../parquet/hadoop/rewrite/ParquetRewriter.java | 112 ++++++--
.../parquet/hadoop/rewrite/RewriteOptions.java | 88 +++++-
.../hadoop/rewrite/ParquetRewriterTest.java | 319 +++++++++++++++++----
3 files changed, 433 insertions(+), 86 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index e5befa99d..834de8340 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -45,7 +45,6 @@ import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
-import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -63,6 +62,7 @@ import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -74,15 +74,16 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
-import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
@@ -93,41 +94,49 @@ public class ParquetRewriter implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(ParquetRewriter.class);
private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
private final byte[] pageBuffer = new byte[pageBufferSize];
- private TransParquetFileReader reader;
- private ParquetFileWriter writer;
- private ParquetMetadata meta;
- private MessageType schema;
- private String originalCreatedBy;
+ // Configuration for the output file
private CompressionCodecName newCodecName = null;
private List<String> pruneColumns = null;
private Map<ColumnPath, MaskMode> maskColumns = null;
private Set<ColumnPath> encryptColumns = null;
private boolean encryptMode = false;
private Map<String, String> extraMetaData = new HashMap<>();
+ // Writer to rewrite the input files
+ private ParquetFileWriter writer;
+ // Number of blocks written, which is used to keep track of the actual row group ordinal
+ private int numBlocksRewritten = 0;
+ // Readers of the input files, queued in processing order
+ private Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
+ // Schema of the input files (must all be the same), also used for the output file
+ private MessageType schema = null;
+ // The reader for the current input file
+ private TransParquetFileReader reader = null;
+ // The metadata of the current reader being processed
+ private ParquetMetadata meta = null;
+ // created_by information of the current reader being processed
+ private String originalCreatedBy = "";
+ // Unique created_by information from all input files
+ private Set<String> allOriginalCreatedBys = new HashSet<>();
public ParquetRewriter(RewriteOptions options) throws IOException {
- Preconditions.checkArgument(options.getInputFiles().size() == 1, "Only support one input file");
- Path inPath = options.getInputFiles().get(0);
- Path outPath = options.getOutputFile();
Configuration conf = options.getConf();
+ Path outPath = options.getOutputFile();
+ openInputFiles(options.getInputFiles(), conf);
+ LOG.info("Start rewriting {} input files {} to {}", inputFiles.size(),
outPath);
+
+ // Init reader of the first input file
+ initNextReader();
newCodecName = options.getNewCodecName();
pruneColumns = options.getPruneColumns();
- // Get file metadata and full schema from the input file
- meta = ParquetFileReader.readFooter(conf, inPath, NO_FILTER);
- schema = meta.getFileMetaData().getSchema();
- originalCreatedBy = meta.getFileMetaData().getCreatedBy();
- extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
- extraMetaData.put(ORIGINAL_CREATED_BY_KEY, originalCreatedBy);
-
// Prune columns if specified
if (pruneColumns != null && !pruneColumns.isEmpty()) {
List<String> paths = new ArrayList<>();
getPaths(schema, paths, null);
for (String col : pruneColumns) {
if (!paths.contains(col)) {
- LOG.warn("Input column name {} doesn't show up in the schema of file
{}", col, inPath.getName());
+ LOG.warn("Input column name {} doesn't show up in the schema of file
{}", col, reader.getFile());
}
}
@@ -147,9 +156,6 @@ public class ParquetRewriter implements Closeable {
this.encryptMode = true;
}
- reader = new TransParquetFileReader(
- HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build());
-
ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, writerMode,
DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
@@ -183,12 +189,69 @@ public class ParquetRewriter implements Closeable {
}
}
+ // Open all input files and validate that their schemas are compatible for merging
+ private void openInputFiles(List<Path> inputFiles, Configuration conf) {
+ Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), "No input files");
+
+ for (Path inputFile : inputFiles) {
+ try {
+ TransParquetFileReader reader = new TransParquetFileReader(
+ HadoopInputFile.fromPath(inputFile, conf), HadoopReadOptions.builder(conf).build());
+ MessageType inputFileSchema = reader.getFooter().getFileMetaData().getSchema();
+ if (this.schema == null) {
+ this.schema = inputFileSchema;
+ } else {
+ // Now we enforce equality of schemas from input files for simplicity.
+ if (!this.schema.equals(inputFileSchema)) {
+ LOG.error("Input files have different schemas, expect: {}, input:
{}, current file: {}",
+ this.schema, inputFileSchema, inputFile);
+ throw new InvalidSchemaException("Input files have different
schemas, current file: " + inputFile);
+ }
+ }
+ this.allOriginalCreatedBys.add(reader.getFooter().getFileMetaData().getCreatedBy());
+ this.inputFiles.add(reader);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to open input file: " +
inputFile, e);
+ }
+ }
+
+ extraMetaData.put(ORIGINAL_CREATED_BY_KEY, String.join("\n", allOriginalCreatedBys));
+ }
+
+ // Advance to the reader of the next input file and set up the relevant state
+ private void initNextReader() {
+ if (reader != null) {
+ LOG.info("Finish rewriting input file: {}", reader.getFile());
+ }
+
+ if (inputFiles.isEmpty()) {
+ reader = null;
+ meta = null;
+ originalCreatedBy = null;
+ return;
+ }
+
+ reader = inputFiles.poll();
+ meta = reader.getFooter();
+ originalCreatedBy = meta.getFileMetaData().getCreatedBy();
+ extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
+
+ LOG.info("Rewriting input file: {}, remaining files: {}",
reader.getFile(), inputFiles.size());
+ }
+
@Override
public void close() throws IOException {
writer.end(extraMetaData);
}
public void processBlocks() throws IOException {
+ while (reader != null) {
+ processBlocksFromReader();
+ initNextReader();
+ }
+ }
+
+ private void processBlocksFromReader() throws IOException {
PageReadStore store = reader.readNextRowGroup();
ColumnReadStoreImpl crStore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, originalCreatedBy);
Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
@@ -236,7 +299,6 @@ public class ParquetRewriter implements Closeable {
writer,
schema,
newCodecName,
- blockId,
encryptColumn);
} else {
throw new UnsupportedOperationException("Only nullify is supported
for now");
@@ -246,7 +308,7 @@ public class ParquetRewriter implements Closeable {
ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
if (encryptMode) {
columnChunkEncryptorRunTime =
- new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+ new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, numBlocksRewritten, columnId);
}
// Translate compression and/or encryption
@@ -267,6 +329,7 @@ public class ParquetRewriter implements Closeable {
writer.endBlock();
store = reader.readNextRowGroup();
blockId++;
+ numBlocksRewritten++;
}
}
@@ -571,7 +634,6 @@ public class ParquetRewriter implements Closeable {
ParquetFileWriter writer,
MessageType schema,
CompressionCodecName newCodecName,
- int rowGroupOrdinal,
boolean encryptColumn) throws IOException {
if (encryptColumn) {
Preconditions.checkArgument(writer.getEncryptor() != null, "Missing
encryptor");
@@ -593,7 +655,7 @@ public class ParquetRewriter implements Closeable {
MessageType newSchema = newSchema(schema, descriptor);
ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength(),
- props.getPageWriteChecksumEnabled(), writer.getEncryptor(), rowGroupOrdinal);
+ props.getPageWriteChecksumEnabled(), writer.getEncryptor(), numBlocksRewritten);
ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
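
For context, the rewriter above is driven roughly as follows. This is a minimal
sketch against the API in this commit; the file paths and Configuration are
made-up placeholders, not part of the change:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    public class MergeExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder paths; all inputs must share an identical schema,
        // otherwise the rewriter throws InvalidSchemaException on open.
        List<Path> inputs = Arrays.asList(
            new Path("hdfs:///data/part-0.parquet"),
            new Path("hdfs:///data/part-1.parquet"));
        Path output = new Path("hdfs:///data/merged.parquet");

        RewriteOptions options = new RewriteOptions.Builder(conf, inputs, output)
            .transform(CompressionCodecName.ZSTD) // optional codec translation
            .build();

        ParquetRewriter rewriter = new ParquetRewriter(options);
        rewriter.processBlocks(); // copies row groups from each input in turn
        rewriter.close();         // writes the footer, incl. original.created.by
      }
    }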
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index e15ba43c7..cc1280921 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -28,7 +28,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
-// A set of options to create a ParquetRewriter.
+/**
+ * A set of options to create a ParquetRewriter.
+ */
public class RewriteOptions {
final Configuration conf;
@@ -101,37 +103,121 @@ public class RewriteOptions {
private List<String> encryptColumns;
private FileEncryptionProperties fileEncryptionProperties;
+ /**
+ * Create a builder to create a RewriteOptions.
+ *
+ * @param conf       configuration for reading from the input file and writing to the output file
+ * @param inputFile  input file path to read from
+ * @param outputFile output file path to rewrite to
+ */
public Builder(Configuration conf, Path inputFile, Path outputFile) {
this.conf = conf;
this.inputFiles = Arrays.asList(inputFile);
this.outputFile = outputFile;
}
+ /**
+ * Create a builder to create a RewriteOptions.
+ * <p>
+ * Please note that if merging more than one file, the schema of all files must be the same.
+ * Otherwise, the rewrite will fail.
+ * <p>
+ * The rewrite will keep the original row groups from all input files. This may not be optimal
+ * if row groups are very small, and it will not solve the small-file problem; instead, it makes
+ * the problem worse by producing an output file with a large footer.
+ * TODO: support rewrite by record to break the original row groups into reasonable ones.
+ *
+ * @param conf       configuration for reading from input files and writing to the output file
+ * @param inputFiles list of input file paths to read from
+ * @param outputFile output file path to rewrite to
+ */
+ public Builder(Configuration conf, List<Path> inputFiles, Path outputFile) {
+ this.conf = conf;
+ this.inputFiles = inputFiles;
+ this.outputFile = outputFile;
+ }
+
+ /**
+ * Set the columns to prune.
+ * <p>
+ * By default, all columns are kept.
+ *
+ * @param columns list of columns to prune
+ * @return self
+ */
public Builder prune(List<String> columns) {
this.pruneColumns = columns;
return this;
}
+ /**
+ * Set the compression codec to use for the output file.
+ * <p>
+ * By default, the codec is the same as the input file.
+ *
+ * @param newCodecName compression codec to use
+ * @return self
+ */
public Builder transform(CompressionCodecName newCodecName) {
this.newCodecName = newCodecName;
return this;
}
+ /**
+ * Set the columns to mask.
+ * <p>
+ * By default, no columns are masked.
+ *
+ * @param maskColumns map of columns to mask to the masking mode
+ * @return self
+ */
public Builder mask(Map<String, MaskMode> maskColumns) {
this.maskColumns = maskColumns;
return this;
}
+ /**
+ * Set the columns to encrypt.
+ * <p>
+ * By default, no columns are encrypted.
+ *
+ * @param encryptColumns list of columns to encrypt
+ * @return self
+ */
public Builder encrypt(List<String> encryptColumns) {
this.encryptColumns = encryptColumns;
return this;
}
+ /**
+ * Set the encryption properties to use for the output file.
+ * <p>
+ * This is required when the list of columns to encrypt is not empty.
+ *
+ * @param fileEncryptionProperties encryption properties to use
+ * @return self
+ */
public Builder encryptionProperties(FileEncryptionProperties
fileEncryptionProperties) {
this.fileEncryptionProperties = fileEncryptionProperties;
return this;
}
+ /**
+ * Add an input file to read from.
+ *
+ * @param path input file path to read from
+ * @return self
+ */
+ public Builder addInputFile(Path path) {
+ this.inputFiles.add(path);
+ return this;
+ }
+
+ /**
+ * Build the RewriteOptions.
+ *
+ * @return a RewriteOptions
+ */
public RewriteOptions build() {
Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(), "Input file is required");
Preconditions.checkArgument(outputFile != null, "Output file is required");
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 25b4f3564..043261f77 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.parquet.hadoop.rewrite;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
@@ -50,6 +51,7 @@ import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -82,19 +84,15 @@ public class ParquetRewriterTest {
private final int numRecord = 100000;
private Configuration conf = new Configuration();
- private EncryptionTestFile inputFile = null;
+ private List<EncryptionTestFile> inputFiles = null;
private String outputFile = null;
private ParquetRewriter rewriter = null;
- @Test
- public void testPruneSingleColumnAndTranslateCodec() throws Exception {
- testSetup("GZIP");
-
- Path inputPath = new Path(inputFile.getFileName());
+ private void testPruneSingleColumnTranslateCodec(List<Path> inputPaths) throws Exception {
Path outputPath = new Path(outputFile);
List<String> pruneColumns = Arrays.asList("Gender");
CompressionCodecName newCodec = CompressionCodecName.ZSTD;
- RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPath, outputPath);
+ RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
RewriteOptions options = builder.prune(pruneColumns).transform(newCodec).build();
rewriter = new ParquetRewriter(options);
@@ -115,10 +113,12 @@ public class ParquetRewriterTest {
assertEquals(subFields.get(1).getName(), "Forward");
// Verify codec has been translated
- verifyCodec(outputFile, CompressionCodecName.ZSTD, null);
+ verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+ add(CompressionCodecName.ZSTD);
+ }}, null);
// Verify the data are not changed for the columns not pruned
- validateColumnData(outputFile, inputFile.getFileContent(), new HashSet<>(pruneColumns), Collections.emptySet(), null);
+ validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), null);
// Verify the page index
validatePageIndex(new HashMap<Integer, Integer>() {{
@@ -133,16 +133,31 @@ public class ParquetRewriterTest {
}
@Test
- public void testPruneNullifyAndTranslateCodec() throws Exception {
- testSetup("UNCOMPRESSED");
+ public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception {
+ testSingleInputFileSetup("GZIP");
+ List<Path> inputPaths = new ArrayList<Path>() {{
+ add(new Path(inputFiles.get(0).getFileName()));
+ }};
+ testPruneSingleColumnTranslateCodec(inputPaths);
+ }
+
+ @Test
+ public void testPruneSingleColumnTranslateCodecTwoFiles() throws Exception {
+ testMultipleInputFilesSetup();
+ List<Path> inputPaths = new ArrayList<Path>() {{
+ add(new Path(inputFiles.get(0).getFileName()));
+ add(new Path(inputFiles.get(1).getFileName()));
+ }};
+ testPruneSingleColumnTranslateCodec(inputPaths);
+ }
- Path inputPath = new Path(inputFile.getFileName());
+ private void testPruneNullifyTranslateCodec(List<Path> inputPaths) throws Exception {
Path outputPath = new Path(outputFile);
List<String> pruneColumns = Arrays.asList("Gender");
Map<String, MaskMode> maskColumns = new HashMap<>();
maskColumns.put("Links.Forward", MaskMode.NULLIFY);
- CompressionCodecName newCodec = CompressionCodecName.GZIP;
- RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPath, outputPath);
+ CompressionCodecName newCodec = CompressionCodecName.ZSTD;
+ RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
RewriteOptions options = builder.prune(pruneColumns).mask(maskColumns).transform(newCodec).build();
rewriter = new ParquetRewriter(options);
@@ -163,10 +178,12 @@ public class ParquetRewriterTest {
assertEquals(subFields.get(1).getName(), "Forward");
// Verify codec has been translated
- verifyCodec(outputFile, newCodec, null);
+ verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+ add(newCodec);
+ }}, null);
// Verify the data are not changed for the columns not pruned
- validateColumnData(outputFile, inputFile.getFileContent(), new HashSet<>(pruneColumns), maskColumns.keySet(), null);
+ validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(), null);
// Verify the page index
validatePageIndex(new HashMap<Integer, Integer>() {{
@@ -180,12 +197,27 @@ public class ParquetRewriterTest {
}
@Test
- public void testPruneEncryptAndTranslateCodec() throws Exception {
- testSetup("GZIP");
+ public void testPruneNullifyTranslateCodecSingleFile() throws Exception {
+ testSingleInputFileSetup("GZIP");
+ List<Path> inputPaths = new ArrayList<Path>() {{
+ add(new Path(inputFiles.get(0).getFileName()));
+ }};
+ testPruneNullifyTranslateCodec(inputPaths);
+ }
+
+ @Test
+ public void testPruneNullifyTranslateCodecTwoFiles() throws Exception {
+ testMultipleInputFilesSetup();
+ List<Path> inputPaths = new ArrayList<Path>() {{
+ add(new Path(inputFiles.get(0).getFileName()));
+ add(new Path(inputFiles.get(1).getFileName()));
+ }};
+ testPruneNullifyTranslateCodec(inputPaths);
+ }
- Path inputPath = new Path(inputFile.getFileName());
+ private void testPruneEncryptTranslateCodec(List<Path> inputPaths) throws Exception {
Path outputPath = new Path(outputFile);
- RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPath, outputPath);
+ RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
// Prune
List<String> pruneColumns = Arrays.asList("Gender");
@@ -221,11 +253,12 @@ public class ParquetRewriterTest {
// Verify codec has been translated
FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties();
- verifyCodec(outputFile, newCodec, fileDecryptionProperties);
+ verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+ add(newCodec);
+ }}, fileDecryptionProperties);
// Verify the data are not changed for the columns not pruned
- validateColumnData(outputFile,
- inputFile.getFileContent(), new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties);
+ validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(), fileDecryptionProperties);
// Verify column encryption
ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties);
@@ -245,9 +278,25 @@ public class ParquetRewriterTest {
}
@Test
- public void testNullifyAndEncryptColumn() throws Exception {
- testSetup("GZIP");
+ public void testPruneEncryptTranslateCodecSingleFile() throws Exception {
+ testSingleInputFileSetup("GZIP");
+ List<Path> inputPaths = new ArrayList<Path>() {{
+ add(new Path(inputFiles.get(0).getFileName()));
+ }};
+ testPruneEncryptTranslateCodec(inputPaths);
+ }
+
+ @Test
+ public void testPruneEncryptTranslateCodecTwoFiles() throws Exception {
+ testMultipleInputFilesSetup();
+ List<Path> inputPaths = new ArrayList<Path>() {{
+ add(new Path(inputFiles.get(0).getFileName()));
+ add(new Path(inputFiles.get(1).getFileName()));
+ }};
+ testPruneEncryptTranslateCodec(inputPaths);
+ }
+ private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws Exception {
Map<String, MaskMode> maskColumns = new HashMap<>();
maskColumns.put("DocId", MaskMode.NULLIFY);
@@ -255,9 +304,9 @@ public class ParquetRewriterTest {
FileEncryptionProperties fileEncryptionProperties = EncDecProperties.getFileEncryptionProperties(
encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);
- Path inputPath = new Path(inputFile.getFileName());
Path outputPath = new Path(outputFile);
- RewriteOptions options = new RewriteOptions.Builder(conf, inputPath, outputPath).mask(maskColumns)
+ RewriteOptions options = new RewriteOptions.Builder(conf, inputPaths, outputPath).mask(maskColumns)
+ .transform(CompressionCodecName.ZSTD)
.encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties).build();
rewriter = new ParquetRewriter(options);
@@ -267,12 +316,13 @@ public class ParquetRewriterTest {
FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties();
// Verify codec has been translated
- verifyCodec(outputFile, CompressionCodecName.GZIP, fileDecryptionProperties);
+ verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+ add(CompressionCodecName.ZSTD);
+ }}, fileDecryptionProperties);
// Verify the data are not changed for non-encrypted and non-masked columns.
// Also make sure the masked column is nullified.
- validateColumnData(outputFile,
- inputFile.getFileContent(), Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties);
+ validateColumnData(Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties);
// Verify the page index
validatePageIndex(new HashMap<Integer, Integer>() {{
@@ -298,13 +348,132 @@ public class ParquetRewriterTest {
}
}
- private void testSetup(String compression) throws IOException {
+ @Test
+ public void testNullifyEncryptSingleFile() throws Exception {
+ testSingleInputFileSetup("GZIP");
+ List<Path> inputPaths = new ArrayList<Path>() {{
+ add(new Path(inputFiles.get(0).getFileName()));
+ }};
+ testNullifyAndEncryptColumn(inputPaths);
+ }
+
+ @Test
+ public void testNullifyEncryptTwoFiles() throws Exception {
+ testMultipleInputFilesSetup();
+ List<Path> inputPaths = new ArrayList<Path>() {{
+ add(new Path(inputFiles.get(0).getFileName()));
+ add(new Path(inputFiles.get(1).getFileName()));
+ }};
+ testNullifyAndEncryptColumn(inputPaths);
+ }
+
+ @Test
+ public void testMergeTwoFilesOnly() throws Exception {
+ testMultipleInputFilesSetup();
+
+ // Only merge two files but do not change anything.
+ List<Path> inputPaths = new ArrayList<>();
+ for (EncryptionTestFile inputFile : inputFiles) {
+ inputPaths.add(new Path(inputFile.getFileName()));
+ }
+ Path outputPath = new Path(outputFile);
+ RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+ RewriteOptions options = builder.build();
+
+ rewriter = new ParquetRewriter(options);
+ rewriter.processBlocks();
+ rewriter.close();
+
+ // Verify the schema are not changed
+ ParquetMetadata pmd = ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
+ MessageType schema = pmd.getFileMetaData().getSchema();
+ MessageType expectSchema = createSchema();
+ assertEquals(expectSchema, schema);
+
+ // Verify codec has not been translated
+ verifyCodec(outputFile, new HashSet<CompressionCodecName>() {{
+ add(CompressionCodecName.GZIP);
+ add(CompressionCodecName.UNCOMPRESSED);
+ }}, null);
+
+ // Verify the merged data are not changed
+ validateColumnData(Collections.emptySet(), Collections.emptySet(), null);
+
+ // Verify the page index
+ validatePageIndex(new HashMap<Integer, Integer>() {{
+ put(0, 0);
+ put(1, 1);
+ put(2, 2);
+ put(3, 3);
+ put(4, 4);
+ }});
+
+ // Verify original.created.by is preserved
+ validateCreatedBy();
+ }
+
+ @Test(expected = InvalidSchemaException.class)
+ public void testMergeTwoFilesWithDifferentSchema() throws Exception {
+ MessageType schema1 = new MessageType("schema",
+ new PrimitiveType(OPTIONAL, INT64, "DocId"),
+ new PrimitiveType(REQUIRED, BINARY, "Name"),
+ new PrimitiveType(OPTIONAL, BINARY, "Gender"),
+ new GroupType(OPTIONAL, "Links",
+ new PrimitiveType(REPEATED, BINARY, "Backward"),
+ new PrimitiveType(REPEATED, BINARY, "Forward")));
+ MessageType schema2 = new MessageType("schema",
+ new PrimitiveType(OPTIONAL, INT64, "DocId"),
+ new PrimitiveType(REQUIRED, BINARY, "Name"),
+ new PrimitiveType(OPTIONAL, BINARY, "Gender"));
+ inputFiles = Lists.newArrayList();
+ inputFiles.add(new TestFileBuilder(conf, schema1)
+ .withNumRecord(numRecord)
+ .withCodec("UNCOMPRESSED")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .build());
+ inputFiles.add(new TestFileBuilder(conf, schema2)
+ .withNumRecord(numRecord)
+ .withCodec("UNCOMPRESSED")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .build());
+ outputFile = TestFileBuilder.createTempFile("test");
+
+ List<Path> inputPaths = new ArrayList<>();
+ for (EncryptionTestFile inputFile : inputFiles) {
+ inputPaths.add(new Path(inputFile.getFileName()));
+ }
+ Path outputPath = new Path(outputFile);
+ RewriteOptions.Builder builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+ RewriteOptions options = builder.build();
+
+ // This should throw an exception because the schemas are different
+ rewriter = new ParquetRewriter(options);
+ }
+
+ private void testSingleInputFileSetup(String compression) throws IOException {
MessageType schema = createSchema();
- inputFile = new TestFileBuilder(conf, schema)
+ inputFiles = Lists.newArrayList();
+ inputFiles.add(new TestFileBuilder(conf, schema)
.withNumRecord(numRecord)
.withCodec(compression)
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
- .build();
+ .build());
+ outputFile = TestFileBuilder.createTempFile("test");
+ }
+
+ private void testMultipleInputFilesSetup() throws IOException {
+ MessageType schema = createSchema();
+ inputFiles = Lists.newArrayList();
+ inputFiles.add(new TestFileBuilder(conf, schema)
+ .withNumRecord(numRecord)
+ .withCodec("GZIP")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .build());
+ inputFiles.add(new TestFileBuilder(conf, schema)
+ .withNumRecord(numRecord)
+ .withCodec("UNCOMPRESSED")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .build());
outputFile = TestFileBuilder.createTempFile("test");
}
@@ -318,39 +487,47 @@ public class ParquetRewriterTest {
new PrimitiveType(REPEATED, BINARY, "Forward")));
}
- private void validateColumnData(String file,
- SimpleGroup[] fileContent,
- Set<String> prunePaths,
+ private void validateColumnData(Set<String> prunePaths,
Set<String> nullifiedPaths,
FileDecryptionProperties fileDecryptionProperties) throws IOException {
- ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(file))
+ ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(outputFile))
.withConf(conf).withDecryption(fileDecryptionProperties).build();
- for (int i = 0; i < numRecord; i++) {
+
+ // Get total number of rows from input files
+ int totalRows = 0;
+ for (EncryptionTestFile inputFile : inputFiles) {
+ totalRows += inputFile.getFileContent().length;
+ }
+
+ for (int i = 0; i < totalRows; i++) {
Group group = reader.read();
+ assertNotNull(group);
+
+ SimpleGroup expectGroup = inputFiles.get(i / numRecord).getFileContent()[i % numRecord];
if (!prunePaths.contains("DocId")) {
if (nullifiedPaths.contains("DocId")) {
assertThrows(RuntimeException.class, () -> group.getLong("DocId",
0));
} else {
- assertEquals(group.getLong("DocId", 0),
fileContent[i].getLong("DocId", 0));
+ assertEquals(group.getLong("DocId", 0), expectGroup.getLong("DocId",
0));
}
}
if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) {
assertArrayEquals(group.getBinary("Name", 0).getBytes(),
- fileContent[i].getBinary("Name", 0).getBytes());
+ expectGroup.getBinary("Name", 0).getBytes());
}
if (!prunePaths.contains("Gender") &&
!nullifiedPaths.contains("Gender")) {
assertArrayEquals(group.getBinary("Gender", 0).getBytes(),
- fileContent[i].getBinary("Gender", 0).getBytes());
+ expectGroup.getBinary("Gender", 0).getBytes());
}
Group subGroup = group.getGroup("Links", 0);
if (!prunePaths.contains("Links.Backward") &&
!nullifiedPaths.contains("Links.Backward")) {
assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(),
- fileContent[i].getGroup("Links", 0).getBinary("Backward",
0).getBytes());
+ expectGroup.getGroup("Links", 0).getBinary("Backward",
0).getBytes());
}
if (!prunePaths.contains("Links.Forward")) {
@@ -358,7 +535,7 @@ public class ParquetRewriterTest {
assertThrows(RuntimeException.class, () -> subGroup.getBinary("Forward", 0));
} else {
assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(),
- fileContent[i].getGroup("Links", 0).getBinary("Forward",
0).getBytes());
+ expectGroup.getGroup("Links", 0).getBinary("Forward",
0).getBytes());
}
}
}
@@ -380,16 +557,18 @@ public class ParquetRewriterTest {
}
private void verifyCodec(String file,
- CompressionCodecName codec,
+ Set<CompressionCodecName> expectedCodecs,
FileDecryptionProperties fileDecryptionProperties) throws IOException {
+ Set<CompressionCodecName> codecs = new HashSet<>();
ParquetMetadata pmd = getFileMetaData(file, fileDecryptionProperties);
for (int i = 0; i < pmd.getBlocks().size(); i++) {
BlockMetaData block = pmd.getBlocks().get(i);
for (int j = 0; j < block.getColumns().size(); ++j) {
ColumnChunkMetaData columnChunkMetaData = block.getColumns().get(j);
- assertEquals(columnChunkMetaData.getCodec(), codec);
+ codecs.add(columnChunkMetaData.getCodec());
}
}
+ assertEquals(expectedCodecs, codecs);
}
/**
@@ -398,18 +577,28 @@ public class ParquetRewriterTest {
* @param outFileColumnMapping the column mapping from the output file to the input file.
*/
private void validatePageIndex(Map<Integer, Integer> outFileColumnMapping) throws Exception {
- ParquetMetadata inMetaData = getFileMetaData(inputFile.getFileName(), null);
ParquetMetadata outMetaData = getFileMetaData(outputFile, null);
- assertEquals(inMetaData.getBlocks().size(), outMetaData.getBlocks().size());
- try (TransParquetFileReader inReader = new TransParquetFileReader(
- HadoopInputFile.fromPath(new Path(inputFile.getFileName()), conf), HadoopReadOptions.builder(conf).build());
- TransParquetFileReader outReader = new TransParquetFileReader(
- HadoopInputFile.fromPath(new Path(outputFile), conf), HadoopReadOptions.builder(conf).build())) {
+ int inputFileIndex = 0;
+ TransParquetFileReader inReader = new TransParquetFileReader(HadoopInputFile.fromPath(
+ new Path(inputFiles.get(inputFileIndex).getFileName()), conf), HadoopReadOptions.builder(conf).build());
+ ParquetMetadata inMetaData = inReader.getFooter();
+
+ try (TransParquetFileReader outReader = new TransParquetFileReader(
+ HadoopInputFile.fromPath(new Path(outputFile), conf), HadoopReadOptions.builder(conf).build())) {
+
+ for (int outBlockId = 0, inBlockId = 0; outBlockId < outMetaData.getBlocks().size(); ++outBlockId, ++inBlockId) {
+ // Refresh reader of input file
+ if (inBlockId == inMetaData.getBlocks().size()) {
+ inReader = new TransParquetFileReader(
+ HadoopInputFile.fromPath(new Path(inputFiles.get(++inputFileIndex).getFileName()), conf),
+ HadoopReadOptions.builder(conf).build());
+ inMetaData = inReader.getFooter();
+ inBlockId = 0;
+ }
- for (int i = 0; i < inMetaData.getBlocks().size(); i++) {
- BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(i);
- BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(i);
+ BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(inBlockId);
+ BlockMetaData outBlockMetaData = outMetaData.getBlocks().get(outBlockId);
for (int j = 0; j < outBlockMetaData.getColumns().size(); j++) {
if (!outFileColumnMapping.containsKey(j)) {
@@ -484,15 +673,25 @@ public class ParquetRewriterTest {
}
private void validateCreatedBy() throws Exception {
- FileMetaData inFMD = getFileMetaData(inputFile.getFileName(), null).getFileMetaData();
- FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
+ Set<String> createdBySet = new HashSet<>();
+ for (EncryptionTestFile inputFile : inputFiles) {
+ ParquetMetadata pmd = getFileMetaData(inputFile.getFileName(), null);
+ createdBySet.add(pmd.getFileMetaData().getCreatedBy());
+ assertNull(pmd.getFileMetaData().getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY));
+ }
- assertEquals(inFMD.getCreatedBy(), outFMD.getCreatedBy());
- assertNull(inFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY));
+ // Verify created_by values from input files have been deduplicated
+ Object[] inputCreatedBys = createdBySet.toArray();
+ assertEquals(1, inputCreatedBys.length);
+
+ // Verify created_by has been set
+ FileMetaData outFMD = getFileMetaData(outputFile, null).getFileMetaData();
+ String inputCreatedBy = (String) inputCreatedBys[0];
+ assertEquals(inputCreatedBy, outFMD.getCreatedBy());
+ // Verify original.created.by has been set
String originalCreatedBy = outFMD.getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY);
- assertNotNull(originalCreatedBy);
- assertEquals(inFMD.getCreatedBy(), originalCreatedBy);
+ assertEquals(inputCreatedBy, originalCreatedBy);
}
}
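
And to check the preserved provenance after a merge, one can dump the extra
footer metadata. A hedged sketch: the output path is a placeholder, and the
literal "original.created.by" key is assumed from the test comments above
(it is the value behind ParquetRewriter.ORIGINAL_CREATED_BY_KEY):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.format.converter.ParquetMetadataConverter;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;

    public class InspectMergedFooter {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        ParquetMetadata pmd = ParquetFileReader.readFooter(
            conf, new Path("hdfs:///data/merged.parquet"), ParquetMetadataConverter.NO_FILTER);
        // Unique created_by strings from all inputs, joined with '\n' by the rewriter.
        System.out.println(
            pmd.getFileMetaData().getKeyValueMetaData().get("original.created.by"));
      }
    }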