[ 
https://issues.apache.org/jira/browse/BEAM-3813?focusedWorklogId=106863&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-106863
 ]

ASF GitHub Bot logged work on BEAM-3813:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/May/18 21:36
            Start Date: 29/May/18 21:36
    Worklog Time Spent: 10m 
      Work Description: iemejia closed pull request #5491: [BEAM-3813] Support 
(de)serialization of S3 encryption options via JSON
URL: https://github.com/apache/beam/pull/5491
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
index 0c7ce5b292c..48bb74342c1 100644
--- 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/AwsModule.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.beam.sdk.io.aws.options;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
@@ -28,6 +27,8 @@
 import com.amazonaws.auth.PropertiesFileCredentialsProvider;
 import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
+import com.amazonaws.services.s3.model.SSECustomerKey;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
@@ -49,8 +50,11 @@
 import java.util.Map;
 
 /**
- * A Jackson {@link Module} that registers a {@link JsonSerializer} and {@link 
JsonDeserializer}
- * for {@link AWSCredentialsProvider} and some subclasses. The serialized form 
is a JSON map.
+ * A Jackson {@link Module} that registers a {@link JsonSerializer} and {@link 
JsonDeserializer} for
+ * {@link AWSCredentialsProvider} and some subclasses. The serialized form is 
a JSON map.
+ *
+ * <p>It also adds serializers for S3 encryption objects {@link 
SSECustomerKey} and {@link
+ * SSEAwsKeyManagementParams}.
  */
 @AutoService(Module.class)
 public class AwsModule extends SimpleModule {
@@ -62,33 +66,31 @@
   public AwsModule() {
     super("AwsModule");
     setMixInAnnotation(AWSCredentialsProvider.class, 
AWSCredentialsProviderMixin.class);
+    setMixInAnnotation(SSECustomerKey.class, SSECustomerKeyMixin.class);
+    setMixInAnnotation(SSEAwsKeyManagementParams.class, 
SSEAwsKeyManagementParamsMixin.class);
   }
 
-  /**
-   * A mixin to add Jackson annotations to {@link AWSCredentialsProvider}.
-   */
+  /** A mixin to add Jackson annotations to {@link AWSCredentialsProvider}. */
   @JsonDeserialize(using = AWSCredentialsProviderDeserializer.class)
   @JsonSerialize(using = AWSCredentialsProviderSerializer.class)
   @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY)
-  private static class AWSCredentialsProviderMixin {
-
-  }
+  private static class AWSCredentialsProviderMixin {}
 
   private static class AWSCredentialsProviderDeserializer
       extends JsonDeserializer<AWSCredentialsProvider> {
 
     @Override
-    public AWSCredentialsProvider deserialize(
-        JsonParser jsonParser, DeserializationContext context) throws 
IOException {
+    public AWSCredentialsProvider deserialize(JsonParser jsonParser, 
DeserializationContext context)
+        throws IOException {
       return context.readValue(jsonParser, AWSCredentialsProvider.class);
     }
 
     @Override
-    public AWSCredentialsProvider deserializeWithType(JsonParser jsonParser,
-        DeserializationContext context, TypeDeserializer typeDeserializer) 
throws IOException {
+    public AWSCredentialsProvider deserializeWithType(
+        JsonParser jsonParser, DeserializationContext context, 
TypeDeserializer typeDeserializer)
+        throws IOException {
       Map<String, String> asMap =
-          jsonParser.readValueAs(new TypeReference<Map<String, String>>() {
-          });
+          jsonParser.readValueAs(new TypeReference<Map<String, String>>() {});
 
       String typeNameKey = typeDeserializer.getPropertyName();
       String typeName = asMap.get(typeNameKey);
@@ -102,8 +104,8 @@ public AWSCredentialsProvider 
deserializeWithType(JsonParser jsonParser,
             new BasicAWSCredentials(asMap.get(AWS_ACCESS_KEY_ID), 
asMap.get(AWS_SECRET_KEY)));
       } else if 
(typeName.equals(PropertiesFileCredentialsProvider.class.getSimpleName())) {
         return new 
PropertiesFileCredentialsProvider(asMap.get(CREDENTIALS_FILE_PATH));
-      } else if (typeName
-          
.equals(ClasspathPropertiesFileCredentialsProvider.class.getSimpleName())) {
+      } else if (typeName.equals(
+          ClasspathPropertiesFileCredentialsProvider.class.getSimpleName())) {
         return new 
ClasspathPropertiesFileCredentialsProvider(asMap.get(CREDENTIALS_FILE_PATH));
       } else if 
(typeName.equals(DefaultAWSCredentialsProviderChain.class.getSimpleName())) {
         return new DefaultAWSCredentialsProviderChain();
@@ -122,7 +124,8 @@ public AWSCredentialsProvider 
deserializeWithType(JsonParser jsonParser,
     }
   }
 
-  static class AWSCredentialsProviderSerializer extends 
JsonSerializer<AWSCredentialsProvider> {
+  private static class AWSCredentialsProviderSerializer
+      extends JsonSerializer<AWSCredentialsProvider> {
     // These providers are singletons, so don't require any serialization, 
other than type.
     private static final ImmutableSet<Object> SINGLETON_CREDENTIAL_PROVIDERS =
         ImmutableSet.of(
@@ -133,14 +136,20 @@ public AWSCredentialsProvider 
deserializeWithType(JsonParser jsonParser,
             EC2ContainerCredentialsProviderWrapper.class);
 
     @Override
-    public void serialize(AWSCredentialsProvider credentialsProvider, 
JsonGenerator jsonGenerator,
-        SerializerProvider serializers) throws IOException {
+    public void serialize(
+        AWSCredentialsProvider credentialsProvider,
+        JsonGenerator jsonGenerator,
+        SerializerProvider serializers)
+        throws IOException {
       serializers.defaultSerializeValue(credentialsProvider, jsonGenerator);
     }
 
     @Override
-    public void serializeWithType(AWSCredentialsProvider credentialsProvider,
-        JsonGenerator jsonGenerator, SerializerProvider serializers, 
TypeSerializer typeSerializer)
+    public void serializeWithType(
+        AWSCredentialsProvider credentialsProvider,
+        JsonGenerator jsonGenerator,
+        SerializerProvider serializers,
+        TypeSerializer typeSerializer)
         throws IOException {
       typeSerializer.writeTypePrefixForObject(credentialsProvider, 
jsonGenerator);
 
@@ -163,14 +172,15 @@ public void serializeWithType(AWSCredentialsProvider 
credentialsProvider,
           throw new IOException("failed to access private field with 
reflection", e);
         }
 
-      } else if (credentialsProvider.getClass()
+      } else if (credentialsProvider
+          .getClass()
           .equals(ClasspathPropertiesFileCredentialsProvider.class)) {
         try {
           ClasspathPropertiesFileCredentialsProvider specificProvider =
               (ClasspathPropertiesFileCredentialsProvider) credentialsProvider;
           Field field =
-              ClasspathPropertiesFileCredentialsProvider.class
-                  .getDeclaredField(CREDENTIALS_FILE_PATH);
+              
ClasspathPropertiesFileCredentialsProvider.class.getDeclaredField(
+                  CREDENTIALS_FILE_PATH);
           field.setAccessible(true);
           String credentialsFilePath = ((String) field.get(specificProvider));
           jsonGenerator.writeStringField(CREDENTIALS_FILE_PATH, 
credentialsFilePath);
@@ -185,4 +195,43 @@ public void serializeWithType(AWSCredentialsProvider 
credentialsProvider,
       typeSerializer.writeTypeSuffixForObject(credentialsProvider, 
jsonGenerator);
     }
   }
+
+  /** A mixin to add Jackson annotations to {@link SSECustomerKey}. */
+  @JsonDeserialize(using = SSECustomerKeyDeserializer.class)
+  private static class SSECustomerKeyMixin {}
+
+  private static class SSECustomerKeyDeserializer extends 
JsonDeserializer<SSECustomerKey> {
+    @Override
+    public SSECustomerKey deserialize(JsonParser parser, 
DeserializationContext context)
+        throws IOException {
+      Map<String, String> asMap = parser.readValueAs(new 
TypeReference<Map<String, String>>() {});
+
+      final String key = asMap.getOrDefault("key", null);
+      final String algorithm = asMap.getOrDefault("algorithm", null);
+      final String md5 = asMap.getOrDefault("md5", null);
+      SSECustomerKey sseCustomerKey = new SSECustomerKey(key);
+      if (algorithm != null) {
+        sseCustomerKey.setAlgorithm(algorithm);
+      }
+      if (md5 != null) {
+        sseCustomerKey.setMd5(md5);
+      }
+      return sseCustomerKey;
+    }
+  }
+
+  /** A mixin to add Jackson annotations to {@link SSEAwsKeyManagementParams}. 
*/
+  @JsonDeserialize(using = SSEAwsKeyManagementParamsDeserializer.class)
+  private static class SSEAwsKeyManagementParamsMixin {}
+
+  private static class SSEAwsKeyManagementParamsDeserializer
+      extends JsonDeserializer<SSEAwsKeyManagementParams> {
+    @Override
+    public SSEAwsKeyManagementParams deserialize(JsonParser parser, 
DeserializationContext context)
+        throws IOException {
+      Map<String, String> asMap = parser.readValueAs(new 
TypeReference<Map<String, String>>() {});
+      final String awsKmsKeyId = asMap.getOrDefault("awsKmsKeyId", null);
+      return new SSEAwsKeyManagementParams(awsKmsKeyId);
+    }
+  }
 }
diff --git 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/S3Options.java
 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/S3Options.java
index 22ab91ce39f..58053816d48 100644
--- 
a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/S3Options.java
+++ 
b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/options/S3Options.java
@@ -25,9 +25,7 @@
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 
-/**
- * Options used to configure Amazon Web Services S3.
- */
+/** Options used to configure Amazon Web Services S3. */
 public interface S3Options extends AwsOptions {
 
   @Description("AWS S3 storage class used for creating S3 objects")
@@ -53,12 +51,18 @@
   String getSSEAlgorithm();
   void setSSEAlgorithm(String value);
 
-  @Description("SSE key for SSE-C encryption, e.g. a base64 encoded key and 
the algorithm.")
+  @Description(
+      "SSE key for SSE-C encryption, e.g. a base64 encoded key and the 
algorithm."
+          + "To specify on the command-line, represent the value as a JSON 
object. For example:"
+          + " --SSECustomerKey={\"key\": \"86glyTlCN...\", \"algorithm\": 
\"AES256\"}")
   @Nullable
   SSECustomerKey getSSECustomerKey();
   void setSSECustomerKey(SSECustomerKey value);
 
-  @Description("KMS key id for SSE-KMS encryption, e.g. \"arn:aws:kms:...\".")
+  @Description(
+      "KMS key id for SSE-KMS encryption, e.g. \"arn:aws:kms:...\"."
+          + "To specify on the command-line, represent the value as a JSON 
object. For example:"
+          + " --SSEAwsKeyManagementParams={\"awsKmsKeyId\": 
\"arn:aws:kms:...\"}")
   @Nullable
   SSEAwsKeyManagementParams getSSEAwsKeyManagementParams();
   void setSSEAwsKeyManagementParams(SSEAwsKeyManagementParams value);
diff --git 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/options/AwsModuleTest.java
 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/options/AwsModuleTest.java
index 91eafe432eb..c9cf2b7fb12 100644
--- 
a/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/options/AwsModuleTest.java
+++ 
b/sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/options/AwsModuleTest.java
@@ -32,6 +32,8 @@
 import com.amazonaws.auth.PropertiesFileCredentialsProvider;
 import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
+import com.amazonaws.services.s3.model.SSECustomerKey;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.lang.reflect.Field;
@@ -42,9 +44,7 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/**
- * Tests {@link AwsModule}.
- */
+/** Tests {@link AwsModule}. */
 @RunWith(JUnit4.class)
 public class AwsModuleTest {
 
@@ -58,7 +58,6 @@ public void testObjectMapperIsAbleToFindModule() {
 
   @Test
   public void testAWSStaticCredentialsProviderSerializationDeserialization() 
throws Exception {
-
     String awsKeyId = "key-id";
     String awsSecretKey = "secret-key";
 
@@ -70,15 +69,16 @@ public void 
testAWSStaticCredentialsProviderSerializationDeserialization() throw
         objectMapper.readValue(serializedCredentialsProvider, 
AWSCredentialsProvider.class);
 
     assertEquals(credentialsProvider.getClass(), 
deserializedCredentialsProvider.getClass());
-    assertEquals(credentialsProvider.getCredentials().getAWSAccessKeyId(),
+    assertEquals(
+        credentialsProvider.getCredentials().getAWSAccessKeyId(),
         deserializedCredentialsProvider.getCredentials().getAWSAccessKeyId());
-    assertEquals(credentialsProvider.getCredentials().getAWSSecretKey(),
+    assertEquals(
+        credentialsProvider.getCredentials().getAWSSecretKey(),
         deserializedCredentialsProvider.getCredentials().getAWSSecretKey());
   }
 
   @Test
   public void 
testPropertiesFileCredentialsProviderSerializationDeserialization() throws 
Exception {
-
     String credentialsFilePath = "/path/to/file";
 
     PropertiesFileCredentialsProvider credentialsProvider =
@@ -90,8 +90,7 @@ public void 
testPropertiesFileCredentialsProviderSerializationDeserialization()
 
     assertEquals(credentialsProvider.getClass(), 
deserializedCredentialsProvider.getClass());
 
-    Field field =
-        
PropertiesFileCredentialsProvider.class.getDeclaredField("credentialsFilePath");
+    Field field = 
PropertiesFileCredentialsProvider.class.getDeclaredField("credentialsFilePath");
     field.setAccessible(true);
     String deserializedCredentialsFilePath = ((String) 
field.get(deserializedCredentialsProvider));
     assertEquals(credentialsFilePath, deserializedCredentialsFilePath);
@@ -100,7 +99,6 @@ public void 
testPropertiesFileCredentialsProviderSerializationDeserialization()
   @Test
   public void 
testClasspathPropertiesFileCredentialsProviderSerializationDeserialization()
       throws Exception {
-
     String credentialsFilePath = "/path/to/file";
 
     ClasspathPropertiesFileCredentialsProvider credentialsProvider =
@@ -155,4 +153,33 @@ public void 
testSingletonAWSCredentialsProviderSerializationDeserialization() th
         objectMapper.readValue(serializedCredentialsProvider, 
AWSCredentialsProvider.class);
     assertEquals(credentialsProvider.getClass(), 
deserializedCredentialsProvider.getClass());
   }
+
+  @Test
+  public void testSSECustomerKeySerializationDeserialization() throws 
Exception {
+    final String key = "86glyTlCNZgccSxW8JxMa6ZdjdK3N141glAysPUZ3AA=";
+    final String md5 = null;
+    final String algorithm = "AES256";
+
+    SSECustomerKey value = new SSECustomerKey(key);
+
+    String valueAsJson = objectMapper.writeValueAsString(value);
+    SSECustomerKey valueDes = objectMapper.readValue(valueAsJson, 
SSECustomerKey.class);
+    assertEquals(key, valueDes.getKey());
+    assertEquals(algorithm, valueDes.getAlgorithm());
+    assertEquals(md5, valueDes.getMd5());
+  }
+
+  @Test
+  public void testSSEAwsKeyManagementParamsSerializationDeserialization() 
throws Exception {
+    final String awsKmsKeyId =
+        
"arn:aws:kms:eu-west-1:123456789012:key/dc123456-7890-ABCD-EF01-234567890ABC";
+    final String encryption = "aws:kms";
+    SSEAwsKeyManagementParams value = new 
SSEAwsKeyManagementParams(awsKmsKeyId);
+
+    String valueAsJson = objectMapper.writeValueAsString(value);
+    SSEAwsKeyManagementParams valueDes =
+        objectMapper.readValue(valueAsJson, SSEAwsKeyManagementParams.class);
+    assertEquals(awsKmsKeyId, valueDes.getAwsKmsKeyId());
+    assertEquals(encryption, valueDes.getEncryption());
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 106863)
    Time Spent: 3h 40m  (was: 3.5h)

> Support encryption for S3FileSystem (SSE-S3, SSE-C and SSE-KMS)
> ---------------------------------------------------------------
>
>                 Key: BEAM-3813
>                 URL: https://issues.apache.org/jira/browse/BEAM-3813
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-aws
>            Reporter: Ismaël Mejía
>            Assignee: Ismaël Mejía
>            Priority: Minor
>             Fix For: 2.5.0
>
>          Time Spent: 3h 40m
>  Remaining Estimate: 0h
>
> We should enable AWS S3 users to use encryption when reading or writing by 
> providing encryption keys or using server-side encryption via an algorithm or 
> a key management system (KMS).
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to