This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new 0fea3e1e2 GH-3338: Support encrypted files for Parquet CLI commands
(#3339)
0fea3e1e2 is described below
commit 0fea3e1e22fffb0a25193e3efb9a5d090899458a
Author: Arnav Balyan <[email protected]>
AuthorDate: Wed Oct 8 08:17:44 2025 +0530
GH-3338: Support encrypted files for Parquet CLI commands (#3339)
---
.../java/org/apache/parquet/cli/BaseCommand.java | 96 +++++++++++++
.../cli/commands/ParquetMetadataCommand.java | 46 +++---
.../cli/commands/ShowBloomFilterCommand.java | 6 +-
.../parquet/cli/commands/ShowPagesCommand.java | 2 +-
.../cli/commands/ShowBloomFilterCommandTest.java | 154 +++++++++++++++++++++
5 files changed, 275 insertions(+), 29 deletions(-)
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
index 4ba843c6b..0c8841b05 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.cli.json.AvroJsonReader;
@@ -56,10 +57,14 @@ import org.apache.parquet.cli.util.Formats;
import org.apache.parquet.cli.util.GetClassLoader;
import org.apache.parquet.cli.util.Schemas;
import org.apache.parquet.cli.util.SeekableFSDataInputStream;
+import org.apache.parquet.crypto.DecryptionKeyRetriever;
+import org.apache.parquet.crypto.FileDecryptionProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
import org.slf4j.Logger;
public abstract class BaseCommand implements Command, Configurable {
@@ -360,6 +365,97 @@ public abstract class BaseCommand implements Command,
Configurable {
return urls;
}
+  /**
+   * Opens a {@link ParquetFileReader} on {@code source}, attaching decryption
+   * properties when {@link #createFileDecryptionProperties()} supplies any.
+   *
+   * @param source location of the Parquet file; resolved through {@code qualifiedPath}
+   * @return an open reader that the caller must close
+   * @throws IOException if resolving or opening the file fails
+   */
+  protected ParquetFileReader createParquetFileReader(String source) throws IOException {
+    InputFile input = HadoopInputFile.fromPath(qualifiedPath(source), getConf());
+    HadoopReadOptions.Builder readOptions = HadoopReadOptions.builder(getConf());
+
+    FileDecryptionProperties decryption = createFileDecryptionProperties();
+    if (decryption != null) {
+      // Decryption is only configured when keys were found in the configuration.
+      readOptions.withDecryption(decryption);
+    }
+
+    return ParquetFileReader.open(input, readOptions.build());
+  }
+
+  /**
+   * Builds decryption properties from the {@code parquet.encryption.footer.key}
+   * and {@code parquet.encryption.column.keys} configuration entries.
+   *
+   * <p>Either entry may be configured on its own; only the entries that are
+   * present are parsed. The builder is created with
+   * {@code withPlaintextFilesAllowed()}, and a key retriever is always attached
+   * (it is simply empty when no column keys are configured).
+   *
+   * @return the decryption properties, or {@code null} when neither entry is set
+   */
+  protected FileDecryptionProperties createFileDecryptionProperties() {
+    Configuration conf = getConf();
+    String footerKeyHex = conf.get("parquet.encryption.footer.key");
+    String columnKeysHex = conf.get("parquet.encryption.column.keys");
+
+    if (footerKeyHex == null && columnKeysHex == null) {
+      return null;
+    }
+
+    FileDecryptionProperties.Builder builder =
+        FileDecryptionProperties.builder().withPlaintextFilesAllowed();
+
+    // The guard above only rules out BOTH settings being absent, so each one
+    // must be checked individually before it is dereferenced.
+    if (footerKeyHex != null) {
+      builder.withFooterKey(hexToBytes(footerKeyHex));
+    }
+
+    ConfigurableKeyRetriever keyRetriever = new ConfigurableKeyRetriever();
+    if (columnKeysHex != null) {
+      parseAndSetColumnKeys(columnKeysHex, keyRetriever);
+    }
+    builder.withKeyRetriever(keyRetriever);
+
+    return builder.build();
+  }
+
+  /**
+   * Parses a column-key specification and registers every key with the retriever.
+   *
+   * <p>Expected format: {@code <hexKey>:<column>[,<column>...]} entries separated
+   * by {@code ;}. Each key is registered once per listed column name. Blank
+   * entries and blank column names are ignored; entries without exactly one
+   * {@code :} separator, or with an empty key, are reported with a warning and
+   * skipped.
+   *
+   * @param columnKeysStr the raw configuration value; {@code null} is treated as empty
+   * @param keyRetriever the retriever that receives the parsed keys
+   */
+  private void parseAndSetColumnKeys(String columnKeysStr, ConfigurableKeyRetriever keyRetriever) {
+    if (columnKeysStr == null) {
+      return;
+    }
+
+    for (String rawMapping : columnKeysStr.split(";")) {
+      final String mapping = rawMapping.trim();
+      if (mapping.isEmpty()) {
+        continue;
+      }
+
+      String[] parts = mapping.split(":");
+      if (parts.length != 2 || parts[0].trim().isEmpty()) {
+        // NOTE(review): the logged mapping may contain the hex key text; consider redacting.
+        console.warn("Incorrect key to columns mapping in parquet.encryption.column.keys: [{}]", mapping);
+        continue;
+      }
+
+      // parts[0] is the hex-encoded key itself, not an identifier.
+      byte[] columnKeyBytes = hexToBytes(parts[0].trim());
+
+      for (String rawColumnName : parts[1].split(",")) {
+        final String columnName = rawColumnName.trim();
+        if (columnName.isEmpty()) {
+          // Tolerate stray commas such as "key:a,,b" instead of registering an empty name.
+          continue;
+        }
+        keyRetriever.putKey(columnName, columnKeyBytes);
+        console.debug("Added decryption key for column: {}", columnName);
+      }
+    }
+  }
+
+  /**
+   * Decodes a hexadecimal string into bytes.
+   *
+   * <p>Surrounding whitespace and an optional {@code 0x}/{@code 0X} prefix are
+   * ignored. Error messages deliberately omit the input, since it is key material.
+   *
+   * @param hex the hexadecimal text to decode
+   * @return the decoded bytes; an empty array when no digits are present
+   * @throws IllegalArgumentException if {@code hex} is {@code null}, has an odd
+   *     number of digits, or contains a non-hexadecimal character
+   */
+  private byte[] hexToBytes(String hex) {
+    if (hex == null) {
+      throw new IllegalArgumentException("Hex string must not be null");
+    }
+
+    String digits = hex.trim();
+    if (digits.startsWith("0x") || digits.startsWith("0X")) {
+      digits = digits.substring(2);
+    }
+
+    int len = digits.length();
+    if (len % 2 != 0) {
+      // Without this check the loop below would read one character past the end.
+      throw new IllegalArgumentException("Hex string must have an even number of digits, got " + len);
+    }
+
+    byte[] data = new byte[len / 2];
+    for (int i = 0; i < len; i += 2) {
+      int high = Character.digit(digits.charAt(i), 16);
+      int low = Character.digit(digits.charAt(i + 1), 16);
+      // Character.digit returns -1 for non-hex characters; reject instead of decoding garbage.
+      if (high < 0 || low < 0) {
+        throw new IllegalArgumentException(
+            "Hex string has a non-hexadecimal character at digit index " + (high < 0 ? i : i + 1));
+      }
+      data[i / 2] = (byte) ((high << 4) | low);
+    }
+    return data;
+  }
+
+  /**
+   * {@link DecryptionKeyRetriever} backed by an in-memory map from key
+   * identifier to key bytes. Key metadata is decoded as UTF-8 to obtain the
+   * identifier used for the lookup.
+   */
+  private static class ConfigurableKeyRetriever implements DecryptionKeyRetriever {
+    private final Map<String, byte[]> keysById = new java.util.HashMap<>();
+
+    /** Stores {@code keyBytes} under {@code keyId}, replacing any earlier entry. */
+    public void putKey(String keyId, byte[] keyBytes) {
+      keysById.put(keyId, keyBytes);
+    }
+
+    /** Returns the key stored for the UTF-8 decoded metadata, or {@code null} if none. */
+    @Override
+    public byte[] getKey(byte[] keyMetaData) {
+      return keysById.get(new String(keyMetaData, StandardCharsets.UTF_8));
+    }
+  }
+
protected <D> Iterable<D> openDataFile(final String source, Schema
projection) throws IOException {
Formats.Format format = Formats.detectFormat(open(source));
switch (format) {
diff --git
a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
index 9566c28cc..64421f6de 100644
---
a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
+++
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ParquetMetadataCommand.java
@@ -43,7 +43,6 @@ import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.statistics.Statistics;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -70,32 +69,33 @@ public class ParquetMetadataCommand extends BaseCommand {
Preconditions.checkArgument(targets.size() == 1, "Cannot process multiple
Parquet files.");
String source = targets.get(0);
- ParquetMetadata footer =
- ParquetFileReader.readFooter(getConf(), qualifiedPath(source),
ParquetMetadataConverter.NO_FILTER);
-
- console.info("\nFile path: {}", source);
- console.info("Created by: {}", footer.getFileMetaData().getCreatedBy());
-
- Map<String, String> kv = footer.getFileMetaData().getKeyValueMetaData();
- if (kv != null && !kv.isEmpty()) {
- console.info("Properties:");
- String format = " %" + maxSize(kv.keySet()) + "s: %s";
- for (Map.Entry<String, String> entry : kv.entrySet()) {
- console.info(String.format(format, entry.getKey(), entry.getValue()));
+ try (ParquetFileReader reader = createParquetFileReader(source)) {
+ ParquetMetadata footer = reader.getFooter();
+
+ console.info("\nFile path: {}", source);
+ console.info("Created by: {}", footer.getFileMetaData().getCreatedBy());
+
+ Map<String, String> kv = footer.getFileMetaData().getKeyValueMetaData();
+ if (kv != null && !kv.isEmpty()) {
+ console.info("Properties:");
+ String format = " %" + maxSize(kv.keySet()) + "s: %s";
+ for (Map.Entry<String, String> entry : kv.entrySet()) {
+ console.info(String.format(format, entry.getKey(),
entry.getValue()));
+ }
+ } else {
+ console.info("Properties: (none)");
}
- } else {
- console.info("Properties: (none)");
- }
- MessageType schema = footer.getFileMetaData().getSchema();
- console.info("Schema:\n{}", schema);
+ MessageType schema = footer.getFileMetaData().getSchema();
+ console.info("Schema:\n{}", schema);
- List<BlockMetaData> rowGroups = footer.getBlocks();
- for (int index = 0, n = rowGroups.size(); index < n; index += 1) {
- printRowGroup(console, index, rowGroups.get(index), schema);
- }
+ List<BlockMetaData> rowGroups = footer.getBlocks();
+ for (int index = 0, n = rowGroups.size(); index < n; index += 1) {
+ printRowGroup(console, index, rowGroups.get(index), schema);
+ }
- console.info("");
+ console.info("");
+ }
return 0;
}
diff --git
a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowBloomFilterCommand.java
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowBloomFilterCommand.java
index d89a29c14..8fccd05d8 100644
---
a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowBloomFilterCommand.java
+++
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowBloomFilterCommand.java
@@ -34,8 +34,6 @@ import org.apache.parquet.hadoop.BloomFilterReader;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
@@ -68,9 +66,7 @@ public class ShowBloomFilterCommand extends BaseCommand {
public int run() throws IOException {
Preconditions.checkArgument(file != null, "A Parquet file is required.");
- InputFile in = HadoopInputFile.fromPath(qualifiedPath(file), getConf());
-
- try (ParquetFileReader reader = ParquetFileReader.open(in)) {
+ try (ParquetFileReader reader = createParquetFileReader(file)) {
MessageType schema = reader.getFileMetaData().getSchema();
PrimitiveType type = Util.primitive(columnPath, schema);
diff --git
a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
index 8ca531c6d..b1ec6cde5 100644
---
a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
+++
b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
@@ -96,7 +96,7 @@ public class ShowPagesCommand extends BaseCommand {
}
String source = targets.get(0);
- try (ParquetFileReader reader = ParquetFileReader.open(getConf(),
qualifiedPath(source))) {
+ try (ParquetFileReader reader = createParquetFileReader(source)) {
MessageType schema = reader.getFileMetaData().getSchema();
Map<ColumnDescriptor, PrimitiveType> columns = Maps.newLinkedHashMap();
if (this.columns == null || this.columns.isEmpty()) {
diff --git
a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ShowBloomFilterCommandTest.java
b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ShowBloomFilterCommandTest.java
index 4775e7f51..7e6c1063c 100644
---
a/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ShowBloomFilterCommandTest.java
+++
b/parquet-cli/src/test/java/org/apache/parquet/cli/commands/ShowBloomFilterCommandTest.java
@@ -19,10 +19,31 @@
package org.apache.parquet.cli.commands;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.crypto.ColumnEncryptionProperties;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.crypto.ParquetCipher;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
import org.junit.Assert;
import org.junit.Test;
@@ -37,4 +58,137 @@ public class ShowBloomFilterCommandTest extends
ParquetFileTest {
command.setConf(new Configuration());
Assert.assertEquals(0, command.run());
}
+
+  /**
+   * Runs the bloom-filter command against each encrypted column of a freshly
+   * written encrypted file and expects a zero exit code every time.
+   */
+  @Test
+  public void testEncryptedFileWithBloomFilter() throws IOException {
+    File encryptedFile = createEncryptedFileWithBloomFilter();
+    String encryptedPath = encryptedFile.getAbsolutePath();
+
+    // Hex forms of the keys used by createFileEncryptionProperties.
+    Configuration conf = new Configuration();
+    conf.set("parquet.encryption.footer.key", "0102030405060708090a0b0c0d0e0f10");
+    conf.set(
+        "parquet.encryption.column.keys",
+        "02030405060708090a0b0c0d0e0f1011:name,email;0405060708090a0b0c0d0e0f10111213:phone");
+
+    ShowBloomFilterCommand nameCommand = new ShowBloomFilterCommand(createLogger());
+    nameCommand.file = encryptedPath;
+    nameCommand.columnPath = "name";
+    nameCommand.testValues = Arrays.asList("test_value_1", "non_existent_value");
+    nameCommand.setConf(conf);
+    Assert.assertEquals(0, nameCommand.run());
+
+    ShowBloomFilterCommand emailCommand = new ShowBloomFilterCommand(createLogger());
+    emailCommand.file = encryptedPath;
+    emailCommand.columnPath = "email";
+    emailCommand.testValues = Arrays.asList("[email protected]", "[email protected]");
+    emailCommand.setConf(conf);
+    Assert.assertEquals(0, emailCommand.run());
+
+    ShowBloomFilterCommand phoneCommand = new ShowBloomFilterCommand(createLogger());
+    phoneCommand.file = encryptedPath;
+    phoneCommand.columnPath = "phone";
+    phoneCommand.testValues = Arrays.asList("555-0001", "555-9999");
+    phoneCommand.setConf(conf);
+    Assert.assertEquals(0, phoneCommand.run());
+
+    encryptedFile.delete();
+  }
+
+  /**
+   * Writes a five-row encrypted Parquet file with bloom filters enabled on the
+   * {@code name}, {@code email} and {@code phone} columns.
+   *
+   * @return the written file, located in the test temp folder
+   * @throws IOException if writing the file fails
+   */
+  private File createEncryptedFileWithBloomFilter() throws IOException {
+    MessageType testSchema = Types.buildMessage()
+        .required(INT32)
+        .named("id")
+        .required(BINARY)
+        .named("name")
+        .required(BINARY)
+        .named("email")
+        .required(BINARY)
+        .named("phone")
+        .named("test_schema");
+
+    File outputFile = new File(getTempFolder(), "encrypted_bloom_test.parquet");
+    outputFile.deleteOnExit();
+
+    Configuration writeConf = new Configuration();
+    GroupWriteSupport.setSchema(testSchema, writeConf);
+
+    String[] encryptColumns = {"name", "email", "phone"};
+    FileEncryptionProperties encryption =
+        createFileEncryptionProperties(encryptColumns, ParquetCipher.AES_GCM_CTR_V1, true);
+
+    SimpleGroupFactory groupFactory = new SimpleGroupFactory(testSchema);
+    String[] names = {"test_value_1", "test_value_2", "another_test", "bloom_filter_test", "final_value"};
+    String[] emails = {
+      "[email protected]", "[email protected]", "[email protected]", "[email protected]", "[email protected]"
+    };
+    String[] phones = {"555-0001", "555-0002", "555-0003", "555-0004", "555-0005"};
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(outputFile.toURI()))
+        .withConf(writeConf)
+        .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
+        .withEncryption(encryption)
+        .withBloomFilterEnabled("name", true)
+        .withBloomFilterEnabled("email", true)
+        .withBloomFilterEnabled("phone", true)
+        .withPageSize(1024)
+        .withRowGroupSize(4096)
+        .build()) {
+
+      // One row per entry in the value arrays; ids are 1-based.
+      for (int row = 0; row < names.length; row++) {
+        SimpleGroup record = (SimpleGroup) groupFactory.newGroup();
+        record.add("id", row + 1);
+        record.add("name", Binary.fromString(names[row]));
+        record.add("email", Binary.fromString(emails[row]));
+        record.add("phone", Binary.fromString(phones[row]));
+        writer.write(record);
+      }
+    }
+
+    return outputFile;
+  }
+
+  /**
+   * Creates encryption properties with fixed test keys: one footer key, a key
+   * shared by {@code name} and {@code email}, and a separate key for
+   * {@code phone}. Each column's key metadata is its own path in UTF-8.
+   *
+   * @param encryptColumns dot-separated paths of the columns to encrypt
+   * @param cipher the encryption algorithm to use
+   * @param footerEncryption {@code false} to request a plaintext footer
+   * @return the assembled encryption properties
+   */
+  private FileEncryptionProperties createFileEncryptionProperties(
+      String[] encryptColumns, ParquetCipher cipher, boolean footerEncryption) {
+
+    byte[] footerKey = {
+      0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10
+    };
+    byte[] nameAndEmailKey = {
+      0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11
+    };
+    byte[] phoneKey = {
+      0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11, 0x12, 0x13
+    };
+
+    Map<String, byte[]> keysByColumn = new HashMap<>();
+    keysByColumn.put("name", nameAndEmailKey);
+    keysByColumn.put("email", nameAndEmailKey);
+    keysByColumn.put("phone", phoneKey);
+
+    Map<ColumnPath, ColumnEncryptionProperties> encryptedColumns = new HashMap<>();
+    for (String dotPath : encryptColumns) {
+      ColumnPath path = ColumnPath.fromDotString(dotPath);
+      // The column path doubles as key metadata.
+      encryptedColumns.put(
+          path,
+          ColumnEncryptionProperties.builder(path)
+              .withKey(keysByColumn.get(dotPath))
+              .withKeyMetaData(dotPath.getBytes(StandardCharsets.UTF_8))
+              .build());
+    }
+
+    FileEncryptionProperties.Builder properties = FileEncryptionProperties.builder(footerKey)
+        .withFooterKeyMetadata("footkey".getBytes(StandardCharsets.UTF_8))
+        .withAlgorithm(cipher)
+        .withEncryptedColumns(encryptedColumns);
+
+    if (!footerEncryption) {
+      properties.withPlaintextFooter();
+    }
+
+    return properties.build();
+  }
}