This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new 686f07147 GH-3035: ParquetRewriter: Add a column renaming feature
(#3036)
686f07147 is described below
commit 686f07147499f06eeb78c9643aba499e20002676
Author: Maksim Konstantinov <[email protected]>
AuthorDate: Wed Nov 13 07:09:08 2024 -0800
GH-3035: ParquetRewriter: Add a column renaming feature (#3036)
---
.../parquet/hadoop/rewrite/ParquetRewriter.java | 137 +++++++++--
.../parquet/hadoop/rewrite/RewriteOptions.java | 81 +++++--
.../hadoop/rewrite/ParquetRewriterTest.java | 251 +++++++++++++++------
3 files changed, 363 insertions(+), 106 deletions(-)
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 2ff9c0ea3..9535b4335 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -109,7 +109,7 @@ import org.slf4j.LoggerFactory;
* Please note the schema of all <code>inputFiles</code> must be the same,
otherwise the rewrite will fail.
* <p>
* <h2>Applying column transformations</h2>
- * Some supported column transformations: pruning, masking, encrypting,
changing a codec.
+ * Some supported column transformations: pruning, masking, renaming,
encrypting, changing a codec.
* See {@link RewriteOptions} and {@link RewriteOptions.Builder} for the full
list with description.
* <p>
* <h2><i>Joining</i> with extra files with a different schema</h2>
@@ -149,18 +149,23 @@ public class ParquetRewriter implements Closeable {
private final IndexCache.CacheStrategy indexCacheStrategy;
private final boolean overwriteInputWithJoinColumns;
private final InternalFileEncryptor nullColumnEncryptor;
+ private final Map<String, String> renamedColumns;
public ParquetRewriter(RewriteOptions options) throws IOException {
this.newCodecName = options.getNewCodecName();
this.indexCacheStrategy = options.getIndexCacheStrategy();
this.overwriteInputWithJoinColumns =
options.getOverwriteInputWithJoinColumns();
+ this.renamedColumns = options.getRenameColumns();
ParquetConfiguration conf = options.getParquetConfiguration();
- OutputFile out = options.getParquetOutputFile();
- inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
-
inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(),
conf));
+ this.inputFiles.addAll(getFileReaders(options.getParquetInputFiles(),
conf));
+
this.inputFilesToJoin.addAll(getFileReaders(options.getParquetInputFilesToJoin(),
conf));
+ this.outSchema = pruneColumnsInSchema(getSchema(),
options.getPruneColumns());
+ this.extraMetaData = getExtraMetadata(options);
ensureSameSchema(inputFiles);
ensureSameSchema(inputFilesToJoin);
ensureRowCount();
+ ensureRenamingCorrectness(outSchema, renamedColumns);
+ OutputFile out = options.getParquetOutputFile();
LOG.info(
"Start rewriting {} input file(s) {} to {}",
inputFiles.size() + inputFilesToJoin.size(),
@@ -168,9 +173,6 @@ public class ParquetRewriter implements Closeable {
.collect(Collectors.toList()),
out);
- this.outSchema = pruneColumnsInSchema(getSchema(),
options.getPruneColumns());
- this.extraMetaData = getExtraMetadata(options);
-
if (options.getMaskColumns() != null) {
this.maskColumns = new HashMap<>();
for (Map.Entry<String, MaskMode> col :
options.getMaskColumns().entrySet()) {
@@ -184,9 +186,9 @@ public class ParquetRewriter implements Closeable {
}
ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
- writer = new ParquetFileWriter(
+ this.writer = new ParquetFileWriter(
out,
- outSchema,
+ renamedColumns.isEmpty() ? outSchema :
getSchemaWithRenamedColumns(this.outSchema),
writerMode,
DEFAULT_BLOCK_SIZE,
MAX_PADDING_SIZE_DEFAULT,
@@ -200,7 +202,8 @@ public class ParquetRewriter implements Closeable {
this.nullColumnEncryptor = null;
} else {
this.nullColumnEncryptor = new
InternalFileEncryptor(options.getFileEncryptionProperties());
- List<ColumnDescriptor> columns = outSchema.getColumns();
+ List<ColumnDescriptor> columns =
+ getSchemaWithRenamedColumns(this.outSchema).getColumns();
for (int i = 0; i < columns.size(); i++) {
writer.getEncryptor()
.getColumnSetup(ColumnPath.get(columns.get(i).getPath()), true, i);
@@ -223,8 +226,8 @@ public class ParquetRewriter implements Closeable {
this.writer = writer;
this.outSchema = outSchema;
this.newCodecName = codecName;
- extraMetaData = new
HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
- extraMetaData.put(
+ this.extraMetaData = new
HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
+ this.extraMetaData.put(
ORIGINAL_CREATED_BY_KEY,
originalCreatedBy != null
? originalCreatedBy
@@ -239,6 +242,7 @@ public class ParquetRewriter implements Closeable {
this.indexCacheStrategy = IndexCache.CacheStrategy.NONE;
this.overwriteInputWithJoinColumns = false;
this.nullColumnEncryptor = null;
+ this.renamedColumns = new HashMap<>();
}
private MessageType getSchema() {
@@ -266,6 +270,27 @@ public class ParquetRewriter implements Closeable {
}
}
+ private MessageType getSchemaWithRenamedColumns(MessageType schema) {
+ List<Type> fields = schema.getFields().stream()
+ .map(type -> {
+ if (!renamedColumns.containsKey(type.getName())) {
+ return type;
+ } else if (type.isPrimitive()) {
+ return new PrimitiveType(
+ type.getRepetition(),
+ type.asPrimitiveType().getPrimitiveTypeName(),
+ renamedColumns.get(type.getName()));
+ } else {
+ return new GroupType(
+ type.getRepetition(),
+ renamedColumns.get(type.getName()),
+ type.asGroupType().getFields());
+ }
+ })
+ .collect(Collectors.toList());
+ return new MessageType(schema.getName(), fields);
+ }
+
private Map<String, String> getExtraMetadata(RewriteOptions options) {
List<TransParquetFileReader> allFiles;
if (options.getIgnoreJoinFilesMetadata()) {
@@ -338,6 +363,21 @@ public class ParquetRewriter implements Closeable {
}
}
+ private void ensureRenamingCorrectness(MessageType schema, Map<String,
String> renameMap) {
+ Set<String> columns =
schema.getFields().stream().map(Type::getName).collect(Collectors.toSet());
+ renameMap.forEach((src, dst) -> {
+ if (!columns.contains(src)) {
+ String msg = String.format("Column to rename '%s' is not found in
input files schema", src);
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ } else if (columns.contains(dst)) {
+ String msg = String.format("Renamed column target name '%s' is already
present in a schema", dst);
+ LOG.error(msg);
+ throw new IllegalArgumentException(msg);
+ }
+ });
+ }
+
@Override
public void close() throws IOException {
writer.end(extraMetaData);
@@ -421,6 +461,27 @@ public class ParquetRewriter implements Closeable {
if (readerToJoin != null) readerToJoin.close();
}
+ private ColumnPath normalizeFieldsInPath(ColumnPath path) {
+ if (renamedColumns.isEmpty()) {
+ return path;
+ } else {
+ String[] pathArray = path.toArray();
+ pathArray[0] = renamedColumns.getOrDefault(pathArray[0], pathArray[0]);
+ return ColumnPath.get(pathArray);
+ }
+ }
+
+ private PrimitiveType normalizeNameInType(PrimitiveType type) {
+ if (renamedColumns.isEmpty()) {
+ return type;
+ } else {
+ return new PrimitiveType(
+ type.getRepetition(),
+ type.asPrimitiveType().getPrimitiveTypeName(),
+ renamedColumns.getOrDefault(type.getName(), type.getName()));
+ }
+ }
+
private void processBlock(
TransParquetFileReader reader,
int blockIdx,
@@ -431,7 +492,28 @@ public class ParquetRewriter implements Closeable {
if (chunk.isEncrypted()) {
throw new IOException("Column " + chunk.getPath().toDotString() + " is
already encrypted");
}
- ColumnDescriptor descriptor = outSchema.getColumns().get(outColumnIdx);
+
+ ColumnChunkMetaData chunkNormalized = chunk;
+ if (!renamedColumns.isEmpty()) {
+ // Keep an eye on whether this gets stale due to ColumnChunkMetaData changes
+ chunkNormalized = ColumnChunkMetaData.get(
+ normalizeFieldsInPath(chunk.getPath()),
+ normalizeNameInType(chunk.getPrimitiveType()),
+ chunk.getCodec(),
+ chunk.getEncodingStats(),
+ chunk.getEncodings(),
+ chunk.getStatistics(),
+ chunk.getFirstDataPageOffset(),
+ chunk.getDictionaryPageOffset(),
+ chunk.getValueCount(),
+ chunk.getTotalSize(),
+ chunk.getTotalUncompressedSize(),
+ chunk.getSizeStatistics());
+ }
+
+ ColumnDescriptor descriptorOriginal =
outSchema.getColumns().get(outColumnIdx);
+ ColumnDescriptor descriptorRenamed =
+ getSchemaWithRenamedColumns(outSchema).getColumns().get(outColumnIdx);
BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(blockIdx);
String originalCreatedBy = reader.getFileMetaData().getCreatedBy();
@@ -443,13 +525,21 @@ public class ParquetRewriter implements Closeable {
// Mask column and compress it again.
MaskMode maskMode = maskColumns.get(chunk.getPath());
if (maskMode.equals(MaskMode.NULLIFY)) {
- Type.Repetition repetition =
descriptor.getPrimitiveType().getRepetition();
+ Type.Repetition repetition =
+ descriptorOriginal.getPrimitiveType().getRepetition();
if (repetition.equals(Type.Repetition.REQUIRED)) {
- throw new IOException(
- "Required column [" + descriptor.getPrimitiveType().getName() +
"] cannot be nullified");
+ throw new IOException("Required column ["
+ + descriptorOriginal.getPrimitiveType().getName() + "] cannot be
nullified");
}
nullifyColumn(
- reader, blockIdx, descriptor, chunk, writer, newCodecName,
encryptColumn, originalCreatedBy);
+ reader,
+ blockIdx,
+ descriptorOriginal,
+ chunk,
+ writer,
+ newCodecName,
+ encryptColumn,
+ originalCreatedBy);
} else {
throw new UnsupportedOperationException("Only nullify is supported for
now");
}
@@ -462,7 +552,7 @@ public class ParquetRewriter implements Closeable {
}
// Translate compression and/or encryption
- writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
+ writer.startColumn(descriptorRenamed, chunk.getValueCount(),
newCodecName);
processChunk(
reader,
blockMetaData.getRowCount(),
@@ -480,7 +570,8 @@ public class ParquetRewriter implements Closeable {
BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
- writer.appendColumnChunk(descriptor, reader.getStream(), chunk,
bloomFilter, columnIndex, offsetIndex);
+ writer.appendColumnChunk(
+ descriptorRenamed, reader.getStream(), chunkNormalized, bloomFilter,
columnIndex, offsetIndex);
}
}
@@ -522,7 +613,7 @@ public class ParquetRewriter implements Closeable {
}
if (bloomFilter != null) {
- writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter);
+
writer.addBloomFilter(normalizeFieldsInPath(chunk.getPath()).toDotString(),
bloomFilter);
}
reader.setStreamPosition(chunk.getStartingPos());
@@ -580,7 +671,7 @@ public class ParquetRewriter implements Closeable {
dataPageAAD);
statistics = convertStatistics(
originalCreatedBy,
- chunk.getPrimitiveType(),
+ normalizeNameInType(chunk.getPrimitiveType()),
headerV1.getStatistics(),
columnIndex,
pageOrdinal,
@@ -648,7 +739,7 @@ public class ParquetRewriter implements Closeable {
dataPageAAD);
statistics = convertStatistics(
originalCreatedBy,
- chunk.getPrimitiveType(),
+ normalizeNameInType(chunk.getPrimitiveType()),
headerV2.getStatistics(),
columnIndex,
pageOrdinal,
@@ -887,7 +978,7 @@ public class ParquetRewriter implements Closeable {
CompressionCodecFactory.BytesInputCompressor compressor =
codecFactory.getCompressor(newCodecName);
// Create new schema that only has the current column
- MessageType newSchema = newSchema(outSchema, descriptor);
+ MessageType newSchema = getSchemaWithRenamedColumns(newSchema(outSchema,
descriptor));
ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
compressor,
newSchema,
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index a69403f46..f85b65ea3 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -20,8 +20,11 @@ package org.apache.parquet.hadoop.rewrite;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -49,6 +52,7 @@ public class RewriteOptions {
private final List<String> pruneColumns;
private final CompressionCodecName newCodecName;
private final Map<String, MaskMode> maskColumns;
+ private final Map<String, String> renameColumns;
private final List<String> encryptColumns;
private final FileEncryptionProperties fileEncryptionProperties;
private final IndexCache.CacheStrategy indexCacheStrategy;
@@ -63,6 +67,7 @@ public class RewriteOptions {
List<String> pruneColumns,
CompressionCodecName newCodecName,
Map<String, MaskMode> maskColumns,
+ Map<String, String> renameColumns,
List<String> encryptColumns,
FileEncryptionProperties fileEncryptionProperties,
IndexCache.CacheStrategy indexCacheStrategy,
@@ -75,6 +80,7 @@ public class RewriteOptions {
this.pruneColumns = pruneColumns;
this.newCodecName = newCodecName;
this.maskColumns = maskColumns;
+ this.renameColumns = renameColumns;
this.encryptColumns = encryptColumns;
this.fileEncryptionProperties = fileEncryptionProperties;
this.indexCacheStrategy = indexCacheStrategy;
@@ -192,6 +198,10 @@ public class RewriteOptions {
return maskColumns;
}
+ public Map<String, String> getRenameColumns() {
+ return renameColumns;
+ }
+
public List<String> getEncryptColumns() {
return encryptColumns;
}
@@ -221,6 +231,7 @@ public class RewriteOptions {
private List<String> pruneColumns;
private CompressionCodecName newCodecName;
private Map<String, MaskMode> maskColumns;
+ private Map<String, String> renameColumns;
private List<String> encryptColumns;
private FileEncryptionProperties fileEncryptionProperties;
private IndexCache.CacheStrategy indexCacheStrategy =
IndexCache.CacheStrategy.NONE;
@@ -432,6 +443,19 @@ public class RewriteOptions {
return this;
}
+ /**
+ * Set the columns to be renamed.
+ * <p>
+ * Note that nested columns can't be renamed; in the case of a GroupType column,
only the top-level column can be renamed.
+ *
+ * @param renameColumns map where keys are original names and values are
new names
+ * @return self
+ */
+ public Builder renameColumns(Map<String, String> renameColumns) {
+ this.renameColumns = renameColumns;
+ return this;
+ }
+
/**
* Set the columns to encrypt.
* <p>
@@ -551,6 +575,28 @@ public class RewriteOptions {
* @return a RewriterOptions
*/
public RewriteOptions build() {
+ checkPreconditions();
+ return new RewriteOptions(
+ conf,
+ inputFiles,
+ (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()),
+ outputFile,
+ pruneColumns,
+ newCodecName,
+ maskColumns,
+ renameColumns == null
+ ? new HashMap<>()
+ : renameColumns.entrySet().stream()
+ .collect(Collectors.toMap(x -> x.getKey().trim(), x ->
x.getValue()
+ .trim())),
+ encryptColumns,
+ fileEncryptionProperties,
+ indexCacheStrategy,
+ overwriteInputWithJoinColumns,
+ ignoreJoinFilesMetadata);
+ }
+
+ private void checkPreconditions() {
Preconditions.checkArgument(inputFiles != null && !inputFiles.isEmpty(),
"Input file is required");
Preconditions.checkArgument(outputFile != null, "Output file is
required");
@@ -561,7 +607,6 @@ public class RewriteOptions {
!maskColumns.containsKey(pruneColumn), "Cannot prune and mask
same column");
}
}
-
if (encryptColumns != null) {
for (String pruneColumn : pruneColumns) {
Preconditions.checkArgument(
@@ -570,6 +615,26 @@ public class RewriteOptions {
}
}
+ if (renameColumns != null) {
+ Set<String> nullifiedColumns = maskColumns == null
+ ? new HashSet<>()
+ : maskColumns.entrySet().stream()
+ .filter(x -> x.getValue() == MaskMode.NULLIFY)
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+ renameColumns.forEach((colSrc, colDst) -> {
+ Preconditions.checkArgument(
+ colSrc != null && !colSrc.trim().isEmpty(), "Renamed column
source name can't be empty");
+ Preconditions.checkArgument(
+ colDst != null && !colDst.trim().isEmpty(), "Renamed column
target name can't be empty");
+ Preconditions.checkArgument(
+ !nullifiedColumns.contains(colSrc), "Cannot nullify and rename
the same column");
+ Preconditions.checkArgument(
+ !colSrc.contains(".") && !colDst.contains("."),
+ "Renamed column can't be nested, in case of GroupType column
only a top level column can be renamed");
+ });
+ }
+
if (encryptColumns != null && !encryptColumns.isEmpty()) {
Preconditions.checkArgument(
fileEncryptionProperties != null,
@@ -581,20 +646,6 @@ public class RewriteOptions {
encryptColumns != null && !encryptColumns.isEmpty(),
"Encrypt columns is required when FileEncryptionProperties is
set");
}
-
- return new RewriteOptions(
- conf,
- inputFiles,
- (inputFilesToJoin != null ? inputFilesToJoin : new ArrayList<>()),
- outputFile,
- pruneColumns,
- newCodecName,
- maskColumns,
- encryptColumns,
- fileEncryptionProperties,
- indexCacheStrategy,
- overwriteInputWithJoinColumns,
- ignoreJoinFilesMetadata);
}
}
}
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index 34c90a464..c1da97c40 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.parquet.hadoop.rewrite;
+import static java.util.Collections.emptyMap;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
@@ -181,10 +182,10 @@ public class ParquetRewriterTest {
null);
// Verify the data are not changed for the columns not pruned
- validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(),
null, false);
+ validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(),
null, false, emptyMap());
// Verify the page index
- validatePageIndex(new HashSet<>(), false);
+ validatePageIndex(new HashSet<>(), false, emptyMap());
// Verify original.created.by is preserved
validateCreatedBy();
@@ -199,7 +200,7 @@ public class ParquetRewriterTest {
@Test
public void testPruneSingleColumnTranslateCodecSingleFile() throws Exception
{
- ensureContainsGzipFile();
+ addGzipInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -210,8 +211,8 @@ public class ParquetRewriterTest {
@Test
public void testPruneSingleColumnTranslateCodecTwoFiles() throws Exception {
- ensureContainsGzipFile();
- ensureContainsUncompressedFile();
+ addGzipInputFile();
+ addUncompressedInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
add(new Path(inputFiles.get(0).getFileName()));
@@ -252,10 +253,10 @@ public class ParquetRewriterTest {
null);
// Verify the data are not changed for the columns not pruned
- validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(),
null, false);
+ validateColumnData(new HashSet<>(pruneColumns), maskColumns.keySet(),
null, false, emptyMap());
// Verify the page index
- validatePageIndex(ImmutableSet.of("Links.Forward"), false);
+ validatePageIndex(ImmutableSet.of("Links.Forward"), false, emptyMap());
// Verify original.created.by is preserved
validateCreatedBy();
@@ -264,7 +265,7 @@ public class ParquetRewriterTest {
@Test
public void testPruneNullifyTranslateCodecSingleFile() throws Exception {
- ensureContainsGzipFile();
+ addGzipInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
@@ -276,8 +277,8 @@ public class ParquetRewriterTest {
@Test
public void testPruneNullifyTranslateCodecTwoFiles() throws Exception {
- ensureContainsGzipFile();
- ensureContainsUncompressedFile();
+ addGzipInputFile();
+ addUncompressedInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
@@ -327,7 +328,8 @@ public class ParquetRewriterTest {
fileDecryptionProperties);
// Verify the data are not changed for the columns not pruned
- validateColumnData(new HashSet<>(pruneColumns), Collections.emptySet(),
fileDecryptionProperties, false);
+ validateColumnData(
+ new HashSet<>(pruneColumns), Collections.emptySet(),
fileDecryptionProperties, false, emptyMap());
// Verify column encryption
ParquetMetadata metaData = getFileMetaData(outputFile,
fileDecryptionProperties);
@@ -349,7 +351,7 @@ public class ParquetRewriterTest {
@Test
public void testPruneEncryptTranslateCodecSingleFile() throws Exception {
- ensureContainsGzipFile();
+ addGzipInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
@@ -361,8 +363,8 @@ public class ParquetRewriterTest {
@Test
public void testPruneEncryptTranslateCodecTwoFiles() throws Exception {
- ensureContainsGzipFile();
- ensureContainsUncompressedFile();
+ addGzipInputFile();
+ addUncompressedInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
@@ -488,10 +490,10 @@ public class ParquetRewriterTest {
// Verify the data are not changed for non-encrypted and non-masked
columns.
// Also make sure the masked column is nullified.
- validateColumnData(Collections.emptySet(), maskColumns.keySet(),
fileDecryptionProperties, false);
+ validateColumnData(Collections.emptySet(), maskColumns.keySet(),
fileDecryptionProperties, false, emptyMap());
// Verify the page index
- validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false);
+ validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false,
emptyMap());
// Verify the column is encrypted
ParquetMetadata metaData = getFileMetaData(outputFile,
fileDecryptionProperties);
@@ -511,7 +513,7 @@ public class ParquetRewriterTest {
@Test
public void testNullifyEncryptSingleFile() throws Exception {
- ensureContainsGzipFile();
+ addGzipInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
@@ -523,8 +525,8 @@ public class ParquetRewriterTest {
@Test
public void testNullifyEncryptTwoFiles() throws Exception {
- ensureContainsGzipFile();
- ensureContainsUncompressedFile();
+ addGzipInputFile();
+ addUncompressedInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
@@ -537,8 +539,8 @@ public class ParquetRewriterTest {
@Test
public void testMergeTwoFilesOnly() throws Exception {
- ensureContainsGzipFile();
- ensureContainsUncompressedFile();
+ addGzipInputFile();
+ addUncompressedInputFile();
// Only merge two files but do not change anything.
List<Path> inputPaths = new ArrayList<>();
@@ -571,27 +573,103 @@ public class ParquetRewriterTest {
null);
// Verify the merged data are not changed
- validateColumnData(Collections.emptySet(), Collections.emptySet(), null,
false);
+ validateColumnData(Collections.emptySet(), Collections.emptySet(), null,
false, emptyMap());
// Verify the page index
- validatePageIndex(new HashSet<>(), false);
+ validatePageIndex(new HashSet<>(), false, emptyMap());
// Verify original.created.by is preserved
validateCreatedBy();
validateRowGroupRowCount();
}
+ @Test
+ public void testMergeTwoFilesOnlyRenameColumn() throws Exception {
+ addGzipInputFile();
+ addUncompressedInputFile();
+
+ Map<String, String> renameColumns = ImmutableMap.of("Name", "NameRenamed");
+ List<String> pruneColumns = ImmutableList.of("Gender");
+ String[] encryptColumns = {"DocId"};
+ FileEncryptionProperties fileEncryptionProperties =
+ EncDecProperties.getFileEncryptionProperties(encryptColumns,
ParquetCipher.AES_GCM_CTR_V1, false);
+ List<Path> inputPaths =
+ inputFiles.stream().map(x -> new
Path(x.getFileName())).collect(Collectors.toList());
+ RewriteOptions.Builder builder = createBuilder(inputPaths);
+ RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy)
+ .renameColumns(ImmutableMap.of("Name", "NameRenamed"))
+ .prune(pruneColumns)
+ .transform(CompressionCodecName.SNAPPY)
+ .encrypt(Arrays.asList(encryptColumns))
+ .encryptionProperties(fileEncryptionProperties)
+ .build();
+
+ rewriter = new ParquetRewriter(options);
+ rewriter.processBlocks();
+ rewriter.close();
+
+ FileDecryptionProperties fileDecryptionProperties =
EncDecProperties.getFileDecryptionProperties();
+
+ // Verify the schema is changed as expected (column renamed, pruned column removed)
+ ParquetMetadata pmd =
+ ParquetFileReader.readFooter(conf, new Path(outputFile),
ParquetMetadataConverter.NO_FILTER);
+ MessageType schema = pmd.getFileMetaData().getSchema();
+ MessageType expectSchema = createSchemaWithRenamed();
+ assertEquals(expectSchema, schema);
+
+ verifyCodec(outputFile, ImmutableSet.of(CompressionCodecName.SNAPPY),
fileDecryptionProperties); // Verify codec
+ // Verify the merged data are not changed
+ validateColumnData(
+ new HashSet<>(pruneColumns), Collections.emptySet(),
fileDecryptionProperties, false, renameColumns);
+ validatePageIndex(ImmutableSet.of("DocId"), false, renameColumns); //
Verify the page index
+ validateCreatedBy(); // Verify original.created.by is preserved
+ validateRowGroupRowCount();
+
+ ParquetMetadata metaData = getFileMetaData(outputFile,
fileDecryptionProperties);
+ assertFalse(metaData.getBlocks().isEmpty());
+ Set<String> encryptedColumns = new
HashSet<>(Arrays.asList(encryptColumns));
+ for (BlockMetaData blockMetaData : metaData.getBlocks()) {
+ List<ColumnChunkMetaData> columns = blockMetaData.getColumns();
+ for (ColumnChunkMetaData column : columns) {
+ if (encryptedColumns.contains(column.getPath().toDotString())) {
+ assertTrue(column.isEncrypted());
+ } else {
+ assertFalse(column.isEncrypted());
+ }
+ }
+ }
+ }
+
@Test(expected = InvalidSchemaException.class)
public void testMergeTwoFilesWithDifferentSchema() throws Exception {
- testMergeTwoFilesWithDifferentSchemaSetup(true);
+ testMergeTwoFilesWithDifferentSchemaSetup(true, null, null);
}
@Test(expected = InvalidSchemaException.class)
public void testMergeTwoFilesToJoinWithDifferentSchema() throws Exception {
- testMergeTwoFilesWithDifferentSchemaSetup(false);
+ testMergeTwoFilesWithDifferentSchemaSetup(false, null, null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMergeTwoFilesWithWrongDestinationRenamedColumn() throws
Exception {
+ testMergeTwoFilesWithDifferentSchemaSetup(
+ null, ImmutableMap.of("WrongColumnName", "WrongColumnNameRenamed"),
null);
}
- public void testMergeTwoFilesWithDifferentSchemaSetup(boolean
wrongSchemaInInputFile) throws Exception {
+ @Test(expected = IllegalArgumentException.class)
+ public void testMergeTwoFilesWithWrongSourceRenamedColumn() throws Exception
{
+ testMergeTwoFilesWithDifferentSchemaSetup(null, ImmutableMap.of("Name",
"DocId"), null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMergeTwoFilesNullifyAndRenamedSameColumn() throws Exception {
+ testMergeTwoFilesWithDifferentSchemaSetup(
+ null, ImmutableMap.of("Name", "NameRenamed"), ImmutableMap.of("Name",
MaskMode.NULLIFY));
+ }
+
+ public void testMergeTwoFilesWithDifferentSchemaSetup(
+ Boolean wrongSchemaInInputFile, Map<String, String> renameColumns,
Map<String, MaskMode> maskColumns)
+ throws Exception {
MessageType schema1 = new MessageType(
"schema",
new PrimitiveType(OPTIONAL, INT64, "DocId"),
@@ -620,27 +698,32 @@ public class ParquetRewriterTest {
.withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
.withWriterVersion(writerVersion)
.build());
- if (wrongSchemaInInputFile) {
- inputFiles.add(new TestFileBuilder(conf, schema2)
- .withNumRecord(numRecord)
- .withCodec("UNCOMPRESSED")
- .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
- .withWriterVersion(writerVersion)
- .build());
- } else {
- inputFilesToJoin.add(new TestFileBuilder(conf, schema2)
- .withNumRecord(numRecord)
- .withCodec("UNCOMPRESSED")
- .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
- .withWriterVersion(writerVersion)
- .build());
+ if (wrongSchemaInInputFile != null) {
+ if (wrongSchemaInInputFile) {
+ inputFiles.add(new TestFileBuilder(conf, schema2)
+ .withNumRecord(numRecord)
+ .withCodec("UNCOMPRESSED")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withWriterVersion(writerVersion)
+ .build());
+ } else {
+ inputFilesToJoin.add(new TestFileBuilder(conf, schema2)
+ .withNumRecord(numRecord)
+ .withCodec("UNCOMPRESSED")
+ .withPageSize(ParquetProperties.DEFAULT_PAGE_SIZE)
+ .withWriterVersion(writerVersion)
+ .build());
+ }
}
RewriteOptions.Builder builder = createBuilder(
inputFiles.stream().map(x -> new
Path(x.getFileName())).collect(Collectors.toList()),
inputFilesToJoin.stream().map(x -> new
Path(x.getFileName())).collect(Collectors.toList()),
false);
- RewriteOptions options =
builder.indexCacheStrategy(indexCacheStrategy).build();
+ RewriteOptions options = builder.indexCacheStrategy(indexCacheStrategy)
+ .renameColumns(renameColumns)
+ .mask(maskColumns)
+ .build();
// This should throw an exception because the schemas are different
rewriter = new ParquetRewriter(options);
@@ -648,7 +731,7 @@ public class ParquetRewriterTest {
@Test
public void testRewriteFileWithMultipleBlocks() throws Exception {
- ensureContainsGzipFile();
+ addGzipInputFile();
List<Path> inputPaths = new ArrayList<Path>() {
{
@@ -823,12 +906,13 @@ public class ParquetRewriterTest {
new HashSet<>(pruneColumns),
maskColumns.keySet(),
fileDecryptionProperties,
- joinColumnsOverwrite); // Verify data
+ joinColumnsOverwrite,
+ emptyMap()); // Verify data
validateSchemaWithGenderColumnPruned(true); // Verify schema
validateCreatedBy(); // Verify original.created.by
assertEquals(inputBloomFilters.keySet(), rBloomFilters); // Verify bloom
filters
verifyCodec(outputFile, ImmutableSet.of(CompressionCodecName.ZSTD),
fileDecryptionProperties); // Verify codec
- validatePageIndex(ImmutableSet.of(encryptColumn), joinColumnsOverwrite);
+ validatePageIndex(ImmutableSet.of(encryptColumn), joinColumnsOverwrite,
emptyMap());
}
private void testOneInputFileManyInputFilesToJoinSetup() throws IOException {
@@ -884,11 +968,26 @@ public class ParquetRewriterTest {
new PrimitiveType(REPEATED, BINARY, "Forward")));
}
+ private MessageType createSchemaWithRenamed() {
+ return new MessageType(
+ "schema",
+ new PrimitiveType(OPTIONAL, INT64, "DocId"),
+ new PrimitiveType(REQUIRED, BINARY, "NameRenamed"),
+ new PrimitiveType(REPEATED, FLOAT, "FloatFraction"),
+ new PrimitiveType(OPTIONAL, DOUBLE, "DoubleFraction"),
+ new GroupType(
+ OPTIONAL,
+ "Links",
+ new PrimitiveType(REPEATED, BINARY, "Backward"),
+ new PrimitiveType(REPEATED, BINARY, "Forward")));
+ }
+
private void validateColumnData(
Set<String> prunePaths,
Set<String> nullifiedPaths,
FileDecryptionProperties fileDecryptionProperties,
- Boolean joinColumnsOverwrite)
+ Boolean joinColumnsOverwrite,
+ Map<String, String> renameColumns)
throws IOException {
ParquetReader<Group> reader = ParquetReader.builder(new
GroupReadSupport(), new Path(outputFile))
.withConf(conf)
@@ -901,7 +1000,7 @@ public class ParquetRewriterTest {
List<SimpleGroup> filesJoined = inputFilesToJoin.stream()
.flatMap(x -> Arrays.stream(x.getFileContent()))
.collect(Collectors.toList());
- BiFunction<String, Integer, Group> groups = (name, rowIdx) -> {
+ BiFunction<String, Integer, Group> groupsExpected = (name, rowIdx) -> {
if (!filesMain.get(0).getType().containsField(name)
|| joinColumnsOverwrite
&& !filesJoined.isEmpty()
@@ -915,50 +1014,53 @@ public class ParquetRewriterTest {
int totalRows =
inputFiles.stream().mapToInt(x -> x.getFileContent().length).sum();
for (int i = 0; i < totalRows; i++) {
- Group group = reader.read();
- assertNotNull(group);
+ Group groupActual = reader.read();
+ assertNotNull(groupActual);
if (!prunePaths.contains("DocId")) {
if (nullifiedPaths.contains("DocId")) {
- assertThrows(RuntimeException.class, () -> group.getLong("DocId", 0));
+ assertThrows(RuntimeException.class, () -> groupActual.getLong("DocId", 0));
} else {
assertEquals(
- group.getLong("DocId", 0), groups.apply("DocId", i).getLong("DocId", 0));
+ groupActual.getLong("DocId", 0),
+ groupsExpected.apply("DocId", i).getLong("DocId", 0));
}
}
if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) {
+ String colName = renameColumns.getOrDefault("Name", "Name");
assertArrayEquals(
- group.getBinary("Name", 0).getBytes(),
- groups.apply("Name", i).getBinary("Name", 0).getBytes());
+ groupActual.getBinary(colName, 0).getBytes(),
+ groupsExpected.apply("Name", i).getBinary("Name", 0).getBytes());
}
if (!prunePaths.contains("Gender") && !nullifiedPaths.contains("Gender")) {
assertArrayEquals(
- group.getBinary("Gender", 0).getBytes(),
- groups.apply("Gender", i).getBinary("Gender", 0).getBytes());
+ groupActual.getBinary("Gender", 0).getBytes(),
+ groupsExpected.apply("Gender", i).getBinary("Gender", 0).getBytes());
}
if (!prunePaths.contains("FloatFraction") && !nullifiedPaths.contains("FloatFraction")) {
assertEquals(
- group.getFloat("FloatFraction", 0),
- groups.apply("FloatFraction", i).getFloat("FloatFraction", 0),
+ groupActual.getFloat("FloatFraction", 0),
+ groupsExpected.apply("FloatFraction", i).getFloat("FloatFraction", 0),
0);
}
if (!prunePaths.contains("DoubleFraction") && !nullifiedPaths.contains("DoubleFraction")) {
assertEquals(
- group.getDouble("DoubleFraction", 0),
- groups.apply("DoubleFraction", i).getDouble("DoubleFraction", 0),
+ groupActual.getDouble("DoubleFraction", 0),
+ groupsExpected.apply("DoubleFraction", i).getDouble("DoubleFraction", 0),
0);
}
- Group subGroup = group.getGroup("Links", 0);
+ Group subGroup = groupActual.getGroup("Links", 0);
if (!prunePaths.contains("Links.Backward") && !nullifiedPaths.contains("Links.Backward")) {
assertArrayEquals(
subGroup.getBinary("Backward", 0).getBytes(),
- groups.apply("Links", i)
+ groupsExpected
+ .apply("Links", i)
.getGroup("Links", 0)
.getBinary("Backward", 0)
.getBytes());
@@ -970,7 +1072,8 @@ public class ParquetRewriterTest {
} else {
assertArrayEquals(
subGroup.getBinary("Forward", 0).getBytes(),
- groups.apply("Links", i)
+ groupsExpected
+ .apply("Links", i)
.getGroup("Links", 0)
.getBinary("Forward", 0)
.getBytes());
@@ -1014,13 +1117,22 @@ public class ParquetRewriterTest {
R apply(T t) throws IOException;
}
+ private ColumnPath normalizeFieldsInPath(ColumnPath path, Map<String, String> renameColumns) {
+ String[] pathArray = path.toArray();
+ if (renameColumns != null) {
+ pathArray[0] = renameColumns.getOrDefault(pathArray[0], pathArray[0]);
+ }
+ return ColumnPath.get(pathArray);
+ }
+
/**
* Verify the page index is correct.
*
* @param exclude the columns to exclude from comparison, for example because they were nullified.
* @param joinColumnsOverwrite whether a join columns overwrote existing overlapping columns.
*/
- private void validatePageIndex(Set<String> exclude, boolean joinColumnsOverwrite) throws Exception {
+ private void validatePageIndex(Set<String> exclude, boolean joinColumnsOverwrite, Map<String, String> renameColumns)
+ throws Exception {
class BlockMeta {
final TransParquetFileReader reader;
final BlockMetaData blockMeta;
@@ -1058,6 +1170,8 @@ public class ParquetRewriterTest {
List<BlockMeta> inBlocksJoined = blockMetaExtractor.apply(
inputFilesToJoin.stream().map(EncryptionTestFile::getFileName).collect(Collectors.toList()));
List<BlockMeta> outBlocks =
blockMetaExtractor.apply(ImmutableList.of(outputFile));
+ Map<String, String> renameColumnsInverted =
+ renameColumns.entrySet().stream().collect(Collectors.toMap(Map.Entry::getValue, Map.Entry::getKey));
for (int blockIdx = 0; blockIdx < outBlocks.size(); blockIdx++) {
BlockMetaData outBlockMeta = outBlocks.get(blockIdx).blockMeta;
TransParquetFileReader outReader = outBlocks.get(blockIdx).reader;
@@ -1066,17 +1180,18 @@ public class ParquetRewriterTest {
TransParquetFileReader inReader;
BlockMetaData inBlockMeta;
ColumnChunkMetaData inChunk;
- if (!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath())
+ ColumnPath colPath = normalizeFieldsInPath(outChunk.getPath(), renameColumnsInverted);
+ if (!inBlocksMain.get(blockIdx).colPathToMeta.containsKey(colPath)
|| joinColumnsOverwrite
&& !inBlocksJoined.isEmpty()
- && inBlocksJoined.get(blockIdx).colPathToMeta.containsKey(outChunk.getPath())) {
+ && inBlocksJoined.get(blockIdx).colPathToMeta.containsKey(colPath)) {
inReader = inBlocksJoined.get(blockIdx).reader;
inBlockMeta = inBlocksJoined.get(blockIdx).blockMeta;
- inChunk = inBlocksJoined.get(blockIdx).colPathToMeta.get(outChunk.getPath());
+ inChunk = inBlocksJoined.get(blockIdx).colPathToMeta.get(colPath);
} else {
inReader = inBlocksMain.get(blockIdx).reader;
inBlockMeta = inBlocksMain.get(blockIdx).blockMeta;
- inChunk = inBlocksMain.get(blockIdx).colPathToMeta.get(outChunk.getPath());
+ inChunk = inBlocksMain.get(blockIdx).colPathToMeta.get(colPath);
}
ColumnIndex inColumnIndex = inReader.readColumnIndex(inChunk);
@@ -1284,13 +1399,13 @@ public class ParquetRewriterTest {
assertEquals(expectSchema, actualSchema);
}
- private void ensureContainsGzipFile() {
+ private void addGzipInputFile() {
if (!inputFiles.contains(gzipEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(this.gzipEncryptionTestFileWithoutBloomFilterColumn);
}
}
- private void ensureContainsUncompressedFile() {
+ private void addUncompressedInputFile() {
if (!inputFiles.contains(uncompressedEncryptionTestFileWithoutBloomFilterColumn)) {
inputFiles.add(uncompressedEncryptionTestFileWithoutBloomFilterColumn);
}