[ASTERIXDB-2194][COMP] Introduce datasource functions
- user model changes: yes
Some functions can be datasources
- storage format changes: no
- interface changes: yes
- Add IDatasourceFunction: A function that is also a datasource
- Add IFunctionToDataSourceRewriter: rewrites an unnest
function into a data scan during compilation
Details:
- Currently, functions are location-agnostic and operate on
parameters that are passed to them either at compile time
or at runtime.
- An exception to this is the dataset function, which has
associated location constraints and runs on the nodes
that host the dataset.
- In this change, we introduce a general framework that allows
creation of new functions similar to the dataset function.
- Such functions are called datasource functions.
- A datasource function takes constant parameters and runs on
a set of partitions, similar to the dataset function.
- The first example of such functions is the DatasetResources
function.
- The DatasetResources function takes two parameters, a dataverse
and a dataset. It is then run on all nodes and returns a set
of dataset resources.
- Test cases are added for this function.
Change-Id: Ibcf807ac713a21e8f4d59868525467386e801303
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2216
Sonar-Qube: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: abdullah alamoudi <[email protected]>
Tested-by: abdullah alamoudi <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/b4d166b3
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/b4d166b3
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/b4d166b3
Branch: refs/heads/master
Commit: b4d166b3ca042ce34d737f5d2a4fb758fa45d3e5
Parents: 26cc908
Author: Abdullah Alamoudi <[email protected]>
Authored: Sun Dec 17 11:34:33 2017 +0300
Committer: abdullah alamoudi <[email protected]>
Committed: Sun Dec 17 05:00:40 2017 -0800
----------------------------------------------------------------------
.../asterix/optimizer/base/AnalysisUtil.java | 14 +-
.../rules/MetaFunctionToMetaVariableRule.java | 3 +-
.../optimizer/rules/UnnestToDataScanRule.java | 244 +------------------
...IntroducePrimaryIndexForAggregationRule.java | 8 +-
.../function/DatasetResourcesDatasource.java | 43 ++++
.../app/function/DatasetResourcesFunction.java | 49 ++++
.../app/function/DatasetResourcesReader.java | 68 ++++++
.../app/function/DatasetResourcesRewriter.java | 52 ++++
.../asterix/app/function/DatasetRewriter.java | 192 +++++++++++++++
.../asterix/app/function/FeedRewriter.java | 209 ++++++++++++++++
.../asterix/app/function/FunctionReader.java | 55 +++++
.../asterix/app/function/FunctionRewriter.java | 99 ++++++++
.../function/StorageComponentsDatasource.java | 43 ++++
.../app/function/StorageComponentsFunction.java | 51 ++++
.../app/function/StorageComponentsReader.java | 96 ++++++++
.../app/function/StorageComponentsRewriter.java | 52 ++++
.../hyracks/bootstrap/CCApplication.java | 2 +
.../hyracks/bootstrap/NCApplication.java | 2 +
.../asterix/util/MetadataBuiltinFunctions.java | 60 +++++
.../apache/asterix/utils/FeedOperations.java | 4 +-
.../dataset-resources.1.ddl.sqlpp | 50 ++++
.../dataset-resources.2.update.sqlpp | 24 ++
.../dataset-resources.3.query.sqlpp | 20 ++
.../dataset-resources.4.query.sqlpp | 20 ++
.../dataset-resources.5.query.sqlpp | 21 ++
.../dataset-resources/dataset-resources.3.adm | 1 +
.../dataset-resources/dataset-resources.4.adm | 1 +
.../dataset-resources/dataset-resources.5.adm | 1 +
.../resources/runtimets/testsuite_sqlpp.xml | 5 +
.../adapter/factory/GenericAdapterFactory.java | 19 ++
.../asterix/external/util/FeedConstants.java | 5 -
.../metadata/api/IDatasourceFunction.java | 48 ++++
.../bootstrap/MetadataBuiltinEntities.java | 6 +-
.../declared/AbstractDatasourceFunction.java | 37 +++
.../asterix/metadata/declared/DataSource.java | 1 +
.../metadata/declared/FunctionDataSource.java | 91 +++++++
.../declared/FunctionDataSourceFactory.java | 74 ++++++
.../functions/MetadataBuiltinFunctions.java | 185 --------------
.../asterix/metadata/utils/DatasetUtil.java | 15 ++
.../asterix/om/functions/BuiltinFunctions.java | 27 +-
.../IFunctionToDataSourceRewriter.java | 40 +++
.../org/apache/asterix/om/utils/RecordUtil.java | 5 +-
42 files changed, 1574 insertions(+), 468 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
index 24fe8e78..8dca64b 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java
@@ -84,14 +84,6 @@ public class AnalysisUtil {
return fieldAccessFunctions.contains(fid);
}
- public static boolean isDataSetCall(ILogicalExpression e) {
- if (((AbstractLogicalExpression) e).getExpressionTag() !=
LogicalExpressionTag.FUNCTION_CALL) {
- return false;
- }
- AbstractFunctionCallExpression fe = (AbstractFunctionCallExpression) e;
- return BuiltinFunctions.isDatasetFunction(fe.getFunctionIdentifier());
- }
-
public static boolean isRunnableAccessToFieldRecord(ILogicalExpression
expr) {
if (expr.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
AbstractFunctionCallExpression fc =
(AbstractFunctionCallExpression) expr;
@@ -129,17 +121,17 @@ public class AnalysisUtil {
public static Pair<String, String>
getDatasetInfo(AbstractDataSourceOperator op) throws AlgebricksException {
DataSourceId srcId = (DataSourceId) op.getDataSource().getId();
- return new Pair<String, String>(srcId.getDataverseName(),
srcId.getDatasourceName());
+ return new Pair<>(srcId.getDataverseName(), srcId.getDatasourceName());
}
public static Pair<String, String>
getExternalDatasetInfo(UnnestMapOperator op) throws AlgebricksException {
AbstractFunctionCallExpression unnestExpr =
(AbstractFunctionCallExpression) op.getExpressionRef().getValue();
String dataverseName =
AccessMethodUtils.getStringConstant(unnestExpr.getArguments().get(0));
String datasetName =
AccessMethodUtils.getStringConstant(unnestExpr.getArguments().get(1));
- return new Pair<String, String>(dataverseName, datasetName);
+ return new Pair<>(dataverseName, datasetName);
}
- private static List<FunctionIdentifier> fieldAccessFunctions = new
ArrayList<FunctionIdentifier>();
+ private static List<FunctionIdentifier> fieldAccessFunctions = new
ArrayList<>();
static {
fieldAccessFunctions.add(BuiltinFunctions.GET_DATA);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
index f0917af..13ff0ee 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/MetaFunctionToMetaVariableRule.java
@@ -87,7 +87,8 @@ public class MetaFunctionToMetaVariableRule implements
IAlgebraicRewriteRule {
// https://issues.apache.org/jira/browse/ASTERIXDB-1618
if (dataSource.getDatasourceType() !=
DataSource.Type.EXTERNAL_DATASET
&& dataSource.getDatasourceType() !=
DataSource.Type.INTERNAL_DATASET
- && dataSource.getDatasourceType() !=
DataSource.Type.LOADABLE) {
+ && dataSource.getDatasourceType() !=
DataSource.Type.LOADABLE
+ && dataSource.getDatasourceType() !=
DataSource.Type.FUNCTION) {
IMutationDataSource mds = (IMutationDataSource) dataSource;
if (mds.isChange()) {
transformers = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
index 4550ba6..0d02385 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/UnnestToDataScanRule.java
@@ -18,52 +18,17 @@
*/
package org.apache.asterix.optimizer.rules;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.asterix.common.config.DatasetConfig.DatasetType;
-import org.apache.asterix.external.feed.watch.FeedActivityDetails;
-import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils;
-import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
-import org.apache.asterix.metadata.declared.DataSource;
-import org.apache.asterix.metadata.declared.DataSourceId;
-import org.apache.asterix.metadata.declared.FeedDataSource;
-import org.apache.asterix.metadata.declared.MetadataProvider;
-import org.apache.asterix.metadata.entities.Dataset;
-import org.apache.asterix.metadata.entities.Dataverse;
-import org.apache.asterix.metadata.entities.Feed;
-import org.apache.asterix.metadata.entities.FeedConnection;
-import org.apache.asterix.metadata.entities.FeedPolicyEntity;
-import org.apache.asterix.metadata.entities.InternalDatasetDetails;
-import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
-import org.apache.asterix.om.base.AString;
-import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.BuiltinFunctions;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.utils.ConstantExpressionUtil;
-import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
-import org.apache.asterix.translator.util.PlanTranslationUtil;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
-import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
-import
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
-import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
-import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
-import
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
public class UnnestToDataScanRule implements IAlgebraicRewriteRule {
@@ -77,213 +42,24 @@ public class UnnestToDataScanRule implements
IAlgebraicRewriteRule {
@Override
public boolean rewritePost(Mutable<ILogicalOperator> opRef,
IOptimizationContext context)
throws AlgebricksException {
+ AbstractFunctionCallExpression f = getFunctionCall(opRef);
+ if (f == null) {
+ return false;
+ }
+ return
BuiltinFunctions.getDatasourceTransformer(f.getFunctionIdentifier()).rewrite(opRef,
context);
+ }
+
+ public static AbstractFunctionCallExpression
getFunctionCall(Mutable<ILogicalOperator> opRef) {
AbstractLogicalOperator op = (AbstractLogicalOperator)
opRef.getValue();
if (op.getOperatorTag() != LogicalOperatorTag.UNNEST) {
- return false;
+ return null;
}
UnnestOperator unnest = (UnnestOperator) op;
ILogicalExpression unnestExpr = unnest.getExpressionRef().getValue();
if (unnestExpr.getExpressionTag() !=
LogicalExpressionTag.FUNCTION_CALL) {
- return false;
- }
- return handleFunction(opRef, context, unnest,
(AbstractFunctionCallExpression) unnestExpr);
- }
-
- protected boolean handleFunction(Mutable<ILogicalOperator> opRef,
IOptimizationContext context,
- UnnestOperator unnest, AbstractFunctionCallExpression f) throws
AlgebricksException {
- FunctionIdentifier fid = f.getFunctionIdentifier();
- if (fid.equals(BuiltinFunctions.DATASET)) {
- if (unnest.getPositionalVariable() != null) {
- // TODO remove this after enabling the support of positional
variables in data scan
- throw new AlgebricksException("No positional variables are
allowed over datasets.");
- }
- ILogicalExpression expr = f.getArguments().get(0).getValue();
- if (expr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
- return false;
- }
- ConstantExpression ce = (ConstantExpression) expr;
- IAlgebricksConstantValue acv = ce.getValue();
- if (!(acv instanceof AsterixConstantValue)) {
- return false;
- }
- AsterixConstantValue acv2 = (AsterixConstantValue) acv;
- if (acv2.getObject().getType().getTypeTag() != ATypeTag.STRING) {
- return false;
- }
- String datasetArg = ((AString) acv2.getObject()).getStringValue();
- MetadataProvider metadataProvider = (MetadataProvider)
context.getMetadataProvider();
- Pair<String, String> datasetReference =
parseDatasetReference(metadataProvider, datasetArg);
- String dataverseName = datasetReference.first;
- String datasetName = datasetReference.second;
- Dataset dataset = metadataProvider.findDataset(dataverseName,
datasetName);
- if (dataset == null) {
- throw new AlgebricksException(
- "Could not find dataset " + datasetName + " in
dataverse " + dataverseName);
- }
- DataSourceId asid = new DataSourceId(dataverseName, datasetName);
- List<LogicalVariable> variables = new ArrayList<>();
- if (dataset.getDatasetType() == DatasetType.INTERNAL) {
- int numPrimaryKeys = dataset.getPrimaryKeys().size();
- for (int i = 0; i < numPrimaryKeys; i++) {
- variables.add(context.newVar());
- }
- }
- variables.add(unnest.getVariable());
- DataSource dataSource = metadataProvider.findDataSource(asid);
- boolean hasMeta = dataSource.hasMeta();
- if (hasMeta) {
- variables.add(context.newVar());
- }
- DataSourceScanOperator scan = new
DataSourceScanOperator(variables, dataSource);
- List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
- scanInpList.addAll(unnest.getInputs());
- opRef.setValue(scan);
- addPrimaryKey(variables, dataSource, context);
- context.computeAndSetTypeEnvironmentForOperator(scan);
- // Adds equivalence classes --- one equivalent class between a
primary key
- // variable and a record field-access expression.
- IAType[] schemaTypes = dataSource.getSchemaTypes();
- ARecordType recordType =
- (ARecordType) (hasMeta ? schemaTypes[schemaTypes.length -
2] : schemaTypes[schemaTypes.length - 1]);
- ARecordType metaRecordType = (ARecordType) (hasMeta ?
schemaTypes[schemaTypes.length - 1] : null);
-
EquivalenceClassUtils.addEquivalenceClassesForPrimaryIndexAccess(scan,
variables, recordType,
- metaRecordType, dataset, context);
- return true;
- } else if (fid.equals(BuiltinFunctions.FEED_COLLECT)) {
- if (unnest.getPositionalVariable() != null) {
- throw new AlgebricksException("No positional variables are
allowed over feeds.");
- }
- String dataverse = ConstantExpressionUtil.getStringArgument(f, 0);
- String sourceFeedName =
ConstantExpressionUtil.getStringArgument(f, 1);
- String getTargetFeed = ConstantExpressionUtil.getStringArgument(f,
2);
- String subscriptionLocation =
ConstantExpressionUtil.getStringArgument(f, 3);
- String targetDataset = ConstantExpressionUtil.getStringArgument(f,
4);
- String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
- MetadataProvider metadataProvider = (MetadataProvider)
context.getMetadataProvider();
- DataSourceId asid = new DataSourceId(dataverse, getTargetFeed);
- String policyName =
metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
- FeedPolicyEntity policy =
metadataProvider.findFeedPolicy(dataverse, policyName);
- if (policy == null) {
- policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
- if (policy == null) {
- throw new AlgebricksException("Unknown feed policy:" +
policyName);
- }
- }
- ArrayList<LogicalVariable> feedDataScanOutputVariables = new
ArrayList<>();
- String csLocations =
metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
- List<LogicalVariable> pkVars = new ArrayList<>();
- FeedDataSource ds = createFeedDataSource(asid, targetDataset,
sourceFeedName, subscriptionLocation,
- metadataProvider, policy, outputType, csLocations,
unnest.getVariable(), context, pkVars);
- // The order for feeds is <Record-Meta-PK>
- feedDataScanOutputVariables.add(unnest.getVariable());
- // Does it produce meta?
- if (ds.hasMeta()) {
- feedDataScanOutputVariables.add(context.newVar());
- }
- // Does it produce pk?
- if (ds.isChange()) {
- feedDataScanOutputVariables.addAll(pkVars);
- }
- DataSourceScanOperator scan = new
DataSourceScanOperator(feedDataScanOutputVariables, ds);
- List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
- scanInpList.addAll(unnest.getInputs());
- opRef.setValue(scan);
- context.computeAndSetTypeEnvironmentForOperator(scan);
- return true;
- }
- return false;
- }
-
- private void addPrimaryKey(List<LogicalVariable> scanVariables, DataSource
dataSource,
- IOptimizationContext context) {
- List<LogicalVariable> primaryKey =
dataSource.getPrimaryKeyVariables(scanVariables);
- List<LogicalVariable> tail = new ArrayList<>();
- tail.addAll(scanVariables);
- FunctionalDependency pk = new FunctionalDependency(primaryKey, tail);
- context.addPrimaryKey(pk);
- }
-
- private FeedDataSource createFeedDataSource(DataSourceId aqlId, String
targetDataset, String sourceFeedName,
- String subscriptionLocation, MetadataProvider metadataProvider,
FeedPolicyEntity feedPolicy,
- String outputType, String locations, LogicalVariable recordVar,
IOptimizationContext context,
- List<LogicalVariable> pkVars) throws AlgebricksException {
- if
(!aqlId.getDataverseName().equals(metadataProvider.getDefaultDataverse() ==
null ? null
- : metadataProvider.getDefaultDataverse().getDataverseName())) {
return null;
}
- Dataset dataset =
metadataProvider.findDataset(aqlId.getDataverseName(), targetDataset);
- ARecordType feedOutputType = (ARecordType)
metadataProvider.findType(aqlId.getDataverseName(), outputType);
- Feed sourceFeed = metadataProvider.findFeed(aqlId.getDataverseName(),
sourceFeedName);
- FeedConnection feedConnection =
- metadataProvider.findFeedConnection(aqlId.getDataverseName(),
sourceFeedName, targetDataset);
- ARecordType metaType = null;
- // Does dataset have meta?
- if (dataset.hasMetaPart()) {
- String metaTypeName =
FeedUtils.getFeedMetaTypeName(sourceFeed.getAdapterConfiguration());
- if (metaTypeName == null) {
- throw new AlgebricksException("Feed to a dataset with metadata
doesn't have meta type specified");
- }
- String dataverseName = aqlId.getDataverseName();
- if (metaTypeName.contains(".")) {
- dataverseName = metaTypeName.substring(0,
metaTypeName.indexOf('.'));
- metaTypeName =
metaTypeName.substring(metaTypeName.indexOf('.') + 1);
- }
- metaType = (ARecordType) metadataProvider.findType(dataverseName,
metaTypeName);
- }
- // Is a change feed?
- List<IAType> pkTypes = null;
- List<List<String>> partitioningKeys = null;
- List<Integer> keySourceIndicator = null;
- List<Mutable<ILogicalExpression>> keyAccessExpression = null;
- List<ScalarFunctionCallExpression>
keyAccessScalarFunctionCallExpression;
- if
(ExternalDataUtils.isChangeFeed(sourceFeed.getAdapterConfiguration())) {
- keyAccessExpression = new ArrayList<>();
- keyAccessScalarFunctionCallExpression = new ArrayList<>();
- pkTypes = ((InternalDatasetDetails)
dataset.getDatasetDetails()).getPrimaryKeyType();
- partitioningKeys = ((InternalDatasetDetails)
dataset.getDatasetDetails()).getPartitioningKey();
- if (dataset.hasMetaPart()) {
- keySourceIndicator = ((InternalDatasetDetails)
dataset.getDatasetDetails()).getKeySourceIndicator();
- }
- for (int i = 0; i < partitioningKeys.size(); i++) {
- List<String> key = partitioningKeys.get(i);
- if (keySourceIndicator == null ||
keySourceIndicator.get(i).intValue() == 0) {
- PlanTranslationUtil.prepareVarAndExpression(key,
recordVar, pkVars, keyAccessExpression, null,
- context);
- } else {
- PlanTranslationUtil.prepareMetaKeyAccessExpression(key,
recordVar, keyAccessExpression, pkVars,
- null, context);
- }
- }
- keyAccessExpression.forEach(
- expr ->
keyAccessScalarFunctionCallExpression.add((ScalarFunctionCallExpression)
expr.getValue()));
- } else {
- keyAccessExpression = null;
- keyAccessScalarFunctionCallExpression = null;
- }
- FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider)
context.getMetadataProvider(), sourceFeed,
- aqlId, targetDataset, feedOutputType, metaType, pkTypes,
keyAccessScalarFunctionCallExpression,
- sourceFeed.getFeedId(),
FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
- context.getComputationNodeDomain(), feedConnection);
-
feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY,
feedPolicy);
- return feedDataSource;
+ return (AbstractFunctionCallExpression) unnestExpr;
}
- private Pair<String, String> parseDatasetReference(MetadataProvider
metadataProvider, String datasetArg)
- throws AlgebricksException {
- String[] datasetNameComponents = datasetArg.split("\\.");
- String dataverseName;
- String datasetName;
- if (datasetNameComponents.length == 1) {
- Dataverse defaultDataverse =
metadataProvider.getDefaultDataverse();
- if (defaultDataverse == null) {
- throw new AlgebricksException("Unresolved dataset " +
datasetArg + " Dataverse not specified.");
- }
- dataverseName = defaultDataverse.getDataverseName();
- datasetName = datasetNameComponents[0];
- } else {
- dataverseName = datasetNameComponents[0];
- datasetName = datasetNameComponents[1];
- }
- return new Pair<>(dataverseName, datasetName);
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
index 7633f4c..eaea208 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/am/IntroducePrimaryIndexForAggregationRule.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Set;
import org.apache.asterix.common.config.DatasetConfig;
+import org.apache.asterix.metadata.declared.DataSource;
import org.apache.asterix.metadata.declared.DatasetDataSource;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
@@ -261,7 +262,12 @@ public class IntroducePrimaryIndexForAggregationRule
implements IAlgebraicRewrit
Dataset dataset;
// case 1: dataset scan
if (scanOperator.getOperatorTag() ==
LogicalOperatorTag.DATASOURCESCAN) {
- dataset =
((DatasetDataSource)((DataSourceScanOperator)scanOperator).getDataSource()).getDataset();
+ DataSourceScanOperator dss = (DataSourceScanOperator) scanOperator;
+ DataSource ds = (DataSource) dss.getDataSource();
+ if (ds.getDatasourceType() != DataSource.Type.INTERNAL_DATASET) {
+ return null;
+ }
+ dataset = ((DatasetDataSource) ds).getDataset();
} else {
// case 2: dataset range search
AbstractFunctionCallExpression primaryIndexFunctionCall =
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesDatasource.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesDatasource.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesDatasource.java
new file mode 100644
index 0000000..e6b025a
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesDatasource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+public class DatasetResourcesDatasource extends FunctionDataSource {
+ private final int datasetId;
+
+ public DatasetResourcesDatasource(INodeDomain domain, int datasetId)
throws AlgebricksException {
+ super(new
DataSourceId(DatasetResourcesRewriter.DATASET_RESOURCES.getNamespace(),
+ DatasetResourcesRewriter.DATASET_RESOURCES.getName()), domain);
+ this.datasetId = datasetId;
+ }
+
+ @Override
+ protected IDatasourceFunction createFunction(MetadataProvider
metadataProvider,
+ AlgebricksAbsolutePartitionConstraint locations) {
+ return new DatasetResourcesFunction(locations, datasetId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesFunction.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesFunction.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesFunction.java
new file mode 100644
index 0000000..05d192e
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesFunction.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public class DatasetResourcesFunction extends AbstractDatasourceFunction {
+
+ private static final long serialVersionUID = 1L;
+ private final int datasetId;
+
+ public DatasetResourcesFunction(AlgebricksAbsolutePartitionConstraint
locations, int datasetId) {
+ super(locations);
+ this.datasetId = datasetId;
+ }
+
+ @Override
+ public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx,
int partition) {
+ INCServiceContext serviceCtx =
ctx.getJobletContext().getServiceContext();
+ INcApplicationContext appCtx = (INcApplicationContext)
serviceCtx.getApplicationContext();
+ DatasetLifecycleManager dsLifecycleMgr = (DatasetLifecycleManager)
appCtx.getDatasetLifecycleManager();
+ DatasetResource dsr = dsLifecycleMgr.getDatasetLifecycle(datasetId);
+ return new DatasetResourcesReader(dsr);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesReader.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesReader.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesReader.java
new file mode 100644
index 0000000..bce2002
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesReader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+/**
+ * Record reader that emits one record per index registered in a {@link DatasetResource}. Each
+ * record holds the {@code toString()} of the index's {@link ILSMIndex}. A {@code null} or closed
+ * dataset resource yields an empty reader.
+ */
+public class DatasetResourcesReader extends FunctionReader {
+
+    private final List<String> components;
+    private final Iterator<String> it;
+    // Single buffer reused for every record returned by next(); never null.
+    private final CharArrayRecord record;
+
+    /**
+     * Snapshots the string form of every index of {@code dsr} at construction time.
+     *
+     * @param dsr the dataset resource to describe; may be {@code null}
+     */
+    public DatasetResourcesReader(DatasetResource dsr) {
+        components = new ArrayList<>();
+        if (dsr != null && dsr.isOpen()) {
+            Map<Long, IndexInfo> indexes = dsr.getIndexes();
+            for (Entry<Long, IndexInfo> entry : indexes.entrySet()) {
+                ILSMIndex index = entry.getValue().getIndex();
+                components.add(index.toString());
+            }
+        }
+        // Always allocate the buffer: previously it was null for a missing/closed dataset, so a
+        // next() call on an empty reader failed with a NullPointerException.
+        record = new CharArrayRecord();
+        it = components.iterator();
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return it.hasNext();
+    }
+
+    /**
+     * Returns the next index description in the shared record buffer. The returned object is
+     * overwritten by the following call.
+     *
+     * @throws java.util.NoSuchElementException if no records remain
+     */
+    @Override
+    public IRawRecord<char[]> next() throws IOException, InterruptedException {
+        // Advance first so that an exhausted reader fails before the buffer is touched.
+        String component = it.next();
+        record.reset();
+        record.append(component.toCharArray());
+        record.endRecord();
+        return record;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesRewriter.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesRewriter.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesRewriter.java
new file mode 100644
index 0000000..a575ba4
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetResourcesRewriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+/**
+ * Rewriter for the two-argument {@code dataset-resources} function: resolves the named dataset
+ * and produces a {@link DatasetResourcesDatasource} for it.
+ */
+public class DatasetResourcesRewriter extends FunctionRewriter {
+
+    // Arguments, in order: dataverse name, dataset name
+    public static final FunctionIdentifier DATASET_RESOURCES =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "dataset-resources", 2);
+    public static final DatasetResourcesRewriter INSTANCE = new DatasetResourcesRewriter(DATASET_RESOURCES);
+
+    private DatasetResourcesRewriter(FunctionIdentifier functionId) {
+        super(functionId);
+    }
+
+    /**
+     * Builds the datasource for the dataset named by the call's two string arguments.
+     *
+     * @throws AlgebricksException if an argument is not a string constant or the dataset is unknown
+     */
+    @Override
+    public DatasetResourcesDatasource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+            throws AlgebricksException {
+        final String dataverseName = getString(f.getArguments(), 0);
+        final String datasetName = getString(f.getArguments(), 1);
+        MetadataProvider provider = (MetadataProvider) context.getMetadataProvider();
+        Dataset target = provider.findDataset(dataverseName, datasetName);
+        if (target != null) {
+            return new DatasetResourcesDatasource(context.getComputationNodeDomain(), target.getDatasetId());
+        }
+        throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
new file mode 100644
index 0000000..c857ce0
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DatasetRewriter.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.config.DatasetConfig.DatasetType;
+import org.apache.asterix.metadata.declared.DataSource;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Dataverse;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter;
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.asterix.optimizer.rules.util.EquivalenceClassUtils;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import
org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependency;
+
+/**
+ * Rewrites an unnest over the {@code dataset} function into a {@link DataSourceScanOperator}, and
+ * computes the result type of a {@code dataset} call from the dataset's declared item type.
+ */
+public class DatasetRewriter implements IFunctionToDataSourceRewriter, IResultTypeComputer {
+    public static final DatasetRewriter INSTANCE = new DatasetRewriter();
+
+    private DatasetRewriter() {
+    }
+
+    /**
+     * Replaces the unnest operator held by {@code opRef} with a scan over the referenced dataset.
+     *
+     * @return {@code false} (leaving the plan untouched) when the first argument is not a constant
+     *         string; {@code true} once the operator has been replaced
+     * @throws AlgebricksException if the unnest has a positional variable, the dataset reference
+     *             cannot be resolved, or the dataset does not exist
+     */
+    @Override
+    public boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        AbstractFunctionCallExpression f = UnnestToDataScanRule.getFunctionCall(opRef);
+        UnnestOperator unnest = (UnnestOperator) opRef.getValue();
+        if (unnest.getPositionalVariable() != null) {
+            // TODO remove this after enabling the support of positional variables in data scan
+            throw new AlgebricksException("No positional variables are allowed over datasets.");
+        }
+        ILogicalExpression expr = f.getArguments().get(0).getValue();
+        if (expr.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+            return false;
+        }
+        ConstantExpression ce = (ConstantExpression) expr;
+        IAlgebricksConstantValue acv = ce.getValue();
+        if (!(acv instanceof AsterixConstantValue)) {
+            return false;
+        }
+        AsterixConstantValue acv2 = (AsterixConstantValue) acv;
+        if (acv2.getObject().getType().getTypeTag() != ATypeTag.STRING) {
+            return false;
+        }
+        String datasetArg = ((AString) acv2.getObject()).getStringValue();
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        Pair<String, String> datasetReference = parseDatasetReference(metadataProvider, datasetArg);
+        String dataverseName = datasetReference.first;
+        String datasetName = datasetReference.second;
+        Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        DataSourceId asid = new DataSourceId(dataverseName, datasetName);
+        // Scan output order: primary-key variables (internal datasets only), record, optional meta.
+        List<LogicalVariable> variables = new ArrayList<>();
+        if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            int numPrimaryKeys = dataset.getPrimaryKeys().size();
+            for (int i = 0; i < numPrimaryKeys; i++) {
+                variables.add(context.newVar());
+            }
+        }
+        variables.add(unnest.getVariable());
+        DataSource dataSource = metadataProvider.findDataSource(asid);
+        boolean hasMeta = dataSource.hasMeta();
+        if (hasMeta) {
+            variables.add(context.newVar());
+        }
+        DataSourceScanOperator scan = new DataSourceScanOperator(variables, dataSource);
+        List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+        scanInpList.addAll(unnest.getInputs());
+        opRef.setValue(scan);
+        addPrimaryKey(variables, dataSource, context);
+        context.computeAndSetTypeEnvironmentForOperator(scan);
+        // Adds equivalence classes --- one equivalent class between a primary key
+        // variable and a record field-access expression.
+        IAType[] schemaTypes = dataSource.getSchemaTypes();
+        ARecordType recordType =
+                (ARecordType) (hasMeta ? schemaTypes[schemaTypes.length - 2] : schemaTypes[schemaTypes.length - 1]);
+        ARecordType metaRecordType = (ARecordType) (hasMeta ? schemaTypes[schemaTypes.length - 1] : null);
+        EquivalenceClassUtils.addEquivalenceClassesForPrimaryIndexAccess(scan, variables, recordType, metaRecordType,
+                dataset, context);
+        return true;
+    }
+
+    /**
+     * Splits {@code datasetArg} on '.' into a (dataverse, dataset) pair. A single component is
+     * resolved against the provider's default dataverse; with two or more components the first
+     * two are used and any further ones are ignored.
+     *
+     * @throws AlgebricksException if the argument has no components at all, or names no dataverse
+     *             while no default dataverse is set
+     */
+    private Pair<String, String> parseDatasetReference(MetadataProvider metadataProvider, String datasetArg)
+            throws AlgebricksException {
+        String[] datasetNameComponents = datasetArg.split("\\.");
+        if (datasetNameComponents.length == 0) {
+            // String.split drops trailing empty strings, so "." or ".." yields an empty array;
+            // without this guard the else-branch below fails with ArrayIndexOutOfBoundsException.
+            throw new AlgebricksException("Unresolved dataset " + datasetArg + " Dataset name not specified.");
+        }
+        String dataverseName;
+        String datasetName;
+        if (datasetNameComponents.length == 1) {
+            Dataverse defaultDataverse = metadataProvider.getDefaultDataverse();
+            if (defaultDataverse == null) {
+                throw new AlgebricksException("Unresolved dataset " + datasetArg + " Dataverse not specified.");
+            }
+            dataverseName = defaultDataverse.getDataverseName();
+            datasetName = datasetNameComponents[0];
+        } else {
+            dataverseName = datasetNameComponents[0];
+            datasetName = datasetNameComponents[1];
+        }
+        return new Pair<>(dataverseName, datasetName);
+    }
+
+    /**
+     * Registers a functional dependency from the data source's primary-key variables to all scan
+     * variables with the optimization context.
+     */
+    private void addPrimaryKey(List<LogicalVariable> scanVariables, DataSource dataSource,
+            IOptimizationContext context) {
+        List<LogicalVariable> primaryKey = dataSource.getPrimaryKeyVariables(scanVariables);
+        // Copy so the dependency does not alias the caller's list.
+        List<LogicalVariable> tail = new ArrayList<>(scanVariables);
+        FunctionalDependency pk = new FunctionalDependency(primaryKey, tail);
+        context.addPrimaryKey(pk);
+    }
+
+    /**
+     * Computes the type produced by a {@code dataset} call.
+     *
+     * @return {@link BuiltinType#ANY} when the argument's type is ANY or the argument is not a
+     *         string constant; otherwise the item type declared for the dataset
+     * @throws AlgebricksException on wrong arity, a non-string argument, an unspecified dataverse,
+     *             an unknown dataset, or a dataset whose item type cannot be found
+     */
+    @Override
+    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp)
+            throws AlgebricksException {
+        AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+        if (f.getArguments().size() != 1) {
+            throw new AlgebricksException("dataset arity is 1, not " + f.getArguments().size());
+        }
+        ILogicalExpression a1 = f.getArguments().get(0).getValue();
+        IAType t1 = (IAType) env.getType(a1);
+        if (t1.getTypeTag() == ATypeTag.ANY) {
+            return BuiltinType.ANY;
+        }
+        if (t1.getTypeTag() != ATypeTag.STRING) {
+            throw new AlgebricksException("Illegal type " + t1 + " for dataset() argument.");
+        }
+        String datasetArg = ConstantExpressionUtil.getStringConstant(a1);
+        if (datasetArg == null) {
+            return BuiltinType.ANY;
+        }
+        MetadataProvider metadata = (MetadataProvider) mp;
+        Pair<String, String> datasetInfo = DatasetUtil.getDatasetInfo(metadata, datasetArg);
+        String dataverseName = datasetInfo.first;
+        String datasetName = datasetInfo.second;
+        if (dataverseName == null) {
+            throw new AlgebricksException("Unspecified dataverse!");
+        }
+        Dataset dataset = metadata.findDataset(dataverseName, datasetName);
+        if (dataset == null) {
+            throw new AlgebricksException("Could not find dataset " + datasetName + " in dataverse " + dataverseName);
+        }
+        String tn = dataset.getItemTypeName();
+        IAType t2 = metadata.findType(dataset.getItemTypeDataverseName(), tn);
+        if (t2 == null) {
+            throw new AlgebricksException("No type for dataset " + datasetName);
+        }
+        return t2;
+
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
new file mode 100644
index 0000000..ee3976e
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FeedRewriter.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.external.feed.watch.FeedActivityDetails;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.FeedUtils;
+import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FeedDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Feed;
+import org.apache.asterix.metadata.entities.FeedConnection;
+import org.apache.asterix.metadata.entities.FeedPolicyEntity;
+import org.apache.asterix.metadata.entities.InternalDatasetDetails;
+import org.apache.asterix.metadata.feeds.BuiltinFeedPolicies;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter;
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.asterix.translator.util.PlanTranslationUtil;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+
+/**
+ * Rewrites an unnest over the feed collect function (see {@code BuiltinFunctions.FEED_COLLECT})
+ * into a {@link DataSourceScanOperator} over a {@link FeedDataSource}, and computes the function's
+ * result type from its sixth argument.
+ */
+public class FeedRewriter implements IFunctionToDataSourceRewriter, IResultTypeComputer {
+    public static final FeedRewriter INSTANCE = new FeedRewriter();
+
+    private FeedRewriter() {
+    }
+
+    /**
+     * Replaces the unnest operator held by {@code opRef} with a scan over the feed data source
+     * described by the call's six string arguments (dataverse, source feed, target feed,
+     * subscription location, target dataset, output type).
+     *
+     * @return always {@code true}
+     * @throws AlgebricksException if the unnest has a positional variable, the feed policy is
+     *             unknown, or the feed data source cannot be created
+     */
+    @Override
+    public boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException {
+        AbstractFunctionCallExpression f = UnnestToDataScanRule.getFunctionCall(opRef);
+        UnnestOperator unnest = (UnnestOperator) opRef.getValue();
+        if (unnest.getPositionalVariable() != null) {
+            throw new AlgebricksException("No positional variables are allowed over feeds.");
+        }
+        String dataverse = ConstantExpressionUtil.getStringArgument(f, 0);
+        String sourceFeedName = ConstantExpressionUtil.getStringArgument(f, 1);
+        String getTargetFeed = ConstantExpressionUtil.getStringArgument(f, 2);
+        String subscriptionLocation = ConstantExpressionUtil.getStringArgument(f, 3);
+        String targetDataset = ConstantExpressionUtil.getStringArgument(f, 4);
+        String outputType = ConstantExpressionUtil.getStringArgument(f, 5);
+        MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider();
+        DataSourceId asid = new DataSourceId(dataverse, getTargetFeed);
+        String policyName = metadataProvider.getConfig().get(FeedActivityDetails.FEED_POLICY_NAME);
+        FeedPolicyEntity policy = metadataProvider.findFeedPolicy(dataverse, policyName);
+        if (policy == null) {
+            // Not a user-defined policy: fall back to the built-in ones.
+            policy = BuiltinFeedPolicies.getFeedPolicy(policyName);
+            if (policy == null) {
+                throw new AlgebricksException("Unknown feed policy:" + policyName);
+            }
+        }
+        List<LogicalVariable> feedDataScanOutputVariables = new ArrayList<>();
+        String csLocations = metadataProvider.getConfig().get(FeedActivityDetails.COLLECT_LOCATIONS);
+        List<LogicalVariable> pkVars = new ArrayList<>();
+        FeedDataSource ds = createFeedDataSource(asid, targetDataset, sourceFeedName, subscriptionLocation,
+                metadataProvider, policy, outputType, csLocations, unnest.getVariable(), context, pkVars);
+        // The order for feeds is <Record-Meta-PK>
+        feedDataScanOutputVariables.add(unnest.getVariable());
+        // Does it produce meta?
+        if (ds.hasMeta()) {
+            feedDataScanOutputVariables.add(context.newVar());
+        }
+        // Does it produce pk?
+        if (ds.isChange()) {
+            feedDataScanOutputVariables.addAll(pkVars);
+        }
+        DataSourceScanOperator scan = new DataSourceScanOperator(feedDataScanOutputVariables, ds);
+        List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+        scanInpList.addAll(unnest.getInputs());
+        opRef.setValue(scan);
+        context.computeAndSetTypeEnvironmentForOperator(scan);
+        return true;
+    }
+
+    /**
+     * Looks up the target dataset, output type, source feed and feed connection in the dataverse
+     * of {@code aqlId} and assembles a {@link FeedDataSource} from them. For change feeds, one
+     * key-access expression per partitioning key is prepared and the matching variables are
+     * appended to {@code pkVars}.
+     *
+     * @param locations comma-separated list of locations; must not be {@code null}
+     * @param pkVars output list that receives the primary-key variables of a change feed
+     * @throws AlgebricksException if the target dataset or source feed does not exist, the
+     *             locations are missing, or a dataset with a meta part has no feed meta type
+     */
+    private FeedDataSource createFeedDataSource(DataSourceId aqlId, String targetDataset, String sourceFeedName,
+            String subscriptionLocation, MetadataProvider metadataProvider, FeedPolicyEntity feedPolicy,
+            String outputType, String locations, LogicalVariable recordVar, IOptimizationContext context,
+            List<LogicalVariable> pkVars) throws AlgebricksException {
+        Dataset dataset = metadataProvider.findDataset(aqlId.getDataverseName(), targetDataset);
+        ARecordType feedOutputType = (ARecordType) metadataProvider.findType(aqlId.getDataverseName(), outputType);
+        Feed sourceFeed = metadataProvider.findFeed(aqlId.getDataverseName(), sourceFeedName);
+        FeedConnection feedConnection =
+                metadataProvider.findFeedConnection(aqlId.getDataverseName(), sourceFeedName, targetDataset);
+        // All three values are dereferenced unconditionally below; report a missing one as a
+        // compilation error instead of letting it surface as a NullPointerException.
+        if (dataset == null) {
+            throw new AlgebricksException(
+                    "Could not find dataset " + targetDataset + " in dataverse " + aqlId.getDataverseName());
+        }
+        if (sourceFeed == null) {
+            throw new AlgebricksException(
+                    "Could not find feed " + sourceFeedName + " in dataverse " + aqlId.getDataverseName());
+        }
+        if (locations == null) {
+            throw new AlgebricksException("Collect locations are not specified for feed " + sourceFeedName);
+        }
+        ARecordType metaType = null;
+        // Does dataset have meta?
+        if (dataset.hasMetaPart()) {
+            String metaTypeName = FeedUtils.getFeedMetaTypeName(sourceFeed.getAdapterConfiguration());
+            if (metaTypeName == null) {
+                throw new AlgebricksException("Feed to a dataset with metadata doesn't have meta type specified");
+            }
+            String dataverseName = aqlId.getDataverseName();
+            // A qualified name "<dataverse>.<type>" overrides the feed's dataverse.
+            int dot = metaTypeName.indexOf('.');
+            if (dot >= 0) {
+                dataverseName = metaTypeName.substring(0, dot);
+                metaTypeName = metaTypeName.substring(dot + 1);
+            }
+            metaType = (ARecordType) metadataProvider.findType(dataverseName, metaTypeName);
+        }
+        // Is a change feed?
+        List<IAType> pkTypes = null;
+        List<List<String>> partitioningKeys = null;
+        List<Integer> keySourceIndicator = null;
+
+        List<ScalarFunctionCallExpression> keyAccessScalarFunctionCallExpression;
+        if (ExternalDataUtils.isChangeFeed(sourceFeed.getAdapterConfiguration())) {
+            List<Mutable<ILogicalExpression>> keyAccessExpression = new ArrayList<>();
+            keyAccessScalarFunctionCallExpression = new ArrayList<>();
+            pkTypes = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPrimaryKeyType();
+            partitioningKeys = ((InternalDatasetDetails) dataset.getDatasetDetails()).getPartitioningKey();
+            if (dataset.hasMetaPart()) {
+                keySourceIndicator = ((InternalDatasetDetails) dataset.getDatasetDetails()).getKeySourceIndicator();
+            }
+            for (int i = 0; i < partitioningKeys.size(); i++) {
+                List<String> key = partitioningKeys.get(i);
+                // Indicator 0 (or no indicator list) takes the record branch; else the meta branch.
+                if (keySourceIndicator == null || keySourceIndicator.get(i).intValue() == 0) {
+                    PlanTranslationUtil.prepareVarAndExpression(key, recordVar, pkVars, keyAccessExpression, null,
+                            context);
+                } else {
+                    PlanTranslationUtil.prepareMetaKeyAccessExpression(key, recordVar, keyAccessExpression, pkVars,
+                            null, context);
+                }
+            }
+            keyAccessExpression.forEach(
+                    expr -> keyAccessScalarFunctionCallExpression.add((ScalarFunctionCallExpression) expr.getValue()));
+        } else {
+            keyAccessScalarFunctionCallExpression = null;
+        }
+        FeedDataSource feedDataSource = new FeedDataSource((MetadataProvider) context.getMetadataProvider(),
+                sourceFeed, aqlId, targetDataset, feedOutputType, metaType, pkTypes,
+                keyAccessScalarFunctionCallExpression, sourceFeed.getFeedId(),
+                FeedRuntimeType.valueOf(subscriptionLocation), locations.split(","),
+                context.getComputationNodeDomain(), feedConnection);
+        feedDataSource.getProperties().put(BuiltinFeedPolicies.CONFIG_FEED_POLICY_KEY, feedPolicy);
+        return feedDataSource;
+    }
+
+    /**
+     * Computes the result type of the feed collect call from its sixth argument (index 5).
+     *
+     * @return {@link BuiltinType#ANY} when that argument's type is ANY or it is not a string
+     *         constant; otherwise the named type as resolved by the metadata provider
+     * @throws AlgebricksException on wrong arity, a non-string argument, an unspecified dataverse,
+     *             or an unknown type name
+     */
+    @Override
+    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp)
+            throws AlgebricksException {
+        AbstractFunctionCallExpression f = (AbstractFunctionCallExpression) expression;
+        if (f.getArguments().size() != BuiltinFunctions.FEED_COLLECT.getArity()) {
+            throw new AlgebricksException("Incorrect number of arguments -> arity is "
+                    + BuiltinFunctions.FEED_COLLECT.getArity() + ", not " + f.getArguments().size());
+        }
+        ILogicalExpression a1 = f.getArguments().get(5).getValue();
+        IAType t1 = (IAType) env.getType(a1);
+        if (t1.getTypeTag() == ATypeTag.ANY) {
+            return BuiltinType.ANY;
+        }
+        if (t1.getTypeTag() != ATypeTag.STRING) {
+            throw new AlgebricksException("Illegal type " + t1 + " for feed-ingest argument.");
+        }
+        String typeArg = ConstantExpressionUtil.getStringConstant(a1);
+        if (typeArg == null) {
+            return BuiltinType.ANY;
+        }
+        MetadataProvider metadata = (MetadataProvider) mp;
+        Pair<String, String> argInfo = DatasetUtil.getDatasetInfo(metadata, typeArg);
+        String dataverseName = argInfo.first;
+        String typeName = argInfo.second;
+        if (dataverseName == null) {
+            throw new AlgebricksException("Unspecified dataverse!");
+        }
+        IAType t2 = metadata.findType(dataverseName, typeName);
+        if (t2 == null) {
+            throw new AlgebricksException("Unknown type " + typeName);
+        }
+        return t2;
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
new file mode 100644
index 0000000..c73f8e8
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionReader.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Base class for record readers that back datasource functions. It supplies fixed
+ * implementations of the {@link IRecordReader} callbacks below, leaving {@code hasNext()} and
+ * {@code next()} to subclasses.
+ */
+public abstract class FunctionReader implements IRecordReader<char[]> {
+
+    /** Always reports {@code true}. */
+    @Override
+    public boolean stop() {
+        return true;
+    }
+
+    /** Always reports {@code false}, whatever the throwable. */
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    /** Ignores the controller; a function reader does not use one. */
+    @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+        // intentionally empty
+    }
+
+    /** Not supported: always throws {@link UnsupportedOperationException}. */
+    @Override
+    public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Does nothing; a function reader has nothing to release here. */
+    @Override
+    public void close() throws IOException {
+        // intentionally empty
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionRewriter.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionRewriter.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionRewriter.java
new file mode 100644
index 0000000..2ff9282
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FunctionRewriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.IFunctionToDataSourceRewriter;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.optimizer.rules.UnnestToDataScanRule;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IAlgebricksConstantValue;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+
+/**
+ * Template for rewriting an unnest over a datasource function into a
+ * {@link DataSourceScanOperator}. {@link #rewrite} validates the call (arity, constant arguments,
+ * no positional variable) and delegates creation of the data source to {@link #toDatasource}.
+ */
+public abstract class FunctionRewriter implements IFunctionToDataSourceRewriter {
+
+    // Final: set once at construction; subclasses in this diff publish instances as static singletons.
+    private final FunctionIdentifier functionId;
+
+    /**
+     * @param functionId identifier of the function this rewriter handles; its arity is enforced
+     *            by {@link #rewrite}
+     */
+    public FunctionRewriter(FunctionIdentifier functionId) {
+        this.functionId = functionId;
+    }
+
+    /**
+     * Replaces the unnest operator held by {@code opRef} with a scan over the data source
+     * returned by {@link #toDatasource}. The scan produces the unnest's variable and inherits its
+     * inputs.
+     *
+     * @return always {@code true}
+     * @throws AlgebricksException if the argument count differs from the function's arity, an
+     *             argument is not a constant, the unnest has a positional variable, or
+     *             {@link #toDatasource} fails
+     */
+    @Override
+    public final boolean rewrite(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
+            throws AlgebricksException {
+        AbstractFunctionCallExpression f = UnnestToDataScanRule.getFunctionCall(opRef);
+        List<Mutable<ILogicalExpression>> args = f.getArguments();
+        if (args.size() != functionId.getArity()) {
+            throw new AlgebricksException(messagePrefix() + " expects " + functionId.getArity() + " arguments");
+        }
+        for (int i = 0; i < args.size(); i++) {
+            LogicalExpressionTag tag = args.get(i).getValue().getExpressionTag();
+            if (tag != LogicalExpressionTag.CONSTANT) {
+                throw new AlgebricksException(
+                        messagePrefix() + " expects constant arguments while arg[" + i + "] is of type " + tag);
+            }
+        }
+        UnnestOperator unnest = (UnnestOperator) opRef.getValue();
+        if (unnest.getPositionalVariable() != null) {
+            throw new AlgebricksException("No positional variables are allowed over datasource functions");
+        }
+        FunctionDataSource datasource = toDatasource(context, f);
+        List<LogicalVariable> variables = new ArrayList<>();
+        variables.add(unnest.getVariable());
+        DataSourceScanOperator scan = new DataSourceScanOperator(variables, datasource);
+        List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
+        scanInpList.addAll(unnest.getInputs());
+        opRef.setValue(scan);
+        context.computeAndSetTypeEnvironmentForOperator(scan);
+        return true;
+    }
+
+    /**
+     * Creates the data source for a validated function call. When invoked from {@link #rewrite},
+     * every argument of {@code f} is a constant expression.
+     */
+    protected abstract FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+            throws AlgebricksException;
+
+    /**
+     * Extracts the string value of the {@code i}-th argument.
+     *
+     * @throws AlgebricksException if the argument is not a constant holding a string value
+     */
+    protected String getString(List<Mutable<ILogicalExpression>> args, int i) throws AlgebricksException {
+        ILogicalExpression arg = args.get(i).getValue();
+        // Guard the cast: a non-constant argument used to fail with a ClassCastException here.
+        if (arg.getExpressionTag() != LogicalExpressionTag.CONSTANT) {
+            throw new AlgebricksException("Expected arg[" + i + "] to be of type String");
+        }
+        ConstantExpression ce = (ConstantExpression) arg;
+        IAlgebricksConstantValue acv = ce.getValue();
+        if (!(acv instanceof AsterixConstantValue)) {
+            throw new AlgebricksException("Expected arg[" + i + "] to be of type String");
+        }
+        AsterixConstantValue acv2 = (AsterixConstantValue) acv;
+        if (acv2.getObject().getType().getTypeTag() != ATypeTag.STRING) {
+            throw new AlgebricksException("Expected arg[" + i + "] to be of type String");
+        }
+        return ((AString) acv2.getObject()).getStringValue();
+    }
+
+    /** Common start of the validation messages: {@code "Function <namespace>.<name>"}. */
+    private String messagePrefix() {
+        return "Function " + functionId.getNamespace() + "." + functionId.getName();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsDatasource.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsDatasource.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsDatasource.java
new file mode 100644
index 0000000..7245b88
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsDatasource.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+/**
+ * Datasource for the function identified by {@link StorageComponentsRewriter#STORAGE_COMPONENTS}.
+ * Its datasource id is built from that function's namespace and name, and the functions it creates
+ * are {@link StorageComponentsFunction} instances bound to one dataset id.
+ */
+public class StorageComponentsDatasource extends FunctionDataSource {
+    private final int datasetId;
+
+    /**
+     * @param domain    node domain passed through to the parent datasource
+     * @param datasetId id of the dataset whose storage components are to be listed
+     * @throws AlgebricksException if the parent constructor fails
+     */
+    public StorageComponentsDatasource(INodeDomain domain, int datasetId) throws AlgebricksException {
+        super(createDataSourceId(), domain);
+        this.datasetId = datasetId;
+    }
+
+    /** Derives the datasource id from the namespace and name of the storage-components function. */
+    private static DataSourceId createDataSourceId() {
+        return new DataSourceId(StorageComponentsRewriter.STORAGE_COMPONENTS.getNamespace(),
+                StorageComponentsRewriter.STORAGE_COMPONENTS.getName());
+    }
+
+    /**
+     * Creates the function that runs at {@code locations}; {@code metadataProvider} is not used.
+     */
+    @Override
+    protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
+            AlgebricksAbsolutePartitionConstraint locations) {
+        return new StorageComponentsFunction(locations, datasetId);
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsFunction.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsFunction.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsFunction.java
new file mode 100644
index 0000000..73b2d0e
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.DatasetLifecycleManager;
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.application.INCServiceContext;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Datasource function that, for each partition it runs on, opens a {@link StorageComponentsReader}
+ * over the dataset resource registered under a fixed dataset id.
+ */
+public class StorageComponentsFunction extends AbstractDatasourceFunction {
+
+    private static final long serialVersionUID = 1L;
+    private final int datasetId;
+
+    /**
+     * @param locations partition constraint passed through to the parent function
+     * @param datasetId id of the dataset whose storage components are to be read
+     */
+    public StorageComponentsFunction(AlgebricksAbsolutePartitionConstraint locations, int datasetId) {
+        super(locations);
+        this.datasetId = datasetId;
+    }
+
+    /**
+     * Looks up the dataset resource for {@code datasetId} through the application context reachable
+     * from {@code ctx} and wraps it in a reader. The {@code partition} argument is not used.
+     *
+     * @throws HyracksDataException if creating the reader fails
+     */
+    @Override
+    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition)
+            throws HyracksDataException {
+        final DatasetResource resource = lifecycleManagerOf(ctx).getDatasetLifecycle(datasetId);
+        return new StorageComponentsReader(resource);
+    }
+
+    /** Walks task context -> joblet -> service context -> application context -> lifecycle manager. */
+    private static DatasetLifecycleManager lifecycleManagerOf(IHyracksTaskContext ctx) {
+        final INCServiceContext ncServiceCtx = ctx.getJobletContext().getServiceContext();
+        final INcApplicationContext ncAppCtx = (INcApplicationContext) ncServiceCtx.getApplicationContext();
+        return (DatasetLifecycleManager) ncAppCtx.getDatasetLifecycleManager();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsReader.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsReader.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsReader.java
new file mode 100644
index 0000000..4958d14
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsReader.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.asterix.common.context.DatasetResource;
+import org.apache.asterix.common.context.IndexInfo;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
+
+/**
+ * Reader that emits one JSON-formatted record per index of a dataset resource. Each record carries
+ * the path of the index's local resource and the min/max id of every disk component of the index.
+ * <p>
+ * All records are built in the constructor; iterating afterwards only walks the pre-built list.
+ * A {@code null} or non-open dataset resource yields a reader with no records.
+ */
+public class StorageComponentsReader extends FunctionReader {
+
+    private final List<String> components;
+    private final Iterator<String> it;
+    private final CharArrayRecord record;
+
+    /**
+     * Builds the record of every index registered in {@code dsr}.
+     *
+     * @param dsr the dataset resource to describe; may be {@code null}
+     * @throws HyracksDataException declared in the signature; propagated if an underlying call raises it
+     */
+    public StorageComponentsReader(DatasetResource dsr) throws HyracksDataException {
+        components = new ArrayList<>();
+        // Always allocated, so next() on an exhausted or empty reader fails in the iterator
+        // (NoSuchElementException) rather than with a NullPointerException on this field.
+        record = new CharArrayRecord();
+        if (dsr != null && dsr.isOpen()) {
+            Map<Long, IndexInfo> indexes = dsr.getIndexes();
+            for (Entry<Long, IndexInfo> entry : indexes.entrySet()) {
+                components.add(describeIndex(entry.getValue()));
+            }
+        }
+        it = components.iterator();
+    }
+
+    /**
+     * Renders one index as {@code {"path":"...", "components":[{"min":..,"max":..},...]}}.
+     * Disk components are written from the last list position to the first.
+     */
+    private static String describeIndex(IndexInfo info) throws HyracksDataException {
+        ILSMIndex index = info.getIndex();
+        String path = info.getLocalResource().getPath();
+        StringBuilder json = new StringBuilder();
+        json.append("{\"path\":\"");
+        appendEscaped(json, path);
+        json.append("\", \"components\":[");
+        // synchronize over the opTracker while the disk component list is read
+        // NOTE(review): presumably this keeps the list stable during the walk -- confirm
+        synchronized (index.getOperationTracker()) {
+            List<ILSMDiskComponent> diskComponents = index.getDiskComponents();
+            int last = diskComponents.size() - 1;
+            for (int i = last; i >= 0; i--) {
+                if (i < last) {
+                    json.append(',');
+                }
+                LSMComponentId id = (LSMComponentId) diskComponents.get(i).getId();
+                json.append("{\"min\":").append(id.getMinId());
+                json.append(",\"max\":").append(id.getMaxId()).append('}');
+            }
+        }
+        json.append("]}");
+        return json.toString();
+    }
+
+    /**
+     * Appends {@code text} to {@code out}, putting a backslash before every double quote and
+     * backslash so the value cannot end the surrounding string literal early (for example a
+     * path that uses {@code \} as separator). A {@code null} text is appended as {@code null},
+     * which is what {@link StringBuilder#append(String)} does.
+     */
+    private static void appendEscaped(StringBuilder out, String text) {
+        String safe = String.valueOf(text);
+        for (int i = 0; i < safe.length(); i++) {
+            char ch = safe.charAt(i);
+            if (ch == '"' || ch == '\\') {
+                out.append('\\');
+            }
+            out.append(ch);
+        }
+    }
+
+    @Override
+    public boolean hasNext() throws Exception {
+        return it.hasNext();
+    }
+
+    /**
+     * Returns the next record. The same {@link CharArrayRecord} instance is reset and refilled on
+     * every call, so a returned record is only valid until the following call.
+     */
+    @Override
+    public IRawRecord<char[]> next() throws IOException, InterruptedException {
+        record.reset();
+        record.append(it.next().toCharArray());
+        record.endRecord();
+        return record;
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsRewriter.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsRewriter.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsRewriter.java
new file mode 100644
index 0000000..89bd115
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/StorageComponentsRewriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+/**
+ * Rewriter for the two-argument {@code storage-components} function. It resolves the
+ * (dataverse name, dataset name) arguments to a dataset and produces a
+ * {@link StorageComponentsDatasource} for that dataset's id.
+ */
+public class StorageComponentsRewriter extends FunctionRewriter {
+
+    // Parameters are dataverse name, and dataset name
+    public static final FunctionIdentifier STORAGE_COMPONENTS =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "storage-components", 2);
+    // Declared after STORAGE_COMPONENTS because its initializer reads that constant.
+    public static final StorageComponentsRewriter INSTANCE = new StorageComponentsRewriter(STORAGE_COMPONENTS);
+
+    private StorageComponentsRewriter(FunctionIdentifier functionId) {
+        super(functionId);
+    }
+
+    /**
+     * Looks up the dataset named by arguments 0 (dataverse) and 1 (dataset) of {@code f}.
+     *
+     * @param context optimization context; supplies the metadata provider and the computation
+     *                node domain
+     * @param f       the function call whose two arguments are read as strings
+     * @return a datasource bound to the id of the dataset that was found
+     * @throws AlgebricksException if an argument cannot be read as a string or no such dataset exists
+     */
+    @Override
+    public StorageComponentsDatasource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression f)
+            throws AlgebricksException {
+        final String dataverseArg = getString(f.getArguments(), 0);
+        final String datasetArg = getString(f.getArguments(), 1);
+        final MetadataProvider provider = (MetadataProvider) context.getMetadataProvider();
+        final Dataset target = provider.findDataset(dataverseArg, datasetArg);
+        if (target != null) {
+            return new StorageComponentsDatasource(context.getComputationNodeDomain(), target.getDatasetId());
+        }
+        throw new AlgebricksException("Could not find dataset " + datasetArg + " in dataverse " + dataverseArg);
+    }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 670b2bd..622e28f 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -82,6 +82,7 @@ import
org.apache.asterix.runtime.job.resource.JobCapacityController;
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.translator.IStatementExecutorContext;
import org.apache.asterix.translator.IStatementExecutorFactory;
+import org.apache.asterix.util.MetadataBuiltinFunctions;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.ICCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
@@ -137,6 +138,7 @@ public class CCApplication extends BaseCCApplication {
String strIP =
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetAddress();
int port =
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
hcc = new HyracksConnection(strIP, port);
+ MetadataBuiltinFunctions.init();
ILibraryManager libraryManager = new ExternalLibraryManager();
ReplicationProperties repProp = new ReplicationProperties(
PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/b4d166b3/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
index a05b2bb..e1a75cb 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/NCApplication.java
@@ -45,6 +45,7 @@ import org.apache.asterix.common.utils.StorageConstants;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.messaging.MessagingChannelInterfaceFactory;
import org.apache.asterix.messaging.NCMessageBroker;
+import org.apache.asterix.util.MetadataBuiltinFunctions;
import org.apache.asterix.utils.CompatibilityUtil;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.api.application.IServiceContext;
@@ -104,6 +105,7 @@ public class NCApplication extends BaseNCApplication {
System.setProperty("java.rmi.server.hostname",
(controllerService).getConfiguration().getClusterPublicAddress());
}
+ MetadataBuiltinFunctions.init();
runtimeContext = new NCAppRuntimeContext(ncServiceCtx,
getExtensions());
MetadataProperties metadataProperties =
runtimeContext.getMetadataProperties();
if
(!metadataProperties.getNodeNames().contains(this.ncServiceCtx.getNodeId())) {