[ 
https://issues.apache.org/jira/browse/HADOOP-18708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863800#comment-17863800
 ] 

ASF GitHub Bot commented on HADOOP-18708:
-----------------------------------------

steveloughran commented on code in PR #6884:
URL: https://github.com/apache/hadoop/pull/6884#discussion_r1668510834


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/AWSHeaders.java:
##########
@@ -83,6 +83,11 @@ public interface AWSHeaders {
    */
   String CRYPTO_CEK_ALGORITHM = "x-amz-cek-alg";
 
+  /**
+   * Header for unencrypted content length of an object.

Review Comment:
   add a {@value}  for better IDE displays



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java:
##########
@@ -61,6 +61,8 @@ public class ClientManagerImpl implements ClientManager {
    */
   private final S3ClientFactory clientFactory;
 
+  private final S3ClientFactory unencryptedClientFactory;

Review Comment:
   javadoc



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java:
##########
@@ -153,6 +168,12 @@ public synchronized S3AsyncClient getOrCreateAsyncClient() 
throws IOException {
     return s3AsyncClient.eval();
   }
 
+  @Override

Review Comment:
   javadoc



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManagerImpl.java:
##########
@@ -84,25 +86,31 @@ public class ClientManagerImpl implements ClientManager {
   /** Async client is used for transfer manager. */
   private final LazyAutoCloseableReference<S3AsyncClient> s3AsyncClient;
 
+  private final LazyAutoCloseableReference<S3Client> unencryptedS3Client;

Review Comment:
   javadoc



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import io.netty.util.internal.StringUtil;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AUtils.formatRange;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CRYPTO_CEK_ALGORITHM;
+import static 
org.apache.hadoop.fs.s3a.impl.AWSHeaders.UNENCRYPTED_CONTENT_LENGTH;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
+/**
+ * S3 client side encryption (CSE) utility class.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class CSEUtils {
+
+  private CSEUtils() {
+  }
+
+  /**
+   * Checks if the file suffix ends with
+   * {@link 
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX}
+   * when the config
+   * {@link 
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE_DEFAULT}
+   * is enabled and CSE is used.
+   * @param skipCSEInstructionFile whether to skip checking for the filename 
suffix
+   * @param key file name
+   * @return true if cse is disabled or if skipping of instruction file is 
disabled or file name
+   * does not end with defined suffix
+   */
+  public static boolean isCSEInstructionFile(boolean skipCSEInstructionFile, 
String key) {
+    if (!skipCSEInstructionFile) {
+      return true;
+    }
+    return !key.endsWith(S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+  }
+
+  /**
+   * Checks if CSE-KMS or CSE-CUSTOM is set.
+   * @param encryptionMethod type of encryption used
+   * @return true if encryption method is CSE-KMS or CSE-CUSTOM
+   */
+  public static boolean isCSEKmsOrCustom(String encryptionMethod) {
+    return S3AEncryptionMethods.CSE_KMS.getMethod().equals(encryptionMethod) ||
+        S3AEncryptionMethods.CSE_CUSTOM.getMethod().equals(encryptionMethod);
+  }
+
+  /**
+   * Checks if a given S3 object is encrypted or not by checking following two 
conditions
+   * 1. if object metadata contains x-amz-cek-alg
+   * 2. if instruction file is present
+   *
+   * @param s3Client S3 client
+   * @param bucket   bucket name of the s3 object
+   * @param key      key value of the s3 object
+   * @return true if S3 object is encrypted
+   */
+  public static boolean isObjectEncrypted(S3Client s3Client, String bucket, 
String key) {
+    HeadObjectRequest request = HeadObjectRequest.builder()
+        .bucket(bucket)
+        .key(key)
+        .build();
+    HeadObjectResponse headObjectResponse = s3Client.headObject(request);
+    if (headObjectResponse.hasMetadata() &&
+        headObjectResponse.metadata().get(CRYPTO_CEK_ALGORITHM) != null) {
+      return true;
+    }
+    HeadObjectRequest instructionFileCheckRequest = HeadObjectRequest.builder()
+        .bucket(bucket)
+        .key(key + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX)
+        .build();
+    try {
+      s3Client.headObject(instructionFileCheckRequest);
+      return true;
+    } catch (NoSuchKeyException e) {
+      // Ignore. This indicates no instruction file is present
+    }
+    return false;
+  }
+
+  /**
+   * Get the unencrypted object length by either subtracting
+   * {@link InternalConstants#CSE_PADDING_LENGTH} from the object size or 
calculating the
+   * actual size by doing S3 ranged GET operation.
+   *
+   * @param s3Client           S3 client
+   * @param bucket             bucket name of the s3 object
+   * @param key                key value of the s3 object
+   * @param contentLength      S3 object length
+   * @param headObjectResponse response from headObject call
+   * @param cseRangedGetEnabled is ranged get enabled
+   * @param cseReadUnencryptedObjects is reading of une
+   * @return unencrypted length of the object
+   * @throws IOException IO failures
+   */
+  public static long getUnencryptedObjectLength(S3Client s3Client,
+      String bucket,
+      String key,
+      long contentLength,
+      HeadObjectResponse headObjectResponse,
+      boolean cseRangedGetEnabled,
+      boolean cseReadUnencryptedObjects) throws IOException {
+
+    if (cseReadUnencryptedObjects) {
+      // if object is unencrypted, return the actual size
+      if (!isObjectEncrypted(s3Client, bucket, key)) {
+        return contentLength;
+      }
+    }
+
+    // check if unencrypted content length metadata is present or not.
+    if (headObjectResponse != null) {
+      String plaintextLength = 
headObjectResponse.metadata().get(UNENCRYPTED_CONTENT_LENGTH);
+      if (headObjectResponse.hasMetadata() && 
!StringUtil.isNullOrEmpty(plaintextLength)) {
+        return Long.parseLong(plaintextLength);
+      }
+    }
+
+    if (cseRangedGetEnabled) {
+      // identify the unencrypted length by doing a ranged GET operation.
+      if (contentLength >= CSE_PADDING_LENGTH) {
+        long minPlaintextLength = contentLength - CSE_PADDING_LENGTH;
+        if (minPlaintextLength < 0) {
+          minPlaintextLength = 0;
+        }
+        GetObjectRequest getObjectRequest = GetObjectRequest.builder()
+            .bucket(bucket)
+            .key(key)
+            .range(formatRange(minPlaintextLength, contentLength))
+            .build();
+        InputStream inputStream = s3Client.getObject(getObjectRequest);

Review Comment:
   error handling here?



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/HeaderProcessing.java:
##########
@@ -325,19 +325,6 @@ private Map<String, byte[]> retrieveHeaders(
         md.contentEncoding());
     maybeSetHeader(headers, XA_CONTENT_LANGUAGE,
         md.contentLanguage());
-    // If CSE is enabled, use the unencrypted content length.
-    // TODO: CSE is not supported yet, add these headers in during CSE work.
-//    if (md.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null

Review Comment:
   should this be uncommented?



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static 
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+  private static final String ENCRYPTION_CLIENT_CLASSNAME =
+      "software.amazon.encryption.s3.S3EncryptionClient";
+
+  /**
+   * Encryption client availability.
+   */
+  private static final boolean ENCRYPTION_CLIENT_FOUND = 
checkForEncryptionClient();

Review Comment:
   make this a LazyAtomicReference<Boolean> with the checkForEncryption method 
passed in for on-demand evel. otherwise every single debug log is going to 
include messages, possibly including stack traces



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static 
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+  private static final String ENCRYPTION_CLIENT_CLASSNAME =
+      "software.amazon.encryption.s3.S3EncryptionClient";
+
+  /**
+   * Encryption client availability.
+   */
+  private static final boolean ENCRYPTION_CLIENT_FOUND = 
checkForEncryptionClient();
+
+  /**
+   * S3Client to be wrapped by encryption client.
+   */
+  private S3Client s3Client;
+
+  /**
+   * S3AsyncClient to be wrapped by encryption client.
+   */
+  private S3AsyncClient s3AsyncClient;
+
+  private static boolean checkForEncryptionClient() {
+    try {
+      ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
+      cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
+      LOG.debug("encryption client class {} found", 
ENCRYPTION_CLIENT_CLASSNAME);
+      return true;
+    } catch (Exception e) {
+      LOG.debug("encryption client class {} not found", 
ENCRYPTION_CLIENT_CLASSNAME, e);
+      return false;
+    }
+  }
+
+  /**
+   * Is the Encryption client available?
+   * @return true if it was found in the classloader
+   */
+  private static synchronized boolean isEncryptionClientAvailable() {
+    return ENCRYPTION_CLIENT_FOUND;
+  }
+
+  /**
+   * Create encrypted s3 client.
+   * @param uri S3A file system URI
+   * @param parameters parameter object
+   * @return encrypted s3 client
+   * @throws IOException IO failures
+   */
+  @Override
+  public S3Client createS3Client(URI uri, S3ClientCreationParameters 
parameters)
+      throws IOException {
+    if (!isEncryptionClientAvailable()) {
+      throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+          "No encryption client available");
+    }
+
+    s3Client = super.createS3Client(uri, parameters);
+    s3AsyncClient = super.createS3AsyncClient(uri, parameters);
+
+    return 
createS3EncryptionClient(parameters.getClientSideEncryptionMaterials());
+  }
+
+  /**
+   * Create async encrypted s3 client.
+   * @param uri S3A file system URI
+   * @param parameters parameter object
+   * @return async encrypted s3 client
+   * @throws IOException IO failures
+   */
+  @Override
+  public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters 
parameters)
+      throws IOException {
+    if (!isEncryptionClientAvailable()) {
+      throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+          "No encryption client available");
+    }
+    return 
createS3AsyncEncryptionClient(parameters.getClientSideEncryptionMaterials());
+  }
+
+  private S3Client createS3EncryptionClient(final CSEMaterials cseMaterials) {
+    S3EncryptionClient.Builder s3EncryptionClientBuilder =
+        
S3EncryptionClient.builder().wrappedAsyncClient(s3AsyncClient).wrappedClient(s3Client)
+            // this is required for doing S3 ranged GET calls
+            .enableLegacyUnauthenticatedModes(true)
+            // this is required for backward compatibility with older 
encryption clients
+            .enableLegacyWrappingAlgorithms(true);
+
+    switch (cseMaterials.getCseKeyType()) {
+    case KMS:
+      s3EncryptionClientBuilder.kmsKeyId(cseMaterials.getKmsKeyId());
+      break;
+    case CUSTOM:
+      Keyring keyring = 
getKeyringProvider(cseMaterials.getCustomKeyringClassName(),
+          cseMaterials.getConf());
+      CryptographicMaterialsManager cmm =  
DefaultCryptoMaterialsManager.builder()
+          .keyring(keyring)
+          .build();
+      s3EncryptionClientBuilder.cryptoMaterialsManager(cmm);
+      break;
+    default:
+      break;
+    }
+
+    return s3EncryptionClientBuilder.build();
+  }
+
+  private S3AsyncClient createS3AsyncEncryptionClient(final CSEMaterials 
cseMaterials) {

Review Comment:
   add a precondition check for s3AsyncClient non null.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java:
##########
@@ -580,14 +581,18 @@ public SinglePendingCommit uploadFileToPendingCommit(File 
localFile,
         for (int partNumber = 1; partNumber <= numParts; partNumber += 1) {
           progress.progress();
           long size = Math.min(length - offset, uploadPartSize);
-          UploadPartRequest part = writeOperations.newUploadPartRequestBuilder(
+          UploadPartRequest.Builder partBuilder = 
writeOperations.newUploadPartRequestBuilder(
               destKey,
               uploadId,
               partNumber,
-              size).build();
+              size);
+          if (partNumber == numParts) {

Review Comment:
   make isLastPart a parameter to the newUploadRequestBuilder method, set it in 
all places it is needed



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -768,16 +787,29 @@ public boolean accept(FileStatus status) {
    */
   static class AcceptAllButS3nDirs implements FileStatusAcceptor {

Review Comment:
   again, new acceptor



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -1109,6 +1136,44 @@ private ClientManager createClientManager(URI fsURI, 
boolean dtEnabled) throws I
         S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
         S3ClientFactory.class);
 
+    S3ClientFactory clientFactory;
+    S3ClientFactory unecnryptedClientFactory = null;
+    CSEMaterials cseMaterials = null;
+
+    if (isCSEEnabled) {
+      S3AEncryptionMethods algorithm = getS3EncryptionAlgorithm();
+      switch (algorithm) {
+      case CSE_KMS:
+        String kmsKeyId = getS3EncryptionKey(bucket, conf, true);
+        Preconditions.checkArgument(kmsKeyId != null && !kmsKeyId.isEmpty(),
+            "KMS keyId cannot be null or empty");
+        cseMaterials  = new CSEMaterials()
+            .withCSEKeyType(CSEMaterials.CSEKeyType.KMS)
+            .withConf(conf)
+            .withKmsKeyId(kmsKeyId);
+        break;
+      case CSE_CUSTOM:
+        String customCryptoClassName = 
conf.get(S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME);

Review Comment:
   getTrimmed()



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -1109,6 +1136,44 @@ private ClientManager createClientManager(URI fsURI, 
boolean dtEnabled) throws I
         S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
         S3ClientFactory.class);
 
+    S3ClientFactory clientFactory;
+    S3ClientFactory unecnryptedClientFactory = null;
+    CSEMaterials cseMaterials = null;
+
+    if (isCSEEnabled) {
+      S3AEncryptionMethods algorithm = getS3EncryptionAlgorithm();
+      switch (algorithm) {
+      case CSE_KMS:
+        String kmsKeyId = getS3EncryptionKey(bucket, conf, true);
+        Preconditions.checkArgument(kmsKeyId != null && !kmsKeyId.isEmpty(),
+            "KMS keyId cannot be null or empty");
+        cseMaterials  = new CSEMaterials()
+            .withCSEKeyType(CSEMaterials.CSEKeyType.KMS)
+            .withConf(conf)
+            .withKmsKeyId(kmsKeyId);
+        break;
+      case CSE_CUSTOM:
+        String customCryptoClassName = 
conf.get(S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME);
+        Preconditions.checkArgument(customCryptoClassName != null &&
+                !customCryptoClassName.isEmpty(),
+            "CSE custom cryptographic class name cannot be null or empty");
+        cseMaterials  = new CSEMaterials()
+            .withCSEKeyType(CSEMaterials.CSEKeyType.CUSTOM)
+            .withConf(conf)
+            .withCustomCryptographicClassName(customCryptoClassName);
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid client side encryption 
algorithm."
+            + " Only CSE-KMS and CSE-CUSTOM is supported");
+      }
+      clientFactory = 
ReflectionUtils.newInstance(EncryptionS3ClientFactory.class, conf);
+      // This just creates a factory class. Unencrypted client will only be 
created when the
+      // config is enabled and when it is actually required.
+      unecnryptedClientFactory = 
ReflectionUtils.newInstance(s3ClientFactoryClass, conf);

Review Comment:
   typo



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java:
##########
@@ -1109,6 +1136,44 @@ private ClientManager createClientManager(URI fsURI, 
boolean dtEnabled) throws I
         S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
         S3ClientFactory.class);
 
+    S3ClientFactory clientFactory;
+    S3ClientFactory unecnryptedClientFactory = null;
+    CSEMaterials cseMaterials = null;
+
+    if (isCSEEnabled) {
+      S3AEncryptionMethods algorithm = getS3EncryptionAlgorithm();
+      switch (algorithm) {
+      case CSE_KMS:
+        String kmsKeyId = getS3EncryptionKey(bucket, conf, true);
+        Preconditions.checkArgument(kmsKeyId != null && !kmsKeyId.isEmpty(),
+            "KMS keyId cannot be null or empty");
+        cseMaterials  = new CSEMaterials()
+            .withCSEKeyType(CSEMaterials.CSEKeyType.KMS)
+            .withConf(conf)
+            .withKmsKeyId(kmsKeyId);
+        break;
+      case CSE_CUSTOM:
+        String customCryptoClassName = 
conf.get(S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME);
+        Preconditions.checkArgument(customCryptoClassName != null &&
+                !customCryptoClassName.isEmpty(),
+            "CSE custom cryptographic class name cannot be null or empty");
+        cseMaterials  = new CSEMaterials()
+            .withCSEKeyType(CSEMaterials.CSEKeyType.CUSTOM)
+            .withConf(conf)
+            .withCustomCryptographicClassName(customCryptoClassName);
+        break;
+      default:
+        throw new IllegalArgumentException("Invalid client side encryption 
algorithm."
+            + " Only CSE-KMS and CSE-CUSTOM is supported");

Review Comment:
   "are supported"



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import io.netty.util.internal.StringUtil;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AUtils.formatRange;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CRYPTO_CEK_ALGORITHM;
+import static 
org.apache.hadoop.fs.s3a.impl.AWSHeaders.UNENCRYPTED_CONTENT_LENGTH;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
+/**
+ * S3 client side encryption (CSE) utility class.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class CSEUtils {

Review Comment:
   its too late to move S3AEncryptionMethods (and we should also declare that 
public evolviing/unstable, but should we add a .encryption package under .impl?



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws 
Exception {
     }
   }
 
+  /**
+   * Testing how file name with suffix ".instruction" is ignored when CSE is 
enabled
+   * based on configurations.
+   * @throws IOException
+   */
+  @Test
+  public void testEncryptionWithInstructionFile() throws IOException {
+    maybeSkipTest();
+    S3AFileSystem cseDisabledFS = new S3AFileSystem();
+    Configuration cseDisabledConf = getConfiguration();
+    S3AFileSystem cseEnabledFS = getFileSystem();
+    Path unEncryptedFilePath = path(getMethodName());
+    Path unEncryptedFile = new Path(unEncryptedFilePath,
+        "file" + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+
+    // Initialize a CSE disabled FS.
+    removeBaseAndBucketOverrides(getTestBucketName(cseDisabledConf),
+            cseDisabledConf,
+            S3_ENCRYPTION_ALGORITHM,
+            S3_ENCRYPTION_KEY,
+            SERVER_SIDE_ENCRYPTION_ALGORITHM,
+            SERVER_SIDE_ENCRYPTION_KEY);
+    cseDisabledFS.initialize(getFileSystem().getUri(),
+            cseDisabledConf);
+
+    // Unencrypted data written to a path.
+    try (FSDataOutputStream out = cseDisabledFS.create(unEncryptedFile)) {
+      out.write(new byte[SMALL_FILE_SIZE]);
+    }
+
+    // list from cse disabled FS
+    assertEquals("number of files didn't match", 1,

Review Comment:
   use assertJ assert with an explict hasLength() condition



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEMaterials.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * This class is for storing information about key type and corresponding key
+ * to be used for client side encryption.
+ */
+public class CSEMaterials {
+  /**
+   * Enum for CSE key types.
+   */
+  public enum CSEKeyType {
+    KMS,

Review Comment:
   javadoc



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import io.netty.util.internal.StringUtil;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AUtils.formatRange;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CRYPTO_CEK_ALGORITHM;
+import static 
org.apache.hadoop.fs.s3a.impl.AWSHeaders.UNENCRYPTED_CONTENT_LENGTH;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
+/**
+ * S3 client side encryption (CSE) utility class.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class CSEUtils {
+
+  private CSEUtils() {
+  }
+
+  /**
+   * Checks if the file suffix ends with
+   * {@link 
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX}
+   * when the config
+   * {@link 
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE_DEFAULT}
+   * is enabled and CSE is used.
+   * @param skipCSEInstructionFile whether to skip checking for the filename 
suffix
+   * @param key file name
+   * @return true if cse is disabled or if skipping of instruction file is 
disabled or file name
+   * does not end with defined suffix
+   */
+  public static boolean isCSEInstructionFile(boolean skipCSEInstructionFile, 
String key) {
+    if (!skipCSEInstructionFile) {
+      return true;
+    }
+    return !key.endsWith(S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+  }
+
+  /**
+   * Checks if CSE-KMS or CSE-CUSTOM is set.
+   * @param encryptionMethod type of encryption used
+   * @return true if encryption method is CSE-KMS or CSE-CUSTOM
+   */
+  public static boolean isCSEKmsOrCustom(String encryptionMethod) {
+    return S3AEncryptionMethods.CSE_KMS.getMethod().equals(encryptionMethod) ||
+        S3AEncryptionMethods.CSE_CUSTOM.getMethod().equals(encryptionMethod);
+  }
+
+  /**
+   * Checks if a given S3 object is encrypted or not by checking following two 
conditions
+   * 1. if object metadata contains x-amz-cek-alg
+   * 2. if instruction file is present
+   *
+   * @param s3Client S3 client
+   * @param bucket   bucket name of the s3 object
+   * @param key      key value of the s3 object
+   * @return true if S3 object is encrypted
+   */
+  public static boolean isObjectEncrypted(S3Client s3Client, String bucket, 
String key) {
+    HeadObjectRequest request = HeadObjectRequest.builder()
+        .bucket(bucket)
+        .key(key)
+        .build();
+    HeadObjectResponse headObjectResponse = s3Client.headObject(request);
+    if (headObjectResponse.hasMetadata() &&
+        headObjectResponse.metadata().get(CRYPTO_CEK_ALGORITHM) != null) {
+      return true;
+    }
+    HeadObjectRequest instructionFileCheckRequest = HeadObjectRequest.builder()
+        .bucket(bucket)
+        .key(key + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX)
+        .build();
+    try {
+      s3Client.headObject(instructionFileCheckRequest);
+      return true;
+    } catch (NoSuchKeyException e) {
+      // Ignore. This indicates no instruction file is present
+    }
+    return false;
+  }
+
+  /**
+   * Get the unencrypted object length by either subtracting
+   * {@link InternalConstants#CSE_PADDING_LENGTH} from the object size or 
calculating the
+   * actual size by doing S3 ranged GET operation.
+   *
+   * @param s3Client           S3 client
+   * @param bucket             bucket name of the s3 object
+   * @param key                key value of the s3 object
+   * @param contentLength      S3 object length
+   * @param headObjectResponse response from headObject call
+   * @param cseRangedGetEnabled is ranged get enabled
+   * @param cseReadUnencryptedObjects is reading of une
+   * @return unencrypted length of the object
+   * @throws IOException IO failures
+   */
+  public static long getUnencryptedObjectLength(S3Client s3Client,

Review Comment:
   this is going to have to be pulled out into listing callbacks, executed 
within the same auditspan and updating statistics. if a client is doing *any* 
IO we need to know who and why (server side) and how long (client)



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java:
##########
@@ -736,6 +736,61 @@ private Constants() {
   public static final String S3_ENCRYPTION_KEY =
       "fs.s3a.encryption.key";
 
+  /**
+   * Client side encryption (CSE-CUSTOM) with custom cryptographic material 
manager class name.

Review Comment:
   add {@value} tags on all of these



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import io.netty.util.internal.StringUtil;
+import org.apache.hadoop.classification.InterfaceAudience;

Review Comment:
   ordering



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws 
Exception {
     }
   }
 
+  /**
+   * Testing how file name with suffix ".instruction" is ignored when CSE is 
enabled
+   * based on configurations.
+   * @throws IOException
+   */
+  @Test
+  public void testEncryptionWithInstructionFile() throws IOException {
+    maybeSkipTest();
+    S3AFileSystem cseDisabledFS = new S3AFileSystem();

Review Comment:
   1. use createTestFileSystem()
   2. needs to be cleaned up in finally



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws 
Exception {
     }
   }
 
+  /**
+   * Testing how file name with suffix ".instruction" is ignored when CSE is 
enabled
+   * based on configurations.
+   * @throws IOException
+   */
+  @Test
+  public void testEncryptionWithInstructionFile() throws IOException {
+    maybeSkipTest();
+    S3AFileSystem cseDisabledFS = new S3AFileSystem();
+    Configuration cseDisabledConf = getConfiguration();
+    S3AFileSystem cseEnabledFS = getFileSystem();
+    Path unEncryptedFilePath = path(getMethodName());
+    Path unEncryptedFile = new Path(unEncryptedFilePath,
+        "file" + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+
+    // Initialize a CSE disabled FS.
+    removeBaseAndBucketOverrides(getTestBucketName(cseDisabledConf),
+            cseDisabledConf,
+            S3_ENCRYPTION_ALGORITHM,
+            S3_ENCRYPTION_KEY,
+            SERVER_SIDE_ENCRYPTION_ALGORITHM,
+            SERVER_SIDE_ENCRYPTION_KEY);
+    cseDisabledFS.initialize(getFileSystem().getUri(),
+            cseDisabledConf);
+
+    // Unencrypted data written to a path.
+    try (FSDataOutputStream out = cseDisabledFS.create(unEncryptedFile)) {
+      out.write(new byte[SMALL_FILE_SIZE]);
+    }
+
+    // list from cse disabled FS
+    assertEquals("number of files didn't match", 1,
+            cseDisabledFS.listStatus(unEncryptedFilePath).length);
+
+    // list from cse enabled fs with skipping of instruction file
+    cseEnabledFS.getConf().setBoolean(S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE, 
true);
+    cseEnabledFS.initialize(getFileSystem().getUri(), cseEnabledFS.getConf());
+    assertEquals("number of files didn't match", 0,
+            cseEnabledFS.listStatus(unEncryptedFilePath).length);
+
+    // disable skipping cse instruction file.
+    cseEnabledFS.getConf().setBoolean(S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE, 
false);
+    cseEnabledFS.initialize(getFileSystem().getUri(), cseEnabledFS.getConf());
+    assertEquals("number of files didn't match", 1,
+            cseEnabledFS.listStatus(unEncryptedFilePath).length);
+  }
+
+  /**
+   * Testing how unencrypted length is calculated/fetched under different 
circumstances
+   * when CSE is enabled.
+   * @throws IOException
+   */
+  @Test
+  public void testEncryptionWithDifferentWaysForCalculatingUnencryptedLength() 
throws IOException {
+    maybeSkipTest();
+    S3AFileSystem cseEnabledFS = getFileSystem();
+    Path filePath = path(getMethodName());
+    String key = cseEnabledFS.pathToKey(filePath);
+
+    // write object with unencrypted with random content length header
+    Map<String, String> metadata = new HashMap<>();
+    metadata.put(AWSHeaders.UNENCRYPTED_CONTENT_LENGTH, "10");
+    try (AuditSpan span = span()) {
+      PutObjectRequest request = PutObjectRequest.builder()

Review Comment:
   use RequestFactory through S3AStore



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;

Review Comment:
   import ordering on new files. always



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws 
Exception {
     }
   }
 
+  /**
+   * Testing how file name with suffix ".instruction" is ignored when CSE is 
enabled
+   * based on configurations.
+   * @throws IOException
+   */
+  @Test
+  public void testEncryptionWithInstructionFile() throws IOException {
+    maybeSkipTest();
+    S3AFileSystem cseDisabledFS = new S3AFileSystem();
+    Configuration cseDisabledConf = getConfiguration();

Review Comment:
   new Configuration(getConfiguration())



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static 
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {

Review Comment:
   javadocs. always. save time on my reviewing and you waiting for the reviews



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws 
Exception {
     }
   }
 
+  /**
+   * Testing how file name with suffix ".instruction" is ignored when CSE is 
enabled
+   * based on configurations.
+   * @throws IOException
+   */
+  @Test
+  public void testEncryptionWithInstructionFile() throws IOException {
+    maybeSkipTest();
+    S3AFileSystem cseDisabledFS = new S3AFileSystem();
+    Configuration cseDisabledConf = getConfiguration();
+    S3AFileSystem cseEnabledFS = getFileSystem();
+    Path unEncryptedFilePath = path(getMethodName());
+    Path unEncryptedFile = new Path(unEncryptedFilePath,
+        "file" + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+
+    // Initialize a CSE disabled FS.
+    removeBaseAndBucketOverrides(getTestBucketName(cseDisabledConf),
+            cseDisabledConf,
+            S3_ENCRYPTION_ALGORITHM,
+            S3_ENCRYPTION_KEY,
+            SERVER_SIDE_ENCRYPTION_ALGORITHM,
+            SERVER_SIDE_ENCRYPTION_KEY);
+    cseDisabledFS.initialize(getFileSystem().getUri(),
+            cseDisabledConf);
+
+    // Unencrypted data written to a path.
+    try (FSDataOutputStream out = cseDisabledFS.create(unEncryptedFile)) {
+      out.write(new byte[SMALL_FILE_SIZE]);
+    }
+
+    // list from cse disabled FS
+    assertEquals("number of files didn't match", 1,
+            cseDisabledFS.listStatus(unEncryptedFilePath).length);
+
+    // list from cse enabled fs with skipping of instruction file
+    cseEnabledFS.getConf().setBoolean(S3_ENCRYPTION_CSE_SKIP_INSTRUCTION_FILE, 
true);

Review Comment:
   and now you are trying to patch the settings of the default fs and call 
initialize again. Do not do this. In fact, if we don't have a check to stop 
initialize() being called twice -we should



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryptionCustom.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.AWSHeaders;
+import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
+
+public class ITestS3AClientSideEncryptionCustom extends 
ITestS3AClientSideEncryption {

Review Comment:
   javadoc



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java:
##########
@@ -153,11 +154,16 @@ public S3AsyncClient createS3AsyncClient(
         .thresholdInBytes(parameters.getMultiPartThreshold())
         .build();
 
-    return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, 
bucket)
-        .httpClientBuilder(httpClientBuilder)
-        .multipartConfiguration(multipartConfiguration)
-        .multipartEnabled(parameters.isMultipartCopy())
-        .build();
+    S3AsyncClientBuilder s3AsyncClientBuilder =
+            configureClientBuilder(S3AsyncClient.builder(), parameters, conf, 
bucket)
+                .httpClientBuilder(httpClientBuilder);
+
+    if (!parameters.isClientSideEncryptionEnabled()) {

Review Comment:
   shouldn't this be part of  configureClientBuilder?



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static 
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+  private static final String ENCRYPTION_CLIENT_CLASSNAME =
+      "software.amazon.encryption.s3.S3EncryptionClient";
+
+  /**
+   * Encryption client availability.
+   */
+  private static final boolean ENCRYPTION_CLIENT_FOUND = 
checkForEncryptionClient();
+
+  /**
+   * S3Client to be wrapped by encryption client.
+   */
+  private S3Client s3Client;
+
+  /**
+   * S3AsyncClient to be wrapped by encryption client.
+   */
+  private S3AsyncClient s3AsyncClient;
+
+  private static boolean checkForEncryptionClient() {
+    try {
+      ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
+      cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
+      LOG.debug("encryption client class {} found", 
ENCRYPTION_CLIENT_CLASSNAME);
+      return true;
+    } catch (Exception e) {
+      LOG.debug("encryption client class {} not found", 
ENCRYPTION_CLIENT_CLASSNAME, e);
+      return false;
+    }
+  }
+
+  /**
+   * Is the Encryption client available?
+   * @return true if it was found in the classloader
+   */
+  private static synchronized boolean isEncryptionClientAvailable() {
+    return ENCRYPTION_CLIENT_FOUND;
+  }
+
+  /**
+   * Create encrypted s3 client.
+   * @param uri S3A file system URI
+   * @param parameters parameter object
+   * @return encrypted s3 client
+   * @throws IOException IO failures
+   */
+  @Override
+  public S3Client createS3Client(URI uri, S3ClientCreationParameters 
parameters)
+      throws IOException {
+    if (!isEncryptionClientAvailable()) {
+      throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+          "No encryption client available");
+    }
+
+    s3Client = super.createS3Client(uri, parameters);
+    s3AsyncClient = super.createS3AsyncClient(uri, parameters);
+
+    return 
createS3EncryptionClient(parameters.getClientSideEncryptionMaterials());
+  }
+
+  /**
+   * Create async encrypted s3 client.

Review Comment:
   note that it only works if createS3Client was called first



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static 
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+  private static final String ENCRYPTION_CLIENT_CLASSNAME =
+      "software.amazon.encryption.s3.S3EncryptionClient";
+
+  /**
+   * Encryption client availability.
+   */
+  private static final boolean ENCRYPTION_CLIENT_FOUND = 
checkForEncryptionClient();
+
+  /**
+   * S3Client to be wrapped by encryption client.
+   */
+  private S3Client s3Client;
+
+  /**
+   * S3AsyncClient to be wrapped by encryption client.
+   */
+  private S3AsyncClient s3AsyncClient;
+
+  private static boolean checkForEncryptionClient() {
+    try {
+      ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
+      cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
+      LOG.debug("encryption client class {} found", 
ENCRYPTION_CLIENT_CLASSNAME);
+      return true;
+    } catch (Exception e) {
+      LOG.debug("encryption client class {} not found", 
ENCRYPTION_CLIENT_CLASSNAME, e);
+      return false;
+    }
+  }
+
+  /**
+   * Is the Encryption client available?
+   * @return true if it was found in the classloader
+   */
+  private static synchronized boolean isEncryptionClientAvailable() {
+    return ENCRYPTION_CLIENT_FOUND;
+  }
+
+  /**
+   * Create encrypted s3 client.
+   * @param uri S3A file system URI
+   * @param parameters parameter object
+   * @return encrypted s3 client
+   * @throws IOException IO failures
+   */
+  @Override
+  public S3Client createS3Client(URI uri, S3ClientCreationParameters 
parameters)
+      throws IOException {
+    if (!isEncryptionClientAvailable()) {
+      throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+          "No encryption client available");
+    }
+
+    s3Client = super.createS3Client(uri, parameters);
+    s3AsyncClient = super.createS3AsyncClient(uri, parameters);
+
+    return 
createS3EncryptionClient(parameters.getClientSideEncryptionMaterials());
+  }
+
+  /**
+   * Create async encrypted s3 client.
+   * @param uri S3A file system URI
+   * @param parameters parameter object
+   * @return async encrypted s3 client
+   * @throws IOException IO failures
+   */
+  @Override
+  public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters 
parameters)
+      throws IOException {
+    if (!isEncryptionClientAvailable()) {
+      throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+          "No encryption client available");
+    }
+    return 
createS3AsyncEncryptionClient(parameters.getClientSideEncryptionMaterials());
+  }
+
+  private S3Client createS3EncryptionClient(final CSEMaterials cseMaterials) {
+    S3EncryptionClient.Builder s3EncryptionClientBuilder =
+        
S3EncryptionClient.builder().wrappedAsyncClient(s3AsyncClient).wrappedClient(s3Client)
+            // this is required for doing S3 ranged GET calls
+            .enableLegacyUnauthenticatedModes(true)
+            // this is required for backward compatibility with older 
encryption clients
+            .enableLegacyWrappingAlgorithms(true);
+
+    switch (cseMaterials.getCseKeyType()) {
+    case KMS:
+      s3EncryptionClientBuilder.kmsKeyId(cseMaterials.getKmsKeyId());
+      break;
+    case CUSTOM:
+      Keyring keyring = 
getKeyringProvider(cseMaterials.getCustomKeyringClassName(),
+          cseMaterials.getConf());
+      CryptographicMaterialsManager cmm =  
DefaultCryptoMaterialsManager.builder()
+          .keyring(keyring)
+          .build();
+      s3EncryptionClientBuilder.cryptoMaterialsManager(cmm);
+      break;
+    default:
+      break;
+    }
+
+    return s3EncryptionClientBuilder.build();
+  }
+
+  private S3AsyncClient createS3AsyncEncryptionClient(final CSEMaterials 
cseMaterials) {
+    S3AsyncEncryptionClient.Builder s3EncryptionAsyncClientBuilder =
+        S3AsyncEncryptionClient.builder().wrappedClient(s3AsyncClient)
+            // this is required for doing S3 ranged GET calls
+            .enableLegacyUnauthenticatedModes(true)
+            // this is required for backward compatibility with older 
encryption clients
+            .enableLegacyWrappingAlgorithms(true);
+
+    switch (cseMaterials.getCseKeyType()) {
+    case KMS:
+      s3EncryptionAsyncClientBuilder.kmsKeyId(cseMaterials.getKmsKeyId());
+      break;
+    case CUSTOM:
+      Keyring keyring = 
getKeyringProvider(cseMaterials.getCustomKeyringClassName(),
+          cseMaterials.getConf());
+      CryptographicMaterialsManager cmm =  
DefaultCryptoMaterialsManager.builder()
+          .keyring(keyring)
+          .build();
+      s3EncryptionAsyncClientBuilder.cryptoMaterialsManager(cmm);
+      break;
+    default:
+      break;
+    }
+
+    return s3EncryptionAsyncClientBuilder.build();
+  }
+
+  /**
+   * Get the custom Keyring class.
+   * @param className
+   * @param conf
+   * @return
+   */
+  private Keyring getKeyringProvider(String className,
+      Configuration conf) {
+    try {
+      return 
ReflectionUtils.newInstance(getCustomKeyringProviderClass(className), conf);
+    } catch (Exception e) {
+      // this is for testing purpose to support CustomKeyring.java
+      return 
ReflectionUtils.newInstance(getCustomKeyringProviderClass(className), conf,
+          new Class[] {Configuration.class}, conf);
+    }
+  }
+
+  private Class<? extends Keyring> getCustomKeyringProviderClass(String 
className) {
+    if (Strings.isNullOrEmpty(className)) {
+      throw new IllegalArgumentException(

Review Comment:
   Preconditions.checkArgument()



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws 
Exception {
     }
   }
 
+  /**
+   * Testing how file name with suffix ".instruction" is ignored when CSE is 
enabled
+   * based on configurations.
+   * @throws IOException
+   */
+  @Test
+  public void testEncryptionWithInstructionFile() throws IOException {
+    maybeSkipTest();
+    S3AFileSystem cseDisabledFS = new S3AFileSystem();
+    Configuration cseDisabledConf = getConfiguration();
+    S3AFileSystem cseEnabledFS = getFileSystem();

Review Comment:
   this is ref to the normal fs



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static 
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+  private static final String ENCRYPTION_CLIENT_CLASSNAME =
+      "software.amazon.encryption.s3.S3EncryptionClient";
+
+  /**
+   * Encryption client availability.
+   */
+  private static final boolean ENCRYPTION_CLIENT_FOUND = 
checkForEncryptionClient();
+
+  /**
+   * S3Client to be wrapped by encryption client.
+   */
+  private S3Client s3Client;
+
+  /**
+   * S3AsyncClient to be wrapped by encryption client.
+   */
+  private S3AsyncClient s3AsyncClient;
+
+  private static boolean checkForEncryptionClient() {
+    try {
+      ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
+      cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
+      LOG.debug("encryption client class {} found", 
ENCRYPTION_CLIENT_CLASSNAME);
+      return true;
+    } catch (Exception e) {
+      LOG.debug("encryption client class {} not found", 
ENCRYPTION_CLIENT_CLASSNAME, e);
+      return false;
+    }
+  }
+
+  /**
+   * Is the Encryption client available?
+   * @return true if it was found in the classloader
+   */
+  private static synchronized boolean isEncryptionClientAvailable() {
+    return ENCRYPTION_CLIENT_FOUND;
+  }
+
+  /**
+   * Create encrypted s3 client.
+   * @param uri S3A file system URI
+   * @param parameters parameter object
+   * @return encrypted s3 client
+   * @throws IOException IO failures
+   */
+  @Override
+  public S3Client createS3Client(URI uri, S3ClientCreationParameters 
parameters)

Review Comment:
   So we need to create both of these clients before creating encryption code?
   
   I am worried that this class now has an implicit expectation that the 
synchronous S3 client is always created before the async one. 
   1. javadocs MUST State this.
   2. async code must use Preconditions check to fail meaningfully if it is 
ever called before/without the sync client being created.
   
   What will happen if/when we move to async only?



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/EncryptionS3ClientFactory.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.encryption.s3.S3AsyncEncryptionClient;
+import software.amazon.encryption.s3.S3EncryptionClient;
+import software.amazon.encryption.s3.materials.CryptographicMaterialsManager;
+import software.amazon.encryption.s3.materials.DefaultCryptoMaterialsManager;
+import software.amazon.encryption.s3.materials.Keyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.impl.CSEMaterials;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import static 
org.apache.hadoop.fs.s3a.impl.InstantiationIOException.unavailable;
+
+public class EncryptionS3ClientFactory extends DefaultS3ClientFactory {
+
+  private static final String ENCRYPTION_CLIENT_CLASSNAME =
+      "software.amazon.encryption.s3.S3EncryptionClient";
+
+  /**
+   * Encryption client availability.
+   */
+  private static final boolean ENCRYPTION_CLIENT_FOUND = 
checkForEncryptionClient();
+
+  /**
+   * S3Client to be wrapped by encryption client.
+   */
+  private S3Client s3Client;
+
+  /**
+   * S3AsyncClient to be wrapped by encryption client.
+   */
+  private S3AsyncClient s3AsyncClient;
+
+  private static boolean checkForEncryptionClient() {
+    try {
+      ClassLoader cl = EncryptionS3ClientFactory.class.getClassLoader();
+      cl.loadClass(ENCRYPTION_CLIENT_CLASSNAME);
+      LOG.debug("encryption client class {} found", 
ENCRYPTION_CLIENT_CLASSNAME);
+      return true;
+    } catch (Exception e) {
+      LOG.debug("encryption client class {} not found", 
ENCRYPTION_CLIENT_CLASSNAME, e);
+      return false;
+    }
+  }
+
+  /**
+   * Is the Encryption client available?
+   * @return true if it was found in the classloader
+   */
+  private static synchronized boolean isEncryptionClientAvailable() {
+    return ENCRYPTION_CLIENT_FOUND;
+  }
+
+  /**
+   * Create encrypted s3 client.
+   * @param uri S3A file system URI
+   * @param parameters parameter object
+   * @return encrypted s3 client
+   * @throws IOException IO failures
+   */
+  @Override
+  public S3Client createS3Client(URI uri, S3ClientCreationParameters 
parameters)
+      throws IOException {
+    if (!isEncryptionClientAvailable()) {
+      throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+          "No encryption client available");
+    }
+
+    s3Client = super.createS3Client(uri, parameters);
+    s3AsyncClient = super.createS3AsyncClient(uri, parameters);
+
+    return 
createS3EncryptionClient(parameters.getClientSideEncryptionMaterials());
+  }
+
+  /**
+   * Create async encrypted s3 client.
+   * @param uri S3A file system URI
+   * @param parameters parameter object
+   * @return async encrypted s3 client
+   * @throws IOException IO failures
+   */
+  @Override
+  public S3AsyncClient createS3AsyncClient(URI uri, S3ClientCreationParameters 
parameters)
+      throws IOException {
+    if (!isEncryptionClientAvailable()) {
+      throw unavailable(uri, ENCRYPTION_CLIENT_CLASSNAME, null,
+          "No encryption client available");
+    }
+    return 
createS3AsyncEncryptionClient(parameters.getClientSideEncryptionMaterials());
+  }
+
+  private S3Client createS3EncryptionClient(final CSEMaterials cseMaterials) {

Review Comment:
   javadocs



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -726,9 +742,11 @@ public void close() {
    */
   static class AcceptFilesOnly implements FileStatusAcceptor {

Review Comment:
   create a new subclass to perform the skipping, choose which one to pass down



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java:
##########
@@ -876,11 +877,15 @@ private void uploadBlockAsync(final 
S3ADataBlocks.DataBlock block,
             ? RequestBody.fromFile(uploadData.getFile())
             : RequestBody.fromInputStream(uploadData.getUploadStream(), size);
 
-        request = writeOperationHelper.newUploadPartRequestBuilder(
+        UploadPartRequest.Builder requestBuilder = 
writeOperationHelper.newUploadPartRequestBuilder(
             key,
             uploadId,
             currentPartNumber,
-            size).build();
+            size);
+        if (isLast) {

Review Comment:
   see CommitOperations



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java:
##########
@@ -1409,7 +1409,7 @@ public interface InputStreamCallbacks extends Closeable {
      * @return the response
      */
     @Retries.OnceRaw
-    ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request);
+    ResponseInputStream<GetObjectResponse> getObject(GetObjectRequest request) 
throws IOException;

Review Comment:
   update javadocs to explain change



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +288,100 @@ public void testEncryptionEnabledAndDisabledFS() throws 
Exception {
     }
   }
 
+  /**
+   * Testing how file name with suffix ".instruction" is ignored when CSE is 
enabled
+   * based on configurations.
+   * @throws IOException
+   */
+  @Test
+  public void testEncryptionWithInstructionFile() throws IOException {
+    maybeSkipTest();
+    S3AFileSystem cseDisabledFS = new S3AFileSystem();
+    Configuration cseDisabledConf = getConfiguration();
+    S3AFileSystem cseEnabledFS = getFileSystem();
+    Path unEncryptedFilePath = path(getMethodName());
+    Path unEncryptedFile = new Path(unEncryptedFilePath,
+        "file" + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+
+    // Initialize a CSE disabled FS.
+    removeBaseAndBucketOverrides(getTestBucketName(cseDisabledConf),
+            cseDisabledConf,
+            S3_ENCRYPTION_ALGORITHM,
+            S3_ENCRYPTION_KEY,
+            SERVER_SIDE_ENCRYPTION_ALGORITHM,
+            SERVER_SIDE_ENCRYPTION_KEY);
+    cseDisabledFS.initialize(getFileSystem().getUri(),
+            cseDisabledConf);
+
+    // Unencrypted data written to a path.
+    try (FSDataOutputStream out = cseDisabledFS.create(unEncryptedFile)) {

Review Comment:
   ContractTestUtils has a method for this



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ClientFactory.java:
##########
@@ -118,6 +119,13 @@ final class S3ClientCreationParameters {
      */
     private StatisticsFromAwsSdk metrics;
 
+    private Boolean isCSEEnabled = false;

Review Comment:
   javadoc



##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md:
##########
@@ -674,10 +674,18 @@ clients where S3-CSE has not been enabled.
 
 ### Features
 


> AWS SDK V2 - Implement CSE
> --------------------------
>
>                 Key: HADOOP-18708
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18708
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.0
>            Reporter: Ahmar Suhail
>            Assignee: Syed Shameerur Rahman
>            Priority: Major
>              Labels: pull-request-available
>
> S3 Encryption client for SDK V2 is now available, so add client side 
> encryption back in. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to