This is an automated email from the ASF dual-hosted git repository.
nkollar pushed a commit to branch encryption
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/encryption by this push:
new 3f2d0e7 PARQUET-1228: Format Structures encryption (#613)
3f2d0e7 is described below
commit 3f2d0e7f5c05907ee37cf549e6ed4bf0e067d491
Author: ggershinsky <[email protected]>
AuthorDate: Tue Aug 27 12:07:10 2019 +0200
PARQUET-1228: Format Structures encryption (#613)
---
.travis.yml | 1 +
dev/travis-before_install-encryption.sh | 29 +++
.../org/apache/parquet/format/BlockCipher.java | 69 +++++++
.../main/java/org/apache/parquet/format/Util.java | 222 +++++++++++++++++----
pom.xml | 2 +-
5 files changed, 278 insertions(+), 45 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index 3fe18f6..fae25f8 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,4 +1,5 @@
language: java
+jdk: openjdk8
before_install:
- bash dev/travis-before_install.sh
diff --git a/dev/travis-before_install-encryption.sh b/dev/travis-before_install-encryption.sh
new file mode 100755
index 0000000..0e3a3f6
--- /dev/null
+++ b/dev/travis-before_install-encryption.sh
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+################################################################################
+# This is a branch-specific script that gets invoked at the end of
+# travis-before_install.sh. It is run for the bloom-filter branch only.
+################################################################################
+
+cd ..
+git clone https://github.com/apache/parquet-format.git
+cd parquet-format
+mvn install -DskipTests --batch-mode
+cd $TRAVIS_BUILD_DIR
+
+
diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java
new file mode 100755
index 0000000..48c0bf2
--- /dev/null
+++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.format;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface BlockCipher{
+
+
+ public interface Encryptor{
+ /**
+ * Encrypts the plaintext.
+ *
+ * @param plaintext - starts at offset 0 of the input, and fills up the entire byte array.
+ * @param AAD - Additional Authenticated Data for the encryption (ignored in case of CTR cipher)
+ * @return lengthAndCiphertext The first 4 bytes of the returned value are the ciphertext length (little endian int).
+ * The ciphertext starts at offset 4 and fills up the rest of the returned byte array.
+ * The ciphertext includes the nonce and (in case of GCM cipher) the tag, as detailed in the
+ * Parquet Modular Encryption specification.
+ * @throws IOException thrown upon any crypto problem encountered during encryption
+ */
+ public byte[] encrypt(byte[] plaintext, byte[] AAD) throws IOException;
+ }
+
+
+ public interface Decryptor{
+ /**
+ * Decrypts the ciphertext.
+ *
+ * @param lengthAndCiphertext - The first 4 bytes of the input are the ciphertext length (little endian int).
+ * The ciphertext starts at offset 4 and fills up the rest of the input byte array.
+ * The ciphertext includes the nonce and (in case of GCM cipher) the tag, as detailed in the
+ * Parquet Modular Encryption specification.
+ * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
+ * @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array.
+ * @throws IOException thrown upon any crypto problem encountered during decryption
+ */
+ public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD) throws IOException;
+
+ /**
+ * Convenience decryption method that reads the length and ciphertext from the input stream.
+ *
+ * @param from Input stream with length and ciphertext.
+ * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
+ * @return plaintext - starts at offset 0 of the output, and fills up the entire byte array.
+ * @throws IOException thrown upon any crypto or IO problem encountered during decryption
+ */
+ public byte[] decrypt(InputStream from, byte[] AAD) throws IOException;
+ }
+}
+
diff --git a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
index d09d007..9242290 100644
--- a/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
+++ b/parquet-format-structures/src/main/java/org/apache/parquet/format/Util.java
@@ -20,6 +20,8 @@
package org.apache.parquet.format;
import static org.apache.parquet.format.FileMetaData._Fields.CREATED_BY;
+import static org.apache.parquet.format.FileMetaData._Fields.ENCRYPTION_ALGORITHM;
+import static org.apache.parquet.format.FileMetaData._Fields.FOOTER_SIGNING_KEY_METADATA;
import static org.apache.parquet.format.FileMetaData._Fields.KEY_VALUE_METADATA;
import static org.apache.parquet.format.FileMetaData._Fields.NUM_ROWS;
import static org.apache.parquet.format.FileMetaData._Fields.ROW_GROUPS;
@@ -30,9 +32,11 @@ import static org.apache.parquet.format.event.Consumers.listElementsOf;
import static org.apache.parquet.format.event.Consumers.listOf;
import static org.apache.parquet.format.event.Consumers.struct;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.thrift.TBase;
@@ -40,7 +44,7 @@ import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TIOStreamTransport;
-
+import org.apache.thrift.transport.TMemoryBuffer;
import org.apache.parquet.format.event.Consumers.Consumer;
import org.apache.parquet.format.event.Consumers.DelegatingFieldConsumer;
import org.apache.parquet.format.event.EventBasedThriftReader;
@@ -54,37 +58,91 @@ import org.apache.parquet.format.event.TypedConsumer.StringConsumer;
*/
public class Util {
+ private final static int INIT_MEM_ALLOC_ENCR_BUFFER = 100;
+
public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream
to) throws IOException {
- write(columnIndex, to);
+ writeColumnIndex(columnIndex, to, null, null);
+ }
+
+ public static void writeColumnIndex(ColumnIndex columnIndex, OutputStream
to,
+ BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException {
+ write(columnIndex, to, encryptor, AAD);
}
public static ColumnIndex readColumnIndex(InputStream from) throws
IOException {
- return read(from, new ColumnIndex());
+ return readColumnIndex(from, null, null);
+ }
+
+ public static ColumnIndex readColumnIndex(InputStream from,
+ BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+ return read(from, new ColumnIndex(), decryptor, AAD);
}
public static void writeOffsetIndex(OffsetIndex offsetIndex, OutputStream
to) throws IOException {
- write(offsetIndex, to);
+ writeOffsetIndex(offsetIndex, to, null, null);
+ }
+
+ public static void writeOffsetIndex(OffsetIndex offsetIndex, OutputStream
to,
+ BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException {
+ write(offsetIndex, to, encryptor, AAD);
}
public static OffsetIndex readOffsetIndex(InputStream from) throws
IOException {
- return read(from, new OffsetIndex());
+ return readOffsetIndex(from, null, null);
+ }
+
+ public static OffsetIndex readOffsetIndex(InputStream from,
+ BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+ return read(from, new OffsetIndex(), decryptor, AAD);
}
public static void writePageHeader(PageHeader pageHeader, OutputStream to)
throws IOException {
- write(pageHeader, to);
+ writePageHeader(pageHeader, to, null, null);
+ }
+
+ public static void writePageHeader(PageHeader pageHeader, OutputStream to,
+ BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException {
+ write(pageHeader, to, encryptor, AAD);
}
public static PageHeader readPageHeader(InputStream from) throws IOException
{
- return read(from, new PageHeader());
+ return readPageHeader(from, null, null);
+ }
+
+ public static PageHeader readPageHeader(InputStream from,
+ BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+ return read(from, new PageHeader(), decryptor, AAD);
+ }
+
+ public static void writeFileMetaData(org.apache.parquet.format.FileMetaData
fileMetadata,
+ OutputStream to) throws IOException {
+ writeFileMetaData(fileMetadata, to, null, null);
}
- public static void writeFileMetaData(org.apache.parquet.format.FileMetaData
fileMetadata, OutputStream to) throws IOException {
- write(fileMetadata, to);
+ public static void writeFileMetaData(org.apache.parquet.format.FileMetaData
fileMetadata,
+ OutputStream to, BlockCipher.Encryptor encryptor, byte[] AAD) throws
IOException {
+ write(fileMetadata, to, encryptor, AAD);
}
public static FileMetaData readFileMetaData(InputStream from) throws
IOException {
- return read(from, new FileMetaData());
+ return readFileMetaData(from, null, null);
+ }
+
+ public static FileMetaData readFileMetaData(InputStream from,
+ BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+ return read(from, new FileMetaData(), decryptor, AAD);
+ }
+
+ public static void writeColumnMetaData(ColumnMetaData columnMetaData,
OutputStream to,
+ BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException {
+ write(columnMetaData, to, encryptor, AAD);
}
+
+ public static ColumnMetaData readColumnMetaData(InputStream from,
+ BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+ return read(from, new ColumnMetaData(), decryptor, AAD);
+ }
+
/**
* reads the meta data from the stream
* @param from the stream to read the metadata from
@@ -93,15 +151,28 @@ public class Util {
* @throws IOException if any I/O error occurs during the reading
*/
public static FileMetaData readFileMetaData(InputStream from, boolean
skipRowGroups) throws IOException {
+ return readFileMetaData(from, skipRowGroups, (BlockCipher.Decryptor) null,
(byte[]) null);
+ }
+
+ public static FileMetaData readFileMetaData(InputStream from, boolean
skipRowGroups,
+ BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
FileMetaData md = new FileMetaData();
if (skipRowGroups) {
- readFileMetaData(from, new DefaultFileMetaDataConsumer(md),
skipRowGroups);
+ readFileMetaData(from, new DefaultFileMetaDataConsumer(md),
skipRowGroups, decryptor, AAD);
} else {
- read(from, md);
+ read(from, md, decryptor, AAD);
}
return md;
}
+ public static void
writeFileCryptoMetaData(org.apache.parquet.format.FileCryptoMetaData
cryptoMetadata, OutputStream to) throws IOException {
+ write(cryptoMetadata, to, null, null);
+ }
+
+ public static FileCryptoMetaData readFileCryptoMetaData(InputStream from)
throws IOException {
+ return read(from, new FileCryptoMetaData(), null, null);
+ }
+
/**
* To read metadata in a streaming fashion.
*
@@ -113,6 +184,8 @@ public class Util {
abstract public void addRowGroup(RowGroup rowGroup);
abstract public void addKeyValueMetaData(KeyValue kv);
abstract public void setCreatedBy(String createdBy);
+ abstract public void setEncryptionAlgorithm(EncryptionAlgorithm
encryptionAlgorithm);
+ abstract public void setFooterSigningKeyMetadata(byte[]
footerSigningKeyMetadata);
}
/**
@@ -155,41 +228,73 @@ public class Util {
public void addKeyValueMetaData(KeyValue kv) {
md.addToKey_value_metadata(kv);
}
+
+ @Override
+ public void setEncryptionAlgorithm(EncryptionAlgorithm
encryptionAlgorithm) {
+ md.setEncryption_algorithm(encryptionAlgorithm);
+ }
+
+ @Override
+ public void setFooterSigningKeyMetadata(byte[] footerSigningKeyMetadata) {
+ md.setFooter_signing_key_metadata(footerSigningKeyMetadata);
+ }
}
public static void readFileMetaData(InputStream from, FileMetaDataConsumer
consumer) throws IOException {
- readFileMetaData(from, consumer, false);
+ readFileMetaData(from, consumer, null, null);
+ }
+
+ public static void readFileMetaData(InputStream from, FileMetaDataConsumer
consumer,
+ BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+ readFileMetaData(from, consumer, false, decryptor, AAD);
}
public static void readFileMetaData(InputStream from, final
FileMetaDataConsumer consumer, boolean skipRowGroups) throws IOException {
+ readFileMetaData(from, consumer, skipRowGroups, null, null);
+ }
+
+ public static void readFileMetaData(final InputStream input, final
FileMetaDataConsumer consumer,
+ boolean skipRowGroups, BlockCipher.Decryptor decryptor, byte[] AAD)
throws IOException {
try {
DelegatingFieldConsumer eventConsumer = fieldConsumer()
- .onField(VERSION, new I32Consumer() {
- @Override
- public void consume(int value) {
- consumer.setVersion(value);
- }
- }).onField(SCHEMA, listOf(SchemaElement.class, new
Consumer<List<SchemaElement>>() {
- @Override
- public void consume(List<SchemaElement> schema) {
- consumer.setSchema(schema);
- }
- })).onField(NUM_ROWS, new I64Consumer() {
- @Override
- public void consume(long value) {
- consumer.setNumRows(value);
- }
- }).onField(KEY_VALUE_METADATA, listElementsOf(struct(KeyValue.class, new
Consumer<KeyValue>() {
- @Override
- public void consume(KeyValue kv) {
- consumer.addKeyValueMetaData(kv);
- }
- }))).onField(CREATED_BY, new StringConsumer() {
- @Override
- public void consume(String value) {
- consumer.setCreatedBy(value);
- }
- });
+ .onField(VERSION, new I32Consumer() {
+ @Override
+ public void consume(int value) {
+ consumer.setVersion(value);
+ }
+ }).onField(SCHEMA, listOf(SchemaElement.class, new
Consumer<List<SchemaElement>>() {
+ @Override
+ public void consume(List<SchemaElement> schema) {
+ consumer.setSchema(schema);
+ }
+ })).onField(NUM_ROWS, new I64Consumer() {
+ @Override
+ public void consume(long value) {
+ consumer.setNumRows(value);
+ }
+ }).onField(KEY_VALUE_METADATA, listElementsOf(struct(KeyValue.class,
new Consumer<KeyValue>() {
+ @Override
+ public void consume(KeyValue kv) {
+ consumer.addKeyValueMetaData(kv);
+ }
+ }))).onField(CREATED_BY, new StringConsumer() {
+ @Override
+ public void consume(String value) {
+ consumer.setCreatedBy(value);
+ }
+ }).onField(ENCRYPTION_ALGORITHM, struct(EncryptionAlgorithm.class,
new Consumer<EncryptionAlgorithm>() {
+ @Override
+ public void consume(EncryptionAlgorithm encryptionAlgorithm) {
+ consumer.setEncryptionAlgorithm(encryptionAlgorithm);
+ }
+ })).onField(FOOTER_SIGNING_KEY_METADATA, new StringConsumer() {
+ @Override
+ public void consume(String value) {
+ byte[] keyMetadata = value.getBytes(StandardCharsets.UTF_8);
+ consumer.setFooterSigningKeyMetadata(keyMetadata);
+ }
+ });
+
if (!skipRowGroups) {
eventConsumer = eventConsumer.onField(ROW_GROUPS,
listElementsOf(struct(RowGroup.class, new Consumer<RowGroup>() {
@Override
@@ -198,8 +303,16 @@ public class Util {
}
})));
}
- new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer);
+ final InputStream from;
+ if (null == decryptor) {
+ from = input;
+ }
+ else {
+ byte[] plainText = decryptor.decrypt(input, AAD);
+ from = new ByteArrayInputStream(plainText);
+ }
+ new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer);
} catch (TException e) {
throw new IOException("can not read FileMetaData: " + e.getMessage(), e);
}
@@ -217,7 +330,16 @@ public class Util {
return new InterningProtocol(new TCompactProtocol(t));
}
- private static <T extends TBase<?,?>> T read(InputStream from, T tbase)
throws IOException {
+
+ private static <T extends TBase<?,?>> T read(final InputStream input, T
tbase, BlockCipher.Decryptor decryptor, byte[] AAD) throws IOException {
+ final InputStream from;
+ if (null == decryptor) {
+ from = input;
+ } else {
+ byte[] plainText = decryptor.decrypt(input, AAD);
+ from = new ByteArrayInputStream(plainText);
+ }
+
try {
tbase.read(protocol(from));
return tbase;
@@ -226,11 +348,23 @@ public class Util {
}
}
- private static void write(TBase<?, ?> tbase, OutputStream to) throws
IOException {
- try {
- tbase.write(protocol(to));
+ private static void write(TBase<?, ?> tbase, OutputStream to,
BlockCipher.Encryptor encryptor, byte[] AAD) throws IOException {
+ if (null == encryptor) {
+ try {
+ tbase.write(protocol(to));
+ return;
+ } catch (TException e) {
+ throw new IOException("can not write " + tbase, e);
+ }
+ }
+ // Serialize and encrypt the structure
+ try (TMemoryBuffer thriftMemoryBuffer = new
TMemoryBuffer(INIT_MEM_ALLOC_ENCR_BUFFER)) {
+ tbase.write(new InterningProtocol(new
TCompactProtocol(thriftMemoryBuffer)));
+ byte[] encryptedBuffer =
encryptor.encrypt(thriftMemoryBuffer.getArray(), AAD);
+ to.write(encryptedBuffer);
} catch (TException e) {
throw new IOException("can not write " + tbase, e);
}
}
}
+
diff --git a/pom.xml b/pom.xml
index 1808dba..7bfef73 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,7 +81,7 @@
<hadoop1.version>1.2.1</hadoop1.version>
<cascading.version>2.7.1</cascading.version>
<cascading3.version>3.1.2</cascading3.version>
- <parquet.format.version>2.6.0</parquet.format.version>
+ <parquet.format.version>2.7.0-SNAPSHOT</parquet.format.version>
<previous.version>1.7.0</previous.version>
<thrift.executable>thrift</thrift.executable>
<format.thrift.executable>thrift</format.thrift.executable>