This is an automated email from the ASF dual-hosted git repository.

htowaileb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 554dc27342 [ASTERIXDB-3247][EXT]: Push computed field evaluation to files listing
554dc27342 is described below

commit 554dc27342f54f7e451670b5f3df787b4d2eba1b
Author: Hussain Towaileb <[email protected]>
AuthorDate: Fri Aug 18 14:10:13 2023 +0300

    [ASTERIXDB-3247][EXT]: Push computed field evaluation to files listing
    
    Change-Id: I36ba077a26fbb142945e7b5ea7298548263c1d67
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17730
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Hussain Towaileb <[email protected]>
    Reviewed-by: Wail Alkowaileet <[email protected]>
---
 .../runtimets/testsuite_external_dataset_s3.xml    |  1 +
 .../record/reader/aws/AwsS3InputStreamFactory.java | 26 ++-----------
 .../aws/parquet/AwsS3ParquetReaderFactory.java     | 19 ++++++++--
 .../asterix/external/util/ExternalDataPrefix.java  |  5 +++
 .../asterix/external/util/ExternalDataUtils.java   | 18 +++++++++
 .../asterix/external/util/aws/s3/S3Utils.java      | 43 +++++++++++++---------
 6 files changed, 68 insertions(+), 44 deletions(-)

diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 50502da39c..ce906f63fd 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -233,6 +233,7 @@
         <expected-warn>Failed to evaluate computed field. File: 
'external-filter/department/accounting/0.json'. Computed Field Name: 'name'. 
Computed Field Type: 'bigint'. Computed Field Value: 'accounting'. Reason: 'For 
input string: "accounting"'</expected-warn>
         <expected-warn>Failed to evaluate computed field. File: 
'external-filter/department/engineering/0.json'. Computed Field Name: 'name'. 
Computed Field Type: 'bigint'. Computed Field Value: 'engineering'. Reason: 
'For input string: "engineering"'</expected-warn>
         <expected-warn>Failed to evaluate computed field. File: 
'external-filter/department/hr/0.json'. Computed Field Name: 'name'. Computed 
Field Type: 'bigint'. Computed Field Value: 'hr'. Reason: 'For input string: 
"hr"'</expected-warn>
+        <expected-warn>The provided external dataset configuration returned no 
files from the external source</expected-warn>
       </compilation-unit>
     </test-case>
     <test-case FilePath="external-dataset/s3/filter">
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 7ae992a5c5..045b746f65 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.asterix.external.input.record.reader.aws;
 
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -56,39 +55,22 @@ public class AwsS3InputStreamFactory extends 
AbstractExternalInputStreamFactory
         // Ensure the validity of include/exclude
         ExternalDataUtils.validateIncludeExclude(configuration);
         IncludeExcludeMatcher includeExcludeMatcher = 
ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+        IExternalFilterEvaluator evaluator = 
filterEvaluatorFactory.create(ctx, warningCollector);
 
-        //Get a list of S3 objects
+        // prepare prefix for computed field calculations
         ExternalDataPrefix externalDataPrefix = new 
ExternalDataPrefix(configuration, warningCollector);
         configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, 
externalDataPrefix.getRoot());
 
         // TODO(htowaileb): Since we're using the root to load the files then 
start filtering, it might end up being
         // very expensive since at the root of the prefix we might load 
millions of files, we should consider (when
         // possible) to get the value and add it
-        List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, 
includeExcludeMatcher, warningCollector);
-        filesOnly = filterPrefixes(externalDataPrefix, filesOnly, 
filterEvaluatorFactory.create(ctx, warningCollector));
+        List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, 
includeExcludeMatcher, warningCollector,
+                externalDataPrefix, evaluator);
 
         // Distribute work load amongst the partitions
         distributeWorkLoad(filesOnly, getPartitionsCount());
     }
 
-    private List<S3Object> filterPrefixes(ExternalDataPrefix prefix, 
List<S3Object> filesOnly,
-            IExternalFilterEvaluator evaluator) throws HyracksDataException {
-
-        // if no computed fields or empty files list, return the original list
-        if (filesOnly.isEmpty() || !prefix.hasComputedFields() || 
evaluator.isEmpty()) {
-            return filesOnly;
-        }
-
-        List<S3Object> filteredList = new ArrayList<>();
-        for (S3Object file : filesOnly) {
-            if (prefix.evaluate(file.key(), evaluator)) {
-                filteredList.add(file);
-            }
-        }
-
-        return filteredList;
-    }
-
     /**
      * To efficiently utilize the parallelism, work load will be distributed 
amongst the partitions based on the file
      * size.
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 66312bb173..bccb6f821f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -28,10 +28,12 @@ import java.util.Set;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
 import org.apache.asterix.external.input.HDFSDataSourceFactory;
 import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
 import org.apache.asterix.external.util.aws.s3.S3Utils;
@@ -56,9 +58,16 @@ public class AwsS3ParquetReaderFactory extends 
HDFSDataSourceFactory {
     public void configure(IServiceContext serviceCtx, Map<String, String> 
configuration,
             IWarningCollector warningCollector, 
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
             throws AlgebricksException, HyracksDataException {
+
+        // prepare prefix for computed field calculations
+        ExternalDataPrefix externalDataPrefix = new 
ExternalDataPrefix(configuration, warningCollector);
+        IExternalFilterEvaluator evaluator = 
filterEvaluatorFactory.create(serviceCtx, warningCollector);
+        configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, 
externalDataPrefix.getRoot());
+
         //Get path
         String path = configuration.containsKey(ExternalDataConstants.KEY_PATH)
-                ? configuration.get(ExternalDataConstants.KEY_PATH) : 
buildPathURIs(configuration, warningCollector);
+                ? configuration.get(ExternalDataConstants.KEY_PATH)
+                : buildPathURIs(configuration, warningCollector, 
externalDataPrefix, evaluator);
         //Put S3 configurations to AsterixDB's Hadoop configuration
         putS3ConfToHadoopConf(configuration, path);
 
@@ -108,11 +117,13 @@ public class AwsS3ParquetReaderFactory extends 
HDFSDataSourceFactory {
      * @return Comma-delimited paths (e.g., 
"s3a://bucket/file1.parquet,s3a://bucket/file2.parquet")
      * @throws CompilationException Compilation exception
      */
-    private static String buildPathURIs(Map<String, String> configuration, 
IWarningCollector warningCollector)
-            throws CompilationException {
+    private static String buildPathURIs(Map<String, String> configuration, 
IWarningCollector warningCollector,
+            ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator 
evaluator)
+            throws CompilationException, HyracksDataException {
         String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
         IncludeExcludeMatcher includeExcludeMatcher = 
ExternalDataUtils.getIncludeExcludeMatchers(configuration);
-        List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, 
includeExcludeMatcher, warningCollector);
+        List<S3Object> filesOnly = S3Utils.listS3Objects(configuration, 
includeExcludeMatcher, warningCollector,
+                externalDataPrefix, evaluator);
         StringBuilder builder = new StringBuilder();
 
         if (!filesOnly.isEmpty()) {
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
index 9d45fedf2d..f6973ee96c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
@@ -250,6 +250,11 @@ public class ExternalDataPrefix {
         // TODO provide the List to avoid array creation
         List<String> keySegments = extractPrefixSegments(key);
 
+        // no computed fields filter, accept path
+        if (!hasComputedFields() || evaluator.isEmpty()) {
+            return true;
+        }
+
         // segments of object key have to be larger than segments of the prefix
         if (keySegments.size() <= segments.size()) {
             return false;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index e60190bcf4..318716f953 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -45,6 +45,7 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
@@ -54,6 +55,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import org.apache.asterix.common.functions.ExternalFunctionLanguage;
 import org.apache.asterix.common.library.ILibrary;
 import org.apache.asterix.common.library.ILibraryManager;
@@ -980,4 +982,20 @@ public class ExternalDataUtils {
         argHolder.getDataOutput().writeByte(ARRAY16);
         argHolder.getDataOutput().writeShort((short) 0);
     }
+
+    /**
+     * Tests the provided key against all the provided predicates/evaluators 
and return true if they all pass.
+     *
+     * @param key key
+     * @param predicate predicate
+     * @param matchers matchers
+     * @param externalDataPrefix external data prefix
+     * @param evaluator evaluator
+     *
+     * @return true if key passes all tests, false otherwise
+     */
+    public static boolean evaluate(String key, BiPredicate<List<Matcher>, 
String> predicate, List<Matcher> matchers,
+            ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator 
evaluator) throws HyracksDataException {
+        return !key.endsWith("/") && predicate.test(matchers, key) && 
externalDataPrefix.evaluate(key, evaluator);
+    }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index a4f3a1ea2f..1436e55962 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -56,13 +56,16 @@ import java.util.regex.Matcher;
 
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
 import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.exceptions.Warning;
@@ -360,7 +363,8 @@ public class S3Utils {
      */
     public static List<S3Object> listS3Objects(Map<String, String> 
configuration,
             AbstractExternalInputStreamFactory.IncludeExcludeMatcher 
includeExcludeMatcher,
-            IWarningCollector warningCollector) throws CompilationException {
+            IWarningCollector warningCollector, ExternalDataPrefix 
externalDataPrefix,
+            IExternalFilterEvaluator evaluator) throws CompilationException, 
HyracksDataException {
         // Prepare to retrieve the objects
         List<S3Object> filesOnly;
         String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -368,13 +372,15 @@ public class S3Utils {
         String prefix = getPrefix(configuration);
 
         try {
-            filesOnly = listS3Objects(s3Client, container, prefix, 
includeExcludeMatcher);
+            filesOnly =
+                    listS3Objects(s3Client, container, prefix, 
includeExcludeMatcher, externalDataPrefix, evaluator);
         } catch (S3Exception ex) {
             // New API is not implemented, try falling back to old API
             try {
                 // For error code, see 
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
                 if 
(ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
-                    filesOnly = oldApiListS3Objects(s3Client, container, 
prefix, includeExcludeMatcher);
+                    filesOnly = oldApiListS3Objects(s3Client, container, 
prefix, includeExcludeMatcher,
+                            externalDataPrefix, evaluator);
                 } else {
                     throw ex;
                 }
@@ -407,7 +413,8 @@ public class S3Utils {
      * @param includeExcludeMatcher include/exclude matchers to apply
      */
     private static List<S3Object> listS3Objects(S3Client s3Client, String 
container, String prefix,
-            AbstractExternalInputStreamFactory.IncludeExcludeMatcher 
includeExcludeMatcher) {
+            AbstractExternalInputStreamFactory.IncludeExcludeMatcher 
includeExcludeMatcher,
+            ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator 
evaluator) throws HyracksDataException {
         String newMarker = null;
         List<S3Object> filesOnly = new ArrayList<>();
 
@@ -425,7 +432,7 @@ public class S3Utils {
 
             // Collect the paths to files only
             collectAndFilterFiles(listObjectsResponse.contents(), 
includeExcludeMatcher.getPredicate(),
-                    includeExcludeMatcher.getMatchersList(), filesOnly);
+                    includeExcludeMatcher.getMatchersList(), filesOnly, 
externalDataPrefix, evaluator);
 
             // Mark the flag as done if done, otherwise, get the marker of the 
previous response for the next request
             if (!listObjectsResponse.isTruncated()) {
@@ -447,7 +454,8 @@ public class S3Utils {
      * @param includeExcludeMatcher include/exclude matchers to apply
      */
     private static List<S3Object> oldApiListS3Objects(S3Client s3Client, 
String container, String prefix,
-            AbstractExternalInputStreamFactory.IncludeExcludeMatcher 
includeExcludeMatcher) {
+            AbstractExternalInputStreamFactory.IncludeExcludeMatcher 
includeExcludeMatcher,
+            ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator 
evaluator) throws HyracksDataException {
         String newMarker = null;
         List<S3Object> filesOnly = new ArrayList<>();
 
@@ -465,7 +473,7 @@ public class S3Utils {
 
             // Collect the paths to files only
             collectAndFilterFiles(listObjectsResponse.contents(), 
includeExcludeMatcher.getPredicate(),
-                    includeExcludeMatcher.getMatchersList(), filesOnly);
+                    includeExcludeMatcher.getMatchersList(), filesOnly, 
externalDataPrefix, evaluator);
 
             // Mark the flag as done if done, otherwise, get the marker of the 
previous response for the next request
             if (!listObjectsResponse.isTruncated()) {
@@ -479,21 +487,20 @@ public class S3Utils {
     }
 
     /**
-     * AWS S3 returns all the objects as paths, not differentiating between 
folder and files. The path is considered
-     * a file if it does not end up with a "/" which is the separator in a 
folder structure.
+     * Collects only files that pass all tests
      *
-     * @param s3Objects List of returned objects
+     * @param s3Objects s3 objects
+     * @param predicate predicate
+     * @param matchers matchers
+     * @param filesOnly filtered files
+     * @param externalDataPrefix external data prefix
+     * @param evaluator evaluator
      */
     private static void collectAndFilterFiles(List<S3Object> s3Objects, 
BiPredicate<List<Matcher>, String> predicate,
-            List<Matcher> matchers, List<S3Object> filesOnly) {
+            List<Matcher> matchers, List<S3Object> filesOnly, 
ExternalDataPrefix externalDataPrefix,
+            IExternalFilterEvaluator evaluator) throws HyracksDataException {
         for (S3Object object : s3Objects) {
-            // skip folders
-            if (object.key().endsWith("/")) {
-                continue;
-            }
-
-            // No filter, add file
-            if (predicate.test(matchers, object.key())) {
+            if (ExternalDataUtils.evaluate(object.key(), predicate, matchers, 
externalDataPrefix, evaluator)) {
                 filesOnly.add(object);
             }
         }

Reply via email to