This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new ba2d8bd2fc [ASTERIXDB-3144][RT] Pass partitions map to inverted index
ba2d8bd2fc is described below
commit ba2d8bd2fcaf1992177c78dfb326c9ccebd23531
Author: Ali Alsuliman <[email protected]>
AuthorDate: Wed Apr 5 04:42:09 2023 -0700
[ASTERIXDB-3144][RT] Pass partitions map to inverted index
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Pass partitions map to the inverted index runtime.
- rename few methods.
Change-Id: I6ad1b0cd79f0f5e8e15da83330b8a52f9ac0108d
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17463
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../operators/physical/BTreeSearchPOperator.java | 2 +-
.../operators/physical/InvertedIndexPOperator.java | 19 +-
.../operators/physical/RTreeSearchPOperator.java | 2 +-
.../asterix/app/function/QueryIndexDatasource.java | 6 +-
.../org/apache/asterix/utils/FeedOperations.java | 2 +-
.../metadata/declared/DatasetDataSource.java | 4 +-
.../metadata/declared/FunctionDataSource.java | 4 +-
.../metadata/declared/LoadableDataSource.java | 2 +-
.../metadata/declared/MetadataProvider.java | 237 ++++++++++-----------
.../metadata/declared/SampleDataSource.java | 2 +-
.../core/algebra/metadata/IMetadataProvider.java | 31 +--
.../dataflow/BTreeSearchOperatorDescriptor.java | 8 +-
.../dataflow/BTreeSearchOperatorNodePushable.java | 4 +-
.../dataflow/IndexSearchOperatorNodePushable.java | 4 +-
...LSMBTreeBatchPointSearchOperatorDescriptor.java | 6 +-
...MBTreeBatchPointSearchOperatorNodePushable.java | 4 +-
.../LSMInvertedIndexSearchOperatorDescriptor.java | 10 +-
...LSMInvertedIndexSearchOperatorNodePushable.java | 6 +-
18 files changed, 178 insertions(+), 175 deletions(-)
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index a7a383821b..b8eba74b00 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -162,7 +162,7 @@ public class BTreeSearchPOperator extends
IndexSearchPOperator {
String.valueOf(unnestMap.getOperatorTag()));
}
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch =
metadataProvider.buildBtreeRuntime(
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> btreeSearch =
metadataProvider.getBtreeSearchRuntime(
builder.getJobSpec(), opSchema, typeEnv, context,
jobGenParams.getRetainInput(), retainMissing,
nonMatchWriterFactory, dataset, jobGenParams.getIndexName(),
lowKeyIndexes, highKeyIndexes,
jobGenParams.isLowKeyInclusive(),
jobGenParams.isHighKeyInclusive(), propagateFilter,
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
index 00eef690e1..5bdb2dba5a 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/InvertedIndexPOperator.java
@@ -186,14 +186,17 @@ public class InvertedIndexPOperator extends
IndexSearchPOperator {
IIndexDataflowHelperFactory dataflowHelperFactory = new
IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(),
secondarySplitsAndConstraint.first);
- LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp = new
LSMInvertedIndexSearchOperatorDescriptor(
- jobSpec, outputRecDesc, queryField, dataflowHelperFactory,
queryTokenizerFactory,
- fullTextConfigEvaluatorFactory, searchModifierFactory,
retainInput, retainMissing,
- nonMatchWriterFactory,
-
dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(),
secondaryIndex,
- IndexOperation.SEARCH, null),
- minFilterFieldIndexes, maxFilterFieldIndexes,
isFullTextSearchQuery, numPrimaryKeys,
- propagateIndexFilter, nonFilterWriterFactory, frameLimit);
+ int numPartitions =
MetadataProvider.getNumPartitions(secondarySplitsAndConstraint.second);
+ int[][] partitionsMap =
MetadataProvider.getPartitionsMap(numPartitions);
+
+ LSMInvertedIndexSearchOperatorDescriptor invIndexSearchOp =
+ new LSMInvertedIndexSearchOperatorDescriptor(jobSpec,
outputRecDesc, queryField, dataflowHelperFactory,
+ queryTokenizerFactory, fullTextConfigEvaluatorFactory,
searchModifierFactory, retainInput,
+ retainMissing, nonMatchWriterFactory,
+
dataset.getSearchCallbackFactory(metadataProvider.getStorageComponentProvider(),
secondaryIndex,
+ IndexOperation.SEARCH, null),
+ minFilterFieldIndexes, maxFilterFieldIndexes,
isFullTextSearchQuery, numPrimaryKeys,
+ propagateIndexFilter, nonFilterWriterFactory,
frameLimit, partitionsMap);
return new Pair<>(invIndexSearchOp,
secondarySplitsAndConstraint.second);
}
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
index 6534ebe89b..6b5adea536 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/RTreeSearchPOperator.java
@@ -106,7 +106,7 @@ public class RTreeSearchPOperator extends
IndexSearchPOperator {
}
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> rtreeSearch =
- mp.buildRtreeRuntime(builder.getJobSpec(), outputVars,
opSchema, typeEnv, context,
+ mp.getRtreeSearchRuntime(builder.getJobSpec(), outputVars,
opSchema, typeEnv, context,
jobGenParams.getRetainInput(), retainMissing,
nonMatchWriterFactory, dataset,
jobGenParams.getIndexName(), keyIndexes,
propagateIndexFilter, nonFilterWriterFactory,
minFilterFieldIndexes, maxFilterFieldIndexes,
unnestMap.getGenerateCallBackProceedResultVar());
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
index f962142758..fda3845b7b 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
@@ -105,9 +105,9 @@ public class QueryIndexDatasource extends
FunctionDataSource {
IVariableTypeEnvironment typeEnv, JobGenContext context,
JobSpecification jobSpec, Object implConfig,
IProjectionFiltrationInfo<?> projectionInfo,
IProjectionFiltrationInfo<?> metaProjectionInfo)
throws AlgebricksException {
- return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv,
context, true, false, null, ds, indexName,
- null, null, true, true, false, null, null, null,
tupleFilterFactory, outputLimit, false, false,
- DefaultTupleProjectorFactory.INSTANCE, false);
+ return metadataProvider.getBtreeSearchRuntime(jobSpec, opSchema,
typeEnv, context, true, false, null, ds,
+ indexName, null, null, true, true, false, null, null, null,
tupleFilterFactory, outputLimit, false,
+ false, DefaultTupleProjectorFactory.INSTANCE, false);
}
@Override
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index dcd52a0732..07500f710f 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -142,7 +142,7 @@ public class FeedOperations {
IOperatorDescriptor feedIngestor;
AlgebricksPartitionConstraint ingesterPc;
Triple<IOperatorDescriptor, AlgebricksPartitionConstraint,
ITypedAdapterFactory> t =
- metadataProvider.buildFeedIntakeRuntime(spec, feed,
policyAccessor);
+ metadataProvider.getFeedIntakeRuntime(spec, feed,
policyAccessor);
feedIngestor = t.first;
ingesterPc = t.second;
adapterFactory = t.third;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index c77e032f87..89e99fda08 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -139,7 +139,7 @@ public class DatasetDataSource extends DataSource {
properties.put(KEY_EXTERNAL_SCAN_BUFFER_SIZE,
String.valueOf(externalScanBufferSize));
ITypedAdapterFactory adapterFactory =
metadataProvider.getConfiguredAdapterFactory(externalDataset,
edd.getAdapter(), properties, (ARecordType) itemType,
null, context.getWarningCollector());
- return
metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType,
adapterFactory,
+ return metadataProvider.getExternalDatasetScanRuntime(jobSpec,
itemType, adapterFactory,
tupleFilterFactory, outputLimit);
case INTERNAL:
DataSourceId id = getId();
@@ -163,7 +163,7 @@ public class DatasetDataSource extends DataSource {
int[] minFilterFieldIndexes =
createFilterIndexes(minFilterVars, opSchema);
int[] maxFilterFieldIndexes =
createFilterIndexes(maxFilterVars, opSchema);
- return metadataProvider.buildBtreeRuntime(jobSpec, opSchema,
typeEnv, context, true, false, null,
+ return metadataProvider.getBtreeSearchRuntime(jobSpec,
opSchema, typeEnv, context, true, false, null,
((DatasetDataSource) dataSource).getDataset(),
primaryIndex.getIndexName(), null, null, true,
true, false, null, minFilterFieldIndexes,
maxFilterFieldIndexes, tupleFilterFactory,
outputLimit, false, false, tupleProjectorFactory,
false);
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
index 58377f449c..9f7d567410 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/FunctionDataSource.java
@@ -115,8 +115,8 @@ public abstract class FunctionDataSource extends DataSource
{
dataParserFactory.setRecordType(RecordUtil.FULLY_OPEN_RECORD_TYPE);
dataParserFactory.configure(Collections.emptyMap());
adapterFactory.configure(factory, dataParserFactory);
- return
metadataProvider.buildExternalDatasetDataScannerRuntime(jobSpec, itemType,
adapterFactory,
- tupleFilterFactory, outputLimit);
+ return metadataProvider.getExternalDatasetScanRuntime(jobSpec,
itemType, adapterFactory, tupleFilterFactory,
+ outputLimit);
}
protected abstract IDatasourceFunction createFunction(MetadataProvider
metadataProvider,
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
index c7ccc531d9..fc65c4b7af 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/LoadableDataSource.java
@@ -147,7 +147,7 @@ public class LoadableDataSource extends DataSource {
ITypedAdapterFactory adapterFactory =
metadataProvider.getConfiguredAdapterFactory(alds.getTargetDataset(),
alds.getAdapter(), alds.getAdapterProperties(), itemType,
null, context.getWarningCollector());
RecordDescriptor rDesc = JobGenHelper.mkRecordDescriptor(typeEnv,
opSchema, context);
- return metadataProvider.buildLoadableDatasetScan(jobSpec,
adapterFactory, rDesc);
+ return metadataProvider.getLoadableDatasetScanRuntime(jobSpec,
adapterFactory, rDesc);
}
@Override
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index debf60efb4..486238959f 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -188,7 +188,6 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
private ResultSetId resultSetId;
private Counter resultSetIdCounter;
private TxnId txnId;
- private Map<String, Integer> externalDataLocks;
private boolean blockingOperatorDisabled = false;
public static MetadataProvider create(ICcApplicationContext appCtx,
Dataverse defaultDataverse) {
@@ -326,14 +325,6 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
return storageProperties;
}
- public Map<String, Integer> getExternalDataLocks() {
- return externalDataLocks;
- }
-
- public void setExternalDataLocks(Map<String, Integer> locks) {
- this.externalDataLocks = locks;
- }
-
private DataverseName getActiveDataverseName(DataverseName dataverseName) {
return dataverseName != null ? dataverseName
: defaultDataverse != null ?
defaultDataverse.getDataverseName() : null;
@@ -496,7 +487,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
context, jobSpec, implConfig, projectionInfo,
metaProjectionInfo);
}
- protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
buildLoadableDatasetScan(
+ protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getLoadableDatasetScanRuntime(
JobSpecification jobSpec, ITypedAdapterFactory adapterFactory,
RecordDescriptor rDesc)
throws AlgebricksException {
ExternalScanOperatorDescriptor dataScanner = new
ExternalScanOperatorDescriptor(jobSpec, rDesc, adapterFactory);
@@ -511,7 +502,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
return MetadataManager.INSTANCE.getDataverse(mdTxnCtx, dataverseName);
}
- public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint,
ITypedAdapterFactory> buildFeedIntakeRuntime(
+ public Triple<IOperatorDescriptor, AlgebricksPartitionConstraint,
ITypedAdapterFactory> getFeedIntakeRuntime(
JobSpecification jobSpec, Feed feed, FeedPolicyAccessor
policyAccessor) throws Exception {
Triple<ITypedAdapterFactory, RecordDescriptor,
IDataSourceAdapter.AdapterType> factoryOutput;
factoryOutput =
@@ -539,7 +530,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
return new Triple<>(feedIngestor, partitionConstraint, adapterFactory);
}
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
buildBtreeRuntime(JobSpecification jobSpec,
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getBtreeSearchRuntime(JobSpecification jobSpec,
IOperatorSchema opSchema, IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput,
boolean retainMissing, IMissingWriterFactory
nonMatchWriterFactory, Dataset dataset, String indexName,
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
boolean highKeyInclusive,
@@ -633,23 +624,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
return new Pair<>(btreeSearchOp, spPc.second);
}
- public static int getNumPartitions(AlgebricksPartitionConstraint
constraint) {
- if (constraint.getPartitionConstraintType() ==
AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
- return ((AlgebricksCountPartitionConstraint)
constraint).getCount();
- } else {
- return ((AlgebricksAbsolutePartitionConstraint)
constraint).getLocations().length;
- }
- }
-
- public static int[][] getPartitionsMap(int numPartitions) {
- int[][] map = new int[numPartitions][1];
- for (int i = 0; i < numPartitions; i++) {
- map[i] = new int[] { i };
- }
- return map;
- }
-
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
buildRtreeRuntime(JobSpecification jobSpec,
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getRtreeSearchRuntime(JobSpecification jobSpec,
List<LogicalVariable> outputVars, IOperatorSchema opSchema,
IVariableTypeEnvironment typeEnv,
JobGenContext context, boolean retainInput, boolean retainMissing,
IMissingWriterFactory nonMatchWriterFactory, Dataset dataset,
String indexName, int[] keyFields,
@@ -808,6 +783,53 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
additionalNonKeyFields, inputRecordDesc, context, spec, false,
additionalNonFilteringFields);
}
+ @Override
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getUpsertRuntime(
+ IDataSource<DataSourceId> dataSource, IOperatorSchema inputSchema,
IVariableTypeEnvironment typeEnv,
+ List<LogicalVariable> primaryKeys, LogicalVariable payload,
List<LogicalVariable> filterKeys,
+ List<LogicalVariable> additionalNonFilterFields, RecordDescriptor
recordDesc, JobGenContext context,
+ JobSpecification spec) throws AlgebricksException {
+ DataverseName dataverseName = dataSource.getId().getDataverseName();
+ String datasetName = dataSource.getId().getDatasourceName();
+ Dataset dataset = findDataset(dataverseName, datasetName);
+ if (dataset == null) {
+ throw new AsterixException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE,
datasetName, dataverseName);
+ }
+ int numKeys = primaryKeys.size();
+ int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0
: 1;
+ int numOfAdditionalFields = additionalNonFilterFields == null ? 0 :
additionalNonFilterFields.size();
+ // Move key fields to front. [keys, record, filters]
+ int[] fieldPermutation = new int[numKeys + 1 + numFilterFields +
numOfAdditionalFields];
+ int[] bloomFilterKeyFields = new int[numKeys];
+ int i = 0;
+ // set the keys' permutations
+ for (LogicalVariable varKey : primaryKeys) {
+ int idx = inputSchema.findVariable(varKey);
+ fieldPermutation[i] = idx;
+ bloomFilterKeyFields[i] = i;
+ i++;
+ }
+ // set the record permutation
+ fieldPermutation[i++] = inputSchema.findVariable(payload);
+
+ // set the meta record permutation
+ if (additionalNonFilterFields != null) {
+ for (LogicalVariable var : additionalNonFilterFields) {
+ int idx = inputSchema.findVariable(var);
+ fieldPermutation[i++] = idx;
+ }
+ }
+
+ // set the filters' permutations.
+ if (numFilterFields > 0) {
+ int idx = inputSchema.findVariable(filterKeys.get(0));
+ fieldPermutation[i++] = idx;
+ }
+
+ return createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc,
fieldPermutation,
+ context.getMissingWriterFactory());
+ }
+
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getIndexInsertRuntime(
IDataSourceIndex<String, DataSourceId> dataSourceIndex,
IOperatorSchema propagatedSchema,
@@ -816,9 +838,9 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec,
boolean bulkload, List<List<AlgebricksPipeline>>
secondaryKeysPipelines, IOperatorSchema pipelineTopSchema)
throws AlgebricksException {
- return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.INSERT,
dataSourceIndex, propagatedSchema,
- inputSchemas, typeEnv, primaryKeys, secondaryKeys,
additionalNonKeyFields, filterExpr, null, recordDesc,
- context, spec, bulkload, null, null, null,
secondaryKeysPipelines, pipelineTopSchema);
+ return getIndexModificationRuntime(IndexOperation.INSERT,
dataSourceIndex, propagatedSchema, inputSchemas,
+ typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields,
filterExpr, null, recordDesc, context,
+ spec, bulkload, null, null, null, secondaryKeysPipelines,
pipelineTopSchema);
}
@Override
@@ -829,9 +851,9 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
ILogicalExpression filterExpr, RecordDescriptor recordDesc,
JobGenContext context, JobSpecification spec,
List<List<AlgebricksPipeline>> secondaryKeysPipelines,
IOperatorSchema pipelineTopSchema)
throws AlgebricksException {
- return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.DELETE,
dataSourceIndex, propagatedSchema,
- inputSchemas, typeEnv, primaryKeys, secondaryKeys,
additionalNonKeyFields, filterExpr, null, recordDesc,
- context, spec, false, null, null, null,
secondaryKeysPipelines, pipelineTopSchema);
+ return getIndexModificationRuntime(IndexOperation.DELETE,
dataSourceIndex, propagatedSchema, inputSchemas,
+ typeEnv, primaryKeys, secondaryKeys, additionalNonKeyFields,
filterExpr, null, recordDesc, context,
+ spec, false, null, null, null, secondaryKeysPipelines,
pipelineTopSchema);
}
@Override
@@ -843,9 +865,9 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
List<LogicalVariable> prevSecondaryKeys, LogicalVariable
prevAdditionalFilteringKey,
RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec,
List<List<AlgebricksPipeline>> secondaryKeysPipelines) throws
AlgebricksException {
- return getIndexInsertOrDeleteOrUpsertRuntime(IndexOperation.UPSERT,
dataSourceIndex, propagatedSchema,
- inputSchemas, typeEnv, primaryKeys, secondaryKeys,
additionalFilteringKeys, filterExpr, prevFilterExpr,
- recordDesc, context, spec, false, operationVar,
prevSecondaryKeys, prevAdditionalFilteringKey,
+ return getIndexModificationRuntime(IndexOperation.UPSERT,
dataSourceIndex, propagatedSchema, inputSchemas,
+ typeEnv, primaryKeys, secondaryKeys, additionalFilteringKeys,
filterExpr, prevFilterExpr, recordDesc,
+ context, spec, false, operationVar, prevSecondaryKeys,
prevAdditionalFilteringKey,
secondaryKeysPipelines, null);
}
@@ -969,53 +991,6 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
return null;
}
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getUpsertRuntime(
- IDataSource<DataSourceId> dataSource, IOperatorSchema inputSchema,
IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys, LogicalVariable payload,
List<LogicalVariable> filterKeys,
- List<LogicalVariable> additionalNonFilterFields, RecordDescriptor
recordDesc, JobGenContext context,
- JobSpecification spec) throws AlgebricksException {
- DataverseName dataverseName = dataSource.getId().getDataverseName();
- String datasetName = dataSource.getId().getDatasourceName();
- Dataset dataset = findDataset(dataverseName, datasetName);
- if (dataset == null) {
- throw new AsterixException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE,
datasetName, dataverseName);
- }
- int numKeys = primaryKeys.size();
- int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0
: 1;
- int numOfAdditionalFields = additionalNonFilterFields == null ? 0 :
additionalNonFilterFields.size();
- // Move key fields to front. [keys, record, filters]
- int[] fieldPermutation = new int[numKeys + 1 + numFilterFields +
numOfAdditionalFields];
- int[] bloomFilterKeyFields = new int[numKeys];
- int i = 0;
- // set the keys' permutations
- for (LogicalVariable varKey : primaryKeys) {
- int idx = inputSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- bloomFilterKeyFields[i] = i;
- i++;
- }
- // set the record permutation
- fieldPermutation[i++] = inputSchema.findVariable(payload);
-
- // set the meta record permutation
- if (additionalNonFilterFields != null) {
- for (LogicalVariable var : additionalNonFilterFields) {
- int idx = inputSchema.findVariable(var);
- fieldPermutation[i++] = idx;
- }
- }
-
- // set the filters' permutations.
- if (numFilterFields > 0) {
- int idx = inputSchema.findVariable(filterKeys.get(0));
- fieldPermutation[i++] = idx;
- }
-
- return createPrimaryIndexUpsertOp(spec, this, dataset, recordDesc,
fieldPermutation,
- context.getMissingWriterFactory());
- }
-
protected Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
createPrimaryIndexUpsertOp(JobSpecification spec,
MetadataProvider metadataProvider, Dataset dataset,
RecordDescriptor inputRecordDesc,
int[] fieldPermutation, IMissingWriterFactory
missingWriterFactory) throws AlgebricksException {
@@ -1024,7 +999,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
missingWriterFactory);
}
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
buildExternalDatasetDataScannerRuntime(
+ public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getExternalDatasetScanRuntime(
JobSpecification jobSpec, IAType itemType, ITypedAdapterFactory
adapterFactory,
ITupleFilterFactory tupleFilterFactory, long outputLimit) throws
AlgebricksException {
if (itemType.getTypeTag() != ATypeTag.OBJECT) {
@@ -1162,13 +1137,12 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
tupleFilterFactory, isPrimary, modCallbackFactory,
tuplePartitionerFactory, partitionsMap);
}
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getIndexInsertOrDeleteOrUpsertRuntime(
- IndexOperation indexOp, IDataSourceIndex<String, DataSourceId>
dataSourceIndex,
- IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas,
IVariableTypeEnvironment typeEnv,
- List<LogicalVariable> primaryKeys, List<LogicalVariable>
secondaryKeys,
- List<LogicalVariable> additionalNonKeyFields, ILogicalExpression
filterExpr,
- ILogicalExpression prevFilterExpr, RecordDescriptor
inputRecordDesc, JobGenContext context,
- JobSpecification spec, boolean bulkload, LogicalVariable
operationVar,
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getIndexModificationRuntime(IndexOperation indexOp,
+ IDataSourceIndex<String, DataSourceId> dataSourceIndex,
IOperatorSchema propagatedSchema,
+ IOperatorSchema[] inputSchemas, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> primaryKeys,
+ List<LogicalVariable> secondaryKeys, List<LogicalVariable>
additionalNonKeyFields,
+ ILogicalExpression filterExpr, ILogicalExpression prevFilterExpr,
RecordDescriptor inputRecordDesc,
+ JobGenContext context, JobSpecification spec, boolean bulkload,
LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys, LogicalVariable
prevAdditionalFilteringKey,
List<List<AlgebricksPipeline>> secondaryKeysPipelines,
IOperatorSchema pipelineTopSchema)
throws AlgebricksException {
@@ -1202,33 +1176,33 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
switch (secondaryIndex.getIndexType()) {
case BTREE:
- return getBTreeRuntime(dataverseName, datasetName, indexName,
propagatedSchema, primaryKeys,
+ return getBTreeModificationRuntime(dataverseName, datasetName,
indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory,
prevFilterFactory, inputRecordDesc,
context, spec, indexOp, bulkload, operationVar,
prevSecondaryKeys, prevAdditionalFilteringKeys);
case ARRAY:
if (bulkload) {
// In the case of bulk-load, we do not handle any nested
plans. We perform the exact same behavior
// as a normal B-Tree bulk load.
- return getBTreeRuntime(dataverseName, datasetName,
indexName, propagatedSchema, primaryKeys,
- secondaryKeys, additionalNonKeyFields,
filterFactory, prevFilterFactory, inputRecordDesc,
- context, spec, indexOp, bulkload, operationVar,
prevSecondaryKeys,
+ return getBTreeModificationRuntime(dataverseName,
datasetName, indexName, propagatedSchema,
+ primaryKeys, secondaryKeys,
additionalNonKeyFields, filterFactory, prevFilterFactory,
+ inputRecordDesc, context, spec, indexOp, bulkload,
operationVar, prevSecondaryKeys,
prevAdditionalFilteringKeys);
} else {
- return getArrayIndexRuntime(dataverseName, datasetName,
indexName, propagatedSchema, primaryKeys,
- additionalNonKeyFields, inputRecordDesc, spec,
indexOp, operationVar,
+ return getArrayIndexModificationRuntime(dataverseName,
datasetName, indexName, propagatedSchema,
+ primaryKeys, additionalNonKeyFields,
inputRecordDesc, spec, indexOp, operationVar,
secondaryKeysPipelines);
}
case RTREE:
- return getRTreeRuntime(dataverseName, datasetName, indexName,
propagatedSchema, primaryKeys,
+ return getRTreeModificationRuntime(dataverseName, datasetName,
indexName, propagatedSchema, primaryKeys,
secondaryKeys, additionalNonKeyFields, filterFactory,
prevFilterFactory, inputRecordDesc,
context, spec, indexOp, bulkload, operationVar,
prevSecondaryKeys, prevAdditionalFilteringKeys);
case SINGLE_PARTITION_WORD_INVIX:
case SINGLE_PARTITION_NGRAM_INVIX:
case LENGTH_PARTITIONED_WORD_INVIX:
case LENGTH_PARTITIONED_NGRAM_INVIX:
- return getInvertedIndexRuntime(dataverseName, datasetName,
indexName, propagatedSchema, primaryKeys,
- secondaryKeys, additionalNonKeyFields, filterFactory,
prevFilterFactory, inputRecordDesc,
- context, spec, indexOp, secondaryIndex.getIndexType(),
bulkload, operationVar,
+ return getInvertedIndexModificationRuntime(dataverseName,
datasetName, indexName, propagatedSchema,
+ primaryKeys, secondaryKeys, additionalNonKeyFields,
filterFactory, prevFilterFactory,
+ inputRecordDesc, context, spec, indexOp,
secondaryIndex.getIndexType(), bulkload, operationVar,
prevSecondaryKeys, prevAdditionalFilteringKeys);
default:
throw new AlgebricksException(
@@ -1236,13 +1210,14 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
}
}
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getBTreeRuntime(DataverseName dataverseName,
- String datasetName, String indexName, IOperatorSchema
propagatedSchema, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, List<LogicalVariable>
additionalNonKeyFields,
- AsterixTupleFilterFactory filterFactory, AsterixTupleFilterFactory
prevFilterFactory,
- RecordDescriptor inputRecordDesc, JobGenContext context,
JobSpecification spec, IndexOperation indexOp,
- boolean bulkload, LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys,
- List<LogicalVariable> prevAdditionalFilteringKeys) throws
AlgebricksException {
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getBTreeModificationRuntime(
+ DataverseName dataverseName, String datasetName, String indexName,
IOperatorSchema propagatedSchema,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable>
secondaryKeys,
+ List<LogicalVariable> additionalNonKeyFields,
AsterixTupleFilterFactory filterFactory,
+ AsterixTupleFilterFactory prevFilterFactory, RecordDescriptor
inputRecordDesc, JobGenContext context,
+ JobSpecification spec, IndexOperation indexOp, boolean bulkload,
LogicalVariable operationVar,
+ List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable>
prevAdditionalFilteringKeys)
+ throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx,
dataverseName, datasetName);
int numKeys = primaryKeys.size() + secondaryKeys.size();
int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0
: 1;
@@ -1332,10 +1307,11 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
}
}
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getArrayIndexRuntime(DataverseName dataverseName,
- String datasetName, String indexName, IOperatorSchema
propagatedSchema, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> additionalNonKeyFields, RecordDescriptor
inputRecordDesc, JobSpecification spec,
- IndexOperation indexOp, LogicalVariable operationVar,
List<List<AlgebricksPipeline>> secondaryKeysPipelines)
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getArrayIndexModificationRuntime(
+ DataverseName dataverseName, String datasetName, String indexName,
IOperatorSchema propagatedSchema,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable>
additionalNonKeyFields,
+ RecordDescriptor inputRecordDesc, JobSpecification spec,
IndexOperation indexOp,
+ LogicalVariable operationVar, List<List<AlgebricksPipeline>>
secondaryKeysPipelines)
throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx,
dataverseName, datasetName);
@@ -1397,13 +1373,14 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
}
}
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getRTreeRuntime(DataverseName dataverseName,
- String datasetName, String indexName, IOperatorSchema
propagatedSchema, List<LogicalVariable> primaryKeys,
- List<LogicalVariable> secondaryKeys, List<LogicalVariable>
additionalNonKeyFields,
- AsterixTupleFilterFactory filterFactory, AsterixTupleFilterFactory
prevFilterFactory,
- RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec, IndexOperation indexOp,
- boolean bulkload, LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys,
- List<LogicalVariable> prevAdditionalFilteringKeys) throws
AlgebricksException {
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getRTreeModificationRuntime(
+ DataverseName dataverseName, String datasetName, String indexName,
IOperatorSchema propagatedSchema,
+ List<LogicalVariable> primaryKeys, List<LogicalVariable>
secondaryKeys,
+ List<LogicalVariable> additionalNonKeyFields,
AsterixTupleFilterFactory filterFactory,
+ AsterixTupleFilterFactory prevFilterFactory, RecordDescriptor
recordDesc, JobGenContext context,
+ JobSpecification spec, IndexOperation indexOp, boolean bulkload,
LogicalVariable operationVar,
+ List<LogicalVariable> prevSecondaryKeys, List<LogicalVariable>
prevAdditionalFilteringKeys)
+ throws AlgebricksException {
Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx,
dataverseName, datasetName);
String itemTypeName = dataset.getItemTypeName();
IAType itemType = MetadataManager.INSTANCE
@@ -1506,7 +1483,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
return new Pair<>(op, splitsAndConstraint.second);
}
- private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getInvertedIndexRuntime(
+ private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getInvertedIndexModificationRuntime(
DataverseName dataverseName, String datasetName, String indexName,
IOperatorSchema propagatedSchema,
List<LogicalVariable> primaryKeys, List<LogicalVariable>
secondaryKeys,
List<LogicalVariable> additionalNonKeyFields,
AsterixTupleFilterFactory filterFactory,
@@ -1905,6 +1882,22 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
validateDatabaseObjectNameImpl(objectName, sourceLoc);
}
+ public static int getNumPartitions(AlgebricksPartitionConstraint
constraint) {
+ if (constraint.getPartitionConstraintType() ==
AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
+ return ((AlgebricksCountPartitionConstraint)
constraint).getCount();
+ } else {
+ return ((AlgebricksAbsolutePartitionConstraint)
constraint).getLocations().length;
+ }
+ }
+
+ public static int[][] getPartitionsMap(int numPartitions) {
+ int[][] map = new int[numPartitions][1];
+ for (int i = 0; i < numPartitions; i++) {
+ map[i] = new int[] { i };
+ }
+ return map;
+ }
+
private void validateDatabaseObjectNameImpl(String name, SourceLocation
sourceLoc) throws AlgebricksException {
if (name == null || name.isEmpty()) {
throw new AsterixException(ErrorCode.INVALID_DATABASE_OBJECT_NAME,
sourceLoc, "");
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
index 708c2c2047..448f5ce522 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
@@ -62,7 +62,7 @@ public class SampleDataSource extends DataSource {
IVariableTypeEnvironment typeEnv, JobGenContext context,
JobSpecification jobSpec, Object implConfig,
IProjectionFiltrationInfo<?> projectionInfo,
IProjectionFiltrationInfo<?> metaProjectionInfo)
throws AlgebricksException {
- return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv,
context, true, false, null, dataset,
+ return metadataProvider.getBtreeSearchRuntime(jobSpec, opSchema,
typeEnv, context, true, false, null, dataset,
sampleIndexName, null, null, true, true, false, null, null,
null, tupleFilterFactory, outputLimit,
false, false, DefaultTupleProjectorFactory.INSTANCE, false);
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 4596393b25..4a6e76e16b 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -43,7 +43,6 @@ import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
public interface IMetadataProvider<S, I> {
- IDataSource<S> findDataSource(S id) throws AlgebricksException;
/**
* Obs: A scanner may choose to contribute a null
@@ -78,6 +77,12 @@ public interface IMetadataProvider<S, I> {
List<LogicalVariable> additionalNonFilteringFields,
RecordDescriptor inputRecordDesc, JobGenContext context,
JobSpecification jobSpec, boolean bulkload) throws
AlgebricksException;
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getUpsertRuntime(IDataSource<S> dataSource,
+ IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> keys,
+ LogicalVariable payLoadVar, List<LogicalVariable>
additionalFilterFields,
+ List<LogicalVariable> additionalNonFilteringFields,
RecordDescriptor recordDesc, JobGenContext context,
+ JobSpecification jobSpec) throws AlgebricksException;
+
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getDeleteRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, IVariableTypeEnvironment
typeEnv, List<LogicalVariable> keys,
LogicalVariable payLoadVar, List<LogicalVariable>
additionalNonKeyFields,
@@ -115,6 +120,14 @@ public interface IMetadataProvider<S, I> {
List<List<AlgebricksPipeline>> secondaryKeysPipelines,
IOperatorSchema pipelineTopSchema)
throws AlgebricksException;
+ Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getIndexUpsertRuntime(
+ IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema
propagatedSchema, IOperatorSchema[] inputSchemas,
+ IVariableTypeEnvironment typeEnv, List<LogicalVariable>
primaryKeys, List<LogicalVariable> secondaryKeys,
+ List<LogicalVariable> additionalFilteringKeys, ILogicalExpression
filterExpr,
+ ILogicalExpression prevFilterExpr, LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys,
+ LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor
inputDesc, JobGenContext context,
+ JobSpecification spec, List<List<AlgebricksPipeline>>
secondaryKeysPipelines) throws AlgebricksException;
+
/**
* Creates the delete runtime of IndexInsertDeletePOperator, which models
* insert/delete operations into a secondary index.
@@ -170,24 +183,12 @@ public interface IMetadataProvider<S, I> {
RecordDescriptor recordDesc, JobGenContext context,
JobSpecification spec, boolean bulkload)
throws AlgebricksException;
+ IDataSource<S> findDataSource(S id) throws AlgebricksException;
+
IDataSourceIndex<I, S> findDataSourceIndex(I indexId, S dataSourceId)
throws AlgebricksException;
IFunctionInfo lookupFunction(FunctionIdentifier fid);
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getUpsertRuntime(IDataSource<S> dataSource,
- IOperatorSchema inputSchema, IVariableTypeEnvironment typeEnv,
List<LogicalVariable> keys,
- LogicalVariable payLoadVar, List<LogicalVariable>
additionalFilterFields,
- List<LogicalVariable> additionalNonFilteringFields,
RecordDescriptor recordDesc, JobGenContext context,
- JobSpecification jobSpec) throws AlgebricksException;
-
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getIndexUpsertRuntime(
- IDataSourceIndex<I, S> dataSourceIndex, IOperatorSchema
propagatedSchema, IOperatorSchema[] inputSchemas,
- IVariableTypeEnvironment typeEnv, List<LogicalVariable>
primaryKeys, List<LogicalVariable> secondaryKeys,
- List<LogicalVariable> additionalFilteringKeys, ILogicalExpression
filterExpr,
- ILogicalExpression prevFilterExpr, LogicalVariable operationVar,
List<LogicalVariable> prevSecondaryKeys,
- LogicalVariable prevAdditionalFilteringKeys, RecordDescriptor
inputDesc, JobGenContext context,
- JobSpecification spec, List<List<AlgebricksPipeline>>
secondaryKeysPipelines) throws AlgebricksException;
-
ITupleFilterFactory createTupleFilterFactory(IOperatorSchema[]
inputSchemas, IVariableTypeEnvironment typeEnv,
ILogicalExpression filterExpr, JobGenContext context) throws
AlgebricksException;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 1c961c5c68..c46391f5c6 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -57,7 +57,7 @@ public class BTreeSearchOperatorDescriptor extends
AbstractSingleActivityOperato
protected final long outputLimit;
protected final ITupleProjectorFactory tupleProjectorFactory;
protected final ITuplePartitionerFactory tuplePartitionerFactory;
- protected final int[][] map;
+ protected final int[][] partitionsMap;
public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec,
RecordDescriptor outRecDesc,
int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive,
boolean highKeyInclusive,
@@ -79,7 +79,7 @@ public class BTreeSearchOperatorDescriptor extends
AbstractSingleActivityOperato
IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory
tupleFilterFactory, long outputLimit,
boolean appendOpCallbackProceedResult, byte[]
searchCallbackProceedResultFalseValue,
byte[] searchCallbackProceedResultTrueValue,
ITupleProjectorFactory tupleProjectorFactory,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) {
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][]
partitionsMap) {
super(spec, 1, 1);
this.indexHelperFactory = indexHelperFactory;
this.retainInput = retainInput;
@@ -102,7 +102,7 @@ public class BTreeSearchOperatorDescriptor extends
AbstractSingleActivityOperato
this.searchCallbackProceedResultTrueValue =
searchCallbackProceedResultTrueValue;
this.tupleProjectorFactory = tupleProjectorFactory;
this.tuplePartitionerFactory = tuplePartitionerFactory;
- this.map = map;
+ this.partitionsMap = partitionsMap;
}
@Override
@@ -114,7 +114,7 @@ public class BTreeSearchOperatorDescriptor extends
AbstractSingleActivityOperato
retainInput, retainMissing, missingWriterFactory,
searchCallbackFactory, appendIndexFilter,
nonFilterWriterFactory, tupleFilterFactory, outputLimit,
appendOpCallbackProceedResult,
searchCallbackProceedResultFalseValue,
searchCallbackProceedResultTrueValue, tupleProjectorFactory,
- tuplePartitionerFactory, map);
+ tuplePartitionerFactory, partitionsMap);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index 24163ea9d9..3fd0cf96a2 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -68,12 +68,12 @@ public class BTreeSearchOperatorNodePushable extends
IndexSearchOperatorNodePush
IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory
tupleFilterFactory, long outputLimit,
boolean appendOpCallbackProceedResult, byte[]
searchCallbackProceedResultFalseValue,
byte[] searchCallbackProceedResultTrueValue,
ITupleProjectorFactory projectorFactory,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][] map)
throws HyracksDataException {
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][]
partitionsMap) throws HyracksDataException {
super(ctx, inputRecDesc, partition, minFilterFieldIndexes,
maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, nonMatchWriterFactory,
searchCallbackFactory, appendIndexFilter,
nonFilterWriterFactory, tupleFilterFactory, outputLimit,
appendOpCallbackProceedResult,
searchCallbackProceedResultFalseValue,
searchCallbackProceedResultTrueValue, projectorFactory,
- tuplePartitionerFactory, map);
+ tuplePartitionerFactory, partitionsMap);
this.lowKeyInclusive = lowKeyInclusive;
this.highKeyInclusive = highKeyInclusive;
if (lowKeyFields != null && lowKeyFields.length > 0) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index c6cdd663b1..88c06eb91a 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -128,10 +128,10 @@ public abstract class IndexSearchOperatorNodePushable
extends AbstractUnaryInput
IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory
tupleFilterFactory, long outputLimit,
boolean appendSearchCallbackProceedResult, byte[]
searchCallbackProceedResultFalseValue,
byte[] searchCallbackProceedResultTrueValue,
ITupleProjectorFactory projectorFactory,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][] map)
throws HyracksDataException {
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][]
partitionsMap) throws HyracksDataException {
this.ctx = ctx;
this.appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
- this.partitions = map != null ? map[partition] : new int[] { partition
};
+ this.partitions = partitionsMap != null ? partitionsMap[partition] :
new int[] { partition };
for (int i = 0; i < partitions.length; i++) {
storagePartitionId2Index.put(partitions[i], i);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
index 9ed0782a89..4d8f1fe7af 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
@@ -41,11 +41,11 @@ public class LSMBTreeBatchPointSearchOperatorDescriptor
extends BTreeSearchOpera
IMissingWriterFactory missingWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes,
ITupleFilterFactory tupleFilterFactory,
long outputLimit, ITupleProjectorFactory tupleProjectorFactory,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) {
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][]
partitionsMap) {
super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive,
highKeyInclusive, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory,
searchCallbackFactory, minFilterFieldIndexes,
maxFilterFieldIndexes, false, null, tupleFilterFactory,
outputLimit, false, null, null,
- tupleProjectorFactory, tuplePartitionerFactory, map);
+ tupleProjectorFactory, tuplePartitionerFactory, partitionsMap);
}
@Override
@@ -55,7 +55,7 @@ public class LSMBTreeBatchPointSearchOperatorDescriptor
extends BTreeSearchOpera
recordDescProvider.getInputRecordDescriptor(getActivityId(),
0), lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes,
maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory,
searchCallbackFactory, tupleFilterFactory,
- outputLimit, tupleProjectorFactory, tuplePartitionerFactory,
map);
+ outputLimit, tupleProjectorFactory, tuplePartitionerFactory,
partitionsMap);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index 47d515a3e3..8c8a550627 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -54,11 +54,11 @@ public class LSMBTreeBatchPointSearchOperatorNodePushable
extends BTreeSearchOpe
IIndexDataflowHelperFactory indexHelperFactory, boolean
retainInput, boolean retainMissing,
IMissingWriterFactory missingWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory,
ITupleFilterFactory tupleFilterFactory, long outputLimit,
ITupleProjectorFactory tupleProjectorFactory,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][] map)
throws HyracksDataException {
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][]
partitionsMap) throws HyracksDataException {
super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields,
lowKeyInclusive, highKeyInclusive,
minFilterKeyFields, maxFilterKeyFields, indexHelperFactory,
retainInput, retainMissing,
missingWriterFactory, searchCallbackFactory, false, null,
tupleFilterFactory, outputLimit, false, null,
- null, tupleProjectorFactory, tuplePartitionerFactory, map);
+ null, tupleProjectorFactory, tuplePartitionerFactory,
partitionsMap);
this.keyFields = lowKeyFields;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
index b5b951da0e..fcdf792c23 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorDescriptor.java
@@ -35,7 +35,7 @@ import
org.apache.hyracks.storage.am.lsm.invertedindex.fulltext.IFullTextConfigE
import
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
public class LSMInvertedIndexSearchOperatorDescriptor extends
AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final int queryField;
private final IInvertedIndexSearchModifierFactory searchModifierFactory;
@@ -54,6 +54,7 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends
AbstractSingleActi
private final int numOfFields;
// the maximum number of frames that this inverted-index-search can use
private final int frameLimit;
+ private final int[][] partitionsMap;
public
LSMInvertedIndexSearchOperatorDescriptor(IOperatorDescriptorRegistry spec,
RecordDescriptor outRecDesc,
int queryField, IIndexDataflowHelperFactory indexHelperFactory,
@@ -62,7 +63,8 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends
AbstractSingleActi
IInvertedIndexSearchModifierFactory searchModifierFactory, boolean
retainInput, boolean retainMissing,
IMissingWriterFactory missingWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory,
int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean
isFullTextSearchQuery, int numOfFields,
- boolean appendIndexFilter, IMissingWriterFactory
nonFilterWriterFactory, int frameLimit) {
+ boolean appendIndexFilter, IMissingWriterFactory
nonFilterWriterFactory, int frameLimit,
+ int[][] partitionsMap) {
super(spec, 1, 1);
this.indexHelperFactory = indexHelperFactory;
this.queryTokenizerFactory = queryTokenizerFactory;
@@ -79,6 +81,7 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends
AbstractSingleActi
this.appendIndexFilter = appendIndexFilter;
this.nonFilterWriterFactory = nonFilterWriterFactory;
this.numOfFields = numOfFields;
+ this.partitionsMap = partitionsMap;
this.outRecDescs[0] = outRecDesc;
this.frameLimit = frameLimit;
}
@@ -91,6 +94,7 @@ public class LSMInvertedIndexSearchOperatorDescriptor extends
AbstractSingleActi
recordDescProvider.getInputRecordDescriptor(getActivityId(),
0), partition, minFilterFieldIndexes,
maxFilterFieldIndexes, indexHelperFactory, retainInput,
retainMissing, missingWriterFactory,
searchCallbackFactory, searchModifier, queryTokenizerFactory,
fullTextConfigEvaluatorFactory,
- queryField, isFullTextSearchQuery, numOfFields,
appendIndexFilter, nonFilterWriterFactory, frameLimit);
+ queryField, isFullTextSearchQuery, numOfFields,
appendIndexFilter, nonFilterWriterFactory, frameLimit,
+ partitionsMap);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
index 996241daef..742a86c14a 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
@@ -65,10 +65,12 @@ public class LSMInvertedIndexSearchOperatorNodePushable
extends IndexSearchOpera
IInvertedIndexSearchModifier searchModifier,
IBinaryTokenizerFactory binaryTokenizerFactory,
IFullTextConfigEvaluatorFactory fullTextConfigEvaluatorFactory,
int queryFieldIndex,
boolean isFullTextSearchQuery, int numOfFields, boolean
appendIndexFilter,
- IMissingWriterFactory nonFilterWriterFactory, int frameLimit)
throws HyracksDataException {
+ IMissingWriterFactory nonFilterWriterFactory, int frameLimit,
int[][] partitionsMap)
+ throws HyracksDataException {
super(ctx, inputRecDesc, partition, minFilterFieldIndexes,
maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory,
searchCallbackFactory, appendIndexFilter,
- nonFilterWriterFactory, null, -1, false, null, null,
DefaultTupleProjectorFactory.INSTANCE, null, null);
+ nonFilterWriterFactory, null, -1, false, null, null,
DefaultTupleProjectorFactory.INSTANCE, null,
+ partitionsMap);
this.searchModifier = searchModifier;
this.binaryTokenizerFactory = binaryTokenizerFactory;
this.fullTextConfigEvaluatorFactory = fullTextConfigEvaluatorFactory;