io test
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d1759cf3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d1759cf3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d1759cf3 Branch: refs/heads/ignite-comm-balance Commit: d1759cf3a919a649f0b9ae4277fdfec9d29c6498 Parents: d3ecf93 Author: sboikov <[email protected]> Authored: Wed Sep 28 17:18:08 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Sep 28 17:40:34 2016 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/GridTopic.java | 5 +- .../apache/ignite/internal/IgniteKernal.java | 10 + .../managers/communication/GridIoManager.java | 104 +++++++++ .../communication/GridIoMessageFactory.java | 7 +- .../communication/IgniteIoTestMessage.java | 216 +++++++++++++++++++ .../yardstick/cache/IgniteIoTestBenchmark.java | 73 +++++++ 6 files changed, 413 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d1759cf3/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index 248f75b..dc20be0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -94,7 +94,10 @@ public enum GridTopic { TOPIC_QUERY, /** */ - TOPIC_TX; + TOPIC_TX, + + /** */ + TOPIC_IO_TEST; /** Enum values. */ private static final GridTopic[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d1759cf3/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 6c5a628..54371cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -3419,6 +3419,16 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } } + /** + * @param node Node. + * @param payload Message payload. + * @param processFromNioThread If {@code true} message is processed from NIO thread. + * @return Response future. + */ + public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean processFromNioThread) { + return ctx.io().sendIoTest(node, payload, processFromNioThread); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteKernal.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/d1759cf3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java index 3fdda30..d6a2835 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; @@ -35,6 +36,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -47,6 +50,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteComponentType; import org.apache.ignite.internal.IgniteDeploymentCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.direct.DirectMessageReader; import org.apache.ignite.internal.direct.DirectMessageWriter; import org.apache.ignite.internal.managers.GridManagerAdapter; @@ -57,6 +61,8 @@ import org.apache.ignite.internal.processors.platform.message.PlatformMessageFil import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet; import org.apache.ignite.internal.util.GridSpinReadWriteLock; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridTuple3; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; @@ -87,6 +93,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_JOINED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER; +import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL; @@ -207,6 +214,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa /** Stopping flag. */ private boolean stopping; + /** */ + private final AtomicReference<ConcurrentHashMap<Long, GridFutureAdapter>> ioTestMap = new AtomicReference<>(); + + /** */ + private final AtomicLong ioTestId = new AtomicLong(); + /** * @param ctx Grid kernal context. */ @@ -348,6 +361,87 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa log.debug(startInfo()); registerIoPoolExtensions(); + + addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg) { + ClusterNode node = ctx.discovery().node(nodeId); + + if (node == null) + return; + + IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg; + + if (msg0.request()) { + IgniteIoTestMessage res = new IgniteIoTestMessage(msg0.id(), false, null); + + res.flags(msg0.flags()); + + try { + send(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send IO test response [msg=" + msg0 + "]", e); + } + } + else { + GridFutureAdapter fut = ioTestMap().remove(msg0.id()); + + if (fut == null) { + U.warn(log, "Failed to find IO test future [msg=" + msg0 + ']'); + + return; + } + + fut.onDone(); + } + } + }); + } + + /** + * @param node Node. + * @param payload Payload. + * @param processFromNioThread If {@code true} message is processed from NIO thread. + * @return Response future. + */ + public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean processFromNioThread) { + if (ctx.localNodeId().equals(node.id())) + throw new IllegalArgumentException(); + + long id = ioTestId.getAndIncrement(); + + GridFutureAdapter fut = new GridFutureAdapter(); + + ioTestMap().put(id, fut); + + try { + IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload); + + msg.processFromNioThread(processFromNioThread); + + send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + ioTestMap().remove(id); + + return new GridFinishedFuture(e); + } + + return fut; + } + + /** + * @return IO test futures map. + */ + private ConcurrentHashMap<Long, GridFutureAdapter> ioTestMap() { + ConcurrentHashMap<Long, GridFutureAdapter> map = ioTestMap.get(); + + if (map == null) { + if (!ioTestMap.compareAndSet(null, map = new ConcurrentHashMap<>())) + map = ioTestMap.get(); + } + + return map; } /** @@ -836,6 +930,16 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa } }; + if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) { + IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message(); + + if (msg0.processFromNioThread()) { + c.run(); + + return; + } + } + try { pool(plc).execute(c); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d1759cf3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index bd6ac5b..1b92465 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -761,7 +761,12 @@ public class GridIoMessageFactory implements MessageFactory { break; - // [-3..119] [124-125] - this + case 126: + msg = new IgniteIoTestMessage(); + + break; + + // [-3..119] [124-126] - this // [120..123] - DR // [-4..-22, -30..-35] - SQL default: http://git-wip-us.apache.org/repos/asf/ignite/blob/d1759cf3/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java new file mode 100644 index 0000000..08bd110 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.managers.communication; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +import java.nio.ByteBuffer; + +/** + * + */ +public class IgniteIoTestMessage implements Message { + /** */ + private static byte FLAG_PROCESS_FROM_NIO = 1; + + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long id; + + /** */ + private byte flags; + + /** */ + private boolean req; + + /** */ + private byte payload[]; + + /** + * + */ + public IgniteIoTestMessage() { + // No-op. + } + + /** + * @param id + * @param req + * @param payload + */ + public IgniteIoTestMessage(long id, boolean req, byte[] payload) { + this.id = id; + this.req = req; + this.payload = payload; + } + + public boolean processFromNioThread() { + return isFlag(FLAG_PROCESS_FROM_NIO); + } + + public void processFromNioThread(boolean processFromNioThread) { + setFlag(processFromNioThread, FLAG_PROCESS_FROM_NIO); + } + + public void flags(byte flags) { + this.flags = flags; + } + + public byte flags() { + return flags; + } + + /** + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. + */ + private void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } + + /** + * Reads flag mask. + * + * @param mask Mask to read. + * @return Flag value. + */ + private boolean isFlag(int mask) { + return (flags & mask) != 0; + } + + public boolean request() { + return req; + } + + public long id() { + return id; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeLong("id", id)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeByteArray("payload", payload)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeBoolean("req", req)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + id = reader.readLong("id"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + payload = reader.readByteArray("payload"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + req = reader.readBoolean("req"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(IgniteIoTestMessage.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 126; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteIoTestMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d1759cf3/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java new file mode 100644 index 0000000..bee45e0 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/cache/IgniteIoTestBenchmark.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.cache; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.yardstick.IgniteAbstractBenchmark; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkUtils; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * + */ +public class IgniteIoTestBenchmark extends IgniteAbstractBenchmark { + /** */ + private List<ClusterNode> targetNodes; + + /** */ + private IgniteKernal ignite; + + /** {@inheritDoc} */ + @Override public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + + ignite = (IgniteKernal)ignite(); + + targetNodes = new ArrayList<>(); + + ClusterNode loc = ignite().cluster().localNode(); + + Collection<ClusterNode> nodes = ignite().cluster().forServers().nodes(); + + for (ClusterNode node : nodes) { + if (!loc.equals(node)) + targetNodes.add(node); + } + + if (targetNodes.isEmpty()) + throw new IgniteException("Failed to find remote server nodes [nodes=" + nodes + ']'); + + BenchmarkUtils.println(cfg, "Initialized target nodes: " + targetNodes + ']'); + } + + /** {@inheritDoc} */ + @Override public boolean test(Map<Object, Object> ctx) throws Exception { + ClusterNode node = targetNodes.get(nextRandom(targetNodes.size())); + + ignite.sendIoTest(node, null, false).get(); + + return true; + } +}
