steveloughran commented on code in PR #6884:
URL: https://github.com/apache/hadoop/pull/6884#discussion_r1668510834
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java:
##########
@@ -83,6 +83,11 @@ public interface AWSHeaders {
*/
String CRYPTO_CEK_ALGORITHM = "x-amz-cek-alg";
+ /**
+ * Header for unencrypted content length of an object.
Review Comment:
add a {@value} for better IDE displays
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java:
##########
@@ -61,6 +61,8 @@ public class ClientManagerImpl implements ClientManager {
*/
private final S3ClientFactory clientFactory;
+ private final S3ClientFactory unencryptedClientFactory;
Review Comment:
javadoc
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java:
##########
@@ -153,6 +168,12 @@ public synchronized S3AsyncClient getOrCreateAsyncClient()
throws IOException {
return s3AsyncClient.eval();
}
+ @Override
Review Comment:
javadoc
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java:
##########
@@ -84,25 +86,31 @@ public class ClientManagerImpl implements ClientManager {
/** Async client is used for transfer manager. */
private final LazyAutoCloseableReference<S3AsyncClient> s3AsyncClient;
+ private final LazyAutoCloseableReference<S3Client> unencryptedS3Client;
Review Comment:
javadoc
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import io.netty.util.internal.StringUtil;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AUtils.formatRange;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CRYPTO_CEK_ALGORITHM;
+import static
org.apache.hadoop.fs.s3a.impl.AWSHeaders.UNENCRYPTED_CONTENT_LENGTH;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
+/**
+ * S3 client side encryption (CSE) utility class.
+ */
[email protected]
[email protected]
+public final class CSEUtils {
+
+ private CSEUtils() {
+ }
+
+ /**
+ * Checks if the file suffix ends with
+ * {@link
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX}
+ * when the config
+ * {@link
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE_DEFAULT}
+ * is enabled and CSE is used.
+ * @param skipCSEInstructionFile whether to skip checking for the filename
suffix
+ * @param key file name
+ * @return true if cse is disabled or if skipping of instruction file is
disabled or file name
+ * does not end with defined suffix
+ */
+ public static boolean isCSEInstructionFile(boolean skipCSEInstructionFile,
String key) {
+ if (!skipCSEInstructionFile) {
+ return true;
+ }
+ return !key.endsWith(S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+ }
+
+ /**
+ * Checks if CSE-KMS or CSE-CUSTOM is set.
+ * @param encryptionMethod type of encryption used
+ * @return true if encryption method is CSE-KMS or CSE-CUSTOM
+ */
+ public static boolean isCSEKmsOrCustom(String encryptionMethod) {
+ return S3AEncryptionMethods.CSE_KMS.getMethod().equals(encryptionMethod) ||
+ S3AEncryptionMethods.CSE_CUSTOM.getMethod().equals(encryptionMethod);
+ }
+
+ /**
+ * Checks if a given S3 object is encrypted or not by checking following two
conditions
+ * 1. if object metadata contains x-amz-cek-alg
+ * 2. if instruction file is present
+ *
+ * @param s3Client S3 client
+ * @param bucket bucket name of the s3 object
+ * @param key key value of the s3 object
+ * @return true if S3 object is encrypted
+ */
+ public static boolean isObjectEncrypted(S3Client s3Client, String bucket,
String key) {
+ HeadObjectRequest request = HeadObjectRequest.builder()
+ .bucket(bucket)
+ .key(key)
+ .build();
+ HeadObjectResponse headObjectResponse = s3Client.headObject(request);
+ if (headObjectResponse.hasMetadata() &&
+ headObjectResponse.metadata().get(CRYPTO_CEK_ALGORITHM) != null) {
+ return true;
+ }
+ HeadObjectRequest instructionFileCheckRequest = HeadObjectRequest.builder()
+ .bucket(bucket)
+ .key(key + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX)
+ .build();
+ try {
+ s3Client.headObject(instructionFileCheckRequest);
+ return true;
+ } catch (NoSuchKeyException e) {
+ // Ignore. This indicates no instruction file is present
+ }
+ return false;
+ }
+
+ /**
+ * Get the unencrypted object length by either subtracting
+ * {@link InternalConstants#CSE_PADDING_LENGTH} from the object size or
calculating the
+ * actual size by doing S3 ranged GET operation.
+ *
+ * @param s3Client S3 client
+ * @param bucket bucket name of the s3 object
+ * @param key key value of the s3 object
+ * @param contentLength S3 object length
+ * @param headObjectResponse response from headObject call
+ * @param cseRangedGetEnabled is ranged get enabled
+ * @param cseReadUnencryptedObjects is reading of une
+ * @return unencrypted length of the object
+ * @throws IOException IO failures
+ */
+ public static long getUnencryptedObjectLength(S3Client s3Client,
+ String bucket,
+ String key,
+ long contentLength,
+ HeadObjectResponse headObjectResponse,
+ boolean cseRangedGetEnabled,
+ boolean cseReadUnencryptedObjects) throws IOException {
+
+ if (cseReadUnencryptedObjects) {
+ // if object is unencrypted, return the actual size
+ if (!isObjectEncrypted(s3Client, bucket, key)) {
+ return contentLength;
+ }
+ }
+
+ // check if unencrypted content length metadata is present or not.
+ if (headObjectResponse != null) {
+ String plaintextLength =
headObjectResponse.metadata().get(UNENCRYPTED_CONTENT_LENGTH);
+ if (headObjectResponse.hasMetadata() &&
!StringUtil.isNullOrEmpty(plaintextLength)) {
+ return Long.parseLong(plaintextLength);
+ }
+ }
+
+ if (cseRangedGetEnabled) {
+ // identify the unencrypted length by doing a ranged GET operation.
+ if (contentLength >= CSE_PADDING_LENGTH) {
+ long minPlaintextLength = contentLength - CSE_PADDING_LENGTH;
+ if (minPlaintextLength < 0) {
+ minPlaintextLength = 0;
+ }
+ GetObjectRequest getObjectRequest = GetObjectRequest.builder()
+ .bucket(bucket)
+ .key(key)
+ .range(formatRange(minPlaintextLength, contentLength))
+ .build();
+ InputStream inputStream = s3Client.getObject(getObjectRequest);
Review Comment:
error handling here?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java:
##########
@@ -325,19 +325,6 @@ private Map<String, byte[]> retrieveHeaders(
md.contentEncoding());
maybeSetHeader(headers, XA_CONTENT_LANGUAGE,
md.contentLanguage());
- // If CSE is enabled, use the unencrypted content length.
- // TODO: CSE is not supported yet, add these headers in during CSE work.
-// if (md.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null
Review Comment:
should this be uncommented?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+ private static final String ENCRYPTION_CLIENT_CLASSNAME =
+ "software.amazon.encryption.s3.S3EncryptionClient";
+
+ /**
+ * Encryption client availability.
+ */
+ private static final boolean ENCRYPTION_CLIENT_FOUND =
checkForEncryptionClient();
Review Comment:
make this a LazyAtomicReference&lt;Boolean&gt; with the checkForEncryptionClient method
passed in for on-demand evaluation. Otherwise every single debug log is going to
include messages, possibly including stack traces
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+ private static final String ENCRYPTION_CLIENT_CLASSNAME =
+ "software.amazon.encryption.s3.S3EncryptionClient";
+
+ /**
+ * Encryption client availability.
+ */
+ private static final boolean ENCRYPTION_CLIENT_FOUND =
checkForEncryptionClient();
+
+ /**
+ * S3Client to be wrapped by encryption client.
+ */
+ private S3Client s3Client;
+
+ /**
+ * S3AsyncClient to be wrapped by encryption client.
+ */
+ private S3AsyncClient s3AsyncClient;
+
+ private static boolean checkForEncryptionClient() {
+ try {
+ ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
+ cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
+ LOG.debug("encryption client class {} found",
ENCRYPTION_CLIENT_CLASSNAME);
+ return true;
+ } catch (Exception e) {
+ LOG.debug("encryption client class {} not found",
ENCRYPTION_CLIENT_CLASSNAME, e);
+ return false;
+ }
+ }
+
+ /**
+ * Is the Encryption client available?
+ * @return true if it was found in the classloader
+ */
+ private static synchronized boolean isEncryptionClientAvailable() {
+ return ENCRYPTION_CLIENT_FOUND;
+ }
+
+ /**
+ * Create encrypted s3 client.
+ * @param uri S3A file system URI
+ * @param parameters parameter object
+ * @return encrypted s3 client
+ * @throws IOException IO failures
+ */
+ @Override
+ public S3Client createS3Client(URI uri, S3ClientCreationParameters
parameters)
+ throws IOException {
+ if (!isEncryptionClientAvailable()) {
+ throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+ "No encryption client available");
+ }
+
+ s3Client = super.createS3Client(uri, parameters);
+ s3AsyncClient = super.createS3AsyncClient(uri, parameters);
+
+ return
createS3EncryptionClient(parameters.getClientSideEncryptionMaterials());
+ }
+
+ /**
+ * Create async encrypted s3 client.
+ * @param uri S3A file system URI
+ * @param parameters parameter object
+ * @return async encrypted s3 client
+ * @throws IOException IO failures
+ */
+ @Override
+ public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters
parameters)
+ throws IOException {
+ if (!isEncryptionClientAvailable()) {
+ throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+ "No encryption client available");
+ }
+ return
createS3AsyncEncryptionClient(parameters.getClientSideEncryptionMaterials());
+ }
+
+ private S3Client createS3EncryptionClient(final CSEMaterials cseMaterials) {
+ S3EncryptionClient.Builder s3EncryptionClientBuilder =
+
S3EncryptionClient.builder().wrappedAsyncClient(s3AsyncClient).wrappedClient(s3Client)
+ // this is required for doing S3 ranged GET calls
+ .enableLegacyUnauthenticatedModes(true)
+ // this is required for backward compatibility with older
encryption clients
+ .enableLegacyWrappingAlgorithms(true);
+
+ switch (cseMaterials.getCseKeyType()) {
+ case KMS:
+ s3EncryptionClientBuilder.kmsKeyId(cseMaterials.getKmsKeyId());
+ break;
+ case CUSTOM:
+ Keyring keyring =
getKeyringProvider(cseMaterials.getCustomKeyringClassName(),
+ cseMaterials.getConf());
+ CryptographicMaterialsManager cmm =
DefaultCryptoMaterialsManager.builder()
+ .keyring(keyring)
+ .build();
+ s3EncryptionClientBuilder.cryptoMaterialsManager(cmm);
+ break;
+ default:
+ break;
+ }
+
+ return s3EncryptionClientBuilder.build();
+ }
+
+ private S3AsyncClient createS3AsyncEncryptionClient(final CSEMaterials
cseMaterials) {
Review Comment:
add a precondition check that s3AsyncClient is non-null.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java:
##########
@@ -580,14 +581,18 @@ public SinglePendingCommit uploadFileToPendingCommit(File
localFile,
for (int partNumber = 1; partNumber <= numParts; partNumber += 1) {
progress.progress();
long size = Math.min(length - offset, uploadPartSize);
- UploadPartRequest part = writeOperations.newUploadPartRequestBuilder(
+ UploadPartRequest.Builder partBuilder =
writeOperations.newUploadPartRequestBuilder(
destKey,
uploadId,
partNumber,
- size).build();
+ size);
+ if (partNumber == numParts) {
Review Comment:
make isLastPart a parameter to the newUploadPartRequestBuilder method, set it in
all places it is needed
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -768,16 +787,29 @@ public boolean accept(FileStatus status) {
*/
static class AcceptAllButS3nDirs implements FileStatusAcceptor {
Review Comment:
again, new acceptor
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -1109,6 +1136,44 @@ private ClientManager createClientManager(URI fsURI,
boolean dtEnabled) throws I
S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
S3ClientFactory.class);
+ S3ClientFactory clientFactory;
+ S3ClientFactory unecnryptedClientFactory = null;
+ CSEMaterials cseMaterials = null;
+
+ if (isCSEEnabled) {
+ S3AEncryptionMethods algorithm = getS3EncryptionAlgorithm();
+ switch (algorithm) {
+ case CSE_KMS:
+ String kmsKeyId = getS3EncryptionKey(bucket, conf, true);
+ Preconditions.checkArgument(kmsKeyId != null && !kmsKeyId.isEmpty(),
+ "KMS keyId cannot be null or empty");
+ cseMaterials = new CSEMaterials()
+ .withCSEKeyType(CSEMaterials.CSEKeyType.KMS)
+ .withConf(conf)
+ .withKmsKeyId(kmsKeyId);
+ break;
+ case CSE_CUSTOM:
+ String customCryptoClassName =
conf.get(S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME);
Review Comment:
getTrimmed()
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -1109,6 +1136,44 @@ private ClientManager createClientManager(URI fsURI,
boolean dtEnabled) throws I
S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
S3ClientFactory.class);
+ S3ClientFactory clientFactory;
+ S3ClientFactory unecnryptedClientFactory = null;
+ CSEMaterials cseMaterials = null;
+
+ if (isCSEEnabled) {
+ S3AEncryptionMethods algorithm = getS3EncryptionAlgorithm();
+ switch (algorithm) {
+ case CSE_KMS:
+ String kmsKeyId = getS3EncryptionKey(bucket, conf, true);
+ Preconditions.checkArgument(kmsKeyId != null && !kmsKeyId.isEmpty(),
+ "KMS keyId cannot be null or empty");
+ cseMaterials = new CSEMaterials()
+ .withCSEKeyType(CSEMaterials.CSEKeyType.KMS)
+ .withConf(conf)
+ .withKmsKeyId(kmsKeyId);
+ break;
+ case CSE_CUSTOM:
+ String customCryptoClassName =
conf.get(S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME);
+ Preconditions.checkArgument(customCryptoClassName != null &&
+ !customCryptoClassName.isEmpty(),
+ "CSE custom cryptographic class name cannot be null or empty");
+ cseMaterials = new CSEMaterials()
+ .withCSEKeyType(CSEMaterials.CSEKeyType.CUSTOM)
+ .withConf(conf)
+ .withCustomCryptographicClassName(customCryptoClassName);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid client side encryption
algorithm."
+ + " Only CSE-KMS and CSE-CUSTOM is supported");
+ }
+ clientFactory =
ReflectionUtils.newInstance(EncryptionS3ClientFactory.class, conf);
+ // This just creates a factory class. Unencrypted client will only be
created when the
+ // config is enabled and when it is actually required.
+ unecnryptedClientFactory =
ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
Review Comment:
typo
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -1109,6 +1136,44 @@ private ClientManager createClientManager(URI fsURI,
boolean dtEnabled) throws I
S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
S3ClientFactory.class);
+ S3ClientFactory clientFactory;
+ S3ClientFactory unecnryptedClientFactory = null;
+ CSEMaterials cseMaterials = null;
+
+ if (isCSEEnabled) {
+ S3AEncryptionMethods algorithm = getS3EncryptionAlgorithm();
+ switch (algorithm) {
+ case CSE_KMS:
+ String kmsKeyId = getS3EncryptionKey(bucket, conf, true);
+ Preconditions.checkArgument(kmsKeyId != null && !kmsKeyId.isEmpty(),
+ "KMS keyId cannot be null or empty");
+ cseMaterials = new CSEMaterials()
+ .withCSEKeyType(CSEMaterials.CSEKeyType.KMS)
+ .withConf(conf)
+ .withKmsKeyId(kmsKeyId);
+ break;
+ case CSE_CUSTOM:
+ String customCryptoClassName =
conf.get(S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME);
+ Preconditions.checkArgument(customCryptoClassName != null &&
+ !customCryptoClassName.isEmpty(),
+ "CSE custom cryptographic class name cannot be null or empty");
+ cseMaterials = new CSEMaterials()
+ .withCSEKeyType(CSEMaterials.CSEKeyType.CUSTOM)
+ .withConf(conf)
+ .withCustomCryptographicClassName(customCryptoClassName);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid client side encryption
algorithm."
+ + " Only CSE-KMS and CSE-CUSTOM is supported");
Review Comment:
"are supported"
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import io.netty.util.internal.StringUtil;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AUtils.formatRange;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CRYPTO_CEK_ALGORITHM;
+import static
org.apache.hadoop.fs.s3a.impl.AWSHeaders.UNENCRYPTED_CONTENT_LENGTH;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
+/**
+ * S3 client side encryption (CSE) utility class.
+ */
[email protected]
[email protected]
+public final class CSEUtils {
Review Comment:
it's too late to move S3AEncryptionMethods (and we should also declare that
public evolving/unstable), but should we add a .encryption package under .impl?
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws
Exception {
}
}
+ /**
+ * Testing how file name with suffix ".instruction" is ignored when CSE is
enabled
+ * based on configurations.
+ * @throws IOException
+ */
+ @Test
+ public void testEncryptionWithInstructionFile() throws IOException {
+ maybeSkipTest();
+ S3AFileSystem cseDisabledFS = new S3AFileSystem();
+ Configuration cseDisabledConf = getConfiguration();
+ S3AFileSystem cseEnabledFS = getFileSystem();
+ Path unEncryptedFilePath = path(getMethodName());
+ Path unEncryptedFile = new Path(unEncryptedFilePath,
+ "file" + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+
+ // Initialize a CSE disabled FS.
+ removeBaseAndBucketOverrides(getTestBucketName(cseDisabledConf),
+ cseDisabledConf,
+ S3_ENCRYPTION_ALGORITHM,
+ S3_ENCRYPTION_KEY,
+ SERVER_SIDE_ENCRYPTION_ALGORITHM,
+ SERVER_SIDE_ENCRYPTION_KEY);
+ cseDisabledFS.initialize(getFileSystem().getUri(),
+ cseDisabledConf);
+
+ // Unencrypted data written to a path.
+ try (FSDataOutputStream out = cseDisabledFS.create(unEncryptedFile)) {
+ out.write(new byte[SMALL_FILE_SIZE]);
+ }
+
+ // list from cse disabled FS
+ assertEquals("number of files didn't match", 1,
Review Comment:
use an AssertJ assert with an explicit hasLength() condition
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEMaterials.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * This class is for storing information about key type and corresponding key
+ * to be used for client side encryption.
+ */
+public class CSEMaterials {
+ /**
+ * Enum for CSE key types.
+ */
+ public enum CSEKeyType {
+ KMS,
Review Comment:
javadoc
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import io.netty.util.internal.StringUtil;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AUtils.formatRange;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CRYPTO_CEK_ALGORITHM;
+import static
org.apache.hadoop.fs.s3a.impl.AWSHeaders.UNENCRYPTED_CONTENT_LENGTH;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
+/**
+ * S3 client side encryption (CSE) utility class.
+ */
[email protected]
[email protected]
+public final class CSEUtils {
+
+ private CSEUtils() {
+ }
+
+ /**
+ * Checks if the file suffix ends with
+ * {@link
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX}
+ * when the config
+ * {@link
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE_DEFAULT}
+ * is enabled and CSE is used.
+ * @param skipCSEInstructionFile whether to skip checking for the filename
suffix
+ * @param key file name
+ * @return true if cse is disabled or if skipping of instruction file is
disabled or file name
+ * does not end with defined suffix
+ */
+ public static boolean isCSEInstructionFile(boolean skipCSEInstructionFile,
String key) {
+ if (!skipCSEInstructionFile) {
+ return true;
+ }
+ return !key.endsWith(S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+ }
+
+ /**
+ * Checks if CSE-KMS or CSE-CUSTOM is set.
+ * @param encryptionMethod type of encryption used
+ * @return true if encryption method is CSE-KMS or CSE-CUSTOM
+ */
+ public static boolean isCSEKmsOrCustom(String encryptionMethod) {
+ return S3AEncryptionMethods.CSE_KMS.getMethod().equals(encryptionMethod) ||
+ S3AEncryptionMethods.CSE_CUSTOM.getMethod().equals(encryptionMethod);
+ }
+
+ /**
+ * Checks if a given S3 object is encrypted or not by checking following two
conditions
+ * 1. if object metadata contains x-amz-cek-alg
+ * 2. if instruction file is present
+ *
+ * @param s3Client S3 client
+ * @param bucket bucket name of the s3 object
+ * @param key key value of the s3 object
+ * @return true if S3 object is encrypted
+ */
+ public static boolean isObjectEncrypted(S3Client s3Client, String bucket,
String key) {
+ HeadObjectRequest request = HeadObjectRequest.builder()
+ .bucket(bucket)
+ .key(key)
+ .build();
+ HeadObjectResponse headObjectResponse = s3Client.headObject(request);
+ if (headObjectResponse.hasMetadata() &&
+ headObjectResponse.metadata().get(CRYPTO_CEK_ALGORITHM) != null) {
+ return true;
+ }
+ HeadObjectRequest instructionFileCheckRequest = HeadObjectRequest.builder()
+ .bucket(bucket)
+ .key(key + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX)
+ .build();
+ try {
+ s3Client.headObject(instructionFileCheckRequest);
+ return true;
+ } catch (NoSuchKeyException e) {
+ // Ignore. This indicates no instruction file is present
+ }
+ return false;
+ }
+
+ /**
+ * Get the unencrypted object length by either subtracting
+ * {@link InternalConstants#CSE_PADDING_LENGTH} from the object size or
calculating the
+ * actual size by doing S3 ranged GET operation.
+ *
+ * @param s3Client S3 client
+ * @param bucket bucket name of the s3 object
+ * @param key key value of the s3 object
+ * @param contentLength S3 object length
+ * @param headObjectResponse response from headObject call
+ * @param cseRangedGetEnabled is ranged get enabled
+ * @param cseReadUnencryptedObjects is reading of une
+ * @return unencrypted length of the object
+ * @throws IOException IO failures
+ */
+ public static long getUnencryptedObjectLength(S3Client s3Client,
Review Comment:
this is going to have to be pulled out into listing callbacks, executed
within the same audit span and updating statistics. If a client is doing *any*
IO, we need to know who and why (server side) and how long (client).
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java:
##########
@@ -736,6 +736,61 @@ private Constants() {
public static final String S3_ENCRYPTION_KEY =
"fs.s3a.encryption.key";
+ /**
+ * Client side encryption (CSE-CUSTOM) with custom cryptographic material
manager class name.
Review Comment:
add {@value} tags on all of these
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import io.netty.util.internal.StringUtil;
+import org.apache.hadoop.classification.InterfaceAudience;
Review Comment:
ordering
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws
Exception {
}
}
+ /**
+ * Testing how file name with suffix ".instruction" is ignored when CSE is
enabled
+ * based on configurations.
+ * @throws IOException
+ */
+ @Test
+ public void testEncryptionWithInstructionFile() throws IOException {
+ maybeSkipTest();
+ S3AFileSystem cseDisabledFS = new S3AFileSystem();
Review Comment:
1. use createTestFileSystem()
2. needs to be cleaned up in finally
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws
Exception {
}
}
+ /**
+ * Testing how file name with suffix ".instruction" is ignored when CSE is
enabled
+ * based on configurations.
+ * @throws IOException
+ */
+ @Test
+ public void testEncryptionWithInstructionFile() throws IOException {
+ maybeSkipTest();
+ S3AFileSystem cseDisabledFS = new S3AFileSystem();
+ Configuration cseDisabledConf = getConfiguration();
+ S3AFileSystem cseEnabledFS = getFileSystem();
+ Path unEncryptedFilePath = path(getMethodName());
+ Path unEncryptedFile = new Path(unEncryptedFilePath,
+ "file" + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+
+ // Initialize a CSE disabled FS.
+ removeBaseAndBucketOverrides(getTestBucketName(cseDisabledConf),
+ cseDisabledConf,
+ S3_ENCRYPTION_ALGORITHM,
+ S3_ENCRYPTION_KEY,
+ SERVER_SIDE_ENCRYPTION_ALGORITHM,
+ SERVER_SIDE_ENCRYPTION_KEY);
+ cseDisabledFS.initialize(getFileSystem().getUri(),
+ cseDisabledConf);
+
+ // Unencrypted data written to a path.
+ try (FSDataOutputStream out = cseDisabledFS.create(unEncryptedFile)) {
+ out.write(new byte[SMALL_FILE_SIZE]);
+ }
+
+ // list from cse disabled FS
+ assertEquals("number of files didn't match", 1,
+ cseDisabledFS.listStatus(unEncryptedFilePath).length);
+
+ // list from cse enabled fs with skipping of instruction file
+ cseEnabledFS.getConf().setBoolean(S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE,
true);
+ cseEnabledFS.initialize(getFileSystem().getUri(), cseEnabledFS.getConf());
+ assertEquals("number of files didn't match", 0,
+ cseEnabledFS.listStatus(unEncryptedFilePath).length);
+
+ // disable skipping cse instruction file.
+ cseEnabledFS.getConf().setBoolean(S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE,
false);
+ cseEnabledFS.initialize(getFileSystem().getUri(), cseEnabledFS.getConf());
+ assertEquals("number of files didn't match", 1,
+ cseEnabledFS.listStatus(unEncryptedFilePath).length);
+ }
+
+ /**
+ * Testing how unencrypted length is calculated/fetched under different
circumstances
+ * when CSE is enabled.
+ * @throws IOException
+ */
+ @Test
+ public void testEncryptionWithDifferentWaysForCalculatingUnencryptedLength()
throws IOException {
+ maybeSkipTest();
+ S3AFileSystem cseEnabledFS = getFileSystem();
+ Path filePath = path(getMethodName());
+ String key = cseEnabledFS.pathToKey(filePath);
+
+ // write object with unencrypted with random content length header
+ Map<String, String> metadata = new HashMap<>();
+ metadata.put(AWSHeaders.UNENCRYPTED_CONTENT_LENGTH, "10");
+ try (AuditSpan span = span()) {
+ PutObjectRequest request = PutObjectRequest.builder()
Review Comment:
use RequestFactory through S3AStore
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
Review Comment:
import ordering on new files. always
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws
Exception {
}
}
+ /**
+ * Testing how file name with suffix ".instruction" is ignored when CSE is
enabled
+ * based on configurations.
+ * @throws IOException
+ */
+ @Test
+ public void testEncryptionWithInstructionFile() throws IOException {
+ maybeSkipTest();
+ S3AFileSystem cseDisabledFS = new S3AFileSystem();
+ Configuration cseDisabledConf = getConfiguration();
Review Comment:
new Configuration(getConfiguration())
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
Review Comment:
javadocs. always. save time on my reviewing and you waiting for the reviews
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws
Exception {
}
}
+ /**
+ * Testing how file name with suffix ".instruction" is ignored when CSE is
enabled
+ * based on configurations.
+ * @throws IOException
+ */
+ @Test
+ public void testEncryptionWithInstructionFile() throws IOException {
+ maybeSkipTest();
+ S3AFileSystem cseDisabledFS = new S3AFileSystem();
+ Configuration cseDisabledConf = getConfiguration();
+ S3AFileSystem cseEnabledFS = getFileSystem();
+ Path unEncryptedFilePath = path(getMethodName());
+ Path unEncryptedFile = new Path(unEncryptedFilePath,
+ "file" + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+
+ // Initialize a CSE disabled FS.
+ removeBaseAndBucketOverrides(getTestBucketName(cseDisabledConf),
+ cseDisabledConf,
+ S3_ENCRYPTION_ALGORITHM,
+ S3_ENCRYPTION_KEY,
+ SERVER_SIDE_ENCRYPTION_ALGORITHM,
+ SERVER_SIDE_ENCRYPTION_KEY);
+ cseDisabledFS.initialize(getFileSystem().getUri(),
+ cseDisabledConf);
+
+ // Unencrypted data written to a path.
+ try (FSDataOutputStream out = cseDisabledFS.create(unEncryptedFile)) {
+ out.write(new byte[SMALL_FILE_SIZE]);
+ }
+
+ // list from cse disabled FS
+ assertEquals("number of files didn't match", 1,
+ cseDisabledFS.listStatus(unEncryptedFilePath).length);
+
+ // list from cse enabled fs with skipping of instruction file
+ cseEnabledFS.getConf().setBoolean(S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE,
true);
Review Comment:
And now you are trying to patch the settings of the default fs and call
initialize again. Do not do this. In fact, if we don't have a check to stop
initialize() being called twice — we should add one.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionCustom.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
+import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
+
+import static
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
+
+public class ITestS3AClientSideEncryptionCustom extends
ITestS3AClientSideEncryption {
Review Comment:
javadoc
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java:
##########
@@ -153,11 +154,16 @@ public S3AsyncClient createS3AsyncClient(
.thresholdInBytes(parameters.getMultiPartThreshold())
.build();
- return configureClientBuilder(S3AsyncClient.builder(), parameters, conf,
bucket)
- .httpClientBuilder(httpClientBuilder)
- .multipartConfiguration(multipartConfiguration)
- .multipartEnabled(parameters.isMultipartCopy())
- .build();
+ S3AsyncClientBuilder s3AsyncClientBuilder =
+ configureClientBuilder(S3AsyncClient.builder(), parameters, conf,
bucket)
+ .httpClientBuilder(httpClientBuilder);
+
+ if (!parameters.isClientSideEncryptionEnabled()) {
Review Comment:
shouldn't this be part of configureClientBuilder?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+ private static final String ENCRYPTION_CLIENT_CLASSNAME =
+ "software.amazon.encryption.s3.S3EncryptionClient";
+
+ /**
+ * Encryption client availability.
+ */
+ private static final boolean ENCRYPTION_CLIENT_FOUND =
checkForEncryptionClient();
+
+ /**
+ * S3Client to be wrapped by encryption client.
+ */
+ private S3Client s3Client;
+
+ /**
+ * S3AsyncClient to be wrapped by encryption client.
+ */
+ private S3AsyncClient s3AsyncClient;
+
+ private static boolean checkForEncryptionClient() {
+ try {
+ ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
+ cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
+ LOG.debug("encryption client class {} found",
ENCRYPTION_CLIENT_CLASSNAME);
+ return true;
+ } catch (Exception e) {
+ LOG.debug("encryption client class {} not found",
ENCRYPTION_CLIENT_CLASSNAME, e);
+ return false;
+ }
+ }
+
+ /**
+ * Is the Encryption client available?
+ * @return true if it was found in the classloader
+ */
+ private static synchronized boolean isEncryptionClientAvailable() {
+ return ENCRYPTION_CLIENT_FOUND;
+ }
+
+ /**
+ * Create encrypted s3 client.
+ * @param uri S3A file system URI
+ * @param parameters parameter object
+ * @return encrypted s3 client
+ * @throws IOException IO failures
+ */
+ @Override
+ public S3Client createS3Client(URI uri, S3ClientCreationParameters
parameters)
+ throws IOException {
+ if (!isEncryptionClientAvailable()) {
+ throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+ "No encryption client available");
+ }
+
+ s3Client = super.createS3Client(uri, parameters);
+ s3AsyncClient = super.createS3AsyncClient(uri, parameters);
+
+ return
createS3EncryptionClient(parameters.getClientSideEncryptionMaterials());
+ }
+
+ /**
+ * Create async encrypted s3 client.
Review Comment:
note that it only works if createS3Client was called first
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+ private static final String ENCRYPTION_CLIENT_CLASSNAME =
+ "software.amazon.encryption.s3.S3EncryptionClient";
+
+ /**
+ * Encryption client availability.
+ */
+ private static final boolean ENCRYPTION_CLIENT_FOUND =
checkForEncryptionClient();
+
+ /**
+ * S3Client to be wrapped by encryption client.
+ */
+ private S3Client s3Client;
+
+ /**
+ * S3AsyncClient to be wrapped by encryption client.
+ */
+ private S3AsyncClient s3AsyncClient;
+
+ private static boolean checkForEncryptionClient() {
+ try {
+ ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
+ cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
+ LOG.debug("encryption client class {} found",
ENCRYPTION_CLIENT_CLASSNAME);
+ return true;
+ } catch (Exception e) {
+ LOG.debug("encryption client class {} not found",
ENCRYPTION_CLIENT_CLASSNAME, e);
+ return false;
+ }
+ }
+
+ /**
+ * Is the Encryption client available?
+ * @return true if it was found in the classloader
+ */
+ private static synchronized boolean isEncryptionClientAvailable() {
+ return ENCRYPTION_CLIENT_FOUND;
+ }
+
+ /**
+ * Create encrypted s3 client.
+ * @param uri S3A file system URI
+ * @param parameters parameter object
+ * @return encrypted s3 client
+ * @throws IOException IO failures
+ */
+ @Override
+ public S3Client createS3Client(URI uri, S3ClientCreationParameters
parameters)
+ throws IOException {
+ if (!isEncryptionClientAvailable()) {
+ throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+ "No encryption client available");
+ }
+
+ s3Client = super.createS3Client(uri, parameters);
+ s3AsyncClient = super.createS3AsyncClient(uri, parameters);
+
+ return
createS3EncryptionClient(parameters.getClientSideEncryptionMaterials());
+ }
+
+ /**
+ * Create async encrypted s3 client.
+ * @param uri S3A file system URI
+ * @param parameters parameter object
+ * @return async encrypted s3 client
+ * @throws IOException IO failures
+ */
+ @Override
+ public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters
parameters)
+ throws IOException {
+ if (!isEncryptionClientAvailable()) {
+ throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+ "No encryption client available");
+ }
+ return
createS3AsyncEncryptionClient(parameters.getClientSideEncryptionMaterials());
+ }
+
+ private S3Client createS3EncryptionClient(final CSEMaterials cseMaterials) {
+ S3EncryptionClient.Builder s3EncryptionClientBuilder =
+
S3EncryptionClient.builder().wrappedAsyncClient(s3AsyncClient).wrappedClient(s3Client)
+ // this is required for doing S3 ranged GET calls
+ .enableLegacyUnauthenticatedModes(true)
+ // this is required for backward compatibility with older
encryption clients
+ .enableLegacyWrappingAlgorithms(true);
+
+ switch (cseMaterials.getCseKeyType()) {
+ case KMS:
+ s3EncryptionClientBuilder.kmsKeyId(cseMaterials.getKmsKeyId());
+ break;
+ case CUSTOM:
+ Keyring keyring =
getKeyringProvider(cseMaterials.getCustomKeyringClassName(),
+ cseMaterials.getConf());
+ CryptographicMaterialsManager cmm =
DefaultCryptoMaterialsManager.builder()
+ .keyring(keyring)
+ .build();
+ s3EncryptionClientBuilder.cryptoMaterialsManager(cmm);
+ break;
+ default:
+ break;
+ }
+
+ return s3EncryptionClientBuilder.build();
+ }
+
+ private S3AsyncClient createS3AsyncEncryptionClient(final CSEMaterials
cseMaterials) {
+ S3AsyncEncryptionClient.Builder s3EncryptionAsyncClientBuilder =
+ S3AsyncEncryptionClient.builder().wrappedClient(s3AsyncClient)
+ // this is required for doing S3 ranged GET calls
+ .enableLegacyUnauthenticatedModes(true)
+ // this is required for backward compatibility with older
encryption clients
+ .enableLegacyWrappingAlgorithms(true);
+
+ switch (cseMaterials.getCseKeyType()) {
+ case KMS:
+ s3EncryptionAsyncClientBuilder.kmsKeyId(cseMaterials.getKmsKeyId());
+ break;
+ case CUSTOM:
+ Keyring keyring =
getKeyringProvider(cseMaterials.getCustomKeyringClassName(),
+ cseMaterials.getConf());
+ CryptographicMaterialsManager cmm =
DefaultCryptoMaterialsManager.builder()
+ .keyring(keyring)
+ .build();
+ s3EncryptionAsyncClientBuilder.cryptoMaterialsManager(cmm);
+ break;
+ default:
+ break;
+ }
+
+ return s3EncryptionAsyncClientBuilder.build();
+ }
+
+ /**
+ * Get the custom Keyring class.
+ * @param className
+ * @param conf
+ * @return
+ */
+ private Keyring getKeyringProvider(String className,
+ Configuration conf) {
+ try {
+ return
ReflectionUtils.newInstance(getCustomKeyringProviderClass(className), conf);
+ } catch (Exception e) {
+ // this is for testing purpose to support CustomKeyring.java
+ return
ReflectionUtils.newInstance(getCustomKeyringProviderClass(className), conf,
+ new Class[] {Configuration.class}, conf);
+ }
+ }
+
+ private Class<? extends Keyring> getCustomKeyringProviderClass(String
className) {
+ if (Strings.isNullOrEmpty(className)) {
+ throw new IllegalArgumentException(
Review Comment:
Preconditions.checkArgument()
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws
Exception {
}
}
+ /**
+ * Testing how file name with suffix ".instruction" is ignored when CSE is
enabled
+ * based on configurations.
+ * @throws IOException
+ */
+ @Test
+ public void testEncryptionWithInstructionFile() throws IOException {
+ maybeSkipTest();
+ S3AFileSystem cseDisabledFS = new S3AFileSystem();
+ Configuration cseDisabledConf = getConfiguration();
+ S3AFileSystem cseEnabledFS = getFileSystem();
Review Comment:
this is ref to the normal fs
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+ private static final String ENCRYPTION_CLIENT_CLASSNAME =
+ "software.amazon.encryption.s3.S3EncryptionClient";
+
+ /**
+ * Encryption client availability.
+ */
+ private static final boolean ENCRYPTION_CLIENT_FOUND =
checkForEncryptionClient();
+
+ /**
+ * S3Client to be wrapped by encryption client.
+ */
+ private S3Client s3Client;
+
+ /**
+ * S3AsyncClient to be wrapped by encryption client.
+ */
+ private S3AsyncClient s3AsyncClient;
+
+ private static boolean checkForEncryptionClient() {
+ try {
+ ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
+ cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
+ LOG.debug("encryption client class {} found",
ENCRYPTION_CLIENT_CLASSNAME);
+ return true;
+ } catch (Exception e) {
+ LOG.debug("encryption client class {} not found",
ENCRYPTION_CLIENT_CLASSNAME, e);
+ return false;
+ }
+ }
+
+ /**
+ * Is the Encryption client available?
+ * @return true if it was found in the classloader
+ */
+ private static synchronized boolean isEncryptionClientAvailable() {
+ return ENCRYPTION_CLIENT_FOUND;
+ }
+
+ /**
+ * Create encrypted s3 client.
+ * @param uri S3A file system URI
+ * @param parameters parameter object
+ * @return encrypted s3 client
+ * @throws IOException IO failures
+ */
+ @Override
+ public S3Client createS3Client(URI uri, S3ClientCreationParameters
parameters)
Review Comment:
So we need to create both of these clients before creating encryption code?
I am worried that this class now has an implicit expectation that the
synchronous S3 client is always created before the async one.
1. javadocs MUST State this.
2. async code must use Preconditions check to fail meaningfully if it is
ever called before/without the sync client being created.
What will happen if/when we move to async only?
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+ private static final String ENCRYPTION_CLIENT_CLASSNAME =
+ "software.amazon.encryption.s3.S3EncryptionClient";
+
+ /**
+ * Encryption client availability.
+ */
+ private static final boolean ENCRYPTION_CLIENT_FOUND =
checkForEncryptionClient();
+
+ /**
+ * S3Client to be wrapped by encryption client.
+ */
+ private S3Client s3Client;
+
+ /**
+ * S3AsyncClient to be wrapped by encryption client.
+ */
+ private S3AsyncClient s3AsyncClient;
+
+ private static boolean checkForEncryptionClient() {
+ try {
+ ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
+ cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
+ LOG.debug("encryption client class {} found",
ENCRYPTION_CLIENT_CLASSNAME);
+ return true;
+ } catch (Exception e) {
+ LOG.debug("encryption client class {} not found",
ENCRYPTION_CLIENT_CLASSNAME, e);
+ return false;
+ }
+ }
+
+ /**
+ * Is the Encryption client available?
+ * @return true if it was found in the classloader
+ */
+ private static synchronized boolean isEncryptionClientAvailable() {
+ return ENCRYPTION_CLIENT_FOUND;
+ }
+
+ /**
+ * Create encrypted s3 client.
+ * @param uri S3A file system URI
+ * @param parameters parameter object
+ * @return encrypted s3 client
+ * @throws IOException IO failures
+ */
+ @Override
+ public S3Client createS3Client(URI uri, S3ClientCreationParameters
parameters)
+ throws IOException {
+ if (!isEncryptionClientAvailable()) {
+ throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+ "No encryption client available");
+ }
+
+ s3Client = super.createS3Client(uri, parameters);
+ s3AsyncClient = super.createS3AsyncClient(uri, parameters);
+
+ return
createS3EncryptionClient(parameters.getClientSideEncryptionMaterials());
+ }
+
+ /**
+ * Create async encrypted s3 client.
+ * @param uri S3A file system URI
+ * @param parameters parameter object
+ * @return async encrypted s3 client
+ * @throws IOException IO failures
+ */
+ @Override
+ public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters
parameters)
+ throws IOException {
+ if (!isEncryptionClientAvailable()) {
+ throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+ "No encryption client available");
+ }
+ return
createS3AsyncEncryptionClient(parameters.getClientSideEncryptionMaterials());
+ }
+
+ private S3Client createS3EncryptionClient(final CSEMaterials cseMaterials) {
Review Comment:
javadocs
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -726,9 +742,11 @@ public void close() {
*/
static class AcceptFilesOnly implements FileStatusAcceptor {
Review Comment:
create a new subclass to perform the skipping, choose which one to pass down
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java:
##########
@@ -876,11 +877,15 @@ private void uploadBlockAsync(final
S3ADataBlocks.DataBlock block,
? RequestBody.fromFile(uploadData.getFile())
: RequestBody.fromInputStream(uploadData.getUploadStream(), size);
- request = writeOperationHelper.newUploadPartRequestBuilder(
+ UploadPartRequest.Builder requestBuilder =
writeOperationHelper.newUploadPartRequestBuilder(
key,
uploadId,
currentPartNumber,
- size).build();
+ size);
+ if (isLast) {
Review Comment:
see CommitOperations
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -1409,7 +1409,7 @@ public interface InputStreamCallbacks extends Closeable {
* @return the response
*/
@Retries.OnceRaw
- ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request);
+ ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request)
throws IOException;
Review Comment:
update javadocs to explain change
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws
Exception {
}
}
+ /**
+ * Testing how file name with suffix ".instruction" is ignored when CSE is
enabled
+ * based on configurations.
+ * @throws IOException
+ */
+ @Test
+ public void testEncryptionWithInstructionFile() throws IOException {
+ maybeSkipTest();
+ S3AFileSystem cseDisabledFS = new S3AFileSystem();
+ Configuration cseDisabledConf = getConfiguration();
+ S3AFileSystem cseEnabledFS = getFileSystem();
+ Path unEncryptedFilePath = path(getMethodName());
+ Path unEncryptedFile = new Path(unEncryptedFilePath,
+ "file" + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+
+ // Initialize a CSE disabled FS.
+ removeBaseAndBucketOverrides(getTestBucketName(cseDisabledConf),
+ cseDisabledConf,
+ S3_ENCRYPTION_ALGORITHM,
+ S3_ENCRYPTION_KEY,
+ SERVER_SIDE_ENCRYPTION_ALGORITHM,
+ SERVER_SIDE_ENCRYPTION_KEY);
+ cseDisabledFS.initialize(getFileSystem().getUri(),
+ cseDisabledConf);
+
+ // Unencrypted data written to a path.
+ try (FSDataOutputStream out = cseDisabledFS.create(unEncryptedFile)) {
Review Comment:
ContractTestUtils has a method for this
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java:
##########
@@ -118,6 +119,13 @@ final class S3ClientCreationParameters {
*/
private StatisticsFromAwsSdk metrics;
+ private Boolean isCSEEnabled = false;
Review Comment:
javadoc
##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md:
##########
@@ -674,10 +674,18 @@ clients where S3-CSE has not been enabled.
### Features
-- Supports client side encryption with keys managed in AWS KMS.
+- Supports client side encryption with keys managed in AWS KMS (CSE-KMS)
+- Supports client side encryption with custom keys by implementing custom
[Keyring](https://docs.aws.amazon.com/encryption-sdk/latest/developer-guide/choose-keyring.html)
(CSE-CUSTOM)
+- Backward compatible with older encryption clients like
`AmazonS3EncryptionClient.java`(V1) and `AmazonS3EncryptionClientV2.java`(V2)
- encryption settings propagated into jobs through any issued delegation
tokens.
- encryption information stored as headers in the uploaded object.
+### Compatibility
+
+- V1 and V2 client suppports reading unencrypted s3 object where as V3 client
does not support. Inorder to read s3 objects in a directory with mix of
encrypted and unencrypted objects when CSE is enabled set
`fs.s3a.encryption.cse.read.unencrypted.objects=true`
Review Comment:
spelling
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java:
##########
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.concurrent.Executor;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
Review Comment:
imports
##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md:
##########
@@ -674,10 +674,18 @@ clients where S3-CSE has not been enabled.
### Features
-- Supports client side encryption with keys managed in AWS KMS.
+- Supports client side encryption with keys managed in AWS KMS (CSE-KMS)
+- Supports client side encryption with custom keys by implementing custom
[Keyring](https://docs.aws.amazon.com/encryption-sdk/latest/developer-guide/choose-keyring.html)
(CSE-CUSTOM)
+- Backward compatible with older encryption clients like
`AmazonS3EncryptionClient.java`(V1) and `AmazonS3EncryptionClientV2.java`(V2)
- encryption settings propagated into jobs through any issued delegation
tokens.
- encryption information stored as headers in the uploaded object.
+### Compatibility
+
+- V1 and V2 client suppports reading unencrypted s3 object where as V3 client
does not support. Inorder to read s3 objects in a directory with mix of
encrypted and unencrypted objects when CSE is enabled set
`fs.s3a.encryption.cse.read.unencrypted.objects=true`
+- Unlike V2 and V3 client which always pads 16 bytes, V1 client pads extra
bytes to the next multiple of 16. For example if unencrypted object size is
12bytes, V1 client pads extra 4bytes to make it multiple of 16. So inorder to
read objects encrypted by V1 client, set
`fs.s3a.encryption.cse.object.size.ranged.get.enabled=true`
Review Comment:
use "the" where it helps readability
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import io.netty.util.internal.StringUtil;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AUtils.formatRange;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CRYPTO_CEK_ALGORITHM;
+import static
org.apache.hadoop.fs.s3a.impl.AWSHeaders.UNENCRYPTED_CONTENT_LENGTH;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
+/**
+ * S3 client side encryption (CSE) utility class.
+ */
[email protected]
[email protected]
+public final class CSEUtils {
+
+ private CSEUtils() {
+ }
+
+ /**
+ * Checks if the file suffix ends with
+ * {@link
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX}
+ * when the config
+ * {@link
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE_DEFAULT}
+ * is enabled and CSE is used.
+ * @param skipCSEInstructionFile whether to skip checking for the filename
suffix
+ * @param key file name
+ * @return true if cse is disabled or if skipping of instruction file is
disabled or file name
+ * does not end with defined suffix
+ */
+ public static boolean isCSEInstructionFile(boolean skipCSEInstructionFile,
String key) {
+ if (!skipCSEInstructionFile) {
+ return true;
+ }
+ return !key.endsWith(S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+ }
+
+ /**
+ * Checks if CSE-KMS or CSE-CUSTOM is set.
+ * @param encryptionMethod type of encryption used
+ * @return true if encryption method is CSE-KMS or CSE-CUSTOM
+ */
+ public static boolean isCSEKmsOrCustom(String encryptionMethod) {
+ return S3AEncryptionMethods.CSE_KMS.getMethod().equals(encryptionMethod) ||
+ S3AEncryptionMethods.CSE_CUSTOM.getMethod().equals(encryptionMethod);
+ }
+
+ /**
+ * Checks if a given S3 object is encrypted or not by checking following two
conditions
+ * 1. if object metadata contains x-amz-cek-alg
+ * 2. if instruction file is present
+ *
+ * @param s3Client S3 client
+ * @param bucket bucket name of the s3 object
+ * @param key key value of the s3 object
+ * @return true if S3 object is encrypted
+ */
+ public static boolean isObjectEncrypted(S3Client s3Client, String bucket,
String key) {
Review Comment:
needs to use the s3a request builder for things like requesterPays, audit headers
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import io.netty.util.internal.StringUtil;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AUtils.formatRange;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CRYPTO_CEK_ALGORITHM;
+import static
org.apache.hadoop.fs.s3a.impl.AWSHeaders.UNENCRYPTED_CONTENT_LENGTH;
+import static
org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
+/**
+ * S3 client side encryption (CSE) utility class.
+ */
[email protected]
[email protected]
+public final class CSEUtils {
+
+ private CSEUtils() {
+ }
+
+ /**
+ * Checks if the file suffix ends with
+ * {@link
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX}
+ * when the config
+ * {@link
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE_DEFAULT}
+ * is enabled and CSE is used.
+ * @param skipCSEInstructionFile whether to skip checking for the filename
suffix
+ * @param key file name
+ * @return true if cse is disabled or if skipping of instruction file is
disabled or file name
+ * does not end with defined suffix
+ */
+ public static boolean isCSEInstructionFile(boolean skipCSEInstructionFile,
String key) {
+ if (!skipCSEInstructionFile) {
+ return true;
+ }
+ return !key.endsWith(S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+ }
+
+ /**
+ * Checks if CSE-KMS or CSE-CUSTOM is set.
+ * @param encryptionMethod type of encryption used
+ * @return true if encryption method is CSE-KMS or CSE-CUSTOM
+ */
+ public static boolean isCSEKmsOrCustom(String encryptionMethod) {
+ return S3AEncryptionMethods.CSE_KMS.getMethod().equals(encryptionMethod) ||
+ S3AEncryptionMethods.CSE_CUSTOM.getMethod().equals(encryptionMethod);
+ }
+
+ /**
+ * Checks if a given S3 object is encrypted or not by checking following two
conditions
+ * 1. if object metadata contains x-amz-cek-alg
+ * 2. if instruction file is present
+ *
+ * @param s3Client S3 client
+ * @param bucket bucket name of the s3 object
+ * @param key key value of the s3 object
+ * @return true if S3 object is encrypted
+ */
+ public static boolean isObjectEncrypted(S3Client s3Client, String bucket,
String key) {
+ HeadObjectRequest request = HeadObjectRequest.builder()
+ .bucket(bucket)
+ .key(key)
+ .build();
+ HeadObjectResponse headObjectResponse = s3Client.headObject(request);
+ if (headObjectResponse.hasMetadata() &&
+ headObjectResponse.metadata().get(CRYPTO_CEK_ALGORITHM) != null) {
+ return true;
+ }
+ HeadObjectRequest instructionFileCheckRequest = HeadObjectRequest.builder()
Review Comment:
so a double check is made for an object and instruction file? this is going
to hurt
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -1109,6 +1136,44 @@ private ClientManager createClientManager(URI fsURI,
boolean dtEnabled) throws I
S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
S3ClientFactory.class);
+ S3ClientFactory clientFactory;
+ S3ClientFactory unecnryptedClientFactory = null;
+ CSEMaterials cseMaterials = null;
+
+ if (isCSEEnabled) {
Review Comment:
this should be pulled out somewhere. goal: PRs should leave the
S3AFileSystem class cleaner and clearer than before
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java:
##########
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.concurrent.Executor;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
Review Comment:
imports
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/CustomKeyring.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.List;
+
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kms.KmsClient;
+import software.amazon.encryption.s3.materials.DecryptionMaterials;
+import software.amazon.encryption.s3.materials.EncryptedDataKey;
+import software.amazon.encryption.s3.materials.EncryptionMaterials;
+import software.amazon.encryption.s3.materials.Keyring;
+import software.amazon.encryption.s3.materials.KmsKeyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
+import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION;
+
+/**
+ * Custom Keyring implementation by warpping over KmsKeyring.
+ * This is used for testing {@link ITestS3AClientSideEncryptionCustom}.
+ */
+public class CustomKeyring implements Keyring {
+ private final KmsClient kmsClient;
+ private final Configuration conf;
+ private final KmsKeyring kmsKeyring;
+
+
+ public CustomKeyring(Configuration conf) throws IOException {
+ this.conf = conf;
+ kmsClient = KmsClient.builder().region(Region.of(conf.get(AWS_REGION,
AWS_S3_DEFAULT_REGION)))
+ .credentialsProvider(new TemporaryAWSCredentialsProvider(
+ new Path(conf.get("test.fs.s3a.name")).toUri(), conf))
Review Comment:
test utils getFsName()
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/CustomKeyring.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.List;
+
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kms.KmsClient;
+import software.amazon.encryption.s3.materials.DecryptionMaterials;
+import software.amazon.encryption.s3.materials.EncryptedDataKey;
+import software.amazon.encryption.s3.materials.EncryptionMaterials;
+import software.amazon.encryption.s3.materials.Keyring;
+import software.amazon.encryption.s3.materials.KmsKeyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
+import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION;
+
+/**
+ * Custom Keyring implementation by warpping over KmsKeyring.
+ * This is used for testing {@link ITestS3AClientSideEncryptionCustom}.
+ */
+public class CustomKeyring implements Keyring {
+ private final KmsClient kmsClient;
+ private final Configuration conf;
+ private final KmsKeyring kmsKeyring;
+
+
+ public CustomKeyring(Configuration conf) throws IOException {
+ this.conf = conf;
+ kmsClient = KmsClient.builder().region(Region.of(conf.get(AWS_REGION,
AWS_S3_DEFAULT_REGION)))
+ .credentialsProvider(new TemporaryAWSCredentialsProvider(
+ new Path(conf.get("test.fs.s3a.name")).toUri(), conf))
+ .build();
+ kmsKeyring = KmsKeyring.builder()
+ .kmsClient(kmsClient)
+ .wrappingKeyId(conf.get(Constants.S3_ENCRYPTION_KEY)).build();
Review Comment:
you need to use getS3EncryptionKey() so secrets in jceks files are picked up
and per-bucket-settings used
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws
Exception {
}
}
+ /**
+ * Testing how file name with suffix ".instruction" is ignored when CSE is
enabled
+ * based on configurations.
+ * @throws IOException
+ */
+ @Test
+ public void testEncryptionWithInstructionFile() throws IOException {
+ maybeSkipTest();
+ S3AFileSystem cseDisabledFS = new S3AFileSystem();
+ Configuration cseDisabledConf = getConfiguration();
+ S3AFileSystem cseEnabledFS = getFileSystem();
+ Path unEncryptedFilePath = path(getMethodName());
+ Path unEncryptedFile = new Path(unEncryptedFilePath,
+ "file" + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+
+ // Initialize a CSE disabled FS.
+ removeBaseAndBucketOverrides(getTestBucketName(cseDisabledConf),
+ cseDisabledConf,
+ S3_ENCRYPTION_ALGORITHM,
+ S3_ENCRYPTION_KEY,
+ SERVER_SIDE_ENCRYPTION_ALGORITHM,
+ SERVER_SIDE_ENCRYPTION_KEY);
+ cseDisabledFS.initialize(getFileSystem().getUri(),
+ cseDisabledConf);
+
+ // Unencrypted data written to a path.
+ try (FSDataOutputStream out = cseDisabledFS.create(unEncryptedFile)) {
+ out.write(new byte[SMALL_FILE_SIZE]);
+ }
+
+ // list from cse disabled FS
+ assertEquals("number of files didn't match", 1,
+ cseDisabledFS.listStatus(unEncryptedFilePath).length);
+
+ // list from cse enabled fs with skipping of instruction file
+ cseEnabledFS.getConf().setBoolean(S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE,
true);
+ cseEnabledFS.initialize(getFileSystem().getUri(), cseEnabledFS.getConf());
+ assertEquals("number of files didn't match", 0,
+ cseEnabledFS.listStatus(unEncryptedFilePath).length);
+
+ // disable skipping cse instruction file.
+ cseEnabledFS.getConf().setBoolean(S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE,
false);
+ cseEnabledFS.initialize(getFileSystem().getUri(), cseEnabledFS.getConf());
+ assertEquals("number of files didn't match", 1,
Review Comment:
AssertJ
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionCustom.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
+import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
+
+import static
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
+import static
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
+
+public class ITestS3AClientSideEncryptionCustom extends
ITestS3AClientSideEncryption {
+
+ private static final String KMS_KEY_WRAP_ALGO = "kms+context";
+ /**
+ * Creating custom configs for CSE-CUSTOM testing.
+ *
+ * @return Configuration.
+ */
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ S3ATestUtils.disableFilesystemCaching(conf);
+ conf.set(S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME,
+ CustomKeyring.class.getCanonicalName());
+ return conf;
+ }
+
+ @Override
+ protected void maybeSkipTest() throws IOException {
+ skipIfEncryptionTestsDisabled(getConfiguration());
+ // skip the test if CSE-CUSTOM is not set.
+ skipIfEncryptionNotSet(getConfiguration(),
S3AEncryptionMethods.CSE_CUSTOM);
+ }
+
+
+ @Override
+ protected void assertEncrypted(Path path) throws IOException {
+ Map<String, byte[]> fsXAttrs = getFileSystem().getXAttrs(path);
+ String xAttrPrefix = "header.";
+
+ // Assert KeyWrap Algo
+ assertEquals("Key wrap algo isn't same as expected", KMS_KEY_WRAP_ALGO,
+ processHeader(fsXAttrs,
+ xAttrPrefix + AWSHeaders.CRYPTO_KEYWRAP_ALGORITHM));
+ }
+
+ /**
+ * A method to process a FS xAttribute Header key by decoding it.
+ *
+ * @param fsXAttrs Map of String(Key) and bytes[](Value) to represent fs
+ * xAttr.
+ * @param headerKey Key value of the header we are trying to process.
+ * @return String representing the value of key header provided.
+ */
+ private String processHeader(Map<String, byte[]> fsXAttrs,
+ String headerKey) {
+ return HeaderProcessing.decodeBytes(fsXAttrs.get(headerKey));
Review Comment:
fail meaningfully if the header isn't found
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java:
##########
@@ -527,20 +529,29 @@ public static String stringify(AwsServiceException e) {
* @param owner owner of the file
* @param eTag S3 object eTag or null if unavailable
* @param versionId S3 object versionId or null if unavailable
+ * @param s3Client s3 client object
* @param isCSEEnabled is client side encryption enabled?
+ * @param bucket s3 bucket name
+ * @param cseRangedGetEnabled is ranged get enabled
+ * @param cseReadUnencryptedObjects is read unencrypted object enabled
* @return a status entry
+ * @throws IOException IO failures
*/
public static S3AFileStatus createFileStatus(Path keyPath,
Review Comment:
wrong place to do this.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import io.netty.util.internal.StringUtil;
Review Comment:
imports
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java:
##########
@@ -128,6 +136,13 @@ private CallableRaisingIOE<S3AsyncClient>
createAyncClient() {
() -> clientFactory.createS3AsyncClient(getUri(),
clientCreationParameters));
}
+ private CallableRaisingIOE<S3Client> createUnencryptedS3Client() {
Review Comment:
Needs better handling of deployments without this, so raising
IllegalStateException when getOrCreateUnencryptedS3Client() is called. With a
test.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java:
##########
@@ -325,19 +325,6 @@ private Map<String, byte[]> retrieveHeaders(
md.contentEncoding());
maybeSetHeader(headers, XA_CONTENT_LANGUAGE,
md.contentLanguage());
- // If CSE is enabled, use the unencrypted content length.
Review Comment:
uncomment?
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java:
##########
@@ -637,4 +640,9 @@ private static void skipIfCrossRegionClient(
skip("Skipping test as cross region client is in use ");
}
}
+
+ private static void unsetEncryption(Configuration conf) {
Review Comment:
javadoc. always the javadocs.
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java:
##########
@@ -84,25 +86,31 @@ public class ClientManagerImpl implements ClientManager {
/** Async client is used for transfer manager. */
private final LazyAutoCloseableReference<S3AsyncClient> s3AsyncClient;
+ private final LazyAutoCloseableReference<S3Client> unencryptedS3Client;
Review Comment:
javadocs
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]