Repository: incubator-gobblin
Updated Branches:
  refs/heads/master f543a39a4 -> e92e5f8a0


[GOBBLIN-521] Add support for encryption in the GPGCodec

Closes #2391 from htran1/gpg_encryptor


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e92e5f8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e92e5f8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e92e5f8a

Branch: refs/heads/master
Commit: e92e5f8a0f7641cbe89279f0a977f9f27a6ba768
Parents: f543a39
Author: Hung Tran <[email protected]>
Authored: Mon Jul 9 09:30:25 2018 -0700
Committer: Hung Tran <[email protected]>
Committed: Mon Jul 9 09:30:25 2018 -0700

----------------------------------------------------------------------
 .../gobblin/crypto/EncryptionConfigParser.java  |  13 ++
 gobblin-data-management/build.gradle            |   1 +
 .../FileAwareInputStreamDataWriterTest.java     | 102 +++++++++
 .../gobblin-crypto-provider/build.gradle        |   1 +
 .../crypto/GobblinEncryptionProvider.java       |  17 +-
 .../crypto/GobblinEncryptionProviderTest.java   |  72 +++++-
 gobblin-modules/gobblin-crypto/build.gradle     |  13 ++
 .../org/apache/gobblin/crypto/GPGCodec.java     |  53 ++++-
 .../apache/gobblin/crypto/GPGFileDecryptor.java |   4 +-
 .../apache/gobblin/crypto/GPGFileEncryptor.java | 220 +++++++++++++++++++
 .../gobblin/crypto/GPGFileEncryptorTest.java    | 104 +++++++++
 11 files changed, 589 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java
----------------------------------------------------------------------
diff --git 
a/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java
 
b/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java
index 900b616..0f87a6d 100644
--- 
a/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java
+++ 
b/gobblin-core-base/src/main/java/org/apache/gobblin/crypto/EncryptionConfigParser.java
@@ -67,6 +67,10 @@ public class EncryptionConfigParser {
 
   public static final String ENCRYPTION_TYPE_ANY = "any";
 
+  /**
+   * Parameter key naming the underlying cipher for algorithms that can be configured with one,
+   * such as the symmetric cipher used with GPG. The value is read back by {@link #getCipher(Map)}.
+   */
+  public static final String ENCRYPTION_CIPHER_KEY = "cipher";
 
   /**
    * Represents the entity we are trying to retrieve configuration for. 
Internally this
@@ -200,6 +204,15 @@ public class EncryptionConfigParser {
   }
 
   /**
+   * Look up the configured underlying cipher name.
+   * @param parameters encryption parameters map
+   * @return the value stored under {@link #ENCRYPTION_CIPHER_KEY}, or null when it is not set
+   */
+  public static String getCipher(Map<String, Object> parameters) {
+    Object cipherName = parameters.get(ENCRYPTION_CIPHER_KEY);
+    return (String) cipherName;
+  }
+
+  /**
    * Extract a set of properties for a given branch, stripping out the prefix 
and branch
    * suffix.
    *

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-data-management/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-data-management/build.gradle 
b/gobblin-data-management/build.gradle
index add49ef..f6cacac 100644
--- a/gobblin-data-management/build.gradle
+++ b/gobblin-data-management/build.gradle
@@ -58,6 +58,7 @@ dependencies {
   testCompile externalDependency.hiveJdbc
 
   testRuntime project(":gobblin-modules:gobblin-crypto-provider") // for GPG
+  testCompile project(":gobblin-modules:gobblin-crypto") // for GPG
+  // FileAwareInputStreamDataWriterTest uses GPGFileEncryptorTest (key paths, key id, passphrase), which lives in
+  // gobblin-crypto's test source set and is only exposed through that module's "tests" configuration.
+  testCompile project(path: ":gobblin-modules:gobblin-crypto", configuration: "tests") // for GPG test keys
 }
 
 task testJar(type: Jar, dependsOn: testClasses) {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
index 152081d..e772158 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
@@ -43,6 +43,9 @@ import com.google.common.io.Files;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.crypto.EncryptionConfigParser;
+import org.apache.gobblin.crypto.GPGFileDecryptor;
+import org.apache.gobblin.crypto.GPGFileEncryptor;
+import org.apache.gobblin.crypto.GPGFileEncryptorTest;
 import org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.data.management.copy.CopySource;
 import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
@@ -135,6 +138,105 @@ public class FileAwareInputStreamDataWriterTest {
   }
 
   @Test
+  public void testWriteWithGPGSymmetricEncryption() throws Exception {
+    // Writes a file with password-based GPG encryption and checks it decrypts back to the original bytes.
+    byte[] streamString = "testEncryptedContents".getBytes("UTF-8");
+
+    FileStatus status = fs.getFileStatus(testTempPath);
+    OwnerAndPermission ownerAndPermission = new OwnerAndPermission(status.getOwner(), status.getGroup(),
+        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+
+    CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
+
+    WorkUnitState state = TestUtils.createTestWorkUnitState();
+    state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, "staging").toString());
+    state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, "output").toString());
+    state.setProp(ConfigurationKeys.WRITER_FILE_PATH, RandomStringUtils.randomAlphabetic(5));
+    state.setProp("writer.encrypt." + EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY, "gpg");
+    state.setProp("writer.encrypt." + EncryptionConfigParser.ENCRYPTION_KEYSTORE_PASSWORD_KEY, "testPassword");
+
+    CopySource.serializeCopyEntity(state, cf);
+    CopySource.serializeCopyableDataset(state, metadata);
+
+    FileAwareInputStreamDataWriter dataWriter = new FileAwareInputStreamDataWriter(state, 1, 0);
+
+    FileAwareInputStream fileAwareInputStream =
+        new FileAwareInputStream(cf, StreamUtils.convertStream(new ByteArrayInputStream(streamString)));
+    dataWriter.write(fileAwareInputStream);
+    dataWriter.commit();
+
+    Path writtenFilePath = new Path(new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
+        cf.getDatasetAndPartition(metadata).identifier()), cf.getDestination());
+    Assert.assertTrue(writtenFilePath.getName().endsWith("gpg"),
+        "Expected encryption name to be appended to destination");
+
+    byte[] encryptedContent;
+    try (FileInputStream encryptedIn = new FileInputStream(writtenFilePath.toString())) {
+      encryptedContent = IOUtils.toByteArray(encryptedIn);
+    }
+
+    // Read the decrypted stream to EOF (instead of a fixed-size readFully) so that unexpected trailing bytes
+    // also fail the comparison, and close both the file stream and the decrypting stream.
+    byte[] decryptedContent;
+    try (FileInputStream encryptedIn = new FileInputStream(writtenFilePath.toString());
+        java.io.InputStream decryptedIn = GPGFileDecryptor.decryptFile(encryptedIn, "testPassword")) {
+      decryptedContent = IOUtils.toByteArray(decryptedIn);
+    }
+
+    // encrypted string should not be the same as the plaintext
+    Assert.assertNotEquals(encryptedContent, streamString);
+
+    // decrypted string should be the same as the plaintext
+    Assert.assertEquals(decryptedContent, streamString);
+  }
+
+  /**
+   * Writes a file encrypted to the test GPG public key and checks that it decrypts with the matching private key.
+   */
+  @Test
+  public void testWriteWithGPGAsymmetricEncryption() throws Exception {
+    byte[] streamString = "testEncryptedContents".getBytes("UTF-8");
+
+    FileStatus status = fs.getFileStatus(testTempPath);
+    OwnerAndPermission ownerAndPermission = new OwnerAndPermission(status.getOwner(), status.getGroup(),
+        new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+
+    CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
+
+    WorkUnitState state = TestUtils.createTestWorkUnitState();
+    state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, "staging").toString());
+    state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, "output").toString());
+    state.setProp(ConfigurationKeys.WRITER_FILE_PATH, RandomStringUtils.randomAlphabetic(5));
+    state.setProp("writer.encrypt." + EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY, "gpg");
+    state.setProp("writer.encrypt." + EncryptionConfigParser.ENCRYPTION_KEYSTORE_PATH_KEY,
+        GPGFileEncryptor.class.getResource(GPGFileEncryptorTest.PUBLIC_KEY).toString());
+    state.setProp("writer.encrypt." + EncryptionConfigParser.ENCRYPTION_KEYSTORE_PASSWORD_KEY,
+        GPGFileEncryptorTest.PASSPHRASE);
+    state.setProp("writer.encrypt." + EncryptionConfigParser.ENCRYPTION_KEY_NAME, GPGFileEncryptorTest.KEY_ID);
+
+    CopySource.serializeCopyEntity(state, cf);
+    CopySource.serializeCopyableDataset(state, metadata);
+
+    FileAwareInputStreamDataWriter dataWriter = new FileAwareInputStreamDataWriter(state, 1, 0);
+
+    FileAwareInputStream fileAwareInputStream =
+        new FileAwareInputStream(cf, StreamUtils.convertStream(new ByteArrayInputStream(streamString)));
+    dataWriter.write(fileAwareInputStream);
+    dataWriter.commit();
+
+    Path writtenFilePath = new Path(new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
+        cf.getDatasetAndPartition(metadata).identifier()), cf.getDestination());
+    Assert.assertTrue(writtenFilePath.getName().endsWith("gpg"),
+        "Expected encryption name to be appended to destination");
+
+    byte[] encryptedContent;
+    try (FileInputStream encryptedIn = new FileInputStream(writtenFilePath.toString())) {
+      encryptedContent = IOUtils.toByteArray(encryptedIn);
+    }
+
+    // Read the decrypted stream to EOF so unexpected trailing bytes also fail the comparison; the file stream,
+    // the private key resource stream and the decrypting stream are all closed afterwards.
+    byte[] decryptedContent;
+    try (FileInputStream encryptedIn = new FileInputStream(writtenFilePath.toString());
+        java.io.InputStream privateKeyIn = GPGFileEncryptor.class.getResourceAsStream(GPGFileEncryptorTest.PRIVATE_KEY);
+        java.io.InputStream decryptedIn =
+            GPGFileDecryptor.decryptFile(encryptedIn, privateKeyIn, GPGFileEncryptorTest.PASSPHRASE)) {
+      decryptedContent = IOUtils.toByteArray(decryptedIn);
+    }
+
+    // encrypted string should not be the same as the plaintext
+    Assert.assertNotEquals(encryptedContent, streamString);
+
+    // decrypted string should be the same as the plaintext
+    Assert.assertEquals(decryptedContent, streamString);
+  }
+
+  @Test
   public void testCommit() throws IOException {
 
     String destinationExistingToken = "destination";

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto-provider/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto-provider/build.gradle 
b/gobblin-modules/gobblin-crypto-provider/build.gradle
index a989c29..8a6f06a 100644
--- a/gobblin-modules/gobblin-crypto-provider/build.gradle
+++ b/gobblin-modules/gobblin-crypto-provider/build.gradle
@@ -24,6 +24,7 @@ dependencies {
 
   testCompile project(":gobblin-test-utils")
   testCompile externalDependency.testng
+  testCompile project(path: ":gobblin-modules:gobblin-crypto", configuration: 
"tests")
 }
 
 ext.classification="library"

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/crypto/GobblinEncryptionProvider.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/crypto/GobblinEncryptionProvider.java
 
b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/crypto/GobblinEncryptionProvider.java
index 7d4cd5e..4bd3091 100644
--- 
a/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/crypto/GobblinEncryptionProvider.java
+++ 
b/gobblin-modules/gobblin-crypto-provider/src/main/java/org/apache/gobblin/crypto/GobblinEncryptionProvider.java
@@ -20,6 +20,8 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.fs.Path;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 
@@ -83,9 +85,20 @@ public class GobblinEncryptionProvider implements 
CredentialStoreProvider, Encry
         return new RotatingAESCodec(cs);
       case GPGCodec.TAG:
         String password = 
EncryptionConfigParser.getKeystorePassword(parameters);
-        Preconditions.checkNotNull(password, "Must specify an en/decryption 
password for GPGCodec!");
-        return new GPGCodec(password);
+        String keystorePathStr = 
EncryptionConfigParser.getKeystorePath(parameters);
+        String keyName = EncryptionConfigParser.getKeyName(parameters);
+        String cipherName = EncryptionConfigParser.getCipher(parameters);
+
+        // if not using a keystore then use password based encryption
+        if (keystorePathStr == null) {
+          Preconditions.checkNotNull(password, "Must specify an en/decryption 
password for GPGCodec!");
+          return new GPGCodec(password, cipherName);
+        }
 
+        // if a key name is not present then use a key id of 0. A GPGCodec may 
be configured without a key name
+        // when used only for decryption where the key name is retrieved from 
the encrypted file
+        return new GPGCodec(new Path(keystorePathStr), password,
+            keyName == null ? 0 : Long.parseUnsignedLong(keyName, 16), 
cipherName);
       default:
         log.debug("Do not support encryption type {}", algorithm);
         return null;

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/crypto/GobblinEncryptionProviderTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/crypto/GobblinEncryptionProviderTest.java
 
b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/crypto/GobblinEncryptionProviderTest.java
index 21731d6..682e03d 100644
--- 
a/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/crypto/GobblinEncryptionProviderTest.java
+++ 
b/gobblin-modules/gobblin-crypto-provider/src/test/java/org/apache/gobblin/crypto/GobblinEncryptionProviderTest.java
@@ -16,20 +16,25 @@
  */
 package org.apache.gobblin.crypto;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.junit.Assert;
+import org.apache.commons.io.IOUtils;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import org.apache.gobblin.codec.StreamCodec;
 
 
 public class GobblinEncryptionProviderTest {
+  private static final long KEY_ID = -4435883136602571409L;
+
   @Test
   public void testCanBuildAes() throws IOException {
     Map<String, Object> properties = new HashMap<>();
@@ -48,6 +53,69 @@ public class GobblinEncryptionProviderTest {
     cipherStream.write(toEncrypt);
     cipherStream.close();
 
-    Assert.assertTrue("Expected to be able to write ciphertext!", 
cipherOut.size() > 0);
+    Assert.assertTrue(cipherOut.size() > 0, "Expected to be able to write 
ciphertext!");
+  }
+
+  @Test
+  public void testCanBuildGPG() throws IOException {
+    testGPG(publicKeyEncryptionProperties(null));
+  }
+
+  @Test
+  public void testBuildGPGGoodCipher() throws IOException {
+    testGPG(publicKeyEncryptionProperties("CAST5"));
+  }
+
+  @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = ".*BadCipher.*")
+  public void testBuildGPGBadCipher() throws IOException {
+    testGPG(publicKeyEncryptionProperties("BadCipher"));
+  }
+
+  /**
+   * Builds GPG encryption properties pointing at the test public key ring and the test key id.
+   * @param cipherName value for {@link EncryptionConfigParser#ENCRYPTION_CIPHER_KEY}, or null to leave it unset
+   * @return a new mutable properties map
+   */
+  private static Map<String, Object> publicKeyEncryptionProperties(String cipherName) {
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY, GPGCodec.TAG);
+    properties.put(EncryptionConfigParser.ENCRYPTION_KEYSTORE_PATH_KEY,
+        GPGFileEncryptor.class.getResource(GPGFileEncryptorTest.PUBLIC_KEY).toString());
+    properties.put(EncryptionConfigParser.ENCRYPTION_KEY_NAME, String.valueOf(GPGFileEncryptorTest.KEY_ID));
+    if (cipherName != null) {
+      properties.put(EncryptionConfigParser.ENCRYPTION_CIPHER_KEY, cipherName);
+    }
+    return properties;
+  }
+
+  /**
+   * Round-trips {@link GPGFileEncryptorTest#EXPECTED_FILE_CONTENT_BYTES} through a codec built from
+   * {@code encryptionProperties}, then decodes it with a codec built from the test private key ring and passphrase.
+   */
+  private void testGPG(Map<String, Object> encryptionProperties) throws IOException {
+    StreamCodec encoder = EncryptionFactory.buildStreamCryptoProvider(encryptionProperties);
+    Assert.assertNotNull(encoder);
+
+    String privateKeyRing = GPGFileEncryptor.class.getResource(GPGFileEncryptorTest.PRIVATE_KEY).toString();
+    Map<String, Object> privateKeyProperties = new HashMap<>();
+    privateKeyProperties.put(EncryptionConfigParser.ENCRYPTION_ALGORITHM_KEY, GPGCodec.TAG);
+    privateKeyProperties.put(EncryptionConfigParser.ENCRYPTION_KEYSTORE_PATH_KEY, privateKeyRing);
+    privateKeyProperties.put(EncryptionConfigParser.ENCRYPTION_KEYSTORE_PASSWORD_KEY, GPGFileEncryptorTest.PASSPHRASE);
+    StreamCodec decoder = EncryptionFactory.buildStreamCryptoProvider(privateKeyProperties);
+    Assert.assertNotNull(decoder);
+
+    ByteArrayOutputStream ciphertextSink = new ByteArrayOutputStream();
+    OutputStream plaintextSink = encoder.encodeOutputStream(ciphertextSink);
+    plaintextSink.write(GPGFileEncryptorTest.EXPECTED_FILE_CONTENT_BYTES);
+    plaintextSink.close();
+
+    byte[] ciphertext = ciphertextSink.toByteArray();
+    Assert.assertTrue(ciphertext.length > 0, "Expected to be able to write ciphertext!");
+
+    try (InputStream plaintextSource = decoder.decodeInputStream(new ByteArrayInputStream(ciphertext))) {
+      byte[] roundTripped = IOUtils.toByteArray(plaintextSource);
+
+      Assert.assertNotEquals(GPGFileEncryptorTest.EXPECTED_FILE_CONTENT_BYTES, ciphertext);
+      Assert.assertEquals(GPGFileEncryptorTest.EXPECTED_FILE_CONTENT_BYTES, roundTripped);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-crypto/build.gradle 
b/gobblin-modules/gobblin-crypto/build.gradle
index e9abf2e..b868c1b 100644
--- a/gobblin-modules/gobblin-crypto/build.gradle
+++ b/gobblin-modules/gobblin-crypto/build.gradle
@@ -37,6 +37,19 @@ dependencies {
 
 ext.classification="library"
 
+// Package this module's compiled test output (sourceSets.test.output) into a separate jar...
+task testJar(type: Jar, dependsOn: testClasses) {
+  baseName = "test-${project.archivesBaseName}"
+  from sourceSets.test.output
+}
+
+// ...and publish it through a dedicated "tests" configuration, so other modules can depend on it with
+// project(path: ":gobblin-modules:gobblin-crypto", configuration: "tests").
+configurations {
+  tests
+}
+
+artifacts {
+  tests testJar
+}
+
 jmh {
   include = ""
   zip64 = true

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGCodec.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGCodec.java
 
b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGCodec.java
index e2330e3..6ecba69 100644
--- 
a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGCodec.java
+++ 
b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGCodec.java
@@ -20,31 +20,74 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
-import org.apache.gobblin.codec.StreamCodec;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
+import org.apache.gobblin.codec.StreamCodec;
 
 /**
- * Codec class that supports GPG decryption (only).
+ * Codec class that supports GPG encryption and decryption.
  */
 public class GPGCodec implements StreamCodec {
   public static final String TAG = "gpg";
 
   private final String password;
+  private final String cipher;
+  private final Path keyRingPath;
+  private final long keyId;
+  private final Configuration conf;
 
-  public GPGCodec(String password) {
+  /**
+   * Constructor for a {@code GPGCodec} configured for password-based encryption with the default cipher.
+   * Kept so that existing callers of the single-argument constructor continue to compile.
+   * @param password password for symmetric encryption and decryption
+   */
+  public GPGCodec(String password) {
+    this(password, null);
+  }
+
+  /**
+   * Constructor for a {@code GPGCodec} configured for password-based encryption
+   * @param password password for symmetric encryption and decryption
+   * @param cipher the symmetric cipher to use for encryption. If null or empty then a default cipher is used.
+   */
+  public GPGCodec(String password, String cipher) {
     this.password = password;
+    this.cipher = cipher;
+
+    // no key ring: encodeOutputStream/decodeInputStream take the password-based branch
+    this.keyRingPath = null;
+    this.keyId = 0;
+    this.conf = null;
+  }
+
+  /**
+   * Creates a {@code GPGCodec} for key-based encryption and decryption. The key ring at {@code keyRingPath} is
+   * opened every time a stream is encoded or decoded.
+   * @param keyRingPath path to the key ring (a public key ring for encryption, a private key ring for decryption)
+   * @param passphrase passphrase protecting the private key used for decryption
+   * @param keyId id of the public key to encrypt to; a codec used only for decryption may pass 0
+   * @param cipher the symmetric cipher to use for encryption. If null or empty then a default cipher is used.
+   */
+  public GPGCodec(Path keyRingPath, String passphrase, long keyId, String cipher) {
+    this.conf = new Configuration();
+    this.cipher = cipher;
+    this.keyId = keyId;
+    this.password = passphrase;
+    this.keyRingPath = keyRingPath;
   }
 
   @Override
   public OutputStream encodeOutputStream(OutputStream origStream)
       throws IOException {
-    throw new UnsupportedOperationException("Not implemented yet");
+    if (this.keyRingPath == null) {
+      // password-based (symmetric) encryption
+      return GPGFileEncryptor.encryptFile(origStream, this.password, this.cipher);
+    }
+
+    // key-based encryption; the key ring stream is only needed while the encrypting stream is being set up
+    try (InputStream keyRingInputStream = openKeyRing()) {
+      return GPGFileEncryptor.encryptFile(origStream, keyRingInputStream, this.keyId, this.cipher);
+    }
   }
 
   @Override
   public InputStream decodeInputStream(InputStream origStream)
       throws IOException {
-    return GPGFileDecryptor.decryptFile(origStream, password);
+    if (this.keyRingPath == null) {
+      return GPGFileDecryptor.decryptFile(origStream, this.password);
+    }
+
+    try (InputStream keyRingInputStream = openKeyRing()) {
+      return GPGFileDecryptor.decryptFile(origStream, keyRingInputStream, this.password);
+    }
+  }
+
+  /**
+   * Opens the configured key ring through the Hadoop {@code FileSystem} that owns {@link #keyRingPath}.
+   * The caller is responsible for closing the returned stream.
+   */
+  private InputStream openKeyRing() throws IOException {
+    return this.keyRingPath.getFileSystem(this.conf).open(this.keyRingPath);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileDecryptor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileDecryptor.java
 
b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileDecryptor.java
index 4c673c7..b7026e8 100644
--- 
a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileDecryptor.java
+++ 
b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileDecryptor.java
@@ -81,9 +81,9 @@ public class GPGFileDecryptor {
   /**
    * Taking in a file inputstream, keyring inputstream and a passPhrase, 
generate a decrypted file inputstream.
    * @param inputStream file inputstream
-   * @param keyIn keyring inputstream
+   * @param keyIn keyring inputstream. This InputStream is owned by the caller.
    * @param passPhrase passPhrase
-   * @return
+   * @return an {@link InputStream} for the decrypted content
    * @throws IOException
    */
   public InputStream decryptFile(InputStream inputStream, InputStream keyIn, 
String passPhrase)

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileEncryptor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileEncryptor.java
 
b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileEncryptor.java
new file mode 100644
index 0000000..aebedb2
--- /dev/null
+++ 
b/gobblin-modules/gobblin-crypto/src/main/java/org/apache/gobblin/crypto/GPGFileEncryptor.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.crypto;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Field;
+import java.security.SecureRandom;
+import java.security.Security;
+import java.util.Date;
+import java.util.Set;
+
+import org.apache.commons.lang3.StringUtils;
+import org.bouncycastle.jce.provider.BouncyCastleProvider;
+import org.bouncycastle.openpgp.PGPEncryptedData;
+import org.bouncycastle.openpgp.PGPEncryptedDataGenerator;
+import org.bouncycastle.openpgp.PGPException;
+import org.bouncycastle.openpgp.PGPLiteralDataGenerator;
+import org.bouncycastle.openpgp.PGPPublicKey;
+import org.bouncycastle.openpgp.PGPPublicKeyRingCollection;
+import org.bouncycastle.openpgp.PGPUtil;
+import org.bouncycastle.openpgp.operator.bc.BcKeyFingerprintCalculator;
+import 
org.bouncycastle.openpgp.operator.jcajce.JcePBEKeyEncryptionMethodGenerator;
+import org.bouncycastle.openpgp.operator.jcajce.JcePGPDataEncryptorBuilder;
+import 
org.bouncycastle.openpgp.operator.jcajce.JcePublicKeyKeyEncryptionMethodGenerator;
+import org.reflections.ReflectionUtils;
+
+import com.google.common.base.Preconditions;
+
+import lombok.experimental.UtilityClass;
+
+
+/**
+ * A utility class that supports both password based and key based GPG encryption.
+ *
+ * Both {@code encryptFile} variants return an {@link OutputStream} that writes the plaintext as a binary literal
+ * data packet inside an encrypted data packet, and that also closes the caller's stream when it is closed.
+ *
+ * Code reference - org.bouncycastle.openpgp.examples.PBEFileProcessor
+ *                - org.bouncycastle.openpgp.examples.KeyBasedFileProcessor
+ */
+@UtilityClass
+public class GPGFileEncryptor {
+  /** Size of the buffers handed to the BouncyCastle stream generators. */
+  private static final int BUFFER_SIZE = 1024;
+  /** File name recorded in the literal data packet. */
+  private static final String PAYLOAD_NAME = "payload.file";
+  private static final String PROVIDER_NAME = BouncyCastleProvider.PROVIDER_NAME;
+
+  /**
+   * Taking in an input {@link OutputStream} and a passPhrase, return an {@link OutputStream} that can be used to
+   * output encrypted output to the input {@link OutputStream}.
+   * @param outputStream the output stream to hold the ciphertext {@link OutputStream}
+   * @param passPhrase pass phrase; must not be null
+   * @param cipher the symmetric cipher to use for encryption. If null or empty then a default cipher (CAST5) is used.
+   * @return {@link OutputStream} to write content to for encryption; closing it also closes {@code outputStream}
+   * @throws IOException if the encryption streams cannot be created
+   * @throws IllegalArgumentException if {@code cipher} is not a known cipher name
+   */
+  public OutputStream encryptFile(OutputStream outputStream, String passPhrase, String cipher) throws IOException {
+    Preconditions.checkNotNull(passPhrase, "passPhrase must not be null for password based encryption");
+
+    try {
+      PGPEncryptedDataGenerator encryptedDataGenerator = newEncryptedDataGenerator(cipher);
+      encryptedDataGenerator.addMethod(
+          new JcePBEKeyEncryptionMethodGenerator(passPhrase.toCharArray()).setProvider(PROVIDER_NAME));
+
+      return openEncryptedStream(encryptedDataGenerator, outputStream);
+    } catch (PGPException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Taking in an input {@link OutputStream}, keyring inputstream and a key id, generate an encrypted
+   * {@link OutputStream}.
+   * @param outputStream {@link OutputStream} that will receive the encrypted content
+   * @param keyIn keyring inputstream. This InputStream is owned by the caller; it is fully consumed before this
+   *              method returns.
+   * @param keyId key identifier of the public key to encrypt to
+   * @param cipher the symmetric cipher to use for encryption. If null or empty then a default cipher (CAST5) is used.
+   * @return an {@link OutputStream} to write content to for encryption; closing it also closes {@code outputStream}
+   * @throws IOException if the key ring cannot be read or the encryption streams cannot be created
+   * @throws IllegalArgumentException if {@code cipher} is unknown or no public key with {@code keyId} exists
+   */
+  public OutputStream encryptFile(OutputStream outputStream, InputStream keyIn, long keyId, String cipher)
+      throws IOException {
+    try {
+      PGPEncryptedDataGenerator encryptedDataGenerator = newEncryptedDataGenerator(cipher);
+
+      PGPPublicKeyRingCollection keyRings =
+          new PGPPublicKeyRingCollection(PGPUtil.getDecoderStream(keyIn), new BcKeyFingerprintCalculator());
+      PGPPublicKey publicKey = keyRings.getPublicKey(keyId);
+      if (publicKey == null) {
+        throw new IllegalArgumentException("public key for encryption not found");
+      }
+
+      encryptedDataGenerator.addMethod(
+          new JcePublicKeyKeyEncryptionMethodGenerator(publicKey).setProvider(PROVIDER_NAME));
+
+      return openEncryptedStream(encryptedDataGenerator, outputStream);
+    } catch (PGPException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Registers the BouncyCastle provider if it is not installed yet and creates an encrypted data generator for
+   * the requested symmetric cipher.
+   */
+  private static PGPEncryptedDataGenerator newEncryptedDataGenerator(String cipher) {
+    if (Security.getProvider(PROVIDER_NAME) == null) {
+      Security.addProvider(new BouncyCastleProvider());
+    }
+
+    return new PGPEncryptedDataGenerator(
+        new JcePGPDataEncryptorBuilder(symmetricKeyAlgorithmNameToTag(cipher))
+            .setSecureRandom(new SecureRandom())
+            .setProvider(PROVIDER_NAME));
+  }
+
+  /**
+   * Opens the encryption layer on {@code outputStream} and a binary literal data packet on top of it, and returns
+   * a stream that writes to the literal packet and closes every layer (including {@code outputStream}) on close.
+   */
+  private static OutputStream openEncryptedStream(PGPEncryptedDataGenerator encryptedDataGenerator,
+      OutputStream outputStream) throws IOException, PGPException {
+    OutputStream encryptedOut = encryptedDataGenerator.open(outputStream, new byte[BUFFER_SIZE]);
+
+    PGPLiteralDataGenerator literalDataGenerator = new PGPLiteralDataGenerator();
+    OutputStream literalOut = literalDataGenerator.open(encryptedOut, PGPLiteralDataGenerator.BINARY, PAYLOAD_NAME,
+        new Date(), new byte[BUFFER_SIZE]);
+
+    // close order matters: literal packet first, then the encryption layer, then the caller's stream
+    return new ClosingWrapperOutputStream(literalOut, encryptedOut, outputStream);
+  }
+
+  /**
+   * Convert a string cipher name to the integer tag used by GPG.
+   * @param cipherName the cipher name, matched exactly against the constant names visible on
+   *                   {@link PGPEncryptedData} (e.g. "CAST5")
+   * @return integer tag for the cipher; CAST5 when {@code cipherName} is null or empty
+   * @throws IllegalArgumentException if no constant with that name exists
+   */
+  private static int symmetricKeyAlgorithmNameToTag(String cipherName) {
+    // Use CAST5 if no cipher specified
+    if (StringUtils.isEmpty(cipherName)) {
+      return PGPEncryptedData.CAST5;
+    }
+
+    Set<Field> fields = ReflectionUtils.getAllFields(PGPEncryptedData.class, ReflectionUtils.withName(cipherName));
+
+    if (fields.isEmpty()) {
+      throw new IllegalArgumentException("Could not find tag for cipher name " + cipherName);
+    }
+
+    try {
+      return fields.iterator().next().getInt(null);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException("Could not access field " + cipherName, e);
+    }
+  }
+
+  /**
+   * A class for keeping track of wrapped output streams and closing them when this stream is closed.
+   * This is required because GPG wrapping of streams does not propagate the close.
+   */
+  private static class ClosingWrapperOutputStream extends OutputStream {
+    private final OutputStream[] outputStreams;
+    private final OutputStream firstStream;
+
+    /**
+     * Creates an output stream that writes to the first {@link OutputStream} and closes all of the
+     * {@link OutputStream}s, in order, when close() is called.
+     * @param outputStreams list of {@link OutputStream}s where the first one is written to and the rest are tracked
+     *                      for closing.
+     */
+    public ClosingWrapperOutputStream(OutputStream... outputStreams) {
+      Preconditions.checkArgument(outputStreams.length >= 1);
+
+      this.outputStreams = outputStreams;
+      this.firstStream = outputStreams[0];
+    }
+
+    @Override
+    public void write(byte[] bytes) throws IOException {
+      this.firstStream.write(bytes);
+    }
+
+    @Override
+    public void write(byte[] bytes, int offset, int length) throws IOException {
+      this.firstStream.write(bytes, offset, length);
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      this.firstStream.write(b);
+    }
+
+    @Override
+    public void flush() throws IOException {
+      for (OutputStream os : this.outputStreams) {
+        os.flush();
+      }
+    }
+
+    /**
+     * Closes every tracked stream even if an earlier close fails, so the caller's stream is never leaked.
+     * The first failure is rethrown with any later failures attached as suppressed exceptions.
+     */
+    @Override
+    public void close() throws IOException {
+      IOException firstFailure = null;
+      for (OutputStream os : this.outputStreams) {
+        try {
+          os.close();
+        } catch (IOException e) {
+          if (firstFailure == null) {
+            firstFailure = e;
+          } else {
+            firstFailure.addSuppressed(e);
+          }
+        }
+      }
+      if (firstFailure != null) {
+        throw firstFailure;
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e92e5f8a/gobblin-modules/gobblin-crypto/src/test/java/org/apache/gobblin/crypto/GPGFileEncryptorTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-crypto/src/test/java/org/apache/gobblin/crypto/GPGFileEncryptorTest.java
 
b/gobblin-modules/gobblin-crypto/src/test/java/org/apache/gobblin/crypto/GPGFileEncryptorTest.java
new file mode 100644
index 0000000..285c414
--- /dev/null
+++ 
b/gobblin-modules/gobblin-crypto/src/test/java/org/apache/gobblin/crypto/GPGFileEncryptorTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.crypto;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.bouncycastle.openpgp.PGPException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Test class for {@link GPGFileDecryptor}
+ * Test key and test passphrase are generated offline
+ */
+public class GPGFileEncryptorTest {
+  public static final String PASSWORD = "test";
+  public static final String PASSPHRASE = "gobblin";
+  public static final String PUBLIC_KEY = "/crypto/gpg/testPublic.key";
+  public static final String PRIVATE_KEY = "/crypto/gpg/testPrivate.key";
+  public static final String KEY_ID = "c27093CA21A87D6F";
+  public static final String EXPECTED_FILE_CONTENT = "This is a key based 
encryption file.";
+  public static final byte[] EXPECTED_FILE_CONTENT_BYTES = 
EXPECTED_FILE_CONTENT.getBytes(StandardCharsets.UTF_8);
+
+  /**
+   * Encrypt a test string with a symmetric key and check that it can be 
decrypted
+   * @throws IOException
+   * @throws PGPException
+   */
+  @Test
+  public void encryptSym() throws IOException, PGPException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    OutputStream os = GPGFileEncryptor.encryptFile(baos, PASSWORD, "DES");
+
+    os.write(EXPECTED_FILE_CONTENT_BYTES);
+    os.close();
+    baos.close();
+
+    byte[] encryptedBytes = baos.toByteArray();
+
+    try (InputStream is = GPGFileDecryptor.decryptFile(new 
ByteArrayInputStream(encryptedBytes), "test")) {
+      byte[] decryptedBytes = IOUtils.toByteArray(is);
+
+      Assert.assertNotEquals(EXPECTED_FILE_CONTENT_BYTES, encryptedBytes);
+      Assert.assertEquals(EXPECTED_FILE_CONTENT_BYTES, decryptedBytes);
+    }
+  }
+
+  /**
+   * Encrypt a test string with an asymmetric key and check that it can be 
decrypted
+   * @throws IOException
+   * @throws PGPException
+   */
+  @Test
+  public void encryptAsym() throws IOException, PGPException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    OutputStream os = GPGFileEncryptor.encryptFile(baos, 
getClass().getResourceAsStream(PUBLIC_KEY),
+        Long.parseUnsignedLong(KEY_ID, 16), "CAST5");
+
+    os.write(EXPECTED_FILE_CONTENT_BYTES);
+    os.close();
+    baos.close();
+
+    byte[] encryptedBytes = baos.toByteArray();
+
+    try (InputStream is = GPGFileDecryptor.decryptFile(new 
ByteArrayInputStream(encryptedBytes),
+        getClass().getResourceAsStream(PRIVATE_KEY), PASSPHRASE)) {
+      byte[] decryptedBytes = IOUtils.toByteArray(is);
+
+      Assert.assertNotEquals(EXPECTED_FILE_CONTENT_BYTES, encryptedBytes);
+      Assert.assertEquals(EXPECTED_FILE_CONTENT_BYTES, decryptedBytes);
+    }
+  }
+
+  /**
+   * Test error with bad cipher
+   */
+  @Test(expectedExceptions = RuntimeException.class, 
expectedExceptionsMessageRegExp = ".*BadCipher.*")
+  public void badCipher() throws IOException, PGPException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    OutputStream os = GPGFileEncryptor.encryptFile(baos, 
getClass().getResourceAsStream(PUBLIC_KEY),
+        Long.parseUnsignedLong(KEY_ID, 16), "BadCipher");
+  }
+}

Reply via email to