Results printout for IO latency test and new metrics (cherry picked)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/018b25b2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/018b25b2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/018b25b2 Branch: refs/heads/ignite-5075-cc Commit: 018b25b29c3c491db7e44963e8c79677d77ceb23 Parents: 6f1dc3a Author: Yakov Zhdanov <[email protected]> Authored: Tue May 23 17:39:37 2017 +0300 Committer: Yakov Zhdanov <[email protected]> Committed: Tue May 23 17:44:33 2017 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoManager.java | 380 +++++++++++++++---- .../communication/IgniteIoTestMessage.java | 362 +++++++++++++++++- 2 files changed, 672 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/018b25b2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 147f94d..68bfd07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -18,9 +18,11 @@ package org.apache.ignite.internal.managers.communication; import java.io.Serializable; +import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,12 +66,12 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; import org.apache.ignite.internal.processors.pool.PoolProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; -import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridTuple3; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -341,10 +343,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg; + msg0.senderNodeId(nodeId); + if (msg0.request()) { IgniteIoTestMessage res = new IgniteIoTestMessage(msg0.id(), false, null); res.flags(msg0.flags()); + res.onRequestProcessed(); + + res.copyDataFromRequest(msg0); try { sendToGridTopic(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL); @@ -356,10 +363,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa else { IoTestFuture fut = ioTestMap().get(msg0.id()); + msg0.onResponseProcessed(); + if (fut == null) U.warn(log, "Failed to find IO test future [msg=" + msg0 + ']'); else - fut.onResponse(); + fut.onResponse(msg0); } } }); @@ -404,7 +413,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param procFromNioThread If {@code true} message is processed from NIO thread. * @return Response future. */ - public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) { + public IgniteInternalFuture<List<IgniteIoTestMessage>> sendIoTest( + ClusterNode node, + byte[] payload, + boolean procFromNioThread + ) { long id = ioTestId.getAndIncrement(); IoTestFuture fut = new IoTestFuture(id, 1); @@ -445,7 +458,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa * @param warmup Warmup duration in milliseconds. * @param duration Test duration in milliseconds. * @param threads Thread count. - * @param maxLatency Max latency in nanoseconds. + * @param latencyLimit Max latency in nanoseconds. * @param rangesCnt Ranges count in resulting histogram. * @param payLoadSize Payload size in bytes. * @param procFromNioThread {@code True} to process requests in NIO threads. @@ -455,7 +468,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa final long warmup, final long duration, final int threads, - final long maxLatency, + final long latencyLimit, final int rangesCnt, final int payLoadSize, final boolean procFromNioThread, @@ -469,8 +482,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa final LongAdder8 cnt = new LongAdder8(); final long sleepDuration = 5000; final byte[] payLoad = new byte[payLoadSize]; - final Map<UUID, long[]>[] res = new Map[threads]; - final ConcurrentMap<UUID, GridAtomicLong> maxLatencies = new ConcurrentHashMap8<>(); + final Map<UUID, IoTestThreadLocalNodeResults>[] res = new Map[threads]; boolean failed = true; @@ -489,7 +501,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa "[warmup=" + warmup + ", duration=" + duration + ", threads=" + threads + - ", maxLatency=" + maxLatency + + ", latencyLimit=" + latencyLimit + ", rangesCnt=" + rangesCnt + ", payLoadSize=" + payLoadSize + ", procFromNioThreads=" + procFromNioThread + ']' @@ -529,22 +541,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa // At this point all threads have finished the test and // stored data to the resulting array of maps. // Need to iterate it over and sum values for all threads. - Map<UUID, long[]> res0 = new HashMap<>(); - - for (Map<UUID, long[]> r : res) { - for (Entry<UUID, long[]> e : r.entrySet()) { - long[] r0 = res0.get(e.getKey()); - - if (r0 == null) - res0.put(e.getKey(), e.getValue()); - else { - for (int i = 0; i < rangesCnt + 1; i++) - r0[i] += e.getValue()[i]; - } - } - } - - printIoTestResults(maxLatency / (1000 * rangesCnt), res0, maxLatencies); + printIoTestResults(res); } catch (InterruptedException | BrokenBarrierException e) { U.error(log, "IO test failed.", e); @@ -566,7 +563,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa boolean failed = true; ThreadLocalRandom rnd = ThreadLocalRandom.current(); int size = nodes.size(); - Map<UUID, long[]> res0 = res[i0]; + Map<UUID, IoTestThreadLocalNodeResults> res0 = res[i0]; try { boolean warmupFinished0 = false; @@ -582,38 +579,22 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa ClusterNode node = nodes.get(rnd.nextInt(size)); - long start = System.nanoTime(); - - sendIoTest(node, payLoad, procFromNioThread).get(); - - long latency = System.nanoTime() - start; + List<IgniteIoTestMessage> msgs = sendIoTest(node, payLoad, procFromNioThread).get(); cnt.increment(); - long[] latencies = res0.get(node.id()); - - if (latencies == null) - res0.put(node.id(), latencies = new long[rangesCnt + 1]); - - if (latency >= maxLatency) { - latencies[rangesCnt]++; // Timed out. + for (IgniteIoTestMessage msg : msgs) { + UUID nodeId = msg.senderNodeId(); - GridAtomicLong maxLatency = maxLatencies.get(node.id()); + assert nodeId != null; - if (maxLatency == null) { - GridAtomicLong old = maxLatencies.putIfAbsent(node.id(), - maxLatency = new GridAtomicLong()); + IoTestThreadLocalNodeResults nodeRes = res0.get(nodeId); - if (old != null) - maxLatency = old; - } + if (nodeRes == null) + res0.put(nodeId, + nodeRes = new IoTestThreadLocalNodeResults(rangesCnt, latencyLimit)); - maxLatency.setIfGreater(latency); - } - else { - int idx = (int)Math.floor((1.0 * latency) / ((1.0 * maxLatency) / rangesCnt)); - - latencies[idx]++; + nodeRes.onResult(msg); } } @@ -641,30 +622,44 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } /** - * @param binLatencyMcs Bin latency in microseconds. - * @param res Resulting map. - * @param maxLatencies Max latency for each node. + * @param rawRes Resulting map. */ private void printIoTestResults( - long binLatencyMcs, - Map<UUID, long[]> res, - ConcurrentMap<UUID, GridAtomicLong> maxLatencies + Map<UUID, IoTestThreadLocalNodeResults>[] rawRes ) { + Map<UUID, IoTestNodeResults> res = new HashMap<>(); + + for (Map<UUID, IoTestThreadLocalNodeResults> r : rawRes) { + for (Entry<UUID, IoTestThreadLocalNodeResults> e : r.entrySet()) { + IoTestNodeResults r0 = res.get(e.getKey()); + + if (r0 == null) + res.put(e.getKey(), r0 = new IoTestNodeResults()); + + r0.add(e.getValue()); + } + } + + SimpleDateFormat dateFmt = new SimpleDateFormat("HH:mm:ss,SSS"); + StringBuilder b = new StringBuilder(U.nl()) - .append("IO test results (round-trip count per each latency bin) " + - "[binLatency=" + binLatencyMcs + "mcs]") + .append("IO test results (round-trip count per each latency bin).") .append(U.nl()); - for (Entry<UUID, long[]> e : res.entrySet()) { + for (Entry<UUID, IoTestNodeResults> e : res.entrySet()) { ClusterNode node = ctx.discovery().node(e.getKey()); + long binLatencyMcs = e.getValue().binLatencyMcs(); + b.append("Node ID: ").append(e.getKey()).append(" (addrs=") - .append(node != null ? node.addresses().toString() : "n/a").append(')').append(U.nl()); + .append(node != null ? node.addresses().toString() : "n/a") + .append(", binLatency=").append(binLatencyMcs).append("mcs") + .append(')').append(U.nl()); b.append("Latency bin, mcs | Count exclusive | Percentage exclusive | " + "Count inclusive | Percentage inclusive ").append(U.nl()); - long[] nodeRes = e.getValue(); + long[] nodeRes = e.getValue().resLatency; long sum = 0; @@ -688,15 +683,49 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa curSum, (100.0 * curSum) / sum)); } - GridAtomicLong maxLatency = maxLatencies.get(e.getKey()); + b.append(U.nl()).append("Total latency (ns): ").append(U.nl()) + .append(String.format("%15d", e.getValue().totalLatency)).append(U.nl()); + + b.append(U.nl()).append("Max latencies (ns):").append(U.nl()); + format(b, e.getValue().maxLatency, dateFmt); + + b.append(U.nl()).append("Max request send queue times (ns):").append(U.nl()); + format(b, e.getValue().maxReqSendQueueTime, dateFmt); + + b.append(U.nl()).append("Max request receive queue times (ns):").append(U.nl()); + format(b, e.getValue().maxReqRcvQueueTime, dateFmt); + + b.append(U.nl()).append("Max response send queue times (ns):").append(U.nl()); + format(b, e.getValue().maxResSendQueueTime, dateFmt); + + b.append(U.nl()).append("Max response receive queue times (ns):").append(U.nl()); + format(b, e.getValue().maxResRcvQueueTime, dateFmt); - b.append("Max latency (ns): ").append(maxLatency != null ? maxLatency.get() : -1).append(U.nl()); + b.append(U.nl()).append("Max request wire times (millis):").append(U.nl()); + format(b, e.getValue().maxReqWireTimeMillis, dateFmt); + + b.append(U.nl()).append("Max response wire times (millis):").append(U.nl()); + format(b, e.getValue().maxResWireTimeMillis, dateFmt); + + b.append(U.nl()); } if (log.isInfoEnabled()) log.info(b.toString()); } + /** + * @param b Builder. + * @param pairs Pairs to format. + * @param dateFmt Formatter. + */ + private void format(StringBuilder b, Collection<IgnitePair<Long>> pairs, SimpleDateFormat dateFmt) { + for (IgnitePair<Long> p : pairs) { + b.append(String.format("%15d", p.get1())).append(" ") + .append(dateFmt.format(new Date(p.get2()))).append(U.nl()); + } + } + /** {@inheritDoc} */ @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"}) @Override public void onKernalStart0() throws IgniteCheckedException { @@ -2857,12 +2886,15 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** * */ - private class IoTestFuture extends GridFutureAdapter<Object> { + private class IoTestFuture extends GridFutureAdapter<List<IgniteIoTestMessage>> { /** */ private final long id; /** */ - private int cntr; + private final int cntr; + + /** */ + private final List<IgniteIoTestMessage> ress; /** * @param id ID. @@ -2873,24 +2905,28 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa this.id = id; this.cntr = cntr; + + ress = new ArrayList<>(cntr); } /** * */ - void onResponse() { + void onResponse(IgniteIoTestMessage res) { boolean complete; synchronized (this) { - complete = --cntr == 0; + ress.add(res); + + complete = cntr == ress.size(); } if (complete) - onDone(); + onDone(ress); } /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) { + @Override public boolean onDone(List<IgniteIoTestMessage> res, @Nullable Throwable err) { if (super.onDone(res, err)) { ioTestMap().remove(id); @@ -2905,4 +2941,210 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return S.toString(IoTestFuture.class, this); } } + + /** + * + */ + private static class IoTestThreadLocalNodeResults { + /** */ + private final long[] resLatency; + + /** */ + private final int rangesCnt; + + /** */ + private long totalLatency; + + /** */ + private long maxLatency; + + /** */ + private long maxLatencyTs; + + /** */ + private long maxReqSendQueueTime; + + /** */ + private long maxReqSendQueueTimeTs; + + /** */ + private long maxReqRcvQueueTime; + + /** */ + private long maxReqRcvQueueTimeTs; + + /** */ + private long maxResSendQueueTime; + + /** */ + private long maxResSendQueueTimeTs; + + /** */ + private long maxResRcvQueueTime; + + /** */ + private long maxResRcvQueueTimeTs; + + /** */ + private long maxReqWireTimeMillis; + + /** */ + private long maxReqWireTimeTs; + + /** */ + private long maxResWireTimeMillis; + + /** */ + private long maxResWireTimeTs; + + /** */ + private final long latencyLimit; + + /** + * @param rangesCnt Ranges count. + * @param latencyLimit + */ + public IoTestThreadLocalNodeResults(int rangesCnt, long latencyLimit) { + this.rangesCnt = rangesCnt; + this.latencyLimit = latencyLimit; + + resLatency = new long[rangesCnt + 1]; + } + + /** + * @param msg + */ + public void onResult(IgniteIoTestMessage msg) { + long now = System.currentTimeMillis(); + + long latency = msg.responseProcessedTs() - msg.requestCreateTs(); + + int idx = latency >= latencyLimit ? + rangesCnt /* Timed out. */ : + (int)Math.floor((1.0 * latency) / ((1.0 * latencyLimit) / rangesCnt)); + + resLatency[idx]++; + + totalLatency += latency; + + if (maxLatency < latency) { + maxLatency = latency; + maxLatencyTs = now; + } + + long reqSndQueueTime = msg.requestSendTs() - msg.requestCreateTs(); + + if (maxReqSendQueueTime < reqSndQueueTime) { + maxReqSendQueueTime = reqSndQueueTime; + maxReqSendQueueTimeTs = now; + } + + long reqRcvQueueTime = msg.requestProcessTs() - msg.requestReceiveTs(); + + if (maxReqRcvQueueTime < reqRcvQueueTime) { + maxReqRcvQueueTime = reqRcvQueueTime; + maxReqRcvQueueTimeTs = now; + } + + long resSndQueueTime = msg.responseSendTs() - msg.requestProcessTs(); + + if (maxResSendQueueTime < resSndQueueTime) { + maxResSendQueueTime = resSndQueueTime; + maxResSendQueueTimeTs = now; + } + + long resRcvQueueTime = msg.responseProcessedTs() - msg.responseReceiveTs(); + + if (maxResRcvQueueTime < resRcvQueueTime) { + maxResRcvQueueTime = resRcvQueueTime; + maxResRcvQueueTimeTs = now; + } + + long reqWireTimeMillis = msg.requestReceivedTsMillis() - msg.requestSendTsMillis(); + + if (maxReqWireTimeMillis < reqWireTimeMillis) { + maxReqWireTimeMillis = reqWireTimeMillis; + maxReqWireTimeTs = now; + } + + long resWireTimeMillis = msg.responseRecievedTsMillis() - msg.requestSendTsMillis(); + + if (maxResWireTimeMillis < resWireTimeMillis) { + maxResWireTimeMillis = resWireTimeMillis; + maxResWireTimeTs = now; + } + } + } + + /** + * + */ + private static class IoTestNodeResults { + /** */ + private long latencyLimit; + + /** */ + private long[] resLatency; + + /** */ + private long totalLatency; + + /** */ + private Collection<IgnitePair<Long>> maxLatency = new ArrayList<>(); + + /** */ + private Collection<IgnitePair<Long>> maxReqSendQueueTime = new ArrayList<>(); + + /** */ + private Collection<IgnitePair<Long>> maxReqRcvQueueTime = new ArrayList<>(); + + /** */ + private Collection<IgnitePair<Long>> maxResSendQueueTime = new ArrayList<>(); + + /** */ + private Collection<IgnitePair<Long>> maxResRcvQueueTime = new ArrayList<>(); + + /** */ + private Collection<IgnitePair<Long>> maxReqWireTimeMillis = new ArrayList<>(); + + /** */ + private Collection<IgnitePair<Long>> maxResWireTimeMillis = new ArrayList<>(); + + /** + * @param res Node results to add. + */ + public void add(IoTestThreadLocalNodeResults res) { + if (resLatency == null) { + resLatency = res.resLatency.clone(); + latencyLimit = res.latencyLimit; + } + else { + assert latencyLimit == res.latencyLimit; + assert resLatency.length == res.resLatency.length; + + for (int i = 0; i < resLatency.length; i++) + resLatency[i] += res.resLatency[i]; + } + + totalLatency += res.totalLatency; + + maxLatency.add(F.pair(res.maxLatency, res.maxLatencyTs)); + maxReqSendQueueTime.add(F.pair(res.maxReqSendQueueTime, res.maxReqSendQueueTimeTs)); + maxReqRcvQueueTime.add(F.pair(res.maxReqRcvQueueTime, res.maxReqRcvQueueTimeTs)); + maxResSendQueueTime.add(F.pair(res.maxResSendQueueTime, res.maxResSendQueueTimeTs)); + maxResRcvQueueTime.add(F.pair(res.maxResRcvQueueTime, res.maxResRcvQueueTimeTs)); + maxReqWireTimeMillis.add(F.pair(res.maxReqWireTimeMillis, res.maxReqWireTimeTs)); + maxResWireTimeMillis.add(F.pair(res.maxResWireTimeMillis, res.maxResWireTimeTs)); + } + + /** + * @return Bin latency in microseconds. + */ + public long binLatencyMcs() { + if (resLatency == null) + throw new IllegalStateException(); + + return latencyLimit / (1000 * (resLatency.length - 1)); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/018b25b2/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java index 0a41622..3e0fa76 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.managers.communication; import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -45,6 +47,43 @@ public class IgniteIoTestMessage implements Message { /** */ private byte payload[]; + /** */ + private long reqCreateTs; + + /** */ + private long reqSndTs; + + /** */ + private long reqSndTsMillis; + + /** */ + private long reqRcvTs; + + /** */ + private long reqRcvTsMillis; + + /** */ + private long reqProcTs; + + /** */ + private long resSndTs; + + /** */ + private long resSndTsMillis; + + /** */ + private long resRcvTs; + + /** */ + private long resRcvTsMillis; + + /** */ + private long resProcTs; + + /** */ + @GridDirectTransient + private UUID sndNodeId; + /** * */ @@ -61,6 +100,8 @@ public class IgniteIoTestMessage implements Message { this.id = id; this.req = req; this.payload = payload; + + reqCreateTs = System.nanoTime(); } /** @@ -126,10 +167,173 @@ public class IgniteIoTestMessage implements Message { return id; } + /** + * @return Request create timestamp. + */ + public long requestCreateTs() { + return reqCreateTs; + } + + /** + * @return Request send timestamp. + */ + public long requestSendTs() { + return reqSndTs; + } + + /** + * @return Request receive timestamp. + */ + public long requestReceiveTs() { + return reqRcvTs; + } + + /** + * @return Request process started timestamp. + */ + public long requestProcessTs() { + return reqProcTs; + } + + /** + * @return Response send timestamp. + */ + public long responseSendTs() { + return resSndTs; + } + + /** + * @return Response receive timestamp. + */ + public long responseReceiveTs() { + return resRcvTs; + } + + /** + * @return Response process timestamp. + */ + public long responseProcessTs() { + return resProcTs; + } + + /** + * @return Request send timestamp (millis). + */ + public long requestSendTsMillis() { + return reqSndTsMillis; + } + + /** + * @return Request received timestamp (millis). + */ + public long requestReceivedTsMillis() { + return reqRcvTsMillis; + } + + /** + * @return Response send timestamp (millis). + */ + public long responseSendTsMillis() { + return resSndTsMillis; + } + + /** + * @return Response received timestamp (millis). + */ + public long responseRecievedTsMillis() { + return resRcvTsMillis; + } + + /** + * This method is called to initialize tracing variables. + * TODO: introduce direct message lifecycle API? + */ + public void onAfterRead() { + if (req && reqRcvTs == 0) { + reqRcvTs = System.nanoTime(); + + reqRcvTsMillis = System.currentTimeMillis(); + } + + if (!req && resRcvTs == 0) { + resRcvTs = System.nanoTime(); + + resRcvTsMillis = System.currentTimeMillis(); + } + } + + /** + * This method is called to initialize tracing variables. + * TODO: introduce direct message lifecycle API? + */ + public void onBeforeWrite() { + if (req && reqSndTs == 0) { + reqSndTs = System.nanoTime(); + + reqSndTsMillis = System.currentTimeMillis(); + } + + if (!req && resSndTs == 0) { + resSndTs = System.nanoTime(); + + resSndTsMillis = System.currentTimeMillis(); + } + } + + /** + * + */ + public void copyDataFromRequest(IgniteIoTestMessage req) { + reqCreateTs = req.reqCreateTs; + + reqSndTs = req.reqSndTs; + reqSndTsMillis = req.reqSndTsMillis; + + reqRcvTs = req.reqRcvTs; + reqRcvTsMillis = req.reqRcvTsMillis; + } + + /** + * + */ + public void onRequestProcessed() { + reqProcTs = System.nanoTime(); + } + + /** + * + */ + public void onResponseProcessed() { + resProcTs = System.nanoTime(); + } + + /** + * @return Response processed timestamp. + */ + public long responseProcessedTs() { + return resProcTs; + } + + /** + * @return Sender node ID. + */ + public UUID senderNodeId() { + return sndNodeId; + } + + /** + * @param sndNodeId Sender node ID. + */ + public void senderNodeId(UUID sndNodeId) { + this.sndNodeId = sndNodeId; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); + onBeforeWrite(); + if (!writer.isHeaderWritten()) { if (!writer.writeHeader(directType(), fieldsCount())) return false; @@ -162,6 +366,72 @@ public class IgniteIoTestMessage implements Message { writer.incrementState(); + case 4: + if (!writer.writeLong("reqCreateTs", reqCreateTs)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeLong("reqProcTs", reqProcTs)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeLong("reqRcvTs", reqRcvTs)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeLong("reqRcvTsMillis", reqRcvTsMillis)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeLong("reqSndTs", reqSndTs)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeLong("reqSndTsMillis", reqSndTsMillis)) + return false; + + writer.incrementState(); + + case 10: + if (!writer.writeLong("resProcTs", resProcTs)) + return false; + + writer.incrementState(); + + case 11: + if (!writer.writeLong("resRcvTs", resRcvTs)) + return false; + + writer.incrementState(); + + case 12: + if (!writer.writeLong("resRcvTsMillis", resRcvTsMillis)) + return false; + + writer.incrementState(); + + case 13: + if (!writer.writeLong("resSndTs", resSndTs)) + return false; + + writer.incrementState(); + + case 14: + if (!writer.writeLong("resSndTsMillis", resSndTsMillis)) + return false; + + writer.incrementState(); + } return true; @@ -207,8 +477,98 @@ public class IgniteIoTestMessage implements Message { reader.incrementState(); + case 4: + reqCreateTs = reader.readLong("reqCreateTs"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + reqProcTs = reader.readLong("reqProcTs"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + reqRcvTs = reader.readLong("reqRcvTs"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + reqRcvTsMillis = reader.readLong("reqRcvTsMillis"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + reqSndTs = reader.readLong("reqSndTs"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + reqSndTsMillis = reader.readLong("reqSndTsMillis"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 10: + resProcTs = reader.readLong("resProcTs"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: + resRcvTs = reader.readLong("resRcvTs"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 12: + resRcvTsMillis = reader.readLong("resRcvTsMillis"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 13: + resSndTs = reader.readLong("resSndTs"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 14: + resSndTsMillis = reader.readLong("resSndTsMillis"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + onAfterRead(); + return reader.afterMessageRead(IgniteIoTestMessage.class); } @@ -219,7 +579,7 @@ public class IgniteIoTestMessage implements Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 4; + return 15; } /** {@inheritDoc} */
