This is an automated email from the ASF dual-hosted git repository.

shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new c36be7b99 PARQUET-2229: ParquetRewriter masks and encrypts the same column (#1021)
c36be7b99 is described below

commit c36be7b990f4e74d5560f494b53bb99c3c701749
Author: Gang Wu <ust...@gmail.com>
AuthorDate: Sat Feb 11 06:22:55 2023 +0800

    PARQUET-2229: ParquetRewriter masks and encrypts the same column (#1021)
---
 .../parquet/hadoop/rewrite/ParquetRewriter.java    | 28 ++++----
 .../parquet/hadoop/rewrite/RewriteOptions.java     |  8 ---
 .../hadoop/rewrite/ParquetRewriterTest.java        | 80 ++++++++++++++++++++--
 3 files changed, 90 insertions(+), 26 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index e4870af98..e5befa99d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -218,13 +218,7 @@ public class ParquetRewriter implements Closeable {
 
         reader.setStreamPosition(chunk.getStartingPos());
         CompressionCodecName newCodecName = this.newCodecName == null ? 
chunk.getCodec() : this.newCodecName;
-        ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
-        boolean encryptColumn = false;
-        if (encryptMode) {
-          columnChunkEncryptorRunTime =
-                  new ColumnChunkEncryptorRunTime(writer.getEncryptor(), 
chunk, blockId, columnId);
-          encryptColumn = encryptColumns != null && 
encryptColumns.contains(chunk.getPath());
-        }
+        boolean encryptColumn = encryptMode && encryptColumns != null && 
encryptColumns.contains(chunk.getPath());
 
         if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
           // Mask column and compress it again.
@@ -242,12 +236,19 @@ public class ParquetRewriter implements Closeable {
                     writer,
                     schema,
                     newCodecName,
-                    columnChunkEncryptorRunTime,
+                    blockId,
                     encryptColumn);
           } else {
             throw new UnsupportedOperationException("Only nullify is supported 
for now");
           }
         } else if (encryptMode || this.newCodecName != null) {
+          // Prepare encryption context
+          ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
+          if (encryptMode) {
+            columnChunkEncryptorRunTime =
+                    new ColumnChunkEncryptorRunTime(writer.getEncryptor(), 
chunk, blockId, columnId);
+          }
+
           // Translate compression and/or encryption
           writer.startColumn(descriptor, 
crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
           processChunk(chunk, newCodecName, columnChunkEncryptorRunTime, 
encryptColumn);
@@ -570,12 +571,12 @@ public class ParquetRewriter implements Closeable {
                              ParquetFileWriter writer,
                              MessageType schema,
                              CompressionCodecName newCodecName,
-                             ColumnChunkEncryptorRunTime 
columnChunkEncryptorRunTime,
+                             int rowGroupOrdinal,
                              boolean encryptColumn) throws IOException {
-    // TODO: support nullifying and encrypting same column.
-    if (columnChunkEncryptorRunTime != null) {
-      throw new RuntimeException("Nullifying and encrypting column is not 
implemented yet");
+    if (encryptColumn) {
+      Preconditions.checkArgument(writer.getEncryptor() != null, "Missing 
encryptor");
     }
+
     long totalChunkValues = chunk.getValueCount();
     int dMax = descriptor.getMaxDefinitionLevel();
     ColumnReader cReader = crStore.getColumnReader(descriptor);
@@ -591,7 +592,8 @@ public class ParquetRewriter implements Closeable {
     // Create new schema that only has the current column
     MessageType newSchema = newSchema(schema, descriptor);
     ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
-            compressor, newSchema, props.getAllocator(), 
props.getColumnIndexTruncateLength());
+            compressor, newSchema, props.getAllocator(), 
props.getColumnIndexTruncateLength(),
+            props.getPageWriteChecksumEnabled(), writer.getEncryptor(), 
rowGroupOrdinal);
     ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
     ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index a11c5a61f..e15ba43c7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -152,14 +152,6 @@ public class RewriteOptions {
         }
       }
 
-      // TODO: support masking and encrypting same columns
-      if (maskColumns != null && encryptColumns != null) {
-        for (String encryptColumn : encryptColumns) {
-          Preconditions.checkArgument(!maskColumns.containsKey(encryptColumn),
-                  "Cannot encrypt and mask same column");
-        }
-      }
-
       if (encryptColumns != null && !encryptColumns.isEmpty()) {
         Preconditions.checkArgument(fileEncryptionProperties != null,
                 "FileEncryptionProperties is required when encrypting 
columns");
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index a6881577c..25b4f3564 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -75,6 +75,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 public class ParquetRewriterTest {
@@ -243,6 +244,60 @@ public class ParquetRewriterTest {
     validateCreatedBy();
   }
 
+  @Test
+  public void testNullifyAndEncryptColumn() throws Exception {
+    testSetup("GZIP");
+
+    Map<String, MaskMode> maskColumns = new HashMap<>();
+    maskColumns.put("DocId", MaskMode.NULLIFY);
+
+    String[] encryptColumns = {"DocId"};
+    FileEncryptionProperties fileEncryptionProperties = 
EncDecProperties.getFileEncryptionProperties(
+            encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);
+
+    Path inputPath = new Path(inputFile.getFileName());
+    Path outputPath = new Path(outputFile);
+    RewriteOptions options = new RewriteOptions.Builder(conf, inputPath, 
outputPath).mask(maskColumns)
+            
.encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties).build();
+
+    rewriter = new ParquetRewriter(options);
+    rewriter.processBlocks();
+    rewriter.close();
+
+    FileDecryptionProperties fileDecryptionProperties = 
EncDecProperties.getFileDecryptionProperties();
+
+    // Verify codec has not been changed
+    verifyCodec(outputFile, CompressionCodecName.GZIP, 
fileDecryptionProperties);
+
+    // Verify the data are not changed for non-encrypted and non-masked 
columns.
+    // Also make sure the masked column is nullified.
+    validateColumnData(outputFile,
+            inputFile.getFileContent(), Collections.emptySet(), 
maskColumns.keySet(), fileDecryptionProperties);
+
+    // Verify the page index
+    validatePageIndex(new HashMap<Integer, Integer>() {{
+      put(1, 1);
+      put(2, 2);
+      put(3, 3);
+      put(4, 4);
+    }});
+
+    // Verify the column is encrypted
+    ParquetMetadata metaData = getFileMetaData(outputFile, 
fileDecryptionProperties);
+    assertTrue(metaData.getBlocks().size() > 0);
+    Set<String> encryptedColumns = new 
HashSet<>(Arrays.asList(encryptColumns));
+    for (BlockMetaData blockMetaData : metaData.getBlocks()) {
+      List<ColumnChunkMetaData> columns = blockMetaData.getColumns();
+      for (ColumnChunkMetaData column : columns) {
+        if (encryptedColumns.contains(column.getPath().toDotString())) {
+          assertTrue(column.isEncrypted());
+        } else {
+          assertFalse(column.isEncrypted());
+        }
+      }
+    }
+  }
+
   private void testSetup(String compression) throws IOException {
     MessageType schema = createSchema();
     inputFile = new TestFileBuilder(conf, schema)
@@ -272,27 +327,42 @@ public class ParquetRewriterTest {
             .withConf(conf).withDecryption(fileDecryptionProperties).build();
     for (int i = 0; i < numRecord; i++) {
       Group group = reader.read();
-      if (!prunePaths.contains("DocId") && !nullifiedPaths.contains("DocId")) {
-        assertTrue(group.getLong("DocId", 0) == 
fileContent[i].getLong("DocId", 0));
+
+      if (!prunePaths.contains("DocId")) {
+        if (nullifiedPaths.contains("DocId")) {
+          assertThrows(RuntimeException.class, () -> group.getLong("DocId", 
0));
+        } else {
+          assertEquals(group.getLong("DocId", 0), 
fileContent[i].getLong("DocId", 0));
+        }
       }
+
       if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) {
         assertArrayEquals(group.getBinary("Name", 0).getBytes(),
                 fileContent[i].getBinary("Name", 0).getBytes());
       }
+
       if (!prunePaths.contains("Gender") && 
!nullifiedPaths.contains("Gender")) {
         assertArrayEquals(group.getBinary("Gender", 0).getBytes(),
                 fileContent[i].getBinary("Gender", 0).getBytes());
       }
+
       Group subGroup = group.getGroup("Links", 0);
+
       if (!prunePaths.contains("Links.Backward") && 
!nullifiedPaths.contains("Links.Backward")) {
         assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(),
                 fileContent[i].getGroup("Links", 0).getBinary("Backward", 
0).getBytes());
       }
-      if (!prunePaths.contains("Links.Forward") && 
!nullifiedPaths.contains("Links.Forward")) {
-        assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(),
-                fileContent[i].getGroup("Links", 0).getBinary("Forward", 
0).getBytes());
+
+      if (!prunePaths.contains("Links.Forward")) {
+        if (nullifiedPaths.contains("Links.Forward")) {
+          assertThrows(RuntimeException.class, () -> 
subGroup.getBinary("Forward", 0));
+        } else {
+          assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(),
+                  fileContent[i].getGroup("Links", 0).getBinary("Forward", 
0).getBytes());
+        }
       }
     }
+
     reader.close();
   }
 

Reply via email to