This is an automated email from the ASF dual-hosted git repository.
htowaileb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 554dc27342 [ASTERIXDB-3247][EXT]: Push computed field evaluation to
files listing
554dc27342 is described below
commit 554dc27342f54f7e451670b5f3df787b4d2eba1b
Author: Hussain Towaileb <[email protected]>
AuthorDate: Fri Aug 18 14:10:13 2023 +0300
[ASTERIXDB-3247][EXT]: Push computed field evaluation to files listing
Change-Id: I36ba077a26fbb142945e7b5ea7298548263c1d67
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17730
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Hussain Towaileb <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
---
.../runtimets/testsuite_external_dataset_s3.xml | 1 +
.../record/reader/aws/AwsS3InputStreamFactory.java | 26 ++-----------
.../aws/parquet/AwsS3ParquetReaderFactory.java | 19 ++++++++--
.../asterix/external/util/ExternalDataPrefix.java | 5 +++
.../asterix/external/util/ExternalDataUtils.java | 18 +++++++++
.../asterix/external/util/aws/s3/S3Utils.java | 43 +++++++++++++---------
6 files changed, 68 insertions(+), 44 deletions(-)
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index 50502da39c..ce906f63fd 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -233,6 +233,7 @@
<expected-warn>Failed to evaluate computed field. File:
'external-filter/department/accounting/0.json'. Computed Field Name: 'name'.
Computed Field Type: 'bigint'. Computed Field Value: 'accounting'. Reason: 'For
input string: "accounting"'</expected-warn>
<expected-warn>Failed to evaluate computed field. File:
'external-filter/department/engineering/0.json'. Computed Field Name: 'name'.
Computed Field Type: 'bigint'. Computed Field Value: 'engineering'. Reason:
'For input string: "engineering"'</expected-warn>
<expected-warn>Failed to evaluate computed field. File:
'external-filter/department/hr/0.json'. Computed Field Name: 'name'. Computed
Field Type: 'bigint'. Computed Field Value: 'hr'. Reason: 'For input string:
"hr"'</expected-warn>
+ <expected-warn>The provided external dataset configuration returned no
files from the external source</expected-warn>
</compilation-unit>
</test-case>
<test-case FilePath="external-dataset/s3/filter">
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index 7ae992a5c5..045b746f65 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -18,7 +18,6 @@
*/
package org.apache.asterix.external.input.record.reader.aws;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -56,39 +55,22 @@ public class AwsS3InputStreamFactory extends
AbstractExternalInputStreamFactory
// Ensure the validity of include/exclude
ExternalDataUtils.validateIncludeExclude(configuration);
IncludeExcludeMatcher includeExcludeMatcher =
ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+ IExternalFilterEvaluator evaluator =
filterEvaluatorFactory.create(ctx, warningCollector);
- //Get a list of S3 objects
+ // prepare prefix for computed field calculations
ExternalDataPrefix externalDataPrefix = new
ExternalDataPrefix(configuration, warningCollector);
configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME,
externalDataPrefix.getRoot());
// TODO(htowaileb): Since we're using the root to load the files then
start filtering, it might end up being
// very expensive since at the root of the prefix we might load
millions of files, we should consider (when
// possible) to get the value and add it
- List<S3Object> filesOnly = S3Utils.listS3Objects(configuration,
includeExcludeMatcher, warningCollector);
- filesOnly = filterPrefixes(externalDataPrefix, filesOnly,
filterEvaluatorFactory.create(ctx, warningCollector));
+ List<S3Object> filesOnly = S3Utils.listS3Objects(configuration,
includeExcludeMatcher, warningCollector,
+ externalDataPrefix, evaluator);
// Distribute work load amongst the partitions
distributeWorkLoad(filesOnly, getPartitionsCount());
}
- private List<S3Object> filterPrefixes(ExternalDataPrefix prefix,
List<S3Object> filesOnly,
- IExternalFilterEvaluator evaluator) throws HyracksDataException {
-
- // if no computed fields or empty files list, return the original list
- if (filesOnly.isEmpty() || !prefix.hasComputedFields() ||
evaluator.isEmpty()) {
- return filesOnly;
- }
-
- List<S3Object> filteredList = new ArrayList<>();
- for (S3Object file : filesOnly) {
- if (prefix.evaluate(file.key(), evaluator)) {
- filteredList.add(file);
- }
- }
-
- return filteredList;
- }
-
/**
* To efficiently utilize the parallelism, work load will be distributed
amongst the partitions based on the file
* size.
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
index 66312bb173..bccb6f821f 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/parquet/AwsS3ParquetReaderFactory.java
@@ -28,10 +28,12 @@ import java.util.Set;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.external.input.HDFSDataSourceFactory;
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.aws.s3.S3Constants;
import org.apache.asterix.external.util.aws.s3.S3Utils;
@@ -56,9 +58,16 @@ public class AwsS3ParquetReaderFactory extends
HDFSDataSourceFactory {
public void configure(IServiceContext serviceCtx, Map<String, String>
configuration,
IWarningCollector warningCollector,
IExternalFilterEvaluatorFactory filterEvaluatorFactory)
throws AlgebricksException, HyracksDataException {
+
+ // prepare prefix for computed field calculations
+ ExternalDataPrefix externalDataPrefix = new
ExternalDataPrefix(configuration, warningCollector);
+ IExternalFilterEvaluator evaluator =
filterEvaluatorFactory.create(serviceCtx, warningCollector);
+ configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME,
externalDataPrefix.getRoot());
+
//Get path
String path = configuration.containsKey(ExternalDataConstants.KEY_PATH)
- ? configuration.get(ExternalDataConstants.KEY_PATH) :
buildPathURIs(configuration, warningCollector);
+ ? configuration.get(ExternalDataConstants.KEY_PATH)
+ : buildPathURIs(configuration, warningCollector,
externalDataPrefix, evaluator);
//Put S3 configurations to AsterixDB's Hadoop configuration
putS3ConfToHadoopConf(configuration, path);
@@ -108,11 +117,13 @@ public class AwsS3ParquetReaderFactory extends
HDFSDataSourceFactory {
* @return Comma-delimited paths (e.g.,
"s3a://bucket/file1.parquet,s3a://bucket/file2.parquet")
* @throws CompilationException Compilation exception
*/
- private static String buildPathURIs(Map<String, String> configuration,
IWarningCollector warningCollector)
- throws CompilationException {
+ private static String buildPathURIs(Map<String, String> configuration,
IWarningCollector warningCollector,
+ ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator
evaluator)
+ throws CompilationException, HyracksDataException {
String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
IncludeExcludeMatcher includeExcludeMatcher =
ExternalDataUtils.getIncludeExcludeMatchers(configuration);
- List<S3Object> filesOnly = S3Utils.listS3Objects(configuration,
includeExcludeMatcher, warningCollector);
+ List<S3Object> filesOnly = S3Utils.listS3Objects(configuration,
includeExcludeMatcher, warningCollector,
+ externalDataPrefix, evaluator);
StringBuilder builder = new StringBuilder();
if (!filesOnly.isEmpty()) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
index 9d45fedf2d..f6973ee96c 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
@@ -250,6 +250,11 @@ public class ExternalDataPrefix {
// TODO provide the List to avoid array creation
List<String> keySegments = extractPrefixSegments(key);
+ // no computed fields filter, accept path
+ if (!hasComputedFields() || evaluator.isEmpty()) {
+ return true;
+ }
+
// segments of object key have to be larger than segments of the prefix
if (keySegments.size() <= segments.size()) {
return false;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index e60190bcf4..318716f953 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -45,6 +45,7 @@ import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.BiPredicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -54,6 +55,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
import org.apache.asterix.common.functions.ExternalFunctionLanguage;
import org.apache.asterix.common.library.ILibrary;
import org.apache.asterix.common.library.ILibraryManager;
@@ -980,4 +982,20 @@ public class ExternalDataUtils {
argHolder.getDataOutput().writeByte(ARRAY16);
argHolder.getDataOutput().writeShort((short) 0);
}
+
+ /**
+ * Tests the provided key against all the provided predicates/evaluators
and returns true if they all pass.
+ *
+ * @param key key
+ * @param predicate predicate
+ * @param matchers matchers
+ * @param externalDataPrefix external data prefix
+ * @param evaluator evaluator
+ *
+ * @return true if key passes all tests, false otherwise
+ */
+ public static boolean evaluate(String key, BiPredicate<List<Matcher>,
String> predicate, List<Matcher> matchers,
+ ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator
evaluator) throws HyracksDataException {
+ return !key.endsWith("/") && predicate.test(matchers, key) &&
externalDataPrefix.evaluate(key, evaluator);
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index a4f3a1ea2f..1436e55962 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -56,13 +56,16 @@ import java.util.regex.Matcher;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
import
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
@@ -360,7 +363,8 @@ public class S3Utils {
*/
public static List<S3Object> listS3Objects(Map<String, String>
configuration,
AbstractExternalInputStreamFactory.IncludeExcludeMatcher
includeExcludeMatcher,
- IWarningCollector warningCollector) throws CompilationException {
+ IWarningCollector warningCollector, ExternalDataPrefix
externalDataPrefix,
+ IExternalFilterEvaluator evaluator) throws CompilationException,
HyracksDataException {
// Prepare to retrieve the objects
List<S3Object> filesOnly;
String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
@@ -368,13 +372,15 @@ public class S3Utils {
String prefix = getPrefix(configuration);
try {
- filesOnly = listS3Objects(s3Client, container, prefix,
includeExcludeMatcher);
+ filesOnly =
+ listS3Objects(s3Client, container, prefix,
includeExcludeMatcher, externalDataPrefix, evaluator);
} catch (S3Exception ex) {
// New API is not implemented, try falling back to old API
try {
// For error code, see
https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
if
(ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
- filesOnly = oldApiListS3Objects(s3Client, container,
prefix, includeExcludeMatcher);
+ filesOnly = oldApiListS3Objects(s3Client, container,
prefix, includeExcludeMatcher,
+ externalDataPrefix, evaluator);
} else {
throw ex;
}
@@ -407,7 +413,8 @@ public class S3Utils {
* @param includeExcludeMatcher include/exclude matchers to apply
*/
private static List<S3Object> listS3Objects(S3Client s3Client, String
container, String prefix,
- AbstractExternalInputStreamFactory.IncludeExcludeMatcher
includeExcludeMatcher) {
+ AbstractExternalInputStreamFactory.IncludeExcludeMatcher
includeExcludeMatcher,
+ ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator
evaluator) throws HyracksDataException {
String newMarker = null;
List<S3Object> filesOnly = new ArrayList<>();
@@ -425,7 +432,7 @@ public class S3Utils {
// Collect the paths to files only
collectAndFilterFiles(listObjectsResponse.contents(),
includeExcludeMatcher.getPredicate(),
- includeExcludeMatcher.getMatchersList(), filesOnly);
+ includeExcludeMatcher.getMatchersList(), filesOnly,
externalDataPrefix, evaluator);
// Mark the flag as done if done, otherwise, get the marker of the
previous response for the next request
if (!listObjectsResponse.isTruncated()) {
@@ -447,7 +454,8 @@ public class S3Utils {
* @param includeExcludeMatcher include/exclude matchers to apply
*/
private static List<S3Object> oldApiListS3Objects(S3Client s3Client,
String container, String prefix,
- AbstractExternalInputStreamFactory.IncludeExcludeMatcher
includeExcludeMatcher) {
+ AbstractExternalInputStreamFactory.IncludeExcludeMatcher
includeExcludeMatcher,
+ ExternalDataPrefix externalDataPrefix, IExternalFilterEvaluator
evaluator) throws HyracksDataException {
String newMarker = null;
List<S3Object> filesOnly = new ArrayList<>();
@@ -465,7 +473,7 @@ public class S3Utils {
// Collect the paths to files only
collectAndFilterFiles(listObjectsResponse.contents(),
includeExcludeMatcher.getPredicate(),
- includeExcludeMatcher.getMatchersList(), filesOnly);
+ includeExcludeMatcher.getMatchersList(), filesOnly,
externalDataPrefix, evaluator);
// Mark the flag as done if done, otherwise, get the marker of the
previous response for the next request
if (!listObjectsResponse.isTruncated()) {
@@ -479,21 +487,20 @@ public class S3Utils {
}
/**
- * AWS S3 returns all the objects as paths, not differentiating between
folder and files. The path is considered
- * a file if it does not end up with a "/" which is the separator in a
folder structure.
+ * Collects only files that pass all tests
*
- * @param s3Objects List of returned objects
+ * @param s3Objects s3 objects
+ * @param predicate predicate
+ * @param matchers matchers
+ * @param filesOnly filtered files
+ * @param externalDataPrefix external data prefix
+ * @param evaluator evaluator
*/
private static void collectAndFilterFiles(List<S3Object> s3Objects,
BiPredicate<List<Matcher>, String> predicate,
- List<Matcher> matchers, List<S3Object> filesOnly) {
+ List<Matcher> matchers, List<S3Object> filesOnly,
ExternalDataPrefix externalDataPrefix,
+ IExternalFilterEvaluator evaluator) throws HyracksDataException {
for (S3Object object : s3Objects) {
- // skip folders
- if (object.key().endsWith("/")) {
- continue;
- }
-
- // No filter, add file
- if (predicate.test(matchers, object.key())) {
+ if (ExternalDataUtils.evaluate(object.key(), predicate, matchers,
externalDataPrefix, evaluator)) {
filesOnly.add(object);
}
}