This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f23d5c  support Server Side Encryption for S3FileIO (#1754)
6f23d5c is described below

commit 6f23d5c7e9e36d59b1401e63fbf2e46aecf6ca90
Author: jackye1995 <[email protected]>
AuthorDate: Mon Nov 16 13:17:17 2020 -0800

    support Server Side Encryption for S3FileIO (#1754)
---
 .../org/apache/iceberg/aws/AwsIntegTestUtil.java   |  58 ++++++
 .../org/apache/iceberg/aws/s3/S3FileIOTest.java    | 195 +++++++++++++++++++++
 .../java/org/apache/iceberg/aws/AwsClientUtil.java |   7 +
 .../java/org/apache/iceberg/aws/AwsProperties.java | 112 ++++++++++++
 .../java/org/apache/iceberg/aws/s3/BaseS3File.java |  25 ++-
 .../java/org/apache/iceberg/aws/s3/S3FileIO.java   |  20 ++-
 .../org/apache/iceberg/aws/s3/S3InputFile.java     |   9 +-
 .../org/apache/iceberg/aws/s3/S3InputStream.java   |  22 ++-
 .../org/apache/iceberg/aws/s3/S3OutputFile.java    |  11 +-
 .../org/apache/iceberg/aws/s3/S3OutputStream.java  |  41 ++++-
 build.gradle                                       |  20 +++
 11 files changed, 502 insertions(+), 18 deletions(-)

diff --git 
a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java 
b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
new file mode 100644
index 0000000..a71804c
--- /dev/null
+++ b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws;
+
+import java.util.stream.Collectors;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.Delete;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+
+public class AwsIntegTestUtil {
+
+  private AwsIntegTestUtil() {
+  }
+
+  /**
+   * Set the environment variable AWS_TEST_BUCKET for a default bucket to use 
for testing
+   * @return bucket name
+   */
+  public static String testBucketName() {
+    return System.getenv("AWS_TEST_BUCKET");
+  }
+
+  public static void cleanS3Bucket(S3Client s3, String bucketName, String 
prefix) {
+    boolean hasContent = true;
+    while (hasContent) {
+      ListObjectsV2Response response = 
s3.listObjectsV2(ListObjectsV2Request.builder()
+          .bucket(bucketName).prefix(prefix).build());
+      hasContent = response.hasContents();
+      if (hasContent) {
+        
s3.deleteObjects(DeleteObjectsRequest.builder().bucket(bucketName).delete(Delete.builder().objects(
+            response.contents().stream()
+                .map(obj -> ObjectIdentifier.builder().key(obj.key()).build())
+                .collect(Collectors.toList())
+        ).build()).build());
+      }
+    }
+  }
+}
diff --git 
a/aws/src/integration/java/org/apache/iceberg/aws/s3/S3FileIOTest.java 
b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3FileIOTest.java
new file mode 100644
index 0000000..4b5ce03
--- /dev/null
+++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3FileIOTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.s3;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.SecureRandom;
+import java.util.Base64;
+import java.util.UUID;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+import org.apache.iceberg.aws.AwsClientUtil;
+import org.apache.iceberg.aws.AwsIntegTestUtil;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.kms.KmsClient;
+import software.amazon.awssdk.services.kms.model.ListAliasesRequest;
+import software.amazon.awssdk.services.kms.model.ListAliasesResponse;
+import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionRequest;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+import software.amazon.awssdk.utils.IoUtils;
+
+public class S3FileIOTest {
+
+  private static S3Client s3;
+  private static KmsClient kms;
+  private static String bucketName;
+  private static String prefix;
+  private static byte[] contentBytes;
+  private static String content;
+  private static String kmsKeyArn;
+  private String objectKey;
+  private String objectUri;
+
+  @BeforeClass
+  public static void beforeClass() {
+    s3 = AwsClientUtil.defaultS3Client();
+    kms = AwsClientUtil.defaultKmsClient();
+    bucketName = AwsIntegTestUtil.testBucketName();
+    prefix = UUID.randomUUID().toString();
+    contentBytes = new byte[1024 * 1024 * 10];
+    content = new String(contentBytes, StandardCharsets.UTF_8);
+    kmsKeyArn = kms.createKey().keyMetadata().arn();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    AwsIntegTestUtil.cleanS3Bucket(s3, bucketName, prefix);
+    
kms.scheduleKeyDeletion(ScheduleKeyDeletionRequest.builder().keyId(kmsKeyArn).pendingWindowInDays(7).build());
+  }
+
+  @Before
+  public void before() {
+    objectKey = String.format("%s/%s", prefix, UUID.randomUUID().toString());
+    objectUri = String.format("s3://%s/%s", bucketName, objectKey);
+  }
+
+  @Test
+  public void testNewInputStream() throws Exception {
+    
s3.putObject(PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
+        RequestBody.fromBytes(contentBytes));
+    S3FileIO s3FileIO = new S3FileIO();
+    validateRead(s3FileIO);
+  }
+
+  @Test
+  public void testNewOutputStream() throws Exception {
+    S3FileIO s3FileIO = new S3FileIO();
+    write(s3FileIO);
+    InputStream stream = 
s3.getObject(GetObjectRequest.builder().bucket(bucketName).key(objectKey).build());
+    String result = IoUtils.toUtf8String(stream);
+    stream.close();
+    Assert.assertEquals(content, result);
+  }
+
+  @Test
+  public void testSSE_S3() throws Exception {
+    AwsProperties properties = new AwsProperties();
+    properties.setS3FileIoSseType(AwsProperties.S3FILEIO_SSE_TYPE_S3);
+    S3FileIO s3FileIO = new S3FileIO(AwsClientUtil::defaultS3Client, 
properties);
+    write(s3FileIO);
+    validateRead(s3FileIO);
+    GetObjectResponse response = s3.getObject(
+        
GetObjectRequest.builder().bucket(bucketName).key(objectKey).build()).response();
+    Assert.assertEquals(ServerSideEncryption.AES256, 
response.serverSideEncryption());
+  }
+
+  @Test
+  public void testSSE_KMS() throws Exception {
+    AwsProperties properties = new AwsProperties();
+    properties.setS3FileIoSseType(AwsProperties.S3FILEIO_SSE_TYPE_KMS);
+    properties.setS3FileIoSseKey(kmsKeyArn);
+    S3FileIO s3FileIO = new S3FileIO(AwsClientUtil::defaultS3Client, 
properties);
+    write(s3FileIO);
+    validateRead(s3FileIO);
+    GetObjectResponse response = s3.getObject(
+        
GetObjectRequest.builder().bucket(bucketName).key(objectKey).build()).response();
+    Assert.assertEquals(ServerSideEncryption.AWS_KMS, 
response.serverSideEncryption());
+    Assert.assertEquals(response.ssekmsKeyId(), kmsKeyArn);
+  }
+
+  @Test
+  public void testSSE_KMS_default() throws Exception {
+    AwsProperties properties = new AwsProperties();
+    properties.setS3FileIoSseType(AwsProperties.S3FILEIO_SSE_TYPE_KMS);
+    S3FileIO s3FileIO = new S3FileIO(AwsClientUtil::defaultS3Client, 
properties);
+    write(s3FileIO);
+    validateRead(s3FileIO);
+    GetObjectResponse response = s3.getObject(
+        
GetObjectRequest.builder().bucket(bucketName).key(objectKey).build()).response();
+    Assert.assertEquals(ServerSideEncryption.AWS_KMS, 
response.serverSideEncryption());
+    ListAliasesResponse listAliasesResponse = kms.listAliases(
+        ListAliasesRequest.builder().keyId(response.ssekmsKeyId()).build());
+    Assert.assertTrue(listAliasesResponse.hasAliases());
+    Assert.assertEquals(1, listAliasesResponse.aliases().size());
+    Assert.assertEquals("alias/aws/s3", 
listAliasesResponse.aliases().get(0).aliasName());
+  }
+
+  @Test
+  public void testSSE_Custom() throws Exception {
+    // generate key
+    KeyGenerator keyGenerator = KeyGenerator.getInstance("AES");
+    keyGenerator.init(256, new SecureRandom());
+    SecretKey secretKey = keyGenerator.generateKey();
+    Base64.Encoder encoder = Base64.getEncoder();
+    String encodedKey = new String(encoder.encode(secretKey.getEncoded()), 
StandardCharsets.UTF_8);
+    // generate md5
+    MessageDigest digest = MessageDigest.getInstance("MD5");
+    String md5 = new 
String(encoder.encode(digest.digest(secretKey.getEncoded())), 
StandardCharsets.UTF_8);
+
+    AwsProperties properties = new AwsProperties();
+    properties.setS3FileIoSseType(AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM);
+    properties.setS3FileIoSseKey(encodedKey);
+    properties.setS3FileIoSseMd5(md5);
+    S3FileIO s3FileIO = new S3FileIO(AwsClientUtil::defaultS3Client, 
properties);
+    write(s3FileIO);
+    validateRead(s3FileIO);
+    GetObjectResponse response = s3.getObject(
+        GetObjectRequest.builder().bucket(bucketName).key(objectKey)
+            .sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
+            .sseCustomerKey(encodedKey)
+            .sseCustomerKeyMD5(md5)
+            .build()).response();
+    Assert.assertNull(response.serverSideEncryption());
+    Assert.assertEquals(ServerSideEncryption.AES256.name(), 
response.sseCustomerAlgorithm());
+    Assert.assertEquals(md5, response.sseCustomerKeyMD5());
+  }
+
+  private void write(S3FileIO s3FileIO) throws Exception {
+    OutputFile outputFile = s3FileIO.newOutputFile(objectUri);
+    OutputStream outputStream = outputFile.create();
+    IoUtils.copy(new ByteArrayInputStream(contentBytes), outputStream);
+    outputStream.close();
+  }
+
+  private void validateRead(S3FileIO s3FileIO) throws Exception {
+    InputFile file = s3FileIO.newInputFile(objectUri);
+    Assert.assertEquals(contentBytes.length, file.getLength());
+    InputStream stream = file.newStream();
+    String result = IoUtils.toUtf8String(stream);
+    stream.close();
+    Assert.assertEquals(content, result);
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java 
b/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java
index aa2d1f3..e75b3f0 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java
@@ -20,6 +20,7 @@
 package org.apache.iceberg.aws;
 
 import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.kms.KmsClient;
 import software.amazon.awssdk.services.s3.S3Client;
 
 /**
@@ -40,4 +41,10 @@ public class AwsClientUtil {
         .httpClient(UrlConnectionHttpClient.create())
         .build();
   }
+
+  public static KmsClient defaultKmsClient() {
+    return KmsClient.builder()
+        .httpClient(UrlConnectionHttpClient.create())
+        .build();
+  }
 }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java 
b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
new file mode 100644
index 0000000..b9c33df
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws;
+
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class AwsProperties {
+
+  /**
+   * Type of S3 Server side encryption used, default to {@link 
AwsProperties#S3FILEIO_SSE_TYPE_NONE}.
+   */
+  public static final String S3FILEIO_SSE_TYPE = "s3fileio.sse.type";
+
+  /**
+   * No server side encryption.
+   */
+  public static final String S3FILEIO_SSE_TYPE_NONE = "none";
+
+  /**
+   * S3 SSE-KMS encryption.
+   * For more details: 
https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
+   */
+  public static final String S3FILEIO_SSE_TYPE_KMS = "kms";
+
+  /**
+   * S3 SSE-S3 encryption.
+   * For more details: 
https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
+   */
+  public static final String S3FILEIO_SSE_TYPE_S3 = "s3";
+
+  /**
+   * S3 SSE-C encryption.
+   * For more details: 
https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
+   */
+  public static final String S3FILEIO_SSE_TYPE_CUSTOM = "custom";
+
+  /**
+   * If S3 encryption type is SSE-KMS, input is a KMS Key ID or ARN.
+   *   In case this property is not set, default key "aws/s3" is used.
+   * If encryption type is SSE-C, input is a custom base-64 AES256 symmetric 
key.
+   */
+  public static final String S3FILEIO_SSE_KEY = "s3fileio.sse.key";
+
+  /**
+   * If S3 encryption type is SSE-C, input is the base-64 MD5 digest of the 
secret key.
+   * This MD5 must be explicitly passed in by the caller to ensure key 
integrity.
+   */
+  public static final String S3FILEIO_SSE_MD5 = "s3fileio.sse.md5";
+
+  private String s3FileIoSseType;
+  private String s3FileIoSseKey;
+  private String s3FileIoSseMd5;
+
+  public AwsProperties() {
+    this.s3FileIoSseType = S3FILEIO_SSE_TYPE_NONE;
+    this.s3FileIoSseKey = null;
+    this.s3FileIoSseMd5 = null;
+  }
+
+  public AwsProperties(Map<String, String> properties) {
+    this.s3FileIoSseType = properties.getOrDefault(
+        AwsProperties.S3FILEIO_SSE_TYPE, AwsProperties.S3FILEIO_SSE_TYPE_NONE);
+    this.s3FileIoSseKey = properties.get(AwsProperties.S3FILEIO_SSE_KEY);
+    this.s3FileIoSseMd5 = properties.get(AwsProperties.S3FILEIO_SSE_MD5);
+    if (AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM.equals(s3FileIoSseType)) {
+      Preconditions.checkNotNull(s3FileIoSseKey, "Cannot initialize SSE-C 
S3FileIO with null encryption key");
+      Preconditions.checkNotNull(s3FileIoSseMd5, "Cannot initialize SSE-C 
S3FileIO with null encryption key MD5");
+    }
+  }
+
+  public String s3FileIoSseType() {
+    return s3FileIoSseType;
+  }
+
+  public void setS3FileIoSseType(String sseType) {
+    this.s3FileIoSseType = sseType;
+  }
+
+  public String s3FileIoSseKey() {
+    return s3FileIoSseKey;
+  }
+
+  public void setS3FileIoSseKey(String sseKey) {
+    this.s3FileIoSseKey = sseKey;
+  }
+
+  public String s3FileIoSseMd5() {
+    return s3FileIoSseMd5;
+  }
+
+  public void setS3FileIoSseMd5(String sseMd5) {
+    this.s3FileIoSseMd5 = sseMd5;
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java 
b/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java
index 7933573..d907106 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java
@@ -19,20 +19,28 @@
 
 package org.apache.iceberg.aws.s3;
 
+import org.apache.iceberg.aws.AwsProperties;
 import software.amazon.awssdk.http.HttpStatusCode;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
 
 abstract class BaseS3File {
   private final S3Client client;
   private final S3URI uri;
+  private final AwsProperties awsProperties;
   private HeadObjectResponse metadata;
 
   BaseS3File(S3Client client, S3URI uri) {
+    this(client, uri, new AwsProperties());
+  }
+
+  BaseS3File(S3Client client, S3URI uri, AwsProperties awsProperties) {
     this.client = client;
     this.uri = uri;
+    this.awsProperties = awsProperties;
   }
 
   public String location() {
@@ -47,6 +55,10 @@ abstract class BaseS3File {
     return uri;
   }
 
+  public AwsProperties awsProperties() {
+    return awsProperties;
+  }
+
   /**
    * Note: this may be stale if file was deleted since metadata is cached for 
size/existence checks.
    *
@@ -66,10 +78,17 @@ abstract class BaseS3File {
 
   protected HeadObjectResponse getObjectMetadata() throws S3Exception {
     if (metadata == null) {
-      metadata = client().headObject(HeadObjectRequest.builder()
+      HeadObjectRequest.Builder requestBuilder = HeadObjectRequest.builder()
           .bucket(uri().bucket())
-          .key(uri().key())
-          .build());
+          .key(uri().key());
+
+      if 
(AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM.equals(awsProperties.s3FileIoSseType()))
 {
+        
requestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name());
+        requestBuilder.sseCustomerKey(awsProperties.s3FileIoSseKey());
+        requestBuilder.sseCustomerKeyMD5(awsProperties.s3FileIoSseMd5());
+      }
+
+      metadata = client().headObject(requestBuilder.build());
     }
 
     return metadata;
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java 
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index 1f69ec5..443c58f 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -19,7 +19,9 @@
 
 package org.apache.iceberg.aws.s3;
 
+import java.util.Map;
 import org.apache.iceberg.aws.AwsClientUtil;
+import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
@@ -36,24 +38,31 @@ import 
software.amazon.awssdk.services.s3.model.ObjectIdentifier;
  */
 public class S3FileIO implements FileIO {
   private final SerializableSupplier<S3Client> s3;
+  private AwsProperties awsProperties;
+
   private transient S3Client client;
 
   public S3FileIO() {
-    this.s3 = AwsClientUtil::defaultS3Client;
+    this(AwsClientUtil::defaultS3Client);
   }
 
   public S3FileIO(SerializableSupplier<S3Client> s3) {
+    this(s3, new AwsProperties());
+  }
+
+  public S3FileIO(SerializableSupplier<S3Client> s3, AwsProperties 
awsProperties) {
     this.s3 = s3;
+    this.awsProperties = awsProperties;
   }
 
   @Override
   public InputFile newInputFile(String path) {
-    return new S3InputFile(client(), new S3URI(path));
+    return new S3InputFile(client(), new S3URI(path), awsProperties);
   }
 
   @Override
   public OutputFile newOutputFile(String path) {
-    return new S3OutputFile(client(), new S3URI(path));
+    return new S3OutputFile(client(), new S3URI(path), awsProperties);
   }
 
   @Override
@@ -73,4 +82,9 @@ public class S3FileIO implements FileIO {
     }
     return client;
   }
+
+  @Override
+  public void initialize(Map<String, String> properties) {
+    this.awsProperties = new AwsProperties(properties);
+  }
 }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java 
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
index 4450161..93d86c3 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
@@ -19,13 +19,18 @@
 
 package org.apache.iceberg.aws.s3;
 
+import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.SeekableInputStream;
 import software.amazon.awssdk.services.s3.S3Client;
 
 public class S3InputFile extends BaseS3File implements InputFile {
   public S3InputFile(S3Client client, S3URI uri) {
-    super(client, uri);
+    this(client, uri, new AwsProperties());
+  }
+
+  public S3InputFile(S3Client client, S3URI uri, AwsProperties awsProperties) {
+    super(client, uri, awsProperties);
   }
 
   /**
@@ -40,7 +45,7 @@ public class S3InputFile extends BaseS3File implements 
InputFile {
 
   @Override
   public SeekableInputStream newStream() {
-    return new S3InputStream(client(), uri());
+    return new S3InputStream(client(), uri(), awsProperties());
   }
 
 }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java 
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
index f506067..2d3962b 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.aws.s3;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
+import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.io.SeekableInputStream;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.sync.ResponseTransformer;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
 
 class S3InputStream extends SeekableInputStream {
   private static final Logger LOG = 
LoggerFactory.getLogger(S3InputStream.class);
@@ -38,6 +40,7 @@ class S3InputStream extends SeekableInputStream {
   private final StackTraceElement[] createStack;
   private final S3Client s3;
   private final S3URI location;
+  private final AwsProperties awsProperties;
 
   private InputStream stream;
   private long pos = 0;
@@ -47,8 +50,13 @@ class S3InputStream extends SeekableInputStream {
   private int skipSize = 1024 * 1024;
 
   S3InputStream(S3Client s3, S3URI location) {
+    this(s3, location, new AwsProperties());
+  }
+
+  S3InputStream(S3Client s3, S3URI location, AwsProperties awsProperties) {
     this.s3 = s3;
     this.location = location;
+    this.awsProperties = awsProperties;
 
     createStack = Thread.currentThread().getStackTrace();
   }
@@ -126,13 +134,19 @@ class S3InputStream extends SeekableInputStream {
   }
 
   private void openStream() throws IOException {
-    GetObjectRequest request = GetObjectRequest.builder()
+    GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
         .bucket(location.bucket())
         .key(location.key())
-        .range(String.format("bytes=%s-", pos))
-        .build();
+        .range(String.format("bytes=%s-", pos));
+
+    if 
(AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM.equals(awsProperties.s3FileIoSseType()))
 {
+      requestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name());
+      requestBuilder.sseCustomerKey(awsProperties.s3FileIoSseKey());
+      requestBuilder.sseCustomerKeyMD5(awsProperties.s3FileIoSseMd5());
+    }
+
     closeStream();
-    stream = s3.getObject(request, ResponseTransformer.toInputStream());
+    stream = s3.getObject(requestBuilder.build(), 
ResponseTransformer.toInputStream());
   }
 
   private void closeStream() throws IOException {
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java 
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
index 0996ea0..44a745a 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.aws.s3;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
@@ -29,7 +30,11 @@ import software.amazon.awssdk.services.s3.S3Client;
 
 public class S3OutputFile extends BaseS3File implements OutputFile {
   public S3OutputFile(S3Client client, S3URI uri) {
-    super(client, uri);
+    this(client, uri, new AwsProperties());
+  }
+
+  public S3OutputFile(S3Client client, S3URI uri, AwsProperties awsProperties) 
{
+    super(client, uri, awsProperties);
   }
 
   /**
@@ -50,7 +55,7 @@ public class S3OutputFile extends BaseS3File implements 
OutputFile {
   @Override
   public PositionOutputStream createOrOverwrite() {
     try {
-      return new S3OutputStream(client(), uri());
+      return new S3OutputStream(client(), uri(), awsProperties());
     } catch (IOException e) {
       throw new UncheckedIOException("Filed to create output stream for 
location: " + uri(), e);
     }
@@ -58,6 +63,6 @@ public class S3OutputFile extends BaseS3File implements 
OutputFile {
 
   @Override
   public InputFile toInputFile() {
-    return new S3InputFile(client(), uri());
+    return new S3InputFile(client(), uri(), awsProperties());
   }
 }
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java 
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
index 2ae94a2..42b9608 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -25,6 +25,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
+import java.util.Locale;
+import org.apache.iceberg.aws.AwsProperties;
 import org.apache.iceberg.io.PositionOutputStream;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.slf4j.Logger;
@@ -32,6 +34,7 @@ import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
 
 class S3OutputStream extends PositionOutputStream {
   private static final Logger LOG = 
LoggerFactory.getLogger(S3OutputStream.class);
@@ -39,6 +42,7 @@ class S3OutputStream extends PositionOutputStream {
   private final StackTraceElement[] createStack;
   private final S3Client s3;
   private final S3URI location;
+  private final AwsProperties awsProperties;
 
   private final OutputStream stream;
   private final File stagingFile;
@@ -47,8 +51,13 @@ class S3OutputStream extends PositionOutputStream {
   private boolean closed = false;
 
   S3OutputStream(S3Client s3, S3URI location) throws IOException {
+    this(s3, location, new AwsProperties());
+  }
+
+  S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties) 
throws IOException {
     this.s3 = s3;
     this.location = location;
+    this.awsProperties = awsProperties;
 
     createStack = Thread.currentThread().getStackTrace();
     stagingFile = File.createTempFile("s3fileio-", ".tmp");
@@ -91,9 +100,35 @@ class S3OutputStream extends PositionOutputStream {
     try {
       stream.close();
 
-      s3.putObject(
-          
PutObjectRequest.builder().bucket(location.bucket()).key(location.key()).build(),
-          RequestBody.fromFile(stagingFile));
+      PutObjectRequest.Builder requestBuilder = PutObjectRequest.builder()
+          .bucket(location.bucket())
+          .key(location.key());
+
+      switch (awsProperties.s3FileIoSseType().toLowerCase(Locale.ENGLISH)) {
+        case AwsProperties.S3FILEIO_SSE_TYPE_NONE:
+          break;
+
+        case AwsProperties.S3FILEIO_SSE_TYPE_KMS:
+          requestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
+          requestBuilder.ssekmsKeyId(awsProperties.s3FileIoSseKey());
+          break;
+
+        case AwsProperties.S3FILEIO_SSE_TYPE_S3:
+          requestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
+          break;
+
+        case AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM:
+          
requestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name());
+          requestBuilder.sseCustomerKey(awsProperties.s3FileIoSseKey());
+          requestBuilder.sseCustomerKeyMD5(awsProperties.s3FileIoSseMd5());
+          break;
+
+        default:
+          throw new IllegalArgumentException(
+              "Cannot support given S3 encryption type: " + 
awsProperties.s3FileIoSseType());
+      }
+
+      s3.putObject(requestBuilder.build(), RequestBody.fromFile(stagingFile));
     } finally {
       if (!stagingFile.delete()) {
         LOG.warn("Could not delete temporary file: {}", stagingFile);
diff --git a/build.gradle b/build.gradle
index 0a8bc40..7df7c9f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -251,12 +251,32 @@ project(':iceberg-aws') {
 
     compile 'software.amazon.awssdk:url-connection-client'
     compile 'software.amazon.awssdk:s3'
+    compile 'software.amazon.awssdk:kms'
 
     testCompile("com.adobe.testing:s3mock-junit4") {
       exclude module: "spring-boot-starter-logging"
       exclude module: "logback-classic"
     }
   }
+
+  sourceSets {
+    integration {
+      java.srcDir "$projectDir/src/integration/java"
+      resources.srcDir "$projectDir/src/integration/resources"
+      compileClasspath += main.output + test.output
+      runtimeClasspath += main.output + test.output
+    }
+  }
+
+  configurations {
+    integrationImplementation.extendsFrom testImplementation
+    integrationRuntime.extendsFrom testRuntime
+  }
+
+  task integrationTest(type: Test) {
+    testClassesDirs = sourceSets.integration.output.classesDirs
+    classpath = sourceSets.integration.runtimeClasspath
+  }
 }
 
 project(':iceberg-flink') {

Reply via email to