This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 096385e34610ed4cbb238124f66282cdaa3a7e49
Author: Michael Blow <[email protected]>
AuthorDate: Tue Nov 19 11:00:01 2024 -0500

    [NO ISSUE][HYR][STO] Add pre-exit hook to IFrameOperationCallback
    
    Ext-ref: MB-64229
    Change-Id: I911884a0f4f6d66d750e452b6b8049ad67d0b00a
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19092
    Reviewed-by: Ali Alsuliman <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../dataflow/NoOpFrameOperationCallbackFactory.java  |  5 +++++
 .../LSMPrimaryUpsertOperatorNodePushable.java        |  5 +++++
 .../runtime/operators/StandardBatchController.java   |  9 ++++++---
 .../storage/am/lsm/common/api/IBatchController.java  |  6 ++++--
 .../am/lsm/common/api/IFrameOperationCallback.java   | 14 ++++++++++++--
 .../storage/am/lsm/common/api/ILSMHarness.java       |  3 ++-
 .../storage/am/lsm/common/impls/LSMHarness.java      | 20 ++++++++++++++++----
 7 files changed, 50 insertions(+), 12 deletions(-)

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
index 915f3b34e2..3ab86c0cfa 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/NoOpFrameOperationCallbackFactory.java
@@ -46,6 +46,11 @@ public class NoOpFrameOperationCallbackFactory implements 
IFrameOperationCallbac
             // No Op
         }
 
+        @Override
+        public void beforeExit(boolean success) throws HyracksDataException {
+            // No Op
+        }
+
         @Override
         public void close() throws IOException {
             // No Op
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 643c6abef1..d2dc3cffb5 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -292,6 +292,11 @@ public class LSMPrimaryUpsertOperatorNodePushable extends 
LSMIndexInsertUpdateDe
                     callback.frameCompleted();
                 }
 
+                @Override
+                public void beforeExit(boolean success) throws 
HyracksDataException {
+                    callback.beforeExit(success);
+                }
+
                 @Override
                 public void close() throws IOException {
                     callback.close();
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
index f9f758b64f..40465c6c5f 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/StandardBatchController.java
@@ -20,6 +20,7 @@ package org.apache.asterix.runtime.operators;
 
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.storage.am.lsm.common.api.IBatchController;
+import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMHarness;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext;
 import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
@@ -31,12 +32,14 @@ class StandardBatchController implements IBatchController {
     }
 
     @Override
-    public void batchEnter(ILSMHarness lsmHarness, ILSMIndexOperationContext 
ctx) throws HyracksDataException {
+    public void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness 
lsmHarness, IFrameOperationCallback callback)
+            throws HyracksDataException {
         lsmHarness.enter(ctx, LSMOperationType.MODIFICATION);
     }
 
     @Override
-    public void batchExit(ILSMHarness lsmHarness, ILSMIndexOperationContext 
ctx) throws HyracksDataException {
-        lsmHarness.exit(ctx, LSMOperationType.MODIFICATION);
+    public void batchExit(ILSMIndexOperationContext ctx, ILSMHarness 
lsmHarness, IFrameOperationCallback callback,
+            boolean batchSuccessful) throws HyracksDataException {
+        lsmHarness.exit(ctx, callback, batchSuccessful, 
LSMOperationType.MODIFICATION);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
index 879a8d223b..e0615897ec 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IBatchController.java
@@ -23,7 +23,9 @@ import org.apache.hyracks.api.exceptions.HyracksDataException;
 public interface IBatchController {
     String KEY_BATCH_CONTROLLER = "BATCH_CONTROLLER";
 
-    void batchEnter(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) 
throws HyracksDataException;
+    void batchEnter(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, 
IFrameOperationCallback callback)
+            throws HyracksDataException;
 
-    void batchExit(ILSMHarness lsmHarness, ILSMIndexOperationContext ctx) 
throws HyracksDataException;
+    void batchExit(ILSMIndexOperationContext ctx, ILSMHarness lsmHarness, 
IFrameOperationCallback callback,
+            boolean batchSuccessful) throws HyracksDataException;
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
index 1f89af2f09..2fbc0c5fac 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/IFrameOperationCallback.java
@@ -28,14 +28,24 @@ import 
org.apache.hyracks.api.exceptions.HyracksDataException;
 public interface IFrameOperationCallback extends Closeable {
     /**
      * Called once processing the frame is done before calling nextFrame on 
the next IFrameWriter in
-     * the pipeline
+     * the pipeline. In the event this frame completion will also exit the 
component, this will be
+     * called prior to {@link #beforeExit(boolean)}.
      *
      * @throws HyracksDataException
      */
     void frameCompleted() throws HyracksDataException;
 
     /**
-     * Called when the task has failed.
+     * Called just prior to exiting the component on batch completion: not all 
batches may result
+     * in a component exit, depending on the decision of the {@link 
IBatchController}.
+     *
+     * @throws HyracksDataException
+     */
+    void beforeExit(boolean success) throws HyracksDataException;
+
+    /**
+     * Called when the batch processing, {@link #frameCompleted()} or {@link 
#beforeExit(boolean)}
+     * invocation has failed.
      *
      * @param th
      */
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
index 721d809c64..68de45ac08 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMHarness.java
@@ -228,7 +228,8 @@ public interface ILSMHarness {
 
     void enter(ILSMIndexOperationContext ctx, LSMOperationType opType) throws 
HyracksDataException;
 
-    void exit(ILSMIndexOperationContext ctx, LSMOperationType op) throws 
HyracksDataException;
+    void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback callback, 
boolean success, LSMOperationType op)
+            throws HyracksDataException;
 
     /**
      * Rollback components that match the passed predicate
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index 6e206869b1..017e767721 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -698,8 +698,18 @@ public class LSMHarness implements ILSMHarness {
     }
 
     @Override
-    public void exit(ILSMIndexOperationContext ctx, LSMOperationType op) 
throws HyracksDataException {
-        getAndExitComponentsAndComplete(ctx, op);
+    public void exit(ILSMIndexOperationContext ctx, IFrameOperationCallback 
callback, boolean success,
+            LSMOperationType op) throws HyracksDataException {
+        try {
+            callback.beforeExit(success);
+        } catch (Throwable th) {
+            // TODO(mblow): we don't distinguish between the three distinct 
phases we can encounter
+            //              failures in the callback API- we might need this 
eventually
+            callback.fail(th);
+            throw th;
+        } finally {
+            getAndExitComponentsAndComplete(ctx, op);
+        }
     }
 
     private void getAndExitComponentsAndComplete(ILSMIndexOperationContext 
ctx, LSMOperationType op)
@@ -717,10 +727,12 @@ public class LSMHarness implements ILSMHarness {
             IFrameTupleProcessor processor, IFrameOperationCallback 
frameOpCallback, IBatchController batchController)
             throws HyracksDataException {
         processor.start();
-        batchController.batchEnter(this, ctx);
+        batchController.batchEnter(ctx, this, frameOpCallback);
+        boolean success = false;
         try {
             try {
                 processFrame(accessor, tuple, processor);
+                success = true;
                 frameOpCallback.frameCompleted();
             } catch (Throwable th) {
                 processor.fail(th);
@@ -732,7 +744,7 @@ public class LSMHarness implements ILSMHarness {
             LOGGER.warn("Failed to process frame", e);
             throw e;
         } finally {
-            batchController.batchExit(this, ctx);
+            batchController.batchExit(ctx, this, frameOpCallback, success);
             ctx.logPerformanceCounters(accessor.getTupleCount());
         }
     }

Reply via email to