This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new dc9718b745 [ASTERIXDB-3457][FUN] Add query-partition() to get all
tuples in a partition
dc9718b745 is described below
commit dc9718b745f5ea3c7f9e5be78af5b436541c3aac
Author: Ali Alsuliman <[email protected]>
AuthorDate: Sat Jul 13 15:24:56 2024 +0300
[ASTERIXDB-3457][FUN] Add query-partition() to get all tuples in a partition
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Add internal query-partition() to get all tuples in a partition.
Ext-ref: MB-62720
Change-Id: I37185d159a38d26c8cc93ddd6500e437891c44f5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18483
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Tested-by: Ali Alsuliman <[email protected]>
---
.../app/function/QueryPartitionDatasource.java | 131 ++++++++++++
.../app/function/QueryPartitionRewriter.java | 220 +++++++++++++++++++++
.../asterix/util/MetadataBuiltinFunctions.java | 6 +
.../metadata/declared/MetadataProvider.java | 45 ++++-
.../hyracks/hyracks-storage-am-btree/pom.xml | 4 +
.../BTreePartitionSearchOperatorDescriptor.java | 72 +++++++
.../BTreePartitionSearchOperatorNodePushable.java | 76 +++++++
.../dataflow/IndexSearchOperatorNodePushable.java | 2 +-
8 files changed, 549 insertions(+), 7 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionDatasource.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionDatasource.java
new file mode 100644
index 0000000000..4d40ba6913
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionDatasource.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IDataSource;
+import
org.apache.hyracks.algebricks.core.algebra.metadata.IDataSourcePropertiesProvider;
+import
org.apache.hyracks.algebricks.core.algebra.metadata.IProjectionFiltrationInfo;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import
org.apache.hyracks.algebricks.core.algebra.properties.ILocalStructuralProperty;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import
org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalPropertiesVector;
+import
org.apache.hyracks.algebricks.core.algebra.properties.RandomPartitioningProperty;
+import
org.apache.hyracks.algebricks.core.algebra.properties.StructuralPropertiesVector;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.job.JobSpecification;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+
/**
 * Function data source backing the internal {@code query-partition()} function. It produces all
 * tuples stored in one storage partition of a dataset by building a BTree partition-search runtime
 * over the dataset's primary index (see {@link #buildDatasourceScanRuntime}).
 */
public class QueryPartitionDatasource extends FunctionDataSource {

    // Dataset whose single storage partition is scanned.
    private final Dataset ds;
    // Storage locations of the dataset's partitions; returned as-is as the scan locations.
    private final AlgebricksAbsolutePartitionConstraint storageLocations;
    // Zero-based target storage partition; validated to be >= 0 in the constructor.
    private final int partitionNum;

    /**
     * @param ds dataset to scan
     * @param domain node domain of the dataset
     * @param storageLocations locations of the dataset's storage partitions
     * @param recType record type produced by this data source (computed by QueryPartitionRewriter)
     * @param partitionNum zero-based target storage partition; must be >= 0
     * @throws AlgebricksException from the superclass constructor
     * @throws IllegalArgumentException if {@code partitionNum} is negative
     */
    public QueryPartitionDatasource(Dataset ds, INodeDomain domain,
            AlgebricksAbsolutePartitionConstraint storageLocations, ARecordType recType, int partitionNum)
            throws AlgebricksException {
        super(createQueryPartitionDataSourceId(ds), QueryPartitionRewriter.QUERY_PARTITION, domain, recType);
        if (partitionNum < 0) {
            throw new IllegalArgumentException("partition must be >= 0");
        }
        this.partitionNum = partitionNum;
        this.ds = ds;
        this.storageLocations = storageLocations;
    }

    @Override
    protected void initSchemaType(IAType iType) {
        // The output schema is exactly the fields of the provided record type, one schema type
        // per field (primary keys, "rec", and optionally "meta" -- see QueryPartitionRewriter).
        ARecordType type = (ARecordType) iType;
        IAType[] fieldTypes = type.getFieldTypes();
        schemaTypes = new IAType[fieldTypes.length];
        System.arraycopy(fieldTypes, 0, schemaTypes, 0, schemaTypes.length);
    }

    @Override
    protected AlgebricksAbsolutePartitionConstraint getLocations(IClusterStateManager csm, MetadataProvider md) {
        // Run the scan where the dataset's storage partitions live.
        return storageLocations;
    }

    @Override
    public boolean isScanAccessPathALeaf() {
        // The index scan op is not a leaf op. The ETS op will start the scan of the index. We need
        // the ETS op below the index scan to be still generated.
        return false;
    }

    @Override
    protected IDatasourceFunction createFunction(MetadataProvider metadataProvider,
            AlgebricksAbsolutePartitionConstraint locations) {
        // This data source supplies its own scan runtime via buildDatasourceScanRuntime() rather
        // than the record-reader adapter path used by other function data sources.
        throw new UnsupportedOperationException("query-partition() does not use record reader adapter");
    }

    @Override
    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> buildDatasourceScanRuntime(
            MetadataProvider metadataProvider, IDataSource<DataSourceId> dataSource,
            List<LogicalVariable> scanVariables, List<LogicalVariable> projectVariables, boolean projectPushed,
            List<LogicalVariable> minFilterVars, List<LogicalVariable> maxFilterVars,
            ITupleFilterFactory tupleFilterFactory, long outputLimit, IOperatorSchema opSchema,
            IVariableTypeEnvironment typeEnv, JobGenContext context, JobSpecification jobSpec, Object implConfig,
            IProjectionFiltrationInfo projectionInfo) throws AlgebricksException {
        // Delegate to the BTree partition-search runtime, restricted to the requested partition.
        return metadataProvider.getBtreePartitionSearchRuntime(jobSpec, opSchema, typeEnv, context, ds,
                tupleFilterFactory, outputLimit, partitionNum);
    }

    @Override
    public IDataSourcePropertiesProvider getPropertiesProvider() {
        return new IDataSourcePropertiesProvider() {
            @Override
            public IPhysicalPropertiesVector computeRequiredProperties(List<LogicalVariable> scanVariables,
                    IOptimizationContext ctx) {
                // Nothing is required of the input.
                return StructuralPropertiesVector.EMPTY_PROPERTIES_VECTOR;
            }

            @Override
            public IPhysicalPropertiesVector computeDeliveredProperties(List<LogicalVariable> scanVariables,
                    IOptimizationContext ctx) {
                // Delivered partitioning is declared random over the node domain; no local
                // structural properties (e.g. ordering) are promised.
                List<ILocalStructuralProperty> propsLocal = new ArrayList<>(1);
                return new StructuralPropertiesVector(new RandomPartitioningProperty(domain), propsLocal);
            }
        };
    }

    private static DataSourceId createQueryPartitionDataSourceId(Dataset dataset) {
        // Include the function name in the id parts so this data source id does not clash with the
        // dataset's regular data source id.
        return new DataSourceId(dataset.getDatabaseName(), dataset.getDataverseName(), dataset.getDatasetName(),
                new String[] { dataset.getDatasetName(), QueryPartitionRewriter.QUERY_PARTITION.getName() });
    }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionRewriter.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionRewriter.java
new file mode 100644
index 0000000000..4514a30362
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryPartitionRewriter.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import static
org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier.VARARGS;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.asterix.common.cluster.PartitioningProperties;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.common.metadata.DataverseName;
+import org.apache.asterix.common.metadata.MetadataUtil;
+import org.apache.asterix.lang.common.util.FunctionUtil;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.metadata.utils.KeyFieldTypeUtil;
+import org.apache.asterix.om.base.AString;
+import org.apache.asterix.om.constants.AsterixConstantValue;
+import org.apache.asterix.om.functions.BuiltinFunctions;
+import org.apache.asterix.om.typecomputer.base.IResultTypeComputer;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.utils.ConstantExpressionUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.mutable.Mutable;
+import org.apache.commons.lang3.mutable.MutableObject;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.DataSourceScanOperator;
+import
org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestOperator;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+import org.apache.hyracks.api.exceptions.SourceLocation;
+
/**
 * Rewriter and result-type computer for the internal {@code query-partition()} function, which
 * returns all tuples stored in one storage partition of a dataset. Supported calling forms:
 * <pre>
 * query-partition("db", "dv", "ds", 0);  -- 4 args: explicit database name first
 * query-partition("dv", "ds", 0);        -- 3 args: database derived from the dataverse
 * </pre>
 * The output record consists of one field per primary key (named by the dot-joined key path),
 * a "rec" field holding the dataset record, and -- for datasets with a meta part -- a "meta" field.
 */
public class QueryPartitionRewriter extends FunctionRewriter implements IResultTypeComputer {

    public static final FunctionIdentifier QUERY_PARTITION = FunctionConstants.newAsterix("query-partition", VARARGS);
    public static final QueryPartitionRewriter INSTANCE = new QueryPartitionRewriter(QUERY_PARTITION);

    private QueryPartitionRewriter(FunctionIdentifier functionId) {
        super(functionId);
    }

    @Override
    public IAType computeType(ILogicalExpression expression, IVariableTypeEnvironment env, IMetadataProvider<?, ?> mp)
            throws AlgebricksException {
        // Type-only call: null outVars/closedRecArgs/context means no assign vars are created.
        return computeRecType((AbstractFunctionCallExpression) expression, (MetadataProvider) mp, null, null, null);
    }

    @Override
    public FunctionDataSource toDatasource(IOptimizationContext ctx, AbstractFunctionCallExpression f)
            throws AlgebricksException {
        final SourceLocation loc = f.getSourceLocation();
        int numArgs = f.getArguments().size();
        int nextArg = 0;
        // In the 4-arg form, arg 0 is the database name; skip it so nextArg points at the
        // dataverse name in both forms.
        if (numArgs > 3) {
            nextArg++;
        }
        DataverseName dvName = getDataverseName(loc, f.getArguments(), nextArg++);
        String dsName = getString(loc, f.getArguments(), nextArg++);
        // After consuming dataverse and dataset names, nextArg is the partition-number argument.
        Long partitionNum = ConstantExpressionUtil.getLongArgument(f, nextArg);
        if (partitionNum == null) {
            throw new IllegalArgumentException("partition number should be a number");
        }
        String dbName;
        if (numArgs > 3) {
            dbName = getString(loc, f.getArguments(), 0);
        } else {
            dbName = MetadataUtil.databaseFor(dvName);
        }
        MetadataProvider mp = (MetadataProvider) ctx.getMetadataProvider();
        final Dataset dataset = validateDataset(mp, dbName, dvName, dsName, loc);
        return createQueryPartitionDatasource(mp, dataset, partitionNum.intValue(), loc, f);
    }

    @Override
    protected void createDataScanOp(Mutable<ILogicalOperator> opRef, UnnestOperator unnest, IOptimizationContext ctx,
            AbstractFunctionCallExpression f) throws AlgebricksException {
        // Replace the unnest with: data-source-scan -> assign(closed-record-constructor), where
        // the assign rebuilds the output record from the scan variables under the unnest variable.
        FunctionDataSource datasource = toDatasource(ctx, f);
        List<LogicalVariable> variables = new ArrayList<>();
        List<Mutable<ILogicalExpression>> closedRecArgs = new ArrayList<>();
        MetadataProvider mp = (MetadataProvider) ctx.getMetadataProvider();
        // Populates 'variables' (scan output vars) and 'closedRecArgs' (name/value expr pairs).
        computeRecType(f, mp, variables, closedRecArgs, ctx);
        DataSourceScanOperator scan = new DataSourceScanOperator(variables, datasource);
        scan.setSourceLocation(unnest.getSourceLocation());
        List<Mutable<ILogicalOperator>> scanInpList = scan.getInputs();
        scanInpList.addAll(unnest.getInputs());
        ScalarFunctionCallExpression recordCreationFunc = new ScalarFunctionCallExpression(
                FunctionUtil.getFunctionInfo(BuiltinFunctions.CLOSED_RECORD_CONSTRUCTOR), closedRecArgs);
        recordCreationFunc.setSourceLocation(unnest.getSourceLocation());
        AssignOperator assignOp = new AssignOperator(unnest.getVariable(), new MutableObject<>(recordCreationFunc));
        assignOp.getInputs().add(new MutableObject<>(scan));
        assignOp.setSourceLocation(unnest.getSourceLocation());
        ctx.computeAndSetTypeEnvironmentForOperator(scan);
        ctx.computeAndSetTypeEnvironmentForOperator(assignOp);
        opRef.setValue(assignOp);
    }

    @Override
    protected boolean invalidArgs(List<Mutable<ILogicalExpression>> args) {
        // At least (dataverse, dataset, partition) are required.
        return args.size() < 3;
    }

    private FunctionDataSource createQueryPartitionDatasource(MetadataProvider mp, Dataset ds, int partitionNum,
            SourceLocation loc, AbstractFunctionCallExpression f) throws AlgebricksException {
        INodeDomain domain = mp.findNodeDomain(ds.getNodeGroupName());
        PartitioningProperties partitioningProperties = mp.getPartitioningProperties(ds);
        AlgebricksPartitionConstraint constraints = partitioningProperties.getConstraints();
        ARecordType recType = computeRecType(f, mp, null, null, null);
        // NOTE(review): assumes the dataset's constraints are absolute (cast below) -- appears to
        // hold for stored datasets; confirm against PartitioningProperties.
        return new QueryPartitionDatasource(ds, domain, (AlgebricksAbsolutePartitionConstraint) constraints, recType,
                partitionNum);
    }

    /**
     * Computes the output record type: one field per primary key (dot-joined name), then "rec"
     * (the dataset record type), then "meta" when the dataset has a meta part. When
     * {@code context} is non-null, also creates one new logical variable per field (into
     * {@code outVars}) and the matching name/variable expression pairs (into
     * {@code closedRecArgs}) for the closed-record constructor.
     */
    private ARecordType computeRecType(AbstractFunctionCallExpression f, MetadataProvider metadataProvider,
            List<LogicalVariable> outVars, List<Mutable<ILogicalExpression>> closedRecArgs,
            IOptimizationContext context) throws AlgebricksException {
        final SourceLocation loc = f.getSourceLocation();
        int numArgs = f.getArguments().size();
        int nextArg = 0;
        // Same argument layout as toDatasource(): skip the optional leading database name.
        if (numArgs > 3) {
            nextArg++;
        }
        DataverseName dvName = getDataverseName(loc, f.getArguments(), nextArg++);
        String dsName = getString(loc, f.getArguments(), nextArg++);
        String dbName;
        if (numArgs > 3) {
            dbName = getString(loc, f.getArguments(), 0);
        } else {
            dbName = MetadataUtil.databaseFor(dvName);
        }
        Dataset dataset = validateDataset(metadataProvider, dbName, dvName, dsName, loc);
        ARecordType dsType = (ARecordType) metadataProvider.findType(dataset);
        ARecordType metaType = DatasetUtil.getMetaType(metadataProvider, dataset);
        dsType = (ARecordType) metadataProvider.findTypeForDatasetWithoutType(dsType, metaType, dataset);

        List<IAType> dsKeyTypes = KeyFieldTypeUtil.getPartitoningKeyTypes(dataset, dsType, metaType);
        List<List<String>> primaryKeys = dataset.getPrimaryKeys();
        int numPrimaryKeys = dsKeyTypes.size();
        // Payload fields: the record itself, plus the meta record when present.
        int numPayload = metaType == null ? 1 : 2;
        String[] fieldNames = new String[numPrimaryKeys + numPayload];
        IAType[] fieldTypes = new IAType[numPrimaryKeys + numPayload];
        int keyIdx = 0;
        for (int k = 0; k < numPrimaryKeys; k++, keyIdx++) {
            fieldTypes[keyIdx] = dsKeyTypes.get(k);
            // Nested key paths become a single dot-joined field name.
            fieldNames[keyIdx] = StringUtils.join(primaryKeys.get(k), ".");
            setAssignVarsExprs(outVars, closedRecArgs, context, loc, fieldNames, keyIdx);
        }
        // keyIdx == numPrimaryKeys here: the record payload field follows the keys.
        fieldTypes[keyIdx] = dsType;
        fieldNames[keyIdx] = "rec";
        setAssignVarsExprs(outVars, closedRecArgs, context, loc, fieldNames, keyIdx);
        if (metaType != null) {
            keyIdx++;
            fieldTypes[keyIdx] = metaType;
            fieldNames[keyIdx] = "meta";
            setAssignVarsExprs(outVars, closedRecArgs, context, loc, fieldNames, keyIdx);
        }
        // Anonymous closed record type.
        return new ARecordType("", fieldNames, fieldTypes, false);
    }

    /**
     * When a context is provided, creates a new variable for field {@code n} and appends the
     * (field-name constant, variable reference) pair expected by the closed-record constructor.
     * No-op for type-only calls (null context).
     */
    private void setAssignVarsExprs(List<LogicalVariable> outVars, List<Mutable<ILogicalExpression>> closedRecArgs,
            IOptimizationContext context, SourceLocation loc, String[] fieldNames, int n) {
        if (context != null) {
            LogicalVariable logicalVariable = context.newVar();
            outVars.add(logicalVariable);
            ConstantExpression nameExpr = new ConstantExpression(new AsterixConstantValue(new AString(fieldNames[n])));
            VariableReferenceExpression varRefExpr = new VariableReferenceExpression(logicalVariable);
            nameExpr.setSourceLocation(loc);
            varRefExpr.setSourceLocation(loc);
            closedRecArgs.add(new MutableObject<>(nameExpr));
            closedRecArgs.add(new MutableObject<>(varRefExpr));
        }
    }

    /**
     * Resolves the dataset or throws {@link CompilationException} (UNKNOWN_DATASET_IN_DATAVERSE)
     * with the source location when it does not exist.
     */
    private static Dataset validateDataset(MetadataProvider mp, String dbName, DataverseName dvName, String dsName,
            SourceLocation loc) throws AlgebricksException {
        Dataset dataset = mp.findDataset(dbName, dvName, dsName);
        if (dataset == null) {
            throw new CompilationException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, loc, dsName,
                    MetadataUtil.dataverseName(dbName, dvName, mp.isUsingDatabase()));
        }
        return dataset;
    }
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
index 5a2ef3c768..9d39088534 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
@@ -27,6 +27,7 @@ import org.apache.asterix.app.function.FeedRewriter;
import org.apache.asterix.app.function.JobSummariesRewriter;
import org.apache.asterix.app.function.PingRewriter;
import org.apache.asterix.app.function.QueryIndexRewriter;
+import org.apache.asterix.app.function.QueryPartitionRewriter;
import org.apache.asterix.app.function.StorageComponentsRewriter;
import org.apache.asterix.app.function.TPCDSAllTablesDataGeneratorRewriter;
import org.apache.asterix.app.function.TPCDSSingleTableDataGeneratorRewriter;
@@ -100,6 +101,11 @@ public class MetadataBuiltinFunctions {
BuiltinFunctions.addFunction(QueryIndexRewriter.QUERY_INDEX,
QueryIndexRewriter.INSTANCE, true);
BuiltinFunctions.addUnnestFun(QueryIndexRewriter.QUERY_INDEX, false);
BuiltinFunctions.addDatasourceFunction(QueryIndexRewriter.QUERY_INDEX,
QueryIndexRewriter.INSTANCE);
+ // Query partition function: registers the internal query-partition() function
+
BuiltinFunctions.addPrivateFunction(QueryPartitionRewriter.QUERY_PARTITION,
QueryPartitionRewriter.INSTANCE,
+ true);
+ BuiltinFunctions.addUnnestFun(QueryPartitionRewriter.QUERY_PARTITION,
false);
+
BuiltinFunctions.addDatasourceFunction(QueryPartitionRewriter.QUERY_PARTITION,
QueryPartitionRewriter.INSTANCE);
}
private MetadataBuiltinFunctions() {
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 023b01cd53..fe2127cea3 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -164,12 +164,14 @@ import
org.apache.hyracks.data.std.primitive.ShortPointable;
import
org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
import
org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
+import
org.apache.hyracks.storage.am.btree.dataflow.BTreePartitionSearchOperatorDescriptor;
import
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
+import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import
org.apache.hyracks.storage.am.lsm.btree.dataflow.LSMBTreeBatchPointSearchOperatorDescriptor;
import
org.apache.hyracks.storage.am.lsm.invertedindex.dataflow.BinaryTokenizerOperatorDescriptor;
@@ -596,6 +598,15 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
}
    /**
     * Builds a runtime that scans a single storage partition of the dataset's primary BTree index.
     * Used by the internal query-partition() function (see QueryPartitionDatasource).
     *
     * @param dataset dataset whose primary index is scanned (index name == dataset name)
     * @param tupleFilterFactory optional tuple filter, may be null
     * @param outputLimit maximum number of tuples to output (negative/unbounded per callee)
     * @param partitionNum zero-based target storage partition, forwarded as targetPartition
     */
    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getBtreePartitionSearchRuntime(
            JobSpecification jobSpec, IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv, JobGenContext context,
            Dataset dataset, ITupleFilterFactory tupleFilterFactory, long outputLimit, int partitionNum)
            throws AlgebricksException {
        // Full primary-index scan: null low/high key fields with inclusive bounds, per the
        // private overload's parameter order; partitionNum is the trailing targetPartition.
        return getBtreeSearchRuntime(jobSpec, opSchema, typeEnv, context, true, false, null, dataset,
                dataset.getDatasetName(), null, null, true, true, false, null, null, null, tupleFilterFactory,
                outputLimit, false, false, DefaultTupleProjectorFactory.INSTANCE, false, partitionNum);
    }
+
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getBtreeSearchRuntime(JobSpecification jobSpec,
IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput,
boolean retainMissing, IMissingWriterFactory
nonMatchWriterFactory, Dataset dataset, String indexName,
@@ -604,6 +615,21 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
int[] maxFilterFieldIndexes, ITupleFilterFactory
tupleFilterFactory, long outputLimit,
boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch,
ITupleProjectorFactory tupleProjectorFactory,
boolean partitionInputTuples) throws AlgebricksException {
+ return getBtreeSearchRuntime(jobSpec, opSchema, typeEnv, context,
retainInput, retainMissing,
+ nonMatchWriterFactory, dataset, indexName, lowKeyFields,
highKeyFields, lowKeyInclusive,
+ highKeyInclusive, propagateFilter, nonFilterWriterFactory,
minFilterFieldIndexes, maxFilterFieldIndexes,
+ tupleFilterFactory, outputLimit, isIndexOnlyPlan,
isPrimaryIndexPointSearch, tupleProjectorFactory,
+ partitionInputTuples, -1);
+ }
+
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getBtreeSearchRuntime(JobSpecification jobSpec,
+ IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput,
+ boolean retainMissing, IMissingWriterFactory
nonMatchWriterFactory, Dataset dataset, String indexName,
+ int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
boolean highKeyInclusive,
+ boolean propagateFilter, IMissingWriterFactory
nonFilterWriterFactory, int[] minFilterFieldIndexes,
+ int[] maxFilterFieldIndexes, ITupleFilterFactory
tupleFilterFactory, long outputLimit,
+ boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch,
ITupleProjectorFactory tupleProjectorFactory,
+ boolean partitionInputTuples, int targetPartition) throws
AlgebricksException {
boolean isSecondary = true;
Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx,
dataset.getDatabaseName(),
dataset.getDataverseName(), dataset.getDatasetName(),
dataset.getDatasetName());
@@ -678,12 +704,19 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
retainMissing, nonMatchWriterFactory,
searchCallbackFactory, minFilterFieldIndexes,
maxFilterFieldIndexes, tupleFilterFactory,
outputLimit, tupleProjectorFactory,
tuplePartitionerFactory, partitionsMap)
- : new BTreeSearchOperatorDescriptor(jobSpec,
outputRecDesc, lowKeyFields, highKeyFields,
- lowKeyInclusive, highKeyInclusive,
indexHelperFactory, retainInput, retainMissing,
- nonMatchWriterFactory, searchCallbackFactory,
minFilterFieldIndexes, maxFilterFieldIndexes,
- propagateFilter, nonFilterWriterFactory,
tupleFilterFactory, outputLimit,
- proceedIndexOnlyPlan, failValueForIndexOnlyPlan,
successValueForIndexOnlyPlan,
- tupleProjectorFactory, tuplePartitionerFactory,
partitionsMap);
+ : targetPartition < 0 ? new
BTreeSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
+ highKeyFields, lowKeyInclusive, highKeyInclusive,
indexHelperFactory, retainInput,
+ retainMissing, nonMatchWriterFactory,
searchCallbackFactory, minFilterFieldIndexes,
+ maxFilterFieldIndexes, propagateFilter,
nonFilterWriterFactory, tupleFilterFactory,
+ outputLimit, proceedIndexOnlyPlan,
failValueForIndexOnlyPlan, successValueForIndexOnlyPlan,
+ tupleProjectorFactory, tuplePartitionerFactory,
partitionsMap)
+ : new
BTreePartitionSearchOperatorDescriptor(jobSpec, outputRecDesc, lowKeyFields,
+ highKeyFields, lowKeyInclusive,
highKeyInclusive, indexHelperFactory, retainInput,
+ retainMissing, nonMatchWriterFactory,
searchCallbackFactory, minFilterFieldIndexes,
+ maxFilterFieldIndexes, propagateFilter,
nonFilterWriterFactory, tupleFilterFactory,
+ outputLimit, proceedIndexOnlyPlan,
failValueForIndexOnlyPlan,
+ successValueForIndexOnlyPlan,
tupleProjectorFactory, tuplePartitionerFactory,
+ partitionsMap, targetPartition);
} else {
btreeSearchOp = null;
}
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
index 52cd6cf1bb..f3881e2914 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/pom.xml
@@ -109,6 +109,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil-core</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java
new file mode 100644
index 0000000000..dfa59b71a8
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorDescriptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hyracks.storage.am.btree.dataflow;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
+
/**
 * A BTree search operator descriptor that carries a target storage partition in addition to the
 * regular {@link BTreeSearchOperatorDescriptor} state. The target partition is forwarded to
 * {@link BTreePartitionSearchOperatorNodePushable}, which restricts the search to that partition.
 */
public class BTreePartitionSearchOperatorDescriptor extends BTreeSearchOperatorDescriptor {

    private static final long serialVersionUID = 1L;
    // Zero-based storage partition this operator should search.
    private final int targetStoragePartition;

    public BTreePartitionSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
            int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, boolean highKeyInclusive,
            IIndexDataflowHelperFactory indexHelperFactory, boolean retainInput, boolean retainMissing,
            IMissingWriterFactory missingWriterFactory, ISearchOperationCallbackFactory searchCallbackFactory,
            int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean appendIndexFilter,
            IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory tupleFilterFactory, long outputLimit,
            boolean appendOpCallbackProceedResult, byte[] searchCallbackProceedResultFalseValue,
            byte[] searchCallbackProceedResultTrueValue, ITupleProjectorFactory tupleProjectorFactory,
            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap, int targetStoragePartition) {
        // All shared state is handled by the parent; only the target partition is added here.
        super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, highKeyInclusive, indexHelperFactory,
                retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, minFilterFieldIndexes,
                maxFilterFieldIndexes, appendIndexFilter, nonFilterWriterFactory, tupleFilterFactory, outputLimit,
                appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
                searchCallbackProceedResultTrueValue, tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
        this.targetStoragePartition = targetStoragePartition;
    }

    @Override
    public BTreeSearchOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
            IRecordDescriptorProvider recordDescProvider, int partition, int nPartitions) throws HyracksDataException {
        // Same wiring as the parent's pushable, with the target storage partition appended.
        return new BTreePartitionSearchOperatorNodePushable(ctx, partition,
                recordDescProvider.getInputRecordDescriptor(getActivityId(), 0), lowKeyFields, highKeyFields,
                lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, maxFilterFieldIndexes, indexHelperFactory,
                retainInput, retainMissing, missingWriterFactory, searchCallbackFactory, appendIndexFilter,
                nonFilterWriterFactory, tupleFilterFactory, outputLimit, appendOpCallbackProceedResult,
                searchCallbackProceedResultFalseValue, searchCallbackProceedResultTrueValue, tupleProjectorFactory,
                tuplePartitionerFactory, partitionsMap, targetStoragePartition);
    }

    @Override
    public String getDisplayName() {
        return "BTree Partition Search";
    }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java
new file mode 100644
index 0000000000..75187914f4
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreePartitionSearchOperatorNodePushable.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.storage.am.btree.dataflow;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
+import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
+import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
+import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
+
+public class BTreePartitionSearchOperatorNodePushable extends
BTreeSearchOperatorNodePushable {
+
+ private final int pIdx;
+
+ public BTreePartitionSearchOperatorNodePushable(IHyracksTaskContext ctx,
int partition,
+ RecordDescriptor inputRecDesc, int[] lowKeyFields, int[]
highKeyFields, boolean lowKeyInclusive,
+ boolean highKeyInclusive, int[] minFilterFieldIndexes, int[]
maxFilterFieldIndexes,
+ IIndexDataflowHelperFactory indexHelperFactory, boolean
retainInput, boolean retainMissing,
+ IMissingWriterFactory nonMatchWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory,
+ boolean appendIndexFilter, IMissingWriterFactory
nonFilterWriterFactory,
+ ITupleFilterFactory tupleFilterFactory, long outputLimit, boolean
appendOpCallbackProceedResult,
+ byte[] searchCallbackProceedResultFalseValue, byte[]
searchCallbackProceedResultTrueValue,
+ ITupleProjectorFactory projectorFactory, ITuplePartitionerFactory
tuplePartitionerFactory,
+ int[][] partitionsMap, int targetStoragePartition) throws
HyracksDataException {
+ super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive,
+ minFilterFieldIndexes, maxFilterFieldIndexes,
indexHelperFactory, retainInput, retainMissing,
+ nonMatchWriterFactory, searchCallbackFactory,
appendIndexFilter, nonFilterWriterFactory,
+ tupleFilterFactory, outputLimit,
appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
+ searchCallbackProceedResultTrueValue, projectorFactory,
tuplePartitionerFactory, partitionsMap);
+ pIdx = storagePartitionId2Index.getOrDefault(targetStoragePartition,
Integer.MIN_VALUE);
+ }
+
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ accessor.reset(buffer);
+ int tupleCount = accessor.getTupleCount();
+ try {
+ searchPartition(tupleCount);
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private void searchPartition(int tupleCount) throws Exception {
+ if (pIdx >= 0 && pIdx < cursors.length) {
+ for (int i = 0; i < tupleCount && !finished; i++) {
+ resetSearchPredicate(i);
+ cursors[pIdx].close();
+ indexAccessors[pIdx].search(cursors[pIdx], searchPred);
+ writeSearchResults(i, cursors[pIdx]);
+ }
+ }
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 91b87c66a2..da5df23997 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -120,7 +120,7 @@ public abstract class IndexSearchOperatorNodePushable
extends AbstractUnaryInput
protected final ITupleProjector tupleProjector;
protected final ITuplePartitioner tuplePartitioner;
protected final int[] partitions;
- private final Int2IntMap storagePartitionId2Index = new
Int2IntOpenHashMap();
+ protected final Int2IntMap storagePartitionId2Index = new
Int2IntOpenHashMap();
public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx,
RecordDescriptor inputRecDesc, int partition,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes,
IIndexDataflowHelperFactory indexHelperFactory,