This is an automated email from the ASF dual-hosted git repository.
mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9acfce145e [ASTERIXDB-3144][HYR][RT] Make dump_index() function
support multiple partitions
9acfce145e is described below
commit 9acfce145e846edcf719a009b8b925d027354604
Author: Ali Alsuliman <[email protected]>
AuthorDate: Fri May 5 17:21:15 2023 -0700
[ASTERIXDB-3144][HYR][RT] Make dump_index() function support multiple
partitions
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
This patch changes the dump_index() function to support
operating on multiple partitions.
Change-Id: I8754069a7340c0d9e3bf69e1fe5c94eb333b73b5
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17513
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../asterix/app/function/DumpIndexDatasource.java | 13 +++--
.../asterix/app/function/DumpIndexFunction.java | 14 +++--
.../asterix/app/function/DumpIndexReader.java | 62 ++++++++++++++--------
.../asterix/app/function/DumpIndexRewriter.java | 9 ++--
.../LSMSecondaryIndexBulkLoadNodePushable.java | 2 +-
5 files changed, 65 insertions(+), 35 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
index 691be4706c..bd72a66d66 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexDatasource.java
@@ -39,27 +39,30 @@ public class DumpIndexDatasource extends FunctionDataSource
{
private final IndexDataflowHelperFactory indexDataflowHelperFactory;
private final RecordDescriptor recDesc;
private final IBinaryComparatorFactory[] comparatorFactories;
- private final AlgebricksAbsolutePartitionConstraint storageLocations;
+ private final AlgebricksAbsolutePartitionConstraint constraint;
+ private final int[][] partitionsMap;
public DumpIndexDatasource(INodeDomain domain, IndexDataflowHelperFactory
indexDataflowHelperFactory,
RecordDescriptor recDesc, IBinaryComparatorFactory[]
comparatorFactories,
- AlgebricksAbsolutePartitionConstraint storageLocations) throws
AlgebricksException {
+ AlgebricksAbsolutePartitionConstraint constraint, int[][]
partitionsMap) throws AlgebricksException {
super(DUMP_INDEX_DATASOURCE_ID, DumpIndexRewriter.DUMP_INDEX, domain);
this.indexDataflowHelperFactory = indexDataflowHelperFactory;
this.recDesc = recDesc;
this.comparatorFactories = comparatorFactories;
- this.storageLocations = storageLocations;
+ this.constraint = constraint;
+ this.partitionsMap = partitionsMap;
}
@Override
protected AlgebricksAbsolutePartitionConstraint
getLocations(IClusterStateManager csm) {
- return storageLocations;
+ return constraint;
}
@Override
protected IDatasourceFunction createFunction(MetadataProvider
metadataProvider,
AlgebricksAbsolutePartitionConstraint locations) {
- return new DumpIndexFunction(locations, indexDataflowHelperFactory,
recDesc, comparatorFactories);
+ return new DumpIndexFunction(locations, indexDataflowHelperFactory,
recDesc, comparatorFactories,
+ partitionsMap);
}
@Override
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java
index 2fdbef3f75..fcfe3e0b96 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexFunction.java
@@ -31,25 +31,31 @@ import
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
public class DumpIndexFunction extends AbstractDatasourceFunction {
- private static final long serialVersionUID = 1L;
+ private static final long serialVersionUID = 2L;
private final IndexDataflowHelperFactory indexDataflowHelperFactory;
private final RecordDescriptor recDesc;
private final IBinaryComparatorFactory[] comparatorFactories;
+ private final int[][] partitionsMap;
public DumpIndexFunction(AlgebricksAbsolutePartitionConstraint locations,
IndexDataflowHelperFactory indexDataflowHelperFactory,
RecordDescriptor recDesc,
- IBinaryComparatorFactory[] comparatorFactories) {
+ IBinaryComparatorFactory[] comparatorFactories, int[][]
partitionsMap) {
super(locations);
this.indexDataflowHelperFactory = indexDataflowHelperFactory;
this.recDesc = recDesc;
this.comparatorFactories = comparatorFactories;
+ this.partitionsMap = partitionsMap;
}
@Override
public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx,
int partition)
throws HyracksDataException {
INCServiceContext serviceCtx =
ctx.getJobletContext().getServiceContext();
- final IIndexDataflowHelper indexDataflowHelper =
indexDataflowHelperFactory.create(serviceCtx, partition);
- return new DumpIndexReader(indexDataflowHelper, recDesc,
comparatorFactories);
+ int[] partitions = partitionsMap[partition];
+ final IIndexDataflowHelper[] indexDataflowHelpers = new
IIndexDataflowHelper[partitions.length];
+ for (int i = 0; i < partitions.length; i++) {
+ indexDataflowHelpers[i] =
indexDataflowHelperFactory.create(serviceCtx, partitions[i]);
+ }
+ return new DumpIndexReader(indexDataflowHelpers, recDesc,
comparatorFactories);
}
}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
index 8ef094e923..5c5a218084 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexReader.java
@@ -34,12 +34,13 @@ import org.apache.asterix.om.types.ATypeTag;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.comm.util.ByteBufferInputStream;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
-import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.MultiComparator;
@@ -48,38 +49,50 @@ import org.apache.hyracks.util.JSONUtil;
public class DumpIndexReader extends FunctionReader {
private final CharArrayRecord record;
- private final IIndexCursor searchCursor;
+ private final IIndexCursor[] searchCursors;
private final RecordDescriptor secondaryRecDesc;
private final StringBuilder recordBuilder = new StringBuilder();
private final ByteBufferInputStream bbis = new ByteBufferInputStream();
private final DataInputStream dis = new DataInputStream(bbis);
- private final IIndexDataflowHelper indexDataflowHelper;
- private final IIndexAccessor accessor;
+ private final IIndexDataflowHelper[] indexDataflowHelpers;
+ private final IIndexAccessor[] accessors;
+ private int currentSearchIdx;
- public DumpIndexReader(IIndexDataflowHelper indexDataflowHelper,
RecordDescriptor secondaryRecDesc,
+ public DumpIndexReader(IIndexDataflowHelper[] indexDataflowHelpers,
RecordDescriptor secondaryRecDesc,
IBinaryComparatorFactory[] comparatorFactories) throws
HyracksDataException {
- this.indexDataflowHelper = indexDataflowHelper;
+ this.indexDataflowHelpers = indexDataflowHelpers;
this.secondaryRecDesc = secondaryRecDesc;
- indexDataflowHelper.open();
- IIndex indexInstance = indexDataflowHelper.getIndexInstance();
- accessor =
indexInstance.createAccessor(NoOpIndexAccessParameters.INSTANCE);
- searchCursor = accessor.createSearchCursor(false);
MultiComparator searchMultiComparator =
MultiComparator.create(comparatorFactories);
RangePredicate rangePredicate =
new RangePredicate(null, null, true, true,
searchMultiComparator, searchMultiComparator, null, null);
- accessor.search(searchCursor, rangePredicate);
+ this.accessors = new IIndexAccessor[indexDataflowHelpers.length];
+ this.searchCursors = new IIndexCursor[indexDataflowHelpers.length];
+ for (int i = 0; i < indexDataflowHelpers.length; i++) {
+ IIndexDataflowHelper indexDataflowHelper = indexDataflowHelpers[i];
+ indexDataflowHelper.open();
+ accessors[i] =
indexDataflowHelper.getIndexInstance().createAccessor(NoOpIndexAccessParameters.INSTANCE);
+ searchCursors[i] = accessors[i].createSearchCursor(false);
+ accessors[i].search(searchCursors[i], rangePredicate);
+ }
+ currentSearchIdx = 0;
record = new CharArrayRecord();
}
@Override
public boolean hasNext() throws Exception {
- return searchCursor.hasNext();
+ while (currentSearchIdx < searchCursors.length) {
+ if (searchCursors[currentSearchIdx].hasNext()) {
+ return true;
+ }
+ currentSearchIdx++;
+ }
+ return false;
}
@Override
public IRawRecord<char[]> next() throws IOException, InterruptedException {
- searchCursor.next();
- ITupleReference tuple = searchCursor.getTuple();
+ searchCursors[currentSearchIdx].next();
+ ITupleReference tuple = searchCursors[currentSearchIdx].getTuple();
buildJsonRecord(tuple);
record.reset();
record.append(recordBuilder.toString().toCharArray());
@@ -89,16 +102,21 @@ public class DumpIndexReader extends FunctionReader {
@Override
public void close() throws IOException {
- bbis.close();
- dis.close();
- if (searchCursor != null) {
- searchCursor.close();
- searchCursor.destroy();
+ Throwable failure = releaseResources();
+ if (failure != null) {
+ throw HyracksDataException.create(failure);
}
- if (accessor != null) {
- accessor.destroy();
+ }
+
+ private Throwable releaseResources() {
+ Throwable failure = CleanupUtils.close(bbis, null);
+ failure = CleanupUtils.close(dis, failure);
+ for (int i = 0; i < indexDataflowHelpers.length; i++) {
+ failure = ResourceReleaseUtils.close(searchCursors[i], failure);
+ failure = CleanupUtils.destroy(failure, searchCursors[i],
accessors[i]);
+ failure = ResourceReleaseUtils.close(indexDataflowHelpers[i],
failure);
}
- indexDataflowHelper.close();
+ return failure;
}
private void buildJsonRecord(ITupleReference tuple) throws
HyracksDataException {
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java
index 6c0382d52b..dac1ac7224 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/DumpIndexRewriter.java
@@ -18,6 +18,7 @@
*/
package org.apache.asterix.app.function;
+import org.apache.asterix.common.cluster.PartitioningProperties;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.functions.FunctionConstants;
@@ -66,13 +67,15 @@ public class DumpIndexRewriter extends FunctionRewriter {
}
ISecondaryIndexOperationsHelper secondaryIndexHelper =
SecondaryIndexOperationsHelper.createIndexOperationsHelper(dataset, index,
metadataProvider, loc);
+ PartitioningProperties partitioningProperties =
+ metadataProvider.getPartitioningProperties(dataset,
index.getIndexName());
IndexDataflowHelperFactory indexDataflowHelperFactory =
new
IndexDataflowHelperFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
- secondaryIndexHelper.getSecondaryFileSplitProvider());
+ partitioningProperties.getSpiltsProvider());
AlgebricksAbsolutePartitionConstraint secondaryPartitionConstraint =
- (AlgebricksAbsolutePartitionConstraint)
secondaryIndexHelper.getSecondaryPartitionConstraint();
+ (AlgebricksAbsolutePartitionConstraint)
partitioningProperties.getConstraints();
return new DumpIndexDatasource(context.getComputationNodeDomain(),
indexDataflowHelperFactory,
secondaryIndexHelper.getSecondaryRecDesc(),
secondaryIndexHelper.getSecondaryComparatorFactories(),
- secondaryPartitionConstraint);
+ secondaryPartitionConstraint,
partitioningProperties.getComputeStorageMap());
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
index 1b4fd23824..64dce1ea27 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryIndexBulkLoadNodePushable.java
@@ -63,7 +63,7 @@ public class LSMSecondaryIndexBulkLoadNodePushable extends
AbstractLSMSecondaryI
IIndexDataflowHelperFactory secondaryIndexHelperFactory, int[]
fieldPermutation, int numTagFields,
int numSecondaryKeys, int numPrimaryKeys, boolean hasBuddyBTree)
throws HyracksDataException {
super(ctx, partition, inputRecDesc, numTagFields, numSecondaryKeys,
numPrimaryKeys, hasBuddyBTree);
-
+ //TODO(partitioning) correlated
this.primaryIndexHelper =
primaryIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(),
partition);
this.secondaryIndexHelper =