This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit d22fc3f8c4f41e7cb7becc601dd07f3ee0a042ae Author: Hussain Towaileb <[email protected]> AuthorDate: Tue Sep 30 18:29:52 2025 +0300 [ASTERIXDB-3653][EXT]: Properly handle errors when deleting for COPY TO Ext-ref: MB-68654 Change-Id: Ib670a014a5e4378b0a63be740ead72d699a84bb1 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20428 Tested-by: Hussain Towaileb <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: Michael Blow <[email protected]> --- .../apache/asterix/cloud/clients/ICloudClient.java | 9 ++++ .../asterix/cloud/clients/UnstableCloudClient.java | 5 +++ .../cloud/clients/aws/s3/S3CloudClient.java | 17 ++++++++ .../blobstorage/AzBlobStorageCloudClient.java | 45 +++++++++++++------ .../cloud/clients/google/gcs/GCSCloudClient.java | 51 ++++++++++++++-------- .../AbstractCloudExternalFileWriterFactory.java | 3 +- 6 files changed, 97 insertions(+), 33 deletions(-) diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java index 0baebc7071..a8cb59b1dd 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java @@ -120,6 +120,15 @@ public interface ICloudClient { */ void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException; + /** + * Deletes an object at the specified bucket and path + * + * @param bucket bucket + * @param path path of object + * @throws HyracksDataException HyracksDataException + */ + void deleteObject(String bucket, String path) throws HyracksDataException; + /** * Deletes all objects at the specified bucket and paths * diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java index d81419bfef..1160c12386 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/UnstableCloudClient.java @@ -105,6 +105,11 @@ public class UnstableCloudClient implements ICloudClient { cloudClient.copy(bucket, srcPath, destPath); } + @Override + public void deleteObject(String bucket, String path) throws HyracksDataException { + cloudClient.deleteObject(bucket, path); + } + @Override public void deleteObjects(String bucket, Collection<String> paths) throws HyracksDataException { cloudClient.deleteObjects(bucket, paths); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java index 81b96d7862..c9fd485135 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java @@ -73,6 +73,7 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.S3ClientBuilder; import software.amazon.awssdk.services.s3.model.CopyObjectRequest; import software.amazon.awssdk.services.s3.model.Delete; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -237,6 +238,22 @@ public final class S3CloudClient implements ICloudClient { } } + @Override + public void deleteObject(String bucket, String path) throws HyracksDataException { + try { + if (path.isEmpty()) { + return; + } + guardian.checkWriteAccess(bucket, path); + profiler.objectDelete(); + DeleteObjectRequest request = + DeleteObjectRequest.builder().bucket(bucket).key(config.getPrefix() + path).build(); + s3Client.deleteObject(request); + } catch (Exception ex) { + throw HyracksDataException.create(ex); + } + } + @Override public void deleteObjects(String bucket, Collection<String> paths) throws HyracksDataException { if (paths.isEmpty()) { diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java index bdb551b768..4a61f1caa9 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/azure/blobstorage/AzBlobStorageCloudClient.java @@ -258,6 +258,21 @@ public class AzBlobStorageCloudClient implements ICloudClient { destBlobClient.beginCopy(srcBlobUrl, null); } + @Override + public void deleteObject(String bucket, String path) throws HyracksDataException { + try { + if (path.isEmpty()) { + return; + } + guardian.checkWriteAccess(bucket, path); + profiler.objectDelete(); + BlobClient blobClient = blobContainerClient.getBlobClient(config.getPrefix() + path); + blobClient.delete(); + } catch (Exception ex) { + throw HyracksDataException.create(ex); + } + } + @Override public void deleteObjects(String bucket, Collection<String> paths) throws HyracksDataException { if (paths.isEmpty()) @@ -270,21 +285,25 @@ public class AzBlobStorageCloudClient implements ICloudClient { for (List<String> batch : batchedBlobURLs) { PagedIterable<Response<Void>> responses = blobBatchClient.deleteBlobs(batch, null); Iterator<String> deletePathIter = paths.iterator(); - String deletedPath = null; - try { - for (Response<Void> response : responses) { - deletedPath = deletePathIter.next(); - // The response.getStatusCode() method returns: - // - 202 (Accepted) if the delete operation is successful - // - exception if the delete operation fails - int statusCode = response.getStatusCode(); - if (statusCode != SUCCESS_RESPONSE_CODE) { - LOGGER.warn("Failed to delete blob: {} with status code: {} while deleting {}", deletedPath, - statusCode, paths.toString()); + String deletedPath; + String failedDeletedPath = null; + for (Response<Void> response : responses) { + deletedPath = deletePathIter.next(); + // The response.getStatusCode() method returns: + // - 202 (Accepted) if the delete operation is successful + // - exception if the delete operation fails + int statusCode = response.getStatusCode(); + if (statusCode != SUCCESS_RESPONSE_CODE) { + LOGGER.warn("Failed to delete blob: {} with status code: {} while deleting {}", deletedPath, + statusCode, paths.toString()); + if (failedDeletedPath == null) { + failedDeletedPath = deletedPath; } } - } catch (BlobStorageException e) { - throw new RuntimeDataException(ErrorCode.CLOUD_IO_FAILURE, e, "DELETE", deletedPath, paths.toString()); + } + if (failedDeletedPath != null) { + throw new RuntimeDataException(ErrorCode.CLOUD_IO_FAILURE, "DELETE", failedDeletedPath, + paths.toString()); } } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java index bd0a044f0d..48bb2cda05 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java @@ -75,7 +75,7 @@ public class GCSCloudClient implements ICloudClient { private final Storage gcsClient; private final GCSClientConfig config; private final ICloudGuardian guardian; - private final IRequestProfilerLimiter profilerLimiter; + private final IRequestProfilerLimiter profiler; private final int writeBufferSize; public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian) { @@ -86,9 +86,9 @@ public class GCSCloudClient implements ICloudClient { long profilerInterval = config.getProfilerLogInterval(); GCSRequestRateLimiter limiter = new GCSRequestRateLimiter(config); if (profilerInterval > 0) { - profilerLimiter = new CountRequestProfilerLimiter(profilerInterval, limiter); + profiler = new CountRequestProfilerLimiter(profilerInterval, limiter); } else { - profilerLimiter = new RequestLimiterNoOpProfiler(limiter); + profiler = new RequestLimiterNoOpProfiler(limiter); } guardian.setCloudClient(this); } @@ -104,18 +104,18 @@ public class GCSCloudClient implements ICloudClient { @Override public IRequestProfilerLimiter getProfilerLimiter() { - return profilerLimiter; + return profiler; } @Override public ICloudWriter createWriter(String bucket, String path, IWriteBufferProvider bufferProvider) { - return new GCSWriter(bucket, config.getPrefix() + path, gcsClient, profilerLimiter, guardian, writeBufferSize); + return new GCSWriter(bucket, config.getPrefix() + path, gcsClient, profiler, guardian, writeBufferSize); } @Override public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) { guardian.checkReadAccess(bucket, path); - profilerLimiter.objectsList(); + profiler.objectsList(); Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path), BlobListOption.fields(Storage.BlobField.SIZE)); Set<CloudFile> files = new HashSet<>(); @@ -130,7 +130,7 @@ public class GCSCloudClient implements ICloudClient { @Override public int read(String bucket, String path, long offset, ByteBuffer buffer) throws HyracksDataException { guardian.checkReadAccess(bucket, path); - profilerLimiter.objectGet(); + profiler.objectGet(); BlobId blobId = BlobId.of(bucket, config.getPrefix() + path); long readTo = offset + buffer.remaining(); int totalRead = 0; @@ -152,7 +152,7 @@ public class GCSCloudClient implements ICloudClient { @Override public byte[] readAllBytes(String bucket, String path) throws HyracksDataException { guardian.checkReadAccess(bucket, path); - profilerLimiter.objectGet(); + profiler.objectGet(); BlobId blobId = BlobId.of(bucket, config.getPrefix() + path); try { return gcsClient.readAllBytes(blobId); @@ -167,7 +167,7 @@ public class GCSCloudClient implements ICloudClient { @Override public InputStream getObjectStream(String bucket, String path, long offset, long length) { guardian.checkReadAccess(bucket, path); - profilerLimiter.objectGet(); + profiler.objectGet(); ReadChannel reader = null; try { reader = gcsClient.reader(bucket, config.getPrefix() + path).limit(offset + length); @@ -181,7 +181,7 @@ public class GCSCloudClient implements ICloudClient { @Override public void write(String bucket, String path, byte[] data) { guardian.checkWriteAccess(bucket, path); - profilerLimiter.objectWrite(); + profiler.objectWrite(); BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() + path).build(); gcsClient.create(blobInfo, data); } @@ -189,10 +189,10 @@ public class GCSCloudClient implements ICloudClient { @Override public void copy(String bucket, String srcPath, FileReference destPath) { guardian.checkReadAccess(bucket, srcPath); - profilerLimiter.objectsList(); + profiler.objectsList(); Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath)); for (Blob blob : blobs.iterateAll()) { - profilerLimiter.objectCopy(); + profiler.objectCopy(); BlobId source = blob.getBlobId(); String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName())); BlobId target = BlobId.of(bucket, targetName); @@ -202,6 +202,21 @@ public class GCSCloudClient implements ICloudClient { } } + @Override + public void deleteObject(String bucket, String path) throws HyracksDataException { + try { + if (path.isEmpty()) { + return; + } + guardian.checkWriteAccess(bucket, path); + profiler.objectDelete(); + BlobId blobId = BlobId.of(bucket, config.getPrefix() + path); + gcsClient.delete(blobId); + } catch (Exception ex) { + throw HyracksDataException.create(ex); + } + } + @Override public void deleteObjects(String bucket, Collection<String> paths) throws HyracksDataException { if (paths.isEmpty()) { @@ -236,14 +251,14 @@ public class GCSCloudClient implements ICloudClient { paths.toString()); } } - profilerLimiter.objectDelete(); + profiler.objectDelete(); } } @Override public long getObjectSize(String bucket, String path) { guardian.checkReadAccess(bucket, path); - profilerLimiter.objectGet(); + profiler.objectGet(); Blob blob = gcsClient.get(bucket, config.getPrefix() + path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE)); if (blob == null) { @@ -255,7 +270,7 @@ public class GCSCloudClient implements ICloudClient { @Override public boolean exists(String bucket, String path) { guardian.checkReadAccess(bucket, path); - profilerLimiter.objectGet(); + profiler.objectGet(); Blob blob = gcsClient.get(bucket, config.getPrefix() + path, Storage.BlobGetOption.fields(Storage.BlobField.values())); return blob != null && blob.exists(); @@ -264,7 +279,7 @@ public class GCSCloudClient implements ICloudClient { @Override public boolean isEmptyPrefix(String bucket, String path) { guardian.checkReadAccess(bucket, path); - profilerLimiter.objectsList(); + profiler.objectsList(); Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path)); return !blobs.iterateAll().iterator().hasNext(); } @@ -272,13 +287,13 @@ public class GCSCloudClient implements ICloudClient { @Override public IParallelDownloader createParallelDownloader(String bucket, IOManager ioManager) throws HyracksDataException { - return new GCSParallelDownloader(bucket, ioManager, config, profilerLimiter); + return new GCSParallelDownloader(bucket, ioManager, config, profiler); } @Override public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) { guardian.checkReadAccess(bucket, "/"); - profilerLimiter.objectsList(); + profiler.objectsList(); Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE)); ArrayNode objectsInfo = objectMapper.createArrayNode(); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java index c52e2b61c6..b6af3232b2 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriterFactory.java @@ -23,7 +23,6 @@ import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import static org.apache.hyracks.cloud.util.CloudRetryableRequestUtil.runWithNoRetryOnInterruption; import java.io.IOException; -import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.Random; @@ -179,7 +178,7 @@ abstract class AbstractCloudExternalFileWriterFactory<T extends Throwable> imple } } finally { // Delete the written file - runWithNoRetryOnInterruption(() -> testClient.deleteObjects(bucket, Collections.singleton(finalPath))); + runWithNoRetryOnInterruption(() -> testClient.deleteObject(bucket, finalPath)); } } }
