This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fc85d3d81 [ASTERIXDB-3144][HYR][RT] Make other operators support 
multiple partitions
4fc85d3d81 is described below

commit 4fc85d3d8112af713b0906821f9e95ceee9dd6c4
Author: Ali Alsuliman <[email protected]>
AuthorDate: Tue May 2 19:53:53 2023 -0700

    [ASTERIXDB-3144][HYR][RT] Make other operators support multiple partitions
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    This patch changes the following operators to support
    operating on multiple partitions. This is a step towards
    achieving compute/storage separation:
    - compact dataset/index
    - drop dataverse
    - remove feed storage
    - RTree search
    
    Remove unused getWriteResultRuntime from IMetadataProvider.
    
    Change-Id: I2b1223c0d1c3248305014396dec4c1d0bdef13f6
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17505
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../org/apache/asterix/utils/DataverseUtil.java    |  4 +-
 .../org/apache/asterix/utils/FeedOperations.java   | 30 +++---------
 .../metadata/declared/MetadataProvider.java        | 56 ++++------------------
 .../metadata/utils/DataPartitioningProvider.java   | 29 ++++++++++-
 .../apache/asterix/metadata/utils/DatasetUtil.java |  4 +-
 .../utils/SecondaryTreeIndexOperationsHelper.java  |  4 +-
 .../DatasetStreamStatsOperatorDescriptor.java      |  1 +
 ...SMSecondaryIndexBulkLoadOperatorDescriptor.java |  1 +
 ...exCreationTupleProcessorOperatorDescriptor.java |  1 +
 .../core/algebra/metadata/IMetadataProvider.java   |  5 --
 .../hyracks/api/dataflow/IOperatorDescriptor.java  |  4 +-
 .../std/file/FileRemoveOperatorDescriptor.java     | 52 ++++++++++----------
 .../ConstantTupleSourceOperatorDescriptor.java     |  6 +--
 .../ConstantTupleSourceOperatorNodePushable.java   |  8 ++--
 ...tterTuplesSecondaryIndexSearchOperatorTest.java |  3 +-
 .../RTreeSecondaryIndexInsertOperatorTest.java     |  7 +--
 .../rtree/RTreeSecondaryIndexScanOperatorTest.java |  7 +--
 .../RTreeSecondaryIndexSearchOperatorTest.java     |  7 +--
 .../dataflow/IndexSearchOperatorNodePushable.java  |  1 +
 .../TreeIndexDiskOrderScanOperatorDescriptor.java  |  1 +
 .../dataflow/TreeIndexStatsOperatorDescriptor.java |  1 +
 ...SMBTreeDiskComponentScanOperatorDescriptor.java |  1 +
 .../LSMIndexCompactOperatorNodePushable.java       | 28 +++++++----
 .../LSMTreeIndexCompactOperatorDescriptor.java     |  8 ++--
 .../dataflow/RTreeSearchOperatorDescriptor.java    | 13 +++--
 .../dataflow/RTreeSearchOperatorNodePushable.java  |  4 +-
 26 files changed, 142 insertions(+), 144 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
index d5743cbdf9..bfba414669 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
@@ -34,8 +34,8 @@ public class DataverseUtil {
     public static JobSpecification dropDataverseJobSpec(Dataverse dataverse, 
MetadataProvider metadata) {
         JobSpecification jobSpec = 
RuntimeUtils.createJobSpecification(metadata.getApplicationContext());
         PartitioningProperties partitioningProperties = 
metadata.splitAndConstraints(dataverse.getDataverseName());
-        FileRemoveOperatorDescriptor frod =
-                new FileRemoveOperatorDescriptor(jobSpec, 
partitioningProperties.getSpiltsProvider(), false);
+        FileRemoveOperatorDescriptor frod = new 
FileRemoveOperatorDescriptor(jobSpec,
+                partitioningProperties.getSpiltsProvider(), false, 
partitioningProperties.getComputeStorageMap());
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec, 
frod,
                 partitioningProperties.getConstraints());
         jobSpec.addRoot(frod);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 07500f710f..72961b8648 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -26,21 +26,17 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeSet;
 
 import org.apache.asterix.app.result.ResponsePrinter;
 import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import 
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.functions.FunctionSignature;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.transactions.TxnId;
-import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
 import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -50,7 +46,6 @@ import 
org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
 import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
 import org.apache.asterix.file.StorageComponentProvider;
 import org.apache.asterix.lang.common.base.Expression;
@@ -114,13 +109,11 @@ import 
org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
 import 
org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
 import 
org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
 import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
 import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
 
@@ -155,23 +148,14 @@ public class FeedOperations {
     }
 
     public static JobSpecification buildRemoveFeedStorageJob(MetadataProvider 
metadataProvider, Feed feed)
-            throws AsterixException {
+            throws AlgebricksException {
         ICcApplicationContext appCtx = 
metadataProvider.getApplicationContext();
         JobSpecification spec = RuntimeUtils.createJobSpecification(appCtx);
-        IClusterStateManager csm = appCtx.getClusterStateManager();
-        AlgebricksAbsolutePartitionConstraint allCluster = 
csm.getClusterLocations();
-        Set<String> nodes = new TreeSet<>();
-        for (String node : allCluster.getLocations()) {
-            nodes.add(node);
-        }
-        AlgebricksAbsolutePartitionConstraint locations =
-                new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new 
String[nodes.size()]));
-        FileSplit[] feedLogFileSplits =
-                FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(), 
feed.getFeedName(), locations);
-        org.apache.hyracks.algebricks.common.utils.Pair<IFileSplitProvider, 
AlgebricksPartitionConstraint> spC =
-                
StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
-        FileRemoveOperatorDescriptor frod = new 
FileRemoveOperatorDescriptor(spec, spC.first, true);
-        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod, 
spC.second);
+        PartitioningProperties partitioningProperties = 
metadataProvider.getPartitioningProperties(feed);
+        FileRemoveOperatorDescriptor frod = new 
FileRemoveOperatorDescriptor(spec,
+                partitioningProperties.getSpiltsProvider(), true, 
partitioningProperties.getComputeStorageMap());
+        
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod,
+                partitioningProperties.getConstraints());
         spec.addRoot(frod);
         return spec;
     }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index c648b3351f..a8df92aa2e 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -683,10 +683,12 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         IIndexDataflowHelperFactory indexDataflowHelperFactory = new 
IndexDataflowHelperFactory(
                 storageComponentProvider.getStorageManager(), 
partitioningProperties.getSpiltsProvider());
         if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+            int[][] partitionsMap = 
partitioningProperties.getComputeStorageMap();
             rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec, 
outputRecDesc, keyFields, true, true,
                     indexDataflowHelperFactory, retainInput, retainMissing, 
nonMatchWriterFactory,
                     searchCallbackFactory, minFilterFieldIndexes, 
maxFilterFieldIndexes, propagateFilter,
-                    nonFilterWriterFactory, isIndexOnlyPlan, 
failValueForIndexOnlyPlan, successValueForIndexOnlyPlan);
+                    nonFilterWriterFactory, isIndexOnlyPlan, 
failValueForIndexOnlyPlan, successValueForIndexOnlyPlan,
+                    partitionsMap);
         } else {
             // Create the operator
             rtreeSearchOp = null;
@@ -731,52 +733,6 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         return new Pair<>(resultWriter, null);
     }
 
-    @Override
-    public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getWriteResultRuntime(
-            IDataSource<DataSourceId> dataSource, IOperatorSchema 
propagatedSchema, List<LogicalVariable> keys,
-            LogicalVariable payload, List<LogicalVariable> 
additionalNonKeyFields, JobGenContext context,
-            JobSpecification spec) throws AlgebricksException {
-        DataverseName dataverseName = dataSource.getId().getDataverseName();
-        String datasetName = dataSource.getId().getDatasourceName();
-        Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx, 
dataverseName, datasetName);
-        int numKeys = keys.size();
-        int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 
: 1;
-
-        // move key fields to front
-        int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
-        int[] pkFields = new int[numKeys];
-        int i = 0;
-        for (LogicalVariable varKey : keys) {
-            int idx = propagatedSchema.findVariable(varKey);
-            fieldPermutation[i] = idx;
-            pkFields[i] = idx;
-            i++;
-        }
-        fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
-        if (numFilterFields > 0) {
-            int idx = 
propagatedSchema.findVariable(additionalNonKeyFields.get(0));
-            fieldPermutation[numKeys + 1] = idx;
-        }
-
-        PartitioningProperties partitioningProperties = 
getPartitioningProperties(dataset);
-        long numElementsHint = getCardinalityPerPartitionHint(dataset);
-        // TODO
-        // figure out the right behavior of the bulkload and then give the
-        // right callback
-        // (ex. what's the expected behavior when there is an error during
-        // bulkload?)
-        IBinaryHashFunctionFactory[] pkHashFunFactories = 
dataset.getPrimaryHashFunctionFactories(this);
-        ITuplePartitionerFactory partitionerFactory = new 
FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
-                partitioningProperties.getNumberOfPartitions());
-        IIndexDataflowHelperFactory indexHelperFactory = new 
IndexDataflowHelperFactory(
-                storageComponentProvider.getStorageManager(), 
partitioningProperties.getSpiltsProvider());
-        LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new 
LSMIndexBulkLoadOperatorDescriptor(spec, null,
-                fieldPermutation, StorageConstants.DEFAULT_TREE_FILL_FACTOR, 
false, numElementsHint, true,
-                indexHelperFactory, null, BulkLoadUsage.LOAD, 
dataset.getDatasetId(), null, partitionerFactory,
-                partitioningProperties.getComputeStorageMap());
-        return new Pair<>(btreeBulkLoad, 
partitioningProperties.getConstraints());
-    }
-
     @Override
     public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getInsertRuntime(
             IDataSource<DataSourceId> dataSource, IOperatorSchema 
propagatedSchema, IVariableTypeEnvironment typeEnv,
@@ -972,7 +928,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
     }
 
     public PartitioningProperties splitAndConstraints(DataverseName 
dataverseName) {
-        return dataPartitioningProvider.splitAndConstraints(dataverseName);
+        return 
dataPartitioningProvider.getPartitioningProperties(dataverseName);
     }
 
     public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx, 
Dataset dataset, String indexName)
@@ -1837,6 +1793,10 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx, 
ds, indexName);
     }
 
+    public PartitioningProperties getPartitioningProperties(Feed feed) throws 
AlgebricksException {
+        return dataPartitioningProvider.getPartitioningProperties(feed);
+    }
+
     public List<Pair<IFileSplitProvider, String>> 
getSplitProviderOfAllIndexes(Dataset ds) throws AlgebricksException {
         List<Index> dsIndexes = getDatasetIndexes(ds.getDataverseName(), 
ds.getDatasetName()).stream()
                 .filter(idx -> idx.getIndexType() != IndexType.SAMPLE && 
idx.isSecondaryIndex())
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
index f5e96b1691..ec4c985369 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
@@ -21,14 +21,22 @@ package org.apache.asterix.metadata.utils;
 import static org.apache.asterix.common.utils.PartitioningScheme.DYNAMIC;
 import static org.apache.asterix.common.utils.PartitioningScheme.STATIC;
 
+import java.util.Arrays;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.cluster.PartitioningProperties;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.common.utils.PartitioningScheme;
 import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.metadata.MetadataTransactionContext;
 import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Feed;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -48,7 +56,7 @@ public class DataPartitioningProvider implements 
IDataPartitioningProvider {
         scheme = appCtx.getStorageProperties().getPartitioningScheme();
     }
 
-    public PartitioningProperties splitAndConstraints(DataverseName 
dataverseName) {
+    public PartitioningProperties getPartitioningProperties(DataverseName 
dataverseName) {
         if (scheme == DYNAMIC) {
             Pair<IFileSplitProvider, AlgebricksPartitionConstraint> 
splitsAndConstraints = SplitsAndConstraintsUtil
                     
.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(), 
dataverseName);
@@ -75,6 +83,25 @@ public class DataPartitioningProvider implements 
IDataPartitioningProvider {
         throw new IllegalStateException();
     }
 
+    public PartitioningProperties getPartitioningProperties(Feed feed) throws 
AsterixException {
+        if (scheme == DYNAMIC) {
+            IClusterStateManager csm = appCtx.getClusterStateManager();
+            AlgebricksAbsolutePartitionConstraint allCluster = 
csm.getClusterLocations();
+            Set<String> nodes = new 
TreeSet<>(Arrays.asList(allCluster.getLocations()));
+            AlgebricksAbsolutePartitionConstraint locations =
+                    new 
AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[0]));
+            FileSplit[] feedLogFileSplits =
+                    FeedUtils.splitsForAdapter(appCtx, 
feed.getDataverseName(), feed.getFeedName(), locations);
+            Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
+                    
StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
+            int[][] partitionsMap = 
getPartitionsMap(getNumPartitions(spC.second));
+            return PartitioningProperties.of(spC.first, spC.second, 
partitionsMap);
+        } else if (scheme == STATIC) {
+            throw new NotImplementedException();
+        }
+        throw new IllegalStateException();
+    }
+
     private static int getNumPartitions(AlgebricksPartitionConstraint 
constraint) {
         if (constraint.getPartitionConstraintType() == 
AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
             return ((AlgebricksCountPartitionConstraint) 
constraint).getCount();
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 27e06eb8cb..130e39c357 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -370,8 +370,8 @@ public class DatasetUtil {
         IIndexDataflowHelperFactory indexHelperFactory =
                 new 
IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
                         partitioningProperties.getSpiltsProvider());
-        LSMTreeIndexCompactOperatorDescriptor compactOp =
-                new LSMTreeIndexCompactOperatorDescriptor(spec, 
indexHelperFactory);
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new 
LSMTreeIndexCompactOperatorDescriptor(spec,
+                indexHelperFactory, 
partitioningProperties.getComputeStorageMap());
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
compactOp,
                 partitioningProperties.getConstraints());
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
compactOp,
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 498f3d4882..8dc0d966c8 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -100,8 +100,8 @@ public abstract class SecondaryTreeIndexOperationsHelper 
extends SecondaryIndexO
         IIndexDataflowHelperFactory dataflowHelperFactory =
                 new 
IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
                         partitioningProperties.getSpiltsProvider());
-        LSMTreeIndexCompactOperatorDescriptor compactOp =
-                new LSMTreeIndexCompactOperatorDescriptor(spec, 
dataflowHelperFactory);
+        LSMTreeIndexCompactOperatorDescriptor compactOp = new 
LSMTreeIndexCompactOperatorDescriptor(spec,
+                dataflowHelperFactory, 
partitioningProperties.getComputeStorageMap());
         compactOp.setSourceLocation(sourceLoc);
         
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, 
compactOp,
                 secondaryPartitionConstraint);
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
index ef9e75b310..dce86387e1 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
@@ -59,6 +59,7 @@ public final class DatasetStreamStatsOperatorDescriptor 
extends AbstractSingleAc
     public DatasetStreamStatsOperatorDescriptor(IOperatorDescriptorRegistry 
spec, RecordDescriptor rDesc,
             String operatorName, IIndexDataflowHelperFactory[] indexes, 
String[] indexesNames) {
         super(spec, 1, 1);
+        //TODO(partitioning)
         outRecDescs[0] = rDesc;
         this.operatorName = operatorName;
         this.indexes = indexes;
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
index 4f1dfd7acc..50fe998303 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
@@ -47,6 +47,7 @@ public class LSMSecondaryIndexBulkLoadOperatorDescriptor 
extends AbstractSingleA
             IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[] 
fieldPermutation, int numTagFields,
             int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) {
         super(spec, 1, 1);
+        //TODO(partitioning) correlated
         this.outRecDescs[0] = outRecDesc;
         this.primaryIndexHelperFactory = primaryIndexHelperFactory;
         this.secondaryIndexHelperFactory = secondaryIndexHelperFactory;
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
index f788d2374f..488b73bdde 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
@@ -48,6 +48,7 @@ public class 
LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor
             int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree, 
boolean excludeUnknownKeys,
             boolean forAnyUnknownKey) {
         super(spec, 1, 1);
+        //TODO(partitioning) correlated
         this.outRecDescs[0] = outRecDesc;
         this.missingWriterFactory = missingWriterFactory;
         this.numTagFields = numTagFields;
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 4a6e76e16b..e34ff2143a 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -66,11 +66,6 @@ public interface IMetadataProvider<S, I> {
             IResultSerializerFactoryProvider resultSerializerFactoryProvider, 
RecordDescriptor inputDesc,
             IResultMetadata metadata, JobSpecification spec) throws 
AlgebricksException;
 
-    Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getWriteResultRuntime(IDataSource<S> dataSource,
-            IOperatorSchema propagatedSchema, List<LogicalVariable> keys, 
LogicalVariable payLoadVar,
-            List<LogicalVariable> additionalNonKeyFields, JobGenContext 
context, JobSpecification jobSpec)
-            throws AlgebricksException;
-
     Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getInsertRuntime(IDataSource<S> dataSource,
             IOperatorSchema propagatedSchema, IVariableTypeEnvironment 
typeEnv, List<LogicalVariable> keys,
             LogicalVariable payLoadVar, List<LogicalVariable> 
additionalFilterKeyFields,
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
index ed03cc74f8..49110b9638 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -82,8 +82,8 @@ public interface IOperatorDescriptor extends Serializable {
      *
      * @param constraintAcceptor
      *            - Constraint Acceptor
-     * @param plan
-     *            - Job Plan
+     * @param ccServiceCtx
+     *            - CC Service Context
      */
     void contributeSchedulingConstraints(IConstraintAcceptor 
constraintAcceptor, ICCServiceContext ccServiceCtx);
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
index 57a5f692b2..76cd48afa1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
@@ -20,7 +20,6 @@
 package org.apache.hyracks.dataflow.std.file;
 
 import java.io.File;
-import java.io.IOException;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.comm.IFrameWriter;
@@ -32,36 +31,31 @@ import 
org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
 import 
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
 
 public class FileRemoveOperatorDescriptor extends 
AbstractSingleActivityOperatorDescriptor {
 
+    private static final long serialVersionUID = 2L;
+
     private final IFileSplitProvider fileSplitProvider;
     private final boolean quietly;
+    private final int[][] partitionsMap;
 
     public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, 
IFileSplitProvider fileSplitProvder,
-            boolean quietly) {
+            boolean quietly, int[][] partitionsMap) {
         super(spec, 0, 0);
         this.fileSplitProvider = fileSplitProvder;
         this.quietly = quietly;
+        this.partitionsMap = partitionsMap;
     }
 
-    /**
-     *
-     * @deprecated use {@link 
#FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, 
IFileSplitProvider fileSplitProvder, boolean quietly)} instead.
-     */
-    @Deprecated
-    public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec, 
IFileSplitProvider fileSplitProvder) {
-        this(spec, fileSplitProvder, false);
-    }
-
-    private static final long serialVersionUID = 1L;
-
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
-        final FileSplit split = fileSplitProvider.getFileSplits()[partition];
+        final FileSplit[] splits = fileSplitProvider.getFileSplits();
+        final int[] splitsIndexes = partitionsMap[partition];
         final IIOManager ioManager = ctx.getIoManager();
         return new AbstractOperatorNodePushable() {
 
@@ -73,16 +67,7 @@ public class FileRemoveOperatorDescriptor extends 
AbstractSingleActivityOperator
             @Override
             public void initialize() throws HyracksDataException {
                 // will only work for files inside the io devices
-                File f = split.getFile(ioManager);
-                if (quietly) {
-                    FileUtils.deleteQuietly(f);
-                } else {
-                    try {
-                        FileUtils.deleteDirectory(f);
-                    } catch (IOException e) {
-                        throw HyracksDataException.create(e);
-                    }
-                }
+                deleteFiles();
             }
 
             @Override
@@ -98,6 +83,25 @@ public class FileRemoveOperatorDescriptor extends 
AbstractSingleActivityOperator
             @Override
             public void deinitialize() throws HyracksDataException {
             }
+
+            private void deleteFiles() throws HyracksDataException {
+                Throwable failure = null;
+                for (int splitsIndex : splitsIndexes) {
+                    try {
+                        File file = splits[splitsIndex].getFile(ioManager);
+                        if (quietly) {
+                            FileUtils.deleteQuietly(file);
+                        } else {
+                            FileUtils.deleteDirectory(file);
+                        }
+                    } catch (Throwable th) {
+                        failure = ExceptionUtils.suppress(failure, th);
+                    }
+                }
+                if (failure != null) {
+                    throw HyracksDataException.create(failure);
+                }
+            }
         };
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
index 7b687c44e1..1302bce0a1 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
@@ -30,9 +30,9 @@ public class ConstantTupleSourceOperatorDescriptor extends 
AbstractSingleActivit
 
     private static final long serialVersionUID = 1L;
 
-    private int[] fieldSlots;
-    private byte[] tupleData;
-    private int tupleSize;
+    private final int[] fieldSlots;
+    private final byte[] tupleData;
+    private final int tupleSize;
 
     public ConstantTupleSourceOperatorDescriptor(IOperatorDescriptorRegistry 
spec, RecordDescriptor recDesc,
             int[] fieldSlots, byte[] tupleData, int tupleSize) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 9bd0c598a1..785a33044c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -26,11 +26,11 @@ import 
org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
 import 
org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
 
 public class ConstantTupleSourceOperatorNodePushable extends 
AbstractUnaryOutputSourceOperatorNodePushable {
-    private IHyracksTaskContext ctx;
 
-    private int[] fieldSlots;
-    private byte[] tupleData;
-    private int tupleSize;
+    private final IHyracksTaskContext ctx;
+    private final int[] fieldSlots;
+    private final byte[] tupleData;
+    private final int tupleSize;
 
     public ConstantTupleSourceOperatorNodePushable(IHyracksTaskContext ctx, 
int[] fieldSlots, byte[] tupleData,
             int tupleSize) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
index 6a31962654..68e4ac59b7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
@@ -44,6 +44,7 @@ import 
org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescripto
 import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 import org.apache.hyracks.storage.common.IResourceFactory;
 import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
+import org.apache.hyracks.test.support.TestUtils;
 import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
 import org.apache.hyracks.tests.am.rtree.RTreeSecondaryIndexSearchOperatorTest;
 import org.junit.Test;
@@ -95,7 +96,7 @@ public class 
LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest
         int[] keyFields = { 0, 1, 2, 3 };
         RTreeSearchOperatorDescriptor secondarySearchOp = new 
RTreeSearchOperatorDescriptor(spec,
                 secondaryWithFilterRecDesc, keyFields, true, true, 
secondaryHelperFactory, false, false, null,
-                NoOpOperationCallbackFactory.INSTANCE, null, null, false, 
null);
+                NoOpOperationCallbackFactory.INSTANCE, null, null, false, 
null, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
secondarySearchOp, NC1_ID);
 
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new 
FileSplit[] { createFile(nc1) });
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
index 0fcf8921a3..f233417318 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
@@ -43,6 +43,7 @@ import 
org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import 
org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -86,9 +87,9 @@ public class RTreeSecondaryIndexInsertOperatorTest extends 
AbstractRTreeOperator
                 keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), 
tb.getSize());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
keyProviderOp, NC1_ID);
         int[] keyFields = { 0, 1, 2, 3 };
-        RTreeSearchOperatorDescriptor secondarySearchOp =
-                new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc, 
keyFields, true, true, secondaryHelperFactory,
-                        false, false, null, 
NoOpOperationCallbackFactory.INSTANCE, null, null, false, null);
+        RTreeSearchOperatorDescriptor secondarySearchOp = new 
RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+                keyFields, true, true, secondaryHelperFactory, false, false, 
null,
+                NoOpOperationCallbackFactory.INSTANCE, null, null, false, 
null, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
secondarySearchOp, NC1_ID);
         // fifth field from the tuples coming from secondary index
         int[] primaryLowKeyFields = { 4 };
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
index 3cfef19bda..2748988fea 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
@@ -42,6 +42,7 @@ import 
org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import 
org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -84,9 +85,9 @@ public class RTreeSecondaryIndexScanOperatorTest extends 
AbstractRTreeOperatorTe
                 keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), 
tb.getSize());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
keyProviderOp, NC1_ID);
         int[] keyFields = null;
-        RTreeSearchOperatorDescriptor secondarySearchOp =
-                new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc, 
keyFields, true, true, secondaryHelperFactory,
-                        false, false, null, 
NoOpOperationCallbackFactory.INSTANCE, null, null, false, null);
+        RTreeSearchOperatorDescriptor secondarySearchOp = new 
RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+                keyFields, true, true, secondaryHelperFactory, false, false, 
null,
+                NoOpOperationCallbackFactory.INSTANCE, null, null, false, 
null, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
secondarySearchOp, NC1_ID);
         IFileSplitProvider outSplits = new ConstantFileSplitProvider(new 
FileSplit[] { createFile(nc1) });
         IOperatorDescriptor printer = new 
PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
index 8ea0701242..da5de77ccb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
@@ -43,6 +43,7 @@ import 
org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
 import 
org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
 import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
 import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.test.support.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -84,9 +85,9 @@ public class RTreeSecondaryIndexSearchOperatorTest extends 
AbstractRTreeOperator
                 keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), 
tb.getSize());
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
keyProviderOp, NC1_ID);
         int[] keyFields = { 0, 1, 2, 3 };
-        RTreeSearchOperatorDescriptor secondarySearchOp =
-                new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc, 
keyFields, true, true, secondaryHelperFactory,
-                        false, false, null, 
NoOpOperationCallbackFactory.INSTANCE, null, null, false, null);
+        RTreeSearchOperatorDescriptor secondarySearchOp = new 
RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+                keyFields, true, true, secondaryHelperFactory, false, false, 
null,
+                NoOpOperationCallbackFactory.INSTANCE, null, null, false, 
null, TestUtils.getPartitionsMap(1));
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
secondarySearchOp, NC1_ID);
         // fifth field from the tuples coming from secondary index
         int[] primaryLowKeyFields = { 4 };
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 88c06eb91a..c3719c744f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -131,6 +131,7 @@ public abstract class IndexSearchOperatorNodePushable 
extends AbstractUnaryInput
             ITuplePartitionerFactory tuplePartitionerFactory, int[][] 
partitionsMap) throws HyracksDataException {
         this.ctx = ctx;
         this.appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
+        //TODO(partitioning) partitionsMap should not be null
         this.partitions = partitionsMap != null ? partitionsMap[partition] : 
new int[] { partition };
         for (int i = 0; i < partitions.length; i++) {
             storagePartitionId2Index.put(partitions[i], i);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
index 099d668c6d..06dad1c093 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
@@ -37,6 +37,7 @@ public class TreeIndexDiskOrderScanOperatorDescriptor extends 
AbstractSingleActi
     public 
TreeIndexDiskOrderScanOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor outRecDesc,
             IIndexDataflowHelperFactory indexHelperFactory, 
ISearchOperationCallbackFactory searchCallbackFactory) {
         super(spec, 0, 1);
+        //TODO(maybe don't fix)
         this.indexHelperFactory = indexHelperFactory;
         this.searchCallbackFactory = searchCallbackFactory;
         this.outRecDescs[0] = outRecDesc;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
index 6ed185892b..104eed4f0c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
@@ -40,6 +40,7 @@ public class TreeIndexStatsOperatorDescriptor extends 
AbstractSingleActivityOper
     public TreeIndexStatsOperatorDescriptor(IOperatorDescriptorRegistry spec, 
IStorageManager storageManager,
             IIndexDataflowHelperFactory indexHelperFactory) {
         super(spec, 0, 1);
+        //TODO(maybe don't fix)
         this.indexHelperFactory = indexHelperFactory;
         this.storageManager = storageManager;
         this.outRecDescs[0] = recDesc;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
index 291363d738..da41f7e205 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
@@ -38,6 +38,7 @@ public class LSMBTreeDiskComponentScanOperatorDescriptor 
extends AbstractSingleA
     public 
LSMBTreeDiskComponentScanOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor outRecDesc,
             IIndexDataflowHelperFactory indexHelperFactory, 
ISearchOperationCallbackFactory searchCallbackFactory) {
         super(spec, 1, 1);
+        //TODO(partitioning) correlated
         this.indexHelperFactory = indexHelperFactory;
         this.searchCallbackFactory = searchCallbackFactory;
         this.outRecDescs[0] = outRecDesc;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
index 682ffefae4..139a87195d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
@@ -22,6 +22,7 @@ import org.apache.hyracks.api.comm.IFrameWriter;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
 import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
 import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
 import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -30,16 +31,25 @@ import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
 
 public class LSMIndexCompactOperatorNodePushable extends 
AbstractOperatorNodePushable {
-    private final IIndexDataflowHelper indexHelper;
+
+    private final IIndexDataflowHelper[] indexHelpers;
 
     public LSMIndexCompactOperatorNodePushable(IHyracksTaskContext ctx, int 
partition,
-            IIndexDataflowHelperFactory indexHelperFactory) throws 
HyracksDataException {
-        this.indexHelper = 
indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), 
partition);
+            IIndexDataflowHelperFactory indexHelperFactory, int[][] 
partitionsMap) throws HyracksDataException {
+        int[] partitions = partitionsMap[partition];
+        indexHelpers = new IIndexDataflowHelper[partitions.length];
+        for (int i = 0; i < partitions.length; i++) {
+            indexHelpers[i] = 
indexHelperFactory.create(ctx.getJobletContext().getServiceContext(), 
partitions[i]);
+        }
+
     }
 
     @Override
     public void deinitialize() throws HyracksDataException {
-        indexHelper.close();
+        Throwable failure = CleanupUtils.close(indexHelpers, null);
+        if (failure != null) {
+            throw HyracksDataException.create(failure);
+        }
     }
 
     @Override
@@ -54,10 +64,12 @@ public class LSMIndexCompactOperatorNodePushable extends 
AbstractOperatorNodePus
 
     @Override
     public void initialize() throws HyracksDataException {
-        indexHelper.open();
-        ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
-        ILSMIndexAccessor accessor = 
index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
-        accessor.scheduleFullMerge();
+        for (IIndexDataflowHelper indexHelper : indexHelpers) {
+            indexHelper.open();
+            ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
+            ILSMIndexAccessor accessor = 
index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+            accessor.scheduleFullMerge();
+        }
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
index 6da4c8f5bc..aad1cc909f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
@@ -29,18 +29,20 @@ import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory
 
 public class LSMTreeIndexCompactOperatorDescriptor extends 
AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     private final IIndexDataflowHelperFactory indexHelperFactory;
+    private final int[][] partitionsMap;
 
     public LSMTreeIndexCompactOperatorDescriptor(IOperatorDescriptorRegistry 
spec,
-            IIndexDataflowHelperFactory indexHelperFactory) {
+            IIndexDataflowHelperFactory indexHelperFactory, int[][] 
partitionsMap) {
         super(spec, 0, 0);
         this.indexHelperFactory = indexHelperFactory;
+        this.partitionsMap = partitionsMap;
     }
 
     @Override
     public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
             IRecordDescriptorProvider recordDescProvider, int partition, int 
nPartitions) throws HyracksDataException {
-        return new LSMIndexCompactOperatorNodePushable(ctx, partition, 
indexHelperFactory);
+        return new LSMIndexCompactOperatorNodePushable(ctx, partition, 
indexHelperFactory, partitionsMap);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
index cc6d76d9b1..8e7a7fedf2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
@@ -32,7 +32,7 @@ import 
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory
 
 public class RTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperatorDescriptor {
 
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     protected final int[] keyFields;
     protected final boolean lowKeyInclusive;
     protected final boolean highKeyInclusive;
@@ -45,6 +45,7 @@ public class RTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperato
     protected final boolean retainMissing;
     protected final IMissingWriterFactory missingWriterFactory;
     protected final ISearchOperationCallbackFactory searchCallbackFactory;
+    protected final int[][] partitionsMap;
     protected boolean appendOpCallbackProceedResult;
     protected byte[] searchCallbackProceedResultFalseValue;
     protected byte[] searchCallbackProceedResultTrueValue;
@@ -53,10 +54,11 @@ public class RTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperato
             boolean lowKeyInclusive, boolean highKeyInclusive, 
IIndexDataflowHelperFactory indexHelperFactory,
             boolean retainInput, boolean retainMissing, IMissingWriterFactory 
missingWriterFactory,
             ISearchOperationCallbackFactory searchCallbackFactory, int[] 
minFilterFieldIndexes,
-            int[] maxFilterFieldIndexes, boolean appendIndexFilter, 
IMissingWriterFactory nonFilterWriterFactory) {
+            int[] maxFilterFieldIndexes, boolean appendIndexFilter, 
IMissingWriterFactory nonFilterWriterFactory,
+            int[][] partitionsMap) {
         this(spec, outRecDesc, keyFields, lowKeyInclusive, highKeyInclusive, 
indexHelperFactory, retainInput,
                 retainMissing, missingWriterFactory, searchCallbackFactory, 
minFilterFieldIndexes,
-                maxFilterFieldIndexes, appendIndexFilter, 
nonFilterWriterFactory, false, null, null);
+                maxFilterFieldIndexes, appendIndexFilter, 
nonFilterWriterFactory, false, null, null, partitionsMap);
     }
 
     public RTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec, 
RecordDescriptor outRecDesc, int[] keyFields,
@@ -65,7 +67,7 @@ public class RTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperato
             ISearchOperationCallbackFactory searchCallbackFactory, int[] 
minFilterFieldIndexes,
             int[] maxFilterFieldIndexes, boolean appendIndexFilter, 
IMissingWriterFactory nonFilterWriterFactory,
             boolean appendOpCallbackProceedResult, byte[] 
searchCallbackProceedResultFalseValue,
-            byte[] searchCallbackProceedResultTrueValue) {
+            byte[] searchCallbackProceedResultTrueValue, int[][] 
partitionsMap) {
         super(spec, 1, 1);
         this.indexHelperFactory = indexHelperFactory;
         this.retainInput = retainInput;
@@ -79,6 +81,7 @@ public class RTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperato
         this.maxFilterFieldIndexes = maxFilterFieldIndexes;
         this.appendIndexFilter = appendIndexFilter;
         this.nonFilterWriterFactory = nonFilterWriterFactory;
+        this.partitionsMap = partitionsMap;
         this.outRecDescs[0] = outRecDesc;
         this.appendOpCallbackProceedResult = appendOpCallbackProceedResult;
         this.searchCallbackProceedResultFalseValue = 
searchCallbackProceedResultFalseValue;
@@ -92,6 +95,6 @@ public class RTreeSearchOperatorDescriptor extends 
AbstractSingleActivityOperato
                 recordDescProvider.getInputRecordDescriptor(getActivityId(), 
0), keyFields, minFilterFieldIndexes,
                 maxFilterFieldIndexes, indexHelperFactory, retainInput, 
retainMissing, missingWriterFactory,
                 searchCallbackFactory, appendIndexFilter, 
nonFilterWriterFactory, appendOpCallbackProceedResult,
-                searchCallbackProceedResultFalseValue, 
searchCallbackProceedResultTrueValue);
+                searchCallbackProceedResultFalseValue, 
searchCallbackProceedResultTrueValue, partitionsMap);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index aef624fa7b..7a7dc0e771 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -47,12 +47,12 @@ public class RTreeSearchOperatorNodePushable extends 
IndexSearchOperatorNodePush
             IMissingWriterFactory missingWriterFactory, 
ISearchOperationCallbackFactory searchCallbackFactory,
             boolean appendIndexFilter, IMissingWriterFactory 
nonFilterWriterFactory,
             boolean appendOpCallbackProceedResult, byte[] 
searchCallbackProceedResultFalseValue,
-            byte[] searchCallbackProceedResultTrueValue) throws 
HyracksDataException {
+            byte[] searchCallbackProceedResultTrueValue, int[][] 
partitionsMap) throws HyracksDataException {
         // TODO: predicate & limit pushdown not enabled for RTree yet
         super(ctx, inputRecDesc, partition, minFilterFieldIndexes, 
maxFilterFieldIndexes, indexHelperFactory,
                 retainInput, retainMissing, missingWriterFactory, 
searchCallbackFactory, appendIndexFilter,
                 nonFilterWriterFactory, null, -1, 
appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
-                searchCallbackProceedResultTrueValue, 
DefaultTupleProjectorFactory.INSTANCE, null, null);
+                searchCallbackProceedResultTrueValue, 
DefaultTupleProjectorFactory.INSTANCE, null, partitionsMap);
         if (keyFields != null && keyFields.length > 0) {
             searchKey = new PermutingFrameTupleReference();
             searchKey.setFieldPermutation(keyFields);

Reply via email to