This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4fc85d3d81 [ASTERIXDB-3144][HYR][RT] Make other operators support
multiple partitions
4fc85d3d81 is described below
commit 4fc85d3d8112af713b0906821f9e95ceee9dd6c4
Author: Ali Alsuliman <[email protected]>
AuthorDate: Tue May 2 19:53:53 2023 -0700
[ASTERIXDB-3144][HYR][RT] Make other operators support multiple partitions
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
This patch changes the operators listed below to support
operating on multiple partitions, as a step towards
achieving compute/storage separation:
- compact dataset/index
- drop dataverse
- remove feed storage
- RTree search
Remove unused getWriteResultRuntime from IMetadataProvider.
Change-Id: I2b1223c0d1c3248305014396dec4c1d0bdef13f6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17505
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../org/apache/asterix/utils/DataverseUtil.java | 4 +-
.../org/apache/asterix/utils/FeedOperations.java | 30 +++---------
.../metadata/declared/MetadataProvider.java | 56 ++++------------------
.../metadata/utils/DataPartitioningProvider.java | 29 ++++++++++-
.../apache/asterix/metadata/utils/DatasetUtil.java | 4 +-
.../utils/SecondaryTreeIndexOperationsHelper.java | 4 +-
.../DatasetStreamStatsOperatorDescriptor.java | 1 +
...SMSecondaryIndexBulkLoadOperatorDescriptor.java | 1 +
...exCreationTupleProcessorOperatorDescriptor.java | 1 +
.../core/algebra/metadata/IMetadataProvider.java | 5 --
.../hyracks/api/dataflow/IOperatorDescriptor.java | 4 +-
.../std/file/FileRemoveOperatorDescriptor.java | 52 ++++++++++----------
.../ConstantTupleSourceOperatorDescriptor.java | 6 +--
.../ConstantTupleSourceOperatorNodePushable.java | 8 ++--
...tterTuplesSecondaryIndexSearchOperatorTest.java | 3 +-
.../RTreeSecondaryIndexInsertOperatorTest.java | 7 +--
.../rtree/RTreeSecondaryIndexScanOperatorTest.java | 7 +--
.../RTreeSecondaryIndexSearchOperatorTest.java | 7 +--
.../dataflow/IndexSearchOperatorNodePushable.java | 1 +
.../TreeIndexDiskOrderScanOperatorDescriptor.java | 1 +
.../dataflow/TreeIndexStatsOperatorDescriptor.java | 1 +
...SMBTreeDiskComponentScanOperatorDescriptor.java | 1 +
.../LSMIndexCompactOperatorNodePushable.java | 28 +++++++----
.../LSMTreeIndexCompactOperatorDescriptor.java | 8 ++--
.../dataflow/RTreeSearchOperatorDescriptor.java | 13 +++--
.../dataflow/RTreeSearchOperatorNodePushable.java | 4 +-
26 files changed, 142 insertions(+), 144 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
index d5743cbdf9..bfba414669 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/DataverseUtil.java
@@ -34,8 +34,8 @@ public class DataverseUtil {
public static JobSpecification dropDataverseJobSpec(Dataverse dataverse,
MetadataProvider metadata) {
JobSpecification jobSpec =
RuntimeUtils.createJobSpecification(metadata.getApplicationContext());
PartitioningProperties partitioningProperties =
metadata.splitAndConstraints(dataverse.getDataverseName());
- FileRemoveOperatorDescriptor frod =
- new FileRemoveOperatorDescriptor(jobSpec,
partitioningProperties.getSpiltsProvider(), false);
+ FileRemoveOperatorDescriptor frod = new
FileRemoveOperatorDescriptor(jobSpec,
+ partitioningProperties.getSpiltsProvider(), false,
partitioningProperties.getComputeStorageMap());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(jobSpec,
frod,
partitioningProperties.getConstraints());
jobSpec.addRoot(frod);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
index 07500f710f..72961b8648 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/utils/FeedOperations.java
@@ -26,21 +26,17 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeSet;
import org.apache.asterix.app.result.ResponsePrinter;
import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import
org.apache.asterix.common.dataflow.LSMTreeInsertDeleteOperatorDescriptor;
import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.functions.FunctionSignature;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.transactions.TxnId;
-import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.compiler.provider.SqlppCompilationProvider;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.feed.management.FeedConnectionId;
@@ -50,7 +46,6 @@ import
org.apache.asterix.external.operators.FeedCollectOperatorDescriptor;
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
import org.apache.asterix.external.operators.FeedMetaOperatorDescriptor;
import org.apache.asterix.external.util.ExternalDataUtils;
-import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.external.util.FeedUtils.FeedRuntimeType;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.lang.common.base.Expression;
@@ -114,13 +109,11 @@ import
org.apache.hyracks.api.dataflow.IConnectorDescriptor;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.OperatorDescriptorId;
import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.JobSpecification;
import
org.apache.hyracks.dataflow.std.connectors.MToNPartitioningConnectorDescriptor;
import
org.apache.hyracks.dataflow.std.connectors.MToNPartitioningWithMessageConnectorDescriptor;
import org.apache.hyracks.dataflow.std.connectors.OneToOneConnectorDescriptor;
import org.apache.hyracks.dataflow.std.file.FileRemoveOperatorDescriptor;
-import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
import org.apache.hyracks.dataflow.std.misc.ReplicateOperatorDescriptor;
@@ -155,23 +148,14 @@ public class FeedOperations {
}
public static JobSpecification buildRemoveFeedStorageJob(MetadataProvider
metadataProvider, Feed feed)
- throws AsterixException {
+ throws AlgebricksException {
ICcApplicationContext appCtx =
metadataProvider.getApplicationContext();
JobSpecification spec = RuntimeUtils.createJobSpecification(appCtx);
- IClusterStateManager csm = appCtx.getClusterStateManager();
- AlgebricksAbsolutePartitionConstraint allCluster =
csm.getClusterLocations();
- Set<String> nodes = new TreeSet<>();
- for (String node : allCluster.getLocations()) {
- nodes.add(node);
- }
- AlgebricksAbsolutePartitionConstraint locations =
- new AlgebricksAbsolutePartitionConstraint(nodes.toArray(new
String[nodes.size()]));
- FileSplit[] feedLogFileSplits =
- FeedUtils.splitsForAdapter(appCtx, feed.getDataverseName(),
feed.getFeedName(), locations);
- org.apache.hyracks.algebricks.common.utils.Pair<IFileSplitProvider,
AlgebricksPartitionConstraint> spC =
-
StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
- FileRemoveOperatorDescriptor frod = new
FileRemoveOperatorDescriptor(spec, spC.first, true);
-
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod,
spC.second);
+ PartitioningProperties partitioningProperties =
metadataProvider.getPartitioningProperties(feed);
+ FileRemoveOperatorDescriptor frod = new
FileRemoveOperatorDescriptor(spec,
+ partitioningProperties.getSpiltsProvider(), true,
partitioningProperties.getComputeStorageMap());
+
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, frod,
+ partitioningProperties.getConstraints());
spec.addRoot(frod);
return spec;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index c648b3351f..a8df92aa2e 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -683,10 +683,12 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
IIndexDataflowHelperFactory indexDataflowHelperFactory = new
IndexDataflowHelperFactory(
storageComponentProvider.getStorageManager(),
partitioningProperties.getSpiltsProvider());
if (dataset.getDatasetType() == DatasetType.INTERNAL) {
+ int[][] partitionsMap =
partitioningProperties.getComputeStorageMap();
rtreeSearchOp = new RTreeSearchOperatorDescriptor(jobSpec,
outputRecDesc, keyFields, true, true,
indexDataflowHelperFactory, retainInput, retainMissing,
nonMatchWriterFactory,
searchCallbackFactory, minFilterFieldIndexes,
maxFilterFieldIndexes, propagateFilter,
- nonFilterWriterFactory, isIndexOnlyPlan,
failValueForIndexOnlyPlan, successValueForIndexOnlyPlan);
+ nonFilterWriterFactory, isIndexOnlyPlan,
failValueForIndexOnlyPlan, successValueForIndexOnlyPlan,
+ partitionsMap);
} else {
// Create the operator
rtreeSearchOp = null;
@@ -731,52 +733,6 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
return new Pair<>(resultWriter, null);
}
- @Override
- public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getWriteResultRuntime(
- IDataSource<DataSourceId> dataSource, IOperatorSchema
propagatedSchema, List<LogicalVariable> keys,
- LogicalVariable payload, List<LogicalVariable>
additionalNonKeyFields, JobGenContext context,
- JobSpecification spec) throws AlgebricksException {
- DataverseName dataverseName = dataSource.getId().getDataverseName();
- String datasetName = dataSource.getId().getDatasourceName();
- Dataset dataset = MetadataManagerUtil.findExistingDataset(mdTxnCtx,
dataverseName, datasetName);
- int numKeys = keys.size();
- int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0
: 1;
-
- // move key fields to front
- int[] fieldPermutation = new int[numKeys + 1 + numFilterFields];
- int[] pkFields = new int[numKeys];
- int i = 0;
- for (LogicalVariable varKey : keys) {
- int idx = propagatedSchema.findVariable(varKey);
- fieldPermutation[i] = idx;
- pkFields[i] = idx;
- i++;
- }
- fieldPermutation[numKeys] = propagatedSchema.findVariable(payload);
- if (numFilterFields > 0) {
- int idx =
propagatedSchema.findVariable(additionalNonKeyFields.get(0));
- fieldPermutation[numKeys + 1] = idx;
- }
-
- PartitioningProperties partitioningProperties =
getPartitioningProperties(dataset);
- long numElementsHint = getCardinalityPerPartitionHint(dataset);
- // TODO
- // figure out the right behavior of the bulkload and then give the
- // right callback
- // (ex. what's the expected behavior when there is an error during
- // bulkload?)
- IBinaryHashFunctionFactory[] pkHashFunFactories =
dataset.getPrimaryHashFunctionFactories(this);
- ITuplePartitionerFactory partitionerFactory = new
FieldHashPartitionerFactory(pkFields, pkHashFunFactories,
- partitioningProperties.getNumberOfPartitions());
- IIndexDataflowHelperFactory indexHelperFactory = new
IndexDataflowHelperFactory(
- storageComponentProvider.getStorageManager(),
partitioningProperties.getSpiltsProvider());
- LSMIndexBulkLoadOperatorDescriptor btreeBulkLoad = new
LSMIndexBulkLoadOperatorDescriptor(spec, null,
- fieldPermutation, StorageConstants.DEFAULT_TREE_FILL_FACTOR,
false, numElementsHint, true,
- indexHelperFactory, null, BulkLoadUsage.LOAD,
dataset.getDatasetId(), null, partitionerFactory,
- partitioningProperties.getComputeStorageMap());
- return new Pair<>(btreeBulkLoad,
partitioningProperties.getConstraints());
- }
-
@Override
public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getInsertRuntime(
IDataSource<DataSourceId> dataSource, IOperatorSchema
propagatedSchema, IVariableTypeEnvironment typeEnv,
@@ -972,7 +928,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
}
public PartitioningProperties splitAndConstraints(DataverseName
dataverseName) {
- return dataPartitioningProvider.splitAndConstraints(dataverseName);
+ return
dataPartitioningProvider.getPartitioningProperties(dataverseName);
}
public FileSplit[] splitsForIndex(MetadataTransactionContext mdTxnCtx,
Dataset dataset, String indexName)
@@ -1837,6 +1793,10 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
return dataPartitioningProvider.getPartitioningProperties(mdTxnCtx,
ds, indexName);
}
+ public PartitioningProperties getPartitioningProperties(Feed feed) throws
AlgebricksException {
+ return dataPartitioningProvider.getPartitioningProperties(feed);
+ }
+
public List<Pair<IFileSplitProvider, String>>
getSplitProviderOfAllIndexes(Dataset ds) throws AlgebricksException {
List<Index> dsIndexes = getDatasetIndexes(ds.getDataverseName(),
ds.getDatasetName()).stream()
.filter(idx -> idx.getIndexType() != IndexType.SAMPLE &&
idx.isSecondaryIndex())
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
index f5e96b1691..ec4c985369 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DataPartitioningProvider.java
@@ -21,14 +21,22 @@ package org.apache.asterix.metadata.utils;
import static org.apache.asterix.common.utils.PartitioningScheme.DYNAMIC;
import static org.apache.asterix.common.utils.PartitioningScheme.STATIC;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.dataflow.IDataPartitioningProvider;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.utils.PartitioningScheme;
import org.apache.asterix.common.utils.StoragePathUtil;
+import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.entities.Dataset;
+import org.apache.asterix.metadata.entities.Feed;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksCountPartitionConstraint;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
@@ -48,7 +56,7 @@ public class DataPartitioningProvider implements
IDataPartitioningProvider {
scheme = appCtx.getStorageProperties().getPartitioningScheme();
}
- public PartitioningProperties splitAndConstraints(DataverseName
dataverseName) {
+ public PartitioningProperties getPartitioningProperties(DataverseName
dataverseName) {
if (scheme == DYNAMIC) {
Pair<IFileSplitProvider, AlgebricksPartitionConstraint>
splitsAndConstraints = SplitsAndConstraintsUtil
.getDataverseSplitProviderAndConstraints(appCtx.getClusterStateManager(),
dataverseName);
@@ -75,6 +83,25 @@ public class DataPartitioningProvider implements
IDataPartitioningProvider {
throw new IllegalStateException();
}
+ public PartitioningProperties getPartitioningProperties(Feed feed) throws
AsterixException {
+ if (scheme == DYNAMIC) {
+ IClusterStateManager csm = appCtx.getClusterStateManager();
+ AlgebricksAbsolutePartitionConstraint allCluster =
csm.getClusterLocations();
+ Set<String> nodes = new
TreeSet<>(Arrays.asList(allCluster.getLocations()));
+ AlgebricksAbsolutePartitionConstraint locations =
+ new
AlgebricksAbsolutePartitionConstraint(nodes.toArray(new String[0]));
+ FileSplit[] feedLogFileSplits =
+ FeedUtils.splitsForAdapter(appCtx,
feed.getDataverseName(), feed.getFeedName(), locations);
+ Pair<IFileSplitProvider, AlgebricksPartitionConstraint> spC =
+
StoragePathUtil.splitProviderAndPartitionConstraints(feedLogFileSplits);
+ int[][] partitionsMap =
getPartitionsMap(getNumPartitions(spC.second));
+ return PartitioningProperties.of(spC.first, spC.second,
partitionsMap);
+ } else if (scheme == STATIC) {
+ throw new NotImplementedException();
+ }
+ throw new IllegalStateException();
+ }
+
private static int getNumPartitions(AlgebricksPartitionConstraint
constraint) {
if (constraint.getPartitionConstraintType() ==
AlgebricksPartitionConstraint.PartitionConstraintType.COUNT) {
return ((AlgebricksCountPartitionConstraint)
constraint).getCount();
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index 27e06eb8cb..130e39c357 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -370,8 +370,8 @@ public class DatasetUtil {
IIndexDataflowHelperFactory indexHelperFactory =
new
IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
partitioningProperties.getSpiltsProvider());
- LSMTreeIndexCompactOperatorDescriptor compactOp =
- new LSMTreeIndexCompactOperatorDescriptor(spec,
indexHelperFactory);
+ LSMTreeIndexCompactOperatorDescriptor compactOp = new
LSMTreeIndexCompactOperatorDescriptor(spec,
+ indexHelperFactory,
partitioningProperties.getComputeStorageMap());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
compactOp,
partitioningProperties.getConstraints());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
compactOp,
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
index 498f3d4882..8dc0d966c8 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SecondaryTreeIndexOperationsHelper.java
@@ -100,8 +100,8 @@ public abstract class SecondaryTreeIndexOperationsHelper
extends SecondaryIndexO
IIndexDataflowHelperFactory dataflowHelperFactory =
new
IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
partitioningProperties.getSpiltsProvider());
- LSMTreeIndexCompactOperatorDescriptor compactOp =
- new LSMTreeIndexCompactOperatorDescriptor(spec,
dataflowHelperFactory);
+ LSMTreeIndexCompactOperatorDescriptor compactOp = new
LSMTreeIndexCompactOperatorDescriptor(spec,
+ dataflowHelperFactory,
partitioningProperties.getComputeStorageMap());
compactOp.setSourceLocation(sourceLoc);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec,
compactOp,
secondaryPartitionConstraint);
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
index ef9e75b310..dce86387e1 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/DatasetStreamStatsOperatorDescriptor.java
@@ -59,6 +59,7 @@ public final class DatasetStreamStatsOperatorDescriptor
extends AbstractSingleAc
public DatasetStreamStatsOperatorDescriptor(IOperatorDescriptorRegistry
spec, RecordDescriptor rDesc,
String operatorName, IIndexDataflowHelperFactory[] indexes,
String[] indexesNames) {
super(spec, 1, 1);
+ //TODO(partitioning)
outRecDescs[0] = rDesc;
this.operatorName = operatorName;
this.indexes = indexes;
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
index 4f1dfd7acc..50fe998303 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadOperatorDescriptor.java
@@ -47,6 +47,7 @@ public class LSMSecondaryIndexBulkLoadOperatorDescriptor
extends AbstractSingleA
IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[]
fieldPermutation, int numTagFields,
int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree) {
super(spec, 1, 1);
+ //TODO(partitioning) correlated
this.outRecDescs[0] = outRecDesc;
this.primaryIndexHelperFactory = primaryIndexHelperFactory;
this.secondaryIndexHelperFactory = secondaryIndexHelperFactory;
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
index f788d2374f..488b73bdde 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor.java
@@ -48,6 +48,7 @@ public class
LSMSecondaryIndexCreationTupleProcessorOperatorDescriptor
int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree,
boolean excludeUnknownKeys,
boolean forAnyUnknownKey) {
super(spec, 1, 1);
+ //TODO(partitioning) correlated
this.outRecDescs[0] = outRecDesc;
this.missingWriterFactory = missingWriterFactory;
this.numTagFields = numTagFields;
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 4a6e76e16b..e34ff2143a 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -66,11 +66,6 @@ public interface IMetadataProvider<S, I> {
IResultSerializerFactoryProvider resultSerializerFactoryProvider,
RecordDescriptor inputDesc,
IResultMetadata metadata, JobSpecification spec) throws
AlgebricksException;
- Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getWriteResultRuntime(IDataSource<S> dataSource,
- IOperatorSchema propagatedSchema, List<LogicalVariable> keys,
LogicalVariable payLoadVar,
- List<LogicalVariable> additionalNonKeyFields, JobGenContext
context, JobSpecification jobSpec)
- throws AlgebricksException;
-
Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getInsertRuntime(IDataSource<S> dataSource,
IOperatorSchema propagatedSchema, IVariableTypeEnvironment
typeEnv, List<LogicalVariable> keys,
LogicalVariable payLoadVar, List<LogicalVariable>
additionalFilterKeyFields,
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
index ed03cc74f8..49110b9638 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/IOperatorDescriptor.java
@@ -82,8 +82,8 @@ public interface IOperatorDescriptor extends Serializable {
*
* @param constraintAcceptor
* - Constraint Acceptor
- * @param plan
- * - Job Plan
+ * @param ccServiceCtx
+ * - CC Service Context
*/
void contributeSchedulingConstraints(IConstraintAcceptor
constraintAcceptor, ICCServiceContext ccServiceCtx);
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
index 57a5f692b2..76cd48afa1 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/file/FileRemoveOperatorDescriptor.java
@@ -20,7 +20,6 @@
package org.apache.hyracks.dataflow.std.file;
import java.io.File;
-import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hyracks.api.comm.IFrameWriter;
@@ -32,36 +31,31 @@ import
org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
+import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
import
org.apache.hyracks.dataflow.std.base.AbstractSingleActivityOperatorDescriptor;
public class FileRemoveOperatorDescriptor extends
AbstractSingleActivityOperatorDescriptor {
+ private static final long serialVersionUID = 2L;
+
private final IFileSplitProvider fileSplitProvider;
private final boolean quietly;
+ private final int[][] partitionsMap;
public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec,
IFileSplitProvider fileSplitProvder,
- boolean quietly) {
+ boolean quietly, int[][] partitionsMap) {
super(spec, 0, 0);
this.fileSplitProvider = fileSplitProvder;
this.quietly = quietly;
+ this.partitionsMap = partitionsMap;
}
- /**
- *
- * @deprecated use {@link
#FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec,
IFileSplitProvider fileSplitProvder, boolean quietly)} instead.
- */
- @Deprecated
- public FileRemoveOperatorDescriptor(IOperatorDescriptorRegistry spec,
IFileSplitProvider fileSplitProvder) {
- this(spec, fileSplitProvder, false);
- }
-
- private static final long serialVersionUID = 1L;
-
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
- final FileSplit split = fileSplitProvider.getFileSplits()[partition];
+ final FileSplit[] splits = fileSplitProvider.getFileSplits();
+ final int[] splitsIndexes = partitionsMap[partition];
final IIOManager ioManager = ctx.getIoManager();
return new AbstractOperatorNodePushable() {
@@ -73,16 +67,7 @@ public class FileRemoveOperatorDescriptor extends
AbstractSingleActivityOperator
@Override
public void initialize() throws HyracksDataException {
// will only work for files inside the io devices
- File f = split.getFile(ioManager);
- if (quietly) {
- FileUtils.deleteQuietly(f);
- } else {
- try {
- FileUtils.deleteDirectory(f);
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- }
+ deleteFiles();
}
@Override
@@ -98,6 +83,25 @@ public class FileRemoveOperatorDescriptor extends
AbstractSingleActivityOperator
@Override
public void deinitialize() throws HyracksDataException {
}
+
+ private void deleteFiles() throws HyracksDataException {
+ Throwable failure = null;
+ for (int splitsIndex : splitsIndexes) {
+ try {
+ File file = splits[splitsIndex].getFile(ioManager);
+ if (quietly) {
+ FileUtils.deleteQuietly(file);
+ } else {
+ FileUtils.deleteDirectory(file);
+ }
+ } catch (Throwable th) {
+ failure = ExceptionUtils.suppress(failure, th);
+ }
+ }
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
+ }
+ }
};
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
index 7b687c44e1..1302bce0a1 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorDescriptor.java
@@ -30,9 +30,9 @@ public class ConstantTupleSourceOperatorDescriptor extends
AbstractSingleActivit
private static final long serialVersionUID = 1L;
- private int[] fieldSlots;
- private byte[] tupleData;
- private int tupleSize;
+ private final int[] fieldSlots;
+ private final byte[] tupleData;
+ private final int tupleSize;
public ConstantTupleSourceOperatorDescriptor(IOperatorDescriptorRegistry
spec, RecordDescriptor recDesc,
int[] fieldSlots, byte[] tupleData, int tupleSize) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
index 9bd0c598a1..785a33044c 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/misc/ConstantTupleSourceOperatorNodePushable.java
@@ -26,11 +26,11 @@ import
org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import
org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
public class ConstantTupleSourceOperatorNodePushable extends
AbstractUnaryOutputSourceOperatorNodePushable {
- private IHyracksTaskContext ctx;
- private int[] fieldSlots;
- private byte[] tupleData;
- private int tupleSize;
+ private final IHyracksTaskContext ctx;
+ private final int[] fieldSlots;
+ private final byte[] tupleData;
+ private final int tupleSize;
public ConstantTupleSourceOperatorNodePushable(IHyracksTaskContext ctx,
int[] fieldSlots, byte[] tupleData,
int tupleSize) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
index 6a31962654..68e4ac59b7 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/rtree/LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest.java
@@ -44,6 +44,7 @@ import
org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescripto
import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
import org.apache.hyracks.storage.common.IResourceFactory;
import org.apache.hyracks.test.support.TestStorageManagerComponentHolder;
+import org.apache.hyracks.test.support.TestUtils;
import org.apache.hyracks.tests.am.common.ITreeIndexOperatorTestHelper;
import org.apache.hyracks.tests.am.rtree.RTreeSecondaryIndexSearchOperatorTest;
import org.junit.Test;
@@ -95,7 +96,7 @@ public class
LSMRTreeWithAntiMatterTuplesSecondaryIndexSearchOperatorTest
int[] keyFields = { 0, 1, 2, 3 };
RTreeSearchOperatorDescriptor secondarySearchOp = new
RTreeSearchOperatorDescriptor(spec,
secondaryWithFilterRecDesc, keyFields, true, true,
secondaryHelperFactory, false, false, null,
- NoOpOperationCallbackFactory.INSTANCE, null, null, false,
null);
+ NoOpOperationCallbackFactory.INSTANCE, null, null, false,
null, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
secondarySearchOp, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new
FileSplit[] { createFile(nc1) });
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
index 0fcf8921a3..f233417318 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexInsertOperatorTest.java
@@ -43,6 +43,7 @@ import
org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import
org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -86,9 +87,9 @@ public class RTreeSecondaryIndexInsertOperatorTest extends
AbstractRTreeOperator
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
tb.getSize());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
keyProviderOp, NC1_ID);
int[] keyFields = { 0, 1, 2, 3 };
- RTreeSearchOperatorDescriptor secondarySearchOp =
- new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
keyFields, true, true, secondaryHelperFactory,
- false, false, null,
NoOpOperationCallbackFactory.INSTANCE, null, null, false, null);
+ RTreeSearchOperatorDescriptor secondarySearchOp = new
RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+ keyFields, true, true, secondaryHelperFactory, false, false,
null,
+ NoOpOperationCallbackFactory.INSTANCE, null, null, false,
null, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
secondarySearchOp, NC1_ID);
// fifth field from the tuples coming from secondary index
int[] primaryLowKeyFields = { 4 };
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
index 3cfef19bda..2748988fea 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexScanOperatorTest.java
@@ -42,6 +42,7 @@ import
org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import
org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -84,9 +85,9 @@ public class RTreeSecondaryIndexScanOperatorTest extends
AbstractRTreeOperatorTe
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
tb.getSize());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
keyProviderOp, NC1_ID);
int[] keyFields = null;
- RTreeSearchOperatorDescriptor secondarySearchOp =
- new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
keyFields, true, true, secondaryHelperFactory,
- false, false, null,
NoOpOperationCallbackFactory.INSTANCE, null, null, false, null);
+ RTreeSearchOperatorDescriptor secondarySearchOp = new
RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+ keyFields, true, true, secondaryHelperFactory, false, false,
null,
+ NoOpOperationCallbackFactory.INSTANCE, null, null, false,
null, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
secondarySearchOp, NC1_ID);
IFileSplitProvider outSplits = new ConstantFileSplitProvider(new
FileSplit[] { createFile(nc1) });
IOperatorDescriptor printer = new
PlainFileWriterOperatorDescriptor(spec, outSplits, ",");
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
index 8ea0701242..da5de77ccb 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/rtree/RTreeSecondaryIndexSearchOperatorTest.java
@@ -43,6 +43,7 @@ import
org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import
org.apache.hyracks.storage.am.rtree.dataflow.RTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.rtree.frames.RTreePolicyType;
import org.apache.hyracks.storage.common.IResourceFactory;
+import org.apache.hyracks.test.support.TestUtils;
import org.junit.Before;
import org.junit.Test;
@@ -84,9 +85,9 @@ public class RTreeSecondaryIndexSearchOperatorTest extends
AbstractRTreeOperator
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(),
tb.getSize());
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
keyProviderOp, NC1_ID);
int[] keyFields = { 0, 1, 2, 3 };
- RTreeSearchOperatorDescriptor secondarySearchOp =
- new RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
keyFields, true, true, secondaryHelperFactory,
- false, false, null,
NoOpOperationCallbackFactory.INSTANCE, null, null, false, null);
+ RTreeSearchOperatorDescriptor secondarySearchOp = new
RTreeSearchOperatorDescriptor(spec, secondaryRecDesc,
+ keyFields, true, true, secondaryHelperFactory, false, false,
null,
+ NoOpOperationCallbackFactory.INSTANCE, null, null, false,
null, TestUtils.getPartitionsMap(1));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
secondarySearchOp, NC1_ID);
// fifth field from the tuples coming from secondary index
int[] primaryLowKeyFields = { 4 };
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
index 88c06eb91a..c3719c744f 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexSearchOperatorNodePushable.java
@@ -131,6 +131,7 @@ public abstract class IndexSearchOperatorNodePushable
extends AbstractUnaryInput
ITuplePartitionerFactory tuplePartitionerFactory, int[][]
partitionsMap) throws HyracksDataException {
this.ctx = ctx;
this.appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
+ //TODO(partitioning) partitionsMap should not be null
this.partitions = partitionsMap != null ? partitionsMap[partition] :
new int[] { partition };
for (int i = 0; i < partitions.length; i++) {
storagePartitionId2Index.put(partitions[i], i);
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
index 099d668c6d..06dad1c093 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexDiskOrderScanOperatorDescriptor.java
@@ -37,6 +37,7 @@ public class TreeIndexDiskOrderScanOperatorDescriptor extends
AbstractSingleActi
public
TreeIndexDiskOrderScanOperatorDescriptor(IOperatorDescriptorRegistry spec,
RecordDescriptor outRecDesc,
IIndexDataflowHelperFactory indexHelperFactory,
ISearchOperationCallbackFactory searchCallbackFactory) {
super(spec, 0, 1);
+ //TODO(maybe don't fix)
this.indexHelperFactory = indexHelperFactory;
this.searchCallbackFactory = searchCallbackFactory;
this.outRecDescs[0] = outRecDesc;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
index 6ed185892b..104eed4f0c 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/TreeIndexStatsOperatorDescriptor.java
@@ -40,6 +40,7 @@ public class TreeIndexStatsOperatorDescriptor extends
AbstractSingleActivityOper
public TreeIndexStatsOperatorDescriptor(IOperatorDescriptorRegistry spec,
IStorageManager storageManager,
IIndexDataflowHelperFactory indexHelperFactory) {
super(spec, 0, 1);
+ //TODO(maybe don't fix)
this.indexHelperFactory = indexHelperFactory;
this.storageManager = storageManager;
this.outRecDescs[0] = recDesc;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
index 291363d738..da41f7e205 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeDiskComponentScanOperatorDescriptor.java
@@ -38,6 +38,7 @@ public class LSMBTreeDiskComponentScanOperatorDescriptor
extends AbstractSingleA
public
LSMBTreeDiskComponentScanOperatorDescriptor(IOperatorDescriptorRegistry spec,
RecordDescriptor outRecDesc,
IIndexDataflowHelperFactory indexHelperFactory,
ISearchOperationCallbackFactory searchCallbackFactory) {
super(spec, 1, 1);
+ //TODO(partitioning) correlated
this.indexHelperFactory = indexHelperFactory;
this.searchCallbackFactory = searchCallbackFactory;
this.outRecDescs[0] = outRecDesc;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
index 682ffefae4..139a87195d 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexCompactOperatorNodePushable.java
@@ -22,6 +22,7 @@ import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.std.base.AbstractOperatorNodePushable;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
@@ -30,16 +31,25 @@ import
org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
public class LSMIndexCompactOperatorNodePushable extends
AbstractOperatorNodePushable {
- private final IIndexDataflowHelper indexHelper;
+
+ private final IIndexDataflowHelper[] indexHelpers;
public LSMIndexCompactOperatorNodePushable(IHyracksTaskContext ctx, int
partition,
- IIndexDataflowHelperFactory indexHelperFactory) throws
HyracksDataException {
- this.indexHelper =
indexHelperFactory.create(ctx.getJobletContext().getServiceContext(),
partition);
+ IIndexDataflowHelperFactory indexHelperFactory, int[][]
partitionsMap) throws HyracksDataException {
+ int[] partitions = partitionsMap[partition];
+ indexHelpers = new IIndexDataflowHelper[partitions.length];
+ for (int i = 0; i < partitions.length; i++) {
+ indexHelpers[i] =
indexHelperFactory.create(ctx.getJobletContext().getServiceContext(),
partitions[i]);
+ }
+
}
@Override
public void deinitialize() throws HyracksDataException {
- indexHelper.close();
+ Throwable failure = CleanupUtils.close(indexHelpers, null);
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
+ }
}
@Override
@@ -54,10 +64,12 @@ public class LSMIndexCompactOperatorNodePushable extends
AbstractOperatorNodePus
@Override
public void initialize() throws HyracksDataException {
- indexHelper.open();
- ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
- ILSMIndexAccessor accessor =
index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- accessor.scheduleFullMerge();
+ for (IIndexDataflowHelper indexHelper : indexHelpers) {
+ indexHelper.open();
+ ILSMIndex index = (ILSMIndex) indexHelper.getIndexInstance();
+ ILSMIndexAccessor accessor =
index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ accessor.scheduleFullMerge();
+ }
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
index 6da4c8f5bc..aad1cc909f 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMTreeIndexCompactOperatorDescriptor.java
@@ -29,18 +29,20 @@ import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory
public class LSMTreeIndexCompactOperatorDescriptor extends
AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final IIndexDataflowHelperFactory indexHelperFactory;
+ private final int[][] partitionsMap;
public LSMTreeIndexCompactOperatorDescriptor(IOperatorDescriptorRegistry
spec,
- IIndexDataflowHelperFactory indexHelperFactory) {
+ IIndexDataflowHelperFactory indexHelperFactory, int[][]
partitionsMap) {
super(spec, 0, 0);
this.indexHelperFactory = indexHelperFactory;
+ this.partitionsMap = partitionsMap;
}
@Override
public IOperatorNodePushable createPushRuntime(IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, int partition, int
nPartitions) throws HyracksDataException {
- return new LSMIndexCompactOperatorNodePushable(ctx, partition,
indexHelperFactory);
+ return new LSMIndexCompactOperatorNodePushable(ctx, partition,
indexHelperFactory, partitionsMap);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
index cc6d76d9b1..8e7a7fedf2 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorDescriptor.java
@@ -32,7 +32,7 @@ import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory
public class RTreeSearchOperatorDescriptor extends
AbstractSingleActivityOperatorDescriptor {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
protected final int[] keyFields;
protected final boolean lowKeyInclusive;
protected final boolean highKeyInclusive;
@@ -45,6 +45,7 @@ public class RTreeSearchOperatorDescriptor extends
AbstractSingleActivityOperato
protected final boolean retainMissing;
protected final IMissingWriterFactory missingWriterFactory;
protected final ISearchOperationCallbackFactory searchCallbackFactory;
+ protected final int[][] partitionsMap;
protected boolean appendOpCallbackProceedResult;
protected byte[] searchCallbackProceedResultFalseValue;
protected byte[] searchCallbackProceedResultTrueValue;
@@ -53,10 +54,11 @@ public class RTreeSearchOperatorDescriptor extends
AbstractSingleActivityOperato
boolean lowKeyInclusive, boolean highKeyInclusive,
IIndexDataflowHelperFactory indexHelperFactory,
boolean retainInput, boolean retainMissing, IMissingWriterFactory
missingWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory, int[]
minFilterFieldIndexes,
- int[] maxFilterFieldIndexes, boolean appendIndexFilter,
IMissingWriterFactory nonFilterWriterFactory) {
+ int[] maxFilterFieldIndexes, boolean appendIndexFilter,
IMissingWriterFactory nonFilterWriterFactory,
+ int[][] partitionsMap) {
this(spec, outRecDesc, keyFields, lowKeyInclusive, highKeyInclusive,
indexHelperFactory, retainInput,
retainMissing, missingWriterFactory, searchCallbackFactory,
minFilterFieldIndexes,
- maxFilterFieldIndexes, appendIndexFilter,
nonFilterWriterFactory, false, null, null);
+ maxFilterFieldIndexes, appendIndexFilter,
nonFilterWriterFactory, false, null, null, partitionsMap);
}
public RTreeSearchOperatorDescriptor(IOperatorDescriptorRegistry spec,
RecordDescriptor outRecDesc, int[] keyFields,
@@ -65,7 +67,7 @@ public class RTreeSearchOperatorDescriptor extends
AbstractSingleActivityOperato
ISearchOperationCallbackFactory searchCallbackFactory, int[]
minFilterFieldIndexes,
int[] maxFilterFieldIndexes, boolean appendIndexFilter,
IMissingWriterFactory nonFilterWriterFactory,
boolean appendOpCallbackProceedResult, byte[]
searchCallbackProceedResultFalseValue,
- byte[] searchCallbackProceedResultTrueValue) {
+ byte[] searchCallbackProceedResultTrueValue, int[][]
partitionsMap) {
super(spec, 1, 1);
this.indexHelperFactory = indexHelperFactory;
this.retainInput = retainInput;
@@ -79,6 +81,7 @@ public class RTreeSearchOperatorDescriptor extends
AbstractSingleActivityOperato
this.maxFilterFieldIndexes = maxFilterFieldIndexes;
this.appendIndexFilter = appendIndexFilter;
this.nonFilterWriterFactory = nonFilterWriterFactory;
+ this.partitionsMap = partitionsMap;
this.outRecDescs[0] = outRecDesc;
this.appendOpCallbackProceedResult = appendOpCallbackProceedResult;
this.searchCallbackProceedResultFalseValue =
searchCallbackProceedResultFalseValue;
@@ -92,6 +95,6 @@ public class RTreeSearchOperatorDescriptor extends
AbstractSingleActivityOperato
recordDescProvider.getInputRecordDescriptor(getActivityId(),
0), keyFields, minFilterFieldIndexes,
maxFilterFieldIndexes, indexHelperFactory, retainInput,
retainMissing, missingWriterFactory,
searchCallbackFactory, appendIndexFilter,
nonFilterWriterFactory, appendOpCallbackProceedResult,
- searchCallbackProceedResultFalseValue,
searchCallbackProceedResultTrueValue);
+ searchCallbackProceedResultFalseValue,
searchCallbackProceedResultTrueValue, partitionsMap);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
index aef624fa7b..7a7dc0e771 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-rtree/src/main/java/org/apache/hyracks/storage/am/rtree/dataflow/RTreeSearchOperatorNodePushable.java
@@ -47,12 +47,12 @@ public class RTreeSearchOperatorNodePushable extends
IndexSearchOperatorNodePush
IMissingWriterFactory missingWriterFactory,
ISearchOperationCallbackFactory searchCallbackFactory,
boolean appendIndexFilter, IMissingWriterFactory
nonFilterWriterFactory,
boolean appendOpCallbackProceedResult, byte[]
searchCallbackProceedResultFalseValue,
- byte[] searchCallbackProceedResultTrueValue) throws
HyracksDataException {
+ byte[] searchCallbackProceedResultTrueValue, int[][]
partitionsMap) throws HyracksDataException {
// TODO: predicate & limit pushdown not enabled for RTree yet
super(ctx, inputRecDesc, partition, minFilterFieldIndexes,
maxFilterFieldIndexes, indexHelperFactory,
retainInput, retainMissing, missingWriterFactory,
searchCallbackFactory, appendIndexFilter,
nonFilterWriterFactory, null, -1,
appendOpCallbackProceedResult, searchCallbackProceedResultFalseValue,
- searchCallbackProceedResultTrueValue,
DefaultTupleProjectorFactory.INSTANCE, null, null);
+ searchCallbackProceedResultTrueValue,
DefaultTupleProjectorFactory.INSTANCE, null, partitionsMap);
if (keyFields != null && keyFields.length > 0) {
searchKey = new PermutingFrameTupleReference();
searchKey.setFieldPermutation(keyFields);