This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit e5236fd0d9d9578345c8b8c815e461eca967e91d
Merge: a48fbb0f62 3a5a1936dc
Author: Michael Blow <[email protected]>
AuthorDate: Mon Nov 18 14:46:24 2024 -0500

    Merge branch 'gerrit/neo' into 'gerrit/trinity'
    
    Ext-ref: MB-64269
    Change-Id: If13d823b4e4a0e0a3887554a231995c9166a31da

 .../java/org/apache/asterix/external/util/google/gcs/GCSUtils.java | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --cc 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 6183a88143,0000000000..a7f74a91b8
mode 100644,000000..100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@@ -1,242 -1,0 +1,245 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.asterix.external.util.google.gcs;
 +
 +import static 
org.apache.asterix.common.exceptions.ErrorCode.EXTERNAL_SOURCE_ERROR;
 +import static 
org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
 +import static 
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
 +import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
 +import static 
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_TYPE;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_AUTH_UNAUTHENTICATED;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_ENDPOINT;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.HADOOP_GCS_PROTOCOL;
 +import static 
org.apache.asterix.external.util.google.gcs.GCSConstants.JSON_CREDENTIALS_FIELD_NAME;
 +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.nio.file.Path;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.function.BiPredicate;
 +import java.util.regex.Matcher;
 +
 +import org.apache.asterix.common.exceptions.CompilationException;
 +import org.apache.asterix.common.exceptions.ErrorCode;
 +import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
 +import org.apache.asterix.external.util.ExternalDataConstants;
 +import org.apache.asterix.external.util.ExternalDataUtils;
 +import org.apache.asterix.external.util.HDFSUtils;
 +import org.apache.hadoop.mapred.JobConf;
 +import org.apache.hyracks.api.exceptions.IWarningCollector;
 +import org.apache.hyracks.api.exceptions.SourceLocation;
 +import org.apache.hyracks.api.exceptions.Warning;
 +
 +import com.google.api.gax.paging.Page;
 +import com.google.auth.oauth2.GoogleCredentials;
 +import com.google.cloud.BaseServiceException;
 +import com.google.cloud.NoCredentials;
 +import com.google.cloud.storage.Blob;
 +import com.google.cloud.storage.Storage;
 +import com.google.cloud.storage.StorageOptions;
 +
 +public class GCSUtils {
 +    private GCSUtils() {
 +        throw new AssertionError("do not instantiate");
 +
 +    }
 +
 +    /**
 +     * Builds the client using the provided configuration
 +     *
 +     * @param configuration properties
 +     * @return client
 +     * @throws CompilationException CompilationException
 +     */
 +    public static Storage buildClient(Map<String, String> configuration) 
throws CompilationException {
 +        String applicationDefaultCredentials = 
configuration.get(APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
 +        String jsonCredentials = 
configuration.get(JSON_CREDENTIALS_FIELD_NAME);
 +        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
 +
 +        StorageOptions.Builder builder = StorageOptions.newBuilder();
 +
 +        // default credentials provider
 +        if (applicationDefaultCredentials != null) {
 +            // only "true" value is allowed
 +            if (!applicationDefaultCredentials.equalsIgnoreCase("true")) {
 +                throw new 
CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE,
 +                        APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME, "true");
 +            }
 +
 +            // no other authentication parameters are allowed
 +            if (jsonCredentials != null) {
 +                throw new 
CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, 
JSON_CREDENTIALS_FIELD_NAME,
 +                        APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME);
 +            }
 +
 +            try {
 +                
builder.setCredentials(GoogleCredentials.getApplicationDefault());
-             } catch (IOException ex) {
++            } catch (Exception ex) {
 +                throw CompilationException.create(EXTERNAL_SOURCE_ERROR, 
getMessageOrToString(ex));
 +            }
 +        } else if (jsonCredentials != null) {
 +            try (InputStream credentialsStream = new 
ByteArrayInputStream(jsonCredentials.getBytes())) {
 +                
builder.setCredentials(GoogleCredentials.fromStream(credentialsStream));
 +            } catch (IOException ex) {
-                 throw new CompilationException(EXTERNAL_SOURCE_ERROR, 
getMessageOrToString(ex));
++                throw CompilationException.create(EXTERNAL_SOURCE_ERROR, 
getMessageOrToString(ex));
++            } catch (Exception ex) {
++                throw new CompilationException(EXTERNAL_SOURCE_ERROR,
++                        "Encountered an issue while processing the JSON 
credentials. Please ensure the provided credentials are valid.");
 +            }
 +        } else {
 +            builder.setCredentials(NoCredentials.getInstance());
 +        }
 +
 +        if (endpoint != null) {
 +            builder.setHost(endpoint);
 +        }
 +
 +        return builder.build().getService();
 +    }
 +
 +    /**
 +     * Validate external dataset properties
 +     *
 +     * @param configuration properties
 +     * @throws CompilationException Compilation exception
 +     */
 +    public static void validateProperties(Map<String, String> configuration, 
SourceLocation srcLoc,
 +            IWarningCollector collector) throws CompilationException {
 +
 +        // check if the format property is present
 +        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
 +            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, 
srcLoc, ExternalDataConstants.KEY_FORMAT);
 +        }
 +
 +        validateIncludeExclude(configuration);
 +        String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
 +
 +        try {
 +            Storage.BlobListOption limitOption = 
Storage.BlobListOption.pageSize(1);
 +            Storage.BlobListOption prefixOption = 
Storage.BlobListOption.prefix(getPrefix(configuration));
 +            Storage storage = buildClient(configuration);
 +            Page<Blob> items = storage.list(container, limitOption, 
prefixOption);
 +
 +            if (!items.iterateAll().iterator().hasNext() && 
collector.shouldWarn()) {
 +                Warning warning = Warning.of(srcLoc, 
ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
 +                collector.warn(warning);
 +            }
 +        } catch (CompilationException ex) {
 +            throw ex;
 +        } catch (Exception ex) {
 +            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
getMessageOrToString(ex));
 +        }
 +    }
 +
 +    public static List<Blob> listItems(Map<String, String> configuration, 
IncludeExcludeMatcher includeExcludeMatcher,
 +            IWarningCollector warningCollector) throws CompilationException {
 +        // Prepare to retrieve the objects
 +        List<Blob> filesOnly = new ArrayList<>();
 +        String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
 +        Storage gcs = buildClient(configuration);
 +        Storage.BlobListOption options = 
Storage.BlobListOption.prefix(ExternalDataUtils.getPrefix(configuration));
 +        Page<Blob> items;
 +
 +        try {
 +            items = gcs.list(container, options);
 +        } catch (BaseServiceException ex) {
 +            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, 
getMessageOrToString(ex));
 +        }
 +
 +        // Collect the paths to files only
 +        collectAndFilterFiles(items, includeExcludeMatcher.getPredicate(), 
includeExcludeMatcher.getMatchersList(),
 +                filesOnly);
 +
 +        // Warn if no files are returned
 +        if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
 +            Warning warning = Warning.of(null, 
ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
 +            warningCollector.warn(warning);
 +        }
 +
 +        return filesOnly;
 +    }
 +
 +    /**
 +     * Excludes paths ending with "/" as that's a directory indicator, we 
need to return the files only
 +     *
 +     * @param items List of returned objects
 +     */
 +    private static void collectAndFilterFiles(Page<Blob> items, 
BiPredicate<List<Matcher>, String> predicate,
 +            List<Matcher> matchers, List<Blob> filesOnly) {
 +        for (Blob item : items.iterateAll()) {
 +            // skip folders
 +            if (item.getName().endsWith("/")) {
 +                continue;
 +            }
 +
 +            // No filter, add file
 +            if (predicate.test(matchers, item.getName())) {
 +                filesOnly.add(item);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Configures the given Hadoop JobConf for accessing Google Cloud Storage
 +     *
 +     * @param configuration      properties
 +     * @param numberOfPartitions number of partitions in the cluster
 +     */
 +    public static void configureHdfsJobConf(JobConf conf, Map<String, String> 
configuration, int numberOfPartitions) {
 +        String jsonCredentials = 
configuration.get(JSON_CREDENTIALS_FIELD_NAME);
 +        String endpoint = configuration.get(ENDPOINT_FIELD_NAME);
 +
 +        // disable caching FileSystem
 +        HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_GCS_PROTOCOL);
 +
 +        // TODO(htowaileb): needs further testing, recommended to disable by 
gcs-hadoop team
 +        conf.set(GCSConstants.HADOOP_SUPPORT_COMPRESSED, 
ExternalDataConstants.FALSE);
 +
 +        // TODO(htowaileb): needs further testing
 +        // set number of threads
 +        //        conf.set(GCSConstants.HADOOP_MAX_REQUESTS_PER_BATCH, 
String.valueOf(numberOfPartitions));
 +        //        conf.set(GCSConstants.HADOOP_BATCH_THREADS, 
String.valueOf(numberOfPartitions));
 +
 +        // authentication method
 +        // TODO(htowaileb): find a way to pass the content instead of the 
path to keyfile, this line is temporary
 +        Path credentials = Path.of("credentials.json");
 +        if (jsonCredentials == null) {
 +            // anonymous access
 +            conf.set(HADOOP_AUTH_TYPE, HADOOP_AUTH_UNAUTHENTICATED);
 +        } else {
 +            // TODO(htowaileb) need to pass the file content
 +            conf.set(HADOOP_AUTH_TYPE, 
HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE);
 +            conf.set(HADOOP_AUTH_SERVICE_ACCOUNT_JSON_KEY_FILE_PATH, 
credentials.toAbsolutePath().toString());
 +        }
 +
 +        // set endpoint if provided, default is 
https://storage.googleapis.com/
 +        if (endpoint != null) {
 +            conf.set(HADOOP_ENDPOINT, endpoint);
 +        }
 +    }
 +}

Reply via email to