This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8e37291b97 IGNITE-19108 Check connection during REPL session in CLI
(#2359)
8e37291b97 is described below
commit 8e37291b97f85d84b6a824549d92668692c43bd1
Author: Dmitry Baranov <[email protected]>
AuthorDate: Tue Aug 8 19:09:44 2023 +0300
IGNITE-19108 Check connection during REPL session in CLI (#2359)
* start connection heartbeat thread on 'connect' command
* stop heartbeat on 'disconnect'
* introduce event publish/subscribe
---
...liCommandTestNotInitializedIntegrationBase.java | 12 +-
.../cli/commands/ItConnectionHeartbeatTest.java | 142 +++++++++++++++++++++
.../repl/executor/ItIgnitePicocliCommandsTest.java | 7 +-
.../apache/ignite/internal/cli/ReplManager.java | 5 +
.../internal/cli/call/connect/ConnectCall.java | 19 ++-
.../internal/cli/call/connect/DisconnectCall.java | 11 +-
...SessionEventListener.java => ConnectEvent.java} | 26 +++-
.../cli/core/repl/ConnectionHeartBeat.java | 134 +++++++++++++++++++
...EventListener.java => ConnectionLostEvent.java} | 17 ++-
...tListener.java => ConnectionRestoredEvent.java} | 17 ++-
...sionEventListener.java => DisconnectEvent.java} | 17 ++-
.../core/repl/EventListeningActivationPoint.java | 91 +++++++++++++
.../cli/core/repl/PeriodicSessionTaskExecutor.java | 3 +-
.../apache/ignite/internal/cli/core/repl/Repl.java | 11 +-
.../ignite/internal/cli/core/repl/ReplBuilder.java | 16 ++-
.../ignite/internal/cli/core/repl/Session.java | 42 ++----
.../ignite/internal/cli/core/repl/SessionInfo.java | 47 ++++++-
.../cli/core/repl/executor/ReplExecutor.java | 2 +
.../cli/core/repl/prompt/ReplPromptProvider.java | 19 ++-
.../registry/impl/ClusterConfigRegistryImpl.java | 4 +-
.../repl/registry/impl/MetricRegistryImpl.java | 4 +-
.../repl/registry/impl/NodeConfigRegistryImpl.java | 4 +-
.../core/repl/registry/impl/UnitsRegistryImpl.java | 4 +-
.../cli/event/ConnectionEventListener.java | 65 ++++++++++
.../Event.java} | 15 +--
.../ignite/internal/cli/event/EventFactory.java | 84 ++++++++++++
.../EventListener.java} | 15 +--
.../EventPublisher.java} | 19 +--
.../EventSubscriptionManager.java} | 14 +-
.../EventType.java} | 18 +--
.../apache/ignite/internal/cli/event/Events.java | 67 ++++++++++
.../core/repl/SessionDefaultValueProviderTest.java | 6 +-
.../filter/DynamicCompleterFilterTest.java | 10 +-
33 files changed, 833 insertions(+), 134 deletions(-)
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/CliCommandTestNotInitializedIntegrationBase.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/CliCommandTestNotInitializedIntegrationBase.java
index 18951465a9..ccd0fa3930 100644
---
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/CliCommandTestNotInitializedIntegrationBase.java
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/CliCommandTestNotInitializedIntegrationBase.java
@@ -30,10 +30,12 @@ import
org.apache.ignite.internal.cli.commands.cliconfig.TestConfigManagerProvid
import org.apache.ignite.internal.cli.commands.node.NodeNameOrUrl;
import org.apache.ignite.internal.cli.config.ConfigDefaultValueProvider;
import org.apache.ignite.internal.cli.core.converters.NodeNameOrUrlConverter;
-import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.core.repl.EventListeningActivationPoint;
import
org.apache.ignite.internal.cli.core.repl.context.CommandLineContextProvider;
import org.apache.ignite.internal.cli.core.repl.registry.JdbcUrlRegistry;
import org.apache.ignite.internal.cli.core.repl.registry.NodeNameRegistry;
+import org.apache.ignite.internal.cli.event.EventPublisher;
+import org.apache.ignite.internal.cli.event.Events;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -73,7 +75,10 @@ public class CliCommandTestNotInitializedIntegrationBase
extends CliIntegrationT
private int exitCode = Integer.MIN_VALUE;
@Inject
- private Session session;
+ private EventPublisher eventPublisher;
+
+ @Inject
+ private EventListeningActivationPoint eventListeningActivationPoint;
/**
* Invokes before the test will start.
@@ -89,13 +94,14 @@ public class CliCommandTestNotInitializedIntegrationBase
extends CliIntegrationT
cmd = new CommandLine(getCommandClass(), new MicronautFactory(context))
.registerConverter(NodeNameOrUrl.class, new
NodeNameOrUrlConverter(nodeNameRegistry));
cmd.setDefaultValueProvider(configDefaultValueProvider);
+ eventListeningActivationPoint.subscribe();
resetOutput();
CommandLineContextProvider.setCmd(cmd);
}
@AfterEach
public void tearDown() {
- session.disconnect();
+ eventPublisher.publish(Events.disconnect());
}
protected void resetOutput() {
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java
new file mode 100644
index 0000000000..487835029c
--- /dev/null
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/commands/ItConnectionHeartbeatTest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.commands;
+
+import static org.apache.ignite.internal.cli.event.EventType.CONNECTION_LOST;
+import static
org.apache.ignite.internal.cli.event.EventType.CONNECTION_RESTORED;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertAll;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import io.micronaut.context.annotation.Property;
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Inject;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.cli.core.repl.Session;
+import org.apache.ignite.internal.cli.event.EventSubscriptionManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+@Property(name = "cli.check.connection.period.second", value = "1")
+class ItConnectionHeartbeatTest extends
CliCommandTestInitializedIntegrationBase {
+
+ @Inject
+ Session session;
+
+ @Inject
+ EventSubscriptionManager eventSubscriptionManager;
+
+ @Value("${cli.check.connection.period.second}")
+ private long cliCheckConnectionPeriodSecond;
+
+ private final AtomicInteger connectionLostCount = new AtomicInteger(0);
+ private final AtomicInteger connectionRestoredCount = new AtomicInteger(0);
+
+ @BeforeEach
+ void setUp() {
+ connectionLostCount.set(0);
+ connectionRestoredCount.set(0);
+
+ // Register listeners
+ eventSubscriptionManager.subscribe(CONNECTION_LOST, event ->
connectionLostCount.incrementAndGet());
+ eventSubscriptionManager.subscribe(CONNECTION_RESTORED, event ->
connectionRestoredCount.incrementAndGet());
+ }
+
+ @Override
+ protected Class<?> getCommandClass() {
+ return TopLevelCliReplCommand.class;
+ }
+
+ @Test
+ @DisplayName("Should send event CONNECTION_RESTORED on connection start")
+ void connectionEstablished() {
+ // Given null session info before connect
+ assertNull(session.info());
+
+ // When connect without parameters
+ execute("connect");
+
+ // Then
+ assertAll(
+ this::assertErrOutputIsEmpty,
+ () -> assertOutputContains("Connected to
http://localhost:10300")
+ );
+
+ // Listener was invoked
+ await().timeout(cliCheckConnectionPeriodSecond * 2,
TimeUnit.SECONDS).until(() -> connectionRestoredCount.get() == 1);
+ assertEquals(0, connectionLostCount.get());
+ }
+
+ @Test
+ @DisplayName("Should send event CONNECTION_LOST on cluster stop")
+ void onConnectionLost() {
+ // Given connected cli
+ execute("connect");
+
+ // Then
+ assertAll(
+ this::assertErrOutputIsEmpty,
+ () -> assertOutputContains("Connected to
http://localhost:10300")
+ );
+
+ // When stop node
+ String nodeName = session.info().nodeName();
+ stopNode(nodeName);
+
+ // Listener was invoked
+ await().timeout(cliCheckConnectionPeriodSecond * 2,
TimeUnit.SECONDS).until(
+ () -> connectionRestoredCount.get() == 1 &&
connectionLostCount.get() == 1
+ );
+
+ // Tear down. Restore initial state of node to exclude any impact on
next test.
+ startNode(nodeName);
+ }
+
+ @Test
+ @DisplayName("Should send event CONNECTION_LOST on cluster stop")
+ void restoreConnectionAfterConnectionLost() {
+ // Given connected cli
+ execute("connect");
+
+ // Then
+ assertAll(
+ this::assertErrOutputIsEmpty,
+ () -> assertOutputContains("Connected to
http://localhost:10300")
+ );
+
+ // When stop node
+ String nodeName = session.info().nodeName();
+ stopNode(nodeName);
+
+ // Then connection lost event obtained
+ await().timeout(cliCheckConnectionPeriodSecond * 2,
TimeUnit.SECONDS).until(
+ () -> connectionRestoredCount.get() == 1 &&
connectionLostCount.get() == 1
+ );
+
+ // When
+ startNode(nodeName);
+
+ // Then one more connection restore event obtained
+ await().timeout(cliCheckConnectionPeriodSecond * 2,
TimeUnit.SECONDS).until(
+ () -> connectionRestoredCount.get() == 2 &&
connectionLostCount.get() == 1
+ );
+ }
+}
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/core/repl/executor/ItIgnitePicocliCommandsTest.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/core/repl/executor/ItIgnitePicocliCommandsTest.java
index bf8697187d..8c3141924e 100644
---
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/core/repl/executor/ItIgnitePicocliCommandsTest.java
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/core/repl/executor/ItIgnitePicocliCommandsTest.java
@@ -51,6 +51,8 @@ import
org.apache.ignite.internal.cli.core.repl.completer.filter.CompleterFilter
import
org.apache.ignite.internal.cli.core.repl.completer.filter.DynamicCompleterFilter;
import
org.apache.ignite.internal.cli.core.repl.completer.filter.NonRepeatableOptionsFilter;
import
org.apache.ignite.internal.cli.core.repl.completer.filter.ShortOptionsFilter;
+import org.apache.ignite.internal.cli.event.EventPublisher;
+import org.apache.ignite.internal.cli.event.Events;
import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
import org.assertj.core.util.Files;
import org.jline.reader.Candidate;
@@ -102,6 +104,9 @@ public class ItIgnitePicocliCommandsTest extends
CliCommandTestInitializedIntegr
@Inject
Session session;
+ @Inject
+ EventPublisher eventPublisher;
+
SystemCompleter completer;
LineReader lineReader;
@@ -245,7 +250,7 @@ public class ItIgnitePicocliCommandsTest extends
CliCommandTestInitializedIntegr
}
private void connected() {
- session.connect(new SessionInfo(DEFAULT_REST_URL, null, null, null));
+
eventPublisher.publish(Events.connect(SessionInfo.builder().nodeUrl(DEFAULT_REST_URL).build()));
}
private Stream<Arguments> nodeConfigUpdateSuggestedSource() {
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/ReplManager.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/ReplManager.java
index e225a16de7..b24f8a0efc 100644
--- a/modules/cli/src/main/java/org/apache/ignite/internal/cli/ReplManager.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/ReplManager.java
@@ -24,6 +24,7 @@ import
org.apache.ignite.internal.cli.commands.questions.ConnectToClusterQuestio
import org.apache.ignite.internal.cli.core.call.CallExecutionPipeline;
import org.apache.ignite.internal.cli.core.call.StringCallInput;
import
org.apache.ignite.internal.cli.core.exception.handler.DefaultExceptionHandlers;
+import org.apache.ignite.internal.cli.core.repl.EventListeningActivationPoint;
import org.apache.ignite.internal.cli.core.repl.Repl;
import org.apache.ignite.internal.cli.core.repl.SessionDefaultValueProvider;
import org.apache.ignite.internal.cli.core.repl.executor.ReplExecutorProvider;
@@ -46,6 +47,9 @@ public class ReplManager {
@Inject
private ConnectToClusterQuestion question;
+ @Inject
+ private EventListeningActivationPoint eventListeningActivationPoint;
+
/**
* Enters REPL mode.
*/
@@ -65,6 +69,7 @@ public class ReplManager {
.withOnStart(question::askQuestionOnReplStart)
.withHistoryFileName("history")
.withAutosuggestionsWidgets()
+ .withEventSubscriber(eventListeningActivationPoint)
.build());
}
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/ConnectCall.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/ConnectCall.java
index c070777c04..91eb428084 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/ConnectCall.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/ConnectCall.java
@@ -36,6 +36,8 @@ import org.apache.ignite.internal.cli.core.repl.SessionInfo;
import org.apache.ignite.internal.cli.core.rest.ApiClientFactory;
import org.apache.ignite.internal.cli.core.style.component.MessageUiComponent;
import org.apache.ignite.internal.cli.core.style.element.UiElements;
+import org.apache.ignite.internal.cli.event.EventPublisher;
+import org.apache.ignite.internal.cli.event.Events;
import org.apache.ignite.rest.client.api.NodeConfigurationApi;
import org.apache.ignite.rest.client.api.NodeManagementApi;
import org.apache.ignite.rest.client.invoker.ApiException;
@@ -56,15 +58,18 @@ public class ConnectCall implements Call<UrlCallInput,
String> {
private final JdbcUrlFactory jdbcUrlFactory;
+ private final EventPublisher eventPublisher;
+
/**
* Constructor.
*/
public ConnectCall(Session session, StateConfigProvider
stateConfigProvider, ApiClientFactory clientFactory,
- JdbcUrlFactory jdbcUrlFactory) {
+ JdbcUrlFactory jdbcUrlFactory, EventPublisher eventPublisher) {
this.session = session;
this.stateConfigProvider = stateConfigProvider;
this.clientFactory = clientFactory;
this.jdbcUrlFactory = jdbcUrlFactory;
+ this.eventPublisher = eventPublisher;
}
@Override
@@ -81,11 +86,19 @@ public class ConnectCall implements Call<UrlCallInput,
String> {
stateConfigProvider.get().setProperty(CliConfigKeys.LAST_CONNECTED_URL.value(),
nodeUrl);
String jdbcUrl = jdbcUrlFactory.constructJdbcUrl(configuration,
nodeUrl);
- session.connect(new SessionInfo(nodeUrl, fetchNodeName(nodeUrl),
jdbcUrl, username));
+ sessionInfo = SessionInfo.builder()
+ .nodeUrl(nodeUrl)
+ .nodeName(fetchNodeName(nodeUrl))
+ .jdbcUrl(jdbcUrl)
+ .username(username)
+ .build();
+ eventPublisher.publish(Events.connect(sessionInfo));
return
DefaultCallOutput.success(MessageUiComponent.fromMessage("Connected to %s",
UiElements.url(nodeUrl)).render());
} catch (Exception e) {
- session.disconnect();
+ if (session.info() != null) {
+ eventPublisher.publish(Events.disconnect());
+ }
return DefaultCallOutput.failure(handleException(e, nodeUrl));
}
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/DisconnectCall.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/DisconnectCall.java
index 1ce901538d..e16dc3e590 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/DisconnectCall.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/call/connect/DisconnectCall.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.cli.call.connect;
-import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.ignite.internal.cli.core.call.Call;
import org.apache.ignite.internal.cli.core.call.CallOutput;
@@ -27,17 +26,21 @@ import org.apache.ignite.internal.cli.core.repl.Session;
import org.apache.ignite.internal.cli.core.repl.SessionInfo;
import org.apache.ignite.internal.cli.core.style.component.MessageUiComponent;
import org.apache.ignite.internal.cli.core.style.element.UiElements;
+import org.apache.ignite.internal.cli.event.EventPublisher;
+import org.apache.ignite.internal.cli.event.Events;
/**
* Call for disconnect.
*/
@Singleton
public class DisconnectCall implements Call<EmptyCallInput, String> {
- @Inject
private final Session session;
- public DisconnectCall(Session session) {
+ private final EventPublisher eventPublisher;
+
+ public DisconnectCall(Session session, EventPublisher eventPublisher) {
this.session = session;
+ this.eventPublisher = eventPublisher;
}
@Override
@@ -45,7 +48,7 @@ public class DisconnectCall implements Call<EmptyCallInput,
String> {
SessionInfo sessionInfo = session.info();
if (sessionInfo != null) {
String nodeUrl = sessionInfo.nodeUrl();
- session.disconnect();
+ eventPublisher.publish(Events.disconnect());
return DefaultCallOutput.success(
MessageUiComponent.fromMessage("Disconnected from %s",
UiElements.url(nodeUrl)).render()
);
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectEvent.java
similarity index 63%
copy from
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
copy to
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectEvent.java
index 272fbbc3f3..1fd32470af 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectEvent.java
@@ -17,12 +17,26 @@
package org.apache.ignite.internal.cli.core.repl;
-/** Session event listener. */
-public interface AsyncSessionEventListener {
+import org.apache.ignite.internal.cli.event.Event;
+import org.apache.ignite.internal.cli.event.EventType;
- /** Implementation must be async. */
- void onConnect(SessionInfo sessionInfo);
+/**
+ * User session connected event.
+ */
+public class ConnectEvent implements Event {
+
+ private final SessionInfo sessionInfo;
+
+ public ConnectEvent(SessionInfo sessionInfo) {
+ this.sessionInfo = sessionInfo;
+ }
+
+ public SessionInfo sessionInfo() {
+ return sessionInfo;
+ }
- /** Implementation must be async. */
- void onDisconnect();
+ @Override
+ public EventType eventType() {
+ return EventType.CONNECT;
+ }
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionHeartBeat.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionHeartBeat.java
new file mode 100644
index 0000000000..84ac2e74b6
--- /dev/null
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionHeartBeat.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.repl;
+
+import io.micronaut.context.annotation.Value;
+import jakarta.inject.Singleton;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
+import org.apache.ignite.internal.cli.core.rest.ApiClientFactory;
+import org.apache.ignite.internal.cli.event.ConnectionEventListener;
+import org.apache.ignite.internal.cli.event.EventPublisher;
+import org.apache.ignite.internal.cli.event.Events;
+import org.apache.ignite.internal.cli.logger.CliLoggers;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.rest.client.api.NodeManagementApi;
+import org.apache.ignite.rest.client.invoker.ApiException;
+
+/**
+ * Heartbeat that periodically checks the connection to the node.
+ */
+@Singleton
+public class ConnectionHeartBeat implements ConnectionEventListener {
+
+ private static final IgniteLogger LOG =
CliLoggers.forClass(ConnectionHeartBeat.class);
+
+ /** CLI connection check period, in seconds. */
+ private final long cliCheckConnectionPeriodSecond;
+
+ /** Scheduled executor for connection heartbeat. */
+ @Nullable
+ private ScheduledExecutorService scheduledConnectionHeartbeatExecutor;
+
+ private final ApiClientFactory clientFactory;
+
+ private final EventPublisher eventPublisher;
+
+ private final AtomicBoolean connected = new AtomicBoolean(false);
+
+ private final Lock lock = new ReentrantLock();
+
+ /**
+ * Creates the instance of connection heartbeat.
+ *
+ * @param clientFactory api client factory.
+ * @param eventPublisher event publisher.
+ */
+ public
ConnectionHeartBeat(@Value("${cli.check.connection.period.second:5}") long
cliCheckConnectionPeriodSecond,
+ ApiClientFactory clientFactory,
+ EventPublisher eventPublisher) {
+ this.clientFactory = clientFactory;
+ this.eventPublisher = eventPublisher;
+ this.cliCheckConnectionPeriodSecond = cliCheckConnectionPeriodSecond;
+ }
+
+ /**
+ * Starts connection heartbeat. By default connection will be checked
every 5 sec.
+ *
+ * @param sessionInfo session info with node url
+ */
+ @Override
+ public void onConnect(SessionInfo sessionInfo) {
+ if (connected.compareAndSet(false, true)) {
+ eventPublisher.publish(Events.connectionRestored());
+ }
+
+ lock.lock();
+ try {
+ if (scheduledConnectionHeartbeatExecutor == null) {
+ scheduledConnectionHeartbeatExecutor =
+ Executors.newScheduledThreadPool(1, new
NamedThreadFactory("cli-check-connection-thread", LOG));
+
+ // Start connection heart beat
+ scheduledConnectionHeartbeatExecutor.scheduleAtFixedRate(
+ () -> pingConnection(sessionInfo.nodeUrl()),
+ 0,
+ cliCheckConnectionPeriodSecond,
+ TimeUnit.SECONDS
+ );
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Stops connection heartbeat.
+ */
+ @Override
+ public void onDisconnect() {
+ lock.lock();
+ try {
+ if (scheduledConnectionHeartbeatExecutor != null) {
+ scheduledConnectionHeartbeatExecutor.shutdownNow();
+ scheduledConnectionHeartbeatExecutor = null;
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void pingConnection(String nodeUrl) {
+ try {
+ new
NodeManagementApi(clientFactory.getClient(nodeUrl)).nodeState();
+ if (connected.compareAndSet(false, true)) {
+ eventPublisher.publish(Events.connectionRestored());
+ }
+ } catch (ApiException exception) {
+ if (connected.compareAndSet(true, false)) {
+ eventPublisher.publish(Events.connectionLost());
+ }
+ }
+ }
+}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionLostEvent.java
similarity index 74%
copy from
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
copy to
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionLostEvent.java
index 272fbbc3f3..4c5f41f5b8 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionLostEvent.java
@@ -17,12 +17,15 @@
package org.apache.ignite.internal.cli.core.repl;
-/** Session event listener. */
-public interface AsyncSessionEventListener {
+import org.apache.ignite.internal.cli.event.Event;
+import org.apache.ignite.internal.cli.event.EventType;
- /** Implementation must be async. */
- void onConnect(SessionInfo sessionInfo);
-
- /** Implementation must be async. */
- void onDisconnect();
+/**
+ * Connection lost event.
+ */
+public class ConnectionLostEvent implements Event {
+ @Override
+ public EventType eventType() {
+ return EventType.CONNECTION_LOST;
+ }
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionRestoredEvent.java
similarity index 73%
copy from
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
copy to
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionRestoredEvent.java
index 272fbbc3f3..5a6c8438b0 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ConnectionRestoredEvent.java
@@ -17,12 +17,15 @@
package org.apache.ignite.internal.cli.core.repl;
-/** Session event listener. */
-public interface AsyncSessionEventListener {
+import org.apache.ignite.internal.cli.event.Event;
+import org.apache.ignite.internal.cli.event.EventType;
- /** Implementation must be async. */
- void onConnect(SessionInfo sessionInfo);
-
- /** Implementation must be async. */
- void onDisconnect();
+/**
+ * Connection restored event.
+ */
+public class ConnectionRestoredEvent implements Event {
+ @Override
+ public EventType eventType() {
+ return EventType.CONNECTION_RESTORED;
+ }
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/DisconnectEvent.java
similarity index 74%
copy from
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
copy to
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/DisconnectEvent.java
index 272fbbc3f3..74213043a2 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/DisconnectEvent.java
@@ -17,12 +17,15 @@
package org.apache.ignite.internal.cli.core.repl;
-/** Session event listener. */
-public interface AsyncSessionEventListener {
+import org.apache.ignite.internal.cli.event.Event;
+import org.apache.ignite.internal.cli.event.EventType;
- /** Implementation must be async. */
- void onConnect(SessionInfo sessionInfo);
-
- /** Implementation must be async. */
- void onDisconnect();
+/**
+ * User session disconnected event.
+ */
+public class DisconnectEvent implements Event {
+ @Override
+ public EventType eventType() {
+ return EventType.DISCONNECT;
+ }
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/EventListeningActivationPoint.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/EventListeningActivationPoint.java
new file mode 100644
index 0000000000..fe8040f240
--- /dev/null
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/EventListeningActivationPoint.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.core.repl;
+
+import jakarta.inject.Inject;
+import jakarta.inject.Singleton;
+import org.apache.ignite.internal.cli.core.repl.prompt.ReplPromptProvider;
+import
org.apache.ignite.internal.cli.core.repl.registry.impl.ClusterConfigRegistryImpl;
+import
org.apache.ignite.internal.cli.core.repl.registry.impl.MetricRegistryImpl;
+import
org.apache.ignite.internal.cli.core.repl.registry.impl.NodeConfigRegistryImpl;
+import
org.apache.ignite.internal.cli.core.repl.registry.impl.UnitsRegistryImpl;
+import org.apache.ignite.internal.cli.event.EventSubscriptionManager;
+import org.apache.ignite.internal.cli.event.EventType;
+
+/**
+ * Subscribes beans to the appropriate events.
+ */
+@Singleton
+public class EventListeningActivationPoint {
+
+ @Inject
+ private EventSubscriptionManager eventSubscriptionManager;
+
+ @Inject
+ private PeriodicSessionTaskExecutor periodicSessionTaskExecutor;
+
+ @Inject
+ private ClusterConfigRegistryImpl clusterConfigRegistry;
+
+ @Inject
+ private MetricRegistryImpl metricRegistry;
+
+ @Inject
+ private NodeConfigRegistryImpl nodeConfigRegistry;
+
+ @Inject
+ private UnitsRegistryImpl unitsRegistry;
+
+ @Inject
+ private ConnectionHeartBeat connectionHeartBeat;
+
+ @Inject
+ private Session session;
+
+ @Inject
+ private ReplPromptProvider promptProvider;
+
+ /**
+ * Subscribes event listeners.
+ */
+ public void subscribe() {
+ eventSubscriptionManager.subscribe(EventType.CONNECT,
periodicSessionTaskExecutor);
+ eventSubscriptionManager.subscribe(EventType.DISCONNECT,
periodicSessionTaskExecutor);
+
+ eventSubscriptionManager.subscribe(EventType.CONNECT,
clusterConfigRegistry);
+ eventSubscriptionManager.subscribe(EventType.DISCONNECT,
clusterConfigRegistry);
+
+ eventSubscriptionManager.subscribe(EventType.CONNECT, metricRegistry);
+ eventSubscriptionManager.subscribe(EventType.DISCONNECT,
metricRegistry);
+
+ eventSubscriptionManager.subscribe(EventType.CONNECT,
nodeConfigRegistry);
+ eventSubscriptionManager.subscribe(EventType.DISCONNECT,
nodeConfigRegistry);
+
+ eventSubscriptionManager.subscribe(EventType.CONNECT, unitsRegistry);
+ eventSubscriptionManager.subscribe(EventType.DISCONNECT,
unitsRegistry);
+
+ eventSubscriptionManager.subscribe(EventType.CONNECT,
connectionHeartBeat);
+ eventSubscriptionManager.subscribe(EventType.DISCONNECT,
connectionHeartBeat);
+
+ eventSubscriptionManager.subscribe(EventType.CONNECT, session);
+ eventSubscriptionManager.subscribe(EventType.DISCONNECT, session);
+
+ eventSubscriptionManager.subscribe(EventType.CONNECTION_LOST,
promptProvider);
+ eventSubscriptionManager.subscribe(EventType.CONNECTION_RESTORED,
promptProvider);
+ }
+}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/PeriodicSessionTaskExecutor.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/PeriodicSessionTaskExecutor.java
index 364c49abe6..ba7b29dcc2 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/PeriodicSessionTaskExecutor.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/PeriodicSessionTaskExecutor.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.cli.event.ConnectionEventListener;
import org.apache.ignite.internal.cli.logger.CliLoggers;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.thread.NamedThreadFactory;
@@ -31,7 +32,7 @@ import org.jetbrains.annotations.Nullable;
/** Executes tasks periodically while the session is connected. */
@Singleton
-public class PeriodicSessionTaskExecutor implements AsyncSessionEventListener {
+public class PeriodicSessionTaskExecutor implements ConnectionEventListener {
private static final IgniteLogger LOG =
CliLoggers.forClass(PeriodicSessionTaskExecutor.class);
@Nullable
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/Repl.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/Repl.java
index 472a49fcd0..36744c75e9 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/Repl.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/Repl.java
@@ -55,6 +55,8 @@ public class Repl {
private final Runnable onStart;
+ private final EventListeningActivationPoint eventListeningActivationPoint;
+
/**
* Constructor.
*
@@ -68,6 +70,7 @@ public class Repl {
* @param historyFileName file name for storing commands history.
* @param tailTipWidgetsEnabled whether tailtip widgets are enabled.
* @param onStart callback that will run when REPL is started.
+ * @param eventListeningActivationPoint event listening activation point
*/
public Repl(PromptProvider promptProvider,
Class<?> commandClass,
@@ -79,7 +82,8 @@ public class Repl {
String historyFileName,
boolean tailTipWidgetsEnabled,
boolean autosuggestionsWidgetsEnabled,
- Runnable onStart
+ Runnable onStart,
+ EventListeningActivationPoint eventListeningActivationPoint
) {
this.promptProvider = promptProvider;
this.commandClass = commandClass;
@@ -92,6 +96,7 @@ public class Repl {
this.tailTipWidgetsEnabled = tailTipWidgetsEnabled;
this.autosuggestionsWidgetsEnabled = autosuggestionsWidgetsEnabled;
this.onStart = onStart;
+ this.eventListeningActivationPoint = eventListeningActivationPoint;
}
/**
@@ -164,4 +169,8 @@ public class Repl {
public void onStart() {
onStart.run();
}
+
+ public EventListeningActivationPoint getEventListeningActivationPoint() {
+ return eventListeningActivationPoint;
+ }
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ReplBuilder.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ReplBuilder.java
index 41cda51bdf..6e33567c84 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ReplBuilder.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/ReplBuilder.java
@@ -53,6 +53,8 @@ public class ReplBuilder {
private Runnable onStart = () -> {};
+ private EventListeningActivationPoint eventListeningActivationPoint;
+
/**
* Build methods.
*
@@ -70,7 +72,8 @@ public class ReplBuilder {
historyFileName,
tailTipWidgetsEnabled,
autosuggestionsWidgetsEnabled,
- onStart
+ onStart,
+ eventListeningActivationPoint
);
}
@@ -152,4 +155,15 @@ public class ReplBuilder {
this.autosuggestionsWidgetsEnabled = true;
return this;
}
+
+ /**
+ * Builder setter of {@code eventListeningActivationPoint} field.
+ *
+ * @param eventListeningActivationPoint event listening activation point.
+ * @return invoked builder instance {@link ReplBuilder}.
+ */
+ public ReplBuilder withEventSubscriber(EventListeningActivationPoint
eventListeningActivationPoint) {
+ this.eventListeningActivationPoint = eventListeningActivationPoint;
+ return this;
+ }
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/Session.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/Session.java
index 242764fe43..0f5ec6b008 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/Session.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/Session.java
@@ -18,55 +18,31 @@
package org.apache.ignite.internal.cli.core.repl;
import jakarta.inject.Singleton;
-import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.cli.core.exception.ConnectionException;
-import org.apache.ignite.internal.cli.logger.CliLoggers;
-import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.cli.event.ConnectionEventListener;
/**
* Connection session that in fact is holder for state: connected or
disconnected. Also has session info if the state is connected.
*/
@Singleton
-public class Session {
-
- private static final IgniteLogger log = CliLoggers.forClass(Session.class);
+public class Session implements ConnectionEventListener {
private final AtomicReference<SessionInfo> info = new AtomicReference<>();
- private final List<? extends AsyncSessionEventListener> listeners;
-
- public Session(List<? extends AsyncSessionEventListener> listeners) {
- this.listeners = listeners;
+ public Session() {
}
- /** Creates session info with provided nodeUrl, nodeName, jdbcUrl. */
- public void connect(SessionInfo newInfo) {
- if (info.compareAndSet(null, newInfo)) {
- listeners.forEach(it -> {
- try {
- it.onConnect(newInfo);
- } catch (Exception e) {
- log.warn("Got an exception: ", e);
- }
- });
- } else {
+ @Override
+ public void onConnect(SessionInfo sessionInfo) {
+ if (!info.compareAndSet(null, sessionInfo)) {
throw new ConnectionException("Already connected to " +
info.get().nodeUrl());
}
}
- /** Clears session info and sets false to connectedToNode. */
- public void disconnect() {
- SessionInfo wasConnected = info.getAndSet(null);
- if (wasConnected != null) {
- listeners.forEach(it -> {
- try {
- it.onDisconnect();
- } catch (Exception e) {
- log.warn("Got an exception: ", e);
- }
- });
- }
+ @Override
+ public void onDisconnect() {
+ info.getAndSet(null);
}
/** Returns {@link SessionInfo}. */
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/SessionInfo.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/SessionInfo.java
index 1111452cea..5ba88806f2 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/SessionInfo.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/SessionInfo.java
@@ -28,7 +28,7 @@ public class SessionInfo {
private final String username;
/** Constructor. */
- public SessionInfo(String nodeUrl, String nodeName, String jdbcUrl, String
username) {
+ private SessionInfo(String nodeUrl, String nodeName, String jdbcUrl,
String username) {
this.nodeUrl = nodeUrl;
this.nodeName = nodeName;
this.jdbcUrl = jdbcUrl;
@@ -50,4 +50,49 @@ public class SessionInfo {
public String username() {
return username;
}
+
+ public static SessionInfoBuilder builder() {
+ return new SessionInfoBuilder();
+ }
+
+ /**
+ * Session info builder.
+ */
+ public static final class SessionInfoBuilder {
+ private String nodeUrl;
+ private String nodeName;
+ private String jdbcUrl;
+ private String username;
+
+ private SessionInfoBuilder() {
+ }
+
+ public static SessionInfoBuilder builder() {
+ return new SessionInfoBuilder();
+ }
+
+ public SessionInfoBuilder nodeUrl(String nodeUrl) {
+ this.nodeUrl = nodeUrl;
+ return this;
+ }
+
+ public SessionInfoBuilder nodeName(String nodeName) {
+ this.nodeName = nodeName;
+ return this;
+ }
+
+ public SessionInfoBuilder jdbcUrl(String jdbcUrl) {
+ this.jdbcUrl = jdbcUrl;
+ return this;
+ }
+
+ public SessionInfoBuilder username(String username) {
+ this.username = username;
+ return this;
+ }
+
+ public SessionInfo build() {
+ return new SessionInfo(nodeUrl, nodeName, jdbcUrl, username);
+ }
+ }
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/executor/ReplExecutor.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/executor/ReplExecutor.java
index 42ffc5a5d1..c38e7b1acd 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/executor/ReplExecutor.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/executor/ReplExecutor.java
@@ -129,6 +129,8 @@ public class ReplExecutor {
setupWidgets(repl, registry, reader);
+ repl.getEventListeningActivationPoint().subscribe();
+
repl.onStart();
while (!interrupted.get()) {
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/prompt/ReplPromptProvider.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/prompt/ReplPromptProvider.java
index b56dd6312a..996e7243c5 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/prompt/ReplPromptProvider.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/prompt/ReplPromptProvider.java
@@ -21,17 +21,21 @@ import static
org.apache.ignite.internal.cli.core.style.AnsiStringSupport.ansi;
import static org.apache.ignite.internal.cli.core.style.AnsiStringSupport.fg;
import jakarta.inject.Singleton;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.cli.core.repl.Session;
import org.apache.ignite.internal.cli.core.repl.SessionInfo;
import org.apache.ignite.internal.cli.core.style.AnsiStringSupport.Color;
+import org.apache.ignite.internal.cli.event.ConnectionEventListener;
/**
* Provider for prompt in REPL.
*/
@Singleton
-public class ReplPromptProvider implements PromptProvider {
+public class ReplPromptProvider implements PromptProvider,
ConnectionEventListener {
private final Session session;
+ private final AtomicBoolean connected = new AtomicBoolean(true);
+
public ReplPromptProvider(Session session) {
this.session = session;
}
@@ -45,7 +49,8 @@ public class ReplPromptProvider implements PromptProvider {
SessionInfo sessionInfo = session.info();
if (sessionInfo != null) {
String username = sessionInfo.username();
- return ansi(fg(Color.GREEN).mark(
+ Color color = connected.get() ? Color.GREEN : Color.YELLOW;
+ return ansi(fg(color).mark(
"["
+ (username != null ? username + ":" : "")
+ sessionInfo.nodeName()
@@ -54,4 +59,14 @@ public class ReplPromptProvider implements PromptProvider {
}
return ansi(fg(Color.RED).mark("[disconnected]")) + postfix;
}
+
+ @Override
+ public void onConnectionLost() {
+ connected.getAndSet(false);
+ }
+
+ @Override
+ public void onConnectionRestored() {
+ connected.getAndSet(true);
+ }
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/ClusterConfigRegistryImpl.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/ClusterConfigRegistryImpl.java
index 28e1f0f359..23dacc456e 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/ClusterConfigRegistryImpl.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/ClusterConfigRegistryImpl.java
@@ -24,14 +24,14 @@ import
org.apache.ignite.internal.cli.call.configuration.ClusterConfigShowCall;
import
org.apache.ignite.internal.cli.call.configuration.ClusterConfigShowCallInput;
import org.apache.ignite.internal.cli.call.configuration.JsonString;
import org.apache.ignite.internal.cli.core.call.DefaultCallOutput;
-import org.apache.ignite.internal.cli.core.repl.AsyncSessionEventListener;
import org.apache.ignite.internal.cli.core.repl.SessionInfo;
import org.apache.ignite.internal.cli.core.repl.registry.ClusterConfigRegistry;
+import org.apache.ignite.internal.cli.event.ConnectionEventListener;
import org.jetbrains.annotations.Nullable;
/** Implementation of {@link ClusterConfigRegistry}. */
@Singleton
-public class ClusterConfigRegistryImpl implements ClusterConfigRegistry,
AsyncSessionEventListener {
+public class ClusterConfigRegistryImpl implements ClusterConfigRegistry,
ConnectionEventListener {
private final ClusterConfigShowCall clusterConfigShowCall;
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/MetricRegistryImpl.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/MetricRegistryImpl.java
index 22e22009fd..d41ce3a499 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/MetricRegistryImpl.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/MetricRegistryImpl.java
@@ -25,15 +25,15 @@ import java.util.stream.Collectors;
import
org.apache.ignite.internal.cli.call.node.metric.NodeMetricSourceListCall;
import org.apache.ignite.internal.cli.core.call.CallOutput;
import org.apache.ignite.internal.cli.core.call.UrlCallInput;
-import org.apache.ignite.internal.cli.core.repl.AsyncSessionEventListener;
import org.apache.ignite.internal.cli.core.repl.SessionInfo;
import org.apache.ignite.internal.cli.core.repl.registry.MetricRegistry;
+import org.apache.ignite.internal.cli.event.ConnectionEventListener;
import org.apache.ignite.rest.client.model.MetricSource;
import org.jetbrains.annotations.Nullable;
/** Implementation of {@link MetricRegistry}. */
@Singleton
-public class MetricRegistryImpl implements MetricRegistry,
AsyncSessionEventListener {
+public class MetricRegistryImpl implements MetricRegistry,
ConnectionEventListener {
@Inject
private NodeMetricSourceListCall metricSourceListCall;
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/NodeConfigRegistryImpl.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/NodeConfigRegistryImpl.java
index b50f09790c..4effb9cca9 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/NodeConfigRegistryImpl.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/NodeConfigRegistryImpl.java
@@ -22,13 +22,13 @@ import com.typesafe.config.ConfigFactory;
import jakarta.inject.Singleton;
import org.apache.ignite.internal.cli.call.configuration.NodeConfigShowCall;
import
org.apache.ignite.internal.cli.call.configuration.NodeConfigShowCallInput;
-import org.apache.ignite.internal.cli.core.repl.AsyncSessionEventListener;
import org.apache.ignite.internal.cli.core.repl.SessionInfo;
import org.apache.ignite.internal.cli.core.repl.registry.NodeConfigRegistry;
+import org.apache.ignite.internal.cli.event.ConnectionEventListener;
/** Implementation of {@link NodeConfigRegistry}. */
@Singleton
-public class NodeConfigRegistryImpl implements NodeConfigRegistry,
AsyncSessionEventListener {
+public class NodeConfigRegistryImpl implements NodeConfigRegistry,
ConnectionEventListener {
private final NodeConfigShowCall nodeConfigShowCall;
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/UnitsRegistryImpl.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/UnitsRegistryImpl.java
index e6c7169a4e..8643293861 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/UnitsRegistryImpl.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/registry/impl/UnitsRegistryImpl.java
@@ -26,15 +26,15 @@ import java.util.stream.Collectors;
import org.apache.ignite.internal.cli.call.cluster.unit.ClusterListUnitCall;
import org.apache.ignite.internal.cli.call.unit.ListUnitCallInput;
import org.apache.ignite.internal.cli.core.call.CallOutput;
-import org.apache.ignite.internal.cli.core.repl.AsyncSessionEventListener;
import org.apache.ignite.internal.cli.core.repl.SessionInfo;
import org.apache.ignite.internal.cli.core.repl.registry.UnitsRegistry;
+import org.apache.ignite.internal.cli.event.ConnectionEventListener;
import org.apache.ignite.rest.client.model.UnitStatus;
import org.apache.ignite.rest.client.model.UnitVersionStatus;
/** Implementation of {@link UnitsRegistry}. */
@Singleton
-public class UnitsRegistryImpl implements UnitsRegistry,
AsyncSessionEventListener {
+public class UnitsRegistryImpl implements UnitsRegistry,
ConnectionEventListener {
private final AtomicReference<String> lastKnownUrl = new
AtomicReference<>(null);
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/ConnectionEventListener.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/ConnectionEventListener.java
new file mode 100644
index 0000000000..83c36c07f0
--- /dev/null
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/ConnectionEventListener.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.event;
+
+import org.apache.ignite.internal.cli.core.repl.ConnectEvent;
+import org.apache.ignite.internal.cli.core.repl.SessionInfo;
+
+/**
+ * Listener of any connection related events.
+ */
+public interface ConnectionEventListener extends EventListener {
+
+ @Override
+ default void onEvent(Event event) {
+ switch (event.eventType()) {
+ case CONNECT:
+ ConnectEvent connectEvent = (ConnectEvent) event;
+ onConnect(connectEvent.sessionInfo());
+ break;
+ case DISCONNECT:
+ onDisconnect();
+ break;
+ case CONNECTION_LOST:
+ onConnectionLost();
+ break;
+ case CONNECTION_RESTORED:
+ onConnectionRestored();
+ break;
+ default:
+ break;
+ }
+ }
+
+ /** Implementation must be async. */
+ default void onConnect(SessionInfo sessionInfo) {
+ }
+
+ /** Implementation must be async. */
+ default void onDisconnect() {
+ }
+
+ /** Implementation must be async. */
+ default void onConnectionLost() {
+ }
+
+ /** Implementation must be async. */
+ default void onConnectionRestored() {
+ }
+
+}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/Event.java
similarity index 74%
copy from
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
copy to
modules/cli/src/main/java/org/apache/ignite/internal/cli/event/Event.java
index 272fbbc3f3..f3588be1fd 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/Event.java
@@ -15,14 +15,11 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.cli.core.repl;
+package org.apache.ignite.internal.cli.event;
-/** Session event listener. */
-public interface AsyncSessionEventListener {
-
- /** Implementation must be async. */
- void onConnect(SessionInfo sessionInfo);
-
- /** Implementation must be async. */
- void onDisconnect();
+/**
+ * The event that is produced by an event producer component.
+ */
+public interface Event {
+ EventType eventType();
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventFactory.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventFactory.java
new file mode 100644
index 0000000000..c76a8738b0
--- /dev/null
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.event;
+
+import jakarta.inject.Singleton;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.cli.logger.CliLoggers;
+import org.apache.ignite.internal.logger.IgniteLogger;
+
+/**
+ * Registers listeners and publishes events to them.
+ */
+@Singleton
+public class EventFactory implements EventPublisher, EventSubscriptionManager {
+
+ private static final IgniteLogger LOG =
CliLoggers.forClass(EventFactory.class);
+
+ /** All listeners. */
+ private final ConcurrentHashMap<EventType, List<EventListener>> listeners
= new ConcurrentHashMap<>();
+
+ /**
+ * Registers an event listener.
+ *
+ * @param eventType type of event to listen.
+ * @param eventListener event listener.
+ */
+ @Override
+ public void subscribe(EventType eventType, EventListener eventListener) {
+ listeners.computeIfAbsent(eventType, evtKey -> new
CopyOnWriteArrayList<>()).add(eventListener);
+ }
+
+ /**
+ * Removes a listener associated with the event type.
+ *
+ * @param eventType type of event the listener was subscribed to.
+ * @param eventListener event listener to remove.
+ */
+ @Override
+ public void removeSubscription(EventType eventType, EventListener
eventListener) {
+ listeners.computeIfPresent(eventType, (eventType1, eventListeners) -> {
+ eventListeners.remove(eventListener);
+ return eventListeners;
+ });
+ }
+
+ /**
+ * Notifies every listener subscribed to the type of the given event.
+ *
+ * @param event event itself.
+ */
+ @Override
+ public void publish(Event event) {
+ List<EventListener> eventListeners = listeners.get(event.eventType());
+
+ if (eventListeners == null) {
+ return;
+ }
+
+ eventListeners.forEach(listener -> {
+ try {
+ listener.onEvent(event);
+ } catch (Exception exception) {
+ LOG.warn("Got an exception: ", exception);
+ }
+ });
+ }
+}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventListener.java
similarity index 74%
copy from
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
copy to
modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventListener.java
index 272fbbc3f3..f4d49c2631 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventListener.java
@@ -15,14 +15,13 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.cli.core.repl;
+package org.apache.ignite.internal.cli.event;
-/** Session event listener. */
-public interface AsyncSessionEventListener {
-
- /** Implementation must be async. */
- void onConnect(SessionInfo sessionInfo);
+/**
+ * The listener handles events from a producer.
+ */
+@FunctionalInterface
+public interface EventListener {
- /** Implementation must be async. */
- void onDisconnect();
+ void onEvent(Event event);
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventPublisher.java
similarity index 74%
copy from
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
copy to
modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventPublisher.java
index 272fbbc3f3..b7c63b9a6d 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventPublisher.java
@@ -15,14 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.cli.core.repl;
+package org.apache.ignite.internal.cli.event;
-/** Session event listener. */
-public interface AsyncSessionEventListener {
-
- /** Implementation must be async. */
- void onConnect(SessionInfo sessionInfo);
+/**
+ * Publish events.
+ */
+public interface EventPublisher {
- /** Implementation must be async. */
- void onDisconnect();
+ /**
+ * Publish event to the registered event listeners.
+ *
+ * @param event event to fire
+ */
+ void publish(Event event);
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventSubscriptionManager.java
similarity index 74%
copy from
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
copy to
modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventSubscriptionManager.java
index 272fbbc3f3..50b82dce29 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventSubscriptionManager.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.cli.core.repl;
+package org.apache.ignite.internal.cli.event;
-/** Session event listener. */
-public interface AsyncSessionEventListener {
+/**
+ * Subscribes event listeners.
+ */
+public interface EventSubscriptionManager {
- /** Implementation must be async. */
- void onConnect(SessionInfo sessionInfo);
+ void subscribe(EventType eventType, EventListener eventListener);
- /** Implementation must be async. */
- void onDisconnect();
+ void removeSubscription(EventType eventType, EventListener eventListener);
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventType.java
similarity index 74%
rename from
modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
rename to
modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventType.java
index 272fbbc3f3..28f0489e20 100644
---
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/core/repl/AsyncSessionEventListener.java
+++
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/EventType.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.cli.core.repl;
+package org.apache.ignite.internal.cli.event;
-/** Session event listener. */
-public interface AsyncSessionEventListener {
-
- /** Implementation must be async. */
- void onConnect(SessionInfo sessionInfo);
-
- /** Implementation must be async. */
- void onDisconnect();
+/**
+ * Types of cli events.
+ */
+public enum EventType {
+ CONNECTION_RESTORED,
+ CONNECTION_LOST,
+ CONNECT,
+ DISCONNECT
}
diff --git
a/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/Events.java
b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/Events.java
new file mode 100644
index 0000000000..a0873a219f
--- /dev/null
+++ b/modules/cli/src/main/java/org/apache/ignite/internal/cli/event/Events.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cli.event;
+
+import org.apache.ignite.internal.cli.core.repl.ConnectEvent;
+import org.apache.ignite.internal.cli.core.repl.ConnectionLostEvent;
+import org.apache.ignite.internal.cli.core.repl.ConnectionRestoredEvent;
+import org.apache.ignite.internal.cli.core.repl.DisconnectEvent;
+import org.apache.ignite.internal.cli.core.repl.SessionInfo;
+
+/**
+ * Events factory.
+ */
+public final class Events {
+
+ /**
+ * Creates {@code ConnectEvent}.
+ *
+ * @param sessionInfo session info
+ * @return event
+ */
+ public static Event connect(SessionInfo sessionInfo) {
+ return new ConnectEvent(sessionInfo);
+ }
+
+ /**
+ * Creates {@code DisconnectEvent}.
+ *
+ * @return event
+ */
+ public static Event disconnect() {
+ return new DisconnectEvent();
+ }
+
+ /**
+ * Creates {@code ConnectionLostEvent}.
+ *
+ * @return event
+ */
+ public static Event connectionLost() {
+ return new ConnectionLostEvent();
+ }
+
+ /**
+ * Creates {@code ConnectionRestoredEvent}.
+ *
+ * @return event
+ */
+ public static Event connectionRestored() {
+ return new ConnectionRestoredEvent();
+ }
+}
diff --git
a/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/repl/SessionDefaultValueProviderTest.java
b/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/repl/SessionDefaultValueProviderTest.java
index 82b0d42bdb..82462722cc 100644
---
a/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/repl/SessionDefaultValueProviderTest.java
+++
b/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/repl/SessionDefaultValueProviderTest.java
@@ -23,9 +23,9 @@ import static
org.apache.ignite.internal.cli.commands.Options.Constants.JDBC_URL
import static
org.apache.ignite.internal.cli.commands.Options.Constants.JDBC_URL_OPTION_SHORT;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import java.util.List;
import
org.apache.ignite.internal.cli.commands.cliconfig.TestConfigManagerProvider;
import org.apache.ignite.internal.cli.config.ConfigDefaultValueProvider;
+import org.apache.ignite.internal.cli.event.Events;
import org.junit.jupiter.api.Test;
import picocli.CommandLine.Model.OptionSpec;
@@ -36,7 +36,7 @@ class SessionDefaultValueProviderTest {
@Test
void jdbcUrl() throws Exception {
// Given
- Session session = new Session(List.of());
+ Session session = new Session();
ConfigDefaultValueProvider configDefaultValueProvider = new
ConfigDefaultValueProvider(new TestConfigManagerProvider());
SessionDefaultValueProvider provider = new
SessionDefaultValueProvider(session, configDefaultValueProvider);
@@ -47,7 +47,7 @@ class SessionDefaultValueProviderTest {
assertEquals(JDBC_URL_CONFIG, provider.defaultValue(spec));
// When session is connected
- session.connect(new SessionInfo("nodeUrl", "nodeName",
JDBC_URL_SESSION, null));
+
session.onEvent(Events.connect(SessionInfo.builder().nodeUrl("nodeUrl").nodeName("nodeName").jdbcUrl(JDBC_URL_SESSION).build()));
// Then default value is taken from the session
assertEquals(JDBC_URL_SESSION, provider.defaultValue(spec));
diff --git
a/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/repl/completer/filter/DynamicCompleterFilterTest.java
b/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/repl/completer/filter/DynamicCompleterFilterTest.java
index bd9df952a6..a61add85ea 100644
---
a/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/repl/completer/filter/DynamicCompleterFilterTest.java
+++
b/modules/cli/src/test/java/org/apache/ignite/internal/cli/core/repl/completer/filter/DynamicCompleterFilterTest.java
@@ -22,9 +22,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.emptyArray;
-import java.util.Collections;
import org.apache.ignite.internal.cli.core.repl.Session;
import org.apache.ignite.internal.cli.core.repl.SessionInfo;
+import org.apache.ignite.internal.cli.event.Events;
import org.junit.jupiter.api.Test;
class DynamicCompleterFilterTest {
@@ -46,14 +46,14 @@ class DynamicCompleterFilterTest {
}
private static Session notConnected() {
- Session session = new Session(Collections.emptyList());
- session.disconnect();
+ Session session = new Session();
+ session.onEvent(Events.disconnect());
return session;
}
private static Session connected() {
- Session session = new Session(Collections.emptyList());
- session.connect(new SessionInfo("nodeUrl", "nodeName", "jdbcUrl",
null));
+ Session session = new Session();
+
session.onEvent(Events.connect(SessionInfo.builder().nodeUrl("nodeUrl").nodeName("nodeName").jdbcUrl("jdbcUrl").build()));
return session;
}