This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new 5baa9036e GH-3026: Add a fix to ParquetRewriter when you try to nullify and encrypt 2 separate columns (#3027)
5baa9036e is described below
commit 5baa9036e0c905d9397aca6c927570ce0e062a42
Author: Maksim Konstantinov <[email protected]>
AuthorDate: Fri Oct 11 18:04:04 2024 -0700
GH-3026: Add a fix to ParquetRewriter when you try to nullify and encrypt 2 separate columns (#3027)
---
.../parquet/hadoop/rewrite/ParquetRewriter.java | 32 ++++++++++++----------
.../hadoop/rewrite/ParquetRewriterTest.java | 4 +--
2 files changed, 20 insertions(+), 16 deletions(-)
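For context, a minimal sketch of the scenario this fix unblocks, in the spirit of testNullifyAndEncryptColumn below: nullify one column ("Links.Forward") while encrypting a different one ("DocId") in the same rewrite. This is not part of the commit; the paths and the all-zero key material are illustrative placeholders, and the builder calls reflect parquet-java's rewrite and crypto APIs as commonly used.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.crypto.ColumnEncryptionProperties;
    import org.apache.parquet.crypto.FileEncryptionProperties;
    import org.apache.parquet.hadoop.metadata.ColumnPath;
    import org.apache.parquet.hadoop.rewrite.MaskMode;
    import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
    import org.apache.parquet.hadoop.rewrite.RewriteOptions;

    public class NullifyAndEncryptSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path input = new Path("/tmp/input.parquet");   // placeholder paths
        Path output = new Path("/tmp/output.parquet");

        // Nullify one column ...
        Map<String, MaskMode> maskColumns = new HashMap<>();
        maskColumns.put("Links.Forward", MaskMode.NULLIFY);

        // ... and encrypt a different one: the combination fixed by this commit.
        byte[] footerKey = new byte[16]; // toy 128-bit keys; use real key material
        byte[] columnKey = new byte[16];
        Map<ColumnPath, ColumnEncryptionProperties> encryptedColumns = new HashMap<>();
        encryptedColumns.put(
            ColumnPath.fromDotString("DocId"),
            ColumnEncryptionProperties.builder("DocId").withKey(columnKey).build());
        FileEncryptionProperties encryptionProperties = FileEncryptionProperties.builder(footerKey)
            .withEncryptedColumns(encryptedColumns)
            .build();

        RewriteOptions options = new RewriteOptions.Builder(conf, input, output)
            .mask(maskColumns)
            .encrypt(Arrays.asList("DocId"))
            .encryptionProperties(encryptionProperties)
            .build();

        // The nullify path previously shared the writer's encryptor (see the
        // ColumnChunkPageWriteStore change in the diff below); the fix gives
        // nullification its own InternalFileEncryptor.
        try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
          rewriter.processBlocks();
        }
      }
    }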
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 9e106fc3c..2ff9c0ea3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -144,10 +144,11 @@ public class ParquetRewriter implements Closeable {
// Reader and relevant states of the in-processing input file
private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
private final Queue<TransParquetFileReader> inputFilesToJoin = new LinkedList<>();
- private MessageType outSchema;
+ private final MessageType outSchema;
// The index cache strategy
private final IndexCache.CacheStrategy indexCacheStrategy;
private final boolean overwriteInputWithJoinColumns;
+ private final InternalFileEncryptor nullColumnEncryptor;
public ParquetRewriter(RewriteOptions options) throws IOException {
this.newCodecName = options.getNewCodecName();
@@ -194,6 +195,17 @@ public class ParquetRewriter implements Closeable {
ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
options.getFileEncryptionProperties());
writer.start();
+ // column nullification requires a separate encryptor and forcing other columns encryption initialization
+ if (options.getFileEncryptionProperties() == null) {
+ this.nullColumnEncryptor = null;
+ } else {
+ this.nullColumnEncryptor = new InternalFileEncryptor(options.getFileEncryptionProperties());
+ List<ColumnDescriptor> columns = outSchema.getColumns();
+ for (int i = 0; i < columns.size(); i++) {
+ writer.getEncryptor()
+ .getColumnSetup(ColumnPath.get(columns.get(i).getPath()), true, i);
+ }
+ }
}
// TODO: Should we mark it as deprecated to encourage the main constructor usage? it is also used only from
@@ -226,6 +238,7 @@ public class ParquetRewriter implements Closeable {
this.inputFiles.add(reader);
this.indexCacheStrategy = IndexCache.CacheStrategy.NONE;
this.overwriteInputWithJoinColumns = false;
+ this.nullColumnEncryptor = null;
}
private MessageType getSchema() {
@@ -436,15 +449,7 @@ public class ParquetRewriter implements Closeable {
"Required column [" + descriptor.getPrimitiveType().getName() +
"] cannot be nullified");
}
nullifyColumn(
- reader,
- blockIdx,
- descriptor,
- chunk,
- writer,
- outSchema,
- newCodecName,
- encryptColumn,
- originalCreatedBy);
+ reader, blockIdx, descriptor, chunk, writer, newCodecName, encryptColumn, originalCreatedBy);
} else {
throw new UnsupportedOperationException("Only nullify is supported for now");
}
@@ -858,7 +863,6 @@ public class ParquetRewriter implements Closeable {
ColumnDescriptor descriptor,
ColumnChunkMetaData chunk,
ParquetFileWriter writer,
- MessageType schema,
CompressionCodecName newCodecName,
boolean encryptColumn,
String originalCreatedBy)
@@ -871,7 +875,7 @@ public class ParquetRewriter implements Closeable {
int dMax = descriptor.getMaxDefinitionLevel();
PageReadStore pageReadStore = reader.readRowGroup(blockIndex);
ColumnReadStoreImpl crStore =
- new ColumnReadStoreImpl(pageReadStore, new DummyGroupConverter(), schema, originalCreatedBy);
+ new ColumnReadStoreImpl(pageReadStore, new DummyGroupConverter(), outSchema, originalCreatedBy);
ColumnReader cReader = crStore.getColumnReader(descriptor);
ParquetProperties.WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages()
@@ -883,14 +887,14 @@ public class ParquetRewriter implements Closeable {
CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(newCodecName);
// Create new schema that only has the current column
- MessageType newSchema = newSchema(schema, descriptor);
+ MessageType newSchema = newSchema(outSchema, descriptor);
ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
compressor,
newSchema,
props.getAllocator(),
props.getColumnIndexTruncateLength(),
props.getPageWriteChecksumEnabled(),
- writer.getEncryptor(),
+ nullColumnEncryptor,
numBlocksRewritten);
ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index ce612baec..34c90a464 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -455,7 +455,7 @@ public class ParquetRewriterTest {
private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws Exception {
Map<String, MaskMode> maskColumns = new HashMap<>();
- maskColumns.put("DocId", MaskMode.NULLIFY);
+ maskColumns.put("Links.Forward", MaskMode.NULLIFY);
String[] encryptColumns = {"DocId"};
FileEncryptionProperties fileEncryptionProperties =
@@ -491,7 +491,7 @@ public class ParquetRewriterTest {
validateColumnData(Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties, false);
// Verify the page index
- validatePageIndex(ImmutableSet.of("DocId"), false);
+ validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false);
// Verify the column is encrypted
ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties);
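A hypothetical read-back check in the same setting as the sketch above, again not part of this commit: after the rewrite, the footer and the encrypted "DocId" column should decrypt with the original keys, while "Links.Forward" comes back as nulls. Keys, paths, and the class name are placeholders.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.crypto.FileDecryptionProperties;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ColumnPath;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class ReadBackSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path output = new Path("/tmp/output.parquet"); // file produced by the rewrite sketch

        byte[] footerKey = new byte[16]; // same toy keys as the rewrite sketch
        Map<ColumnPath, byte[]> columnKeys = new HashMap<>();
        columnKeys.put(ColumnPath.fromDotString("DocId"), new byte[16]);

        FileDecryptionProperties decryption = FileDecryptionProperties.builder()
            .withFooterKey(footerKey)
            .withColumnKeys(columnKeys)
            .build();

        try (ParquetFileReader reader = ParquetFileReader.open(
            HadoopInputFile.fromPath(output, conf),
            ParquetReadOptions.builder().withDecryption(decryption).build())) {
          // If the rewrite succeeded, the footer decrypts and row groups are listed.
          System.out.println("Row groups: " + reader.getFooter().getBlocks().size());
        }
      }
    }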