Repository: incubator-gobblin Updated Branches: refs/heads/master f543a39a4 -> e92e5f8a0
[GOBBLIN-521] Add support for encryption in the GPGCodec Closes #2391 from htran1/gpg_encryptor Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e92e5f8a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e92e5f8a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e92e5f8a Branch: refs/heads/master Commit: e92e5f8a0f7641cbe89279f0a977f9f27a6ba768 Parents: f543a39 Author: Hung Tran <[email protected]> Authored: Mon Jul 9 09:30:25 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Jul 9 09:30:25 2018 -0700 ---------------------------------------------------------------------- .../gobblin/crypto/EncryptionConfigParser.java | 13 ++ gobblin-data-management/build.gradle | 1 + .../FileAwareInputStreamDataWriterTest.java | 102 +++++++++ .../gobblin-crypto-provider/build.gradle | 1 + .../crypto/GobblinEncryptionProvider.java | 17 +- .../crypto/GobblinEncryptionProviderTest.java | 72 +++++- gobblin-modules/gobblin-crypto/build.gradle | 13 ++ .../org/apache/gobblin/crypto/GPGCodec.java | 53 ++++- .../apache/gobblin/crypto/GPGFileDecryptor.java | 4 +- .../apache/gobblin/crypto/GPGFileEncryptor.java | 220 +++++++++++++++++++ .../gobblin/crypto/GPGFileEncryptorTest.java | 104 +++++++++ 11 files changed, 589 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java ---------------------------------------------------------------------- diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java b/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java index 900b616..0f87a6d 100644 --- a/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java +++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java @@ -67,6 +67,10 @@ public class EncryptionConfigParser { public static final String ENCRYPTION_TYPE_ANY = "any"; + /** + * Some algorithms can be configured with an underlying cipher, like the symmetric cipher used with GPG + */ + public static final String ENCRYPTION_CIPHER_KEY = "cipher"; /** * Represents the entity we are trying to retrieve configuration for. Internally this @@ -200,6 +204,15 @@ public class EncryptionConfigParser { } /** + * Get the underlying cipher name + * @param parameters parameters map + * @return the cipher name + */ + public static String getCipher(Map<String, Object> parameters) { + return (String)parameters.get(ENCRYPTION_CIPHER_KEY); + } + + /** * Extract a set of properties for a given branch, stripping out the prefix and branch * suffix. * http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-data-management/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-data-management/build.gradle b/gobblin-data-management/build.gradle index add49ef..f6cacac 100644 --- a/gobblin-data-management/build.gradle +++ b/gobblin-data-management/build.gradle @@ -58,6 +58,7 @@ dependencies { testCompile externalDependency.hiveJdbc testRuntime project(":gobblin-modules:gobblin-crypto-provider") // for GPG + testCompile project(":gobblin-modules:gobblin-crypto") // for GPG } task testJar(type: Jar, dependsOn: testClasses) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java index 152081d..e772158 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java @@ -43,6 +43,9 @@ import com.google.common.io.Files; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.crypto.EncryptionConfigParser; +import org.apache.gobblin.crypto.GPGFileDecryptor; +import org.apache.gobblin.crypto.GPGFileEncryptor; +import org.apache.gobblin.crypto.GPGFileEncryptorTest; import org.apache.gobblin.data.management.copy.CopyConfiguration; import org.apache.gobblin.data.management.copy.CopySource; import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata; @@ -135,6 +138,105 @@ public class FileAwareInputStreamDataWriterTest { } @Test + public void testWriteWithGPGSymmetricEncryption() throws Exception { + byte[] streamString = "testEncryptedContents".getBytes("UTF-8"); + + FileStatus status = fs.getFileStatus(testTempPath); + OwnerAndPermission ownerAndPermission = + new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission); + + CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source"))); + + WorkUnitState state = TestUtils.createTestWorkUnitState(); + state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, "staging").toString()); + state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, "output").toString()); + state.setProp(ConfigurationKeys.WRITER_FILE_PATH, RandomStringUtils.randomAlphabetic(5)); + 
state.setProp("writer.encrypt." + EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY, "gpg"); + state.setProp("writer.encrypt." + EncryptionConfigParser.ENCRYPTION_KEYSTORE_PASSWORD_KEY, "testPassword"); + + CopySource.serializeCopyEntity(state, cf); + CopySource.serializeCopyableDataset(state, metadata); + + FileAwareInputStreamDataWriter dataWriter = new FileAwareInputStreamDataWriter(state, 1, 0); + + FileAwareInputStream fileAwareInputStream = new FileAwareInputStream(cf, StreamUtils.convertStream( + new ByteArrayInputStream(streamString))); + dataWriter.write(fileAwareInputStream); + dataWriter.commit(); + + Path writtenFilePath = new Path(new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), + cf.getDatasetAndPartition(metadata).identifier()), cf.getDestination()); + Assert.assertTrue(writtenFilePath.getName().endsWith("gpg"), + "Expected encryption name to be appended to destination"); + byte[] encryptedContent = IOUtils.toByteArray(new FileInputStream(writtenFilePath.toString())); + byte[] decryptedContent = new byte[streamString.length]; + IOUtils.readFully(GPGFileDecryptor.decryptFile(new FileInputStream(writtenFilePath.toString()), "testPassword"), + decryptedContent); + + + // encrypted string should not be the same as the plaintext + Assert.assertNotEquals(encryptedContent, streamString); + + // decrypted string should be the same as the plaintext + Assert.assertEquals(decryptedContent, streamString); + + } + + @Test + public void testWriteWithGPGAsymmetricEncryption() throws Exception { + byte[] streamString = "testEncryptedContents".getBytes("UTF-8"); + + FileStatus status = fs.getFileStatus(testTempPath); + OwnerAndPermission ownerAndPermission = + new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission); + + CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new 
Path("/source"))); + + WorkUnitState state = TestUtils.createTestWorkUnitState(); + state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, "staging").toString()); + state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, "output").toString()); + state.setProp(ConfigurationKeys.WRITER_FILE_PATH, RandomStringUtils.randomAlphabetic(5)); + state.setProp("writer.encrypt." + EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY, "gpg"); + state.setProp("writer.encrypt." + EncryptionConfigParser.ENCRYPTION_KEYSTORE_PATH_KEY, + GPGFileEncryptor.class.getResource( + GPGFileEncryptorTest.PUBLIC_KEY).toString()); + state.setProp("writer.encrypt." + EncryptionConfigParser.ENCRYPTION_KEYSTORE_PASSWORD_KEY, + GPGFileEncryptorTest.PASSPHRASE); + state.setProp("writer.encrypt." + EncryptionConfigParser.ENCRYPTION_KEY_NAME, + GPGFileEncryptorTest.KEY_ID); + + CopySource.serializeCopyEntity(state, cf); + CopySource.serializeCopyableDataset(state, metadata); + + FileAwareInputStreamDataWriter dataWriter = new FileAwareInputStreamDataWriter(state, 1, 0); + + FileAwareInputStream fileAwareInputStream = new FileAwareInputStream(cf, StreamUtils.convertStream( + new ByteArrayInputStream(streamString))); + dataWriter.write(fileAwareInputStream); + dataWriter.commit(); + + Path writtenFilePath = new Path(new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR), + cf.getDatasetAndPartition(metadata).identifier()), cf.getDestination()); + Assert.assertTrue(writtenFilePath.getName().endsWith("gpg"), + "Expected encryption name to be appended to destination"); + byte[] encryptedContent = IOUtils.toByteArray(new FileInputStream(writtenFilePath.toString())); + byte[] decryptedContent = new byte[streamString.length]; + IOUtils.readFully(GPGFileDecryptor.decryptFile(new FileInputStream(writtenFilePath.toString()), + GPGFileEncryptor.class.getResourceAsStream(GPGFileEncryptorTest.PRIVATE_KEY), + GPGFileEncryptorTest.PASSPHRASE), decryptedContent); + + + // 
encrypted string should not be the same as the plaintext + Assert.assertNotEquals(encryptedContent, streamString); + + // decrypted string should be the same as the plaintext + Assert.assertEquals(decryptedContent, streamString); + + } + + @Test public void testCommit() throws IOException { String destinationExistingToken = "destination"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto-provider/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto-provider/build.gradle b/gobblin-modules/gobblin-crypto-provider/build.gradle index a989c29..8a6f06a 100644 --- a/gobblin-modules/gobblin-crypto-provider/build.gradle +++ b/gobblin-modules/gobblin-crypto-provider/build.gradle @@ -24,6 +24,7 @@ dependencies { testCompile project(":gobblin-test-utils") testCompile externalDependency.testng + testCompile project(path: ":gobblin-modules:gobblin-crypto", configuration: "tests") } ext.classification="library" http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/crypto/GobblinEncryptionProvider.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/crypto/GobblinEncryptionProvider.java b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/crypto/GobblinEncryptionProvider.java index 7d4cd5e..4bd3091 100644 --- a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/crypto/GobblinEncryptionProvider.java +++ b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/crypto/GobblinEncryptionProvider.java @@ -20,6 +20,8 @@ import java.io.IOException; import java.util.Map; import java.util.Set; +import org.apache.hadoop.fs.Path; + import com.google.common.base.Preconditions; import 
com.google.common.collect.ImmutableSet; @@ -83,9 +85,20 @@ public class GobblinEncryptionProvider implements CredentialStoreProvider, Encry return new RotatingAESCodec(cs); case GPGCodec.TAG: String password = EncryptionConfigParser.getKeystorePassword(parameters); - Preconditions.checkNotNull(password, "Must specify an en/decryption password for GPGCodec!"); - return new GPGCodec(password); + String keystorePathStr = EncryptionConfigParser.getKeystorePath(parameters); + String keyName = EncryptionConfigParser.getKeyName(parameters); + String cipherName = EncryptionConfigParser.getCipher(parameters); + + // if not using a keystore then use password based encryption + if (keystorePathStr == null) { + Preconditions.checkNotNull(password, "Must specify an en/decryption password for GPGCodec!"); + return new GPGCodec(password, cipherName); + } + // if a key name is not present then use a key id of 0. A GPGCodec may be configured without a key name + // when used only for decryption where the key name is retrieved from the encrypted file + return new GPGCodec(new Path(keystorePathStr), password, + keyName == null ? 
0 : Long.parseUnsignedLong(keyName, 16), cipherName); default: log.debug("Do not support encryption type {}", algorithm); return null; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/crypto/GobblinEncryptionProviderTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/crypto/GobblinEncryptionProviderTest.java b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/crypto/GobblinEncryptionProviderTest.java index 21731d6..682e03d 100644 --- a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/crypto/GobblinEncryptionProviderTest.java +++ b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/crypto/GobblinEncryptionProviderTest.java @@ -16,20 +16,25 @@ */ package org.apache.gobblin.crypto; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; -import org.junit.Assert; +import org.apache.commons.io.IOUtils; +import org.testng.Assert; import org.testng.annotations.Test; import org.apache.gobblin.codec.StreamCodec; public class GobblinEncryptionProviderTest { + private static final long KEY_ID = -4435883136602571409L; + @Test public void testCanBuildAes() throws IOException { Map<String, Object> properties = new HashMap<>(); @@ -48,6 +53,69 @@ public class GobblinEncryptionProviderTest { cipherStream.write(toEncrypt); cipherStream.close(); - Assert.assertTrue("Expected to be able to write ciphertext!", cipherOut.size() > 0); + Assert.assertTrue(cipherOut.size() > 0, "Expected to be able to write ciphertext!"); + } + + @Test + public void testCanBuildGPG() throws IOException { + Map<String, Object> 
encryptionProperties = new HashMap<>(); + encryptionProperties.put(EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY, GPGCodec.TAG); + encryptionProperties.put(EncryptionConfigParser.ENCRYPTION_KEYSTORE_PATH_KEY, GPGFileEncryptor.class.getResource( + GPGFileEncryptorTest.PUBLIC_KEY).toString()); + encryptionProperties.put(EncryptionConfigParser.ENCRYPTION_KEY_NAME, String.valueOf(GPGFileEncryptorTest.KEY_ID)); + + testGPG(encryptionProperties); + } + + @Test + public void testBuildGPGGoodCipher() throws IOException { + Map<String, Object> encryptionProperties = new HashMap<>(); + encryptionProperties.put(EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY, GPGCodec.TAG); + encryptionProperties.put(EncryptionConfigParser.ENCRYPTION_KEYSTORE_PATH_KEY, GPGFileEncryptor.class.getResource( + GPGFileEncryptorTest.PUBLIC_KEY).toString()); + encryptionProperties.put(EncryptionConfigParser.ENCRYPTION_KEY_NAME, String.valueOf(GPGFileEncryptorTest.KEY_ID)); + encryptionProperties.put(EncryptionConfigParser.ENCRYPTION_CIPHER_KEY, "CAST5"); + + testGPG(encryptionProperties); + } + + @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = ".*BadCipher.*") + public void testBuildGPGBadCipher() throws IOException { + Map<String, Object> encryptionProperties = new HashMap<>(); + encryptionProperties.put(EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY, GPGCodec.TAG); + encryptionProperties.put(EncryptionConfigParser.ENCRYPTION_KEYSTORE_PATH_KEY, GPGFileEncryptor.class.getResource( + GPGFileEncryptorTest.PUBLIC_KEY).toString()); + encryptionProperties.put(EncryptionConfigParser.ENCRYPTION_KEY_NAME, String.valueOf(GPGFileEncryptorTest.KEY_ID)); + encryptionProperties.put(EncryptionConfigParser.ENCRYPTION_CIPHER_KEY, "BadCipher"); + + testGPG(encryptionProperties); + } + + private void testGPG(Map<String, Object> encryptionProperties) throws IOException { + StreamCodec encryptor = EncryptionFactory.buildStreamCryptoProvider(encryptionProperties); + 
Assert.assertNotNull(encryptor); + + Map<String, Object> decryptionProperties = new HashMap<>(); + decryptionProperties.put(EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY, GPGCodec.TAG); + decryptionProperties.put(EncryptionConfigParser.ENCRYPTION_KEYSTORE_PATH_KEY, GPGFileEncryptor.class.getResource( + GPGFileEncryptorTest.PRIVATE_KEY).toString()); + decryptionProperties.put(EncryptionConfigParser.ENCRYPTION_KEYSTORE_PASSWORD_KEY, GPGFileEncryptorTest.PASSPHRASE); + StreamCodec decryptor = EncryptionFactory.buildStreamCryptoProvider(decryptionProperties); + Assert.assertNotNull(decryptor); + + ByteArrayOutputStream cipherOut = new ByteArrayOutputStream(); + OutputStream cipherStream = encryptor.encodeOutputStream(cipherOut); + cipherStream.write(GPGFileEncryptorTest.EXPECTED_FILE_CONTENT_BYTES); + cipherStream.close(); + + byte[] encryptedBytes = cipherOut.toByteArray(); + Assert.assertTrue(encryptedBytes.length > 0, "Expected to be able to write ciphertext!"); + + try (InputStream is = decryptor.decodeInputStream(new ByteArrayInputStream(encryptedBytes))) { + byte[] decryptedBytes = IOUtils.toByteArray(is); + + Assert.assertNotEquals(GPGFileEncryptorTest.EXPECTED_FILE_CONTENT_BYTES, encryptedBytes); + Assert.assertEquals(GPGFileEncryptorTest.EXPECTED_FILE_CONTENT_BYTES, decryptedBytes); + } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto/build.gradle b/gobblin-modules/gobblin-crypto/build.gradle index e9abf2e..b868c1b 100644 --- a/gobblin-modules/gobblin-crypto/build.gradle +++ b/gobblin-modules/gobblin-crypto/build.gradle @@ -37,6 +37,19 @@ dependencies { ext.classification="library" +task testJar(type: Jar, dependsOn: testClasses) { + baseName = "test-${project.archivesBaseName}" + from sourceSets.test.output +} + +configurations { + tests +} + +artifacts { + tests testJar +} + 
jmh { include = "" zip64 = true http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGCodec.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGCodec.java b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGCodec.java index e2330e3..6ecba69 100644 --- a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGCodec.java +++ b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGCodec.java @@ -20,31 +20,74 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import org.apache.gobblin.codec.StreamCodec; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.gobblin.codec.StreamCodec; /** - * Codec class that supports GPG decryption (only). + * Codec class that supports GPG encryption and decryption. */ public class GPGCodec implements StreamCodec { public static final String TAG = "gpg"; private final String password; + private final String cipher; + private final Path keyRingPath; + private final long keyId; + private final Configuration conf; - public GPGCodec(String password) { + /** + * Constructor for a {@code GPGCodec} configured for password-based encryption + * @param password password for symmetric encryption + * @param cipher the symmetric cipher to use for encryption. If null or empty then a default cipher is used. 
+ */ + public GPGCodec(String password, String cipher) { this.password = password; + this.cipher = cipher; + + this.keyRingPath = null; + this.keyId = 0; + this.conf = null; + } + + /** + * Constructor for a {@code GPGCodec} configured for key-based encryption + * @param keyRingPath path to the key ring + * @param passphrase passphrase for retrieving the decryption key + * @param keyId id for retrieving the key used for encryption + * @param cipher the symmetric cipher to use for encryption. If null or empty then a default cipher is used. + */ + public GPGCodec(Path keyRingPath, String passphrase, long keyId, String cipher) { + this.keyRingPath = keyRingPath; + this.password = passphrase; + this.keyId = keyId; + this.cipher = cipher; + this.conf = new Configuration(); } @Override public OutputStream encodeOutputStream(OutputStream origStream) throws IOException { - throw new UnsupportedOperationException("Not implemented yet"); + if (this.keyRingPath != null) { + try (InputStream keyRingInputStream = this.keyRingPath.getFileSystem(this.conf).open(this.keyRingPath)) { + return GPGFileEncryptor.encryptFile(origStream, keyRingInputStream, this.keyId, this.cipher); + } + } else { + return GPGFileEncryptor.encryptFile(origStream, this.password, this.cipher); + } } @Override public InputStream decodeInputStream(InputStream origStream) throws IOException { - return GPGFileDecryptor.decryptFile(origStream, password); + if (this.keyRingPath != null) { + try (InputStream keyRingInputStream = this.keyRingPath.getFileSystem(this.conf).open(keyRingPath)) { + return GPGFileDecryptor.decryptFile(origStream, keyRingInputStream, this.password); + } + } else { + return GPGFileDecryptor.decryptFile(origStream, this.password); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileDecryptor.java ---------------------------------------------------------------------- diff --git 
a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileDecryptor.java b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileDecryptor.java index 4c673c7..b7026e8 100644 --- a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileDecryptor.java +++ b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileDecryptor.java @@ -81,9 +81,9 @@ public class GPGFileDecryptor { /** * Taking in a file inputstream, keyring inputstream and a passPhrase, generate a decrypted file inputstream. * @param inputStream file inputstream - * @param keyIn keyring inputstream + * @param keyIn keyring inputstream. This InputStream is owned by the caller. * @param passPhrase passPhrase - * @return + * @return an {@link InputStream} for the decrypted content * @throws IOException */ public InputStream decryptFile(InputStream inputStream, InputStream keyIn, String passPhrase) http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileEncryptor.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileEncryptor.java b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileEncryptor.java new file mode 100644 index 0000000..aebedb2 --- /dev/null +++ b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileEncryptor.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.crypto; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Field; +import java.security.SecureRandom; +import java.security.Security; +import java.util.Date; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.bouncycastle.jce.provider.BouncyCastleProvider; +import org.bouncycastle.openpgp.PGPEncryptedData; +import org.bouncycastle.openpgp.PGPEncryptedDataGenerator; +import org.bouncycastle.openpgp.PGPException; +import org.bouncycastle.openpgp.PGPLiteralDataGenerator; +import org.bouncycastle.openpgp.PGPPublicKey; +import org.bouncycastle.openpgp.PGPPublicKeyRingCollection; +import org.bouncycastle.openpgp.PGPUtil; +import org.bouncycastle.openpgp.operator.bc.BcKeyFingerprintCalculator; +import org.bouncycastle.openpgp.operator.jcajce.JcePBEKeyEncryptionMethodGenerator; +import org.bouncycastle.openpgp.operator.jcajce.JcePGPDataEncryptorBuilder; +import org.bouncycastle.openpgp.operator.jcajce.JcePublicKeyKeyEncryptionMethodGenerator; +import org.reflections.ReflectionUtils; + +import com.google.common.base.Preconditions; + +import lombok.experimental.UtilityClass; + + +/** + * A utility class that supports both password based and key based encryption + * + * Code reference - org.bouncycastle.openpgp.examples.PBEFileProcessor + * - org.bouncycastle.openpgp.examples.KeyBasedFileProcessor + */ +@UtilityClass +public class GPGFileEncryptor { + private static int BUFFER_SIZE = 1024; + private static String PAYLOAD_NAME = 
"payload.file"; + private static String PROVIDER_NAME = BouncyCastleProvider.PROVIDER_NAME; + + /** + * Taking in an input {@link OutputStream} and a passPhrase, return an {@link OutputStream} that can be used to output + * encrypted output to the input {@link OutputStream}. + * @param outputStream the output stream to hold the ciphertext {@link OutputStream} + * @param passPhrase pass phrase + * @param cipher the symmetric cipher to use for encryption. If null or empty then a default cipher is used. + * @return {@link OutputStream} to write content to for encryption + * @throws IOException + */ + public OutputStream encryptFile(OutputStream outputStream, String passPhrase, String cipher) throws IOException { + try { + if (Security.getProvider(PROVIDER_NAME) == null) { + Security.addProvider(new BouncyCastleProvider()); + } + + PGPEncryptedDataGenerator cPk = new PGPEncryptedDataGenerator( + new JcePGPDataEncryptorBuilder(symmetricKeyAlgorithmNameToTag(cipher)) + .setSecureRandom(new SecureRandom()) + .setProvider(PROVIDER_NAME)); + cPk.addMethod(new JcePBEKeyEncryptionMethodGenerator(passPhrase.toCharArray()).setProvider(PROVIDER_NAME)); + + OutputStream cOut = cPk.open(outputStream, new byte[BUFFER_SIZE]); + + PGPLiteralDataGenerator literalGen = new PGPLiteralDataGenerator(); + OutputStream _literalOut = + literalGen.open(cOut, PGPLiteralDataGenerator.BINARY, PAYLOAD_NAME, new Date(), new byte[BUFFER_SIZE]); + + return new ClosingWrapperOutputStream(_literalOut, cOut, outputStream); + } catch (PGPException e) { + throw new IOException(e); + } + } + + + /** + * Taking in an input {@link OutputStream}, keyring inputstream and a passPhrase, generate an encrypted {@link OutputStream}. + * @param outputStream {@link OutputStream} that will receive the encrypted content + * @param keyIn keyring inputstream. This InputStream is owned by the caller. + * @param keyId key identifier + * @param cipher the symmetric cipher to use for encryption. 
If null or empty then a default cipher is used. + * @return an {@link OutputStream} to write content to for encryption + * @throws IOException + */ + public OutputStream encryptFile(OutputStream outputStream, InputStream keyIn, long keyId, String cipher) + throws IOException { + try { + if (Security.getProvider(PROVIDER_NAME) == null) { + Security.addProvider(new BouncyCastleProvider()); + } + + PGPEncryptedDataGenerator cPk = new PGPEncryptedDataGenerator( + new JcePGPDataEncryptorBuilder(symmetricKeyAlgorithmNameToTag(cipher)) + .setSecureRandom(new SecureRandom()) + .setProvider(PROVIDER_NAME)); + + PGPPublicKey publicKey; + PGPPublicKeyRingCollection keyRings = new PGPPublicKeyRingCollection(PGPUtil.getDecoderStream(keyIn), + new BcKeyFingerprintCalculator()); + publicKey = keyRings.getPublicKey(keyId); + + if (publicKey == null) { + throw new IllegalArgumentException("public key for encryption not found"); + } + + cPk.addMethod(new JcePublicKeyKeyEncryptionMethodGenerator(publicKey).setProvider(PROVIDER_NAME)); + + OutputStream cOut = cPk.open(outputStream, new byte[BUFFER_SIZE]); + + PGPLiteralDataGenerator literalGen = new PGPLiteralDataGenerator(); + OutputStream _literalOut = + literalGen.open(cOut, PGPLiteralDataGenerator.BINARY, PAYLOAD_NAME, new Date(), new byte[BUFFER_SIZE]); + + return new ClosingWrapperOutputStream(_literalOut, cOut, outputStream); + } catch (PGPException e) { + throw new IOException(e); + } + } + + /** + * Convert a string cipher name to the integer tag used by GPG + * @param cipherName the cipher name + * @return integer tag for the cipher + */ + private static int symmetricKeyAlgorithmNameToTag(String cipherName) { + // Use CAST5 if no cipher specified + if (StringUtils.isEmpty(cipherName)) { + return PGPEncryptedData.CAST5; + } + + Set<Field> fields = ReflectionUtils.getAllFields(PGPEncryptedData.class, ReflectionUtils.withName(cipherName)); + + if (fields.isEmpty()) { + throw new RuntimeException("Could not find tag for cipher 
name " + cipherName); + } + + try { + return fields.iterator().next().getInt(null); + } catch (IllegalAccessException e) { + throw new RuntimeException("Could not access field " + cipherName, e); + } + } + + /** + * A class for keeping track of wrapped output streams and closing them when this stream is closed. + * This is required because GPG wrapping of streams does not propagate the close. + */ + private static class ClosingWrapperOutputStream extends OutputStream { + private final OutputStream[] outputStreams; + private final OutputStream firstStream; + + /** + * Creates an output stream that writes to the first {@link OutputStream} and closes all of the {@link OutputStream}s + * when close() is called + * @param outputStreams list of {@link OutputStream}s where the first one is written to and the rest are tracked + * for closing. + */ + public ClosingWrapperOutputStream(OutputStream... outputStreams) { + Preconditions.checkArgument(outputStreams.length >= 1); + + this.outputStreams = outputStreams; + this.firstStream = outputStreams[0]; + } + + @Override + public void write(byte[] bytes) + throws IOException + { + this.firstStream.write(bytes); + } + + @Override + public void write(byte[] bytes, int offset, int length) + throws IOException + { + this.firstStream.write(bytes, offset, length); + } + + @Override + public void write(int b) + throws IOException + { + this.firstStream.write(b); + } + + public void flush() + throws IOException + { + for (OutputStream os : this.outputStreams) { + os.flush(); + } + } + + public void close() + throws IOException + { + for (OutputStream os : this.outputStreams) { + os.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto/src/test/java/org/apache/gobblin/crypto/GPGFileEncryptorTest.java ---------------------------------------------------------------------- diff --git 
a/gobblin-modules/gobblin-crypto/src/test/java/org/apache/gobblin/crypto/GPGFileEncryptorTest.java b/gobblin-modules/gobblin-crypto/src/test/java/org/apache/gobblin/crypto/GPGFileEncryptorTest.java new file mode 100644 index 0000000..285c414 --- /dev/null +++ b/gobblin-modules/gobblin-crypto/src/test/java/org/apache/gobblin/crypto/GPGFileEncryptorTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.gobblin.crypto;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.bouncycastle.openpgp.PGPException;
import org.testng.Assert;
import org.testng.annotations.Test;


/**
 * Test class for {@link GPGFileEncryptor}
 * Test key and test passphrase are generated offline
 */
public class GPGFileEncryptorTest {
  public static final String PASSWORD = "test";
  public static final String PASSPHRASE = "gobblin";
  public static final String PUBLIC_KEY = "/crypto/gpg/testPublic.key";
  public static final String PRIVATE_KEY = "/crypto/gpg/testPrivate.key";
  public static final String KEY_ID = "c27093CA21A87D6F";
  public static final String EXPECTED_FILE_CONTENT = "This is a key based encryption file.";
  public static final byte[] EXPECTED_FILE_CONTENT_BYTES = EXPECTED_FILE_CONTENT.getBytes(StandardCharsets.UTF_8);

  /**
   * Encrypt a test string with a symmetric key and check that it can be decrypted
   * @throws IOException if encryption or decryption fails
   * @throws PGPException declared for the PGP based helpers used by this test
   */
  @Test
  public void encryptSym() throws IOException, PGPException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();

    // try-with-resources closes the encrypting stream even if the write fails
    try (OutputStream os = GPGFileEncryptor.encryptFile(baos, PASSWORD, "DES")) {
      os.write(EXPECTED_FILE_CONTENT_BYTES);
    }

    byte[] encryptedBytes = baos.toByteArray();

    try (InputStream is = GPGFileDecryptor.decryptFile(new ByteArrayInputStream(encryptedBytes), PASSWORD)) {
      byte[] decryptedBytes = IOUtils.toByteArray(is);

      // TestNG argument order is (actual, expected)
      Assert.assertNotEquals(encryptedBytes, EXPECTED_FILE_CONTENT_BYTES);
      Assert.assertEquals(decryptedBytes, EXPECTED_FILE_CONTENT_BYTES);
    }
  }

  /**
   * Encrypt a test string with an asymmetric key and check that it can be decrypted
   * @throws IOException if reading the keys, encryption or decryption fails
   * @throws PGPException declared for the PGP based helpers used by this test
   */
  @Test
  public void encryptAsym() throws IOException, PGPException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();

    try (InputStream publicKey = getClass().getResourceAsStream(PUBLIC_KEY);
        OutputStream os =
            GPGFileEncryptor.encryptFile(baos, publicKey, Long.parseUnsignedLong(KEY_ID, 16), "CAST5")) {
      os.write(EXPECTED_FILE_CONTENT_BYTES);
    }

    byte[] encryptedBytes = baos.toByteArray();

    try (InputStream privateKey = getClass().getResourceAsStream(PRIVATE_KEY);
        InputStream is =
            GPGFileDecryptor.decryptFile(new ByteArrayInputStream(encryptedBytes), privateKey, PASSPHRASE)) {
      byte[] decryptedBytes = IOUtils.toByteArray(is);

      // TestNG argument order is (actual, expected)
      Assert.assertNotEquals(encryptedBytes, EXPECTED_FILE_CONTENT_BYTES);
      Assert.assertEquals(decryptedBytes, EXPECTED_FILE_CONTENT_BYTES);
    }
  }

  /**
   * Test error with bad cipher
   * @throws IOException if reading the public key fails
   * @throws PGPException declared for the PGP based helpers used by this test
   */
  @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = ".*BadCipher.*")
  public void badCipher() throws IOException, PGPException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();

    try (InputStream publicKey = getClass().getResourceAsStream(PUBLIC_KEY)) {
      // The call is expected to throw, so its result is intentionally not captured
      GPGFileEncryptor.encryptFile(baos, publicKey, Long.parseUnsignedLong(KEY_ID, 16), "BadCipher");
    }
  }
}
