This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit a7b10d8dbee54e1e98d1127ca610ad1a840142d5
Merge: a73ca5f5f4 32a3442633
Author: Hussain Towaileb <[email protected]>
AuthorDate: Tue Apr 8 17:24:28 2025 +0300

    merge branch gerrit/trinity into gerrit/ionic
    
    Ext-ref: MB-65953
    Change-Id: Ic7e9d34005c648d6b8cf5fb9cbc1ad14ed32bfee

 .../asterix/translator/AbstractLangTranslator.java |  2 +-
 .../asterix/app/translator/QueryTranslator.java    |  2 +-
 .../java/org/apache/asterix/test/dml/DmlTest.java  |  2 +
 .../asterix/test/runtime/TPCExecutionTest.java     |  2 +
 .../common/config/OptimizationConfUtil.java        |  6 +--
 .../common/exceptions/AsterixException.java        | 37 +++++++++------
 .../common/exceptions/MetadataException.java       | 16 +++----
 .../input/record/reader/aws/AwsS3InputStream.java  |  6 +--
 .../external/util/google/gcs/GCSAuthUtils.java     |  2 +-
 .../functions/ExternalFunctionCompilerUtil.java    |  2 +-
 .../data/common/ExpressionTypeComputer.java        |  7 ++-
 .../common/exceptions/AlgebricksException.java     | 52 +++++++++++++---------
 .../LogicalOperatorPrettyPrintVisitorJson.java     |  2 +-
 .../core/rewriter/base/AbstractRuleController.java |  6 +--
 .../core/rewriter/base/HeuristicOptimizer.java     |  2 +-
 .../apache/hyracks/api/util/ExceptionUtils.java    | 16 ++++++-
 16 files changed, 100 insertions(+), 62 deletions(-)

diff --cc asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
index 755d4f67c2,6a24d5971f..3948e2f03b
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/AbstractLangTranslator.java
@@@ -112,7 -98,7 +112,7 @@@ public abstract class AbstractLangTrans
              try {
                  clusterStateManager.waitForState(ClusterState.ACTIVE, 
maxWaitCycles, TimeUnit.SECONDS);
              } catch (HyracksDataException e) {
-                 throw new AlgebricksException(e, TIMEOUT);
 -                throw new AsterixException(e);
++                throw new AlgebricksException(TIMEOUT, e);
              } catch (InterruptedException e) {
                  if (LOGGER.isWarnEnabled()) {
                      LOGGER.warn("Thread interrupted while waiting for cluster 
to be " + ClusterState.ACTIVE);
diff --cc asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 069e99cba1,f4e52c536b..fee660d188
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@@ -5624,14 -4888,8 +5624,14 @@@ public class QueryTranslator extends Ab
                  ensureNotCancelled(clientRequest);
                  printer.print(jobId);
              }
 +            if (atomic) {
 +                globalTxManager.commitTransaction(jobId);
 +            }
          } catch (Exception e) {
 +            if (atomic && jobId != null) {
 +                globalTxManager.abortTransaction(jobId);
 +            }
-             if (ExceptionUtils.getRootCause(e) instanceof 
InterruptedException) {
+             if (ExceptionUtils.causedByInterrupt(e)) {
                  Thread.currentThread().interrupt();
                  throw new RuntimeDataException(ErrorCode.REQUEST_CANCELLED, 
clientRequest.getId());
              }
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
index f950febfde,77897ea9d9..a9f12341ff
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStream.java
@@@ -92,18 -88,14 +92,18 @@@ public class AwsS3InputStream extends A
                  in = s3InStream;
                  break;
              } catch (NoSuchKeyException ex) {
 -                LOGGER.debug(() -> "Key " + 
LogRedactionUtil.userData(request.key()) + " was not found in bucket "
 -                        + request.bucket());
 +                LOGGER.debug(() -> "Key " + userData(request.key()) + " was 
not found in bucket {}" + request.bucket());
                  return false;
              } catch (S3Exception ex) {
 -                if (!shouldRetry(ex.awsErrorDetails().errorCode(), 
retries++)) {
 +                if (S3AuthUtils.isArnAssumedRoleExpiredToken(configuration, 
ex.awsErrorDetails().errorCode())) {
 +                    LOGGER.debug(() -> "Expired AWS assume role session, will 
attempt to refresh the session");
 +                    rebuildAwsS3Client(configuration);
 +                    LOGGER.debug(() -> "Successfully refreshed AWS assume 
role session");
 +                } else if (shouldRetry(ex.awsErrorDetails().errorCode(), 
retries++)) {
 +                    LOGGER.debug(() -> "S3 retryable error: " + 
userData(ex.getMessage()));
 +                } else {
-                     throw new 
RuntimeDataException(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, 
getMessageOrToString(ex));
+                     throw 
RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, 
getMessageOrToString(ex));
                  }
 -                LOGGER.debug(() -> "S3 retryable error: " + 
LogRedactionUtil.userData(ex.getMessage()));
  
                  // Backoff for 1 sec for the first 2 retries, and 2 seconds 
from there onward
                  try {
diff --cc asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java
index 4473ad759a,0000000000..4a3f0984f9
mode 100644,000000..100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSAuthUtils.java
@@@ -1,294 -1,0 +1,294 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.util.google.gcs;
 +
 +import static 
org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
 +import static 
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_ENDPOINT;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_GCS_PROTOCOL;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.JSON_CREDENTIALS_FIELD_NAME;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HadoopAuthServiceAccount.IMPERSONATE_SERVICE_ACCOUNT;
 +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Map;
 +
 +import org.apache.asterix.common.api.IApplicationContext;
 +import org.apache.asterix.common.exceptions.CompilationException;
 +import org.apache.asterix.common.exceptions.ErrorCode;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.HDFSUtils;
 +import org.apache.hadoop.mapred.JobConf;
 +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 +
 +import com.fasterxml.jackson.core.JsonProcessingException;
 +import com.fasterxml.jackson.core.json.JsonReadFeature;
 +import com.fasterxml.jackson.databind.JsonNode;
 +import com.fasterxml.jackson.databind.ObjectMapper;
 +import com.google.auth.Credentials;
 +import com.google.auth.oauth2.GoogleCredentials;
 +import com.google.auth.oauth2.ImpersonatedCredentials;
 +import com.google.cloud.NoCredentials;
 +import com.google.cloud.storage.Storage;
 +import com.google.cloud.storage.StorageOptions;
 +
 +public class GCSAuthUtils {
 +    enum AuthenticationType {
 +        ANONYMOUS,
 +        IMPERSONATE_SERVICE_ACCOUNT,
 +        APPLICATION_DEFAULT_CREDENTIALS,
 +        SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS,
 +        BAD_AUTHENTICATION
 +    }
 +
 +    private static final ObjectMapper JSON_CREDENTIALS_OBJECT_MAPPER = new 
ObjectMapper();
 +    private static final List<String> READ_WRITE_SCOPE_PERMISSION =
 +            
Collections.singletonList("https://www.googleapis.com/auth/devstorage.read_write");
 +    static {
 +        
JSON_CREDENTIALS_OBJECT_MAPPER.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(),
 true);
 +    }
 +
 +    private GCSAuthUtils() {
 +        throw new AssertionError("do not instantiate");
 +    }
 +
 +    /**
 +     * Builds the client using the provided configuration
 +     *
 +     * @param appCtx application context
 +     * @param configuration properties
 +     * @return Storage client
 +     * @throws CompilationException CompilationException
 +     */
 +    public static Storage buildClient(IApplicationContext appCtx, Map<String, 
String> configuration)
 +            throws CompilationException {
 +        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
 +
 +        Credentials credentials = buildCredentials(appCtx, configuration);
 +        StorageOptions.Builder builder = StorageOptions.newBuilder();
 +        builder.setCredentials(credentials);
 +
 +        if (endpoint != null) {
 +            builder.setHost(endpoint);
 +        }
 +
 +        return builder.build().getService();
 +    }
 +
 +    public static Credentials buildCredentials(IApplicationContext appCtx, 
Map<String, String> configuration) throws CompilationException {
 +        AuthenticationType authenticationType = 
getAuthenticationType(configuration);
 +        return switch (authenticationType) {
 +            case ANONYMOUS -> NoCredentials.getInstance();
 +            case IMPERSONATE_SERVICE_ACCOUNT -> 
getImpersonatedServiceAccountCredentials(appCtx, configuration);
 +            case APPLICATION_DEFAULT_CREDENTIALS -> 
getApplicationDefaultCredentials(configuration);
 +            case SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS -> 
getServiceAccountKeyCredentials(configuration);
 +            case BAD_AUTHENTICATION -> throw new 
CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED);
 +        };
 +    }
 +
 +    private static AuthenticationType getAuthenticationType(Map<String, 
String> configuration) {
 +        String impersonateServiceAccount = 
configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME);
 +        String applicationDefaultCredentials = 
configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
 +        String jsonCredentials = 
configuration.get(JSON_CREDENTIALS_FIELD_NAME);
 +
 +        if (noAuth(configuration)) {
 +            return AuthenticationType.ANONYMOUS;
 +        } else if (impersonateServiceAccount != null) {
 +            return AuthenticationType.IMPERSONATE_SERVICE_ACCOUNT;
 +        } else if (applicationDefaultCredentials != null) {
 +            return AuthenticationType.APPLICATION_DEFAULT_CREDENTIALS;
 +        } else if (jsonCredentials != null) {
 +            return AuthenticationType.SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS;
 +        } else {
 +            return AuthenticationType.BAD_AUTHENTICATION;
 +        }
 +    }
 +
 +    private static boolean noAuth(Map<String, String> configuration) {
 +        return getNonNull(configuration, 
APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME, JSON_CREDENTIALS_FIELD_NAME,
 +                IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME) == null;
 +    }
 +
 +    /**
 +     * Returns the cached credentials if valid, otherwise, generates new 
credentials
 +     *
 +     * @param appCtx application context
 +     * @param configuration configuration
 +     * @return returns the cached credentials if valid, otherwise, generates 
new credentials
 +     * @throws CompilationException CompilationException
 +     */
 +    public static GoogleCredentials 
getImpersonatedServiceAccountCredentials(IApplicationContext appCtx,
 +            Map<String, String> configuration) throws CompilationException {
 +        GoogleCredentials sourceCredentials = 
getCredentialsToImpersonateServiceAccount(configuration);
 +        String impersonateServiceAccount = 
configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME);
 +        int duration = 
appCtx.getExternalProperties().getGcpImpersonateServiceAccountDuration();
 +
 +        // Create impersonated credentials
 +        return ImpersonatedCredentials.create(sourceCredentials, 
impersonateServiceAccount, null,
 +                READ_WRITE_SCOPE_PERMISSION, duration);
 +    }
 +
 +    private static GoogleCredentials 
getCredentialsToImpersonateServiceAccount(Map<String, String> configuration)
 +            throws CompilationException {
 +        String applicationDefaultCredentials = 
configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
 +        String jsonCredentials = 
configuration.get(JSON_CREDENTIALS_FIELD_NAME);
 +
 +        if (applicationDefaultCredentials != null) {
 +            return getApplicationDefaultCredentials(configuration);
 +        } else if (jsonCredentials != null) {
 +            return getServiceAccountKeyCredentials(configuration);
 +        } else {
 +            throw new CompilationException(
 +                    
ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED_TO_IMPERSONATE_SERVICE_ACCOUNT);
 +        }
 +    }
 +
 +    private static GoogleCredentials 
getApplicationDefaultCredentials(Map<String, String> configuration)
 +            throws CompilationException {
 +        try {
 +            String notAllowed = getNonNull(configuration, 
JSON_CREDENTIALS_FIELD_NAME);
 +            if (notAllowed != null) {
 +                throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, notAllowed,
 +                        APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
 +            }
 +            return GoogleCredentials.getApplicationDefault();
 +        } catch (Exception ex) {
 +            throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, 
getMessageOrToString(ex));
 +        }
 +    }
 +
 +    private static GoogleCredentials 
getServiceAccountKeyCredentials(Map<String, String> configuration)
 +            throws CompilationException {
 +        String jsonCredentials = 
configuration.get(JSON_CREDENTIALS_FIELD_NAME);
 +        try (InputStream credentialsStream = new 
ByteArrayInputStream(jsonCredentials.getBytes())) {
 +            return GoogleCredentials.fromStream(credentialsStream);
 +        } catch (IOException ex) {
 +            throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex, 
getMessageOrToString(ex));
 +        } catch (Exception ex) {
-             throw new CompilationException(EXTERNAL_SOURCE_ERROR,
++            throw CompilationException.create(EXTERNAL_SOURCE_ERROR, ex,
 +                    "Encountered an issue while processing the JSON 
credentials. Please ensure the provided credentials are valid.");
 +        }
 +    }
 +
 +    public static void configureHdfsJobConf(JobConf jobConf, Map<String, 
String> configuration)
 +            throws AlgebricksException {
 +        configureHdfsJobConf(jobConf, configuration, 0);
 +    }
 +
 +    /**
 +     * Builds the client using the provided configuration
 +     *
 +     * @param configuration      properties
 +     * @param numberOfPartitions number of partitions
 +     */
 +    public static void configureHdfsJobConf(JobConf jobConf, Map<String, 
String> configuration, int numberOfPartitions)
 +            throws AlgebricksException {
 +        setHadoopCredentials(jobConf, configuration);
 +
 +        // set endpoint if provided, default is 
https://storage.googleapis.com/
 +        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
 +        if (endpoint != null) {
 +            jobConf.set(HADOOP_ENDPOINT, endpoint);
 +        }
 +
 +        // disable caching FileSystem
 +        HDFSUtils.disableHadoopFileSystemCache(jobConf, HADOOP_GCS_PROTOCOL);
 +
 +        // TODO(htowaileb): make configurable, in case we hit rate limits 
then we can reduce it, default is 15
 +        if (numberOfPartitions != 0) {
 +            jobConf.set(GCSConstants.MAX_BATCH_THREADS, 
String.valueOf(numberOfPartitions));
 +        }
 +
 +        // recommended to be disabled by GCP hadoop team
 +        jobConf.set(GCSConstants.HADOOP_SUPPORT_COMPRESSED, 
ExternalDataConstants.FALSE);
 +    }
 +
 +    /**
 +     * Sets the credentials provider type and the credentials to hadoop based 
on the provided configuration
 +     *
 +     * @param jobConf hadoop job config
 +     * @param configuration external details configuration
 +     * @throws CompilationException CompilationException
 +     */
 +    private static void setHadoopCredentials(JobConf jobConf, Map<String, 
String> configuration)
 +            throws CompilationException {
 +        AuthenticationType authenticationType = 
getAuthenticationType(configuration);
 +        switch (authenticationType) {
 +            case ANONYMOUS:
 +                jobConf.set(HADOOP_AUTH_TYPE, HADOOP_AUTH_UNAUTHENTICATED);
 +                break;
 +            case IMPERSONATE_SERVICE_ACCOUNT:
 +                String impersonateServiceAccount = 
configuration.get(IMPERSONATE_SERVICE_ACCOUNT_FIELD_NAME);
 +                jobConf.set(IMPERSONATE_SERVICE_ACCOUNT, 
impersonateServiceAccount);
 +                setJsonCredentials(jobConf, configuration);
 +                break;
 +            case SERVICE_ACCOUNT_KEY_JSON_CREDENTIALS:
 +                setJsonCredentials(jobConf, configuration);
 +                break;
 +            case BAD_AUTHENTICATION:
 +                throw new 
CompilationException(ErrorCode.NO_VALID_AUTHENTICATION_PARAMS_PROVIDED);
 +        }
 +    }
 +
 +    /**
 +     * Sets the Json credentials to hadoop job configuration
 +     * Note:
 +     * Setting these values instead of 
HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH is supported
 +     * in com.google.cloud.bigdataoss:util-hadoop only up to version 
hadoop3-2.2.x and is removed in
 +     * version 3.x.y, which also removed support for hadoop-2
 +     *
 +     * @param jobConf hadoop job config
 +     * @param configuration external details configuration
 +     * @throws CompilationException CompilationException
 +     */
 +    private static void setJsonCredentials(JobConf jobConf, Map<String, 
String> configuration)
 +            throws CompilationException {
 +        try {
 +            String jsonCredentials = 
configuration.get(JSON_CREDENTIALS_FIELD_NAME);
 +            JsonNode jsonCreds = 
JSON_CREDENTIALS_OBJECT_MAPPER.readTree(jsonCredentials);
 +            jobConf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY_ID,
 +                    
jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY_ID).asText());
 +            jobConf.set(GCSConstants.HadoopAuthServiceAccount.PRIVATE_KEY,
 +                    
jsonCreds.get(GCSConstants.JsonCredentials.PRIVATE_KEY).asText());
 +            jobConf.set(GCSConstants.HadoopAuthServiceAccount.CLIENT_EMAIL,
 +                    
jsonCreds.get(GCSConstants.JsonCredentials.CLIENT_EMAIL).asText());
 +        } catch (JsonProcessingException e) {
 +            throw CompilationException.create(EXTERNAL_SOURCE_ERROR, e, 
"Unable to parse Json Credentials",
 +                    getMessageOrToString(e));
 +        }
 +    }
 +
 +    private static String getNonNull(Map<String, String> configuration, 
String... fieldNames) {
 +        for (String fieldName : fieldNames) {
 +            if (configuration.get(fieldName) != null) {
 +                return fieldName;
 +            }
 +        }
 +        return null;
 +    }
 +}

Reply via email to