This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 83ab6f360ef42125f7a9efdcb62b519d13afbc5d Author: Hussain Towaileb <[email protected]> AuthorDate: Thu Oct 16 09:45:17 2025 +0300 [ASTERIXDB-3659][EXT]: delegate assume role auth to AWS SDK Details: when using assume role authentication for AWS SDK, we are using StsAssumeRoleCredentialsProvider, this provider will refresh the assume role credentials based on the provided configuration. However, as this provider stays open and refreshes itself, it requires the following: - Closing this provider manually when done using it. - STS client to talk to the STS service to assume role. - Which in turn requires manual closing when done. The method for building clients was changed to return CloseableAwsClients which contains: - Consuming client This is the target client we are building, e.g., S3 client or Glue client. - STS client This is the STS client used for talking with STS service to assume the role. - Credentials Provider The credentials provider used for the consuming client. The above maintains a reference to all clients and credential providers so they all can be closed after we are done using them. 
Ext-ref: MB-68987 Change-Id: I6a3c755f94d377b443f21594443fac830875610e Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20493 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Hussain Towaileb <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Jenkins <[email protected]> --- .../asterix/app/cc/CcApplicationContext.java | 18 -- .../app/external/ExternalCredentialsCache.java | 110 ----------- .../external/ExternalCredentialsCacheUpdater.java | 162 ---------------- .../app/message/RefreshAwsCredentialsRequest.java | 81 -------- .../app/message/RefreshAwsCredentialsResponse.java | 73 -------- .../message/UpdateAwsCredentialsCacheRequest.java | 57 ------ .../apache/asterix/app/nc/NCAppRuntimeContext.java | 14 -- .../api/cluster_state_1/cluster_state_1.1.regexadm | 6 +- .../cluster_state_1_full.1.regexadm | 6 +- .../cluster_state_1_less.1.regexadm | 6 +- .../cloud/clients/aws/s3/S3CloudClient.java | 22 ++- .../asterix/common/api/IApplicationContext.java | 12 -- .../asterix/common/config/ExternalProperties.java | 42 ++++- asterixdb/asterix-external-data/pom.xml | 4 + .../input/record/reader/aws/AwsS3InputStream.java | 24 +-- .../apache/asterix/external/util/aws/AwsUtils.java | 203 ++++++++++++++------- .../asterix/external/util/aws/glue/GlueUtils.java | 67 +++++++ .../asterix/external/util/aws/s3/S3Constants.java | 4 + .../asterix/external/util/aws/s3/S3Utils.java | 123 ++++++------- .../LSMPrimaryUpsertOperatorNodePushable.java | 2 +- .../org/apache/hyracks/api/util/CleanupUtils.java | 4 +- .../LSMIndexCompactOperatorNodePushable.java | 2 +- 22 files changed, 345 insertions(+), 697 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java index 9cecaa7b5e..6ea7aebf83 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java +++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/cc/CcApplicationContext.java @@ -24,8 +24,6 @@ import java.io.IOException; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Supplier; -import org.apache.asterix.app.external.ExternalCredentialsCache; -import org.apache.asterix.app.external.ExternalCredentialsCacheUpdater; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.api.IConfigValidator; import org.apache.asterix.common.api.IConfigValidatorFactory; @@ -57,8 +55,6 @@ import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.dataflow.IDataPartitioningProvider; import org.apache.asterix.common.external.IAdapterFactoryService; -import org.apache.asterix.common.external.IExternalCredentialsCache; -import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater; import org.apache.asterix.common.metadata.IMetadataBootstrap; import org.apache.asterix.common.metadata.IMetadataLockUtil; import org.apache.asterix.common.replication.INcLifecycleCoordinator; @@ -131,8 +127,6 @@ public class CcApplicationContext implements ICcApplicationContext { private final IOManager ioManager; private final INamespacePathResolver namespacePathResolver; private final INamespaceResolver namespaceResolver; - private final IExternalCredentialsCache externalCredentialsCache; - private final IExternalCredentialsCacheUpdater externalCredentialsCacheUpdater; public CcApplicationContext(ICCServiceContext ccServiceCtx, HyracksConnection hcc, Supplier<IMetadataBootstrap> metadataBootstrapSupplier, IGlobalRecoveryManager globalRecoveryManager, @@ -183,8 +177,6 @@ public class CcApplicationContext implements ICcApplicationContext { this.globalTxManager = globalTxManager; this.ioManager = ioManager; dataPartitioningProvider = DataPartitioningProvider.create(this); - externalCredentialsCache = new 
ExternalCredentialsCache(this); - externalCredentialsCacheUpdater = new ExternalCredentialsCacheUpdater(this); } @Override @@ -423,14 +415,4 @@ public class CcApplicationContext implements ICcApplicationContext { public IOManager getIoManager() { return ioManager; } - - @Override - public IExternalCredentialsCache getExternalCredentialsCache() { - return externalCredentialsCache; - } - - @Override - public IExternalCredentialsCacheUpdater getExternalCredentialsCacheUpdater() { - return externalCredentialsCacheUpdater; - } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java deleted file mode 100644 index fcdbabda44..0000000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCache.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.app.external; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; - -import org.apache.asterix.common.api.IApplicationContext; -import org.apache.asterix.common.external.IExternalCredentialsCache; -import org.apache.hyracks.util.Span; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -public class ExternalCredentialsCache implements IExternalCredentialsCache { - - private static final Logger LOGGER = LogManager.getLogger(); - private final ConcurrentMap<String, TemporaryCredentials> cache = new ConcurrentHashMap<>(); - private final int awsAssumeRoleDuration; - private final int refreshAwsAssumeRoleThresholdPercentage; - - public ExternalCredentialsCache(IApplicationContext appCtx) { - this.awsAssumeRoleDuration = appCtx.getExternalProperties().getAwsAssumeRoleDuration(); - this.refreshAwsAssumeRoleThresholdPercentage = - appCtx.getExternalProperties().getAwsRefreshAssumeRoleThresholdPercentage(); - } - - @Override - public synchronized Object get(String key) { - invalidateCache(); - if (cache.containsKey(key)) { - return cache.get(key).getCredentials(); - } - return null; - } - - @Override - public void delete(String key) { - Object removed = cache.remove(key); - if (removed != null) { - LOGGER.info("Removed cached credentials for {} because it got deleted", key); - } - } - - @Override - public synchronized void put(String key, Object credentials) { - cache.put(key, new TemporaryCredentials(Span.start(awsAssumeRoleDuration, TimeUnit.SECONDS), credentials)); - LOGGER.info("Received and cached new credentials for {}", key); - } - - /** - * Iterates the cache and removes the credentials that are considered expired - */ - private void invalidateCache() { - cache.entrySet().removeIf(entry -> { - boolean shouldRemove = needsRefresh(entry.getValue().getDuration()); - if (shouldRemove) { - LOGGER.info("Removing cached 
credentials for {} because it expired", entry.getKey()); - } - return shouldRemove; - }); - } - - /** - * Refresh if the remaining time is less than the configured refresh percentage - * - * @param span expiration span - * @return true if the remaining time is less than the configured refresh percentage, false otherwise - */ - private boolean needsRefresh(Span span) { - double remaining = (double) span.remaining(TimeUnit.SECONDS) / span.getSpan(TimeUnit.SECONDS); - double passed = 1 - remaining; - int passedPercentage = (int) (passed * 100); - return passedPercentage > refreshAwsAssumeRoleThresholdPercentage; - } - - static class TemporaryCredentials { - private final Span duration; - private final Object credentials; - - public TemporaryCredentials(Span duration, Object credentials) { - this.duration = duration; - this.credentials = credentials; - } - - public Span getDuration() { - return duration; - } - - public Object getCredentials() { - return credentials; - } - } -} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java deleted file mode 100644 index a9541260c9..0000000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalCredentialsCacheUpdater.java +++ /dev/null @@ -1,162 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.external; - -import static org.apache.asterix.app.message.ExecuteStatementRequestMessage.DEFAULT_NC_TIMEOUT_MILLIS; -import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.ACTIVE; -import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.REBALANCE_REQUIRED; -import static org.apache.asterix.common.exceptions.ErrorCode.REJECT_BAD_CLUSTER_STATE; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import org.apache.asterix.app.message.RefreshAwsCredentialsRequest; -import org.apache.asterix.app.message.RefreshAwsCredentialsResponse; -import org.apache.asterix.app.message.UpdateAwsCredentialsCacheRequest; -import org.apache.asterix.common.api.IApplicationContext; -import org.apache.asterix.common.api.IClusterManagementWork; -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.exceptions.CompilationException; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.common.external.IExternalCredentialsCache; -import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater; -import org.apache.asterix.common.messaging.api.INcAddressedMessage; -import org.apache.asterix.common.messaging.api.MessageFuture; -import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.aws.AwsUtils; -import org.apache.asterix.messaging.CCMessageBroker; -import 
org.apache.asterix.messaging.NCMessageBroker; -import org.apache.hyracks.api.application.INCServiceContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; - -/** - * The class is responsible for generating new credentials based on the adapter type. Given a request: - * - if we are the CC, generate new creds and ask all NCs to update their cache - * - if we are the NC, send a message to the CC to generate new creds - */ -public class ExternalCredentialsCacheUpdater implements IExternalCredentialsCacheUpdater { - - private static final Logger LOGGER = LogManager.getLogger(); - private final IApplicationContext appCtx; - - public ExternalCredentialsCacheUpdater(IApplicationContext appCtx) { - this.appCtx = appCtx; - } - - @Override - public synchronized Object generateAndCacheCredentials(Map<String, String> configuration) - throws HyracksDataException, CompilationException { - IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache(); - String key = configuration.get(ExternalDataConstants.KEY_ENTITY_ID); - Object credentials = cache.get(key); - if (credentials != null) { - return credentials; - } - - String type = configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE); - if (ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3.equals(type)) { - return generateAwsCredentials(configuration); - } else { - // this should never happen - throw new IllegalArgumentException("Unsupported external source type: " + type); - } - } - - // TODO: this can probably be refactored out into something that is AWS-specific - private AwsSessionCredentials generateAwsCredentials(Map<String, String> configuration) - throws HyracksDataException, CompilationException { - String key = 
configuration.get(ExternalDataConstants.KEY_ENTITY_ID); - AwsSessionCredentials credentials; - if (appCtx instanceof ICcApplicationContext) { - validateClusterState(); - try { - LOGGER.info("attempting to update AWS credentials for {}", key); - AwsCredentialsProvider newCredentials = AwsUtils.assumeRoleAndGetCredentials(configuration); - LOGGER.info("updated AWS credentials successfully for {}", key); - credentials = (AwsSessionCredentials) newCredentials.resolveCredentials(); - appCtx.getExternalCredentialsCache().put(key, credentials); - } catch (CompilationException ex) { - LOGGER.info("failed to refresh AWS credentials for {}", key, ex); - throw ex; - } - - String accessKeyId = credentials.accessKeyId(); - String secretAccessKey = credentials.secretAccessKey(); - String sessionToken = credentials.sessionToken(); - UpdateAwsCredentialsCacheRequest request = - new UpdateAwsCredentialsCacheRequest(configuration, accessKeyId, secretAccessKey, sessionToken); - - // request all NCs to update their credentials cache with the latest creds - updateNcsCredentialsCache(key, request); - } else { - NCMessageBroker broker = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); - MessageFuture messageFuture = broker.registerMessageFuture(); - String nodeId = ((INCServiceContext) appCtx.getServiceContext()).getNodeId(); - long futureId = messageFuture.getFutureId(); - RefreshAwsCredentialsRequest request = new RefreshAwsCredentialsRequest(nodeId, futureId, configuration); - try { - LOGGER.info("no valid AWS credentials found for {}, requesting AWS credentials from CC", key); - broker.sendMessageToPrimaryCC(request); - RefreshAwsCredentialsResponse response = (RefreshAwsCredentialsResponse) messageFuture - .get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); - if (response.getFailure() != null) { - throw HyracksDataException.create(response.getFailure()); - } - credentials = AwsSessionCredentials.create(response.getAccessKeyId(), response.getSecretAccessKey(), - 
response.getSessionToken()); - } catch (Exception ex) { - LOGGER.info("failed to refresh AWS credentials for {}", key, ex); - throw HyracksDataException.create(ex); - } finally { - broker.deregisterMessageFuture(futureId); - } - } - return credentials; - } - - private void updateNcsCredentialsCache(String key, INcAddressedMessage request) throws HyracksDataException { - ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx; - final List<String> ncs = new ArrayList<>(ccAppCtx.getClusterStateManager().getParticipantNodes()); - CCMessageBroker broker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); - try { - LOGGER.info("requesting all NCs to update their credentials for {}", key); - for (String nc : ncs) { - broker.sendApplicationMessageToNC(request, nc); - } - } catch (Exception e) { - LOGGER.info("failed to send message to nc", e); - throw HyracksDataException.create(e); - } - } - - private void validateClusterState() throws HyracksDataException { - ICcApplicationContext ccAppCtx = (ICcApplicationContext) appCtx; - IClusterManagementWork.ClusterState state = ccAppCtx.getClusterStateManager().getState(); - if (!(state == ACTIVE || state == REBALANCE_REQUIRED)) { - throw new RuntimeDataException(REJECT_BAD_CLUSTER_STATE, state); - } - } -} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java deleted file mode 100644 index 32de92d45b..0000000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsRequest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.message; - -import java.util.Map; - -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater; -import org.apache.asterix.common.messaging.api.ICcAddressedMessage; -import org.apache.asterix.messaging.CCMessageBroker; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; - -public class RefreshAwsCredentialsRequest implements ICcAddressedMessage { - private static final long serialVersionUID = 1L; - private static final Logger LOGGER = LogManager.getLogger(); - - private final String nodeId; - private final long reqId; - private final Map<String, String> configuration; - - public RefreshAwsCredentialsRequest(String nodeId, long reqId, Map<String, String> configuration) { - this.nodeId = nodeId; - this.reqId = reqId; - this.configuration = configuration; - } - - @Override - public final void handle(ICcApplicationContext appCtx) throws HyracksDataException { - try { - IExternalCredentialsCacheUpdater cacheUpdater = appCtx.getExternalCredentialsCacheUpdater(); - Object credentials = cacheUpdater.generateAndCacheCredentials(configuration); - AwsSessionCredentials sessionCredentials = 
(AwsSessionCredentials) credentials; - - // respond with the credentials - RefreshAwsCredentialsResponse response = - new RefreshAwsCredentialsResponse(reqId, sessionCredentials.accessKeyId(), - sessionCredentials.secretAccessKey(), sessionCredentials.sessionToken(), null); - respond(appCtx, response); - } catch (Exception e) { - LOGGER.info("failed to refresh credentials", e); - RefreshAwsCredentialsResponse response = new RefreshAwsCredentialsResponse(reqId, null, null, null, e); - respond(appCtx, response); - } - } - - private void respond(ICcApplicationContext appCtx, RefreshAwsCredentialsResponse response) - throws HyracksDataException { - CCMessageBroker broker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); - try { - broker.sendApplicationMessageToNC(response, nodeId); - } catch (Exception e) { - LOGGER.info("failed to send reply to nc", e); - throw HyracksDataException.create(e); - } - } - - @Override - public boolean isWhispered() { - return true; - } -} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java deleted file mode 100644 index 9ea0e1165b..0000000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/RefreshAwsCredentialsResponse.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.app.message; - -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.messaging.api.INcAddressedMessage; -import org.apache.asterix.common.messaging.api.MessageFuture; -import org.apache.asterix.messaging.NCMessageBroker; - -public class RefreshAwsCredentialsResponse implements INcAddressedMessage { - - private static final long serialVersionUID = 1L; - private final long reqId; - private final String accessKeyId; - private final String secretAccessKey; - private final String sessionToken; - private final Throwable failure; - - public RefreshAwsCredentialsResponse(long reqId, String accessKeyId, String secretAccessKey, String sessionToken, - Throwable failure) { - this.reqId = reqId; - this.accessKeyId = accessKeyId; - this.secretAccessKey = secretAccessKey; - this.sessionToken = sessionToken; - this.failure = failure; - } - - @Override - public void handle(INcApplicationContext appCtx) { - NCMessageBroker mb = (NCMessageBroker) appCtx.getServiceContext().getMessageBroker(); - MessageFuture future = mb.deregisterMessageFuture(reqId); - if (future != null) { - future.complete(this); - } - } - - public String getAccessKeyId() { - return accessKeyId; - } - - public String getSecretAccessKey() { - return secretAccessKey; - } - - public String getSessionToken() { - return sessionToken; - } - - public Throwable getFailure() { - return failure; - } - - @Override - public boolean isWhispered() { - return true; - } -} diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java deleted file mode 100644 index 753dbf11e5..0000000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/UpdateAwsCredentialsCacheRequest.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.asterix.app.message; - -import java.util.Map; - -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.messaging.api.INcAddressedMessage; -import org.apache.asterix.external.util.ExternalDataConstants; - -import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; - -public class UpdateAwsCredentialsCacheRequest implements INcAddressedMessage { - - private static final long serialVersionUID = 1L; - private final Map<String, String> configuration; - private final String accessKeyId; - private final String secretAccessKey; - private final String sessionToken; - - public UpdateAwsCredentialsCacheRequest(Map<String, String> configuration, String accessKeyId, - String secretAccessKey, String sessionToken) { - this.configuration = configuration; - this.accessKeyId = accessKeyId; - this.secretAccessKey = secretAccessKey; - this.sessionToken = sessionToken; - - } - - @Override - public void handle(INcApplicationContext appCtx) { - String name = configuration.get(ExternalDataConstants.KEY_ENTITY_ID); - AwsSessionCredentials credentials = AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken); - appCtx.getExternalCredentialsCache().put(name, credentials); - } - - @Override - public boolean isWhispered() { - return true; - } -} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 14d16515da..f669d4839f 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -31,8 +31,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import org.apache.asterix.active.ActiveManager; -import org.apache.asterix.app.external.ExternalCredentialsCache; -import 
org.apache.asterix.app.external.ExternalCredentialsCacheUpdater; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.cloud.CloudConfigurator; import org.apache.asterix.cloud.LocalPartitionBootstrapper; @@ -221,8 +219,6 @@ public class NCAppRuntimeContext implements INcApplicationContext { cacheManager = new CacheManager(); this.namespacePathResolver = namespacePathResolver; this.namespaceResolver = namespaceResolver; - this.externalCredentialsCache = new ExternalCredentialsCache(this); - this.externalCredentialsCacheUpdater = new ExternalCredentialsCacheUpdater(this); } @Override @@ -778,14 +774,4 @@ public class NCAppRuntimeContext implements INcApplicationContext { return isCloudDeployment() ? storageProperties.getStoragePartitionsCount() : ncServiceContext.getIoManager().getIODevices().size(); } - - @Override - public IExternalCredentialsCache getExternalCredentialsCache() { - return externalCredentialsCache; - } - - @Override - public IExternalCredentialsCacheUpdater getExternalCredentialsCacheUpdater() { - return externalCredentialsCacheUpdater; - } } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm index eb32737916..b78e645452 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1/cluster_state_1.1.regexadm @@ -8,8 +8,10 @@ "active\.memory\.global\.budget" : 67108864, "active\.stop\.timeout" : 3600, "active\.suspend\.timeout" : 3600, - "aws.assume.role.duration" : 900, - "aws.refresh.assume.role.threshold.percentage" : 75, + "aws.assume.role.async.refresh.enabled" : true, + "aws.assume.role.duration" : 3600, + "aws.assume.role.prefetch.time" : 300, + "aws.assume.role.stale.time" : 60, "azure.request.timeout" : 
120, "cloud.acquire.token.timeout" : 100, "cloud.deployment" : false, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm index 99fcba07cf..51afe07ea9 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_full/cluster_state_1_full.1.regexadm @@ -8,8 +8,10 @@ "active\.memory\.global\.budget" : 67108864, "active\.stop\.timeout" : 3600, "active\.suspend\.timeout" : 3600, - "aws.assume.role.duration" : 900, - "aws.refresh.assume.role.threshold.percentage" : 75, + "aws.assume.role.async.refresh.enabled" : true, + "aws.assume.role.duration" : 3600, + "aws.assume.role.prefetch.time" : 300, + "aws.assume.role.stale.time" : 60, "azure.request.timeout" : 120, "cloud.acquire.token.timeout" : 100, "cloud.deployment" : false, diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm index 98d1a44bdf..06dd8fbc42 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/api/cluster_state_1_less/cluster_state_1_less.1.regexadm @@ -8,8 +8,10 @@ "active\.memory\.global\.budget" : 67108864, "active\.stop\.timeout" : 3600, "active\.suspend\.timeout" : 3600, - "aws.assume.role.duration" : 900, - "aws.refresh.assume.role.threshold.percentage" : 75, + "aws.assume.role.async.refresh.enabled" : true, + "aws.assume.role.duration" : 3600, + "aws.assume.role.prefetch.time" : 300, + "aws.assume.role.stale.time" : 60, 
"azure.request.timeout" : 120, "cloud.acquire.token.timeout" : 100, "cloud.deployment" : false, diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java index c9fd485135..defffe0fc5 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java @@ -50,6 +50,8 @@ import org.apache.asterix.cloud.clients.profiler.IRequestProfilerLimiter; import org.apache.asterix.cloud.clients.profiler.RequestLimiterNoOpProfiler; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.external.util.aws.AwsUtils; +import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.util.IoUtil; @@ -63,6 +65,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.http.SdkHttpClient; @@ -90,6 +93,7 @@ import software.amazon.awssdk.utils.AttributeMap; public final class S3CloudClient implements ICloudClient { private static final Logger LOGGER = LogManager.getLogger(); private final S3ClientConfig config; + private final CloseableAwsClients awsClients; private final S3Client s3Client; private final ICloudGuardian guardian; private final IRequestProfilerLimiter profiler; @@ -99,9 +103,10 @@ public final class S3CloudClient implements 
ICloudClient { this(config, buildClient(config), guardian); } - public S3CloudClient(S3ClientConfig config, S3Client s3Client, ICloudGuardian guardian) { + public S3CloudClient(S3ClientConfig config, CloseableAwsClients awsClients, ICloudGuardian guardian) { this.config = config; - this.s3Client = s3Client; + this.awsClients = awsClients; + this.s3Client = (S3Client) awsClients.getConsumingClient(); this.guardian = guardian; this.writeBufferSize = config.getWriteBufferSize(); long profilerInterval = config.getProfilerLogInterval(); @@ -344,7 +349,7 @@ public final class S3CloudClient implements ICloudClient { @Override public void close() { - s3Client.close(); + AwsUtils.closeClients(awsClients); } @Override @@ -359,9 +364,11 @@ public final class S3CloudClient implements ICloudClient { return new S3BufferedWriter(s3Client, profiler, guardian, bucket, config.getPrefix() + path); } - private static S3Client buildClient(S3ClientConfig config) { + private static CloseableAwsClients buildClient(S3ClientConfig config) { + CloseableAwsClients awsClients = new CloseableAwsClients(); S3ClientBuilder builder = S3Client.builder(); - builder.credentialsProvider(config.createCredentialsProvider()); + AwsCredentialsProvider credentialsProvider = config.createCredentialsProvider(); + builder.credentialsProvider(credentialsProvider); builder.region(Region.of(config.getRegion())); builder.forcePathStyle(config.isForcePathStyle()); @@ -386,7 +393,10 @@ public final class S3CloudClient implements ICloudClient { } SdkHttpClient httpClient = ApacheHttpClient.builder().buildWithDefaults(customHttpConfigBuilder.build()); builder.httpClient(httpClient); - return builder.build(); + + awsClients.setConsumingClient(builder.build()); + awsClients.setCredentialsProvider(credentialsProvider); + return awsClients; } private Set<CloudFile> filterAndGet(List<S3Object> contents, FilenameFilter filter) { diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java index 19e4ad7ab7..eabebf7b3a 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/IApplicationContext.java @@ -29,8 +29,6 @@ import org.apache.asterix.common.config.NodeProperties; import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.config.StorageProperties; import org.apache.asterix.common.config.TransactionProperties; -import org.apache.asterix.common.external.IExternalCredentialsCache; -import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -108,14 +106,4 @@ public interface IApplicationContext { INamespaceResolver getNamespaceResolver(); INamespacePathResolver getNamespacePathResolver(); - - /** - * @return external credentials cache - */ - IExternalCredentialsCache getExternalCredentialsCache(); - - /** - * @return external credentials cache updater - */ - IExternalCredentialsCacheUpdater getExternalCredentialsCacheUpdater(); } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java index d233606fa6..9a183bd936 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/config/ExternalProperties.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.common.config; +import static 
org.apache.hyracks.control.common.config.OptionTypes.BOOLEAN; import static org.apache.hyracks.control.common.config.OptionTypes.LEVEL; import static org.apache.hyracks.control.common.config.OptionTypes.NONNEGATIVE_INTEGER; import static org.apache.hyracks.control.common.config.OptionTypes.POSITIVE_INTEGER; @@ -56,15 +57,26 @@ public class ExternalProperties extends AbstractProperties { AZURE_REQUEST_TIMEOUT(POSITIVE_INTEGER, 120, "Timeout for Azure client requests in seconds"), AWS_ASSUME_ROLE_DURATION( getRangedIntegerType(900, 43200), - 900, + 3600, "AWS assuming role duration in seconds. " + "Range from 900 seconds (15 mins) to 43200 seconds (12 hours)"), - AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE( - getRangedIntegerType(25, 90), - 75, - "Percentage of duration passed before assume role credentials need to be refreshed, the value ranges " - + "from 25 to 90, default is 75. For example, if the value is set to 65, this means the " - + "credentials need to be refreshed if 65% of the total expiration duration is already passed"), + AWS_ASSUME_ROLE_STALE_TIME( + POSITIVE_INTEGER, + 60, + "The amount of time (in seconds), relative to STS token expiration, that the cached credentials are " + + "considered stale and must be updated"), + AWS_ASSUME_ROLE_PREFETCH_TIME( + POSITIVE_INTEGER, + 300, + "the amount of time, relative to STS token expiration, that the cached credentials are considered " + + "close to stale and should be updated. Prefetch updates will occur between the specified " + + "time and the stale time of the provider."), + AWS_ASSUME_ROLE_ASYNC_REFRESH_ENABLED( + BOOLEAN, + true, + "Whether the provider should fetch credentials asynchronously in the background. 
If this is true, " + + "threads are less likely to block when credentials are loaded, but additional resources are " + + "used to maintain the provider."), GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION( getRangedIntegerType(60, 3600), 900, @@ -98,7 +110,9 @@ public class ExternalProperties extends AbstractProperties { case LIBRARY_DEPLOY_TIMEOUT: case AZURE_REQUEST_TIMEOUT: case AWS_ASSUME_ROLE_DURATION: - case AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE: + case AWS_ASSUME_ROLE_STALE_TIME: + case AWS_ASSUME_ROLE_PREFETCH_TIME: + case AWS_ASSUME_ROLE_ASYNC_REFRESH_ENABLED: case GCP_IMPERSONATE_SERVICE_ACCOUNT_DURATION: return Section.COMMON; case CC_JAVA_OPTS: @@ -185,8 +199,16 @@ public class ExternalProperties extends AbstractProperties { return accessor.getInt(Option.AWS_ASSUME_ROLE_DURATION); } - public int getAwsRefreshAssumeRoleThresholdPercentage() { - return accessor.getInt(Option.AWS_REFRESH_ASSUME_ROLE_THRESHOLD_PERCENTAGE); + public int getAwsAssumeRoleStaleTime() { + return accessor.getInt(Option.AWS_ASSUME_ROLE_STALE_TIME); + } + + public int getAwsAssumeRolePrefetchTime() { + return accessor.getInt(Option.AWS_ASSUME_ROLE_PREFETCH_TIME); + } + + public boolean getAwsAssumeRoleAsyncRefreshEnabled() { + return accessor.getBoolean(Option.AWS_ASSUME_ROLE_ASYNC_REFRESH_ENABLED); } public int getGcpImpersonateServiceAccountDuration() { diff --git a/asterixdb/asterix-external-data/pom.xml b/asterixdb/asterix-external-data/pom.xml index f3be28f091..520642ffaf 100644 --- a/asterixdb/asterix-external-data/pom.xml +++ b/asterixdb/asterix-external-data/pom.xml @@ -459,6 +459,10 @@ <groupId>software.amazon.awssdk</groupId> <artifactId>s3</artifactId> </dependency> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>glue</artifactId> + </dependency> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>regions</artifactId> diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java index bea096671c..ded052b851 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java @@ -36,6 +36,7 @@ import org.apache.asterix.external.input.record.reader.abstracts.AbstractExterna import org.apache.asterix.external.input.record.reader.stream.AvailableInputStream; import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.aws.AwsUtils; +import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients; import org.apache.asterix.external.util.aws.s3.S3Utils; import org.apache.commons.lang3.StringUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -53,15 +54,18 @@ public class AwsS3InputStream extends AbstractExternalInputStream { private static final int MAX_RETRIES = 5; // We will retry 5 times in case of internal error from AWS S3 service private final IApplicationContext ncAppCtx; private final String bucket; - private S3Client s3Client; + private final CloseableAwsClients awsClients; + private final S3Client s3Client; private ResponseInputStream<?> s3InStream; public AwsS3InputStream(IApplicationContext ncAppCtx, Map<String, String> configuration, List<String> filePaths, IExternalFilterValueEmbedder valueEmbedder) throws HyracksDataException { super(configuration, filePaths, valueEmbedder); this.ncAppCtx = ncAppCtx; - this.s3Client = buildAwsS3Client(configuration); this.bucket = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); + + this.awsClients = buildAwsS3Client(configuration); + this.s3Client = (S3Client) 
awsClients.getConsumingClient(); } @Override @@ -97,11 +101,7 @@ public class AwsS3InputStream extends AbstractExternalInputStream { LOGGER.debug(() -> "Key " + userData(request.key()) + " was not found in bucket {}" + request.bucket()); return false; } catch (S3Exception ex) { - if (AwsUtils.isArnAssumedRoleExpiredToken(configuration, ex.awsErrorDetails().errorCode())) { - LOGGER.debug(() -> "Expired AWS assume role session, will attempt to refresh the session"); - rebuildAwsS3Client(configuration); - LOGGER.debug(() -> "Successfully refreshed AWS assume role session"); - } else if (shouldRetry(ex.awsErrorDetails().errorCode(), retries++)) { + if (shouldRetry(ex.awsErrorDetails().errorCode(), retries++)) { LOGGER.debug(() -> "S3 retryable error: " + userData(ex.getMessage())); } else { throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); @@ -132,9 +132,7 @@ public class AwsS3InputStream extends AbstractExternalInputStream { } CleanupUtils.close(in, null); } - if (s3Client != null) { - CleanupUtils.close(s3Client, null); - } + AwsUtils.closeClients(awsClients); } @Override @@ -147,15 +145,11 @@ public class AwsS3InputStream extends AbstractExternalInputStream { return false; } - private S3Client buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException { + private CloseableAwsClients buildAwsS3Client(Map<String, String> configuration) throws HyracksDataException { try { return S3Utils.buildClient(ncAppCtx, configuration); } catch (CompilationException ex) { throw HyracksDataException.create(ex); } } - - private void rebuildAwsS3Client(Map<String, String> configuration) throws HyracksDataException { - s3Client = buildAwsS3Client(configuration); - } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java index a648853a46..5ffd7e2778 100644 --- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/AwsUtils.java @@ -24,7 +24,6 @@ import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_P import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED; import static org.apache.asterix.external.util.aws.AwsConstants.ACCESS_KEY_ID_FIELD_NAME; import static org.apache.asterix.external.util.aws.AwsConstants.CROSS_REGION_FIELD_NAME; -import static org.apache.asterix.external.util.aws.AwsConstants.ERROR_EXPIRED_TOKEN; import static org.apache.asterix.external.util.aws.AwsConstants.EXTERNAL_ID_FIELD_NAME; import static org.apache.asterix.external.util.aws.AwsConstants.INSTANCE_PROFILE_FIELD_NAME; import static org.apache.asterix.external.util.aws.AwsConstants.REGION_FIELD_NAME; @@ -33,6 +32,9 @@ import static org.apache.asterix.external.util.aws.AwsConstants.SECRET_ACCESS_KE import static org.apache.asterix.external.util.aws.AwsConstants.SESSION_TOKEN_FIELD_NAME; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.Optional; @@ -41,10 +43,9 @@ import java.util.UUID; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.external.IExternalCredentialsCache; -import org.apache.asterix.common.external.IExternalCredentialsCacheUpdater; -import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.CleanupUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import 
software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; @@ -52,16 +53,44 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.awscore.AwsClient; +import software.amazon.awssdk.core.SdkClient; +import software.amazon.awssdk.core.SdkRequest; +import software.amazon.awssdk.core.SdkResponse; +import software.amazon.awssdk.core.client.builder.SdkClientBuilder; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.interceptor.Context; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.sts.StsClient; +import software.amazon.awssdk.services.sts.StsClientBuilder; +import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; import software.amazon.awssdk.services.sts.model.AssumeRoleResponse; -import software.amazon.awssdk.services.sts.model.Credentials; public class AwsUtils { + private static final Logger LOGGER = LogManager.getLogger(); + private static final ExecutionInterceptor ASSUME_ROLE_INTERCEPTOR; + + static { + ASSUME_ROLE_INTERCEPTOR = new ExecutionInterceptor() { + @Override + public void afterExecution(Context.AfterExecution context, ExecutionAttributes executionAttributes) { + SdkRequest req = context.request(); + SdkResponse resp = context.response(); + if (req instanceof 
AssumeRoleRequest assumeReq && resp instanceof AssumeRoleResponse assumeResp) { + LOGGER.info("STS refresh success [Thread={}, Role={}, Expiry={}]", Thread.currentThread().getName(), + assumeReq.roleArn(), + assumeResp.credentials().expiration()); + } + ExecutionInterceptor.super.afterExecution(context, executionAttributes); + } + }; + } + public enum AuthenticationType { ANONYMOUS, ARN_ASSUME_ROLE, @@ -74,19 +103,14 @@ public class AwsUtils { throw new AssertionError("do not instantiate"); } - public static boolean isArnAssumedRoleExpiredToken(Map<String, String> configuration, String errorCode) { - return ERROR_EXPIRED_TOKEN.equals(errorCode) - && getAuthenticationType(configuration) == AuthenticationType.ARN_ASSUME_ROLE; - } - public static AwsCredentialsProvider buildCredentialsProvider(IApplicationContext appCtx, - Map<String, String> configuration) throws CompilationException { + Map<String, String> configuration, CloseableAwsClients awsClients) throws CompilationException { AuthenticationType authenticationType = getAuthenticationType(configuration); switch (authenticationType) { case ANONYMOUS: return AnonymousCredentialsProvider.create(); case ARN_ASSUME_ROLE: - return getTrustAccountCredentials(appCtx, configuration); + return getTrustAccountCredentials(appCtx, configuration, awsClients); case INSTANCE_PROFILE: return getInstanceProfileCredentials(configuration); case ACCESS_KEYS: @@ -157,57 +181,49 @@ public class AwsUtils { * @throws CompilationException CompilationException */ public static AwsCredentialsProvider getTrustAccountCredentials(IApplicationContext appCtx, - Map<String, String> configuration) throws CompilationException { - IExternalCredentialsCache cache = appCtx.getExternalCredentialsCache(); - Object credentialsObject = cache.get(configuration.get(ExternalDataConstants.KEY_ENTITY_ID)); - if (credentialsObject != null) { - return () -> (AwsSessionCredentials) credentialsObject; - } - IExternalCredentialsCacheUpdater cacheUpdater = 
appCtx.getExternalCredentialsCacheUpdater(); - AwsSessionCredentials credentials; - try { - credentials = (AwsSessionCredentials) cacheUpdater.generateAndCacheCredentials(configuration); - } catch (HyracksDataException ex) { - throw new CompilationException(ErrorCode.FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION, ex, ex.getMessage()); - } + Map<String, String> configuration, CloseableAwsClients awsClients) throws CompilationException { + AwsCredentialsProvider credentialsToAssumeRole = getCredentialsToAssumeRole(configuration); - return () -> credentials; - } + // build sts client used for assuming role + ClientOverrideConfiguration.Builder clientConfigurationBuilder = ClientOverrideConfiguration.builder(); + clientConfigurationBuilder.addExecutionInterceptor(ASSUME_ROLE_INTERCEPTOR); + ClientOverrideConfiguration clientConfiguration = clientConfigurationBuilder.build(); - /** - * Assume role using provided credentials and return the new credentials - * - * @param configuration configuration - * @return return credentials from the assume role - * @throws CompilationException CompilationException - */ - public static AwsCredentialsProvider assumeRoleAndGetCredentials(Map<String, String> configuration) - throws CompilationException { - String regionId = configuration.get(REGION_FIELD_NAME); - String arnRole = configuration.get(ROLE_ARN_FIELD_NAME); - String externalId = configuration.get(EXTERNAL_ID_FIELD_NAME); - Region region = validateAndGetRegion(regionId); - - AssumeRoleRequest.Builder builder = AssumeRoleRequest.builder(); - builder.roleArn(arnRole); - builder.roleSessionName(UUID.randomUUID().toString()); - builder.durationSeconds(900); // TODO(htowaileb): configurable? 
Can be 900 to 43200 (15 mins to 12 hours) - if (externalId != null) { - builder.externalId(externalId); + StsClientBuilder stsClientBuilder = StsClient.builder(); + stsClientBuilder.credentialsProvider(credentialsToAssumeRole); + stsClientBuilder.region(validateAndGetRegion(configuration.get(REGION_FIELD_NAME))); + stsClientBuilder.overrideConfiguration(clientConfiguration); + StsClient stsClient = stsClientBuilder.build(); + awsClients.setStsClient(stsClient); + + // build refresh role request + String sessionName = UUID.randomUUID().toString(); + LOGGER.info("Assuming role with session name ({}) for ({})", sessionName, Thread.currentThread().getName()); + AssumeRoleRequest.Builder refreshRequestBuilder = AssumeRoleRequest.builder(); + refreshRequestBuilder.roleArn(configuration.get(ROLE_ARN_FIELD_NAME)); + refreshRequestBuilder.externalId(configuration.get(EXTERNAL_ID_FIELD_NAME)); + refreshRequestBuilder.roleSessionName(sessionName); + if (appCtx != null) { + int duration = appCtx.getExternalProperties().getAwsAssumeRoleDuration(); + refreshRequestBuilder.durationSeconds(duration); } - AssumeRoleRequest request = builder.build(); - AwsCredentialsProvider credentialsProvider = getCredentialsToAssumeRole(configuration); - - // assume the role from the provided arn - try (StsClient stsClient = - StsClient.builder().region(region).credentialsProvider(credentialsProvider).build()) { - AssumeRoleResponse response = stsClient.assumeRole(request); - Credentials credentials = response.credentials(); - return StaticCredentialsProvider.create(AwsSessionCredentials.create(credentials.accessKeyId(), - credentials.secretAccessKey(), credentials.sessionToken())); - } catch (SdkException ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); + + // build credentials provider + StsAssumeRoleCredentialsProvider.Builder builder = StsAssumeRoleCredentialsProvider.builder(); + builder.refreshRequest(refreshRequestBuilder.build()); + 
builder.stsClient(stsClient); + if (appCtx != null) { + int staleTime = appCtx.getExternalProperties().getAwsAssumeRoleStaleTime(); + int prefetchTime = appCtx.getExternalProperties().getAwsAssumeRolePrefetchTime(); + boolean asyncCredentialsUpdate = appCtx.getExternalProperties().getAwsAssumeRoleAsyncRefreshEnabled(); + builder.staleTime(Duration.ofSeconds(staleTime)); + builder.prefetchTime(Duration.ofSeconds(prefetchTime)); + builder.asyncCredentialUpdateEnabled(asyncCredentialsUpdate); } + + StsAssumeRoleCredentialsProvider credentialsProvider = builder.build(); + awsClients.setCredentialsProvider(credentialsProvider); + return credentialsProvider; } private static AwsCredentialsProvider getCredentialsToAssumeRole(Map<String, String> configuration) @@ -266,6 +282,24 @@ public class AwsUtils { } } + public static <B extends SdkClientBuilder<B, C>, C extends SdkClient> void setEndpoint(B builder, String endpoint) + throws CompilationException { + // Validate the service endpoint if present + if (endpoint != null) { + try { + URI uri = new URI(endpoint); + try { + builder.endpointOverride(uri); + } catch (NullPointerException ex) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); + } + } catch (URISyntaxException ex) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, + String.format("Invalid service endpoint %s", endpoint)); + } + } + } + private static String getNonNull(Map<String, String> configuration, String... 
fieldNames) { for (String fieldName : fieldNames) { if (configuration.get(fieldName) != null) { @@ -283,4 +317,51 @@ public class AwsUtils { public static String generateExternalId() { return UUID.randomUUID().toString(); } + + public static void closeClients(CloseableAwsClients clients) { + if (clients == null) { + return; + } + + AwsCredentialsProvider credentialsProvider = clients.getCredentialsProvider(); + if (credentialsProvider instanceof StsAssumeRoleCredentialsProvider assumeRoleCredsProvider) { + CleanupUtils.close(null, clients.getConsumingClient(), clients.getStsClient(), assumeRoleCredsProvider); + } else { + CleanupUtils.close(null, clients.getConsumingClient(), clients.getStsClient()); + } + } + + public static class CloseableAwsClients { + private AwsClient consumingClient; + private StsClient stsClient; + private AwsCredentialsProvider credentialsProvider; + + public CloseableAwsClients() { + + } + + public AwsClient getConsumingClient() { + return consumingClient; + } + + public StsClient getStsClient() { + return stsClient; + } + + public AwsCredentialsProvider getCredentialsProvider() { + return credentialsProvider; + } + + public void setConsumingClient(AwsClient consumingClient) { + this.consumingClient = consumingClient; + } + + public void setStsClient(StsClient stsClient) { + this.stsClient = stsClient; + } + + public void setCredentialsProvider(AwsCredentialsProvider credentialsProvider) { + this.credentialsProvider = credentialsProvider; + } + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java new file mode 100644 index 0000000000..0ba702fce8 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.util.aws.glue; + +import static org.apache.asterix.external.util.aws.AwsConstants.REGION_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsConstants.SERVICE_END_POINT_FIELD_NAME; +import static org.apache.asterix.external.util.aws.AwsUtils.buildCredentialsProvider; +import static org.apache.asterix.external.util.aws.AwsUtils.validateAndGetRegion; + +import java.util.Map; + +import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.external.util.aws.AwsUtils; +import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients; + +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.glue.GlueClient; +import software.amazon.awssdk.services.glue.GlueClientBuilder; + +public class GlueUtils { + private GlueUtils() { + throw new AssertionError("do not instantiate"); + } + + /** + * Builds the client using the provided configuration + * + * @param configuration properties + * @return client + * @throws CompilationException CompilationException + */ + public static CloseableAwsClients 
buildClient(IApplicationContext appCtx, Map<String, String> configuration) + throws CompilationException { + CloseableAwsClients awsClients = new CloseableAwsClients(); + String regionId = configuration.get(REGION_FIELD_NAME); + String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); + + Region region = validateAndGetRegion(regionId); + AwsCredentialsProvider credentialsProvider = buildCredentialsProvider(appCtx, configuration, awsClients); + + GlueClientBuilder builder = GlueClient.builder(); + builder.region(region); + builder.credentialsProvider(credentialsProvider); + AwsUtils.setEndpoint(builder, serviceEndpoint); + + awsClients.setConsumingClient(builder.build()); + return awsClients; + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java index 3cfef1e996..dbddb38b4c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Constants.java @@ -62,4 +62,8 @@ public class S3Constants { public static final String HADOOP_INSTANCE_PROFILE = "org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider"; public static final String HADOOP_SIMPLE = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"; public static final String HADOOP_TEMPORARY = "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider"; + + // browse API + public static final String FILES = "files"; + public static final String FOLDERS = "folders"; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java index e20e39f2bf..6484b20ff0 100644 --- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java @@ -39,6 +39,8 @@ import static org.apache.asterix.external.util.aws.AwsUtils.buildCredentialsProv import static org.apache.asterix.external.util.aws.AwsUtils.getAuthenticationType; import static org.apache.asterix.external.util.aws.AwsUtils.validateAndGetCrossRegion; import static org.apache.asterix.external.util.aws.AwsUtils.validateAndGetRegion; +import static org.apache.asterix.external.util.aws.s3.S3Constants.FILES; +import static org.apache.asterix.external.util.aws.s3.S3Constants.FOLDERS; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ACCESS_KEY_ID; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ANONYMOUS; import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_ASSUMED_ROLE; @@ -60,8 +62,6 @@ import static org.apache.asterix.external.util.aws.s3.S3Constants.HADOOP_TEMPORA import static org.apache.asterix.external.util.aws.s3.S3Constants.PATH_STYLE_ADDRESSING_FIELD_NAME; import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; -import java.net.URI; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -79,6 +79,7 @@ import org.apache.asterix.external.util.ExternalDataConstants; import org.apache.asterix.external.util.ExternalDataPrefix; import org.apache.asterix.external.util.ExternalDataUtils; import org.apache.asterix.external.util.aws.AwsUtils; +import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.mapred.JobConf; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; @@ -86,7 +87,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import 
org.apache.hyracks.api.exceptions.IWarningCollector; import org.apache.hyracks.api.exceptions.SourceLocation; import org.apache.hyracks.api.exceptions.Warning; -import org.apache.hyracks.api.util.CleanupUtils; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.core.exception.SdkException; @@ -109,45 +109,33 @@ public class S3Utils { } /** - * Builds the S3 client using the provided configuration + * Builds the client using the provided configuration * * @param configuration properties - * @return S3 client + * @return client * @throws CompilationException CompilationException */ - public static S3Client buildClient(IApplicationContext appCtx, Map<String, String> configuration) + public static CloseableAwsClients buildClient(IApplicationContext appCtx, Map<String, String> configuration) throws CompilationException { + CloseableAwsClients awsClients = new CloseableAwsClients(); String regionId = configuration.get(REGION_FIELD_NAME); String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); Region region = validateAndGetRegion(regionId); boolean crossRegion = validateAndGetCrossRegion(configuration.get(CROSS_REGION_FIELD_NAME)); - AwsCredentialsProvider credentialsProvider = buildCredentialsProvider(appCtx, configuration); + AwsCredentialsProvider credentialsProvider = buildCredentialsProvider(appCtx, configuration, awsClients); S3ClientBuilder builder = S3Client.builder(); builder.region(region); builder.crossRegionAccessEnabled(crossRegion); builder.credentialsProvider(credentialsProvider); - - // Validate the service endpoint if present - if (serviceEndpoint != null) { - try { - URI uri = new URI(serviceEndpoint); - try { - builder.endpointOverride(uri); - } catch (NullPointerException ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); - } - } catch (URISyntaxException ex) { - throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, - 
String.format("Invalid service endpoint %s", serviceEndpoint)); - } - } + AwsUtils.setEndpoint(builder, serviceEndpoint); boolean pathStyleAddressing = validateAndGetPathStyleAddressing(configuration.get(PATH_STYLE_ADDRESSING_FIELD_NAME), serviceEndpoint); builder.forcePathStyle(pathStyleAddressing); - return builder.build(); + awsClients.setConsumingClient(builder.build()); + return awsClients; } public static void configureAwsS3HdfsJobConf(IApplicationContext appCtx, JobConf conf, @@ -293,7 +281,8 @@ public class S3Utils { } // Check if the bucket is present - S3Client s3Client = buildClient(appCtx, configuration); + CloseableAwsClients awsClients = buildClient(appCtx, configuration); + S3Client s3Client = (S3Client) awsClients.getConsumingClient(); S3Response response; boolean useOldApi = false; String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); @@ -317,9 +306,7 @@ public class S3Utils { } catch (SdkException ex) { throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); } finally { - if (s3Client != null) { - CleanupUtils.close(s3Client, null); - } + AwsUtils.closeClients(awsClients); } boolean isEmpty = useOldApi ? 
((ListObjectsResponse) response).contents().isEmpty() @@ -379,7 +366,8 @@ public class S3Utils { // Prepare to retrieve the objects List<S3Object> filesOnly; String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); - S3Client s3Client = buildClient(appCtx, configuration); + CloseableAwsClients awsClients = buildClient(appCtx, configuration); + S3Client s3Client = (S3Client) awsClients.getConsumingClient(); String prefix = getPrefix(configuration); try { @@ -401,9 +389,7 @@ public class S3Utils { } catch (SdkException ex) { throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, getMessageOrToString(ex)); } finally { - if (s3Client != null) { - CleanupUtils.close(s3Client, null); - } + AwsUtils.closeClients(awsClients); } // Warn if no files are returned @@ -522,58 +508,59 @@ public class S3Utils { } } - public static Map<String, List<String>> S3ObjectsOfSingleDepth(IApplicationContext appCtx, - Map<String, String> configuration, String container, String prefix) - throws CompilationException, HyracksDataException { - // create s3 client - S3Client s3Client = buildClient(appCtx, configuration); - // fetch all the s3 objects - return listS3ObjectsOfSingleDepth(s3Client, container, prefix); - } - /** * Uses the latest API to retrieve the objects from the storage of a single level. 
* - * @param s3Client S3 client - * @param container container name - * @param prefix definition prefix + * @param appCtx application context + * @param configuration configuration + * @param container container name + * @param prefix definition prefix */ - private static Map<String, List<String>> listS3ObjectsOfSingleDepth(S3Client s3Client, String container, - String prefix) { - Map<String, List<String>> allObjects = new HashMap<>(); - ListObjectsV2Iterable listObjectsInterable; - ListObjectsV2Request.Builder listObjectsBuilder = - ListObjectsV2Request.builder().bucket(container).prefix(prefix).delimiter("/"); + public static Map<String, List<String>> listS3ObjectsOfSingleDepth(IApplicationContext appCtx, + Map<String, String> configuration, String container, String prefix) throws CompilationException { + CloseableAwsClients awsClients = buildClient(appCtx, configuration); + S3Client s3Client = (S3Client) awsClients.getConsumingClient(); + + ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder(); + listObjectsBuilder.bucket(container); listObjectsBuilder.prefix(prefix); + listObjectsBuilder.delimiter("/"); + ListObjectsV2Request listObjectsV2Request = listObjectsBuilder.build(); + + Map<String, List<String>> allObjects = new HashMap<>(); List<String> files = new ArrayList<>(); List<String> folders = new ArrayList<>(); + // to skip the prefix as a file from the response boolean checkPrefixInFile = true; - listObjectsInterable = s3Client.listObjectsV2Paginator(listObjectsBuilder.build()); - for (ListObjectsV2Response response : listObjectsInterable) { - // put all the files - for (S3Object object : response.contents()) { - String fileName = object.key(); - fileName = fileName.substring(prefix.length(), fileName.length()); - if (checkPrefixInFile) { - if (prefix.equals(object.key())) + try { + ListObjectsV2Iterable listObjectsIterable = s3Client.listObjectsV2Paginator(listObjectsV2Request); + for (ListObjectsV2Response response : 
listObjectsIterable) { + // put all the files + for (S3Object object : response.contents()) { + String fileName = object.key(); + fileName = fileName.substring(prefix.length()); + if (checkPrefixInFile && prefix.equals(object.key())) { checkPrefixInFile = false; - else { + } else { files.add(fileName); } - } else { - files.add(fileName); + } + + // put all the folders + for (CommonPrefix object : response.commonPrefixes()) { + String folderName = object.prefix(); + folderName = folderName.substring(prefix.length()); + folders.add( + folderName.endsWith("/") ? folderName.substring(0, folderName.length() - 1) : folderName); } } - // put all the folders - for (CommonPrefix object : response.commonPrefixes()) { - String folderName = object.prefix(); - folderName = folderName.substring(prefix.length(), folderName.length()); - folders.add(folderName.endsWith("/") ? folderName.substring(0, folderName.length() - 1) : folderName); - } + } finally { + AwsUtils.closeClients(awsClients); } - allObjects.put("files", files); - allObjects.put("folders", folders); + + allObjects.put(FILES, files); + allObjects.put(FOLDERS, folders); return allObjects; } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java index dd9f4070d2..dc4d93e74c 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java @@ -553,7 +553,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe @Override public void close() throws HyracksDataException { traceLastRecordIn(); - Throwable failure = CleanupUtils.close(frameOpCallbacks, null); + Throwable failure = CleanupUtils.close(null, 
frameOpCallbacks); failure = CleanupUtils.destroy(failure, cursors); failure = CleanupUtils.close(writer, failure); failure = closeIndexHelpers(failure); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java index 9778a309da..f192907215 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/CleanupUtils.java @@ -100,11 +100,11 @@ public class CleanupUtils { } } - public static Throwable close(AutoCloseable[] closables, Throwable root) { + public static Throwable close(Throwable root, AutoCloseable... closables) { return close(closables, root, false); } - public static Throwable closeSilently(AutoCloseable[] closables, Throwable root) { + public static Throwable closeSilently(Throwable root, AutoCloseable... 
closables) { return close(closables, root, true); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java index 139a87195d..2eb5258d35 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java @@ -46,7 +46,7 @@ public class LSMIndexCompactOperatorNodePushable extends AbstractOperatorNodePus @Override public void deinitialize() throws HyracksDataException { - Throwable failure = CleanupUtils.close(indexHelpers, null); + Throwable failure = CleanupUtils.close(null, indexHelpers); if (failure != null) { throw HyracksDataException.create(failure); }
