This is an automated email from the ASF dual-hosted git repository.
anton-vinogradov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new a54b9f453ea IGNITE-28721 Remove writeTo/readFrom methods from Message
(#13177)
a54b9f453ea is described below
commit a54b9f453eaf52b52df09c650f8bb62264f0232c
Author: Dmitry Werner <[email protected]>
AuthorDate: Tue May 26 21:26:03 2026 +0500
IGNITE-28721 Remove writeTo/readFrom methods from Message (#13177)
---
.../apache/ignite/internal/MessageProcessor.java | 4 +-
.../internal/MessageSerializerGenerator.java | 8 +-
.../ignite/internal/CoreMessagesProvider.java | 3 +-
.../managers/communication/CompressedMessage.java | 125 ++-----------------
.../communication/CompressedMessageSerializer.java | 133 +++++++++++++++++++++
.../communication/IgniteMessageFactoryImpl.java | 34 ------
.../plugin/extensions/communication/Message.java | 27 -----
.../extensions/communication/MessageFactory.java | 10 +-
.../tcp/internal/GridNioServerWrapper.java | 4 -
.../DuplicateDirectTypeIdMessage.java | 37 ++++++
.../GridCommunicationSendMessageSelfTest.java | 71 ++---------
.../communication/GridIoManagerSelfTest.java | 14 ---
.../IgniteMessageFactoryImplTest.java | 85 ++-----------
.../MessageDirectTypeIdConflictTest.java | 32 +----
.../managers/communication/TestMessage1.java | 36 ++++++
.../managers/communication/TestMessage2.java | 36 ++++++
.../managers/communication/TestMessage42.java | 36 ++++++
.../communication/TestOverByteIdMessage.java | 36 ++++++
.../communication/TestValidByteIdMessage.java | 36 ++++++
.../GridCacheConditionalDeploymentSelfTest.java | 52 +-------
.../processors/cache/TestCacheMessage.java | 26 ++++
.../loadtests/communication/GridTestMessage.java | 14 ---
.../CommunicationConnectionPoolMetricsTest.java | 87 ++------------
.../spi/communication/tcp/TestDelayMessage.java | 62 ++++++++++
24 files changed, 493 insertions(+), 515 deletions(-)
diff --git
a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java
b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java
index 6659d3e2195..0a9c0fd58ab 100644
---
a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java
+++
b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageProcessor.java
@@ -70,7 +70,7 @@ public class MessageProcessor extends AbstractProcessor {
static final String MESSAGE_INTERFACE =
"org.apache.ignite.plugin.extensions.communication.Message";
/** Compressed message. */
- static final String COMPRESSED_MESSAGE_INTERFACE =
"org.apache.ignite.internal.managers.communication.CompressedMessage";
+ static final String COMPRESSED_MESSAGE_CLASS =
"org.apache.ignite.internal.managers.communication.CompressedMessage";
/** Externalizable message. */
static final String MARSHALLABLE_MESSAGE_INTERFACE =
"org.apache.ignite.internal.MarshallableMessage";
@@ -85,7 +85,7 @@ public class MessageProcessor extends AbstractProcessor {
/** Messages with no fields. A serializer generation intentionally
skipped. */
static final String[] SKIP_MESSAGES = {
"org.apache.ignite.internal.processors.odbc.ClientMessage",
- "org.apache.ignite.internal.managers.communication.CompressedMessage",
+ COMPRESSED_MESSAGE_CLASS,
"org.apache.ignite.loadtests.communication.GridTestMessage"
};
diff --git
a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
index 7874b5dcd8c..6ee0585699b 100644
---
a/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
+++
b/modules/codegen/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
@@ -55,7 +55,7 @@ import
org.apache.ignite.internal.systemview.SystemViewRowAttributeWalkerProcess
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.jetbrains.annotations.Nullable;
-import static
org.apache.ignite.internal.MessageProcessor.COMPRESSED_MESSAGE_INTERFACE;
+import static
org.apache.ignite.internal.MessageProcessor.COMPRESSED_MESSAGE_CLASS;
import static
org.apache.ignite.internal.MessageProcessor.MARSHALLABLE_MESSAGE_INTERFACE;
import static org.apache.ignite.internal.MessageProcessor.MESSAGE_INTERFACE;
@@ -460,7 +460,7 @@ public class MessageSerializerGenerator {
returnFalseIfWriteFailed(write, field,
"writer.writeGridLongList", getExpr);
else if (assignableFrom(type, type(MESSAGE_INTERFACE))) {
- if (sameType(type, COMPRESSED_MESSAGE_INTERFACE))
+ if (sameType(type, COMPRESSED_MESSAGE_CLASS))
throw new IllegalArgumentException(COMPRESSED_MSG_ERROR);
if (compress)
@@ -686,7 +686,7 @@ public class MessageSerializerGenerator {
returnFalseIfReadFailed(field, "reader.readGridLongList");
else if (assignableFrom(type, type(MESSAGE_INTERFACE))) {
- if (sameType(type, COMPRESSED_MESSAGE_INTERFACE))
+ if (sameType(type, COMPRESSED_MESSAGE_CLASS))
throw new IllegalArgumentException(COMPRESSED_MSG_ERROR);
if (compress)
@@ -844,7 +844,7 @@ public class MessageSerializerGenerator {
if (primitiveType != null)
return primitiveType.getKind().toString();
- if (sameType(type, COMPRESSED_MESSAGE_INTERFACE))
+ if (sameType(type, COMPRESSED_MESSAGE_CLASS))
throw new IllegalArgumentException(COMPRESSED_MSG_ERROR);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
index b3666694a15..f4076df94f6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/CoreMessagesProvider.java
@@ -343,8 +343,7 @@ public class CoreMessagesProvider extends
AbstractMarshallableMessageFactoryProv
// [5000 - 5500]: Utility messages. Most of them originally come from
Discovery.
msgIdx = 5000;
- // We don't use the code‑generated serializer for CompressedMessage -
serialization is highly customized.
- factory.register(msgIdx++, CompressedMessage::new);
+ withNoSchema(CompressedMessage.class);
withNoSchemaResolvedClassLoader(ErrorMessage.class);
withNoSchema(InetSocketAddressMessage.class);
withNoSchema(InetAddressMessage.class);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java
index c7b530a217f..e3e46c43ff3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessage.java
@@ -28,8 +28,6 @@ import java.util.zip.InflaterInputStream;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
* Internal message used when transmitting fields annotated with @Compress
over the network.
@@ -41,22 +39,22 @@ public class CompressedMessage implements Message {
static final int CHUNK_SIZE = 10 * 1024;
/** Reader buffer capacity. */
- private static final int BUFFER_CAPACITY = 10 * CHUNK_SIZE;
+ static final int BUFFER_CAPACITY = 10 * CHUNK_SIZE;
/** Temporary buffer for compressed data received over the network. */
- private ByteBuffer tmpBuf;
+ ByteBuffer tmpBuf;
/** Raw data size. */
- private int dataSize;
+ int dataSize;
/** Chunked byte reader. */
- private ChunkedByteReader chunkedReader;
+ ChunkedByteReader chunkedReader;
/** Chunk. */
- private byte[] chunk;
+ byte[] chunk;
/** Flag indicating whether this is the last chunk. */
- private boolean finalChunk;
+ boolean finalChunk;
/** Compression level. */
private int compressionLvl;
@@ -90,114 +88,9 @@ public class CompressedMessage implements Message {
return uncompress();
}
- /** {@inheritDoc} */
- @SuppressWarnings("deprecation")
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- while (true) {
- if (chunk == null && chunkedReader != null) {
- chunk = chunkedReader.nextChunk();
-
- finalChunk = (chunk == null);
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeInt(dataSize))
- return false;
-
- writer.incrementState();
-
- if (dataSize == 0)
- return true;
-
- case 1:
- if (!writer.writeBoolean(finalChunk))
- return false;
-
- writer.incrementState();
-
- if (finalChunk)
- return true;
-
- case 2:
- if (!writer.writeByteArray(chunk))
- return false;
-
- chunk = null;
-
- writer.decrementState();
- }
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("deprecation")
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (tmpBuf == null)
- tmpBuf = ByteBuffer.allocateDirect(BUFFER_CAPACITY);
-
- assert chunk == null : chunk;
-
- while (true) {
- switch (reader.state()) {
- case 0:
- dataSize = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- if (dataSize == 0)
- return true;
-
- reader.incrementState();
-
- case 1:
- finalChunk = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- if (finalChunk)
- return true;
-
- reader.incrementState();
-
- case 2:
- chunk = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- if (chunk != null) {
- if (tmpBuf.remaining() <= CHUNK_SIZE) {
- ByteBuffer newTmpBuf =
ByteBuffer.allocateDirect(tmpBuf.capacity() * 2);
-
- tmpBuf.flip();
-
- newTmpBuf.put(tmpBuf);
-
- tmpBuf = newTmpBuf;
- }
-
- tmpBuf.put(chunk);
-
- reader.decrementState();
-
- chunk = null;
- }
- }
- }
+ /** @return Next chunk of data or null. */
+ public byte[] nextChunk() {
+ return chunkedReader.nextChunk();
}
/**
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessageSerializer.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessageSerializer.java
new file mode 100644
index 00000000000..594aab336e0
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CompressedMessageSerializer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import static
org.apache.ignite.internal.managers.communication.CompressedMessage.BUFFER_CAPACITY;
+import static
org.apache.ignite.internal.managers.communication.CompressedMessage.CHUNK_SIZE;
+
+/** Message serializer for compressed message. */
+public class CompressedMessageSerializer implements
MessageSerializer<CompressedMessage> {
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(CompressedMessage msg, MessageWriter
writer) {
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(msg.directType()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ while (true) {
+ if (msg.chunk == null && msg.chunkedReader != null) {
+ msg.chunk = msg.nextChunk();
+
+ msg.finalChunk = (msg.chunk == null);
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeInt(msg.dataSize))
+ return false;
+
+ writer.incrementState();
+
+ if (msg.dataSize == 0)
+ return true;
+
+ case 1:
+ if (!writer.writeBoolean(msg.finalChunk))
+ return false;
+
+ writer.incrementState();
+
+ if (msg.finalChunk)
+ return true;
+
+ case 2:
+ if (!writer.writeByteArray(msg.chunk))
+ return false;
+
+ msg.chunk = null;
+
+ writer.decrementState();
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(CompressedMessage msg, MessageReader
reader) {
+ if (msg.tmpBuf == null)
+ msg.tmpBuf = ByteBuffer.allocateDirect(BUFFER_CAPACITY);
+
+ assert msg.chunk == null : msg.chunk;
+
+ while (true) {
+ switch (reader.state()) {
+ case 0:
+ msg.dataSize = reader.readInt();
+
+ if (!reader.isLastRead())
+ return false;
+
+ if (msg.dataSize == 0)
+ return true;
+
+ reader.incrementState();
+
+ case 1:
+ msg.finalChunk = reader.readBoolean();
+
+ if (!reader.isLastRead())
+ return false;
+
+ if (msg.finalChunk)
+ return true;
+
+ reader.incrementState();
+
+ case 2:
+ msg.chunk = reader.readByteArray();
+
+ if (!reader.isLastRead())
+ return false;
+
+ if (msg.chunk != null) {
+ if (msg.tmpBuf.remaining() <= CHUNK_SIZE) {
+ ByteBuffer newTmpBuf =
ByteBuffer.allocateDirect(msg.tmpBuf.capacity() * 2);
+
+ msg.tmpBuf.flip();
+
+ newTmpBuf.put(msg.tmpBuf);
+
+ msg.tmpBuf = newTmpBuf;
+ }
+
+ msg.tmpBuf.put(msg.chunk);
+
+ reader.decrementState();
+
+ msg.chunk = null;
+ }
+ }
+ }
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
index 5388cefc2c1..6c1d45ef164 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImpl.java
@@ -18,18 +18,12 @@
package org.apache.ignite.internal.managers.communication;
import java.lang.reflect.Array;
-import java.nio.ByteBuffer;
import java.util.function.Supplier;
-
import org.apache.ignite.IgniteException;
-import org.apache.ignite.internal.direct.DirectMessageReader;
-import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageSerializer;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -42,29 +36,6 @@ public class IgniteMessageFactoryImpl implements
MessageFactory {
/** Array size. */
private static final int ARR_SIZE = 1 << Short.SIZE;
- /** Delegate serialization to {@code Message} methods. */
- private static final MessageSerializer DEFAULT_SERIALIZER = new
MessageSerializer() {
- /** {@inheritDoc} */
- @Override public boolean writeTo(Message msg, MessageWriter writer) {
- ByteBuffer buf = null;
-
- if (writer instanceof DirectMessageWriter)
- buf = ((DirectMessageWriter)writer).getBuffer();
-
- return msg.writeTo(buf, writer);
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(Message msg, MessageReader reader) {
- ByteBuffer buf = null;
-
- if (reader instanceof DirectMessageReader)
- buf = ((DirectMessageReader)reader).getBuffer();
-
- return msg.readFrom(buf, reader);
- }
- };
-
/** Message suppliers. */
private final Supplier<Message>[] msgSuppliers =
(Supplier<Message>[])Array.newInstance(Supplier.class, ARR_SIZE);
@@ -131,11 +102,6 @@ public class IgniteMessageFactoryImpl implements
MessageFactory {
throw new IgniteException("Message factory is already registered
for direct type: " + directType);
}
- /** {@inheritDoc} */
- @Override public void register(short directType, Supplier<Message>
supplier) throws IgniteException {
- register(directType, supplier, DEFAULT_SERIALIZER);
- }
-
/**
* Creates new message instance of provided direct type.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java
index 5dae2d9afd3..1e4abc845c5 100644
---
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java
+++
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/Message.java
@@ -17,7 +17,6 @@
package org.apache.ignite.plugin.extensions.communication;
-import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.ignite.IgniteException;
@@ -33,32 +32,6 @@ public interface Message {
/** Registry of message class to direct type mappings, populated during
factory initialization. */
Map<Class<?>, Short> REGISTRATIONS = new ConcurrentHashMap<>();
- /**
- * Writes this message to provided byte buffer.
- *
- * @param buf Byte buffer.
- * @param writer Writer.
- * @return Whether message was fully written.
- * @deprecated Use the code-generated {@code MessageSerializer} instead.
- */
- @Deprecated
- default boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Reads this message from provided byte buffer.
- *
- * @param buf Byte buffer.
- * @param reader Reader.
- * @return Whether message was fully read.
- * @deprecated Use the code-generated {@code MessageSerializer} instead.
- */
- @Deprecated
- default boolean readFrom(ByteBuffer buf, MessageReader reader) {
- throw new UnsupportedOperationException();
- }
-
/**
* Gets message type.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
index 4098504ee56..53366108bfc 100644
---
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFactory.java
@@ -21,7 +21,7 @@ import java.util.function.Supplier;
import org.apache.ignite.IgniteException;
/**
- * Message factory for all communication messages registered using {@link
#register(short, Supplier)} method call.
+ * Message factory for all communication messages registered using {@link
#register(short, Supplier, MessageSerializer)} method call.
*/
public interface MessageFactory {
/**
@@ -34,8 +34,12 @@ public interface MessageFactory {
* @throws IgniteException In case of attempt to register message with
direct type which is already registered.
* @throws IllegalStateException On any invocation of this method when
class which implements this interface
* is alredy constructed.
+ * @deprecated Use {@link #register(short, Supplier, MessageSerializer)}
instead.
*/
- public void register(short directType, Supplier<Message> supplier) throws
IgniteException;
+ @Deprecated(forRemoval = true)
+ default void register(short directType, Supplier<Message> supplier) throws
IgniteException {
+ throw new UnsupportedOperationException();
+ }
/**
* Register message factory with given direct type. All messages must be
registered during construction
@@ -47,7 +51,9 @@ public interface MessageFactory {
* @throws IgniteException In case of attempt to register message with
direct type which is already registered.
* @throws IllegalStateException On any invocation of this method when
class which implements this interface
* is alredy constructed.
+ * @deprecated Use {@link #register(int, Supplier, MessageSerializer)}
instead.
*/
+ @Deprecated(forRemoval = true)
default void register(int directType, Supplier<Message> supplier) throws
IgniteException {
register((short)directType, supplier);
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
index 99d310b5c03..de9a209546f 100644
---
a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
+++
b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/internal/GridNioServerWrapper.java
@@ -814,10 +814,6 @@ public class GridNioServerWrapper {
MessageFactory msgFactory = new MessageFactory() {
private MessageFactory impl;
- @Override public void register(short directType,
Supplier<Message> supplier) throws IgniteException {
- get().register(directType, supplier);
- }
-
@Override public void register(short directType,
Supplier<Message> supplier,
MessageSerializer serializer) throws IgniteException {
get().register(directType, supplier, serializer);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/DuplicateDirectTypeIdMessage.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/DuplicateDirectTypeIdMessage.java
new file mode 100644
index 00000000000..d293affedcc
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/DuplicateDirectTypeIdMessage.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.internal.CoreMessagesProvider;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** Test message with already registered direct type. */
+public class DuplicateDirectTypeIdMessage implements Message {
+ /** Message direct type. Message with this direct type will be registered
by {@link CoreMessagesProvider} first. */
+ static final short DIRECT_TYPE = CoreMessagesProvider.HANDSHAKE_MSG_TYPE;
+
+ /** */
+ @Order(0)
+ int val;
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return DIRECT_TYPE;
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
index c813901fd5f..90d9027ad7b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCommunicationSendMessageSelfTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.managers.communication;
-import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -27,8 +26,6 @@ import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -36,6 +33,8 @@ import org.junit.Test;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.internal.managers.communication.TestOverByteIdMessage.DIRECT_TYPE_OVER_BYTE;
+import static
org.apache.ignite.internal.managers.communication.TestValidByteIdMessage.DIRECT_TYPE;
/**
* Send message test.
@@ -44,12 +43,6 @@ public class GridCommunicationSendMessageSelfTest extends
GridCommonAbstractTest
/** Sample count. */
private static final int SAMPLE_CNT = 1;
- /** */
- private static final short DIRECT_TYPE = -127;
-
- /** */
- private static final short DIRECT_TYPE_OVER_BYTE = 1000;
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration c = super.getConfiguration(igniteInstanceName);
@@ -71,7 +64,7 @@ public class GridCommunicationSendMessageSelfTest extends
GridCommonAbstractTest
try {
startGridsMultiThreaded(2);
- doSend(new TestMessage(), TestMessage.class);
+ doSend(new TestValidByteIdMessage());
}
finally {
stopAllGrids();
@@ -86,7 +79,7 @@ public class GridCommunicationSendMessageSelfTest extends
GridCommonAbstractTest
try {
startGridsMultiThreaded(2);
- doSend(new TestOverByteIdMessage(), TestOverByteIdMessage.class);
+ doSend(new TestOverByteIdMessage());
}
finally {
stopAllGrids();
@@ -101,7 +94,7 @@ public class GridCommunicationSendMessageSelfTest extends
GridCommonAbstractTest
try {
startGridsMultiThreaded(2);
- doSend(new TestMessage(), TestMessage.class);
+ doSend(new TestValidByteIdMessage());
}
finally {
stopAllGrids();
@@ -110,15 +103,15 @@ public class GridCommunicationSendMessageSelfTest extends
GridCommonAbstractTest
/**
* @param msg Message to send.
- * @param msgCls Message class to check the received message.
*
* @throws Exception If failed.
*/
- private void doSend(Message msg, final Class<?> msgCls) throws Exception {
+ private void doSend(Message msg) throws Exception {
GridIoManager mgr0 = grid(0).context().io();
GridIoManager mgr1 = grid(1).context().io();
String topic = "test-topic";
+ Class<?> msgCls = msg.getClass();
final CountDownLatch latch = new CountDownLatch(SAMPLE_CNT);
@@ -147,52 +140,6 @@ public class GridCommunicationSendMessageSelfTest extends
GridCommonAbstractTest
info(">>>");
}
- /** */
- private static class TestMessage implements Message {
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer)
{
- writer.setBuffer(buf);
-
- if (!writer.writeHeader(directType()))
- return false;
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader
reader) {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return DIRECT_TYPE;
- }
- }
-
- /** */
- private static class TestOverByteIdMessage implements Message {
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer)
{
- writer.setBuffer(buf);
-
- if (!writer.writeHeader(directType()))
- return false;
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader
reader) {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return DIRECT_TYPE_OVER_BYTE;
- }
- }
-
/** */
public static class TestPluginProvider extends AbstractTestPluginProvider {
/** {@inheritDoc} */
@@ -204,8 +151,8 @@ public class GridCommunicationSendMessageSelfTest extends
GridCommonAbstractTest
@Override public void initExtensions(PluginContext ctx,
ExtensionRegistry registry) {
registry.registerExtension(MessageFactoryProvider.class, new
MessageFactoryProvider() {
@Override public void registerAll(MessageFactory factory) {
- factory.register(DIRECT_TYPE, TestMessage::new);
- factory.register(DIRECT_TYPE_OVER_BYTE,
TestOverByteIdMessage::new);
+ factory.register(DIRECT_TYPE, TestValidByteIdMessage::new,
new TestValidByteIdMessageSerializer());
+ factory.register(DIRECT_TYPE_OVER_BYTE,
TestOverByteIdMessage::new, new TestOverByteIdMessageSerializer());
}
});
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
index 593d2b305a8..45c72def558 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.managers.communication;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -29,8 +28,6 @@ import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
@@ -227,17 +224,6 @@ public class GridIoManagerSelfTest extends
GridCommonAbstractTest {
/** */
private static class TestMessage implements Message {
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer)
{
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader
reader) {
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 0;
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
index c34eb96cf4d..838474c82c7 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/IgniteMessageFactoryImplTest.java
@@ -17,17 +17,16 @@
package org.apache.ignite.internal.managers.communication;
-import java.nio.ByteBuffer;
-
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.CoreMessagesProvider;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.junit.Test;
+import static
org.apache.ignite.internal.managers.communication.TestMessage1.TEST_MSG_1_TYPE;
+import static
org.apache.ignite.internal.managers.communication.TestMessage2.TEST_MSG_2_TYPE;
+import static
org.apache.ignite.internal.managers.communication.TestMessage42.TEST_MSG_42_TYPE;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertTrue;
@@ -35,15 +34,6 @@ import static org.junit.Assert.assertTrue;
* Tests for default implementation of {@link CoreMessagesProvider} interface.
*/
public class IgniteMessageFactoryImplTest {
- /** Test message 1 type. */
- private static final short TEST_MSG_1_TYPE = 1;
-
- /** Test message 2 type. */
- private static final short TEST_MSG_2_TYPE = 2;
-
- /** Test message 42 type. */
- private static final short TEST_MSG_42_TYPE = 42;
-
/** Unknown message type. */
private static final short UNKNOWN_MSG_TYPE = 0;
@@ -56,7 +46,7 @@ public class IgniteMessageFactoryImplTest {
MessageFactory msgFactory = new IgniteMessageFactoryImpl(factories);
- msgFactory.register((short)0, () -> null);
+ msgFactory.register((short)0, () -> null, null);
}
/**
@@ -81,7 +71,7 @@ public class IgniteMessageFactoryImplTest {
short[] directTypes = msgFactory.registeredDirectTypes();
- assertArrayEquals(directTypes, new short[] {TEST_MSG_1_TYPE,
TEST_MSG_2_TYPE, TEST_MSG_42_TYPE});
+ assertArrayEquals(new short[] {TEST_MSG_1_TYPE, TEST_MSG_2_TYPE,
TEST_MSG_42_TYPE}, directTypes);
}
/**
@@ -117,8 +107,8 @@ public class IgniteMessageFactoryImplTest {
private static class TestMessageFactoryPovider implements
MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
- factory.register(TEST_MSG_1_TYPE, TestMessage1::new);
- factory.register(TEST_MSG_42_TYPE, TestMessage42::new);
+ factory.register(TEST_MSG_1_TYPE, TestMessage1::new, new
TestMessage1Serializer());
+ factory.register(TEST_MSG_42_TYPE, TestMessage42::new, new
TestMessage42Serializer());
}
}
@@ -128,7 +118,7 @@ public class IgniteMessageFactoryImplTest {
private static class TestMessageFactoryPoviderWithTheSameDirectType
implements MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
- factory.register(TEST_MSG_1_TYPE, TestMessage1::new);
+ factory.register(TEST_MSG_1_TYPE, TestMessage1::new, new
TestMessage1Serializer());
}
}
@@ -138,64 +128,7 @@ public class IgniteMessageFactoryImplTest {
private static class TestMessageFactory implements MessageFactoryProvider {
/** {@inheritDoc} */
@Override public void registerAll(MessageFactory factory) {
- factory.register(TEST_MSG_2_TYPE, TestMessage2::new);
- }
- }
-
- /** Test message. */
- private static class TestMessage1 implements Message {
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer)
{
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader
reader) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return TEST_MSG_1_TYPE;
- }
-
- }
-
- /** Test message. */
- private static class TestMessage2 implements Message {
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer)
{
- return false;
+ factory.register(TEST_MSG_2_TYPE, TestMessage2::new, new
TestMessage2Serializer());
}
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader
reader) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return TEST_MSG_2_TYPE;
- }
-
- }
-
- /** Test message. */
- private static class TestMessage42 implements Message {
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer)
{
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader
reader) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return TEST_MSG_42_TYPE;
- }
-
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
index a9df4dbc686..f540a08293d 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/MessageDirectTypeIdConflictTest.java
@@ -17,22 +17,18 @@
package org.apache.ignite.internal.managers.communication;
-import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.CoreMessagesProvider;
import org.apache.ignite.plugin.AbstractTestPluginProvider;
import org.apache.ignite.plugin.ExtensionRegistry;
import org.apache.ignite.plugin.PluginContext;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
+import static
org.apache.ignite.internal.managers.communication.DuplicateDirectTypeIdMessage.DIRECT_TYPE;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
/**
@@ -40,9 +36,6 @@ import static
org.apache.ignite.testframework.GridTestUtils.assertThrows;
* for which message factory is already registered.
*/
public class MessageDirectTypeIdConflictTest extends GridCommonAbstractTest {
- /** Message direct type. Message with this direct type will be registered
by {@link CoreMessagesProvider} first. */
- private static final short MSG_DIRECT_TYPE =
CoreMessagesProvider.HANDSHAKE_MSG_TYPE;
-
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -74,7 +67,7 @@ public class MessageDirectTypeIdConflictTest extends
GridCommonAbstractTest {
@SuppressWarnings({"RedundantThrows", "ThrowableNotThrown"})
public void testRegisterMessageFactoryWithConflictDirectTypeId() throws
Exception {
assertThrows(log, (Callable<Object>)this::startGrid,
IgniteCheckedException.class,
- "Message factory is already registered for direct type: " +
MSG_DIRECT_TYPE);
+ "Message factory is already registered for direct type: " +
DIRECT_TYPE);
}
/** */
@@ -88,28 +81,9 @@ public class MessageDirectTypeIdConflictTest extends
GridCommonAbstractTest {
@Override public void initExtensions(PluginContext ctx,
ExtensionRegistry registry) {
registry.registerExtension(MessageFactoryProvider.class, new
MessageFactoryProvider() {
@Override public void registerAll(MessageFactory factory) {
- factory.register(MSG_DIRECT_TYPE, TestMessage::new);
+ factory.register(DIRECT_TYPE,
DuplicateDirectTypeIdMessage::new, new
DuplicateDirectTypeIdMessageSerializer());
}
});
}
}
-
- /** Test message with already registered direct type. */
- private static class TestMessage implements Message {
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer)
{
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader
reader) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return MSG_DIRECT_TYPE;
- }
-
- }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestMessage1.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestMessage1.java
new file mode 100644
index 00000000000..c40795c0dc2
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestMessage1.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** Test message. */
+public class TestMessage1 implements Message {
+ /** Test message 1 type. */
+ static final short TEST_MSG_1_TYPE = 1;
+
+ /** */
+ @Order(0)
+ int val;
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TEST_MSG_1_TYPE;
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestMessage2.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestMessage2.java
new file mode 100644
index 00000000000..a457623f235
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestMessage2.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** Test message. */
+public class TestMessage2 implements Message {
+ /** Test message 2 type. */
+ static final short TEST_MSG_2_TYPE = 2;
+
+ /** */
+ @Order(0)
+ int val;
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TEST_MSG_2_TYPE;
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestMessage42.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestMessage42.java
new file mode 100644
index 00000000000..e1256cf74ca
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestMessage42.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** Test message. */
+public class TestMessage42 implements Message {
+ /** Test message 42 type. */
+ static final short TEST_MSG_42_TYPE = 42;
+
+ /** */
+ @Order(0)
+ int val;
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TEST_MSG_42_TYPE;
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestOverByteIdMessage.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestOverByteIdMessage.java
new file mode 100644
index 00000000000..99df96d8c56
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestOverByteIdMessage.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** Test message with over byte direct type. */
+public class TestOverByteIdMessage implements Message {
+ /** Direct type. */
+ static final short DIRECT_TYPE_OVER_BYTE = 1000;
+
+ /** */
+ @Order(0)
+ int val;
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return DIRECT_TYPE_OVER_BYTE;
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestValidByteIdMessage.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestValidByteIdMessage.java
new file mode 100644
index 00000000000..6103a5f09d7
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/TestValidByteIdMessage.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+
+/** Test message with correct direct type. */
+public class TestValidByteIdMessage implements Message {
+ /** Direct type. */
+ static final short DIRECT_TYPE = -127;
+
+ /** */
+ @Order(0)
+ int val;
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return DIRECT_TYPE;
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
index 4642ac20064..617be451eb2 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConditionalDeploymentSelfTest.java
@@ -22,11 +22,7 @@ import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.plugin.AbstractTestPluginProvider;
-import org.apache.ignite.plugin.ExtensionRegistry;
-import org.apache.ignite.plugin.PluginContext;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
+import org.apache.ignite.spi.MessagesPluginProvider;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
@@ -45,7 +41,7 @@ public class GridCacheConditionalDeploymentSelfTest extends
GridCommonAbstractTe
cfg.setCacheConfiguration(cacheConfiguration());
- cfg.setPluginProviders(new TestPluginProvider());
+ cfg.setPluginProviders(new
MessagesPluginProvider(TestCacheMessage.class));
return cfg;
}
@@ -89,7 +85,7 @@ public class GridCacheConditionalDeploymentSelfTest extends
GridCommonAbstractTe
public void testNoDeploymentInfo() throws Exception {
GridCacheIoManager ioMgr = cacheIoManager();
- TestMessage msg = new TestMessage();
+ TestCacheMessage msg = new TestCacheMessage();
assertNull(msg.deployInfo());
@@ -100,13 +96,6 @@ public class GridCacheConditionalDeploymentSelfTest extends
GridCommonAbstractTe
assertNull(msg.deployInfo());
}
- /**
- * @return Cache context.
- */
- protected GridCacheContext<?, ?> cacheContext() {
- return ((IgniteCacheProxy<?,
?>)grid(0).cache(DEFAULT_CACHE_NAME)).context();
- }
-
/**
* @return IO manager.
*/
@@ -114,42 +103,7 @@ public class GridCacheConditionalDeploymentSelfTest
extends GridCommonAbstractTe
return grid(0).context().cache().context().io();
}
- /**
- * Test message class.
- */
- public static class TestMessage extends GridCacheMessage implements
GridCacheDeployable {
- /** */
- public static final short DIRECT_TYPE = 302;
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return DIRECT_TYPE;
- }
-
- /** {@inheritDoc} */
- @Override public boolean addDeploymentInfo() {
- return addDepInfo;
- }
- }
-
/** */
private static class TestValue {
}
-
- /** */
- public static class TestPluginProvider extends AbstractTestPluginProvider {
- /** {@inheritDoc} */
- @Override public String name() {
- return "TEST_PLUGIN";
- }
-
- /** {@inheritDoc} */
- @Override public void initExtensions(PluginContext ctx,
ExtensionRegistry registry) {
- registry.registerExtension(MessageFactoryProvider.class, new
MessageFactoryProvider() {
- @Override public void registerAll(MessageFactory factory) {
- factory.register(TestMessage.DIRECT_TYPE,
TestMessage::new);
- }
- });
- }
- }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheMessage.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheMessage.java
new file mode 100644
index 00000000000..07da9281c9e
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/TestCacheMessage.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+/** Test cache message. */
+public class TestCacheMessage extends GridCacheMessage implements
GridCacheDeployable {
+ /** {@inheritDoc} */
+ @Override public boolean addDeploymentInfo() {
+ return addDepInfo;
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java
b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java
index f624b9c0862..b129cd0043f 100644
---
a/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java
+++
b/modules/core/src/test/java/org/apache/ignite/loadtests/communication/GridTestMessage.java
@@ -21,12 +21,9 @@ import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
-import java.nio.ByteBuffer;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
@@ -103,15 +100,4 @@ class GridTestMessage implements Message, Externalizable {
str = U.readString(in);
bytes = U.readByteArray(in);
}
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- return true;
- }
-
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
index 5d18e781e1e..891f6ae48a2 100644
---
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/CommunicationConnectionPoolMetricsTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.spi.communication.tcp;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -30,7 +29,6 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
@@ -40,14 +38,8 @@ import
org.apache.ignite.internal.util.nio.GridCommunicationClient;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.AbstractTestPluginProvider;
-import org.apache.ignite.plugin.ExtensionRegistry;
-import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import
org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.spi.MessagesPluginProvider;
import org.apache.ignite.spi.communication.tcp.internal.ConnectionClientPool;
import org.apache.ignite.spi.metric.IntMetric;
import org.apache.ignite.spi.metric.LongMetric;
@@ -134,7 +126,7 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
cfg.setCommunicationSpi(communicationSpi);
- cfg.setPluginProviders(new TestCommunicationMessagePluginProvider());
+ cfg.setPluginProviders(new
MessagesPluginProvider(TestDelayMessage.class));
return cfg;
}
@@ -154,7 +146,7 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
Ignite ldr = clientLdr ? cli : srvr;
AtomicBoolean runFlag = new AtomicBoolean(true);
- TestMessage msg = new TestMessage();
+ TestDelayMessage msg = new TestDelayMessage();
IgniteInternalFuture<?> loadFut = runLoad(ldr, runFlag, () -> msg,
null);
@@ -206,7 +198,7 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
Ignite ldr = clientLdr ? cli : srvr;
AtomicBoolean runFlag = new AtomicBoolean(true);
- Message msg = new TestMessage();
+ Message msg = new TestDelayMessage();
IgniteInternalFuture<?> loadFut = runLoad(ldr, runFlag, () -> msg,
null, maxConnIdleTimeout, maxConnIdleTimeout * 4);
@@ -257,7 +249,7 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
AtomicBoolean runFlag = new AtomicBoolean(true);
AtomicLong loadCnt = new AtomicLong(preloadCnt);
- TestMessage msg = new TestMessage();
+ TestDelayMessage msg = new TestDelayMessage();
long loadMillis0 = System.currentTimeMillis();
@@ -368,7 +360,7 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
}
});
- IgniteInternalFuture<?> loadFut = runLoad(ldr, runFlag, () -> new
TestMessage((int)maxConnIdleTimeout * 3), null);
+ IgniteInternalFuture<?> loadFut = runLoad(ldr, runFlag, () -> new
TestDelayMessage((int)maxConnIdleTimeout * 3), null);
monFut.get(getTestTimeout());
@@ -395,7 +387,7 @@ public class CommunicationConnectionPoolMetricsTest extends
GridCommonAbstractTe
IgniteInternalFuture<?> loadFut = runLoad(
ldr,
runFlag,
- () -> new TestMessage(writeDelay.get()),
+ () -> new TestDelayMessage(writeDelay.get()),
loadCnt
);
@@ -517,75 +509,10 @@ public class CommunicationConnectionPoolMetricsTest
extends GridCommonAbstractTe
return 120 * 1000;
}
- /** */
- public static class TestCommunicationMessagePluginProvider extends
AbstractTestPluginProvider {
- /** {@inheritDoc} */
- @Override public String name() {
- return "TEST_PLUGIN";
- }
-
- /** {@inheritDoc} */
- @Override public void initExtensions(PluginContext ctx,
ExtensionRegistry registry) {
- registry.registerExtension(MessageFactoryProvider.class, new
MessageFactoryProvider() {
- @Override public void registerAll(MessageFactory factory) {
- factory.register(TestMessage.DIRECT_TYPE,
TestMessage::new);
- }
- });
- }
- }
-
/** */
public static MetricRegistryImpl metricsForCommunicationConnection(Ignite
from, Ignite to) {
return ((IgniteEx)from).context()
.metric()
.registry(metricName(SHARED_METRICS_REGISTRY_NAME,
((IgniteEx)to).context().localNodeId().toString()));
}
-
- /** */
- private static class TestMessage implements Message {
- /** */
- public static final short DIRECT_TYPE = 200;
-
- /** */
- private final int writeDelay;
-
- /** */
- public TestMessage(int writeDelay) {
- this.writeDelay = writeDelay;
- }
-
- /** */
- public TestMessage() {
- this(0);
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer)
{
- if (writeDelay > 0) {
- try {
- U.sleep(writeDelay);
- }
- catch (IgniteInterruptedCheckedException ignored) {
- // No-op.
- }
- }
-
- writer.setBuffer(buf);
-
- if (!writer.writeHeader(directType()))
- return false;
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader
reader) {
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return DIRECT_TYPE;
- }
- }
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TestDelayMessage.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TestDelayMessage.java
new file mode 100644
index 00000000000..4ed72c9905f
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TestDelayMessage.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.MarshallableMessage;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+
+/** Test message. */
+public class TestDelayMessage implements MarshallableMessage {
+ /** */
+ @Order(0)
+ int val;
+
+ /** */
+ private final int writeDelay;
+
+ /** */
+ public TestDelayMessage(int writeDelay) {
+ this.writeDelay = writeDelay;
+ }
+
+ /** */
+ public TestDelayMessage() {
+ this(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void prepareMarshal(Marshaller marsh) throws
IgniteCheckedException {
+ if (writeDelay > 0) {
+ try {
+ U.sleep(writeDelay);
+ }
+ catch (IgniteInterruptedCheckedException ignored) {
+ // No-op.
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void finishUnmarshal(Marshaller marsh, ClassLoader
clsLdr) throws IgniteCheckedException {
+ // No-op.
+ }
+}