This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 096385e34610ed4cbb238124f66282cdaa3a7e49 Author: Michael Blow <[email protected]> AuthorDate: Tue Nov 19 11:00:01 2024 -0500 [NO ISSUE][HYR][STO] Add pre-exit hook to IFrameOperationCallback Ext-ref: MB-64229 Change-Id: I911884a0f4f6d66d750e452b6b8049ad67d0b00a Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19092 Reviewed-by: Ali Alsuliman <[email protected]> Tested-by: Jenkins <[email protected]> --- .../dataflow/NoOpFrameOperationCallbackFactory.java | 5 +++++ .../LSMPrimaryUpsertOperatorNodePushable.java | 5 +++++ .../runtime/operators/StandardBatchController.java | 9 ++++++--- .../storage/am/lsm/common/api/IBatchController.java | 6 ++++-- .../am/lsm/common/api/IFrameOperationCallback.java | 14 ++++++++++++-- .../storage/am/lsm/common/api/ILSMHarness.java | 3 ++- .../storage/am/lsm/common/impls/LSMHarness.java | 20 ++++++++++++++++---- 7 files changed, 50 insertions(+), 12 deletions(-) diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java index 915f3b34e2..3ab86c0cfa 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java @@ -46,6 +46,11 @@ public class NoOpFrameOperationCallbackFactory implements IFrameOperationCallbac // No Op } + @Override + public void beforeExit(boolean success) throws HyracksDataException { + // No Op + } + @Override public void close() throws IOException { // No Op diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java index 643c6abef1..d2dc3cffb5 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java @@ -292,6 +292,11 @@ public class LSMPrimaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDe callback.frameCompleted(); } + @Override + public void beforeExit(boolean success) throws HyracksDataException { + callback.beforeExit(success); + } + @Override public void close() throws IOException { callback.close(); diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java index f9f758b64f..40465c6c5f 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java @@ -20,6 +20,7 @@ package org.apache.asterix.runtime.operators; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.common.api.IBatchController; +import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback; import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType; @@ -31,12 +32,14 @@ class StandardBatchController implements IBatchController { } @Override - public void batchEnter(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException { + public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback) + throws HyracksDataException { lsmHarness.enter(ctx, LSMOperationType.MODIFICATION); } @Override - public void batchExit(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException { - lsmHarness.exit(ctx, LSMOperationType.MODIFICATION); + public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback, + boolean batchSuccessful) throws HyracksDataException { + lsmHarness.exit(ctx, callback, batchSuccessful, LSMOperationType.MODIFICATION); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java index 879a8d223b..e0615897ec 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java @@ -23,7 +23,9 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IBatchController { String KEY_BATCH_CONTROLLER = "BATCH_CONTROLLER"; - void batchEnter(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException; + void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback) + throws HyracksDataException; - void batchExit(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) throws HyracksDataException; + void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, IFrameOperationCallback callback, + boolean batchSuccessful) throws HyracksDataException; } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java index 1f89af2f09..2fbc0c5fac 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java @@ -28,14 +28,24 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public interface IFrameOperationCallback extends Closeable { /** * Called once processing the frame is done before calling nextFrame on the next IFrameWriter in - * the pipeline + * the pipeline. In the event this frame completion will also exit the component, this will be + * called prior to {@link #beforeExit(boolean)}. * * @throws HyracksDataException */ void frameCompleted() throws HyracksDataException; /** - * Called when the task has failed. + * Called just prior to exiting the component on batch completion: not all batches may result + * in a component exit, depending on the decision of the {@link IBatchController}. + * + * @throws HyracksDataException + */ + void beforeExit(boolean success) throws HyracksDataException; + + /** + * Called when the batch processing, {@link #frameCompleted()} or {@link #beforeExit(boolean)} + * invocation has failed. * * @param th */ diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java index 721d809c64..68de45ac08 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java @@ -228,7 +228,8 @@ public interface ILSMHarness { void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws HyracksDataException; - void exit(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException; + void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, LSMOperationType op) + throws HyracksDataException; /** * Rollback components that match the passed predicate diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 6e206869b1..017e767721 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -698,8 +698,18 @@ public class LSMHarness implements ILSMHarness { } @Override - public void exit(ILSMIndexOperationContext ctx, LSMOperationType op) throws HyracksDataException { - getAndExitComponentsAndComplete(ctx, op); + public void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, boolean success, + LSMOperationType op) throws HyracksDataException { + try { + callback.beforeExit(success); + } catch (Throwable th) { + // TODO(mblow): we don't distinguish between the three distinct phases we can encounter + // failures in the callback API- we might need this eventually + callback.fail(th); + throw th; + } finally { + getAndExitComponentsAndComplete(ctx, op); + } } private void getAndExitComponentsAndComplete(ILSMIndexOperationContext ctx, LSMOperationType op) @@ -717,10 +727,12 @@ public class LSMHarness implements ILSMHarness { IFrameTupleProcessor processor, IFrameOperationCallback frameOpCallback, IBatchController batchController) throws HyracksDataException { processor.start(); - batchController.batchEnter(this, ctx); + batchController.batchEnter(ctx, this, frameOpCallback); + boolean success = false; try { try { processFrame(accessor, tuple, processor); + success = true; frameOpCallback.frameCompleted(); } catch (Throwable th) { processor.fail(th); @@ -732,7 +744,7 @@ public class LSMHarness implements ILSMHarness { LOGGER.warn("Failed to process frame", e); throw e; } finally { - batchController.batchExit(this, ctx); + batchController.batchExit(ctx, this, frameOpCallback, success); ctx.logPerformanceCounters(accessor.getTupleCount()); } }
