pnowojski commented on code in PR #27187:
URL: https://github.com/apache/flink/pull/27187#discussion_r2832440066


##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3Committer.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Committer for S3 multipart uploads that finalizes the upload when commit is called.
+ *
+ * <p><b>Failure Handling:</b> A commit failure will propagate as an IOException, which typically
+ * causes the Flink job to fail and trigger recovery. This is intentional because:
+ *
+ * <ul>
+ *   <li>S3 requires all multipart uploads to be explicitly committed or aborted
+ *   <li>A failed commit means the file is not finalized and data would be lost
+ *   <li>Flink's checkpoint mechanism will handle recovery from the last successful checkpoint
+ * </ul>
+ *
+ * <p>The "empty parts" check is a defensive measure against programming errors - in normal
+ * operation, a multipart upload should always have at least one part before committing.
+ */
+public class NativeS3Committer implements RecoverableFsDataOutputStream.Committer {
+
+    private final NativeS3AccessHelper s3AccessHelper;
+    private final NativeS3Recoverable recoverable;
+    private final AtomicInteger errorCount;

Review Comment:
   Why is this an `AtomicInteger`? Is it being used across different threads?
   
   How is it even being used? It is a private field that is not exposed outside of this class, and `commitMultiPartUpload` also doesn't seem to use it in any way apart from incrementing it.
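   
   For illustration, a minimal sketch of the alternative, assuming the committer is only ever driven from one thread and the count is meant to be read somewhere (logs, metrics, tests). `ErrorCounterSketch`, `recordFailure` and `getErrorCount` are hypothetical names, not part of the PR:

```java
/**
 * Hypothetical stand-in for the committer's error bookkeeping.
 * Assumption: all calls come from a single thread, so no atomics are needed.
 */
public class ErrorCounterSketch {

    // Plain int: sufficient when the instance is confined to one thread.
    private int errorCount;

    /** Records one failed commit attempt. */
    public void recordFailure() {
        errorCount++;
    }

    /** Exposes the count so that incrementing it has an observable effect. */
    public int getErrorCount() {
        return errorCount;
    }
}
```

   If nothing ever reads the count, dropping the field entirely would be simpler still.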



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native.writer;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.s3native.writer.NativeS3Recoverable.PartETag;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * A recoverable output stream writes data to S3 using multipart uploads.
+ *
+ * <p>This class is NOT thread-safe. All write operations ({@link #write}, {@link #flush}, {@link
+ * #persist}, {@link #closeForCommit}) must be called from a single thread (the Flink operator
+ * thread). This is consistent with Flink's {@link RecoverableFsDataOutputStream} contract where
+ * output streams are confined to a single task thread.
+ *
+ * <p>The {@link #close()} method may be called concurrently (e.g., during task cancellation). A
+ * {@link ReentrantLock} guards the critical sections in {@link #close()}, {@link
+ * #closeForCommit()}, and {@link #persist()} to ensure safe cleanup of local resources without
+ * corrupting S3 state.
+ */
+@NotThreadSafe
+public class NativeS3RecoverableFsDataOutputStream extends RecoverableFsDataOutputStream {
+
+    /** Lock to guard close/persist/commit paths against concurrent close() during cancellation. */
+    private final ReentrantLock lock = new ReentrantLock();
+
+    private final NativeS3AccessHelper s3AccessHelper;
+    private final String key;
+    private final String uploadId;
+    private final String localTmpDir;
+    private final long minPartSize;
+
+    private final List<PartETag> completedParts;
+    private long numBytesInParts;
+
+    private File currentTempFile;
+    private FileOutputStream currentOutputStream;
+    private long currentPartSize;
+    private int nextPartNumber;
+
+    private volatile boolean closed;
+
+    public NativeS3RecoverableFsDataOutputStream(
+            NativeS3AccessHelper s3AccessHelper,
+            String key,
+            String uploadId,
+            String localTmpDir,
+            long minPartSize)
+            throws IOException {
+        this(s3AccessHelper, key, uploadId, localTmpDir, minPartSize, new ArrayList<>(), 0L);
+    }
+
+    public NativeS3RecoverableFsDataOutputStream(
+            NativeS3AccessHelper s3AccessHelper,
+            String key,
+            String uploadId,
+            String localTmpDir,
+            long minPartSize,
+            List<PartETag> existingParts,
+            long numBytesInParts)
+            throws IOException {
+        this.s3AccessHelper = s3AccessHelper;
+        this.key = key;
+        this.uploadId = uploadId;
+        this.localTmpDir = localTmpDir;
+        this.minPartSize = minPartSize;
+        this.completedParts = new ArrayList<>(existingParts);
+        this.numBytesInParts = numBytesInParts;
+        this.nextPartNumber = existingParts.size() + 1;
+        this.currentPartSize = 0;
+        this.closed = false;
+
+        createNewTempFile();
+    }
+
+    private void createNewTempFile() throws IOException {
+        File tmpDir = new File(localTmpDir);
+        if (!tmpDir.exists()) {
+            tmpDir.mkdirs();
+        }
+
+        currentTempFile = new File(tmpDir, "s3-part-" + UUID.randomUUID());
+        currentOutputStream = new FileOutputStream(currentTempFile);
+        currentPartSize = 0;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return numBytesInParts + currentPartSize;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        if (closed) {
+            throw new IOException("Stream is closed");
+        }
+
+        currentOutputStream.write(b);
+        currentPartSize++;
+
+        if (currentPartSize >= minPartSize) {
+            uploadCurrentPart();
+            createNewTempFile();
+        }
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        if (closed) {
+            throw new IOException("Stream is closed");
+        }
+        if (b == null) {
+            throw new NullPointerException();
+        }
+        if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+
+        currentOutputStream.write(b, off, len);
+        currentPartSize += len;
+
+        if (currentPartSize >= minPartSize) {
+            uploadCurrentPart();
+            createNewTempFile();
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (!closed) {
+            currentOutputStream.flush();
+        }
+    }
+
+    @Override
+    public void sync() throws IOException {
+        flush();
+    }
+
+    private void uploadCurrentPart() throws IOException {
+        currentOutputStream.close();
+
+        int partNumber = nextPartNumber++;
+        NativeS3AccessHelper.UploadPartResult result =
+                s3AccessHelper.uploadPart(
+                        key, uploadId, partNumber, currentTempFile, currentPartSize);
+
+        completedParts.add(new PartETag(result.getPartNumber(), result.getETag()));
+        numBytesInParts += currentPartSize;
+
+        Files.delete(currentTempFile.toPath());
+    }
+
+    @Override
+    public Committer closeForCommit() throws IOException {
+        lock();
+        try {
+            if (closed) {
+                throw new IOException("Stream is already closed");
+            }
+
+            closed = true;
+            currentOutputStream.close();
+
+            if (currentPartSize > 0) {
+                uploadCurrentPart();
+            } else {
+                Files.delete(currentTempFile.toPath());
+            }
+
+            NativeS3Recoverable recoverable =
+                    new NativeS3Recoverable(
+                            key, uploadId, new ArrayList<>(completedParts), numBytesInParts);
+
+            return new NativeS3Committer(s3AccessHelper, recoverable);
+        } finally {
+            unlock();
+        }
+    }
+
+    @Override
+    public RecoverableWriter.ResumeRecoverable persist() throws IOException {
+        lock();
+        try {
+            flush();
+
+            String incompletePartKey = null;
+            long incompletePartLength = 0;
+
+            if (currentPartSize > 0) {
+                currentOutputStream.flush();
+                incompletePartKey = key + "/.incomplete/" + uploadId + "/" + UUID.randomUUID();
+                s3AccessHelper.putObject(incompletePartKey, currentTempFile);
+                incompletePartLength = currentPartSize;
+            }
+
+            return new NativeS3Recoverable(
+                    key,
+                    uploadId,
+                    new ArrayList<>(completedParts),
+                    numBytesInParts,
+                    incompletePartKey,
+                    incompletePartLength);
+        } finally {
+            unlock();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        lock();
+        try {
+            if (!closed) {
+                closed = true;
+                if (currentOutputStream != null) {
+                    currentOutputStream.close();
+                }
+                if (currentTempFile != null && currentTempFile.exists()) {
+                    Files.delete(currentTempFile.toPath());
+                }
+
+                try {
+                    s3AccessHelper.abortMultiPartUpload(key, uploadId);
+                } catch (IOException e) {
+                    // best-effort cleanup
+                }
+            }
+        } finally {
+            unlock();
+        }
+    }
+
+    private void lock() throws IOException {
+        try {
+            lock.lockInterruptibly();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("interrupted while acquiring lock", e);
+        }
+    }
+
+    private void unlock() {
+        lock.unlock();
+    }

Review Comment:
   Why do we have a lock here in a `@NotThreadSafe` class? I also don't see it spawning any internal threads.
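   
   To illustrate the question: a minimal sketch of a thread-confined stream with no lock, assuming `close()` is in fact only ever called from the owning task thread (the class Javadoc claims otherwise, so this assumption would need confirming). `ConfinedStreamSketch` is a hypothetical stand-in that buffers in memory; it is not the PR's class:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
 * Hypothetical single-threaded stream: no lock, no volatile.
 * Assumption: write() and close() are always invoked from the same thread.
 */
public class ConfinedStreamSketch extends OutputStream {

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    // Plain boolean: visibility across threads is not a concern under the assumption above.
    private boolean closed;

    @Override
    public void write(int b) throws IOException {
        if (closed) {
            throw new IOException("Stream is closed");
        }
        buffer.write(b);
    }

    /** Idempotent close: repeated calls are harmless. */
    @Override
    public void close() {
        closed = true;
    }

    public boolean isClosed() {
        return closed;
    }

    /** Number of bytes written so far. */
    public int size() {
        return buffer.size();
    }
}
```

   If `close()` really can race with cancellation, then the class is not purely thread-confined, and the annotation and Javadoc should say which methods are safe to call concurrently.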



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java:
##########
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.EntropyInjectingFileSystem;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.PathsCopyingFileSystem;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.fs.s3native.writer.NativeS3AccessHelper;
+import org.apache.flink.fs.s3native.writer.NativeS3RecoverableWriter;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
+import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+import javax.annotation.Nullable;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Native S3 FileSystem implementation using AWS SDK v2.
+ *
+ * <p>This file system uses an {@link AtomicBoolean} guard ({@code closed}) checked at the start of
+ * every public operation via {@link #checkNotClosed()}. The close lifecycle works as follows:
+ *
+ * <ul>
+ *   <li>New operations started after {@link #closeAsync()} will throw {@link
+ *       IllegalStateException}.
+ *   <li>Operations already past the {@code checkNotClosed()} barrier are allowed to complete
+ *       naturally. There is a small race window where an operation may pass the check just before
+ *       {@code closeAsync()} sets the flag. In this case, the operation either completes normally
+ *       (the underlying clients have not been torn down yet) or fails with an {@link IOException}
+ *       from the AWS SDK, which callers are already expected to handle.
+ *   <li>The {@link #closeAsync()} teardown sequence (bulkCopyHelper, transferManager, asyncClient,
+ *       syncClient) provides a natural grace period bounded by {@link #CLOSE_TIMEOUT_SECONDS}.
+ * </ul>
+ *
+ * <p>A thread-pool-routed approach would provide stricter guarantees but introduces latency
+ * overhead on every call and risks deadlocks from nested filesystem calls (e.g., {@code create()}
+ * calling {@code exists()} calling {@code getFileStatus()}).
+ *
+ * <p><b>Permission Considerations:</b> Some operations require specific IAM permissions:
+ *
+ * <ul>
+ *   <li>{@link #getFileStatus}: Returns 403 for non-existent objects if ListBucket permission is
+ *       not granted (to prevent object enumeration)
+ *   <li>{@link #listStatus}: Requires ListBucket permission
+ *   <li>{@link #delete}: With only DeleteObject permission, deleting non-existent objects may
+ *       return errors
+ * </ul>
+ */
+public class NativeS3FileSystem extends FileSystem
+        implements EntropyInjectingFileSystem, PathsCopyingFileSystem, AutoCloseableAsync {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NativeS3FileSystem.class);
+
+    /** Timeout in seconds for closing the filesystem. */
+    private static final long CLOSE_TIMEOUT_SECONDS = 60;
+
+    private final S3ClientProvider clientProvider;
+    private final URI uri;
+    private final String bucketName;
+
+    @Nullable private final String entropyInjectionKey;
+    private final int entropyLength;
+
+    @Nullable private final NativeS3AccessHelper s3AccessHelper;
+    private final long s3uploadPartSize;
+    private final int maxConcurrentUploadsPerStream;
+    private final String localTmpDir;
+
+    @Nullable private final NativeS3BulkCopyHelper bulkCopyHelper;
+    private final boolean useAsyncOperations;
+    private final int readBufferSize;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    public NativeS3FileSystem(
+            S3ClientProvider clientProvider,
+            URI uri,
+            @Nullable String entropyInjectionKey,
+            int entropyLength,
+            String localTmpDir,
+            long s3uploadPartSize,
+            int maxConcurrentUploadsPerStream,
+            @Nullable NativeS3BulkCopyHelper bulkCopyHelper,
+            boolean useAsyncOperations,
+            int readBufferSize) {
+        this.clientProvider = clientProvider;
+        this.uri = uri;
+        this.bucketName = uri.getHost();
+        this.entropyInjectionKey = entropyInjectionKey;
+        this.entropyLength = entropyLength;
+        this.localTmpDir = localTmpDir;
+        this.s3uploadPartSize = s3uploadPartSize;
+        this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
+        this.useAsyncOperations = useAsyncOperations;
+        this.readBufferSize = readBufferSize;
+        this.s3AccessHelper =
+                new NativeS3AccessHelper(
+                        clientProvider.getS3Client(),
+                        clientProvider.getAsyncClient(),
+                        clientProvider.getTransferManager(),
+                        bucketName,
+                        useAsyncOperations,
+                        clientProvider.getEncryptionConfig());
+        this.bulkCopyHelper = bulkCopyHelper;
+
+        if (entropyInjectionKey != null && entropyLength <= 0) {
+            throw new IllegalArgumentException(
+                    "Entropy length must be >= 0 when entropy injection key is set");
+        }
+
+        LOG.info(
+                "Created Native S3 FileSystem for bucket: {}, entropy injection: {}, bulk copy: {}, read buffer: {} KB",
+                bucketName,
+                entropyInjectionKey != null,
+                bulkCopyHelper != null,
+                readBufferSize / 1024);
+    }
+
+    @Override
+    public URI getUri() {
+        return uri;
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+        return new Path(uri);
+    }
+
+    @Override
+    public Path getHomeDirectory() {
+        return new Path(uri);
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+        checkNotClosed();
+        final String key = NativeS3AccessHelper.extractKey(path);
+        final S3Client s3Client = clientProvider.getS3Client();
+
+        LOG.debug("Getting file status for s3://{}/{}", bucketName, key);
+
+        try {
+            final HeadObjectRequest request =
+                    HeadObjectRequest.builder().bucket(bucketName).key(key).build();
+
+            final HeadObjectResponse response = s3Client.headObject(request);
+            final Long contentLength = response.contentLength();
+
+            // In S3, a successful HeadObject with null/zero contentLength means
+            // this is a directory marker (prefix), not an actual file
+            if (contentLength == null || contentLength == 0) {
+                LOG.debug(
+                        "HeadObject returned null/zero content length, verifying if directory: {}",
+                        key);
+                return getDirectoryStatus(s3Client, key, path);
+            }
+
+            final long size = contentLength;
+            final long modificationTime =
+                    (response.lastModified() != null)
+                            ? response.lastModified().toEpochMilli()
+                            : System.currentTimeMillis();
+
+            LOG.trace(
+                    "HeadObject successful for {} - size: {}, lastModified: {}",
+                    key,
+                    size,
+                    response.lastModified());
+
+            return S3FileStatus.withFile(size, modificationTime, path);
+        } catch (NoSuchKeyException e) {
+            LOG.debug("Object not found, checking if directory: {}", key);
+            return getDirectoryStatus(s3Client, key, path);
+        } catch (S3Exception e) {
+            LOG.error(
+                    "S3 error getting file status for s3://{}/{} - StatusCode: {}, ErrorCode: {}, Message: {}",
+                    bucketName,
+                    key,
+                    e.statusCode(),
+                    S3ExceptionUtils.errorCode(e),
+                    S3ExceptionUtils.errorMessage(e));
+            if (e.statusCode() == 403) {
+                // Note: S3 returns 403 (not 404) for non-existent objects when the
+                // caller lacks s3:ListBucket permission, to prevent key enumeration.
+                // This 403 is therefore ambiguous: it may indicate a genuine access
+                // denial OR that the object simply does not exist.
+                // See: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html
+                // See:
+                // https://docs.aws.amazon.com/AmazonS3/latest/userguide/troubleshoot-403-errors.html
+                LOG.error(
+                        "Access denied (403) for s3://{}/{}. This may indicate invalid"
+                                + " credentials/bucket policy, OR the object may not exist and"
+                                + " s3:ListBucket permission is not granted (S3 returns 403"
+                                + " instead of 404 to prevent key enumeration).",
+                        bucketName,
+                        key);
+            } else if (e.statusCode() == 404) {
+                LOG.debug("Object not found (404) for s3://{}/{}", bucketName, key);
+            }
+
+            throw S3ExceptionUtils.toIOException(
+                    String.format("Failed to get file status for s3://%s/%s", bucketName, key), e);
+        }
+    }
+
+    /**
+     * Checks if the given key represents a directory by listing objects with that prefix. Returns a
+     * directory {@link FileStatus} if objects exist under the prefix, otherwise throws {@link
+     * FileNotFoundException}.
+     */
+    private FileStatus getDirectoryStatus(S3Client s3Client, String key, Path path)
+            throws FileNotFoundException {
+        final String prefix = key.endsWith("/") ? key : key + "/";
+        final ListObjectsV2Request listRequest =
+                ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).maxKeys(1).build();
+        final ListObjectsV2Response listResponse = s3Client.listObjectsV2(listRequest);
+
+        if (listResponse.contents().isEmpty() && !listResponse.hasCommonPrefixes()) {
+            throw new FileNotFoundException("File not found: " + path);
+        }
+
+        LOG.debug("Path is a directory: {}", key);
+        return S3FileStatus.withDirectory(path);
+    }
+
+    @Override
+    public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) {
+        return new BlockLocation[] {
+            new S3BlockLocation(new String[] {"localhost"}, 0, file.getLen())
+        };
+    }
+
+    @Override
+    public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+        checkNotClosed();
+        final String key = NativeS3AccessHelper.extractKey(path);
+        final S3Client s3Client = clientProvider.getS3Client();
+        final long fileSize = getFileStatus(path).getLen();
+        return new NativeS3InputStream(s3Client, bucketName, key, fileSize, bufferSize);
+    }
+
+    @Override
+    public FSDataInputStream open(Path path) throws IOException {
+        checkNotClosed();
+        final String key = NativeS3AccessHelper.extractKey(path);
+        final S3Client s3Client = clientProvider.getS3Client();
+        final long fileSize = getFileStatus(path).getLen();
+        return new NativeS3InputStream(s3Client, bucketName, key, fileSize, readBufferSize);
+    }
+
+    /**
+     * Lists the contents of a directory.
+     *
+     * <p><b>Retry Behavior:</b> This method relies on the AWS SDK's built-in retry mechanism. If a
+     * pagination request fails after SDK retries are exhausted:
+     *
+     * <ul>
+     *   <li>The entire operation fails with an IOException
+     *   <li>Partial results from previous pages are NOT returned
+     *   <li>The caller should retry the entire listStatus operation
+     * </ul>
+     *
+     * <p>This behavior is consistent with atomic semantics - either the full listing succeeds or
+     * the operation fails completely.
+     */
+    @Override
+    public FileStatus[] listStatus(Path path) throws IOException {
+        checkNotClosed();
+        String key = NativeS3AccessHelper.extractKey(path);
+        if (!key.isEmpty() && !key.endsWith("/")) {
+            key = key + "/";
+        }
+
+        final S3Client s3Client = clientProvider.getS3Client();
+        final List<FileStatus> results = new ArrayList<>();
+        String continuationToken = null;
+
+        do {
+            ListObjectsV2Request.Builder requestBuilder =
+                    ListObjectsV2Request.builder().bucket(bucketName).prefix(key).delimiter("/");
+
+            if (continuationToken != null) {
+                requestBuilder.continuationToken(continuationToken);
+            }
+
+            final ListObjectsV2Response response = s3Client.listObjectsV2(requestBuilder.build());
+
+            for (S3Object s3Object : response.contents()) {
+                if (!s3Object.key().equals(key)) {
+                    final Path objectPath =
+                            new Path(uri.getScheme(), uri.getHost(), "/" + s3Object.key());
+                    results.add(
+                            S3FileStatus.withFile(
+                                    s3Object.size(),
+                                    s3Object.lastModified().toEpochMilli(),
+                                    objectPath));
+                }
+            }
+
+            for (software.amazon.awssdk.services.s3.model.CommonPrefix prefix :
+                    response.commonPrefixes()) {
+                final Path prefixPath =
+                        new Path(uri.getScheme(), uri.getHost(), "/" + prefix.prefix());
+                results.add(S3FileStatus.withDirectory(prefixPath));
+            }
+
+            continuationToken = response.nextContinuationToken();
+        } while (continuationToken != null);
+
+        return results.toArray(new FileStatus[0]);
+    }
+
+    @Override
+    public boolean delete(Path path, boolean recursive) throws IOException {
+        checkNotClosed();
+        final String key = NativeS3AccessHelper.extractKey(path);
+        final S3Client s3Client = clientProvider.getS3Client();
+
+        try {
+            final FileStatus status = getFileStatus(path);
+
+            if (!status.isDir()) {
+                final DeleteObjectRequest request =
+                        DeleteObjectRequest.builder().bucket(bucketName).key(key).build();
+
+                s3Client.deleteObject(request);
+                return true;
+            } else {
+                if (!recursive) {
+                    throw new IOException("Directory not empty and recursive = false");
+                }
+
+                final FileStatus[] contents = listStatus(path);
+                for (FileStatus file : contents) {
+                    delete(file.getPath(), true);
+                }
+
+                return true;
+            }
+        } catch (FileNotFoundException e) {
+            return false;
+        } catch (S3Exception e) {
+            throw new IOException("Failed to delete: " + path, e);
+        }
+    }
+
+    /**
+     * Creates a directory at the specified path.
+     *
+     * <p><b>S3 Behavior:</b> S3 is a flat object store and doesn't have true directories. Directory
+     * semantics are simulated through key prefixes. This method always returns true because:
+     *
+     * <ul>
+     *   <li>S3 doesn't require directories to exist before creating objects with that prefix
+     *   <li>Creating an empty "directory marker" object (key ending with /) is optional
+     *   <li>Most S3 implementations don't create these markers for consistency with Hadoop FS
+     * </ul>
+     *
+     * <p>If explicit directory markers are needed, consider using a custom implementation.
+     *
+     * @return always returns true (S3 doesn't require explicit directory creation)
+     */
+    @Override
+    public boolean mkdirs(Path path) throws IOException {
+        checkNotClosed();
+        LOG.debug("mkdirs called for {} - S3 doesn't require explicit directory creation", path);
+        return true;
+    }
+
+    @Override
+    public FSDataOutputStream create(Path path, WriteMode overwriteMode) throws IOException {
+        checkNotClosed();
+        if (overwriteMode == WriteMode.NO_OVERWRITE) {
+            try {
+                if (exists(path)) {
+                    throw new IOException("File already exists: " + path);
+                }
+            } catch (FileNotFoundException ignored) {
+            }
+        } else {
+            try {
+                delete(path, false);
+            } catch (FileNotFoundException ignored) {
+            }
+        }
+
+        final String key = NativeS3AccessHelper.extractKey(path);
+        return new NativeS3OutputStream(
+                clientProvider.getS3Client(),
+                bucketName,
+                key,
+                localTmpDir,
+                clientProvider.getEncryptionConfig());
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        checkNotClosed();
+        final String srcKey = NativeS3AccessHelper.extractKey(src);
+        final String dstKey = NativeS3AccessHelper.extractKey(dst);
+        final S3Client s3Client = clientProvider.getS3Client();
+        try {
+            final CopyObjectRequest copyRequest =
+                    CopyObjectRequest.builder()
+                            .sourceBucket(bucketName)
+                            .sourceKey(srcKey)
+                            .destinationBucket(bucketName)
+                            .destinationKey(dstKey)
+                            .build();
+            s3Client.copyObject(copyRequest);
+            final DeleteObjectRequest deleteRequest =
+                    DeleteObjectRequest.builder().bucket(bucketName).key(srcKey).build();
+            s3Client.deleteObject(deleteRequest);
+            return true;
+        } catch (S3Exception e) {
+            throw new IOException("Failed to rename " + src + " to " + dst, e);
+        }
+    }
+
+    @Override
+    public boolean isDistributedFS() {
+        return true;
+    }
+
+    @Nullable
+    @Override
+    public String getEntropyInjectionKey() {
+        return entropyInjectionKey;
+    }
+
+    @Override
+    public String generateEntropy() {
+        return StringUtils.generateRandomAlphanumericString(
+                ThreadLocalRandom.current(), entropyLength);
+    }
+
+    @Override
+    public boolean canCopyPaths(Path source, Path destination) {
+        return bulkCopyHelper != null;
+    }
+
+    @Override
+    public void copyFiles(
+            List<CopyRequest> requests,
+            org.apache.flink.core.fs.ICloseableRegistry closeableRegistry)
+            throws IOException {
+        checkNotClosed();
+        if (bulkCopyHelper == null) {
+            throw new UnsupportedOperationException(
+                    "Bulk copy not enabled. Set s3.bulk-copy.enabled=true");
+        }
+        bulkCopyHelper.copyFiles(requests, closeableRegistry);
+    }
+
+    @Override
+    public RecoverableWriter createRecoverableWriter() throws IOException {
+        checkNotClosed();
+        if (s3AccessHelper == null) {
+            throw new UnsupportedOperationException("Recoverable writer not available");
+        }
+        return NativeS3RecoverableWriter.writer(
+                s3AccessHelper, localTmpDir, s3uploadPartSize, maxConcurrentUploadsPerStream);
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        if (!closed.compareAndSet(false, true)) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        LOG.info("Starting async close of Native S3 FileSystem for bucket: {}", bucketName);
+        return CompletableFuture.runAsync(
+                        () -> LOG.info("Native S3 FileSystem closed for bucket: {}", bucketName))
+                .thenCompose(
+                        ignored -> {
+                            if (clientProvider != null) {
+                                return clientProvider
+                                        .closeAsync()
+                                        .whenComplete(
+                                                (result, error) -> {
+                                                    if (error != null) {
+                                                        LOG.warn(
+                                                                "Error closing S3 client provider",
+                                                                error);
+                                                    } else {
+                                                        LOG.debug("S3 client provider closed");
+                                                    }
+                                                });
+                            }
+                            return CompletableFuture.completedFuture(null);
+                        })
+                .orTimeout(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+                .exceptionally(
+                        ex -> {
+                            LOG.error(
+                                    "FileSystem close timed out after {} seconds for bucket: {}",
+                                    CLOSE_TIMEOUT_SECONDS,
+                                    bucketName,
+                                    ex);
+                            return null;
+                        });

Review Comment:
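   Is this the correct thing to do in case of errors? The `exceptionally`
   handler logs any failure as a timeout and then completes the future
   normally, so callers never see the error. Shouldn't this be handled
   somehow? For example via
   `org.apache.flink.util.concurrent.FutureUtils#assertNoException`?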
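
   To illustrate the concern, here is a minimal sketch using only JDK classes.
   It is not the PR's code: the class name, the method names and the one-second
   timeout are hypothetical. `swallowing` mirrors the `orTimeout(...)` followed
   by `exceptionally(ex -> null)` pattern above, while `propagating` shows one
   possible alternative that logs the failure but leaves the returned future
   failed, so the caller can decide what to do with it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CloseErrorHandlingSketch {

    /** Mirrors the pattern in the diff: every failure becomes a normal completion. */
    static CompletableFuture<Void> swallowing(CompletableFuture<Void> close) {
        return close.orTimeout(1, TimeUnit.SECONDS).exceptionally(ex -> null);
    }

    /** Hypothetical alternative: log the failure, but keep the returned future failed. */
    static CompletableFuture<Void> propagating(CompletableFuture<Void> close) {
        return close.orTimeout(1, TimeUnit.SECONDS)
                .whenComplete(
                        (ignored, ex) -> {
                            if (ex == null) {
                                return;
                            }
                            // Unwrap so that a timeout can be told apart from other errors.
                            Throwable cause =
                                    ex instanceof CompletionException && ex.getCause() != null
                                            ? ex.getCause()
                                            : ex;
                            String kind =
                                    cause instanceof TimeoutException ? "timed out" : "failed";
                            System.err.println("close " + kind + ": " + cause);
                        });
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed =
                CompletableFuture.failedFuture(new IllegalStateException("boom"));

        // The caller cannot tell that closing failed.
        System.out.println(swallowing(failed).isCompletedExceptionally()); // false
        // The failure stays visible to the caller.
        System.out.println(propagating(failed).isCompletedExceptionally()); // true
    }
}
```

   Whether a failed close should fail the caller, or be escalated via
   `FutureUtils#assertNoException`, is a design decision for the PR. The sketch
   only shows that the current chain hides the difference.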



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
