This is an automated email from the ASF dual-hosted git repository.
wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 889ad8f12c [ASTERIXDB-3237][RT] Add external filter evaluator
889ad8f12c is described below
commit 889ad8f12cc08067808cf852ac6dc4bc40d20432
Author: Wail Alkowaileet <[email protected]>
AuthorDate: Mon Aug 7 10:07:50 2023 -0700
[ASTERIXDB-3237][RT] Add external filter evaluator
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
This patch adds the runtime part for dynamic prefixes
for external datasets.
Change-Id: Icdab84a15fe9ea71676b426c8efdac8c80c7d742
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17700
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Wail Alkowaileet <[email protected]>
Reviewed-by: Hussain Towaileb <[email protected]>
---
.../processor/PushdownProcessorsExecutor.java | 22 ++++---
.../common/external/IExternalFilterEvaluator.java | 6 +-
.../external/IExternalFilterEvaluatorFactory.java | 4 +-
.../input/filter/ExternalFilterEvaluator.java | 59 +++++++++++++++++
.../filter/ExternalFilterEvaluatorFactory.java | 49 ++++++++++++++
.../input/filter/ExternalFilterValueEvaluator.java | 76 ++++++++++++++++++++++
.../ExternalFilterValueEvaluatorFactory.java | 43 ++++++++++++
.../input/filter/FilterEvaluatorContext.java | 47 +++++++++++++
.../filter/IExternalFilterValueEvaluator.java} | 15 ++---
.../filter/NoOpExternalFilterValueEvaluator.java} | 22 ++++---
.../AbstractExternalInputStreamFactory.java | 2 +-
.../record/reader/aws/AwsS3InputStreamFactory.java | 4 +-
.../azure/blob/AzureBlobInputStreamFactory.java | 2 +-
.../datalake/AzureDataLakeInputStreamFactory.java | 2 +-
.../record/reader/gcs/GCSInputStreamFactory.java | 2 +-
.../asterix/external/util/ExternalDataPrefix.java | 6 +-
.../asterix/external/util/ExternalDataUtils.java | 9 +--
.../apache/asterix/external/util/HDFSUtils.java | 6 +-
.../metadata/declared/DatasetDataSource.java | 15 +++--
.../apache/asterix/metadata/utils/IndexUtil.java | 20 ++++++
.../utils/filter/ExternalFilterBuilder.java | 61 +++++++++++++++++
.../serde/SerializerDeserializerUtil.java | 5 +-
.../ColumnDatasetProjectionFiltrationInfo.java | 38 +++--------
...> ExternalDatasetProjectionFiltrationInfo.java} | 65 +++++++++++++-----
.../runtime/evaluators/EvaluatorContext.java | 2 +-
25 files changed, 478 insertions(+), 104 deletions(-)
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java
index 110cd298be..4106337336 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/PushdownProcessorsExecutor.java
@@ -32,10 +32,9 @@ import
org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
import
org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
import
org.apache.asterix.optimizer.rules.pushdown.visitor.ExpectedSchemaNodeToIATypeTranslatorVisitor;
import
org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo;
-import org.apache.asterix.runtime.projection.ExternalDatasetProjectionInfo;
+import
org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import
org.apache.hyracks.algebricks.core.algebra.base.DefaultProjectionFiltrationInfo;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import
org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
@@ -101,16 +100,19 @@ public class PushdownProcessorsExecutor {
private IProjectionFiltrationInfo
createExternalDatasetProjectionInfo(ScanDefineDescriptor scanDefineDescriptor,
IOptimizationContext context) {
- if
(!context.getPhysicalOptimizationConfig().isExternalFieldPushdown()) {
- return DefaultProjectionFiltrationInfo.INSTANCE;
+ Map<String, FunctionCallInformation> pathLocations =
scanDefineDescriptor.getPathLocations();
+ ARecordType recordRequestedType = ALL_FIELDS_TYPE;
+ Dataset dataset = scanDefineDescriptor.getDataset();
+ if (context.getPhysicalOptimizationConfig().isExternalFieldPushdown()
+ && DatasetUtil.isFieldAccessPushdownSupported(dataset)) {
+ ExpectedSchemaNodeToIATypeTranslatorVisitor converter =
+ new
ExpectedSchemaNodeToIATypeTranslatorVisitor(pathLocations);
+ recordRequestedType = (ARecordType)
scanDefineDescriptor.getRecordNode().accept(converter,
+ scanDefineDescriptor.getDataset().getDatasetName());
}
- Map<String, FunctionCallInformation> pathLocations =
scanDefineDescriptor.getPathLocations();
- ExpectedSchemaNodeToIATypeTranslatorVisitor converter =
- new ExpectedSchemaNodeToIATypeTranslatorVisitor(pathLocations);
- ARecordType recordRequestedType = (ARecordType)
scanDefineDescriptor.getRecordNode().accept(converter,
- scanDefineDescriptor.getDataset().getDatasetName());
- return new ExternalDatasetProjectionInfo(recordRequestedType,
pathLocations);
+ return new
ExternalDatasetProjectionFiltrationInfo(recordRequestedType, pathLocations,
+ scanDefineDescriptor.getFilterPaths(),
scanDefineDescriptor.getFilterExpression());
}
private void setInfoToDataScan(AbstractScanOperator scanOp,
IProjectionFiltrationInfo info) {
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
index a169ecb2be..22cd20a8bb 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
@@ -18,14 +18,14 @@
*/
package org.apache.asterix.common.external;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface IExternalFilterEvaluator {
boolean isEmpty();
boolean isComputedFieldUsed(int index);
- void setValue(int index, String stringValue) throws AlgebricksException;
+ void setValue(int index, String stringValue) throws HyracksDataException;
- boolean evaluate() throws AlgebricksException;
+ boolean evaluate() throws HyracksDataException;
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluatorFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluatorFactory.java
index c29e554f1f..38a38a69ae 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluatorFactory.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluatorFactory.java
@@ -20,11 +20,11 @@ package org.apache.asterix.common.external;
import java.io.Serializable;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IWarningCollector;
public interface IExternalFilterEvaluatorFactory extends Serializable {
IExternalFilterEvaluator create(IServiceContext serviceContext,
IWarningCollector warningCollector)
- throws AlgebricksException;
+ throws HyracksDataException;
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterEvaluator.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterEvaluator.java
new file mode 100644
index 0000000000..49b6ae453e
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterEvaluator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.formats.nontagged.BinaryBooleanInspector;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.primitive.VoidPointable;
+
+class ExternalFilterEvaluator implements IExternalFilterEvaluator {
+ private final IScalarEvaluator evaluator;
+ private final IExternalFilterValueEvaluator[] valueEvaluators;
+ private final VoidPointable booleanResult;
+
+ ExternalFilterEvaluator(IScalarEvaluator evaluator,
IExternalFilterValueEvaluator[] valueEvaluators) {
+ this.evaluator = evaluator;
+ this.valueEvaluators = valueEvaluators;
+ booleanResult = new VoidPointable();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return valueEvaluators.length == 0;
+ }
+
+ @Override
+ public boolean isComputedFieldUsed(int index) {
+ return valueEvaluators[index] !=
NoOpExternalFilterValueEvaluator.INSTANCE;
+ }
+
+ @Override
+ public void setValue(int index, String stringValue) throws
HyracksDataException {
+ valueEvaluators[index].setValue(stringValue);
+ }
+
+ @Override
+ public boolean evaluate() throws HyracksDataException {
+ evaluator.evaluate(null, booleanResult);
+ return
BinaryBooleanInspector.getBooleanValue(booleanResult.getByteArray(),
booleanResult.getStartOffset(),
+ booleanResult.getLength());
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterEvaluatorFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterEvaluatorFactory.java
new file mode 100644
index 0000000000..0790700006
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterEvaluatorFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import java.util.Arrays;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+/**
+ * Serializable factory that builds an {@link IExternalFilterEvaluator} from a scalar
+ * filter evaluator factory and a fixed number of value evaluator slots.
+ */
+public class ExternalFilterEvaluatorFactory implements IExternalFilterEvaluatorFactory {
+    private static final long serialVersionUID = -309935877927008746L;
+    private final IScalarEvaluatorFactory filterEvalFactory;
+    private final int numberOfComputedFields;
+
+    public ExternalFilterEvaluatorFactory(int numberOfComputedFields, IScalarEvaluatorFactory filterEvalFactory) {
+        this.filterEvalFactory = filterEvalFactory;
+        this.numberOfComputedFields = numberOfComputedFields;
+    }
+
+    /**
+     * Allocates one slot per computed field, marks every slot as no-op, and lets the
+     * filter factory build its evaluator against a context that shares those slots.
+     * The same slot array is handed to the returned evaluator.
+     */
+    @Override
+    public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector)
+            throws HyracksDataException {
+        final IExternalFilterValueEvaluator[] slots = new IExternalFilterValueEvaluator[numberOfComputedFields];
+        Arrays.fill(slots, NoOpExternalFilterValueEvaluator.INSTANCE);
+
+        final FilterEvaluatorContext evalContext = new FilterEvaluatorContext(serviceContext, warningCollector, slots);
+        return new ExternalFilterEvaluator(filterEvalFactory.createScalarEvaluator(evalContext), slots);
+    }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
new file mode 100644
index 0000000000..85da5690e0
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+import
org.apache.asterix.dataflow.data.nontagged.serde.SerializerDeserializerUtil;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
+import
org.apache.hyracks.dataflow.common.data.marshalling.DoubleSerializerDeserializer;
+import
org.apache.hyracks.dataflow.common.data.marshalling.Integer64SerializerDeserializer;
+import org.apache.hyracks.util.string.UTF8StringUtil;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+/**
+ * Value evaluator that holds a single serialized value. The value is supplied as a
+ * string through {@link #setValue(String)}, serialized according to the type tag given
+ * at construction, and returned unchanged by every call to
+ * {@link #evaluate(IFrameTupleReference, IPointable)}.
+ */
+class ExternalFilterValueEvaluator implements IExternalFilterValueEvaluator {
+    private final ATypeTag typeTag;
+    // Backing storage for the serialized (tag + payload) value; reused across setValue calls
+    private final ArrayBackedValueStorage value;
+    private final UTF8StringWriter utf8StringWriter;
+
+    ExternalFilterValueEvaluator(ATypeTag typeTag) {
+        this.typeTag = typeTag;
+        value = new ArrayBackedValueStorage();
+        utf8StringWriter = new UTF8StringWriter();
+    }
+
+    /**
+     * Replaces the currently held value with the serialized form of {@code stringValue}.
+     *
+     * @param stringValue textual form of the value to serialize
+     * @throws HyracksDataException if writing to the backing storage fails
+     */
+    @Override
+    public void setValue(String stringValue) throws HyracksDataException {
+        value.reset();
+        try {
+            writeValue(typeTag, stringValue);
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /**
+     * Points {@code result} at the value written by the last {@link #setValue(String)};
+     * the tuple argument is not consulted.
+     */
+    @Override
+    public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException {
+        result.set(value);
+    }
+
+    /**
+     * Writes the type tag followed by exactly one payload for {@code stringValue}.
+     * Each case ends with {@code break}: without it, an integer tag would fall through
+     * and append a double and a string payload after the long, and a double tag would
+     * append a string payload.
+     */
+    private void writeValue(ATypeTag typeTag, String stringValue) throws IOException {
+        DataOutput output = value.getDataOutput();
+        SerializerDeserializerUtil.serializeTag(typeTag, output);
+        switch (typeTag) {
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+                // NOTE(review): every integer tag is paired with a 64-bit payload here;
+                // confirm that narrower tags are normalized to BIGINT by the caller.
+                Integer64SerializerDeserializer.write(Long.parseLong(stringValue), output);
+                break;
+            case DOUBLE:
+                DoubleSerializerDeserializer.write(Double.parseDouble(stringValue), output);
+                break;
+            case STRING:
+                UTF8StringUtil.writeUTF8(stringValue, output, utf8StringWriter);
+                break;
+            default:
+                // Any other tag: only the tag byte is written (unchanged behavior)
+                break;
+        }
+    }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluatorFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluatorFactory.java
new file mode 100644
index 0000000000..250f0ef74c
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/ExternalFilterValueEvaluatorFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Scalar evaluator factory bound to one slot index and one type tag. The evaluator
+ * itself is obtained from the {@link FilterEvaluatorContext} passed at creation time.
+ */
+public class ExternalFilterValueEvaluatorFactory implements IScalarEvaluatorFactory {
+    private static final long serialVersionUID = 6651915525106158386L;
+    private final ATypeTag typeTag;
+    private final int index;
+
+    public ExternalFilterValueEvaluatorFactory(int index, IAType type) {
+        this.typeTag = type.getTypeTag();
+        this.index = index;
+    }
+
+    /**
+     * Delegates to the context, which must be a {@link FilterEvaluatorContext}
+     * (the cast fails otherwise).
+     */
+    @Override
+    public IScalarEvaluator createScalarEvaluator(IEvaluatorContext ctx) throws HyracksDataException {
+        return ((FilterEvaluatorContext) ctx).createEvaluator(index, typeTag);
+    }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/FilterEvaluatorContext.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/FilterEvaluatorContext.java
new file mode 100644
index 0000000000..db78caeebb
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/FilterEvaluatorContext.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import java.util.Arrays;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.algebricks.runtime.evaluators.EvaluatorContext;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+/**
+ * Evaluator context that owns a shared array of value evaluator slots and lazily
+ * replaces a no-op slot with a real value evaluator on first request.
+ */
+class FilterEvaluatorContext extends EvaluatorContext {
+    private final IExternalFilterValueEvaluator[] evaluators;
+
+    /**
+     * Keeps a reference to (not a copy of) {@code evaluators} and resets every slot
+     * to the shared no-op instance.
+     */
+    public FilterEvaluatorContext(IServiceContext serviceContext, IWarningCollector warningCollector,
+            IExternalFilterValueEvaluator[] evaluators) {
+        super(serviceContext, warningCollector);
+        Arrays.fill(evaluators, NoOpExternalFilterValueEvaluator.INSTANCE);
+        this.evaluators = evaluators;
+    }
+
+    /**
+     * Returns the evaluator stored at {@code index}, creating one for {@code typeTag}
+     * if the slot still holds the no-op instance. An already-populated slot is returned
+     * as is, whatever type tag is passed.
+     */
+    public IScalarEvaluator createEvaluator(int index, ATypeTag typeTag) {
+        if (evaluators[index] == NoOpExternalFilterValueEvaluator.INSTANCE) {
+            evaluators[index] = new ExternalFilterValueEvaluator(typeTag);
+        }
+        return evaluators[index];
+    }
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/IExternalFilterValueEvaluator.java
similarity index 69%
copy from
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
copy to
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/IExternalFilterValueEvaluator.java
index a169ecb2be..1ed57c10f5 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/IExternalFilterValueEvaluator.java
@@ -16,16 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.common.external;
+package org.apache.asterix.external.input.filter;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
-public interface IExternalFilterEvaluator {
- boolean isEmpty();
-
- boolean isComputedFieldUsed(int index);
-
- void setValue(int index, String stringValue) throws AlgebricksException;
-
- boolean evaluate() throws AlgebricksException;
+interface IExternalFilterValueEvaluator extends IScalarEvaluator {
+ void setValue(String stringValue) throws HyracksDataException;
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpExternalFilterValueEvaluator.java
similarity index 54%
copy from
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
copy to
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpExternalFilterValueEvaluator.java
index a169ecb2be..14944d414a 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/external/IExternalFilterEvaluator.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpExternalFilterValueEvaluator.java
@@ -16,16 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.common.external;
+package org.apache.asterix.external.input.filter;
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
-public interface IExternalFilterEvaluator {
- boolean isEmpty();
+class NoOpExternalFilterValueEvaluator implements
IExternalFilterValueEvaluator {
+ public static final IExternalFilterValueEvaluator INSTANCE = new
NoOpExternalFilterValueEvaluator();
- boolean isComputedFieldUsed(int index);
+ @Override
+ public void setValue(String stringValue) throws HyracksDataException {
+ // NoOp
+ }
- void setValue(int index, String stringValue) throws AlgebricksException;
-
- boolean evaluate() throws AlgebricksException;
+ @Override
+ public void evaluate(IFrameTupleReference tuple, IPointable result) throws
HyracksDataException {
+ // NoOp
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
index ff6b03eb8b..37fd910e6d 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/abstracts/AbstractExternalInputStreamFactory.java
@@ -64,7 +64,7 @@ public abstract class AbstractExternalInputStreamFactory
implements IInputStream
@Override
public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
- IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException {
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException, HyracksDataException {
this.configuration = configuration;
this.partitionConstraint =
((ICcApplicationContext)
ctx.getApplicationContext()).getClusterStateManager().getClusterLocations();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
index cfa1e46874..881c5a3985 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/AwsS3InputStreamFactory.java
@@ -50,7 +50,7 @@ public class AwsS3InputStreamFactory extends
AbstractExternalInputStreamFactory
@Override
public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
- IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException {
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException, HyracksDataException {
super.configure(ctx, configuration, warningCollector,
filterEvaluatorFactory);
// Ensure the validity of include/exclude
@@ -72,7 +72,7 @@ public class AwsS3InputStreamFactory extends
AbstractExternalInputStreamFactory
}
private List<S3Object> filterPrefixes(ExternalDataPrefix prefix,
List<S3Object> filesOnly,
- IExternalFilterEvaluator evaluator) throws AlgebricksException {
+ IExternalFilterEvaluator evaluator) throws HyracksDataException {
// if no computed fields or empty files list, return the original list
if (filesOnly.isEmpty() || !prefix.hasComputedFields()) {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
index b18b655211..5cd396e4bd 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/blob/AzureBlobInputStreamFactory.java
@@ -54,7 +54,7 @@ public class AzureBlobInputStreamFactory extends
AbstractExternalInputStreamFact
@Override
public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
- IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException {
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException, HyracksDataException {
super.configure(ctx, configuration, warningCollector,
filterEvaluatorFactory);
IApplicationContext appCtx = (IApplicationContext)
ctx.getApplicationContext();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
index 35c3648b72..bd2535d995 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/azure/datalake/AzureDataLakeInputStreamFactory.java
@@ -54,7 +54,7 @@ public class AzureDataLakeInputStreamFactory extends
AbstractExternalInputStream
@Override
public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
- IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException {
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException, HyracksDataException {
super.configure(ctx, configuration, warningCollector,
filterEvaluatorFactory);
IApplicationContext appCtx = (IApplicationContext)
ctx.getApplicationContext();
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
index 433fecdcab..165f3403a9 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/GCSInputStreamFactory.java
@@ -49,7 +49,7 @@ public class GCSInputStreamFactory extends
AbstractExternalInputStreamFactory {
@Override
public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
- IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException {
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException, HyracksDataException {
super.configure(ctx, configuration, warningCollector,
filterEvaluatorFactory);
// Ensure the validity of include/exclude
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
index 97bf7767fa..899a179f63 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataPrefix.java
@@ -44,6 +44,7 @@ import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
public class ExternalDataPrefix {
@@ -208,12 +209,11 @@ public class ExternalDataPrefix {
/**
* Evaluates whether the provided key satisfies the conditions of the
evaluator or not
*
- * @param key ke
+ * @param key ke
* @param evaluator evaluator
- *
* @return true if key satisfies the evaluator conditions, false otherwise
*/
- public boolean evaluate(String key, IExternalFilterEvaluator evaluator)
throws AlgebricksException {
+ public boolean evaluate(String key, IExternalFilterEvaluator evaluator)
throws HyracksDataException {
List<String> keySegments = extractPrefixSegments(key);
// segments of object key have to be larger than segments of the prefix
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 9d36b4aa4e..e60190bcf4 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -75,7 +75,7 @@ import org.apache.asterix.om.types.EnumDeserializer;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.types.TypeTagUtil;
import org.apache.asterix.runtime.evaluators.common.NumberUtils;
-import org.apache.asterix.runtime.projection.ExternalDatasetProjectionInfo;
+import
org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -900,7 +900,7 @@ public class ExternalDataUtils {
||
ExternalDataConstants.FORMAT_PARQUET.equals(properties.get(ExternalDataConstants.KEY_FORMAT));
}
- public static void
setExternalDataProjectionInfo(ExternalDatasetProjectionInfo projectionInfo,
+ public static void
setExternalDataProjectionInfo(ExternalDatasetProjectionFiltrationInfo
projectionInfo,
Map<String, String> properties) throws IOException {
properties.put(ExternalDataConstants.KEY_REQUESTED_FIELDS,
serializeExpectedTypeToString(projectionInfo.getProjectedType()));
@@ -922,7 +922,7 @@ public class ExternalDataUtils {
ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
Base64.Encoder encoder = Base64.getEncoder();
- ExternalDatasetProjectionInfo.writeTypeField(expectedType,
dataOutputStream);
+ ExternalDatasetProjectionFiltrationInfo.writeTypeField(expectedType,
dataOutputStream);
return encoder.encodeToString(byteArrayOutputStream.toByteArray());
}
@@ -938,7 +938,8 @@ public class ExternalDataUtils {
ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
DataOutputStream dataOutputStream = new
DataOutputStream(byteArrayOutputStream);
Base64.Encoder encoder = Base64.getEncoder();
-
ExternalDatasetProjectionInfo.writeFunctionCallInformationMapField(functionCallInfoMap,
dataOutputStream);
+
ExternalDatasetProjectionFiltrationInfo.writeFunctionCallInformationMapField(functionCallInfoMap,
+ dataOutputStream);
return encoder.encodeToString(byteArrayOutputStream.toByteArray());
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 2618ae6461..a11a8dd023 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -43,7 +43,7 @@ import
org.apache.asterix.external.input.record.reader.hdfs.parquet.ParquetReadS
import org.apache.asterix.external.input.stream.HDFSInputStream;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.projection.ExternalDatasetProjectionInfo;
+import
org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -271,7 +271,7 @@ public class HDFSUtils {
Base64.Decoder decoder = Base64.getDecoder();
byte[] typeBytes = decoder.decode(encoded);
DataInputStream dataInputStream = new DataInputStream(new
ByteArrayInputStream(typeBytes));
- return ExternalDatasetProjectionInfo.createTypeField(dataInputStream);
+ return
ExternalDatasetProjectionFiltrationInfo.createTypeField(dataInputStream);
}
public static void setFunctionCallInformationMap(Map<String,
FunctionCallInformation> funcCallInfoMap,
@@ -287,7 +287,7 @@ public class HDFSUtils {
Base64.Decoder decoder = Base64.getDecoder();
byte[] functionCallInfoMapBytes = decoder.decode(encoded);
DataInputStream dataInputStream = new DataInputStream(new
ByteArrayInputStream(functionCallInfoMapBytes));
- return
ExternalDatasetProjectionInfo.createFunctionCallInformationMap(dataInputStream);
+ return
ExternalDatasetProjectionFiltrationInfo.createFunctionCallInformationMap(dataInputStream);
}
return null;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index 25eab7fe37..f4e15de534 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -28,7 +28,7 @@ import java.util.List;
import java.util.Map;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.util.ExternalDataUtils;
@@ -42,7 +42,7 @@ import org.apache.asterix.metadata.utils.IndexUtil;
import org.apache.asterix.metadata.utils.KeyFieldTypeUtil;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.runtime.projection.ExternalDatasetProjectionInfo;
+import
org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -139,9 +139,11 @@ public class DatasetDataSource extends DataSource {
addExternalProjectionInfo(projectionFiltrationInfo,
edd.getProperties());
properties = addSubPath(externalDataSource.getProperties(),
properties);
properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE,
String.valueOf(externalScanBufferSize));
- ITypedAdapterFactory adapterFactory =
metadataProvider.getConfiguredAdapterFactory(externalDataset,
- edd.getAdapter(), properties, (ARecordType) itemType,
context.getWarningCollector(),
- NoOpExternalFilterEvaluatorFactory.INSTANCE);
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory =
IndexUtil
+ .createExternalFilterEvaluatorFactory(context,
typeEnv, projectionFiltrationInfo, properties);
+ ITypedAdapterFactory adapterFactory =
+
metadataProvider.getConfiguredAdapterFactory(externalDataset, edd.getAdapter(),
properties,
+ (ARecordType) itemType,
context.getWarningCollector(), filterEvaluatorFactory);
return metadataProvider.getExternalDatasetScanRuntime(jobSpec,
itemType, adapterFactory,
tupleFilterFactory, outputLimit);
case INTERNAL:
@@ -182,7 +184,8 @@ public class DatasetDataSource extends DataSource {
//properties could be cached and reused, so we make a copy per
query
propertiesCopy = new HashMap<>(properties);
try {
- ExternalDatasetProjectionInfo externalProjectionInfo =
(ExternalDatasetProjectionInfo) projectionInfo;
+ ExternalDatasetProjectionFiltrationInfo externalProjectionInfo
=
+ (ExternalDatasetProjectionFiltrationInfo)
projectionInfo;
ExternalDataUtils.setExternalDataProjectionInfo(externalProjectionInfo,
propertiesCopy);
} catch (IOException e) {
throw new IllegalStateException(e);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 6a673135bc..b0d503c640 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -36,8 +36,11 @@ import
org.apache.asterix.column.operation.query.QueryColumnTupleProjectorFactor
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.external.indexing.ExternalFile;
+import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.metadata.dataset.DatasetFormatInfo;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -45,12 +48,14 @@ import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.metadata.utils.filter.ColumnFilterBuilder;
import org.apache.asterix.metadata.utils.filter.ColumnRangeFilterBuilder;
+import org.apache.asterix.metadata.utils.filter.ExternalFilterBuilder;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.base.IAObject;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.job.listener.JobEventListenerFactory;
import
org.apache.asterix.runtime.projection.ColumnDatasetProjectionFiltrationInfo;
+import
org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -325,4 +330,19 @@ public class IndexUtil {
datasetRequestedType);
}
+ public static IExternalFilterEvaluatorFactory
createExternalFilterEvaluatorFactory(JobGenContext context,
+ IVariableTypeEnvironment typeEnv, IProjectionFiltrationInfo
projectionFiltrationInfo,
+ Map<String, String> properties) throws AlgebricksException {
+ if (projectionFiltrationInfo ==
DefaultProjectionFiltrationInfo.INSTANCE) {
+ return NoOpExternalFilterEvaluatorFactory.INSTANCE;
+ }
+
+ ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
+ ExternalDatasetProjectionFiltrationInfo pfi =
+ (ExternalDatasetProjectionFiltrationInfo)
projectionFiltrationInfo;
+ ExternalFilterBuilder build = new ExternalFilterBuilder(pfi, context,
typeEnv, prefix);
+
+ return build.build();
+ }
+
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ExternalFilterBuilder.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ExternalFilterBuilder.java
new file mode 100644
index 0000000000..4165e3286b
--- /dev/null
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/filter/ExternalFilterBuilder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.metadata.utils.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.common.external.NoOpExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.filter.ExternalFilterEvaluatorFactory;
+import
org.apache.asterix.external.input.filter.ExternalFilterValueEvaluatorFactory;
+import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.asterix.om.types.ARecordType;
+import
org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+
+public class ExternalFilterBuilder extends AbstractFilterBuilder {
+ private final ExternalDataPrefix prefix;
+
+ public ExternalFilterBuilder(ExternalDatasetProjectionFiltrationInfo
projectionFiltrationInfo,
+ JobGenContext context, IVariableTypeEnvironment typeEnv,
ExternalDataPrefix prefix) {
+ super(projectionFiltrationInfo.getFilterPaths(),
projectionFiltrationInfo.getFilterExpression(), context,
+ typeEnv);
+ this.prefix = prefix;
+ }
+
+ public IExternalFilterEvaluatorFactory build() throws AlgebricksException {
+ if (filterExpression == null || filterPaths.isEmpty()) {
+ return NoOpExternalFilterEvaluatorFactory.INSTANCE;
+ }
+ IScalarEvaluatorFactory evalFactory =
createEvaluator(filterExpression);
+ if (evalFactory == null) {
+ return NoOpExternalFilterEvaluatorFactory.INSTANCE;
+ }
+ return new
ExternalFilterEvaluatorFactory(prefix.getComputedFieldNames().size(),
evalFactory);
+ }
+
+ @Override
+ protected IScalarEvaluatorFactory createValueAccessor(ILogicalExpression
expression) {
+ ARecordType path = filterPaths.get(expression);
+ int index = prefix.getPaths().indexOf(path);
+ return new ExternalFilterValueEvaluatorFactory(index,
prefix.getComputedFieldTypes().get(index));
+ }
+}
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/SerializerDeserializerUtil.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/SerializerDeserializerUtil.java
index 664eb28729..f2809b9d5a 100644
---
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/SerializerDeserializerUtil.java
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/SerializerDeserializerUtil.java
@@ -73,7 +73,10 @@ public final class SerializerDeserializerUtil {
public static void serializeTag(IAObject instance, DataOutput out) throws
HyracksDataException {
IAType t = instance.getType();
- ATypeTag tag = t.getTypeTag();
+ serializeTag(t.getTypeTag(), out);
+ }
+
+ public static void serializeTag(ATypeTag tag, DataOutput out) throws
HyracksDataException {
try {
out.writeByte(tag.serialize());
} catch (IOException e) {
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ColumnDatasetProjectionFiltrationInfo.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ColumnDatasetProjectionFiltrationInfo.java
index 0a5ef7a432..4227c31655 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ColumnDatasetProjectionFiltrationInfo.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ColumnDatasetProjectionFiltrationInfo.java
@@ -22,7 +22,6 @@ import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELD
import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -32,30 +31,21 @@ import
org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksStringBu
import com.fasterxml.jackson.core.JsonGenerator;
-public class ColumnDatasetProjectionFiltrationInfo extends
ExternalDatasetProjectionInfo {
+public class ColumnDatasetProjectionFiltrationInfo extends
ExternalDatasetProjectionFiltrationInfo {
private final ARecordType metaProjectedType;
- private final ILogicalExpression filterExpression;
- private final Map<ILogicalExpression, ARecordType> filterPaths;
private final ILogicalExpression rangeFilterExpression;
public ColumnDatasetProjectionFiltrationInfo(ARecordType
recordRequestedType, ARecordType metaProjectedType,
Map<String, FunctionCallInformation> sourceInformationMap,
Map<ILogicalExpression, ARecordType> filterPaths,
ILogicalExpression filterExpression, ILogicalExpression
rangeFilterExpression) {
- super(recordRequestedType, sourceInformationMap);
+ super(recordRequestedType, sourceInformationMap, filterPaths,
filterExpression);
this.metaProjectedType = metaProjectedType;
-
- this.filterExpression = filterExpression;
this.rangeFilterExpression = rangeFilterExpression;
- this.filterPaths = filterPaths;
}
private
ColumnDatasetProjectionFiltrationInfo(ColumnDatasetProjectionFiltrationInfo
other) {
- super(other.projectedType, other.functionCallInfoMap);
+ super(other.projectedType, other.functionCallInfoMap,
other.filterPaths, other.filterExpression);
metaProjectedType = other.metaProjectedType;
-
- filterExpression = other.filterExpression;
- filterPaths = new HashMap<>(other.filterPaths);
-
rangeFilterExpression = other.rangeFilterExpression;
}
@@ -96,15 +86,13 @@ public class ColumnDatasetProjectionFiltrationInfo extends
ExternalDatasetProjec
@Override
public void print(JsonGenerator generator) throws IOException {
- if (projectedType == ALL_FIELDS_TYPE) {
- return;
- }
-
StringBuilder builder = new StringBuilder();
- if (projectedType == EMPTY_TYPE) {
- generator.writeStringField("project", projectedType.getTypeName());
- } else {
- generator.writeStringField("project",
getOnelinerSchema(projectedType, builder));
+ if (projectedType != ALL_FIELDS_TYPE) {
+ if (projectedType == EMPTY_TYPE) {
+ generator.writeStringField("project",
projectedType.getTypeName());
+ } else {
+ generator.writeStringField("project",
getOnelinerSchema(projectedType, builder));
+ }
}
if (metaProjectedType != null && metaProjectedType != ALL_FIELDS_TYPE)
{
@@ -124,14 +112,6 @@ public class ColumnDatasetProjectionFiltrationInfo extends
ExternalDatasetProjec
return metaProjectedType;
}
- public ILogicalExpression getFilterExpression() {
- return filterExpression;
- }
-
- public Map<ILogicalExpression, ARecordType> getFilterPaths() {
- return filterPaths;
- }
-
public ILogicalExpression getRangeFilterExpression() {
return rangeFilterExpression;
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionInfo.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
similarity index 72%
rename from
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionInfo.java
rename to
asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
index 55919b3456..13fc6e2d9c 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionInfo.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/ExternalDatasetProjectionFiltrationInfo.java
@@ -31,22 +31,29 @@ import java.util.Objects;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.visitor.SimpleStringBuilderForIATypeVisitor;
import org.apache.commons.lang3.SerializationUtils;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import
org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
import
org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksStringBuilderWriter;
import com.fasterxml.jackson.core.JsonGenerator;
-public class ExternalDatasetProjectionInfo implements
IProjectionFiltrationInfo {
+public class ExternalDatasetProjectionFiltrationInfo implements
IProjectionFiltrationInfo {
protected final ARecordType projectedType;
+
+ protected final ILogicalExpression filterExpression;
+ protected final Map<ILogicalExpression, ARecordType> filterPaths;
protected final Map<String, FunctionCallInformation> functionCallInfoMap;
- public ExternalDatasetProjectionInfo(ARecordType projectedType,
- Map<String, FunctionCallInformation> sourceInformationMap) {
+ public ExternalDatasetProjectionFiltrationInfo(ARecordType projectedType,
+ Map<String, FunctionCallInformation> sourceInformationMap,
Map<ILogicalExpression, ARecordType> filterPaths,
+ ILogicalExpression filterExpression) {
this.projectedType = projectedType;
this.functionCallInfoMap = sourceInformationMap;
+ this.filterExpression = filterExpression;
+ this.filterPaths = filterPaths;
}
- private ExternalDatasetProjectionInfo(ExternalDatasetProjectionInfo other)
{
+ private
ExternalDatasetProjectionFiltrationInfo(ExternalDatasetProjectionFiltrationInfo
other) {
if (other.projectedType == ALL_FIELDS_TYPE) {
projectedType = ALL_FIELDS_TYPE;
} else if (other.projectedType == EMPTY_TYPE) {
@@ -55,11 +62,14 @@ public class ExternalDatasetProjectionInfo implements
IProjectionFiltrationInfo
projectedType = other.projectedType.deepCopy(other.projectedType);
}
functionCallInfoMap = new HashMap<>(other.functionCallInfoMap);
+
+ filterExpression = other.filterExpression;
+ filterPaths = new HashMap<>(other.filterPaths);
}
@Override
- public ExternalDatasetProjectionInfo createCopy() {
- return new ExternalDatasetProjectionInfo(this);
+ public ExternalDatasetProjectionFiltrationInfo createCopy() {
+ return new ExternalDatasetProjectionFiltrationInfo(this);
}
public ARecordType getProjectedType() {
@@ -70,6 +80,14 @@ public class ExternalDatasetProjectionInfo implements
IProjectionFiltrationInfo
return functionCallInfoMap;
}
+ public ILogicalExpression getFilterExpression() {
+ return filterExpression;
+ }
+
+ public Map<ILogicalExpression, ARecordType> getFilterPaths() {
+ return filterPaths;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -78,32 +96,43 @@ public class ExternalDatasetProjectionInfo implements
IProjectionFiltrationInfo
if (o == null || getClass() != o.getClass()) {
return false;
}
- ExternalDatasetProjectionInfo otherInfo =
(ExternalDatasetProjectionInfo) o;
+ ExternalDatasetProjectionFiltrationInfo otherInfo =
(ExternalDatasetProjectionFiltrationInfo) o;
return projectedType.deepEqual(otherInfo.projectedType)
&& Objects.equals(functionCallInfoMap,
otherInfo.functionCallInfoMap);
}
@Override
public void print(AlgebricksStringBuilderWriter writer) {
- if (projectedType == ALL_FIELDS_TYPE) {
- return;
+ if (projectedType != ALL_FIELDS_TYPE) {
+ writer.append(" project (");
+ if (projectedType == EMPTY_TYPE) {
+ writer.append(projectedType.getTypeName());
+ } else {
+ writer.append(getOnelinerSchema(projectedType, new
StringBuilder()));
+ }
+ writer.append(')');
}
- writer.append(" project (");
- if (projectedType == EMPTY_TYPE) {
- writer.append(projectedType.getTypeName());
- } else {
- writer.append(getOnelinerSchema(projectedType, new
StringBuilder()));
+ if (filterExpression != null) {
+ writer.append(" prefix-filter on: ");
+ writer.append(filterExpression.toString());
}
- writer.append(')');
}
@Override
public void print(JsonGenerator generator) throws IOException {
- if (projectedType == ALL_FIELDS_TYPE) {
- return;
+ StringBuilder builder = new StringBuilder();
+ if (projectedType != ALL_FIELDS_TYPE) {
+ if (projectedType == EMPTY_TYPE) {
+ generator.writeStringField("project",
projectedType.getTypeName());
+ } else {
+ generator.writeStringField("project",
getOnelinerSchema(projectedType, builder));
+ }
+ }
+
+ if (filterExpression != null) {
+ generator.writeStringField("filter-on",
filterExpression.toString());
}
- generator.writeStringField("project", getOnelinerSchema(projectedType,
new StringBuilder()));
}
protected String getOnelinerSchema(ARecordType type, StringBuilder
builder) {
diff --git
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/EvaluatorContext.java
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/EvaluatorContext.java
index 901e7886cc..c74568b7a2 100644
---
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/EvaluatorContext.java
+++
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/evaluators/EvaluatorContext.java
@@ -26,7 +26,7 @@ import org.apache.hyracks.api.application.IServiceContext;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.IWarningCollector;
-public final class EvaluatorContext implements IEvaluatorContext {
+public class EvaluatorContext implements IEvaluatorContext {
private final IServiceContext serviceContext;