This is an automated email from the ASF dual-hosted git repository.

sdanilov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9afa9a66e5 IGNITE-17564 Use user flow for all REPL commands (#1060)
9afa9a66e5 is described below

commit 9afa9a66e585dd784a64e6ad88b800bc25b59331
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Wed Sep 14 16:17:56 2022 +0300

    IGNITE-17564 Use user flow for all REPL commands (#1060)
---
 .../cli/call/cluster/status/ClusterStatusCall.java |  22 +--
 .../call/cluster/topology/LogicalTopologyCall.java |  15 +-
 .../cluster/topology/PhysicalTopologyCall.java     |  15 +-
 .../call/cluster/topology/TopologyCallInput.java   |  67 --------
 .../cli/call/node/status/NodeStatusCall.java       |  11 +-
 .../config/ClusterConfigShowReplCommand.java       |   8 +-
 .../config/ClusterConfigUpdateReplCommand.java     |   8 +-
 .../cluster/init/ClusterInitReplCommand.java       |  37 ++---
 .../cluster/status/ClusterStatusCommand.java       |   8 +-
 .../cluster/status/ClusterStatusReplCommand.java   |  36 ++--
 .../node/config/NodeConfigShowReplCommand.java     |   6 +-
 .../node/config/NodeConfigUpdateReplCommand.java   |   6 +-
 .../commands/node/status/NodeStatusCommand.java    |   8 +-
 .../node/status/NodeStatusReplCommand.java         |  41 ++---
 .../questions/ConnectToClusterQuestion.java        |   7 +-
 .../commands/topology/LogicalTopologyCommand.java  |   6 +-
 .../topology/LogicalTopologyReplCommand.java       |  39 ++---
 .../commands/topology/PhysicalTopologyCommand.java |   4 +-
 .../topology/PhysicalTopologyReplCommand.java      |  39 ++---
 .../ignite/cli/core/call/DefaultCallOutput.java    |   2 +-
 .../{StatusCallInput.java => UrlCallInput.java}    |  27 +--
 .../ignite/cli/core/exception/ExceptionWriter.java |   6 +-
 .../org/apache/ignite/cli/core/flow/Flowable.java  |   2 +-
 .../ignite/cli/core/flow/builder/FlowBuilder.java  |  78 ++++++++-
 .../cli/core/flow/builder/FlowBuilderImpl.java     | 102 +++++++-----
 .../apache/ignite/cli/core/flow/builder/Flows.java |   4 +-
 .../repl/context/CommandLineContextProvider.java   |  33 +++-
 .../cli/core/style/component/CommonMessages.java   |  35 ----
 .../cli/decorators/DefaultDecoratorRegistry.java   |   1 -
 .../cli/call/node/status/NodeStatusCallTest.java   |   4 +-
 .../ignite/cli/commands/ProfileMixinTest.java      |  15 +-
 .../apache/ignite/cli/commands/flow/FlowTest.java  | 184 ++++++++++++++++-----
 .../cli/commands/flow/TestExceptionHandler.java    |   9 +-
 33 files changed, 466 insertions(+), 419 deletions(-)

diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/status/ClusterStatusCall.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/status/ClusterStatusCall.java
index c5f258db5b..7a4848bbc4 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/status/ClusterStatusCall.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/status/ClusterStatusCall.java
@@ -20,11 +20,10 @@ package org.apache.ignite.cli.call.cluster.status;
 import jakarta.inject.Singleton;
 import 
org.apache.ignite.cli.call.cluster.status.ClusterStatus.ClusterStatusBuilder;
 import org.apache.ignite.cli.call.cluster.topology.PhysicalTopologyCall;
-import org.apache.ignite.cli.call.cluster.topology.TopologyCallInput;
 import org.apache.ignite.cli.core.call.Call;
 import org.apache.ignite.cli.core.call.CallOutput;
 import org.apache.ignite.cli.core.call.DefaultCallOutput;
-import org.apache.ignite.cli.core.call.StatusCallInput;
+import org.apache.ignite.cli.core.call.UrlCallInput;
 import org.apache.ignite.cli.core.exception.IgniteCliApiException;
 import org.apache.ignite.rest.client.api.ClusterManagementApi;
 import org.apache.ignite.rest.client.invoker.ApiClient;
@@ -35,7 +34,7 @@ import org.apache.ignite.rest.client.model.ClusterState;
  * Call to get cluster status.
  */
 @Singleton
-public class ClusterStatusCall implements Call<StatusCallInput, ClusterStatus> 
{
+public class ClusterStatusCall implements Call<UrlCallInput, ClusterStatus> {
 
     private final PhysicalTopologyCall physicalTopologyCall;
 
@@ -44,31 +43,32 @@ public class ClusterStatusCall implements 
Call<StatusCallInput, ClusterStatus> {
     }
 
     @Override
-    public CallOutput<ClusterStatus> execute(StatusCallInput input) {
+    public CallOutput<ClusterStatus> execute(UrlCallInput input) {
         ClusterStatusBuilder clusterStatusBuilder = ClusterStatus.builder();
+        String clusterUrl = input.getUrl();
         try {
-            ClusterState clusterState = 
fetchClusterState(input.getClusterUrl());
+            ClusterState clusterState = fetchClusterState(clusterUrl);
             clusterStatusBuilder
-                    .nodeCount(fetchNumberOfAllNodes(input.getClusterUrl()))
+                    .nodeCount(fetchNumberOfAllNodes(input))
                     .initialized(true)
                     .name(clusterState.getClusterTag().getClusterName())
                     .metadataStorageNodes(clusterState.getMsNodes())
                     .cmgNodes(clusterState.getCmgNodes());
         } catch (ApiException e) {
             if (e.getCode() == 404) { // NOT_FOUND means the cluster is not 
initialized yet
-                
clusterStatusBuilder.initialized(false).nodeCount(fetchNumberOfAllNodes(input.getClusterUrl()));
+                
clusterStatusBuilder.initialized(false).nodeCount(fetchNumberOfAllNodes(input));
             } else {
-                return DefaultCallOutput.failure(new IgniteCliApiException(e, 
input.getClusterUrl()));
+                return DefaultCallOutput.failure(new IgniteCliApiException(e, 
clusterUrl));
             }
         } catch (IllegalArgumentException e) {
-            return DefaultCallOutput.failure(new IgniteCliApiException(e, 
input.getClusterUrl()));
+            return DefaultCallOutput.failure(new IgniteCliApiException(e, 
clusterUrl));
         }
 
         return DefaultCallOutput.success(clusterStatusBuilder.build());
     }
 
-    private int fetchNumberOfAllNodes(String url) {
-        return 
physicalTopologyCall.execute(TopologyCallInput.builder().clusterUrl(url).build()).body().size();
+    private int fetchNumberOfAllNodes(UrlCallInput input) {
+        return physicalTopologyCall.execute(input).body().size();
     }
 
     private ClusterState fetchClusterState(String url) throws ApiException {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/topology/LogicalTopologyCall.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/topology/LogicalTopologyCall.java
index 7ef8ed5cf2..2ac6f08bdd 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/topology/LogicalTopologyCall.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/topology/LogicalTopologyCall.java
@@ -18,10 +18,10 @@
 package org.apache.ignite.cli.call.cluster.topology;
 
 import jakarta.inject.Singleton;
-import java.util.ArrayList;
 import java.util.List;
 import org.apache.ignite.cli.core.call.Call;
 import org.apache.ignite.cli.core.call.CallOutput;
+import org.apache.ignite.cli.core.call.UrlCallInput;
 import org.apache.ignite.cli.core.exception.IgniteCliApiException;
 import org.apache.ignite.rest.client.api.TopologyApi;
 import org.apache.ignite.rest.client.invoker.ApiException;
@@ -32,19 +32,20 @@ import org.apache.ignite.rest.client.model.ClusterNode;
  * Shows logical cluster topology.
  */
 @Singleton
-public class LogicalTopologyCall implements Call<TopologyCallInput, 
List<ClusterNode>> {
+public class LogicalTopologyCall implements Call<UrlCallInput, 
List<ClusterNode>> {
 
     /** {@inheritDoc} */
     @Override
-    public CallOutput<List<ClusterNode>> execute(TopologyCallInput input) {
+    public CallOutput<List<ClusterNode>> execute(UrlCallInput input) {
+        String clusterUrl = input.getUrl();
         try {
-            return TopologyCallOutput.success(fetchLogicalTopology(input));
+            return 
TopologyCallOutput.success(fetchLogicalTopology(clusterUrl));
         } catch (ApiException | IllegalArgumentException e) {
-            return TopologyCallOutput.failure(new IgniteCliApiException(e, 
input.getClusterUrl()));
+            return TopologyCallOutput.failure(new IgniteCliApiException(e, 
clusterUrl));
         }
     }
 
-    private List<ClusterNode> fetchLogicalTopology(TopologyCallInput input) 
throws ApiException {
-        return new ArrayList<>(new 
TopologyApi(Configuration.getDefaultApiClient().setBasePath(input.getClusterUrl())).logical());
+    private List<ClusterNode> fetchLogicalTopology(String url) throws 
ApiException {
+        return new 
TopologyApi(Configuration.getDefaultApiClient().setBasePath(url)).logical();
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/topology/PhysicalTopologyCall.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/topology/PhysicalTopologyCall.java
index 8f1cc3f6de..b890c8deab 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/topology/PhysicalTopologyCall.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/topology/PhysicalTopologyCall.java
@@ -18,10 +18,10 @@
 package org.apache.ignite.cli.call.cluster.topology;
 
 import jakarta.inject.Singleton;
-import java.util.ArrayList;
 import java.util.List;
 import org.apache.ignite.cli.core.call.Call;
 import org.apache.ignite.cli.core.call.CallOutput;
+import org.apache.ignite.cli.core.call.UrlCallInput;
 import org.apache.ignite.cli.core.exception.IgniteCliApiException;
 import org.apache.ignite.rest.client.api.TopologyApi;
 import org.apache.ignite.rest.client.invoker.ApiException;
@@ -32,19 +32,20 @@ import org.apache.ignite.rest.client.model.ClusterNode;
  * Shows physical cluster topology.
  */
 @Singleton
-public class PhysicalTopologyCall implements Call<TopologyCallInput, 
List<ClusterNode>> {
+public class PhysicalTopologyCall implements Call<UrlCallInput, 
List<ClusterNode>> {
 
     /** {@inheritDoc} */
     @Override
-    public CallOutput<List<ClusterNode>> execute(TopologyCallInput input) {
+    public CallOutput<List<ClusterNode>> execute(UrlCallInput input) {
+        String clusterUrl = input.getUrl();
         try {
-            return TopologyCallOutput.success(fetchPhysicalTopology(input));
+            return 
TopologyCallOutput.success(fetchPhysicalTopology(clusterUrl));
         } catch (ApiException | IllegalArgumentException e) {
-            return TopologyCallOutput.failure(new IgniteCliApiException(e, 
input.getClusterUrl()));
+            return TopologyCallOutput.failure(new IgniteCliApiException(e, 
clusterUrl));
         }
     }
 
-    private List<ClusterNode> fetchPhysicalTopology(TopologyCallInput input) 
throws ApiException {
-        return new ArrayList<>(new 
TopologyApi(Configuration.getDefaultApiClient().setBasePath(input.getClusterUrl())).physical());
+    private List<ClusterNode> fetchPhysicalTopology(String url) throws 
ApiException {
+        return new 
TopologyApi(Configuration.getDefaultApiClient().setBasePath(url)).physical();
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/topology/TopologyCallInput.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/topology/TopologyCallInput.java
deleted file mode 100644
index 4d9470d951..0000000000
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/call/cluster/topology/TopologyCallInput.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cli.call.cluster.topology;
-
-import org.apache.ignite.cli.core.call.CallInput;
-
-/**
- * Input for physical or logical topology call.
- */
-public class TopologyCallInput implements CallInput {
-    /**
-     * Cluster url.
-     */
-    private final String clusterUrl;
-
-    private TopologyCallInput(String clusterUrl) {
-        this.clusterUrl = clusterUrl;
-    }
-
-    /**
-     * Builder for {@link TopologyCallInput}.
-     */
-    public static TopologyCallInputBuilder builder() {
-        return new TopologyCallInputBuilder();
-    }
-
-    /**
-     * Get cluster URL.
-     *
-     * @return Cluster URL.
-     */
-    public String getClusterUrl() {
-        return clusterUrl;
-    }
-
-    /**
-     * Builder for {@link 
org.apache.ignite.cli.call.configuration.ClusterConfigShowCallInput}.
-     */
-    public static class TopologyCallInputBuilder {
-
-        private String clusterUrl;
-
-        public TopologyCallInputBuilder clusterUrl(String clusterUrl) {
-            this.clusterUrl = clusterUrl;
-            return this;
-        }
-
-        public TopologyCallInput build() {
-            return new TopologyCallInput(clusterUrl);
-        }
-    }
-}
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/call/node/status/NodeStatusCall.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/call/node/status/NodeStatusCall.java
index aadf34f095..be40786e53 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/call/node/status/NodeStatusCall.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/call/node/status/NodeStatusCall.java
@@ -21,7 +21,7 @@ import jakarta.inject.Singleton;
 import org.apache.ignite.cli.core.call.Call;
 import org.apache.ignite.cli.core.call.CallOutput;
 import org.apache.ignite.cli.core.call.DefaultCallOutput;
-import org.apache.ignite.cli.core.call.StatusCallInput;
+import org.apache.ignite.cli.core.call.UrlCallInput;
 import org.apache.ignite.cli.core.exception.IgniteCliApiException;
 import org.apache.ignite.rest.client.api.NodeManagementApi;
 import org.apache.ignite.rest.client.invoker.ApiClient;
@@ -32,12 +32,13 @@ import org.apache.ignite.rest.client.model.NodeState;
  * Call to get node status.
  */
 @Singleton
-public class NodeStatusCall implements Call<StatusCallInput, NodeStatus> {
+public class NodeStatusCall implements Call<UrlCallInput, NodeStatus> {
 
     @Override
-    public CallOutput<NodeStatus> execute(StatusCallInput input) {
+    public CallOutput<NodeStatus> execute(UrlCallInput input) {
+        String clusterUrl = input.getUrl();
         try {
-            NodeState nodeState = fetchNodeState(input.getClusterUrl());
+            NodeState nodeState = fetchNodeState(clusterUrl);
             return DefaultCallOutput.success(
                     NodeStatus.builder()
                             .name(nodeState.getName())
@@ -45,7 +46,7 @@ public class NodeStatusCall implements Call<StatusCallInput, 
NodeStatus> {
                             .build()
             );
         } catch (ApiException | IllegalArgumentException e) {
-            return DefaultCallOutput.failure(new IgniteCliApiException(e, 
input.getClusterUrl()));
+            return DefaultCallOutput.failure(new IgniteCliApiException(e, 
clusterUrl));
         }
     }
 
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/config/ClusterConfigShowReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/config/ClusterConfigShowReplCommand.java
index 78194e4dde..8fc5e0d65e 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/config/ClusterConfigShowReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/config/ClusterConfigShowReplCommand.java
@@ -24,7 +24,6 @@ import org.apache.ignite.cli.commands.BaseCommand;
 import org.apache.ignite.cli.commands.cluster.ClusterUrlMixin;
 import org.apache.ignite.cli.commands.questions.ConnectToClusterQuestion;
 import 
org.apache.ignite.cli.core.exception.handler.ClusterNotInitializedExceptionHandler;
-import org.apache.ignite.cli.core.flow.Flowable;
 import org.apache.ignite.cli.core.flow.builder.Flows;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Mixin;
@@ -56,12 +55,11 @@ public class ClusterConfigShowReplCommand extends 
BaseCommand implements Runnabl
     @Override
     public void run() {
         question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
-                .exceptionHandler(new 
ClusterNotInitializedExceptionHandler("Cannot show cluster config", "cluster 
init"))
                 .map(this::configShowCallInput)
                 .then(Flows.fromCall(call))
-                .toOutput(spec.commandLine().getOut(), 
spec.commandLine().getErr())
-                .build()
-                .start(Flowable.empty());
+                .exceptionHandler(new 
ClusterNotInitializedExceptionHandler("Cannot show cluster config", "cluster 
init"))
+                .print()
+                .start();
     }
 
     private ClusterConfigShowCallInput configShowCallInput(String clusterUrl) {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/config/ClusterConfigUpdateReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/config/ClusterConfigUpdateReplCommand.java
index db365f8be8..09772a536f 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/config/ClusterConfigUpdateReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/config/ClusterConfigUpdateReplCommand.java
@@ -24,7 +24,6 @@ import org.apache.ignite.cli.commands.BaseCommand;
 import org.apache.ignite.cli.commands.cluster.ClusterUrlMixin;
 import org.apache.ignite.cli.commands.questions.ConnectToClusterQuestion;
 import 
org.apache.ignite.cli.core.exception.handler.ClusterNotInitializedExceptionHandler;
-import org.apache.ignite.cli.core.flow.Flowable;
 import org.apache.ignite.cli.core.flow.builder.Flows;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Mixin;
@@ -53,12 +52,11 @@ public class ClusterConfigUpdateReplCommand extends 
BaseCommand implements Runna
     @Override
     public void run() {
         question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
-                .exceptionHandler(new 
ClusterNotInitializedExceptionHandler("Cannot update cluster config", "cluster 
init"))
                 .map(this::configUpdateCallInput)
                 .then(Flows.fromCall(call))
-                .toOutput(spec.commandLine().getOut(), 
spec.commandLine().getErr())
-                .build()
-                .start(Flowable.empty());
+                .exceptionHandler(new 
ClusterNotInitializedExceptionHandler("Cannot update cluster config", "cluster 
init"))
+                .print()
+                .start();
     }
 
     private ClusterConfigUpdateCallInput configUpdateCallInput(String 
clusterUrl) {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/init/ClusterInitReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/init/ClusterInitReplCommand.java
index 4db8550579..a186d6a0fc 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/init/ClusterInitReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/init/ClusterInitReplCommand.java
@@ -17,17 +17,15 @@
 
 package org.apache.ignite.cli.commands.cluster.init;
 
-import static 
org.apache.ignite.cli.core.style.component.CommonMessages.CONNECT_OR_USE_CLUSTER_URL_MESSAGE;
 import static picocli.CommandLine.Command;
 
 import jakarta.inject.Inject;
 import org.apache.ignite.cli.call.cluster.ClusterInitCall;
 import org.apache.ignite.cli.call.cluster.ClusterInitCallInput;
-import 
org.apache.ignite.cli.call.cluster.ClusterInitCallInput.ClusterInitCallInputBuilder;
 import org.apache.ignite.cli.commands.BaseCommand;
 import org.apache.ignite.cli.commands.cluster.ClusterUrlMixin;
-import org.apache.ignite.cli.core.call.CallExecutionPipeline;
-import org.apache.ignite.cli.core.repl.Session;
+import org.apache.ignite.cli.commands.questions.ConnectToClusterQuestion;
+import org.apache.ignite.cli.core.flow.builder.Flows;
 import picocli.CommandLine.Mixin;
 
 /**
@@ -46,31 +44,22 @@ public class ClusterInitReplCommand extends BaseCommand 
implements Runnable {
     private ClusterInitCall call;
 
     @Inject
-    private Session session;
+    private ConnectToClusterQuestion question;
 
     /** {@inheritDoc} */
     @Override
     public void run() {
-        ClusterInitCallInputBuilder input = buildCallInput();
-
-        if (session.isConnectedToNode()) {
-            input.clusterUrl(session.nodeUrl());
-        } else if (clusterUrl.getClusterUrl() != null) {
-            input.clusterUrl(clusterUrl.getClusterUrl());
-        } else {
-            
spec.commandLine().getErr().println(CONNECT_OR_USE_CLUSTER_URL_MESSAGE.render());
-            return;
-        }
-
-        CallExecutionPipeline.builder(call)
-                .inputProvider(input::build)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .build()
-                .runPipeline();
+        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+                .map(this::buildCallInput)
+                .then(Flows.fromCall(call))
+                .print()
+                .start();
     }
 
-    private ClusterInitCallInputBuilder buildCallInput() {
-        return 
ClusterInitCallInput.builder().fromClusterInitOptions(clusterInitOptions);
+    private ClusterInitCallInput buildCallInput(String clusterUrl) {
+        return ClusterInitCallInput.builder()
+                .clusterUrl(clusterUrl)
+                .fromClusterInitOptions(clusterInitOptions)
+                .build();
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/status/ClusterStatusCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/status/ClusterStatusCommand.java
index 4e32359e1b..aef83ec43b 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/status/ClusterStatusCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/status/ClusterStatusCommand.java
@@ -23,7 +23,7 @@ import 
org.apache.ignite.cli.call.cluster.status.ClusterStatusCall;
 import org.apache.ignite.cli.commands.BaseCommand;
 import org.apache.ignite.cli.commands.cluster.ClusterUrlProfileMixin;
 import org.apache.ignite.cli.core.call.CallExecutionPipeline;
-import org.apache.ignite.cli.core.call.StatusCallInput;
+import org.apache.ignite.cli.core.call.UrlCallInput;
 import org.apache.ignite.cli.decorators.ClusterStatusDecorator;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Mixin;
@@ -40,13 +40,13 @@ public class ClusterStatusCommand extends BaseCommand 
implements Callable<Intege
     private ClusterUrlProfileMixin clusterUrl;
 
     @Inject
-    private ClusterStatusCall clusterStatusCall;
+    private ClusterStatusCall call;
 
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(clusterStatusCall)
-                .inputProvider(() -> new 
StatusCallInput(clusterUrl.getClusterUrl()))
+        return CallExecutionPipeline.builder(call)
+                .inputProvider(() -> new 
UrlCallInput(clusterUrl.getClusterUrl()))
                 .output(spec.commandLine().getOut())
                 .errOutput(spec.commandLine().getErr())
                 .decorator(new ClusterStatusDecorator())
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/status/ClusterStatusReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/status/ClusterStatusReplCommand.java
index 06581a9b64..ec42e7c00b 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/status/ClusterStatusReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/cluster/status/ClusterStatusReplCommand.java
@@ -17,16 +17,13 @@
 
 package org.apache.ignite.cli.commands.cluster.status;
 
-import static 
org.apache.ignite.cli.core.style.component.CommonMessages.CONNECT_OR_USE_CLUSTER_URL_MESSAGE;
-
 import jakarta.inject.Inject;
 import org.apache.ignite.cli.call.cluster.status.ClusterStatusCall;
 import org.apache.ignite.cli.commands.BaseCommand;
 import org.apache.ignite.cli.commands.cluster.ClusterUrlMixin;
-import org.apache.ignite.cli.core.call.CallExecutionPipeline;
-import org.apache.ignite.cli.core.call.StatusCallInput;
-import org.apache.ignite.cli.core.repl.Session;
-import org.apache.ignite.cli.decorators.ClusterStatusDecorator;
+import org.apache.ignite.cli.commands.questions.ConnectToClusterQuestion;
+import org.apache.ignite.cli.core.call.UrlCallInput;
+import org.apache.ignite.cli.core.flow.builder.Flows;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Mixin;
 
@@ -40,31 +37,18 @@ public class ClusterStatusReplCommand extends BaseCommand 
implements Runnable {
     private ClusterUrlMixin clusterUrl;
 
     @Inject
-    private ClusterStatusCall clusterStatusReplCall;
+    private ClusterStatusCall call;
 
     @Inject
-    private Session session;
+    private ConnectToClusterQuestion question;
 
     /** {@inheritDoc} */
     @Override
     public void run() {
-        String inputUrl;
-
-        if (clusterUrl.getClusterUrl() != null) {
-            inputUrl = clusterUrl.getClusterUrl();
-        } else if (session.isConnectedToNode()) {
-            inputUrl = session.nodeUrl();
-        } else {
-            
spec.commandLine().getErr().println(CONNECT_OR_USE_CLUSTER_URL_MESSAGE.render());
-            return;
-        }
-
-        CallExecutionPipeline.builder(clusterStatusReplCall)
-                .inputProvider(() -> new StatusCallInput(inputUrl))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .decorator(new ClusterStatusDecorator())
-                .build()
-                .runPipeline();
+        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+                .map(UrlCallInput::new)
+                .then(Flows.fromCall(call))
+                .print()
+                .start();
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/config/NodeConfigShowReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/config/NodeConfigShowReplCommand.java
index 77f03d4171..8e281aae7a 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/config/NodeConfigShowReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/config/NodeConfigShowReplCommand.java
@@ -23,7 +23,6 @@ import 
org.apache.ignite.cli.call.configuration.NodeConfigShowCallInput;
 import org.apache.ignite.cli.commands.BaseCommand;
 import org.apache.ignite.cli.commands.node.NodeUrlMixin;
 import org.apache.ignite.cli.commands.questions.ConnectToClusterQuestion;
-import org.apache.ignite.cli.core.flow.Flowable;
 import org.apache.ignite.cli.core.flow.builder.Flows;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Mixin;
@@ -54,9 +53,8 @@ public class NodeConfigShowReplCommand extends BaseCommand 
implements Runnable {
         question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
                 .map(this::nodeConfigShowCallInput)
                 .then(Flows.fromCall(call))
-                .toOutput(spec.commandLine().getOut(), 
spec.commandLine().getErr())
-                .build()
-                .start(Flowable.empty());
+                .print()
+                .start();
     }
 
     private NodeConfigShowCallInput nodeConfigShowCallInput(String nodeUrl) {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/config/NodeConfigUpdateReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/config/NodeConfigUpdateReplCommand.java
index 4c786be75f..64952fde92 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/config/NodeConfigUpdateReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/config/NodeConfigUpdateReplCommand.java
@@ -23,7 +23,6 @@ import 
org.apache.ignite.cli.call.configuration.NodeConfigUpdateCallInput;
 import org.apache.ignite.cli.commands.BaseCommand;
 import org.apache.ignite.cli.commands.node.NodeUrlMixin;
 import org.apache.ignite.cli.commands.questions.ConnectToClusterQuestion;
-import org.apache.ignite.cli.core.flow.Flowable;
 import org.apache.ignite.cli.core.flow.builder.Flows;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Mixin;
@@ -54,9 +53,8 @@ public class NodeConfigUpdateReplCommand extends BaseCommand 
implements Runnable
         question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
                 .map(this::nodeConfigUpdateCallInput)
                 .then(Flows.fromCall(call))
-                .toOutput(spec.commandLine().getOut(), 
spec.commandLine().getErr())
-                .build()
-                .start(Flowable.empty());
+                .print()
+                .start();
     }
 
     private NodeConfigUpdateCallInput nodeConfigUpdateCallInput(String 
nodeUrl) {
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/status/NodeStatusCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/status/NodeStatusCommand.java
index 7473e35b67..848c9eb47d 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/status/NodeStatusCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/status/NodeStatusCommand.java
@@ -23,7 +23,7 @@ import org.apache.ignite.cli.call.node.status.NodeStatusCall;
 import org.apache.ignite.cli.commands.BaseCommand;
 import org.apache.ignite.cli.commands.node.NodeUrlProfileMixin;
 import org.apache.ignite.cli.core.call.CallExecutionPipeline;
-import org.apache.ignite.cli.core.call.StatusCallInput;
+import org.apache.ignite.cli.core.call.UrlCallInput;
 import org.apache.ignite.cli.decorators.NodeStatusDecorator;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Mixin;
@@ -38,13 +38,13 @@ public class NodeStatusCommand extends BaseCommand 
implements Callable<Integer>
     private NodeUrlProfileMixin nodeUrl;
 
     @Inject
-    private NodeStatusCall nodeStatusCall;
+    private NodeStatusCall call;
 
     /** {@inheritDoc} */
     @Override
     public Integer call() {
-        return CallExecutionPipeline.builder(nodeStatusCall)
-                .inputProvider(() -> new StatusCallInput(nodeUrl.getNodeUrl()))
+        return CallExecutionPipeline.builder(call)
+                .inputProvider(() -> new UrlCallInput(nodeUrl.getNodeUrl()))
                 .output(spec.commandLine().getOut())
                 .errOutput(spec.commandLine().getErr())
                 .decorator(new NodeStatusDecorator())
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/status/NodeStatusReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/status/NodeStatusReplCommand.java
index 70a59894ee..eb6331be81 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/status/NodeStatusReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/node/status/NodeStatusReplCommand.java
@@ -17,17 +17,13 @@
 
 package org.apache.ignite.cli.commands.node.status;
 
-import static 
org.apache.ignite.cli.core.style.component.CommonMessages.CONNECT_OR_USE_NODE_URL_MESSAGE;
-
 import jakarta.inject.Inject;
-import java.util.concurrent.Callable;
 import org.apache.ignite.cli.call.node.status.NodeStatusCall;
 import org.apache.ignite.cli.commands.BaseCommand;
 import org.apache.ignite.cli.commands.node.NodeUrlMixin;
-import org.apache.ignite.cli.core.call.CallExecutionPipeline;
-import org.apache.ignite.cli.core.call.StatusCallInput;
-import org.apache.ignite.cli.core.repl.Session;
-import org.apache.ignite.cli.decorators.NodeStatusDecorator;
+import org.apache.ignite.cli.commands.questions.ConnectToClusterQuestion;
+import org.apache.ignite.cli.core.call.UrlCallInput;
+import org.apache.ignite.cli.core.flow.builder.Flows;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Mixin;
 
@@ -35,37 +31,24 @@ import picocli.CommandLine.Mixin;
  * Display the node status in REPL.
  */
 @Command(name = "status", description = "Prints status of the node")
-public class NodeStatusReplCommand extends BaseCommand implements 
Callable<Integer> {
+public class NodeStatusReplCommand extends BaseCommand implements Runnable {
     /** Node URL option. */
     @Mixin
     private NodeUrlMixin nodeUrl;
 
     @Inject
-    private NodeStatusCall nodeStatusCall;
+    private NodeStatusCall call;
 
     @Inject
-    private Session session;
+    private ConnectToClusterQuestion question;
 
     /** {@inheritDoc} */
     @Override
-    public Integer call() {
-        String inputUrl;
-
-        if (nodeUrl.getNodeUrl() != null) {
-            inputUrl = nodeUrl.getNodeUrl();
-        } else if (session.isConnectedToNode()) {
-            inputUrl = session.nodeUrl();
-        } else {
-            
spec.commandLine().getErr().println(CONNECT_OR_USE_NODE_URL_MESSAGE.render());
-            return 2;
-        }
-
-        return CallExecutionPipeline.builder(nodeStatusCall)
-                .inputProvider(() -> new StatusCallInput(inputUrl))
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .decorator(new NodeStatusDecorator())
-                .build()
-                .runPipeline();
+    public void run() {
+        question.askQuestionIfNotConnected(nodeUrl.getNodeUrl())
+                .map(UrlCallInput::new)
+                .then(Flows.fromCall(call))
+                .print()
+                .start();
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/questions/ConnectToClusterQuestion.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/questions/ConnectToClusterQuestion.java
index 00d6bfb369..145da5e0ea 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/questions/ConnectToClusterQuestion.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/questions/ConnectToClusterQuestion.java
@@ -29,7 +29,6 @@ import org.apache.ignite.cli.core.flow.Flowable;
 import org.apache.ignite.cli.core.flow.builder.FlowBuilder;
 import org.apache.ignite.cli.core.flow.builder.Flows;
 import org.apache.ignite.cli.core.repl.Session;
-import org.apache.ignite.cli.core.repl.context.CommandLineContextProvider;
 import org.apache.ignite.cli.core.style.component.QuestionUiComponent;
 import org.apache.ignite.cli.core.style.element.UiElements;
 
@@ -70,7 +69,7 @@ public class ConnectToClusterQuestion {
                 .ifThen(Objects::isNull, Flows.<String, 
ConnectCallInput>acceptQuestion(questionUiComponent,
                                 () -> new ConnectCallInput(defaultUrl))
                         .then(Flows.fromCall(connectCall))
-                        .toOutput(CommandLineContextProvider.getContext())
+                        .print()
                         .build())
                 .then(prevUrl -> {
                     // If inner flow from ifThen is interrupted we should 
interrupt outer flow as well.
@@ -108,9 +107,9 @@ public class ConnectToClusterQuestion {
 
         Flows.acceptQuestion(question, () -> new ConnectCallInput(clusterUrl))
                 .then(Flows.fromCall(connectCall))
-                .toOutput(CommandLineContextProvider.getContext())
+                .print()
                 .ifThen(s -> !Objects.equals(clusterUrl, defaultUrl) && 
session.isConnectedToNode(),
-                        
defaultUrlQuestion(clusterUrl).toOutput(CommandLineContextProvider.getContext()).build())
+                        defaultUrlQuestion(clusterUrl).print().build())
                 .build().start(Flowable.empty());
     }
 
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/LogicalTopologyCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/LogicalTopologyCommand.java
index aca36b43c6..8b1b31406f 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/LogicalTopologyCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/LogicalTopologyCommand.java
@@ -20,10 +20,10 @@ package org.apache.ignite.cli.commands.topology;
 import jakarta.inject.Inject;
 import java.util.concurrent.Callable;
 import org.apache.ignite.cli.call.cluster.topology.LogicalTopologyCall;
-import org.apache.ignite.cli.call.cluster.topology.TopologyCallInput;
 import org.apache.ignite.cli.commands.BaseCommand;
 import org.apache.ignite.cli.commands.cluster.ClusterUrlProfileMixin;
 import org.apache.ignite.cli.core.call.CallExecutionPipeline;
+import org.apache.ignite.cli.core.call.UrlCallInput;
 import 
org.apache.ignite.cli.core.exception.handler.ClusterNotInitializedExceptionHandler;
 import org.apache.ignite.cli.decorators.TopologyDecorator;
 import picocli.CommandLine.Command;
@@ -45,13 +45,13 @@ public class LogicalTopologyCommand extends BaseCommand 
implements Callable<Inte
     @Override
     public Integer call() {
         return CallExecutionPipeline.builder(call)
-                .inputProvider(() -> 
TopologyCallInput.builder().clusterUrl(clusterUrl.getClusterUrl()).build())
+                .inputProvider(() -> new 
UrlCallInput(clusterUrl.getClusterUrl()))
                 .output(spec.commandLine().getOut())
                 .errOutput(spec.commandLine().getErr())
+                .decorator(new TopologyDecorator())
                 .exceptionHandler(new ClusterNotInitializedExceptionHandler(
                         "Cannot show logical topology", "ignite cluster init"
                 ))
-                .decorator(new TopologyDecorator())
                 .build()
                 .runPipeline();
     }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/LogicalTopologyReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/LogicalTopologyReplCommand.java
index cc9a624a38..dd51bf9091 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/LogicalTopologyReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/LogicalTopologyReplCommand.java
@@ -17,18 +17,14 @@
 
 package org.apache.ignite.cli.commands.topology;
 
-import static 
org.apache.ignite.cli.core.style.component.CommonMessages.CONNECT_OR_USE_NODE_URL_MESSAGE;
-
 import jakarta.inject.Inject;
-import java.util.concurrent.Callable;
 import org.apache.ignite.cli.call.cluster.topology.LogicalTopologyCall;
-import org.apache.ignite.cli.call.cluster.topology.TopologyCallInput;
-import 
org.apache.ignite.cli.call.cluster.topology.TopologyCallInput.TopologyCallInputBuilder;
 import org.apache.ignite.cli.commands.BaseCommand;
 import org.apache.ignite.cli.commands.cluster.ClusterUrlMixin;
-import org.apache.ignite.cli.core.call.CallExecutionPipeline;
+import org.apache.ignite.cli.commands.questions.ConnectToClusterQuestion;
+import org.apache.ignite.cli.core.call.UrlCallInput;
 import 
org.apache.ignite.cli.core.exception.handler.ClusterNotInitializedExceptionHandler;
-import org.apache.ignite.cli.core.repl.Session;
+import org.apache.ignite.cli.core.flow.builder.Flows;
 import org.apache.ignite.cli.decorators.TopologyDecorator;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Mixin;
@@ -37,7 +33,7 @@ import picocli.CommandLine.Mixin;
  * Command that show logical cluster topology in REPL mode.
  */
 @Command(name = "logical")
-public class LogicalTopologyReplCommand extends BaseCommand implements 
Callable<Integer> {
+public class LogicalTopologyReplCommand extends BaseCommand implements 
Runnable {
     /** Cluster endpoint URL option. */
     @Mixin
     private ClusterUrlMixin clusterUrl;
@@ -46,29 +42,16 @@ public class LogicalTopologyReplCommand extends BaseCommand 
implements Callable<
     private LogicalTopologyCall call;
 
     @Inject
-    private Session session;
+    private ConnectToClusterQuestion question;
 
     /** {@inheritDoc} */
     @Override
-    public Integer call() {
-        TopologyCallInputBuilder inputBuilder = TopologyCallInput.builder();
-
-        if (clusterUrl.getClusterUrl() != null) {
-            inputBuilder.clusterUrl(clusterUrl.getClusterUrl());
-        } else if (session.isConnectedToNode()) {
-            inputBuilder.clusterUrl(session.nodeUrl());
-        } else {
-            
spec.commandLine().getErr().println(CONNECT_OR_USE_NODE_URL_MESSAGE.render());
-            return 2;
-        }
-
-        return CallExecutionPipeline.builder(call)
-                .inputProvider(inputBuilder::build)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .decorator(new TopologyDecorator())
+    public void run() {
+        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+                .map(UrlCallInput::new)
+                .then(Flows.fromCall(call))
                 .exceptionHandler(new 
ClusterNotInitializedExceptionHandler("Cannot show logical topology", "cluster 
init"))
-                .build()
-                .runPipeline();
+                .print(new TopologyDecorator())
+                .start();
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/PhysicalTopologyCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/PhysicalTopologyCommand.java
index 6051516903..9ed92adc25 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/PhysicalTopologyCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/PhysicalTopologyCommand.java
@@ -20,10 +20,10 @@ package org.apache.ignite.cli.commands.topology;
 import jakarta.inject.Inject;
 import java.util.concurrent.Callable;
 import org.apache.ignite.cli.call.cluster.topology.PhysicalTopologyCall;
-import org.apache.ignite.cli.call.cluster.topology.TopologyCallInput;
 import org.apache.ignite.cli.commands.BaseCommand;
 import org.apache.ignite.cli.commands.cluster.ClusterUrlProfileMixin;
 import org.apache.ignite.cli.core.call.CallExecutionPipeline;
+import org.apache.ignite.cli.core.call.UrlCallInput;
 import org.apache.ignite.cli.decorators.TopologyDecorator;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Mixin;
@@ -44,7 +44,7 @@ public class PhysicalTopologyCommand extends BaseCommand 
implements Callable<Int
     @Override
     public Integer call() {
         return CallExecutionPipeline.builder(call)
-                .inputProvider(() -> 
TopologyCallInput.builder().clusterUrl(clusterUrl.getClusterUrl()).build())
+                .inputProvider(() -> new 
UrlCallInput(clusterUrl.getClusterUrl()))
                 .output(spec.commandLine().getOut())
                 .errOutput(spec.commandLine().getErr())
                 .decorator(new TopologyDecorator())
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/PhysicalTopologyReplCommand.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/PhysicalTopologyReplCommand.java
index 6d366228d0..2f1c6df5dd 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/PhysicalTopologyReplCommand.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/commands/topology/PhysicalTopologyReplCommand.java
@@ -17,17 +17,13 @@
 
 package org.apache.ignite.cli.commands.topology;
 
-import static 
org.apache.ignite.cli.core.style.component.CommonMessages.CONNECT_OR_USE_NODE_URL_MESSAGE;
-
 import jakarta.inject.Inject;
-import java.util.concurrent.Callable;
 import org.apache.ignite.cli.call.cluster.topology.PhysicalTopologyCall;
-import org.apache.ignite.cli.call.cluster.topology.TopologyCallInput;
-import 
org.apache.ignite.cli.call.cluster.topology.TopologyCallInput.TopologyCallInputBuilder;
 import org.apache.ignite.cli.commands.BaseCommand;
 import org.apache.ignite.cli.commands.cluster.ClusterUrlMixin;
-import org.apache.ignite.cli.core.call.CallExecutionPipeline;
-import org.apache.ignite.cli.core.repl.Session;
+import org.apache.ignite.cli.commands.questions.ConnectToClusterQuestion;
+import org.apache.ignite.cli.core.call.UrlCallInput;
+import org.apache.ignite.cli.core.flow.builder.Flows;
 import org.apache.ignite.cli.decorators.TopologyDecorator;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Mixin;
@@ -36,7 +32,7 @@ import picocli.CommandLine.Mixin;
  * Command that show physical cluster topology in REPL mode.
  */
 @Command(name = "physical")
-public class PhysicalTopologyReplCommand extends BaseCommand implements 
Callable<Integer> {
+public class PhysicalTopologyReplCommand extends BaseCommand implements 
Runnable {
     /** Cluster endpoint URL option. */
     @Mixin
     private ClusterUrlMixin clusterUrl;
@@ -45,28 +41,15 @@ public class PhysicalTopologyReplCommand extends 
BaseCommand implements Callable
     private PhysicalTopologyCall call;
 
     @Inject
-    private Session session;
+    private ConnectToClusterQuestion question;
 
     /** {@inheritDoc} */
     @Override
-    public Integer call() {
-        TopologyCallInputBuilder inputBuilder = TopologyCallInput.builder();
-
-        if (clusterUrl.getClusterUrl() != null) {
-            inputBuilder.clusterUrl(clusterUrl.getClusterUrl());
-        } else if (session.isConnectedToNode()) {
-            inputBuilder.clusterUrl(session.nodeUrl());
-        } else {
-            
spec.commandLine().getErr().println(CONNECT_OR_USE_NODE_URL_MESSAGE.render());
-            return 2;
-        }
-
-        return CallExecutionPipeline.builder(call)
-                .inputProvider(inputBuilder::build)
-                .output(spec.commandLine().getOut())
-                .errOutput(spec.commandLine().getErr())
-                .decorator(new TopologyDecorator())
-                .build()
-                .runPipeline();
+    public void run() {
+        question.askQuestionIfNotConnected(clusterUrl.getClusterUrl())
+                .map(UrlCallInput::new)
+                .then(Flows.fromCall(call))
+                .print(new TopologyDecorator())
+                .start();
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/call/DefaultCallOutput.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/call/DefaultCallOutput.java
index 52779ff14f..b522726343 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/call/DefaultCallOutput.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/call/DefaultCallOutput.java
@@ -121,7 +121,7 @@ public class DefaultCallOutput<T> implements CallOutput<T> {
     }
 
     /**
-     * New empty coll output.
+     * New empty call output.
      *
      * @return Empty call output.
      */
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/call/StatusCallInput.java
 b/modules/cli/src/main/java/org/apache/ignite/cli/core/call/UrlCallInput.java
similarity index 67%
rename from 
modules/cli/src/main/java/org/apache/ignite/cli/core/call/StatusCallInput.java
rename to 
modules/cli/src/main/java/org/apache/ignite/cli/core/call/UrlCallInput.java
index 8d2dad95e1..76cecea224 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/call/StatusCallInput.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/call/UrlCallInput.java
@@ -17,18 +17,25 @@
 
 package org.apache.ignite.cli.core.call;
 
-/**
- * Input for status call.
- */
-public class StatusCallInput implements CallInput {
-    private final String clusterUrl;
+/** Input for executing commands with URL arguments. */
+public class UrlCallInput implements CallInput {
+    private final String url;
 
-    public StatusCallInput(String clusterUrl) {
-        this.clusterUrl = clusterUrl;
+    /**
+     * Constructor with specified URL.
+     *
+     * @param url URL input
+     */
+    public UrlCallInput(String url) {
+        this.url = url;
     }
 
-    public String getClusterUrl() {
-        return clusterUrl;
+    /**
+     * Argument getter.
+     *
+     * @return URL argument
+     */
+    public String getUrl() {
+        return url;
     }
-
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/exception/ExceptionWriter.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/exception/ExceptionWriter.java
index f387685934..b95e135895 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/exception/ExceptionWriter.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/exception/ExceptionWriter.java
@@ -37,7 +37,11 @@ public interface ExceptionWriter {
      * @return {@link ExceptionWriter} instance based on {@param pw}.
      */
     static ExceptionWriter fromPrintWriter(PrintWriter pw) {
-        return pw::println;
+        if (pw != null) {
+            return pw::println;
+        } else {
+            return nullWriter();
+        }
     }
 
 
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/Flowable.java 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/Flowable.java
index 4ab744cd8a..cac7301f26 100644
--- a/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/Flowable.java
+++ b/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/Flowable.java
@@ -111,7 +111,7 @@ public interface Flowable<T> {
     }
 
     /**
-     * New empty coll output.
+     * New empty call output.
      *
      * @return Empty call output.
      */
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/builder/FlowBuilder.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/builder/FlowBuilder.java
index 440c10f7c4..5ae4c490c0 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/builder/FlowBuilder.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/builder/FlowBuilder.java
@@ -17,14 +17,14 @@
 
 package org.apache.ignite.cli.core.flow.builder;
 
-import java.io.PrintWriter;
 import java.util.List;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import org.apache.ignite.cli.core.decorator.Decorator;
+import org.apache.ignite.cli.core.decorator.TerminalOutput;
 import org.apache.ignite.cli.core.exception.ExceptionHandler;
 import org.apache.ignite.cli.core.flow.Flow;
 import org.apache.ignite.cli.core.flow.question.QuestionAnswer;
-import org.apache.ignite.cli.core.repl.context.CommandLineContext;
 
 /**
  * Builder of {@link Flow}.
@@ -34,25 +34,91 @@ import 
org.apache.ignite.cli.core.repl.context.CommandLineContext;
  */
 public interface FlowBuilder<I, O>  {
 
+    /**
+     * Appends flow to this builder.
+     *
+     * @param flow flow to append
+     * @param <OT> output type of appended flow
+     * @return instance of builder with appended flow
+     */
     <OT> FlowBuilder<I, OT> then(Flow<O, OT> flow);
 
+    /**
+     * Transforms current flow result.
+     *
+     * @param mapper function to transform the result of the current flow
+     * @param <OT> output type of transformation function
+     * @return instance of builder with transform
+     */
     default <OT> FlowBuilder<I, OT> map(Function<O, OT> mapper) {
         return then(Flows.mono(mapper));
     }
 
+    /**
+     * Appends the flow to this builder if the result of the current flow 
matches the predicate.
+     *
+     * @param tester predicate to test
+     * @param flow flow to append
+     * @param <OT> output type of appended flow
+     * @return instance of builder
+     */
     <OT> FlowBuilder<I, O> ifThen(Predicate<O> tester, Flow<O, OT> flow);
 
+    /**
+     * Appends the flow which will ask a question based on the result of the 
current flow and return the question answer.
+     *
+     * @param questionText text to display as a question
+     * @param answers list of answers
+     * @param <QT> type of the answer
+     * @return instance of builder
+     */
     <QT> FlowBuilder<I, QT> question(String questionText, 
List<QuestionAnswer<O, QT>> answers);
 
+    /**
+     * Appends the flow which will ask a question based on the result of the 
current flow and return the question answer.
+     *
+     * @param questionText function which takes a result of the current flow 
and returns a question text
+     * @param answers list of answers
+     * @param <QT> type of the answer
+     * @return instance of builder
+     */
     <QT> FlowBuilder<I, QT> question(Function<O, String> questionText, 
List<QuestionAnswer<O, QT>> answers);
 
+    /**
+     * Adds exception handler to the flow chain which will be called during 
print operation if flow resulted in error.
+     *
+     * @param exceptionHandler exception handler
+     * @return instance of builder
+     */
     FlowBuilder<I, O> exceptionHandler(ExceptionHandler<?> exceptionHandler);
 
-    FlowBuilder<I, O> toOutput(PrintWriter output, PrintWriter errorOutput);
+    /**
+     * Appends print operation which will print the result of the current flow 
using provided {@code decorator} or call the exception
+     * handler.
+     *
+     * @param decorator output decorator
+     * @return instance of builder
+     */
+    FlowBuilder<I, O> print(Decorator<O, TerminalOutput> decorator);
 
-    default FlowBuilder<I, O> toOutput(CommandLineContext context) {
-        return toOutput(context.out(), context.err());
-    }
+    /**
+     * Appends print operation which will print the result of the current flow 
using decorator found in registry or call the exception
+     * handler.
+     *
+     * @return instance of builder
+     */
+    FlowBuilder<I, O> print();
 
+    /**
+     * Builds the flow from the builder.
+     *
+     * @return resulting flow
+     */
     Flow<I, O> build();
+
+    /**
+     * Convenience method which is equivalent to the {@code 
build().start(Flowable.empty())}. It builds the flow and starts it with the
+     * empty input.
+     */
+    void start();
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/builder/FlowBuilderImpl.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/builder/FlowBuilderImpl.java
index ffd8559f42..c8771b4c21 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/builder/FlowBuilderImpl.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/builder/FlowBuilderImpl.java
@@ -21,7 +21,9 @@ import java.io.PrintWriter;
 import java.util.List;
 import java.util.function.Function;
 import java.util.function.Predicate;
+import org.apache.ignite.cli.core.decorator.Decorator;
 import org.apache.ignite.cli.core.decorator.DecoratorRegistry;
+import org.apache.ignite.cli.core.decorator.TerminalOutput;
 import org.apache.ignite.cli.core.exception.ExceptionHandler;
 import org.apache.ignite.cli.core.exception.ExceptionHandlers;
 import org.apache.ignite.cli.core.exception.ExceptionWriter;
@@ -42,8 +44,8 @@ import 
org.apache.ignite.cli.decorators.DefaultDecoratorRegistry;
  */
 public class FlowBuilderImpl<I, O> implements FlowBuilder<I, O> {
     private final Flow<I, O> flow;
-    private final ExceptionHandlers exceptionHandlers = new 
DefaultExceptionHandlers();
-    private final DecoratorRegistry decoratorRegistry = new 
DefaultDecoratorRegistry();
+    private final ExceptionHandlers exceptionHandlers;
+    private final DecoratorRegistry decoratorRegistry;
 
     FlowBuilderImpl(Flow<I, O> flow) {
         this(flow, new DefaultExceptionHandlers(), new 
DefaultDecoratorRegistry());
@@ -56,10 +58,10 @@ public class FlowBuilderImpl<I, O> implements 
FlowBuilder<I, O> {
      * @param exceptionHandlers exception handlers.
      * @param decoratorRegistry decorator registry.
      */
-    public FlowBuilderImpl(Flow<I, O> flow, ExceptionHandlers 
exceptionHandlers, DecoratorRegistry decoratorRegistry) {
+    private FlowBuilderImpl(Flow<I, O> flow, ExceptionHandlers 
exceptionHandlers, DecoratorRegistry decoratorRegistry) {
         this.flow = flow;
-        this.exceptionHandlers.addExceptionHandlers(exceptionHandlers);
-        this.decoratorRegistry.addAll(decoratorRegistry);
+        this.exceptionHandlers = exceptionHandlers;
+        this.decoratorRegistry = decoratorRegistry;
     }
 
     @Override
@@ -69,26 +71,24 @@ public class FlowBuilderImpl<I, O> implements 
FlowBuilder<I, O> {
 
     @Override
     public <OT> FlowBuilder<I, O> ifThen(Predicate<O> tester, Flow<O, OT> 
flow) {
-        return new FlowBuilderImpl<>(this.flow.composite(input -> {
+        return then(input -> {
             if (tester.test(input.value())) {
                 flow.start(input);
             }
             return input;
-        }), exceptionHandlers, decoratorRegistry);
+        });
     }
 
     @Override
     public <QT> FlowBuilder<I, QT> question(String questionText, 
List<QuestionAnswer<O, QT>> questionAnswers) {
-        return new FlowBuilderImpl<>(flow.composite(input -> Flowable.success(
-                
QuestionAskerFactory.newQuestionAsker().askQuestion(questionText, 
input.value(), questionAnswers))),
-                exceptionHandlers, decoratorRegistry);
+        return then(input -> Flowable.success(
+                
QuestionAskerFactory.newQuestionAsker().askQuestion(questionText, 
input.value(), questionAnswers)));
     }
 
     @Override
     public <QT> FlowBuilder<I, QT> question(Function<O, String> questionText, 
List<QuestionAnswer<O, QT>> answers) {
-        return new FlowBuilderImpl<>(flow.composite(input -> Flowable.success(
-                
QuestionAskerFactory.newQuestionAsker().askQuestion(questionText.apply(input.value()),
 input.value(), answers))),
-                exceptionHandlers, decoratorRegistry);
+        return then(input -> Flowable.success(
+                
QuestionAskerFactory.newQuestionAsker().askQuestion(questionText.apply(input.value()),
 input.value(), answers)));
     }
 
     @Override
@@ -98,35 +98,59 @@ public class FlowBuilderImpl<I, O> implements 
FlowBuilder<I, O> {
     }
 
     @Override
-    public FlowBuilder<I, O> toOutput(PrintWriter output, PrintWriter 
errorOutput) {
-        return new FlowBuilderImpl<>(flow.composite(input -> {
-            if (input.hasResult()) {
-                // Workaround for the 
https://issues.apache.org/jira/browse/IGNITE-17346
-                // This will turn the tailtips off before printing
-                CommandLineContextProvider.print(() -> {
-                    String out = 
decoratorRegistry.getDecorator(input.type()).decorate(input.value()).toTerminalString();
-                    output.println(out);
-                });
-            } else if (input.hasError()) {
-                
exceptionHandlers.handleException(ExceptionWriter.fromPrintWriter(errorOutput), 
input.errorCause());
-                return Flowable.empty();
-            }
-            return input;
-        }));
+    public FlowBuilder<I, O> print(Decorator<O, TerminalOutput> decorator) {
+        return then(input -> printResult(input, type -> decorator));
+    }
+
+    @Override
+    public FlowBuilder<I, O> print() {
+        return then(input -> printResult(input, 
decoratorRegistry::getDecorator));
     }
 
     @Override
     public Flow<I, O> build() {
-        return input -> {
-            try {
-                Flowable<O> output = flow.start(input);
-                if (output.hasError()) {
-                    exceptionHandlers.handleException(output.errorCause());
-                }
-                return output;
-            } catch (FlowInterruptException e) {
-                return Flowable.empty();
-            }
-        };
+        return this::run;
+    }
+
+    @Override
+    public void start() {
+        run(Flowable.empty());
+    }
+
+    /**
+     * Flow method which starts current flow and returns its result or empty 
output if flow is interrupted.
+     *
+     * @param input input flowable
+     * @return output flowable
+     */
+    private Flowable<O> run(Flowable<I> input) {
+        try {
+            return flow.start(input);
+        } catch (FlowInterruptException e) {
+            return Flowable.empty();
+        }
+    }
+
+    /**
+     * Flow method which will print the decorated result of the {@code input} 
to the output provided by the context
+     * or handle the exception using the error output from the context.
+
+     * @param input input flowable
+     * @return input flowable
+     */
+    private Flowable<O> printResult(Flowable<O> input, Function<Class<O>, 
Decorator<O, TerminalOutput>> decoratorProvider) {
+        if (input.hasResult()) {
+            // Workaround for the 
https://issues.apache.org/jira/browse/IGNITE-17346
+            // This will turn the tailtips off before printing
+            CommandLineContextProvider.print(() -> {
+                String out = 
decoratorProvider.apply(input.type()).decorate(input.value()).toTerminalString();
+                PrintWriter output = 
CommandLineContextProvider.getContext().out();
+                output.println(out);
+            });
+        } else if (input.hasError()) {
+            PrintWriter errOutput = 
CommandLineContextProvider.getContext().err();
+            
exceptionHandlers.handleException(ExceptionWriter.fromPrintWriter(errOutput), 
input.errorCause());
+        }
+        return input;
     }
 }
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/builder/Flows.java 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/builder/Flows.java
index 9070efd5af..f1a9f26d07 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/builder/Flows.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/flow/builder/Flows.java
@@ -73,7 +73,7 @@ public final class Flows {
      * @return {@link FlowBuilder} which started from constant flow.
      */
     public static <T> FlowBuilder<Void, T> from(T value) {
-        return new FlowBuilderImpl<>(mono(unused -> value));
+        return from(unused -> value);
     }
 
     /**
@@ -137,7 +137,7 @@ public final class Flows {
                         List.of(new AcceptedQuestionAnswer<>((a, i) -> null),
                                 new InterruptQuestionAnswer<>())
                 )
-                .then(Flows.mono(unused -> onAccept.get()));
+                .then(mono(unused -> onAccept.get()));
     }
 
     /**
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/repl/context/CommandLineContextProvider.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/repl/context/CommandLineContextProvider.java
index eea15fc474..cd1f83e6a4 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/repl/context/CommandLineContextProvider.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/repl/context/CommandLineContextProvider.java
@@ -27,7 +27,7 @@ import picocli.CommandLine;
 //Tech Debt: IGNITE-17484
 public class CommandLineContextProvider {
 
-    private static volatile CommandLine cmd;
+    private static volatile CommandLineContext context;
 
     private static volatile Consumer<Runnable> printWrapper = Runnable::run;
 
@@ -37,7 +37,16 @@ public class CommandLineContextProvider {
      * @return context instance.
      */
     public static CommandLineContext getContext() {
-        return new CommandLineContext() {
+        return context;
+    }
+
+    /**
+     * Sets a context from {@link CommandLine} instance.
+     *
+     * @param cmd {@link CommandLine} instance
+     */
+    public static void setCmd(CommandLine cmd) {
+        context = new CommandLineContext() {
             @Override
             public PrintWriter out() {
                 return cmd.getOut();
@@ -50,8 +59,24 @@ public class CommandLineContextProvider {
         };
     }
 
-    public static void setCmd(CommandLine cmd) {
-        CommandLineContextProvider.cmd = cmd;
+    /**
+     * Sets a context from {@link PrintWriter}.
+     *
+     * @param out output writer
+     * @param err error output writer
+     */
+    public static void setWriters(PrintWriter out, PrintWriter err) {
+        context = new CommandLineContext() {
+            @Override
+            public PrintWriter out() {
+                return out;
+            }
+
+            @Override
+            public PrintWriter err() {
+                return err;
+            }
+        };
     }
 
     /**
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/style/component/CommonMessages.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/core/style/component/CommonMessages.java
deleted file mode 100644
index dae4fe5bfb..0000000000
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/core/style/component/CommonMessages.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cli.core.style.component;
-
-import org.apache.ignite.cli.core.style.element.UiElements;
-
-/** Common UI messages. */
-public class CommonMessages {
-    public static MessageUiComponent CONNECT_OR_USE_CLUSTER_URL_MESSAGE = 
MessageUiComponent.builder()
-            .message("You are not connected to node")
-            .hint("Run %s command or use %s option",
-                    UiElements.command("connect"), 
UiElements.option("--cluster-url"))
-            .build();
-
-    public static MessageUiComponent CONNECT_OR_USE_NODE_URL_MESSAGE = 
MessageUiComponent.builder()
-            .message("You are not connected to node")
-            .hint("Run %s command or use %s  option",
-                    UiElements.command("connect"), 
UiElements.option("--node-url"))
-            .build();
-}
diff --git 
a/modules/cli/src/main/java/org/apache/ignite/cli/decorators/DefaultDecoratorRegistry.java
 
b/modules/cli/src/main/java/org/apache/ignite/cli/decorators/DefaultDecoratorRegistry.java
index ef282949db..acaf221344 100644
--- 
a/modules/cli/src/main/java/org/apache/ignite/cli/decorators/DefaultDecoratorRegistry.java
+++ 
b/modules/cli/src/main/java/org/apache/ignite/cli/decorators/DefaultDecoratorRegistry.java
@@ -41,6 +41,5 @@ public class DefaultDecoratorRegistry extends 
DecoratorRegistry {
         add(SqlQueryResult.class, new SqlQueryResultDecorator());
         add(ClusterStatus.class, new ClusterStatusDecorator());
         add(NodeStatus.class, new NodeStatusDecorator());
-
     }
 }
diff --git 
a/modules/cli/src/test/java/org/apache/ignite/cli/call/node/status/NodeStatusCallTest.java
 
b/modules/cli/src/test/java/org/apache/ignite/cli/call/node/status/NodeStatusCallTest.java
index f865b4e2aa..eb4c06dbd3 100644
--- 
a/modules/cli/src/test/java/org/apache/ignite/cli/call/node/status/NodeStatusCallTest.java
+++ 
b/modules/cli/src/test/java/org/apache/ignite/cli/call/node/status/NodeStatusCallTest.java
@@ -28,7 +28,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.Arrays;
 import java.util.stream.Stream;
 import org.apache.ignite.cli.core.call.CallOutput;
-import org.apache.ignite.cli.core.call.StatusCallInput;
+import org.apache.ignite.cli.core.call.UrlCallInput;
 import org.apache.ignite.rest.client.model.NodeState;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
@@ -71,7 +71,7 @@ class NodeStatusCallTest {
         nodeState(givenNodeState);
 
         // When call node status
-        CallOutput<NodeStatus> output = call.execute(new StatusCallInput(url));
+        CallOutput<NodeStatus> output = call.execute(new UrlCallInput(url));
 
         // Then output is successful
         assertThat(output.hasError(), is(false));
diff --git 
a/modules/cli/src/test/java/org/apache/ignite/cli/commands/ProfileMixinTest.java
 
b/modules/cli/src/test/java/org/apache/ignite/cli/commands/ProfileMixinTest.java
index d6d81e9157..b9ae0f0606 100644
--- 
a/modules/cli/src/test/java/org/apache/ignite/cli/commands/ProfileMixinTest.java
+++ 
b/modules/cli/src/test/java/org/apache/ignite/cli/commands/ProfileMixinTest.java
@@ -34,7 +34,6 @@ import org.apache.ignite.cli.call.cluster.ClusterInitCall;
 import org.apache.ignite.cli.call.cluster.ClusterInitCallInput;
 import org.apache.ignite.cli.call.cluster.topology.LogicalTopologyCall;
 import org.apache.ignite.cli.call.cluster.topology.PhysicalTopologyCall;
-import org.apache.ignite.cli.call.cluster.topology.TopologyCallInput;
 import org.apache.ignite.cli.call.configuration.ClusterConfigShowCall;
 import org.apache.ignite.cli.call.configuration.ClusterConfigShowCallInput;
 import org.apache.ignite.cli.call.configuration.ClusterConfigUpdateCall;
@@ -47,7 +46,7 @@ import org.apache.ignite.cli.call.node.status.NodeStatusCall;
 import org.apache.ignite.cli.core.call.Call;
 import org.apache.ignite.cli.core.call.CallInput;
 import org.apache.ignite.cli.core.call.DefaultCallOutput;
-import org.apache.ignite.cli.core.call.StatusCallInput;
+import org.apache.ignite.cli.core.call.UrlCallInput;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -179,8 +178,8 @@ public class ProfileMixinTest {
                 arguments(
                         "node status",
                         NodeStatusCall.class,
-                        StatusCallInput.class,
-                        (Function<StatusCallInput, String>) 
StatusCallInput::getClusterUrl
+                        UrlCallInput.class,
+                        (Function<UrlCallInput, String>) UrlCallInput::getUrl
                 )
         );
     }
@@ -208,14 +207,14 @@ public class ProfileMixinTest {
                 arguments(
                         "cluster topology physical",
                         PhysicalTopologyCall.class,
-                        TopologyCallInput.class,
-                        (Function<TopologyCallInput, String>) 
TopologyCallInput::getClusterUrl
+                        UrlCallInput.class,
+                        (Function<UrlCallInput, String>) UrlCallInput::getUrl
                 ),
                 arguments(
                         "cluster topology logical",
                         LogicalTopologyCall.class,
-                        TopologyCallInput.class,
-                        (Function<TopologyCallInput, String>) 
TopologyCallInput::getClusterUrl
+                        UrlCallInput.class,
+                        (Function<UrlCallInput, String>) UrlCallInput::getUrl
                 )
         );
     }
diff --git 
a/modules/cli/src/test/java/org/apache/ignite/cli/commands/flow/FlowTest.java 
b/modules/cli/src/test/java/org/apache/ignite/cli/commands/flow/FlowTest.java
index 20896800b2..7c6007f140 100644
--- 
a/modules/cli/src/test/java/org/apache/ignite/cli/commands/flow/FlowTest.java
+++ 
b/modules/cli/src/test/java/org/apache/ignite/cli/commands/flow/FlowTest.java
@@ -21,7 +21,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.emptyString;
 import static org.hamcrest.Matchers.equalTo;
 
-import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
 import java.io.FileDescriptor;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -32,25 +31,27 @@ import java.nio.file.Path;
 import java.util.List;
 import org.apache.ignite.cli.core.flow.Flow;
 import org.apache.ignite.cli.core.flow.Flowable;
+import org.apache.ignite.cli.core.flow.builder.FlowBuilder;
 import org.apache.ignite.cli.core.flow.builder.Flows;
 import org.apache.ignite.cli.core.flow.question.JlineQuestionWriterReader;
 import org.apache.ignite.cli.core.flow.question.QuestionAnswer;
 import org.apache.ignite.cli.core.flow.question.QuestionAskerFactory;
+import org.apache.ignite.cli.core.repl.context.CommandLineContextProvider;
 import org.jline.reader.impl.LineReaderImpl;
 import org.jline.terminal.Terminal;
 import org.jline.terminal.impl.DumbTerminal;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
 
-@MicronautTest
 class FlowTest {
     private Terminal terminal;
 
     private Path input;
+    private StringWriter out;
+    private StringWriter errOut;
 
     @BeforeEach
     public void setup() throws IOException {
@@ -59,6 +60,9 @@ class FlowTest {
         terminal = new DumbTerminal(Files.newInputStream(input), new 
FileOutputStream(FileDescriptor.out));
         LineReaderImpl reader = new LineReaderImpl(terminal);
         QuestionAskerFactory.setReadWriter(new 
JlineQuestionWriterReader(reader));
+        out = new StringWriter();
+        errOut = new StringWriter();
+        CommandLineContextProvider.setWriters(new PrintWriter(out), new 
PrintWriter(errOut));
     }
 
     @AfterEach
@@ -100,75 +104,173 @@ class FlowTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17519";)
-    void printsToOutput() throws IOException {
+    @DisplayName("exceptionHandler -> then -> print")
+    void handlerThenPrint() throws IOException {
         // Given
         bindAnswers("no"); // we don't care about answer in this test
 
-        StringWriter out = new StringWriter();
-        PrintWriter output = new PrintWriter(out);
-        StringWriter errOut = new StringWriter();
-        PrintWriter errOutput = new PrintWriter(errOut);
-
         // When build flow
-        Flow<Object, String> build = Flows.question("Do you like this?",
-                        List.of(new QuestionAnswer<>("yes"::equals, (a, i) -> 
1),
-                                new QuestionAnswer<>("no"::equals, (a, i) -> 
2))
-                )
-                .map(String::valueOf)
+        Flow<Object, String> flow = askQuestion()
                 .exceptionHandler(new TestExceptionHandler())
                 .then(Flows.fromCall(new ThrowingStrCall(), StrCallInput::new))
-                .toOutput(output, errOutput)
+                .print()
                 .build();
 
         // Then the output is empty
         assertThat(errOut.toString(), emptyString());
 
         // When start flow
-        build.start(Flowable.empty());
+        flow.start(Flowable.empty());
 
         // Then output equals to the message from the exception because we use 
TestExceptionHandler
-        assertThat(errOut.toString(), equalTo("Ooops!")); // BUT there is the 
message taken from default exception handler
+        assertThat(errOut.toString(), equalTo("Ooops!" + 
System.lineSeparator()));
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-17519";)
-    void printsToOutputThatWorks() throws IOException {
+    @DisplayName("print -> then -> exceptionHandler")
+    void printThenHandler() throws IOException {
         // Given
         bindAnswers("no"); // we don't care about answer in this test
 
-        StringWriter out = new StringWriter();
-        PrintWriter output = new PrintWriter(out);
-        StringWriter errOut = new StringWriter();
-        PrintWriter errOutput = new PrintWriter(errOut);
+        // When build flow
+        Flow<Object, String> flow = askQuestion()
+                .print()
+                .then(Flows.fromCall(new ThrowingStrCall(), StrCallInput::new))
+                .exceptionHandler(new TestExceptionHandler())
+                .build();
+
+        // Then the output is empty
+        assertThat(errOut.toString(), emptyString());
+
+        // When start flow
+        flow.start(Flowable.empty());
+
+        // Then output is empty because print was used before the call
+        assertThat(errOut.toString(), emptyString());
+    }
+
+    @Test
+    @DisplayName("then -> exceptionHandler -> print")
+    void thenHandlerPrint() throws IOException {
+        // Given
+        bindAnswers("no"); // we don't care about answer in this test
 
         // When build flow
-        Flow<Object, String> build = Flows.question("Do you like this?",
-                        List.of(new QuestionAnswer<>("yes"::equals, (a, i) -> 
1),
-                                new QuestionAnswer<>("no"::equals, (a, i) -> 
2))
-                )
-                .map(String::valueOf)
-                .toOutput(output, errOutput)
+        Flow<Object, String> flow = askQuestion()
                 .then(Flows.fromCall(new ThrowingStrCall(), StrCallInput::new))
                 .exceptionHandler(new TestExceptionHandler())
+                .print()
                 .build();
 
         // Then the output is empty
         assertThat(errOut.toString(), emptyString());
 
         // When start flow
-        build.start(Flowable.empty());
+        flow.start(Flowable.empty());
 
         // Then output equals to the message from the exception because we use 
TestExceptionHandler
-        assertThat(errOut.toString(), equalTo("Ooops!")); // BUT it is empty
+        assertThat(errOut.toString(), equalTo("Ooops!" + 
System.lineSeparator()));
+    }
+
+    @Test
+    void multiplePrints() throws IOException {
+        // Given
+        bindAnswers("no");
+
+        // When build flow
+        Flow<Object, String> flow = askQuestion()
+                .print()
+                .print()
+                .build();
+
+        // Then the output is empty
+        assertThat(out.toString(), emptyString());
+        assertThat(errOut.toString(), emptyString());
+
+        // When start flow
+        flow.start(Flowable.empty());
+
+        // Then output equals to 2 messages from print operations
+        assertThat(out.toString(), equalTo("2" + System.lineSeparator()
+                        + "2" + System.lineSeparator()));
+        assertThat(errOut.toString(), emptyString());
+    }
+
+    @Test
+    void multiplePrintsWithError() throws IOException {
+        // Given
+        bindAnswers("no");
+
+        // When build flow
+        Flow<Object, String> flow = askQuestion()
+                .then(Flows.fromCall(new ThrowingStrCall(), StrCallInput::new))
+                .exceptionHandler(new TestExceptionHandler())
+                .print()
+                .print()
+                .build();
+
+        // Then the output is empty
+        assertThat(out.toString(), emptyString());
+        assertThat(errOut.toString(), emptyString());
+
+        // When start flow
+        flow.start(Flowable.empty());
+
+        // Then error output equals to 2 messages from exception handler
+        assertThat(out.toString(), emptyString());
+        assertThat(errOut.toString(), equalTo("Ooops!" + System.lineSeparator()
+                        + "Ooops!" + System.lineSeparator()));
+    }
+
+    @Test
+    void printAndStart() throws IOException {
+        // Given
+        bindAnswers("no");
+
+        // When start flow with print
+        askQuestion()
+                .print()
+                .start();
+
+        // Then error output equals to the message from answer
+        assertThat(out.toString(), equalTo("2" + System.lineSeparator()));
+        assertThat(errOut.toString(), emptyString());
+    }
+
+    @Test
+    void printAndStartError() throws IOException {
+        // Given
+        bindAnswers("no");
+
+        // When start flow with print
+        askQuestion()
+                .then(Flows.fromCall(new ThrowingStrCall(), StrCallInput::new))
+                .exceptionHandler(new TestExceptionHandler())
+                .print()
+                .start();
+
+        // Then error output equals to the message from exception
+        assertThat(out.toString(), emptyString());
+        assertThat(errOut.toString(), equalTo("Ooops!" + 
System.lineSeparator()));
+    }
+
+    @Test
+    void customDecorator() throws IOException {
+        // Given
+        bindAnswers("no");
+
+        // When start flow with print
+        askQuestion()
+                .print(data -> () -> "*" + data + "*")
+                .start();
+
+        // Then error output equals to the message from exception
+        assertThat(out.toString(), equalTo("*2*" + System.lineSeparator()));
+        assertThat(errOut.toString(), emptyString());
     }
 
     private static Flow<Object, Integer> createFlow() {
-        return Flows.question("Do you like this?",
-                        List.of(new QuestionAnswer<>("yes"::equals, (a, i) -> 
1),
-                                new QuestionAnswer<>("no"::equals, (a, i) -> 
2))
-                )
-                .map(String::valueOf)
+        return askQuestion()
                 .question(s -> "Here is your number " + s + ":, would you like 
to multiply it by 2?",
                         List.of(new QuestionAnswer<>("yes"::equals, (a, i) -> 
Integer.parseInt(i) * 2),
                                 new QuestionAnswer<>("no"::equals, (a, i) -> 
Integer.parseInt(i))))
@@ -177,6 +279,14 @@ class FlowTest {
                 .build();
     }
 
+    private static FlowBuilder<Object, String> askQuestion() {
+        return Flows.question("Do you like this?",
+                        List.of(new QuestionAnswer<>("yes"::equals, (a, i) -> 
1),
+                                new QuestionAnswer<>("no"::equals, (a, i) -> 
2))
+                )
+                .map(String::valueOf);
+    }
+
     private void bindAnswers(String... answers) throws IOException {
         Files.writeString(input, String.join("\n", answers) + "\n");
     }
diff --git 
a/modules/cli/src/test/java/org/apache/ignite/cli/commands/flow/TestExceptionHandler.java
 
b/modules/cli/src/test/java/org/apache/ignite/cli/commands/flow/TestExceptionHandler.java
index 4466a6d23d..a25d5732eb 100644
--- 
a/modules/cli/src/test/java/org/apache/ignite/cli/commands/flow/TestExceptionHandler.java
+++ 
b/modules/cli/src/test/java/org/apache/ignite/cli/commands/flow/TestExceptionHandler.java
@@ -17,20 +17,19 @@
 
 package org.apache.ignite.cli.commands.flow;
 
-import jdk.jshell.spi.ExecutionControl.RunException;
 import org.apache.ignite.cli.core.exception.ExceptionHandler;
 import org.apache.ignite.cli.core.exception.ExceptionWriter;
 
-class TestExceptionHandler implements ExceptionHandler<RunException> {
+class TestExceptionHandler implements ExceptionHandler<RuntimeException> {
     @Override
-    public int handle(ExceptionWriter err, RunException e) {
+    public int handle(ExceptionWriter err, RuntimeException e) {
         err.write(e.getMessage());
 
         return 0;
     }
 
     @Override
-    public Class<RunException> applicableException() {
-        return RunException.class;
+    public Class<RuntimeException> applicableException() {
+        return RuntimeException.class;
     }
 }

Reply via email to