[ASTERIXDB-2194][COMP] Introduce datasource functions

- user model changes: yes
  Some functions can be datasources
- storage format changes: no
- interface changes: yes
  - Add IDatasourceFunction: A function that is also a datasource
  - Add IFunctionToDataSourceRewriter: rewrites an unnest
    function into a datascan during compilation

Details:
- Currently, functions are location-agnostic and run on
  parameters that are passed to them either at compile time
  or at runtime.
- An exception to this is the dataset function, which has
  associated location constraints and runs on the nodes
  that host the dataset.
- In this change, we introduce a general framework that allows
  creation of new functions similar to the dataset function.
- Such functions are called datasource functions.
- A datasource function takes constant parameters and runs on
  a set of partitions, similar to the dataset function.
- The first example of such functions is the DatasetResources
  function.
- The DatasetResources function takes two parameters, a dataverse
  and a dataset. It is then run on all nodes and returns a set
  of dataset resources.
- Test cases are added for this function.

Change-Id: Ibcf807ac713a21e8f4d59868525467386e801303
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2216
Sonar-Qube: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: abdullah alamoudi <[email protected]>
Tested-by: abdullah alamoudi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/b4d166b3
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/b4d166b3
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/b4d166b3

Branch: refs/heads/master
Commit: b4d166b3ca042ce34d737f5d2a4fb758fa45d3e5
Parents: 26cc908
Author: Abdullah Alamoudi <[email protected]>
Authored: Sun Dec 17 11:34:33 2017 +0300
Committer: abdullah alamoudi <[email protected]>
Committed: Sun Dec 17 05:00:40 2017 -0800

----------------------------------------------------------------------
 .../asterix/optimizer/base/AnalysisUtil.java    |  14 +-
 .../rules/MetaFunctionToMetaVariableRule.java   |   3 +-
 .../optimizer/rules/UnnestToDataScanRule.java   | 244 +------------------
 ...IntroducePrimaryIndexForAggregationRule.java |   8 +-
 .../function/DatasetResourcesDatasource.java    |  43 ++++
 .../app/function/DatasetResourcesFunction.java  |  49 ++++
 .../app/function/DatasetResourcesReader.java    |  68 ++++++
 .../app/function/DatasetResourcesRewriter.java  |  52 ++++
 .../asterix/app/function/DatasetRewriter.java   | 192 +++++++++++++++
 .../asterix/app/function/FeedRewriter.java      | 209 ++++++++++++++++
 .../asterix/app/function/FunctionReader.java    |  55 +++++
 .../asterix/app/function/FunctionRewriter.java  |  99 ++++++++
 .../function/StorageComponentsDatasource.java   |  43 ++++
 .../app/function/StorageComponentsFunction.java |  51 ++++
 .../app/function/StorageComponentsReader.java   |  96 ++++++++
 .../app/function/StorageComponentsRewriter.java |  52 ++++
 .../hyracks/bootstrap/CCApplication.java        |   2 +
 .../hyracks/bootstrap/NCApplication.java        |   2 +
 .../asterix/util/MetadataBuiltinFunctions.java  |  60 +++++
 .../apache/asterix/utils/FeedOperations.java    |   4 +-
 .../dataset-resources.1.ddl.sqlpp               |  50 ++++
 .../dataset-resources.2.update.sqlpp            |  24 ++
 .../dataset-resources.3.query.sqlpp             |  20 ++
 .../dataset-resources.4.query.sqlpp             |  20 ++
 .../dataset-resources.5.query.sqlpp             |  21 ++
 .../dataset-resources/dataset-resources.3.adm   |   1 +
 .../dataset-resources/dataset-resources.4.adm   |   1 +
 .../dataset-resources/dataset-resources.5.adm   |   1 +
 .../resources/runtimets/testsuite_sqlpp.xml     |   5 +
 .../adapter/factory/GenericAdapterFactory.java  |  19 ++
 .../asterix/external/util/FeedConstants.java    |   5 -
 .../metadata/api/IDatasourceFunction.java       |  48 ++++
 .../bootstrap/MetadataBuiltinEntities.java      |   6 +-
 .../declared/AbstractDatasourceFunction.java    |  37 +++
 .../asterix/metadata/declared/DataSource.java   |   1 +
 .../metadata/declared/FunctionDataSource.java   |  91 +++++++
 .../declared/FunctionDataSourceFactory.java     |  74 ++++++
 .../functions/MetadataBuiltinFunctions.java     | 185 --------------
 .../asterix/metadata/utils/DatasetUtil.java     |  15 ++
 .../asterix/om/functions/BuiltinFunctions.java  |  27 +-
 .../IFunctionToDataSourceRewriter.java          |  40 +++
 .../org/apache/asterix/om/utils/RecordUtil.java |   5 +-
 42 files changed, 1574 insertions(+), 468 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index 24fe8e78..8dca64b 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -84,14 +84,6 @@ public class AnalysisUtil {
         return fieldAccessFunctions.contains(fid);
     }
 
-    public static boolean isDataSetCall(ILogicalExpression e) {
-        if (((AbstractLogicalExpression) e).getExpressionTag() != 
LogicalExpressionTag.FUNCTION_CALL) {
-            return false;
-        }
-        AbstractFunctionCallExpression fe = (AbstractFunctionCallExpression) e;
-        return BuiltinFunctions.isDatasetFunction(fe.getFunctionIdentifier());
-    }
-
     public static boolean isRunnableAccessToFieldRecord(ILogicalExpression 
expr) {
         if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
             AbstractFunctionCallExpression fc = 
(AbstractFunctionCallExpression) expr;
@@ -129,17 +121,17 @@ public class AnalysisUtil {
 
     public static Pair<String, String> 
getDatasetInfo(AbstractDataSourceOperator op) throws AlgebricksException {
         DataSourceId srcId = (DataSourceId) op.getDataSource().getId();
-        return new Pair<String, String>(srcId.getDataverseName(), 
srcId.getDatasourceName());
+        return new Pair<>(srcId.getDataverseName(), srcId.getDatasourceName());
     }
 
     public static Pair<String, String> 
getExternalDatasetInfo(UnnestMapOperator op) throws AlgebricksException {
         AbstractFunctionCallExpression unnestExpr = 
(AbstractFunctionCallExpression) op.getExpressionRef().getValue();
         String dataverseName = 
AccessMethodUtils.getStringConstant(unnestExpr.getArguments().get(0));
         String datasetName = 
AccessMethodUtils.getStringConstant(unnestExpr.getArguments().get(1));
-        return new Pair<String, String>(dataverseName, datasetName);
+        return new Pair<>(dataverseName, datasetName);
     }
 
-    private static List<FunctionIdentifier> fieldAccessFunctions = new 
ArrayList<FunctionIdentifier>();
+    private static List<FunctionIdentifier> fieldAccessFunctions = new 
ArrayList<>();
 
     static {
         fieldAccessFunctions.add(BuiltinFunctions.GET_DATA);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
index f0917af..13ff0ee 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
@@ -87,7 +87,8 @@ public class MetaFunctionToMetaVariableRule implements 
IAlgebraicRewriteRule {
             // https://issues.apache.org/jira/browse/ASTERIXDB-1618
             if (dataSource.getDatasourceType() != 
DataSource.Type.EXTERNAL_DATASET
                     && dataSource.getDatasourceType() != 
DataSource.Type.INTERNAL_DATASET
-                    && dataSource.getDatasourceType() != 
DataSource.Type.LOADABLE) {
+                    && dataSource.getDatasourceType() != 
DataSource.Type.LOADABLE
+                    && dataSource.getDatasourceType() != 
DataSource.Type.FUNCTION) {
                 IMutationDataSource mds = (IMutationDataSource) dataSource;
                 if (mds.isChange()) {
                     transformers = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 4550ba6..0d02385 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -18,52 +18,17 @@
  */
 package org.apache.asterix.optimizer.rules;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.external.feed.watch.FeedActivityDetails;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.asterix.metadata.declared.DataSource;
-import org.apache.asterix.metadata.declared.DataSourceId;
-import org.apache.asterix.metadata.declared.FeedDataSource;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.FeedConnection;
-import org.apache.asterix.metadata.entities.FeedPolicyEntity;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.constants.AsterixConstantValue;
 import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.utils.ConstantExpressionUtil;
-import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
-import org.apache.asterix.translator.util.PlanTranslationUtil;
 import org.apache.commons.lang3.mutable.Mutable;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
 import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
 import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
 import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
 import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import 
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
 import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
 
 public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
@@ -77,213 +42,24 @@ public class UnnestToDataScanRule implements 
IAlgebraicRewriteRule {
     @Override
     public boolean rewritePost(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
             throws AlgebricksException {
+        AbstractFunctionCallExpression f = getFunctionCall(opRef);
+        if (f == null) {
+            return false;
+        }
+        return 
BuiltinFunctions.getDatasourceTransformer(f.getFunctionIdentifier()).rewrite(opRef,
 context);
+    }
+
+    public static AbstractFunctionCallExpression 
getFunctionCall(Mutable<ILogicalOperator> opRef) {
         AbstractLogicalOperator op = (AbstractLogicalOperator) 
opRef.getValue();
         if (op.getOperatorTag() != LogicalOperatorTag.UNNEST) {
-            return false;
+            return null;
         }
         UnnestOperator unnest = (UnnestOperator) op;
         ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
         if (unnestExpr.getExpressionTag() != 
LogicalExpressionTag.FUNCTION_CALL) {
-            return false;
-        }
-        return handleFunction(opRef, context, unnest, 
(AbstractFunctionCallExpression) unnestExpr);
-    }
-
-    protected boolean handleFunction(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context,
-            UnnestOperator unnest, AbstractFunctionCallExpression f) throws 
AlgebricksException {
-        FunctionIdentifier fid = f.getFunctionIdentifier();
-        if (fid.equals(BuiltinFunctions.DATASET)) {
-            if (unnest.getPositionalVariable() != null) {
-                // TODO remove this after enabling the support of positional 
variables in data scan
-                throw new AlgebricksException("No positional variables are 
allowed over datasets.");
-            }
-            ILogicalExpression expr = f.getArguments().get(0).getValue();
-            if (expr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
-                return false;
-            }
-            ConstantExpression ce = (ConstantExpression) expr;
-            IAlgebricksConstantValue acv = ce.getValue();
-            if (!(acv instanceof AsterixConstantValue)) {
-                return false;
-            }
-            AsterixConstantValue acv2 = (AsterixConstantValue) acv;
-            if (acv2.getObject().getType().getTypeTag() != ATypeTag.STRING) {
-                return false;
-            }
-            String datasetArg = ((AString) acv2.getObject()).getStringValue();
-            MetadataProvider metadataProvider = (MetadataProvider) 
context.getMetadataProvider();
-            Pair<String, String> datasetReference = 
parseDatasetReference(metadataProvider, datasetArg);
-            String dataverseName = datasetReference.first;
-            String datasetName = datasetReference.second;
-            Dataset dataset = metadataProvider.findDataset(dataverseName, 
datasetName);
-            if (dataset == null) {
-                throw new AlgebricksException(
-                        "Could not find dataset " + datasetName + " in 
dataverse " + dataverseName);
-            }
-            DataSourceId asid = new DataSourceId(dataverseName, datasetName);
-            List<LogicalVariable> variables = new ArrayList<>();
-            if (dataset.getDatasetType() == DatasetType.INTERNAL) {
-                int numPrimaryKeys = dataset.getPrimaryKeys().size();
-                for (int i = 0; i < numPrimaryKeys; i++) {
-                    variables.add(context.newVar());
-                }
-            }
-            variables.add(unnest.getVariable());
-            DataSource dataSource = metadataProvider.findDataSource(asid);
-            boolean hasMeta = dataSource.hasMeta();
-            if (hasMeta) {
-                variables.add(context.newVar());
-            }
-            DataSourceScanOperator scan = new 
DataSourceScanOperator(variables, dataSource);
-            List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
-            scanInpList.addAll(unnest.getInputs());
-            opRef.setValue(scan);
-            addPrimaryKey(variables, dataSource, context);
-            context.computeAndSetTypeEnvironmentForOperator(scan);
-            // Adds equivalence classes --- one equivalent class between a 
primary key
-            // variable and a record field-access expression.
-            IAType[] schemaTypes = dataSource.getSchemaTypes();
-            ARecordType recordType =
-                    (ARecordType) (hasMeta ? schemaTypes[schemaTypes.length - 
2] : schemaTypes[schemaTypes.length - 1]);
-            ARecordType metaRecordType = (ARecordType) (hasMeta ? 
schemaTypes[schemaTypes.length - 1] : null);
-            
EquivalenceClassUtils.addEquivalenceClassesForPrimaryIndexAccess(scan, 
variables, recordType,
-                    metaRecordType, dataset, context);
-            return true;
-        } else if (fid.equals(BuiltinFunctions.FEED_COLLECT)) {
-            if (unnest.getPositionalVariable() != null) {
-                throw new AlgebricksException("No positional variables are 
allowed over feeds.");
-            }
-            String dataverse = ConstantExpressionUtil.getStringArgument(f, 0);
-            String sourceFeedName = 
ConstantExpressionUtil.getStringArgument(f, 1);
-            String getTargetFeed = ConstantExpressionUtil.getStringArgument(f, 
2);
-            String subscriptionLocation = 
ConstantExpressionUtil.getStringArgument(f, 3);
-            String targetDataset = ConstantExpressionUtil.getStringArgument(f, 
4);
-            String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
-            MetadataProvider metadataProvider = (MetadataProvider) 
context.getMetadataProvider();
-            DataSourceId asid = new DataSourceId(dataverse, getTargetFeed);
-            String policyName = 
metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
-            FeedPolicyEntity policy = 
metadataProvider.findFeedPolicy(dataverse, policyName);
-            if (policy == null) {
-                policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
-                if (policy == null) {
-                    throw new AlgebricksException("Unknown feed policy:" + 
policyName);
-                }
-            }
-            ArrayList<LogicalVariable> feedDataScanOutputVariables = new 
ArrayList<>();
-            String csLocations = 
metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
-            List<LogicalVariable> pkVars = new ArrayList<>();
-            FeedDataSource ds = createFeedDataSource(asid, targetDataset, 
sourceFeedName, subscriptionLocation,
-                    metadataProvider, policy, outputType, csLocations, 
unnest.getVariable(), context, pkVars);
-            // The order for feeds is <Record-Meta-PK>
-            feedDataScanOutputVariables.add(unnest.getVariable());
-            // Does it produce meta?
-            if (ds.hasMeta()) {
-                feedDataScanOutputVariables.add(context.newVar());
-            }
-            // Does it produce pk?
-            if (ds.isChange()) {
-                feedDataScanOutputVariables.addAll(pkVars);
-            }
-            DataSourceScanOperator scan = new 
DataSourceScanOperator(feedDataScanOutputVariables, ds);
-            List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
-            scanInpList.addAll(unnest.getInputs());
-            opRef.setValue(scan);
-            context.computeAndSetTypeEnvironmentForOperator(scan);
-            return true;
-        }
-        return false;
-    }
-
-    private void addPrimaryKey(List<LogicalVariable> scanVariables, DataSource 
dataSource,
-            IOptimizationContext context) {
-        List<LogicalVariable> primaryKey = 
dataSource.getPrimaryKeyVariables(scanVariables);
-        List<LogicalVariable> tail = new ArrayList<>();
-        tail.addAll(scanVariables);
-        FunctionalDependency pk = new FunctionalDependency(primaryKey, tail);
-        context.addPrimaryKey(pk);
-    }
-
-    private FeedDataSource createFeedDataSource(DataSourceId aqlId, String 
targetDataset, String sourceFeedName,
-            String subscriptionLocation, MetadataProvider metadataProvider, 
FeedPolicyEntity feedPolicy,
-            String outputType, String locations, LogicalVariable recordVar, 
IOptimizationContext context,
-            List<LogicalVariable> pkVars) throws AlgebricksException {
-        if 
(!aqlId.getDataverseName().equals(metadataProvider.getDefaultDataverse() == 
null ? null
-                : metadataProvider.getDefaultDataverse().getDataverseName())) {
             return null;
         }
-        Dataset dataset = 
metadataProvider.findDataset(aqlId.getDataverseName(), targetDataset);
-        ARecordType feedOutputType = (ARecordType) 
metadataProvider.findType(aqlId.getDataverseName(), outputType);
-        Feed sourceFeed = metadataProvider.findFeed(aqlId.getDataverseName(), 
sourceFeedName);
-        FeedConnection feedConnection =
-                metadataProvider.findFeedConnection(aqlId.getDataverseName(), 
sourceFeedName, targetDataset);
-        ARecordType metaType = null;
-        // Does dataset have meta?
-        if (dataset.hasMetaPart()) {
-            String metaTypeName = 
FeedUtils.getFeedMetaTypeName(sourceFeed.getAdapterConfiguration());
-            if (metaTypeName == null) {
-                throw new AlgebricksException("Feed to a dataset with metadata 
doesn't have meta type specified");
-            }
-            String dataverseName = aqlId.getDataverseName();
-            if (metaTypeName.contains(".")) {
-                dataverseName = metaTypeName.substring(0, 
metaTypeName.indexOf('.'));
-                metaTypeName = 
metaTypeName.substring(metaTypeName.indexOf('.') + 1);
-            }
-            metaType = (ARecordType) metadataProvider.findType(dataverseName, 
metaTypeName);
-        }
-        // Is a change feed?
-        List<IAType> pkTypes = null;
-        List<List<String>> partitioningKeys = null;
-        List<Integer> keySourceIndicator = null;
-        List<Mutable<ILogicalExpression>> keyAccessExpression = null;
-        List<ScalarFunctionCallExpression> 
keyAccessScalarFunctionCallExpression;
-        if 
(ExternalDataUtils.isChangeFeed(sourceFeed.getAdapterConfiguration())) {
-            keyAccessExpression = new ArrayList<>();
-            keyAccessScalarFunctionCallExpression = new ArrayList<>();
-            pkTypes = ((InternalDatasetDetails) 
dataset.getDatasetDetails()).getPrimaryKeyType();
-            partitioningKeys = ((InternalDatasetDetails) 
dataset.getDatasetDetails()).getPartitioningKey();
-            if (dataset.hasMetaPart()) {
-                keySourceIndicator = ((InternalDatasetDetails) 
dataset.getDatasetDetails()).getKeySourceIndicator();
-            }
-            for (int i = 0; i < partitioningKeys.size(); i++) {
-                List<String> key = partitioningKeys.get(i);
-                if (keySourceIndicator == null || 
keySourceIndicator.get(i).intValue() == 0) {
-                    PlanTranslationUtil.prepareVarAndExpression(key, 
recordVar, pkVars, keyAccessExpression, null,
-                            context);
-                } else {
-                    PlanTranslationUtil.prepareMetaKeyAccessExpression(key, 
recordVar, keyAccessExpression, pkVars,
-                            null, context);
-                }
-            }
-            keyAccessExpression.forEach(
-                    expr -> 
keyAccessScalarFunctionCallExpression.add((ScalarFunctionCallExpression) 
expr.getValue()));
-        } else {
-            keyAccessExpression = null;
-            keyAccessScalarFunctionCallExpression = null;
-        }
-        FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) 
context.getMetadataProvider(), sourceFeed,
-                aqlId, targetDataset, feedOutputType, metaType, pkTypes, 
keyAccessScalarFunctionCallExpression,
-                sourceFeed.getFeedId(), 
FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
-                context.getComputationNodeDomain(), feedConnection);
-        
feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, 
feedPolicy);
-        return feedDataSource;
+        return (AbstractFunctionCallExpression) unnestExpr;
     }
 
-    private Pair<String, String> parseDatasetReference(MetadataProvider 
metadataProvider, String datasetArg)
-            throws AlgebricksException {
-        String[] datasetNameComponents = datasetArg.split("\\.");
-        String dataverseName;
-        String datasetName;
-        if (datasetNameComponents.length == 1) {
-            Dataverse defaultDataverse = 
metadataProvider.getDefaultDataverse();
-            if (defaultDataverse == null) {
-                throw new AlgebricksException("Unresolved dataset " + 
datasetArg + " Dataverse not specified.");
-            }
-            dataverseName = defaultDataverse.getDataverseName();
-            datasetName = datasetNameComponents[0];
-        } else {
-            dataverseName = datasetNameComponents[0];
-            datasetName = datasetNameComponents[1];
-        }
-        return new Pair<>(dataverseName, datasetName);
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
index 7633f4c..eaea208 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.metadata.declared.DataSource;
 import org.apache.asterix.metadata.declared.DatasetDataSource;
 import org.apache.asterix.metadata.declared.MetadataProvider;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -261,7 +262,12 @@ public class IntroducePrimaryIndexForAggregationRule 
implements IAlgebraicRewrit
         Dataset dataset;
         // case 1: dataset scan
         if (scanOperator.getOperatorTag() == 
LogicalOperatorTag.DATASOURCESCAN) {
-            dataset = 
((DatasetDataSource)((DataSourceScanOperator)scanOperator).getDataSource()).getDataset();
+            DataSourceScanOperator dss = (DataSourceScanOperator) scanOperator;
+            DataSource ds = (DataSource) dss.getDataSource();
+            if (ds.getDatasourceType() != DataSource.Type.INTERNAL_DATASET) {
+                return null;
+            }
+            dataset = ((DatasetDataSource) ds).getDataset();
         } else {
             // case 2: dataset range search
             AbstractFunctionCallExpression primaryIndexFunctionCall =

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesDatasource.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesDatasource.java
new file mode 100644
index 0000000..e6b025a
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesDatasource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class DatasetResourcesDatasource extends FunctionDataSource {
+    private final int datasetId;
+
+    public DatasetResourcesDatasource(INodeDomain domain, int datasetId) 
throws AlgebricksException {
+        super(new 
DataSourceId(DatasetResourcesRewriter.DATASET_RESOURCES.getNamespace(),
+                DatasetResourcesRewriter.DATASET_RESOURCES.getName()), domain);
+        this.datasetId = datasetId;
+    }
+
+    @Override
+    protected IDatasourceFunction createFunction(MetadataProvider 
metadataProvider,
+            AlgebricksAbsolutePartitionConstraint locations) {
+        return new DatasetResourcesFunction(locations, datasetId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesFunction.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesFunction.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesFunction.java
new file mode 100644
index 0000000..05d192e
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+/**
+ * Datasource function that, for a given dataset id, produces a {@link DatasetResourcesReader}
+ * over the dataset resource known to the local dataset lifecycle manager.
+ */
+public class DatasetResourcesFunction extends AbstractDatasourceFunction {
+
+    private static final long serialVersionUID = 1L;
+    private final int datasetId;
+
+    /**
+     * @param locations
+     *            partition constraint forwarded to {@link AbstractDatasourceFunction}
+     * @param datasetId
+     *            id of the dataset to look up when a reader is created
+     */
+    public DatasetResourcesFunction(AlgebricksAbsolutePartitionConstraint locations, int datasetId) {
+        super(locations);
+        this.datasetId = datasetId;
+    }
+
+    /**
+     * Builds a reader over the dataset resource registered for {@code datasetId}.
+     * The {@code partition} argument is not used.
+     */
+    @Override
+    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition) {
+        return new DatasetResourcesReader(lookupDatasetResource(ctx));
+    }
+
+    // Walks task context -> joblet context -> service context -> application context to reach
+    // the dataset lifecycle manager, then asks it for this function's dataset.
+    private DatasetResource lookupDatasetResource(IHyracksTaskContext ctx) {
+        INCServiceContext ncServiceContext = ctx.getJobletContext().getServiceContext();
+        INcApplicationContext ncAppContext = (INcApplicationContext) ncServiceContext.getApplicationContext();
+        DatasetLifecycleManager lifecycleManager =
+                (DatasetLifecycleManager) ncAppContext.getDatasetLifecycleManager();
+        return lifecycleManager.getDatasetLifecycle(datasetId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesReader.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesReader.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesReader.java
new file mode 100644
index 0000000..bce2002
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesReader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class DatasetResourcesReader extends FunctionReader {
+
+    private final List<String> components;
+    private final Iterator<String> it;
+    private final CharArrayRecord record;
+
+    public DatasetResourcesReader(DatasetResource dsr) {
+        components = new ArrayList<>();
+        if (dsr != null && dsr.isOpen()) {
+            Map<Long, IndexInfo> indexes = dsr.getIndexes();
+            for (Entry<Long, IndexInfo> entry : indexes.entrySet()) {
+                IndexInfo value = entry.getValue();
+                ILSMIndex index = value.getIndex();
+                components.add(index.toString());
+            }
+            record = new CharArrayRecord();
+        } else {
+            record = null;
+        }
+        it = components.iterator();
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return it.hasNext();
+    }
+
+    @Override
+    public IRawRecord<char[]> next() throws IOException, InterruptedException {
+        record.reset();
+        record.append(it.next().toCharArray());
+        record.endRecord();
+        return record;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesRewriter.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesRewriter.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesRewriter.java
new file mode 100644
index 0000000..a575ba4
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesRewriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+/**
+ * Rewriter for the two-argument {@code dataset-resources} function: resolves the named
+ * dataset and produces a {@link DatasetResourcesDatasource} for it.
+ */
+public class DatasetResourcesRewriter extends FunctionRewriter {
+
+    // Parameters are dataverse name, and dataset name
+    public static final FunctionIdentifier DATASET_RESOURCES =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "dataset-resources", 2);
+    // Must be declared after DATASET_RESOURCES, which it is constructed from.
+    public static final DatasetResourcesRewriter INSTANCE = new DatasetResourcesRewriter(DATASET_RESOURCES);
+
+    private DatasetResourcesRewriter(FunctionIdentifier functionId) {
+        super(functionId);
+    }
+
+    /**
+     * Reads the dataverse name (argument 0) and dataset name (argument 1) from the call and
+     * looks the dataset up through the metadata provider of the optimization context.
+     *
+     * @throws AlgebricksException
+     *             if the dataset cannot be found
+     */
+    @Override
+    public DatasetResourcesDatasource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+            throws AlgebricksException {
+        final String dataverse = getString(f.getArguments(), 0);
+        final String name = getString(f.getArguments(), 1);
+        final Dataset target = ((MetadataProvider) context.getMetadataProvider()).findDataset(dataverse, name);
+        if (target != null) {
+            return new DatasetResourcesDatasource(context.getComputationNodeDomain(), target.getDatasetId());
+        }
+        throw new AlgebricksException("Could not find dataset " + name + " in dataverse " + dataverse);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
new file mode 100644
index 0000000..c857ce0
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter;
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+
+/**
+ * Rewrites an unnest over the {@code dataset} function into a data-source scan and computes
+ * the result type of a {@code dataset} call.
+ */
+public class DatasetRewriter implements IFunctionToDataSourceRewriter, IResultTypeComputer {
+    public static final DatasetRewriter INSTANCE = new DatasetRewriter();
+
+    private DatasetRewriter() {
+    }
+
+    /**
+     * Replaces the unnest operator held by {@code opRef} with a {@link DataSourceScanOperator}
+     * over the dataset named by the call's first argument.
+     *
+     * @return {@code false} when the first argument is not a constant string, {@code true} once
+     *         the operator has been replaced
+     * @throws AlgebricksException
+     *             if the unnest has a positional variable or the dataset cannot be resolved
+     */
+    @Override
+    public boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        AbstractFunctionCallExpression call = UnnestToDataScanRule.getFunctionCall(opRef);
+        UnnestOperator unnestOp = (UnnestOperator) opRef.getValue();
+        if (unnestOp.getPositionalVariable() != null) {
+            // TODO remove this after enabling the support of positional variables in data scan
+            throw new AlgebricksException("No positional variables are allowed over datasets.");
+        }
+        AString nameConstant = asStringConstant(call.getArguments().get(0).getValue());
+        if (nameConstant == null) {
+            return false;
+        }
+        String datasetArg = nameConstant.getStringValue();
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        Pair<String, String> reference = parseDatasetReference(metadataProvider, datasetArg);
+        String dataverseName = reference.first;
+        String datasetName = reference.second;
+        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        DataSourceId sourceId = new DataSourceId(dataverseName, datasetName);
+
+        // Scan variables are laid out as: [primary keys (internal datasets only)], record, [meta].
+        List<LogicalVariable> scanVars = new ArrayList<>();
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            for (int pending = dataset.getPrimaryKeys().size(); pending > 0; pending--) {
+                scanVars.add(context.newVar());
+            }
+        }
+        scanVars.add(unnestOp.getVariable());
+        DataSource dataSource = metadataProvider.findDataSource(sourceId);
+        boolean hasMeta = dataSource.hasMeta();
+        if (hasMeta) {
+            scanVars.add(context.newVar());
+        }
+
+        // Swap the unnest for the scan, keeping the unnest's inputs.
+        DataSourceScanOperator scanOp = new DataSourceScanOperator(scanVars, dataSource);
+        scanOp.getInputs().addAll(unnestOp.getInputs());
+        opRef.setValue(scanOp);
+        addPrimaryKey(scanVars, dataSource, context);
+        context.computeAndSetTypeEnvironmentForOperator(scanOp);
+
+        // Adds equivalence classes --- one equivalent class between a primary key
+        // variable and a record field-access expression.
+        IAType[] schemaTypes = dataSource.getSchemaTypes();
+        int lastIdx = schemaTypes.length - 1;
+        ARecordType recordType;
+        ARecordType metaRecordType;
+        if (hasMeta) {
+            // With meta, the record type is second to last and the meta type is last.
+            recordType = (ARecordType) schemaTypes[lastIdx - 1];
+            metaRecordType = (ARecordType) schemaTypes[lastIdx];
+        } else {
+            recordType = (ARecordType) schemaTypes[lastIdx];
+            metaRecordType = null;
+        }
+        EquivalenceClassUtils.addEquivalenceClassesForPrimaryIndexAccess(scanOp, scanVars, recordType,
+                metaRecordType, dataset, context);
+        return true;
+    }
+
+    // Returns the expression's value as an AString when it is a constant AsterixDB string,
+    // or null when any of the three checks (constant, AsterixConstantValue, STRING tag) fails.
+    private static AString asStringConstant(ILogicalExpression expr) {
+        if (expr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+            return null;
+        }
+        IAlgebricksConstantValue value = ((ConstantExpression) expr).getValue();
+        if (!(value instanceof AsterixConstantValue)) {
+            return null;
+        }
+        AsterixConstantValue asterixValue = (AsterixConstantValue) value;
+        if (asterixValue.getObject().getType().getTypeTag() != ATypeTag.STRING) {
+            return null;
+        }
+        return (AString) asterixValue.getObject();
+    }
+
+    // Splits "dataverse.dataset" on dots; a name without a dot is resolved against the
+    // default dataverse of the metadata provider.
+    private Pair<String, String> parseDatasetReference(MetadataProvider metadataProvider, String datasetArg)
+            throws AlgebricksException {
+        String[] parts = datasetArg.split("\\.");
+        if (parts.length != 1) {
+            return new Pair<>(parts[0], parts[1]);
+        }
+        Dataverse defaultDataverse = metadataProvider.getDefaultDataverse();
+        if (defaultDataverse == null) {
+            throw new AlgebricksException("Unresolved dataset " + datasetArg + " Dataverse not specified.");
+        }
+        return new Pair<>(defaultDataverse.getDataverseName(), parts[0]);
+    }
+
+    // Registers the functional dependency primary-key variables -> all scan variables.
+    private void addPrimaryKey(List<LogicalVariable> scanVariables, DataSource dataSource,
+            IOptimizationContext context) {
+        List<LogicalVariable> keyVars = dataSource.getPrimaryKeyVariables(scanVariables);
+        List<LogicalVariable> determined = new ArrayList<>(scanVariables);
+        context.addPrimaryKey(new FunctionalDependency(keyVars, determined));
+    }
+
+    /**
+     * Computes the item type of the dataset named by the single argument of the call.
+     *
+     * @return {@link BuiltinType#ANY} when the argument type is ANY or the argument is not a
+     *         string constant, otherwise the dataset's item type
+     * @throws AlgebricksException
+     *             on wrong arity, a non-string argument, or an unresolved dataverse, dataset or
+     *             item type
+     */
+    @Override
+    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp)
+            throws AlgebricksException {
+        AbstractFunctionCallExpression call = (AbstractFunctionCallExpression) expression;
+        int argCount = call.getArguments().size();
+        if (argCount != 1) {
+            throw new AlgebricksException("dataset arity is 1, not " + argCount);
+        }
+        ILogicalExpression arg = call.getArguments().get(0).getValue();
+        IAType argType = (IAType) env.getType(arg);
+        ATypeTag argTag = argType.getTypeTag();
+        if (argTag == ATypeTag.ANY) {
+            return BuiltinType.ANY;
+        }
+        if (argTag != ATypeTag.STRING) {
+            throw new AlgebricksException("Illegal type " + argType + " for dataset() argument.");
+        }
+        String datasetArg = ConstantExpressionUtil.getStringConstant(arg);
+        if (datasetArg == null) {
+            return BuiltinType.ANY;
+        }
+        MetadataProvider metadata = (MetadataProvider) mp;
+        Pair<String, String> info = DatasetUtil.getDatasetInfo(metadata, datasetArg);
+        String dataverseName = info.first;
+        String datasetName = info.second;
+        if (dataverseName == null) {
+            throw new AlgebricksException("Unspecified dataverse!");
+        }
+        Dataset dataset = metadata.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        IAType itemType = metadata.findType(dataset.getItemTypeDataverseName(), dataset.getItemTypeName());
+        if (itemType == null) {
+            throw new AlgebricksException("No type for dataset " + datasetName);
+        }
+        return itemType;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
new file mode 100644
index 0000000..ee3976e
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.external.feed.watch.FeedActivityDetails;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FeedDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter;
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.asterix.translator.util.PlanTranslationUtil;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+
+/**
+ * Rewrites an unnest over the feed-collect function into a data-source scan over a
+ * {@link FeedDataSource}, and computes the result type of a feed-collect call.
+ */
+public class FeedRewriter implements IFunctionToDataSourceRewriter, IResultTypeComputer {
+    public static final FeedRewriter INSTANCE = new FeedRewriter();
+
+    private FeedRewriter() {
+    }
+
+    /**
+     * Replaces the unnest operator held by {@code opRef} with a {@link DataSourceScanOperator}
+     * over a feed data source built from the six string arguments of the call and from the
+     * feed policy and collect locations found in the metadata provider's configuration.
+     *
+     * @return always {@code true}
+     * @throws AlgebricksException
+     *             if the unnest has a positional variable, the feed policy is unknown, or the
+     *             target dataset or source feed cannot be found
+     */
+    @Override
+    public boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        AbstractFunctionCallExpression f = UnnestToDataScanRule.getFunctionCall(opRef);
+        UnnestOperator unnest = (UnnestOperator) opRef.getValue();
+        if (unnest.getPositionalVariable() != null) {
+            throw new AlgebricksException("No positional variables are allowed over feeds.");
+        }
+        String dataverse = ConstantExpressionUtil.getStringArgument(f, 0);
+        String sourceFeedName = ConstantExpressionUtil.getStringArgument(f, 1);
+        String getTargetFeed = ConstantExpressionUtil.getStringArgument(f, 2);
+        String subscriptionLocation = ConstantExpressionUtil.getStringArgument(f, 3);
+        String targetDataset = ConstantExpressionUtil.getStringArgument(f, 4);
+        String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        DataSourceId asid = new DataSourceId(dataverse, getTargetFeed);
+        String policyName = metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
+        // Look for the policy in the metadata first, then among the built-in policies.
+        FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, policyName);
+        if (policy == null) {
+            policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
+            if (policy == null) {
+                throw new AlgebricksException("Unknown feed policy:" + policyName);
+            }
+        }
+        ArrayList<LogicalVariable> feedDataScanOutputVariables = new ArrayList<>();
+        String csLocations = metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
+        List<LogicalVariable> pkVars = new ArrayList<>();
+        FeedDataSource ds = createFeedDataSource(asid, targetDataset, sourceFeedName, subscriptionLocation,
+                metadataProvider, policy, outputType, csLocations, unnest.getVariable(), context, pkVars);
+        // The order for feeds is <Record-Meta-PK>
+        feedDataScanOutputVariables.add(unnest.getVariable());
+        // Does it produce meta?
+        if (ds.hasMeta()) {
+            feedDataScanOutputVariables.add(context.newVar());
+        }
+        // Does it produce pk?
+        if (ds.isChange()) {
+            feedDataScanOutputVariables.addAll(pkVars);
+        }
+        DataSourceScanOperator scan = new DataSourceScanOperator(feedDataScanOutputVariables, ds);
+        List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+        scanInpList.addAll(unnest.getInputs());
+        opRef.setValue(scan);
+        context.computeAndSetTypeEnvironmentForOperator(scan);
+        return true;
+    }
+
+    /**
+     * Looks up the target dataset, output type, source feed and feed connection, then builds
+     * the {@link FeedDataSource}. For a change feed, key-access expressions are prepared for
+     * every partitioning key and their variables are appended to {@code pkVars}.
+     *
+     * @throws AlgebricksException
+     *             if the target dataset or the source feed cannot be found, or if the dataset
+     *             has a meta part but the feed's adapter configuration names no meta type
+     */
+    private FeedDataSource createFeedDataSource(DataSourceId aqlId, String targetDataset, String sourceFeedName,
+            String subscriptionLocation, MetadataProvider metadataProvider, FeedPolicyEntity feedPolicy,
+            String outputType, String locations, LogicalVariable recordVar, IOptimizationContext context,
+            List<LogicalVariable> pkVars) throws AlgebricksException {
+        Dataset dataset = metadataProvider.findDataset(aqlId.getDataverseName(), targetDataset);
+        // Fail with a descriptive error rather than a NullPointerException further down.
+        if (dataset == null) {
+            throw new AlgebricksException(
+                    "Could not find dataset " + targetDataset + " in dataverse " + aqlId.getDataverseName());
+        }
+        ARecordType feedOutputType = (ARecordType) metadataProvider.findType(aqlId.getDataverseName(), outputType);
+        Feed sourceFeed = metadataProvider.findFeed(aqlId.getDataverseName(), sourceFeedName);
+        if (sourceFeed == null) {
+            throw new AlgebricksException(
+                    "Could not find feed " + sourceFeedName + " in dataverse " + aqlId.getDataverseName());
+        }
+        FeedConnection feedConnection =
+                metadataProvider.findFeedConnection(aqlId.getDataverseName(), sourceFeedName, targetDataset);
+        ARecordType metaType = null;
+        // Does dataset have meta?
+        if (dataset.hasMetaPart()) {
+            String metaTypeName = FeedUtils.getFeedMetaTypeName(sourceFeed.getAdapterConfiguration());
+            if (metaTypeName == null) {
+                throw new AlgebricksException("Feed to a dataset with metadata doesn't have meta type specified");
+            }
+            // A qualified meta type name ("dataverse.type") overrides the feed's dataverse.
+            String dataverseName = aqlId.getDataverseName();
+            if (metaTypeName.contains(".")) {
+                dataverseName = metaTypeName.substring(0, metaTypeName.indexOf('.'));
+                metaTypeName = metaTypeName.substring(metaTypeName.indexOf('.') + 1);
+            }
+            metaType = (ARecordType) metadataProvider.findType(dataverseName, metaTypeName);
+        }
+        // Is a change feed?
+        List<IAType> pkTypes = null;
+        List<List<String>> partitioningKeys = null;
+        List<Integer> keySourceIndicator = null;
+
+        List<ScalarFunctionCallExpression> keyAccessScalarFunctionCallExpression;
+        if (ExternalDataUtils.isChangeFeed(sourceFeed.getAdapterConfiguration())) {
+            List<Mutable<ILogicalExpression>> keyAccessExpression = new ArrayList<>();
+            keyAccessScalarFunctionCallExpression = new ArrayList<>();
+            pkTypes = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPrimaryKeyType();
+            partitioningKeys = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
+            if (dataset.hasMetaPart()) {
+                keySourceIndicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+            }
+            for (int i = 0; i < partitioningKeys.size(); i++) {
+                List<String> key = partitioningKeys.get(i);
+                // Indicator 0 (or no indicator at all) takes the record-key path;
+                // any other value takes the meta-key path.
+                if (keySourceIndicator == null || keySourceIndicator.get(i).intValue() == 0) {
+                    PlanTranslationUtil.prepareVarAndExpression(key, recordVar, pkVars, keyAccessExpression, null,
+                            context);
+                } else {
+                    PlanTranslationUtil.prepareMetaKeyAccessExpression(key, recordVar, keyAccessExpression, pkVars,
+                            null, context);
+                }
+            }
+            keyAccessExpression.forEach(
+                    expr -> keyAccessScalarFunctionCallExpression.add((ScalarFunctionCallExpression) expr.getValue()));
+        } else {
+            keyAccessScalarFunctionCallExpression = null;
+        }
+        FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) context.getMetadataProvider(), sourceFeed,
+                aqlId, targetDataset, feedOutputType, metaType, pkTypes, keyAccessScalarFunctionCallExpression,
+                sourceFeed.getFeedId(), FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
+                context.getComputationNodeDomain(), feedConnection);
+        feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
+        return feedDataSource;
+    }
+
+    /**
+     * Computes the type named by the sixth argument (index 5) of a feed-collect call.
+     *
+     * @return {@link BuiltinType#ANY} when the argument type is ANY or the argument is not a
+     *         string constant, otherwise the type it names
+     * @throws AlgebricksException
+     *             on wrong arity, a non-string argument, an unspecified dataverse or an
+     *             unknown type
+     */
+    @Override
+    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp)
+            throws AlgebricksException {
+        AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+        if (f.getArguments().size() != BuiltinFunctions.FEED_COLLECT.getArity()) {
+            throw new AlgebricksException("Incorrect number of arguments -> arity is "
+                    + BuiltinFunctions.FEED_COLLECT.getArity() + ", not " + f.getArguments().size());
+        }
+        ILogicalExpression a1 = f.getArguments().get(5).getValue();
+        IAType t1 = (IAType) env.getType(a1);
+        if (t1.getTypeTag() == ATypeTag.ANY) {
+            return BuiltinType.ANY;
+        }
+        if (t1.getTypeTag() != ATypeTag.STRING) {
+            throw new AlgebricksException("Illegal type " + t1 + " for feed-ingest argument.");
+        }
+        String typeArg = ConstantExpressionUtil.getStringConstant(a1);
+        if (typeArg == null) {
+            return BuiltinType.ANY;
+        }
+        MetadataProvider metadata = (MetadataProvider) mp;
+        // The dataset-reference parser is reused here to split "dataverse.type".
+        Pair<String, String> argInfo = DatasetUtil.getDatasetInfo(metadata, typeArg);
+        String dataverseName = argInfo.first;
+        String typeName = argInfo.second;
+        if (dataverseName == null) {
+            throw new AlgebricksException("Unspecified dataverse!");
+        }
+        IAType t2 = metadata.findType(dataverseName, typeName);
+        if (t2 == null) {
+            throw new AlgebricksException("Unknown type  " + typeName);
+        }
+        return t2;
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
new file mode 100644
index 0000000..c73f8e8
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Common base for record readers that back datasource functions.
+ * Supplies fixed implementations of the feed-related parts of {@link IRecordReader}
+ * so that subclasses only have to provide the record iteration itself.
+ */
+public abstract class FunctionReader implements IRecordReader<char[]> {
+
+    /**
+     * @return always {@code true}
+     */
+    @Override
+    public boolean stop() {
+        return true;
+    }
+
+    /**
+     * @param th
+     *            the failure reported to this reader
+     * @return always {@code false}
+     */
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    /**
+     * Ignores the supplied controller.
+     */
+    @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+        // Intentionally empty: a function reader does not use the controller
+    }
+
+    /**
+     * Not supported by function readers.
+     *
+     * @throws UnsupportedOperationException
+     *             always
+     */
+    @Override
+    public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Does nothing.
+     */
+    @Override
+    public void close() throws IOException {
+        // Intentionally empty: nothing to release in the base function reader
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionRewriter.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionRewriter.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionRewriter.java
new file mode 100644
index 0000000..2ff9282
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionRewriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+
+public abstract class FunctionRewriter implements 
IFunctionToDataSourceRewriter {
+
+    private FunctionIdentifier functionId;
+
+    public FunctionRewriter(FunctionIdentifier functionId) {
+        this.functionId = functionId;
+    }
+
+    @Override
+    public final boolean rewrite(Mutable<ILogicalOperator> opRef, 
IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractFunctionCallExpression f = 
UnnestToDataScanRule.getFunctionCall(opRef);
+        List<Mutable<ILogicalExpression>> args = f.getArguments();
+        if (args.size() != functionId.getArity()) {
+            throw new AlgebricksException("Function " + 
functionId.getNamespace() + "." + functionId.getName()
+                    + " expects " + functionId.getArity() + " arguments");
+        }
+        for (int i = 0; i < args.size(); i++) {
+            if (args.get(i).getValue().getExpressionTag() != 
LogicalExpressionTag.CONSTANT) {
+                throw new AlgebricksException("Function " + 
functionId.getNamespace() + "." + functionId.getName()
+                        + " expects constant arguments while arg[" + i + "] is 
of type "
+                        + args.get(i).getValue().getExpressionTag());
+            }
+        }
+        UnnestOperator unnest = (UnnestOperator) opRef.getValue();
+        if (unnest.getPositionalVariable() != null) {
+            throw new AlgebricksException("No positional variables are allowed 
over datasource functions");
+        }
+        FunctionDataSource datasource = toDatasource(context, f);
+        List<LogicalVariable> variables = new ArrayList<>();
+        variables.add(unnest.getVariable());
+        DataSourceScanOperator scan = new DataSourceScanOperator(variables, 
datasource);
+        List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+        scanInpList.addAll(unnest.getInputs());
+        opRef.setValue(scan);
+        context.computeAndSetTypeEnvironmentForOperator(scan);
+        return true;
+    }
+
+    protected abstract FunctionDataSource toDatasource(IOptimizationContext 
context, AbstractFunctionCallExpression f)
+            throws AlgebricksException;
+
+    protected String getString(List<Mutable<ILogicalExpression>> args, int i) 
throws AlgebricksException {
+        ConstantExpression ce = (ConstantExpression) args.get(i).getValue();
+        IAlgebricksConstantValue acv = ce.getValue();
+        if (!(acv instanceof AsterixConstantValue)) {
+            throw new AlgebricksException("Expected arg[" + i + "] to be of 
type String");
+        }
+        AsterixConstantValue acv2 = (AsterixConstantValue) acv;
+        if (acv2.getObject().getType().getTypeTag() != ATypeTag.STRING) {
+            throw new AlgebricksException("Expected arg[" + i + "] to be of 
type String");
+        }
+        return ((AString) acv2.getObject()).getStringValue();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsDatasource.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsDatasource.java
new file mode 100644
index 0000000..7245b88
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsDatasource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+/**
+ * Datasource behind the storage-components function. It remembers the id of the target
+ * dataset and hands it to the {@link StorageComponentsFunction} it creates.
+ */
+public class StorageComponentsDatasource extends FunctionDataSource {
+    private final int datasetId;
+
+    /**
+     * @param domain
+     *            the node domain passed on to {@link FunctionDataSource}
+     * @param datasetId
+     *            id of the dataset whose storage components are to be listed
+     * @throws AlgebricksException
+     *             if the parent datasource cannot be initialized
+     */
+    public StorageComponentsDatasource(INodeDomain domain, int datasetId) throws AlgebricksException {
+        super(storageComponentsId(), domain);
+        this.datasetId = datasetId;
+    }
+
+    /**
+     * Builds the datasource id from the namespace and name of
+     * {@link StorageComponentsRewriter#STORAGE_COMPONENTS}.
+     */
+    private static DataSourceId storageComponentsId() {
+        return new DataSourceId(StorageComponentsRewriter.STORAGE_COMPONENTS.getNamespace(),
+                StorageComponentsRewriter.STORAGE_COMPONENTS.getName());
+    }
+
+    /**
+     * @param metadataProvider
+     *            not used by this datasource
+     * @param locations
+     *            the partition constraint handed to the created function
+     * @return a new {@link StorageComponentsFunction} for this datasource's dataset id
+     */
+    @Override
+    protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+            AlgebricksAbsolutePartitionConstraint locations) {
+        return new StorageComponentsFunction(locations, datasetId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsFunction.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsFunction.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsFunction.java
new file mode 100644
index 0000000..73b2d0e
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Datasource function that creates, per task, a reader over the storage components of one dataset.
+ */
+public class StorageComponentsFunction extends AbstractDatasourceFunction {
+
+    private static final long serialVersionUID = 1L;
+    private final int datasetId;
+
+    /**
+     * @param locations
+     *            the partition constraint passed on to {@link AbstractDatasourceFunction}
+     * @param datasetId
+     *            id of the dataset whose storage components are reported
+     */
+    public StorageComponentsFunction(AlgebricksAbsolutePartitionConstraint locations, int datasetId) {
+        super(locations);
+        this.datasetId = datasetId;
+    }
+
+    /**
+     * Creates a {@link StorageComponentsReader} over the dataset resource that the local dataset
+     * lifecycle manager returns for this function's dataset id.
+     *
+     * @param ctx
+     *            the task context, used to reach the node's application context
+     * @param partition
+     *            not used by this function
+     * @return a reader over the dataset's storage components
+     * @throws HyracksDataException
+     *             if the reader cannot be created
+     */
+    @Override
+    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        DatasetResource resource = findDatasetResource(ctx);
+        return new StorageComponentsReader(resource);
+    }
+
+    /**
+     * Walks from the task context to the dataset lifecycle manager and asks it for the
+     * resource registered under {@code datasetId}.
+     */
+    private DatasetResource findDatasetResource(IHyracksTaskContext ctx) {
+        INCServiceContext ncServiceCtx = ctx.getJobletContext().getServiceContext();
+        INcApplicationContext ncAppCtx = (INcApplicationContext) ncServiceCtx.getApplicationContext();
+        DatasetLifecycleManager lifecycleManager = (DatasetLifecycleManager) ncAppCtx.getDatasetLifecycleManager();
+        return lifecycleManager.getDatasetLifecycle(datasetId);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsReader.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsReader.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsReader.java
new file mode 100644
index 0000000..4958d14
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsReader.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+
+public class StorageComponentsReader extends FunctionReader {
+
+    private final List<String> components;
+    private final Iterator<String> it;
+    private final CharArrayRecord record;
+
+    public StorageComponentsReader(DatasetResource dsr) throws 
HyracksDataException {
+        components = new ArrayList<>();
+        if (dsr != null && dsr.isOpen()) {
+            Map<Long, IndexInfo> indexes = dsr.getIndexes();
+            StringBuilder strBuilder = new StringBuilder();
+            for (Entry<Long, IndexInfo> entry : indexes.entrySet()) {
+                strBuilder.setLength(0);
+                IndexInfo value = entry.getValue();
+                ILSMIndex index = value.getIndex();
+                String path = value.getLocalResource().getPath();
+                strBuilder.append('{');
+                strBuilder.append("\"path\":\"");
+                strBuilder.append(path);
+                strBuilder.append("\", \"components\":[");
+                // syncronize over the opTracker
+                synchronized (index.getOperationTracker()) {
+                    List<ILSMDiskComponent> diskComponents = 
index.getDiskComponents();
+                    for (int i = diskComponents.size() - 1; i >= 0; i--) {
+                        if (i < diskComponents.size() - 1) {
+                            strBuilder.append(',');
+                        }
+                        ILSMDiskComponent c = diskComponents.get(i);
+                        LSMComponentId id = (LSMComponentId) c.getId();
+                        strBuilder.append('{');
+                        strBuilder.append("\"min\":");
+                        strBuilder.append(id.getMinId());
+                        strBuilder.append(",\"max\":");
+                        strBuilder.append(id.getMaxId());
+                        strBuilder.append('}');
+                    }
+                }
+                strBuilder.append("]}");
+                components.add(strBuilder.toString());
+            }
+            record = new CharArrayRecord();
+        } else {
+            record = null;
+        }
+        it = components.iterator();
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return it.hasNext();
+    }
+
+    @Override
+    public IRawRecord<char[]> next() throws IOException, InterruptedException {
+        record.reset();
+        record.append(it.next().toCharArray());
+        record.endRecord();
+        return record;
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsRewriter.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsRewriter.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsRewriter.java
new file mode 100644
index 0000000..89bd115
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsRewriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+/**
+ * Rewriter for the {@code storage-components} function: resolves the dataset named by the two
+ * constant arguments and produces a {@link StorageComponentsDatasource} for it.
+ */
+public class StorageComponentsRewriter extends FunctionRewriter {
+
+    /**
+     * Identifier of the function. It takes two parameters: the dataverse name and the dataset name.
+     */
+    public static final FunctionIdentifier STORAGE_COMPONENTS =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "storage-components", 2);
+    /** The single instance of this rewriter. */
+    public static final StorageComponentsRewriter INSTANCE = new StorageComponentsRewriter(STORAGE_COMPONENTS);
+
+    private StorageComponentsRewriter(FunctionIdentifier functionId) {
+        super(functionId);
+    }
+
+    /**
+     * Looks up the dataset addressed by the call and wraps its id in a datasource.
+     *
+     * @param context
+     *            the optimization context; supplies the metadata provider and the computation
+     *            node domain
+     * @param f
+     *            the function call; argument 0 is read as the dataverse name and argument 1 as
+     *            the dataset name
+     * @return the datasource for the resolved dataset
+     * @throws AlgebricksException
+     *             if an argument is not a string constant or the dataset cannot be found
+     */
+    @Override
+    public StorageComponentsDatasource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+            throws AlgebricksException {
+        String dataverse = getString(f.getArguments(), 0);
+        String datasetName = getString(f.getArguments(), 1);
+        MetadataProvider mdProvider = (MetadataProvider) context.getMetadataProvider();
+        Dataset target = mdProvider.findDataset(dataverse, datasetName);
+        if (target != null) {
+            return new StorageComponentsDatasource(context.getComputationNodeDomain(), target.getDatasetId());
+        }
+        String message = "Could not find dataset " + datasetName + " in dataverse " + dataverse;
+        throw new AlgebricksException(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 670b2bd..622e28f 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -82,6 +82,7 @@ import 
org.apache.asterix.runtime.job.resource.JobCapacityController;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
 import org.apache.asterix.translator.IStatementExecutorContext;
 import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.util.MetadataBuiltinFunctions;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -137,6 +138,7 @@ public class CCApplication extends BaseCCApplication {
         String strIP = 
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
         int port = 
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
         hcc = new HyracksConnection(strIP, port);
+        MetadataBuiltinFunctions.init();
         ILibraryManager libraryManager = new ExternalLibraryManager();
         ReplicationProperties repProp = new ReplicationProperties(
                 PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index a05b2bb..e1a75cb 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -45,6 +45,7 @@ import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
 import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.asterix.util.MetadataBuiltinFunctions;
 import org.apache.asterix.utils.CompatibilityUtil;
 import org.apache.hyracks.api.application.INCServiceContext;
 import org.apache.hyracks.api.application.IServiceContext;
@@ -104,6 +105,7 @@ public class NCApplication extends BaseNCApplication {
             System.setProperty("java.rmi.server.hostname",
                     
(controllerService).getConfiguration().getClusterPublicAddress());
         }
+        MetadataBuiltinFunctions.init();
         runtimeContext = new NCAppRuntimeContext(ncServiceCtx, 
getExtensions());
         MetadataProperties metadataProperties = 
runtimeContext.getMetadataProperties();
         if 
(!metadataProperties.getNodeNames().contains(this.ncServiceCtx.getNodeId())) {

Reply via email to