This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 27f72040c4 [ASTERIXDB-3455][RT] Delete statement stuck indefinitely
27f72040c4 is described below

commit 27f72040c4ba84783084157be7f04b2e1af3ee31
Author: Peeyush Gupta <[email protected]>
AuthorDate: Thu Jul 11 13:39:16 2024 -0700

    [ASTERIXDB-3455][RT] Delete statement stuck indefinitely
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    Use LSMPrimaryInsertOperatorNodePushable instead of
    LSMInsertDeleteOperatorNodePushable for deletes.
    
    Change-Id: I07c61009bd7bdce507151db7dccca195a6544c7f
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18465
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Peeyush Gupta <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../asterix/app/bootstrap/TestNodeController.java  |  2 +-
 .../metadata/declared/MetadataProvider.java        | 48 ++++++-------
 .../LSMPrimaryInsertOperatorDescriptor.java        |  8 +--
 .../LSMPrimaryInsertOperatorNodePushable.java      | 78 +++++++++++++---------
 4 files changed, 69 insertions(+), 67 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 668ff13caa..8aa25ad2ba 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -275,7 +275,7 @@ public class TestNodeController {
                     ctx.getTaskAttemptId().getTaskId().getPartition(), 
indexHelperFactory, pkIndexHelperFactory,
                     primaryIndexInfo.primaryIndexInsertFieldsPermutations, 
recordDesc, modOpCallbackFactory,
                     searchOpCallbackFactory, primaryKeyIndexes.length, 
filterFields, null, tuplePartitionerFactory,
-                    partitionsMap);
+                    partitionsMap, IndexOperation.UPSERT);
             // For now, this assumes a single secondary index. recordDesc is 
always <pk-record-meta>
             // for the index, we will have to create an assign operator that 
extract the sk
             // then the secondary LSMInsertDeleteOperatorNodePushable
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 0a73ec9518..023b01cd53 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -157,7 +157,6 @@ import 
org.apache.hyracks.api.dataflow.value.RecordDescriptor;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.api.io.FileSplit;
-import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.result.IResultMetadata;
 import org.apache.hyracks.api.result.ResultSetId;
@@ -1138,28 +1137,28 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                     BulkLoadUsage.LOAD, dataset.getDatasetId(), null, 
partitionerFactory,
                     partitioningProperties.getComputeStorageMap());
         } else {
+            ISearchOperationCallbackFactory searchCallbackFactory =
+                    dataset.getSearchCallbackFactory(storageComponentProvider, 
primaryIndex, indexOp, primaryKeyFields);
+
+            Optional<Index> primaryKeyIndex = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
+                    dataset.getDatabaseName(), dataset.getDataverseName(), 
dataset.getDatasetName()).stream()
+                    .filter(Index::isPrimaryKeyIndex).findFirst();
+            IIndexDataflowHelperFactory pkidfh = null;
+            if (primaryKeyIndex.isPresent()) {
+                PartitioningProperties idxPartitioningProperties =
+                        getPartitioningProperties(dataset, 
primaryKeyIndex.get().getIndexName());
+                pkidfh = new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
+                        idxPartitioningProperties.getSplitsProvider());
+            }
             if (indexOp == IndexOperation.INSERT) {
-                ISearchOperationCallbackFactory searchCallbackFactory = dataset
-                        .getSearchCallbackFactory(storageComponentProvider, 
primaryIndex, indexOp, primaryKeyFields);
-
-                Optional<Index> primaryKeyIndex = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx,
-                        dataset.getDatabaseName(), dataset.getDataverseName(), 
dataset.getDatasetName()).stream()
-                        .filter(Index::isPrimaryKeyIndex).findFirst();
-                IIndexDataflowHelperFactory pkidfh = null;
-                if (primaryKeyIndex.isPresent()) {
-                    PartitioningProperties idxPartitioningProperties =
-                            getPartitioningProperties(dataset, 
primaryKeyIndex.get().getIndexName());
-                    pkidfh = new 
IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(),
-                            idxPartitioningProperties.getSplitsProvider());
-                }
                 op = createLSMPrimaryInsertOperatorDescriptor(spec, 
inputRecordDesc, fieldPermutation, idfh, pkidfh,
                         modificationCallbackFactory, searchCallbackFactory, 
numKeys, filterFields, partitionerFactory,
-                        partitioningProperties.getComputeStorageMap());
+                        partitioningProperties.getComputeStorageMap(), 
IndexOperation.UPSERT);
 
             } else {
-                op = createLSMTreeInsertDeleteOperatorDescriptor(spec, 
inputRecordDesc, fieldPermutation, indexOp, idfh,
-                        null, true, modificationCallbackFactory, 
partitionerFactory,
-                        partitioningProperties.getComputeStorageMap());
+                op = createLSMPrimaryInsertOperatorDescriptor(spec, 
inputRecordDesc, fieldPermutation, idfh, pkidfh,
+                        modificationCallbackFactory, searchCallbackFactory, 
numKeys, filterFields, partitionerFactory,
+                        partitioningProperties.getComputeStorageMap(), 
IndexOperation.DELETE);
             }
         }
         return new Pair<>(op, partitioningProperties.getConstraints());
@@ -1169,20 +1168,11 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             RecordDescriptor inputRecordDesc, int[] fieldPermutation, 
IIndexDataflowHelperFactory idfh,
             IIndexDataflowHelperFactory pkidfh, 
IModificationOperationCallbackFactory modificationCallbackFactory,
             ISearchOperationCallbackFactory searchCallbackFactory, int 
numKeys, int[] filterFields,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] 
partitionsMap) {
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] 
partitionsMap, IndexOperation op) {
         // this can be used by extensions to pick up their own operators
         return new LSMPrimaryInsertOperatorDescriptor(spec, inputRecordDesc, 
fieldPermutation, idfh, pkidfh,
                 modificationCallbackFactory, searchCallbackFactory, numKeys, 
filterFields, tuplePartitionerFactory,
-                partitionsMap);
-    }
-
-    protected LSMTreeInsertDeleteOperatorDescriptor 
createLSMTreeInsertDeleteOperatorDescriptor(
-            IOperatorDescriptorRegistry spec, RecordDescriptor outRecDesc, 
int[] fieldPermutation, IndexOperation op,
-            IIndexDataflowHelperFactory indexHelperFactory, 
ITupleFilterFactory tupleFilterFactory, boolean isPrimary,
-            IModificationOperationCallbackFactory modCallbackFactory, 
ITuplePartitionerFactory tuplePartitionerFactory,
-            int[][] partitionsMap) {
-        return new LSMTreeInsertDeleteOperatorDescriptor(spec, outRecDesc, 
fieldPermutation, op, indexHelperFactory,
-                tupleFilterFactory, isPrimary, modCallbackFactory, 
tuplePartitionerFactory, partitionsMap);
+                partitionsMap, op);
     }
 
     private Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> 
getIndexModificationRuntime(IndexOperation indexOp,
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
index f46e1726b7..21f79daafa 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorDescriptor.java
@@ -45,9 +45,9 @@ public class LSMPrimaryInsertOperatorDescriptor extends 
LSMTreeInsertDeleteOpera
             IIndexDataflowHelperFactory keyIndexHelperFactory,
             IModificationOperationCallbackFactory 
modificationOpCallbackFactory,
             ISearchOperationCallbackFactory searchOpCallbackFactory, int 
numOfPrimaryKeys, int[] filterFields,
-            ITuplePartitionerFactory tuplePartitionerFactory, int[][] 
partitionsMap) {
-        super(spec, outRecDesc, fieldPermutation, IndexOperation.UPSERT, 
indexHelperFactory, null, true,
-                modificationOpCallbackFactory, tuplePartitionerFactory, 
partitionsMap);
+            ITuplePartitionerFactory tuplePartitionerFactory, int[][] 
partitionsMap, IndexOperation op) {
+        super(spec, outRecDesc, fieldPermutation, op, indexHelperFactory, 
null, true, modificationOpCallbackFactory,
+                tuplePartitionerFactory, partitionsMap);
         this.keyIndexHelperFactory = keyIndexHelperFactory;
         this.searchOpCallbackFactory = searchOpCallbackFactory;
         this.numOfPrimaryKeys = numOfPrimaryKeys;
@@ -60,6 +60,6 @@ public class LSMPrimaryInsertOperatorDescriptor extends 
LSMTreeInsertDeleteOpera
         RecordDescriptor intputRecDesc = 
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
         return new LSMPrimaryInsertOperatorNodePushable(ctx, partition, 
indexHelperFactory, keyIndexHelperFactory,
                 fieldPermutation, intputRecDesc, modCallbackFactory, 
searchOpCallbackFactory, numOfPrimaryKeys,
-                filterFields, sourceLoc, tuplePartitionerFactory, 
partitionsMap);
+                filterFields, sourceLoc, tuplePartitionerFactory, 
partitionsMap, op);
     }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 7fb3369f4e..072a9b6ae7 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -107,10 +107,10 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
             int[] fieldPermutation, RecordDescriptor inputRecDesc,
             IModificationOperationCallbackFactory modCallbackFactory,
             ISearchOperationCallbackFactory searchCallbackFactory, int 
numOfPrimaryKeys, int[] filterFields,
-            SourceLocation sourceLoc, ITuplePartitionerFactory 
tuplePartitionerFactory, int[][] partitionsMap)
-            throws HyracksDataException {
-        super(ctx, partition, indexHelperFactory, fieldPermutation, 
inputRecDesc, IndexOperation.UPSERT,
-                modCallbackFactory, null, tuplePartitionerFactory, 
partitionsMap);
+            SourceLocation sourceLoc, ITuplePartitionerFactory 
tuplePartitionerFactory, int[][] partitionsMap,
+            IndexOperation op) throws HyracksDataException {
+        super(ctx, partition, indexHelperFactory, fieldPermutation, 
inputRecDesc, op, modCallbackFactory, null,
+                tuplePartitionerFactory, partitionsMap);
         this.sourceLoc = sourceLoc;
         this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
         this.searchCallbacks = new ISearchOperationCallback[partitions.length];
@@ -160,47 +160,59 @@ public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                         // already processed; skip
                         return;
                     }
-                    keyTuple.reset(accessor, index);
-                    searchPred.reset(keyTuple, keyTuple, true, true, 
keySearchCmp, keySearchCmp);
-                    boolean duplicate = false;
+                    switch (op) {
+                        case INSERT:
+                        case UPSERT:
+                            keyTuple.reset(accessor, index);
+                            searchPred.reset(keyTuple, keyTuple, true, true, 
keySearchCmp, keySearchCmp);
+                            boolean duplicate = false;
 
-                    lsmAccessorForUniqunessCheck.search(cursor, searchPred);
-                    try {
-                        if (cursor.hasNext()) {
-                            // duplicate, skip
-                            if (searchCallback instanceof 
LockThenSearchOperationCallback) {
-                                ((LockThenSearchOperationCallback) 
searchCallback).release();
+                            lsmAccessorForUniqunessCheck.search(cursor, 
searchPred);
+                            try {
+                                if (cursor.hasNext()) {
+                                    // duplicate, skip
+                                    if (searchCallback instanceof 
LockThenSearchOperationCallback) {
+                                        ((LockThenSearchOperationCallback) 
searchCallback).release();
+                                    }
+                                    duplicate = true;
+                                }
+                            } finally {
+                                cursor.close();
                             }
-                            duplicate = true;
-                        }
-                    } finally {
-                        cursor.close();
-                    }
-                    if (!duplicate) {
-                        beforeModification(tuple);
-                        ((ILSMIndexAccessor) indexAccessor).forceUpsert(tuple);
-                        if (lsmAccessorForKeyIndex != null) {
-                            lsmAccessorForKeyIndex.forceUpsert(keyTuple);
-                        }
-                    } else {
-                        // we should flush previous inserted records so that 
these transactions can commit
-                        flushPartialFrame();
-                        // feed requires this nested exception to remove 
duplicated tuples
-                        // TODO: a better way to treat duplicates?
-                        throw 
HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
-                                
HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
+                            if (!duplicate) {
+                                beforeModification(tuple);
+                                ((ILSMIndexAccessor) 
indexAccessor).forceUpsert(tuple);
+                                if (lsmAccessorForKeyIndex != null) {
+                                    
lsmAccessorForKeyIndex.forceUpsert(keyTuple);
+                                }
+                            } else {
+                                // we should flush previous inserted records 
so that these transactions can commit
+                                flushPartialFrame();
+                                // feed requires this nested exception to 
remove duplicated tuples
+                                // TODO: a better way to treat duplicates?
+                                throw 
HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
+                                        
HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
+                            }
+                            break;
+                        case DELETE:
+                            ((ILSMIndexAccessor) 
indexAccessor).forceDelete(tuple);
+                            break;
+                        default:
+                            throw 
HyracksDataException.create(ErrorCode.INVALID_OPERATOR_OPERATION, sourceLoc,
+                                    op.toString(), 
LSMPrimaryInsertOperatorNodePushable.class.getSimpleName());
+
                     }
                     processedTuples.add(index);
                 }
 
                 @Override
                 public void start() throws HyracksDataException {
-                    ((LSMTreeIndexAccessor) 
indexAccessor).getCtx().setOperation(IndexOperation.UPSERT);
+                    ((LSMTreeIndexAccessor) 
indexAccessor).getCtx().setOperation(op);
                 }
 
                 @Override
                 public void finish() throws HyracksDataException {
-                    ((LSMTreeIndexAccessor) 
indexAccessor).getCtx().setOperation(IndexOperation.UPSERT);
+                    ((LSMTreeIndexAccessor) 
indexAccessor).getCtx().setOperation(op);
                 }
 
                 @Override

Reply via email to