This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit f11758e0d86d07e687f21eb432e1ea2d0bb08c3e Author: Michael Blow <[email protected]> AuthorDate: Fri Nov 15 06:26:05 2024 -0500 [NO ISSUE][HYR][STO,MISC] LSM enhancements, misc utilities - provide ability to intercept entering / exiting components on batch LSM operations - += HyracksThrowingAction, InvokeUtil.tryHyracksWithCleanups Ext-ref: MB-64229 Change-Id: I03f573d44b170f0c4d889920a3991592fb2890e1 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19072 Reviewed-by: Michael Blow <[email protected]> Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Michael Blow <[email protected]> --- .../LSMPrimaryInsertOperatorNodePushable.java | 10 +++++- .../LSMPrimaryUpsertOperatorNodePushable.java | 8 ++++- .../runtime/operators/StandardBatchController.java | 42 ++++++++++++++++++++++ .../apache/hyracks/api/comm/IFrameAppender.java | 2 +- .../hyracks/api/util/HyracksThrowingAction.java | 26 ++++++++++++++ .../org/apache/hyracks/api/util/InvokeUtil.java | 36 +++++++++++++++++++ .../common/io/MessagingFrameTupleAppender.java | 9 ----- .../hyracks/dataflow/common/utils/TaskUtil.java | 14 ++++++++ .../am/lsm/common/api/IBatchController.java | 29 +++++++++++++++ .../storage/am/lsm/common/api/ILSMHarness.java | 34 +++++++----------- .../storage/am/lsm/common/impls/LSMHarness.java | 18 ++++++---- .../am/lsm/common/impls/LSMTreeIndexAccessor.java | 5 +-- 12 files changed, 190 insertions(+), 43 deletions(-) diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java index eadb614799..5c879944fd 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.runtime.operators; +import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER; + import java.nio.ByteBuffer; import org.apache.asterix.common.api.INcApplicationContext; @@ -33,6 +35,7 @@ import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.SourceLocation; import org.apache.hyracks.api.util.CleanupUtils; +import org.apache.hyracks.api.util.HyracksThrowingAction; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; import org.apache.hyracks.dataflow.common.comm.util.FrameUtils; @@ -50,6 +53,7 @@ import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters; import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.common.api.IBatchController; import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor; import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable; @@ -77,6 +81,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe private boolean flushedPartialTuples; private int currentTupleIdx; private int lastFlushedTupleIdx; + private IBatchController batchController; private final PermutingFrameTupleReference keyTuple; @@ -116,6 +121,8 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe protected IFrameTupleProcessor createTupleProcessor(SourceLocation sourceLoc) { return new IFrameTupleProcessor() { + private HyracksThrowingAction exitAction; + @Override public void process(ITupleReference tuple, int index) throws HyracksDataException { if (index < currentTupleIdx) { @@ -219,6 +226,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext(); LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index, appCtx.getTransactionSubsystem().getLogManager()); + batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE); } catch (Throwable e) { // NOSONAR: Re-thrown throw HyracksDataException.create(e); } @@ -227,7 +235,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDe @Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); - lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback); + lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController); writeBuffer.ensureFrameSize(buffer.capacity()); if (flushedPartialTuples) { diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java index 3b8ee68370..643c6abef1 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java @@ -18,6 +18,8 @@ */ package org.apache.asterix.runtime.operators; +import static org.apache.hyracks.storage.am.lsm.common.api.IBatchController.KEY_BATCH_CONTROLLER; + import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; @@ -62,6 +64,7 @@ import org.apache.hyracks.storage.am.common.api.ITreeIndex; import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.common.api.IBatchController; import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory; import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor; @@ -117,6 +120,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe private final ITracer tracer; private final long traceCategory; private long lastRecordInTimeStamp = 0L; + private IBatchController batchController; public LSMPrimaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition, IIndexDataflowHelperFactory indexHelperFactory, int[] fieldPermutation, RecordDescriptor inputRecDesc, @@ -304,6 +308,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe } }; frameOpCallback.open(); + batchController = TaskUtil.getOrDefault(KEY_BATCH_CONTROLLER, ctx, StandardBatchController.INSTANCE); } catch (Throwable e) { // NOSONAR: Re-thrown throw HyracksDataException.create(e); } @@ -344,7 +349,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe public void nextFrame(ByteBuffer buffer) throws HyracksDataException { accessor.reset(buffer); int itemCount = accessor.getTupleCount(); - lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback); + lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, batchController); if (itemCount > 0) { lastRecordInTimeStamp = System.currentTimeMillis(); } @@ -484,4 +489,5 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe public void flush() throws HyracksDataException { // No op since nextFrame flushes by default } + } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java new file mode 100644 index 0000000000..f9f758b64f --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.operators; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.common.api.IBatchController; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; +import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; + +class StandardBatchController implements IBatchController { + static final IBatchController INSTANCE = new StandardBatchController(); + + private StandardBatchController() { + } + + @Override + public void batchEnter(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException { + lsmHarness.enter(ctx, LSMOperationType.MODIFICATION); + } + + @Override + public void batchExit(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException { + lsmHarness.exit(ctx, LSMOperationType.MODIFICATION); + } +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java index 0fc24c7ea2..98215bf738 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/IFrameAppender.java @@ -63,7 +63,7 @@ public interface IFrameAppender { * @param writer the FrameWriter to write to and flush * @throws HyracksDataException */ - public default void flush(IFrameWriter writer) throws HyracksDataException { + default void flush(IFrameWriter writer) throws HyracksDataException { write(writer, true); writer.flush(); } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java new file mode 100644 index 0000000000..7e3d5997b0 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksThrowingAction.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.api.util; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +@FunctionalInterface +public interface HyracksThrowingAction { + void run() throws HyracksDataException; // NOSONAR +} diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java index f4b6e20858..d331ab2135 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/InvokeUtil.java @@ -285,6 +285,42 @@ public class InvokeUtil { } } + @SuppressWarnings({ "squid:S1181", "squid:S1193", "ConstantConditions" }) // catching Throwable, instanceofs + public static void tryHyracksWithCleanups(HyracksThrowingAction action, HyracksThrowingAction... cleanups) + throws HyracksDataException { + Throwable savedT = null; + boolean suppressedInterrupted = false; + try { + action.run(); + } catch (Throwable t) { + savedT = t; + } finally { + for (HyracksThrowingAction cleanup : cleanups) { + try { + cleanup.run(); + } catch (Throwable t) { + if (savedT != null) { + savedT.addSuppressed(t); + suppressedInterrupted = suppressedInterrupted || t instanceof InterruptedException; + } else { + savedT = t; + } + } + } + } + if (savedT == null) { + return; + } + if (suppressedInterrupted) { + Thread.currentThread().interrupt(); + } + if (savedT instanceof Error) { + throw (Error) savedT; + } else { + throw HyracksDataException.create(savedT); + } + } + /** * Runs the supplied action, after suspending any pending interruption. An error will be logged if * the action is itself interrupted. diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java index c27a7e68dd..6c073f3bce 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java @@ -172,13 +172,4 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender { ++tupleCount; IntSerDeUtils.putInt(getBuffer().array(), FrameHelper.getTupleCountOffset(frame.getFrameSize()), tupleCount); } - - /* - * Always write and then flush to send out the message if exists - */ - @Override - public void flush(IFrameWriter writer) throws HyracksDataException { - write(writer, true); - writer.flush(); - } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java index 75c95b0053..b77883d254 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/utils/TaskUtil.java @@ -74,4 +74,18 @@ public class TaskUtil { Map<String, Object> sharedMap = TaskUtil.getSharedMap(ctx, false); return sharedMap == null ? null : (T) sharedMap.get(key); } + + /** + * get a <T> object from the shared map of the task, or returns the default value + * + * @param key + * @param ctx + * @param defaultValue + * @return the value associated with the key casted as T + */ + @SuppressWarnings("unchecked") + public static <T> T getOrDefault(String key, IHyracksTaskContext ctx, T defaultValue) { + Map<String, T> sharedMap = (Map<String, T>) TaskUtil.getSharedMap(ctx, false); + return sharedMap == null ? defaultValue : sharedMap.getOrDefault(key, defaultValue); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java new file mode 100644 index 0000000000..879a8d223b --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.common.api; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public interface IBatchController { + String KEY_BATCH_CONTROLLER = "BATCH_CONTROLLER"; + + void batchEnter(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException; + + void batchExit(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException; +} diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index 9e8c568bc6..721d809c64 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -39,7 +39,6 @@ public interface ILSMHarness { * @param tuple * the operation tuple * @throws HyracksDataException - * @throws IndexException */ void forceModify(ILSMIndexOperationContext ctx, ITupleReference tuple) throws HyracksDataException; @@ -54,7 +53,6 @@ public interface ILSMHarness { * the operation tuple * @return * @throws HyracksDataException - * @throws IndexException */ boolean modify(ILSMIndexOperationContext ctx, boolean tryOperation, ITupleReference tuple) throws HyracksDataException; @@ -69,7 +67,6 @@ public interface ILSMHarness { * @param pred * the search predicate * @throws HyracksDataException - * @throws IndexException */ void search(ILSMIndexOperationContext ctx, IIndexCursor cursor, ISearchPredicate pred) throws HyracksDataException; @@ -104,9 +101,7 @@ public interface ILSMHarness { * Schedule a merge * * @param ctx - * @param callback * @throws HyracksDataException - * @throws IndexException */ ILSMIOOperation scheduleMerge(ILSMIndexOperationContext ctx) throws HyracksDataException; @@ -114,9 +109,7 @@ public interface ILSMHarness { * Schedule full merge * * @param ctx - * @param callback * @throws HyracksDataException - * @throws IndexException */ ILSMIOOperation scheduleFullMerge(ILSMIndexOperationContext ctx) throws HyracksDataException; @@ -125,7 +118,6 @@ public interface ILSMHarness { * * @param operation * @throws HyracksDataException - * @throws IndexException */ void merge(ILSMIOOperation operation) throws HyracksDataException; @@ -133,7 +125,6 @@ public interface ILSMHarness { * Schedule a flush * * @param ctx - * @param callback * @throws HyracksDataException */ ILSMIOOperation scheduleFlush(ILSMIndexOperationContext ctx) throws HyracksDataException; @@ -143,7 +134,6 @@ public interface ILSMHarness { * * @param operation * @throws HyracksDataException - * @throws IndexException */ void flush(ILSMIOOperation operation) throws HyracksDataException; @@ -153,7 +143,6 @@ public interface ILSMHarness { * @param ioOperation * the io operation that added the new component * @throws HyracksDataException - * @throws IndexException */ void addBulkLoadedComponent(ILSMIOOperation ioOperation) throws HyracksDataException; @@ -225,20 +214,21 @@ public interface ILSMHarness { /** * Perform batch operation on all tuples in the passed frame tuple accessor * - * @param ctx - * the operation ctx - * @param accessor - * the frame tuple accessor - * @param tuple - * the mutable tuple used to pass the tuple to the processor - * @param processor - * the tuple processor - * @param frameOpCallback - * the callback at the end of the frame + * @param ctx the operation ctx + * @param accessor the frame tuple accessor + * @param tuple the mutable tuple used to pass the tuple to the processor + * @param processor the tuple processor + * @param frameOpCallback the callback at the end of the frame + * @param batchController * @throws HyracksDataException */ void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple, - IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException; + IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController) + throws HyracksDataException; + + void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException; + + void exit(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException; /** * Rollback components that match the passed predicate diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 950a8e5b23..6e206869b1 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -33,6 +33,7 @@ import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.common.api.IBatchController; import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; @@ -688,15 +689,17 @@ public class LSMHarness implements ILSMHarness { lsmIndex.updateFilter(ctx, tuple); } - private void enter(ILSMIndexOperationContext ctx) throws HyracksDataException { + @Override + public void enter(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException { if (!lsmIndex.isMemoryComponentsAllocated()) { lsmIndex.allocateMemoryComponents(); } - getAndEnterComponents(ctx, LSMOperationType.MODIFICATION, false); + getAndEnterComponents(ctx, op, false); } - private void exit(ILSMIndexOperationContext ctx) throws HyracksDataException { - getAndExitComponentsAndComplete(ctx, LSMOperationType.MODIFICATION); + @Override + public void exit(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException { + getAndExitComponentsAndComplete(ctx, op); } private void getAndExitComponentsAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op) @@ -711,9 +714,10 @@ public class LSMHarness implements ILSMHarness { @Override public void batchOperate(ILSMIndexOperationContext ctx, FrameTupleAccessor accessor, FrameTupleReference tuple, - IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback) throws HyracksDataException { + IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController) + throws HyracksDataException { processor.start(); - enter(ctx); + batchController.batchEnter(this, ctx); try { try { processFrame(accessor, tuple, processor); @@ -728,7 +732,7 @@ public class LSMHarness implements ILSMHarness { LOGGER.warn("Failed to process frame", e); throw e; } finally { - exit(ctx); + batchController.batchExit(this, ctx); ctx.logPerformanceCounters(accessor.getTupleCount()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java index 8412b8c648..e688727f17 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMTreeIndexAccessor.java @@ -28,6 +28,7 @@ import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.common.api.IBatchController; import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent; @@ -210,8 +211,8 @@ public class LSMTreeIndexAccessor implements ILSMIndexAccessor { } public void batchOperate(FrameTupleAccessor accessor, FrameTupleReference tuple, IFrameTupleProcessor processor, - IFrameOperationCallback frameOpCallback) throws HyracksDataException { - lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback); + IFrameOperationCallback frameOpCallback, IBatchController batchController) throws HyracksDataException { + lsmHarness.batchOperate(ctx, accessor, tuple, processor, frameOpCallback, batchController); } @Override
