This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new fdc0d5dbce [ASTERIXDB-3237][RT] Add external filter rewrite pass
fdc0d5dbce is described below
commit fdc0d5dbce9e4e2e3b4b0b0cac64bda65195c886
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Mon Aug 7 15:30:49 2023 -0700
[ASTERIXDB-3237][RT] Add external filter rewrite pass
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
This patch adds the rewrite pass to push down
predicates that contain expressions that access
external datasets' computed fields.
Change-Id: If3f907191431e4d06e0ebf23e0db9f1f95595ca7
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17704
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Hussain Towaileb <[email protected]>
---
.../rules/PushValueAccessAndFilterDownRule.java | 10 ++-
.../PushdownProcessorsExecutor.java | 4 +-
.../processor/AbstractFilterPushdownProcessor.java | 2 +-
.../processor/ColumnFilterPushdownProcessor.java | 2 +-
.../ExternalDatasetFilterPushdownProcessor.java | 90 ++++++++++++++++++++++
.../input/filter/ExternalFilterValueEvaluator.java | 10 +--
.../serde/AStringSerializerDeserializer.java | 19 ++++-
.../ExternalDatasetProjectionFiltrationInfo.java | 2 +-
8 files changed, 123 insertions(+), 16 deletions(-)
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index b8f66f7134..3b298d07a3 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -27,11 +27,12 @@ import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.optimizer.base.AsterixOptimizationContext;
import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownProcessorsExecutor;
import
org.apache.asterix.optimizer.rules.pushdown.processor.ColumnFilterPushdownProcessor;
import
org.apache.asterix.optimizer.rules.pushdown.processor.ColumnRangeFilterPushdownProcessor;
import
org.apache.asterix.optimizer.rules.pushdown.processor.ColumnValueAccessPushdownProcessor;
+import
org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
import
org.apache.asterix.optimizer.rules.pushdown.processor.InlineFilterExpressionsProcessor;
-import
org.apache.asterix.optimizer.rules.pushdown.processor.PushdownProcessorsExecutor;
import
org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -76,6 +77,7 @@ public class PushValueAccessAndFilterDownRule implements
IAlgebraicRewriteRule {
@Override
public boolean rewritePre(Mutable<ILogicalOperator> opRef,
IOptimizationContext context)
throws AlgebricksException {
+ // TODO this should be revised after introducing the proper compiler
flags
if (!context.getPhysicalOptimizationConfig().isExternalFieldPushdown()
|| !run) {
//The rule was fired, or value access pushdown is disabled
return false;
@@ -109,9 +111,11 @@ public class PushValueAccessAndFilterDownRule implements
IAlgebraicRewriteRule {
if (context.getPhysicalOptimizationConfig().isColumnFilterEnabled()) {
// Performs filter pushdowns
pushdownProcessorsExecutor.add(new
ColumnFilterPushdownProcessor(pushdownContext, context));
- // Perform range-filter pushdowns
+ // Performs range-filter pushdowns
pushdownProcessorsExecutor.add(new
ColumnRangeFilterPushdownProcessor(pushdownContext, context));
- // Inline AND/OR expression
+ // Performs prefix pushdowns
+ pushdownProcessorsExecutor.add(new
ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
+ // Inlines AND/OR expression (must be last to run)
pushdownProcessorsExecutor.add(new
InlineFilterExpressionsProcessor(pushdownContext, context));
}
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
similarity index 98%
rename from
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java
rename to
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
index 4106337336..1530f55a8c 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/PushdownProcessorsExecutor.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.optimizer.rules.pushdown.processor;
+package org.apache.asterix.optimizer.rules.pushdown;
import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
@@ -28,8 +28,8 @@ import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
import
org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import
org.apache.asterix.optimizer.rules.pushdown.processor.IPushdownProcessor;
import
org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaNodeToIATypeTranslatorVisitor;
import
org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo;
import
org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
index 4dbb06d754..4b5761784d 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/AbstractFilterPushdownProcessor.java
@@ -69,7 +69,7 @@ abstract class AbstractFilterPushdownProcessor extends
AbstractPushdownProcessor
* @param scanDefineDescriptor data-scan descriptor
* @return true to skip, false otherwise
*/
- protected abstract boolean skip(ScanDefineDescriptor scanDefineDescriptor);
+ protected abstract boolean skip(ScanDefineDescriptor scanDefineDescriptor)
throws AlgebricksException;
/**
* Prepare data-scan for a pushdown
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
index 00b2dc4a65..8043f32008 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ColumnFilterPushdownProcessor.java
@@ -73,7 +73,7 @@ public class ColumnFilterPushdownProcessor extends
AbstractFilterPushdownProcess
}
@Override
- protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) {
+ protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws
AlgebricksException {
Dataset dataset = scanDefineDescriptor.getDataset();
LogicalOperatorTag scanOpTag =
scanDefineDescriptor.getOperator().getOperatorTag();
/*
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
new file mode 100644
index 0000000000..bb5c8531e4
--- /dev/null
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/ExternalDatasetFilterPushdownProcessor.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.ARRAY_FUNCTIONS;
+
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import
org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import org.apache.asterix.optimizer.rules.pushdown.descriptor.UseDescriptor;
+import
org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import
org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class ExternalDatasetFilterPushdownProcessor extends
ColumnFilterPushdownProcessor {
+ private ExternalDataPrefix prefix;
+
+ public ExternalDatasetFilterPushdownProcessor(PushdownContext
pushdownContext, IOptimizationContext context) {
+ super(pushdownContext, context);
+ }
+
+ @Override
+ protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws
AlgebricksException {
+ Dataset dataset = scanDefineDescriptor.getDataset();
+ LogicalOperatorTag scanOpTag =
scanDefineDescriptor.getOperator().getOperatorTag();
+ if (dataset.getDatasetType() != DatasetConfig.DatasetType.EXTERNAL) {
+ return true;
+ }
+
+ ExternalDatasetDetails edd = (ExternalDatasetDetails)
dataset.getDatasetDetails();
+ prefix = new ExternalDataPrefix(edd.getProperties());
+
+ return !prefix.hasComputedFields() || scanOpTag !=
LogicalOperatorTag.DATASOURCESCAN
+ || !DatasetUtil.isFilterPushdownSupported(dataset);
+ }
+
+ @Override
+ protected void preparePushdown(UseDescriptor useDescriptor) throws
AlgebricksException {
+ super.preparePushdown(useDescriptor);
+ }
+
+ @Override
+ protected boolean isPushable(AbstractFunctionCallExpression expression) {
+ FunctionIdentifier fid = expression.getFunctionIdentifier();
+ return !ARRAY_FUNCTIONS.contains(fid) && super.isPushable(expression);
+ }
+
+ @Override
+ protected boolean handlePath(AbstractFunctionCallExpression expression)
throws AlgebricksException {
+ IExpectedSchemaNode node = expression.accept(exprToNodeVisitor, null);
+ if (node == null || node.getType() != ExpectedSchemaNodeType.ANY) {
+ return false;
+ }
+
+ // The inferred path from the provided expression
+ ARecordType expressionPath =
pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
+ if (prefix.getPaths().contains(expressionPath)) {
+ // The expression refers to a declared computed field. Add it to
the filter paths
+ paths.put(expression, expressionPath);
+ return true;
+ }
+ return false;
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
index 85da5690e0..6e3556c15b 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
@@ -21,6 +21,7 @@ package org.apache.asterix.external.input.filter;
import java.io.DataOutput;
import java.io.IOException;
+import
org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
import
org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -29,18 +30,17 @@ import
org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
import
org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
import
org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
-import org.apache.hyracks.util.string.UTF8StringUtil;
import org.apache.hyracks.util.string.UTF8StringWriter;
class ExternalFilterValueEvaluator implements IExternalFilterValueEvaluator {
private final ATypeTag typeTag;
private final ArrayBackedValueStorage value;
- private final UTF8StringWriter utf8StringWriter;
+ private final AStringSerializerDeserializer stringSerDer;
ExternalFilterValueEvaluator(ATypeTag typeTag) {
this.typeTag = typeTag;
value = new ArrayBackedValueStorage();
- utf8StringWriter = new UTF8StringWriter();
+ stringSerDer = new AStringSerializerDeserializer(new
UTF8StringWriter(), null);
}
@Override
@@ -58,7 +58,7 @@ class ExternalFilterValueEvaluator implements
IExternalFilterValueEvaluator {
result.set(value);
}
- private void writeValue(ATypeTag typeTag, String stringValue) throws
IOException {
+ private void writeValue(ATypeTag typeTag, String stringValue) throws
HyracksDataException {
DataOutput output = value.getDataOutput();
SerializerDeserializerUtil.serializeTag(typeTag, output);
switch (typeTag) {
@@ -70,7 +70,7 @@ class ExternalFilterValueEvaluator implements
IExternalFilterValueEvaluator {
case DOUBLE:
DoubleSerializerDeserializer.write(Double.parseDouble(stringValue), output);
case STRING:
- UTF8StringUtil.writeUTF8(stringValue, output,
utf8StringWriter);
+ stringSerDer.serialize(stringValue, output);
}
}
}
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java
index 814b8cbc69..5ac4794c6c 100644
---
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/AStringSerializerDeserializer.java
@@ -38,13 +38,22 @@ public class AStringSerializerDeserializer implements
ISerializerDeserializer<AS
private static final long serialVersionUID = 1L;
+ /**
+ * Using this singleton object may instantiate too many objects
+ *
+ * @deprecated use {@link
#AStringSerializerDeserializer(UTF8StringWriter, UTF8StringReader)}
+ */
+ @Deprecated
public static final AStringSerializerDeserializer INSTANCE = new
AStringSerializerDeserializer();
private final UTF8StringWriter utf8StringWriter;
private final UTF8StringReader utf8StringReader;
private AStringSerializerDeserializer() {
- this.utf8StringWriter = null;
- this.utf8StringReader = null;
+ this(null, null);
+ }
+
+ public AStringSerializerDeserializer(UTF8StringWriter utf8StringWriter) {
+ this(utf8StringWriter, null);
}
public AStringSerializerDeserializer(UTF8StringWriter utf8StringWriter,
UTF8StringReader utf8StringReader) {
@@ -63,8 +72,12 @@ public class AStringSerializerDeserializer implements
ISerializerDeserializer<AS
@Override
public void serialize(AString instance, DataOutput out) throws
HyracksDataException {
+ serialize(instance.getStringValue(), out);
+ }
+
+ public void serialize(String value, DataOutput out) throws
HyracksDataException {
try {
- UTF8StringUtil.writeUTF8(instance.getStringValue(), out,
utf8StringWriter);
+ UTF8StringUtil.writeUTF8(value, out, utf8StringWriter);
} catch (IOException e) {
throw HyracksDataException.create(e);
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
index 13fc6e2d9c..a1d22ac3fd 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
@@ -131,7 +131,7 @@ public class ExternalDatasetProjectionFiltrationInfo
implements IProjectionFiltr
}
if (filterExpression != null) {
- generator.writeStringField("filter-on",
filterExpression.toString());
+ generator.writeStringField("prefix-filter-on",
filterExpression.toString());
}
}