Repository: incubator-ignite Updated Branches: refs/heads/ignite-499_1 4ad742b8f -> b161f1c45
# IGNITE-499 Implement Client communication. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b161f1c4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b161f1c4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b161f1c4 Branch: refs/heads/ignite-499_1 Commit: b161f1c45e7be24d8e0d3e8c036b5c8391c23ccd Parents: 4ad742b Author: sevdokimov <[email protected]> Authored: Mon Apr 20 23:05:20 2015 +0300 Committer: sevdokimov <[email protected]> Committed: Mon Apr 20 23:05:20 2015 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 5 + .../tcp/TcpClientMessageWrapper.java | 176 +++++++++++++++++++ .../communication/tcp/TcpCommunicationSpi.java | 83 ++++++++- .../tcp/ClientTcpCommunicationSelfTest.java | 113 ++++++++++++ .../IgniteSpiCommunicationSelfTestSuite.java | 2 + 5 files changed, 377 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b161f1c4/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index b82147b..d64ff07 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -595,6 +595,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 112: + msg = new TcpClientMessageWrapper(); + + break; + default: if (ext != null) { for (MessageFactory factory : ext) { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b161f1c4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpClientMessageWrapper.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpClientMessageWrapper.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpClientMessageWrapper.java new file mode 100644 index 0000000..66d23dd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpClientMessageWrapper.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.plugin.extensions.communication.*; + +import java.nio.*; +import java.util.*; + +/** + * + */ +public class TcpClientMessageWrapper implements Message { + /** */ + private Message msg; + + /** */ + private UUID dest; + + /** */ + private UUID snd; + + /** + * Empty constructor required by {@link Message}. + */ + public TcpClientMessageWrapper() { + // No-op. + } + + /** + * @param msg Message. + * @param dest Destination. + */ + public TcpClientMessageWrapper(Message msg, UUID snd, UUID dest) { + this.msg = msg; + this.snd = snd; + this.dest = dest; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeMessage("msg", msg)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeUuid("sender", snd)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeUuid("dest", dest)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + msg = reader.readMessage("msg"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + snd = reader.readUuid("src"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + dest = reader.readUuid("dest"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 112; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 3; + } + + /** + * @return Message. + */ + public Message message() { + return msg; + } + + /** + * @param msg New message. + */ + public void message(Message msg) { + this.msg = msg; + } + + /** + * @return Destination. + */ + public UUID destination() { + return dest; + } + + /** + * @param dest New destination. + */ + public void destination(UUID dest) { + this.dest = dest; + } + + /** + * @return Source. + */ + public UUID sender() { + return snd; + } + + /** + * @param snd New sender. + */ + public void sender(UUID snd) { + this.snd = snd; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b161f1c4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 36bd03e..44c6b73 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -34,6 +34,7 @@ import org.apache.ignite.plugin.extensions.communication.*; import org.apache.ignite.resources.*; import org.apache.ignite.spi.*; import org.apache.ignite.spi.communication.*; +import org.apache.ignite.spi.discovery.tcp.internal.*; import org.jetbrains.annotations.*; import org.jsr166.*; @@ -150,6 +151,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Node attribute that is mapped to node's external addresses (value is <tt>comm.tcp.ext-addrs</tt>). */ public static final String ATTR_EXT_ADDRS = "comm.tcp.ext-addrs"; + /** Node attribute means that all messages should be send through router node + * (value is <tt>comm.tcp.connect.throw.router</tt>). + * @see org.apache.ignite.spi.discovery.tcp.TcpClientDiscoverySpi + */ + public static final String ATTR_CONNECT_TO_ROUTER_ONLY = "comm.tcp.connect.through.router"; + /** Default port which node sets listener to (value is <tt>47100</tt>). */ public static final int DFLT_PORT = 47100; @@ -322,6 +329,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** Local node ID message. */ private NodeIdMessage nodeIdMsg; + /** All messages shoult be sent through router node, not a directly. */ + private boolean connectToRouterOnly; + /** Received messages count. */ private final LongAdder8 rcvdMsgsCnt = new LongAdder8(); @@ -864,11 +874,16 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter Collection<InetSocketAddress> extAddrs = addrRslvr == null ? null : U.resolveAddresses(addrRslvr, F.flat(Arrays.asList(addrs.get1(), addrs.get2())), boundTcpPort); - return F.asMap( + Map<String, Object> res = F.asMap( createSpiAttributeName(ATTR_ADDRS), addrs.get1(), createSpiAttributeName(ATTR_HOST_NAMES), addrs.get2(), createSpiAttributeName(ATTR_PORT), boundTcpPort, createSpiAttributeName(ATTR_EXT_ADDRS), extAddrs); + + if (connectToRouterOnly) + res.put(createSpiAttributeName(ATTR_CONNECT_TO_ROUTER_ONLY), connectToRouterOnly); + + return res; } catch (IOException | IgniteCheckedException e) { throw new IgniteSpiException("Failed to resolve local host to addresses: " + locHost, e); @@ -1180,6 +1195,41 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (node.id().equals(locNodeId)) notifyListener(locNodeId, msg, NOOP); else { + ClusterNode locNode = getSpiContext().localNode(); + + if (connectToRouterOnly) { + if (locNode instanceof TcpDiscoveryNode) { + UUID routerNodeId = ((TcpDiscoveryNode)locNode).clientRouterNodeId(); + + if (routerNodeId != null && !routerNodeId.equals(node.id())) { + ClusterNode routerNode = getSpiContext().node(routerNodeId); + + if (routerNode != null) { + sendMessage(routerNode, new TcpClientMessageWrapper(msg, locNodeId, node.id())); + + return; + } + } + } + } + + if (Boolean.TRUE.equals(node.<Boolean>attribute(ATTR_CONNECT_TO_ROUTER_ONLY))) { + UUID routerNodeId = ((TcpDiscoveryNode)node).clientRouterNodeId(); + + if (routerNodeId != null) { + ClusterNode routerNode = getSpiContext().node(routerNodeId); + + if (routerNode != null) { + if (msg instanceof TcpClientMessageWrapper) + sendMessage(routerNode, msg); + else + sendMessage(routerNode, new TcpClientMessageWrapper(msg, locNodeId, node.id())); + + return; + } + } + } + GridCommunicationClient client = null; try { @@ -1190,7 +1240,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter UUID nodeId = null; - if (!client.async() && !getSpiContext().localNode().version().equals(node.version())) + if (!client.async() && !locNode.version().equals(node.version())) nodeId = node.id(); retry = client.sendMessage(nodeId, msg); @@ -1728,6 +1778,20 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter getExceptionRegistry().onException(msg, e); } + /** + * @return Use router. + */ + public boolean useRouter() { + return connectToRouterOnly; + } + + /** + * @param useRouter New use router. + */ + public void useRouter(boolean useRouter) { + this.connectToRouterOnly = useRouter; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(TcpCommunicationSpi.class, this); @@ -2771,6 +2835,21 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter else c = NOOP; + if (msg instanceof TcpClientMessageWrapper) { + TcpClientMessageWrapper clientMsg = (TcpClientMessageWrapper)msg; + + if (getLocalNodeId().equals(clientMsg.destination())) + notifyListener(clientMsg.sender(), clientMsg.message(), c); + else { + ClusterNode destNode = getSpiContext().node(clientMsg.destination()); + + if (destNode != null) + sendMessage(destNode, clientMsg); + } + + return; + } + notifyListener(sndId, msg, c); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b161f1c4/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/ClientTcpCommunicationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/ClientTcpCommunicationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/ClientTcpCommunicationSelfTest.java new file mode 100644 index 0000000..794d846 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/ClientTcpCommunicationSelfTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp; + +import org.apache.ignite.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.resources.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.testframework.junits.common.*; +import org.apache.ignite.testframework.junits.spi.*; + +import java.util.*; + +/** + * + */ +@GridSpiTest(spi = TcpCommunicationSpi.class, group = "Communication SPI") +public class ClientTcpCommunicationSelfTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + if (client) { + TcpCommunicationSpi spi = new TcpCommunicationSpi(); + spi.useRouter(true); + + cfg.setCommunicationSpi(spi); + + TcpDiscoveryVmIpFinder clientIpFinder = new TcpDiscoveryVmIpFinder(); + + String addr = new ArrayList<>(ipFinder.getRegisteredAddresses()).iterator().next().toString(); + + if (addr.startsWith("/")) + addr = addr.substring(1); + + clientIpFinder.setAddresses(Arrays.asList(addr)); + + TcpClientDiscoverySpi discoSpi = new TcpClientDiscoverySpi(); + + discoSpi.setIpFinder(clientIpFinder); + + cfg.setDiscoverySpi(discoSpi); + + cfg.setClientMode(true); + } + else { + TcpDiscoverySpi discoverySpi = new TcpDiscoverySpi(); + + discoverySpi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(discoverySpi); + } + + return cfg; + } + + /** + * @throws Exception + */ + public void testClientCommunication() throws Exception { + startGrid(0); + startGrid(1); + + client = true; + + startGrid(2); + startGrid(3); + + Collection<UUID> uuids = ignite(1).compute().broadcast(new IdCollector()); + + for (int i = 0; i < 4; i++) + assert uuids.contains(ignite(i).cluster().localNode().id()); + } + + /** + * + */ + private static class IdCollector implements IgniteCallable<UUID> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** {@inheritDoc} */ + @Override public UUID call() throws Exception { + return ignite.cluster().localNode().id(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b161f1c4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java index 1d3bfcd..d509ffe 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java @@ -45,6 +45,8 @@ public class IgniteSpiCommunicationSelfTestSuite extends TestSuite { suite.addTest(new TestSuite(GridTcpCommunicationSpiConfigSelfTest.class)); + suite.addTest(new TestSuite(ClientTcpCommunicationSelfTest.class)); + return suite; } }
