This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git


The following commit(s) were added to refs/heads/master by this push:
     new 5baa9036e GH-3026: Add a fix to ParquetRewriter when you try to nullify and encrypt 2 separate columns (#3027)
5baa9036e is described below

commit 5baa9036e0c905d9397aca6c927570ce0e062a42
Author: Maksim Konstantinov <[email protected]>
AuthorDate: Fri Oct 11 18:04:04 2024 -0700

    GH-3026: Add a fix to ParquetRewriter when you try to nullify and encrypt 2 separate columns (#3027)
---
 .../parquet/hadoop/rewrite/ParquetRewriter.java    | 32 ++++++++++++----------
 .../hadoop/rewrite/ParquetRewriterTest.java        |  4 +--
 2 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index 9e106fc3c..2ff9c0ea3 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -144,10 +144,11 @@ public class ParquetRewriter implements Closeable {
   // Reader and relevant states of the in-processing input file
   private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
   private final Queue<TransParquetFileReader> inputFilesToJoin = new LinkedList<>();
-  private MessageType outSchema;
+  private final MessageType outSchema;
   // The index cache strategy
   private final IndexCache.CacheStrategy indexCacheStrategy;
   private final boolean overwriteInputWithJoinColumns;
+  private final InternalFileEncryptor nullColumnEncryptor;
 
   public ParquetRewriter(RewriteOptions options) throws IOException {
     this.newCodecName = options.getNewCodecName();
@@ -194,6 +195,17 @@ public class ParquetRewriter implements Closeable {
         ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED,
         options.getFileEncryptionProperties());
     writer.start();
+    // column nullification requires a separate encryptor and forcing other columns encryption initialization
+    if (options.getFileEncryptionProperties() == null) {
+      this.nullColumnEncryptor = null;
+    } else {
+      this.nullColumnEncryptor = new InternalFileEncryptor(options.getFileEncryptionProperties());
+      List<ColumnDescriptor> columns = outSchema.getColumns();
+      for (int i = 0; i < columns.size(); i++) {
+        writer.getEncryptor()
+            .getColumnSetup(ColumnPath.get(columns.get(i).getPath()), true, i);
+      }
+    }
   }
 
   // TODO: Should we mark it as deprecated to encourage the main constructor usage? it is also used only from
@@ -226,6 +238,7 @@ public class ParquetRewriter implements Closeable {
     this.inputFiles.add(reader);
     this.indexCacheStrategy = IndexCache.CacheStrategy.NONE;
     this.overwriteInputWithJoinColumns = false;
+    this.nullColumnEncryptor = null;
   }
 
   private MessageType getSchema() {
@@ -436,15 +449,7 @@ public class ParquetRewriter implements Closeable {
               "Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
         }
         nullifyColumn(
-            reader,
-            blockIdx,
-            descriptor,
-            chunk,
-            writer,
-            outSchema,
-            newCodecName,
-            encryptColumn,
-            originalCreatedBy);
+            reader, blockIdx, descriptor, chunk, writer, newCodecName, encryptColumn, originalCreatedBy);
       } else {
         throw new UnsupportedOperationException("Only nullify is supported for now");
       }
@@ -858,7 +863,6 @@ public class ParquetRewriter implements Closeable {
       ColumnDescriptor descriptor,
       ColumnChunkMetaData chunk,
       ParquetFileWriter writer,
-      MessageType schema,
       CompressionCodecName newCodecName,
       boolean encryptColumn,
       String originalCreatedBy)
@@ -871,7 +875,7 @@ public class ParquetRewriter implements Closeable {
     int dMax = descriptor.getMaxDefinitionLevel();
     PageReadStore pageReadStore = reader.readRowGroup(blockIndex);
     ColumnReadStoreImpl crStore =
-        new ColumnReadStoreImpl(pageReadStore, new DummyGroupConverter(), schema, originalCreatedBy);
+        new ColumnReadStoreImpl(pageReadStore, new DummyGroupConverter(), outSchema, originalCreatedBy);
     ColumnReader cReader = crStore.getColumnReader(descriptor);
 
     ParquetProperties.WriterVersion writerVersion = chunk.getEncodingStats().usesV2Pages()
@@ -883,14 +887,14 @@ public class ParquetRewriter implements Closeable {
     CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(newCodecName);
 
     // Create new schema that only has the current column
-    MessageType newSchema = newSchema(schema, descriptor);
+    MessageType newSchema = newSchema(outSchema, descriptor);
     ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
         compressor,
         newSchema,
         props.getAllocator(),
         props.getColumnIndexTruncateLength(),
         props.getPageWriteChecksumEnabled(),
-        writer.getEncryptor(),
+        nullColumnEncryptor,
         numBlocksRewritten);
     ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
     ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index ce612baec..34c90a464 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -455,7 +455,7 @@ public class ParquetRewriterTest {
 
   private void testNullifyAndEncryptColumn(List<Path> inputPaths) throws Exception {
     Map<String, MaskMode> maskColumns = new HashMap<>();
-    maskColumns.put("DocId", MaskMode.NULLIFY);
+    maskColumns.put("Links.Forward", MaskMode.NULLIFY);
 
     String[] encryptColumns = {"DocId"};
     FileEncryptionProperties fileEncryptionProperties =
@@ -491,7 +491,7 @@ public class ParquetRewriterTest {
     validateColumnData(Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties, false);
 
     // Verify the page index
-    validatePageIndex(ImmutableSet.of("DocId"), false);
+    validatePageIndex(ImmutableSet.of("DocId", "Links.Forward"), false);
 
     // Verify the column is encrypted
     ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties);

Reply via email to