http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index fc4f5b1..1dd270b 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -266,7 +266,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter public static final int DFLT_PORT = 47100; /** Default port which node sets listener for shared memory connections (value is <tt>48100</tt>). */ - public static final int DFLT_SHMEM_PORT = 48100; + public static final int DFLT_SHMEM_PORT = -1; /** Default idle connection timeout (value is <tt>30000</tt>ms). */ public static final long DFLT_IDLE_CONN_TIMEOUT = 30000; @@ -288,9 +288,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** * Default count of selectors for TCP server equals to - * {@code "Math.min(8, Runtime.getRuntime().availableProcessors())"}. + * {@code "Runtime.getRuntime().availableProcessors()"}. */ - public static final int DFLT_SELECTORS_CNT = Math.min(8, Runtime.getRuntime().availableProcessors()); + public static final int DFLT_SELECTORS_CNT = Runtime.getRuntime().availableProcessors(); /** Connection index meta for session. */ private static final int CONN_IDX_META = GridNioSessionMetaKey.nextUniqueKey(); @@ -358,7 +358,12 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter if (log.isDebugEnabled()) log.debug("Sending local node ID to newly accepted session: " + ses); - ses.send(nodeIdMessage()); + try { + ses.sendNoFuture(nodeIdMessage()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } } } @@ -636,7 +641,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ", rcvCnt=" + rcvCnt + ']'); } - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(rcvCnt)); + ses.systemMessage(new RecoveryLastReceivedMessage(rcvCnt)); recovery.lastAcknowledged(rcvCnt); } @@ -688,8 +693,13 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter nioSrvr.resend(ses); - if (sndRes) - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received())); + try { + if (sndRes) + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received())); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } recovery.onConnected(); @@ -713,12 +723,17 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter GridNioRecoveryDescriptor recovery, GridNioSession ses, boolean sndRes) { - ses.inRecoveryDescriptor(recovery); + try { + ses.inRecoveryDescriptor(recovery); - if (sndRes) - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received())); + if (sndRes) + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recovery.received())); - recovery.onConnected(); + recovery.onConnected(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } } /** @@ -755,30 +770,35 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public void apply(Boolean success) { - failed = !success; + try { + failed = !success; - if (success) { - IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> msgFut) { - try { - msgFut.get(); + if (success) { + IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> msgFut) { + try { + msgFut.get(); - connectedNew(recoveryDesc, ses, false); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send recovery handshake " + - "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); + connectedNew(recoveryDesc, ses, false); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send recovery handshake " + + "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); - recoveryDesc.release(); + recoveryDesc.release(); + } } - } - }; + }; - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr); + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr); + } + else + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1)); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); } - else - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(-1)); } } @@ -839,32 +859,37 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter /** {@inheritDoc} */ @Override public void apply(Boolean success) { if (success) { - IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> msgFut) { - try { - msgFut.get(); + try { + IgniteInClosure<IgniteInternalFuture<?>> lsnr = new IgniteInClosure<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> msgFut) { + try { + msgFut.get(); - GridTcpNioCommunicationClient client = - connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); + GridTcpNioCommunicationClient client = + connected(recoveryDesc, ses, rmtNode, msg.received(), false, createClient); - fut.onDone(client); - } - catch (IgniteCheckedException e) { - if (log.isDebugEnabled()) - log.debug("Failed to send recovery handshake " + - "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); + fut.onDone(client); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to send recovery handshake " + + "[rmtNode=" + rmtNode.id() + ", err=" + e + ']'); - recoveryDesc.release(); + recoveryDesc.release(); - fut.onDone(); - } - finally { - clientFuts.remove(connKey, fut); + fut.onDone(); + } + finally { + clientFuts.remove(connKey, fut); + } } - } - }; + }; - nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr); + nioSrvr.sendSystem(ses, new RecoveryLastReceivedMessage(recoveryDesc.received()), lsnr); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } } else { try { @@ -3508,9 +3533,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter log.debug("Send recovery acknowledgement on timeout [rmtNode=" + nodeId + ", rcvCnt=" + msg.received() + ']'); - nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); + try { + nioSrvr.sendSystem(((GridTcpNioCommunicationClient)client).session(), msg); - recovery.lastAcknowledged(msg.received()); + recovery.lastAcknowledged(msg.received()); + } + catch (IgniteCheckedException err) { + U.error(log, "Failed to send message: " + err, err); + } continue; } @@ -3566,9 +3596,14 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter ", lastAcked=" + recovery.lastAcknowledged() + ']'); } - nioSrvr.sendSystem(ses, msg); + try { + nioSrvr.sendSystem(ses, msg); - recovery.lastAcknowledged(msg.received()); + recovery.lastAcknowledged(msg.received()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send message: " + e, e); + } } } @@ -3944,7 +3979,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter } /** - * + * Updated handshake message. */ @SuppressWarnings("PublicInnerClass") public static class HandshakeMessage2 extends HandshakeMessage {
http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java index 58b91e4..d403784 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/impl/GridNioFilterChainSelfTest.java @@ -114,7 +114,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest { proceedExceptionCaught(ses, ex); } - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) { sndEvt.compareAndSet(null, ses.<String>meta(MESSAGE_WRITE_META_NAME)); sndMsgObj.compareAndSet(null, msg); @@ -155,7 +155,7 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest { chain.onSessionIdleTimeout(ses); chain.onSessionWriteTimeout(ses); assertNull(chain.onSessionClose(ses)); - assertNull(chain.onSessionWrite(ses, snd)); + assertNull(chain.onSessionWrite(ses, snd, true)); assertEquals("DCBA", connectedEvt.get()); assertEquals("DCBA", disconnectedEvt.get()); @@ -210,10 +210,10 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ - @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException { + @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg, boolean fut) throws IgniteCheckedException { chainMeta(ses, MESSAGE_WRITE_META_NAME); - return proceedSessionWrite(ses, msg); + return proceedSessionWrite(ses, msg, fut); } /** {@inheritDoc} */ @@ -349,6 +349,11 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest { } /** {@inheritDoc} */ + @Override public void sendNoFuture(Object msg) { + // No-op. + } + + /** {@inheritDoc} */ @Override public GridNioFuture<Object> resumeReads() { return null; } @@ -387,5 +392,10 @@ public class GridNioFilterChainSelfTest extends GridCommonAbstractTest { @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() { return null; } + + /** {@inheritDoc} */ + @Override public void systemMessage(Object msg) { + // No-op. + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestAbstractBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestAbstractBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestAbstractBenchmark.java new file mode 100644 index 0000000..8791c83 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestAbstractBenchmark.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.io; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.yardstick.IgniteAbstractBenchmark; +import org.yardstickframework.BenchmarkConfiguration; +import org.yardstickframework.BenchmarkUtils; + +/** + * + */ +public abstract class IgniteIoTestAbstractBenchmark extends IgniteAbstractBenchmark { + /** */ + protected final List<ClusterNode> targetNodes = new ArrayList<>(); + + /** */ + protected IgniteKernal ignite; + + /** {@inheritDoc} */ + @Override public void setUp(BenchmarkConfiguration cfg) throws Exception { + super.setUp(cfg); + + ignite = (IgniteKernal)ignite(); + + ClusterNode loc = ignite().cluster().localNode(); + + Collection<ClusterNode> nodes = ignite().cluster().forServers().nodes(); + + for (ClusterNode node : nodes) { + if (!loc.equals(node)) + targetNodes.add(node); + } + + if (targetNodes.isEmpty()) + throw new IgniteException("Failed to find remote server nodes [nodes=" + nodes + ']'); + + BenchmarkUtils.println(cfg, "Initialized target nodes: " + F.nodeIds(targetNodes) + ']'); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendAllBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendAllBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendAllBenchmark.java new file mode 100644 index 0000000..9011910 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendAllBenchmark.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.io; + +import java.util.Map; + +/** + * + */ +public class IgniteIoTestSendAllBenchmark extends IgniteIoTestAbstractBenchmark { + /** {@inheritDoc} */ + @Override public boolean test(Map<Object, Object> ctx) throws Exception { + ignite.sendIoTest(targetNodes, null, false).get(); + + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e97176b4/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendRandomBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendRandomBenchmark.java b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendRandomBenchmark.java new file mode 100644 index 0000000..88368e0 --- /dev/null +++ b/modules/yardstick/src/main/java/org/apache/ignite/yardstick/io/IgniteIoTestSendRandomBenchmark.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yardstick.io; + +import java.util.Map; +import org.apache.ignite.cluster.ClusterNode; + +/** + * + */ +public class IgniteIoTestSendRandomBenchmark extends IgniteIoTestAbstractBenchmark { + /** {@inheritDoc} */ + @Override public boolean test(Map<Object, Object> ctx) throws Exception { + ClusterNode node = targetNodes.get(nextRandom(targetNodes.size())); + + ignite.sendIoTest(node, null, false).get(); + + return true; + } +}
