[ 
https://issues.apache.org/jira/browse/HADOOP-18708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17893532#comment-17893532
 ] 

ASF GitHub Bot commented on HADOOP-18708:
-----------------------------------------

steveloughran commented on code in PR #6884:
URL: https://github.com/apache/hadoop/pull/6884#discussion_r1819181162


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BaseS3AFileSystemHandler.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Listing;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.DEFAULT_S3_CLIENT_FACTORY_IMPL;
+import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
+import static 
org.apache.hadoop.fs.s3a.Statistic.CLIENT_SIDE_ENCRYPTION_ENABLED;
+
+/**
+ * An implementation of the {@link S3AFileSystemHandler} interface.
+ * This handles certain filesystem operations when s3 client side encryption 
is disabled.
+ */
+public class BaseS3AFileSystemHandler implements S3AFileSystemHandler {
+
+  /**
+   * Constructs a new instance of {@code BaseS3AFileSystemHandler}.
+   */
+  public BaseS3AFileSystemHandler() {
+  }
+
+  /**
+   * Returns a {@link Listing.FileStatusAcceptor} object.
+   * That determines which files and directories should be included in a 
listing operation.
+   *
+   * @param path         the path for which the listing is being performed
+   * @param includeSelf  a boolean indicating whether the path itself should
+   *                     be included in the listing
+   * @return a {@link Listing.FileStatusAcceptor} object
+   */
+  @Override
+  public Listing.FileStatusAcceptor getFileStatusAcceptor(Path path, boolean 
includeSelf) {
+    return includeSelf
+        ? Listing.ACCEPT_ALL_BUT_S3N
+        : new Listing.AcceptAllButSelfAndS3nDirs(path);
+  }
+
+  /**
+   * Returns a {@link Listing.FileStatusAcceptor} object.
+   * That determines which files and directories should be included in a 
listing operation.
+   *
+   * @param path the path for which the listing is being performed
+   * @return a {@link Listing.FileStatusAcceptor} object
+   */
+  @Override
+  public Listing.FileStatusAcceptor getFileStatusAcceptor(Path path) {
+    return new Listing.AcceptFilesOnly(path);
+  }
+
+  /**
+   * Retrieves an object from the S3.
+   *
+   * @param store   The S3AStore object representing the S3 bucket.
+   * @param request The GetObjectRequest containing the details of the object 
to retrieve.
+   * @param factory The RequestFactory used to create the GetObjectRequest.
+   * @return A ResponseInputStream containing the GetObjectResponse.
+   * @throws IOException If an error occurs while retrieving the object.
+   */
+  @Override
+  public ResponseInputStream<GetObjectResponse> getObject(S3AStore store, 
GetObjectRequest request,

Review Comment:
   move send arg onto next line



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/BaseS3AFileSystemHandler.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Listing;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.DEFAULT_S3_CLIENT_FACTORY_IMPL;
+import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
+import static 
org.apache.hadoop.fs.s3a.Statistic.CLIENT_SIDE_ENCRYPTION_ENABLED;
+
+/**
+ * An implementation of the {@link S3AFileSystemHandler} interface.
+ * This handles certain filesystem operations when s3 client side encryption 
is disabled.
+ */
+public class BaseS3AFileSystemHandler implements S3AFileSystemHandler {
+
+  /**
+   * Constructs a new instance of {@code BaseS3AFileSystemHandler}.
+   */
+  public BaseS3AFileSystemHandler() {
+  }
+
+  /**
+   * Returns a {@link Listing.FileStatusAcceptor} object.
+   * That determines which files and directories should be included in a 
listing operation.
+   *
+   * @param path         the path for which the listing is being performed
+   * @param includeSelf  a boolean indicating whether the path itself should
+   *                     be included in the listing
+   * @return a {@link Listing.FileStatusAcceptor} object
+   */
+  @Override
+  public Listing.FileStatusAcceptor getFileStatusAcceptor(Path path, boolean 
includeSelf) {
+    return includeSelf
+        ? Listing.ACCEPT_ALL_BUT_S3N
+        : new Listing.AcceptAllButSelfAndS3nDirs(path);
+  }
+
+  /**
+   * Returns a {@link Listing.FileStatusAcceptor} object.
+   * That determines which files and directories should be included in a 
listing operation.
+   *
+   * @param path the path for which the listing is being performed
+   * @return a {@link Listing.FileStatusAcceptor} object
+   */
+  @Override
+  public Listing.FileStatusAcceptor getFileStatusAcceptor(Path path) {
+    return new Listing.AcceptFilesOnly(path);
+  }
+
+  /**
+   * Retrieves an object from the S3.
+   *
+   * @param store   The S3AStore object representing the S3 bucket.
+   * @param request The GetObjectRequest containing the details of the object 
to retrieve.
+   * @param factory The RequestFactory used to create the GetObjectRequest.
+   * @return A ResponseInputStream containing the GetObjectResponse.
+   * @throws IOException If an error occurs while retrieving the object.
+   */
+  @Override
+  public ResponseInputStream<GetObjectResponse> getObject(S3AStore store, 
GetObjectRequest request,
+      RequestFactory factory) throws IOException {
+    return store.getOrCreateS3Client().getObject(request);
+  }
+
+  /**
+   * Set the client side encryption gauge to 0.
+   * @param ioStatisticsStore The IOStatisticsStore of the filesystem.
+   */
+  @Override
+  public void setCSEGauge(IOStatisticsStore ioStatisticsStore) {
+    ioStatisticsStore.setGauge(CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol(), 0L);
+  }
+
+  /**
+   * Retrieves the client-side encryption materials for the given bucket and 
encryption algorithm.

Review Comment:
   nit: line width



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.util.Preconditions;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.CSE_CUSTOM;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.CSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3AUtils.formatRange;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CRYPTO_CEK_ALGORITHM;
+import static 
org.apache.hadoop.fs.s3a.impl.AWSHeaders.UNENCRYPTED_CONTENT_LENGTH;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
+/**
+ * S3 client side encryption (CSE) utility class.
+ */
+@InterfaceAudience.Public

Review Comment:
   private



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java:
##########
@@ -373,7 +380,7 @@ private <BuilderT extends S3BaseClientBuilder<BuilderT, 
ClientT>, ClientT> void
    * @param conf config to build the URI from.
    * @return an endpoint uri
    */
-  private static URI getS3Endpoint(String endpoint, final Configuration conf) {
+  public static URI getS3Endpoint(String endpoint, final Configuration conf) {

Review Comment:
   make protected



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -464,9 +466,10 @@ private boolean buildNextStatusBatch(S3ListResult objects) 
{
         // Skip over keys that are ourselves and old S3N _$folder$ files
         if (acceptor.accept(keyPath, s3Object) && filter.accept(keyPath)) {
           S3AFileStatus status = createFileStatus(keyPath, s3Object,
-                  listingOperationCallbacks.getDefaultBlockSize(keyPath),
-                  getStoreContext().getUsername(),
-                  s3Object.eTag(), null, isCSEEnabled);
+              listingOperationCallbacks.getDefaultBlockSize(keyPath),

Review Comment:
   let move username and blocksize retrievel out of the for() loop...they don't 
change after all



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -763,6 +766,61 @@ public boolean accept(FileStatus status) {
     }
   }
 
+  /**
+   * Accept all entries except the base path and those which map to S3N
+   * pseudo directory markers and CSE instruction file.
+   */
+  public static class AcceptFilesOnlyExceptCSEInstructionFile implements 
FileStatusAcceptor {

Review Comment:
   * cut the S3N support in this one, and make the class final
   * move to CSEV1CompatibleS3AFileSystemHandler class



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +272,211 @@ public void testEncryptionEnabledAndDisabledFS() throws 
Exception {
     }
   }
 
+  /**
+   * Test to check if unencrypted objects are read with V1 client 
compatibility.
+   * @throws IOException
+   * @throws Exception
+   */
+  @Test
+  public void testUnencryptedObjectReadWithV1CompatibilityConfig() throws 
Exception {
+    maybeSkipTest();
+    // initialize base s3 client.
+    Configuration conf = new Configuration(getConfiguration());
+    S3AFileSystem nonCseFs = createTestFileSystem(conf);
+    removeBaseAndBucketOverrides(getTestBucketName(conf),
+        conf,
+        S3_ENCRYPTION_ALGORITHM,
+        S3_ENCRYPTION_KEY,
+        SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        SERVER_SIDE_ENCRYPTION_KEY);
+    nonCseFs.initialize(getFileSystem().getUri(), conf);

Review Comment:
   try/close clause must be active from here



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +272,211 @@ public void testEncryptionEnabledAndDisabledFS() throws 
Exception {
     }
   }
 
+  /**
+   * Test to check if unencrypted objects are read with V1 client 
compatibility.
+   * @throws IOException
+   * @throws Exception
+   */
+  @Test
+  public void testUnencryptedObjectReadWithV1CompatibilityConfig() throws 
Exception {
+    maybeSkipTest();
+    // initialize base s3 client.
+    Configuration conf = new Configuration(getConfiguration());
+    S3AFileSystem nonCseFs = createTestFileSystem(conf);
+    removeBaseAndBucketOverrides(getTestBucketName(conf),
+        conf,
+        S3_ENCRYPTION_ALGORITHM,
+        S3_ENCRYPTION_KEY,
+        SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        SERVER_SIDE_ENCRYPTION_KEY);
+    nonCseFs.initialize(getFileSystem().getUri(), conf);
+
+    Path file = path(getMethodName());
+    // write unencrypted file
+    ContractTestUtils.writeDataset(nonCseFs, file, new byte[SMALL_FILE_SIZE],
+        SMALL_FILE_SIZE, SMALL_FILE_SIZE, true);
+
+    Configuration cseConf = new Configuration(getConfiguration());
+    cseConf.setBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED, true);
+    // create filesystem with cse enabled and v1 compatibility.
+    S3AFileSystem cseFs = createTestFileSystem(cseConf);
+    cseFs.initialize(getFileSystem().getUri(), cseConf);
+
+    // read unencrypted file. It should not throw any exception.
+    try (FSDataInputStream in = cseFs.open(file)) {
+      in.read(new byte[SMALL_FILE_SIZE]);
+    } finally {
+      // close the filesystem
+      nonCseFs.close();
+      cseFs.close();
+    }
+  }
+
+  /**
+   * Test to check if file name with suffix ".instruction" is ignored with V1 
compatibility.
+   * @throws IOException
+   */
+  @Test
+  public void testSkippingCSEInstructionFileWithV1Compatibility() throws 
IOException {

Review Comment:
   where you should add the new delete and rename tests. create the file in 
subdir 1 under methodPath, rename it, verify source dir is gone. delete rename 
target dir, again assert not found.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ClientManager.java:
##########
@@ -61,6 +61,8 @@ S3TransferManager getOrCreateTransferManager()
    */
   S3AsyncClient getOrCreateAsyncClient() throws IOException;
 
+  S3Client getOrCreateUnencryptedS3Client() throws IOException;

Review Comment:
   pull javadocs up from implementation



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AFileSystemHandler.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Listing;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+/**
+ * An interface that defines the contract for handling certain filesystem 
operations.
+ */
+public interface S3AFileSystemHandler {

Review Comment:
   Not sure this is the right name, but I can't think of anything obvious to 
use instead. 



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.util.Preconditions;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.CSE_CUSTOM;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.CSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3AUtils.formatRange;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CRYPTO_CEK_ALGORITHM;
+import static 
org.apache.hadoop.fs.s3a.impl.AWSHeaders.UNENCRYPTED_CONTENT_LENGTH;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
+/**
+ * S3 client side encryption (CSE) utility class.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class CSEUtils {
+
+  private CSEUtils() {
+  }
+
+  /**
+   * Checks if the file suffix ends CSE file suffix.
+   * {@link 
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX}
+   * when the config
+   * @param key file name
+   * @return true if file name ends with CSE instruction file suffix
+   */
+  public static boolean isCSEInstructionFile(String key) {
+    return key.endsWith(S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+  }
+
+  /**
+   * Checks if CSE-KMS or CSE-CUSTOM is set.
+   * @param encryptionMethod type of encryption used
+   * @return true if encryption method is CSE-KMS or CSE-CUSTOM
+   */
+  public static boolean isCSEEnabled(String encryptionMethod) {
+    return CSE_KMS.getMethod().equals(encryptionMethod) ||
+        CSE_CUSTOM.getMethod().equals(encryptionMethod);
+  }
+
+  /**
+   * Checks if a given S3 object is encrypted or not by checking following two 
conditions
+   * 1. if object metadata contains x-amz-cek-alg
+   * 2. if instruction file is present
+   *
+   * @param s3Client S3 client
+   * @param factory   S3 request factory
+   * @param key      key value of the s3 object
+   * @return true if S3 object is encrypted
+   */
+  public static boolean isObjectEncrypted(S3Client s3Client, RequestFactory 
factory, String key) {

Review Comment:
   have it take S3AStore, so when we push all s3 api calls into the interface, 
it'll be easier to move. 



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEV1CompatibleS3AFileSystemHandler.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Listing;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.DEFAULT_S3_CLIENT_FACTORY_IMPL;
+import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
+import static org.apache.hadoop.fs.s3a.impl.CSEUtils.isObjectEncrypted;
+
+/**
+ * An extension of the {@link CSES3AFileSystemHandler} class.
+ * This handles certain file system operations when client-side encryption is 
enabled with v1 client
+ * compatibility.
+ * {@link 
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED}.
+ */
+public class CSEV1CompatibleS3AFileSystemHandler extends 
CSES3AFileSystemHandler {
+
+  /**
+   * Constructs a new instance of {@code CSEV1CompatibleS3AFileSystemHandler}.
+   */
+  public CSEV1CompatibleS3AFileSystemHandler() {
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>This implementation returns a {@link 
Listing.AcceptAllButS3nDirsAndCSEInstructionFile}
+   * or {@link Listing.AcceptAllButSelfAndS3nDirsAndCSEInstructionFile} object
+   * based on the value of the {@code includeSelf} parameter.
+   */
+  @Override
+  public Listing.FileStatusAcceptor getFileStatusAcceptor(Path path, boolean 
includeSelf) {
+    return includeSelf
+        ? new Listing.AcceptAllButS3nDirsAndCSEInstructionFile()
+        : new Listing.AcceptAllButSelfAndS3nDirsAndCSEInstructionFile(path);
+  }
+
+  /**
+   * Returns a {@link Listing.FileStatusAcceptor} object.
+   * That determines which files and directories should be included in a 
listing operation.
+   *
+   * @param path the path for which the listing is being performed
+   * @return a {@link Listing.FileStatusAcceptor} object
+   */
+  @Override
+  public Listing.FileStatusAcceptor getFileStatusAcceptor(Path path) {
+    return new Listing.AcceptFilesOnlyExceptCSEInstructionFile(path);
+  }
+
+  /**
+   * Retrieves an object from the S3.
+   * If the S3 object is encrypted, it uses the encrypted S3 client to 
retrieve the object else
+   * it uses the unencrypted S3 client.
+   *
+   * @param store   The S3AStore object representing the S3 bucket.
+   * @param request The GetObjectRequest containing the details of the object 
to retrieve.
+   * @param factory The RequestFactory used to create the GetObjectRequest.
+   * @return A ResponseInputStream containing the GetObjectResponse.
+   * @throws IOException If an error occurs while retrieving the object.
+   */
+  @Override
+  public ResponseInputStream<GetObjectResponse> getObject(S3AStore store, 
GetObjectRequest request,
+      RequestFactory factory) throws IOException {
+    boolean isEncrypted = isObjectEncrypted(store.getOrCreateS3Client(), 
factory, request.key());
+    return isEncrypted ? store.getOrCreateS3Client().getObject(request)
+        : store.getOrCreateUnencryptedS3Client().getObject(request);
+  }
+
+  /**
+   * Retrieves the S3 client factory for the specified class and configuration.
+   *
+   * @param conf  The Hadoop configuration object.
+   * @return The S3 client factory instance.
+   */
+  @Override
+  public S3ClientFactory getUnencryptedS3ClientFactory(Configuration conf) {
+    Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
+        S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
+        S3ClientFactory.class);
+    return ReflectionUtils.newInstance(s3ClientFactoryClass, conf);
+  }
+
+
+  /**
+   * Retrieves the unpadded size of an object in the S3 bucket.
+   *
+   * @param key The key (path) of the object in the S3 bucket.
+   * @param length The length of the object.
+   * @param store The S3AStore object representing the S3 bucket.
+   * @param bucket The name of the S3 bucket.
+   * @param factory The RequestFactory used to create the HeadObjectRequest.
+   * @param response The HeadObjectResponse containing the metadata of the 
object.
+   * @return The unpadded size of the object in bytes.
+   * @throws IOException If an error occurs while retrieving the object size.
+   */
+  @Override
+  public long getS3ObjectSize(String key, long length, S3AStore store, String 
bucket,
+      RequestFactory factory, HeadObjectResponse response) throws IOException {
+    return CSEUtils.getUnPaddedObjectLength(store.getOrCreateS3Client(), 
bucket,

Review Comment:
   pass the S3AStore into CSEUtils, have it create the client.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -75,18 +77,18 @@
 public class Listing extends AbstractStoreOperation {
 
   private static final Logger LOG = S3AFileSystem.LOG;
-  private final boolean isCSEEnabled;
+  private final S3AFileSystemHandler handler;
 
-  static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
+  public static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
       new AcceptAllButS3nDirs();
 
   private final ListingOperationCallbacks listingOperationCallbacks;
 
   public Listing(ListingOperationCallbacks listingOperationCallbacks,
-      StoreContext storeContext) {
+      StoreContext storeContext, S3AFileSystemHandler handler) {

Review Comment:
   nit: on next line now things are already being chopped down



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AFileSystemHandler.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Listing;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
+
+/**
+ * An interface that defines the contract for handling certain filesystem 
operations.

Review Comment:
   how about "helps map from object store semantics to that of the fileystem, 
with special support for encrypted stores"



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DefaultS3ClientFactory.java:
##########
@@ -160,11 +161,17 @@ public S3AsyncClient createS3AsyncClient(
         .thresholdInBytes(parameters.getMultiPartThreshold())
         .build();
 
-    return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, 
bucket)
-        .httpClientBuilder(httpClientBuilder)
-        .multipartConfiguration(multipartConfiguration)
-        .multipartEnabled(parameters.isMultipartCopy())
-        .build();
+    S3AsyncClientBuilder s3AsyncClientBuilder =
+            configureClientBuilder(S3AsyncClient.builder(), parameters, conf, 
bucket)
+                .httpClientBuilder(httpClientBuilder);
+
+    // TODO: Enable multi part upload with cse once it is available.

Review Comment:
   ok, so no MPE right now. Is this planned?



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEUtils.java:
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.util.Preconditions;
+
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_CUSTOM_KEYRING_CLASS_NAME;
+import static 
org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.CSE_CUSTOM;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.CSE_KMS;
+import static org.apache.hadoop.fs.s3a.S3AUtils.formatRange;
+import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
+import static org.apache.hadoop.fs.s3a.impl.AWSHeaders.CRYPTO_CEK_ALGORITHM;
+import static 
org.apache.hadoop.fs.s3a.impl.AWSHeaders.UNENCRYPTED_CONTENT_LENGTH;
+import static 
org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+
+/**
+ * S3 client side encryption (CSE) utility class.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class CSEUtils {
+
+  private CSEUtils() {
+  }
+
+  /**
+   * Checks if the file suffix ends CSE file suffix.
+   * {@link 
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX}
+   * when the config
+   * @param key file name
+   * @return true if file name ends with CSE instruction file suffix
+   */
+  public static boolean isCSEInstructionFile(String key) {
+    return key.endsWith(S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+  }
+
+  /**
+   * Checks if CSE-KMS or CSE-CUSTOM is set.
+   * @param encryptionMethod type of encryption used
+   * @return true if encryption method is CSE-KMS or CSE-CUSTOM
+   */
+  public static boolean isCSEEnabled(String encryptionMethod) {
+    return CSE_KMS.getMethod().equals(encryptionMethod) ||
+        CSE_CUSTOM.getMethod().equals(encryptionMethod);
+  }
+
+  /**
+   * Checks if a given S3 object is encrypted or not by checking following two 
conditions
+   * 1. if object metadata contains x-amz-cek-alg
+   * 2. if instruction file is present
+   *
+   * @param s3Client S3 client
+   * @param factory   S3 request factory
+   * @param key      key value of the s3 object
+   * @return true if S3 object is encrypted
+   */
+  public static boolean isObjectEncrypted(S3Client s3Client, RequestFactory 
factory, String key) {
+    HeadObjectRequest.Builder requestBuilder = 
factory.newHeadObjectRequestBuilder(key);
+    HeadObjectResponse headObjectResponse = 
s3Client.headObject(requestBuilder.build());
+    if (headObjectResponse.hasMetadata() &&
+        headObjectResponse.metadata().get(CRYPTO_CEK_ALGORITHM) != null) {
+      return true;
+    }
+    HeadObjectRequest.Builder instructionFileRequestBuilder =
+        factory.newHeadObjectRequestBuilder(key + 
S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+    try {
+      s3Client.headObject(instructionFileRequestBuilder.build());

Review Comment:
   we should somehow be tracking invocation count/performance



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -464,9 +466,10 @@ private boolean buildNextStatusBatch(S3ListResult objects) 
{
         // Skip over keys that are ourselves and old S3N _$folder$ files

Review Comment:
   we should get ride of that $folder$ stuff these days, shouldn't we? not this 
PR though



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -447,7 +449,7 @@ private boolean requestNextBatch() throws IOException {
      * @param objects the next object listing
      * @return true if this added any entries after filtering
      */
-    private boolean buildNextStatusBatch(S3ListResult objects) {
+    private boolean buildNextStatusBatch(S3ListResult objects) throws 
IOException {

Review Comment:
   upate javadocs to indicate this can happen with CSE only



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/test/MinimalListingOperationCallbacks.java:
##########
@@ -30,6 +30,8 @@
 import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
 
+import software.amazon.awssdk.services.s3.model.S3Object;

Review Comment:
   nit: move above the asf block



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEV1CompatibleS3AFileSystemHandler.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Listing;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.DEFAULT_S3_CLIENT_FACTORY_IMPL;
+import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
+import static org.apache.hadoop.fs.s3a.impl.CSEUtils.isObjectEncrypted;
+
+/**
+ * An extension of the {@link CSES3AFileSystemHandler} class.
+ * This handles certain file system operations when client-side encryption is 
enabled with v1 client
+ * compatibility.
+ * {@link 
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED}.
+ */
+public class CSEV1CompatibleS3AFileSystemHandler extends 
CSES3AFileSystemHandler {
+
+  /**
+   * Constructs a new instance of {@code CSEV1CompatibleS3AFileSystemHandler}.
+   */
+  public CSEV1CompatibleS3AFileSystemHandler() {
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>This implementation returns a {@link 
Listing.AcceptAllButS3nDirsAndCSEInstructionFile}
+   * or {@link Listing.AcceptAllButSelfAndS3nDirsAndCSEInstructionFile} object
+   * based on the value of the {@code includeSelf} parameter.
+   */
+  @Override
+  public Listing.FileStatusAcceptor getFileStatusAcceptor(Path path, boolean 
includeSelf) {
+    return includeSelf
+        ? new Listing.AcceptAllButS3nDirsAndCSEInstructionFile()
+        : new Listing.AcceptAllButSelfAndS3nDirsAndCSEInstructionFile(path);
+  }
+
+  /**
+   * Returns a {@link Listing.FileStatusAcceptor} object.
+   * That determines which files and directories should be included in a 
listing operation.
+   *
+   * @param path the path for which the listing is being performed
+   * @return a {@link Listing.FileStatusAcceptor} object
+   */
+  @Override
+  public Listing.FileStatusAcceptor getFileStatusAcceptor(Path path) {
+    return new Listing.AcceptFilesOnlyExceptCSEInstructionFile(path);
+  }
+
+  /**
+   * Retrieves an object from the S3.
+   * If the S3 object is encrypted, it uses the encrypted S3 client to 
retrieve the object else
+   * it uses the unencrypted S3 client.
+   *
+   * @param store   The S3AStore object representing the S3 bucket.
+   * @param request The GetObjectRequest containing the details of the object 
to retrieve.
+   * @param factory The RequestFactory used to create the GetObjectRequest.
+   * @return A ResponseInputStream containing the GetObjectResponse.
+   * @throws IOException If an error occurs while retrieving the object.
+   */
+  @Override
+  public ResponseInputStream<GetObjectResponse> getObject(S3AStore store, 
GetObjectRequest request,
+      RequestFactory factory) throws IOException {
+    boolean isEncrypted = isObjectEncrypted(store.getOrCreateS3Client(), 
factory, request.key());
+    return isEncrypted ? store.getOrCreateS3Client().getObject(request)

Review Comment:
   second HEAD request



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java:
##########
@@ -464,9 +466,10 @@ private boolean buildNextStatusBatch(S3ListResult objects) 
{
         // Skip over keys that are ourselves and old S3N _$folder$ files
         if (acceptor.accept(keyPath, s3Object) && filter.accept(keyPath)) {
           S3AFileStatus status = createFileStatus(keyPath, s3Object,
-                  listingOperationCallbacks.getDefaultBlockSize(keyPath),
-                  getStoreContext().getUsername(),
-                  s3Object.eTag(), null, isCSEEnabled);
+              listingOperationCallbacks.getDefaultBlockSize(keyPath),
+              getStoreContext().getUsername(),
+              s3Object.eTag(), null,

Review Comment:
   move null to a new line



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/CSEV1CompatibleS3AFileSystemHandler.java:
##########
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Listing;
+import org.apache.hadoop.fs.s3a.S3AStore;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.fs.s3a.api.RequestFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
+import software.amazon.awssdk.services.s3.model.GetObjectResponse;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+
+import static 
org.apache.hadoop.fs.s3a.Constants.DEFAULT_S3_CLIENT_FACTORY_IMPL;
+import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
+import static org.apache.hadoop.fs.s3a.impl.CSEUtils.isObjectEncrypted;
+
+/**
+ * An extension of the {@link CSES3AFileSystemHandler} class.
+ * This handles certain file system operations when client-side encryption is 
enabled with v1 client
+ * compatibility.
+ * {@link 
org.apache.hadoop.fs.s3a.Constants#S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED}.
+ */
+public class CSEV1CompatibleS3AFileSystemHandler extends 
CSES3AFileSystemHandler {
+
+  /**
+   * Constructs a new instance of {@code CSEV1CompatibleS3AFileSystemHandler}.
+   */
+  public CSEV1CompatibleS3AFileSystemHandler() {
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>This implementation returns a {@link 
Listing.AcceptAllButS3nDirsAndCSEInstructionFile}
+   * or {@link Listing.AcceptAllButSelfAndS3nDirsAndCSEInstructionFile} object
+   * based on the value of the {@code includeSelf} parameter.
+   */
+  @Override
+  public Listing.FileStatusAcceptor getFileStatusAcceptor(Path path, boolean 
includeSelf) {
+    return includeSelf
+        ? new Listing.AcceptAllButS3nDirsAndCSEInstructionFile()
+        : new Listing.AcceptAllButSelfAndS3nDirsAndCSEInstructionFile(path);
+  }
+
+  /**
+   * Returns a {@link Listing.FileStatusAcceptor} object.
+   * That determines which files and directories should be included in a 
listing operation.
+   *
+   * @param path the path for which the listing is being performed
+   * @return a {@link Listing.FileStatusAcceptor} object
+   */
+  @Override
+  public Listing.FileStatusAcceptor getFileStatusAcceptor(Path path) {
+    return new Listing.AcceptFilesOnlyExceptCSEInstructionFile(path);
+  }
+
+  /**
+   * Retrieves an object from the S3.
+   * If the S3 object is encrypted, it uses the encrypted S3 client to 
retrieve the object else
+   * it uses the unencrypted S3 client.
+   *
+   * @param store   The S3AStore object representing the S3 bucket.
+   * @param request The GetObjectRequest containing the details of the object 
to retrieve.
+   * @param factory The RequestFactory used to create the GetObjectRequest.
+   * @return A ResponseInputStream containing the GetObjectResponse.
+   * @throws IOException If an error occurs while retrieving the object.
+   */
+  @Override
+  public ResponseInputStream<GetObjectResponse> getObject(S3AStore store, 
GetObjectRequest request,
+      RequestFactory factory) throws IOException {
+    boolean isEncrypted = isObjectEncrypted(store.getOrCreateS3Client(), 
factory, request.key());

Review Comment:
   HEAD request



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/CustomKeyring.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.List;
+
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kms.KmsClient;
+import software.amazon.encryption.s3.materials.DecryptionMaterials;
+import software.amazon.encryption.s3.materials.EncryptedDataKey;
+import software.amazon.encryption.s3.materials.EncryptionMaterials;
+import software.amazon.encryption.s3.materials.Keyring;
+import software.amazon.encryption.s3.materials.KmsKeyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
+import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION;
+
+/**
+ * Custom Keyring implementation by warpping over KmsKeyring.
+ * This is used for testing {@link ITestS3AClientSideEncryptionCustom}.
+ */
+public class CustomKeyring implements Keyring {
+  private final KmsClient kmsClient;
+  private final Configuration conf;
+  private final KmsKeyring kmsKeyring;
+
+
+  public CustomKeyring(Configuration conf) throws IOException {
+    this.conf = conf;
+    String bucket = S3ATestUtils.getFsName(conf);
+    kmsClient = KmsClient.builder()
+        .region(Region.of(conf.get(AWS_REGION, AWS_S3_DEFAULT_REGION)))

Review Comment:
   what if is in a different region?
   



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/CustomKeyring.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.util.List;
+
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kms.KmsClient;
+import software.amazon.encryption.s3.materials.DecryptionMaterials;
+import software.amazon.encryption.s3.materials.EncryptedDataKey;
+import software.amazon.encryption.s3.materials.EncryptionMaterials;
+import software.amazon.encryption.s3.materials.Keyring;
+import software.amazon.encryption.s3.materials.KmsKeyring;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
+import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_DEFAULT_REGION;
+
+/**
+ * Custom Keyring implementation by warpping over KmsKeyring.

Review Comment:
   typo
   



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +272,211 @@ public void testEncryptionEnabledAndDisabledFS() throws 
Exception {
     }
   }
 
+  /**
+   * Test to check if unencrypted objects are read with V1 client 
compatibility.
+   * @throws IOException
+   * @throws Exception
+   */
+  @Test
+  public void testUnencryptedObjectReadWithV1CompatibilityConfig() throws 
Exception {
+    maybeSkipTest();
+    // initialize base s3 client.
+    Configuration conf = new Configuration(getConfiguration());
+    S3AFileSystem nonCseFs = createTestFileSystem(conf);
+    removeBaseAndBucketOverrides(getTestBucketName(conf),
+        conf,
+        S3_ENCRYPTION_ALGORITHM,
+        S3_ENCRYPTION_KEY,
+        SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        SERVER_SIDE_ENCRYPTION_KEY);
+    nonCseFs.initialize(getFileSystem().getUri(), conf);
+
+    Path file = path(getMethodName());

Review Comment:
   methodPath() can do this



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClientSideEncryption.java:
##########
@@ -266,6 +272,211 @@ public void testEncryptionEnabledAndDisabledFS() throws 
Exception {
     }
   }
 
+  /**
+   * Test to check if unencrypted objects are read with V1 client 
compatibility.
+   * @throws IOException
+   * @throws Exception
+   */
+  @Test
+  public void testUnencryptedObjectReadWithV1CompatibilityConfig() throws 
Exception {
+    maybeSkipTest();
+    // initialize base s3 client.
+    Configuration conf = new Configuration(getConfiguration());
+    S3AFileSystem nonCseFs = createTestFileSystem(conf);
+    removeBaseAndBucketOverrides(getTestBucketName(conf),
+        conf,
+        S3_ENCRYPTION_ALGORITHM,
+        S3_ENCRYPTION_KEY,
+        SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        SERVER_SIDE_ENCRYPTION_KEY);
+    nonCseFs.initialize(getFileSystem().getUri(), conf);
+
+    Path file = path(getMethodName());
+    // write unencrypted file
+    ContractTestUtils.writeDataset(nonCseFs, file, new byte[SMALL_FILE_SIZE],
+        SMALL_FILE_SIZE, SMALL_FILE_SIZE, true);
+
+    Configuration cseConf = new Configuration(getConfiguration());
+    cseConf.setBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED, true);
+    // create filesystem with cse enabled and v1 compatibility.
+    S3AFileSystem cseFs = createTestFileSystem(cseConf);
+    cseFs.initialize(getFileSystem().getUri(), cseConf);
+
+    // read unencrypted file. It should not throw any exception.
+    try (FSDataInputStream in = cseFs.open(file)) {
+      in.read(new byte[SMALL_FILE_SIZE]);
+    } finally {
+      // close the filesystem
+      nonCseFs.close();
+      cseFs.close();
+    }
+  }
+
+  /**
+   * Test to check if file name with suffix ".instruction" is ignored with V1 
compatibility.
+   * @throws IOException
+   */
+  @Test
+  public void testSkippingCSEInstructionFileWithV1Compatibility() throws 
IOException {
+    maybeSkipTest();
+    // initialize base s3 client.
+    Configuration conf = new Configuration(getConfiguration());
+    S3AFileSystem fs = createTestFileSystem(conf);
+    removeBaseAndBucketOverrides(getTestBucketName(conf),
+        conf,
+        S3_ENCRYPTION_ALGORITHM,
+        S3_ENCRYPTION_KEY,
+        SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        SERVER_SIDE_ENCRYPTION_KEY);
+    fs.initialize(getFileSystem().getUri(), conf);
+
+    // write file with suffix ".instruction"
+    Path filePath = path(getMethodName());
+    Path file = new Path(filePath,
+        "file" + S3_ENCRYPTION_CSE_INSTRUCTION_FILE_SUFFIX);
+    ContractTestUtils.writeDataset(fs, file, new byte[SMALL_FILE_SIZE],
+        SMALL_FILE_SIZE, SMALL_FILE_SIZE, true);
+
+    // create filesystem with cse enabled and v1 compatibility.
+    Configuration cseConf = new Configuration(getConfiguration());
+    cseConf.setBoolean(S3_ENCRYPTION_CSE_V1_COMPATIBILITY_ENABLED, true);
+    S3AFileSystem cseFs = createTestFileSystem(cseConf);
+    cseFs.initialize(getFileSystem().getUri(), cseConf);
+    try {

Review Comment:
   move to line 332



##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/encryption.md:
##########
@@ -704,10 +704,20 @@ clients where S3-CSE has not been enabled.
 
 ### Features
 


> AWS SDK V2 - Implement CSE
> --------------------------
>
>                 Key: HADOOP-18708
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18708
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.0
>            Reporter: Ahmar Suhail
>            Assignee: Syed Shameerur Rahman
>            Priority: Major
>              Labels: pull-request-available
>
> S3 Encryption client for SDK V2 is now available, so add client side 
> encryption back in. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to