This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 27f72040c4 [ASTERIXDB-3455][RT] Delete statement stuck indefinitely
27f72040c4 is described below
commit 27f72040c4ba84783084157be7f04b2e1af3ee31
Author: Peeyush Gupta <[email protected]>
AuthorDate: Thu Jul 11 13:39:16 2024 -0700
[ASTERIXDB-3455][RT] Delete statement stuck indefinitely
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
Use LSMPrimaryInsertOperatorNodePushable instead of
LSMInsertDeleteOperatorNodePushable for deletes.
Change-Id: I07c61009bd7bdce507151db7dccca195a6544c7f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18465
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Peeyush Gupta <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../asterix/app/bootstrap/TestNodeController.java | 2 +-
.../metadata/declared/MetadataProvider.java | 48 ++++++-------
.../LSMPrimaryInsertOperatorDescriptor.java | 8 +--
.../LSMPrimaryInsertOperatorNodePushable.java | 78 +++++++++++++---------
4 files changed, 69 insertions(+), 67 deletions(-)
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 668ff13caa..8aa25ad2ba 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -275,7 +275,7 @@ public class TestNodeController {
ctx.getTaskAttemptId().getTaskId().getPartition(),
indexHelperFactory, pkIndexHelperFactory,
primaryIndexInfo.primaryIndexInsertFieldsPermutations,
recordDesc, modOpCallbackFactory,
searchOpCallbackFactory, primaryKeyIndexes.length,
filterFields, null, tuplePartitionerFactory,
- partitionsMap);
+ partitionsMap, IndexOperation.UPSERT);
// For now, this assumes a single secondary index. recordDesc is
always <pk-record-meta>
// for the index, we will have to create an assign operator that
extract the sk
// then the secondary LSMInsertDeleteOperatorNodePushable
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 0a73ec9518..023b01cd53 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -157,7 +157,6 @@ import
org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.result.IResultMetadata;
import org.apache.hyracks.api.result.ResultSetId;
@@ -1138,28 +1137,28 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
BulkLoadUsage.LOAD, dataset.getDatasetId(), null,
partitionerFactory,
partitioningProperties.getComputeStorageMap());
} else {
+ ISearchOperationCallbackFactory searchCallbackFactory =
+ dataset.getSearchCallbackFactory(storageComponentProvider,
primaryIndex, indexOp, primaryKeyFields);
+
+ Optional<Index> primaryKeyIndex =
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
+ dataset.getDatabaseName(), dataset.getDataverseName(),
dataset.getDatasetName()).stream()
+ .filter(Index::isPrimaryKeyIndex).findFirst();
+ IIndexDataflowHelperFactory pkidfh = null;
+ if (primaryKeyIndex.isPresent()) {
+ PartitioningProperties idxPartitioningProperties =
+ getPartitioningProperties(dataset,
primaryKeyIndex.get().getIndexName());
+ pkidfh = new
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
+ idxPartitioningProperties.getSplitsProvider());
+ }
if (indexOp == IndexOperation.INSERT) {
- ISearchOperationCallbackFactory searchCallbackFactory = dataset
- .getSearchCallbackFactory(storageComponentProvider,
primaryIndex, indexOp, primaryKeyFields);
-
- Optional<Index> primaryKeyIndex =
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
- dataset.getDatabaseName(), dataset.getDataverseName(),
dataset.getDatasetName()).stream()
- .filter(Index::isPrimaryKeyIndex).findFirst();
- IIndexDataflowHelperFactory pkidfh = null;
- if (primaryKeyIndex.isPresent()) {
- PartitioningProperties idxPartitioningProperties =
- getPartitioningProperties(dataset,
primaryKeyIndex.get().getIndexName());
- pkidfh = new
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
- idxPartitioningProperties.getSplitsProvider());
- }
op = createLSMPrimaryInsertOperatorDescriptor(spec,
inputRecordDesc, fieldPermutation, idfh, pkidfh,
modificationCallbackFactory, searchCallbackFactory,
numKeys, filterFields, partitionerFactory,
- partitioningProperties.getComputeStorageMap());
+ partitioningProperties.getComputeStorageMap(),
IndexOperation.UPSERT);
} else {
- op = createLSMTreeInsertDeleteOperatorDescriptor(spec,
inputRecordDesc, fieldPermutation, indexOp, idfh,
- null, true, modificationCallbackFactory,
partitionerFactory,
- partitioningProperties.getComputeStorageMap());
+ op = createLSMPrimaryInsertOperatorDescriptor(spec,
inputRecordDesc, fieldPermutation, idfh, pkidfh,
+ modificationCallbackFactory, searchCallbackFactory,
numKeys, filterFields, partitionerFactory,
+ partitioningProperties.getComputeStorageMap(),
IndexOperation.DELETE);
}
}
return new Pair<>(op, partitioningProperties.getConstraints());
@@ -1169,20 +1168,11 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
RecordDescriptor inputRecordDesc, int[] fieldPermutation,
IIndexDataflowHelperFactory idfh,
IIndexDataflowHelperFactory pkidfh,
IModificationOperationCallbackFactory modificationCallbackFactory,
ISearchOperationCallbackFactory searchCallbackFactory, int
numKeys, int[] filterFields,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][]
partitionsMap) {
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][]
partitionsMap, IndexOperation op) {
// this can be used by extensions to pick up their own operators
return new LSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc,
fieldPermutation, idfh, pkidfh,
modificationCallbackFactory, searchCallbackFactory, numKeys,
filterFields, tuplePartitionerFactory,
- partitionsMap);
- }
-
- protected LSMTreeInsertDeleteOperatorDescriptor
createLSMTreeInsertDeleteOperatorDescriptor(
- IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc,
int[] fieldPermutation, IndexOperation op,
- IIndexDataflowHelperFactory indexHelperFactory,
ITupleFilterFactory tupleFilterFactory, boolean isPrimary,
- IModificationOperationCallbackFactory modCallbackFactory,
ITuplePartitionerFactory tuplePartitionerFactory,
- int[][] partitionsMap) {
- return new LSMTreeInsertDeleteOperatorDescriptor(spec, outRecDesc,
fieldPermutation, op, indexHelperFactory,
- tupleFilterFactory, isPrimary, modCallbackFactory,
tuplePartitionerFactory, partitionsMap);
+ partitionsMap, op);
}
private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint>
getIndexModificationRuntime(IndexOperation indexOp,
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
index f46e1726b7..21f79daafa 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
@@ -45,9 +45,9 @@ public class LSMPrimaryInsertOperatorDescriptor extends
LSMTreeInsertDeleteOpera
IIndexDataflowHelperFactory keyIndexHelperFactory,
IModificationOperationCallbackFactory
modificationOpCallbackFactory,
ISearchOperationCallbackFactory searchOpCallbackFactory, int
numOfPrimaryKeys, int[] filterFields,
- ITuplePartitionerFactory tuplePartitionerFactory, int[][]
partitionsMap) {
- super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT,
indexHelperFactory, null, true,
- modificationOpCallbackFactory, tuplePartitionerFactory,
partitionsMap);
+ ITuplePartitionerFactory tuplePartitionerFactory, int[][]
partitionsMap, IndexOperation op) {
+ super(spec, outRecDesc, fieldPermutation, op, indexHelperFactory,
null, true, modificationOpCallbackFactory,
+ tuplePartitionerFactory, partitionsMap);
this.keyIndexHelperFactory = keyIndexHelperFactory;
this.searchOpCallbackFactory = searchOpCallbackFactory;
this.numOfPrimaryKeys = numOfPrimaryKeys;
@@ -60,6 +60,6 @@ public class LSMPrimaryInsertOperatorDescriptor extends
LSMTreeInsertDeleteOpera
RecordDescriptor intputRecDesc =
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
return new LSMPrimaryInsertOperatorNodePushable(ctx, partition,
indexHelperFactory, keyIndexHelperFactory,
fieldPermutation, intputRecDesc, modCallbackFactory,
searchOpCallbackFactory, numOfPrimaryKeys,
- filterFields, sourceLoc, tuplePartitionerFactory,
partitionsMap);
+ filterFields, sourceLoc, tuplePartitionerFactory,
partitionsMap, op);
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 7fb3369f4e..072a9b6ae7 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -107,10 +107,10 @@ public class LSMPrimaryInsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
int[] fieldPermutation, RecordDescriptor inputRecDesc,
IModificationOperationCallbackFactory modCallbackFactory,
ISearchOperationCallbackFactory searchCallbackFactory, int
numOfPrimaryKeys, int[] filterFields,
- SourceLocation sourceLoc, ITuplePartitionerFactory
tuplePartitionerFactory, int[][] partitionsMap)
- throws HyracksDataException {
- super(ctx, partition, indexHelperFactory, fieldPermutation,
inputRecDesc, IndexOperation.UPSERT,
- modCallbackFactory, null, tuplePartitionerFactory,
partitionsMap);
+ SourceLocation sourceLoc, ITuplePartitionerFactory
tuplePartitionerFactory, int[][] partitionsMap,
+ IndexOperation op) throws HyracksDataException {
+ super(ctx, partition, indexHelperFactory, fieldPermutation,
inputRecDesc, op, modCallbackFactory, null,
+ tuplePartitionerFactory, partitionsMap);
this.sourceLoc = sourceLoc;
this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
this.searchCallbacks = new ISearchOperationCallback[partitions.length];
@@ -160,47 +160,59 @@ public class LSMPrimaryInsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
// already processed; skip
return;
}
- keyTuple.reset(accessor, index);
- searchPred.reset(keyTuple, keyTuple, true, true,
keySearchCmp, keySearchCmp);
- boolean duplicate = false;
+ switch (op) {
+ case INSERT:
+ case UPSERT:
+ keyTuple.reset(accessor, index);
+ searchPred.reset(keyTuple, keyTuple, true, true,
keySearchCmp, keySearchCmp);
+ boolean duplicate = false;
- lsmAccessorForUniqunessCheck.search(cursor, searchPred);
- try {
- if (cursor.hasNext()) {
- // duplicate, skip
- if (searchCallback instanceof
LockThenSearchOperationCallback) {
- ((LockThenSearchOperationCallback)
searchCallback).release();
+ lsmAccessorForUniqunessCheck.search(cursor,
searchPred);
+ try {
+ if (cursor.hasNext()) {
+ // duplicate, skip
+ if (searchCallback instanceof
LockThenSearchOperationCallback) {
+ ((LockThenSearchOperationCallback)
searchCallback).release();
+ }
+ duplicate = true;
+ }
+ } finally {
+ cursor.close();
}
- duplicate = true;
- }
- } finally {
- cursor.close();
- }
- if (!duplicate) {
- beforeModification(tuple);
- ((ILSMIndexAccessor) indexAccessor).forceUpsert(tuple);
- if (lsmAccessorForKeyIndex != null) {
- lsmAccessorForKeyIndex.forceUpsert(keyTuple);
- }
- } else {
- // we should flush previous inserted records so that
these transactions can commit
- flushPartialFrame();
- // feed requires this nested exception to remove
duplicated tuples
- // TODO: a better way to treat duplicates?
- throw
HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
-
HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
+ if (!duplicate) {
+ beforeModification(tuple);
+ ((ILSMIndexAccessor)
indexAccessor).forceUpsert(tuple);
+ if (lsmAccessorForKeyIndex != null) {
+
lsmAccessorForKeyIndex.forceUpsert(keyTuple);
+ }
+ } else {
+ // we should flush previous inserted records
so that these transactions can commit
+ flushPartialFrame();
+ // feed requires this nested exception to
remove duplicated tuples
+ // TODO: a better way to treat duplicates?
+ throw
HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
+
HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
+ }
+ break;
+ case DELETE:
+ ((ILSMIndexAccessor)
indexAccessor).forceDelete(tuple);
+ break;
+ default:
+ throw
HyracksDataException.create(ErrorCode.INVALID_OPERATOR_OPERATION, sourceLoc,
+ op.toString(),
LSMPrimaryInsertOperatorNodePushable.class.getSimpleName());
+
}
processedTuples.add(index);
}
@Override
public void start() throws HyracksDataException {
- ((LSMTreeIndexAccessor)
indexAccessor).getCtx().setOperation(IndexOperation.UPSERT);
+ ((LSMTreeIndexAccessor)
indexAccessor).getCtx().setOperation(op);
}
@Override
public void finish() throws HyracksDataException {
- ((LSMTreeIndexAccessor)
indexAccessor).getCtx().setOperation(IndexOperation.UPSERT);
+ ((LSMTreeIndexAccessor)
indexAccessor).getCtx().setOperation(op);
}
@Override