This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 1b975b5599a0091bec3ee272194233912810c4e4
Merge: 49929014b3 33db60a5ae
Author: Michael Blow <[email protected]>
AuthorDate: Wed Nov 20 16:04:08 2024 -0500

    Merge branch 'gerrit/trinity' into 'gerrit/goldfish'
    
    Ext-ref: MB-64229,MB-63741
    Change-Id: Ia47a6630f71e75b181afe8cdddd5dcfdbbda4a05

 .../NoOpFrameOperationCallbackFactory.java         |  5 +++
 .../LSMPrimaryInsertOperatorNodePushable.java      | 10 ++++-
 .../LSMPrimaryUpsertOperatorNodePushable.java      | 16 +++++++-
 .../runtime/operators/StandardBatchController.java | 45 ++++++++++++++++++++++
 .../apache/hyracks/api/comm/IFrameAppender.java    |  2 +-
 .../hyracks/api/util/HyracksThrowingAction.java}   | 30 ++-------------
 .../org/apache/hyracks/api/util/InvokeUtil.java    | 36 +++++++++++++++++
 .../common/io/MessagingFrameTupleAppender.java     |  9 -----
 .../hyracks/dataflow/common/utils/TaskUtil.java    | 14 +++++++
 ...perationCallback.java => IBatchController.java} | 29 +++-----------
 .../am/lsm/common/api/IFrameOperationCallback.java | 14 ++++++-
 .../storage/am/lsm/common/api/ILSMHarness.java     |  9 ++++-
 .../storage/am/lsm/common/impls/LSMHarness.java    | 31 +++++++++++----
 .../am/lsm/common/impls/LSMTreeIndexAccessor.java  |  6 ++-
 14 files changed, 180 insertions(+), 76 deletions(-)

diff --cc asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 072a9b6ae7,5c879944fd..9b8b103d55
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@@ -18,15 -18,13 +18,17 @@@
   */
  package org.apache.asterix.runtime.operators;
  
+ import static 
org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER;
+ 
  import java.nio.ByteBuffer;
 +import java.util.HashMap;
 +import java.util.Map;
  
  import org.apache.asterix.common.api.INcApplicationContext;
 +import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
  import org.apache.asterix.common.dataflow.LSMIndexUtil;
  import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
 +import org.apache.asterix.common.messaging.AtomicJobPreparedMessage;
  import org.apache.asterix.common.transactions.ILogMarkerCallback;
  import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
  import 
org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback;
@@@ -57,51 -53,38 +59,53 @@@ import org.apache.hyracks.storage.am.co
  import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
  import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
  import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
 +import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils;
+ import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
  import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
  import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
  import 
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
  import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
 +import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
  import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
  import org.apache.hyracks.storage.common.IIndex;
  import org.apache.hyracks.storage.common.IIndexAccessParameters;
 +import org.apache.hyracks.storage.common.IIndexAccessor;
  import org.apache.hyracks.storage.common.IIndexCursor;
 +import org.apache.hyracks.storage.common.ISearchOperationCallback;
  import org.apache.hyracks.storage.common.MultiComparator;
  
 +import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 +import it.unimi.dsi.fastutil.ints.IntArrayList;
 +import it.unimi.dsi.fastutil.ints.IntList;
 +import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 +import it.unimi.dsi.fastutil.ints.IntSet;
 +
  public class LSMPrimaryInsertOperatorNodePushable extends 
LSMIndexInsertUpdateDeleteOperatorNodePushable {
  
 -    private final IIndexDataflowHelper keyIndexHelper;
 +    private final boolean[] keyIndexHelpersOpen;
 +    private final IIndexDataflowHelper[] keyIndexHelpers;
      private MultiComparator keySearchCmp;
      private RangePredicate searchPred;
 -    private IIndexCursor cursor;
 -    private LockThenSearchOperationCallback searchCallback;
 +    private final IIndexCursor[] cursors;
 +    private final ISearchOperationCallback[] searchCallbacks;
      private final ISearchOperationCallbackFactory searchCallbackFactory;
 -    private final IFrameTupleProcessor processor;
 -    private LSMTreeIndexAccessor lsmAccessor;
 -    private LSMTreeIndexAccessor lsmAccessorForKeyIndex;
 -    private LSMTreeIndexAccessor lsmAccessorForUniqunessCheck;
 +    private final IFrameTupleProcessor[] processors;
 +    private final LSMTreeIndexAccessor[] lsmAccessorForKeyIndexes;
 +    private final LSMTreeIndexAccessor[] lsmAccessorForUniqunessChecks;
  
 -    private final IFrameOperationCallback frameOpCallback;
 +    private final IFrameOperationCallback[] frameOpCallbacks;
-     private boolean flushedPartialTuples;
 +    private final PermutingFrameTupleReference keyTuple;
 +    private final Int2ObjectMap<IntSet> partition2TuplesMap = new 
Int2ObjectOpenHashMap<>();
 +    private final IntSet processedTuples = new IntOpenHashSet();
 +    private final IntSet flushedTuples = new IntOpenHashSet();
 +    private final SourceLocation sourceLoc;
+     private boolean flushedPartialTuples;
 -    private int currentTupleIdx;
 -    private int lastFlushedTupleIdx;
+     private IBatchController batchController;
  
 -    private final PermutingFrameTupleReference keyTuple;
 -
      public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int 
partition,
              IIndexDataflowHelperFactory indexHelperFactory, 
IIndexDataflowHelperFactory keyIndexHelperFactory,
              int[] fieldPermutation, RecordDescriptor inputRecDesc,
@@@ -228,62 -182,51 +232,63 @@@
          flushedPartialTuples = false;
          accessor = new FrameTupleAccessor(inputRecDesc);
          writeBuffer = new VSizeFrame(ctx);
 -        indexHelper.open();
 -        index = indexHelper.getIndexInstance();
 -        IIndex indexForUniquessCheck;
 -        if (keyIndexHelper != null) {
 -            keyIndexHelper.open();
 -            indexForUniquessCheck = keyIndexHelper.getIndexInstance();
 -        } else {
 -            indexForUniquessCheck = index;
 -        }
          try {
 -            if (ctx.getSharedObject() != null) {
 -                PrimaryIndexLogMarkerCallback callback = new 
PrimaryIndexLogMarkerCallback((AbstractLSMIndex) index);
 -                TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, 
callback, ctx);
 -            }
 -            frameOpCallback.open();
 +            INcApplicationContext appCtx =
 +                    (INcApplicationContext) 
ctx.getJobletContext().getServiceContext().getApplicationContext();
              writer.open();
 +            writerOpen = true;
 +            for (int i = 0; i < partitions.length; i++) {
 +                IIndexDataflowHelper indexHelper = indexHelpers[i];
 +                indexHelpersOpen[i] = true;
 +                indexHelper.open();
 +                indexes[i] = indexHelper.getIndexInstance();
 +                IIndex index = indexes[i];
 +                if (((ILSMIndex) indexes[i]).isAtomic()) {
 +                    ((PrimaryIndexOperationTracker) ((ILSMIndex) 
indexes[i]).getOperationTracker()).clear();
 +                }
 +                IIndexDataflowHelper keyIndexHelper = keyIndexHelpers[i];
 +                IIndex indexForUniquessCheck;
 +                if (keyIndexHelper != null) {
 +                    keyIndexHelpersOpen[i] = true;
 +                    keyIndexHelper.open();
 +                    indexForUniquessCheck = keyIndexHelper.getIndexInstance();
 +                } else {
 +                    indexForUniquessCheck = index;
 +                }
 +                if (ctx.getSharedObject() != null && i == 0) {
 +                    PrimaryIndexLogMarkerCallback callback =
 +                            new 
PrimaryIndexLogMarkerCallback((AbstractLSMIndex) indexes[0]);
 +                    TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, 
callback, ctx);
 +                }
 +                modCallbacks[i] =
 +                        
modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(),
 ctx, this);
 +                searchCallbacks[i] = searchCallbackFactory
 +                        
.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
 +                IIndexAccessParameters iap = new 
IndexAccessParameters(modCallbacks[i], NoOpOperationCallback.INSTANCE);
 +                indexAccessors[i] = index.createAccessor(iap);
 +                if (keyIndexHelper != null) {
 +                    lsmAccessorForKeyIndexes[i] = (LSMTreeIndexAccessor) 
indexForUniquessCheck.createAccessor(iap);
 +                }
 +                frameOpCallbacks[i] = 
NoOpFrameOperationCallbackFactory.INSTANCE.createFrameOperationCallback(ctx,
 +                        (ILSMIndexAccessor) indexAccessors[i]);
 +                frameOpCallbacks[i].open();
 +                IIndexAccessParameters iapForUniquenessCheck =
 +                        new 
IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallbacks[i]);
 +                lsmAccessorForUniqunessChecks[i] =
 +                        (LSMTreeIndexAccessor) 
indexForUniquessCheck.createAccessor(iapForUniquenessCheck);
 +                setAtomicOpContextIfAtomic(indexForUniquessCheck, 
lsmAccessorForUniqunessChecks[i]);
 +
 +                cursors[i] = 
lsmAccessorForUniqunessChecks[i].createSearchCursor(false);
 +                LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
 +                        appCtx.getTransactionSubsystem().getLogManager());
 +            }
 +            createTupleProcessors(sourceLoc);
              keySearchCmp =
 -                    BTreeUtils.getSearchMultiComparator(((ITreeIndex) 
index).getComparatorFactories(), frameTuple);
 +                    BTreeUtils.getSearchMultiComparator(((ITreeIndex) 
indexes[0]).getComparatorFactories(), frameTuple);
              searchPred = new RangePredicate(frameTuple, frameTuple, true, 
true, keySearchCmp, keySearchCmp, null, null);
              appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
 -            modCallback =
 -                    
modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(),
 ctx, this);
 -            searchCallback = (LockThenSearchOperationCallback) 
searchCallbackFactory
 -                    
.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
 -            IIndexAccessParameters iap = new 
IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE);
 -            indexAccessor = index.createAccessor(iap);
 -            lsmAccessor = (LSMTreeIndexAccessor) indexAccessor;
 -            if (keyIndexHelper != null) {
 -                lsmAccessorForKeyIndex = (LSMTreeIndexAccessor) 
indexForUniquessCheck.createAccessor(iap);
 -            }
 -
 -            IIndexAccessParameters iapForUniquenessCheck =
 -                    new IndexAccessParameters(NoOpOperationCallback.INSTANCE, 
searchCallback);
 -            lsmAccessorForUniqunessCheck =
 -                    (LSMTreeIndexAccessor) 
indexForUniquessCheck.createAccessor(iapForUniquenessCheck);
 -
 -            cursor = lsmAccessorForUniqunessCheck.createSearchCursor(false);
              frameTuple = new FrameTupleReference();
 -            INcApplicationContext appCtx =
 -                    (INcApplicationContext) 
ctx.getJobletContext().getServiceContext().getApplicationContext();
 -            LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
 -                    appCtx.getTransactionSubsystem().getLogManager());
+             batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, 
ctx, StandardBatchController.INSTANCE);
          } catch (Throwable e) { // NOSONAR: Re-thrown
              throw HyracksDataException.create(e);
          }
@@@ -292,21 -235,7 +297,22 @@@
      @Override
      public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
          accessor.reset(buffer);
 -        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, 
batchController);
 +        partition2TuplesMap.clear();
 +        int itemCount = accessor.getTupleCount();
 +        for (int i = 0; i < itemCount; i++) {
 +            int storagePartition = tuplePartitioner.partition(accessor, i);
 +            int pIdx = storagePartitionId2Index.get(storagePartition);
 +            IntSet tupleIndexes = partition2TuplesMap.computeIfAbsent(pIdx, k 
-> new IntOpenHashSet());
 +            tupleIndexes.add(i);
 +        }
 +        for (Int2ObjectMap.Entry<IntSet> p2tuplesMapEntry : 
partition2TuplesMap.int2ObjectEntrySet()) {
 +            int pIdx = p2tuplesMapEntry.getIntKey();
 +            LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) 
indexAccessors[pIdx];
 +            IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
 +            IFrameTupleProcessor processor = processors[pIdx];
-             lsmAccessor.batchOperate(accessor, tuple, processor, 
frameOpCallback, p2tuplesMapEntry.getValue());
++            lsmAccessor.batchOperate(accessor, tuple, processor, 
frameOpCallback, batchController,
++                    p2tuplesMapEntry.getValue());
 +        }
  
          writeBuffer.ensureFrameSize(buffer.capacity());
          if (flushedPartialTuples) {
diff --cc asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 8bc2b1ce55,d2dc3cffb5..dd9f4070d2
--- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@@ -131,20 -110,17 +134,21 @@@ public class LSMPrimaryUpsertOperatorNo
      private final boolean hasMeta;
      private final int filterFieldIndex;
      private final int metaFieldIndex;
 -    protected LockThenSearchOperationCallback searchCallback;
 -    protected IFrameOperationCallback frameOpCallback;
 +    protected final ISearchOperationCallback[] searchCallbacks;
 +    protected final IFrameOperationCallback[] frameOpCallbacks;
 +    protected final ILSMTupleFilterCallback[] tupleFilterCallbacks;
      private final IFrameOperationCallbackFactory frameOpCallbackFactory;
 -    protected AbstractIndexModificationOperationCallback abstractModCallback;
      private final ISearchOperationCallbackFactory searchCallbackFactory;
 -    private final IFrameTupleProcessor processor;
 -    protected LSMTreeIndexAccessor lsmAccessor;
 +    private final IFrameTupleProcessor[] processors;
      private final ITracer tracer;
      private final long traceCategory;
 +    private final ITupleProjector tupleProjector;
-     private long lastRecordInTimeStamp = 0L;
 +    private final Int2ObjectMap<IntSet> partition2TuplesMap = new 
Int2ObjectOpenHashMap<>();
 +    private final boolean hasSecondaries;
 +    private final ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory;
 +    private final int[] fieldPermutation;
+     private long lastRecordInTimeStamp = 0L;
+     private IBatchController batchController;
  
      public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int 
partition,
              IIndexDataflowHelperFactory indexHelperFactory, int[] 
fieldPermutation, RecordDescriptor inputRecDesc,
@@@ -376,8 -312,11 +385,9 @@@
                      callback.open();
                  }
              };
 -            frameOpCallback.open();
 -            batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, 
ctx, StandardBatchController.INSTANCE);
 -        } catch (Throwable e) { // NOSONAR: Re-thrown
 -            throw HyracksDataException.create(e);
 +            frameOpCallbacks[i].open();
          }
++        batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, 
StandardBatchController.INSTANCE);
      }
  
      protected void resetSearchPredicate(int tupleIndex) {
@@@ -417,28 -353,8 +427,29 @@@
      @Override
      public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
          accessor.reset(buffer);
 +        partition2TuplesMap.clear();
          int itemCount = accessor.getTupleCount();
 -        lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, 
batchController);
 +        for (int i = 0; i < itemCount; i++) {
 +            int storagePartition = tuplePartitioner.partition(accessor, i);
 +            if 
(tupleFilterCallbacks[storagePartitionId2Index.get(storagePartition)].filter(accessor,
 i)) {
 +                continue;
 +            }
 +            int pIdx = storagePartitionId2Index.get(storagePartition);
 +            IntSet tupleIndexes = partition2TuplesMap.computeIfAbsent(pIdx, k 
-> new IntOpenHashSet());
 +            tupleIndexes.add(i);
 +        }
 +        // to ensure all partitions will be processed at least once, add 
partitions with missing tuples
 +        for (int partition : storagePartitionId2Index.values()) {
 +            partition2TuplesMap.computeIfAbsent(partition, k -> new 
IntOpenHashSet());
 +        }
 +        for (Int2ObjectMap.Entry<IntSet> p2tuplesMapEntry : 
partition2TuplesMap.int2ObjectEntrySet()) {
 +            int pIdx = p2tuplesMapEntry.getIntKey();
 +            LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) 
indexAccessors[pIdx];
 +            IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
 +            IFrameTupleProcessor processor = processors[pIdx];
-             lsmAccessor.batchOperate(accessor, tuple, processor, 
frameOpCallback, p2tuplesMapEntry.getValue());
++            lsmAccessor.batchOperate(accessor, tuple, processor, 
frameOpCallback, batchController,
++                    p2tuplesMapEntry.getValue());
 +        }
          if (itemCount > 0) {
              lastRecordInTimeStamp = System.currentTimeMillis();
          }
@@@ -586,42 -495,4 +597,43 @@@
          // No op since nextFrame flushes by default
      }
  
 +    // TODO: Refactor and remove duplicated code
 +    private void commitAtomicUpsert() throws HyracksDataException {
 +        final Map<String, ILSMComponentId> componentIdMap = new HashMap<>();
 +        boolean atomic = false;
 +        for (IIndex index : indexes) {
 +            if (index != null && ((ILSMIndex) index).isAtomic()) {
 +                PrimaryIndexOperationTracker opTracker =
 +                        ((PrimaryIndexOperationTracker) ((ILSMIndex) 
index).getOperationTracker());
 +                opTracker.finishAllFlush();
 +                for (Map.Entry<String, FlushOperation> entry : 
opTracker.getLastFlushOperation().entrySet()) {
 +                    componentIdMap.put(entry.getKey(), 
entry.getValue().getFlushingComponent().getId());
 +                }
 +                atomic = true;
 +            }
 +        }
 +
 +        if (atomic) {
 +            AtomicJobPreparedMessage message = new 
AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(),
 +                    ctx.getJobletContext().getServiceContext().getNodeId(), 
componentIdMap);
 +            try {
 +                ((NodeControllerService) 
ctx.getJobletContext().getServiceContext().getControllerService())
 +                        
.sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(),
 +                                JavaSerializationUtils.serialize(message), 
null);
 +            } catch (Exception e) {
 +                throw new ACIDException(e);
 +            }
 +        }
 +    }
 +
 +    private void abortAtomicUpsert() throws HyracksDataException {
 +        for (IIndex index : indexes) {
 +            if (index != null && ((ILSMIndex) index).isAtomic()) {
 +                PrimaryIndexOperationTracker opTracker =
 +                        ((PrimaryIndexOperationTracker) ((ILSMIndex) 
index).getOperationTracker());
 +                opTracker.abort();
 +            }
 +        }
 +    }
++
  }
diff --cc hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
index 3ed17b138d,abe62c22a1..ad5e91909b
--- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
+++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java
@@@ -288,32 -287,42 +288,68 @@@ public class InvokeUtil 
          }
      }
  
+     @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) 
// catching Throwable, instanceofs
+     public static void tryHyracksWithCleanups(HyracksThrowingAction action, 
HyracksThrowingAction... cleanups)
+             throws HyracksDataException {
+         Throwable savedT = null;
+         boolean suppressedInterrupted = false;
+         try {
+             action.run();
+         } catch (Throwable t) {
+             savedT = t;
+         } finally {
+             for (HyracksThrowingAction cleanup : cleanups) {
+                 try {
+                     cleanup.run();
+                 } catch (Throwable t) {
+                     if (savedT != null) {
+                         savedT.addSuppressed(t);
+                         suppressedInterrupted = suppressedInterrupted || t 
instanceof InterruptedException;
+                     } else {
+                         savedT = t;
+                     }
+                 }
+             }
+         }
+         if (savedT == null) {
+             return;
+         }
+         if (suppressedInterrupted) {
+             Thread.currentThread().interrupt();
+         }
+         if (savedT instanceof Error) {
+             throw (Error) savedT;
+         } else {
+             throw HyracksDataException.create(savedT);
+         }
+     }
+ 
 +    public static void tryWithCleanupsUnchecked(Runnable action, Runnable... 
cleanups) {
 +        Throwable savedT = null;
 +        try {
 +            action.run();
 +        } catch (Throwable t) {
 +            savedT = t;
 +        } finally {
 +            for (Runnable cleanup : cleanups) {
 +                try {
 +                    cleanup.run();
 +                } catch (Throwable t) {
 +                    if (savedT != null) {
 +                        savedT.addSuppressed(t);
 +                    } else {
 +                        savedT = t;
 +                    }
 +                }
 +            }
 +        }
 +        if (savedT instanceof Error) {
 +            throw (Error) savedT;
 +        } else if (savedT != null) {
 +            throw new UncheckedExecutionException(savedT);
 +        }
 +    }
 +
      /**
       * Runs the supplied action, after suspending any pending interruption. 
An error will be logged if
       * the action is itself interrupted.
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 214d9dc8dc,68de45ac08..dbf34c9458
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@@ -215,22 -214,21 +215,29 @@@ public interface ILSMHarness 
      /**
       * Perform batch operation on all tuples in the passed frame tuple 
accessor
       *
 -     * @param ctx             the operation ctx
 -     * @param accessor        the frame tuple accessor
 -     * @param tuple           the mutable tuple used to pass the tuple to the 
processor
 -     * @param processor       the tuple processor
 -     * @param frameOpCallback the callback at the end of the frame
 +     * @param ctx
 +     *            the operation ctx
 +     * @param accessor
 +     *            the frame tuple accessor
 +     * @param tuple
 +     *            the mutable tuple used to pass the tuple to the processor
 +     * @param processor
 +     *            the tuple processor
 +     * @param frameOpCallback
 +     *            the callback at the end of the frame
 +     * @param tuples
 +     *            the indexes of tuples to process
+      * @param batchController
++     *            the controller of the batch lifecycle
       * @throws HyracksDataException
       */
      void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor 
accessor, FrameTupleReference tuple,
-             IFrameTupleProcessor processor, IFrameOperationCallback 
frameOpCallback, Set<Integer> tuples)
 -            IFrameTupleProcessor processor, IFrameOperationCallback 
frameOpCallback, IBatchController batchController)
 -            throws HyracksDataException;
++            IFrameTupleProcessor processor, IFrameOperationCallback 
frameOpCallback, IBatchController batchController,
++            Set<Integer> tuples) throws HyracksDataException;
+ 
+     void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws 
HyracksDataException;
+ 
+     void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback 
callback, boolean success, LSMOperationType op)
              throws HyracksDataException;
  
      /**
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 2d040204f1,8ad67f7f85..d019a088a1
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@@ -715,13 -724,15 +728,15 @@@ public class LSMHarness implements ILSM
  
      @Override
      public void batchOperate(ILSMIndexOperationContext ctx, 
FrameTupleAccessor accessor, FrameTupleReference tuple,
-             IFrameTupleProcessor processor, IFrameOperationCallback 
frameOpCallback, Set<Integer> tuples)
 -            IFrameTupleProcessor processor, IFrameOperationCallback 
frameOpCallback, IBatchController batchController)
--            throws HyracksDataException {
++            IFrameTupleProcessor processor, IFrameOperationCallback 
frameOpCallback, IBatchController batchController,
++            Set<Integer> tuples) throws HyracksDataException {
          processor.start();
-         enter(ctx);
+         batchController.batchEnter(ctx, this, frameOpCallback);
+         boolean success = false;
          try {
              try {
 -                processFrame(accessor, tuple, processor);
 +                processFrame(accessor, tuple, processor, tuples);
+                 success = true;
                  frameOpCallback.frameCompleted();
              } catch (Throwable th) {
                  processor.fail(th);
diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
index fb5984da61,e688727f17..c768768c0a
--- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
+++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java
@@@ -211,8 -211,8 +212,9 @@@ public class LSMTreeIndexAccessor imple
      }
  
      public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference 
tuple, IFrameTupleProcessor processor,
-             IFrameOperationCallback frameOpCallback, Set<Integer> tuples) 
throws HyracksDataException {
-         lsmHarness.batchOperate(ctx, accessor, tuple, processor, 
frameOpCallback, tuples);
 -            IFrameOperationCallback frameOpCallback, IBatchController 
batchController) throws HyracksDataException {
 -        lsmHarness.batchOperate(ctx, accessor, tuple, processor, 
frameOpCallback, batchController);
++            IFrameOperationCallback frameOpCallback, IBatchController 
batchController, Set<Integer> tuples)
++            throws HyracksDataException {
++        lsmHarness.batchOperate(ctx, accessor, tuple, processor, 
frameOpCallback, batchController, tuples);
      }
  
      @Override

Reply via email to