Izeren commented on code in PR #27187: URL: https://github.com/apache/flink/pull/27187#discussion_r2783316708
########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java: ########## @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.core.fs.BlockLocation; +import org.apache.flink.core.fs.EntropyInjectingFileSystem; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.PathsCopyingFileSystem; +import org.apache.flink.core.fs.RecoverableWriter; +import org.apache.flink.fs.s3native.writer.NativeS3AccessHelper; +import org.apache.flink.fs.s3native.writer.NativeS3RecoverableWriter; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.StringUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; +import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.services.s3.model.S3Object; + +import javax.annotation.Nullable; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** Native S3 FileSystem implementation using AWS SDK v2. */ +public class NativeS3FileSystem extends FileSystem + implements EntropyInjectingFileSystem, PathsCopyingFileSystem, AutoCloseableAsync { + + private static final Logger LOG = LoggerFactory.getLogger(NativeS3FileSystem.class); + + private final S3ClientProvider clientProvider; + private final URI uri; + private final String bucketName; + + @Nullable private final String entropyInjectionKey; + private final int entropyLength; + + @Nullable private final NativeS3AccessHelper s3AccessHelper; + private final long s3uploadPartSize; + private final int maxConcurrentUploadsPerStream; + private final String localTmpDir; + + @Nullable private final NativeS3BulkCopyHelper bulkCopyHelper; + private final boolean useAsyncOperations; + private final int readBufferSize; + private final AtomicBoolean closed = new AtomicBoolean(false); + + public NativeS3FileSystem( + S3ClientProvider clientProvider, + URI uri, + @Nullable String entropyInjectionKey, + int entropyLength, + String localTmpDir, + long s3uploadPartSize, + int maxConcurrentUploadsPerStream, + @Nullable NativeS3BulkCopyHelper bulkCopyHelper, + boolean useAsyncOperations, + int readBufferSize) { + this.clientProvider = clientProvider; + this.uri = uri; + this.bucketName = uri.getHost(); + this.entropyInjectionKey = entropyInjectionKey; + this.entropyLength = entropyLength; + this.localTmpDir = localTmpDir; + this.s3uploadPartSize = s3uploadPartSize; + this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream; + this.useAsyncOperations = useAsyncOperations; + this.readBufferSize = readBufferSize; + this.s3AccessHelper = + new NativeS3AccessHelper( + clientProvider.getS3Client(), + clientProvider.getAsyncClient(), + clientProvider.getTransferManager(), + bucketName, + useAsyncOperations, + clientProvider.getEncryptionConfig()); + this.bulkCopyHelper = bulkCopyHelper; + + if (entropyInjectionKey != null && entropyLength <= 0) { + throw new IllegalArgumentException( + "Entropy length must be >= 0 when entropy injection key is set"); + } + + LOG.info( + "Created Native S3 FileSystem for bucket: {}, entropy injection: {}, bulk copy: {}, read buffer: {} KB", + bucketName, + entropyInjectionKey != null, + bulkCopyHelper != null, + readBufferSize / 1024); + } + + @Override + public URI getUri() { + return uri; + } + + @Override + public Path getWorkingDirectory() { + return new Path(uri); + } + + @Override + public Path getHomeDirectory() { + return new Path(uri); + } + + @Override + public FileStatus getFileStatus(Path path) throws IOException { + checkNotClosed(); + String key = NativeS3AccessHelper.extractKey(path); + S3Client s3Client = clientProvider.getS3Client(); + + LOG.debug("Getting file status for s3://{}/{}", bucketName, key); + + try { + HeadObjectRequest request = + HeadObjectRequest.builder().bucket(bucketName).key(key).build(); + + HeadObjectResponse response = s3Client.headObject(request); + Long contentLength = response.contentLength(); + + // In S3, a successful HeadObject with null contentLength means + // this is a directory marker (prefix), not an actual file + if (contentLength == null || contentLength == 0) { + LOG.debug( + "HeadObject returned null/zero content length, verifying if directory: {}", + key); + ListObjectsV2Request listRequest = + ListObjectsV2Request.builder() + .bucket(bucketName) + .prefix(key.endsWith("/") ? key : key + "/") + .maxKeys(1) + .build(); + ListObjectsV2Response listResponse = s3Client.listObjectsV2(listRequest); + if (listResponse.contents().isEmpty() && !listResponse.hasCommonPrefixes()) { + throw new FileNotFoundException("File not found: " + path); + } + return new S3FileStatus(0, 0, 0, 0, true, path); + } + + long size = contentLength; + long modificationTime = + (response.lastModified() != null) + ? response.lastModified().toEpochMilli() + : System.currentTimeMillis(); + + LOG.trace( + "HeadObject successful for {} - size: {}, lastModified: {}", + key, + size, + response.lastModified()); + + return new S3FileStatus(size, size, modificationTime, 0, false, path); + } catch (NoSuchKeyException e) { + LOG.debug("Object not found, checking if directory: {}", key); + ListObjectsV2Request listRequest = + ListObjectsV2Request.builder() + .bucket(bucketName) + .prefix(key.endsWith("/") ? key : key + "/") + .maxKeys(1) + .build(); + + ListObjectsV2Response listResponse = s3Client.listObjectsV2(listRequest); + + if (listResponse.contents().isEmpty() && !listResponse.hasCommonPrefixes()) { + throw new FileNotFoundException("File not found: " + path); + } + + LOG.debug("Path is a directory: {}", key); + return new S3FileStatus(0, 0, 0, 0, true, path); + } catch (S3Exception e) { + String errorCode = + (e.awsErrorDetails() != null) ? e.awsErrorDetails().errorCode() : "Unknown"; + String errorMsg = + (e.awsErrorDetails() != null) + ? e.awsErrorDetails().errorMessage() + : e.getMessage(); + + LOG.error( + "S3 error getting file status for s3://{}/{} - StatusCode: {}, ErrorCode: {}, Message: {}", + bucketName, + key, + e.statusCode(), + errorCode, + errorMsg); + if (e.statusCode() == 403) { + LOG.error( + "Access denied (403). Check credentials, bucket policy, and bucket existence for s3://{}/{}", + bucketName, + key); + } else if (e.statusCode() == 404) { + LOG.debug("Object not found (404) for s3://{}/{}", bucketName, key); + } + + throw new IOException( + String.format( + "Failed to get file status for s3://%s/%s: %s", + bucketName, key, errorMsg), + e); + } + } + + @Override + public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) { + return new BlockLocation[] { + new S3BlockLocation(new String[] {"localhost"}, 0, file.getLen()) + }; + } + + @Override + public FSDataInputStream open(Path path, int bufferSize) throws IOException { + return open(path); + } + + @Override + public FSDataInputStream open(Path path) throws IOException { + checkNotClosed(); + String key = NativeS3AccessHelper.extractKey(path); + S3Client s3Client = clientProvider.getS3Client(); + long fileSize = getFileStatus(path).getLen(); + return new NativeS3InputStream(s3Client, bucketName, key, fileSize, readBufferSize); + } + + @Override + public FileStatus[] listStatus(Path path) throws IOException { + checkNotClosed(); + String key = NativeS3AccessHelper.extractKey(path); + if (!key.isEmpty() && !key.endsWith("/")) { + key = key + "/"; + } + + S3Client s3Client = clientProvider.getS3Client(); + List<FileStatus> results = new ArrayList<>(); + String continuationToken = null; + + do { + ListObjectsV2Request.Builder requestBuilder = + ListObjectsV2Request.builder().bucket(bucketName).prefix(key).delimiter("/"); + + if (continuationToken != null) { + requestBuilder.continuationToken(continuationToken); + } + + ListObjectsV2Response response = s3Client.listObjectsV2(requestBuilder.build()); + + for (S3Object s3Object : response.contents()) { + if (!s3Object.key().equals(key)) { + Path objectPath = + new Path(uri.getScheme(), uri.getHost(), "/" + s3Object.key()); + results.add( + new S3FileStatus( + s3Object.size(), + s3Object.size(), + s3Object.lastModified().toEpochMilli(), + 0, + false, + objectPath)); + } + } + + for (software.amazon.awssdk.services.s3.model.CommonPrefix prefix : + response.commonPrefixes()) { + Path prefixPath = new Path(uri.getScheme(), uri.getHost(), "/" + prefix.prefix()); + results.add(new S3FileStatus(0, 0, 0, 0, true, prefixPath)); + } + + continuationToken = response.nextContinuationToken(); + } while (continuationToken != null); + + return results.toArray(new FileStatus[0]); + } + + @Override + public boolean delete(Path path, boolean recursive) throws IOException { + checkNotClosed(); + String key = NativeS3AccessHelper.extractKey(path); + S3Client s3Client = clientProvider.getS3Client(); + + try { + FileStatus status = getFileStatus(path); + + if (!status.isDir()) { + DeleteObjectRequest request = + DeleteObjectRequest.builder().bucket(bucketName).key(key).build(); + + s3Client.deleteObject(request); + return true; + } else { + if (!recursive) { + throw new IOException("Directory not empty and recursive = false"); + } + + FileStatus[] contents = listStatus(path); + for (FileStatus file : contents) { + delete(file.getPath(), true); + } + + return true; + } + } catch (FileNotFoundException e) { + return false; + } catch (S3Exception e) { + throw new IOException("Failed to delete: " + path, e); + } + } + + @Override + public boolean mkdirs(Path path) throws IOException { + return true; Review Comment: Resolved -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
