This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new df30ee3674 IGNITE-23153 Some CLI commands don't print verbose log (#4339)
df30ee3674 is described below

commit df30ee367402c68ee82ea356ef4c3607e934daaf
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Sep 10 13:35:03 2024 +0300

    IGNITE-23153 Some CLI commands don't print verbose log (#4339)
    
    Move pipeline running into BaseCommand so we won't forget to add
    --verbose to any command.
---
 .../ignite/internal/cli/commands/BaseCommand.java  |  30 +++++-
 .../ignite/internal/cli/commands/Options.java      |   2 +-
 .../commands/cliconfig/CliConfigGetCommand.java    |   8 +-
 .../cliconfig/CliConfigGetReplCommand.java         |   8 +-
 .../commands/cliconfig/CliConfigRemoveCommand.java |   8 +-
 .../cliconfig/CliConfigRemoveReplCommand.java      |   8 +-
 .../commands/cliconfig/CliConfigSetCommand.java    |   8 +-
 .../cliconfig/CliConfigSetReplCommand.java         |   8 +-
 .../commands/cliconfig/CliConfigShowCommand.java   |   8 +-
 .../cliconfig/CliConfigShowReplCommand.java        |   8 +-
 .../profile/CliConfigProfileActivateCommand.java   |   7 +-
 .../profile/CliConfigProfileCreateCommand.java     |   7 +-
 .../profile/CliConfigProfileListCommand.java       |   8 +-
 .../profile/CliConfigProfileShowCommand.java       |   8 +-
 .../cluster/config/ClusterConfigShowCommand.java   |   8 +-
 .../config/ClusterConfigShowReplCommand.java       |   5 +-
 .../cluster/config/ClusterConfigUpdateCommand.java |   8 +-
 .../config/ClusterConfigUpdateReplCommand.java     |   5 +-
 .../commands/cluster/init/ClusterInitCommand.java  |   8 +-
 .../cluster/init/ClusterInitReplCommand.java       |   5 +-
 .../cluster/status/ClusterStatusCommand.java       |   8 +-
 .../cluster/status/ClusterStatusReplCommand.java   |   5 +-
 .../cluster/topology/LogicalTopologyCommand.java   |   8 +-
 .../topology/LogicalTopologyReplCommand.java       |   4 +-
 .../cluster/topology/PhysicalTopologyCommand.java  |   8 +-
 .../topology/PhysicalTopologyReplCommand.java      |   4 +-
 .../cluster/unit/ClusterUnitDeployCommand.java     |   7 +-
 .../cluster/unit/ClusterUnitDeployReplCommand.java |   7 +-
 .../cluster/unit/ClusterUnitListCommand.java       |   9 +-
 .../cluster/unit/ClusterUnitListReplCommand.java   |   5 +-
 .../cluster/unit/ClusterUnitUndeployCommand.java   |   9 +-
 .../unit/ClusterUnitUndeployReplCommand.java       |   5 +-
 .../cli/commands/connect/ConnectCommand.java       |   8 +-
 .../cli/commands/connect/ConnectReplCommand.java   |   5 +-
 .../cli/commands/connect/DisconnectCommand.java    |  10 +-
 .../node/config/NodeConfigShowCommand.java         |   8 +-
 .../node/config/NodeConfigShowReplCommand.java     |   5 +-
 .../node/config/NodeConfigUpdateCommand.java       |   8 +-
 .../node/config/NodeConfigUpdateReplCommand.java   |   5 +-
 .../node/metric/NodeMetricSetListCommand.java      |   8 +-
 .../node/metric/NodeMetricSetListReplCommand.java  |   4 +-
 .../metric/NodeMetricSourceDisableCommand.java     |   8 +-
 .../metric/NodeMetricSourceDisableReplCommand.java |   5 +-
 .../node/metric/NodeMetricSourceEnableCommand.java |   8 +-
 .../metric/NodeMetricSourceEnableReplCommand.java  |   5 +-
 .../node/metric/NodeMetricSourceListCommand.java   |   8 +-
 .../metric/NodeMetricSourceListReplCommand.java    |   4 +-
 .../commands/node/status/NodeStatusCommand.java    |   8 +-
 .../node/status/NodeStatusReplCommand.java         |   5 +-
 .../commands/node/unit/NodeUnitListCommand.java    |   9 +-
 .../node/unit/NodeUnitListReplCommand.java         |   5 +-
 .../commands/node/version/NodeVersionCommand.java  |  10 +-
 .../node/version/NodeVersionReplCommand.java       |   5 +-
 .../partitions/reset/ResetPartitionsCommand.java   |   8 +-
 .../reset/ResetPartitionsReplCommand.java          |   5 +-
 .../restart/RestartPartitionsCommand.java          |   8 +-
 .../restart/RestartPartitionsReplCommand.java      |   5 +-
 .../partitions/states/PartitionStatesCommand.java  |   8 +-
 .../states/PartitionStatesReplCommand.java         |   7 +-
 .../internal/cli/commands/sql/SqlCommand.java      |   7 +-
 .../call/AsyncCallExecutionPipelineBuilder.java    |   6 +-
 .../cli/core/call/CallExecutionPipeline.java       |   4 +-
 .../core/call/CallExecutionPipelineBuilder.java    | 115 ++++++---------------
 ...ava => SingleCallExecutionPipelineBuilder.java} |  26 +++--
 64 files changed, 205 insertions(+), 379 deletions(-)

diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/BaseCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/BaseCommand.java
index 1280f28ec6..fb81d3b115 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/BaseCommand.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/BaseCommand.java
@@ -36,6 +36,9 @@ import static 
org.apache.ignite.internal.cli.commands.Options.Constants.VERBOSE_
 import static 
org.apache.ignite.internal.cli.commands.Options.Constants.VERBOSE_OPTION_DESC;
 import static 
org.apache.ignite.internal.cli.commands.Options.Constants.VERBOSE_OPTION_SHORT;
 
+import org.apache.ignite.internal.cli.core.call.CallExecutionPipelineBuilder;
+import org.apache.ignite.internal.cli.core.call.CallInput;
+import org.apache.ignite.internal.cli.core.flow.builder.FlowBuilder;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Model.CommandSpec;
 import picocli.CommandLine.Option;
@@ -58,8 +61,9 @@ import picocli.CommandLine.Spec;
 )
 public abstract class BaseCommand {
     /** Help option specification. */
+    @SuppressWarnings("unused")
     @Option(names = {HELP_OPTION, HELP_OPTION_SHORT}, description = 
HELP_OPTION_DESC, usageHelp = true, order = HELP_OPTION_ORDER)
-    protected boolean usageHelpRequested;
+    private boolean usageHelpRequested;
 
     /** Verbose option specification. */
     @Option(names = {VERBOSE_OPTION, VERBOSE_OPTION_SHORT}, description = 
VERBOSE_OPTION_DESC, order = VERBOSE_OPTION_ORDER)
@@ -68,4 +72,28 @@ public abstract class BaseCommand {
     /** Instance of picocli command specification. */
     @Spec
     protected CommandSpec spec;
+
+    /**
+     * Sets output printers and verbosity flag and runs the pipeline.
+     *
+     * @param pipelineBuilder Pipeline builder.
+     * @return Exit code.
+     */
+    protected <I extends CallInput, O> int 
runPipeline(CallExecutionPipelineBuilder<I, O> pipelineBuilder) {
+        return pipelineBuilder
+                .output(spec.commandLine().getOut())
+                .errOutput(spec.commandLine().getErr())
+                .verbose(verbose)
+                .build()
+                .runPipeline();
+    }
+
+    /**
+     * Sets verbosity flag and starts the flow.
+     *
+     * @param flowBuilder Flow builder.
+     */
+    protected <I, O> void runFlow(FlowBuilder<I, O> flowBuilder) {
+        flowBuilder.verbose(verbose).start();
+    }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/Options.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/Options.java
index c3d1c61025..0202049181 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/Options.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/Options.java
@@ -139,7 +139,7 @@ public enum Options {
         public static final String VERBOSE_OPTION_SHORT = "-v";
 
         /** Verbose option description. */
-        public static final String VERBOSE_OPTION_DESC = "Show additional 
information: logs, REST calls."
+        public static final String VERBOSE_OPTION_DESC = "Show additional 
information: logs, REST calls. "
                 + "This flag is useful for debugging";
 
         /** Help option long name. */
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigGetCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigGetCommand.java
index 1250fd62b6..3e0024a6c7 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigGetCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigGetCommand.java
@@ -44,14 +44,10 @@ public class CliConfigGetCommand extends BaseCommand 
implements Callable<Integer
 
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(CliConfigGetCallInput.builder()
                         .key(key)
                         .profileName(profileName.getProfileName())::build)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigGetReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigGetReplCommand.java
index c164b2a600..cc71e72f47 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigGetReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigGetReplCommand.java
@@ -39,13 +39,9 @@ public class CliConfigGetReplCommand extends BaseCommand 
implements Callable<Int
 
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(CliConfigGetCallInput.builder()
                         .key(key)::build)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigRemoveCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigRemoveCommand.java
index 01c6fda5f8..35a32e7266 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigRemoveCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigRemoveCommand.java
@@ -42,14 +42,10 @@ public class CliConfigRemoveCommand extends BaseCommand 
implements Callable<Inte
 
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(CliConfigRemoveCallInput.builder()
                         .key(key)
                         .profileName(profileName.getProfileName())::build)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigRemoveReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigRemoveReplCommand.java
index 7fc6f96463..5828105fd7 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigRemoveReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigRemoveReplCommand.java
@@ -37,13 +37,9 @@ public class CliConfigRemoveReplCommand extends BaseCommand 
implements Callable<
 
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(CliConfigRemoveCallInput.builder()
                         .key(key)::build)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigSetCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigSetCommand.java
index 0425cb528e..6f605c5614 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigSetCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigSetCommand.java
@@ -45,14 +45,10 @@ public class CliConfigSetCommand extends BaseCommand 
implements Callable<Integer
 
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(CliConfigSetCallInput.builder()
                         .parameters(parameters)
                         .profileName(profileName.getProfileName())::build)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigSetReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigSetReplCommand.java
index cc51a5da31..15c3befd1f 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigSetReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigSetReplCommand.java
@@ -40,13 +40,9 @@ public class CliConfigSetReplCommand extends BaseCommand 
implements Callable<Int
 
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(CliConfigSetCallInput.builder()
                         .parameters(parameters)::build)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigShowCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigShowCommand.java
index a6dd786cee..0248d73e87 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigShowCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigShowCommand.java
@@ -41,13 +41,9 @@ public class CliConfigShowCommand extends BaseCommand 
implements Callable<Intege
 
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> new 
StringCallInput(profileName.getProfileName()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
                 .decorator(new ProfileDecorator())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigShowReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigShowReplCommand.java
index a10b012d3e..063ddc0ab7 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigShowReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/CliConfigShowReplCommand.java
@@ -36,13 +36,9 @@ public class CliConfigShowReplCommand extends BaseCommand 
implements Callable<In
 
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(StringCallInput::new)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
                 .decorator(new ProfileDecorator())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileActivateCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileActivateCommand.java
index a5b5feeb88..ba3b751987 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileActivateCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileActivateCommand.java
@@ -39,11 +39,8 @@ public class CliConfigProfileActivateCommand extends 
BaseCommand implements Call
 
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> new StringCallInput(profileName))
-                .errOutput(spec.commandLine().getErr())
-                .output(spec.commandLine().getOut())
-                .verbose(verbose)
-                .build().runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileCreateCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileCreateCommand.java
index 7223090d42..384e9de5bc 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileCreateCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileCreateCommand.java
@@ -51,14 +51,11 @@ public class CliConfigProfileCreateCommand extends 
BaseCommand implements Callab
 
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(CliConfigProfileCreateCallInput.builder()
                         .profileName(profileName)
                         .copyFrom(copyFrom)
                         .activate(activate)::build)
-                .errOutput(spec.commandLine().getErr())
-                .output(spec.commandLine().getOut())
-                .verbose(verbose)
-                .build().runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileListCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileListCommand.java
index f18dda2c1b..912caa1ce1 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileListCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileListCommand.java
@@ -37,13 +37,9 @@ public class CliConfigProfileListCommand extends BaseCommand 
implements Callable
 
     @Override
     public Integer call() throws Exception {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(EmptyCallInput::new)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
                 .decorator(new ProfileListDecorator())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileShowCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileShowCommand.java
index b1f21bb8a3..0a722a8537 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileShowCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cliconfig/profile/CliConfigProfileShowCommand.java
@@ -36,12 +36,8 @@ public class CliConfigProfileShowCommand extends BaseCommand 
implements Callable
 
     @Override
     public Integer call() throws Exception {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(EmptyCallInput::new)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigShowCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigShowCommand.java
index 4ba949887d..11f59c2818 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigShowCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigShowCommand.java
@@ -54,15 +54,11 @@ public class ClusterConfigShowCommand extends BaseCommand 
implements Callable<In
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(this::buildCallInput)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
                 .decorator(format.decorator())
                 
.exceptionHandler(ClusterNotInitializedExceptionHandler.createHandler("Cannot 
show cluster config"))
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 
     private ClusterConfigShowCallInput buildCallInput() {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigShowReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigShowReplCommand.java
index f9eda6a7ac..6092bfe280 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigShowReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigShowReplCommand.java
@@ -58,13 +58,12 @@ public class ClusterConfigShowReplCommand extends 
BaseCommand implements Runnabl
 
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+        runFlow(question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
                 .map(this::configShowCallInput)
                 .then(Flows.fromCall(call))
                 
.exceptionHandler(ClusterNotInitializedExceptionHandler.createReplHandler("Cannot
 show cluster config"))
-                .verbose(verbose)
                 .print(format.decorator())
-                .start();
+        );
     }
 
     private ClusterConfigShowCallInput configShowCallInput(String clusterUrl) {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigUpdateCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigUpdateCommand.java
index 49135925f8..5aa22925ec 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigUpdateCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigUpdateCommand.java
@@ -48,14 +48,10 @@ public class ClusterConfigUpdateCommand extends BaseCommand 
implements Callable<
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(this::buildCallInput)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
                 
.exceptionHandler(ClusterNotInitializedExceptionHandler.createHandler("Cannot 
update cluster config"))
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 
     private ClusterConfigUpdateCallInput buildCallInput() {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigUpdateReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigUpdateReplCommand.java
index 2021a7c9b3..f9ea49a1b7 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigUpdateReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/config/ClusterConfigUpdateReplCommand.java
@@ -51,13 +51,12 @@ public class ClusterConfigUpdateReplCommand extends 
BaseCommand implements Runna
     /** {@inheritDoc} */
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+        runFlow(question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
                 .map(this::configUpdateCallInput)
                 .then(Flows.fromCall(call))
                 
.exceptionHandler(ClusterNotInitializedExceptionHandler.createReplHandler("Cannot
 update cluster config"))
-                .verbose(verbose)
                 .print()
-                .start();
+        );
     }
 
     private ClusterConfigUpdateCallInput configUpdateCallInput(String 
clusterUrl) {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/init/ClusterInitCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/init/ClusterInitCommand.java
index b41390fb11..63b0faac3c 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/init/ClusterInitCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/init/ClusterInitCommand.java
@@ -45,13 +45,9 @@ public class ClusterInitCommand extends BaseCommand 
implements Callable<Integer>
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(this::buildCallInput)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 
     private ClusterInitCallInput buildCallInput() {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/init/ClusterInitReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/init/ClusterInitReplCommand.java
index 26bcec55a9..b59616ddd6 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/init/ClusterInitReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/init/ClusterInitReplCommand.java
@@ -49,12 +49,11 @@ public class ClusterInitReplCommand extends BaseCommand 
implements Runnable {
     /** {@inheritDoc} */
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+        runFlow(question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
                 .map(this::buildCallInput)
                 .then(Flows.fromCall(call))
-                .verbose(verbose)
                 .print()
-                .start();
+        );
     }
 
     private ClusterInitCallInput buildCallInput(String clusterUrl) {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/status/ClusterStatusCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/status/ClusterStatusCommand.java
index 8f29593675..20a62c8473 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/status/ClusterStatusCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/status/ClusterStatusCommand.java
@@ -45,13 +45,9 @@ public class ClusterStatusCommand extends BaseCommand 
implements Callable<Intege
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> new 
UrlCallInput(clusterUrl.getClusterUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
                 .decorator(new ClusterStatusDecorator())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/status/ClusterStatusReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/status/ClusterStatusReplCommand.java
index 6d86df83ed..94fb46835c 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/status/ClusterStatusReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/status/ClusterStatusReplCommand.java
@@ -45,11 +45,10 @@ public class ClusterStatusReplCommand extends BaseCommand 
implements Runnable {
     /** {@inheritDoc} */
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+        runFlow(question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
                 .map(UrlCallInput::new)
                 .then(Flows.fromCall(call))
-                .verbose(verbose)
                 .print()
-                .start();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/LogicalTopologyCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/LogicalTopologyCommand.java
index b9fd71b1cd..b4c13d6d82 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/LogicalTopologyCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/LogicalTopologyCommand.java
@@ -51,14 +51,10 @@ public class LogicalTopologyCommand extends BaseCommand 
implements Callable<Inte
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> new UrlCallInput(clusterUrl.getClusterUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
                 .decorator(new TopologyDecorator(plain))
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createHandler("Cannot show logical topology"))
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/LogicalTopologyReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/LogicalTopologyReplCommand.java
index 4739dc4a50..59f49dfc26 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/LogicalTopologyReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/LogicalTopologyReplCommand.java
@@ -54,11 +54,11 @@ public class LogicalTopologyReplCommand extends BaseCommand 
implements Runnable
     /** {@inheritDoc} */
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+        runFlow(question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
                 .map(UrlCallInput::new)
                 .then(Flows.fromCall(call))
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createReplHandler("Cannot show logical topology"))
                 .print(new TopologyDecorator(plain))
-                .start();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/PhysicalTopologyCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/PhysicalTopologyCommand.java
index 51e6b81a5b..ed136a6dff 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/PhysicalTopologyCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/PhysicalTopologyCommand.java
@@ -50,13 +50,9 @@ public class PhysicalTopologyCommand extends BaseCommand 
implements Callable<Int
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> new UrlCallInput(clusterUrl.getClusterUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
                 .decorator(new TopologyDecorator(plain))
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/PhysicalTopologyReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/PhysicalTopologyReplCommand.java
index dbae939d88..7b9d264a77 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/PhysicalTopologyReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/topology/PhysicalTopologyReplCommand.java
@@ -53,10 +53,10 @@ public class PhysicalTopologyReplCommand extends 
BaseCommand implements Runnable
     /** {@inheritDoc} */
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+        runFlow(question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
                 .map(UrlCallInput::new)
                 .then(Flows.fromCall(call))
                 .print(new TopologyDecorator(plain))
-                .start();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitDeployCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitDeployCommand.java
index 357dc39ac4..5f9d13ce75 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitDeployCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitDeployCommand.java
@@ -43,12 +43,9 @@ public class ClusterUnitDeployCommand extends BaseCommand 
implements Callable<In
 
     @Override
     public Integer call() throws Exception {
-        return CallExecutionPipeline.asyncBuilder(callFactory::create)
+        return runPipeline(CallExecutionPipeline.asyncBuilder(callFactory::create)
                 .inputProvider(() -> options.toDeployUnitCallInput(clusterUrl.getClusterUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createHandler("Cannot deploy unit"))
-                .build().runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitDeployReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitDeployReplCommand.java
index e7e0b23f1b..b9901d8c60 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitDeployReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitDeployReplCommand.java
@@ -50,13 +50,10 @@ public class ClusterUnitDeployReplCommand extends 
BaseCommand implements Runnabl
         question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
                 .map(options::toDeployUnitCallInput)
                 .then(Flows.mono(input ->
-                    CallExecutionPipeline.asyncBuilder(callFactory::create)
+                    runPipeline(CallExecutionPipeline.asyncBuilder(callFactory::create)
                             .inputProvider(() -> input)
-                            .output(spec.commandLine().getOut())
-                            .errOutput(spec.commandLine().getErr())
-                            .verbose(verbose)
                             .exceptionHandler(ClusterNotInitializedExceptionHandler.createReplHandler("Cannot deploy unit"))
-                            .build().runPipeline()
+                    )
                 ))
                 .start();
     }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitListCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitListCommand.java
index dea9d7a16b..03bb5a011c 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitListCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitListCommand.java
@@ -48,17 +48,14 @@ public class ClusterUnitListCommand extends BaseCommand 
implements Callable<Inte
     private boolean plain;
 
     @Inject
-    private ClusterListUnitCall listUnitCall;
+    private ClusterListUnitCall call;
 
     @Override
     public Integer call() throws Exception {
-        return CallExecutionPipeline.builder(listUnitCall)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> listOptions.toListUnitCallInput(clusterUrl.getClusterUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
                 .decorator(new UnitListDecorator(plain))
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createHandler("Cannot list units"))
-                .build().runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitListReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitListReplCommand.java
index 5b696ca297..2eb0521545 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitListReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitListReplCommand.java
@@ -55,12 +55,11 @@ public class ClusterUnitListReplCommand extends BaseCommand 
implements Runnable
 
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+        runFlow(question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
                 .map(listOptions::toListUnitCallInput)
                 .then(Flows.fromCall(call))
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createReplHandler("Cannot list units"))
-                .verbose(verbose)
                 .print(new UnitListDecorator(plain))
-                .start();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitUndeployCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitUndeployCommand.java
index 3502e9cdf9..fd35c29544 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitUndeployCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitUndeployCommand.java
@@ -50,20 +50,17 @@ public class ClusterUnitUndeployCommand extends BaseCommand 
implements Callable<
     private String version;
 
     @Inject
-    private UndeployUnitCall undeployUnitCall;
+    private UndeployUnitCall call;
 
     @Override
     public Integer call() throws Exception {
-        return CallExecutionPipeline.builder(undeployUnitCall)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> UndeployUnitCallInput.builder()
                         .id(id)
                         .version(version)
                         .clusterUrl(clusterUrl.getClusterUrl())
                         .build())
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createHandler("Cannot undeploy unit"))
-                .build().runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitUndeployReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitUndeployReplCommand.java
index 870301372b..85acdd3a72 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitUndeployReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/cluster/unit/ClusterUnitUndeployReplCommand.java
@@ -57,7 +57,7 @@ public class ClusterUnitUndeployReplCommand extends 
BaseCommand implements Runna
 
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+        runFlow(question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
                 .map(clusterUrl -> UndeployUnitCallInput.builder()
                         .id(id)
                         .version(version)
@@ -65,8 +65,7 @@ public class ClusterUnitUndeployReplCommand extends 
BaseCommand implements Runna
                         .build())
                 .then(Flows.fromCall(call))
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createReplHandler("Cannot undeploy unit"))
-                .verbose(verbose)
                 .print()
-                .start();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/connect/ConnectCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/connect/ConnectCommand.java
index fff3f0ca88..12ea9c43db 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/connect/ConnectCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/connect/ConnectCommand.java
@@ -58,13 +58,9 @@ public class ConnectCommand extends BaseCommand implements 
Callable<Integer> {
         // We need to do this before the connect call since it will fire events even before repl start.
         replManager.subscribe();
 
-        int exitCode = CallExecutionPipeline.builder(connectCall)
+        int exitCode = runPipeline(CallExecutionPipeline.builder(connectCall)
                 .inputProvider(this::connectCallInput)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
         if (exitCode == 0) {
             replManager.startReplMode();
         }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/connect/ConnectReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/connect/ConnectReplCommand.java
index 01d221bc7f..7ae3325a71 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/connect/ConnectReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/connect/ConnectReplCommand.java
@@ -54,11 +54,10 @@ public class ConnectReplCommand extends BaseCommand 
implements Runnable {
     /** {@inheritDoc} */
     @Override
     public void run() {
-        question.askQuestionIfConnected(connectCallInput(nodeUrl.toString()))
+        runFlow(question.askQuestionIfConnected(connectCallInput(nodeUrl.toString()))
                 .then(Flows.fromCall(connectCall))
-                .verbose(verbose)
                 .print()
-                .start();
+        );
     }
 
     private ConnectCallInput connectCallInput(String nodeUrl) {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/connect/DisconnectCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/connect/DisconnectCommand.java
index 4b35ae269b..4b06b6af1d 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/connect/DisconnectCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/connect/DisconnectCommand.java
@@ -30,17 +30,13 @@ import picocli.CommandLine.Command;
 @Command(name = "disconnect", description = "Disconnects from Ignite 3 node")
 public class DisconnectCommand extends BaseCommand implements Runnable {
     @Inject
-    private DisconnectCall disconnectCall;
+    private DisconnectCall call;
 
     /** {@inheritDoc} */
     @Override
     public void run() {
-        CallExecutionPipeline.builder(disconnectCall)
+        runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(EmptyCallInput::new)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigShowCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigShowCommand.java
index fae17ff388..1e2c64b683 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigShowCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigShowCommand.java
@@ -51,14 +51,10 @@ public class NodeConfigShowCommand extends BaseCommand 
implements Callable<Integ
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(this::buildCallInput)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
                 .decorator(format.decorator())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 
     private NodeConfigShowCallInput buildCallInput() {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigShowReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigShowReplCommand.java
index 90496bd2db..6fcfdc7528 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigShowReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigShowReplCommand.java
@@ -54,12 +54,11 @@ public class NodeConfigShowReplCommand extends BaseCommand 
implements Runnable {
     /** {@inheritDoc} */
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
+        runFlow(question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
                 .map(this::nodeConfigShowCallInput)
                 .then(Flows.fromCall(call))
-                .verbose(verbose)
                 .print(format.decorator())
-                .start();
+        );
     }
 
     private NodeConfigShowCallInput nodeConfigShowCallInput(String nodeUrl) {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigUpdateCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigUpdateCommand.java
index 68239b5bea..c98aec2be3 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigUpdateCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigUpdateCommand.java
@@ -47,13 +47,9 @@ public class NodeConfigUpdateCommand extends BaseCommand 
implements Callable<Int
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(this::buildCallInput)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 
     private NodeConfigUpdateCallInput buildCallInput() {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigUpdateReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigUpdateReplCommand.java
index 2133c98421..56ffbc8839 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigUpdateReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/config/NodeConfigUpdateReplCommand.java
@@ -50,12 +50,11 @@ public class NodeConfigUpdateReplCommand extends 
BaseCommand implements Runnable
     /** {@inheritDoc} */
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
+        runFlow(question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
                 .map(this::nodeConfigUpdateCallInput)
                 .then(Flows.fromCall(call))
-                .verbose(verbose)
                 .print()
-                .start();
+        );
     }
 
     private NodeConfigUpdateCallInput nodeConfigUpdateCallInput(String nodeUrl) {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSetListCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSetListCommand.java
index da07db099a..e985755e71 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSetListCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSetListCommand.java
@@ -49,14 +49,10 @@ public class NodeMetricSetListCommand extends BaseCommand 
implements Callable<In
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> new UrlCallInput(nodeUrl.getNodeUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
                 .decorator(new MetricSetListDecorator(plain))
-                .verbose(verbose)
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createHandler("Cannot list metrics"))
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSetListReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSetListReplCommand.java
index 2b6d9ac1e9..2aaa3acc99 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSetListReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSetListReplCommand.java
@@ -52,11 +52,11 @@ public class NodeMetricSetListReplCommand extends 
BaseCommand implements Runnabl
     /** {@inheritDoc} */
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
+        runFlow(question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
                 .map(UrlCallInput::new)
                 .then(Flows.fromCall(call))
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createReplHandler("Cannot list metrics"))
                 .print(new MetricSetListDecorator(plain))
-                .start();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceDisableCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceDisableCommand.java
index 27fd4bde0d..451dda9fe4 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceDisableCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceDisableCommand.java
@@ -44,13 +44,9 @@ public class NodeMetricSourceDisableCommand extends 
BaseCommand implements Calla
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> metricSource.buildDisableCallInput(nodeUrl.getNodeUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createHandler("Cannot disable metrics"))
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceDisableReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceDisableReplCommand.java
index d485e067b2..ca1183879d 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceDisableReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceDisableReplCommand.java
@@ -46,12 +46,11 @@ public class NodeMetricSourceDisableReplCommand extends 
BaseCommand implements R
 
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
+        runFlow(question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
                 .map(metricSource::buildDisableCallInput)
                 .then(Flows.fromCall(call))
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createReplHandler("Cannot disable metrics"))
-                .verbose(verbose)
                 .print()
-                .start();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceEnableCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceEnableCommand.java
index 6e4ef57512..eb03401dc8 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceEnableCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceEnableCommand.java
@@ -44,13 +44,9 @@ public class NodeMetricSourceEnableCommand extends 
BaseCommand implements Callab
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> metricSource.buildEnableCallInput(nodeUrl.getNodeUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createHandler("Cannot enable metrics"))
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceEnableReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceEnableReplCommand.java
index 9f47a2d270..923d2088e2 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceEnableReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceEnableReplCommand.java
@@ -46,12 +46,11 @@ public class NodeMetricSourceEnableReplCommand extends 
BaseCommand implements Ru
 
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
+        runFlow(question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
                 .map(metricSource::buildEnableCallInput)
                 .then(Flows.fromCall(call))
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createReplHandler("Cannot enable metrics"))
-                .verbose(verbose)
                 .print()
-                .start();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceListCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceListCommand.java
index abd7d27fc7..d9a1ce951c 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceListCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceListCommand.java
@@ -49,14 +49,10 @@ public class NodeMetricSourceListCommand extends 
BaseCommand implements Callable
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> new UrlCallInput(nodeUrl.getNodeUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
                 .decorator(new MetricSourceListDecorator(plain))
-                .verbose(verbose)
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createHandler("Cannot list metrics"))
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceListReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceListReplCommand.java
index a14f4b1bfd..93b9046187 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceListReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/metric/NodeMetricSourceListReplCommand.java
@@ -52,11 +52,11 @@ public class NodeMetricSourceListReplCommand extends 
BaseCommand implements Runn
     /** {@inheritDoc} */
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
+        runFlow(question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
                 .map(UrlCallInput::new)
                 .then(Flows.fromCall(call))
                 .exceptionHandler(ClusterNotInitializedExceptionHandler.createReplHandler("Cannot list metrics"))
                 .print(new MetricSourceListDecorator(plain))
-                .start();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/status/NodeStatusCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/status/NodeStatusCommand.java
index ae27e8ea99..c9ff89729d 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/status/NodeStatusCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/status/NodeStatusCommand.java
@@ -43,13 +43,9 @@ public class NodeStatusCommand extends BaseCommand 
implements Callable<Integer>
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> new UrlCallInput(nodeUrl.getNodeUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
                 .decorator(new NodeStatusDecorator())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/status/NodeStatusReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/status/NodeStatusReplCommand.java
index fb1c202568..7f96372535 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/status/NodeStatusReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/status/NodeStatusReplCommand.java
@@ -45,11 +45,10 @@ public class NodeStatusReplCommand extends BaseCommand 
implements Runnable {
     /** {@inheritDoc} */
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
+        runFlow(question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
                 .map(UrlCallInput::new)
                 .then(Flows.fromCall(call))
-                .verbose(verbose)
                 .print()
-                .start();
+        );
     }
 }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/unit/NodeUnitListCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/unit/NodeUnitListCommand.java
index 27ccc9d349..bacb5eba28 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/unit/NodeUnitListCommand.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/unit/NodeUnitListCommand.java
@@ -48,17 +48,14 @@ public class NodeUnitListCommand extends BaseCommand implements Callable<Integer
     private boolean plain;
 
     @Inject
-    private NodeListUnitCall listUnitCall;
+    private NodeListUnitCall call;
 
     @Override
     public Integer call() throws Exception {
-        return CallExecutionPipeline.builder(listUnitCall)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> 
listOptions.toListUnitCallInput(nodeUrl.getNodeUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
                 .decorator(new UnitListDecorator(plain))
                 
.exceptionHandler(ClusterNotInitializedExceptionHandler.createHandler("Cannot 
list units"))
-                .build().runPipeline();
+        );
     }
 }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/unit/NodeUnitListReplCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/unit/NodeUnitListReplCommand.java
index a4ad93a99e..8608ee5685 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/unit/NodeUnitListReplCommand.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/unit/NodeUnitListReplCommand.java
@@ -54,12 +54,11 @@ public class NodeUnitListReplCommand extends BaseCommand implements Runnable {
 
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
+        runFlow(question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
                 .map(listOptions::toListUnitCallInput)
                 .then(Flows.fromCall(call))
                 
.exceptionHandler(ClusterNotInitializedExceptionHandler.createReplHandler("Cannot
 list units"))
-                .verbose(verbose)
                 .print(new UnitListDecorator(plain))
-                .start();
+        );
     }
 }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/version/NodeVersionCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/version/NodeVersionCommand.java
index a2bd318e71..84d9bd7b30 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/version/NodeVersionCommand.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/version/NodeVersionCommand.java
@@ -34,17 +34,13 @@ public class NodeVersionCommand extends BaseCommand implements Callable<Integer>
     private NodeUrlProfileMixin nodeUrl;
 
     @Inject
-    private NodeVersionCall nodeVersionCall;
+    private NodeVersionCall call;
 
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(nodeVersionCall)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> new UrlCallInput(nodeUrl.getNodeUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/version/NodeVersionReplCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/version/NodeVersionReplCommand.java
index 69006b9175..97bea114bd 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/version/NodeVersionReplCommand.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/node/version/NodeVersionReplCommand.java
@@ -42,11 +42,10 @@ public class NodeVersionReplCommand extends BaseCommand implements Runnable {
     /** {@inheritDoc} */
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
+        runFlow(question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
                 .map(UrlCallInput::new)
                 .then(Flows.fromCall(nodeVersionCall))
-                .verbose(verbose)
                 .print()
-                .start();
+        );
     }
 }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ResetPartitionsCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ResetPartitionsCommand.java
index 088c75b43d..62d4885429 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ResetPartitionsCommand.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ResetPartitionsCommand.java
@@ -43,13 +43,9 @@ public class ResetPartitionsCommand extends BaseCommand implements Callable<Inte
 
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> ResetPartitionsCallInput.of(options, 
clusterUrl.getClusterUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
                 
.exceptionHandler(ClusterNotInitializedExceptionHandler.createHandler("Cannot 
reset partitions"))
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ResetPartitionsReplCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ResetPartitionsReplCommand.java
index e7efe8a1a9..34f5296ce9 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ResetPartitionsReplCommand.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/reset/ResetPartitionsReplCommand.java
@@ -46,12 +46,11 @@ public class ResetPartitionsReplCommand extends BaseCommand implements Runnable
 
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+        runFlow(question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
                 .map(url -> ResetPartitionsCallInput.of(options, url))
                 .then(Flows.fromCall(call))
-                .verbose(verbose)
                 
.exceptionHandler(ClusterNotInitializedExceptionHandler.createReplHandler("Cannot
 reset partitions"))
                 .print()
-                .start();
+        );
     }
 }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/RestartPartitionsCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/RestartPartitionsCommand.java
index accf4a4063..f71e5a5dd0 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/RestartPartitionsCommand.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/RestartPartitionsCommand.java
@@ -43,13 +43,9 @@ public class RestartPartitionsCommand extends BaseCommand implements Callable<In
 
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> RestartPartitionsCallInput.of(options, 
clusterUrl.getClusterUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .verbose(verbose)
                 
.exceptionHandler(ClusterNotInitializedExceptionHandler.createHandler("Cannot 
restart partitions"))
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/RestartPartitionsReplCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/RestartPartitionsReplCommand.java
index d28f100e89..8444e66505 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/RestartPartitionsReplCommand.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/restart/RestartPartitionsReplCommand.java
@@ -46,12 +46,11 @@ public class RestartPartitionsReplCommand extends BaseCommand implements Runnabl
 
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+        runFlow(question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
                 .map(url -> RestartPartitionsCallInput.of(options, url))
                 .then(Flows.fromCall(call))
-                .verbose(verbose)
                 
.exceptionHandler(ClusterNotInitializedExceptionHandler.createReplHandler("Cannot
 restart partitions"))
                 .print()
-                .start();
+        );
     }
 }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/states/PartitionStatesCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/states/PartitionStatesCommand.java
index cfcceeee91..27fd149099 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/states/PartitionStatesCommand.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/states/PartitionStatesCommand.java
@@ -44,14 +44,10 @@ public class PartitionStatesCommand extends BaseCommand implements Callable<Inte
 
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(call)
+        return runPipeline(CallExecutionPipeline.builder(call)
                 .inputProvider(() -> PartitionStatesCallInput.of(options, 
clusterUrl.getClusterUrl()))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
                 .decorator(new TableDecorator(options.plain()))
-                .verbose(verbose)
                 
.exceptionHandler(ClusterNotInitializedExceptionHandler.createHandler("Cannot 
list partition states"))
-                .build()
-                .runPipeline();
+        );
     }
 }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/states/PartitionStatesReplCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/states/PartitionStatesReplCommand.java
index 6905585508..dcb355832d 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/states/PartitionStatesReplCommand.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/recovery/partitions/states/PartitionStatesReplCommand.java
@@ -47,13 +47,12 @@ public class PartitionStatesReplCommand extends BaseCommand implements Runnable
 
     @Override
     public void run() {
-        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+        runFlow(question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
                 .map(url -> PartitionStatesCallInput.of(options, url))
                 .then(Flows.fromCall(call))
-                .print(new TableDecorator(options.plain()))
                 
.exceptionHandler(ClusterNotInitializedExceptionHandler.createReplHandler("Cannot
 list partition states"))
-                .verbose(verbose)
-                .start();
+                .print(new TableDecorator(options.plain()))
+        );
     }
 
 }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlCommand.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlCommand.java
index 8666d78482..a730b10109 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlCommand.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/commands/sql/SqlCommand.java
@@ -82,13 +82,10 @@ public class SqlCommand extends BaseCommand implements Callable<Integer> {
     public Integer call() {
         try (SqlManager sqlManager = new SqlManager(jdbc)) {
             String executeCommand = execOptions.file != null ? 
extract(execOptions.file) : execOptions.command;
-            return CallExecutionPipeline.builder(new SqlQueryCall(sqlManager))
+            return runPipeline(CallExecutionPipeline.builder(new 
SqlQueryCall(sqlManager))
                     .inputProvider(() -> new StringCallInput(executeCommand))
-                    .output(spec.commandLine().getOut())
-                    .errOutput(spec.commandLine().getErr())
                     .decorator(new SqlQueryResultDecorator(plain))
-                    .verbose(verbose)
-                    .build().runPipeline();
+            );
         } catch (SQLException e) {
             ExceptionWriter exceptionWriter = 
ExceptionWriter.fromPrintWriter(spec.commandLine().getErr());
             return new SqlExceptionHandler().handle(exceptionWriter, e);
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipelineBuilder.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipelineBuilder.java
index 9279bff18b..cf13110659 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipelineBuilder.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/AsyncCallExecutionPipelineBuilder.java
@@ -33,7 +33,7 @@ import org.apache.ignite.internal.cli.core.exception.handler.DefaultExceptionHan
 import org.apache.ignite.internal.cli.decorators.DefaultDecorator;
 
 /** Builder for {@link AsyncCallExecutionPipeline}. */
-public class AsyncCallExecutionPipelineBuilder<I extends CallInput, T> {
+public class AsyncCallExecutionPipelineBuilder<I extends CallInput, T> 
implements CallExecutionPipelineBuilder<I, T> {
 
     private final Function<ProgressTracker, AsyncCall<I, T>> callFactory;
 
@@ -76,6 +76,7 @@ public class AsyncCallExecutionPipelineBuilder<I extends CallInput, T> {
         return this;
     }
 
+    @Override
     public AsyncCallExecutionPipelineBuilder<I, T> output(PrintWriter output) {
         this.output = output;
         return this;
@@ -85,6 +86,7 @@ public class AsyncCallExecutionPipelineBuilder<I extends CallInput, T> {
         return output(wrapOutputStream(output));
     }
 
+    @Override
     public AsyncCallExecutionPipelineBuilder<I, T> errOutput(PrintWriter 
errOutput) {
         this.errOutput = errOutput;
         return this;
@@ -109,6 +111,7 @@ public class AsyncCallExecutionPipelineBuilder<I extends CallInput, T> {
         return this;
     }
 
+    @Override
     public AsyncCallExecutionPipelineBuilder<I, T> verbose(boolean verbose) {
         this.verbose = verbose;
         return this;
@@ -120,6 +123,7 @@ public class AsyncCallExecutionPipelineBuilder<I extends CallInput, T> {
     }
 
     /** Builds {@link AsyncCallExecutionPipeline}. */
+    @Override
     public CallExecutionPipeline<I, T> build() {
         return new AsyncCallExecutionPipeline<>(
                 callFactory, progressBarBuilder, output, errOutput, 
exceptionHandlers, decorator, inputProvider, verbose
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java
index a8fe9db3fb..2665640a9c 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipeline.java
@@ -27,8 +27,8 @@ public interface CallExecutionPipeline<I extends CallInput, T> {
      *
      * @return builder for {@link CallExecutionPipeline}.
      */
-    static <I extends CallInput, T> CallExecutionPipelineBuilder<I, T> 
builder(Call<I, T> call) {
-        return new CallExecutionPipelineBuilder<>(call);
+    static <I extends CallInput, T> SingleCallExecutionPipelineBuilder<I, T> 
builder(Call<I, T> call) {
+        return new SingleCallExecutionPipelineBuilder<>(call);
     }
 
     /** Builder helper method. */
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java
index 321ac7e391..2515756ef9 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java
@@ -17,91 +17,38 @@
 
 package org.apache.ignite.internal.cli.core.call;
 
-import java.io.OutputStream;
 import java.io.PrintWriter;
-import java.nio.charset.Charset;
-import java.util.function.Supplier;
-import org.apache.ignite.internal.cli.core.decorator.Decorator;
-import org.apache.ignite.internal.cli.core.decorator.TerminalOutput;
-import org.apache.ignite.internal.cli.core.exception.ExceptionHandler;
-import org.apache.ignite.internal.cli.core.exception.ExceptionHandlers;
-import 
org.apache.ignite.internal.cli.core.exception.handler.DefaultExceptionHandlers;
-import org.apache.ignite.internal.cli.decorators.DefaultDecorator;
 
 /** Builder for {@link CallExecutionPipeline}. */
-public class CallExecutionPipelineBuilder<I extends CallInput, T> {
-
-    private final Call<I, T> call;
-
-    private final ExceptionHandlers exceptionHandlers = new 
DefaultExceptionHandlers();
-
-    private Supplier<I> inputProvider;
-
-    private PrintWriter output = wrapOutputStream(System.out);
-
-    private PrintWriter errOutput = wrapOutputStream(System.err);
-
-    private Decorator<T, TerminalOutput> decorator = new DefaultDecorator<>();
-
-    private boolean verbose;
-
-    CallExecutionPipelineBuilder(Call<I, T> call) {
-        this.call = call;
-    }
-
-    private static PrintWriter wrapOutputStream(OutputStream output) {
-        return new PrintWriter(output, true, getStdoutEncoding());
-    }
-
-    private static Charset getStdoutEncoding() {
-        String encoding = System.getProperty("sun.stdout.encoding");
-        return encoding != null ? Charset.forName(encoding) : 
Charset.defaultCharset();
-    }
-
-    public CallExecutionPipelineBuilder<I, T> inputProvider(Supplier<I> 
inputProvider) {
-        this.inputProvider = inputProvider;
-        return this;
-    }
-
-    public CallExecutionPipelineBuilder<I, T> output(PrintWriter output) {
-        this.output = output;
-        return this;
-    }
-
-    public CallExecutionPipelineBuilder<I, T> output(OutputStream output) {
-        return output(wrapOutputStream(output));
-    }
-
-    public CallExecutionPipelineBuilder<I, T> errOutput(PrintWriter errOutput) 
{
-        this.errOutput = errOutput;
-        return this;
-    }
-
-    public CallExecutionPipelineBuilder<I, T> errOutput(OutputStream output) {
-        return errOutput(wrapOutputStream(output));
-    }
-
-    public CallExecutionPipelineBuilder<I, T> 
exceptionHandler(ExceptionHandler<?> exceptionHandler) {
-        exceptionHandlers.addExceptionHandler(exceptionHandler);
-        return this;
-    }
-
-    public CallExecutionPipelineBuilder<I, T> 
exceptionHandlers(ExceptionHandlers exceptionHandlers) {
-        this.exceptionHandlers.addExceptionHandlers(exceptionHandlers);
-        return this;
-    }
-
-    public CallExecutionPipelineBuilder<I, T> decorator(Decorator<T, 
TerminalOutput> decorator) {
-        this.decorator = decorator;
-        return this;
-    }
-
-    public CallExecutionPipelineBuilder<I, T> verbose(boolean verbose) {
-        this.verbose = verbose;
-        return this;
-    }
-
-    public CallExecutionPipeline<I, T> build() {
-        return new SingleCallExecutionPipeline<>(call, output, errOutput, 
exceptionHandlers, decorator, inputProvider, verbose);
-    }
+public interface CallExecutionPipelineBuilder<I extends CallInput, T> {
+    /**
+     * Sets output {@link PrintWriter}.
+     *
+     * @param output Output print writer.
+     * @return Instance of the builder.
+     */
+    CallExecutionPipelineBuilder<I, T> output(PrintWriter output);
+
+    /**
+     * Sets error output {@link PrintWriter}.
+     *
+     * @param errOutput Error output print writer.
+     * @return Instance of the builder.
+     */
+    CallExecutionPipelineBuilder<I, T> errOutput(PrintWriter errOutput);
+
+    /**
+     * Sets verbosity status.
+     *
+     * @param verbose Verbosity status.
+     * @return Instance of the builder.
+     */
+    CallExecutionPipelineBuilder<I, T> verbose(boolean verbose);
+
+    /**
+     * Builds the pipeline.
+     *
+     * @return Constructed pipeline.
+     */
+    CallExecutionPipeline<I, T> build();
 }
diff --git a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/SingleCallExecutionPipelineBuilder.java
similarity index 74%
copy from modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java
copy to modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/SingleCallExecutionPipelineBuilder.java
index 321ac7e391..7ef9967cdd 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/CallExecutionPipelineBuilder.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/call/SingleCallExecutionPipelineBuilder.java
@@ -29,7 +29,7 @@ import org.apache.ignite.internal.cli.core.exception.handler.DefaultExceptionHan
 import org.apache.ignite.internal.cli.decorators.DefaultDecorator;
 
 /** Builder for {@link CallExecutionPipeline}. */
-public class CallExecutionPipelineBuilder<I extends CallInput, T> {
+public class SingleCallExecutionPipelineBuilder<I extends CallInput, T> 
implements CallExecutionPipelineBuilder<I, T> {
 
     private final Call<I, T> call;
 
@@ -45,7 +45,7 @@ public class CallExecutionPipelineBuilder<I extends CallInput, T> {
 
     private boolean verbose;
 
-    CallExecutionPipelineBuilder(Call<I, T> call) {
+    SingleCallExecutionPipelineBuilder(Call<I, T> call) {
         this.call = call;
     }
 
@@ -58,49 +58,53 @@ public class CallExecutionPipelineBuilder<I extends CallInput, T> {
         return encoding != null ? Charset.forName(encoding) : 
Charset.defaultCharset();
     }
 
-    public CallExecutionPipelineBuilder<I, T> inputProvider(Supplier<I> 
inputProvider) {
+    public SingleCallExecutionPipelineBuilder<I, T> inputProvider(Supplier<I> 
inputProvider) {
         this.inputProvider = inputProvider;
         return this;
     }
 
-    public CallExecutionPipelineBuilder<I, T> output(PrintWriter output) {
+    @Override
+    public SingleCallExecutionPipelineBuilder<I, T> output(PrintWriter output) 
{
         this.output = output;
         return this;
     }
 
-    public CallExecutionPipelineBuilder<I, T> output(OutputStream output) {
+    public SingleCallExecutionPipelineBuilder<I, T> output(OutputStream 
output) {
         return output(wrapOutputStream(output));
     }
 
-    public CallExecutionPipelineBuilder<I, T> errOutput(PrintWriter errOutput) 
{
+    @Override
+    public SingleCallExecutionPipelineBuilder<I, T> errOutput(PrintWriter 
errOutput) {
         this.errOutput = errOutput;
         return this;
     }
 
-    public CallExecutionPipelineBuilder<I, T> errOutput(OutputStream output) {
+    public SingleCallExecutionPipelineBuilder<I, T> errOutput(OutputStream 
output) {
         return errOutput(wrapOutputStream(output));
     }
 
-    public CallExecutionPipelineBuilder<I, T> 
exceptionHandler(ExceptionHandler<?> exceptionHandler) {
+    public SingleCallExecutionPipelineBuilder<I, T> 
exceptionHandler(ExceptionHandler<?> exceptionHandler) {
         exceptionHandlers.addExceptionHandler(exceptionHandler);
         return this;
     }
 
-    public CallExecutionPipelineBuilder<I, T> 
exceptionHandlers(ExceptionHandlers exceptionHandlers) {
+    public SingleCallExecutionPipelineBuilder<I, T> 
exceptionHandlers(ExceptionHandlers exceptionHandlers) {
         this.exceptionHandlers.addExceptionHandlers(exceptionHandlers);
         return this;
     }
 
-    public CallExecutionPipelineBuilder<I, T> decorator(Decorator<T, 
TerminalOutput> decorator) {
+    public SingleCallExecutionPipelineBuilder<I, T> decorator(Decorator<T, 
TerminalOutput> decorator) {
         this.decorator = decorator;
         return this;
     }
 
-    public CallExecutionPipelineBuilder<I, T> verbose(boolean verbose) {
+    @Override
+    public SingleCallExecutionPipelineBuilder<I, T> verbose(boolean verbose) {
         this.verbose = verbose;
         return this;
     }
 
+    @Override
     public CallExecutionPipeline<I, T> build() {
         return new SingleCallExecutionPipeline<>(call, output, errOutput, 
exceptionHandlers, decorator, inputProvider, verbose);
     }


Reply via email to