This is an automated email from the ASF dual-hosted git repository.

wyk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new ca2a2e0  [ASTERIXDB-2933][COMP][EXT] Pushdowns Part2: Pushdown Rule
ca2a2e0 is described below

commit ca2a2e034b2b9349f5c3664432cd6ced36c1903a
Author: Wail Alkowaileet <wael....@gmail.com>
AuthorDate: Tue Aug 24 16:25:45 2021 -0700

    [ASTERIXDB-2933][COMP][EXT] Pushdowns Part2: Pushdown Rule
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    Add a rule that computes the expected schema and sets it
    to the DataSourceScanOperator. The computed schema is then
    passed to the Parquet Reader to 'clip' the Parquet file's
    schema. The resulting clipped schema is then used to tell
    the reader which columns should be read.
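
    For example (mirroring the plan examples in the new rule's Javadoc),
    a query that only accesses personalInfo.age and salary of a Parquet
    dataset produces a data-scan annotated as:

        data-scan []<-[$$r] <- ParquetDataverse.ParquetDataset project
            ({personalInfo:{age: VALUE},salary:VALUE})

    so the reader materializes $$r as {"personalInfo":{"age": *AGE*},
    "salary": *SALARY*} and skips all other columns.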
    
    Interface changes:
    - Change IProjectionInfo<List<T>> to IProjectionInfo<T>
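
    A minimal sketch of how the reworked interface is wired (only
    createProjectionInfo and setProjectionInfo appear verbatim in the diff
    below; the variable names and surrounding wiring are illustrative):

        // Build the expected schema while walking the plan, then attach the
        // resulting projection info to the data-scan operator. The Parquet
        // reader later uses it to clip the file's schema.
        ExpectedSchemaBuilder builder = new ExpectedSchemaBuilder();
        // ... the operator/expression pushdown visitors populate the builder
        // ... from the value access expressions found in the plan ...
        DataProjectionInfo projectionInfo = builder.createProjectionInfo(recordVariable);
        scanOperator.setProjectionInfo(projectionInfo);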
    
    Change-Id: If0c0d05473be72df6f08dfcbab2d25c36c71368e
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12964
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Wael Alkowaileet <wael....@gmail.com>
    Reviewed-by: Dmitry Lychagin <dmitry.lycha...@couchbase.com>
---
 .../PushFieldAccessToExternalDataScanRule.java     |  26 +-
 .../PushValueAccessToExternalDataScanRule.java     | 123 ++++++
 .../rules/pushdown/ExpectedSchemaBuilder.java      | 218 ++++++++++
 ...xpectedSchemaNodeToIATypeTranslatorVisitor.java | 114 +++++
 .../ExpressionValueAccessPushdownVisitor.java      | 182 ++++++++
 .../OperatorValueAccessPushdownVisitor.java        | 477 +++++++++++++++++++++
 .../declared/ExternalDataProjectionInfo.java       |  10 +-
 .../runtime/projection/DataProjectionInfo.java     | 159 +++++++
 .../projection/FunctionCallInformation.java        | 112 +++++
 .../core/algebra/metadata/IProjectionInfo.java     |   6 +-
 .../logical/visitors/VariableUtilities.java        |   9 +
 11 files changed, 1407 insertions(+), 29 deletions(-)

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessToExternalDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessToExternalDataScanRule.java
index f25e058..e0bd22e 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessToExternalDataScanRule.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushFieldAccessToExternalDataScanRule.java
@@ -52,30 +52,10 @@ import org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperat
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 /**
- * Pushes field-access expression to the external dataset scan to minimize the size of the record.
- * This rule currently does not remove the field access expression in ASSIGN and SCAN operators. Instead,
- * it adds the requested field names to external dataset details to produce records that only contain the requested
- * fields. Thus, no changes would occur in the plan's structure after firing this rule.
- * Example:
- * Before plan:
- * ...
- * select (and(gt($$00, 20), gt($$r.getField("salary"), 70000)))
- * ...
- * assign [$$00] <- [$$r.getField("personalInfo").getField("age")]
- * ...
- * data-scan []<-[$$r] <- ParquetDataverse.ParquetDataset
- * <p>
- * After plan:
- * ...
- * select (and(gt($$00, 20), gt($$r.getField("salary"), 70000)))
- * ...
- * assign [$$00] <- [$$r.getField("personalInfo").getField("age")]
- * ...
- * data-scan []<-[$$r] <- ParquetDataverse.ParquetDataset project (personalInfo.age, salary)
- * <p>
- * The resulting record $$r will be {"personalInfo":{"age": *AGE*}, "salary": *SALARY*}
- * and other fields will not be included in $$r.
+ * TODO Use {@link PushValueAccessToExternalDataScanRule}
+ * Will be removed in a follow up change
  */
+@Deprecated
 public class PushFieldAccessToExternalDataScanRule implements IAlgebraicRewriteRule {
     //Datasets payload variables
     private final List<LogicalVariable> recordVariables = new ArrayList<>();
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessToExternalDataScanRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessToExternalDataScanRule.java
new file mode 100644
index 0000000..405e2bd
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessToExternalDataScanRule.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules;
+
+import java.util.Set;
+
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.optimizer.base.AsterixOptimizationContext;
+import org.apache.asterix.optimizer.rules.pushdown.OperatorValueAccessPushdownVisitor;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
+
+import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+
+/**
+ * Pushes value access expressions to the external dataset scan to minimize the size of the record.
+ * This rule currently does not remove the value access expression. Instead, it adds the requested field names to
+ * external dataset details to produce records that only contain the requested values. Thus, no changes would occur
+ * to the plan's structure after firing this rule.
+ * Example:
+ * Before plan:
+ * ...
+ * select (and(gt($$00, 20), gt($$r.getField("salary"), 70000)))
+ * ...
+ * assign [$$00] <- [$$r.getField("personalInfo").getField("age")]
+ * ...
+ * data-scan []<-[$$r] <- ParquetDataverse.ParquetDataset
+ * <p>
+ * After plan:
+ * ...
+ * select (and(gt($$00, 20), gt($$r.getField("salary"), 70000)))
+ * ...
+ * assign [$$00] <- [$$r.getField("personalInfo").getField("age")]
+ * ...
+ * data-scan []<-[$$r] <- ParquetDataverse.ParquetDataset project ({personalInfo:{age: VALUE},salary:VALUE})
+ * <p>
+ * The resulting record $$r will be {"personalInfo":{"age": *AGE*}, "salary": *SALARY*}
+ * and other fields will not be included in $$r.
+ */
+public class PushValueAccessToExternalDataScanRule implements IAlgebraicRewriteRule {
+    //Initially, assume we need to run the rule
+    private boolean run = true;
+
+    @Override
+    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        if (!context.getPhysicalOptimizationConfig().isExternalFieldPushdown() || !run) {
+            //Value access pushdown is disabled, or the rule has already fired
+            return false;
+        }
+
+        /*
+         * Only run the rewrite rule once and only if the plan contains a data-scan on an external dataset that
+         * supports value access pushdown.
+         */
+        run = shouldRun(context);
+        if (run) {
+            run = false;
+            OperatorValueAccessPushdownVisitor visitor = new OperatorValueAccessPushdownVisitor(context);
+            opRef.getValue().accept(visitor, null);
+            visitor.finish();
+        }
+
+        //This rule does not do any actual structural changes to the plan
+        return false;
+    }
+
+    /**
+     * Check whether the plan contains an external dataset that supports pushdown
+     *
+     * @param context optimization context
+     * @return true if the plan contains such a dataset, false otherwise
+     */
+    private boolean shouldRun(IOptimizationContext context) throws AlgebricksException {
+        ObjectSet<Int2ObjectMap.Entry<Set<DataSource>>> entrySet =
+                ((AsterixOptimizationContext) context).getDataSourceMap().int2ObjectEntrySet();
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        for (Int2ObjectMap.Entry<Set<DataSource>> dataSources : entrySet) {
+            for (DataSource dataSource : dataSources.getValue()) {
+                if (supportPushdown(metadataProvider, dataSource)) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    private boolean supportPushdown(MetadataProvider metadataProvider, DataSource dataSource)
+            throws AlgebricksException {
+        DataverseName dataverse = dataSource.getId().getDataverseName();
+        String datasetName = dataSource.getId().getDatasourceName();
+        Dataset dataset = metadataProvider.findDataset(dataverse, datasetName);
+
+        return dataset != null && dataset.getDatasetType() == DatasetConfig.DatasetType.EXTERNAL && ExternalDataUtils
+                .supportsPushdown(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties());
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaBuilder.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaBuilder.java
new file mode 100644
index 0000000..b7632db
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaBuilder.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown;
+
+import static 
org.apache.asterix.optimizer.rules.pushdown.ExpressionValueAccessPushdownVisitor.ARRAY_FUNCTIONS;
+import static 
org.apache.asterix.optimizer.rules.pushdown.ExpressionValueAccessPushdownVisitor.SUPPORTED_FUNCTIONS;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.AbstractComplexExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.ArrayExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.ObjectExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.RootExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.UnionExpectedSchemaNode;
+import org.apache.asterix.runtime.projection.DataProjectionInfo;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+
+/**
+ * This class takes a value access expression and produces an expected schema (given the expression).
+ * Example:
+ * - $$t.getField("hashtags").getItem(0)
+ * We expect:
+ * 1- $$t is OBJECT
+ * 2- the output type of getField("hashtags") is ARRAY
+ * 3- the output type of getItem(0) is ANY node
+ */
+class ExpectedSchemaBuilder {
+    //Registered Variables
+    private final Map<LogicalVariable, IExpectedSchemaNode> varToNode;
+    private final ExpectedSchemaNodeToIATypeTranslatorVisitor typeBuilder;
+
+    public ExpectedSchemaBuilder() {
+        varToNode = new HashMap<>();
+        typeBuilder = new ExpectedSchemaNodeToIATypeTranslatorVisitor();
+    }
+
+    public DataProjectionInfo createProjectionInfo(LogicalVariable 
recordVariable) {
+        IExpectedSchemaNode rootNode = varToNode.get(recordVariable);
+        Map<String, FunctionCallInformation> sourceInformation = new 
HashMap<>();
+        typeBuilder.reset(sourceInformation);
+        ARecordType recordType = (ARecordType) rootNode.accept(typeBuilder, 
null);
+        return new DataProjectionInfo(recordType, sourceInformation);
+    }
+
+    public boolean setSchemaFromExpression(AbstractFunctionCallExpression expr, LogicalVariable producedVar) {
+        //The parent is always a nested node
+        AbstractComplexExpectedSchemaNode parent = (AbstractComplexExpectedSchemaNode) buildNestedNode(expr);
+        if (parent != null) {
+            IExpectedSchemaNode leaf =
+                    new AnyExpectedSchemaNode(parent, expr.getSourceLocation(), expr.getFunctionIdentifier().getName());
+            addChild(expr, parent, leaf);
+            if (producedVar != null) {
+                //Register the node if a variable is produced
+                varToNode.put(producedVar, leaf);
+            }
+        }
+        return parent != null;
+    }
+
+    public void registerDataset(LogicalVariable recordVar, 
RootExpectedSchemaNode rootNode) {
+        varToNode.put(recordVar, rootNode);
+    }
+
+    public void unregisterVariable(LogicalVariable variable) {
+        //Remove the node so no other expression will be pushed down through this variable in the future
+        IExpectedSchemaNode node = varToNode.remove(variable);
+        AbstractComplexExpectedSchemaNode parent = node.getParent();
+        if (parent == null) {
+            //It is a root node. Request the entire record
+            varToNode.put(variable, RootExpectedSchemaNode.ALL_FIELDS_ROOT_NODE);
+        } else {
+            //It is a nested node. Replace the node with a LEAF node
+            node.replaceIfNeeded(ExpectedSchemaNodeType.ANY, parent.getSourceLocation(), parent.getFunctionName());
+        }
+    }
+
+    public boolean isVariableRegistered(LogicalVariable recordVar) {
+        return varToNode.containsKey(recordVar);
+    }
+
+    public boolean containsRegisteredDatasets() {
+        return !varToNode.isEmpty();
+    }
+
+    private IExpectedSchemaNode buildNestedNode(ILogicalExpression expr) {
+        //The current node expression
+        AbstractFunctionCallExpression myExpr = 
(AbstractFunctionCallExpression) expr;
+        if (!SUPPORTED_FUNCTIONS.contains(myExpr.getFunctionIdentifier())) {
+            //Return null if the function is not supported.
+            return null;
+        }
+
+        //The parent expression
+        ILogicalExpression parentExpr = myExpr.getArguments().get(0).getValue();
+        if (isVariable(parentExpr)) {
+            //The variable could be the record variable originating from a data-scan or one produced by an assign
+            LogicalVariable sourceVar = VariableUtilities.getVariable(parentExpr);
+            return changeNodeForVariable(sourceVar, myExpr);
+        }
+
+        //Recursively create the parent nodes. Parent is always a nested node
+        AbstractComplexExpectedSchemaNode newParent = (AbstractComplexExpectedSchemaNode) buildNestedNode(parentExpr);
+        //newParent could be null if the expression is not supported
+        if (newParent != null) {
+            //Parent expression must be a function call (as parent is a nested node)
+            AbstractFunctionCallExpression parentFuncExpr = (AbstractFunctionCallExpression) parentExpr;
+            //Get 'myType' as we will create the child type of the newParent
+            ExpectedSchemaNodeType myType = getExpectedNestedNodeType(myExpr);
+            /*
+             * Create 'myNode'. It is a nested node because the function is either getField() or a supported array
+             * function
+             */
+            AbstractComplexExpectedSchemaNode myNode = AbstractComplexExpectedSchemaNode.createNestedNode(myType,
+                    newParent, myExpr.getSourceLocation(), myExpr.getFunctionIdentifier().getName());
+            //Add myNode to the parent
+            addChild(parentFuncExpr, newParent, myNode);
+            return myNode;
+        }
+        return null;
+    }
+
+    private IExpectedSchemaNode changeNodeForVariable(LogicalVariable 
sourceVar,
+            AbstractFunctionCallExpression myExpr) {
+        //Get the associated node with the sourceVar (if any)
+        IExpectedSchemaNode oldNode = varToNode.get(sourceVar);
+        if (oldNode == null) {
+            //Variable is not associated with a node. No pushdown is possible
+            return null;
+        }
+        //What is the expected type of the variable
+        ExpectedSchemaNodeType varExpectedType = 
getExpectedNestedNodeType(myExpr);
+        // Get the node associated with the variable (or change its type if 
needed).
+        IExpectedSchemaNode newNode = oldNode.replaceIfNeeded(varExpectedType, 
myExpr.getSourceLocation(),
+                myExpr.getFunctionIdentifier().getName());
+        //Map the sourceVar to the node
+        varToNode.put(sourceVar, newNode);
+        return newNode;
+    }
+
+    private void addChild(AbstractFunctionCallExpression parentExpr, 
AbstractComplexExpectedSchemaNode parent,
+            IExpectedSchemaNode child) {
+        switch (parent.getType()) {
+            case OBJECT:
+                handleObject(parentExpr, parent, child);
+                break;
+            case ARRAY:
+                handleArray(parent, child);
+                break;
+            case UNION:
+                handleUnion(parentExpr, parent, child);
+                break;
+            default:
+                throw new IllegalStateException("Node " + parent.getType() + " 
is not nested");
+
+        }
+    }
+
+    private void handleObject(AbstractFunctionCallExpression parentExpr, 
AbstractComplexExpectedSchemaNode parent,
+            IExpectedSchemaNode child) {
+        ObjectExpectedSchemaNode objectNode = (ObjectExpectedSchemaNode) 
parent;
+        
objectNode.addChild(ConstantExpressionUtil.getStringArgument(parentExpr, 1), 
child);
+    }
+
+    private void handleArray(AbstractComplexExpectedSchemaNode parent, 
IExpectedSchemaNode child) {
+        ArrayExpectedSchemaNode arrayNode = (ArrayExpectedSchemaNode) parent;
+        arrayNode.addChild(child);
+    }
+
+    private void handleUnion(AbstractFunctionCallExpression parentExpr, 
AbstractComplexExpectedSchemaNode parent,
+            IExpectedSchemaNode child) {
+        UnionExpectedSchemaNode unionNode = (UnionExpectedSchemaNode) parent;
+        ExpectedSchemaNodeType parentType = 
getExpectedNestedNodeType(parentExpr);
+        addChild(parentExpr, unionNode.getChild(parentType), child);
+    }
+
+    private static ExpectedSchemaNodeType 
getExpectedNestedNodeType(AbstractFunctionCallExpression funcExpr) {
+        FunctionIdentifier fid = funcExpr.getFunctionIdentifier();
+        if (BuiltinFunctions.FIELD_ACCESS_BY_NAME.equals(fid)) {
+            return ExpectedSchemaNodeType.OBJECT;
+        } else if (ARRAY_FUNCTIONS.contains(fid)) {
+            return ExpectedSchemaNodeType.ARRAY;
+        }
+        throw new IllegalStateException("Function " + fid + " should not be 
pushed down");
+    }
+
+    private static boolean isVariable(ILogicalExpression expr) {
+        return expr.getExpressionTag() == LogicalExpressionTag.VARIABLE;
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaNodeToIATypeTranslatorVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
new file mode 100644
index 0000000..c746994
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpectedSchemaNodeToIATypeTranslatorVisitor.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.om.types.AOrderedListType;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.AUnionType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.AbstractComplexExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.ArrayExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNodeVisitor;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.ObjectExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.RootExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.UnionExpectedSchemaNode;
+import org.apache.asterix.runtime.projection.DataProjectionInfo;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
+
+/**
+ * This visitor translates the {@link IExpectedSchemaNode} to {@link IAType} record.
+ * The {@link IAType#getTypeName()} is used to map each {@link IAType} to its {@link FunctionCallInformation}
+ */
+class ExpectedSchemaNodeToIATypeTranslatorVisitor implements IExpectedSchemaNodeVisitor<IAType, String> {
+    //Map typeName to source information
+    private Map<String, FunctionCallInformation> sourceInformationMap;
+    //To give a unique name for each type
+    private int counter;
+
+    public void reset(Map<String, FunctionCallInformation> 
sourceInformationMap) {
+        this.sourceInformationMap = sourceInformationMap;
+    }
+
+    @Override
+    public IAType visit(RootExpectedSchemaNode node, String arg) {
+        if (node.isAllFields()) {
+            return DataProjectionInfo.ALL_FIELDS_TYPE;
+        } else if (node.isEmpty()) {
+            return DataProjectionInfo.EMPTY_TYPE;
+        }
+        return createRecordType(node, String.valueOf(counter++));
+    }
+
+    @Override
+    public IAType visit(ObjectExpectedSchemaNode node, String arg) {
+        IAType recordType = createRecordType(node, arg);
+        sourceInformationMap.put(arg, createFunctionCallInformation(node));
+        return recordType;
+    }
+
+    @Override
+    public IAType visit(ArrayExpectedSchemaNode node, String arg) {
+        IAType itemType = node.getChild().accept(this, 
String.valueOf(counter++));
+        IAType listType = new AOrderedListType(itemType, arg);
+        sourceInformationMap.put(arg, createFunctionCallInformation(node));
+        return listType;
+    }
+
+    @Override
+    public IAType visit(UnionExpectedSchemaNode node, String arg) {
+        List<IAType> unionTypes = new ArrayList<>();
+        for (Map.Entry<ExpectedSchemaNodeType, 
AbstractComplexExpectedSchemaNode> child : node.getChildren()) {
+            unionTypes.add(child.getValue().accept(this, 
String.valueOf(counter++)));
+        }
+        IAType unionType = new AUnionType(unionTypes, arg);
+        sourceInformationMap.put(arg, createFunctionCallInformation(node));
+        return unionType;
+    }
+
+    @Override
+    public IAType visit(AnyExpectedSchemaNode node, String arg) {
+        return BuiltinType.ANY;
+    }
+
+    private ARecordType createRecordType(ObjectExpectedSchemaNode node, String 
arg) {
+        Set<Map.Entry<String, IExpectedSchemaNode>> children = 
node.getChildren();
+        String[] childrenFieldNames = new String[children.size()];
+        IAType[] childrenTypes = new IAType[children.size()];
+        int i = 0;
+        for (Map.Entry<String, IExpectedSchemaNode> child : children) {
+            childrenFieldNames[i] = child.getKey();
+            childrenTypes[i++] = child.getValue().accept(this, 
String.valueOf(counter++));
+        }
+
+        return new ARecordType(arg, childrenFieldNames, childrenTypes, true);
+    }
+
+    private FunctionCallInformation 
createFunctionCallInformation(IExpectedSchemaNode node) {
+        return new FunctionCallInformation(node.getFunctionName(), 
node.getSourceLocation());
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpressionValueAccessPushdownVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpressionValueAccessPushdownVisitor.java
new file mode 100644
index 0000000..5616f3f
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/ExpressionValueAccessPushdownVisitor.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
+import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalExpressionReferenceTransform;
+
+class ExpressionValueAccessPushdownVisitor implements ILogicalExpressionReferenceTransform {
+    //Set of supported type-check functions
+    static final Set<FunctionIdentifier> TYPE_CHECK_FUNCTIONS = createSupportedTypeCheckFunctions();
+    //Set of supported array functions
+    static final Set<FunctionIdentifier> ARRAY_FUNCTIONS = createSupportedArrayFunctions();
+    //Set of supported functions that we can push down
+    static final Set<FunctionIdentifier> SUPPORTED_FUNCTIONS = createSupportedFunctions();
+
+    private final ExpectedSchemaBuilder builder;
+    private List<LogicalVariable> producedVariables;
+    private int producedVariableIndex;
+
+    public ExpressionValueAccessPushdownVisitor(ExpectedSchemaBuilder builder) 
{
+        this.builder = builder;
+        end();
+    }
+
+    public void init(List<LogicalVariable> producedVariables) {
+        this.producedVariables = producedVariables;
+        producedVariableIndex = 0;
+    }
+
+    @Override
+    public boolean transform(Mutable<ILogicalExpression> expression) throws AlgebricksException {
+        if (producedVariableIndex == -1) {
+            //This ensures that the produced variables (if any) have been set
+            throw new IllegalStateException("init must be called first");
+        }
+        pushValueAccessExpression(expression, getNextProducedVariable());
+        return false;
+    }
+
+    public void end() {
+        producedVariables = null;
+        producedVariableIndex = -1;
+    }
+
+    private LogicalVariable getNextProducedVariable() {
+        LogicalVariable variable = producedVariables != null ? 
producedVariables.get(producedVariableIndex) : null;
+        producedVariableIndex++;
+        return variable;
+    }
+
+    /**
+     * Push down field access expressions and array access expressions
+     */
+    private void pushValueAccessExpression(Mutable<ILogicalExpression> exprRef, LogicalVariable producedVar) {
+        final ILogicalExpression expr = exprRef.getValue();
+        if (skipPushdown(expr)) {
+            return;
+        }
+
+        final AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expr;
+
+        if (isSuccessfullyPushedDown(funcExpr, producedVar)) {
+            //We successfully pushed down the value access function
+            return;
+        }
+
+        //Check whether the nested arguments contain any pushable value access
+        pushValueAccessExpressionArg(funcExpr.getArguments());
+    }
+
+    /**
+     * Check if we can push down an expression. Also, unregister a variable if we find that a common expression value is
+     * required in its entirety.
+     */
+    private boolean skipPushdown(ILogicalExpression expr) {
+        if (expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+            LogicalVariable variable = VariableUtilities.getVariable(expr);
+            unregisterVariableIfNeeded(variable);
+            return true;
+        }
+        return expr.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL 
|| !builder.containsRegisteredDatasets()
+                || isTypeCheckOnVariable(expr);
+    }
+
+    /**
+     * If the expression is a type-check function on a variable, we should stop, as we do not want to unregister
+     * the variable used by the type-check function.
+     * <p>
+     * Example:
+     * SELECT p.personInfo.name
+     * FROM Person p
+     * WHERE p.personInfo IS NOT MISSING;
+     * <p>
+     * Plan:
+     * ...
+     * assign [$$17] <- [$$18.getField(\"name\")]
+     * select (not(is-missing($$18)))
+     * ...
+     * assign [$$18] <- [$$p.getField(\"personalInfo\")]
+     * ...
+     * data-scan []<-[$$p] <- test.ParquetDataset project ({personInfo:{name:VALUE}})
+     * <p>
+     * In this case, is-missing($$18) could unregister $$18 since it requires the entire value (personInfo) and we
+     * would not be able to push down the access of (personInfo.name). This check allows (personInfo.name) to be
+     * pushed down to the data scan.
+     *
+     * @param expression expression
+     * @return true if the function is a type-check function and has a variable argument.
+     */
+    private boolean isTypeCheckOnVariable(ILogicalExpression expression) {
+        AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) expression;
+        return TYPE_CHECK_FUNCTIONS.contains(funcExpr.getFunctionIdentifier())
+                && funcExpr.getArguments().get(0).getValue().getExpressionTag() == LogicalExpressionTag.VARIABLE;
+    }
+
+    private void pushValueAccessExpressionArg(List<Mutable<ILogicalExpression>> exprList) {
+        for (Mutable<ILogicalExpression> exprRef : exprList) {
+            /*
+             * We need to set the produced variable as null here as the produced variable will not correspond to the
+             * nested expression.
+             */
+            pushValueAccessExpression(exprRef, null);
+        }
+    }
+
+    private boolean isSuccessfullyPushedDown(AbstractFunctionCallExpression 
funcExpr, LogicalVariable producedVar) {
+        return SUPPORTED_FUNCTIONS.contains(funcExpr.getFunctionIdentifier())
+                && builder.setSchemaFromExpression(funcExpr, producedVar);
+    }
+
+    private void unregisterVariableIfNeeded(LogicalVariable variable) {
+        if (builder.isVariableRegistered(variable)) {
+            builder.unregisterVariable(variable);
+        }
+    }
+
+    private static Set<FunctionIdentifier> createSupportedArrayFunctions() {
+        return Set.of(BuiltinFunctions.GET_ITEM, BuiltinFunctions.ARRAY_STAR, 
BuiltinFunctions.SCAN_COLLECTION);
+    }
+
+    private static Set<FunctionIdentifier> createSupportedFunctions() {
+        Set<FunctionIdentifier> supportedFunctions = new HashSet<>();
+        //For objects, only field-access-by-name is supported
+        supportedFunctions.add(BuiltinFunctions.FIELD_ACCESS_BY_NAME);
+        supportedFunctions.addAll(ARRAY_FUNCTIONS);
+        return supportedFunctions;
+    }
+
+    private static Set<FunctionIdentifier> createSupportedTypeCheckFunctions() 
{
+        return Set.of(BuiltinFunctions.IS_ARRAY, BuiltinFunctions.IS_OBJECT, 
BuiltinFunctions.IS_ATOMIC,
+                BuiltinFunctions.IS_NUMBER, BuiltinFunctions.IS_BOOLEAN, 
BuiltinFunctions.IS_STRING,
+                BuiltinFunctions.IS_MISSING, BuiltinFunctions.IS_NULL, 
BuiltinFunctions.IS_UNKNOWN);
+    }
+}
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/OperatorValueAccessPushdownVisitor.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/OperatorValueAccessPushdownVisitor.java
new file mode 100644
index 0000000..6739384
--- /dev/null
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/OperatorValueAccessPushdownVisitor.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.DatasetDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.ExternalDatasetDetails;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.RootExpectedSchemaNode;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AggregateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DelegateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistinctOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.EmptyTupleSourceOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ExchangeOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ForwardOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.GroupByOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IndexInsertDeleteUpsertOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InnerJoinOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.InsertDeleteUpsertOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IntersectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestMapOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LeftOuterUnnestOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.LimitOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.MaterializeOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.NestedTupleSourceOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ProjectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ReplicateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.RunningAggregateOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.ScriptOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SinkOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SplitOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.SubplanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.TokenizeOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnionAllOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.WriteResultOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
+
+/**
+ * This visitor visits the entire plan and tries to build the information of the required values from all datasets
+ */
+public class OperatorValueAccessPushdownVisitor implements ILogicalOperatorVisitor<Void, Void> {
+
+    private final IOptimizationContext context;
+    //Requested schema builder. It is only the expected schema, not a definite one
+    private final ExpectedSchemaBuilder builder;
+    //To visit every expression in each operator
+    private final ExpressionValueAccessPushdownVisitor pushdownVisitor;
+    //Datasets that allow pushdowns
+    private final Map<LogicalVariable, DataSourceScanOperator> registeredDatasets;
+    //visitedOperators so we do not visit the same operator twice (in case of REPLICATE)
+    private final Set<ILogicalOperator> visitedOperators;
+
+    public OperatorValueAccessPushdownVisitor(IOptimizationContext context) {
+        this.context = context;
+        builder = new ExpectedSchemaBuilder();
+        registeredDatasets = new HashMap<>();
+        pushdownVisitor = new ExpressionValueAccessPushdownVisitor(builder);
+        visitedOperators = new HashSet<>();
+    }
+
+    public void finish() {
+        for (Map.Entry<LogicalVariable, DataSourceScanOperator> scan : registeredDatasets.entrySet()) {
+            scan.getValue().setProjectionInfo(builder.createProjectionInfo(scan.getKey()));
+        }
+    }
+
+    /**
+     * Visit every input of an operator. Then, start pushing down any value expression that the operator has
+     *
+     * @param op                the operator to process
+     * @param producedVariables any variables produced by the operator. We only care about the {@link AssignOperator}
+     *                          and {@link UnnestOperator} variables for now.
+     */
+    private void visitInputs(ILogicalOperator op, List<LogicalVariable> producedVariables) throws AlgebricksException {
+        if (visitedOperators.contains(op)) {
+            return;
+        }
+        for (Mutable<ILogicalOperator> child : op.getInputs()) {
+            child.getValue().accept(this, null);
+        }
+        visitedOperators.add(op);
+        //Initiate the pushdown visitor
+        pushdownVisitor.init(producedVariables);
+        //pushdown any expression the operator has
+        op.acceptExpressionTransform(pushdownVisitor);
+        pushdownVisitor.end();
+    }
+
+    /*
+     * ******************************************************************************
+     * Operators that need to handle special cases
+     * ******************************************************************************
+     */
+
+    @Override
+    public Void visitProjectOperator(ProjectOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        if (op.getVariables().isEmpty()) {
+            //If the variables are empty and the input operator is a DataSourceScanOperator, then set an empty record
+            setEmptyRecord(op.getInputs().get(0).getValue());
+        }
+        return null;
+    }
+
+    /**
+     * From the {@link DataSourceScanOperator}, we need to register the payload variable (record variable) to check
+     * which expression in the plan is using it.
+     */
+    @Override
+    public Void visitDataScanOperator(DataSourceScanOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        DatasetDataSource datasetDataSource = getDatasetDataSourceIfApplicable(op);
+        if (datasetDataSource != null) {
+            LogicalVariable recordVar = datasetDataSource.getDataRecordVariable(op.getVariables());
+            if (!builder.isVariableRegistered(recordVar)) {
+                /*
+                 * This is the first time we see the dataset, and we know we might only need part of the record.
+                 * Register the dataset to prepare for value access expression pushdowns.
+                 * Initially, we will request the entire record.
+                 */
+                builder.registerDataset(recordVar, RootExpectedSchemaNode.ALL_FIELDS_ROOT_NODE);
+                registeredDatasets.put(recordVar, op);
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Void visitAggregateOperator(AggregateOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        if (!op.isGlobal() && isCountConstant(op.getExpressions())) {
+            /*
+             * Optimize the SELECT COUNT(*) case
+             * It is a local aggregate and has an agg-sql-count function with a constant argument. Set an empty record
+             * if the input operator is a DataSourceScanOperator
+             */
+            setEmptyRecord(op.getInputs().get(0).getValue());
+        }
+        return null;
+    }
+
+    /*
+     * ******************************************************************************
+     * Helper methods
+     * ******************************************************************************
+     */
+
+    /**
+     * The role of this method is:
+     * 1- Check whether the dataset is an external dataset and allows value access pushdowns
+     * 2- Return the actual DatasetDataSource
+     */
+    private DatasetDataSource getDatasetDataSourceIfApplicable(DataSourceScanOperator scan) throws AlgebricksException {
+        DataSource dataSource = (DataSource) scan.getDataSource();
+        if (dataSource == null) {
+            return null;
+        }
+
+        MetadataProvider mp = (MetadataProvider) context.getMetadataProvider();
+        DataverseName dataverse = dataSource.getId().getDataverseName();
+        String datasetName = dataSource.getId().getDatasourceName();
+        Dataset dataset = mp.findDataset(dataverse, datasetName);
+
+        //Only external datasets can have pushed-down expressions
+        if (dataset == null || dataset.getDatasetType() == DatasetConfig.DatasetType.INTERNAL
+                || dataset.getDatasetType() == DatasetConfig.DatasetType.EXTERNAL && !ExternalDataUtils
+                        .supportsPushdown(((ExternalDatasetDetails) dataset.getDatasetDetails()).getProperties())) {
+            return null;
+        }
+
+        return (DatasetDataSource) dataSource;
+    }
+
+    /**
+     * If the inputOp is a {@link DataSourceScanOperator}, then set the projected value needed as an empty record
+     *
+     * @param inputOp an operator that is potentially a {@link DataSourceScanOperator}
+     * @see #visitAggregateOperator(AggregateOperator, Void)
+     * @see #visitProjectOperator(ProjectOperator, Void)
+     */
+    private void setEmptyRecord(ILogicalOperator inputOp) throws AlgebricksException {
+        if (inputOp.getOperatorTag() == LogicalOperatorTag.DATASOURCESCAN) {
+            DataSourceScanOperator scan = (DataSourceScanOperator) inputOp;
+            DatasetDataSource datasetDataSource = getDatasetDataSourceIfApplicable(scan);
+            if (datasetDataSource != null) {
+                //We know that we only need the count of objects. So return empty objects only
+                LogicalVariable recordVar = datasetDataSource.getDataRecordVariable(scan.getVariables());
+                /*
+                 * Set the root node as EMPTY_ROOT_NODE (i.e., no fields will be read from disk). We register the
+                 * dataset with EMPTY_ROOT_NODE so that we skip pushdowns on the empty node.
+                 */
+                builder.registerDataset(recordVar, RootExpectedSchemaNode.EMPTY_ROOT_NODE);
+            }
+        }
+    }
+
+    private boolean isCountConstant(List<Mutable<ILogicalExpression>> 
expressions) {
+        if (expressions.size() != 1) {
+            return false;
+        }
+        ILogicalExpression expression = expressions.get(0).getValue();
+        if (expression.getExpressionTag() != 
LogicalExpressionTag.FUNCTION_CALL) {
+            return false;
+        }
+        AbstractFunctionCallExpression funcExpr = 
(AbstractFunctionCallExpression) expression;
+        FunctionIdentifier fid = funcExpr.getFunctionIdentifier();
+        return BuiltinFunctions.SQL_COUNT.equals(fid)
+                && 
funcExpr.getArguments().get(0).getValue().getExpressionTag() == 
LogicalExpressionTag.CONSTANT;
+    }
+
+    private void visitSubplans(List<ILogicalPlan> nestedPlans) throws 
AlgebricksException {
+        for (ILogicalPlan plan : nestedPlans) {
+            for (Mutable<ILogicalOperator> root : plan.getRoots()) {
+                visitInputs(root.getValue());
+            }
+        }
+    }
+
+    /*
+     * ******************************************************************************
+     * Pushdown when possible for each operator
+     * ******************************************************************************
+     */
+
+    @Override
+    public Void visitAssignOperator(AssignOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op, op.getVariables());
+        return null;
+    }
+
+    @Override
+    public Void visitSelectOperator(SelectOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitSubplanOperator(SubplanOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        visitSubplans(op.getNestedPlans());
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestOperator(UnnestOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op, op.getVariables());
+        return null;
+    }
+
+    @Override
+    public Void visitRunningAggregateOperator(RunningAggregateOperator op, 
Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitEmptyTupleSourceOperator(EmptyTupleSourceOperator op, 
Void arg) throws AlgebricksException {
+        return null;
+    }
+
+    @Override
+    public Void visitGroupByOperator(GroupByOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        visitSubplans(op.getNestedPlans());
+        return null;
+    }
+
+    @Override
+    public Void visitLimitOperator(LimitOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitInnerJoinOperator(InnerJoinOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterJoinOperator(LeftOuterJoinOperator op, Void arg) 
throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitNestedTupleSourceOperator(NestedTupleSourceOperator op, 
Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitOrderOperator(OrderOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitDelegateOperator(DelegateOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitReplicateOperator(ReplicateOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitSplitOperator(SplitOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitMaterializeOperator(MaterializeOperator op, Void arg) 
throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitScriptOperator(ScriptOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitSinkOperator(SinkOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitUnionOperator(UnionAllOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitIntersectOperator(IntersectOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterUnnestOperator(LeftOuterUnnestOperator op, Void 
arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitUnnestMapOperator(UnnestMapOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitLeftOuterUnnestMapOperator(LeftOuterUnnestMapOperator op, 
Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitDistinctOperator(DistinctOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitExchangeOperator(ExchangeOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitWriteOperator(WriteOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitDistributeResultOperator(DistributeResultOperator op, 
Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitWriteResultOperator(WriteResultOperator op, Void arg) 
throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitInsertDeleteUpsertOperator(InsertDeleteUpsertOperator op, 
Void arg) throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void 
visitIndexInsertDeleteUpsertOperator(IndexInsertDeleteUpsertOperator op, Void 
arg)
+            throws AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitTokenizeOperator(TokenizeOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitForwardOperator(ForwardOperator op, Void arg) throws 
AlgebricksException {
+        visitInputs(op);
+        return null;
+    }
+
+    @Override
+    public Void visitWindowOperator(WindowOperator op, Void arg) throws AlgebricksException {
+        visitInputs(op);
+        visitSubplans(op.getNestedPlans());
+        return null;
+    }
+
+    private void visitInputs(ILogicalOperator op) throws AlgebricksException {
+        visitInputs(op, null);
+    }
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ExternalDataProjectionInfo.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ExternalDataProjectionInfo.java
index fcbf522..9e8247e 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ExternalDataProjectionInfo.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ExternalDataProjectionInfo.java
@@ -26,7 +26,12 @@ import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionInfo;
 import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
 
-public class ExternalDataProjectionInfo implements IProjectionInfo<List<String>> {
+/**
+ * TODO Use {@link org.apache.asterix.runtime.projection.DataProjectionInfo}
+ * Will be removed in a follow up change
+ */
+@Deprecated
+public class ExternalDataProjectionInfo implements IProjectionInfo<List<List<String>>> {
     private final List<List<String>> projectedFieldNames;
 
     public ExternalDataProjectionInfo() {
@@ -47,7 +52,7 @@ public class ExternalDataProjectionInfo implements IProjectionInfo<List<String>>
     }
 
     @Override
-    public IProjectionInfo<List<String>> createCopy() {
+    public IProjectionInfo<List<List<String>>> createCopy() {
         return new ExternalDataProjectionInfo(projectedFieldNames);
     }
 
@@ -61,6 +66,7 @@ public class ExternalDataProjectionInfo implements IProjectionInfo<List<String>>
                 && VariableUtilities.varListEqualUnordered(projectedFieldNames, otherProjectedFieldNames);
     }
 
+    @Override
     public String toString() {
         if (projectedFieldNames.isEmpty()) {
             return "";
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/DataProjectionInfo.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/DataProjectionInfo.java
new file mode 100644
index 0000000..de402ec
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/DataProjectionInfo.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.projection;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.visitor.SimpleStringBuilderForIATypeVisitor;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionInfo;
+
+public class DataProjectionInfo implements IProjectionInfo<ARecordType> {
+    //Default open record type when requesting all fields
+    public static final ARecordType ALL_FIELDS_TYPE = createType("");
+    //Default open record type when requesting none of the fields
+    public static final ARecordType EMPTY_TYPE = createType("{}");
+
+    private final ARecordType root;
+    private final Map<String, FunctionCallInformation> functionCallInfoMap;
+
+    public DataProjectionInfo(ARecordType root, Map<String, FunctionCallInformation> sourceInformationMap) {
+        this.root = root;
+        this.functionCallInfoMap = sourceInformationMap;
+    }
+
+    private DataProjectionInfo(DataProjectionInfo other) {
+        if (other.root == ALL_FIELDS_TYPE) {
+            root = ALL_FIELDS_TYPE;
+        } else if (other.root == EMPTY_TYPE) {
+            root = EMPTY_TYPE;
+        } else {
+            root = other.root.deepCopy(other.root);
+        }
+        functionCallInfoMap = new HashMap<>(other.functionCallInfoMap);
+    }
+
+    @Override
+    public ARecordType getProjectionInfo() {
+        return root;
+    }
+
+    @Override
+    public DataProjectionInfo createCopy() {
+        return new DataProjectionInfo(this);
+    }
+
+    public Map<String, FunctionCallInformation> getFunctionCallInfoMap() {
+        return functionCallInfoMap;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        DataProjectionInfo otherInfo = (DataProjectionInfo) o;
+        return root.deepEqual(otherInfo.root) && Objects.equals(functionCallInfoMap, otherInfo.functionCallInfoMap);
+    }
+
+    @Override
+    public String toString() {
+        if (root == ALL_FIELDS_TYPE || root == EMPTY_TYPE) {
+            //Return the type name if all fields or empty types
+            return root.getTypeName();
+        }
+        //Return a one-line JSON-like representation of the requested fields
+        StringBuilder builder = new StringBuilder();
+        SimpleStringBuilderForIATypeVisitor visitor = new SimpleStringBuilderForIATypeVisitor();
+        root.accept(visitor, builder);
+        return builder.toString();
+    }
+
+    /**
+     * Serialize expected record type
+     *
+     * @param expectedRecordType expected record type
+     * @param output             data output
+     */
+    public static void writeTypeField(ARecordType expectedRecordType, DataOutput output) throws IOException {
+        byte[] recordTypeBytes = SerializationUtils.serialize(expectedRecordType);
+        output.writeInt(recordTypeBytes.length);
+        output.write(recordTypeBytes);
+    }
+
+    /**
+     * Deserialize expected record type
+     *
+     * @param input data input
+     * @return deserialized expected record type
+     */
+    public static ARecordType createTypeField(DataInput input) throws IOException {
+        int length = input.readInt();
+        byte[] recordTypeBytes = new byte[length];
+        input.readFully(recordTypeBytes, 0, length);
+        return SerializationUtils.deserialize(recordTypeBytes);
+    }
+
+    /**
+     * Serialize function call information map
+     *
+     * @param functionCallInfoMap function information map
+     * @param output              data output
+     */
+    public static void writeFunctionCallInformationMapField(Map<String, FunctionCallInformation> functionCallInfoMap,
+            DataOutput output) throws IOException {
+        output.writeInt(functionCallInfoMap.size());
+        for (Map.Entry<String, FunctionCallInformation> info : functionCallInfoMap.entrySet()) {
+            output.writeUTF(info.getKey());
+            info.getValue().writeFields(output);
+        }
+    }
+
+    /**
+     * Deserialize function call information map
+     *
+     * @param input data input
+     * @return deserialized function call information map
+     */
+    public static Map<String, FunctionCallInformation> createFunctionCallInformationMap(DataInput input)
+            throws IOException {
+        int size = input.readInt();
+        Map<String, FunctionCallInformation> functionCallInfoMap = new HashMap<>();
+        for (int i = 0; i < size; i++) {
+            String key = input.readUTF();
+            FunctionCallInformation functionCallInfo = FunctionCallInformation.create(input);
+            functionCallInfoMap.put(key, functionCallInfo);
+        }
+        return functionCallInfoMap;
+    }
+
+    private static ARecordType createType(String typeName) {
+        return new ARecordType(typeName, new String[] {}, new IAType[] {}, true);
+    }
+}
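
For illustration, a minimal round-trip of the serialization helpers above might look like the sketch below. Everything specific in it (the class name, the requested field "age", the map key "root.age", the function name, and the source location) is invented for the example and is not part of this change:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.asterix.om.types.ARecordType;
    import org.apache.asterix.om.types.IAType;
    import org.apache.asterix.runtime.projection.DataProjectionInfo;
    import org.apache.asterix.runtime.projection.FunctionCallInformation;
    import org.apache.hyracks.api.exceptions.SourceLocation;

    public class DataProjectionInfoRoundTrip {
        public static void main(String[] args) throws IOException {
            // Expected schema: an open record that only requests the field "age"
            ARecordType expected = new ARecordType("root", new String[] { "age" },
                    new IAType[] { DataProjectionInfo.ALL_FIELDS_TYPE }, true);

            // Hypothetical function-call info keyed by the requested field's path
            Map<String, FunctionCallInformation> callInfo = new HashMap<>();
            callInfo.put("root.age",
                    new FunctionCallInformation("field-access-by-name", new SourceLocation(1, 1)));

            // Serialize both parts
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            DataProjectionInfo.writeTypeField(expected, out);
            DataProjectionInfo.writeFunctionCallInformationMapField(callInfo, out);

            // Deserialize them back in the same order
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            ARecordType restoredType = DataProjectionInfo.createTypeField(in);
            Map<String, FunctionCallInformation> restoredInfo =
                    DataProjectionInfo.createFunctionCallInformationMap(in);
            System.out.println(restoredType.getTypeName() + " / " + restoredInfo.size());
        }
    }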
diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/FunctionCallInformation.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/FunctionCallInformation.java
new file mode 100644
index 0000000..5cb26fd
--- /dev/null
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/projection/FunctionCallInformation.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.runtime.projection;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.om.exceptions.ExceptionUtil;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+import org.apache.hyracks.api.exceptions.Warning;
+
+/**
+ * Function call information that holds {@link FunctionIdentifier#getName()} and {@link SourceLocation}
+ */
+public class FunctionCallInformation implements Serializable {
+    private static final long serialVersionUID = -7884346933746232736L;
+    private final String functionName;
+    private final SourceLocation sourceLocation;
+    private Set<ATypeTag> typeMismatches;
+
+    public FunctionCallInformation(String functionName, SourceLocation sourceLocation) {
+        this(functionName, sourceLocation, Collections.emptySet());
+    }
+
+    private FunctionCallInformation(String functionName, SourceLocation sourceLocation, Set<ATypeTag> typeMismatches) {
+        this.functionName = functionName;
+        this.sourceLocation = sourceLocation;
+        this.typeMismatches = typeMismatches;
+    }
+
+    public String getFunctionName() {
+        return functionName;
+    }
+
+    public SourceLocation getSourceLocation() {
+        return sourceLocation;
+    }
+
+    public Warning createTypeMismatchWarning(ATypeTag expectedType, ATypeTag actualType) {
+        if (typeMismatches == null) {
+            typeMismatches = EnumSet.noneOf(ATypeTag.class);
+        } else if (typeMismatches.contains(actualType)) {
+            //We already issued a warning containing the same actual type. So, we ignore it
+            return null;
+        }
+        typeMismatches.add(actualType);
+        return Warning.of(getSourceLocation(), ErrorCode.TYPE_MISMATCH_FUNCTION, getFunctionName(),
+                ExceptionUtil.indexToPosition(0), expectedType, actualType);
+    }
+
+    public void writeFields(DataOutput output) throws IOException {
+        output.writeUTF(functionName);
+        SourceLocation.writeFields(sourceLocation, output);
+        output.writeInt(typeMismatches.size());
+        for (ATypeTag typeTag : typeMismatches) {
+            output.write(typeTag.serialize());
+        }
+    }
+
+    public static FunctionCallInformation create(DataInput in) throws IOException {
+        String functionName = in.readUTF();
+        SourceLocation sourceLocation = SourceLocation.create(in);
+        int typeMismatchesLength = in.readInt();
+        Set<ATypeTag> typeMismatches = EnumSet.noneOf(ATypeTag.class);
+        for (int i = 0; i < typeMismatchesLength; i++) {
+            typeMismatches.add(ATypeTag.VALUE_TYPE_MAPPING[in.readByte()]);
+        }
+        return new FunctionCallInformation(functionName, sourceLocation, typeMismatches);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(functionName, sourceLocation);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        FunctionCallInformation that = (FunctionCallInformation) o;
+        return Objects.equals(functionName, that.functionName) && Objects.equals(sourceLocation, that.sourceLocation);
+    }
+}
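
A small sketch of how a reader-side component could surface the type-mismatch warning above. The wrapper class and its name are invented for the example; 'info' would typically be an entry taken from DataProjectionInfo.getFunctionCallInfoMap() after deserialization, and the warning collector comes from the task context:

    import org.apache.asterix.om.types.ATypeTag;
    import org.apache.asterix.runtime.projection.FunctionCallInformation;
    import org.apache.hyracks.api.exceptions.IWarningCollector;
    import org.apache.hyracks.api.exceptions.Warning;

    public final class TypeMismatchReporter {
        private TypeMismatchReporter() {
        }

        // Emits at most one warning per distinct actual type: createTypeMismatchWarning
        // returns null when the same actual type was already reported for this call site.
        public static void reportIfNew(FunctionCallInformation info, IWarningCollector collector,
                ATypeTag expectedType, ATypeTag actualType) {
            Warning warning = info.createTypeMismatchWarning(expectedType, actualType);
            if (warning != null && collector.shouldWarn()) {
                collector.warn(warning);
            }
        }
    }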
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionInfo.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionInfo.java
index 9de591e..3c1a24d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionInfo.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IProjectionInfo.java
@@ -18,17 +18,15 @@
  */
 package org.apache.hyracks.algebricks.core.algebra.metadata;
 
-import java.util.List;
-
 /**
  * Generic interface to include the projection information for
  * {@link org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator}
  */
 public interface IProjectionInfo<T> {
     /**
-     * @return list of projected values' information
+     * @return projected values' information
      */
-    List<T> getProjectionInfo();
+    T getProjectionInfo();
 
     /**
      * @return a copy of the {@link IProjectionInfo}
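
To illustrate the generic change (getProjectionInfo() now returns T rather than List<T>), a minimal hypothetical implementation could look like the sketch below; the class is invented for the example, and the real implementations are ExternalDataProjectionInfo and DataProjectionInfo shown earlier:

    import org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionInfo;

    // Hypothetical projection info that carries a single field name.
    public class SingleFieldProjectionInfo implements IProjectionInfo<String> {
        private final String fieldName;

        public SingleFieldProjectionInfo(String fieldName) {
            this.fieldName = fieldName;
        }

        @Override
        public String getProjectionInfo() {
            // Returns T directly instead of List<T>
            return fieldName;
        }

        @Override
        public IProjectionInfo<String> createCopy() {
            return new SingleFieldProjectionInfo(fieldName);
        }
    }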
diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
index 059a357..69db58d 100644
--- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
+++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/logical/visitors/VariableUtilities.java
@@ -30,8 +30,11 @@ import java.util.Set;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext;
 import org.apache.hyracks.algebricks.core.algebra.visitors.ILogicalOperatorVisitor;
 
@@ -179,4 +182,10 @@ public class VariableUtilities {
         return varSet.equals(varArgSet);
     }
 
+    public static LogicalVariable getVariable(ILogicalExpression expr) {
+        if (expr != null && expr.getExpressionTag() == LogicalExpressionTag.VARIABLE) {
+            return ((VariableReferenceExpression) expr).getVariableReference();
+        }
+        return null;
+    }
 }
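
The new VariableUtilities.getVariable helper returns the referenced variable only when the expression is a plain variable reference, and null otherwise. A hypothetical caller (the wrapper class and the choice of the assign's first expression are assumptions for the example):

    import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
    import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
    import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
    import org.apache.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;

    public final class VariableExtractionExample {
        private VariableExtractionExample() {
        }

        // Returns the variable referenced by the assign's first expression,
        // or null if that expression is not a simple variable reference.
        public static LogicalVariable firstReferencedVariable(AssignOperator assign) {
            ILogicalExpression expr = assign.getExpressions().get(0).getValue();
            return VariableUtilities.getVariable(expr);
        }
    }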
