This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 6f23d5c support Server Side Encryption for S3FileIO (#1754)
6f23d5c is described below
commit 6f23d5c7e9e36d59b1401e63fbf2e46aecf6ca90
Author: jackye1995 <[email protected]>
AuthorDate: Mon Nov 16 13:17:17 2020 -0800
support Server Side Encryption for S3FileIO (#1754)
---
.../org/apache/iceberg/aws/AwsIntegTestUtil.java | 58 ++++++
.../org/apache/iceberg/aws/s3/S3FileIOTest.java | 195 +++++++++++++++++++++
.../java/org/apache/iceberg/aws/AwsClientUtil.java | 7 +
.../java/org/apache/iceberg/aws/AwsProperties.java | 112 ++++++++++++
.../java/org/apache/iceberg/aws/s3/BaseS3File.java | 25 ++-
.../java/org/apache/iceberg/aws/s3/S3FileIO.java | 20 ++-
.../org/apache/iceberg/aws/s3/S3InputFile.java | 9 +-
.../org/apache/iceberg/aws/s3/S3InputStream.java | 22 ++-
.../org/apache/iceberg/aws/s3/S3OutputFile.java | 11 +-
.../org/apache/iceberg/aws/s3/S3OutputStream.java | 41 ++++-
build.gradle | 20 +++
11 files changed, 502 insertions(+), 18 deletions(-)
diff --git
a/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
new file mode 100644
index 0000000..a71804c
--- /dev/null
+++ b/aws/src/integration/java/org/apache/iceberg/aws/AwsIntegTestUtil.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws;
+
+import java.util.stream.Collectors;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.Delete;
+import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
+
+/**
+ * Helpers shared by the AWS integration tests.
+ */
+public final class AwsIntegTestUtil {
+
+  private AwsIntegTestUtil() {
+  }
+
+  /**
+   * Returns the name of the S3 bucket used by integration tests.
+   * <p>
+   * Set the environment variable {@code AWS_TEST_BUCKET} to choose the bucket.
+   *
+   * @return bucket name, or null if {@code AWS_TEST_BUCKET} is not set
+   */
+  public static String testBucketName() {
+    return System.getenv("AWS_TEST_BUCKET");
+  }
+
+  /**
+   * Deletes every object whose key starts with {@code prefix} in {@code bucketName}.
+   * <p>
+   * Objects are listed and deleted one listing batch at a time until a listing returns no contents.
+   *
+   * @param s3 client used to list and delete objects
+   * @param bucketName bucket to clean
+   * @param prefix key prefix of the objects to delete
+   * @throws IllegalStateException if S3 reports that some objects in a batch could not be deleted
+   */
+  public static void cleanS3Bucket(S3Client s3, String bucketName, String prefix) {
+    ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
+        .bucket(bucketName)
+        .prefix(prefix)
+        .build();
+    ListObjectsV2Response response = s3.listObjectsV2(listRequest);
+    while (response.hasContents()) {
+      DeleteObjectsRequest deleteRequest = DeleteObjectsRequest.builder()
+          .bucket(bucketName)
+          .delete(Delete.builder()
+              .objects(response.contents().stream()
+                  .map(obj -> ObjectIdentifier.builder().key(obj.key()).build())
+                  .collect(Collectors.toList()))
+              .build())
+          .build();
+      // DeleteObjects reports per-key failures in its response instead of throwing; without this
+      // check the same undeletable keys would be listed again and the loop would never terminate.
+      if (s3.deleteObjects(deleteRequest).hasErrors()) {
+        throw new IllegalStateException(String.format(
+            "Failed to delete some objects under s3://%s/%s", bucketName, prefix));
+      }
+      response = s3.listObjectsV2(listRequest);
+    }
+  }
+}
diff --git
a/aws/src/integration/java/org/apache/iceberg/aws/s3/S3FileIOTest.java
b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3FileIOTest.java
new file mode 100644
index 0000000..4b5ce03
--- /dev/null
+++ b/aws/src/integration/java/org/apache/iceberg/aws/s3/S3FileIOTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws.s3;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.SecureRandom;
+import java.util.Base64;
+import java.util.UUID;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+import org.apache.iceberg.aws.AwsClientUtil;
+import org.apache.iceberg.aws.AwsIntegTestUtil;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.services.kms.KmsClient;
+import software.amazon.awssdk.services.kms.model.ListAliasesRequest;
+import software.amazon.awssdk.services.kms.model.ListAliasesResponse;
+import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionRequest;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
+import software.amazon.awssdk.utils.IoUtils;
+
+/**
+ * Integration tests for {@link S3FileIO} that run against real S3 and KMS using the default clients.
+ * The bucket is taken from the {@code AWS_TEST_BUCKET} environment variable.
+ */
+public class S3FileIOTest {
+
+  private static S3Client s3;
+  private static KmsClient kms;
+  private static String bucketName;
+  private static String prefix;
+  private static byte[] contentBytes;
+  private static String content;
+  private static String kmsKeyArn;
+  private String objectKey;
+  private String objectUri;
+
+  @BeforeClass
+  public static void beforeClass() {
+    s3 = AwsClientUtil.defaultS3Client();
+    kms = AwsClientUtil.defaultKmsClient();
+    bucketName = AwsIntegTestUtil.testBucketName();
+    // every object written by this class lives under this random prefix so afterClass() can remove it
+    prefix = UUID.randomUUID().toString();
+    // 10 MiB payload of zero bytes
+    contentBytes = new byte[1024 * 1024 * 10];
+    content = new String(contentBytes, StandardCharsets.UTF_8);
+    // dedicated KMS key for testSSE_KMS; its deletion is scheduled in afterClass()
+    kmsKeyArn = kms.createKey().keyMetadata().arn();
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    try {
+      AwsIntegTestUtil.cleanS3Bucket(s3, bucketName, prefix);
+    } finally {
+      // schedule the key deletion and release both clients even if the bucket cleanup fails
+      try {
+        kms.scheduleKeyDeletion(ScheduleKeyDeletionRequest.builder()
+            .keyId(kmsKeyArn)
+            .pendingWindowInDays(7)
+            .build());
+      } finally {
+        s3.close();
+        kms.close();
+      }
+    }
+  }
+
+  @Before
+  public void before() {
+    objectKey = String.format("%s/%s", prefix, UUID.randomUUID().toString());
+    objectUri = String.format("s3://%s/%s", bucketName, objectKey);
+  }
+
+  @Test
+  public void testNewInputStream() throws Exception {
+    s3.putObject(PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
+        RequestBody.fromBytes(contentBytes));
+    S3FileIO s3FileIO = new S3FileIO();
+    validateRead(s3FileIO);
+  }
+
+  @Test
+  public void testNewOutputStream() throws Exception {
+    S3FileIO s3FileIO = new S3FileIO();
+    write(s3FileIO);
+    try (InputStream stream = s3.getObject(
+        GetObjectRequest.builder().bucket(bucketName).key(objectKey).build())) {
+      Assert.assertEquals(content, IoUtils.toUtf8String(stream));
+    }
+  }
+
+  @Test
+  public void testSSE_S3() throws Exception {
+    AwsProperties properties = new AwsProperties();
+    properties.setS3FileIoSseType(AwsProperties.S3FILEIO_SSE_TYPE_S3);
+    S3FileIO s3FileIO = new S3FileIO(AwsClientUtil::defaultS3Client, properties);
+    write(s3FileIO);
+    validateRead(s3FileIO);
+    GetObjectResponse response = getObjectResponse(
+        GetObjectRequest.builder().bucket(bucketName).key(objectKey).build());
+    Assert.assertEquals(ServerSideEncryption.AES256, response.serverSideEncryption());
+  }
+
+  @Test
+  public void testSSE_KMS() throws Exception {
+    AwsProperties properties = new AwsProperties();
+    properties.setS3FileIoSseType(AwsProperties.S3FILEIO_SSE_TYPE_KMS);
+    properties.setS3FileIoSseKey(kmsKeyArn);
+    S3FileIO s3FileIO = new S3FileIO(AwsClientUtil::defaultS3Client, properties);
+    write(s3FileIO);
+    validateRead(s3FileIO);
+    GetObjectResponse response = getObjectResponse(
+        GetObjectRequest.builder().bucket(bucketName).key(objectKey).build());
+    Assert.assertEquals(ServerSideEncryption.AWS_KMS, response.serverSideEncryption());
+    Assert.assertEquals(kmsKeyArn, response.ssekmsKeyId());
+  }
+
+  @Test
+  public void testSSE_KMS_default() throws Exception {
+    AwsProperties properties = new AwsProperties();
+    properties.setS3FileIoSseType(AwsProperties.S3FILEIO_SSE_TYPE_KMS);
+    S3FileIO s3FileIO = new S3FileIO(AwsClientUtil::defaultS3Client, properties);
+    write(s3FileIO);
+    validateRead(s3FileIO);
+    GetObjectResponse response = getObjectResponse(
+        GetObjectRequest.builder().bucket(bucketName).key(objectKey).build());
+    Assert.assertEquals(ServerSideEncryption.AWS_KMS, response.serverSideEncryption());
+    // without an explicit key, the object should be encrypted with the account's "alias/aws/s3" key
+    ListAliasesResponse listAliasesResponse = kms.listAliases(
+        ListAliasesRequest.builder().keyId(response.ssekmsKeyId()).build());
+    Assert.assertTrue(listAliasesResponse.hasAliases());
+    Assert.assertEquals(1, listAliasesResponse.aliases().size());
+    Assert.assertEquals("alias/aws/s3", listAliasesResponse.aliases().get(0).aliasName());
+  }
+
+  @Test
+  public void testSSE_Custom() throws Exception {
+    // generate a random 256-bit AES key, base-64 encoded as S3 expects
+    KeyGenerator keyGenerator = KeyGenerator.getInstance("AES");
+    keyGenerator.init(256, new SecureRandom());
+    SecretKey secretKey = keyGenerator.generateKey();
+    Base64.Encoder encoder = Base64.getEncoder();
+    String encodedKey = new String(encoder.encode(secretKey.getEncoded()), StandardCharsets.UTF_8);
+    // base-64 MD5 of the raw key bytes
+    MessageDigest digest = MessageDigest.getInstance("MD5");
+    String md5 = new String(encoder.encode(digest.digest(secretKey.getEncoded())), StandardCharsets.UTF_8);
+
+    AwsProperties properties = new AwsProperties();
+    properties.setS3FileIoSseType(AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM);
+    properties.setS3FileIoSseKey(encodedKey);
+    properties.setS3FileIoSseMd5(md5);
+    S3FileIO s3FileIO = new S3FileIO(AwsClientUtil::defaultS3Client, properties);
+    write(s3FileIO);
+    validateRead(s3FileIO);
+    GetObjectResponse response = getObjectResponse(
+        GetObjectRequest.builder().bucket(bucketName).key(objectKey)
+            .sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
+            .sseCustomerKey(encodedKey)
+            .sseCustomerKeyMD5(md5)
+            .build());
+    Assert.assertNull(response.serverSideEncryption());
+    Assert.assertEquals(ServerSideEncryption.AES256.name(), response.sseCustomerAlgorithm());
+    Assert.assertEquals(md5, response.sseCustomerKeyMD5());
+  }
+
+  /**
+   * Fetches an object and returns only its response metadata.
+   * <p>
+   * Uses {@code getObjectAsBytes}, which consumes the body and closes the HTTP response; calling
+   * {@code getObject(request).response()} instead would leave the response stream open.
+   */
+  private static GetObjectResponse getObjectResponse(GetObjectRequest request) {
+    return s3.getObjectAsBytes(request).response();
+  }
+
+  private void write(S3FileIO s3FileIO) throws Exception {
+    OutputFile outputFile = s3FileIO.newOutputFile(objectUri);
+    OutputStream outputStream = outputFile.create();
+    IoUtils.copy(new ByteArrayInputStream(contentBytes), outputStream);
+    outputStream.close();
+  }
+
+  private void validateRead(S3FileIO s3FileIO) throws Exception {
+    InputFile file = s3FileIO.newInputFile(objectUri);
+    Assert.assertEquals(contentBytes.length, file.getLength());
+    try (InputStream stream = file.newStream()) {
+      Assert.assertEquals(content, IoUtils.toUtf8String(stream));
+    }
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java
b/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java
index aa2d1f3..e75b3f0 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsClientUtil.java
@@ -20,6 +20,7 @@
package org.apache.iceberg.aws;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.s3.S3Client;
/**
@@ -40,4 +41,10 @@ public class AwsClientUtil {
.httpClient(UrlConnectionHttpClient.create())
.build();
}
+
+  /**
+   * Builds a KMS client that sends requests through {@link UrlConnectionHttpClient}.
+   * Region and credentials are not set here, so the SDK's defaults apply.
+   *
+   * @return a new KMS client; the caller is responsible for closing it
+   */
+  public static KmsClient defaultKmsClient() {
+    return KmsClient.builder().httpClient(UrlConnectionHttpClient.create()).build();
+  }
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
new file mode 100644
index 0000000..b9c33df
--- /dev/null
+++ b/aws/src/main/java/org/apache/iceberg/aws/AwsProperties.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.aws;
+
+import java.io.Serializable;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * AWS-related settings for Iceberg's AWS integrations, currently the server-side encryption (SSE)
+ * options used by S3FileIO.
+ * <p>
+ * Serializable because S3FileIO holds an instance alongside its serializable client supplier.
+ */
+public class AwsProperties implements Serializable {
+
+  /**
+   * Type of S3 server-side encryption used, default to {@link AwsProperties#S3FILEIO_SSE_TYPE_NONE}.
+   * The value is matched case-insensitively.
+   */
+  public static final String S3FILEIO_SSE_TYPE = "s3fileio.sse.type";
+
+  /**
+   * No server-side encryption.
+   */
+  public static final String S3FILEIO_SSE_TYPE_NONE = "none";
+
+  /**
+   * S3 SSE-KMS encryption.
+   * For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html
+   */
+  public static final String S3FILEIO_SSE_TYPE_KMS = "kms";
+
+  /**
+   * S3 SSE-S3 encryption.
+   * For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html
+   */
+  public static final String S3FILEIO_SSE_TYPE_S3 = "s3";
+
+  /**
+   * S3 SSE-C encryption.
+   * For more details: https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html
+   */
+  public static final String S3FILEIO_SSE_TYPE_CUSTOM = "custom";
+
+  /**
+   * If S3 encryption type is SSE-KMS, input is a KMS Key ID or ARN.
+   * In case this property is not set, default key "aws/s3" is used.
+   * If encryption type is SSE-C, input is a custom base-64 AES256 symmetric key.
+   */
+  public static final String S3FILEIO_SSE_KEY = "s3fileio.sse.key";
+
+  /**
+   * If S3 encryption type is SSE-C, input is the base-64 MD5 digest of the secret key.
+   * This MD5 must be explicitly passed in by the caller to ensure key integrity.
+   */
+  public static final String S3FILEIO_SSE_MD5 = "s3fileio.sse.md5";
+
+  private String s3FileIoSseType;
+  private String s3FileIoSseKey;
+  private String s3FileIoSseMd5;
+
+  /**
+   * Creates properties with no server-side encryption and no key or MD5 set.
+   */
+  public AwsProperties() {
+    this.s3FileIoSseType = S3FILEIO_SSE_TYPE_NONE;
+    this.s3FileIoSseKey = null;
+    this.s3FileIoSseMd5 = null;
+  }
+
+  /**
+   * Creates properties from a string map, such as the one passed to {@code S3FileIO#initialize}.
+   *
+   * @param properties map read for {@link #S3FILEIO_SSE_TYPE}, {@link #S3FILEIO_SSE_KEY} and
+   *                   {@link #S3FILEIO_SSE_MD5}
+   * @throws NullPointerException if the SSE type is SSE-C and the key or its MD5 is missing
+   */
+  public AwsProperties(Map<String, String> properties) {
+    this.s3FileIoSseType = normalizeSseType(properties.getOrDefault(
+        AwsProperties.S3FILEIO_SSE_TYPE, AwsProperties.S3FILEIO_SSE_TYPE_NONE));
+    this.s3FileIoSseKey = properties.get(AwsProperties.S3FILEIO_SSE_KEY);
+    this.s3FileIoSseMd5 = properties.get(AwsProperties.S3FILEIO_SSE_MD5);
+    if (AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM.equals(s3FileIoSseType)) {
+      Preconditions.checkNotNull(s3FileIoSseKey, "Cannot initialize SSE-C S3FileIO with null encryption key");
+      Preconditions.checkNotNull(s3FileIoSseMd5, "Cannot initialize SSE-C S3FileIO with null encryption key MD5");
+    }
+  }
+
+  /**
+   * @return the configured SSE type, lower-cased (null only if explicitly set to null)
+   */
+  public String s3FileIoSseType() {
+    return s3FileIoSseType;
+  }
+
+  /**
+   * Sets the SSE type. The value is lower-cased so it compares equal to the
+   * {@code S3FILEIO_SSE_TYPE_*} constants.
+   *
+   * @param sseType one of the {@code S3FILEIO_SSE_TYPE_*} values, in any case
+   */
+  public void setS3FileIoSseType(String sseType) {
+    this.s3FileIoSseType = normalizeSseType(sseType);
+  }
+
+  public String s3FileIoSseKey() {
+    return s3FileIoSseKey;
+  }
+
+  public void setS3FileIoSseKey(String sseKey) {
+    this.s3FileIoSseKey = sseKey;
+  }
+
+  public String s3FileIoSseMd5() {
+    return s3FileIoSseMd5;
+  }
+
+  public void setS3FileIoSseMd5(String sseMd5) {
+    this.s3FileIoSseMd5 = sseMd5;
+  }
+
+  /**
+   * Lower-cases an SSE type with {@link Locale#ENGLISH}, as S3OutputStream does before its switch.
+   * Readers compare the stored value with {@code equals()}, so normalizing once here keeps the
+   * read and write paths in agreement for inputs such as "CUSTOM" or "Kms".
+   */
+  private static String normalizeSseType(String sseType) {
+    return sseType == null ? null : sseType.toLowerCase(Locale.ENGLISH);
+  }
+}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java
index 7933573..d907106 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/BaseS3File.java
@@ -19,20 +19,28 @@
package org.apache.iceberg.aws.s3;
+import org.apache.iceberg.aws.AwsProperties;
import software.amazon.awssdk.http.HttpStatusCode;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
abstract class BaseS3File {
private final S3Client client;
private final S3URI uri;
+ // settings (e.g. the SSE-C key and MD5) applied to requests made for this file
+ private final AwsProperties awsProperties;
private HeadObjectResponse metadata;
+ /**
+ * Creates a file handle with default {@link AwsProperties} (SSE type "none").
+ */
BaseS3File(S3Client client, S3URI uri) {
+ this(client, uri, new AwsProperties());
+ }
+
+ /**
+ * Creates a file handle whose S3 requests are configured by the given AWS properties.
+ */
+ BaseS3File(S3Client client, S3URI uri, AwsProperties awsProperties) {
this.client = client;
this.uri = uri;
+ this.awsProperties = awsProperties;
}
public String location() {
@@ -47,6 +55,10 @@ abstract class BaseS3File {
return uri;
}
+  /**
+   * Returns the AWS properties (for example the server-side encryption settings) of this file.
+   *
+   * @return the properties given to the constructor, or default properties if none were given
+   */
+  public AwsProperties awsProperties() {
+    return this.awsProperties;
+  }
+
/**
* Note: this may be stale if file was deleted since metadata is cached for
size/existence checks.
*
@@ -66,10 +78,17 @@ abstract class BaseS3File {
protected HeadObjectResponse getObjectMetadata() throws S3Exception {
if (metadata == null) {
- metadata = client().headObject(HeadObjectRequest.builder()
+ HeadObjectRequest.Builder requestBuilder = HeadObjectRequest.builder()
.bucket(uri().bucket())
- .key(uri().key())
- .build());
+ .key(uri().key());
+
+ if
(AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM.equals(awsProperties.s3FileIoSseType()))
{
+
requestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name());
+ requestBuilder.sseCustomerKey(awsProperties.s3FileIoSseKey());
+ requestBuilder.sseCustomerKeyMD5(awsProperties.s3FileIoSseMd5());
+ }
+
+ metadata = client().headObject(requestBuilder.build());
}
return metadata;
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
index 1f69ec5..443c58f 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java
@@ -19,7 +19,9 @@
package org.apache.iceberg.aws.s3;
+import java.util.Map;
import org.apache.iceberg.aws.AwsClientUtil;
+import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
@@ -36,24 +38,31 @@ import
software.amazon.awssdk.services.s3.model.ObjectIdentifier;
*/
public class S3FileIO implements FileIO {
private final SerializableSupplier<S3Client> s3;
+ // Replaced by initialize(Map). NOTE(review): this class pairs a SerializableSupplier with a transient
+ // client, which suggests it is Java-serialized; if so, AwsProperties must be Serializable too.
+ private AwsProperties awsProperties;
+
private transient S3Client client;
+ /**
+ * Creates an S3FileIO that uses {@link AwsClientUtil#defaultS3Client()} and default {@link AwsProperties}.
+ */
public S3FileIO() {
- this.s3 = AwsClientUtil::defaultS3Client;
+ this(AwsClientUtil::defaultS3Client);
}
+ /**
+ * Creates an S3FileIO with the given client supplier and default {@link AwsProperties}.
+ */
public S3FileIO(SerializableSupplier<S3Client> s3) {
+ this(s3, new AwsProperties());
+ }
+
+ /**
+ * Creates an S3FileIO with the given client supplier and AWS properties. The properties are handed
+ * to every input and output file this instance creates.
+ */
+ public S3FileIO(SerializableSupplier<S3Client> s3, AwsProperties
awsProperties) {
this.s3 = s3;
+ this.awsProperties = awsProperties;
}
@Override
public InputFile newInputFile(String path) {
- return new S3InputFile(client(), new S3URI(path));
+ return new S3InputFile(client(), new S3URI(path), awsProperties);
}
@Override
public OutputFile newOutputFile(String path) {
- return new S3OutputFile(client(), new S3URI(path));
+ return new S3OutputFile(client(), new S3URI(path), awsProperties);
}
@Override
@@ -73,4 +82,9 @@ public class S3FileIO implements FileIO {
}
return client;
}
+
+ /**
+ * Replaces this FileIO's AWS properties with ones built from {@code properties} (see
+ * {@link AwsProperties#AwsProperties(Map)} for the keys read). Only files created after this call
+ * use the new settings; the S3 client itself is left unchanged.
+ */
+ @Override
+ public void initialize(Map<String, String> properties) {
+ this.awsProperties = new AwsProperties(properties);
+ }
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
index 4450161..93d86c3 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputFile.java
@@ -19,13 +19,18 @@
package org.apache.iceberg.aws.s3;
+import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;
import software.amazon.awssdk.services.s3.S3Client;
public class S3InputFile extends BaseS3File implements InputFile {
+ /**
+ * Creates an input file with default {@link AwsProperties} (SSE type "none").
+ */
public S3InputFile(S3Client client, S3URI uri) {
- super(client, uri);
+ this(client, uri, new AwsProperties());
+ }
+
+ /**
+ * Creates an input file whose metadata and stream requests are configured by the given AWS properties.
+ */
+ public S3InputFile(S3Client client, S3URI uri, AwsProperties awsProperties) {
+ super(client, uri, awsProperties);
}
/**
@@ -40,7 +45,7 @@ public class S3InputFile extends BaseS3File implements
InputFile {
+ /**
+ * Opens a seekable stream over this object. This file's AWS properties are passed along so that
+ * reads of SSE-C objects carry the customer key headers.
+ */
@Override
public SeekableInputStream newStream() {
- return new S3InputStream(client(), uri());
+ return new S3InputStream(client(), uri(), awsProperties());
}
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
index f506067..2d3962b 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java
@@ -22,6 +22,7 @@ package org.apache.iceberg.aws.s3;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
+import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.io.SeekableInputStream;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -31,6 +32,7 @@ import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
class S3InputStream extends SeekableInputStream {
private static final Logger LOG =
LoggerFactory.getLogger(S3InputStream.class);
@@ -38,6 +40,7 @@ class S3InputStream extends SeekableInputStream {
private final StackTraceElement[] createStack;
private final S3Client s3;
private final S3URI location;
+ private final AwsProperties awsProperties;
private InputStream stream;
private long pos = 0;
@@ -47,8 +50,13 @@ class S3InputStream extends SeekableInputStream {
private int skipSize = 1024 * 1024;
+ /**
+ * Creates a stream with default {@link AwsProperties}, so no SSE-C headers are sent.
+ */
S3InputStream(S3Client s3, S3URI location) {
+ this(s3, location, new AwsProperties());
+ }
+
+ /**
+ * Creates a stream whose GET requests add SSE-C headers when the properties select SSE-C.
+ */
+ S3InputStream(S3Client s3, S3URI location, AwsProperties awsProperties) {
this.s3 = s3;
this.location = location;
+ this.awsProperties = awsProperties;
createStack = Thread.currentThread().getStackTrace();
}
@@ -126,13 +134,19 @@ class S3InputStream extends SeekableInputStream {
}
+ /**
+ * Issues a ranged GET ("bytes=pos-") for the object and keeps its body as the current stream,
+ * calling closeStream() before the new request is sent.
+ */
private void openStream() throws IOException {
- GetObjectRequest request = GetObjectRequest.builder()
+ GetObjectRequest.Builder requestBuilder = GetObjectRequest.builder()
.bucket(location.bucket())
.key(location.key())
- .range(String.format("bytes=%s-", pos))
- .build();
+ .range(String.format("bytes=%s-", pos));
+
+ // SSE-C objects are read back with the same customer key headers used to write them.
+ // NOTE(review): this equals() check relies on s3FileIoSseType() already being lower-case, whereas the
+ // write path lower-cases it itself; the same header block also appears in BaseS3File#getObjectMetadata.
+ if
(AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM.equals(awsProperties.s3FileIoSseType()))
{
+ requestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name());
+ requestBuilder.sseCustomerKey(awsProperties.s3FileIoSseKey());
+ requestBuilder.sseCustomerKeyMD5(awsProperties.s3FileIoSseMd5());
+ }
+
closeStream();
- stream = s3.getObject(request, ResponseTransformer.toInputStream());
+ stream = s3.getObject(requestBuilder.build(),
ResponseTransformer.toInputStream());
}
private void closeStream() throws IOException {
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
index 0996ea0..44a745a 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputFile.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.aws.s3;
import java.io.IOException;
import java.io.UncheckedIOException;
+import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
@@ -29,7 +30,11 @@ import software.amazon.awssdk.services.s3.S3Client;
public class S3OutputFile extends BaseS3File implements OutputFile {
+ /**
+ * Creates an output file with default {@link AwsProperties} (SSE type "none").
+ */
public S3OutputFile(S3Client client, S3URI uri) {
- super(client, uri);
+ this(client, uri, new AwsProperties());
+ }
+
+ /**
+ * Creates an output file whose output streams and derived input file use the given AWS properties.
+ */
+ public S3OutputFile(S3Client client, S3URI uri, AwsProperties awsProperties)
{
+ super(client, uri, awsProperties);
}
/**
@@ -50,7 +55,7 @@ public class S3OutputFile extends BaseS3File implements
OutputFile {
@Override
public PositionOutputStream createOrOverwrite() {
try {
- return new S3OutputStream(client(), uri());
+ return new S3OutputStream(client(), uri(), awsProperties());
} catch (IOException e) {
throw new UncheckedIOException("Filed to create output stream for
location: " + uri(), e);
}
@@ -58,6 +63,6 @@ public class S3OutputFile extends BaseS3File implements
OutputFile {
+ /**
+ * Returns an input file for the same location that keeps this file's AWS properties, so an object
+ * written with SSE-C can be read back with the same key.
+ */
@Override
public InputFile toInputFile() {
- return new S3InputFile(client(), uri());
+ return new S3InputFile(client(), uri(), awsProperties());
}
}
diff --git a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
index 2ae94a2..42b9608 100644
--- a/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
+++ b/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java
@@ -25,6 +25,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
+import java.util.Locale;
+import org.apache.iceberg.aws.AwsProperties;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.slf4j.Logger;
@@ -32,6 +34,7 @@ import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
class S3OutputStream extends PositionOutputStream {
private static final Logger LOG =
LoggerFactory.getLogger(S3OutputStream.class);
@@ -39,6 +42,7 @@ class S3OutputStream extends PositionOutputStream {
private final StackTraceElement[] createStack;
private final S3Client s3;
private final S3URI location;
+ private final AwsProperties awsProperties;
private final OutputStream stream;
private final File stagingFile;
@@ -47,8 +51,13 @@ class S3OutputStream extends PositionOutputStream {
private boolean closed = false;
S3OutputStream(S3Client s3, S3URI location) throws IOException {
+ this(s3, location, new AwsProperties());
+ }
+
+ S3OutputStream(S3Client s3, S3URI location, AwsProperties awsProperties)
throws IOException {
this.s3 = s3;
this.location = location;
+ this.awsProperties = awsProperties;
createStack = Thread.currentThread().getStackTrace();
stagingFile = File.createTempFile("s3fileio-", ".tmp");
@@ -91,9 +100,35 @@ class S3OutputStream extends PositionOutputStream {
try {
stream.close();
- s3.putObject(
-
PutObjectRequest.builder().bucket(location.bucket()).key(location.key()).build(),
- RequestBody.fromFile(stagingFile));
+ PutObjectRequest.Builder requestBuilder = PutObjectRequest.builder()
+ .bucket(location.bucket())
+ .key(location.key());
+
+ switch (awsProperties.s3FileIoSseType().toLowerCase(Locale.ENGLISH)) {
+ case AwsProperties.S3FILEIO_SSE_TYPE_NONE:
+ break;
+
+ case AwsProperties.S3FILEIO_SSE_TYPE_KMS:
+ requestBuilder.serverSideEncryption(ServerSideEncryption.AWS_KMS);
+ requestBuilder.ssekmsKeyId(awsProperties.s3FileIoSseKey());
+ break;
+
+ case AwsProperties.S3FILEIO_SSE_TYPE_S3:
+ requestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
+ break;
+
+ case AwsProperties.S3FILEIO_SSE_TYPE_CUSTOM:
+
requestBuilder.sseCustomerAlgorithm(ServerSideEncryption.AES256.name());
+ requestBuilder.sseCustomerKey(awsProperties.s3FileIoSseKey());
+ requestBuilder.sseCustomerKeyMD5(awsProperties.s3FileIoSseMd5());
+ break;
+
+ default:
+ throw new IllegalArgumentException(
+ "Cannot support given S3 encryption type: " +
awsProperties.s3FileIoSseType());
+ }
+
+ s3.putObject(requestBuilder.build(), RequestBody.fromFile(stagingFile));
} finally {
if (!stagingFile.delete()) {
LOG.warn("Could not delete temporary file: {}", stagingFile);
diff --git a/build.gradle b/build.gradle
index 0a8bc40..7df7c9f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -251,12 +251,32 @@ project(':iceberg-aws') {
compile 'software.amazon.awssdk:url-connection-client'
compile 'software.amazon.awssdk:s3'
+ compile 'software.amazon.awssdk:kms'
testCompile("com.adobe.testing:s3mock-junit4") {
exclude module: "spring-boot-starter-logging"
exclude module: "logback-classic"
}
}
+
+ // Separate "integration" source set (src/integration) compiled against this project's main and test outputs.
+ sourceSets {
+ integration {
+ java.srcDir "$projectDir/src/integration/java"
+ resources.srcDir "$projectDir/src/integration/resources"
+ compileClasspath += main.output + test.output
+ runtimeClasspath += main.output + test.output
+ }
+ }
+
+ // Integration tests reuse the unit-test dependencies.
+ configurations {
+ integrationImplementation.extendsFrom testImplementation
+ integrationRuntime.extendsFrom testRuntime
+ }
+
+ // Runs the integration source set's tests. Nothing shown here makes `check` depend on this task, so run it
+ // explicitly. S3FileIOTest talks to real S3/KMS and reads its bucket name from AWS_TEST_BUCKET.
+ task integrationTest(type: Test) {
+ testClassesDirs = sourceSets.integration.output.classesDirs
+ classpath = sourceSets.integration.runtimeClasspath
+ }
}
project(':iceberg-flink') {