This is an automated email from the ASF dual-hosted git repository.
shangxinli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new c36be7b99 PARQUET-2229: ParquetRewriter masks and encrypts the same column (#1021)
c36be7b99 is described below
commit c36be7b990f4e74d5560f494b53bb99c3c701749
Author: Gang Wu <[email protected]>
AuthorDate: Sat Feb 11 06:22:55 2023 +0800
PARQUET-2229: ParquetRewriter masks and encrypts the same column (#1021)
---
.../parquet/hadoop/rewrite/ParquetRewriter.java | 28 ++++----
.../parquet/hadoop/rewrite/RewriteOptions.java | 8 ---
.../hadoop/rewrite/ParquetRewriterTest.java | 80 ++++++++++++++++++++--
3 files changed, 90 insertions(+), 26 deletions(-)
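
For context, here is a minimal usage sketch of what this change enables: nullifying and encrypting the same column in a single rewrite pass. It mirrors the new testNullifyAndEncryptColumn test in the diff below. The input/output paths are placeholders, the key-building helper is a hypothetical stub, and import locations not shown in the diff (e.g. MaskMode) are assumptions; treat this as an illustration, not the committed code.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.hadoop.rewrite.MaskMode;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RewriteMaskAndEncryptSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path input = new Path("/tmp/input.parquet");   // placeholder path
    Path output = new Path("/tmp/output.parquet"); // placeholder path

    // Nullify and encrypt the same column; before this commit the
    // RewriteOptions builder rejected this combination.
    Map<String, MaskMode> maskColumns = new HashMap<>();
    maskColumns.put("DocId", MaskMode.NULLIFY);
    String[] encryptColumns = {"DocId"};

    // FileEncryptionProperties is still required whenever encrypt columns are
    // set; building it (footer key, column keys, cipher) is application-specific
    // and omitted here.
    FileEncryptionProperties encryptionProperties = buildEncryptionProperties(encryptColumns);

    RewriteOptions options = new RewriteOptions.Builder(conf, input, output)
        .mask(maskColumns)
        .encrypt(Arrays.asList(encryptColumns))
        .encryptionProperties(encryptionProperties)
        .build();

    // ParquetRewriter implements Closeable, so try-with-resources works.
    try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
      rewriter.processBlocks();
    }
  }

  // Hypothetical helper: supply real FileEncryptionProperties (keys, cipher).
  private static FileEncryptionProperties buildEncryptionProperties(String[] columns) {
    throw new UnsupportedOperationException("provide real encryption keys");
  }
}
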
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
index e4870af98..e5befa99d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java
@@ -218,13 +218,7 @@ public class ParquetRewriter implements Closeable {
reader.setStreamPosition(chunk.getStartingPos());
CompressionCodecName newCodecName = this.newCodecName == null ? chunk.getCodec() : this.newCodecName;
- ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
- boolean encryptColumn = false;
- if (encryptMode) {
- columnChunkEncryptorRunTime =
- new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
- encryptColumn = encryptColumns != null && encryptColumns.contains(chunk.getPath());
- }
+ boolean encryptColumn = encryptMode && encryptColumns != null && encryptColumns.contains(chunk.getPath());
if (maskColumns != null && maskColumns.containsKey(chunk.getPath())) {
// Mask column and compress it again.
@@ -242,12 +236,19 @@ public class ParquetRewriter implements Closeable {
writer,
schema,
newCodecName,
- columnChunkEncryptorRunTime,
+ blockId,
encryptColumn);
} else {
throw new UnsupportedOperationException("Only nullify is supported for now");
}
} else if (encryptMode || this.newCodecName != null) {
+ // Prepare encryption context
+ ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime = null;
+ if (encryptMode) {
+ columnChunkEncryptorRunTime =
+ new ColumnChunkEncryptorRunTime(writer.getEncryptor(), chunk, blockId, columnId);
+ }
+
// Translate compression and/or encryption
writer.startColumn(descriptor, crStore.getColumnReader(descriptor).getTotalValueCount(), newCodecName);
processChunk(chunk, newCodecName, columnChunkEncryptorRunTime, encryptColumn);
@@ -570,12 +571,12 @@ public class ParquetRewriter implements Closeable {
ParquetFileWriter writer,
MessageType schema,
CompressionCodecName newCodecName,
- ColumnChunkEncryptorRunTime columnChunkEncryptorRunTime,
+ int rowGroupOrdinal,
boolean encryptColumn) throws IOException {
- // TODO: support nullifying and encrypting same column.
- if (columnChunkEncryptorRunTime != null) {
- throw new RuntimeException("Nullifying and encrypting column is not implemented yet");
+ if (encryptColumn) {
+ Preconditions.checkArgument(writer.getEncryptor() != null, "Missing encryptor");
}
+
long totalChunkValues = chunk.getValueCount();
int dMax = descriptor.getMaxDefinitionLevel();
ColumnReader cReader = crStore.getColumnReader(descriptor);
@@ -591,7 +592,8 @@ public class ParquetRewriter implements Closeable {
// Create new schema that only has the current column
MessageType newSchema = newSchema(schema, descriptor);
ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
- compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength());
+ compressor, newSchema, props.getAllocator(), props.getColumnIndexTruncateLength(),
+ props.getPageWriteChecksumEnabled(), writer.getEncryptor(), rowGroupOrdinal);
ColumnWriteStore cStore = props.newColumnWriteStore(newSchema, cPageStore);
ColumnWriter cWriter = cStore.getColumnWriter(descriptor);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
index a11c5a61f..e15ba43c7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java
@@ -152,14 +152,6 @@ public class RewriteOptions {
}
}
- // TODO: support masking and encrypting same columns
- if (maskColumns != null && encryptColumns != null) {
- for (String encryptColumn : encryptColumns) {
- Preconditions.checkArgument(!maskColumns.containsKey(encryptColumn), "Cannot encrypt and mask same column");
- }
- }
-
if (encryptColumns != null && !encryptColumns.isEmpty()) {
Preconditions.checkArgument(fileEncryptionProperties != null, "FileEncryptionProperties is required when encrypting columns");
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
index a6881577c..25b4f3564 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/rewrite/ParquetRewriterTest.java
@@ -75,6 +75,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
public class ParquetRewriterTest {
@@ -243,6 +244,60 @@ public class ParquetRewriterTest {
validateCreatedBy();
}
+ @Test
+ public void testNullifyAndEncryptColumn() throws Exception {
+ testSetup("GZIP");
+
+ Map<String, MaskMode> maskColumns = new HashMap<>();
+ maskColumns.put("DocId", MaskMode.NULLIFY);
+
+ String[] encryptColumns = {"DocId"};
+ FileEncryptionProperties fileEncryptionProperties = EncDecProperties.getFileEncryptionProperties(
+ encryptColumns, ParquetCipher.AES_GCM_CTR_V1, false);
+
+ Path inputPath = new Path(inputFile.getFileName());
+ Path outputPath = new Path(outputFile);
+ RewriteOptions options = new RewriteOptions.Builder(conf, inputPath, outputPath).mask(maskColumns)
+ .encrypt(Arrays.asList(encryptColumns)).encryptionProperties(fileEncryptionProperties).build();
+
+ rewriter = new ParquetRewriter(options);
+ rewriter.processBlocks();
+ rewriter.close();
+
+ FileDecryptionProperties fileDecryptionProperties = EncDecProperties.getFileDecryptionProperties();
+
+ // Verify codec has not been changed
+ verifyCodec(outputFile, CompressionCodecName.GZIP, fileDecryptionProperties);
+
+ // Verify the data are not changed for non-encrypted and non-masked columns.
+ // Also make sure the masked column is nullified.
+ validateColumnData(outputFile,
+ inputFile.getFileContent(), Collections.emptySet(), maskColumns.keySet(), fileDecryptionProperties);
+
+ // Verify the page index
+ validatePageIndex(new HashMap<Integer, Integer>() {{
+ put(1, 1);
+ put(2, 2);
+ put(3, 3);
+ put(4, 4);
+ }});
+
+ // Verify the column is encrypted
+ ParquetMetadata metaData = getFileMetaData(outputFile, fileDecryptionProperties);
+ assertTrue(metaData.getBlocks().size() > 0);
+ Set<String> encryptedColumns = new HashSet<>(Arrays.asList(encryptColumns));
+ for (BlockMetaData blockMetaData : metaData.getBlocks()) {
+ List<ColumnChunkMetaData> columns = blockMetaData.getColumns();
+ for (ColumnChunkMetaData column : columns) {
+ if (encryptedColumns.contains(column.getPath().toDotString())) {
+ assertTrue(column.isEncrypted());
+ } else {
+ assertFalse(column.isEncrypted());
+ }
+ }
+ }
+ }
+
private void testSetup(String compression) throws IOException {
MessageType schema = createSchema();
inputFile = new TestFileBuilder(conf, schema)
@@ -272,27 +327,42 @@ public class ParquetRewriterTest {
.withConf(conf).withDecryption(fileDecryptionProperties).build();
for (int i = 0; i < numRecord; i++) {
Group group = reader.read();
- if (!prunePaths.contains("DocId") && !nullifiedPaths.contains("DocId")) {
- assertTrue(group.getLong("DocId", 0) == fileContent[i].getLong("DocId", 0));
+
+ if (!prunePaths.contains("DocId")) {
+ if (nullifiedPaths.contains("DocId")) {
+ assertThrows(RuntimeException.class, () -> group.getLong("DocId", 0));
+ } else {
+ assertEquals(group.getLong("DocId", 0), fileContent[i].getLong("DocId", 0));
+ }
}
+
if (!prunePaths.contains("Name") && !nullifiedPaths.contains("Name")) {
assertArrayEquals(group.getBinary("Name", 0).getBytes(),
fileContent[i].getBinary("Name", 0).getBytes());
}
+
if (!prunePaths.contains("Gender") &&
!nullifiedPaths.contains("Gender")) {
assertArrayEquals(group.getBinary("Gender", 0).getBytes(),
fileContent[i].getBinary("Gender", 0).getBytes());
}
+
Group subGroup = group.getGroup("Links", 0);
+
if (!prunePaths.contains("Links.Backward") &&
!nullifiedPaths.contains("Links.Backward")) {
assertArrayEquals(subGroup.getBinary("Backward", 0).getBytes(),
fileContent[i].getGroup("Links", 0).getBinary("Backward", 0).getBytes());
}
- if (!prunePaths.contains("Links.Forward") && !nullifiedPaths.contains("Links.Forward")) {
- assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(),
- fileContent[i].getGroup("Links", 0).getBinary("Forward", 0).getBytes());
+
+ if (!prunePaths.contains("Links.Forward")) {
+ if (nullifiedPaths.contains("Links.Forward")) {
+ assertThrows(RuntimeException.class, () -> subGroup.getBinary("Forward", 0));
+ } else {
+ assertArrayEquals(subGroup.getBinary("Forward", 0).getBytes(),
+ fileContent[i].getGroup("Links", 0).getBinary("Forward", 0).getBytes());
+ }
}
}
+
reader.close();
}