This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new db4de73f36 IGNITE-19062 Add handlers to flow pipeline (#1814)
db4de73f36 is described below

commit db4de73f363c72b7a51cbe209f693dcc10747133
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Mon Mar 20 18:23:15 2023 +0300

    IGNITE-19062 Add handlers to flow pipeline (#1814)
---
 .../cli/core/flow/builder/FlowBuilder.java         | 41 ++++++++++++++++++++++
 .../cli/core/flow/builder/FlowBuilderImpl.java     | 30 +++++++++++++++-
 .../internal/cli/commands/flow/FlowTest.java       | 10 ++++++
 3 files changed, 80 insertions(+), 1 deletion(-)

diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java
index a51dfb0295..c5d28305e0 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.cli.core.flow.builder;
 
 import java.util.List;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.cli.core.decorator.Decorator;
@@ -102,6 +103,46 @@ public interface FlowBuilder<I, O>  {
      */
     FlowBuilder<I, O> exceptionHandler(ExceptionHandler<?> exceptionHandler);
 
+    /**
+     * Adds success handler to the flow chain which will be called at the end 
of the flow if the flow has succeeded. The order in which
+     * handlers are executed is not determined.
+     *
+     * @param handler handler
+     * @return instance of builder
+     */
+    FlowBuilder<I, O> onSuccess(Consumer<O> handler);
+
+    /**
+     * Adds success handler to the flow chain which will be called at the end 
of the flow if the flow has succeeded. The order in which
+     * handlers are executed is not determined.
+     *
+     * @param handler handler
+     * @return instance of builder
+     */
+    default FlowBuilder<I, O> onSuccess(Runnable handler) {
+        return onSuccess(result -> handler.run());
+    }
+
+    /**
+     * Adds failure handler to the flow chain which will be called at the end 
of the flow if flow resulted in error. The order in which
+     * handlers are executed is not determined.
+     *
+     * @param handler handler
+     * @return instance of builder
+     */
+    FlowBuilder<I, O> onFailure(Consumer<Throwable> handler);
+
+    /**
+     * Adds failure handler to the flow chain which will be called at the end 
of the flow if flow resulted in error. The order in which
+     * handlers are executed is not determined.
+     *
+     * @param handler handler
+     * @return instance of builder
+     */
+    default FlowBuilder<I, O> onFailure(Runnable handler) {
+        return onFailure(throwable -> handler.run());
+    }
+
     /**
      * Adds verbose output from debug log to the output.
      *
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java
index b2889c5241..50a30d7556 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java
@@ -18,7 +18,10 @@
 package org.apache.ignite.internal.cli.core.flow.builder;
 
 import java.io.PrintWriter;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import org.apache.ignite.internal.cli.core.decorator.Decorator;
@@ -47,6 +50,8 @@ public class FlowBuilderImpl<I, O> implements FlowBuilder<I, 
O> {
     private final Flow<I, O> flow;
     private final ExceptionHandlers exceptionHandlers;
     private final DecoratorRegistry decoratorRegistry;
+    private final Set<Consumer<O>> successHandlers = new HashSet<>();
+    private final Set<Consumer<Throwable>> failureHandlers = new HashSet<>();
     private boolean verbose;
 
     FlowBuilderImpl(Flow<I, O> flow) {
@@ -70,7 +75,18 @@ public class FlowBuilderImpl<I, O> implements FlowBuilder<I, 
O> {
 
     @Override
     public <OT> FlowBuilder<I, OT> then(Flow<O, OT> flow) {
-        return new FlowBuilderImpl<>(this.flow.composite(flow), 
exceptionHandlers, decoratorRegistry, verbose);
+        Flow<I, OT> composite = input -> {
+            Flowable<O> outputFlowable = this.flow.start(input);
+            if (outputFlowable.hasResult()) {
+                O result = outputFlowable.value();
+                successHandlers.forEach(handler -> handler.accept(result));
+            } else if (outputFlowable.hasError()) {
+                Throwable error = outputFlowable.errorCause();
+                failureHandlers.forEach(handler -> handler.accept(error));
+            }
+            return flow.start(outputFlowable);
+        };
+        return new FlowBuilderImpl<>(composite, exceptionHandlers, 
decoratorRegistry, verbose);
     }
 
     @Override
@@ -106,6 +122,18 @@ public class FlowBuilderImpl<I, O> implements 
FlowBuilder<I, O> {
         return this;
     }
 
+    @Override
+    public FlowBuilder<I, O> onSuccess(Consumer<O> handler) {
+        successHandlers.add(handler);
+        return this;
+    }
+
+    @Override
+    public FlowBuilder<I, O> onFailure(Consumer<Throwable> handler) {
+        failureHandlers.add(handler);
+        return this;
+    }
+
     @Override
     public FlowBuilder<I, O> verbose(boolean verbose) {
         this.verbose = verbose;
diff --git 
a/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java
 
b/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java
index 5b6521b14b..700330e890 100644
--- 
a/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java
+++ 
b/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java
@@ -314,6 +314,16 @@ class FlowTest {
         assertThat(out.toString(), equalTo("fizz1" + System.lineSeparator()));
     }
 
+    @Test
+    void testHandlers() {
+        String value = "foo";
+        Exception exception = new Exception();
+        Flows.from(value)
+                .onSuccess(result -> assertThat(result, equalTo(value)))
+                .then(input -> Flowable.failure(exception))
+                .onFailure(ex -> assertThat(ex, equalTo(exception)))
+                .start();
+    }
 
     private void bindAnswers(String... answers) throws IOException {
         Files.writeString(input, String.join("\n", answers) + "\n");

Reply via email to