This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new db4de73f36 IGNITE-19062 Add handlers to flow pipeline (#1814)
db4de73f36 is described below
commit db4de73f363c72b7a51cbe209f693dcc10747133
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Mon Mar 20 18:23:15 2023 +0300
IGNITE-19062 Add handlers to flow pipeline (#1814)
---
.../cli/core/flow/builder/FlowBuilder.java | 41 ++++++++++++++++++++++
.../cli/core/flow/builder/FlowBuilderImpl.java | 30 +++++++++++++++-
.../internal/cli/commands/flow/FlowTest.java | 10 ++++++
3 files changed, 80 insertions(+), 1 deletion(-)
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java
index a51dfb0295..c5d28305e0 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilder.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.cli.core.flow.builder;
import java.util.List;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.ignite.internal.cli.core.decorator.Decorator;
@@ -102,6 +103,46 @@ public interface FlowBuilder<I, O> {
*/
FlowBuilder<I, O> exceptionHandler(ExceptionHandler<?> exceptionHandler);
+ /**
+ * Adds success handler to the flow chain which will be called at the end of the flow if the flow has succeeded. The order in which
+ * handlers are executed is not determined.
+ *
+ * @param handler handler
+ * @return instance of builder
+ */
+ FlowBuilder<I, O> onSuccess(Consumer<O> handler);
+
+ /**
+ * Adds success handler to the flow chain which will be called at the end of the flow if the flow has succeeded. The order in which
+ * handlers are executed is not determined.
+ *
+ * @param handler handler
+ * @return instance of builder
+ */
+ default FlowBuilder<I, O> onSuccess(Runnable handler) {
+ return onSuccess(result -> handler.run());
+ }
+
+ /**
+ * Adds failure handler to the flow chain which will be called at the end of the flow if flow resulted in error. The order in which
+ * handlers are executed is not determined.
+ *
+ * @param handler handler
+ * @return instance of builder
+ */
+ FlowBuilder<I, O> onFailure(Consumer<Throwable> handler);
+
+ /**
+ * Adds failure handler to the flow chain which will be called at the end of the flow if flow resulted in error. The order in which
+ * handlers are executed is not determined.
+ *
+ * @param handler handler
+ * @return instance of builder
+ */
+ default FlowBuilder<I, O> onFailure(Runnable handler) {
+ return onFailure(throwable -> handler.run());
+ }
+
/**
* Adds verbose output from debug log to the output.
*
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java
index b2889c5241..50a30d7556 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/flow/builder/FlowBuilderImpl.java
@@ -18,7 +18,10 @@
package org.apache.ignite.internal.cli.core.flow.builder;
import java.io.PrintWriter;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.ignite.internal.cli.core.decorator.Decorator;
@@ -47,6 +50,8 @@ public class FlowBuilderImpl<I, O> implements FlowBuilder<I, O> {
private final Flow<I, O> flow;
private final ExceptionHandlers exceptionHandlers;
private final DecoratorRegistry decoratorRegistry;
+ private final Set<Consumer<O>> successHandlers = new HashSet<>();
+ private final Set<Consumer<Throwable>> failureHandlers = new HashSet<>();
private boolean verbose;
FlowBuilderImpl(Flow<I, O> flow) {
@@ -70,7 +75,18 @@ public class FlowBuilderImpl<I, O> implements FlowBuilder<I, O> {
@Override
public <OT> FlowBuilder<I, OT> then(Flow<O, OT> flow) {
- return new FlowBuilderImpl<>(this.flow.composite(flow), exceptionHandlers, decoratorRegistry, verbose);
+ Flow<I, OT> composite = input -> {
+ Flowable<O> outputFlowable = this.flow.start(input);
+ if (outputFlowable.hasResult()) {
+ O result = outputFlowable.value();
+ successHandlers.forEach(handler -> handler.accept(result));
+ } else if (outputFlowable.hasError()) {
+ Throwable error = outputFlowable.errorCause();
+ failureHandlers.forEach(handler -> handler.accept(error));
+ }
+ return flow.start(outputFlowable);
+ };
+ return new FlowBuilderImpl<>(composite, exceptionHandlers, decoratorRegistry, verbose);
}
@Override
@@ -106,6 +122,18 @@ public class FlowBuilderImpl<I, O> implements FlowBuilder<I, O> {
return this;
}
+ @Override
+ public FlowBuilder<I, O> onSuccess(Consumer<O> handler) {
+ successHandlers.add(handler);
+ return this;
+ }
+
+ @Override
+ public FlowBuilder<I, O> onFailure(Consumer<Throwable> handler) {
+ failureHandlers.add(handler);
+ return this;
+ }
+
@Override
public FlowBuilder<I, O> verbose(boolean verbose) {
this.verbose = verbose;
diff --git a/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java b/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java
index 5b6521b14b..700330e890 100644
--- a/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java
+++ b/modules/cli/src/test/java/org/apache/ignite/internal/cli/commands/flow/FlowTest.java
@@ -314,6 +314,16 @@ class FlowTest {
assertThat(out.toString(), equalTo("fizz1" + System.lineSeparator()));
}
+ @Test
+ void testHandlers() {
+ String value = "foo";
+ Exception exception = new Exception();
+ Flows.from(value)
+ .onSuccess(result -> assertThat(result, equalTo(value)))
+ .then(input -> Flowable.failure(exception))
+ .onFailure(ex -> assertThat(ex, equalTo(exception)))
+ .start();
+ }
private void bindAnswers(String... answers) throws IOException {
Files.writeString(input, String.join("\n", answers) + "\n");