This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new 08a4e7e62 PARQUET-2430: Add parquet joiner v2 (#1335)
08a4e7e62 is described below
commit 08a4e7e6279f8c3b8558bd294fee4489a96d0db1
Author: Max Konstantinov <[email protected]>
AuthorDate: Thu Sep 19 08:30:08 2024 -0700
PARQUET-2430: Add parquet joiner v2 (#1335)
---
.../parquet/hadoop/rewrite/ParquetRewriter.java | 486 +++++++++++++-------
.../parquet/hadoop/rewrite/RewriteOptions.java | 208 ++++++++-
.../hadoop/rewrite/ParquetRewriterTest.java | 489 +++++++++++++++------
3 files changed, 865 insertions(+), 318 deletions(-)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 092568ab1..9e106fc3c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -29,12 +29,15 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
@@ -87,6 +90,40 @@ import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Rewrites multiple input files into a single output file.
+ * <p>
+ * Supported functionality:
+ * <ul>
+ * <li>Merging multiple files into a single one</li>
+ * <li>Applying column transformations</li>
+ * <li><i>Joining</i> with extra files that have a different schema</li>
+ * </ul>
+ * <p>
+ * Note that the total number of row groups from all input files is preserved in the output file.
+ * This may not be optimal if row groups are very small, and it does not solve the small-file problem;
+ * instead, it can make things worse by producing a large footer in the output file.
+ * <p>
+ * <h2>Merging multiple files into a single output file</h2>
+ * Use {@link RewriteOptions.Builder}'s constructors or methods to provide <code>inputFiles</code>.
+ * Please note the schema of all <code>inputFiles</code> must be the same, otherwise the rewrite will fail.
+ * <p>
+ * <h2>Applying column transformations</h2>
+ * Supported column transformations include pruning, masking, encrypting, and changing the compression codec.
+ * See {@link RewriteOptions} and {@link RewriteOptions.Builder} for the full list and descriptions.
+ * <p>
+ * <h2><i>Joining</i> with extra files that have a different schema</h2>
+ * Use {@link RewriteOptions.Builder}'s constructors or methods to provide <code>inputFilesToJoin</code>.
+ * Please note the schema of all <code>inputFilesToJoin</code> must be the same, otherwise the rewrite will fail.
+ * Requirements for <i>joining</i> the main <code>inputFiles</code> (left) and <code>inputFilesToJoin</code> (right):
+ * <ul>
+ * <li>the number of files may differ between the left and right sides,</li>
+ * <li>the schema of the files inside each group (left/right) must be the same, but the two groups' schemas need not be equal,</li>
+ * <li>the total number of row groups must be the same on the left and right,</li>
+ * <li>the total number of rows must be the same on the left and right,</li>
+ * <li>the global ordering of rows must be the same on the left and right.</li>
+ * </ul>
+ */
public class ParquetRewriter implements Closeable {
// Key to store original writer version in the file key-value metadata
@@ -99,54 +136,39 @@ public class ParquetRewriter implements Closeable {
private Map<ColumnPath, MaskMode> maskColumns = null;
private Set<ColumnPath> encryptColumns = null;
private boolean encryptMode = false;
- private final Map<String, String> extraMetaData = new HashMap<>();
+ private final Map<String, String> extraMetaData;
// Writer to rewrite the input files
private final ParquetFileWriter writer;
// Number of blocks written which is used to keep track of the actual row
group ordinal
private int numBlocksRewritten = 0;
// Reader and relevant states of the in-processing input file
private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
- // Schema of input files (should be the same) and to write to the output file
- private MessageType schema = null;
- private final Map<ColumnPath, ColumnDescriptor> descriptorsMap;
- // The reader for the current input file
- private TransParquetFileReader reader = null;
- // The metadata of current reader being processed
- private ParquetMetadata meta = null;
- // created_by information of current reader being processed
- private String originalCreatedBy = "";
- // Unique created_by information from all input files
- private final Set<String> allOriginalCreatedBys = new HashSet<>();
+ private final Queue<TransParquetFileReader> inputFilesToJoin = new
LinkedList<>();
+ private MessageType outSchema;
// The index cache strategy
private final IndexCache.CacheStrategy indexCacheStrategy;
+ private final boolean overwriteInputWithJoinColumns;
public ParquetRewriter(RewriteOptions options) throws IOException {
+ this.newCodecName = options.getNewCodecName();
+ this.indexCacheStrategy = options.getIndexCacheStrategy();
+ this.overwriteInputWithJoinColumns =
options.getOverwriteInputWithJoinColumns();
ParquetConfiguration conf = options.getParquetConfiguration();
OutputFile out = options.getParquetOutputFile();
- openInputFiles(options.getParquetInputFiles(), conf);
- LOG.info("Start rewriting {} input file(s) {} to {}", inputFiles.size(),
options.getParquetInputFiles(), out);
-
- // Init reader of the first input file
- initNextReader();
-
- newCodecName = options.getNewCodecName();
- List<String> pruneColumns = options.getPruneColumns();
- // Prune columns if specified
- if (pruneColumns != null && !pruneColumns.isEmpty()) {
- List<String> paths = new ArrayList<>();
- getPaths(schema, paths, null);
- for (String col : pruneColumns) {
- if (!paths.contains(col)) {
- LOG.warn("Input column name {} doesn't show up in the schema of file
{}", col, reader.getFile());
- }
- }
-
- Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
- schema = pruneColumnsInSchema(schema, prunePaths);
- }
-
- this.descriptorsMap =
- schema.getColumns().stream().collect(Collectors.toMap(x ->
ColumnPath.get(x.getPath()), x -> x));
+ inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
+
inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(),
conf));
+ ensureSameSchema(inputFiles);
+ ensureSameSchema(inputFilesToJoin);
+ ensureRowCount();
+ LOG.info(
+ "Start rewriting {} input file(s) {} to {}",
+ inputFiles.size() + inputFilesToJoin.size(),
+ Stream.concat(options.getParquetInputFiles().stream(),
options.getParquetInputFilesToJoin().stream())
+ .collect(Collectors.toList()),
+ out);
+
+ this.outSchema = pruneColumnsInSchema(getSchema(),
options.getPruneColumns());
+ this.extraMetaData = getExtraMetadata(options);
if (options.getMaskColumns() != null) {
this.maskColumns = new HashMap<>();
@@ -160,12 +182,10 @@ public class ParquetRewriter implements Closeable {
this.encryptMode = true;
}
- this.indexCacheStrategy = options.getIndexCacheStrategy();
-
ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
writer = new ParquetFileWriter(
out,
- schema,
+ outSchema,
writerMode,
DEFAULT_BLOCK_SIZE,
MAX_PADDING_SIZE_DEFAULT,
@@ -176,89 +196,133 @@ public class ParquetRewriter implements Closeable {
writer.start();
}
+  // TODO: Should we mark this constructor as deprecated to encourage use of the main constructor?
+  // It is also used only from deprecated classes at the moment.
// Ctor for legacy CompressionConverter and ColumnMasker
public ParquetRewriter(
TransParquetFileReader reader,
ParquetFileWriter writer,
ParquetMetadata meta,
- MessageType schema,
+ MessageType outSchema,
String originalCreatedBy,
CompressionCodecName codecName,
List<String> maskColumns,
MaskMode maskMode) {
- this.reader = reader;
this.writer = writer;
- this.meta = meta;
- this.schema = schema;
- this.descriptorsMap =
- schema.getColumns().stream().collect(Collectors.toMap(x ->
ColumnPath.get(x.getPath()), x -> x));
+ this.outSchema = outSchema;
this.newCodecName = codecName;
- originalCreatedBy = originalCreatedBy == null ?
meta.getFileMetaData().getCreatedBy() : originalCreatedBy;
- extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
- extraMetaData.put(ORIGINAL_CREATED_BY_KEY, originalCreatedBy);
+ extraMetaData = new
HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
+ extraMetaData.put(
+ ORIGINAL_CREATED_BY_KEY,
+ originalCreatedBy != null
+ ? originalCreatedBy
+ : meta.getFileMetaData().getCreatedBy());
if (maskColumns != null && maskMode != null) {
this.maskColumns = new HashMap<>();
for (String col : maskColumns) {
this.maskColumns.put(ColumnPath.fromDotString(col), maskMode);
}
}
+ this.inputFiles.add(reader);
this.indexCacheStrategy = IndexCache.CacheStrategy.NONE;
+ this.overwriteInputWithJoinColumns = false;
}
- // Open all input files to validate their schemas are compatible to merge
- private void openInputFiles(List<InputFile> inputFiles, ParquetConfiguration
conf) {
- Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(),
"No input files");
+ private MessageType getSchema() {
+ MessageType schemaMain =
inputFiles.peek().getFooter().getFileMetaData().getSchema();
+ if (inputFilesToJoin.isEmpty()) {
+ return schemaMain;
+ } else {
+ Map<String, Type> fieldNames = new LinkedHashMap<>();
+ schemaMain.getFields().forEach(x -> fieldNames.put(x.getName(), x));
+ inputFilesToJoin
+ .peek()
+ .getFooter()
+ .getFileMetaData()
+ .getSchema()
+ .getFields()
+ .forEach(x -> {
+ if (!fieldNames.containsKey(x.getName())) {
+ fieldNames.put(x.getName(), x);
+ } else if (overwriteInputWithJoinColumns) {
+ LOG.info("Column {} in inputFiles is overwritten by
inputFilesToJoin side", x.getName());
+ fieldNames.put(x.getName(), x);
+ }
+ });
+ return new MessageType(schemaMain.getName(), new
ArrayList<>(fieldNames.values()));
+ }
+ }
+ private Map<String, String> getExtraMetadata(RewriteOptions options) {
+ List<TransParquetFileReader> allFiles;
+ if (options.getIgnoreJoinFilesMetadata()) {
+ allFiles = new ArrayList<>(inputFiles);
+ } else {
+ allFiles = Stream.concat(inputFiles.stream(), inputFilesToJoin.stream())
+ .collect(Collectors.toList());
+ }
+ Map<String, String> result = new HashMap<>();
+ result.put(
+ ORIGINAL_CREATED_BY_KEY,
+ allFiles.stream()
+ .map(x -> x.getFooter().getFileMetaData().getCreatedBy())
+ .collect(Collectors.toSet())
+ .stream()
+ .reduce((a, b) -> a + "\n" + b)
+ .orElse(""));
+ allFiles.forEach(x ->
result.putAll(x.getFileMetaData().getKeyValueMetaData()));
+ return result;
+ }
+
+ private void ensureRowCount() {
+ if (!inputFilesToJoin.isEmpty()) {
+ List<Long> blocksRowCountsL = inputFiles.stream()
+ .flatMap(x ->
x.getFooter().getBlocks().stream().map(BlockMetaData::getRowCount))
+ .collect(Collectors.toList());
+ List<Long> blocksRowCountsR = inputFilesToJoin.stream()
+ .flatMap(x ->
x.getFooter().getBlocks().stream().map(BlockMetaData::getRowCount))
+ .collect(Collectors.toList());
+ if (!blocksRowCountsL.equals(blocksRowCountsR)) {
+ throw new IllegalArgumentException(
+ "The number of rows in each block must match! Left blocks row
counts: " + blocksRowCountsL
+ + ", right blocks row counts" + blocksRowCountsR + ".");
+ }
+ }
+ }
+
+ private Queue<TransParquetFileReader> getFileReaders(List<InputFile>
inputFiles, ParquetConfiguration conf) {
+ LinkedList<TransParquetFileReader> inputFileReaders = new LinkedList<>();
for (InputFile inputFile : inputFiles) {
try {
TransParquetFileReader reader = new TransParquetFileReader(
inputFile, ParquetReadOptions.builder(conf).build());
- MessageType inputFileSchema =
- reader.getFooter().getFileMetaData().getSchema();
- if (this.schema == null) {
- this.schema = inputFileSchema;
- } else {
- // Now we enforce equality of schemas from input files for
simplicity.
- if (!this.schema.equals(inputFileSchema)) {
- LOG.error(
- "Input files have different schemas, expect: {}, input: {},
current file: {}",
- this.schema,
- inputFileSchema,
- inputFile);
- throw new InvalidSchemaException(
- "Input files have different schemas, current file: " +
inputFile);
- }
- }
- this.allOriginalCreatedBys.add(
- reader.getFooter().getFileMetaData().getCreatedBy());
- this.inputFiles.add(reader);
+ inputFileReaders.add(reader);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to open input file: " +
inputFile, e);
}
}
-
- extraMetaData.put(ORIGINAL_CREATED_BY_KEY, String.join("\n",
allOriginalCreatedBys));
+ return inputFileReaders;
}
- // Routines to get reader of next input file and set up relevant states
- private void initNextReader() throws IOException {
- if (reader != null) {
- LOG.info("Finish rewriting input file: {}", reader.getFile());
- }
-
- if (inputFiles.isEmpty()) {
- reader = null;
- meta = null;
- originalCreatedBy = null;
- return;
+ private void ensureSameSchema(Queue<TransParquetFileReader>
inputFileReaders) {
+ MessageType schema = null;
+ for (TransParquetFileReader reader : inputFileReaders) {
+ MessageType newSchema = reader.getFooter().getFileMetaData().getSchema();
+ if (schema == null) {
+ schema = newSchema;
+ } else {
+ // Now we enforce equality of schemas from input files for simplicity.
+ if (!schema.equals(newSchema)) {
+ String file = reader.getFile();
+ LOG.error(
+ "Input files have different schemas, expect: {}, input: {},
current file: {}",
+ schema,
+ newSchema,
+ file);
+ throw new InvalidSchemaException("Input files have different
schemas, current file: " + file);
+ }
+ }
}
- if (reader != null) reader.close();
- reader = inputFiles.poll();
- meta = reader.getFooter();
- originalCreatedBy = meta.getFileMetaData().getCreatedBy();
- extraMetaData.putAll(meta.getFileMetaData().getKeyValueMetaData());
-
- LOG.info("Rewriting input file: {}, remaining files: {}",
reader.getFile(), inputFiles.size());
}
@Override
@@ -267,92 +331,156 @@ public class ParquetRewriter implements Closeable {
}
public void processBlocks() throws IOException {
- while (reader != null) {
- IndexCache indexCache = IndexCache.create(reader,
descriptorsMap.keySet(), indexCacheStrategy, true);
- processBlocksFromReader(indexCache);
- indexCache.clean();
- initNextReader();
- }
- }
-
- private void processBlocksFromReader(IndexCache indexCache) throws
IOException {
- for (int blockId = 0; blockId < meta.getBlocks().size(); blockId++) {
- BlockMetaData blockMetaData = meta.getBlocks().get(blockId);
- writer.startBlock(blockMetaData.getRowCount());
- indexCache.setBlockMetadata(blockMetaData);
- List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
- for (int i = 0, columnId = 0; i < columnsInOrder.size(); i++) {
- ColumnChunkMetaData chunk = columnsInOrder.get(i);
- ColumnDescriptor descriptor = descriptorsMap.get(chunk.getPath());
-
- // This column has been pruned.
- if (descriptor == null) {
- continue;
- }
-
- // If a column is encrypted, we simply throw exception.
- // Later we can add a feature to trans-encrypt it with different keys
- if (chunk.isEncrypted()) {
- throw new IOException("Column " + chunk.getPath().toDotString() + "
is already encrypted");
+ TransParquetFileReader readerToJoin = null;
+ IndexCache indexCacheToJoin = null;
+ int blockIdxToJoin = 0;
+ List<ColumnDescriptor> outColumns = outSchema.getColumns();
+
+ while (!inputFiles.isEmpty()) {
+ TransParquetFileReader reader = inputFiles.poll();
+ LOG.info("Rewriting input file: {}, remaining files: {}",
reader.getFile(), inputFiles.size());
+ ParquetMetadata meta = reader.getFooter();
+ Set<ColumnPath> columnPaths =
meta.getFileMetaData().getSchema().getColumns().stream()
+ .map(x -> ColumnPath.get(x.getPath()))
+ .collect(Collectors.toSet());
+ IndexCache indexCache = IndexCache.create(reader, columnPaths,
indexCacheStrategy, true);
+
+ for (int blockIdx = 0; blockIdx < meta.getBlocks().size(); blockIdx++) {
+ BlockMetaData blockMetaData = meta.getBlocks().get(blockIdx);
+ writer.startBlock(blockMetaData.getRowCount());
+ indexCache.setBlockMetadata(blockMetaData);
+ Map<ColumnPath, ColumnChunkMetaData> pathToChunk =
+ blockMetaData.getColumns().stream().collect(Collectors.toMap(x ->
x.getPath(), x -> x));
+
+ if (!inputFilesToJoin.isEmpty()) {
+ if (readerToJoin == null
+ || ++blockIdxToJoin
+ == readerToJoin.getFooter().getBlocks().size()) {
+ if (readerToJoin != null) readerToJoin.close();
+ blockIdxToJoin = 0;
+ readerToJoin = inputFilesToJoin.poll();
+ Set<ColumnPath> columnPathsToJoin =
+
readerToJoin.getFileMetaData().getSchema().getColumns().stream()
+ .map(x -> ColumnPath.get(x.getPath()))
+ .collect(Collectors.toSet());
+ if (indexCacheToJoin != null) {
+ indexCacheToJoin.clean();
+ }
+ indexCacheToJoin = IndexCache.create(readerToJoin,
columnPathsToJoin, indexCacheStrategy, true);
+ indexCacheToJoin.setBlockMetadata(
+ readerToJoin.getFooter().getBlocks().get(blockIdxToJoin));
+ } else {
+ blockIdxToJoin++;
+ indexCacheToJoin.setBlockMetadata(
+ readerToJoin.getFooter().getBlocks().get(blockIdxToJoin));
+ }
}
- reader.setStreamPosition(chunk.getStartingPos());
- CompressionCodecName newCodecName = this.newCodecName == null ?
chunk.getCodec() : this.newCodecName;
- boolean encryptColumn =
- encryptMode && encryptColumns != null &&
encryptColumns.contains(chunk.getPath());
-
- if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
- // Mask column and compress it again.
- MaskMode maskMode = maskColumns.get(chunk.getPath());
- if (maskMode.equals(MaskMode.NULLIFY)) {
- Type.Repetition repetition =
- descriptor.getPrimitiveType().getRepetition();
- if (repetition.equals(Type.Repetition.REQUIRED)) {
- throw new IOException("Required column ["
- + descriptor.getPrimitiveType().getName() + "] cannot be
nullified");
+ for (int outColumnIdx = 0; outColumnIdx < outColumns.size();
outColumnIdx++) {
+ ColumnPath colPath =
+ ColumnPath.get(outColumns.get(outColumnIdx).getPath());
+ if (readerToJoin != null) {
+ Optional<ColumnChunkMetaData> chunkToJoin =
+
readerToJoin.getFooter().getBlocks().get(blockIdxToJoin).getColumns().stream()
+ .filter(x -> x.getPath().equals(colPath))
+ .findFirst();
+ if (chunkToJoin.isPresent()
+ && (overwriteInputWithJoinColumns ||
!columnPaths.contains(colPath))) {
+ processBlock(
+ readerToJoin, blockIdxToJoin, outColumnIdx,
indexCacheToJoin, chunkToJoin.get());
+ } else {
+ processBlock(reader, blockIdx, outColumnIdx, indexCache,
pathToChunk.get(colPath));
}
- nullifyColumn(blockId, descriptor, chunk, writer, schema,
newCodecName, encryptColumn);
} else {
- throw new UnsupportedOperationException("Only nullify is supported
for now");
- }
- } else if (encryptMode || this.newCodecName != null) {
- // Prepare encryption context
- ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
- if (encryptMode) {
- columnChunkEncryptorRunTime = new ColumnChunkEncryptorRunTime(
- writer.getEncryptor(), chunk, numBlocksRewritten, columnId);
+ processBlock(reader, blockIdx, outColumnIdx, indexCache,
pathToChunk.get(colPath));
}
-
- // Translate compression and/or encryption
- writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
- processChunk(
- blockMetaData.getRowCount(),
- chunk,
- newCodecName,
- columnChunkEncryptorRunTime,
- encryptColumn,
- indexCache.getBloomFilter(chunk),
- indexCache.getColumnIndex(chunk),
- indexCache.getOffsetIndex(chunk));
- writer.endColumn();
- } else {
- // Nothing changed, simply copy the binary data.
- BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
- ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
- OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
- writer.appendColumnChunk(
- descriptor, reader.getStream(), chunk, bloomFilter, columnIndex,
offsetIndex);
}
- columnId++;
+ writer.endBlock();
+ indexCache.clean();
+ numBlocksRewritten++;
+ }
+
+ indexCache.clean();
+ LOG.info("Finish rewriting input file: {}", reader.getFile());
+ reader.close();
+ }
+ if (readerToJoin != null) readerToJoin.close();
+ }
+
+ private void processBlock(
+ TransParquetFileReader reader,
+ int blockIdx,
+ int outColumnIdx,
+ IndexCache indexCache,
+ ColumnChunkMetaData chunk)
+ throws IOException {
+ if (chunk.isEncrypted()) {
+ throw new IOException("Column " + chunk.getPath().toDotString() + " is
already encrypted");
+ }
+ ColumnDescriptor descriptor = outSchema.getColumns().get(outColumnIdx);
+ BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(blockIdx);
+ String originalCreatedBy = reader.getFileMetaData().getCreatedBy();
+
+ reader.setStreamPosition(chunk.getStartingPos());
+ CompressionCodecName newCodecName = this.newCodecName == null ?
chunk.getCodec() : this.newCodecName;
+ boolean encryptColumn = encryptMode && encryptColumns != null &&
encryptColumns.contains(chunk.getPath());
+
+ if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
+ // Mask column and compress it again.
+ MaskMode maskMode = maskColumns.get(chunk.getPath());
+ if (maskMode.equals(MaskMode.NULLIFY)) {
+ Type.Repetition repetition =
descriptor.getPrimitiveType().getRepetition();
+ if (repetition.equals(Type.Repetition.REQUIRED)) {
+ throw new IOException(
+ "Required column [" + descriptor.getPrimitiveType().getName() +
"] cannot be nullified");
+ }
+ nullifyColumn(
+ reader,
+ blockIdx,
+ descriptor,
+ chunk,
+ writer,
+ outSchema,
+ newCodecName,
+ encryptColumn,
+ originalCreatedBy);
+ } else {
+ throw new UnsupportedOperationException("Only nullify is supported for
now");
+ }
+ } else if (encryptMode || this.newCodecName != null) {
+ // Prepare encryption context
+ ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
+ if (encryptMode) {
+ columnChunkEncryptorRunTime =
+ new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk,
numBlocksRewritten, outColumnIdx);
}
- writer.endBlock();
- numBlocksRewritten++;
+ // Translate compression and/or encryption
+ writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
+ processChunk(
+ reader,
+ blockMetaData.getRowCount(),
+ chunk,
+ newCodecName,
+ columnChunkEncryptorRunTime,
+ encryptColumn,
+ indexCache.getBloomFilter(chunk),
+ indexCache.getColumnIndex(chunk),
+ indexCache.getOffsetIndex(chunk),
+ originalCreatedBy);
+ writer.endColumn();
+ } else {
+ // Nothing changed, simply copy the binary data.
+ BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
+ ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
+ OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
+ writer.appendColumnChunk(descriptor, reader.getStream(), chunk,
bloomFilter, columnIndex, offsetIndex);
}
}
private void processChunk(
+ TransParquetFileReader reader,
long blockRowCount,
ColumnChunkMetaData chunk,
CompressionCodecName newCodecName,
@@ -360,7 +488,8 @@ public class ParquetRewriter implements Closeable {
boolean encryptColumn,
BloomFilter bloomFilter,
ColumnIndex columnIndex,
- OffsetIndex offsetIndex)
+ OffsetIndex offsetIndex,
+ String originalCreatedBy)
throws IOException {
CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
CompressionCodecFactory.BytesInputDecompressor decompressor = null;
@@ -663,11 +792,24 @@ public class ParquetRewriter implements Closeable {
}
}
- private MessageType pruneColumnsInSchema(MessageType schema, Set<ColumnPath>
prunePaths) {
- List<Type> fields = schema.getFields();
- List<String> currentPath = new ArrayList<>();
- List<Type> prunedFields = pruneColumnsInFields(fields, currentPath,
prunePaths);
- return new MessageType(schema.getName(), prunedFields);
+ private MessageType pruneColumnsInSchema(MessageType schema, List<String>
pruneColumns) {
+ if (pruneColumns == null || pruneColumns.isEmpty()) {
+ return schema;
+ } else {
+ List<String> paths = new ArrayList<>();
+ getPaths(schema, paths, null);
+ for (String col : pruneColumns) {
+ if (!paths.contains(col)) {
+ LOG.warn("Input column name {} doesn't show up in the schema", col);
+ }
+ }
+ Set<ColumnPath> prunePaths = convertToColumnPaths(pruneColumns);
+
+ List<Type> fields = schema.getFields();
+ List<String> currentPath = new ArrayList<>();
+ List<Type> prunedFields = pruneColumnsInFields(fields, currentPath,
prunePaths);
+ return new MessageType(schema.getName(), prunedFields);
+ }
}
private List<Type> pruneColumnsInFields(List<Type> fields, List<String>
currentPath, Set<ColumnPath> prunePaths) {
@@ -711,13 +853,15 @@ public class ParquetRewriter implements Closeable {
}
private void nullifyColumn(
+ TransParquetFileReader reader,
int blockIndex,
ColumnDescriptor descriptor,
ColumnChunkMetaData chunk,
ParquetFileWriter writer,
MessageType schema,
CompressionCodecName newCodecName,
- boolean encryptColumn)
+ boolean encryptColumn,
+ String originalCreatedBy)
throws IOException {
if (encryptColumn) {
Preconditions.checkArgument(writer.getEncryptor() != null, "Missing
encryptor");
@@ -813,7 +957,7 @@ public class ParquetRewriter implements Closeable {
} else {
Type tempField = extractField(field.asGroupType(), targetField);
if (tempField != null) {
- return tempField;
+ return new GroupType(candidate.getRepetition(), candidate.getName(),
tempField);
}
}
}
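
For illustration, the merge rule implemented in the new ParquetRewriter#getSchema keeps the main files' fields in their original order, appends fields that exist only on the join side, and takes overlapping fields from the join side only when overwriteInputWithJoinColumns is enabled. A self-contained sketch of that rule follows (not taken from the patch; field names are hypothetical and only chosen to resemble the tests):

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.parquet.schema.Type.Repetition.OPTIONAL;
import static org.apache.parquet.schema.Type.Repetition.REQUIRED;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class JoinSchemaSketch {

  // Mirrors the rule in ParquetRewriter#getSchema: keep the main fields in their original order,
  // append join-only fields, and let a join field replace a main field only when overwrite is enabled.
  static MessageType mergedSchema(MessageType mainSchema, MessageType joinSchema, boolean overwrite) {
    Map<String, Type> fields = new LinkedHashMap<>();
    mainSchema.getFields().forEach(f -> fields.put(f.getName(), f));
    joinSchema.getFields().forEach(f -> {
      if (!fields.containsKey(f.getName()) || overwrite) {
        fields.put(f.getName(), f);
      }
    });
    return new MessageType(mainSchema.getName(), new ArrayList<>(fields.values()));
  }

  public static void main(String[] args) {
    MessageType mainSchema = new MessageType(
        "schema",
        new PrimitiveType(OPTIONAL, INT64, "DocId"),
        new PrimitiveType(REQUIRED, BINARY, "Name"));
    MessageType joinSchema = new MessageType(
        "schema",
        new PrimitiveType(OPTIONAL, INT64, "Age"),
        new PrimitiveType(REQUIRED, BINARY, "Name"));
    // With overwrite=false the result keeps DocId and the main Name, and appends the new Age column.
    System.out.println(mergedSchema(mainSchema, joinSchema, false));
  }
}
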
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index 127af68dd..a69403f46 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -38,12 +38,13 @@ import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.OutputFile;
/**
- * A set of options to create a ParquetRewriter.
+ * A set of options to create a {@link ParquetRewriter}. See {@link RewriteOptions.Builder} for a description of the options.
*/
public class RewriteOptions {
private final ParquetConfiguration conf;
private final List<InputFile> inputFiles;
+ private final List<InputFile> inputFilesToJoin;
private final OutputFile outputFile;
private final List<String> pruneColumns;
private final CompressionCodecName newCodecName;
@@ -51,19 +52,25 @@ public class RewriteOptions {
private final List<String> encryptColumns;
private final FileEncryptionProperties fileEncryptionProperties;
private final IndexCache.CacheStrategy indexCacheStrategy;
+ private final boolean overwriteInputWithJoinColumns;
+ private final boolean ignoreJoinFilesMetadata;
private RewriteOptions(
ParquetConfiguration conf,
List<InputFile> inputFiles,
+ List<InputFile> inputFilesToJoin,
OutputFile outputFile,
List<String> pruneColumns,
CompressionCodecName newCodecName,
Map<String, MaskMode> maskColumns,
List<String> encryptColumns,
FileEncryptionProperties fileEncryptionProperties,
- IndexCache.CacheStrategy indexCacheStrategy) {
+ IndexCache.CacheStrategy indexCacheStrategy,
+ boolean overwriteInputWithJoinColumns,
+ boolean ignoreJoinFilesMetadata) {
this.conf = conf;
this.inputFiles = inputFiles;
+ this.inputFilesToJoin = inputFilesToJoin;
this.outputFile = outputFile;
this.pruneColumns = pruneColumns;
this.newCodecName = newCodecName;
@@ -71,6 +78,8 @@ public class RewriteOptions {
this.encryptColumns = encryptColumns;
this.fileEncryptionProperties = fileEncryptionProperties;
this.indexCacheStrategy = indexCacheStrategy;
+ this.overwriteInputWithJoinColumns = overwriteInputWithJoinColumns;
+ this.ignoreJoinFilesMetadata = ignoreJoinFilesMetadata;
}
/**
@@ -110,6 +119,26 @@ public class RewriteOptions {
.collect(Collectors.toList());
}
+ /**
+   * Gets the input {@link Path}s for the rewrite if they exist for all input files to join,
+   * otherwise throws a {@link RuntimeException}.
+ *
+ * @return a {@link List} of the associated input {@link Path}s to join
+ */
+ public List<Path> getInputFilesToJoin() {
+ return inputFilesToJoin.stream()
+ .map(f -> {
+ if (f instanceof HadoopOutputFile) {
+ HadoopOutputFile hadoopOutputFile = (HadoopOutputFile) f;
+ return new Path(hadoopOutputFile.getPath());
+ } else {
+ throw new RuntimeException(
+ "The input files to join do not all have an associated Hadoop
Path.");
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
/**
* Gets the {@link InputFile}s for the rewrite.
*
@@ -119,6 +148,15 @@ public class RewriteOptions {
return inputFiles;
}
+ /**
+   * Gets the {@link InputFile}s to join (the right-side files) during the rewrite.
+ *
+ * @return a {@link List} of the associated {@link InputFile}s to join
+ */
+ public List<InputFile> getParquetInputFilesToJoin() {
+ return inputFilesToJoin;
+ }
+
/**
* Get the {@link Path} for the rewrite if it exists, otherwise throws a
{@link RuntimeException}.
*
@@ -166,10 +204,19 @@ public class RewriteOptions {
return indexCacheStrategy;
}
- // Builder to create a RewriterOptions.
+ public boolean getOverwriteInputWithJoinColumns() {
+ return overwriteInputWithJoinColumns;
+ }
+
+ public boolean getIgnoreJoinFilesMetadata() {
+ return ignoreJoinFilesMetadata;
+ }
+
+  /** Builder for {@link RewriteOptions}, which is used to construct a {@link ParquetRewriter}. */
public static class Builder {
private final ParquetConfiguration conf;
private final List<InputFile> inputFiles;
+ private final List<InputFile> inputFilesToJoin;
private final OutputFile outputFile;
private List<String> pruneColumns;
private CompressionCodecName newCodecName;
@@ -177,6 +224,24 @@ public class RewriteOptions {
private List<String> encryptColumns;
private FileEncryptionProperties fileEncryptionProperties;
private IndexCache.CacheStrategy indexCacheStrategy =
IndexCache.CacheStrategy.NONE;
+ private boolean overwriteInputWithJoinColumns = false;
+ private boolean ignoreJoinFilesMetadata = false;
+
+ /**
+     * Create a builder to create a RewriteOptions.
+     *
+     * @param conf            configuration for reading from input files and writing to output file
+     * @param inputFile       input file path to read from
+     * @param inputFileToJoin input join file path to read from
+     * @param outputFile      output file path to rewrite to
+     */
+    public Builder(Configuration conf, Path inputFile, Path inputFileToJoin, Path outputFile) {
+ this(
+ new HadoopParquetConfiguration(conf),
+ HadoopInputFile.fromPathUnchecked(inputFile, conf),
+ HadoopInputFile.fromPathUnchecked(inputFileToJoin, conf),
+ HadoopOutputFile.fromPathUnchecked(outputFile, conf));
+ }
/**
* Create a builder to create a RewriterOptions.
@@ -200,7 +265,20 @@ public class RewriteOptions {
* @param outputFile output file to rewrite to
*/
public Builder(ParquetConfiguration conf, InputFile inputFile, OutputFile
outputFile) {
- this(conf, Collections.singletonList(inputFile), outputFile);
+ this(conf, Collections.singletonList(inputFile), null, outputFile);
+ }
+
+ /**
+     * Create a builder to create a RewriteOptions.
+     *
+     * @param conf            configuration for reading from input files and writing to output file
+     * @param inputFile       input file to read from
+     * @param inputFileToJoin input join file to read from
+     * @param outputFile      output file to rewrite to
+     */
+    public Builder(
+        ParquetConfiguration conf, InputFile inputFile, InputFile inputFileToJoin, OutputFile outputFile) {
+      this(conf, Collections.singletonList(inputFile), Collections.singletonList(inputFileToJoin), outputFile);
}
/**
@@ -213,6 +291,8 @@ public class RewriteOptions {
* if row groups are very small and will not solve small file problems.
Instead, it will
* make it worse to have a large file footer in the output file.
* TODO: support rewrite by record to break the original row groups into
reasonable ones.
+ * <p>
+ * See {@link ParquetRewriter} for more details.
*
* @param conf configuration for reading from input files and
writing to output file
* @param inputFiles list of input file paths to read from
@@ -224,6 +304,7 @@ public class RewriteOptions {
for (Path inputFile : inputFiles) {
this.inputFiles.add(HadoopInputFile.fromPathUnchecked(inputFile,
conf));
}
+ this.inputFilesToJoin = new ArrayList<>();
this.outputFile = HadoopOutputFile.fromPathUnchecked(outputFile, conf);
}
@@ -237,6 +318,8 @@ public class RewriteOptions {
* if row groups are very small and will not solve small file problems.
Instead, it will
* make it worse to have a large file footer in the output file.
* TODO: support rewrite by record to break the original row groups into
reasonable ones.
+ * <p>
+ * See {@link ParquetRewriter} for more details.
*
* @param conf configuration for reading from input files and
writing to output file
* @param inputFiles list of input file paths to read from
@@ -245,6 +328,68 @@ public class RewriteOptions {
public Builder(ParquetConfiguration conf, List<InputFile> inputFiles,
OutputFile outputFile) {
this.conf = conf;
this.inputFiles = inputFiles;
+ this.inputFilesToJoin = new ArrayList<>();
+ this.outputFile = outputFile;
+ }
+
+ /**
+     * Create a builder to create a RewriteOptions.
+     * <p>
+     * Please note the schema of all files within each file group (<code>inputFiles</code> and
+     * <code>inputFilesToJoin</code>) must be the same, while the two groups' schemas may differ
+     * from each other. Otherwise, the rewrite will fail.
+     * <p>
+     * The rewrite will keep original row groups from all input files. This may not be optimal
+     * if row groups are very small and will not solve small file problems. Instead, it will
+     * make it worse to have a large file footer in the output file.
+     * TODO: support rewrite by record to break the original row groups into reasonable ones.
+     * <p>
+     * See {@link ParquetRewriter} for more details.
+     *
+     * @param conf             configuration for reading from input files and writing to output file
+     * @param inputFiles       list of input file paths to read from
+     * @param inputFilesToJoin list of input join file paths to read from
+     * @param outputFile       output file path to rewrite to
+     */
+    public Builder(Configuration conf, List<Path> inputFiles, List<Path> inputFilesToJoin, Path outputFile) {
+      this.conf = new HadoopParquetConfiguration(conf);
+      this.inputFiles = new ArrayList<>(inputFiles.size());
+      for (Path inputFile : inputFiles) {
+        this.inputFiles.add(HadoopInputFile.fromPathUnchecked(inputFile, conf));
+      }
+      this.inputFilesToJoin = new ArrayList<>(inputFilesToJoin.size());
+      for (Path inputFile : inputFilesToJoin) {
+        this.inputFilesToJoin.add(HadoopInputFile.fromPathUnchecked(inputFile, conf));
+      }
+      this.outputFile = HadoopOutputFile.fromPathUnchecked(outputFile, conf);
+    }
+
+ /**
+     * Create a builder to create a RewriteOptions.
+     * <p>
+     * Please note the schema of all files within each file group (<code>inputFiles</code> and
+     * <code>inputFilesToJoin</code>) must be the same, while the two groups' schemas may differ
+     * from each other. Otherwise, the rewrite will fail.
+     * <p>
+     * The rewrite will keep original row groups from all input files. This may not be optimal
+     * if row groups are very small and will not solve small file problems. Instead, it will
+     * make it worse to have a large file footer in the output file.
+     * <p>
+     * See {@link ParquetRewriter} for more details.
+     *
+     * @param conf             configuration for reading from input files and writing to output file
+     * @param inputFiles       list of input files to read from
+     * @param inputFilesToJoin list of input join files to read from
+     * @param outputFile       output file to rewrite to
+     */
+ public Builder(
+ ParquetConfiguration conf,
+ List<InputFile> inputFiles,
+ List<InputFile> inputFilesToJoin,
+ OutputFile outputFile) {
+ this.conf = conf;
+ this.inputFiles = inputFiles;
+ this.inputFilesToJoin = inputFilesToJoin;
this.outputFile = outputFile;
}
@@ -325,6 +470,18 @@ public class RewriteOptions {
return this;
}
+ /**
+ * Add an input join file to read from.
+ *
+ * @param path input file path to read from
+ * @return self
+ */
+ public Builder addInputFileToJoinColumns(Path path) {
+ this.inputFilesToJoin.add(
+ HadoopInputFile.fromPathUnchecked(path,
ConfigurationUtil.createHadoopConfiguration(conf)));
+ return this;
+ }
+
/**
* Add an input file to read from.
*
@@ -336,6 +493,17 @@ public class RewriteOptions {
return this;
}
+ /**
+ * Add an input file to join.
+ *
+ * @param fileToJoin input file to join
+ * @return self
+ */
+ public Builder addInputFilesToJoin(InputFile fileToJoin) {
+ this.inputFilesToJoin.add(fileToJoin);
+ return this;
+ }
+
/**
* Set the index(ColumnIndex, Offset and BloomFilter) cache strategy.
* <p>
@@ -350,6 +518,33 @@ public class RewriteOptions {
return this;
}
+ /**
+     * Set whether columns from the join files should overwrite matching columns from the main input files.
+     * <p>
+     * By default, join file columns do not overwrite the main input file columns.
+     *
+     * @param overwriteInputWithJoinColumns whether columns from join files should overwrite
+     *                                      columns from the main input files
+     * @return self
+     */
+    public Builder overwriteInputWithJoinColumns(boolean overwriteInputWithJoinColumns) {
+ this.overwriteInputWithJoinColumns = overwriteInputWithJoinColumns;
+ return this;
+ }
+
+ /**
+     * Set whether metadata from the join files should be ignored.
+     * <p>
+     * By default, metadata from the join files is not ignored.
+     *
+     * @param ignoreJoinFilesMetadata whether metadata from join files should be ignored
+ * @return self
+ */
+ public Builder ignoreJoinFilesMetadata(boolean ignoreJoinFilesMetadata) {
+ this.ignoreJoinFilesMetadata = ignoreJoinFilesMetadata;
+ return this;
+ }
+
/**
* Build the RewriterOptions.
*
@@ -390,13 +585,16 @@ public class RewriteOptions {
return new RewriteOptions(
conf,
inputFiles,
+ (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()),
outputFile,
pruneColumns,
newCodecName,
maskColumns,
encryptColumns,
fileEncryptionProperties,
- indexCacheStrategy);
+ indexCacheStrategy,
+ overwriteInputWithJoinColumns,
+ ignoreJoinFilesMetadata);
}
}
}
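
For illustration, a minimal end-to-end usage sketch of the new join options based on the builder API added above (not taken from the patch; the file paths are hypothetical):

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class JoinRewriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical paths: the "left" files carry the original columns,
    // the "right" files carry the extra columns to join in.
    List<Path> inputFiles = Arrays.asList(new Path("/data/main/part-0.parquet"));
    List<Path> inputFilesToJoin = Arrays.asList(new Path("/data/extra/part-0.parquet"));
    Path output = new Path("/data/out/joined.parquet");

    RewriteOptions options = new RewriteOptions.Builder(conf, inputFiles, inputFilesToJoin, output)
        // Take columns present on both sides from the join (right) side.
        .overwriteInputWithJoinColumns(true)
        .build();

    try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
      rewriter.processBlocks();
    }
  }
}

As described in the ParquetRewriter javadoc, the left and right file groups must line up row group by row group and row by row for the rewrite to succeed.
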
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 0573d4e33..ce612baec 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -33,6 +33,9 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
@@ -44,7 +47,10 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
@@ -108,6 +114,7 @@ public class ParquetRewriterTest {
private final boolean usingHadoop;
private List<EncryptionTestFile> inputFiles = Lists.newArrayList();
+ private List<EncryptionTestFile> inputFilesToJoin = Lists.newArrayList();
private String outputFile = null;
private ParquetRewriter rewriter = null;
@@ -161,7 +168,7 @@ public class ParquetRewriterTest {
rewriter.close();
// Verify the schema is not changed for the columns not pruned
- validateSchema();
+ validateSchemaWithGenderColumnPruned(false);
// Verify codec has been translated
verifyCodec(
@@ -174,17 +181,10 @@ public class ParquetRewriterTest {
null);
// Verify the data are not changed for the columns not pruned
- validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(),
null);
+ validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(),
null, false);
// Verify the page index
- validatePageIndex(new HashMap<Integer, Integer>() {
- {
- put(0, 0);
- put(1, 1);
- put(2, 3);
- put(3, 4);
- }
- });
+ validatePageIndex(new HashSet<>(), false);
// Verify original.created.by is preserved
validateCreatedBy();
@@ -194,6 +194,7 @@ public class ParquetRewriterTest {
@Before
public void setUp() {
outputFile = TestFileBuilder.createTempFile("test");
+ inputFilesToJoin = new ArrayList<>();
}
@Test
@@ -238,7 +239,7 @@ public class ParquetRewriterTest {
rewriter.close();
// Verify the schema are not changed for the columns not pruned
- validateSchema();
+ validateSchemaWithGenderColumnPruned(false);
// Verify codec has been translated
verifyCodec(
@@ -251,16 +252,10 @@ public class ParquetRewriterTest {
null);
// Verify the data are not changed for the columns not pruned
- validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(),
null);
+ validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(),
null, false);
// Verify the page index
- validatePageIndex(new HashMap<Integer, Integer>() {
- {
- put(0, 0);
- put(1, 1);
- put(2, 3);
- }
- });
+ validatePageIndex(ImmutableSet.of("Links.Forward"), false);
// Verify original.created.by is preserved
validateCreatedBy();
@@ -318,7 +313,7 @@ public class ParquetRewriterTest {
rewriter.close();
// Verify the schema is not changed for the columns not pruned
- validateSchema();
+ validateSchemaWithGenderColumnPruned(false);
// Verify codec has been translated
FileDecryptionProperties fileDecryptionProperties =
EncDecProperties.getFileDecryptionProperties();
@@ -332,7 +327,7 @@ public class ParquetRewriterTest {
fileDecryptionProperties);
// Verify the data are not changed for the columns not pruned
- validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(),
fileDecryptionProperties);
+ validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(),
fileDecryptionProperties, false);
// Verify column encryption
ParquetMetadata metaData = getFileMetaData(outputFile,
fileDecryptionProperties);
@@ -493,17 +488,10 @@ public class ParquetRewriterTest {
// Verify the data are not changed for non-encrypted and non-masked
columns.
// Also make sure the masked column is nullified.
- validateColumnData(Collections.emptySet(), maskColumns.keySet(),
fileDecryptionProperties);
+ validateColumnData(Collections.emptySet(), maskColumns.keySet(),
fileDecryptionProperties, false);
// Verify the page index
- validatePageIndex(new HashMap<Integer, Integer>() {
- {
- put(1, 1);
- put(2, 2);
- put(3, 3);
- put(4, 4);
- }
- });
+ validatePageIndex(ImmutableSet.of("DocId"), false);
// Verify the column is encrypted
ParquetMetadata metaData = getFileMetaData(outputFile,
fileDecryptionProperties);
@@ -583,18 +571,10 @@ public class ParquetRewriterTest {
null);
// Verify the merged data are not changed
- validateColumnData(Collections.emptySet(), Collections.emptySet(), null);
+ validateColumnData(Collections.emptySet(), Collections.emptySet(), null,
false);
// Verify the page index
- validatePageIndex(new HashMap<Integer, Integer>() {
- {
- put(0, 0);
- put(1, 1);
- put(2, 2);
- put(3, 3);
- put(4, 4);
- }
- });
+ validatePageIndex(new HashSet<>(), false);
// Verify original.created.by is preserved
validateCreatedBy();
@@ -603,6 +583,15 @@ public class ParquetRewriterTest {
@Test(expected = InvalidSchemaException.class)
public void testMergeTwoFilesWithDifferentSchema() throws Exception {
+ testMergeTwoFilesWithDifferentSchemaSetup(true);
+ }
+
+ @Test(expected = InvalidSchemaException.class)
+ public void testMergeTwoFilesToJoinWithDifferentSchema() throws Exception {
+ testMergeTwoFilesWithDifferentSchemaSetup(false);
+ }
+
+ public void testMergeTwoFilesWithDifferentSchemaSetup(boolean
wrongSchemaInInputFile) throws Exception {
MessageType schema1 = new MessageType(
"schema",
new PrimitiveType(OPTIONAL, INT64, "DocId"),
@@ -625,18 +614,32 @@ public class ParquetRewriterTest {
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(writerVersion)
.build());
- inputFiles.add(new TestFileBuilder(conf, schema2)
+ inputFilesToJoin.add(new TestFileBuilder(conf, schema1)
.withNumRecord(numRecord)
.withCodec("UNCOMPRESSED")
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(writerVersion)
.build());
-
- List<Path> inputPaths = new ArrayList<>();
- for (EncryptionTestFile inputFile : inputFiles) {
- inputPaths.add(new Path(inputFile.getFileName()));
+ if (wrongSchemaInInputFile) {
+ inputFiles.add(new TestFileBuilder(conf, schema2)
+ .withNumRecord(numRecord)
+ .withCodec("UNCOMPRESSED")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withWriterVersion(writerVersion)
+ .build());
+ } else {
+ inputFilesToJoin.add(new TestFileBuilder(conf, schema2)
+ .withNumRecord(numRecord)
+ .withCodec("UNCOMPRESSED")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withWriterVersion(writerVersion)
+ .build());
}
- RewriteOptions.Builder builder = createBuilder(inputPaths);
+
+ RewriteOptions.Builder builder = createBuilder(
+ inputFiles.stream().map(x -> new
Path(x.getFileName())).collect(Collectors.toList()),
+ inputFilesToJoin.stream().map(x -> new
Path(x.getFileName())).collect(Collectors.toList()),
+ false);
RewriteOptions options =
builder.indexCacheStrategy(indexCacheStrategy).build();
// This should throw an exception because the schemas are different
@@ -733,6 +736,127 @@ public class ParquetRewriterTest {
.build());
}
+ @Test
+ public void testFilesToJoinHaveDifferentRowCount() throws Exception {
+ MessageType schema1 = new MessageType("schema", new
PrimitiveType(OPTIONAL, INT64, "DocId"));
+ MessageType schema2 = new MessageType("schema", new
PrimitiveType(REQUIRED, BINARY, "Name"));
+ inputFiles = ImmutableList.of(
+ new TestFileBuilder(conf, schema1).withNumRecord(numRecord).build());
+ inputFilesToJoin = ImmutableList.of(
+ new TestFileBuilder(conf, schema2).withNumRecord(numRecord /
2).build());
+ RewriteOptions.Builder builder = createBuilder(
+ inputFiles.stream().map(x -> new
Path(x.getFileName())).collect(Collectors.toList()),
+ inputFilesToJoin.stream().map(x -> new
Path(x.getFileName())).collect(Collectors.toList()),
+ true);
+ RewriteOptions options = builder.build();
+ try {
+ rewriter =
+ new ParquetRewriter(options); // This should throw an exception
because the row count is different
+ } catch (RuntimeException e) {
+ assertTrue(e.getMessage().contains("The number of rows in each block
must match"));
+ }
+ }
+
+ @Test
+ public void testOneInputFileManyInputFilesToJoinWithJoinColumnsOverwrite()
throws Exception {
+ testOneInputFileManyInputFilesToJoinSetup(true);
+ }
+
+ @Test
+ public void
testOneInputFileManyInputFilesToJoinWithoutJoinColumnsOverwrite() throws
Exception {
+ testOneInputFileManyInputFilesToJoinSetup(false);
+ }
+
+ public void testOneInputFileManyInputFilesToJoinSetup(boolean
joinColumnsOverwrite) throws Exception {
+ testOneInputFileManyInputFilesToJoinSetup();
+
+ String encryptColumn = "DocId";
+ String pruneColumn = "Gender";
+
+ FileEncryptionProperties fileEncryptionProperties =
EncDecProperties.getFileEncryptionProperties(
+ new String[] {encryptColumn}, ParquetCipher.AES_GCM_CTR_V1, false);
+ FileDecryptionProperties fileDecryptionProperties =
EncDecProperties.getFileDecryptionProperties();
+
+ List<Path> inputPathsL =
+ inputFiles.stream().map(x -> new
Path(x.getFileName())).collect(Collectors.toList());
+ List<Path> inputPathsR =
+ inputFilesToJoin.stream().map(y -> new
Path(y.getFileName())).collect(Collectors.toList());
+ List<String> pruneColumns = ImmutableList.of(pruneColumn);
+ Map<String, MaskMode> maskColumns = ImmutableMap.of(encryptColumn,
MaskMode.NULLIFY);
+ RewriteOptions options = createBuilder(inputPathsL, inputPathsR, true)
+ .prune(pruneColumns)
+ .mask(maskColumns)
+ .transform(CompressionCodecName.ZSTD)
+ .indexCacheStrategy(indexCacheStrategy)
+ .overwriteInputWithJoinColumns(joinColumnsOverwrite)
+ .encrypt(ImmutableList.of(encryptColumn))
+ .encryptionProperties(fileEncryptionProperties)
+ .build();
+
+ rewriter = new ParquetRewriter(options);
+ rewriter.processBlocks();
+ rewriter.close();
+
+ Map<ColumnPath, List<BloomFilter>> inputBloomFilters =
allInputBloomFilters();
+ Map<ColumnPath, List<BloomFilter>> outputBloomFilters =
allOutputBloomFilters(fileDecryptionProperties);
+ Set<ColumnPath> schemaRColumns = createSchemaToJoin().getColumns().stream()
+ .map(x -> ColumnPath.get(x.getPath()))
+ .collect(Collectors.toSet());
+ Set<ColumnPath> rBloomFilters = outputBloomFilters.keySet().stream()
+ .filter(schemaRColumns::contains)
+ .collect(Collectors.toSet());
+
+ // Verify column encryption
+ ParquetMetadata metaData = getFileMetaData(outputFile,
fileDecryptionProperties);
+ assertFalse(metaData.getBlocks().isEmpty());
+ List<ColumnChunkMetaData> columns =
metaData.getBlocks().get(0).getColumns();
+ Set<String> set = ImmutableSet.of(encryptColumn);
+ for (ColumnChunkMetaData column : columns) {
+ if (set.contains(column.getPath().toDotString())) {
+ assertTrue(column.isEncrypted());
+ } else {
+ assertFalse(column.isEncrypted());
+ }
+ }
+
+ validateColumnData(
+ new HashSet<>(pruneColumns),
+ maskColumns.keySet(),
+ fileDecryptionProperties,
+ joinColumnsOverwrite); // Verify data
+ validateSchemaWithGenderColumnPruned(true); // Verify schema
+ validateCreatedBy(); // Verify original.created.by
+ assertEquals(inputBloomFilters.keySet(), rBloomFilters); // Verify bloom
filters
+ verifyCodec(outputFile, ImmutableSet.of(CompressionCodecName.ZSTD),
fileDecryptionProperties); // Verify codec
+ validatePageIndex(ImmutableSet.of(encryptColumn), joinColumnsOverwrite);
+ }
+
+ private void testOneInputFileManyInputFilesToJoinSetup() throws IOException {
+ inputFiles = Lists.newArrayList(new TestFileBuilder(conf, createSchema())
+ .withNumRecord(numRecord)
+ .withRowGroupSize(1 * 1024 * 1024)
+ .withCodec("GZIP")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withWriterVersion(writerVersion)
+ .build());
+
+ List<Long> rowGroupRowCounts = ParquetFileReader.readFooter(
+ conf, new Path(inputFiles.get(0).getFileName()),
ParquetMetadataConverter.NO_FILTER)
+ .getBlocks()
+ .stream()
+ .map(BlockMetaData::getRowCount)
+ .collect(Collectors.toList());
+
+ for (long count : rowGroupRowCounts) {
+ inputFilesToJoin.add(new TestFileBuilder(conf, createSchemaToJoin())
+ .withNumRecord((int) count)
+ .withCodec("UNCOMPRESSED")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withWriterVersion(writerVersion)
+ .build());
+ }
+ }
+
private MessageType createSchema() {
return new MessageType(
"schema",
@@ -748,52 +872,85 @@ public class ParquetRewriterTest {
new PrimitiveType(REPEATED, BINARY, "Forward")));
}
+ private MessageType createSchemaToJoin() {
+ return new MessageType(
+ "schema",
+ new PrimitiveType(REPEATED, FLOAT, "FloatFraction"),
+ new PrimitiveType(OPTIONAL, INT64, "Age"),
+ new GroupType(
+ OPTIONAL,
+ "Links",
+ new PrimitiveType(REPEATED, BINARY, "Backward"),
+ new PrimitiveType(REPEATED, BINARY, "Forward")));
+ }
+
private void validateColumnData(
- Set<String> prunePaths, Set<String> nullifiedPaths,
FileDecryptionProperties fileDecryptionProperties)
+ Set<String> prunePaths,
+ Set<String> nullifiedPaths,
+ FileDecryptionProperties fileDecryptionProperties,
+ Boolean joinColumnsOverwrite)
throws IOException {
ParquetReader<Group> reader = ParquetReader.builder(new
GroupReadSupport(), new Path(outputFile))
.withConf(conf)
.withDecryption(fileDecryptionProperties)
.build();
- // Get the total number of rows from input files
- int totalRows = 0;
- for (EncryptionTestFile inputFile : inputFiles) {
- totalRows += inputFile.getFileContent().length;
- }
+ List<SimpleGroup> filesMain = inputFiles.stream()
+ .flatMap(x -> Arrays.stream(x.getFileContent()))
+ .collect(Collectors.toList());
+ List<SimpleGroup> filesJoined = inputFilesToJoin.stream()
+ .flatMap(x -> Arrays.stream(x.getFileContent()))
+ .collect(Collectors.toList());
+ BiFunction<String, Integer, Group> groups = (name, rowIdx) -> {
+ if (!filesMain.get(0).getType().containsField(name)
+ || joinColumnsOverwrite
+ && !filesJoined.isEmpty()
+ && filesJoined.get(0).getType().containsField(name)) {
+ return filesJoined.get(rowIdx);
+ } else {
+ return filesMain.get(rowIdx);
+ }
+ };
+ int totalRows =
+ inputFiles.stream().mapToInt(x -> x.getFileContent().length).sum();
for (int i = 0; i < totalRows; i++) {
Group group = reader.read();
assertNotNull(group);
- SimpleGroup expectGroup = inputFiles.get(i /
numRecord).getFileContent()[i % numRecord];
-
if (!prunePaths.contains("DocId")) {
if (nullifiedPaths.contains("DocId")) {
assertThrows(RuntimeException.class, () -> group.getLong("DocId",
0));
} else {
- assertEquals(group.getLong("DocId", 0), expectGroup.getLong("DocId",
0));
+ assertEquals(
+ group.getLong("DocId", 0), groups.apply("DocId",
i).getLong("DocId", 0));
}
}
if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) {
assertArrayEquals(
group.getBinary("Name", 0).getBytes(),
- expectGroup.getBinary("Name", 0).getBytes());
+ groups.apply("Name", i).getBinary("Name", 0).getBytes());
}
if (!prunePaths.contains("Gender") &&
!nullifiedPaths.contains("Gender")) {
assertArrayEquals(
group.getBinary("Gender", 0).getBytes(),
- expectGroup.getBinary("Gender", 0).getBytes());
+ groups.apply("Gender", i).getBinary("Gender", 0).getBytes());
}
if (!prunePaths.contains("FloatFraction") &&
!nullifiedPaths.contains("FloatFraction")) {
- assertEquals(group.getFloat("FloatFraction", 0),
expectGroup.getFloat("FloatFraction", 0), 0);
+ assertEquals(
+ group.getFloat("FloatFraction", 0),
+ groups.apply("FloatFraction", i).getFloat("FloatFraction", 0),
+ 0);
}
if (!prunePaths.contains("DoubleFraction") &&
!nullifiedPaths.contains("DoubleFraction")) {
- assertEquals(group.getDouble("DoubleFraction", 0),
expectGroup.getDouble("DoubleFraction", 0), 0);
+ assertEquals(
+ group.getDouble("DoubleFraction", 0),
+ groups.apply("DoubleFraction", i).getDouble("DoubleFraction", 0),
+ 0);
}
Group subGroup = group.getGroup("Links", 0);
@@ -801,7 +958,7 @@ public class ParquetRewriterTest {
if (!prunePaths.contains("Links.Backward") &&
!nullifiedPaths.contains("Links.Backward")) {
assertArrayEquals(
subGroup.getBinary("Backward", 0).getBytes(),
- expectGroup
+ groups.apply("Links", i)
.getGroup("Links", 0)
.getBinary("Backward", 0)
.getBytes());
@@ -813,7 +970,7 @@ public class ParquetRewriterTest {
} else {
assertArrayEquals(
subGroup.getBinary("Forward", 0).getBytes(),
- expectGroup
+ groups.apply("Links", i)
.getGroup("Links", 0)
.getBinary("Forward", 0)
.getBytes());
@@ -852,77 +1009,107 @@ public class ParquetRewriterTest {
assertEquals(expectedCodecs, codecs);
}
+ @FunctionalInterface
+ interface CheckedFunction<T, R> {
+ R apply(T t) throws IOException;
+ }
+
/**
* Verify the page index is correct.
*
- * @param outFileColumnMapping the column mapping from the output file to the input file.
+ * @param exclude              the columns to exclude from the comparison, for example because they were nullified.
+ * @param joinColumnsOverwrite whether the join columns overwrote existing overlapping columns.
*/
- private void validatePageIndex(Map<Integer, Integer> outFileColumnMapping)
throws Exception {
- ParquetMetadata outMetaData = getFileMetaData(outputFile, null);
-
- int inputFileIndex = 0;
- TransParquetFileReader inReader = new TransParquetFileReader(
- HadoopInputFile.fromPath(new
Path(inputFiles.get(inputFileIndex).getFileName()), conf),
- HadoopReadOptions.builder(conf).build());
- ParquetMetadata inMetaData = inReader.getFooter();
-
- try (TransParquetFileReader outReader = new TransParquetFileReader(
- HadoopInputFile.fromPath(new Path(outputFile), conf),
- HadoopReadOptions.builder(conf).build())) {
-
- for (int outBlockId = 0, inBlockId = 0;
- outBlockId < outMetaData.getBlocks().size();
- ++outBlockId, ++inBlockId) {
- // Refresh reader of input file
- if (inBlockId == inMetaData.getBlocks().size()) {
- inReader = new TransParquetFileReader(
- HadoopInputFile.fromPath(
- new Path(inputFiles.get(++inputFileIndex).getFileName()),
conf),
- HadoopReadOptions.builder(conf).build());
- inMetaData = inReader.getFooter();
- inBlockId = 0;
- }
-
- BlockMetaData inBlockMetaData = inMetaData.getBlocks().get(inBlockId);
- BlockMetaData outBlockMetaData =
outMetaData.getBlocks().get(outBlockId);
+ private void validatePageIndex(Set<String> exclude, boolean
joinColumnsOverwrite) throws Exception {
+ class BlockMeta {
+ final TransParquetFileReader reader;
+ final BlockMetaData blockMeta;
+ final Map<ColumnPath, ColumnChunkMetaData> colPathToMeta;
+
+ BlockMeta(
+ TransParquetFileReader reader,
+ BlockMetaData blockMeta,
+ Map<ColumnPath, ColumnChunkMetaData> colPathToMeta) {
+ this.reader = reader;
+ this.blockMeta = blockMeta;
+ this.colPathToMeta = colPathToMeta;
+ }
+ }
+ CheckedFunction<List<String>, List<BlockMeta>> blockMetaExtractor = files -> {
+ List<BlockMeta> result = new ArrayList<>();
+ for (String inputFile : files) {
+ TransParquetFileReader reader = new TransParquetFileReader(
+ HadoopInputFile.fromPath(new Path(inputFile), conf),
+ HadoopReadOptions.builder(conf).build());
+ reader.getFooter()
+ .getBlocks()
+ .forEach(blockMetaData -> result.add(new BlockMeta(
+ reader,
+ blockMetaData,
+ blockMetaData.getColumns().stream()
+ .collect(
+ Collectors.toMap(ColumnChunkMetaData::getPath, Function.identity())))));
+ }
+ return result;
+ };
- assertEquals(inBlockMetaData.getRowCount(), outBlockMetaData.getRowCount());
+ List<BlockMeta> inBlocksMain = blockMetaExtractor.apply(
+ inputFiles.stream().map(EncryptionTestFile::getFileName).collect(Collectors.toList()));
+ List<BlockMeta> inBlocksJoined = blockMetaExtractor.apply(
+ inputFilesToJoin.stream().map(EncryptionTestFile::getFileName).collect(Collectors.toList()));
+ List<BlockMeta> outBlocks = blockMetaExtractor.apply(ImmutableList.of(outputFile));
+ for (int blockIdx = 0; blockIdx < outBlocks.size(); blockIdx++) {
+ BlockMetaData outBlockMeta = outBlocks.get(blockIdx).blockMeta;
+ TransParquetFileReader outReader = outBlocks.get(blockIdx).reader;
+ for (ColumnChunkMetaData outChunk : outBlockMeta.getColumns()) {
+ if (exclude.contains(outChunk.getPath().toDotString())) continue;
+ TransParquetFileReader inReader;
+ BlockMetaData inBlockMeta;
+ ColumnChunkMetaData inChunk;
+ if (!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath())
+ || joinColumnsOverwrite
+ && !inBlocksJoined.isEmpty()
+ && inBlocksJoined.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath())) {
+ inReader = inBlocksJoined.get(blockIdx).reader;
+ inBlockMeta = inBlocksJoined.get(blockIdx).blockMeta;
+ inChunk = inBlocksJoined.get(blockIdx).colPathToMeta.get(outChunk.getPath());
+ } else {
+ inReader = inBlocksMain.get(blockIdx).reader;
+ inBlockMeta = inBlocksMain.get(blockIdx).blockMeta;
+ inChunk = inBlocksMain.get(blockIdx).colPathToMeta.get(outChunk.getPath());
+ }
- for (int j = 0; j < outBlockMetaData.getColumns().size(); j++) {
- if (!outFileColumnMapping.containsKey(j)) {
- continue;
- }
- int columnIdFromInputFile = outFileColumnMapping.get(j);
- ColumnChunkMetaData inChunk = inBlockMetaData.getColumns().get(columnIdFromInputFile);
- ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk);
- OffsetIndex inOffsetIndex = inReader.readOffsetIndex(inChunk);
- ColumnChunkMetaData outChunk = outBlockMetaData.getColumns().get(j);
- ColumnIndex outColumnIndex = outReader.readColumnIndex(outChunk);
- OffsetIndex outOffsetIndex = outReader.readOffsetIndex(outChunk);
- if (inColumnIndex != null) {
- assertEquals(inColumnIndex.getBoundaryOrder(), outColumnIndex.getBoundaryOrder());
- assertEquals(inColumnIndex.getMaxValues(), outColumnIndex.getMaxValues());
- assertEquals(inColumnIndex.getMinValues(), outColumnIndex.getMinValues());
- assertEquals(inColumnIndex.getNullCounts(), outColumnIndex.getNullCounts());
- }
- if (inOffsetIndex != null) {
- List<Long> inOffsets = getOffsets(inReader, inChunk);
- List<Long> outOffsets = getOffsets(outReader, outChunk);
- assertEquals(inOffsets.size(), outOffsets.size());
- assertEquals(inOffsets.size(), inOffsetIndex.getPageCount());
- assertEquals(inOffsetIndex.getPageCount(), outOffsetIndex.getPageCount());
- for (int k = 0; k < inOffsetIndex.getPageCount(); k++) {
- assertEquals(inOffsetIndex.getFirstRowIndex(k), outOffsetIndex.getFirstRowIndex(k));
- assertEquals(
- inOffsetIndex.getLastRowIndex(k, inBlockMetaData.getRowCount()),
- outOffsetIndex.getLastRowIndex(k, outBlockMetaData.getRowCount()));
- assertEquals(inOffsetIndex.getOffset(k), (long) inOffsets.get(k));
- assertEquals(outOffsetIndex.getOffset(k), (long) outOffsets.get(k));
- }
+ ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk);
+ OffsetIndex inOffsetIndex = inReader.readOffsetIndex(inChunk);
+ ColumnIndex outColumnIndex = outReader.readColumnIndex(outChunk);
+ OffsetIndex outOffsetIndex = outReader.readOffsetIndex(outChunk);
+ if (inColumnIndex != null) {
+ assertEquals(inColumnIndex.getBoundaryOrder(), outColumnIndex.getBoundaryOrder());
+ assertEquals(inColumnIndex.getMaxValues(), outColumnIndex.getMaxValues());
+ assertEquals(inColumnIndex.getMinValues(), outColumnIndex.getMinValues());
+ assertEquals(inColumnIndex.getNullCounts(), outColumnIndex.getNullCounts());
+ }
+ if (inOffsetIndex != null) {
+ List<Long> inOffsets = getOffsets(inReader, inChunk);
+ List<Long> outOffsets = getOffsets(outReader, outChunk);
+ assertEquals(inOffsets.size(), outOffsets.size());
+ assertEquals(inOffsets.size(), inOffsetIndex.getPageCount());
+ assertEquals(inOffsetIndex.getPageCount(), outOffsetIndex.getPageCount());
+ for (int k = 0; k < inOffsetIndex.getPageCount(); k++) {
+ assertEquals(inOffsetIndex.getFirstRowIndex(k), outOffsetIndex.getFirstRowIndex(k));
+ assertEquals(
+ inOffsetIndex.getLastRowIndex(k, inBlockMeta.getRowCount()),
+ outOffsetIndex.getLastRowIndex(k, outBlockMeta.getRowCount()));
+ assertEquals(inOffsetIndex.getOffset(k), (long) inOffsets.get(k));
+ assertEquals(outOffsetIndex.getOffset(k), (long) outOffsets.get(k));
}
}
}
}
+
+ for (BlockMeta t3 : inBlocksMain) t3.reader.close();
+ for (BlockMeta t3 : inBlocksJoined) t3.reader.close();
+ for (BlockMeta t3 : outBlocks) t3.reader.close();
}
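For readers unfamiliar with the page-index APIs the rewritten helper relies on, here is a self-contained sketch of reading column and offset indexes per column chunk with ParquetFileReader; the file path is a placeholder and the snippet is illustrative rather than part of this change:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;

public class PageIndexDump {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    try (ParquetFileReader reader =
        ParquetFileReader.open(HadoopInputFile.fromPath(new Path("example.parquet"), conf))) {
      for (BlockMetaData block : reader.getFooter().getBlocks()) {
        for (ColumnChunkMetaData chunk : block.getColumns()) {
          ColumnIndex columnIndex = reader.readColumnIndex(chunk); // null if no column index was written
          OffsetIndex offsetIndex = reader.readOffsetIndex(chunk); // page locations within the chunk
          int pages = offsetIndex == null ? 0 : offsetIndex.getPageCount();
          System.out.println(chunk.getPath().toDotString()
              + ": pages=" + pages + ", hasColumnIndex=" + (columnIndex != null));
        }
      }
    }
  }
}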
private List<Long> getOffsets(TransParquetFileReader reader, ColumnChunkMetaData chunk) throws IOException {
@@ -963,7 +1150,9 @@ public class ParquetRewriterTest {
private void validateCreatedBy() throws Exception {
Set<String> createdBySet = new HashSet<>();
- for (EncryptionTestFile inputFile : inputFiles) {
+ List<EncryptionTestFile> inFiles =
+ Stream.concat(inputFiles.stream(), inputFilesToJoin.stream()).collect(Collectors.toList());
+ for (EncryptionTestFile inputFile : inFiles) {
ParquetMetadata pmd = getFileMetaData(inputFile.getFileName(), null);
createdBySet.add(pmd.getFileMetaData().getCreatedBy());
assertNull(pmd.getFileMetaData().getKeyValueMetaData().get(ParquetRewriter.ORIGINAL_CREATED_BY_KEY));
@@ -1005,7 +1194,9 @@ public class ParquetRewriterTest {
private Map<ColumnPath, List<BloomFilter>> allInputBloomFilters() throws Exception {
Map<ColumnPath, List<BloomFilter>> inputBloomFilters = new HashMap<>();
- for (EncryptionTestFile inputFile : inputFiles) {
+ List<EncryptionTestFile> files =
+ Stream.concat(inputFiles.stream(), inputFilesToJoin.stream()).collect(Collectors.toList());
+ for (EncryptionTestFile inputFile : files) {
Map<ColumnPath, List<BloomFilter>> bloomFilters = allBloomFilters(inputFile.getFileName(), null);
for (Map.Entry<ColumnPath, List<BloomFilter>> entry : bloomFilters.entrySet()) {
List<BloomFilter> bloomFilterList = inputBloomFilters.getOrDefault(entry.getKey(), new ArrayList<>());
@@ -1048,35 +1239,49 @@ public class ParquetRewriterTest {
}
private RewriteOptions.Builder createBuilder(List<Path> inputPaths) throws IOException {
+ return createBuilder(inputPaths, new ArrayList<>(), false);
+ }
+
+ private RewriteOptions.Builder createBuilder(
+ List<Path> inputPathsL, List<Path> inputPathsR, boolean overwriteInputWithJoinColumns) throws IOException {
RewriteOptions.Builder builder;
if (usingHadoop) {
Path outputPath = new Path(outputFile);
- builder = new RewriteOptions.Builder(conf, inputPaths, outputPath);
+ builder = new RewriteOptions.Builder(conf, inputPathsL, inputPathsR, outputPath);
} else {
OutputFile outputPath = HadoopOutputFile.fromPath(new Path(outputFile), conf);
- List<InputFile> inputs = inputPaths.stream()
+ List<InputFile> inputsL = inputPathsL.stream()
+ .map(p -> HadoopInputFile.fromPathUnchecked(p, conf))
+ .collect(Collectors.toList());
+ List<InputFile> inputsR = inputPathsR.stream()
.map(p -> HadoopInputFile.fromPathUnchecked(p, conf))
.collect(Collectors.toList());
- builder = new RewriteOptions.Builder(parquetConf, inputs, outputPath);
+ builder = new RewriteOptions.Builder(parquetConf, inputsL, inputsR, outputPath);
}
+ builder.overwriteInputWithJoinColumns(overwriteInputWithJoinColumns);
return builder;
}
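The builder variants exercised here map directly onto how a caller drives the joiner end to end. A hedged sketch follows; the file names are placeholders, and the four-argument Builder plus overwriteInputWithJoinColumns are the additions introduced by this change:

import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class JoinerExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    RewriteOptions options = new RewriteOptions.Builder(
            conf,
            Arrays.asList(new Path("left_part0.parquet"), new Path("left_part1.parquet")), // main inputs
            Arrays.asList(new Path("right_part0.parquet")), // files to join, may have a different schema
            new Path("joined.parquet"))
        .overwriteInputWithJoinColumns(true) // right-side values win for overlapping columns
        .build();
    try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
      rewriter.processBlocks(); // copies row groups, stitching left and right column chunks together
    }
  }
}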
- private void validateSchema() throws IOException {
- ParquetMetadata pmd =
- ParquetFileReader.readFooter(conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER);
- MessageType schema = pmd.getFileMetaData().getSchema();
- List<Type> fields = schema.getFields();
- assertEquals(fields.size(), 5);
- assertEquals(fields.get(0).getName(), "DocId");
- assertEquals(fields.get(1).getName(), "Name");
- assertEquals(fields.get(2).getName(), "FloatFraction");
- assertEquals(fields.get(3).getName(), "DoubleFraction");
- assertEquals(fields.get(4).getName(), "Links");
- List<Type> subFields = fields.get(4).asGroupType().getFields();
- assertEquals(subFields.size(), 2);
- assertEquals(subFields.get(0).getName(), "Backward");
- assertEquals(subFields.get(1).getName(), "Forward");
+ private void validateSchemaWithGenderColumnPruned(boolean addJoinedColumn) throws IOException {
+ MessageType expectSchema = new MessageType(
+ "schema",
+ new PrimitiveType(OPTIONAL, INT64, "DocId"),
+ new PrimitiveType(REQUIRED, BINARY, "Name"),
+ new PrimitiveType(REPEATED, FLOAT, "FloatFraction"),
+ new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"),
+ new GroupType(
+ OPTIONAL,
+ "Links",
+ new PrimitiveType(REPEATED, BINARY, "Backward"),
+ new PrimitiveType(REPEATED, BINARY, "Forward")));
+ if (addJoinedColumn) {
+ expectSchema = expectSchema.union(new MessageType("schema", new PrimitiveType(OPTIONAL, INT64, "Age")));
+ }
+ MessageType actualSchema = ParquetFileReader.readFooter(
+ conf, new Path(outputFile), ParquetMetadataConverter.NO_FILTER)
+ .getFileMetaData()
+ .getSchema();
+ assertEquals(expectSchema, actualSchema);
}
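As a side note on the expectation built above, MessageType.union is what appends the joined column to the pruned schema. A tiny illustrative example of the same mechanism (not from the commit; it reuses the static imports the test already has):

// base schema with a single field
MessageType base = new MessageType("schema", new PrimitiveType(OPTIONAL, INT64, "DocId"));
// union keeps the existing fields and appends those present only in the other schema, so "Age" lands after "DocId"
MessageType withAge = base.union(new MessageType("schema", new PrimitiveType(OPTIONAL, INT64, "Age")));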
private void ensureContainsGzipFile() {