This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 1b975b5599a0091bec3ee272194233912810c4e4 Merge: 49929014b3 33db60a5ae Author: Michael Blow <[email protected]> AuthorDate: Wed Nov 20 16:04:08 2024 -0500 Merge branch 'gerrit/trinity' into 'gerrit/goldfish' Ext-ref: MB-64229,MB-63741 Change-Id: Ia47a6630f71e75b181afe8cdddd5dcfdbbda4a05 .../NoOpFrameOperationCallbackFactory.java | 5 +++ .../LSMPrimaryInsertOperatorNodePushable.java | 10 ++++- .../LSMPrimaryUpsertOperatorNodePushable.java | 16 +++++++- .../runtime/operators/StandardBatchController.java | 45 ++++++++++++++++++++++ .../apache/hyracks/api/comm/IFrameAppender.java | 2 +- .../hyracks/api/util/HyracksThrowingAction.java} | 30 ++------------- .../org/apache/hyracks/api/util/InvokeUtil.java | 36 +++++++++++++++++ .../common/io/MessagingFrameTupleAppender.java | 9 ----- .../hyracks/dataflow/common/utils/TaskUtil.java | 14 +++++++ ...perationCallback.java => IBatchController.java} | 29 +++----------- .../am/lsm/common/api/IFrameOperationCallback.java | 14 ++++++- .../storage/am/lsm/common/api/ILSMHarness.java | 9 ++++- .../storage/am/lsm/common/impls/LSMHarness.java | 31 +++++++++++---- .../am/lsm/common/impls/LSMTreeIndexAccessor.java | 6 ++- 14 files changed, 180 insertions(+), 76 deletions(-) diff --cc asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java index 072a9b6ae7,5c879944fd..9b8b103d55 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java @@@ -18,15 -18,13 +18,17 @@@ */ package org.apache.asterix.runtime.operators; + import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER; + import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.context.PrimaryIndexOperationTracker; import org.apache.asterix.common.dataflow.LSMIndexUtil; import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory; +import org.apache.asterix.common.messaging.AtomicJobPreparedMessage; import org.apache.asterix.common.transactions.ILogMarkerCallback; import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback; import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback; @@@ -57,51 -53,38 +59,53 @@@ import org.apache.hyracks.storage.am.co import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.common.util.ResourceReleaseUtils; + import org.apache.hyracks.storage.am.lsm.common.api.IBatchController; import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable; import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; +import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation; import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor; import org.apache.hyracks.storage.common.IIndex; import org.apache.hyracks.storage.common.IIndexAccessParameters; +import org.apache.hyracks.storage.common.IIndexAccessor; import org.apache.hyracks.storage.common.IIndexCursor; +import org.apache.hyracks.storage.common.ISearchOperationCallback; import org.apache.hyracks.storage.common.MultiComparator; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.ints.IntArrayList; +import it.unimi.dsi.fastutil.ints.IntList; +import it.unimi.dsi.fastutil.ints.IntOpenHashSet; +import it.unimi.dsi.fastutil.ints.IntSet; + public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable { - private final IIndexDataflowHelper keyIndexHelper; + private final boolean[] keyIndexHelpersOpen; + private final IIndexDataflowHelper[] keyIndexHelpers; private MultiComparator keySearchCmp; private RangePredicate searchPred; - private IIndexCursor cursor; - private LockThenSearchOperationCallback searchCallback; + private final IIndexCursor[] cursors; + private final ISearchOperationCallback[] searchCallbacks; private final ISearchOperationCallbackFactory searchCallbackFactory; - private final IFrameTupleProcessor processor; - private LSMTreeIndexAccessor lsmAccessor; - private LSMTreeIndexAccessor lsmAccessorForKeyIndex; - private LSMTreeIndexAccessor lsmAccessorForUniqunessCheck; + private final IFrameTupleProcessor[] processors; + private final LSMTreeIndexAccessor[] lsmAccessorForKeyIndexes; + private final LSMTreeIndexAccessor[] lsmAccessorForUniqunessChecks; - private final IFrameOperationCallback frameOpCallback; + private final IFrameOperationCallback[] frameOpCallbacks; - private boolean flushedPartialTuples; + private final PermutingFrameTupleReference keyTuple; + private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>(); + private final IntSet processedTuples = new IntOpenHashSet(); + private final IntSet flushedTuples = new IntOpenHashSet(); + private final SourceLocation sourceLoc; + private boolean flushedPartialTuples; - private int currentTupleIdx; - private int lastFlushedTupleIdx; + private IBatchController batchController; - private final PermutingFrameTupleReference keyTuple; - public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int partition, IIndexDataflowHelperFactory indexHelperFactory, IIndexDataflowHelperFactory keyIndexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc, @@@ -228,62 -182,51 +232,63 @@@ flushedPartialTuples = false; accessor = new FrameTupleAccessor(inputRecDesc); writeBuffer = new VSizeFrame(ctx); - indexHelper.open(); - index = indexHelper.getIndexInstance(); - IIndex indexForUniquessCheck; - if (keyIndexHelper != null) { - keyIndexHelper.open(); - indexForUniquessCheck = keyIndexHelper.getIndexInstance(); - } else { - indexForUniquessCheck = index; - } try { - if (ctx.getSharedObject() != null) { - PrimaryIndexLogMarkerCallback callback = new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) index); - TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx); - } - frameOpCallback.open(); + INcApplicationContext appCtx = + (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); writer.open(); + writerOpen = true; + for (int i = 0; i < partitions.length; i++) { + IIndexDataflowHelper indexHelper = indexHelpers[i]; + indexHelpersOpen[i] = true; + indexHelper.open(); + indexes[i] = indexHelper.getIndexInstance(); + IIndex index = indexes[i]; + if (((ILSMIndex) indexes[i]).isAtomic()) { + ((PrimaryIndexOperationTracker) ((ILSMIndex) indexes[i]).getOperationTracker()).clear(); + } + IIndexDataflowHelper keyIndexHelper = keyIndexHelpers[i]; + IIndex indexForUniquessCheck; + if (keyIndexHelper != null) { + keyIndexHelpersOpen[i] = true; + keyIndexHelper.open(); + indexForUniquessCheck = keyIndexHelper.getIndexInstance(); + } else { + indexForUniquessCheck = index; + } + if (ctx.getSharedObject() != null && i == 0) { + PrimaryIndexLogMarkerCallback callback = + new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) indexes[0]); + TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx); + } + modCallbacks[i] = + modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this); + searchCallbacks[i] = searchCallbackFactory + .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this); + IIndexAccessParameters iap = new IndexAccessParameters(modCallbacks[i], NoOpOperationCallback.INSTANCE); + indexAccessors[i] = index.createAccessor(iap); + if (keyIndexHelper != null) { + lsmAccessorForKeyIndexes[i] = (LSMTreeIndexAccessor) indexForUniquessCheck.createAccessor(iap); + } + frameOpCallbacks[i] = NoOpFrameOperationCallbackFactory.INSTANCE.createFrameOperationCallback(ctx, + (ILSMIndexAccessor) indexAccessors[i]); + frameOpCallbacks[i].open(); + IIndexAccessParameters iapForUniquenessCheck = + new IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallbacks[i]); + lsmAccessorForUniqunessChecks[i] = + (LSMTreeIndexAccessor) indexForUniquessCheck.createAccessor(iapForUniquenessCheck); + setAtomicOpContextIfAtomic(indexForUniquessCheck, lsmAccessorForUniqunessChecks[i]); + + cursors[i] = lsmAccessorForUniqunessChecks[i].createSearchCursor(false); + LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index, + appCtx.getTransactionSubsystem().getLogManager()); + } + createTupleProcessors(sourceLoc); keySearchCmp = - BTreeUtils.getSearchMultiComparator(((ITreeIndex) index).getComparatorFactories(), frameTuple); + BTreeUtils.getSearchMultiComparator(((ITreeIndex) indexes[0]).getComparatorFactories(), frameTuple); searchPred = new RangePredicate(frameTuple, frameTuple, true, true, keySearchCmp, keySearchCmp, null, null); appender = new FrameTupleAppender(new VSizeFrame(ctx), true); - modCallback = - modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this); - searchCallback = (LockThenSearchOperationCallback) searchCallbackFactory - .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this); - IIndexAccessParameters iap = new IndexAccessParameters(modCallback, NoOpOperationCallback.INSTANCE); - indexAccessor = index.createAccessor(iap); - lsmAccessor = (LSMTreeIndexAccessor) indexAccessor; - if (keyIndexHelper != null) { - lsmAccessorForKeyIndex = (LSMTreeIndexAccessor) indexForUniquessCheck.createAccessor(iap); - } - - IIndexAccessParameters iapForUniquenessCheck = - new IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallback); - lsmAccessorForUniqunessCheck = - (LSMTreeIndexAccessor) indexForUniquessCheck.createAccessor(iapForUniquenessCheck); - - cursor = lsmAccessorForUniqunessCheck.createSearchCursor(false); frameTuple = new FrameTupleReference(); - INcApplicationContext appCtx = - (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); - LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index, - appCtx.getTransactionSubsystem().getLogManager()); + batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE); } catch (Throwable e) { // NOSONAR: Re-thrown throw HyracksDataException.create(e); } @@@ -292,21 -235,7 +297,22 @@@ @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); - lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController); + partition2TuplesMap.clear(); + int itemCount = accessor.getTupleCount(); + for (int i = 0; i < itemCount; i++) { + int storagePartition = tuplePartitioner.partition(accessor, i); + int pIdx = storagePartitionId2Index.get(storagePartition); + IntSet tupleIndexes = partition2TuplesMap.computeIfAbsent(pIdx, k -> new IntOpenHashSet()); + tupleIndexes.add(i); + } + for (Int2ObjectMap.Entry<IntSet> p2tuplesMapEntry : partition2TuplesMap.int2ObjectEntrySet()) { + int pIdx = p2tuplesMapEntry.getIntKey(); + LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx]; + IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx]; + IFrameTupleProcessor processor = processors[pIdx]; - lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue()); ++ lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController, ++ p2tuplesMapEntry.getValue()); + } writeBuffer.ensureFrameSize(buffer.capacity()); if (flushedPartialTuples) { diff --cc asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java index 8bc2b1ce55,d2dc3cffb5..dd9f4070d2 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java @@@ -131,20 -110,17 +134,21 @@@ public class LSMPrimaryUpsertOperatorNo private final boolean hasMeta; private final int filterFieldIndex; private final int metaFieldIndex; - protected LockThenSearchOperationCallback searchCallback; - protected IFrameOperationCallback frameOpCallback; + protected final ISearchOperationCallback[] searchCallbacks; + protected final IFrameOperationCallback[] frameOpCallbacks; + protected final ILSMTupleFilterCallback[] tupleFilterCallbacks; private final IFrameOperationCallbackFactory frameOpCallbackFactory; - protected AbstractIndexModificationOperationCallback abstractModCallback; private final ISearchOperationCallbackFactory searchCallbackFactory; - private final IFrameTupleProcessor processor; - protected LSMTreeIndexAccessor lsmAccessor; + private final IFrameTupleProcessor[] processors; private final ITracer tracer; private final long traceCategory; + private final ITupleProjector tupleProjector; - private long lastRecordInTimeStamp = 0L; + private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>(); + private final boolean hasSecondaries; + private final ILSMTupleFilterCallbackFactory tupleFilterCallbackFactory; + private final int[] fieldPermutation; + private long lastRecordInTimeStamp = 0L; + private IBatchController batchController; public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition, IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc, @@@ -376,8 -312,11 +385,9 @@@ callback.open(); } }; - frameOpCallback.open(); - batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE); - } catch (Throwable e) { // NOSONAR: Re-thrown - throw HyracksDataException.create(e); + frameOpCallbacks[i].open(); } ++ batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE); } protected void resetSearchPredicate(int tupleIndex) { @@@ -417,28 -353,8 +427,29 @@@ @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); + partition2TuplesMap.clear(); int itemCount = accessor.getTupleCount(); - lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController); + for (int i = 0; i < itemCount; i++) { + int storagePartition = tuplePartitioner.partition(accessor, i); + if (tupleFilterCallbacks[storagePartitionId2Index.get(storagePartition)].filter(accessor, i)) { + continue; + } + int pIdx = storagePartitionId2Index.get(storagePartition); + IntSet tupleIndexes = partition2TuplesMap.computeIfAbsent(pIdx, k -> new IntOpenHashSet()); + tupleIndexes.add(i); + } + // to ensure all partitions will be processed at least once, add partitions with missing tuples + for (int partition : storagePartitionId2Index.values()) { + partition2TuplesMap.computeIfAbsent(partition, k -> new IntOpenHashSet()); + } + for (Int2ObjectMap.Entry<IntSet> p2tuplesMapEntry : partition2TuplesMap.int2ObjectEntrySet()) { + int pIdx = p2tuplesMapEntry.getIntKey(); + LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx]; + IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx]; + IFrameTupleProcessor processor = processors[pIdx]; - lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue()); ++ lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController, ++ p2tuplesMapEntry.getValue()); + } if (itemCount > 0) { lastRecordInTimeStamp = System.currentTimeMillis(); } @@@ -586,42 -495,4 +597,43 @@@ // No op since nextFrame flushes by default } + // TODO: Refactor and remove duplicated code + private void commitAtomicUpsert() throws HyracksDataException { + final Map<String, ILSMComponentId> componentIdMap = new HashMap<>(); + boolean atomic = false; + for (IIndex index : indexes) { + if (index != null && ((ILSMIndex) index).isAtomic()) { + PrimaryIndexOperationTracker opTracker = + ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker()); + opTracker.finishAllFlush(); + for (Map.Entry<String, FlushOperation> entry : opTracker.getLastFlushOperation().entrySet()) { + componentIdMap.put(entry.getKey(), entry.getValue().getFlushingComponent().getId()); + } + atomic = true; + } + } + + if (atomic) { + AtomicJobPreparedMessage message = new AtomicJobPreparedMessage(ctx.getJobletContext().getJobId(), + ctx.getJobletContext().getServiceContext().getNodeId(), componentIdMap); + try { + ((NodeControllerService) ctx.getJobletContext().getServiceContext().getControllerService()) + .sendRealTimeApplicationMessageToCC(ctx.getJobletContext().getJobId().getCcId(), + JavaSerializationUtils.serialize(message), null); + } catch (Exception e) { + throw new ACIDException(e); + } + } + } + + private void abortAtomicUpsert() throws HyracksDataException { + for (IIndex index : indexes) { + if (index != null && ((ILSMIndex) index).isAtomic()) { + PrimaryIndexOperationTracker opTracker = + ((PrimaryIndexOperationTracker) ((ILSMIndex) index).getOperationTracker()); + opTracker.abort(); + } + } + } ++ } diff --cc hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java index 3ed17b138d,abe62c22a1..ad5e91909b --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java @@@ -288,32 -287,42 +288,68 @@@ public class InvokeUtil } } + @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs + public static void tryHyracksWithCleanups(HyracksThrowingAction action, HyracksThrowingAction... cleanups) + throws HyracksDataException { + Throwable savedT = null; + boolean suppressedInterrupted = false; + try { + action.run(); + } catch (Throwable t) { + savedT = t; + } finally { + for (HyracksThrowingAction cleanup : cleanups) { + try { + cleanup.run(); + } catch (Throwable t) { + if (savedT != null) { + savedT.addSuppressed(t); + suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException; + } else { + savedT = t; + } + } + } + } + if (savedT == null) { + return; + } + if (suppressedInterrupted) { + Thread.currentThread().interrupt(); + } + if (savedT instanceof Error) { + throw (Error) savedT; + } else { + throw HyracksDataException.create(savedT); + } + } + + public static void tryWithCleanupsUnchecked(Runnable action, Runnable... cleanups) { + Throwable savedT = null; + try { + action.run(); + } catch (Throwable t) { + savedT = t; + } finally { + for (Runnable cleanup : cleanups) { + try { + cleanup.run(); + } catch (Throwable t) { + if (savedT != null) { + savedT.addSuppressed(t); + } else { + savedT = t; + } + } + } + } + if (savedT instanceof Error) { + throw (Error) savedT; + } else if (savedT != null) { + throw new UncheckedExecutionException(savedT); + } + } + /** * Runs the supplied action, after suspending any pending interruption. An error will be logged if * the action is itself interrupted. diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index 214d9dc8dc,68de45ac08..dbf34c9458 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@@ -215,22 -214,21 +215,29 @@@ public interface ILSMHarness /** * Perform batch operation on all tuples in the passed frame tuple accessor * - * @param ctx the operation ctx - * @param accessor the frame tuple accessor - * @param tuple the mutable tuple used to pass the tuple to the processor - * @param processor the tuple processor - * @param frameOpCallback the callback at the end of the frame + * @param ctx + * the operation ctx + * @param accessor + * the frame tuple accessor + * @param tuple + * the mutable tuple used to pass the tuple to the processor + * @param processor + * the tuple processor + * @param frameOpCallback + * the callback at the end of the frame + * @param tuples + * the indexes of tuples to process + * @param batchController ++ * the controller of the batch lifecycle * @throws HyracksDataException */ void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple, - IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples) - IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController) - throws HyracksDataException; ++ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController, ++ Set<Integer> tuples) throws HyracksDataException; + + void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException; + + void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, LSMOperationType op) throws HyracksDataException; /** diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 2d040204f1,8ad67f7f85..d019a088a1 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@@ -715,13 -724,15 +728,15 @@@ public class LSMHarness implements ILSM @Override public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple, - IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, Set<Integer> tuples) - IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController) -- throws HyracksDataException { ++ IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController, ++ Set<Integer> tuples) throws HyracksDataException { processor.start(); - enter(ctx); + batchController.batchEnter(ctx, this, frameOpCallback); + boolean success = false; try { try { - processFrame(accessor, tuple, processor); + processFrame(accessor, tuple, processor, tuples); + success = true; frameOpCallback.frameCompleted(); } catch (Throwable th) { processor.fail(th); diff --cc hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index fb5984da61,e688727f17..c768768c0a --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@@ -211,8 -211,8 +212,9 @@@ public class LSMTreeIndexAccessor imple } public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor, - IFrameOperationCallback frameOpCallback, Set<Integer> tuples) throws HyracksDataException { - lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, tuples); - IFrameOperationCallback frameOpCallback, IBatchController batchController) throws HyracksDataException { - lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, batchController); ++ IFrameOperationCallback frameOpCallback, IBatchController batchController, Set<Integer> tuples) ++ throws HyracksDataException { ++ lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, batchController, tuples); } @Override
