This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b68b1564a8 IGNITE-18888 Java client: Implement client-side metrics
(#1846)
b68b1564a8 is described below
commit b68b1564a8305411ebccd3027392cd07c3331718
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Tue Mar 28 09:40:43 2023 +0300
IGNITE-18888 Java client: Implement client-side metrics (#1846)
* Add metrics to Java client:
* ConnectionsActive
* ConnectionsEstablished
* ConnectionsLost
* ConnectionsLostTimeout
* HandshakesFailed
* HandshakesFailedTimeout
* RequestsActive
* RequestsSent
* RequestsCompleted
* RequestsRetried
* RequestsFailed
* BytesSent
* BytesReceived
* Changed / skipped from initial list in the ticket
([IGNITE-18759](https://issues.apache.org/jira/browse/IGNITE-18759)):
* **HandshakesFailedTls** - skipped, no reliable way to collect on the
client (e.g. when TLS is not configured on server, we get a magic byte mismatch
instead). Also does not seem useful - this is just a misconfiguration on the
client side.
* **RequestsCompletedWithRetry** changed to **RequestsRetried** - total
number of retries is more useful than successful retries, better indicates
system health.
* Reuse existing metrics infrastructure (`MetricManager`):
* Only JMX exporter is available on the client, not configurable.
* Refactor `JmxExporter` and `MetricManager` to accept loggers in
constructor for client-side usage.
---
.../ignite/internal/cli/ssl/ItJdbcSslTest.java | 4 +-
modules/client/build.gradle | 3 +-
.../org/apache/ignite/client/IgniteClient.java | 19 +-
.../ignite/client/IgniteClientConfiguration.java | 7 +
.../internal/client/ClientChannelFactory.java | 40 ++
.../ignite/internal/client/ClientMetricSource.java | 424 +++++++++++++++++++++
.../client/IgniteClientConfigurationImpl.java | 16 +-
.../ignite/internal/client/ReliableChannel.java | 25 +-
.../ignite/internal/client/TcpClientChannel.java | 68 +++-
.../ignite/internal/client/TcpIgniteClient.java | 45 ++-
.../client/io/netty/NettyClientConnection.java | 29 +-
.../io/netty/NettyClientConnectionMultiplexer.java | 15 +-
.../apache/ignite/client/ClientMetricsTest.java | 197 ++++++++++
.../org/apache/ignite/client/RetryPolicyTest.java | 2 +-
.../{MetricsTest.java => ServerMetricsTest.java} | 2 +-
.../ignite/internal/metrics/MetricManager.java | 32 +-
.../metrics/exporters/jmx/JmxExporter.java | 18 +-
.../ignite/internal/metrics/JmxExporterTest.java | 3 +-
18 files changed, 894 insertions(+), 55 deletions(-)
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/ssl/ItJdbcSslTest.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/ssl/ItJdbcSslTest.java
index 16ca15d2a7..55de3866ec 100644
---
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/ssl/ItJdbcSslTest.java
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/ssl/ItJdbcSslTest.java
@@ -84,7 +84,7 @@ public class ItJdbcSslTest extends
CliSslClientConnectorIntegrationTestBase {
() -> assertExitCodeIs(1),
this::assertOutputIsEmpty,
() -> assertErrOutputContains("Connection failed"),
- () -> assertErrOutputContains("Failed to send handshake
request")
+ () -> assertErrOutputContains("Handshake error")
);
}
@@ -107,7 +107,7 @@ public class ItJdbcSslTest extends
CliSslClientConnectorIntegrationTestBase {
() -> assertExitCodeIs(1),
this::assertOutputIsEmpty,
() -> assertErrOutputContains("Connection failed"),
- () -> assertErrOutputContains("Channel is closed")
+ () -> assertErrOutputContains("Handshake error")
);
}
diff --git a/modules/client/build.gradle b/modules/client/build.gradle
index 6cd1327175..aadb065754 100644
--- a/modules/client/build.gradle
+++ b/modules/client/build.gradle
@@ -23,8 +23,9 @@ dependencies {
implementation project(':ignite-api')
implementation project(':ignite-core')
implementation project(':ignite-binary-tuple')
- api project(':ignite-client-common')
+ implementation project(':ignite-client-common')
implementation project(':ignite-marshaller-common')
+ implementation project(':ignite-metrics')
implementation libs.jetbrains.annotations
implementation libs.msgpack.core
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
index 9be68d90be..ffdeb70fbf 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java
@@ -104,6 +104,9 @@ public interface IgniteClient extends Ignite {
/** SSL configuration. */
private @Nullable SslConfiguration sslConfiguration;
+ /** Metrics enabled flag. */
+ private boolean metricsEnabled;
+
/**
* Sets the addresses of Ignite server nodes within a cluster. An
address can be an IP address or a hostname, with or without port.
* If port is not set then Ignite will generate multiple addresses for
default port range. See {@link
@@ -302,6 +305,18 @@ public interface IgniteClient extends Ignite {
return this;
}
+ /**
+ * Enables or disables JMX metrics.
+ *
+ * @param metricsEnabled Metrics enabled flag.
+ * @return This instance.
+ */
+ public Builder metricsEnabled(boolean metricsEnabled) {
+ this.metricsEnabled = metricsEnabled;
+
+ return this;
+ }
+
/**
* Builds the client.
*
@@ -329,8 +344,8 @@ public interface IgniteClient extends Ignite {
heartbeatTimeout,
retryPolicy,
loggerFactory,
- sslConfiguration
- );
+ sslConfiguration,
+ metricsEnabled);
return TcpIgniteClient.startAsync(cfg);
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
index 2511e8841c..20652b45b5 100644
---
a/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
+++
b/modules/client/src/main/java/org/apache/ignite/client/IgniteClientConfiguration.java
@@ -170,4 +170,11 @@ public interface IgniteClientConfiguration {
* @return Client SSL configuration.
*/
@Nullable SslConfiguration ssl();
+
+ /**
+ * Gets a value indicating whether JMX metrics are enabled.
+ *
+ * @return {@code true} if metrics are enabled.
+ */
+ boolean metricsEnabled();
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannelFactory.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannelFactory.java
new file mode 100644
index 0000000000..b517957d3e
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientChannelFactory.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
+
+/**
+ * Client channel factory.
+ */
+@FunctionalInterface
+public interface ClientChannelFactory {
+ /**
+ * Creates a new client channel.
+ *
+ * @param cfg Configuration.
+ * @param multiplexer Multiplexer.
+ * @param metrics Metric source.
+ * @return Channel future.
+ */
+ CompletableFuture<ClientChannel> create(
+ ClientChannelConfiguration cfg,
+ ClientConnectionMultiplexer multiplexer,
+ ClientMetricSource metrics);
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientMetricSource.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientMetricSource.java
new file mode 100644
index 0000000000..3a00b27841
--- /dev/null
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientMetricSource.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.AtomicLongMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricSetBuilder;
+
+/**
+ * Client-side metrics.
+ */
+public class ClientMetricSource extends
AbstractMetricSource<ClientMetricSource.Holder> {
+ /**
+ * Constructor.
+ */
+ ClientMetricSource() {
+ super("client");
+ }
+
+ /**
+ * Gets active connections.
+ *
+ * @return Active connections.
+ */
+ public long connectionsActive() {
+ Holder h = holder();
+
+ return h == null ? 0 : h.connectionsActive.value();
+ }
+
+ /**
+ * Increments active connections.
+ */
+ public void connectionsActiveIncrement() {
+ Holder h = holder();
+
+ if (h != null) {
+ h.connectionsActive.increment();
+ }
+ }
+
+ /**
+ * Decrements active connections.
+ */
+ public void connectionsActiveDecrement() {
+ Holder h = holder();
+
+ if (h != null) {
+ h.connectionsActive.decrement();
+ }
+ }
+
+ /**
+ * Gets total established connections.
+ *
+ * @return Total established connections.
+ */
+ public long connectionsEstablished() {
+ Holder h = holder();
+
+ return h == null ? 0 : h.connectionsEstablished.value();
+ }
+
+ /**
+ * Increments established connections.
+ */
+ public void connectionsEstablishedIncrement() {
+ Holder h = holder();
+
+ if (h != null) {
+ h.connectionsEstablished.increment();
+ }
+ }
+
+ /**
+ * Gets total lost connections.
+ *
+ * @return Total lost connections.
+ */
+ public long connectionsLost() {
+ Holder h = holder();
+
+ return h == null ? 0 : h.connectionsLost.value();
+ }
+
+ /**
+ * Increments lost connections.
+ */
+ public void connectionsLostIncrement() {
+ Holder h = holder();
+
+ if (h != null) {
+ h.connectionsLost.increment();
+ }
+ }
+
+ /**
+ * Gets total lost connections due to a timeout.
+ *
+ * @return Total lost connections due to a timeout.
+ */
+ public long connectionsLostTimeout() {
+ Holder h = holder();
+
+ return h == null ? 0 : h.connectionsLostTimeout.value();
+ }
+
+ /**
+ * Increments lost connections due to a timeout.
+ */
+ public void connectionsLostTimeoutIncrement() {
+ Holder h = holder();
+
+ if (h != null) {
+ h.connectionsLostTimeout.increment();
+ }
+ }
+
+ /**
+ * Gets total failed handshakes.
+ *
+ * @return Total failed handshakes.
+ */
+ public long handshakesFailed() {
+ Holder h = holder();
+
+ return h == null ? 0 : h.handshakesFailed.value();
+ }
+
+ /**
+ * Increments failed handshakes.
+ */
+ public void handshakesFailedIncrement() {
+ Holder h = holder();
+
+ if (h != null) {
+ h.handshakesFailed.increment();
+ }
+ }
+
+ /**
+ * Gets total failed handshakes due to a timeout.
+ *
+ * @return Total failed handshakes due to a timeout.
+ */
+ public long handshakesFailedTimeout() {
+ Holder h = holder();
+
+ return h == null ? 0 : h.handshakesFailedTimeout.value();
+ }
+
+ /**
+ * Increments failed handshakes due to a timeout.
+ */
+ public void handshakesFailedTimeoutIncrement() {
+ Holder h = holder();
+
+ if (h != null) {
+ h.handshakesFailedTimeout.increment();
+ }
+ }
+
+ /**
+ * Gets currently active (in-flight) requests.
+ *
+ * @return Currently active (in-flight) requests.
+ */
+ public long requestsActive() {
+ Holder h = holder();
+
+ return h == null ? 0 : h.requestsActive.value();
+ }
+
+ /**
+ * Increments currently active (in-flight) requests.
+ */
+ public void requestsActiveIncrement() {
+ Holder h = holder();
+
+ if (h != null) {
+ h.requestsActive.increment();
+ }
+ }
+
+ /**
+ * Decrements currently active (in-flight) requests.
+ */
+ public void requestsActiveDecrement() {
+ Holder h = holder();
+
+ if (h != null) {
+ h.requestsActive.decrement();
+ }
+ }
+
+ /**
+ * Gets sent requests.
+ *
+ * @return Sent requests.
+ */
+ public long requestsSent() {
+ Holder h = holder();
+
+ return h == null ? 0 : h.requestsSent.value();
+ }
+
+ /**
+ * Increments sent requests.
+ */
+ public void requestsSentIncrement() {
+ Holder h = holder();
+
+ if (h != null) {
+ h.requestsSent.increment();
+ }
+ }
+
+ /**
+ * Gets completed requests.
+ *
+ * @return Completed requests.
+ */
+ public long requestsCompleted() {
+ Holder h = holder();
+
+ return h == null ? 0 : h.requestsCompleted.value();
+ }
+
+ /**
+ * Increments completed requests.
+ */
+ public void requestsCompletedIncrement() {
+ Holder h = holder();
+
+ if (h != null) {
+ h.requestsCompleted.increment();
+ }
+ }
+
+ /**
+ * Gets retried requests.
+ *
+ * @return Retried requests.
+ */
+ public long requestsRetried() {
+ Holder h = holder();
+
+ return h == null ? 0 : h.requestsRetried.value();
+ }
+
+ /**
+ * Increments retried requests.
+ */
+ public void requestsRetriedIncrement() {
+ Holder h = holder();
+
+ if (h != null) {
+ h.requestsRetried.increment();
+ }
+ }
+
+ /**
+ * Gets failed requests.
+ *
+ * @return Failed requests.
+ */
+ public long requestsFailed() {
+ Holder h = holder();
+
+ return h == null ? 0 : h.requestsFailed.value();
+ }
+
+ /**
+ * Increments failed requests.
+ */
+ public void requestsFailedIncrement() {
+ Holder h = holder();
+
+ if (h != null) {
+ h.requestsFailed.increment();
+ }
+ }
+
+ /**
+ * Gets total sent bytes.
+ *
+ * @return Sent bytes.
+ */
+ public long bytesSent() {
+ Holder h = holder();
+
+ return h == null ? 0 : h.bytesSent.value();
+ }
+
+ /**
+ * Adds sent bytes.
+ *
+ * @param bytes Sent bytes.
+ */
+ public void bytesSentAdd(long bytes) {
+ Holder h = holder();
+
+ if (h != null) {
+ h.bytesSent.add(bytes);
+ }
+ }
+
+ /**
+ * Gets total received bytes.
+ *
+ * @return Received bytes.
+ */
+ public long bytesReceived() {
+ Holder h = holder();
+
+ return h == null ? 0 : h.bytesReceived.value();
+ }
+
+ /**
+ * Adds received bytes.
+ *
+ * @param bytes Received bytes.
+ */
+ public void bytesReceivedAdd(long bytes) {
+ Holder h = holder();
+
+ if (h != null) {
+ h.bytesReceived.add(bytes);
+ }
+ }
+
+ @Override
+ protected Holder createHolder() {
+ return new Holder();
+ }
+
+ @Override
+ protected void init(MetricSetBuilder bldr, Holder holder) {
+ holder.register(bldr);
+ }
+
+ /**
+ * Metrics holder.
+ */
+ protected static class Holder implements
AbstractMetricSource.Holder<Holder> {
+ private final AtomicLongMetric connectionsActive =
+ new AtomicLongMetric("ConnectionsActive", "Currently active
connections");
+
+ private final AtomicLongMetric connectionsEstablished =
+ new AtomicLongMetric("ConnectionsEstablished", "Total
established connections");
+
+ private final AtomicLongMetric connectionsLost =
+ new AtomicLongMetric("ConnectionsLost", "Total lost
connections");
+
+ private final AtomicLongMetric connectionsLostTimeout =
+ new AtomicLongMetric("ConnectionsLostTimeout", "Total lost
connections due to a timeout");
+
+ private final AtomicLongMetric handshakesFailed =
+ new AtomicLongMetric("HandshakesFailed", "Total failed
handshakes");
+
+ private final AtomicLongMetric handshakesFailedTimeout =
+ new AtomicLongMetric("HandshakesFailedTimeout", "Total failed
handshakes due to a timeout");
+
+ private final AtomicLongMetric requestsActive =
+ new AtomicLongMetric("RequestsActive", "Currently active
requests");
+
+ private final AtomicLongMetric requestsSent =
+ new AtomicLongMetric("RequestsSent", "Total requests sent");
+
+ private final AtomicLongMetric requestsCompleted =
+ new AtomicLongMetric("RequestsCompleted", "Total requests
completed (response received)");
+
+ private final AtomicLongMetric requestsRetried =
+ new AtomicLongMetric(
+ "RequestsRetried",
+ "Total requests retries (request sent again after a
failure)");
+
+ private final AtomicLongMetric requestsFailed = new
AtomicLongMetric("RequestsFailed", "Total requests failed");
+
+ private final AtomicLongMetric bytesSent = new
AtomicLongMetric("BytesSent", "Total bytes sent");
+
+ private final AtomicLongMetric bytesReceived = new
AtomicLongMetric("BytesReceived", "Total bytes received");
+
+ final List<Metric> metrics = Arrays.asList(
+ connectionsActive,
+ connectionsEstablished,
+ connectionsLost,
+ connectionsLostTimeout,
+ handshakesFailed,
+ handshakesFailedTimeout,
+ requestsActive,
+ requestsSent,
+ requestsCompleted,
+ requestsRetried,
+ requestsFailed,
+ bytesSent,
+ bytesReceived
+ );
+
+ void register(MetricSetBuilder bldr) {
+ for (var metric : metrics) {
+ bldr.register(metric);
+ }
+ }
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
index b665722f49..af7372a2ec 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/IgniteClientConfigurationImpl.java
@@ -63,11 +63,13 @@ public final class IgniteClientConfigurationImpl implements
IgniteClientConfigur
private final SslConfiguration sslConfiguration;
+ private final boolean metricsEnabled;
+
/**
* Constructor.
*
* @param addressFinder Address finder.
- * @param addresses Addresses.
+ * @param addresses Addresses.
* @param connectTimeout Socket connect timeout.
* @param reconnectThrottlingPeriod Reconnect throttling period, in
milliseconds.
* @param reconnectThrottlingRetries Reconnect throttling retries.
@@ -77,6 +79,7 @@ public final class IgniteClientConfigurationImpl implements
IgniteClientConfigur
* @param heartbeatTimeout Heartbeat message timeout.
* @param retryPolicy Retry policy.
* @param loggerFactory Logger factory which will be used to create a
logger instance for this this particular client when needed.
+ * @param metricsEnabled Whether metrics are enabled.
*/
public IgniteClientConfigurationImpl(
IgniteClientAddressFinder addressFinder,
@@ -90,8 +93,8 @@ public final class IgniteClientConfigurationImpl implements
IgniteClientConfigur
long heartbeatTimeout,
@Nullable RetryPolicy retryPolicy,
@Nullable LoggerFactory loggerFactory,
- @Nullable SslConfiguration sslConfiguration
- ) {
+ @Nullable SslConfiguration sslConfiguration,
+ boolean metricsEnabled) {
this.addressFinder = addressFinder;
//noinspection AssignmentOrReturnOfFieldWithMutableType (cloned in
Builder).
@@ -107,6 +110,7 @@ public final class IgniteClientConfigurationImpl implements
IgniteClientConfigur
this.retryPolicy = retryPolicy;
this.loggerFactory = loggerFactory;
this.sslConfiguration = sslConfiguration;
+ this.metricsEnabled = metricsEnabled;
}
/** {@inheritDoc} */
@@ -180,4 +184,10 @@ public final class IgniteClientConfigurationImpl
implements IgniteClientConfigur
public @Nullable SslConfiguration ssl() {
return sslConfiguration;
}
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean metricsEnabled() {
+ return metricsEnabled;
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 5fbe5b3bf4..dbee720752 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -43,7 +43,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.ignite.client.ClientOperationType;
@@ -64,7 +63,10 @@ import org.jetbrains.annotations.Nullable;
*/
public final class ReliableChannel implements AutoCloseable {
/** Channel factory. */
- private final BiFunction<ClientChannelConfiguration,
ClientConnectionMultiplexer, CompletableFuture<ClientChannel>> chFactory;
+ private final ClientChannelFactory chFactory;
+
+ /** Metrics. */
+ private final ClientMetricSource metrics;
/** Client channel holders for each configured address. */
private volatile List<ClientChannelHolder> channels;
@@ -117,13 +119,16 @@ public final class ReliableChannel implements
AutoCloseable {
* @param chFactory Channel factory.
* @param clientCfg Client config.
*/
- ReliableChannel(BiFunction<ClientChannelConfiguration,
ClientConnectionMultiplexer, CompletableFuture<ClientChannel>> chFactory,
- IgniteClientConfiguration clientCfg) {
+ ReliableChannel(
+ ClientChannelFactory chFactory,
+ IgniteClientConfiguration clientCfg,
+ ClientMetricSource metrics) {
this.clientCfg = Objects.requireNonNull(clientCfg, "clientCfg");
this.chFactory = Objects.requireNonNull(chFactory, "chFactory");
this.log = ClientUtils.logger(clientCfg, ReliableChannel.class);
+ this.metrics = metrics;
- connMgr = new NettyClientConnectionMultiplexer();
+ connMgr = new NettyClientConnectionMultiplexer(metrics);
connMgr.start(clientCfg);
}
@@ -132,8 +137,6 @@ public final class ReliableChannel implements AutoCloseable
{
public synchronized void close() {
closed = true;
- connMgr.stop();
-
List<ClientChannelHolder> holders = channels;
if (holders != null) {
@@ -141,6 +144,8 @@ public final class ReliableChannel implements AutoCloseable
{
hld.close();
}
}
+
+ connMgr.stop();
}
/**
@@ -576,6 +581,10 @@ public final class ReliableChannel implements
AutoCloseable {
}
}
+ if (res) {
+ metrics.requestsRetriedIncrement();
+ }
+
return res;
}
@@ -773,7 +782,7 @@ public final class ReliableChannel implements AutoCloseable
{
new
IgniteClientConnectionException(CONNECTION_ERR, "Reconnect is not allowed due
to applied throttling"));
}
- chFut0 = chFactory.apply(chCfg, connMgr).thenApply(ch -> {
+ chFut0 = chFactory.create(chCfg, connMgr,
metrics).thenApply(ch -> {
var oldClusterId = clusterId.compareAndExchange(null,
ch.protocolContext().clusterId());
if (oldClusterId != null &&
!oldClusterId.equals(ch.protocolContext().clusterId())) {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 86387df674..fc5b62dde0 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -78,6 +78,9 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
/** Config. */
private final ClientChannelConfiguration cfg;
+ /** Metrics. */
+ private final ClientMetricSource metrics;
+
/** Protocol context. */
private volatile ProtocolContext protocolCtx;
@@ -117,11 +120,13 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
/**
* Constructor.
*
- * @param cfg Config.
+ * @param cfg Config.
+ * @param metrics Metrics.
*/
- private TcpClientChannel(ClientChannelConfiguration cfg) {
+ private TcpClientChannel(ClientChannelConfiguration cfg,
ClientMetricSource metrics) {
validateConfiguration(cfg);
this.cfg = cfg;
+ this.metrics = metrics;
log = ClientUtils.logger(cfg.clientConfiguration(),
TcpClientChannel.class);
@@ -153,7 +158,9 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
.thenApplyAsync(unused -> {
// Netty has a built-in IdleStateHandler to detect idle
connections (used on the server side).
// However, to adjust the heartbeat interval dynamically,
we have to use a timer here.
- heartbeatTimer =
initHeartbeat(cfg.clientConfiguration().heartbeatInterval());
+ if (protocolCtx != null) {
+ heartbeatTimer =
initHeartbeat(cfg.clientConfiguration().heartbeatInterval());
+ }
return this;
}, asyncContinuationExecutor);
@@ -164,24 +171,34 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
*
* @param cfg Configuration.
* @param connMgr Connection manager.
+ * @param metrics Metrics.
* @return Channel.
*/
- static CompletableFuture<ClientChannel>
createAsync(ClientChannelConfiguration cfg, ClientConnectionMultiplexer
connMgr) {
+ static CompletableFuture<ClientChannel> createAsync(
+ ClientChannelConfiguration cfg,
+ ClientConnectionMultiplexer connMgr,
+ ClientMetricSource metrics) {
//noinspection resource - returned from method.
- return new TcpClientChannel(cfg).initAsync(connMgr);
+ return new TcpClientChannel(cfg, metrics).initAsync(connMgr);
}
/** {@inheritDoc} */
@Override
public void close() {
- close(null);
+ close(null, true);
}
/**
* Close the channel with cause.
*/
- private void close(@Nullable Exception cause) {
+ private void close(@Nullable Exception cause, boolean graceful) {
if (closed.compareAndSet(false, true)) {
+ if (cause != null && (cause instanceof TimeoutException ||
cause.getCause() instanceof TimeoutException)) {
+ metrics.connectionsLostTimeoutIncrement();
+ } else if (!graceful) {
+ metrics.connectionsLostIncrement();
+ }
+
// Disconnect can happen before we initialize the timer.
var timer = heartbeatTimer;
@@ -215,7 +232,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
log.debug("Connection closed [remoteAddress=" + cfg.getAddress() +
']');
}
- close(e);
+ close(e, false);
}
/** {@inheritDoc} */
@@ -242,7 +259,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
}
/**
- * Constructor.
+ * Sends request.
*
* @param opCode Operation code.
* @param payloadWriter Payload writer to stream or {@code null} if
request has no payload.
@@ -256,9 +273,10 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
}
ClientRequestFuture fut = new ClientRequestFuture();
-
pendingReqs.put(id, fut);
+ metrics.requestsActiveIncrement();
+
PayloadOutputChannel payloadCh = new PayloadOutputChannel(this, new
ClientMessagePacker(sock.getBuffer()));
try {
@@ -274,6 +292,10 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
write(req).addListener(f -> {
if (!f.isSuccess()) {
fut.completeExceptionally(new
IgniteClientConnectionException(CONNECTION_ERR, "Failed to send request",
f.cause()));
+ pendingReqs.remove(id);
+ metrics.requestsActiveDecrement();
+ } else {
+ metrics.requestsSentIncrement();
}
});
@@ -286,6 +308,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
payloadCh.close();
pendingReqs.remove(id);
+ metrics.requestsActiveDecrement();
+
throw IgniteException.wrap(t);
}
}
@@ -348,6 +372,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
throw new IgniteClientConnectionException(PROTOCOL_ERR,
String.format("Unexpected response ID [%s]", resId));
}
+ metrics.requestsActiveDecrement();
+
int flags = unpacker.unpackInt();
if (ResponseFlags.getPartitionAssignmentChangedFlag(flags)) {
@@ -362,12 +388,13 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
if (unpacker.tryUnpackNil()) {
pendingReq.complete(unpacker);
+ metrics.requestsCompletedIncrement();
} else {
IgniteException err = readError(unpacker);
-
unpacker.close();
pendingReq.completeExceptionally(err);
+ metrics.requestsFailedIncrement();
}
}
@@ -452,7 +479,22 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
fut.orTimeout(connectTimeout, TimeUnit.MILLISECONDS);
}
- return fut.thenCompose(res -> handshakeRes(res, ver));
+ return fut
+ .thenCompose(res -> handshakeRes(res, ver))
+ .handle((res, err) -> {
+ if (err != null) {
+ if (err instanceof TimeoutException || err.getCause()
instanceof TimeoutException) {
+ metrics.handshakesFailedTimeoutIncrement();
+ throw new
IgniteClientConnectionException(CONNECTION_ERR, "Handshake timeout", err);
+ } else {
+ metrics.handshakesFailedIncrement();
+ }
+
+ throw new
IgniteClientConnectionException(CONNECTION_ERR, "Handshake error", err);
+ }
+
+ return res;
+ });
}
/**
@@ -593,7 +635,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
if (e instanceof TimeoutException) {
log.warn("Heartbeat timeout, closing
the channel [remoteAddress=" + cfg.getAddress() + ']');
- close((TimeoutException) e);
+ close(new
IgniteClientConnectionException(CONNECTION_ERR, "Heartbeat timeout", e), false);
}
return null;
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index e5b644b250..e7feb7862c 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -23,23 +23,25 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.function.BiFunction;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.client.IgniteClientConfiguration;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.deployment.IgniteDeployment;
import org.apache.ignite.internal.client.compute.ClientCompute;
-import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.sql.ClientSql;
import org.apache.ignite.internal.client.table.ClientTables;
import org.apache.ignite.internal.client.tx.ClientTransactions;
import org.apache.ignite.internal.jdbc.proto.ClientMessage;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.exporters.jmx.JmxExporter;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* Implementation of {@link IgniteClient} over TCP protocol.
@@ -63,6 +65,12 @@ public class TcpIgniteClient implements IgniteClient {
/** Compute. */
private final ClientSql sql;
+ /** Metric manager. */
+ private final @Nullable MetricManager metricManager;
+
+ /** Metrics. */
+ private final ClientMetricSource metrics;
+
/**
* Constructor.
*
@@ -78,20 +86,34 @@ public class TcpIgniteClient implements IgniteClient {
* @param chFactory Channel factory.
* @param cfg Config.
*/
- private TcpIgniteClient(
- BiFunction<ClientChannelConfiguration,
ClientConnectionMultiplexer, CompletableFuture<ClientChannel>> chFactory,
- IgniteClientConfiguration cfg
- ) {
+ private TcpIgniteClient(ClientChannelFactory chFactory,
IgniteClientConfiguration cfg) {
assert chFactory != null;
assert cfg != null;
this.cfg = cfg;
- ch = new ReliableChannel(chFactory, cfg);
+ metrics = new ClientMetricSource();
+ ch = new ReliableChannel(chFactory, cfg, metrics);
tables = new ClientTables(ch);
transactions = new ClientTransactions(ch);
compute = new ClientCompute(ch, tables);
sql = new ClientSql(ch);
+ metricManager = initMetricManager(cfg);
+ }
+
+ /**
+  * Creates and starts the metric manager when metrics are enabled in the client configuration.
+  *
+  * <p>The manager is started with a single JMX exporter; the client metric source is then registered
+  * with the manager and enabled.
+  *
+  * @param cfg Client configuration.
+  * @return Started metric manager, or {@code null} when metrics are disabled.
+  */
+ @Nullable
+ private MetricManager initMetricManager(IgniteClientConfiguration cfg) {
+     if (cfg.metricsEnabled()) {
+         var manager = new MetricManager(ClientUtils.logger(cfg, MetricManager.class));
+         var jmxExporter = new JmxExporter(ClientUtils.logger(cfg, JmxExporter.class));
+
+         manager.start(List.of(jmxExporter));
+         manager.registerSource(metrics);
+         metrics.enable();
+
+         return manager;
+     }
+
+     // Metrics are disabled: no manager is created and the metric source stays disabled.
+     return null;
}
/**
@@ -177,6 +199,10 @@ public class TcpIgniteClient implements IgniteClient {
@Override
public void close() throws Exception {
- ch.close();
+     try {
+         ch.close();
+     } finally {
+         // Stop the metric manager even when closing the channel fails, so that
+         // the exporters started in initMetricManager are always shut down.
+         // The manager is null when metrics are disabled in the configuration.
+         if (metricManager != null) {
+             metricManager.stop();
+         }
+     }
}
/** {@inheritDoc} */
@@ -197,6 +223,11 @@ public class TcpIgniteClient implements IgniteClient {
return ch.connections();
}
+ /**
+ * Gets the metric source of this client. Marked as test-only.
+ *
+ * @return Client metric source.
+ */
+ @TestOnly
+ public ClientMetricSource metrics() {
+ return metrics;
+ }
+
/**
* Sends ClientMessage request to server side asynchronously and returns
result future.
*
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
index a834318d99..3e0ed3e792 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnection.java
@@ -22,6 +22,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.AttributeKey;
import java.net.InetSocketAddress;
+import org.apache.ignite.internal.client.ClientMetricSource;
import org.apache.ignite.internal.client.io.ClientConnection;
import org.apache.ignite.internal.client.io.ClientConnectionStateHandler;
import org.apache.ignite.internal.client.io.ClientMessageHandler;
@@ -32,7 +33,7 @@ import org.apache.ignite.lang.IgniteException;
*/
public class NettyClientConnection implements ClientConnection {
/** Connection attribute. */
- public static final AttributeKey<NettyClientConnection> ATTR_CONN =
AttributeKey.newInstance("CONN");
+ static final AttributeKey<NettyClientConnection> ATTR_CONN =
AttributeKey.newInstance("CONN");
/** Channel. */
private final Channel channel;
@@ -43,26 +44,42 @@ public class NettyClientConnection implements
ClientConnection {
/** State handler. */
private final ClientConnectionStateHandler stateHnd;
+ /** Metrics. */
+ private final ClientMetricSource metrics;
+
/**
* Constructor.
*
- * @param channel Channel.
- * @param msgHnd Message handler.
+ * @param channel Channel.
+ * @param msgHnd Message handler.
* @param stateHnd State handler.
+ * @param metrics Metrics; updated with the sizes of sent and received messages.
*/
- public NettyClientConnection(Channel channel, ClientMessageHandler msgHnd,
ClientConnectionStateHandler stateHnd) {
+ NettyClientConnection(
+ Channel channel,
+ ClientMessageHandler msgHnd,
+ ClientConnectionStateHandler stateHnd,
+ ClientMetricSource metrics) {
this.channel = channel;
this.msgHnd = msgHnd;
this.stateHnd = stateHnd;
+ this.metrics = metrics;
+ // Publish this connection as the ATTR_CONN attribute of the channel
+ // (presumably so that channel handlers can look it up - confirm against the handlers).
+ //noinspection ThisEscapedInObjectConstruction
channel.attr(ATTR_CONN).set(this);
}
/** {@inheritDoc} */
@Override
public ChannelFuture send(ByteBuf msg) throws IgniteException {
+ // Read the size before writing: the buffer is released by writeAndFlush below.
+ int bytes = msg.readableBytes();
+
// writeAndFlush releases pooled buffer.
- return channel.writeAndFlush(msg);
+ ChannelFuture fut = channel.writeAndFlush(msg);
+
+ // The counter is updated as soon as the write is issued, without waiting for the future to complete.
+ metrics.bytesSentAdd(bytes);
+
+ return fut;
}
/** {@inheritDoc} */
@@ -89,6 +106,8 @@ public class NettyClientConnection implements
ClientConnection {
* @param buf Message.
*/
void onMessage(ByteBuf buf) {
+ metrics.bytesReceivedAdd(buf.readableBytes());
+
msgHnd.onMessage(buf);
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
index b0c17be0c4..ee4b1fdbb8 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/io/netty/NettyClientConnectionMultiplexer.java
@@ -46,6 +46,7 @@ import org.apache.ignite.client.ClientAuthenticationMode;
import org.apache.ignite.client.IgniteClientConfiguration;
import org.apache.ignite.client.IgniteClientConnectionException;
import org.apache.ignite.client.SslConfiguration;
+import org.apache.ignite.internal.client.ClientMetricSource;
import org.apache.ignite.internal.client.io.ClientConnection;
import org.apache.ignite.internal.client.io.ClientConnectionMultiplexer;
import org.apache.ignite.internal.client.io.ClientConnectionStateHandler;
@@ -62,12 +63,15 @@ public class NettyClientConnectionMultiplexer implements
ClientConnectionMultipl
private final Bootstrap bootstrap;
+ private final ClientMetricSource metrics;
+
/**
* Constructor.
+ *
+ * @param metrics Metrics; stored for use by this multiplexer.
*/
- public NettyClientConnectionMultiplexer() {
+ public NettyClientConnectionMultiplexer(ClientMetricSource metrics) {
workerGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
+ this.metrics = metrics;
}
/** {@inheritDoc} */
@@ -116,7 +120,6 @@ public class NettyClientConnectionMultiplexer implements
ClientConnectionMultipl
} catch (NoSuchAlgorithmException | KeyStoreException |
CertificateException | IOException | UnrecoverableKeyException e) {
throw new IgniteException(CLIENT_SSL_CONFIGURATION_ERR, "Client
SSL configuration error: " + e.getMessage(), e);
}
-
}
private static KeyManagerFactory loadKeyManagerFactory(SslConfiguration
ssl)
@@ -183,7 +186,13 @@ public class NettyClientConnectionMultiplexer implements
ClientConnectionMultipl
connectFut.addListener(f -> {
if (f.isSuccess()) {
- NettyClientConnection conn = new
NettyClientConnection(((ChannelFuture) f).channel(), msgHnd, stateHnd);
+ metrics.connectionsEstablishedIncrement();
+ metrics.connectionsActiveIncrement();
+
+ ChannelFuture chFut = (ChannelFuture) f;
+ chFut.channel().closeFuture().addListener(unused ->
metrics.connectionsActiveDecrement());
+
+ NettyClientConnection conn = new
NettyClientConnection(chFut.channel(), msgHnd, stateHnd, metrics);
fut.complete(conn);
} else {
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
new file mode 100644
index 0000000000..3b2277ea5a
--- /dev/null
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import org.apache.ignite.client.IgniteClient.Builder;
+import org.apache.ignite.client.fakes.FakeIgnite;
+import org.apache.ignite.internal.client.ClientMetricSource;
+import org.apache.ignite.internal.client.TcpIgniteClient;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.lang.IgniteException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/**
+ * Tests client-side metrics (see also server-side metrics tests in {@link
ServerMetricsTest}).
+ *
+ * <p>Every test starts its own {@code TestServer} and builds a client with metrics enabled;
+ * both are closed after each test.
+ */
+public class ClientMetricsTest {
+ private TestServer server;
+ private IgniteClient client;
+
+ /**
+ * Checks connection counters after connect and after disconnect.
+ *
+ * @param gracefulDisconnect When {@code true} the client is closed, otherwise the server is closed.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testConnectionMetrics(boolean gracefulDisconnect) throws
Exception {
+ server = AbstractClientTest.startServer(10800, 10, 1000, new
FakeIgnite());
+ client = clientBuilder().build();
+
+ ClientMetricSource metrics = metrics();
+
+ assertEquals(1, metrics.connectionsEstablished());
+ assertEquals(1, metrics.connectionsActive());
+
+ if (gracefulDisconnect) {
+ client.close();
+ } else {
+ server.close();
+ }
+
+ // The active counter is polled rather than asserted immediately.
+ assertTrue(
+ IgniteTestUtils.waitForCondition(() ->
metrics.connectionsActive() == 0, 1000),
+ () -> "connectionsActive: " + metrics.connectionsActive());
+
+ // A connection closed by the client itself is not expected to be counted as lost.
+ assertEquals(1, metrics.connectionsEstablished());
+ assertEquals(gracefulDisconnect ? 0 : 1, metrics.connectionsLost());
+ }
+
+ /**
+ * Checks that the connectionsLostTimeout counter reaches 1 when server responses are delayed
+ * and the client is configured with short connect and heartbeat timeouts.
+ */
+ @Test
+ public void testConnectionsLostTimeout() throws InterruptedException {
+ Function<Integer, Boolean> shouldDropConnection = requestIdx ->
requestIdx == 0;
+ Function<Integer, Integer> responseDelay = idx -> idx > 1 ? 500 : 0;
+ server = new TestServer(10800, 10, 1000, new FakeIgnite(),
shouldDropConnection, responseDelay, null, AbstractClientTest.clusterId);
+ client = clientBuilder()
+ .connectTimeout(100)
+ .heartbeatTimeout(100)
+ .heartbeatInterval(100)
+ .build();
+
+ assertTrue(
+ IgniteTestUtils.waitForCondition(() ->
metrics().connectionsLostTimeout() == 1, 1000),
+ () -> "connectionsLostTimeout: " +
metrics().connectionsLostTimeout());
+ }
+
+ /**
+ * Checks that the handshakesFailed counter equals 2 after the server-side drop predicate
+ * has returned {@code true} for its first two invocations.
+ */
+ @Test
+ public void testHandshakesFailed() {
+ AtomicInteger counter = new AtomicInteger();
+ Function<Integer, Boolean> shouldDropConnection = requestIdx ->
counter.incrementAndGet() < 3; // Fail 2 handshakes.
+ server = new TestServer(10800, 10, 1000, new FakeIgnite(),
shouldDropConnection, null, null, AbstractClientTest.clusterId);
+
+ client = clientBuilder().build();
+
+ assertEquals(2, metrics().handshakesFailed());
+ }
+
+ /**
+ * Checks that the handshakesFailedTimeout counter reaches 1 when only the first server response
+ * is delayed and the client uses a short connect timeout.
+ */
+ @Test
+ public void testHandshakesFailedTimeout() throws InterruptedException {
+ AtomicInteger counter = new AtomicInteger();
+ Function<Integer, Boolean> shouldDropConnection = requestIdx -> false;
+ Function<Integer, Integer> responseDelay = idx ->
counter.incrementAndGet() == 1 ? 500 : 0;
+ server = new TestServer(10800, 10, 1000, new FakeIgnite(),
shouldDropConnection, responseDelay, null, AbstractClientTest.clusterId);
+ client = clientBuilder()
+ .connectTimeout(100)
+ .build();
+
+ assertTrue(
+ IgniteTestUtils.waitForCondition(() ->
metrics().handshakesFailedTimeout() == 1, 1000),
+ () -> "handshakesFailedTimeout: " +
metrics().handshakesFailedTimeout());
+ }
+
+ /**
+ * Checks request counters (active, failed, completed, sent, retried) through a sequence of
+ * a successful request, a failing SQL request, a delayed asynchronous request and a retried request.
+ *
+ * <p>The server is set up to delay the response with index 4 and to drop the connection at index 5;
+ * how these indexes map to individual client requests is defined by {@code TestServer}.
+ */
+ @Test
+ public void testRequestsMetrics() throws InterruptedException {
+ Function<Integer, Boolean> shouldDropConnection = requestIdx ->
requestIdx == 5;
+ Function<Integer, Integer> responseDelay = idx -> idx == 4 ? 1000 : 0;
+ server = new TestServer(10800, 10, 1000, new FakeIgnite(),
shouldDropConnection, responseDelay, null, AbstractClientTest.clusterId);
+ client = clientBuilder().build();
+
+ // All request counters are zero right after the client is built.
+ assertEquals(0, metrics().requestsActive());
+ assertEquals(0, metrics().requestsFailed());
+ assertEquals(0, metrics().requestsCompleted());
+ assertEquals(0, metrics().requestsSent());
+ assertEquals(0, metrics().requestsRetried());
+
+ // One successful synchronous request.
+ client.tables().tables();
+
+ assertEquals(0, metrics().requestsActive());
+ assertEquals(0, metrics().requestsFailed());
+ assertEquals(1, metrics().requestsCompleted());
+ assertEquals(1, metrics().requestsSent());
+ assertEquals(0, metrics().requestsRetried());
+
+ // One request that fails with IgniteException.
+ assertThrows(IgniteException.class, () ->
client.sql().createSession().execute(null, "foo bar"));
+
+ assertEquals(0, metrics().requestsActive());
+ assertEquals(1, metrics().requestsFailed());
+ assertEquals(1, metrics().requestsCompleted());
+ assertEquals(2, metrics().requestsSent());
+ assertEquals(0, metrics().requestsRetried());
+
+ // Asynchronous request whose result is not awaited: it is expected to stay active.
+ client.tables().tablesAsync();
+
+ assertTrue(
+ IgniteTestUtils.waitForCondition(() ->
metrics().requestsSent() == 3, 1000),
+ () -> "requestsSent: " + metrics().requestsSent());
+
+ assertEquals(1, metrics().requestsActive());
+ assertEquals(1, metrics().requestsFailed());
+ assertEquals(1, metrics().requestsCompleted());
+ assertEquals(0, metrics().requestsRetried());
+
+ // Synchronous request that is expected to be retried once.
+ client.tables().tables();
+
+ assertTrue(
+ IgniteTestUtils.waitForCondition(() ->
metrics().requestsRetried() == 1, 1000),
+ () -> "requestsRetried: " + metrics().requestsRetried());
+
+ assertEquals(1, metrics().requestsFailed());
+ assertEquals(3, metrics().requestsCompleted());
+ assertEquals(6, metrics().requestsSent());
+ assertEquals(1, metrics().requestsRetried());
+ }
+
+ /**
+ * Checks the exact bytesSent and bytesReceived values right after the client is built
+ * and again after a single {@code tables()} call.
+ */
+ @Test
+ public void testBytesSentReceived() {
+ server = AbstractClientTest.startServer(10800, 10, 1000, new
FakeIgnite());
+ client = clientBuilder().build();
+
+ assertEquals(15, metrics().bytesSent());
+ assertEquals(50, metrics().bytesReceived());
+
+ client.tables().tables();
+
+ assertEquals(21, metrics().bytesSent());
+ assertEquals(55, metrics().bytesReceived());
+ }
+
+ /**
+ * Closes the client and the server, when present. Runs after each test (see the annotation),
+ * despite the method name.
+ */
+ @AfterEach
+ public void afterAll() throws Exception {
+ if (client != null) {
+ client.close();
+ }
+
+ if (server != null) {
+ server.close();
+ }
+ }
+
+ /**
+ * Creates a client builder that targets the current test server and has metrics enabled.
+ */
+ private Builder clientBuilder() {
+ return IgniteClient.builder()
+ .addresses("127.0.0.1:" + server.port())
+ .metricsEnabled(true);
+ }
+
+ /**
+ * Gets the metric source of the current client; the client is cast to {@link TcpIgniteClient}.
+ */
+ private ClientMetricSource metrics() {
+ return ((TcpIgniteClient) client).metrics();
+ }
+}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index 4904b6faca..133f204bde 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -228,7 +228,7 @@ public class RetryPolicyTest {
@Test
public void testRetryReadPolicyAllOperationsSupported() {
var plc = new RetryReadPolicy();
- var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, 0, 0,
null, 0, 0, null, null, null);
+ var cfg = new IgniteClientConfigurationImpl(null, null, 0, 0, 0, 0,
null, 0, 0, null, null, null, false);
for (var op : ClientOperationType.values()) {
var ctx = new RetryPolicyContextImpl(cfg, op, 0, null);
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/MetricsTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ServerMetricsTest.java
similarity index 98%
rename from
modules/client/src/test/java/org/apache/ignite/client/MetricsTest.java
rename to
modules/client/src/test/java/org/apache/ignite/client/ServerMetricsTest.java
index 25257baf63..d2622124a9 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/MetricsTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ServerMetricsTest.java
@@ -38,7 +38,7 @@ import org.junit.jupiter.api.Test;
* Tests client handler metrics. See also {@code
org.apache.ignite.client.handler.ItClientHandlerMetricsTest}.
*/
@SuppressWarnings({"AssignmentToStaticFieldFromInstanceMethod", "rawtypes",
"unchecked"})
-public class MetricsTest extends AbstractClientTest {
+public class ServerMetricsTest extends AbstractClientTest {
@AfterEach
public void resetCompute() {
FakeCompute.future = null;
diff --git
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManager.java
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManager.java
index bb491208dd..64fb1d31c8 100644
---
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManager.java
+++
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/MetricManager.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.metrics;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.ServiceLoader.Provider;
@@ -44,7 +45,7 @@ import org.jetbrains.annotations.VisibleForTesting;
*/
public class MetricManager implements IgniteComponent {
/** Logger. */
- private static final IgniteLogger LOG =
Loggers.forClass(MetricManager.class);
+ private final IgniteLogger log;
/** Metric registry. */
private final MetricRegistry registry;
@@ -62,8 +63,18 @@ public class MetricManager implements IgniteComponent {
* Constructor.
*/
public MetricManager() {
+ this(Loggers.forClass(MetricManager.class));
+ }
+
+ /**
+ * Constructor that accepts an externally supplied logger.
+ *
+ * @param log Logger used by this manager.
+ */
+ public MetricManager(IgniteLogger log) {
registry = new MetricRegistry();
metricsProvider = new MetricProvider(registry);
+ this.log = log;
}
/**
@@ -102,6 +113,22 @@ public class MetricManager implements IgniteComponent {
metricConfiguration.exporters().listenElements(new
ExporterConfigurationListener());
}
+ /**
+  * Starts component with the given exporters instead of reading them from configuration.
+  *
+  * <p>Each exporter is started with this manager's metric provider and a {@code null} configuration,
+  * and is then recorded under its name as both available and enabled.
+  *
+  * @param exporters Exporters. Any iterable of exporter subtypes is accepted.
+  */
+ public void start(Iterable<? extends MetricExporter<?>> exporters) {
+     this.availableExporters = new HashMap<>();
+
+     for (MetricExporter<?> exporter : exporters) {
+         // No configuration view is available on this path, hence the null argument.
+         exporter.start(metricsProvider, null);
+
+         availableExporters.put(exporter.name(), exporter);
+         enabledMetricExporters.put(exporter.name(), exporter);
+     }
+ }
+
/** {@inheritDoc} */
@Override public void stop() throws Exception {
for (MetricExporter metricExporter : enabledMetricExporters.values()) {
@@ -237,9 +264,8 @@ public class MetricManager implements IgniteComponent {
enabledMetricExporters.put(exporter.name(), exporter);
} else {
- LOG.warn("Received configuration for unknown metric exporter with
the name '" + exporterName + "'");
+ log.warn("Received configuration for unknown metric exporter with
the name '" + exporterName + "'");
}
-
}
private class ExporterConfigurationListener implements
ConfigurationNamedListListener<ExporterView> {
diff --git
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/jmx/JmxExporter.java
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/jmx/JmxExporter.java
index 47fab3cbec..aa14e1f957 100644
---
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/jmx/JmxExporter.java
+++
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/exporters/jmx/JmxExporter.java
@@ -54,13 +54,21 @@ public class JmxExporter extends
BasicMetricExporter<JmxExporterView> {
/**
* Logger.
*/
- private static IgniteLogger LOG = Loggers.forClass(JmxExporter.class);
+ private final IgniteLogger log;
/**
* Current registered MBeans.
*/
private final List<ObjectName> mbeans = new ArrayList<>();
+ /**
+  * Constructor. Uses the logger obtained from {@link Loggers#forClass(Class)} for this class.
+  */
+ public JmxExporter() {
+     this(Loggers.forClass(JmxExporter.class));
+ }
+
+ /**
+  * Constructor.
+  *
+  * @param log Logger used to report MBean registration and unregistration problems.
+  */
+ public JmxExporter(IgniteLogger log) {
+     this.log = log;
+ }
+
/**
* {@inheritDoc}
*/
@@ -128,7 +136,7 @@ public class JmxExporter extends
BasicMetricExporter<JmxExporterView> {
mbeans.add(mbean);
} catch (JMException e) {
- LOG.error("MBean for metric set " + metricSet.name() + " can't be
created.", e);
+ log.error("MBean for metric set " + metricSet.name() + " can't be
created.", e);
}
}
@@ -146,10 +154,10 @@ public class JmxExporter extends
BasicMetricExporter<JmxExporterView> {
if (rmv) {
unregBean(mbeanName);
} else {
- LOG.warn("Tried to unregister the MBean for non-registered
metric set " + metricSetName);
+ log.warn("Tried to unregister the MBean for non-registered
metric set " + metricSetName);
}
} catch (MalformedObjectNameException e) {
- LOG.error("MBean for metric set " + metricSetName + " can't be
unregistered.", e);
+ log.error("MBean for metric set " + metricSetName + " can't be
unregistered.", e);
}
}
@@ -162,7 +170,7 @@ public class JmxExporter extends
BasicMetricExporter<JmxExporterView> {
try {
ManagementFactory.getPlatformMBeanServer().unregisterMBean(bean);
} catch (JMException e) {
- LOG.error("Failed to unregister MBean: " + bean, e);
+ log.error("Failed to unregister MBean: " + bean, e);
}
}
}
diff --git
a/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/JmxExporterTest.java
b/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/JmxExporterTest.java
index fb5e5de1dc..d6c9f805e2 100644
---
a/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/JmxExporterTest.java
+++
b/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/JmxExporterTest.java
@@ -41,6 +41,7 @@ import javax.management.ObjectName;
import javax.management.ReflectionException;
import
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metrics.configuration.MetricConfiguration;
import
org.apache.ignite.internal.metrics.exporters.configuration.JmxExporterView;
import org.apache.ignite.internal.metrics.exporters.jmx.JmxExporter;
@@ -99,7 +100,7 @@ public class JmxExporterTest {
mbeanName = IgniteUtils.makeMbeanName("metrics", SRC_NAME);
- jmxExporter = new JmxExporter();
+ jmxExporter = new JmxExporter(Loggers.forClass(JmxExporter.class));
metricsProvider = mock(MetricProvider.class);
}