This is an automated email from the ASF dual-hosted git repository.

shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git

The following commit(s) were added to refs/heads/master by this push:
     new c36be7b99 PARQUET-2229: ParquetRewriter masks and encrypts the same column (#1021)
c36be7b99 is described below

commit c36be7b990f4e74d5560f494b53bb99c3c701749
Author: Gang Wu <ust...@gmail.com>
AuthorDate: Sat Feb 11 06:22:55 2023 +0800

    PARQUET-2229: ParquetRewriter masks and encrypts the same column (#1021)
---
 .../parquet/hadoop/rewrite/ParquetRewriter.java    | 28 ++++----
 .../parquet/hadoop/rewrite/RewriteOptions.java     |  8 ---
 .../hadoop/rewrite/ParquetRewriterTest.java        | 80 ++++++++++++++++++++--
 3 files changed, 90 insertions(+), 26 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index e4870af98..e5befa99d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -218,13 +218,7 @@ public class ParquetRewriter implements Closeable {
 
         reader.setStreamPosition(chunk.getStartingPos());
         CompressionCodecName newCodecName = this.newCodecName == null ? chunk.getCodec() : this.newCodecName;
-        ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
-        boolean encryptColumn = false;
-        if (encryptMode) {
-          columnChunkEncryptorRunTime =
-                  new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
-          encryptColumn = encryptColumns != null && encryptColumns.contains(chunk.getPath());
-        }
+        boolean encryptColumn = encryptMode && encryptColumns != null && encryptColumns.contains(chunk.getPath());
 
         if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
           // Mask column and compress it again.
@@ -242,12 +236,19 @@ public class ParquetRewriter implements Closeable {
                     writer,
                     schema,
                     newCodecName,
-                    columnChunkEncryptorRunTime,
+                    blockId,
                     encryptColumn);
           } else {
             throw new UnsupportedOperationException("Only nullify is supported for now");
           }
         } else if (encryptMode || this.newCodecName != null) {
+          // Prepare encryption context
+          ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
+          if (encryptMode) {
+            columnChunkEncryptorRunTime =
+                    new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+          }
+
           // Translate compression and/or encryption
           writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
           processChunk(chunk, newCodecName, columnChunkEncryptorRunTime, encryptColumn);
@@ -570,12 +571,12 @@ public class ParquetRewriter implements Closeable {
                              ParquetFileWriter writer,
                              MessageType schema,
                              CompressionCodecName newCodecName,
-                             ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime,
+                             int rowGroupOrdinal,
                              boolean encryptColumn) throws IOException {
-    // TODO: support nullifying and encrypting same column.
-    if (columnChunkEncryptorRunTime != null) {
-      throw new RuntimeException("Nullifying and encrypting column is not implemented yet");
+    if (encryptColumn) {
+      Preconditions.checkArgument(writer.getEncryptor() != null, "Missing encryptor");
     }
+
     long totalChunkValues = chunk.getValueCount();
     int dMax = descriptor.getMaxDefinitionLevel();
     ColumnReader cReader = crStore.getColumnReader(descriptor);
@@ -591,7 +592,8 @@ public class ParquetRewriter implements Closeable {
     // Create new schema that only has the current column
     MessageType newSchema = newSchema(schema, descriptor);
     ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
-            compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength());
+            compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength(),
+            props.getPageWriteChecksumEnabled(), writer.getEncryptor(), rowGroupOrdinal);
     ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
     ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index a11c5a61f..e15ba43c7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -152,14 +152,6 @@ public class RewriteOptions {
         }
       }
 
-      // TODO: support masking and encrypting same columns
-      if (maskColumns != null && encryptColumns != null) {
-        for (String encryptColumn : encryptColumns) {
-          Preconditions.checkArgument(!maskColumns.containsKey(encryptColumn),
-              "Cannot encrypt and mask same column");
-        }
-      }
-
       if (encryptColumns != null && !encryptColumns.isEmpty()) {
         Preconditions.checkArgument(fileEncryptionProperties != null,
             "FileEncryptionProperties is required when encrypting columns");
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index a6881577c..25b4f3564 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -75,6 +75,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 public class ParquetRewriterTest {
@@ -243,6 +244,60 @@ public class ParquetRewriterTest {
     validateCreatedBy();
   }
 
+  @Test
+  public void testNullifyAndEncryptColumn() throws Exception {
+    testSetup("GZIP");
+
+    Map<String, MaskMode> maskColumns = new HashMap<>();
+    maskColumns.put("DocId", MaskMode.NULLIFY);
+
+    String[] encryptColumns = {"DocId"};
+    FileEncryptionProperties fileEncryptionProperties = EncDecProperties.getFileEncryptionProperties(
+            encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);
+
+    Path inputPath = new Path(inputFile.getFileName());
+    Path outputPath = new Path(outputFile);
+    RewriteOptions options = new RewriteOptions.Builder(conf, inputPath, outputPath).mask(maskColumns)
+            .encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties).build();
+
+    rewriter = new ParquetRewriter(options);
+    rewriter.processBlocks();
+    rewriter.close();
+
+    FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties();
+
+    // Verify codec has not been changed
+    verifyCodec(outputFile, CompressionCodecName.GZIP, fileDecryptionProperties);
+
+    // Verify the data are not changed for non-encrypted and non-masked columns.
+    // Also make sure the masked column is nullified.
+    validateColumnData(outputFile,
+            inputFile.getFileContent(), Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties);
+
+    // Verify the page index
+    validatePageIndex(new HashMap<Integer, Integer>() {{
+      put(1, 1);
+      put(2, 2);
+      put(3, 3);
+      put(4, 4);
+    }});
+
+    // Verify the column is encrypted
+    ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties);
+    assertTrue(metaData.getBlocks().size() > 0);
+    Set<String> encryptedColumns = new HashSet<>(Arrays.asList(encryptColumns));
+    for (BlockMetaData blockMetaData : metaData.getBlocks()) {
+      List<ColumnChunkMetaData> columns = blockMetaData.getColumns();
+      for (ColumnChunkMetaData column : columns) {
+        if (encryptedColumns.contains(column.getPath().toDotString())) {
+          assertTrue(column.isEncrypted());
+        } else {
+          assertFalse(column.isEncrypted());
+        }
+      }
+    }
+  }
+
   private void testSetup(String compression) throws IOException {
     MessageType schema = createSchema();
     inputFile = new TestFileBuilder(conf, schema)
@@ -272,27 +327,42 @@ public class ParquetRewriterTest {
         .withConf(conf).withDecryption(fileDecryptionProperties).build();
     for (int i = 0; i < numRecord; i++) {
       Group group = reader.read();
-      if (!prunePaths.contains("DocId") && !nullifiedPaths.contains("DocId")) {
-        assertTrue(group.getLong("DocId", 0) == fileContent[i].getLong("DocId", 0));
+
+      if (!prunePaths.contains("DocId")) {
+        if (nullifiedPaths.contains("DocId")) {
+          assertThrows(RuntimeException.class, () -> group.getLong("DocId", 0));
+        } else {
+          assertEquals(group.getLong("DocId", 0), fileContent[i].getLong("DocId", 0));
+        }
       }
+
       if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) {
         assertArrayEquals(group.getBinary("Name", 0).getBytes(),
                 fileContent[i].getBinary("Name", 0).getBytes());
       }
+
       if (!prunePaths.contains("Gender") && !nullifiedPaths.contains("Gender")) {
         assertArrayEquals(group.getBinary("Gender", 0).getBytes(),
                 fileContent[i].getBinary("Gender", 0).getBytes());
       }
+
       Group subGroup = group.getGroup("Links", 0);
+
      if (!prunePaths.contains("Links.Backward") && !nullifiedPaths.contains("Links.Backward")) {
         assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(),
                 fileContent[i].getGroup("Links", 0).getBinary("Backward", 0).getBytes());
       }
-      if (!prunePaths.contains("Links.Forward") && !nullifiedPaths.contains("Links.Forward")) {
-        assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(),
-                fileContent[i].getGroup("Links", 0).getBinary("Forward", 0).getBytes());
+
+      if (!prunePaths.contains("Links.Forward")) {
+        if (nullifiedPaths.contains("Links.Forward")) {
+          assertThrows(RuntimeException.class, () -> subGroup.getBinary("Forward", 0));
+        } else {
+          assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(),
+                  fileContent[i].getGroup("Links", 0).getBinary("Forward", 0).getBytes());
+        }
       }
     }
+
     reader.close();
   }
 
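For readers following the change: the diff above removes the RewriteOptions guard that rejected masking and encrypting the same column, and routes nullified chunks through an encrypting ColumnChunkPageWriteStore, so the combination now works end to end. Below is a minimal caller-side sketch distilled from the new testNullifyAndEncryptColumn() test; the class name is illustrative, conf/inputPath/outputPath are placeholders, and building the FileEncryptionProperties is left to the caller (the test does this with its EncDecProperties helper and AES_GCM_CTR_V1).

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.hadoop.rewrite.MaskMode;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

// Illustrative class name; not part of the commit.
public class MaskAndEncryptSameColumnSketch {

  public static void rewrite(Configuration conf, Path inputPath, Path outputPath,
                             FileEncryptionProperties encryptionProperties) throws Exception {
    // Nullify DocId ...
    Map<String, MaskMode> maskColumns = new HashMap<>();
    maskColumns.put("DocId", MaskMode.NULLIFY);

    // ... and encrypt the very same column, which this commit now permits.
    String[] encryptColumns = {"DocId"};

    RewriteOptions options = new RewriteOptions.Builder(conf, inputPath, outputPath)
        .mask(maskColumns)
        .encrypt(Arrays.asList(encryptColumns))
        .encryptionProperties(encryptionProperties)
        .build();

    // ParquetRewriter implements Closeable, so try-with-resources closes the output file.
    try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
      rewriter.processBlocks();
    }
  }
}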