Minor: moved inner TcpCommunicationSpi messages to top-level classes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e358ae24 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e358ae24 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e358ae24 Branch: refs/heads/ignite-zk Commit: e358ae241b334b6c03fc62ab0eee99f08eeb778a Parents: c576d5a Author: sboikov <sboi...@gridgain.com> Authored: Fri Dec 15 13:50:19 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Dec 15 13:50:19 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 8 +- .../spi/communication/tcp/HandshakeMessage.java | 156 ------------------ .../communication/tcp/HandshakeMessage2.java | 95 ----------- .../spi/communication/tcp/NodeIdMessage.java | 115 -------------- .../tcp/RecoveryLastReceivedMessage.java | 113 ------------- .../communication/tcp/TcpCommunicationSpi.java | 31 ++-- .../tcp/messages/HandshakeMessage.java | 157 +++++++++++++++++++ .../tcp/messages/HandshakeMessage2.java | 95 +++++++++++ .../tcp/messages/NodeIdMessage.java | 128 +++++++++++++++ .../messages/RecoveryLastReceivedMessage.java | 114 ++++++++++++++ .../tcp/messages/package-info.java | 22 +++ 11 files changed, 541 insertions(+), 493 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 791dd91..78cb7a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -153,11 +153,11 @@ import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.spi.collision.jobstealing.JobStealingRequest; -import org.apache.ignite.spi.communication.tcp.HandshakeMessage; -import org.apache.ignite.spi.communication.tcp.HandshakeMessage2; -import org.apache.ignite.spi.communication.tcp.NodeIdMessage; -import org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage; +import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2; +import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage; +import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage; import org.jsr166.ConcurrentHashMap8; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java deleted file mode 100644 index 00e8e46..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.communication.tcp; - -import java.nio.ByteBuffer; -import java.util.UUID; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * Handshake message. - */ -public class HandshakeMessage implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** Message body size in bytes. */ - private static final int MESSAGE_SIZE = 32; - - /** Full message size (with message type) in bytes. */ - public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE; - - /** */ - private UUID nodeId; - - /** */ - private long rcvCnt; - - /** */ - private long connectCnt; - - /** - * Default constructor required by {@link Message}. - */ - public HandshakeMessage() { - // No-op. - } - - /** - * @param nodeId Node ID. - * @param connectCnt Connect count. - * @param rcvCnt Number of received messages. - */ - public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) { - assert nodeId != null; - assert rcvCnt >= 0 : rcvCnt; - - this.nodeId = nodeId; - this.connectCnt = connectCnt; - this.rcvCnt = rcvCnt; - } - - /** - * @return Connection index. - */ - public int connectionIndex() { - return 0; - } - - /** - * @return Connect count. - */ - public long connectCount() { - return connectCnt; - } - - /** - * @return Number of received messages. - */ - public long received() { - return rcvCnt; - } - - /** - * @return Node ID. - */ - public UUID nodeId() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - if (buf.remaining() < MESSAGE_FULL_SIZE) - return false; - - TcpCommunicationSpi.writeMessageType(buf, directType()); - - byte[] bytes = U.uuidToBytes(nodeId); - - assert bytes.length == 16 : bytes.length; - - buf.put(bytes); - - buf.putLong(rcvCnt); - - buf.putLong(connectCnt); - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - if (buf.remaining() < MESSAGE_SIZE) - return false; - - byte[] nodeIdBytes = new byte[NodeIdMessage.MESSAGE_SIZE]; - - buf.get(nodeIdBytes); - - nodeId = U.bytesToUuid(nodeIdBytes, 0); - - rcvCnt = buf.getLong(); - - connectCnt = buf.getLong(); - - return true; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return TcpCommunicationSpi.HANDSHAKE_MSG_TYPE; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HandshakeMessage.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java deleted file mode 100644 index 1e8fdd9..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/HandshakeMessage2.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.communication.tcp; - -import java.nio.ByteBuffer; -import java.util.UUID; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * Updated handshake message. - */ -public class HandshakeMessage2 extends HandshakeMessage { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private int connIdx; - - /** - * - */ - public HandshakeMessage2() { - // No-op. - } - - /** - * @param nodeId Node ID. - * @param connectCnt Connect count. - * @param rcvCnt Number of received messages. - * @param connIdx Connection index. - */ - HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) { - super(nodeId, connectCnt, rcvCnt); - - this.connIdx = connIdx; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return -44; - } - - /** {@inheritDoc} */ - @Override public int connectionIndex() { - return connIdx; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - if (!super.writeTo(buf, writer)) - return false; - - if (buf.remaining() < 4) - return false; - - buf.putInt(connIdx); - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - if (!super.readFrom(buf, reader)) - return false; - - if (buf.remaining() < 4) - return false; - - connIdx = buf.getInt(); - - return true; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(HandshakeMessage2.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java deleted file mode 100644 index d05b7ff..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/NodeIdMessage.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.communication.tcp; - -import java.nio.ByteBuffer; -import java.util.UUID; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * Node ID message. - */ -public class NodeIdMessage implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** Message body size (with message type) in bytes. */ - static final int MESSAGE_SIZE = 16; - - /** Full message size (with message type) in bytes. */ - public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE; - - /** */ - byte[] nodeIdBytes; - - /** */ - byte[] nodeIdBytesWithType; - - /** */ - public NodeIdMessage() { - // No-op. - } - - /** - * @param nodeId Node ID. - */ - NodeIdMessage(UUID nodeId) { - assert nodeId != null; - - nodeIdBytes = U.uuidToBytes(nodeId); - - assert nodeIdBytes.length == MESSAGE_SIZE : "Node ID size must be " + MESSAGE_SIZE; - - nodeIdBytesWithType = new byte[MESSAGE_FULL_SIZE]; - - nodeIdBytesWithType[0] = (byte)(TcpCommunicationSpi.NODE_ID_MSG_TYPE & 0xFF); - nodeIdBytesWithType[1] = (byte)((TcpCommunicationSpi.NODE_ID_MSG_TYPE >> 8) & 0xFF); - - System.arraycopy(nodeIdBytes, 0, nodeIdBytesWithType, 2, nodeIdBytes.length); - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - assert nodeIdBytes.length == MESSAGE_SIZE; - - if (buf.remaining() < MESSAGE_FULL_SIZE) - return false; - - TcpCommunicationSpi.writeMessageType(buf, directType()); - - buf.put(nodeIdBytes); - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - if (buf.remaining() < MESSAGE_SIZE) - return false; - - nodeIdBytes = new byte[MESSAGE_SIZE]; - - buf.get(nodeIdBytes); - - return true; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return TcpCommunicationSpi.NODE_ID_MSG_TYPE; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 0; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(NodeIdMessage.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java deleted file mode 100644 index 5460084..0000000 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/RecoveryLastReceivedMessage.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.spi.communication.tcp; - -import java.nio.ByteBuffer; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * Recovery acknowledgment message. - */ -public class RecoveryLastReceivedMessage implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - static final long ALREADY_CONNECTED = -1; - - /** */ - static final long NODE_STOPPING = -2; - - /** Need wait. */ - static final long NEED_WAIT = -3; - - /** Message body size in bytes. */ - private static final int MESSAGE_SIZE = 8; - - /** Full message size (with message type) in bytes. */ - public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE; - - /** */ - private long rcvCnt; - - /** - * Default constructor required by {@link Message}. - */ - public RecoveryLastReceivedMessage() { - // No-op. - } - - /** - * @param rcvCnt Number of received messages. - */ - public RecoveryLastReceivedMessage(long rcvCnt) { - this.rcvCnt = rcvCnt; - } - - /** - * @return Number of received messages. - */ - public long received() { - return rcvCnt; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - if (buf.remaining() < MESSAGE_FULL_SIZE) - return false; - - TcpCommunicationSpi.writeMessageType(buf, directType()); - - buf.putLong(rcvCnt); - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - if (buf.remaining() < MESSAGE_SIZE) - return false; - - rcvCnt = buf.getLong(); - - return true; - } - - /** {@inheritDoc} */ - @Override public short directType() { - return TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 0; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(RecoveryLastReceivedMessage.class, this); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java index 1f0061f..69da9ca 100755 --- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java @@ -132,6 +132,10 @@ import org.apache.ignite.spi.IgniteSpiThread; import org.apache.ignite.spi.IgniteSpiTimeoutObject; import org.apache.ignite.spi.communication.CommunicationListener; import org.apache.ignite.spi.communication.CommunicationSpi; +import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage; +import org.apache.ignite.spi.communication.tcp.messages.HandshakeMessage2; +import org.apache.ignite.spi.communication.tcp.messages.NodeIdMessage; +import org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.thread.IgniteThread; @@ -141,9 +145,9 @@ import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META; -import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.ALREADY_CONNECTED; -import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.NEED_WAIT; -import static org.apache.ignite.spi.communication.tcp.RecoveryLastReceivedMessage.NODE_STOPPING; +import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED; +import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NEED_WAIT; +import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.NODE_STOPPING; /** * <tt>TcpCommunicationSpi</tt> is default communication SPI which uses @@ -456,7 +460,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati ConnectionKey connKey; if (msg instanceof NodeIdMessage) { - sndId = U.bytesToUuid(((NodeIdMessage) msg).nodeIdBytes, 0); + sndId = U.bytesToUuid(((NodeIdMessage)msg).nodeIdBytes(), 0); connKey = new ConnectionKey(sndId, 0, -1); } else { @@ -3550,10 +3554,10 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati if (isSslEnabled()) { assert sslHnd != null; - ch.write(sslHnd.encrypt(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType))); + ch.write(sslHnd.encrypt(ByteBuffer.wrap(NodeIdMessage.nodeIdBytesWithType(safeLocalNodeId())))); } else - ch.write(ByteBuffer.wrap(nodeIdMessage().nodeIdBytesWithType)); + ch.write(ByteBuffer.wrap(NodeIdMessage.nodeIdBytesWithType(safeLocalNodeId()))); } if (recovery != null) { @@ -3806,20 +3810,27 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * @return Node ID message. */ private NodeIdMessage nodeIdMessage() { + return new NodeIdMessage(safeLocalNodeId()); + } + + /** + * @return Local node ID. + */ + private UUID safeLocalNodeId() { ClusterNode locNode = getLocalNode(); UUID id; if (locNode == null) { U.warn(log, "Local node is not started or fully initialized [isStopping=" + - getSpiContext().isStopping() + ']'); + getSpiContext().isStopping() + ']'); id = new UUID(0, 0); } else id = locNode.id(); - return new NodeIdMessage(id); + return id; } /** {@inheritDoc} */ @@ -3923,7 +3934,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati * @param buf Byte buffer. * @param type Message type. */ - static void writeMessageType(ByteBuffer buf, short type) { + public static void writeMessageType(ByteBuffer buf, short type) { buf.put((byte)(type & 0xFF)); buf.put((byte)((type >> 8) & 0xFF)); } @@ -4426,7 +4437,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati out.write(U.IGNITE_HEADER); writeMessageType(out, NODE_ID_MSG_TYPE); - out.write(msg.nodeIdBytes); + out.write(msg.nodeIdBytes()); out.flush(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java new file mode 100644 index 0000000..e3be9c9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp.messages; + +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; + +/** + * Handshake message. + */ +public class HandshakeMessage implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Message body size in bytes. */ + private static final int MESSAGE_SIZE = 32; + + /** Full message size (with message type) in bytes. */ + public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE; + + /** */ + private UUID nodeId; + + /** */ + private long rcvCnt; + + /** */ + private long connectCnt; + + /** + * Default constructor required by {@link Message}. + */ + public HandshakeMessage() { + // No-op. + } + + /** + * @param nodeId Node ID. + * @param connectCnt Connect count. + * @param rcvCnt Number of received messages. + */ + public HandshakeMessage(UUID nodeId, long connectCnt, long rcvCnt) { + assert nodeId != null; + assert rcvCnt >= 0 : rcvCnt; + + this.nodeId = nodeId; + this.connectCnt = connectCnt; + this.rcvCnt = rcvCnt; + } + + /** + * @return Connection index. + */ + public int connectionIndex() { + return 0; + } + + /** + * @return Connect count. + */ + public long connectCount() { + return connectCnt; + } + + /** + * @return Number of received messages. + */ + public long received() { + return rcvCnt; + } + + /** + * @return Node ID. + */ + public UUID nodeId() { + return nodeId; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + if (buf.remaining() < MESSAGE_FULL_SIZE) + return false; + + TcpCommunicationSpi.writeMessageType(buf, directType()); + + byte[] bytes = U.uuidToBytes(nodeId); + + assert bytes.length == 16 : bytes.length; + + buf.put(bytes); + + buf.putLong(rcvCnt); + + buf.putLong(connectCnt); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + if (buf.remaining() < MESSAGE_SIZE) + return false; + + byte[] nodeIdBytes = new byte[NodeIdMessage.MESSAGE_SIZE]; + + buf.get(nodeIdBytes); + + nodeId = U.bytesToUuid(nodeIdBytes, 0); + + rcvCnt = buf.getLong(); + + connectCnt = buf.getLong(); + + return true; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TcpCommunicationSpi.HANDSHAKE_MSG_TYPE; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HandshakeMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java new file mode 100644 index 0000000..f27a825 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/HandshakeMessage2.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp.messages; + +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Updated handshake message. + */ +public class HandshakeMessage2 extends HandshakeMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private int connIdx; + + /** + * + */ + public HandshakeMessage2() { + // No-op. + } + + /** + * @param nodeId Node ID. + * @param connectCnt Connect count. + * @param rcvCnt Number of received messages. + * @param connIdx Connection index. + */ + public HandshakeMessage2(UUID nodeId, long connectCnt, long rcvCnt, int connIdx) { + super(nodeId, connectCnt, rcvCnt); + + this.connIdx = connIdx; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return -44; + } + + /** {@inheritDoc} */ + @Override public int connectionIndex() { + return connIdx; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + if (!super.writeTo(buf, writer)) + return false; + + if (buf.remaining() < 4) + return false; + + buf.putInt(connIdx); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + if (!super.readFrom(buf, reader)) + return false; + + if (buf.remaining() < 4) + return false; + + connIdx = buf.getInt(); + + return true; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HandshakeMessage2.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java new file mode 100644 index 0000000..2c6aa30 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/NodeIdMessage.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp.messages; + +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; + +/** + * Node ID message. + */ +public class NodeIdMessage implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** Message body size (with message type) in bytes. */ + static final int MESSAGE_SIZE = 16; + + /** Full message size (with message type) in bytes. */ + public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE; + + /** */ + private byte[] nodeIdBytes; + + /** */ + public NodeIdMessage() { + // No-op. + } + + /** + * @param nodeId Node ID. + */ + public NodeIdMessage(UUID nodeId) { + assert nodeId != null; + + nodeIdBytes = U.uuidToBytes(nodeId); + + assert nodeIdBytes.length == MESSAGE_SIZE : "Node ID size must be " + MESSAGE_SIZE; + } + + /** + * @return Node ID bytes. + */ + public byte[] nodeIdBytes() { + return nodeIdBytes; + } + + /** + * @param nodeId Node ID. + * @return Marshalled node ID bytes with direct message type. + */ + public static byte[] nodeIdBytesWithType(UUID nodeId) { + byte[] nodeIdBytesWithType = new byte[MESSAGE_FULL_SIZE]; + + nodeIdBytesWithType[0] = (byte)(TcpCommunicationSpi.NODE_ID_MSG_TYPE & 0xFF); + nodeIdBytesWithType[1] = (byte)((TcpCommunicationSpi.NODE_ID_MSG_TYPE >> 8) & 0xFF); + + U.uuidToBytes(nodeId, nodeIdBytesWithType, 2); + + return nodeIdBytesWithType; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + assert nodeIdBytes.length == MESSAGE_SIZE; + + if (buf.remaining() < MESSAGE_FULL_SIZE) + return false; + + TcpCommunicationSpi.writeMessageType(buf, directType()); + + buf.put(nodeIdBytes); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + if (buf.remaining() < MESSAGE_SIZE) + return false; + + nodeIdBytes = new byte[MESSAGE_SIZE]; + + buf.get(nodeIdBytes); + + return true; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TcpCommunicationSpi.NODE_ID_MSG_TYPE; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(NodeIdMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java new file mode 100644 index 0000000..eef2655 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/RecoveryLastReceivedMessage.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.spi.communication.tcp.messages; + +import java.nio.ByteBuffer; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; + +/** + * Recovery acknowledgment message. + */ +public class RecoveryLastReceivedMessage implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + public static final long ALREADY_CONNECTED = -1; + + /** */ + public static final long NODE_STOPPING = -2; + + /** Need wait. */ + public static final long NEED_WAIT = -3; + + /** Message body size in bytes. */ + private static final int MESSAGE_SIZE = 8; + + /** Full message size (with message type) in bytes. */ + public static final int MESSAGE_FULL_SIZE = MESSAGE_SIZE + DIRECT_TYPE_SIZE; + + /** */ + private long rcvCnt; + + /** + * Default constructor required by {@link Message}. + */ + public RecoveryLastReceivedMessage() { + // No-op. + } + + /** + * @param rcvCnt Number of received messages. + */ + public RecoveryLastReceivedMessage(long rcvCnt) { + this.rcvCnt = rcvCnt; + } + + /** + * @return Number of received messages. + */ + public long received() { + return rcvCnt; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + if (buf.remaining() < MESSAGE_FULL_SIZE) + return false; + + TcpCommunicationSpi.writeMessageType(buf, directType()); + + buf.putLong(rcvCnt); + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + if (buf.remaining() < MESSAGE_SIZE) + return false; + + rcvCnt = buf.getLong(); + + return true; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return TcpCommunicationSpi.RECOVERY_LAST_ID_MSG_TYPE; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(RecoveryLastReceivedMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e358ae24/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/package-info.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/package-info.java new file mode 100644 index 0000000..662dd26 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/messages/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * <!-- Package description. --> + * Contains implementation messages. + */ +package org.apache.ignite.spi.communication.tcp.messages; \ No newline at end of file