rkhachatryan commented on code in PR #27187: URL: https://github.com/apache/flink/pull/27187#discussion_r2933235135
########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.ICloseableRegistry; +import org.apache.flink.core.fs.PathsCopyingFileSystem; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.model.CompletedCopy; +import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest; +import software.amazon.awssdk.transfer.s3.model.FileDownload; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * Helper class for performing bulk S3 to local file system copies using S3TransferManager. + * + * <p><b>Concurrency Model:</b> Uses batch-based concurrency control with {@code + * maxConcurrentCopies} to limit parallel downloads. 
The current implementation waits for each batch + * to complete before starting the next batch. A future enhancement could use a bounded thread pool + * (e.g., {@link java.util.concurrent.Semaphore} or bounded executor) to allow continuous submission + * of new downloads as slots become available, which would provide better throughput by avoiding the + * "slowest task in batch" bottleneck. + * + * <p><b>Retry Handling:</b> Relies on the S3TransferManager's built-in retry mechanism for + * transient failures. If a download fails after retries: + * + * <ul> + * <li>The entire bulk copy operation fails with an IOException + * <li>Successfully downloaded files are NOT cleaned up (they remain on disk) + * <li>Partial downloads may leave incomplete files that should be cleaned up by the caller + * </ul> + * + * <p><b>Cleanup:</b> No automatic cleanup is performed on failure. Callers are responsible for + * cleaning up destination files if the bulk copy fails. Consider wrapping in a try-finally or using + * a temp directory that can be deleted on failure. + * + * <p><b>TODO:</b> Consider extracting URI parsing logic to a shared S3UriUtils utility class to + * consolidate S3 URI handling across the codebase. + */ +@Internal +public class NativeS3BulkCopyHelper { + + private static final Logger LOG = LoggerFactory.getLogger(NativeS3BulkCopyHelper.class); + + private final S3TransferManager transferManager; + private final int maxConcurrentCopies; + + public NativeS3BulkCopyHelper(S3TransferManager transferManager, int maxConcurrentCopies) { + this.transferManager = transferManager; + this.maxConcurrentCopies = maxConcurrentCopies; + } + + /** + * Copies files from S3 to local filesystem in batches. 
+ * + * @param requests List of copy requests (source S3 path to destination local path) + * @param closeableRegistry Registry for cleanup (currently unused, reserved for future use) + * @throws IOException if any copy operation fails + */ + public void copyFiles( + List<PathsCopyingFileSystem.CopyRequest> requests, ICloseableRegistry closeableRegistry) + throws IOException { + + if (requests.isEmpty()) { + return; + } + + LOG.info("Starting bulk copy of {} files using S3TransferManager", requests.size()); + + List<CompletableFuture<CompletedCopy>> copyFutures = new ArrayList<>(); + + for (int i = 0; i < requests.size(); i++) { + PathsCopyingFileSystem.CopyRequest request = requests.get(i); + String sourceUri = request.getSource().toUri().toString(); + if (sourceUri.startsWith("s3://") || sourceUri.startsWith("s3a://")) { + copyFutures.add(copyS3ToLocal(request)); + } else { + throw new UnsupportedOperationException( + "Only S3 to local copies are currently supported: " + sourceUri); + } + + if (copyFutures.size() >= maxConcurrentCopies || i == requests.size() - 1) { + waitForCopies(copyFutures); + copyFutures.clear(); + } + } + + LOG.info("Completed bulk copy of {} files", requests.size()); + } + + /** + * Initiates an async S3 to local file copy. 
+ * + * @param request The copy request containing source S3 path and destination local path + * @return A CompletableFuture that completes when the download finishes + * @throws IOException if the destination directory cannot be created + */ + private CompletableFuture<CompletedCopy> copyS3ToLocal( + PathsCopyingFileSystem.CopyRequest request) throws IOException { + + String sourceUri = request.getSource().toUri().toString(); + String bucket = extractBucket(sourceUri); + String key = extractKey(sourceUri); + File destFile = new File(request.getDestination().getPath()); + + Files.createDirectories(destFile.getParentFile().toPath()); + + DownloadFileRequest downloadRequest = + DownloadFileRequest.builder() + .getObjectRequest(req -> req.bucket(bucket).key(key)) + .destination(destFile.toPath()) + .build(); + + FileDownload download = transferManager.downloadFile(downloadRequest); + + return download.completionFuture() + .thenApply( + completed -> { + LOG.debug("Successfully copied {} to {}", sourceUri, destFile); + return null; + }); + } + + private void waitForCopies(List<CompletableFuture<CompletedCopy>> futures) throws IOException { + try { + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Bulk copy interrupted", e); + } catch (ExecutionException e) { + throw new IOException("Bulk copy failed", e.getCause()); + } + } + + // TODO: Consider moving these URI parsing methods to a shared S3UriUtils class + + /** + * Extracts the bucket name from an S3 URI. + * + * <p>Supports both s3:// and s3a:// schemes (s3a is normalized to s3). 
+ */ + private String extractBucket(String s3Uri) { + String uri = s3Uri.replaceFirst("s3a://", "s3://"); + int bucketStart = uri.indexOf("://") + 3; + int bucketEnd = uri.indexOf("/", bucketStart); + if (bucketEnd == -1) { + return uri.substring(bucketStart); + } + return uri.substring(bucketStart, bucketEnd); + } + + /** + * Extracts the object key from an S3 URI. + * + * <p>Supports both s3:// and s3a:// schemes (s3a is normalized to s3). + */ + private String extractKey(String s3Uri) { + String uri = s3Uri.replaceFirst("s3a://", "s3://"); + int bucketStart = uri.indexOf("://") + 3; + int keyStart = uri.indexOf("/", bucketStart); + if (keyStart == -1) { + return ""; + } + return uri.substring(keyStart + 1); + } Review Comment: Could you add some unit tests for these methods? ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.ICloseableRegistry; +import org.apache.flink.core.fs.PathsCopyingFileSystem; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.model.CompletedCopy; +import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest; +import software.amazon.awssdk.transfer.s3.model.FileDownload; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * Helper class for performing bulk S3 to local file system copies using S3TransferManager. + * + * <p><b>Concurrency Model:</b> Uses batch-based concurrency control with {@code + * maxConcurrentCopies} to limit parallel downloads. The current implementation waits for each batch + * to complete before starting the next batch. A future enhancement could use a bounded thread pool + * (e.g., {@link java.util.concurrent.Semaphore} or bounded executor) to allow continuous submission + * of new downloads as slots become available, which would provide better throughput by avoiding the + * "slowest task in batch" bottleneck. + * + * <p><b>Retry Handling:</b> Relies on the S3TransferManager's built-in retry mechanism for + * transient failures. If a download fails after retries: + * + * <ul> + * <li>The entire bulk copy operation fails with an IOException + * <li>Successfully downloaded files are NOT cleaned up (they remain on disk) + * <li>Partial downloads may leave incomplete files that should be cleaned up by the caller + * </ul> + * + * <p><b>Cleanup:</b> No automatic cleanup is performed on failure. Callers are responsible for + * cleaning up destination files if the bulk copy fails. 
Consider wrapping in a try-finally or using + * a temp directory that can be deleted on failure. + * + * <p><b>TODO:</b> Consider extracting URI parsing logic to a shared S3UriUtils utility class to + * consolidate S3 URI handling across the codebase. + */ +@Internal +public class NativeS3BulkCopyHelper { + + private static final Logger LOG = LoggerFactory.getLogger(NativeS3BulkCopyHelper.class); + + private final S3TransferManager transferManager; + private final int maxConcurrentCopies; + + public NativeS3BulkCopyHelper(S3TransferManager transferManager, int maxConcurrentCopies) { + this.transferManager = transferManager; + this.maxConcurrentCopies = maxConcurrentCopies; + } + + /** + * Copies files from S3 to local filesystem in batches. + * + * @param requests List of copy requests (source S3 path to destination local path) + * @param closeableRegistry Registry for cleanup (currently unused, reserved for future use) + * @throws IOException if any copy operation fails + */ + public void copyFiles( + List<PathsCopyingFileSystem.CopyRequest> requests, ICloseableRegistry closeableRegistry) + throws IOException { + + if (requests.isEmpty()) { + return; + } + + LOG.info("Starting bulk copy of {} files using S3TransferManager", requests.size()); + + List<CompletableFuture<CompletedCopy>> copyFutures = new ArrayList<>(); + + for (int i = 0; i < requests.size(); i++) { + PathsCopyingFileSystem.CopyRequest request = requests.get(i); + String sourceUri = request.getSource().toUri().toString(); + if (sourceUri.startsWith("s3://") || sourceUri.startsWith("s3a://")) { + copyFutures.add(copyS3ToLocal(request)); + } else { + throw new UnsupportedOperationException( + "Only S3 to local copies are currently supported: " + sourceUri); + } Review Comment: Shouldn't we wait for the already started futures to complete? So that for example the calling code can delete the destination **_after_** the download.
(maybe we can try to cancel if that's possible) ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java: ########## @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.fs.ICloseableRegistry; +import org.apache.flink.core.fs.PathsCopyingFileSystem; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.model.CompletedCopy; +import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest; +import software.amazon.awssdk.transfer.s3.model.FileDownload; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +/** + * Helper class for performing bulk S3 to local file system copies using S3TransferManager. 
+ * + * <p><b>Concurrency Model:</b> Uses batch-based concurrency control with {@code + * maxConcurrentCopies} to limit parallel downloads. The current implementation waits for each batch + * to complete before starting the next batch. A future enhancement could use a bounded thread pool + * (e.g., {@link java.util.concurrent.Semaphore} or bounded executor) to allow continuous submission + * of new downloads as slots become available, which would provide better throughput by avoiding the + * "slowest task in batch" bottleneck. + * + * <p><b>Retry Handling:</b> Relies on the S3TransferManager's built-in retry mechanism for + * transient failures. If a download fails after retries: + * + * <ul> + * <li>The entire bulk copy operation fails with an IOException + * <li>Successfully downloaded files are NOT cleaned up (they remain on disk) + * <li>Partial downloads may leave incomplete files that should be cleaned up by the caller + * </ul> + * + * <p><b>Cleanup:</b> No automatic cleanup is performed on failure. Callers are responsible for + * cleaning up destination files if the bulk copy fails. Consider wrapping in a try-finally or using + * a temp directory that can be deleted on failure. + * + * <p><b>TODO:</b> Consider extracting URI parsing logic to a shared S3UriUtils utility class to + * consolidate S3 URI handling across the codebase. + */ +@Internal +public class NativeS3BulkCopyHelper { Review Comment: Does this class need to be `public`? ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java: ########## @@ -0,0 +1,582 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.core.fs.BlockLocation; +import org.apache.flink.core.fs.EntropyInjectingFileSystem; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.PathsCopyingFileSystem; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.s3native.writer.NativeS3AccessHelper; +import org.apache.flink.fs.s3native.writer.NativeS3RecoverableWriter; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import 
software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.S3Object; + +import javax.annotation.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Native S3 FileSystem implementation using AWS SDK v2. + * + * <p>This file system uses an {@link AtomicBoolean} guard ({@code closed}) checked at the start of + * every public operation via {@link #checkNotClosed()}. The close lifecycle works as follows: + * + * <ul> + * <li>New operations started after {@link #closeAsync()} will throw {@link + * IllegalStateException}. + * <li>Operations already past the {@code checkNotClosed()} barrier are allowed to complete + * naturally. There is a small race window where an operation may pass the check just before + * {@code closeAsync()} sets the flag. In this case, the operation either completes normally + * (the underlying clients have not been torn down yet) or fails with an {@link IOException} + * from the AWS SDK, which callers are already expected to handle. + * <li>The {@link #closeAsync()} teardown sequence (bulkCopyHelper, transferManager, asyncClient, + * syncClient) provides a natural grace period bounded by {@link #CLOSE_TIMEOUT_SECONDS}. + * </ul> + * + * <p>A thread-pool-routed approach would provide stricter guarantees but introduces latency + * overhead on every call and risks deadlocks from nested filesystem calls (e.g., {@code create()} + * calling {@code exists()} calling {@code getFileStatus()}). 
+ * + * <p><b>Permission Considerations:</b> Some operations require specific IAM permissions: + * + * <ul> + * <li>{@link #getFileStatus}: Returns 403 for non-existent objects if ListBucket permission is + * not granted (to prevent object enumeration) + * <li>{@link #listStatus}: Requires ListBucket permission + * <li>{@link #delete}: With only DeleteObject permission, deleting non-existent objects may + * return errors + * </ul> + */ +public class NativeS3FileSystem extends FileSystem + implements EntropyInjectingFileSystem, PathsCopyingFileSystem, AutoCloseableAsync { + + private static final Logger LOG = LoggerFactory.getLogger(NativeS3FileSystem.class); + + /** Timeout in seconds for closing the filesystem. */ + private static final long CLOSE_TIMEOUT_SECONDS = 60; + + private final S3ClientProvider clientProvider; + private final URI uri; + private final String bucketName; + + @Nullable private final String entropyInjectionKey; + private final int entropyLength; + + @Nullable private final NativeS3AccessHelper s3AccessHelper; + private final long s3uploadPartSize; + private final int maxConcurrentUploadsPerStream; + private final String localTmpDir; + + @Nullable private final NativeS3BulkCopyHelper bulkCopyHelper; + private final boolean useAsyncOperations; + private final int readBufferSize; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public NativeS3FileSystem( + S3ClientProvider clientProvider, + URI uri, + @Nullable String entropyInjectionKey, + int entropyLength, + String localTmpDir, + long s3uploadPartSize, + int maxConcurrentUploadsPerStream, + @Nullable NativeS3BulkCopyHelper bulkCopyHelper, + boolean useAsyncOperations, + int readBufferSize) { + this.clientProvider = clientProvider; + this.uri = uri; + this.bucketName = uri.getHost(); + this.entropyInjectionKey = entropyInjectionKey; + this.entropyLength = entropyLength; + this.localTmpDir = localTmpDir; + this.s3uploadPartSize = s3uploadPartSize; + 
this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream; + this.useAsyncOperations = useAsyncOperations; + this.readBufferSize = readBufferSize; + this.s3AccessHelper = + new NativeS3AccessHelper( + clientProvider.getS3Client(), + clientProvider.getAsyncClient(), + clientProvider.getTransferManager(), + bucketName, + useAsyncOperations, + clientProvider.getEncryptionConfig()); + this.bulkCopyHelper = bulkCopyHelper; + + if (entropyInjectionKey != null && entropyLength <= 0) { + throw new IllegalArgumentException( + "Entropy length must be >= 0 when entropy injection key is set"); + } + + LOG.info( + "Created Native S3 FileSystem for bucket: {}, entropy injection: {}, bulk copy: {}, read buffer: {} KB", + bucketName, + entropyInjectionKey != null, + bulkCopyHelper != null, + readBufferSize / 1024); + } + + @Override + public URI getUri() { + return uri; + } + + @Override + public Path getWorkingDirectory() { + return new Path(uri); + } + + @Override + public Path getHomeDirectory() { + return new Path(uri); + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + checkNotClosed(); + final String key = NativeS3AccessHelper.extractKey(path); + final S3Client s3Client = clientProvider.getS3Client(); + + LOG.debug("Getting file status for s3://{}/{}", bucketName, key); + + try { + final HeadObjectRequest request = + HeadObjectRequest.builder().bucket(bucketName).key(key).build(); + + final HeadObjectResponse response = s3Client.headObject(request); + final Long contentLength = response.contentLength(); + + // In S3, a successful HeadObject with null/zero contentLength means + // this is a directory marker (prefix), not an actual file + if (contentLength == null || contentLength == 0) { + LOG.debug( + "HeadObject returned null/zero content length, verifying if directory: {}", + key); + return getDirectoryStatus(s3Client, key, path); + } + + final long size = contentLength; + final long modificationTime = + (response.lastModified() 
!= null) + ? response.lastModified().toEpochMilli() + : System.currentTimeMillis(); + + LOG.trace( + "HeadObject successful for {} - size: {}, lastModified: {}", + key, + size, + response.lastModified()); + + return S3FileStatus.withFile(size, modificationTime, path); + } catch (NoSuchKeyException e) { + LOG.debug("Object not found, checking if directory: {}", key); + return getDirectoryStatus(s3Client, key, path); + } catch (S3Exception e) { + LOG.error( + "S3 error getting file status for s3://{}/{} - StatusCode: {}, ErrorCode: {}, Message: {}", + bucketName, + key, + e.statusCode(), + S3ExceptionUtils.errorCode(e), + S3ExceptionUtils.errorMessage(e)); + if (e.statusCode() == 403) { + // Note: S3 returns 403 (not 404) for non-existent objects when the + // caller lacks s3:ListBucket permission, to prevent key enumeration. + // This 403 is therefore ambiguous: it may indicate a genuine access + // denial OR that the object simply does not exist. + // See: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html + // See: + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/troubleshoot-403-errors.html + LOG.error( + "Access denied (403) for s3://{}/{}. This may indicate invalid" + + " credentials/bucket policy, OR the object may not exist and" + + " s3:ListBucket permission is not granted (S3 returns 403" + + " instead of 404 to prevent key enumeration).", + bucketName, + key); + } else if (e.statusCode() == 404) { + LOG.debug("Object not found (404) for s3://{}/{}", bucketName, key); + } + + throw S3ExceptionUtils.toIOException( + String.format("Failed to get file status for s3://%s/%s", bucketName, key), e); + } + } + + /** + * Checks if the given key represents a directory by listing objects with that prefix. Returns a + * directory {@link FileStatus} if objects exist under the prefix, otherwise throws {@link + * FileNotFoundException}. 
+ */ + private FileStatus getDirectoryStatus(S3Client s3Client, String key, Path path) + throws FileNotFoundException { + final String prefix = key.endsWith("/") ? key : key + "/"; + final ListObjectsV2Request listRequest = + ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).maxKeys(1).build(); + final ListObjectsV2Response listResponse = s3Client.listObjectsV2(listRequest); + + if (listResponse.contents().isEmpty() && !listResponse.hasCommonPrefixes()) { + throw new FileNotFoundException("File not found: " + path); + } + + LOG.debug("Path is a directory: {}", key); + return S3FileStatus.withDirectory(path); + } + + @Override + public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) { + return new BlockLocation[] { + new S3BlockLocation(new String[] {"localhost"}, 0, file.getLen()) + }; + } + + @Override + public FSDataInputStream open(Path path, int bufferSize) throws IOException { + checkNotClosed(); + final String key = NativeS3AccessHelper.extractKey(path); + final S3Client s3Client = clientProvider.getS3Client(); + final long fileSize = getFileStatus(path).getLen(); + return new NativeS3InputStream(s3Client, bucketName, key, fileSize, bufferSize); + } + + @Override + public FSDataInputStream open(Path path) throws IOException { + checkNotClosed(); + final String key = NativeS3AccessHelper.extractKey(path); + final S3Client s3Client = clientProvider.getS3Client(); + final long fileSize = getFileStatus(path).getLen(); + return new NativeS3InputStream(s3Client, bucketName, key, fileSize, readBufferSize); + } + + /** + * Lists the contents of a directory. + * + * <p><b>Retry Behavior:</b> This method relies on the AWS SDK's built-in retry mechanism. 
If a + * pagination request fails after SDK retries are exhausted: + * + * <ul> + * <li>The entire operation fails with an IOException + * <li>Partial results from previous pages are NOT returned + * <li>The caller should retry the entire listStatus operation + * </ul> + * + * <p>This behavior is consistent with atomic semantics - either the full listing succeeds or + * the operation fails completely. + */ + @Override + public FileStatus[] listStatus(Path path) throws IOException { + checkNotClosed(); + String key = NativeS3AccessHelper.extractKey(path); + if (!key.isEmpty() && !key.endsWith("/")) { + key = key + "/"; + } + + final S3Client s3Client = clientProvider.getS3Client(); + final List<FileStatus> results = new ArrayList<>(); + String continuationToken = null; + + do { + ListObjectsV2Request.Builder requestBuilder = + ListObjectsV2Request.builder().bucket(bucketName).prefix(key).delimiter("/"); + + if (continuationToken != null) { + requestBuilder.continuationToken(continuationToken); + } + + final ListObjectsV2Response response = s3Client.listObjectsV2(requestBuilder.build()); + + for (S3Object s3Object : response.contents()) { + if (!s3Object.key().equals(key)) { + final Path objectPath = + new Path(uri.getScheme(), uri.getHost(), "/" + s3Object.key()); + results.add( + S3FileStatus.withFile( + s3Object.size(), + s3Object.lastModified().toEpochMilli(), + objectPath)); + } + } + + for (software.amazon.awssdk.services.s3.model.CommonPrefix prefix : + response.commonPrefixes()) { + final Path prefixPath = + new Path(uri.getScheme(), uri.getHost(), "/" + prefix.prefix()); + results.add(S3FileStatus.withDirectory(prefixPath)); + } + + continuationToken = response.nextContinuationToken(); + } while (continuationToken != null); + + return results.toArray(new FileStatus[0]); + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + checkNotClosed(); + final String key = NativeS3AccessHelper.extractKey(path); + final S3Client 
s3Client = clientProvider.getS3Client(); + + try { + final FileStatus status = getFileStatus(path); + + if (!status.isDir()) { + final DeleteObjectRequest request = + DeleteObjectRequest.builder().bucket(bucketName).key(key).build(); + + s3Client.deleteObject(request); + return true; + } else { + if (!recursive) { + throw new IOException("Directory not empty and recursive = false"); + } + + final FileStatus[] contents = listStatus(path); + for (FileStatus file : contents) { + delete(file.getPath(), true); + } + + return true; + } + } catch (FileNotFoundException e) { + return false; + } catch (S3Exception e) { + throw new IOException("Failed to delete: " + path, e); + } + } + + /** + * Creates a directory at the specified path. + * + * <p><b>S3 Behavior:</b> S3 is a flat object store and doesn't have true directories. Directory + * semantics are simulated through key prefixes. This method always returns true because: + * + * <ul> + * <li>S3 doesn't require directories to exist before creating objects with that prefix + * <li>Creating an empty "directory marker" object (key ending with /) is optional + * <li>Most S3 implementations don't create these markers for consistency with Hadoop FS + * </ul> + * + * <p>If explicit directory markers are needed, consider using a custom implementation. 
+ * + * @return always returns true (S3 doesn't require explicit directory creation) + */ + @Override + public boolean mkdirs(Path path) throws IOException { + checkNotClosed(); + LOG.debug("mkdirs called for {} - S3 doesn't require explicit directory creation", path); + return true; + } + + @Override + public FSDataOutputStream create(Path path, WriteMode overwriteMode) throws IOException { + checkNotClosed(); + if (overwriteMode == WriteMode.NO_OVERWRITE) { + try { + if (exists(path)) { + throw new IOException("File already exists: " + path); + } + } catch (FileNotFoundException ignored) { + } + } else { + try { + delete(path, false); + } catch (FileNotFoundException ignored) { + } + } + + final String key = NativeS3AccessHelper.extractKey(path); + return new NativeS3OutputStream( + clientProvider.getS3Client(), + bucketName, + key, + localTmpDir, + clientProvider.getEncryptionConfig()); + } + + /** + * Renames a single file from {@code src} to {@code dst}. + * + * <p><b>Directory rename is not supported.</b> + * + * @throws UnsupportedOperationException if {@code src} is a directory + */ + @Override + public boolean rename(Path src, Path dst) throws IOException { + checkNotClosed(); + final String srcKey = NativeS3AccessHelper.extractKey(src); + final String dstKey = NativeS3AccessHelper.extractKey(dst); + final S3Client s3Client = clientProvider.getS3Client(); + + final FileStatus srcStatus = getFileStatus(src); + if (srcStatus.isDir()) { + throw new UnsupportedOperationException( + "NativeS3FileSystem does not support renaming directories: " + src); + } + + try { + final CopyObjectRequest copyRequest = + CopyObjectRequest.builder() + .sourceBucket(bucketName) + .sourceKey(srcKey) + .destinationBucket(bucketName) + .destinationKey(dstKey) + .build(); + s3Client.copyObject(copyRequest); + final DeleteObjectRequest deleteRequest = + DeleteObjectRequest.builder().bucket(bucketName).key(srcKey).build(); + s3Client.deleteObject(deleteRequest); + return true; + } 
catch (S3Exception e) { + throw new IOException("Failed to rename " + src + " to " + dst, e); + } + } + + @Override + public boolean isDistributedFS() { + return true; + } + + @Nullable + @Override + public String getEntropyInjectionKey() { + return entropyInjectionKey; + } + + @Override + public String generateEntropy() { + return StringUtils.generateRandomAlphanumericString( + ThreadLocalRandom.current(), entropyLength); + } + + @Override + public boolean canCopyPaths(Path source, Path destination) { + return bulkCopyHelper != null; + } + + @Override + public void copyFiles( + List<CopyRequest> requests, + org.apache.flink.core.fs.ICloseableRegistry closeableRegistry) + throws IOException { + checkNotClosed(); + if (bulkCopyHelper == null) { + throw new UnsupportedOperationException( + "Bulk copy not enabled. Set s3.bulk-copy.enabled=true"); + } + bulkCopyHelper.copyFiles(requests, closeableRegistry); + } + + @Override + public RecoverableWriter createRecoverableWriter() throws IOException { + checkNotClosed(); + if (s3AccessHelper == null) { + throw new UnsupportedOperationException("Recoverable writer not available"); + } + return NativeS3RecoverableWriter.writer( + s3AccessHelper, localTmpDir, s3uploadPartSize, maxConcurrentUploadsPerStream); Review Comment: What happens if the current thread passes `checkNotClosed`, another calls `close()`, and then current thread continues? ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3EncryptionConfig.java: ########## @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; + +import software.amazon.awssdk.services.s3.model.ServerSideEncryption; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Configuration for S3 server-side encryption (SSE). + * + * <p>Supported encryption types: + * + * <ul> + * <li><b>SSE-S3</b>: Server-side encryption with Amazon S3-managed keys + * <li><b>SSE-KMS</b>: Server-side encryption with AWS KMS-managed keys + * </ul> + * + * <p><b>Encryption Context (SSE-KMS only):</b> For SSE-KMS, you can optionally provide an + * encryption context - a set of key-value pairs that provide additional authenticated data (AAD). + * This allows for more granular permission control in IAM policies. See <a + * href="https://docs.aws.amazon.com/kms/latest/developerguide/encrypt_context.html">AWS KMS + * Encryption Context</a> for details. + */ +@Internal +public class S3EncryptionConfig implements Serializable { + + private static final long serialVersionUID = 2L; + + /** Encryption types supported by this configuration. */ + public enum EncryptionType { + /** No encryption. */ + NONE, + /** Server-side encryption with Amazon S3-managed keys (SSE-S3). */ + SSE_S3, + /** Server-side encryption with AWS KMS-managed keys (SSE-KMS). */ + SSE_KMS + } + + private final EncryptionType encryptionType; + @Nullable private final String kmsKeyId; + + /** + * Optional encryption context for SSE-KMS. 
Provides additional authenticated data for KMS + * encrypt/decrypt operations and can be used for fine-grained IAM policy conditions. + */ + private final Map<String, String> encryptionContext; + + private S3EncryptionConfig(EncryptionType encryptionType, @Nullable String kmsKeyId) { + this(encryptionType, kmsKeyId, Collections.emptyMap()); + } + + private S3EncryptionConfig( + EncryptionType encryptionType, + @Nullable String kmsKeyId, + Map<String, String> encryptionContext) { + this.encryptionType = encryptionType; + this.kmsKeyId = kmsKeyId; + this.encryptionContext = + encryptionContext != null + ? Collections.unmodifiableMap(new HashMap<>(encryptionContext)) + : Collections.emptyMap(); + } + + /** Creates a config with no encryption. */ + public static S3EncryptionConfig none() { + return new S3EncryptionConfig(EncryptionType.NONE, null); + } + + /** Creates a config for SSE-S3 encryption (S3-managed keys). */ + public static S3EncryptionConfig sseS3() { + return new S3EncryptionConfig(EncryptionType.SSE_S3, null); + } + + /** + * Creates a config for SSE-KMS encryption with the default KMS key. + * + * <p>Uses the AWS-managed KMS key (aws/s3) for the S3 bucket. + */ + public static S3EncryptionConfig sseKms() { + return new S3EncryptionConfig(EncryptionType.SSE_KMS, null); + } + + /** + * Creates a config for SSE-KMS encryption with a specific KMS key. + * + * @param kmsKeyId The KMS key ID, ARN, or alias (e.g., "arn:aws:kms:region:account:key/key-id" + * or "alias/my-key") + */ + public static S3EncryptionConfig sseKms(String kmsKeyId) { + return new S3EncryptionConfig(EncryptionType.SSE_KMS, kmsKeyId); + } + + /** + * Creates a config for SSE-KMS encryption with a specific KMS key and encryption context. 
+ * + * <p>The encryption context is a set of key-value pairs that: + * + * <ul> + * <li>Provides additional authenticated data (AAD) for encryption + * <li>Can be used in IAM policy conditions for fine-grained access control + * <li>Is logged in AWS CloudTrail for auditing + * </ul> + * + * <p>Example: You might include context like {"department": "finance", "project": "budget"} to + * restrict which principals can encrypt/decrypt based on these values. + * + * @param kmsKeyId The KMS key ID, ARN, or alias + * @param encryptionContext The encryption context key-value pairs + * @see <a href="https://docs.aws.amazon.com/kms/latest/developerguide/encrypt_context.html">AWS + * KMS Encryption Context</a> + */ + public static S3EncryptionConfig sseKms( + String kmsKeyId, Map<String, String> encryptionContext) { + return new S3EncryptionConfig(EncryptionType.SSE_KMS, kmsKeyId, encryptionContext); + } + + /** + * Creates an encryption config from configuration strings. + * + * @param encryptionTypeStr The encryption type: "none", "sse-s3", "sse-kms", or "SSE_S3", + * "SSE_KMS" + * @param kmsKeyId The KMS key ID (required for SSE-KMS, ignored for other types) + * @return The encryption configuration + * @throws IllegalArgumentException if the encryption type is invalid + */ + public static S3EncryptionConfig fromConfig( + @Nullable String encryptionTypeStr, @Nullable String kmsKeyId) { + if (encryptionTypeStr == null + || encryptionTypeStr.isEmpty() + || "none".equalsIgnoreCase(encryptionTypeStr)) { + return none(); + } + + String normalizedType = encryptionTypeStr.toUpperCase().replace("-", "_"); Review Comment: This doesn't handle `:` separator mentioned in config option. ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java: ########## @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.fs.s3native.token.DynamicTemporaryAWSCredentialsProvider; +import org.apache.flink.util.AutoCloseableAsync; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.S3Client; +import 
software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.utils.SdkAutoCloseable; + +import javax.annotation.Nullable; + +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +/** + * Provider for S3 clients (sync and async). Handles credential management, delegation tokens, and + * connection configuration. + */ +@Internal +public class S3ClientProvider implements AutoCloseableAsync { Review Comment: (and make it non-public :) ) ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/writer/NativeS3RecoverableFsDataOutputStream.java: ########## @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native.writer; + +import org.apache.flink.core.fs.RecoverableFsDataOutputStream; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.s3native.writer.NativeS3Recoverable.PartETag; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.locks.ReentrantLock; + +/** + * A recoverable output stream writes data to S3 using multipart uploads. + * + * <p>This class is NOT thread-safe. All write operations ({@link #write}, {@link #flush}, {@link + * #persist}, {@link #closeForCommit}) must be called from a single thread (the Flink operator + * thread). This is consistent with Flink's {@link RecoverableFsDataOutputStream} contract where + * output streams are confined to a single task thread. + * + * <p>The {@link #close()} method may be called concurrently (e.g., during task cancellation). A + * {@link ReentrantLock} guards the critical sections in {@link #close()}, {@link + * #closeForCommit()}, and {@link #persist()} to ensure safe cleanup of local resources without + * corrupting S3 state. 
+ */ +@NotThreadSafe +public class NativeS3RecoverableFsDataOutputStream extends RecoverableFsDataOutputStream { + + private static final Logger LOG = + LoggerFactory.getLogger(NativeS3RecoverableFsDataOutputStream.class); + + private static final int BUFFER_SIZE = 64 * 1024; + private final ReentrantLock lock = new ReentrantLock(); + + private final NativeS3AccessHelper s3AccessHelper; + private final String key; + private final String uploadId; + private final String localTmpDir; + private final long minPartSize; + + private final List<PartETag> completedParts; + private long numBytesInParts; + + private File currentTempFile; + private FileOutputStream currentFileStream; + private BufferedOutputStream currentOutputStream; + private long currentPartSize; + private int nextPartNumber; + + private volatile boolean closed; + + public NativeS3RecoverableFsDataOutputStream( + NativeS3AccessHelper s3AccessHelper, + String key, + String uploadId, + String localTmpDir, + long minPartSize) + throws IOException { + this(s3AccessHelper, key, uploadId, localTmpDir, minPartSize, new ArrayList<>(), 0L); + } + + public NativeS3RecoverableFsDataOutputStream( + NativeS3AccessHelper s3AccessHelper, + String key, + String uploadId, + String localTmpDir, + long minPartSize, + List<PartETag> existingParts, + long numBytesInParts) + throws IOException { + this.s3AccessHelper = s3AccessHelper; + this.key = key; + this.uploadId = uploadId; + this.localTmpDir = localTmpDir; + this.minPartSize = minPartSize; + this.completedParts = new ArrayList<>(existingParts); + this.numBytesInParts = numBytesInParts; + this.nextPartNumber = existingParts.size() + 1; + this.currentPartSize = 0; + this.closed = false; + + createNewTempFile(); + } + + private void createNewTempFile() throws IOException { + File tmpDir = new File(localTmpDir); + if (!tmpDir.exists()) { + tmpDir.mkdirs(); + } + + currentTempFile = new File(tmpDir, "s3-part-" + UUID.randomUUID()); + currentFileStream = new 
FileOutputStream(currentTempFile); + currentOutputStream = new BufferedOutputStream(currentFileStream, BUFFER_SIZE); + currentPartSize = 0; + } + + @Override + public long getPos() throws IOException { + return numBytesInParts + currentPartSize; + } + + @Override + public void write(int b) throws IOException { + if (closed) { + throw new IOException("Stream is closed"); + } + + currentOutputStream.write(b); + currentPartSize++; + + if (currentPartSize >= minPartSize) { + uploadCurrentPart(); + createNewTempFile(); + } + } Review Comment: Although `closed` is volatile, `close()` can be called after `closed` is checked. This seems to be a common problem in this and other classes. Or am I missing something? ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.core.fs.FSDataOutputStream; + +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +import javax.annotation.Nullable; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + +/** + * S3 output stream for non-recoverable S3 writes. + * + * <p><b>Thread Safety:</b> The lock guards write, flush, sync, and close so that {@link #close()} + * can be safely invoked from another thread (e.g. during task cancellation) per {@link + * org.apache.flink.core.fs.FSDataOutputStream} contract. + */ +public class NativeS3OutputStream extends FSDataOutputStream { + + private static final int BUFFER_SIZE = 64 * 1024; + + private final S3Client s3Client; + private final String bucketName; + private final String key; + private final File tmpFile; + private final OutputStream bufferedStream; + private final S3EncryptionConfig encryptionConfig; + + private final ReentrantLock lock = new ReentrantLock(); + + private long position; + + /** Flag to ensure upload happens exactly once. */ + private final AtomicBoolean fileUploaded = new AtomicBoolean(false); + + public NativeS3OutputStream( + S3Client s3Client, String bucketName, String key, String localTmpDir) + throws IOException { + this(s3Client, bucketName, key, localTmpDir, null); + } + + public NativeS3OutputStream( + S3Client s3Client, + String bucketName, + String key, + String localTmpDir, + @Nullable S3EncryptionConfig encryptionConfig) + throws IOException { + this.s3Client = s3Client; + this.bucketName = bucketName; + this.key = key; + this.encryptionConfig = + encryptionConfig != null ? 
encryptionConfig : S3EncryptionConfig.none(); + + File tmpDir = new File(localTmpDir); + if (!tmpDir.exists()) { + tmpDir.mkdirs(); + } + + this.tmpFile = new File(tmpDir, "s3-upload-" + UUID.randomUUID()); + this.bufferedStream = new BufferedOutputStream(new FileOutputStream(tmpFile), BUFFER_SIZE); + this.position = 0; + } + + @Override + public long getPos() throws IOException { + lock.lock(); + try { + return position; + } finally { + lock.unlock(); + } + } + + @Override + public void write(int b) throws IOException { + lock.lock(); + try { + if (fileUploaded.get()) { + throw new IOException("Stream is closed"); + } + bufferedStream.write(b); + position++; + } finally { + lock.unlock(); + } + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + lock.lock(); + try { + if (fileUploaded.get()) { + throw new IOException("Stream is closed"); + } + bufferedStream.write(b, off, len); + position += len; + } finally { + lock.unlock(); + } + } + + @Override + public void flush() throws IOException { + lock.lock(); + try { + if (fileUploaded.get()) { + throw new IOException("Stream is closed"); + } + bufferedStream.flush(); + } finally { + lock.unlock(); + } + } + + /** + * Flushes all buffered data and uploads the file to S3. + * + * @throws IOException if upload fails + */ + @Override + public void sync() throws IOException { + lock.lock(); + try { + if (fileUploaded.compareAndSet(false, true)) { + uploadToS3(); + } + } finally { + lock.unlock(); + } + } + + /** + * Closes this output stream and uploads the file to S3. 
+ * + * @throws IOException if upload fails + */ + @Override + public void close() throws IOException { + lock.lock(); + try { + if (fileUploaded.compareAndSet(false, true)) { + uploadToS3(); + } + } finally { + lock.unlock(); + } + } + + /** Uploads the temp file to S3 and cleans up local resources. */ + private void uploadToS3() throws IOException { + try { + bufferedStream.close(); Review Comment: Don't we need to flush the stream before close? ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java: ########## @@ -0,0 +1,582 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.core.fs.BlockLocation; +import org.apache.flink.core.fs.EntropyInjectingFileSystem; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.PathsCopyingFileSystem; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.s3native.writer.NativeS3AccessHelper; +import org.apache.flink.fs.s3native.writer.NativeS3RecoverableWriter; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.S3Object; + +import javax.annotation.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Native S3 FileSystem implementation using AWS SDK v2. 
+ * + * <p>This file system uses an {@link AtomicBoolean} guard ({@code closed}) checked at the start of + * every public operation via {@link #checkNotClosed()}. The close lifecycle works as follows: + * + * <ul> + * <li>New operations started after {@link #closeAsync()} will throw {@link + * IllegalStateException}. + * <li>Operations already past the {@code checkNotClosed()} barrier are allowed to complete + * naturally. There is a small race window where an operation may pass the check just before + * {@code closeAsync()} sets the flag. In this case, the operation either completes normally + * (the underlying clients have not been torn down yet) or fails with an {@link IOException} + * from the AWS SDK, which callers are already expected to handle. + * <li>The {@link #closeAsync()} teardown sequence (bulkCopyHelper, transferManager, asyncClient, + * syncClient) provides a natural grace period bounded by {@link #CLOSE_TIMEOUT_SECONDS}. + * </ul> + * + * <p>A thread-pool-routed approach would provide stricter guarantees but introduces latency + * overhead on every call and risks deadlocks from nested filesystem calls (e.g., {@code create()} + * calling {@code exists()} calling {@code getFileStatus()}). + * + * <p><b>Permission Considerations:</b> Some operations require specific IAM permissions: + * + * <ul> + * <li>{@link #getFileStatus}: Returns 403 for non-existent objects if ListBucket permission is + * not granted (to prevent object enumeration) + * <li>{@link #listStatus}: Requires ListBucket permission + * <li>{@link #delete}: With only DeleteObject permission, deleting non-existent objects may + * return errors + * </ul> + */ +public class NativeS3FileSystem extends FileSystem Review Comment: Does this class need to be public? 
########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java: ########## @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +public class NativeS3FileSystemFactory implements FileSystemFactory { + + private static final Logger LOG = LoggerFactory.getLogger(NativeS3FileSystemFactory.class); + + private static final String INVALID_ENTROPY_KEY_CHARS = "^.*[~#@*+%{}<>\\[\\]|\"\\\\].*$"; + + public static final long S3_MULTIPART_MIN_PART_SIZE = 5L << 20; + + public static final ConfigOption<String> ACCESS_KEY = + ConfigOptions.key("s3.access-key") + .stringType() 
+ .noDefaultValue() + .withFallbackKeys("s3.access.key") + .withDescription("AWS access key"); + + public static final ConfigOption<String> SECRET_KEY = + ConfigOptions.key("s3.secret-key") + .stringType() + .noDefaultValue() + .withFallbackKeys("s3.secret.key") + .withDescription("AWS secret key"); + + public static final ConfigOption<String> REGION = + ConfigOptions.key("s3.region") + .stringType() + .noDefaultValue() + .withDescription( + "AWS region. If not specified, the region will be automatically detected using AWS SDK's " + + "DefaultAwsRegionProviderChain, which checks (in order): AWS_REGION env var, " + + "~/.aws/config, EC2 instance metadata, and bucket location API."); + + public static final ConfigOption<String> ENDPOINT = + ConfigOptions.key("s3.endpoint") + .stringType() + .noDefaultValue() + .withDescription("Custom S3 endpoint"); + + public static final ConfigOption<Boolean> PATH_STYLE_ACCESS = + ConfigOptions.key("s3.path-style-access") + .booleanType() + .defaultValue(false) + .withFallbackKeys("s3.path.style.access") + .withDescription("Use path-style access for S3 (for S3-compatible storage)"); + + public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE = + ConfigOptions.key("s3.upload.min.part.size") + .longType() + .defaultValue(S3_MULTIPART_MIN_PART_SIZE) + .withDescription( + "Minimum size of data buffered locally before sending to S3 (5MB to 5GB)"); + + public static final ConfigOption<Integer> MAX_CONCURRENT_UPLOADS = + ConfigOptions.key("s3.upload.max.concurrent.uploads") + .intType() + .defaultValue(Runtime.getRuntime().availableProcessors()) + .withDescription("Maximum number of concurrent part uploads per stream"); + + public static final ConfigOption<String> ENTROPY_INJECT_KEY_OPTION = + ConfigOptions.key("s3.entropy.key") + .stringType() + .noDefaultValue() + .withDescription( + "Key to be replaced by random entropy for sharding optimization"); + + public static final ConfigOption<Integer> ENTROPY_INJECT_LENGTH_OPTION = + 
ConfigOptions.key("s3.entropy.length") + .intType() + .defaultValue(4) + .withDescription("Number of random characters for entropy injection"); + + public static final ConfigOption<Boolean> BULK_COPY_ENABLED = + ConfigOptions.key("s3.bulk-copy.enabled") + .booleanType() + .defaultValue(true) + .withDescription("Enable bulk copy operations using S3TransferManager"); + + public static final ConfigOption<Integer> BULK_COPY_MAX_CONCURRENT = + ConfigOptions.key("s3.bulk-copy.max-concurrent") + .intType() + .defaultValue(16) + .withDescription("Maximum number of concurrent copy operations"); + + public static final ConfigOption<Boolean> USE_ASYNC_OPERATIONS = + ConfigOptions.key("s3.async.enabled") + .booleanType() + .defaultValue(true) + .withDescription( + "Enable async read/write operations using S3TransferManager for improved performance"); + + public static final ConfigOption<Integer> READ_BUFFER_SIZE = + ConfigOptions.key("s3.read.buffer.size") + .intType() + .defaultValue(256 * 1024) // 256KB default + .withDescription( + "Read buffer size in bytes for S3 input streams. " + + "Larger buffers improve throughput but consume more memory. " + + "Range: 64KB - 4MB. Default: 256KB"); + + // Server-Side Encryption (SSE) Configuration + public static final ConfigOption<String> SSE_TYPE = + ConfigOptions.key("s3.sse.type") + .stringType() + .defaultValue("none") + .withDescription( + "Server-side encryption type. Valid values: " + + "'none' (no encryption), " + + "'sse-s3' or 'AES256' (S3-managed keys), " + + "'sse-kms' or 'aws:kms' (KMS-managed keys)"); + + public static final ConfigOption<String> SSE_KMS_KEY_ID = + ConfigOptions.key("s3.sse.kms.key-id") + .stringType() + .noDefaultValue() + .withDescription( + "KMS key ID, ARN, or alias for SSE-KMS encryption. " + + "If not specified with SSE-KMS, the default AWS-managed key (aws/s3) is used. 
" + + "Example: 'arn:aws:kms:us-east-1:123456789:key/12345678-1234-1234-1234-123456789abc' " + + "or 'alias/my-s3-key'"); + + // IAM Assume Role Configuration + public static final ConfigOption<String> ASSUME_ROLE_ARN = + ConfigOptions.key("s3.assume-role.arn") + .stringType() + .noDefaultValue() + .withDescription( + "ARN of the IAM role to assume for S3 access. " + + "Enables cross-account access or temporary elevated permissions. " + + "Example: 'arn:aws:iam::123456789012:role/S3AccessRole'"); + + public static final ConfigOption<String> ASSUME_ROLE_EXTERNAL_ID = + ConfigOptions.key("s3.assume-role.external-id") + .stringType() + .noDefaultValue() + .withDescription( + "External ID for assume role (required for cross-account access with external ID condition)"); + + public static final ConfigOption<String> ASSUME_ROLE_SESSION_NAME = + ConfigOptions.key("s3.assume-role.session-name") + .stringType() + .defaultValue("flink-s3-session") + .withDescription("Session name for the assumed role session"); + + public static final ConfigOption<Integer> ASSUME_ROLE_SESSION_DURATION_SECONDS = + ConfigOptions.key("s3.assume-role.session-duration") + .intType() + .defaultValue(3600) // 1 hour default + .withDescription( + "Duration in seconds for the assumed role session (900-43200 seconds, default: 3600)"); + + public static final ConfigOption<Integer> MAX_RETRIES = + ConfigOptions.key("s3.retry.max-num-retries") + .intType() + .defaultValue(3) + .withDescription( + "Maximum number of retry attempts for failed S3 requests. " + + "Uses the AWS SDK's default retry strategy (exponential backoff with jitter). " + + "Set to 0 to disable retries."); + + public static final ConfigOption<String> AWS_CREDENTIALS_PROVIDER = + ConfigOptions.key("fs.s3.aws.credentials.provider") + .stringType() + .noDefaultValue() + .withDescription( + "Comma-separated list of AWS credentials provider class names. " + + "Providers are tried in order; the first one that returns credentials is used. 
" + + "Supports fully-qualified AWS SDK v2 class names " + + "(e.g. 'software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider') " + + "or simple names from the SDK auth package " + + "(e.g. 'AnonymousCredentialsProvider', 'DefaultCredentialsProvider'). " + + "When not set, the default chain is used: delegation tokens -> " + + "static credentials (if configured) -> DefaultCredentialsProvider."); + + private Configuration flinkConfig; + + @Override + public String getScheme() { + return "s3"; + } + + // setting to least priority so that it is not used by default + @Override + public int getPriority() { + return -1; + } + + @Override + public void configure(Configuration config) { + this.flinkConfig = config; + } + + @Override + public FileSystem create(URI fsUri) throws IOException { + Configuration config = this.flinkConfig; + if (config == null) { + config = new Configuration(); + } + + String accessKey = config.get(ACCESS_KEY); + String secretKey = config.get(SECRET_KEY); + String region = config.get(REGION); + String endpoint = config.get(ENDPOINT); + boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS); + + if (endpoint != null && !pathStyleAccess) { + pathStyleAccess = true; + } + + S3EncryptionConfig encryptionConfig = + S3EncryptionConfig.fromConfig(config.get(SSE_TYPE), config.get(SSE_KMS_KEY_ID)); + + S3ClientProvider clientProvider = + S3ClientProvider.builder() + .accessKey(accessKey) + .secretKey(secretKey) + .region(region) + .endpoint(endpoint) + .pathStyleAccess(pathStyleAccess) + .assumeRoleArn(config.get(ASSUME_ROLE_ARN)) + .assumeRoleExternalId(config.get(ASSUME_ROLE_EXTERNAL_ID)) + .assumeRoleSessionName(config.get(ASSUME_ROLE_SESSION_NAME)) + .assumeRoleSessionDurationSeconds( + config.get(ASSUME_ROLE_SESSION_DURATION_SECONDS)) + .maxRetries(config.get(MAX_RETRIES)) + .credentialsProviderClasses(config.get(AWS_CREDENTIALS_PROVIDER)) + .encryptionConfig(encryptionConfig) + .build(); + + String entropyInjectionKey = 
config.get(ENTROPY_INJECT_KEY_OPTION); + int numEntropyChars = -1; + if (entropyInjectionKey != null) { + if (entropyInjectionKey.matches(INVALID_ENTROPY_KEY_CHARS)) { + throw new IllegalConfigurationException( + "Invalid character in entropy injection key: " + entropyInjectionKey); + } + numEntropyChars = config.get(ENTROPY_INJECT_LENGTH_OPTION); + if (numEntropyChars <= 0) { + throw new IllegalConfigurationException( + ENTROPY_INJECT_LENGTH_OPTION.key() + " must be > 0"); + } + } Review Comment: `S3ClientProvider` built in the beginning is leaked in case of exceptions thrown later. ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3InputStream.java: ########## @@ -0,0 +1,362 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.core.fs.FSDataInputStream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.util.concurrent.locks.ReentrantLock; + +/** + * S3 input stream with configurable read-ahead buffer, range-based requests for seek operations, + * automatic stream reopening on errors, and lazy initialization to minimize memory footprint. + * + * <p><b>Thread Safety:</b> Internal state is guarded by a lock to ensure safe concurrent access and + * resource cleanup. + */ +public class NativeS3InputStream extends FSDataInputStream { Review Comment: Does this class need to be public? ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java: ########## @@ -0,0 +1,582 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.core.fs.BlockLocation; +import org.apache.flink.core.fs.EntropyInjectingFileSystem; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.PathsCopyingFileSystem; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.s3native.writer.NativeS3AccessHelper; +import org.apache.flink.fs.s3native.writer.NativeS3RecoverableWriter; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.S3Object; + +import javax.annotation.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Native S3 FileSystem implementation using AWS SDK v2. 
+ * + * <p>This file system uses an {@link AtomicBoolean} guard ({@code closed}) checked at the start of + * every public operation via {@link #checkNotClosed()}. The close lifecycle works as follows: + * + * <ul> + * <li>New operations started after {@link #closeAsync()} will throw {@link + * IllegalStateException}. + * <li>Operations already past the {@code checkNotClosed()} barrier are allowed to complete + * naturally. There is a small race window where an operation may pass the check just before + * {@code closeAsync()} sets the flag. In this case, the operation either completes normally + * (the underlying clients have not been torn down yet) or fails with an {@link IOException} + * from the AWS SDK, which callers are already expected to handle. + * <li>The {@link #closeAsync()} teardown sequence (bulkCopyHelper, transferManager, asyncClient, + * syncClient) provides a natural grace period bounded by {@link #CLOSE_TIMEOUT_SECONDS}. + * </ul> + * + * <p>A thread-pool-routed approach would provide stricter guarantees but introduces latency + * overhead on every call and risks deadlocks from nested filesystem calls (e.g., {@code create()} + * calling {@code exists()} calling {@code getFileStatus()}). + * + * <p><b>Permission Considerations:</b> Some operations require specific IAM permissions: + * + * <ul> + * <li>{@link #getFileStatus}: Returns 403 for non-existent objects if ListBucket permission is + * not granted (to prevent object enumeration) + * <li>{@link #listStatus}: Requires ListBucket permission + * <li>{@link #delete}: With only DeleteObject permission, deleting non-existent objects may + * return errors + * </ul> + */ +public class NativeS3FileSystem extends FileSystem + implements EntropyInjectingFileSystem, PathsCopyingFileSystem, AutoCloseableAsync { + + private static final Logger LOG = LoggerFactory.getLogger(NativeS3FileSystem.class); + + /** Timeout in seconds for closing the filesystem. 
*/ + private static final long CLOSE_TIMEOUT_SECONDS = 60; + + private final S3ClientProvider clientProvider; + private final URI uri; + private final String bucketName; + + @Nullable private final String entropyInjectionKey; + private final int entropyLength; + + @Nullable private final NativeS3AccessHelper s3AccessHelper; + private final long s3uploadPartSize; + private final int maxConcurrentUploadsPerStream; + private final String localTmpDir; + + @Nullable private final NativeS3BulkCopyHelper bulkCopyHelper; + private final boolean useAsyncOperations; + private final int readBufferSize; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public NativeS3FileSystem( + S3ClientProvider clientProvider, + URI uri, + @Nullable String entropyInjectionKey, + int entropyLength, + String localTmpDir, + long s3uploadPartSize, + int maxConcurrentUploadsPerStream, + @Nullable NativeS3BulkCopyHelper bulkCopyHelper, + boolean useAsyncOperations, + int readBufferSize) { + this.clientProvider = clientProvider; + this.uri = uri; + this.bucketName = uri.getHost(); + this.entropyInjectionKey = entropyInjectionKey; + this.entropyLength = entropyLength; + this.localTmpDir = localTmpDir; + this.s3uploadPartSize = s3uploadPartSize; + this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream; + this.useAsyncOperations = useAsyncOperations; + this.readBufferSize = readBufferSize; + this.s3AccessHelper = + new NativeS3AccessHelper( + clientProvider.getS3Client(), + clientProvider.getAsyncClient(), + clientProvider.getTransferManager(), + bucketName, + useAsyncOperations, + clientProvider.getEncryptionConfig()); + this.bulkCopyHelper = bulkCopyHelper; + + if (entropyInjectionKey != null && entropyLength <= 0) { + throw new IllegalArgumentException( + "Entropy length must be >= 0 when entropy injection key is set"); + } + + LOG.info( + "Created Native S3 FileSystem for bucket: {}, entropy injection: {}, bulk copy: {}, read buffer: {} KB", + bucketName, + 
entropyInjectionKey != null, + bulkCopyHelper != null, + readBufferSize / 1024); + } + + @Override + public URI getUri() { + return uri; + } + + @Override + public Path getWorkingDirectory() { + return new Path(uri); + } + + @Override + public Path getHomeDirectory() { + return new Path(uri); + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + checkNotClosed(); + final String key = NativeS3AccessHelper.extractKey(path); + final S3Client s3Client = clientProvider.getS3Client(); + + LOG.debug("Getting file status for s3://{}/{}", bucketName, key); + + try { + final HeadObjectRequest request = + HeadObjectRequest.builder().bucket(bucketName).key(key).build(); + + final HeadObjectResponse response = s3Client.headObject(request); + final Long contentLength = response.contentLength(); + + // In S3, a successful HeadObject with null/zero contentLength means + // this is a directory marker (prefix), not an actual file + if (contentLength == null || contentLength == 0) { + LOG.debug( + "HeadObject returned null/zero content length, verifying if directory: {}", + key); + return getDirectoryStatus(s3Client, key, path); + } + + final long size = contentLength; + final long modificationTime = + (response.lastModified() != null) + ? 
response.lastModified().toEpochMilli() + : System.currentTimeMillis(); + + LOG.trace( + "HeadObject successful for {} - size: {}, lastModified: {}", + key, + size, + response.lastModified()); + + return S3FileStatus.withFile(size, modificationTime, path); + } catch (NoSuchKeyException e) { + LOG.debug("Object not found, checking if directory: {}", key); + return getDirectoryStatus(s3Client, key, path); + } catch (S3Exception e) { + LOG.error( + "S3 error getting file status for s3://{}/{} - StatusCode: {}, ErrorCode: {}, Message: {}", + bucketName, + key, + e.statusCode(), + S3ExceptionUtils.errorCode(e), + S3ExceptionUtils.errorMessage(e)); + if (e.statusCode() == 403) { + // Note: S3 returns 403 (not 404) for non-existent objects when the + // caller lacks s3:ListBucket permission, to prevent key enumeration. + // This 403 is therefore ambiguous: it may indicate a genuine access + // denial OR that the object simply does not exist. + // See: https://docs.aws.amazon.com/AmazonS3/latest/API/API_HeadObject.html + // See: + // https://docs.aws.amazon.com/AmazonS3/latest/userguide/troubleshoot-403-errors.html + LOG.error( + "Access denied (403) for s3://{}/{}. This may indicate invalid" + + " credentials/bucket policy, OR the object may not exist and" + + " s3:ListBucket permission is not granted (S3 returns 403" + + " instead of 404 to prevent key enumeration).", + bucketName, + key); + } else if (e.statusCode() == 404) { + LOG.debug("Object not found (404) for s3://{}/{}", bucketName, key); + } + + throw S3ExceptionUtils.toIOException( + String.format("Failed to get file status for s3://%s/%s", bucketName, key), e); + } + } + + /** + * Checks if the given key represents a directory by listing objects with that prefix. Returns a + * directory {@link FileStatus} if objects exist under the prefix, otherwise throws {@link + * FileNotFoundException}. 
+ */ + private FileStatus getDirectoryStatus(S3Client s3Client, String key, Path path) + throws FileNotFoundException { + final String prefix = key.endsWith("/") ? key : key + "/"; + final ListObjectsV2Request listRequest = + ListObjectsV2Request.builder().bucket(bucketName).prefix(prefix).maxKeys(1).build(); + final ListObjectsV2Response listResponse = s3Client.listObjectsV2(listRequest); + + if (listResponse.contents().isEmpty() && !listResponse.hasCommonPrefixes()) { + throw new FileNotFoundException("File not found: " + path); + } + + LOG.debug("Path is a directory: {}", key); + return S3FileStatus.withDirectory(path); + } + + @Override + public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) { + return new BlockLocation[] { + new S3BlockLocation(new String[] {"localhost"}, 0, file.getLen()) + }; + } + + @Override + public FSDataInputStream open(Path path, int bufferSize) throws IOException { + checkNotClosed(); + final String key = NativeS3AccessHelper.extractKey(path); + final S3Client s3Client = clientProvider.getS3Client(); + final long fileSize = getFileStatus(path).getLen(); + return new NativeS3InputStream(s3Client, bucketName, key, fileSize, bufferSize); + } + + @Override + public FSDataInputStream open(Path path) throws IOException { + checkNotClosed(); + final String key = NativeS3AccessHelper.extractKey(path); + final S3Client s3Client = clientProvider.getS3Client(); + final long fileSize = getFileStatus(path).getLen(); + return new NativeS3InputStream(s3Client, bucketName, key, fileSize, readBufferSize); + } + + /** + * Lists the contents of a directory. + * + * <p><b>Retry Behavior:</b> This method relies on the AWS SDK's built-in retry mechanism. 
If a + * pagination request fails after SDK retries are exhausted: + * + * <ul> + * <li>The entire operation fails with an IOException + * <li>Partial results from previous pages are NOT returned + * <li>The caller should retry the entire listStatus operation + * </ul> + * + * <p>This behavior is consistent with atomic semantics - either the full listing succeeds or + * the operation fails completely. + */ + @Override + public FileStatus[] listStatus(Path path) throws IOException { + checkNotClosed(); + String key = NativeS3AccessHelper.extractKey(path); + if (!key.isEmpty() && !key.endsWith("/")) { + key = key + "/"; + } + + final S3Client s3Client = clientProvider.getS3Client(); + final List<FileStatus> results = new ArrayList<>(); + String continuationToken = null; + + do { + ListObjectsV2Request.Builder requestBuilder = + ListObjectsV2Request.builder().bucket(bucketName).prefix(key).delimiter("/"); + + if (continuationToken != null) { + requestBuilder.continuationToken(continuationToken); + } + + final ListObjectsV2Response response = s3Client.listObjectsV2(requestBuilder.build()); + + for (S3Object s3Object : response.contents()) { + if (!s3Object.key().equals(key)) { + final Path objectPath = + new Path(uri.getScheme(), uri.getHost(), "/" + s3Object.key()); + results.add( + S3FileStatus.withFile( + s3Object.size(), + s3Object.lastModified().toEpochMilli(), + objectPath)); + } + } + + for (software.amazon.awssdk.services.s3.model.CommonPrefix prefix : + response.commonPrefixes()) { + final Path prefixPath = + new Path(uri.getScheme(), uri.getHost(), "/" + prefix.prefix()); + results.add(S3FileStatus.withDirectory(prefixPath)); + } + + continuationToken = response.nextContinuationToken(); + } while (continuationToken != null); + + return results.toArray(new FileStatus[0]); + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + checkNotClosed(); + final String key = NativeS3AccessHelper.extractKey(path); + final S3Client 
s3Client = clientProvider.getS3Client(); + + try { + final FileStatus status = getFileStatus(path); + + if (!status.isDir()) { + final DeleteObjectRequest request = + DeleteObjectRequest.builder().bucket(bucketName).key(key).build(); + + s3Client.deleteObject(request); + return true; + } else { + if (!recursive) { + throw new IOException("Directory not empty and recursive = false"); + } + + final FileStatus[] contents = listStatus(path); + for (FileStatus file : contents) { + delete(file.getPath(), true); + } + + return true; + } + } catch (FileNotFoundException e) { + return false; + } catch (S3Exception e) { + throw new IOException("Failed to delete: " + path, e); + } + } + + /** + * Creates a directory at the specified path. + * + * <p><b>S3 Behavior:</b> S3 is a flat object store and doesn't have true directories. Directory + * semantics are simulated through key prefixes. This method always returns true because: + * + * <ul> + * <li>S3 doesn't require directories to exist before creating objects with that prefix + * <li>Creating an empty "directory marker" object (key ending with /) is optional + * <li>Most S3 implementations don't create these markers for consistency with Hadoop FS + * </ul> + * + * <p>If explicit directory markers are needed, consider using a custom implementation. 
+ * + * @return always returns true (S3 doesn't require explicit directory creation) + */ + @Override + public boolean mkdirs(Path path) throws IOException { + checkNotClosed(); + LOG.debug("mkdirs called for {} - S3 doesn't require explicit directory creation", path); + return true; + } + + @Override + public FSDataOutputStream create(Path path, WriteMode overwriteMode) throws IOException { + checkNotClosed(); + if (overwriteMode == WriteMode.NO_OVERWRITE) { + try { + if (exists(path)) { + throw new IOException("File already exists: " + path); + } + } catch (FileNotFoundException ignored) { + } + } else { + try { + delete(path, false); + } catch (FileNotFoundException ignored) { + } + } + + final String key = NativeS3AccessHelper.extractKey(path); + return new NativeS3OutputStream( + clientProvider.getS3Client(), + bucketName, + key, + localTmpDir, + clientProvider.getEncryptionConfig()); + } + + /** + * Renames a single file from {@code src} to {@code dst}. + * + * <p><b>Directory rename is not supported.</b> + * + * @throws UnsupportedOperationException if {@code src} is a directory + */ + @Override + public boolean rename(Path src, Path dst) throws IOException { + checkNotClosed(); + final String srcKey = NativeS3AccessHelper.extractKey(src); + final String dstKey = NativeS3AccessHelper.extractKey(dst); + final S3Client s3Client = clientProvider.getS3Client(); + + final FileStatus srcStatus = getFileStatus(src); + if (srcStatus.isDir()) { + throw new UnsupportedOperationException( + "NativeS3FileSystem does not support renaming directories: " + src); + } + + try { + final CopyObjectRequest copyRequest = + CopyObjectRequest.builder() + .sourceBucket(bucketName) + .sourceKey(srcKey) + .destinationBucket(bucketName) + .destinationKey(dstKey) + .build(); + s3Client.copyObject(copyRequest); + final DeleteObjectRequest deleteRequest = + DeleteObjectRequest.builder().bucket(bucketName).key(srcKey).build(); + s3Client.deleteObject(deleteRequest); + return true; + } 
catch (S3Exception e) { + throw new IOException("Failed to rename " + src + " to " + dst, e); + } + } + + @Override + public boolean isDistributedFS() { + return true; + } + + @Nullable + @Override + public String getEntropyInjectionKey() { + return entropyInjectionKey; + } + + @Override + public String generateEntropy() { + return StringUtils.generateRandomAlphanumericString( + ThreadLocalRandom.current(), entropyLength); + } + + @Override + public boolean canCopyPaths(Path source, Path destination) { + return bulkCopyHelper != null; + } + + @Override + public void copyFiles( + List<CopyRequest> requests, + org.apache.flink.core.fs.ICloseableRegistry closeableRegistry) + throws IOException { + checkNotClosed(); + if (bulkCopyHelper == null) { + throw new UnsupportedOperationException( + "Bulk copy not enabled. Set s3.bulk-copy.enabled=true"); + } + bulkCopyHelper.copyFiles(requests, closeableRegistry); + } + + @Override + public RecoverableWriter createRecoverableWriter() throws IOException { + checkNotClosed(); + if (s3AccessHelper == null) { + throw new UnsupportedOperationException("Recoverable writer not available"); + } + return NativeS3RecoverableWriter.writer( + s3AccessHelper, localTmpDir, s3uploadPartSize, maxConcurrentUploadsPerStream); + } + + @Override + public CompletableFuture<Void> closeAsync() { Review Comment: Why is this async? (especially given that it has a timeout anyway) I'm concerned that it can leak resources or break termination order. ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java: ########## @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.fs.s3native.token.DynamicTemporaryAWSCredentialsProvider; +import org.apache.flink.util.AutoCloseableAsync; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.S3Configuration; +import 
software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; +import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.utils.SdkAutoCloseable; + +import javax.annotation.Nullable; + +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +/** + * Provider for S3 clients (sync and async). Handles credential management, delegation tokens, and + * connection configuration. + */ +@Internal +public class S3ClientProvider implements AutoCloseableAsync { + + private static final Logger LOG = LoggerFactory.getLogger(S3ClientProvider.class); + + /** Timeout in seconds for closing S3 clients. */ + private static final long CLIENT_CLOSE_TIMEOUT_SECONDS = 30; + + private final S3Client s3Client; + private final S3AsyncClient s3AsyncClient; Review Comment: It looks like the async client is not used at all? ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java: ########## @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.fs.s3native.token.DynamicTemporaryAWSCredentialsProvider; +import org.apache.flink.util.AutoCloseableAsync; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.retry.RetryPolicy; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3AsyncClientBuilder; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; +import 
software.amazon.awssdk.services.sts.model.AssumeRoleRequest; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.utils.SdkAutoCloseable; + +import javax.annotation.Nullable; + +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.net.URI; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; + +/** + * Provider for S3 clients (sync and async). Handles credential management, delegation tokens, and + * connection configuration. + */ +@Internal +public class S3ClientProvider implements AutoCloseableAsync { Review Comment: This class is `@ThreadSafe`, right? Could you clarify that in code? ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java: ########## @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.core.fs.FSDataOutputStream; + +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +import javax.annotation.Nullable; + +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.file.Files; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + +/** + * S3 output stream for non-recoverable S3 writes. + * + * <p><b>Thread Safety:</b> The lock guards write, flush, sync, and close so that {@link #close()} + * can be safely invoked from another thread (e.g. during task cancellation) per {@link + * org.apache.flink.core.fs.FSDataOutputStream} contract. + */ +public class NativeS3OutputStream extends FSDataOutputStream { + + private static final int BUFFER_SIZE = 64 * 1024; + + private final S3Client s3Client; + private final String bucketName; + private final String key; + private final File tmpFile; + private final OutputStream bufferedStream; + private final S3EncryptionConfig encryptionConfig; + + private final ReentrantLock lock = new ReentrantLock(); + + private long position; + + /** Flag to ensure upload happens exactly once. */ + private final AtomicBoolean fileUploaded = new AtomicBoolean(false); Review Comment: This field is always accessed under the lock, so it can be `boolean`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
