Added IO latency test + made it available from MBean (cherry picked from commit 8195ae0)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c35dbf4e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c35dbf4e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c35dbf4e Branch: refs/heads/ignite-5398 Commit: c35dbf4ec3ba6b01932c76f14ad4c45fed402391 Parents: 8f1398f Author: Yakov Zhdanov <[email protected]> Authored: Mon May 15 18:03:07 2017 +0300 Committer: Yakov Zhdanov <[email protected]> Committed: Mon May 15 18:19:07 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 14 ++ .../managers/communication/GridIoManager.java | 204 +++++++++++++++++++ .../org/apache/ignite/mxbean/IgniteMXBean.java | 44 ++++ 3 files changed, 262 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c35dbf4e/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 391509e..1445443 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -3723,6 +3723,20 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** {@inheritDoc} */ + @Override public void runIoTest( + long warmup, + long duration, + int threads, + long maxLatency, + int rangesCnt, + int payLoadSize, + boolean procFromNioThread + ) { + ctx.io().runIoTest(warmup, duration, threads, maxLatency, rangesCnt, payLoadSize, procFromNioThread, + new ArrayList(ctx.cluster().get().forServers().forRemotes().nodes())); + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteKernal.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/c35dbf4e/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 6738909..81233ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -21,16 +21,22 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import java.util.UUID; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -83,6 +89,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import org.jsr166.ConcurrentLinkedDeque8; +import org.jsr166.LongAdder8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; @@ -420,6 +427,203 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa return map; } + /** + * @param warmup Warmup duration in milliseconds. + * @param duration Test duration in milliseconds. + * @param threads Thread count. + * @param maxLatency Max latency in nanoseconds. + * @param rangesCnt Ranges count in resulting histogram. + * @param payLoadSize Payload size in bytes. + * @param procFromNioThread {@code True} to process requests in NIO threads. + * @param nodes Nodes participating in test. + */ + public void runIoTest( + final long warmup, + final long duration, + final int threads, + final long maxLatency, + final int rangesCnt, + final int payLoadSize, + final boolean procFromNioThread, + final List<ClusterNode> nodes + ) { + ExecutorService svc = Executors.newFixedThreadPool(threads + 1); + + final AtomicBoolean warmupFinished = new AtomicBoolean(); + final AtomicBoolean done = new AtomicBoolean(); + final CyclicBarrier bar = new CyclicBarrier(threads + 1); + final LongAdder8 cnt = new LongAdder8(); + final long sleepDuration = 5000; + final byte[] payLoad = new byte[payLoadSize]; + final Map<UUID, long[]>[] res = new Map[threads]; + + boolean failed = true; + + try { + svc.execute(new Runnable() { + @Override public void run() { + boolean failed = true; + + try { + bar.await(); + + long start = System.currentTimeMillis(); + + if (log.isInfoEnabled()) + log.info("IO test started " + + "[warmup=" + warmup + + ", duration=" + duration + + ", threads=" + threads + + ", maxLatency=" + maxLatency + + ", rangesCnt=" + rangesCnt + + ", payLoadSize=" + payLoadSize + + ", procFromNioThreads=" + procFromNioThread + ']' + ); + + for (;;) { + if (!warmupFinished.get() && System.currentTimeMillis() - start > warmup) { + if (log.isInfoEnabled()) + log.info("IO test warmup finished."); + + warmupFinished.set(true); + + start = System.currentTimeMillis(); + } + + if (warmupFinished.get() && System.currentTimeMillis() - start > duration) { + if (log.isInfoEnabled()) + log.info("IO test finished, will wait for all threads to finish."); + + done.set(true); + + bar.await(); + + failed = false; + + break; + } + + if (log.isInfoEnabled()) + log.info("IO test [opsCnt/sec=" + (cnt.sumThenReset() * 1000 / sleepDuration) + + ", warmup=" + !warmupFinished.get() + + ", elapsed=" + (System.currentTimeMillis() - start) + ']'); + + Thread.sleep(sleepDuration); + } + + // At this point all threads have finished the test and stored data to the result map. + Map<UUID, long[]> res0 = new HashMap<>(); + + for (Map<UUID, long[]> r : res) { + for (Entry<UUID, long[]> e : r.entrySet()) { + long[] r0 = res0.get(e.getKey()); + + if (r0 == null) + res0.put(e.getKey(), e.getValue()); + else { + for (int i = 0; i < rangesCnt + 1; i++) + r0[i] += e.getValue()[i]; + } + } + } + + StringBuilder b = new StringBuilder("IO test results " + + "[range=" + (maxLatency / (1000 * rangesCnt)) + "mcs]"); + + b.append(U.nl()); + + for (Entry<UUID, long[]> e : res0.entrySet()) { + ClusterNode node = ctx.discovery().node(e.getKey()); + + b.append(" ").append(e.getKey()).append(" (addrs=") + .append(node != null ? node.addresses().toString() : "n/a").append(')') + .append(Arrays.toString(e.getValue())).append(U.nl()); + } + + if (log.isInfoEnabled()) + log.info(b.toString()); + } + catch (InterruptedException | BrokenBarrierException e) { + U.error(log, "IO test failed.", e); + } + finally { + if (failed) + bar.reset(); + } + } + }); + + for (int i = 0; i < threads; i++) { + final int i0 = i; + + res[i] = U.newHashMap(nodes.size()); + + svc.execute(new Runnable() { + @Override public void run() { + boolean failed = true; + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + int size = nodes.size(); + Map<UUID, long[]> res0 = res[i0]; + + try { + boolean warmupFinished0 = false; + + bar.await(); + + for (;;) { + if (done.get()) + break; + + if (!warmupFinished0) + warmupFinished0 = warmupFinished.get(); + + ClusterNode node = nodes.get(rnd.nextInt(size)); + + long start = System.nanoTime(); + + sendIoTest(node, payLoad, procFromNioThread).get(); + + long latency = System.nanoTime() - start; + + cnt.increment(); + + long[] latencies = res0.get(node.id()); + + if (latencies == null) + res0.put(node.id(), latencies = new long[rangesCnt + 1]); + + if (latency >= maxLatency) + latencies[rangesCnt]++; // Timed out. + else { + int idx = (int)Math.floor((1.0 * latency) / ((1.0 * maxLatency) / rangesCnt)); + + latencies[idx]++; + } + } + + bar.await(); + + failed = false; + } + catch (Exception e) { + U.error(log, "IO test worker thread failed.", e); + } + finally { + if (failed) + bar.reset(); + } + } + }); + } + + failed = false; + } + finally { + if (failed) + U.shutdownNow(GridIoManager.class, svc, log); + } + } + /** {@inheritDoc} */ @SuppressWarnings({"deprecation", "SynchronizationOnLocalVariableOrMethodParameter"}) @Override public void onKernalStart0() throws IgniteCheckedException { http://git-wip-us.apache.org/repos/asf/ignite/blob/c35dbf4e/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java index bf84b0d..ce63e4f 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/IgniteMXBean.java @@ -385,4 +385,48 @@ public interface IgniteMXBean { */ @MXBeanDescription("Dumps debug information for the current node.") public void dumpDebugInfo(); + + /** + * Runs IO latency test against all remote server nodes in cluster. + * + * @param warmup Warmup duration in milliseconds. + * @param duration Test duration in milliseconds. + * @param threads Thread count. + * @param maxLatency Max latency in nanoseconds. + * @param rangesCnt Ranges count in resulting histogram. + * @param payLoadSize Payload size in bytes. + * @param procFromNioThread {@code True} to process requests in NIO threads. + */ + @MXBeanDescription("Runs IO latency test against all remote server nodes in cluster.") + @MXBeanParametersNames( + { + "warmup", + "duration", + "threads", + "maxLatency", + "rangesCnt", + "payLoadSize", + "procFromNioThread" + } + ) + @MXBeanParametersDescriptions( + { + "Warmup duration (millis).", + "Test duration (millis).", + "Threads count.", + "Maximum latency expected (nanos).", + "Ranges count for histogram.", + "Payload size (bytes).", + "Process requests in NIO-threads flag." + } + ) + void runIoTest( + long warmup, + long duration, + int threads, + long maxLatency, + int rangesCnt, + int payLoadSize, + boolean procFromNioThread + ); }
