This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 7e981e56d4 IGNITE-19007 Java client: Improve logging (#1793)
7e981e56d4 is described below
commit 7e981e56d4c5f938ab0d985dc7f39d03e230d3bf
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Wed Mar 15 15:32:26 2023 +0300
IGNITE-19007 Java client: Improve logging (#1793)
---
.../ignite/internal/client/ReliableChannel.java | 27 ++++++++++------
.../ignite/internal/client/TcpClientChannel.java | 36 +++++++++++++++++++---
.../ignite/internal/client/table/ClientTable.java | 11 +++++++
.../apache/ignite/client/ClientLoggingTest.java | 24 +++++++++++++++
.../org/apache/ignite/client/HeartbeatTest.java | 4 +--
.../org/apache/ignite/client/RetryPolicyTest.java | 4 +--
6 files changed, 89 insertions(+), 17 deletions(-)
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 09fb936c88..5fbe5b3bf4 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -166,6 +166,10 @@ public final class ReliableChannel implements AutoCloseable {
return res;
}
+ public IgniteClientConfiguration configuration() {
+ return clientCfg;
+ }
+
/**
* Sends request and handles response asynchronously.
*
@@ -560,7 +564,19 @@ public final class ReliableChannel implements AutoCloseable {
private boolean shouldRetry(int opCode, ClientFutureUtils.RetryContext ctx) {
ClientOperationType opType = ClientUtils.opCodeToClientOperationType(opCode);
- return shouldRetry(opType, ctx);
+ boolean res = shouldRetry(opType, ctx);
+
+ if (log.isDebugEnabled()) {
+ if (res) {
+ log.debug("Retrying operation [opCode=" + opCode + ", opType=" + opType + ", attempt=" + ctx.attempt
+ + ", lastError=" + ctx.lastError() + ']');
+ } else {
+ log.debug("Not retrying operation [opCode=" + opCode + ", opType=" + opType + ", attempt=" + ctx.attempt
+ + ", lastError=" + ctx.lastError() + ']');
+ }
+ }
+
+ return res;
}
/** Determines whether specified operation should be retried. */
@@ -596,14 +612,7 @@ public final class ReliableChannel implements AutoCloseable {
RetryPolicyContext retryPolicyContext = new RetryPolicyContextImpl(clientCfg, opType, ctx.attempt, exception);
// Exception in shouldRetry will be handled by ClientFutureUtils.doWithRetryAsync
- boolean shouldRetry = plc.shouldRetry(retryPolicyContext);
-
- if (shouldRetry) {
- log.debug("Going to retry operation because of error [op={}, currentAttempt={}, errMsg={}]",
- exception, opType, ctx.attempt, exception.getMessage());
- }
-
- return shouldRetry;
+ return plc.shouldRetry(retryPolicyContext);
}
/**
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 1036863386..86387df674 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -137,6 +137,10 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
return connMgr
.openAsync(cfg.getAddress(), this, this)
.thenCompose(s -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Connection established [remoteAddress=" + s.remoteAddress() + ']');
+ }
+
sock = s;
return handshakeAsync(DEFAULT_VERSION);
@@ -207,7 +211,10 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/** {@inheritDoc} */
@Override
public void onDisconnected(@Nullable Exception e) {
- log.debug("Disconnected from server: " + cfg.getAddress());
+ if (log.isDebugEnabled()) {
+ log.debug("Connection closed [remoteAddress=" + cfg.getAddress() + ']');
+ }
+
close(e);
}
@@ -219,6 +226,10 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
PayloadReader<T> payloadReader
) {
try {
+ if (log.isTraceEnabled()) {
+ log.trace("Sending request [opCode=" + opCode + ", remoteAddress=" + cfg.getAddress() + ']');
+ }
+
ClientRequestFuture fut = send(opCode, payloadWriter);
return receiveAsync(fut, payloadReader);
@@ -268,6 +279,9 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
return fut;
} catch (Throwable t) {
+ log.warn("Failed to send request [id=" + id + ", op=" + opCode + ", remoteAddress=" + cfg.getAddress() + "]: "
+ + t.getMessage(), t);
+
// Close buffer manually on fail. Successful write closes the buffer automatically.
payloadCh.close();
pendingReqs.remove(id);
@@ -297,6 +311,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
try (var in = new PayloadInputChannel(this, payload)) {
return payloadReader.apply(in);
} catch (Exception e) {
+ log.error("Failed to deserialize server response [remoteAddress=" + cfg.getAddress() + "]: " + e.getMessage(), e);
+
throw new IgniteClientConnectionException(PROTOCOL_ERR,
"Failed to deserialize server response: " + e.getMessage(), e);
}
}, asyncContinuationExecutor);
@@ -317,6 +333,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
var type = unpacker.unpackInt();
if (type != ServerMessageType.RESPONSE) {
+ log.error("Unexpected message type [remoteAddress=" + cfg.getAddress() + "]: " + type);
+
throw new IgniteClientConnectionException(PROTOCOL_ERR,
"Unexpected message type: " + type);
}
@@ -325,12 +343,18 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
ClientRequestFuture pendingReq = pendingReqs.remove(resId);
if (pendingReq == null) {
+ log.error("Unexpected response ID [remoteAddress=" + cfg.getAddress() + "]: " + resId);
+
throw new IgniteClientConnectionException(PROTOCOL_ERR,
String.format("Unexpected response ID [%s]", resId));
}
int flags = unpacker.unpackInt();
if (ResponseFlags.getPartitionAssignmentChangedFlag(flags)) {
+ if (log.isInfoEnabled()) {
+ log.info("Partition assignment change notification received [remoteAddress=" + cfg.getAddress() + "]");
+ }
+
for (Consumer<ClientChannel> listener : assignmentChangeListeners) {
listener.accept(this);
}
@@ -484,6 +508,10 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
srvVer, ProtocolBitmaskFeature.allFeaturesAsEnumSet(),
serverIdleTimeout, clusterNode, clusterId);
return CompletableFuture.completedFuture(null);
+ } catch (Exception e) {
+ log.warn("Failed to handle handshake response [remoteAddress=" + cfg.getAddress() + "]: " + e.getMessage(), e);
+
+ return CompletableFuture.failedFuture(e);
}
}
@@ -563,7 +591,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
.orTimeout(heartbeatTimeout, TimeUnit.MILLISECONDS)
.exceptionally(e -> {
if (e instanceof TimeoutException) {
- log.warn("Heartbeat timeout, closing the channel");
+ log.warn("Heartbeat timeout, closing the channel [remoteAddress=" + cfg.getAddress() + ']');
close((TimeoutException) e);
}
@@ -572,8 +600,8 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
});
}
}
- } catch (Throwable ignored) {
- // Ignore failed heartbeats.
+ } catch (Throwable e) {
+ log.warn("Failed to send heartbeat [remoteAddress=" + cfg.getAddress() + "]: " + e.getMessage(), e);
}
}
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index d4a36701b9..91c1a18bad 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -31,11 +31,13 @@ import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.ignite.internal.client.ClientChannel;
+import org.apache.ignite.internal.client.ClientUtils;
import org.apache.ignite.internal.client.PayloadOutputChannel;
import org.apache.ignite.internal.client.ReliableChannel;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.tx.ClientTransaction;
+import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.tostring.IgniteToStringBuilder;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
@@ -60,6 +62,8 @@ public class ClientTable implements Table {
private final ConcurrentHashMap<Integer, ClientSchema> schemas = new ConcurrentHashMap<>();
+ private final IgniteLogger log;
+
private volatile int latestSchemaVer = -1;
private final Object latestSchemaLock = new Object();
@@ -83,6 +87,7 @@ public class ClientTable implements Table {
this.ch = ch;
this.id = id;
this.name = name;
+ this.log = ClientUtils.logger(ch.configuration(), ClientTable.class);
}
/**
@@ -160,6 +165,8 @@ public class ClientTable implements Table {
int schemaCnt = r.in().unpackMapHeader();
if (schemaCnt == 0) {
+ log.warn("Schema not found [tableId=" + id + ", schemaVersion=" + ver + "]");
+
throw new IgniteException(UNEXPECTED_ERR, "Schema not found: " + ver);
}
@@ -167,6 +174,10 @@ public class ClientTable implements Table {
for (var i = 0; i < schemaCnt; i++) {
last = readSchema(r.in());
+
+ if (log.isDebugEnabled()) {
+ log.debug("Schema loaded [tableId=" + id + ", schemaVersion=" + last.version() + "]");
+ }
}
return last;
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientLoggingTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientLoggingTest.java
index 71e557b836..41635df9b8 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientLoggingTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientLoggingTest.java
@@ -22,9 +22,11 @@ import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.ignite.client.fakes.FakeIgnite;
import org.apache.ignite.client.fakes.FakeIgniteTables;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.LoggerFactory;
import org.junit.jupiter.api.AfterEach;
@@ -78,6 +80,28 @@ public class ClientLoggingTest {
loggerFactory2.logger.entries().forEach(msg -> assertThat(msg, startsWith("client2:")));
}
+ @Test
+ public void testBasicLogging() throws Exception {
+ FakeIgnite ignite = new FakeIgnite();
+ ((FakeIgniteTables) ignite.tables()).createTable("t");
+
+ server = startServer(10950, ignite);
+ server2 = startServer(10955, ignite);
+
+ var loggerFactory = new TestLoggerFactory("c");
+
+ try (var client = createClient(loggerFactory)) {
+ client.tables().tables();
+ client.tables().table("t");
+
+ assertTrue(IgniteTestUtils.waitForCondition(() -> loggerFactory.logger.entries().size() > 10, 5_000));
+
+ loggerFactory.assertLogContains("Connection established");
+ loggerFactory.assertLogContains("c:Sending request [opCode=3, remoteAddress=127.0.0.1:1095");
+ loggerFactory.assertLogContains("c:Failed to establish connection to 127.0.0.1:1095");
+ }
+ }
+
private static TestServer startServer(int port, FakeIgnite ignite) {
return AbstractClientTest.startServer(
port,
diff --git a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
index c505deaf30..aeeb47112e 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/HeartbeatTest.java
@@ -46,8 +46,8 @@ public class HeartbeatTest {
try (var ignored = builder.build()) {
assertTrue(
IgniteTestUtils.waitForCondition(
- () -> loggerFactory.logger.entries().stream().anyMatch(x -> x.contains("Disconnected from server")),
- 1000));
+ () -> loggerFactory.logger.entries().stream().anyMatch(x -> x.contains("Connection closed")),
+ 10000));
}
}
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index b154503415..f0e9f0f415 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -187,8 +187,8 @@ public class RetryPolicyTest {
recView.get(null, Tuple.create().set("id", 1L));
recView.get(null, Tuple.create().set("id", 1L));
- loggerFactory.assertLogContains("Disconnected from server");
- loggerFactory.assertLogContains("Going to retry operation because of error [op=TUPLE_GET");
+ loggerFactory.assertLogContains("Connection closed");
+ loggerFactory.assertLogContains("Retrying operation [opCode=12, opType=TUPLE_GET, attempt=0, lastError=java.util");
}
}