This is an automated email from the ASF dual-hosted git repository. mhubail pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 6447488423a025b89be4953edbaf17984ecf21f1 Author: Hussain Towaileb <[email protected]> AuthorDate: Mon Mar 17 11:12:28 2025 +0300 [ASTERIXDB-3581][EXT]: Do not retry GCS SDK requests if thread is interrupted - user model changes: no - storage format changes: no - interface changes: no Details: - Do not retry any GCS requests if the thread has been interrupted. Ext-ref: MB-65548 Change-Id: I5deb956bda11ee2ca41a1c05616788b2d697622f Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19525 Reviewed-by: Michael Blow <[email protected]> Tested-by: Hussain Towaileb <[email protected]> Integration-Tests: Hussain Towaileb <[email protected]> --- .../apache/asterix/app/nc/NCAppRuntimeContext.java | 2 +- .../asterix/hyracks/bootstrap/CCApplication.java | 2 +- .../asterix/cloud/AbstractCloudIOManager.java | 6 +- .../apache/asterix/cloud/CloudConfigurator.java | 17 +-- .../apache/asterix/cloud/CloudManagerProvider.java | 9 +- .../apache/asterix/cloud/EagerCloudIOManager.java | 6 +- .../apache/asterix/cloud/LazyCloudIOManager.java | 5 +- .../asterix/cloud/clients/CloudClientProvider.java | 8 +- .../cloud/clients/google/gcs/GCSCloudClient.java | 146 ++++++--------------- .../clients/google/gcs/GCSParallelDownloader.java | 5 +- .../cloud/writer/GCSExternalFileWriterFactory.java | 3 +- .../org/apache/asterix/cloud/gcs/LSMGCSTest.java | 5 +- .../external/util/google/gcs/GCSConstants.java | 92 ++++++++++++- .../asterix/external/util/google/gcs/GCSUtils.java | 14 +- .../common/exceptions/AlgebricksException.java | 4 + .../api/exceptions/HyracksDataException.java | 2 - .../hyracks/api/exceptions/HyracksException.java | 4 + .../apache/hyracks/api/util/ExceptionUtils.java | 3 + 18 files changed, 172 insertions(+), 161 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 007826ba4e..6416df9245 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -233,7 +233,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { IBufferCacheReadContext defaultContext; if (isCloudDeployment()) { cloudConfigurator = CloudConfigurator.of(cloudProperties, ioManager, namespacePathResolver, - getCloudGuardian(cloudProperties), threadExecutor); + getCloudGuardian(cloudProperties)); persistenceIOManager = cloudConfigurator.getCloudIoManager(); partitionBootstrapper = cloudConfigurator.getPartitionBootstrapper(); lockNotifier = cloudConfigurator.getLockNotifier(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 5eafe05e0c..6928b64a0b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -191,7 +191,7 @@ public class CCApplication extends BaseCCApplication { if (cloudDeployment) { cloudProperties = new CloudProperties(PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig())); ioManager = CloudConfigurator.createIOManager(ioManager, cloudProperties, namespacePathResolver, - getCloudGuardian(cloudProperties), controllerService.getExecutor()); + getCloudGuardian(cloudProperties)); } IGlobalTxManager globalTxManager = createGlobalTxManager(ioManager); appCtx = createApplicationContext(null, globalRecoveryManager, lifecycleCoordinator, Receptionist::new, diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java index 516ede2445..8fefcf5bf3 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java @@ -31,7 +31,6 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutorService; import java.util.function.Predicate; import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation; @@ -85,13 +84,12 @@ public abstract class AbstractCloudIOManager extends IOManager implements IParti private final List<FileStore> drivePaths; public AbstractCloudIOManager(IOManager ioManager, CloudProperties cloudProperties, - INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor) - throws HyracksDataException { + INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException { super(ioManager.getIODevices(), ioManager.getDeviceComputer(), ioManager.getIOParallelism(), ioManager.getQueueSize()); this.nsPathResolver = nsPathResolver; this.bucket = cloudProperties.getStorageBucket(); - cloudClient = CloudClientProvider.getClient(cloudProperties, guardian, executor); + cloudClient = CloudClientProvider.getClient(cloudProperties, guardian); this.guardian = guardian; int numOfThreads = getIODevices().size() * getIOParallelism(); writeBufferProvider = new WriteBufferProvider(numOfThreads, cloudClient.getWriteBufferSize()); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java index e494333910..dced5b0f44 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudConfigurator.java @@ -70,12 +70,11 @@ public final class CloudConfigurator { private final long diskCacheMonitoringInterval; private CloudConfigurator(CloudProperties cloudProperties, IIOManager ioManager, - INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor) - throws HyracksDataException { + INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException { this.cloudProperties = cloudProperties; localIoManager = (IOManager) ioManager; diskCacheManagerRequired = cloudProperties.getCloudCachePolicy() == CloudCachePolicy.SELECTIVE; - cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver, guardian, executor); + cloudIOManager = createIOManager(ioManager, cloudProperties, nsPathResolver, guardian); physicalDrive = createPhysicalDrive(diskCacheManagerRequired, cloudProperties, ioManager); lockNotifier = createLockNotifier(diskCacheManagerRequired); pageAllocator = createPageAllocator(diskCacheManagerRequired); @@ -132,22 +131,20 @@ public final class CloudConfigurator { } public static CloudConfigurator of(CloudProperties cloudProperties, IIOManager ioManager, - INamespacePathResolver nsPathResolver, ICloudGuardian cloudGuardian, ExecutorService executor) - throws HyracksDataException { - return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver, cloudGuardian, executor); + INamespacePathResolver nsPathResolver, ICloudGuardian cloudGuardian) throws HyracksDataException { + return new CloudConfigurator(cloudProperties, ioManager, nsPathResolver, cloudGuardian); } public static AbstractCloudIOManager createIOManager(IIOManager ioManager, CloudProperties cloudProperties, - INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor) - throws HyracksDataException { + INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException { IOManager localIoManager = (IOManager) ioManager; CloudCachePolicy policy = cloudProperties.getCloudCachePolicy(); if (policy == CloudCachePolicy.EAGER) { - return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian, executor); + return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian); } boolean selective = policy == CloudCachePolicy.SELECTIVE; - return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective, guardian, executor); + return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, selective, guardian); } private static IPhysicalDrive createPhysicalDrive(boolean diskCacheManagerRequired, CloudProperties cloudProperties, diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java index b944161fb6..b49f3ce206 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudManagerProvider.java @@ -18,8 +18,6 @@ */ package org.apache.asterix.cloud; -import java.util.concurrent.ExecutorService; - import org.apache.asterix.cloud.clients.ICloudGuardian; import org.apache.asterix.common.api.INamespacePathResolver; import org.apache.asterix.common.cloud.CloudCachePolicy; @@ -34,14 +32,13 @@ public class CloudManagerProvider { } public static IIOManager createIOManager(CloudProperties cloudProperties, IIOManager ioManager, - INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor) - throws HyracksDataException { + INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException { IOManager localIoManager = (IOManager) ioManager; if (cloudProperties.getCloudCachePolicy() == CloudCachePolicy.LAZY) { - return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, false, guardian, executor); + return new LazyCloudIOManager(localIoManager, cloudProperties, nsPathResolver, false, guardian); } - return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian, executor); + return new EagerCloudIOManager(localIoManager, cloudProperties, nsPathResolver, guardian); } public static IPartitionBootstrapper getCloudPartitionBootstrapper(IIOManager ioManager) { diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java index 29e5da03d5..1cb6077c70 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java @@ -23,7 +23,6 @@ import static org.apache.asterix.common.utils.StorageConstants.STORAGE_ROOT_DIR_ import java.io.File; import java.util.Collections; import java.util.Set; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import org.apache.asterix.cloud.clients.ICloudGuardian; @@ -49,9 +48,8 @@ final class EagerCloudIOManager extends AbstractCloudIOManager { private static final Logger LOGGER = LogManager.getLogger(); public EagerCloudIOManager(IOManager ioManager, CloudProperties cloudProperties, - INamespacePathResolver nsPathResolver, ICloudGuardian guardian, ExecutorService executor) - throws HyracksDataException { - super(ioManager, cloudProperties, nsPathResolver, guardian, executor); + INamespacePathResolver nsPathResolver, ICloudGuardian guardian) throws HyracksDataException { + super(ioManager, cloudProperties, nsPathResolver, guardian); } /* diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java index 7a7ea06e25..1c5efd9c32 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java @@ -29,7 +29,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; import org.apache.asterix.cloud.bulk.DeleteBulkCloudOperation; @@ -71,9 +70,9 @@ final class LazyCloudIOManager extends AbstractCloudIOManager { private ILazyAccessor accessor; public LazyCloudIOManager(IOManager ioManager, CloudProperties cloudProperties, - INamespacePathResolver nsPathResolver, boolean selective, ICloudGuardian guardian, ExecutorService executor) + INamespacePathResolver nsPathResolver, boolean selective, ICloudGuardian guardian) throws HyracksDataException { - super(ioManager, cloudProperties, nsPathResolver, guardian, executor); + super(ioManager, cloudProperties, nsPathResolver, guardian); accessor = new InitialCloudAccessor(cloudClient, bucket, localIoManager); puncher = HolePuncherProvider.get(this, cloudProperties, writeBufferProvider); if (selective) { diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java index a0c8bd0650..c98c6b4ba7 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java @@ -18,8 +18,6 @@ */ package org.apache.asterix.cloud.clients; -import java.util.concurrent.ExecutorService; - import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig; import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient; import org.apache.asterix.cloud.clients.azure.blobstorage.AzBlobStorageClientConfig; @@ -40,8 +38,8 @@ public class CloudClientProvider { throw new AssertionError("do not instantiate"); } - public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian, - ExecutorService executor) throws HyracksDataException { + public static ICloudClient getClient(CloudProperties cloudProperties, ICloudGuardian guardian) + throws HyracksDataException { String storageScheme = cloudProperties.getStorageScheme(); ICloudClient cloudClient; if (S3.equalsIgnoreCase(storageScheme)) { @@ -49,7 +47,7 @@ public class CloudClientProvider { cloudClient = new S3CloudClient(config, guardian); } else if (GCS.equalsIgnoreCase(storageScheme)) { GCSClientConfig config = GCSClientConfig.of(cloudProperties); - cloudClient = new GCSCloudClient(config, guardian, executor); + cloudClient = new GCSCloudClient(config, guardian); } else if (AZ_BLOB.equalsIgnoreCase(storageScheme)) { AzBlobStorageClientConfig config = AzBlobStorageClientConfig.of(cloudProperties); cloudClient = new AzBlobStorageCloudClient(config, guardian); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java index 16fb27836c..62c1835f74 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSCloudClient.java @@ -19,6 +19,7 @@ package org.apache.asterix.cloud.clients.google.gcs; import static org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig.DELETE_BATCH_SIZE; +import static org.apache.asterix.external.util.google.gcs.GCSConstants.DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY; import java.io.FilenameFilter; import java.io.IOException; @@ -31,11 +32,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.function.Supplier; import org.apache.asterix.cloud.IWriteBufferProvider; import org.apache.asterix.cloud.clients.CloudFile; @@ -71,7 +67,6 @@ import com.google.cloud.storage.Storage.BlobListOption; import com.google.cloud.storage.Storage.CopyRequest; import com.google.cloud.storage.StorageBatch; import com.google.cloud.storage.StorageBatchResult; -import com.google.cloud.storage.StorageException; import com.google.cloud.storage.StorageOptions; public class GCSCloudClient implements ICloudClient { @@ -81,15 +76,12 @@ public class GCSCloudClient implements ICloudClient { private final ICloudGuardian guardian; private final IRequestProfilerLimiter profilerLimiter; private final int writeBufferSize; - private final ExecutorService executor; - public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian, - ExecutorService executor) { + public GCSCloudClient(GCSClientConfig config, Storage gcsClient, ICloudGuardian guardian) { this.gcsClient = gcsClient; this.config = config; this.guardian = guardian; this.writeBufferSize = config.getWriteBufferSize(); - this.executor = executor; long profilerInterval = config.getProfilerLogInterval(); GCSRequestRateLimiter limiter = new GCSRequestRateLimiter(config); if (profilerInterval > 0) { @@ -100,9 +92,8 @@ public class GCSCloudClient implements ICloudClient { guardian.setCloudClient(this); } - public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian, ExecutorService executor) - throws HyracksDataException { - this(config, buildClient(config), guardian, executor); + public GCSCloudClient(GCSClientConfig config, ICloudGuardian guardian) throws HyracksDataException { + this(config, buildClient(config), guardian); } @Override @@ -121,12 +112,11 @@ public class GCSCloudClient implements ICloudClient { } @Override - public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) throws HyracksDataException { + public Set<CloudFile> listObjects(String bucket, String path, FilenameFilter filter) { guardian.checkReadAccess(bucket, path); profilerLimiter.objectsList(); - // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread - Page<Blob> blobs = runOpInterruptibly(() -> gcsClient.list(bucket, - BlobListOption.prefix(config.getPrefix() + path), BlobListOption.fields(Storage.BlobField.SIZE))); + Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path), + BlobListOption.fields(Storage.BlobField.SIZE)); Set<CloudFile> files = new HashSet<>(); for (Blob blob : blobs.iterateAll()) { if (filter.accept(null, IoUtil.getFileNameFromPath(blob.getName()))) { @@ -148,7 +138,7 @@ public class GCSCloudClient implements ICloudClient { from.seek(offset + totalRead); totalRead += from.read(buffer); } - } catch (IOException | StorageException ex) { + } catch (IOException | BaseServiceException ex) { throw HyracksDataException.create(ex); } @@ -163,17 +153,14 @@ public class GCSCloudClient implements ICloudClient { guardian.checkReadAccess(bucket, path); profilerLimiter.objectGet(); BlobId blobId = BlobId.of(bucket, config.getPrefix() + path); - // MB-65432: Storage.readAllBytes is not interrupt-safe; we need to offload onto another thread - return runOpInterruptibly(() -> { - try { - return gcsClient.readAllBytes(blobId); - } catch (StorageException e) { - if (e.getCode() == 404) { - return null; - } - throw e; + try { + return gcsClient.readAllBytes(blobId); + } catch (BaseServiceException ex) { + if (ex.getCode() == 404) { + return null; } - }); + throw HyracksDataException.create(ex); + } } @Override @@ -185,38 +172,33 @@ public class GCSCloudClient implements ICloudClient { reader = gcsClient.reader(bucket, config.getPrefix() + path).limit(offset + length); reader.seek(offset); return Channels.newInputStream(reader); - } catch (StorageException | IOException ex) { + } catch (BaseServiceException | IOException ex) { throw new RuntimeException(CleanupUtils.close(reader, ex)); } } @Override - public void write(String bucket, String path, byte[] data) throws HyracksDataException { + public void write(String bucket, String path, byte[] data) { guardian.checkWriteAccess(bucket, path); profilerLimiter.objectWrite(); BlobInfo blobInfo = BlobInfo.newBuilder(bucket, config.getPrefix() + path).build(); - // MB-65432: Storage.create is not interrupt-safe; we need to offload onto another thread - runOpInterruptibly(() -> gcsClient.create(blobInfo, data)); + gcsClient.create(blobInfo, data); } @Override - public void copy(String bucket, String srcPath, FileReference destPath) throws HyracksDataException { + public void copy(String bucket, String srcPath, FileReference destPath) { guardian.checkReadAccess(bucket, srcPath); profilerLimiter.objectsList(); - // MB-65432: Storage.list & copy are not interrupt-safe; we need to offload onto another thread - runOpInterruptibly(() -> { - Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath)); - for (Blob blob : blobs.iterateAll()) { - profilerLimiter.objectCopy(); - BlobId source = blob.getBlobId(); - String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName())); - BlobId target = BlobId.of(bucket, targetName); - guardian.checkWriteAccess(bucket, targetName); - CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build(); - gcsClient.copy(copyReq); - } - return null; - }); + Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + srcPath)); + for (Blob blob : blobs.iterateAll()) { + profilerLimiter.objectCopy(); + BlobId source = blob.getBlobId(); + String targetName = destPath.getChildPath(IoUtil.getFileNameFromPath(source.getName())); + BlobId target = BlobId.of(bucket, targetName); + guardian.checkWriteAccess(bucket, targetName); + CopyRequest copyReq = CopyRequest.newBuilder().setSource(source).setTarget(target).build(); + gcsClient.copy(copyReq); + } } @Override @@ -234,8 +216,7 @@ public class GCSCloudClient implements ICloudClient { guardian.checkWriteAccess(bucket, blobId.getName()); deleteResponses.add(batchRequest.delete(blobId)); } - // MB-65432: StorageBatch.submit may not be interrupt-safe; we need to offload onto another thread - runOpInterruptibly(batchRequest::submit); + batchRequest.submit(); Iterator<String> deletePathIter = paths.iterator(); for (StorageBatchResult<Boolean> deleteResponse : deleteResponses) { String deletedPath = deletePathIter.next(); @@ -250,7 +231,7 @@ public class GCSCloudClient implements ICloudClient { } } catch (BaseServiceException e) { LOGGER.warn("Failed to delete object {} while deleting {}", deletedPath, paths, e); - throw new RuntimeDataException(ErrorCode.CLOUD_IO_FAILURE, e, "DELETE", deletedPath, + throw RuntimeDataException.create(ErrorCode.CLOUD_IO_FAILURE, e, "DELETE", deletedPath, paths.toString()); } } @@ -259,12 +240,11 @@ public class GCSCloudClient implements ICloudClient { } @Override - public long getObjectSize(String bucket, String path) throws HyracksDataException { + public long getObjectSize(String bucket, String path) { guardian.checkReadAccess(bucket, path); profilerLimiter.objectGet(); - // MB-65432: Storage.get is not interrupt-safe; we need to offload onto another thread - Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, config.getPrefix() + path, - Storage.BlobGetOption.fields(Storage.BlobField.SIZE))); + Blob blob = + gcsClient.get(bucket, config.getPrefix() + path, Storage.BlobGetOption.fields(Storage.BlobField.SIZE)); if (blob == null) { return 0; } @@ -272,22 +252,19 @@ public class GCSCloudClient implements ICloudClient { } @Override - public boolean exists(String bucket, String path) throws HyracksDataException { + public boolean exists(String bucket, String path) { guardian.checkReadAccess(bucket, path); profilerLimiter.objectGet(); - // MB-65432: Storage.get is not interrupt-safe; we need to offload onto another thread - Blob blob = runOpInterruptibly(() -> gcsClient.get(bucket, config.getPrefix() + path, - Storage.BlobGetOption.fields(Storage.BlobField.values()))); + Blob blob = gcsClient.get(bucket, config.getPrefix() + path, + Storage.BlobGetOption.fields(Storage.BlobField.values())); return blob != null && blob.exists(); } @Override - public boolean isEmptyPrefix(String bucket, String path) throws HyracksDataException { + public boolean isEmptyPrefix(String bucket, String path) { guardian.checkReadAccess(bucket, path); profilerLimiter.objectsList(); - // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread - Page<Blob> blobs = - runOpInterruptibly(() -> gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path))); + Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.prefix(config.getPrefix() + path)); return !blobs.iterateAll().iterator().hasNext(); } @@ -298,12 +275,10 @@ public class GCSCloudClient implements ICloudClient { } @Override - public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) throws HyracksDataException { + public JsonNode listAsJson(ObjectMapper objectMapper, String bucket) { guardian.checkReadAccess(bucket, "/"); profilerLimiter.objectsList(); - // MB-65432: Storage.list is not interrupt-safe; we need to offload onto another thread - Page<Blob> blobs = - runOpInterruptibly(() -> gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE))); + Page<Blob> blobs = gcsClient.list(bucket, BlobListOption.fields(Storage.BlobField.SIZE)); ArrayNode objectsInfo = objectMapper.createArrayNode(); List<Blob> objects = new ArrayList<>(); @@ -328,6 +303,7 @@ public class GCSCloudClient implements ICloudClient { private static Storage buildClient(GCSClientConfig config) throws HyracksDataException { StorageOptions.Builder builder = StorageOptions.newBuilder().setCredentials(config.createCredentialsProvider()); + builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY); if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) { builder.setHost(config.getEndpoint()); @@ -338,42 +314,4 @@ public class GCSCloudClient implements ICloudClient { private String stripCloudPrefix(String objectName) { return objectName.substring(config.getPrefix().length()); } - - private void runOpInterruptibly(Runnable operation) throws HyracksDataException { - try { - executor.submit(operation).get(); - } catch (InterruptedException e) { - throw HyracksDataException.create(e); - } catch (ExecutionException e) { - throw HyracksDataException.create(e.getCause()); - } - } - - private <T> T runOpInterruptibly(Supplier<T> operation) throws HyracksDataException { - Future<T> opTask = executor.submit(operation::get); - try { - return opTask.get(); - } catch (InterruptedException e) { - cancelAndUnwind(opTask); - throw HyracksDataException.create(e); - } catch (ExecutionException e) { - throw HyracksDataException.create(e.getCause()); - } - } - - private static <T> void cancelAndUnwind(Future<T> opTask) { - opTask.cancel(true); - while (true) { - try { - opTask.get(); - } catch (InterruptedException e1) { - continue; - } catch (CancellationException e1) { - LOGGER.debug("ignoring exception after cancel of op", e1); - } catch (ExecutionException e1) { - LOGGER.debug("ignoring exception after cancel of op", e1.getCause()); - } - return; - } - } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java index cbbaf31bc0..b9e7eeea5c 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSParallelDownloader.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.cloud.clients.google.gcs; +import static org.apache.asterix.external.util.google.gcs.GCSConstants.DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY; + import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; @@ -64,6 +66,7 @@ public class GCSParallelDownloader implements IParallelDownloader { this.ioManager = ioManager; this.profiler = profiler; StorageOptions.Builder builder = StorageOptions.newBuilder(); + builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY); if (config.getEndpoint() != null && !config.getEndpoint().isEmpty()) { builder.setHost(config.getEndpoint()); } @@ -95,7 +98,6 @@ public class GCSParallelDownloader implements IParallelDownloader { downloadJobs.add(transferManager.downloadBlobs(entry.getValue(), downConfig.setDownloadDirectory(entry.getKey()).build())); } - // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need to offload downloadJobs.forEach(DownloadJob::getDownloadResults); } @@ -122,7 +124,6 @@ public class GCSParallelDownloader implements IParallelDownloader { } List<DownloadResult> results; for (DownloadJob job : downloadJobs) { - // MB-65432: DownloadJob.getDownloadResults is interrupt-safe; no need to offload results = job.getDownloadResults(); for (DownloadResult result : results) { if (result.getStatus() != TransferStatus.SUCCESS) { diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java index 36bb94c7e6..3a34365fac 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/GCSExternalFileWriterFactory.java @@ -67,8 +67,7 @@ public final class GCSExternalFileWriterFactory extends AbstractCloudExternalFil ICloudClient createCloudClient(IApplicationContext appCtx) throws CompilationException { GCSClientConfig config = GCSClientConfig.of(configuration, writeBufferSize); return new GCSCloudClient(config, GCSUtils.buildClient(configuration), - ICloudGuardian.NoOpCloudGuardian.INSTANCE, - appCtx.getServiceContext().getControllerService().getExecutor()); + ICloudGuardian.NoOpCloudGuardian.INSTANCE); } @Override diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java index 2f64c37828..d89c872517 100644 --- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java +++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/gcs/LSMGCSTest.java @@ -18,8 +18,6 @@ */ package org.apache.asterix.cloud.gcs; -import java.util.concurrent.Executors; - import org.apache.asterix.cloud.AbstractLSMTest; import org.apache.asterix.cloud.clients.ICloudGuardian; import org.apache.asterix.cloud.clients.google.gcs.GCSClientConfig; @@ -54,8 +52,7 @@ public class LSMGCSTest extends AbstractLSMTest { int writeBufferSize = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE); GCSClientConfig config = new GCSClientConfig(MOCK_SERVER_REGION, MOCK_SERVER_HOSTNAME, true, 0, writeBufferSize, ""); - CLOUD_CLIENT = - new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE, Executors.newCachedThreadPool()); + CLOUD_CLIENT = new GCSCloudClient(config, ICloudGuardian.NoOpCloudGuardian.INSTANCE); } private static void cleanup() { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java index 2613f34e8f..739dbde549 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSConstants.java @@ -18,7 +18,23 @@ */ package org.apache.asterix.external.util.google.gcs; +import static org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace; + +import java.util.concurrent.CancellationException; + +import org.apache.hyracks.api.util.ExceptionUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.retrying.TimedAttemptSettings; +import com.google.cloud.ExceptionHandler; +import com.google.cloud.storage.StorageRetryStrategy; + public class GCSConstants { + + private static final Logger LOGGER = LogManager.getLogger(); + private GCSConstants() { throw new AssertionError("do not instantiate"); } @@ -36,25 +52,87 @@ public class GCSConstants { // hadoop credentials public static final String HADOOP_AUTH_TYPE = "fs.gs.auth.type"; public static final String HADOOP_AUTH_UNAUTHENTICATED = "UNAUTHENTICATED"; - public static final String HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE = "SERVICE_ACCOUNT_JSON_KEYFILE"; - public static final String HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH = - "google.cloud.auth.service.account.json.keyfile"; // gs hadoop parameters public static final String HADOOP_SUPPORT_COMPRESSED = "fs.gs.inputstream.support.gzip.encoding.enable"; public static final String HADOOP_ENDPOINT = "fs.gs.storage.root.url"; - public static final String HADOOP_MAX_REQUESTS_PER_BATCH = "fs.gs.max.requests.per.batch"; - public static final String HADOOP_BATCH_THREADS = "fs.gs.batch.threads"; - public static class JSON_CREDENTIALS_FIELDS { + public static class JsonCredentials { public static final String PRIVATE_KEY_ID = "private_key_id"; public static final String PRIVATE_KEY = "private_key"; public static final String CLIENT_EMAIL = "client_email"; } - public static class HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS { + public static class HadoopAuthServiceAccount { public static final String PRIVATE_KEY_ID = "fs.gs.auth.service.account.private.key.id"; public static final String PRIVATE_KEY = "fs.gs.auth.service.account.private.key"; public static final String CLIENT_EMAIL = "fs.gs.auth.service.account.email"; } + + public static final StorageRetryStrategy DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY; + static { + StorageRetryStrategy defaultStrategy = StorageRetryStrategy.getDefaultStorageRetryStrategy(); + ExceptionHandler defaultIdempotentHandler = (ExceptionHandler) defaultStrategy.getIdempotentHandler(); + ExceptionHandler defaultNonIdempotentHandler = (ExceptionHandler) defaultStrategy.getNonidempotentHandler(); + + ResultRetryAlgorithm<Object> noRetryOnThreadInterruptIdempotentHandler = new ResultRetryAlgorithm<>() { + @Override + public TimedAttemptSettings createNextAttempt(Throwable prevThrowable, Object prevResponse, + TimedAttemptSettings prevSettings) { + return defaultIdempotentHandler.createNextAttempt(prevThrowable, prevResponse, prevSettings); + } + + @Override + public boolean shouldRetry(Throwable prevThrowable, Object prevResponse) throws CancellationException { + if (ExceptionUtils.causedByInterrupt(prevThrowable) || Thread.currentThread().isInterrupted()) { + interruptRequest(prevThrowable); + } + return defaultIdempotentHandler.shouldRetry(prevThrowable, prevResponse); + } + }; + + ResultRetryAlgorithm<Object> noRetryOnThreadInterruptNonIdempotentHandler = new ResultRetryAlgorithm<>() { + @Override + public TimedAttemptSettings createNextAttempt(Throwable prevThrowable, Object prevResponse, + TimedAttemptSettings prevSettings) { + return defaultNonIdempotentHandler.createNextAttempt(prevThrowable, prevResponse, prevSettings); + } + + @Override + public boolean shouldRetry(Throwable prevThrowable, Object prevResponse) throws CancellationException { + if (ExceptionUtils.causedByInterrupt(prevThrowable) || Thread.currentThread().isInterrupted()) { + interruptRequest(prevThrowable); + } + return defaultNonIdempotentHandler.shouldRetry(prevThrowable, prevResponse); + } + }; + + DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY = new StorageRetryStrategy() { + private static final long serialVersionUID = 1L; + + @Override + public ResultRetryAlgorithm<?> getIdempotentHandler() { + return noRetryOnThreadInterruptIdempotentHandler; + } + + @Override + public ResultRetryAlgorithm<?> getNonidempotentHandler() { + return noRetryOnThreadInterruptNonIdempotentHandler; + } + }; + } + + /** + * Throwing a CancellationException will cause the GCS client to abort the whole operation, not only stop retrying + */ + private static void interruptRequest(Throwable th) { + Thread.currentThread().interrupt(); + CancellationException ex = new CancellationException("Request was interrupted, aborting retries and request"); + if (th != null) { + ex.initCause(th); + } + String stackTrace = getStackTrace(ex); + LOGGER.debug("Request was interrupted, aborting retries and request\n{}", stackTrace); + throw ex; + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java index 481b7ff237..d768c230ce 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java @@ -26,6 +26,7 @@ import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable; import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties; import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude; import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME; +import static org.apache.asterix.external.util.google.gcs.GCSConstants.DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY; import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME; import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE; import static org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED; @@ -94,6 +95,7 @@ public class GCSUtils { String endpoint = configuration.get(ENDPOINT_FIELD_NAME); StorageOptions.Builder builder = StorageOptions.newBuilder(); + builder.setStorageRetryStrategy(DEFAULT_NO_RETRY_ON_THREAD_INTERRUPT_STRATEGY); // default credentials provider if (applicationDefaultCredentials != null) { @@ -259,12 +261,12 @@ public class GCSUtils { // Setting these values instead of HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH is supported // in com.google.cloud.bigdataoss:util-hadoop only up to version hadoop3-2.2.x and is removed in // version 3.x.y, which also removed support for hadoop-2 - conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.PRIVATE_KEY_ID, - jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.PRIVATE_KEY_ID).asText()); - conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.PRIVATE_KEY, - jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.PRIVATE_KEY).asText()); - conf.set(GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_FIELDS.CLIENT_EMAIL, - jsonCreds.get(GCSConstants.JSON_CREDENTIALS_FIELDS.CLIENT_EMAIL).asText()); + conf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY_ID, + jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY_ID).asText()); + conf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY, + jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY).asText()); + conf.set(GCSConstants.HadoopAuthServiceAccount.CLIENT_EMAIL, + jsonCreds.get(GCSConstants.JsonCredentials.CLIENT_EMAIL).asText()); } catch (JsonProcessingException e) { throw CompilationException.create(EXTERNAL_SOURCE_ERROR, "Unable to parse Json Credentials", getMessageOrToString(e)); diff --git a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java index 0b7af91120..483076b8f2 100644 --- a/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java +++ b/hyracks-fullstack/algebricks/algebricks-common/src/main/java/org/apache/hyracks/algebricks/common/exceptions/AlgebricksException.java @@ -61,6 +61,10 @@ public class AlgebricksException extends Exception implements IFormattedExceptio this.sourceLoc = sourceLoc; this.nodeId = nodeId; this.params = params; + + if (cause instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } } /** diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java index 46c8ec2049..50df1cb882 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java @@ -43,8 +43,6 @@ public class HyracksDataException extends HyracksException { } if (cause instanceof HyracksDataException) { return (HyracksDataException) cause; - } else if (cause instanceof InterruptedException) { - Thread.currentThread().interrupt(); } else if (cause instanceof Error) { throw (Error) cause; } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java index 12f10959de..b12c8df946 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksException.java @@ -79,6 +79,10 @@ public class HyracksException extends IOException implements IFormattedException this.errorCode = intCode; this.nodeId = nodeId; this.params = params; + + if (cause instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } } protected HyracksException(IError errorCode, Throwable cause, SourceLocation sourceLoc, String nodeId, diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java index d616eb5250..ddba3f0e29 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/ExceptionUtils.java @@ -128,6 +128,9 @@ public class ExceptionUtils { } public static Throwable getRootCause(Throwable e) { + if (e == null) { + return null; + } Throwable current = e; Throwable cause = e.getCause(); while (cause != null && cause != current) {
