This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c4e589ea3 [ASTERIXDB-3144][HYR] Make index search runtime support 
multiple partitions
3c4e589ea3 is described below

commit 3c4e589ea381bbaaaba448b030867aa6c8cc03af
Author: Ali Alsuliman <[email protected]>
AuthorDate: Sun Mar 26 02:36:45 2023 -0700

    [ASTERIXDB-3144][HYR] Make index search runtime support multiple partitions
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    This patch changes the index search runtime to support
    operating on multiple partitions. With this change, an index
    search node pushable will read from multiple indexes
    representing multiple partitions. This is a step towards
    achieving compute/storage separation.
    
    Change-Id: Iea8418bdfbca2db9cc5f0aa23c2434f3779e8531
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17444
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Murtadha Al Hubail <[email protected]>
---
 .../operators/physical/BTreeSearchPOperator.java   |  28 ++--
 .../asterix/app/function/QueryIndexDatasource.java |   2 +-
 .../metadata/declared/DatasetDataSource.java       |   2 +-
 .../metadata/declared/MetadataProvider.java        |  34 +++-
 .../metadata/declared/SampleDataSource.java        |   2 +-
 .../apache/asterix/metadata/utils/DatasetUtil.java |   8 +-
 .../api/dataflow/value/ITuplePartitioner.java      |  36 +++++
 .../dataflow/value/ITuplePartitionerFactory.java   |  29 ++++
 .../data/partition/FieldHashPartitionComputer.java |  36 +++++
 .../FieldHashPartitionComputerFactory.java         |  29 +---
 .../data/partition/FieldHashPartitioner.java       |  39 +++++
 .../partition/FieldHashPartitionerFactory.java     |  49 ++++++
 .../common/data/partition/HashPartitioner.java     |  55 +++++++
 .../dataflow/BTreeSearchOperatorDescriptor.java    |  15 +-
 .../dataflow/BTreeSearchOperatorNodePushable.java  |  15 +-
 .../storage/am/btree/test/FramewriterTest.java     |   2 +-
 .../hyracks/hyracks-storage-am-common/pom.xml      |   4 +
 .../dataflow/IndexSearchOperatorNodePushable.java  | 178 +++++++++++++--------
 ...LSMBTreeBatchPointSearchOperatorDescriptor.java |   8 +-
 ...MBTreeBatchPointSearchOperatorNodePushable.java |  38 +++--
 ...BTreeDiskComponentScanOperatorNodePushable.java |  14 +-
 ...LSMInvertedIndexSearchOperatorNodePushable.java |   8 +-
 .../dataflow/RTreeSearchOperatorNodePushable.java  |  18 +--
 23 files changed, 475 insertions(+), 174 deletions(-)

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
index a8dcb1f749..a7a383821b 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/BTreeSearchPOperator.java
@@ -84,7 +84,6 @@ public class BTreeSearchPOperator extends 
IndexSearchPOperator {
     private final List<LogicalVariable> highKeyVarList;
     private final boolean isPrimaryIndex;
     private final boolean isEqCondition;
-    private Object implConfig;
 
     public BTreeSearchPOperator(IDataSourceIndex<String, DataSourceId> idx, 
INodeDomain domain,
             boolean requiresBroadcast, boolean isPrimaryIndex, boolean 
isEqCondition,
@@ -96,14 +95,6 @@ public class BTreeSearchPOperator extends 
IndexSearchPOperator {
         this.highKeyVarList = highKeyVarList;
     }
 
-    public void setImplConfig(Object implConfig) {
-        this.implConfig = implConfig;
-    }
-
-    public Object getImplConfig() {
-        return implConfig;
-    }
-
     @Override
     public PhysicalOperatorTag getOperatorTag() {
         return PhysicalOperatorTag.BTREE_SEARCH;
@@ -177,7 +168,8 @@ public class BTreeSearchPOperator extends 
IndexSearchPOperator {
                 jobGenParams.isLowKeyInclusive(), 
jobGenParams.isHighKeyInclusive(), propagateFilter,
                 nonFilterWriterFactory, minFilterFieldIndexes, 
maxFilterFieldIndexes, tupleFilterFactory, outputLimit,
                 unnestMap.getGenerateCallBackProceedResultVar(),
-                isPrimaryIndexPointSearch(op, 
context.getPhysicalOptimizationConfig()), tupleProjectorFactory);
+                useBatchPointSearch(op, 
context.getPhysicalOptimizationConfig()), tupleProjectorFactory,
+                isPrimaryIndexPointSearch());
         IOperatorDescriptor opDesc = btreeSearch.first;
         opDesc.setSourceLocation(unnestMap.getSourceLocation());
 
@@ -188,18 +180,20 @@ public class BTreeSearchPOperator extends 
IndexSearchPOperator {
         builder.contributeGraphEdge(srcExchange, 0, unnestMap, 0);
     }
 
-    /**
-     * Check whether we can use {@link LSMBTreeBatchPointSearchCursor} to 
perform point-lookups on the primary index
-     */
-    private boolean isPrimaryIndexPointSearch(ILogicalOperator op, 
PhysicalOptimizationConfig config) {
-        if (!config.isBatchLookupEnabled() || !isEqCondition || !isPrimaryIndex
-                || !lowKeyVarList.equals(highKeyVarList)) {
+    private boolean isPrimaryIndexPointSearch() {
+        if (!isEqCondition || !isPrimaryIndex || 
!lowKeyVarList.equals(highKeyVarList)) {
             return false;
         }
         Index searchIndex = ((DataSourceIndex) idx).getIndex();
         int numberOfKeyFields = ((Index.ValueIndexDetails) 
searchIndex.getIndexDetails()).getKeyFieldNames().size();
+        return lowKeyVarList.size() == numberOfKeyFields && 
highKeyVarList.size() == numberOfKeyFields;
+    }
 
-        if (lowKeyVarList.size() != numberOfKeyFields || highKeyVarList.size() 
!= numberOfKeyFields) {
+    /**
+     * Check whether we can use {@link LSMBTreeBatchPointSearchCursor} to 
perform point-lookups on the primary index
+     */
+    private boolean useBatchPointSearch(ILogicalOperator op, 
PhysicalOptimizationConfig config) {
+        if (!config.isBatchLookupEnabled() || !isPrimaryIndexPointSearch()) {
             return false;
         }
 
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
index 047a6d5faa..f962142758 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/QueryIndexDatasource.java
@@ -107,7 +107,7 @@ public class QueryIndexDatasource extends 
FunctionDataSource {
             throws AlgebricksException {
         return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, 
context, true, false, null, ds, indexName,
                 null, null, true, true, false, null, null, null, 
tupleFilterFactory, outputLimit, false, false,
-                DefaultTupleProjectorFactory.INSTANCE);
+                DefaultTupleProjectorFactory.INSTANCE, false);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
index dbcbce0834..c77e032f87 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/DatasetDataSource.java
@@ -166,7 +166,7 @@ public class DatasetDataSource extends DataSource {
                 return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, 
typeEnv, context, true, false, null,
                         ((DatasetDataSource) dataSource).getDataset(), 
primaryIndex.getIndexName(), null, null, true,
                         true, false, null, minFilterFieldIndexes, 
maxFilterFieldIndexes, tupleFilterFactory,
-                        outputLimit, false, false, tupleProjectorFactory);
+                        outputLimit, false, false, tupleProjectorFactory, 
false);
             default:
                 throw new AlgebricksException("Unknown datasource type");
         }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 550864485e..3692a8d99e 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -105,6 +105,7 @@ import 
org.apache.asterix.runtime.operators.LSMSecondaryInsertDeleteWithNestedPl
 import 
org.apache.asterix.runtime.operators.LSMSecondaryUpsertOperatorDescriptor;
 import 
org.apache.asterix.runtime.operators.LSMSecondaryUpsertWithNestedPlanOperatorDescriptor;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.utils.Pair;
@@ -134,10 +135,12 @@ import 
org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import 
org.apache.hyracks.algebricks.runtime.operators.std.SinkWriterRuntimeFactory;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ILinearizeComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IResultSerializerFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
@@ -149,6 +152,7 @@ import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultSetId;
 import org.apache.hyracks.data.std.primitive.ShortPointable;
 import 
org.apache.hyracks.dataflow.common.data.marshalling.ShortSerializerDeserializer;
+import 
org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
 import 
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
@@ -541,8 +545,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, 
boolean highKeyInclusive,
             boolean propagateFilter, IMissingWriterFactory 
nonFilterWriterFactory, int[] minFilterFieldIndexes,
             int[] maxFilterFieldIndexes, ITupleFilterFactory 
tupleFilterFactory, long outputLimit,
-            boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch, 
ITupleProjectorFactory tupleProjectorFactory)
-            throws AlgebricksException {
+            boolean isIndexOnlyPlan, boolean isPrimaryIndexPointSearch, 
ITupleProjectorFactory tupleProjectorFactory,
+            boolean partitionInputTuples) throws AlgebricksException {
         boolean isSecondary = true;
         Index primaryIndex = MetadataManager.INSTANCE.getIndex(mdTxnCtx, 
dataset.getDataverseName(),
                 dataset.getDatasetName(), dataset.getDatasetName());
@@ -602,24 +606,46 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         IIndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(storageManager, spPc.first);
         BTreeSearchOperatorDescriptor btreeSearchOp;
 
+        int numPartitions;
+        if (spPc.second.getPartitionConstraintType() == 
AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
+            numPartitions = ((AlgebricksCountPartitionConstraint) 
spPc.second).getCount();
+        } else {
+            numPartitions = ((AlgebricksAbsolutePartitionConstraint) 
spPc.second).getLocations().length;
+        }
+        int[][] partitionsMap = getPartitionsMap(numPartitions);
+        ITuplePartitionerFactory tuplePartitionerFactory = null;
+        if (partitionInputTuples) {
+            IBinaryHashFunctionFactory[] pkHashFunFactories = 
dataset.getPrimaryHashFunctionFactories(this);
+            tuplePartitionerFactory = new 
FieldHashPartitionerFactory(lowKeyFields, pkHashFunFactories, numPartitions);
+        }
+
         if (dataset.getDatasetType() == DatasetType.INTERNAL) {
             btreeSearchOp = !isSecondary && isPrimaryIndexPointSearch
                     ? new LSMBTreeBatchPointSearchOperatorDescriptor(jobSpec, 
outputRecDesc, lowKeyFields,
                             highKeyFields, lowKeyInclusive, highKeyInclusive, 
indexHelperFactory, retainInput,
                             retainMissing, nonMatchWriterFactory, 
searchCallbackFactory, minFilterFieldIndexes,
-                            maxFilterFieldIndexes, tupleFilterFactory, 
outputLimit, tupleProjectorFactory)
+                            maxFilterFieldIndexes, tupleFilterFactory, 
outputLimit, tupleProjectorFactory,
+                            tuplePartitionerFactory, partitionsMap)
                     : new BTreeSearchOperatorDescriptor(jobSpec, 
outputRecDesc, lowKeyFields, highKeyFields,
                             lowKeyInclusive, highKeyInclusive, 
indexHelperFactory, retainInput, retainMissing,
                             nonMatchWriterFactory, searchCallbackFactory, 
minFilterFieldIndexes, maxFilterFieldIndexes,
                             propagateFilter, nonFilterWriterFactory, 
tupleFilterFactory, outputLimit,
                             proceedIndexOnlyPlan, failValueForIndexOnlyPlan, 
successValueForIndexOnlyPlan,
-                            tupleProjectorFactory);
+                            tupleProjectorFactory, tuplePartitionerFactory, 
partitionsMap);
         } else {
             btreeSearchOp = null;
         }
         return new Pair<>(btreeSearchOp, spPc.second);
     }
 
+    private static int[][] getPartitionsMap(int numPartitions) {
+        int[][] map = new int[numPartitions][1];
+        for (int i = 0; i < numPartitions; i++) {
+            map[i] = new int[] { i };
+        }
+        return map;
+    }
+
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
buildRtreeRuntime(JobSpecification jobSpec,
             List<LogicalVariable> outputVars, IOperatorSchema opSchema, 
IVariableTypeEnvironment typeEnv,
             JobGenContext context, boolean retainInput, boolean retainMissing,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
index a2a0f19b71..708c2c2047 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/SampleDataSource.java
@@ -64,7 +64,7 @@ public class SampleDataSource extends DataSource {
             throws AlgebricksException {
         return metadataProvider.buildBtreeRuntime(jobSpec, opSchema, typeEnv, 
context, true, false, null, dataset,
                 sampleIndexName, null, null, true, true, false, null, null, 
null, tupleFilterFactory, outputLimit,
-                false, false, DefaultTupleProjectorFactory.INSTANCE);
+                false, false, DefaultTupleProjectorFactory.INSTANCE, false);
     }
 
     @Override
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 79bf3e8454..b5dd8bfef5 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -404,10 +404,10 @@ public class DatasetUtil {
                 IRecoveryManager.ResourceType.LSM_BTREE);
         IndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(
                 
metadataProvider.getStorageComponentProvider().getStorageManager(), 
primaryFileSplitProvider);
-        BTreeSearchOperatorDescriptor primarySearchOp =
-                new BTreeSearchOperatorDescriptor(spec, 
dataset.getPrimaryRecordDescriptor(metadataProvider),
-                        lowKeyFields, highKeyFields, true, true, 
indexHelperFactory, false, false, null,
-                        searchCallbackFactory, null, null, false, null, null, 
-1, false, null, null, projectorFactory);
+        BTreeSearchOperatorDescriptor primarySearchOp = new 
BTreeSearchOperatorDescriptor(spec,
+                dataset.getPrimaryRecordDescriptor(metadataProvider), 
lowKeyFields, highKeyFields, true, true,
+                indexHelperFactory, false, false, null, searchCallbackFactory, 
null, null, false, null, null, -1, false,
+                null, null, projectorFactory, null, null);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
primarySearchOp,
                 primaryPartitionConstraint);
         return primarySearchOp;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitioner.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitioner.java
new file mode 100644
index 0000000000..5e527dc2c0
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitioner.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface ITuplePartitioner {
+
+    /**
+     * For the tuple (located at tIndex in the frame), it determines which 
partition the tuple belongs to.
+     *
+     * @param accessor The accessor of the frame to access tuples
+     * @param tIndex The index of the tuple in consideration
+     * @return The partition number that the tuple belongs to
+     * @throws HyracksDataException
+     */
+    int partition(IFrameTupleAccessor accessor, int tIndex) throws 
HyracksDataException;
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionerFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionerFactory.java
new file mode 100644
index 0000000000..df64d06046
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/ITuplePartitionerFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.dataflow.value;
+
+import java.io.Serializable;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+
+public interface ITuplePartitionerFactory extends Serializable {
+
+    ITuplePartitioner createPartitioner(IHyracksTaskContext ctx);
+
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
new file mode 100644
index 0000000000..31a959f757
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputer.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FieldHashPartitionComputer extends HashPartitioner implements 
ITuplePartitionComputer {
+
+    public FieldHashPartitionComputer(int[] hashFields, IBinaryHashFunction[] 
hashFunctions) {
+        super(hashFields, hashFunctions);
+    }
+
+    @Override
+    public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) 
throws HyracksDataException {
+        return super.partition(accessor, tIndex, nParts);
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
index ab5ab010f8..52df3b7a43 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionComputerFactory.java
@@ -18,13 +18,11 @@
  */
 package org.apache.hyracks.dataflow.common.data.partition;
 
-import org.apache.hyracks.api.comm.IFrameTupleAccessor;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
 import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputerFactory;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class FieldHashPartitionComputerFactory implements 
ITuplePartitionComputerFactory {
     private static final long serialVersionUID = 1L;
@@ -37,34 +35,11 @@ public class FieldHashPartitionComputerFactory implements 
ITuplePartitionCompute
     }
 
     @Override
-    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext 
hyracksTaskContext) {
+    public ITuplePartitionComputer createPartitioner(IHyracksTaskContext ctx) {
         final IBinaryHashFunction[] hashFunctions = new 
IBinaryHashFunction[hashFunctionFactories.length];
         for (int i = 0; i < hashFunctionFactories.length; ++i) {
             hashFunctions[i] = 
hashFunctionFactories[i].createBinaryHashFunction();
         }
-        return new ITuplePartitionComputer() {
-            @Override
-            public int partition(IFrameTupleAccessor accessor, int tIndex, int 
nParts) throws HyracksDataException {
-                if (nParts == 1) {
-                    return 0;
-                }
-                int h = 0;
-                int startOffset = accessor.getTupleStartOffset(tIndex);
-                int slotLength = accessor.getFieldSlotsLength();
-                for (int j = 0; j < hashFields.length; ++j) {
-                    int fIdx = hashFields[j];
-                    IBinaryHashFunction hashFn = hashFunctions[j];
-                    int fStart = accessor.getFieldStartOffset(tIndex, fIdx);
-                    int fEnd = accessor.getFieldEndOffset(tIndex, fIdx);
-                    int fh = hashFn.hash(accessor.getBuffer().array(), 
startOffset + slotLength + fStart,
-                            fEnd - fStart);
-                    h = h * 31 + fh;
-                }
-                if (h < 0) {
-                    h = -(h + 1);
-                }
-                return h % nParts;
-            }
-        };
+        return new FieldHashPartitionComputer(hashFields, hashFunctions);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
new file mode 100644
index 0000000000..5620a95d9d
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitioner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitioner;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class FieldHashPartitioner extends HashPartitioner implements 
ITuplePartitioner {
+
+    private final int numPartitions;
+
+    public FieldHashPartitioner(int[] hashFields, IBinaryHashFunction[] 
hashFunctions, int numPartitions) {
+        super(hashFields, hashFunctions);
+        this.numPartitions = numPartitions;
+    }
+
+    @Override
+    public int partition(IFrameTupleAccessor accessor, int tIndex) throws 
HyracksDataException {
+        return partition(accessor, tIndex, numPartitions);
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionerFactory.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionerFactory.java
new file mode 100644
index 0000000000..fb62fd8f4d
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/FieldHashPartitionerFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition;
+
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitioner;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+
+public class FieldHashPartitionerFactory implements ITuplePartitionerFactory {
+
+    private static final long serialVersionUID = 1L;
+    private final int[] hashFields;
+    private final IBinaryHashFunctionFactory[] hashFunctionFactories;
+    private final int numPartitions;
+
+    public FieldHashPartitionerFactory(int[] hashFields, 
IBinaryHashFunctionFactory[] hashFunctionFactories,
+            int numPartitions) {
+        this.hashFields = hashFields;
+        this.hashFunctionFactories = hashFunctionFactories;
+        this.numPartitions = numPartitions;
+    }
+
+    @Override
+    public ITuplePartitioner createPartitioner(IHyracksTaskContext ctx) {
+        final IBinaryHashFunction[] hashFunctions = new 
IBinaryHashFunction[hashFunctionFactories.length];
+        for (int i = 0; i < hashFunctionFactories.length; ++i) {
+            hashFunctions[i] = 
hashFunctionFactories[i].createBinaryHashFunction();
+        }
+        return new FieldHashPartitioner(hashFields, hashFunctions, 
numPartitions);
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
new file mode 100644
index 0000000000..b09bcb8b12
--- /dev/null
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/data/partition/HashPartitioner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.dataflow.common.data.partition;
+
+import org.apache.hyracks.api.comm.IFrameTupleAccessor;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunction;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+class HashPartitioner {
+
+    private final int[] hashFields;
+    private final IBinaryHashFunction[] hashFunctions;
+
+    public HashPartitioner(int[] hashFields, IBinaryHashFunction[] 
hashFunctions) {
+        this.hashFields = hashFields;
+        this.hashFunctions = hashFunctions;
+    }
+
+    protected int partition(IFrameTupleAccessor accessor, int tIndex, int 
nParts) throws HyracksDataException {
+        if (nParts == 1) {
+            return 0;
+        }
+        int h = 0;
+        int startOffset = accessor.getTupleStartOffset(tIndex);
+        int slotLength = accessor.getFieldSlotsLength();
+        for (int j = 0; j < hashFields.length; ++j) {
+            int fIdx = hashFields[j];
+            IBinaryHashFunction hashFn = hashFunctions[j];
+            int fStart = accessor.getFieldStartOffset(tIndex, fIdx);
+            int fEnd = accessor.getFieldEndOffset(tIndex, fIdx);
+            int fh = hashFn.hash(accessor.getBuffer().array(), startOffset + 
slotLength + fStart, fEnd - fStart);
+            h = h * 31 + fh;
+        }
+        if (h < 0) {
+            h = -(h + 1);
+        }
+        return h % nParts;
+    }
+}
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
index 0ab88a524d..1c961c5c68 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorDescriptor.java
@@ -22,6 +22,7 @@ package org.apache.hyracks.storage.am.btree.dataflow;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -34,7 +35,7 @@ import 
org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
 
 public class BTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 2L;
+    private static final long serialVersionUID = 3L;
 
     protected final int[] lowKeyFields;
     protected final int[] highKeyFields;
@@ -55,6 +56,8 @@ public class BTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperato
     protected final ITupleFilterFactory tupleFilterFactory;
     protected final long outputLimit;
     protected final ITupleProjectorFactory tupleProjectorFactory;
+    protected final ITuplePartitionerFactory tuplePartitionerFactory;
+    protected final int[][] map;
 
     public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor outRecDesc,
             int[] lowKeyFields, int[] highKeyFields, boolean lowKeyInclusive, 
boolean highKeyInclusive,
@@ -65,7 +68,7 @@ public class BTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperato
         this(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, 
highKeyInclusive, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, 
searchCallbackFactory, minFilterFieldIndexes,
                 maxFilterFieldIndexes, appendIndexFilter, 
nonFilterWriterFactory, null, -1, false, null, null,
-                DefaultTupleProjectorFactory.INSTANCE);
+                DefaultTupleProjectorFactory.INSTANCE, null, null);
     }
 
     public BTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor outRecDesc,
@@ -75,7 +78,8 @@ public class BTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperato
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, boolean 
appendIndexFilter,
             IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory 
tupleFilterFactory, long outputLimit,
             boolean appendOpCallbackProceedResult, byte[] 
searchCallbackProceedResultFalseValue,
-            byte[] searchCallbackProceedResultTrueValue, 
ITupleProjectorFactory tupleProjectorFactory) {
+            byte[] searchCallbackProceedResultTrueValue, 
ITupleProjectorFactory tupleProjectorFactory,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) {
         super(spec, 1, 1);
         this.indexHelperFactory = indexHelperFactory;
         this.retainInput = retainInput;
@@ -97,6 +101,8 @@ public class BTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperato
         this.searchCallbackProceedResultFalseValue = 
searchCallbackProceedResultFalseValue;
         this.searchCallbackProceedResultTrueValue = 
searchCallbackProceedResultTrueValue;
         this.tupleProjectorFactory = tupleProjectorFactory;
+        this.tuplePartitionerFactory = tuplePartitionerFactory;
+        this.map = map;
     }
 
     @Override
@@ -107,7 +113,8 @@ public class BTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperato
                 lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, 
maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, 
searchCallbackFactory, appendIndexFilter,
                 nonFilterWriterFactory, tupleFilterFactory, outputLimit, 
appendOpCallbackProceedResult,
-                searchCallbackProceedResultFalseValue, 
searchCallbackProceedResultTrueValue, tupleProjectorFactory);
+                searchCallbackProceedResultFalseValue, 
searchCallbackProceedResultTrueValue, tupleProjectorFactory,
+                tuplePartitionerFactory, map);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
index a56e305e48..24163ea9d9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/dataflow/BTreeSearchOperatorNodePushable.java
@@ -20,6 +20,7 @@ package org.apache.hyracks.storage.am.btree.dataflow;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.HyracksConstants;
@@ -32,6 +33,7 @@ import 
org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
 import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
+import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
@@ -55,7 +57,7 @@ public class BTreeSearchOperatorNodePushable extends 
IndexSearchOperatorNodePush
         this(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, 
lowKeyInclusive, highKeyInclusive,
                 minFilterFieldIndexes, maxFilterFieldIndexes, 
indexHelperFactory, retainInput, retainMissing,
                 nonMatchWriterFactory, searchCallbackFactory, 
appendIndexFilter, nonFilterWriterFactory, null, -1,
-                false, null, null, DefaultTupleProjectorFactory.INSTANCE);
+                false, null, null, DefaultTupleProjectorFactory.INSTANCE, 
null, null);
     }
 
     public BTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int 
partition, RecordDescriptor inputRecDesc,
@@ -65,12 +67,13 @@ public class BTreeSearchOperatorNodePushable extends 
IndexSearchOperatorNodePush
             ISearchOperationCallbackFactory searchCallbackFactory, boolean 
appendIndexFilter,
             IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory 
tupleFilterFactory, long outputLimit,
             boolean appendOpCallbackProceedResult, byte[] 
searchCallbackProceedResultFalseValue,
-            byte[] searchCallbackProceedResultTrueValue, 
ITupleProjectorFactory projectorFactory)
-            throws HyracksDataException {
+            byte[] searchCallbackProceedResultTrueValue, 
ITupleProjectorFactory projectorFactory,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) 
throws HyracksDataException {
         super(ctx, inputRecDesc, partition, minFilterFieldIndexes, 
maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, nonMatchWriterFactory, 
searchCallbackFactory, appendIndexFilter,
                 nonFilterWriterFactory, tupleFilterFactory, outputLimit, 
appendOpCallbackProceedResult,
-                searchCallbackProceedResultFalseValue, 
searchCallbackProceedResultTrueValue, projectorFactory);
+                searchCallbackProceedResultFalseValue, 
searchCallbackProceedResultTrueValue, projectorFactory,
+                tuplePartitionerFactory, map);
         this.lowKeyInclusive = lowKeyInclusive;
         this.highKeyInclusive = highKeyInclusive;
         if (lowKeyFields != null && lowKeyFields.length > 0) {
@@ -100,7 +103,7 @@ public class BTreeSearchOperatorNodePushable extends 
IndexSearchOperatorNodePush
     }
 
     @Override
-    protected ISearchPredicate createSearchPredicate() {
+    protected ISearchPredicate createSearchPredicate(IIndex index) {
         ITreeIndex treeIndex = (ITreeIndex) index;
         lowKeySearchCmp = 
BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), lowKey);
         highKeySearchCmp = 
BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), 
highKey);
@@ -109,7 +112,7 @@ public class BTreeSearchOperatorNodePushable extends 
IndexSearchOperatorNodePush
     }
 
     @Override
-    protected int getFieldCount() {
+    protected int getFieldCount(IIndex index) {
         return ((ITreeIndex) index).getFieldCount();
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
index b42337cf45..a2487d388a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/test/java/org/apache/hyracks/storage/am/btree/test/FramewriterTest.java
@@ -230,7 +230,7 @@ public class FramewriterTest {
         }
         System.out.println("Number of passed tests: " + successes);
         System.out.println("Number of failed tests: " + failures);
-        Assert.assertEquals(failures, 0);
+        Assert.assertEquals(0, failures);
     }
 
     private void testBTreeSearchOperatorNodePushable() throws Exception {
diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
index 9d869e9226..9bdbf3cd84 100644
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/pom.xml
@@ -113,5 +113,9 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil-core</artifactId>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 4fc80576a7..c6cdd663b1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -27,6 +27,8 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.IIntrospectingOperator;
 import org.apache.hyracks.api.dataflow.value.IMissingWriter;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitioner;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.profiling.IOperatorStats;
@@ -46,7 +48,6 @@ import 
org.apache.hyracks.storage.am.common.api.ILSMIndexCursor;
 import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITupleFilter;
 import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
-import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
 import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
 import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
 import 
org.apache.hyracks.storage.am.common.tuples.ReferenceFrameTupleReference;
@@ -64,22 +65,24 @@ import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import it.unimi.dsi.fastutil.ints.Int2IntMap;
+import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
+
 public abstract class IndexSearchOperatorNodePushable extends 
AbstractUnaryInputUnaryOutputOperatorNodePushable
         implements IIntrospectingOperator {
 
     static final Logger LOGGER = LogManager.getLogger();
     protected final IHyracksTaskContext ctx;
-    protected final IIndexDataflowHelper indexHelper;
     protected FrameTupleAccessor accessor;
-
     protected FrameTupleAppender appender;
     protected ArrayTupleBuilder tb;
     protected DataOutput dos;
 
-    protected IIndex index;
     protected ISearchPredicate searchPred;
-    protected IIndexCursor cursor;
-    protected IIndexAccessor indexAccessor;
+    protected final IIndexDataflowHelper[] indexHelpers;
+    protected IIndex[] indexes;
+    protected IIndexAccessor[] indexAccessors;
+    protected IIndexCursor[] cursors;
 
     protected final RecordDescriptor inputRecDesc;
     protected final boolean retainInput;
@@ -114,28 +117,31 @@ public abstract class IndexSearchOperatorNodePushable 
extends AbstractUnaryInput
     protected long outputCount = 0;
     protected boolean finished;
     protected final ITupleProjector tupleProjector;
+    protected final ITuplePartitioner tuplePartitioner;
+    protected final int[] partitions;
+    private final Int2IntMap storagePartitionId2Index = new 
Int2IntOpenHashMap();
 
-    // no filter and limit pushdown
     public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, 
RecordDescriptor inputRecDesc, int partition,
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, 
IIndexDataflowHelperFactory indexHelperFactory,
             boolean retainInput, boolean retainMissing, IMissingWriterFactory 
nonMatchWriterFactory,
             ISearchOperationCallbackFactory searchCallbackFactory, boolean 
appendIndexFilter,
-            IMissingWriterFactory nonFilterWriterFactory) throws 
HyracksDataException {
-        this(ctx, inputRecDesc, partition, minFilterFieldIndexes, 
maxFilterFieldIndexes, indexHelperFactory,
-                retainInput, retainMissing, nonMatchWriterFactory, 
searchCallbackFactory, appendIndexFilter,
-                nonFilterWriterFactory, null, -1, false, null, null, 
DefaultTupleProjectorFactory.INSTANCE);
-    }
-
-    public IndexSearchOperatorNodePushable(IHyracksTaskContext ctx, 
RecordDescriptor inputRecDesc, int partition,
-            int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, 
IIndexDataflowHelperFactory indexHelperFactory,
-            boolean retainInput, boolean retainMissing, IMissingWriterFactory 
nonMatchWriterFactory,
-            ISearchOperationCallbackFactory searchCallbackFactory, boolean 
appendIndexFilter,
-            IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory 
tupleFactoryFactory, long outputLimit,
+            IMissingWriterFactory nonFilterWriterFactory, ITupleFilterFactory 
tupleFilterFactory, long outputLimit,
             boolean appendSearchCallbackProceedResult, byte[] 
searchCallbackProceedResultFalseValue,
-            byte[] searchCallbackProceedResultTrueValue, 
ITupleProjectorFactory projectorFactory)
-            throws HyracksDataException {
+            byte[] searchCallbackProceedResultTrueValue, 
ITupleProjectorFactory projectorFactory,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) 
throws HyracksDataException {
         this.ctx = ctx;
-        this.indexHelper = 
indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), 
partition);
+        this.appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
+        this.partitions = map != null ? map[partition] : new int[] { partition 
};
+        for (int i = 0; i < partitions.length; i++) {
+            storagePartitionId2Index.put(partitions[i], i);
+        }
+        this.indexHelpers = new IIndexDataflowHelper[partitions.length];
+        this.indexes = new IIndex[partitions.length];
+        this.indexAccessors = new IIndexAccessor[partitions.length];
+        this.cursors = new IIndexCursor[partitions.length];
+        for (int i = 0; i < partitions.length; i++) {
+            indexHelpers[i] = 
indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), 
partitions[i]);
+        }
         this.retainInput = retainInput;
         this.retainMissing = retainMissing;
         this.appendIndexFilter = appendIndexFilter;
@@ -160,7 +166,7 @@ public abstract class IndexSearchOperatorNodePushable 
extends AbstractUnaryInput
         this.appendSearchCallbackProceedResult = 
appendSearchCallbackProceedResult;
         this.searchCallbackProceedResultFalseValue = 
searchCallbackProceedResultFalseValue;
         this.searchCallbackProceedResultTrueValue = 
searchCallbackProceedResultTrueValue;
-        this.tupleFilterFactory = tupleFactoryFactory;
+        this.tupleFilterFactory = tupleFilterFactory;
         this.outputLimit = outputLimit;
         this.stats = new NoOpOperatorStats();
 
@@ -169,30 +175,43 @@ public abstract class IndexSearchOperatorNodePushable 
extends AbstractUnaryInput
         }
 
         tupleProjector = projectorFactory.createTupleProjector(ctx);
+        tuplePartitioner = tuplePartitionerFactory == null ? null : 
tuplePartitionerFactory.createPartitioner(ctx);
     }
 
-    protected abstract ISearchPredicate createSearchPredicate();
+    protected abstract ISearchPredicate createSearchPredicate(IIndex index);
 
     protected abstract void resetSearchPredicate(int tupleIndex);
 
     // Assigns any index-type specific related accessor parameters
     protected abstract void 
addAdditionalIndexAccessorParams(IIndexAccessParameters iap) throws 
HyracksDataException;
 
-    protected IIndexCursor createCursor() throws HyracksDataException {
-        return indexAccessor.createSearchCursor(false);
+    protected IIndexCursor createCursor(IIndex idx, IIndexAccessor 
idxAccessor) throws HyracksDataException {
+        return idxAccessor.createSearchCursor(false);
     }
 
-    protected abstract int getFieldCount();
+    protected abstract int getFieldCount(IIndex index);
 
     @Override
     public void open() throws HyracksDataException {
         writer.open();
-        indexHelper.open();
-        index = indexHelper.getIndexInstance();
-        subscribeForStats(index);
+        ISearchOperationCallback[] searchCallbacks = new 
ISearchOperationCallback[partitions.length];
+        IIndexAccessParameters[] iaps = new 
IndexAccessParameters[partitions.length];
+
+        for (int i = 0; i < partitions.length; i++) {
+            indexHelpers[i].open();
+            indexes[i] = indexHelpers[i].getIndexInstance();
+            searchCallbacks[i] = searchCallbackFactory
+                    
.createSearchOperationCallback(indexHelpers[i].getResource().getId(), ctx, 
null);
+            iaps[i] = new 
IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallbacks[i]);
+            addAdditionalIndexAccessorParams(iaps[i]);
+            indexAccessors[i] = indexes[i].createAccessor(iaps[i]);
+            cursors[i] = createCursor(indexes[i], indexAccessors[i]);
+        }
+
+        subscribeForStats(indexes[0]);
         accessor = new FrameTupleAccessor(inputRecDesc);
         if (retainMissing) {
-            int fieldCount = getFieldCount();
+            int fieldCount = getFieldCount(indexes[0]);
             // Field count in case searchCallback.proceed() result is needed.
             int finalFieldCount = appendSearchCallbackProceedResult ? 
fieldCount + 1 : fieldCount;
             nonMatchTupleBuild = new ArrayTupleBuilder(finalFieldCount);
@@ -206,7 +225,7 @@ public abstract class IndexSearchOperatorNodePushable 
extends AbstractUnaryInput
             nonMatchTupleBuild = null;
         }
         if (appendIndexFilter) {
-            int numIndexFilterFields = index.getNumOfFilterFields();
+            int numIndexFilterFields = indexes[0].getNumOfFilterFields();
             nonFilterTupleBuild = new ArrayTupleBuilder(numIndexFilterFields);
             buildMissingTuple(numIndexFilterFields, nonFilterTupleBuild, 
nonFilterWriter);
         }
@@ -219,16 +238,9 @@ public abstract class IndexSearchOperatorNodePushable 
extends AbstractUnaryInput
         outputCount = 0;
 
         try {
-            searchPred = createSearchPredicate();
+            searchPred = createSearchPredicate(indexes[0]);
             tb = new ArrayTupleBuilder(recordDesc.getFieldCount());
             dos = tb.getDataOutput();
-            appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
-            ISearchOperationCallback searchCallback =
-                    
searchCallbackFactory.createSearchOperationCallback(indexHelper.getResource().getId(),
 ctx, null);
-            IIndexAccessParameters iap = new 
IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallback);
-            addAdditionalIndexAccessorParams(iap);
-            indexAccessor = index.createAccessor(iap);
-            cursor = createCursor();
             if (retainInput) {
                 frameTuple = new FrameTupleReference();
             }
@@ -237,7 +249,7 @@ public abstract class IndexSearchOperatorNodePushable 
extends AbstractUnaryInput
         }
     }
 
-    protected void writeSearchResults(int tupleIndex) throws Exception {
+    protected void writeSearchResults(int tupleIndex, IIndexCursor cursor) 
throws Exception {
         long matchingTupleCount = 0;
         while (cursor.hasNext()) {
             cursor.next();
@@ -291,11 +303,10 @@ public abstract class IndexSearchOperatorNodePushable 
extends AbstractUnaryInput
         accessor.reset(buffer);
         int tupleCount = accessor.getTupleCount();
         try {
-            for (int i = 0; i < tupleCount && !finished; i++) {
-                resetSearchPredicate(i);
-                cursor.close();
-                indexAccessor.search(cursor, searchPred);
-                writeSearchResults(i);
+            if (tuplePartitioner != null) {
+                searchPartition(tupleCount);
+            } else {
+                searchAllPartitions(tupleCount);
             }
         } catch (Exception e) {
             throw HyracksDataException.create(e);
@@ -309,40 +320,52 @@ public abstract class IndexSearchOperatorNodePushable 
extends AbstractUnaryInput
 
     @Override
     public void close() throws HyracksDataException {
-        Throwable failure = releaseResources();
+        Throwable failure = flushFrame();
+        failure = releaseResources(failure);
         failure = CleanupUtils.close(writer, failure);
         if (failure != null) {
             throw HyracksDataException.create(failure);
         }
     }
 
-    private Throwable releaseResources() {
+    private Throwable flushFrame() {
         Throwable failure = null;
-        if (index != null) {
-            // if index == null, then the index open was not successful
-            if (!failed) {
+        if (!failed) {
+            try {
+                if (appender.getTupleCount() > 0) {
+                    appender.write(writer, true);
+                }
+                
stats.getPageReads().update(ctx.getThreadStats().getPinnedPagesCount());
+                
stats.coldReadCounter().update(ctx.getThreadStats().getColdReadCount());
+            } catch (Throwable th) { // NOSONAR Must ensure writer.fail is 
called.
+                // subsequently, the failure will be thrown
+                failure = th;
+            }
+            if (failure != null) {
                 try {
-                    if (appender.getTupleCount() > 0) {
-                        appender.write(writer, true);
-                    }
-                    
stats.getPageReads().update(ctx.getThreadStats().getPinnedPagesCount());
-                    
stats.coldReadCounter().update(ctx.getThreadStats().getColdReadCount());
-                } catch (Throwable th) { // NOSONAR Must ensure writer.fail is 
called.
+                    writer.fail();
+                } catch (Throwable th) {
                     // subsequently, the failure will be thrown
-                    failure = th;
+                    failure = ExceptionUtils.suppress(failure, th);
                 }
-                if (failure != null) {
-                    try {
-                        writer.fail();
-                    } catch (Throwable th) {// NOSONAR Must cursor.close is 
called.
-                        // subsequently, the failure will be thrown
-                        failure = ExceptionUtils.suppress(failure, th);
-                    }
+            }
+        }
+        return failure;
+    }
+
+    private Throwable releaseResources(Throwable failure) {
+        for (int i = 0; i < indexes.length; i++) {
+            // if index == null, then the index open was not successful
+            try {
+                if (indexes[i] != null) {
+                    failure = ResourceReleaseUtils.close(cursors[i], failure);
+                    failure = CleanupUtils.destroy(failure, cursors[i], 
indexAccessors[i]);
+                    failure = ResourceReleaseUtils.close(indexHelpers[i], 
failure);
                 }
+            } catch (Throwable th) {// NOSONAR ensure closing other indexes
+                // subsequently, the failure will be thrown
+                failure = ExceptionUtils.suppress(failure, th);
             }
-            failure = ResourceReleaseUtils.close(cursor, failure);
-            failure = CleanupUtils.destroy(failure, cursor, indexAccessor);
-            failure = ResourceReleaseUtils.close(indexHelper, failure);
         }
         return failure;
     }
@@ -413,4 +436,25 @@ public abstract class IndexSearchOperatorNodePushable 
extends AbstractUnaryInput
         this.stats = stats;
     }
 
+    private void searchPartition(int tupleCount) throws Exception {
+        for (int i = 0; i < tupleCount && !finished; i++) {
+            int storagePartition = tuplePartitioner.partition(accessor, i);
+            int pIdx = storagePartitionId2Index.get(storagePartition);
+            resetSearchPredicate(i);
+            cursors[pIdx].close();
+            indexAccessors[pIdx].search(cursors[pIdx], searchPred);
+            writeSearchResults(i, cursors[pIdx]);
+        }
+    }
+
+    private void searchAllPartitions(int tupleCount) throws Exception {
+        for (int p = 0; p < partitions.length; p++) {
+            for (int i = 0; i < tupleCount && !finished; i++) {
+                resetSearchPredicate(i);
+                cursors[p].close();
+                indexAccessors[p].search(cursors[p], searchPred);
+                writeSearchResults(i, cursors[p]);
+            }
+        }
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
index 3e03e5c3e0..9ed0782a89 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorDescriptor.java
@@ -22,6 +22,7 @@ package org.apache.hyracks.storage.am.lsm.btree.dataflow;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -39,11 +40,12 @@ public class LSMBTreeBatchPointSearchOperatorDescriptor 
extends BTreeSearchOpera
             IIndexDataflowHelperFactory indexHelperFactory, boolean 
retainInput, boolean retainMissing,
             IMissingWriterFactory missingWriterFactory, 
ISearchOperationCallbackFactory searchCallbackFactory,
             int[] minFilterFieldIndexes, int[] maxFilterFieldIndexes, 
ITupleFilterFactory tupleFilterFactory,
-            long outputLimit, ITupleProjectorFactory tupleProjectorFactory) {
+            long outputLimit, ITupleProjectorFactory tupleProjectorFactory,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) {
         super(spec, outRecDesc, lowKeyFields, highKeyFields, lowKeyInclusive, 
highKeyInclusive, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, 
searchCallbackFactory, minFilterFieldIndexes,
                 maxFilterFieldIndexes, false, null, tupleFilterFactory, 
outputLimit, false, null, null,
-                tupleProjectorFactory);
+                tupleProjectorFactory, tuplePartitionerFactory, map);
     }
 
     @Override
@@ -53,7 +55,7 @@ public class LSMBTreeBatchPointSearchOperatorDescriptor 
extends BTreeSearchOpera
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 
0), lowKeyFields, highKeyFields,
                 lowKeyInclusive, highKeyInclusive, minFilterFieldIndexes, 
maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, 
searchCallbackFactory, tupleFilterFactory,
-                outputLimit, tupleProjectorFactory);
+                outputLimit, tupleProjectorFactory, tuplePartitionerFactory, 
map);
     }
 
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
index f6f97b789c..47d515a3e3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeBatchPointSearchOperatorNodePushable.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
@@ -37,6 +38,8 @@ import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory
 import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTree;
 import 
org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeBatchPointSearchCursor;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexAccessor;
 import org.apache.hyracks.storage.common.IIndexCursor;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
@@ -50,23 +53,23 @@ public class LSMBTreeBatchPointSearchOperatorNodePushable 
extends BTreeSearchOpe
             boolean highKeyInclusive, int[] minFilterKeyFields, int[] 
maxFilterKeyFields,
             IIndexDataflowHelperFactory indexHelperFactory, boolean 
retainInput, boolean retainMissing,
             IMissingWriterFactory missingWriterFactory, 
ISearchOperationCallbackFactory searchCallbackFactory,
-            ITupleFilterFactory tupleFilterFactory, long outputLimit, 
ITupleProjectorFactory tupleProjectorFactory)
-            throws HyracksDataException {
+            ITupleFilterFactory tupleFilterFactory, long outputLimit, 
ITupleProjectorFactory tupleProjectorFactory,
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] map) 
throws HyracksDataException {
         super(ctx, partition, inputRecDesc, lowKeyFields, highKeyFields, 
lowKeyInclusive, highKeyInclusive,
                 minFilterKeyFields, maxFilterKeyFields, indexHelperFactory, 
retainInput, retainMissing,
                 missingWriterFactory, searchCallbackFactory, false, null, 
tupleFilterFactory, outputLimit, false, null,
-                null, tupleProjectorFactory);
+                null, tupleProjectorFactory, tuplePartitionerFactory, map);
         this.keyFields = lowKeyFields;
     }
 
     @Override
-    protected IIndexCursor createCursor() throws HyracksDataException {
-        ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessor;
-        return ((LSMBTree) 
index).createBatchPointSearchCursor(lsmAccessor.getOpContext());
+    protected IIndexCursor createCursor(IIndex idx, IIndexAccessor 
idxAccessor) throws HyracksDataException {
+        ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) idxAccessor;
+        return ((LSMBTree) 
idx).createBatchPointSearchCursor(lsmAccessor.getOpContext());
     }
 
     @Override
-    protected ISearchPredicate createSearchPredicate() {
+    protected ISearchPredicate createSearchPredicate(IIndex index) {
         ITreeIndex treeIndex = (ITreeIndex) index;
         lowKeySearchCmp =
                 highKeySearchCmp = 
BTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), lowKey);
@@ -78,19 +81,21 @@ public class LSMBTreeBatchPointSearchOperatorNodePushable 
extends BTreeSearchOpe
         accessor.reset(buffer);
         if (accessor.getTupleCount() > 0) {
             BatchPredicate batchPred = (BatchPredicate) searchPred;
-            batchPred.reset(accessor);
-            try {
-                indexAccessor.search(cursor, batchPred);
-                writeSearchResults();
-            } catch (IOException e) {
-                throw HyracksDataException.create(e);
-            } finally {
-                cursor.close();
+            for (int p = 0; p < partitions.length; p++) {
+                batchPred.reset(accessor);
+                try {
+                    indexAccessors[p].search(cursors[p], batchPred);
+                    writeSearchResults(cursors[p]);
+                } catch (IOException e) {
+                    throw HyracksDataException.create(e);
+                } finally {
+                    cursors[p].close();
+                }
             }
         }
     }
 
-    protected void writeSearchResults() throws IOException {
+    protected void writeSearchResults(IIndexCursor cursor) throws IOException {
         long matchingTupleCount = 0;
         LSMBTreeBatchPointSearchCursor batchCursor = 
(LSMBTreeBatchPointSearchCursor) cursor;
         int tupleIndex = 0;
@@ -127,7 +132,6 @@ public class LSMBTreeBatchPointSearchOperatorNodePushable 
extends BTreeSearchOpe
             }
         }
         stats.getInputTupleCounter().update(matchingTupleCount);
-
     }
 
     private void appendMissingTuple(int start, int end) throws 
HyracksDataException {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java
index 4aa094b9b0..1bf229b4d7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorNodePushable.java
@@ -27,7 +27,9 @@ import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import org.apache.hyracks.storage.am.common.api.ITreeIndex;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 
@@ -37,21 +39,23 @@ public class LSMBTreeDiskComponentScanOperatorNodePushable 
extends IndexSearchOp
             RecordDescriptor inputRecDesc, IIndexDataflowHelperFactory 
indexHelperFactory,
             ISearchOperationCallbackFactory searchCallbackFactory) throws 
HyracksDataException {
         super(ctx, inputRecDesc, partition, null, null, indexHelperFactory, 
false, false, null, searchCallbackFactory,
-                false, null);
+                false, null, null, -1, false, null, null, 
DefaultTupleProjectorFactory.INSTANCE, null, null);
     }
 
     @Override
     public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
         try {
-            ((ILSMIndexAccessor) indexAccessor).scanDiskComponents(cursor);
-            writeSearchResults(0);
+            for (int p = 0; p < partitions.length; p++) {
+                ((ILSMIndexAccessor) 
indexAccessors[p]).scanDiskComponents(cursors[p]);
+                writeSearchResults(0, cursors[p]);
+            }
         } catch (Exception e) {
             throw HyracksDataException.create(e);
         }
     }
 
     @Override
-    protected ISearchPredicate createSearchPredicate() {
+    protected ISearchPredicate createSearchPredicate(IIndex index) {
         // do nothing
         // no need to create search predicate for disk component scan operation
         return null;
@@ -63,7 +67,7 @@ public class LSMBTreeDiskComponentScanOperatorNodePushable 
extends IndexSearchOp
     }
 
     @Override
-    protected int getFieldCount() {
+    protected int getFieldCount(IIndex index) {
         return ((ITreeIndex) index).getFieldCount() + 2;
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
index 571ae5c431..996241daef 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-invertedindex/src/main/java/org/apache/hyracks/storage/am/lsm/invertedindex/dataflow/LSMInvertedIndexSearchOperatorNodePushable.java
@@ -33,12 +33,14 @@ import 
org.apache.hyracks.dataflow.std.buffermanager.ISimpleFrameBufferManager;
 import 
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
 import 
org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePushable;
+import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.api.IInvertedIndexSearchModifier;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.fulltext.IFullTextConfigEvaluator;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.fulltext.IFullTextConfigEvaluatorFactory;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.search.InvertedIndexSearchPredicate;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizer;
 import 
org.apache.hyracks.storage.am.lsm.invertedindex.tokenizers.IBinaryTokenizerFactory;
+import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 
@@ -66,7 +68,7 @@ public class LSMInvertedIndexSearchOperatorNodePushable 
extends IndexSearchOpera
             IMissingWriterFactory nonFilterWriterFactory, int frameLimit) 
throws HyracksDataException {
         super(ctx, inputRecDesc, partition, minFilterFieldIndexes, 
maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, 
searchCallbackFactory, appendIndexFilter,
-                nonFilterWriterFactory);
+                nonFilterWriterFactory, null, -1, false, null, null, 
DefaultTupleProjectorFactory.INSTANCE, null, null);
         this.searchModifier = searchModifier;
         this.binaryTokenizerFactory = binaryTokenizerFactory;
         this.fullTextConfigEvaluatorFactory = fullTextConfigEvaluatorFactory;
@@ -85,7 +87,7 @@ public class LSMInvertedIndexSearchOperatorNodePushable 
extends IndexSearchOpera
     }
 
     @Override
-    protected ISearchPredicate createSearchPredicate() {
+    protected ISearchPredicate createSearchPredicate(IIndex index) {
         IBinaryTokenizer tokenizer = binaryTokenizerFactory.createTokenizer();
         IFullTextConfigEvaluator fullTextConfigEvaluator =
                 fullTextConfigEvaluatorFactory.createFullTextConfigEvaluator();
@@ -110,7 +112,7 @@ public class LSMInvertedIndexSearchOperatorNodePushable 
extends IndexSearchOpera
     }
 
     @Override
-    protected int getFieldCount() {
+    protected int getFieldCount(IIndex index) {
         return numOfFields;
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index e1c6f5b85e..aef624fa7b 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -31,24 +31,16 @@ import 
org.apache.hyracks.storage.am.common.dataflow.IndexSearchOperatorNodePush
 import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
 import org.apache.hyracks.storage.am.rtree.impls.SearchPredicate;
 import org.apache.hyracks.storage.am.rtree.util.RTreeUtils;
+import org.apache.hyracks.storage.common.IIndex;
 import org.apache.hyracks.storage.common.IIndexAccessParameters;
 import org.apache.hyracks.storage.common.ISearchPredicate;
 import org.apache.hyracks.storage.common.MultiComparator;
 
 public class RTreeSearchOperatorNodePushable extends 
IndexSearchOperatorNodePushable {
+
     protected PermutingFrameTupleReference searchKey;
     protected MultiComparator cmp;
 
-    public RTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int 
partition, RecordDescriptor inputRecDesc,
-            int[] keyFields, int[] minFilterFieldIndexes, int[] 
maxFilterFieldIndexes,
-            IIndexDataflowHelperFactory indexHelperFactory, boolean 
retainInput, boolean retainMissing,
-            IMissingWriterFactory missingWriterFactory, 
ISearchOperationCallbackFactory searchCallbackFactory,
-            boolean appendIndexFilter, IMissingWriterFactory 
nonFilterWriterFactory) throws HyracksDataException {
-        this(ctx, partition, inputRecDesc, keyFields, minFilterFieldIndexes, 
maxFilterFieldIndexes, indexHelperFactory,
-                retainInput, retainMissing, missingWriterFactory, 
searchCallbackFactory, appendIndexFilter,
-                nonFilterWriterFactory, false, null, null);
-    }
-
     public RTreeSearchOperatorNodePushable(IHyracksTaskContext ctx, int 
partition, RecordDescriptor inputRecDesc,
             int[] keyFields, int[] minFilterFieldIndexes, int[] 
maxFilterFieldIndexes,
             IIndexDataflowHelperFactory indexHelperFactory, boolean 
retainInput, boolean retainMissing,
@@ -60,7 +52,7 @@ public class RTreeSearchOperatorNodePushable extends 
IndexSearchOperatorNodePush
         super(ctx, inputRecDesc, partition, minFilterFieldIndexes, 
maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, 
searchCallbackFactory, appendIndexFilter,
                 nonFilterWriterFactory, null, -1, 
appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
-                searchCallbackProceedResultTrueValue, 
DefaultTupleProjectorFactory.INSTANCE);
+                searchCallbackProceedResultTrueValue, 
DefaultTupleProjectorFactory.INSTANCE, null, null);
         if (keyFields != null && keyFields.length > 0) {
             searchKey = new PermutingFrameTupleReference();
             searchKey.setFieldPermutation(keyFields);
@@ -68,7 +60,7 @@ public class RTreeSearchOperatorNodePushable extends 
IndexSearchOperatorNodePush
     }
 
     @Override
-    protected ISearchPredicate createSearchPredicate() {
+    protected ISearchPredicate createSearchPredicate(IIndex index) {
         ITreeIndex treeIndex = (ITreeIndex) index;
         cmp = 
RTreeUtils.getSearchMultiComparator(treeIndex.getComparatorFactories(), 
searchKey);
         return new SearchPredicate(searchKey, cmp, minFilterKey, maxFilterKey);
@@ -88,7 +80,7 @@ public class RTreeSearchOperatorNodePushable extends 
IndexSearchOperatorNodePush
     }
 
     @Override
-    protected int getFieldCount() {
+    protected int getFieldCount(IIndex index) {
         return ((ITreeIndex) index).getFieldCount();
     }
 

Reply via email to