This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fdc0d5dbce [ASTERIXDB-3237][RT] Add external filter rewrite pass
fdc0d5dbce is described below

commit fdc0d5dbce9e4e2e3b4b0b0cac64bda65195c886
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Mon Aug 7 15:30:49 2023 -0700

    [ASTERIXDB-3237][RT] Add external filter rewrite pass
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    This patch adds a rewrite pass that pushes down
    predicates containing expressions that access
    the computed fields of external datasets.
    
    Change-Id: If3f907191431e4d06e0ebf23e0db9f1f95595ca7
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17704
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Hussain Towaileb <[email protected]>
---
 .../rules/PushValueAccessAndFilterDownRule.java    | 10 ++-
 .../PushdownProcessorsExecutor.java                |  4 +-
 .../processor/AbstractFilterPushdownProcessor.java |  2 +-
 .../processor/ColumnFilterPushdownProcessor.java   |  2 +-
 .../ExternalDatasetFilterPushdownProcessor.java    | 90 ++++++++++++++++++++++
 .../input/filter/ExternalFilterValueEvaluator.java | 10 +--
 .../serde/AStringSerializerDeserializer.java       | 19 ++++-
 .../ExternalDatasetProjectionFiltrationInfo.java   |  2 +-
 8 files changed, 123 insertions(+), 16 deletions(-)

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index b8f66f7134..3b298d07a3 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -27,11 +27,12 @@ import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.optimizer.base.AsterixOptimizationContext;
 import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownProcessorsExecutor;
 import 
org.apache.asterix.optimizer.rules.pushdown.processor.ColumnFilterPushdownProcessor;
 import 
org.apache.asterix.optimizer.rules.pushdown.processor.ColumnRangeFilterPushdownProcessor;
 import 
org.apache.asterix.optimizer.rules.pushdown.processor.ColumnValueAccessPushdownProcessor;
+import 
org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
 import 
org.apache.asterix.optimizer.rules.pushdown.processor.InlineFilterExpressionsProcessor;
-import 
org.apache.asterix.optimizer.rules.pushdown.processor.PushdownProcessorsExecutor;
 import 
org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -76,6 +77,7 @@ public class PushValueAccessAndFilterDownRule implements 
IAlgebraicRewriteRule {
     @Override
     public boolean rewritePre(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
             throws AlgebricksException {
+        // TODO: revise this once the proper compiler flags are introduced
         if (!context.getPhysicalOptimizationConfig().isExternalFieldPushdown() 
|| !run) {
             //The rule was fired, or value access pushdown is disabled
             return false;
@@ -109,9 +111,11 @@ public class PushValueAccessAndFilterDownRule implements 
IAlgebraicRewriteRule {
         if (context.getPhysicalOptimizationConfig().isColumnFilterEnabled()) {
             // Performs filter pushdowns
             pushdownProcessorsExecutor.add(new 
ColumnFilterPushdownProcessor(pushdownContext, context));
-            // Perform range-filter pushdowns
+            // Performs range-filter pushdowns
             pushdownProcessorsExecutor.add(new 
ColumnRangeFilterPushdownProcessor(pushdownContext, context));
-            // Inline AND/OR expression
+            // Performs external-dataset prefix (computed-field) filter pushdowns
+            pushdownProcessorsExecutor.add(new 
ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
+            // Inlines AND/OR expressions (must run last)
             pushdownProcessorsExecutor.add(new 
InlineFilterExpressionsProcessor(pushdownContext, context));
         }
     }
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
similarity index 98%
rename from 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java
rename to 
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
index 4106337336..1530f55a8c 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.optimizer.rules.pushdown.processor;
+package org.apache.asterix.optimizer.rules.pushdown;
 
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 
@@ -28,8 +28,8 @@ import org.apache.asterix.common.config.DatasetConfig;
 import org.apache.asterix.metadata.entities.Dataset;
 import org.apache.asterix.metadata.utils.DatasetUtil;
 import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
 import 
org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import 
org.apache.asterix.optimizer.rules.pushdown.processor.IPushdownProcessor;
 import 
org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaNodeToIATypeTranslatorVisitor;
 import 
org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo;
 import 
org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
index 4dbb06d754..4b5761784d 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
@@ -69,7 +69,7 @@ abstract class AbstractFilterPushdownProcessor extends 
AbstractPushdownProcessor
      * @param scanDefineDescriptor data-scan descriptor
      * @return true to skip, false otherwise
      */
-    protected abstract boolean skip(ScanDefineDescriptor scanDefineDescriptor);
+    protected abstract boolean skip(ScanDefineDescriptor scanDefineDescriptor) 
throws AlgebricksException;
 
     /**
      * Prepare data-scan for a pushdown
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
index 00b2dc4a65..8043f32008 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
@@ -73,7 +73,7 @@ public class ColumnFilterPushdownProcessor extends 
AbstractFilterPushdownProcess
     }
 
     @Override
-    protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) {
+    protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws 
AlgebricksException {
         Dataset dataset = scanDefineDescriptor.getDataset();
         LogicalOperatorTag scanOpTag = 
scanDefineDescriptor.getOperator().getOperatorTag();
         /*
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
new file mode 100644
index 0000000000..bb5c8531e4
--- /dev/null
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.ARRAY_FUNCTIONS;
+
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import 
org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.UseDescriptor;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ExternalDatasetFilterPushdownProcessor extends 
ColumnFilterPushdownProcessor {
+    private ExternalDataPrefix prefix;
+
+    public ExternalDatasetFilterPushdownProcessor(PushdownContext 
pushdownContext, IOptimizationContext context) {
+        super(pushdownContext, context);
+    }
+
+    @Override
+    protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws 
AlgebricksException {
+        Dataset dataset = scanDefineDescriptor.getDataset();
+        LogicalOperatorTag scanOpTag = 
scanDefineDescriptor.getOperator().getOperatorTag();
+        if (dataset.getDatasetType() != DatasetConfig.DatasetType.EXTERNAL) {
+            return true;
+        }
+
+        ExternalDatasetDetails edd = (ExternalDatasetDetails) 
dataset.getDatasetDetails();
+        prefix = new ExternalDataPrefix(edd.getProperties());
+
+        return !prefix.hasComputedFields() || scanOpTag != 
LogicalOperatorTag.DATASOURCESCAN
+                || !DatasetUtil.isFilterPushdownSupported(dataset);
+    }
+
+    @Override
+    protected void preparePushdown(UseDescriptor useDescriptor) throws 
AlgebricksException {
+        super.preparePushdown(useDescriptor);
+    }
+
+    @Override
+    protected boolean isPushable(AbstractFunctionCallExpression expression) {
+        FunctionIdentifier fid = expression.getFunctionIdentifier();
+        return !ARRAY_FUNCTIONS.contains(fid) && super.isPushable(expression);
+    }
+
+    @Override
+    protected boolean handlePath(AbstractFunctionCallExpression expression) 
throws AlgebricksException {
+        IExpectedSchemaNode node = expression.accept(exprToNodeVisitor, null);
+        if (node == null || node.getType() != ExpectedSchemaNodeType.ANY) {
+            return false;
+        }
+
+        // The inferred path from the provided expression
+        ARecordType expressionPath = 
pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
+        if (prefix.getPaths().contains(expressionPath)) {
+            // The expression refers to a declared computed field. Add it to the filter paths
+            paths.put(expression, expressionPath);
+            return true;
+        }
+        return false;
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
index 85da5690e0..6e3556c15b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.input.filter;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
 import 
org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -29,18 +30,17 @@ import 
org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 import 
org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
 import 
org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
-import org.apache.hyracks.util.string.UTF8StringUtil;
 import org.apache.hyracks.util.string.UTF8StringWriter;
 
 class ExternalFilterValueEvaluator implements IExternalFilterValueEvaluator {
     private final ATypeTag typeTag;
     private final ArrayBackedValueStorage value;
-    private final UTF8StringWriter utf8StringWriter;
+    private final AStringSerializerDeserializer stringSerDer;
 
     ExternalFilterValueEvaluator(ATypeTag typeTag) {
         this.typeTag = typeTag;
         value = new ArrayBackedValueStorage();
-        utf8StringWriter = new UTF8StringWriter();
+        stringSerDer = new AStringSerializerDeserializer(new 
UTF8StringWriter(), null);
     }
 
     @Override
@@ -58,7 +58,7 @@ class ExternalFilterValueEvaluator implements 
IExternalFilterValueEvaluator {
         result.set(value);
     }
 
-    private void writeValue(ATypeTag typeTag, String stringValue) throws 
IOException {
+    private void writeValue(ATypeTag typeTag, String stringValue) throws 
HyracksDataException {
         DataOutput output = value.getDataOutput();
         SerializerDeserializerUtil.serializeTag(typeTag, output);
         switch (typeTag) {
@@ -70,7 +70,7 @@ class ExternalFilterValueEvaluator implements 
IExternalFilterValueEvaluator {
             case DOUBLE:
                 
DoubleSerializerDeserializer.write(Double.parseDouble(stringValue), output);
             case STRING:
-                UTF8StringUtil.writeUTF8(stringValue, output, 
utf8StringWriter);
+                stringSerDer.serialize(stringValue, output);
         }
     }
 }
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java
index 814b8cbc69..5ac4794c6c 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java
@@ -38,13 +38,22 @@ public class AStringSerializerDeserializer implements 
ISerializerDeserializer<AS
 
     private static final long serialVersionUID = 1L;
 
+    /**
+     * This shared singleton is built with a null writer and reader; using it may instantiate too many objects
+     *
+     * @deprecated use {@link #AStringSerializerDeserializer(UTF8StringWriter, UTF8StringReader)}
+     */
+    @Deprecated
     public static final AStringSerializerDeserializer INSTANCE = new 
AStringSerializerDeserializer();
     private final UTF8StringWriter utf8StringWriter;
     private final UTF8StringReader utf8StringReader;
 
     private AStringSerializerDeserializer() {
-        this.utf8StringWriter = null;
-        this.utf8StringReader = null;
+        this(null, null);
+    }
+
+    public AStringSerializerDeserializer(UTF8StringWriter utf8StringWriter) {
+        this(utf8StringWriter, null);
     }
 
     public AStringSerializerDeserializer(UTF8StringWriter utf8StringWriter, 
UTF8StringReader utf8StringReader) {
@@ -63,8 +72,12 @@ public class AStringSerializerDeserializer implements 
ISerializerDeserializer<AS
 
     @Override
     public void serialize(AString instance, DataOutput out) throws 
HyracksDataException {
+        serialize(instance.getStringValue(), out);
+    }
+
+    public void serialize(String value, DataOutput out) throws 
HyracksDataException {
         try {
-            UTF8StringUtil.writeUTF8(instance.getStringValue(), out, 
utf8StringWriter);
+            UTF8StringUtil.writeUTF8(value, out, utf8StringWriter);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
index 13fc6e2d9c..a1d22ac3fd 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
@@ -131,7 +131,7 @@ public class ExternalDatasetProjectionFiltrationInfo 
implements IProjectionFiltr
         }
 
         if (filterExpression != null) {
-            generator.writeStringField("filter-on", 
filterExpression.toString());
+            generator.writeStringField("prefix-filter-on", 
filterExpression.toString());
         }
     }
 

Reply via email to