This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 9d96b6cfaae IGNITE-26280 Remove Message interface from CacheObject
(#12297)
9d96b6cfaae is described below
commit 9d96b6cfaae59b206b41892175a14e161747a979
Author: Maksim Timonin <[email protected]>
AuthorDate: Fri Sep 5 15:17:15 2025 +0500
IGNITE-26280 Remove Message interface from CacheObject (#12297)
---
.../query/calcite/message/QueryTxEntry.java | 8 +-
.../internal/MessageSerializerGenerator.java | 18 ++
.../internal/binary/BinaryEnumObjectImpl.java | 81 ---------
.../ignite/internal/binary/BinaryObjectImpl.java | 65 -------
.../internal/binary/BinaryObjectOffheapImpl.java | 22 ---
.../apache/ignite/internal/binary/BinaryUtils.java | 11 --
.../internal/direct/DirectMessageReader.java | 35 +++-
.../internal/direct/DirectMessageWriter.java | 20 +++
.../direct/stream/DirectByteBufferStream.java | 196 +++++++++++++++++++++
.../managers/communication/GridIoManager.java | 2 +-
.../communication/GridIoMessageFactory.java | 9 -
.../cache/CacheEntryPredicateContainsValue.java | 4 +-
.../processors/cache/CacheInvokeDirectResult.java | 8 +-
.../internal/processors/cache/CacheObject.java | 3 +-
.../processors/cache/CacheObjectAdapter.java | 43 -----
.../processors/cache/CacheObjectByteArrayImpl.java | 53 ------
.../internal/processors/cache/CacheObjectImpl.java | 10 --
.../internal/processors/cache/GridCacheReturn.java | 4 +-
.../processors/cache/KeyCacheObjectImpl.java | 62 -------
.../binary/CacheObjectBinaryProcessorImpl.java | 24 ++-
.../distributed/GridCacheTtlUpdateRequest.java | 8 +-
.../distributed/GridDistributedLockRequest.java | 4 +-
.../distributed/GridDistributedLockResponse.java | 4 +-
.../distributed/GridDistributedUnlockRequest.java | 4 +-
.../cache/distributed/dht/GridDhtCacheAdapter.java | 2 +-
.../cache/distributed/dht/GridDhtLockRequest.java | 4 +-
.../dht/GridPartitionedSingleGetFuture.java | 2 +-
.../atomic/GridDhtAtomicSingleUpdateRequest.java | 12 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 20 +--
.../dht/atomic/GridDhtAtomicUpdateResponse.java | 4 +-
.../atomic/GridNearAtomicFullUpdateRequest.java | 8 +-
.../atomic/GridNearAtomicSingleUpdateRequest.java | 8 +-
.../cache/distributed/dht/atomic/UpdateErrors.java | 4 +-
.../dht/preloader/GridDhtForceKeysRequest.java | 4 +-
.../dht/preloader/GridDhtForceKeysResponse.java | 4 +-
.../cache/distributed/near/GridNearGetRequest.java | 4 +-
.../distributed/near/GridNearSingleGetRequest.java | 4 +-
.../cache/query/GridCacheQueryRequest.java | 4 +-
.../continuous/CacheContinuousQueryEntry.java | 12 +-
.../cache/transactions/IgniteTxEntry.java | 4 +-
.../cache/transactions/TxEntryValueHolder.java | 4 +-
.../cacheobject/IgniteCacheObjectProcessor.java | 8 +-
.../processors/datastreamer/DataStreamerEntry.java | 8 +-
.../communication/MessageCollectionItemType.java | 8 +-
.../extensions/communication/MessageReader.java | 16 ++
.../extensions/communication/MessageWriter.java | 18 ++
.../internal/GridAffinityNoCacheSelfTest.java | 22 ---
.../direct/DirectMarshallingMessagesTest.java | 2 +-
...irectByteBufferStreamImplByteOrderSelfTest.java | 2 +-
...tractCommunicationMessageSerializationTest.java | 26 +++
.../cache/IgniteIncompleteCacheObjectSelfTest.java | 22 ---
...niteCacheContinuousQueryImmutableEntryTest.java | 2 +-
.../processors/database/CacheFreeListSelfTest.java | 28 ---
.../ignite/testframework/GridSpiTestContext.java | 2 +-
.../src/test/resources/codegen/TestMessage.java | 24 +++
.../resources/codegen/TestMessageSerializer.java | 27 +++
.../query/h2/twostep/msg/GridH2CacheObject.java | 4 +-
57 files changed, 493 insertions(+), 528 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
index 2bbc238c6ac..87b01d0ba04 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
@@ -137,13 +137,13 @@ public class QueryTxEntry implements CalciteMessage {
writer.incrementState();
case 1:
- if (!writer.writeMessage(key))
+ if (!writer.writeKeyCacheObject(key))
return false;
writer.incrementState();
case 2:
- if (!writer.writeMessage(val))
+ if (!writer.writeCacheObject(val))
return false;
writer.incrementState();
@@ -173,7 +173,7 @@ public class QueryTxEntry implements CalciteMessage {
reader.incrementState();
case 1:
- key = reader.readMessage();
+ key = reader.readKeyCacheObject();
if (!reader.isLastRead())
return false;
@@ -181,7 +181,7 @@ public class QueryTxEntry implements CalciteMessage {
reader.incrementState();
case 2:
- val = reader.readMessage();
+ val = reader.readCacheObject();
if (!reader.isLastRead())
return false;
diff --git
a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
index 1e26d38c451..73dff879e1a 100644
---
a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
+++
b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
@@ -364,6 +364,12 @@ class MessageSerializerGenerator {
"MessageCollectionItemType." +
messageCollectionItemType(typeArgs.get(1)));
}
+ else if (assignableFrom(type,
type("org.apache.ignite.internal.processors.cache.KeyCacheObject")))
+ returnFalseIfWriteFailed(write, "writer.writeKeyCacheObject",
getExpr);
+
+ else if (assignableFrom(type,
type("org.apache.ignite.internal.processors.cache.CacheObject")))
+ returnFalseIfWriteFailed(write, "writer.writeCacheObject",
getExpr);
+
else if (assignableFrom(type, type(MESSAGE_INTERFACE)))
returnFalseIfWriteFailed(write, "writer.writeMessage",
getExpr);
@@ -487,6 +493,12 @@ class MessageSerializerGenerator {
"MessageCollectionItemType." +
messageCollectionItemType(typeArgs.get(1)), "false");
}
+ else if (assignableFrom(type,
type("org.apache.ignite.internal.processors.cache.KeyCacheObject")))
+ returnFalseIfReadFailed(name, "reader.readKeyCacheObject");
+
+ else if (assignableFrom(type,
type("org.apache.ignite.internal.processors.cache.CacheObject")))
+ returnFalseIfReadFailed(name, "reader.readCacheObject");
+
else if (assignableFrom(type, type(MESSAGE_INTERFACE)))
returnFalseIfReadFailed(name, "reader.readMessage");
@@ -547,6 +559,12 @@ class MessageSerializerGenerator {
if (sameType(type,
"org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion"))
return "AFFINITY_TOPOLOGY_VERSION";
+ if (sameType(type,
"org.apache.ignite.internal.processors.cache.KeyCacheObject"))
+ return "KEY_CACHE_OBJECT";
+
+ if (sameType(type,
"org.apache.ignite.internal.processors.cache.CacheObject"))
+ return "CACHE_OBJECT";
+
PrimitiveType primitiveType = unboxedType(type);
if (primitiveType != null)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
index 032287b5f9b..0101ef34fd6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
@@ -37,8 +37,6 @@ import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.marshaller.Marshallers;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -386,85 +384,6 @@ class BinaryEnumObjectImpl implements BinaryObjectEx,
Externalizable, CacheObjec
// No-op.
}
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 119;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeString(clsName))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeInt(ord))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeInt(typeId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- clsName = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- ord = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- typeId = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public int size() {
if (valBytes == null) {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
index 6e22af7f01d..6e4bd14078a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
@@ -45,8 +45,6 @@ import
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.marshaller.Marshallers;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -775,11 +773,6 @@ final class BinaryObjectImpl extends BinaryObjectExImpl
implements Externalizabl
return ((BinaryReaderExImpl)reader(null, false)).getOrCreateSchema();
}
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
if (detachAllowed) {
@@ -807,64 +800,6 @@ final class BinaryObjectImpl extends BinaryObjectExImpl
implements Externalizabl
start = in.readInt();
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray(valBytes))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeInt(part))
- return false;
-
- writer.incrementState();
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- valBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- part = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 113;
- }
-
/**
* Runs value deserialization regardless of whether obj already has the
deserialized value.
* Will set obj if descriptor is configured to keep deserialized values.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
index 2945626641e..57e3d9c8eba 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
@@ -40,8 +40,6 @@ import
org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.marshaller.Marshallers;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -481,21 +479,6 @@ class BinaryObjectOffheapImpl extends BinaryObjectExImpl
implements Externalizab
throw new UnsupportedOperationException();
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- throw new UnsupportedOperationException();
- }
-
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
throw new UnsupportedOperationException(); // To make sure it is not
marshalled.
@@ -506,11 +489,6 @@ class BinaryObjectOffheapImpl extends BinaryObjectExImpl
implements Externalizab
throw new UnsupportedOperationException(); // To make sure it is not
marshalled.
}
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
/** {@inheritDoc} */
@Override public int size() {
return length();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
index f82dbdc243a..643442d6aa8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
@@ -55,9 +55,7 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.function.BiConsumer;
import java.util.function.Function;
-import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
@@ -83,7 +81,6 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.marshaller.Marshallers;
import org.apache.ignite.platform.PlatformType;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -1684,14 +1681,6 @@ public class BinaryUtils {
return new BinaryEnumObjectImpl(ctx, arr);
}
- /**
- * @param register Register method.
- */
- public static void registerMessages(BiConsumer<Short, Supplier<Message>>
register) {
- register.accept((short)113, BinaryObjectImpl::new);
- register.accept((short)119, BinaryEnumObjectImpl::new);
- }
-
/** */
public static BinaryObjectEx binaryObject(BinaryContext ctx, byte[] arr,
int start) {
return new BinaryObjectImpl(ctx, arr, start);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index 999ec90d19d..77d8a10f8d1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -26,6 +26,9 @@ import
org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteOutClosure;
@@ -49,11 +52,12 @@ public class DirectMessageReader implements MessageReader {
/**
* @param msgFactory Message factory.
+ * @param cacheObjProc Cache object processor.
*/
- public DirectMessageReader(final MessageFactory msgFactory) {
+ public DirectMessageReader(final MessageFactory msgFactory,
IgniteCacheObjectProcessor cacheObjProc) {
state = new DirectMessageState<>(StateItem.class, new
IgniteOutClosure<StateItem>() {
@Override public StateItem apply() {
- return new StateItem(msgFactory);
+ return new StateItem(msgFactory, cacheObjProc);
}
});
}
@@ -305,6 +309,28 @@ public class DirectMessageReader implements MessageReader {
return msg;
}
+ /** {@inheritDoc} */
+ @Override public CacheObject readCacheObject() {
+ DirectByteBufferStream stream = state.item().stream;
+
+ CacheObject val = stream.readCacheObject();
+
+ lastRead = stream.lastFinished();
+
+ return val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyCacheObject readKeyCacheObject() {
+ DirectByteBufferStream stream = state.item().stream;
+
+ KeyCacheObject key = stream.readKeyCacheObject();
+
+ lastRead = stream.lastFinished();
+
+ return key;
+ }
+
/** {@inheritDoc} */
@Override public <T> T[] readObjectArray(MessageCollectionItemType
itemType, Class<T> itemCls) {
DirectByteBufferStream stream = state.item().stream;
@@ -385,9 +411,10 @@ public class DirectMessageReader implements MessageReader {
/**
* @param msgFactory Message factory.
+ * @param cacheObjProc Cache object processor.
*/
- public StateItem(MessageFactory msgFactory) {
- stream = new DirectByteBufferStream(msgFactory);
+ public StateItem(MessageFactory msgFactory, IgniteCacheObjectProcessor
cacheObjProc) {
+ stream = new DirectByteBufferStream(msgFactory, cacheObjProc);
}
/** {@inheritDoc} */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index bfc76e0ddcc..0b68dbd3fd5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -26,6 +26,8 @@ import
org.apache.ignite.internal.direct.state.DirectMessageState;
import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteOutClosure;
@@ -283,6 +285,24 @@ public class DirectMessageWriter implements MessageWriter {
return stream.lastFinished();
}
+ /** {@inheritDoc} */
+ @Override public boolean writeCacheObject(@Nullable CacheObject obj) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ stream.writeCacheObject(obj);
+
+ return stream.lastFinished();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeKeyCacheObject(KeyCacheObject obj) {
+ DirectByteBufferStream stream = state.item().stream;
+
+ stream.writeKeyCacheObject(obj);
+
+ return stream.lastFinished();
+ }
+
/** {@inheritDoc} */
@Override public <T> boolean writeObjectArray(T[] arr,
MessageCollectionItemType itemType) {
DirectByteBufferStream stream = state.item().stream;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
index ae92154d2c1..c6da5fe96da 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
@@ -27,8 +27,12 @@ import java.util.List;
import java.util.Map;
import java.util.RandomAccess;
import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -212,6 +216,10 @@ public class DirectByteBufferStream {
@GridToStringExclude
private final MessageFactory msgFactory;
+ /** Is required to instantiate {@link CacheObject} while reading messages.
*/
+ @GridToStringExclude
+ private final IgniteCacheObjectProcessor cacheObjProc;
+
/** */
@GridToStringExclude
protected ByteBuffer buf;
@@ -313,10 +321,41 @@ public class DirectByteBufferStream {
private int topVerMinor;
/**
+ * This field represents a phase of reading or writing {@code CacheObject}
object enabling ser/des mechanism to keep
+ * track of fields that are already read/written.
+ */
+ private byte cacheObjState;
+
+ /** */
+ private byte[] cacheObjArr;
+
+ /** */
+ private int keyCacheObjPart;
+
+ /** */
+ private byte cacheObjType;
+
+ /**
+ * Constructor for stream used for writing messages.
+ *
* @param msgFactory Message factory.
*/
public DirectByteBufferStream(MessageFactory msgFactory) {
this.msgFactory = msgFactory;
+
+ // Is not used while writing messages.
+ cacheObjProc = null;
+ }
+
+ /**
+ * Constructor for stream used for reading messages.
+ *
+ * @param msgFactory Message factory.
+ * @param cacheObjProc Cache object processor.
+ */
+ public DirectByteBufferStream(MessageFactory msgFactory,
IgniteCacheObjectProcessor cacheObjProc) {
+ this.msgFactory = msgFactory;
+ this.cacheObjProc = cacheObjProc;
}
/**
@@ -751,6 +790,78 @@ public class DirectByteBufferStream {
writeInt(-1);
}
+ /**
+ * @param obj Cache object.
+ */
+ public void writeCacheObject(CacheObject obj) {
+ try {
+ if (obj != null) {
+ switch (cacheObjState) {
+ case 0:
+ writeByte(obj.cacheObjectType());
+
+ if (!lastFinished)
+ return;
+
+ cacheObjState++;
+
+ case 1:
+ writeByteArray(obj.valueBytes(null));
+
+ if (!lastFinished)
+ return;
+
+ cacheObjState = 0;
+ }
+ }
+ else
+ writeByte((byte)-1);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * @param keyObj Key cache object.
+ */
+ public void writeKeyCacheObject(KeyCacheObject keyObj) {
+ try {
+ if (keyObj != null) {
+ switch (cacheObjState) {
+ case 0:
+ writeByte(keyObj.cacheObjectType());
+
+ if (!lastFinished)
+ return;
+
+ cacheObjState++;
+
+ case 1:
+ writeByteArray(keyObj.valueBytes(null));
+
+ if (!lastFinished)
+ return;
+
+ cacheObjState++;
+
+ case 2:
+ writeInt(keyObj.partition());
+
+ if (!lastFinished)
+ return;
+
+ cacheObjState = 0;
+ }
+ }
+ else
+ writeByte((byte)-1);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
/**
* @param msg Message.
* @param writer Writer.
@@ -1313,6 +1424,74 @@ public class DirectByteBufferStream {
return new AffinityTopologyVersion(topVerMajor, topVerMinor);
}
+ /**
+ * @return Value.
+ */
+ public KeyCacheObject readKeyCacheObject() {
+ switch (cacheObjState) {
+ case 0:
+ cacheObjType = readByte();
+
+ if (!lastFinished || cacheObjType == (byte)-1)
+ return null;
+
+ cacheObjState++;
+
+ case 1:
+ cacheObjArr = readByteArray();
+
+ if (!lastFinished)
+ return null;
+
+ cacheObjState++;
+
+ case 2:
+ keyCacheObjPart = readInt();
+
+ if (!lastFinished)
+ return null;
+
+ cacheObjState = 0;
+ }
+
+ try {
+ KeyCacheObject key = cacheObjProc.toKeyCacheObject(null,
cacheObjType, cacheObjArr);
+
+ if (keyCacheObjPart != -1)
+ key.partition(keyCacheObjPart);
+
+ return key;
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ /**
+ * @return Value.
+ */
+ public CacheObject readCacheObject() {
+ switch (cacheObjState) {
+ case 0:
+ cacheObjType = readByte();
+
+ if (!lastFinished || cacheObjType == (byte)-1)
+ return null;
+
+ cacheObjState++;
+
+ case 1:
+ cacheObjArr = readByteArray();
+
+ if (!lastFinished)
+ return null;
+
+ cacheObjState = 0;
+ }
+
+ return cacheObjProc.toCacheObject(null, cacheObjType, cacheObjArr);
+ }
+
/**
* @param reader Reader.
* @return Message.
@@ -1862,6 +2041,17 @@ public class DirectByteBufferStream {
writeAffinityTopologyVersion((AffinityTopologyVersion)val);
break;
+
+ case KEY_CACHE_OBJECT:
+ writeKeyCacheObject((KeyCacheObject)val);
+
+ break;
+
+ case CACHE_OBJECT:
+ writeCacheObject((CacheObject)val);
+
+ break;
+
case MSG:
try {
if (val != null)
@@ -1951,6 +2141,12 @@ public class DirectByteBufferStream {
case AFFINITY_TOPOLOGY_VERSION:
return readAffinityTopologyVersion();
+ case KEY_CACHE_OBJECT:
+ return readKeyCacheObject();
+
+ case CACHE_OBJECT:
+ return readCacheObject();
+
case MSG:
return readMessage(reader);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 04f156b759e..f2f23c443c3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -440,7 +440,7 @@ public class GridIoManager extends
GridManagerAdapter<CommunicationSpi<Object>>
}
@Override public MessageReader reader(UUID rmtNodeId,
MessageFactory msgFactory) {
- return new DirectMessageReader(msgFactory);
+ return new DirectMessageReader(msgFactory,
ctx.cacheObjects());
}
};
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 43bae86aba7..207190bfd75 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.GridJobSiblingsResponse;
import org.apache.ignite.internal.GridTaskCancelRequest;
import org.apache.ignite.internal.GridTaskSessionRequest;
import org.apache.ignite.internal.IgniteDiagnosticMessage;
-import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.codegen.CacheEvictionEntrySerializer;
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
import
org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer;
@@ -80,12 +79,9 @@ import
org.apache.ignite.internal.processors.cache.CacheEntryPredicateContainsVa
import
org.apache.ignite.internal.processors.cache.CacheEntrySerializablePredicate;
import org.apache.ignite.internal.processors.cache.CacheEvictionEntry;
import org.apache.ignite.internal.processors.cache.CacheInvokeDirectResult;
-import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
-import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import
org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
-import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.WalStateAckMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
import
org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
@@ -304,8 +300,6 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)86, GridCacheVersion::new, new
GridCacheVersionSerializer());
factory.register((short)87, GridDhtPartitionExchangeId::new, new
GridDhtPartitionExchangeIdSerializer());
factory.register((short)88, GridCacheReturn::new);
- factory.register((short)89, CacheObjectImpl::new);
- factory.register((short)90, KeyCacheObjectImpl::new);
factory.register((short)91, GridCacheEntryInfo::new, new
GridCacheEntryInfoSerializer());
factory.register((short)92, CacheEntryInfoCollection::new);
factory.register((short)93, CacheInvokeDirectResult::new);
@@ -320,7 +314,6 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)102, CacheVersionedValue::new, new
CacheVersionedValueSerializer());
factory.register((short)103, GridCacheRawVersionedEntry::new);
factory.register((short)104, GridCacheVersionEx::new, new
GridCacheVersionExSerializer());
- factory.register((short)105, CacheObjectByteArrayImpl::new);
factory.register((short)106, GridQueryCancelRequest::new);
factory.register((short)107, GridQueryFailResponse::new, new
GridQueryFailResponseSerializer());
factory.register((short)108, GridQueryNextPageRequest::new, new
GridQueryNextPageRequestSerializer());
@@ -332,8 +325,6 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)116, GridNearSingleGetRequest::new);
factory.register((short)117, GridNearSingleGetResponse::new);
factory.register((short)118, CacheContinuousQueryBatchAck::new);
- // 119 - BinaryEnumObjectImpl
- BinaryUtils.registerMessages(factory::register);
// [120..123] - DR
factory.register((short)125, GridNearAtomicSingleUpdateRequest::new);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java
index d7b9ff6251c..5a39c212b9b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java
@@ -105,7 +105,7 @@ public class CacheEntryPredicateContainsValue extends
CacheEntryPredicateAdapter
switch (writer.state()) {
case 0:
- if (!writer.writeMessage(val))
+ if (!writer.writeCacheObject(val))
return false;
writer.incrementState();
@@ -124,7 +124,7 @@ public class CacheEntryPredicateContainsValue extends
CacheEntryPredicateAdapter
switch (reader.state()) {
case 0:
- val = reader.readMessage();
+ val = reader.readCacheObject();
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
index 1a7ab5e1823..d26382b762c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
@@ -206,13 +206,13 @@ public class CacheInvokeDirectResult implements Message,
Serializable {
writer.incrementState();
case 1:
- if (!writer.writeMessage(key))
+ if (!writer.writeKeyCacheObject(key))
return false;
writer.incrementState();
case 2:
- if (!writer.writeMessage(res))
+ if (!writer.writeCacheObject(res))
return false;
writer.incrementState();
@@ -236,7 +236,7 @@ public class CacheInvokeDirectResult implements Message,
Serializable {
reader.incrementState();
case 1:
- key = reader.readMessage();
+ key = reader.readKeyCacheObject();
if (!reader.isLastRead())
return false;
@@ -244,7 +244,7 @@ public class CacheInvokeDirectResult implements Message,
Serializable {
reader.incrementState();
case 2:
- res = reader.readMessage();
+ res = reader.readCacheObject();
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
index cfb4d5c3431..eefd03e0841 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
@@ -19,13 +19,12 @@ package org.apache.ignite.internal.processors.cache;
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
/**
*
*/
-public interface CacheObject extends Message {
+public interface CacheObject {
/** */
public static final byte TYPE_REGULAR = 1;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
index ff995a1008a..a06dee96191 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
@@ -28,8 +28,6 @@ import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
@@ -154,47 +152,6 @@ public abstract class CacheObjectAdapter implements
CacheObject, Externalizable
return objectPutSize(valBytes.length);
}
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- valBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray(valBytes))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(S.includeSensitive() ? getClass().getSimpleName() :
"CacheObject",
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
index c9f0cb0dcc3..7a0fc222d33 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
@@ -26,8 +26,6 @@ import java.util.Arrays;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -123,52 +121,6 @@ public class CacheObjectByteArrayImpl implements
CacheObject, Externalizable {
// No-op.
}
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray(val))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- val = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
val = U.readByteArray(in);
@@ -179,11 +131,6 @@ public class CacheObjectByteArrayImpl implements
CacheObject, Externalizable {
U.writeByteArray(out, val);
}
- /** {@inheritDoc} */
- @Override public short directType() {
- return 105;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return "CacheObjectByteArrayImpl [arrLen=" + (val != null ? val.length
: 0) + ']';
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
index cfe2effd4fe..641fc45c589 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
@@ -126,16 +126,6 @@ public class CacheObjectImpl extends CacheObjectAdapter {
val = valueFromValueBytes(ctx, ldr);
}
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 89;
- }
-
/** {@inheritDoc} */
@Override public int hashCode() {
assert false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
index fbc4a777db6..b1b7574a2e4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
@@ -415,7 +415,7 @@ public class GridCacheReturn implements Externalizable,
Message {
writer.incrementState();
case 1:
- if (!writer.writeMessage(cacheObj))
+ if (!writer.writeCacheObject(cacheObj))
return false;
writer.incrementState();
@@ -457,7 +457,7 @@ public class GridCacheReturn implements Externalizable,
Message {
reader.incrementState();
case 1:
- cacheObj = reader.readMessage();
+ cacheObj = reader.readCacheObject();
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index d81f26b2b7c..4d3220dd5da 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -17,13 +17,10 @@
package org.apache.ignite.internal.processors.cache;
-import java.nio.ByteBuffer;
import java.util.Objects;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -49,8 +46,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter
implements KeyCacheOb
* @param part Partition.
*/
public KeyCacheObjectImpl(Object val, byte[] valBytes, int part) {
- assert val != null;
-
this.val = val;
this.valBytes = valBytes;
this.part = part;
@@ -106,11 +101,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter
implements KeyCacheOb
return (T)val;
}
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
/** {@inheritDoc} */
@Override public CacheObject prepareForCache(CacheObjectValueContext ctx) {
return this;
@@ -123,58 +113,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter
implements KeyCacheOb
return IgniteUtils.hashCode(val);
}
- /** {@inheritDoc} */
- @Override public short directType() {
- return 90;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 1:
- part = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 1:
- if (!writer.writeInt(part))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public void prepareMarshal(CacheObjectValueContext ctx) throws
IgniteCheckedException {
if (valBytes == null)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index bf3dff665c2..651f9b8a38f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -180,6 +180,9 @@ public class CacheObjectBinaryProcessorImpl extends
GridProcessorAdapter impleme
/** Cached affinity key field names. */
private final ConcurrentHashMap<Integer, T1<BinaryField>> affKeyFields =
new ConcurrentHashMap<>();
+ /** Dummy {@code CacheObjectValueContext} used for mocking. */
+ private CacheObjectValueContext fakeCacheObjCtx;
+
/*
* Static initializer
*/
@@ -326,6 +329,8 @@ public class CacheObjectBinaryProcessorImpl extends
GridProcessorAdapter impleme
if (!ctx.clientNode())
metadataFileStore.restoreMetadata();
+
+ fakeCacheObjCtx = new CacheObjectContext(ctx, null, null, false,
false, false, false, false);
}
/** {@inheritDoc} */
@@ -1293,10 +1298,19 @@ public class CacheObjectBinaryProcessorImpl extends
GridProcessorAdapter impleme
}
/** {@inheritDoc} */
- @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte
type, byte[] bytes) {
+ @Override public CacheObject toCacheObject(@Nullable CacheObjectContext
ctx, byte type, byte[] bytes) {
switch (type) {
case CacheObject.TYPE_BINARY:
- return (CacheObject)BinaryUtils.binaryObject(binaryContext(),
bytes, ctx);
+ CacheObjectValueContext coctx = ctx;
+
+ // CacheObjectContext is actually required only if transformation is enabled.
+ // In this case CacheObjectContext#kernalContext is used only.
+ if (coctx == null && this.ctx.transformer() != null)
+ coctx = fakeCacheObjCtx;
+
+ return coctx == null
+ ? (CacheObject)BinaryUtils.binaryObject(binaryContext(),
bytes)
+ : (CacheObject)BinaryUtils.binaryObject(binaryContext(),
bytes, coctx);
case CacheObject.TYPE_BINARY_ENUM:
return (CacheObject)BinaryUtils.binaryEnum(binaryContext(),
bytes);
@@ -1312,7 +1326,7 @@ public class CacheObjectBinaryProcessorImpl extends
GridProcessorAdapter impleme
}
/** {@inheritDoc} */
- @Override public KeyCacheObject toKeyCacheObject(CacheObjectContext ctx,
byte type, byte[] bytes)
+ @Override public KeyCacheObject toKeyCacheObject(@Nullable
CacheObjectContext ctx, byte type, byte[] bytes)
throws IgniteCheckedException {
switch (type) {
case CacheObject.TYPE_BINARY:
@@ -1322,7 +1336,9 @@ public class CacheObjectBinaryProcessorImpl extends
GridProcessorAdapter impleme
throw new IllegalArgumentException("Byte arrays cannot be used
as cache keys.");
case CacheObject.TYPE_REGULAR:
- return new
KeyCacheObjectImpl(ctx.kernalContext().cacheObjects().unmarshal(ctx, bytes,
null), bytes, -1);
+ return ctx == null
+ ? new KeyCacheObjectImpl(null, bytes, -1)
+ : new
KeyCacheObjectImpl(ctx.kernalContext().cacheObjects().unmarshal(ctx, bytes,
null), bytes, -1);
}
throw new IllegalArgumentException("Invalid object type: " + type);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
index 72a94d75dd4..c822a5fab2f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
@@ -211,13 +211,13 @@ public class GridCacheTtlUpdateRequest extends
GridCacheIdMessage {
switch (writer.state()) {
case 4:
- if (!writer.writeCollection(keys,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(keys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
case 5:
- if (!writer.writeCollection(nearKeys,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(nearKeys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
@@ -260,7 +260,7 @@ public class GridCacheTtlUpdateRequest extends
GridCacheIdMessage {
switch (reader.state()) {
case 4:
- keys = reader.readCollection(MessageCollectionItemType.MSG);
+ keys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
return false;
@@ -268,7 +268,7 @@ public class GridCacheTtlUpdateRequest extends
GridCacheIdMessage {
reader.incrementState();
case 5:
- nearKeys =
reader.readCollection(MessageCollectionItemType.MSG);
+ nearKeys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index dee04c6f2ef..aea28b76a93 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -394,7 +394,7 @@ public class GridDistributedLockRequest extends
GridDistributedBaseMessage {
writer.incrementState();
case 14:
- if (!writer.writeCollection(keys,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(keys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
@@ -501,7 +501,7 @@ public class GridDistributedLockRequest extends
GridDistributedBaseMessage {
reader.incrementState();
case 14:
- keys = reader.readCollection(MessageCollectionItemType.MSG);
+ keys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index b502d1f1bde..e37db1622df 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -230,7 +230,7 @@ public class GridDistributedLockResponse extends
GridDistributedBaseMessage {
writer.incrementState();
case 10:
- if (!writer.writeCollection(vals,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(vals,
MessageCollectionItemType.CACHE_OBJECT))
return false;
writer.incrementState();
@@ -265,7 +265,7 @@ public class GridDistributedLockResponse extends
GridDistributedBaseMessage {
reader.incrementState();
case 10:
- vals = reader.readCollection(MessageCollectionItemType.MSG);
+ vals =
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
index 324c569ac14..1da28eb179e 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
@@ -116,7 +116,7 @@ public class GridDistributedUnlockRequest extends
GridDistributedBaseMessage {
switch (writer.state()) {
case 8:
- if (!writer.writeCollection(keys,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(keys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
@@ -135,7 +135,7 @@ public class GridDistributedUnlockRequest extends
GridDistributedBaseMessage {
switch (reader.state()) {
case 8:
- keys = reader.readCollection(MessageCollectionItemType.MSG);
+ keys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 0a545287ee9..a08b939b4fa 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1216,7 +1216,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends
GridDistributedCacheAdap
else if (req.needVersion())
res0 = new CacheVersionedValue(info.value(),
info.version());
else
- res0 = info.value();
+ res0 = new CacheVersionedValue(info.value(),
null);
}
res = new GridNearSingleGetResponse(
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index bb8aabc2ba2..fd46c5a0017 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -337,7 +337,7 @@ public class GridDhtLockRequest extends
GridDistributedLockRequest {
writer.incrementState();
case 24:
- if (!writer.writeObjectArray(ownedKeys,
MessageCollectionItemType.MSG))
+ if (!writer.writeObjectArray(ownedKeys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
@@ -410,7 +410,7 @@ public class GridDhtLockRequest extends
GridDistributedLockRequest {
reader.incrementState();
case 24:
- ownedKeys =
reader.readObjectArray(MessageCollectionItemType.MSG, KeyCacheObject.class);
+ ownedKeys =
reader.readObjectArray(MessageCollectionItemType.KEY_CACHE_OBJECT,
KeyCacheObject.class);
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 12b5e371a4f..f237ea4005d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -629,7 +629,7 @@ public class GridPartitionedSingleGetFuture extends
GridCacheFutureAdapter<Objec
setResult(verVal.value(), verVal.version());
}
else
- setResult((CacheObject)res0, null);
+ setResult(res0 == null ? null :
((CacheVersionedValue)res0).value(), null);
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index acecac9f605..33fd4d522bc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -370,13 +370,13 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
switch (writer.state()) {
case 12:
- if (!writer.writeMessage(key))
+ if (!writer.writeKeyCacheObject(key))
return false;
writer.incrementState();
case 13:
- if (!writer.writeMessage(prevVal))
+ if (!writer.writeCacheObject(prevVal))
return false;
writer.incrementState();
@@ -388,7 +388,7 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
writer.incrementState();
case 15:
- if (!writer.writeMessage(val))
+ if (!writer.writeCacheObject(val))
return false;
writer.incrementState();
@@ -407,7 +407,7 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
switch (reader.state()) {
case 12:
- key = reader.readMessage();
+ key = reader.readKeyCacheObject();
if (!reader.isLastRead())
return false;
@@ -415,7 +415,7 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
reader.incrementState();
case 13:
- prevVal = reader.readMessage();
+ prevVal = reader.readCacheObject();
if (!reader.isLastRead())
return false;
@@ -431,7 +431,7 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
reader.incrementState();
case 15:
- val = reader.readMessage();
+ val = reader.readCacheObject();
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 219ee3acaaf..9c56300a83b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -585,7 +585,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
writer.incrementState();
case 17:
- if (!writer.writeCollection(keys,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(keys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
@@ -603,7 +603,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
writer.incrementState();
case 20:
- if (!writer.writeCollection(nearKeys,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(nearKeys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
@@ -615,7 +615,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
writer.incrementState();
case 22:
- if (!writer.writeCollection(nearVals,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(nearVals,
MessageCollectionItemType.CACHE_OBJECT))
return false;
writer.incrementState();
@@ -627,7 +627,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
writer.incrementState();
case 24:
- if (!writer.writeCollection(prevVals,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(prevVals,
MessageCollectionItemType.CACHE_OBJECT))
return false;
writer.incrementState();
@@ -645,7 +645,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
writer.incrementState();
case 27:
- if (!writer.writeCollection(vals,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(vals,
MessageCollectionItemType.CACHE_OBJECT))
return false;
writer.incrementState();
@@ -704,7 +704,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
case 17:
- keys = reader.readCollection(MessageCollectionItemType.MSG);
+ keys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
return false;
@@ -728,7 +728,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
case 20:
- nearKeys =
reader.readCollection(MessageCollectionItemType.MSG);
+ nearKeys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
return false;
@@ -744,7 +744,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
case 22:
- nearVals =
reader.readCollection(MessageCollectionItemType.MSG);
+ nearVals =
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
if (!reader.isLastRead())
return false;
@@ -760,7 +760,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
case 24:
- prevVals =
reader.readCollection(MessageCollectionItemType.MSG);
+ prevVals =
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
if (!reader.isLastRead())
return false;
@@ -784,7 +784,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
case 27:
- vals = reader.readCollection(MessageCollectionItemType.MSG);
+ vals =
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 5a17d498f93..eacb7872beb 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -188,7 +188,7 @@ public class GridDhtAtomicUpdateResponse extends
GridCacheIdMessage implements G
writer.incrementState();
case 6:
- if (!writer.writeCollection(nearEvicted,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(nearEvicted,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
@@ -229,7 +229,7 @@ public class GridDhtAtomicUpdateResponse extends
GridCacheIdMessage implements G
reader.incrementState();
case 6:
- nearEvicted =
reader.readCollection(MessageCollectionItemType.MSG);
+ nearEvicted =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 46fcd96ab89..fdb25125b2c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -469,13 +469,13 @@ public class GridNearAtomicFullUpdateRequest extends
GridNearAtomicAbstractUpdat
writer.incrementState();
case 17:
- if (!writer.writeCollection(keys,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(keys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
case 18:
- if (!writer.writeCollection(vals,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(vals,
MessageCollectionItemType.CACHE_OBJECT))
return false;
writer.incrementState();
@@ -550,7 +550,7 @@ public class GridNearAtomicFullUpdateRequest extends
GridNearAtomicAbstractUpdat
reader.incrementState();
case 17:
- keys = reader.readCollection(MessageCollectionItemType.MSG);
+ keys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
return false;
@@ -558,7 +558,7 @@ public class GridNearAtomicFullUpdateRequest extends
GridNearAtomicAbstractUpdat
reader.incrementState();
case 18:
- vals = reader.readCollection(MessageCollectionItemType.MSG);
+ vals =
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 606c59bc1c2..fb15a113c5d 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -241,13 +241,13 @@ public class GridNearAtomicSingleUpdateRequest extends
GridNearAtomicAbstractSin
switch (writer.state()) {
case 10:
- if (!writer.writeMessage(key))
+ if (!writer.writeKeyCacheObject(key))
return false;
writer.incrementState();
case 11:
- if (!writer.writeMessage(val))
+ if (!writer.writeCacheObject(val))
return false;
writer.incrementState();
@@ -266,7 +266,7 @@ public class GridNearAtomicSingleUpdateRequest extends
GridNearAtomicAbstractSin
switch (reader.state()) {
case 10:
- key = reader.readMessage();
+ key = reader.readKeyCacheObject();
if (!reader.isLastRead())
return false;
@@ -274,7 +274,7 @@ public class GridNearAtomicSingleUpdateRequest extends
GridNearAtomicAbstractSin
reader.incrementState();
case 11:
- val = reader.readMessage();
+ val = reader.readCacheObject();
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
index 20d7e13a851..c3e228b7ad5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
@@ -158,7 +158,7 @@ public class UpdateErrors implements Message {
writer.incrementState();
case 1:
- if (!writer.writeCollection(failedKeys,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(failedKeys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
@@ -182,7 +182,7 @@ public class UpdateErrors implements Message {
reader.incrementState();
case 1:
- failedKeys =
reader.readCollection(MessageCollectionItemType.MSG);
+ failedKeys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
index e62bbfa2eb9..75fafb77df8 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
@@ -169,7 +169,7 @@ public class GridDhtForceKeysRequest extends
GridCacheIdMessage implements GridC
writer.incrementState();
case 5:
- if (!writer.writeCollection(keys,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(keys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
@@ -208,7 +208,7 @@ public class GridDhtForceKeysRequest extends
GridCacheIdMessage implements GridC
reader.incrementState();
case 5:
- keys = reader.readCollection(MessageCollectionItemType.MSG);
+ keys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index 71de9f3a307..1611b58e1db 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -234,7 +234,7 @@ public class GridDhtForceKeysResponse extends
GridCacheIdMessage implements Grid
writer.incrementState();
case 8:
- if (!writer.writeCollection(missedKeys,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(missedKeys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
@@ -285,7 +285,7 @@ public class GridDhtForceKeysResponse extends
GridCacheIdMessage implements Grid
reader.incrementState();
case 8:
- missedKeys =
reader.readCollection(MessageCollectionItemType.MSG);
+ missedKeys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 7a17010e6e4..863d0aac5d5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -369,7 +369,7 @@ public class GridNearGetRequest extends GridCacheIdMessage
implements GridCacheD
writer.incrementState();
case 8:
- if (!writer.writeCollection(keys,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(keys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
@@ -456,7 +456,7 @@ public class GridNearGetRequest extends GridCacheIdMessage
implements GridCacheD
reader.incrementState();
case 8:
- keys = reader.readCollection(MessageCollectionItemType.MSG);
+ keys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
index ce7413ff835..f3c2bf02281 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -309,7 +309,7 @@ public class GridNearSingleGetRequest extends
GridCacheIdMessage implements Grid
reader.incrementState();
case 8:
- key = reader.readMessage();
+ key = reader.readKeyCacheObject();
if (!reader.isLastRead())
return false;
@@ -385,7 +385,7 @@ public class GridNearSingleGetRequest extends
GridCacheIdMessage implements Grid
writer.incrementState();
case 8:
- if (!writer.writeMessage(key))
+ if (!writer.writeKeyCacheObject(key))
return false;
writer.incrementState();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 4a94a4dace1..2bb1840db43 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -819,7 +819,7 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
writer.incrementState();
case 26:
- if (!writer.writeCollection(skipKeys,
MessageCollectionItemType.MSG))
+ if (!writer.writeCollection(skipKeys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
@@ -1017,7 +1017,7 @@ public class GridCacheQueryRequest extends
GridCacheIdMessage implements GridCac
reader.incrementState();
case 26:
- skipKeys =
reader.readCollection(MessageCollectionItemType.MSG);
+ skipKeys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 40b64f74bc3..5f72a0e6a6f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -417,19 +417,19 @@ public class CacheContinuousQueryEntry implements
GridCacheDeployable, Message {
writer.incrementState();
case 4:
- if (!writer.writeMessage(isFiltered() ? null : key))
+ if (!writer.writeKeyCacheObject(isFiltered() ? null : key))
return false;
writer.incrementState();
case 5:
- if (!writer.writeMessage(isFiltered() ? null : newVal))
+ if (!writer.writeCacheObject(isFiltered() ? null : newVal))
return false;
writer.incrementState();
case 6:
- if (!writer.writeMessage(isFiltered() ? null : oldVal))
+ if (!writer.writeCacheObject(isFiltered() ? null : oldVal))
return false;
writer.incrementState();
@@ -495,7 +495,7 @@ public class CacheContinuousQueryEntry implements
GridCacheDeployable, Message {
reader.incrementState();
case 4:
- key = reader.readMessage();
+ key = reader.readKeyCacheObject();
if (!reader.isLastRead())
return false;
@@ -503,7 +503,7 @@ public class CacheContinuousQueryEntry implements
GridCacheDeployable, Message {
reader.incrementState();
case 5:
- newVal = reader.readMessage();
+ newVal = reader.readCacheObject();
if (!reader.isLastRead())
return false;
@@ -511,7 +511,7 @@ public class CacheContinuousQueryEntry implements
GridCacheDeployable, Message {
reader.incrementState();
case 6:
- oldVal = reader.readMessage();
+ oldVal = reader.readCacheObject();
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 6a72111c1e4..ca0a664c697 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -1145,7 +1145,7 @@ public class IgniteTxEntry implements
GridPeerDeployAware, Message {
writer.incrementState();
case 7:
- if (!writer.writeMessage(key))
+ if (!writer.writeKeyCacheObject(key))
return false;
writer.incrementState();
@@ -1247,7 +1247,7 @@ public class IgniteTxEntry implements
GridPeerDeployAware, Message {
reader.incrementState();
case 7:
- key = reader.readMessage();
+ key = reader.readKeyCacheObject();
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java
index 4adf4ac7754..7b846ca9005 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java
@@ -188,7 +188,7 @@ public class TxEntryValueHolder implements Message {
writer.incrementState();
case 2:
- if (!writer.writeMessage(hasWriteVal ? val : null))
+ if (!writer.writeCacheObject(hasWriteVal ? val : null))
return false;
writer.incrementState();
@@ -224,7 +224,7 @@ public class TxEntryValueHolder implements Message {
reader.incrementState();
case 2:
- val = reader.readMessage();
+ val = reader.readCacheObject();
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index 6fb70b200bf..eb8311bff43 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -166,20 +166,20 @@ public interface IgniteCacheObjectProcessor extends
GridProcessor {
boolean failIfUnregistered);
/**
- * @param ctx Cache context.
+ * @param ctx Optional cache context. If {@code null} then skip
unmarshalling the byte array.
* @param type Object type.
* @param bytes Object bytes.
* @return Cache object.
*/
- public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[]
bytes);
+ public CacheObject toCacheObject(@Nullable CacheObjectContext ctx, byte
type, byte[] bytes);
/**
- * @param ctx Cache context.
+ * @param ctx Optional cache context. If {@code null} then skip
unmarshalling the byte array.
* @param type Object type.
* @param bytes Object bytes.
* @return Cache object.
*/
- public KeyCacheObject toKeyCacheObject(CacheObjectContext ctx, byte type,
byte[] bytes) throws IgniteCheckedException;
+ public KeyCacheObject toKeyCacheObject(@Nullable CacheObjectContext ctx,
byte type, byte[] bytes) throws IgniteCheckedException;
/**
* @param ctx Cache context.
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java
index b09af8b4009..e6654cfb202 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java
@@ -113,13 +113,13 @@ public class DataStreamerEntry implements
Map.Entry<KeyCacheObject, CacheObject>
switch (writer.state()) {
case 0:
- if (!writer.writeMessage(key))
+ if (!writer.writeKeyCacheObject(key))
return false;
writer.incrementState();
case 1:
- if (!writer.writeMessage(val))
+ if (!writer.writeCacheObject(val))
return false;
writer.incrementState();
@@ -135,7 +135,7 @@ public class DataStreamerEntry implements
Map.Entry<KeyCacheObject, CacheObject>
switch (reader.state()) {
case 0:
- key = reader.readMessage();
+ key = reader.readKeyCacheObject();
if (!reader.isLastRead())
return false;
@@ -143,7 +143,7 @@ public class DataStreamerEntry implements
Map.Entry<KeyCacheObject, CacheObject>
reader.incrementState();
case 1:
- val = reader.readMessage();
+ val = reader.readCacheObject();
if (!reader.isLastRead())
return false;
diff --git
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java
index bbb79143b5d..90e7c81d18a 100644
---
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java
+++
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java
@@ -87,7 +87,13 @@ public enum MessageCollectionItemType {
MSG,
/** Topology version. */
- AFFINITY_TOPOLOGY_VERSION;
+ AFFINITY_TOPOLOGY_VERSION,
+
+ /** Key cache object. */
+ KEY_CACHE_OBJECT,
+
+ /** Cache object. */
+ CACHE_OBJECT;
/** Enum values. */
private static final MessageCollectionItemType[] VALS = values();
diff --git
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
index 6d0ac9d191c..b0f55d1a851 100644
---
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
+++
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
@@ -24,6 +24,8 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.lang.IgniteUuid;
/**
@@ -194,6 +196,20 @@ public interface MessageReader {
*/
public <T extends Message> T readMessage();
+ /**
+ * Reads {@link CacheObject}.
+ *
+ * @return Cache object.
+ */
+ public CacheObject readCacheObject();
+
+ /**
+ * Reads {@link KeyCacheObject}.
+ *
+ * @return Key cache object.
+ */
+ public KeyCacheObject readKeyCacheObject();
+
/**
* Reads array of objects.
*
diff --git
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
index 5a13c41bff7..356766bdfc0 100644
---
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
+++
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
@@ -23,6 +23,8 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.lang.IgniteUuid;
/**
@@ -241,6 +243,22 @@ public interface MessageWriter {
*/
public boolean writeMessage(Message val);
+ /**
+ * Writes {@link CacheObject}.
+ *
+ * @param obj Cache object.
+ * @return Whether value was fully written.
+ */
+ public boolean writeCacheObject(CacheObject obj);
+
+ /**
+ * Writes {@link KeyCacheObject}.
+ *
+ * @param obj Key cache object.
+ * @return Whether value was fully written.
+ */
+ public boolean writeKeyCacheObject(KeyCacheObject obj);
+
/**
* Writes array of objects.
*
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
index a0ee15ca451..5d914886a46 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
@@ -28,8 +28,6 @@ import
org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import
org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl;
import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
@@ -295,25 +293,5 @@ public class GridAffinityNoCacheSelfTest extends
GridCommonAbstractTest {
@Override public void prepareMarshal(CacheObjectValueContext ctx)
throws IgniteCheckedException {
throw new UnsupportedOperationException();
}
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer)
{
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader
reader) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- throw new UnsupportedOperationException();
- }
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
index 1dfb759b593..3c07575f8b4 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
@@ -77,7 +77,7 @@ public class DirectMarshallingMessagesTest extends
GridCommonAbstractTest {
T resMsg = (T)msgFactory.create(type);
boolean fullyRead = loopBuffer(buf, buf.position(),
- buf0 -> resMsg.readFrom(buf0, new
DirectMessageReader(msgFactory)));
+ buf0 -> resMsg.readFrom(buf0, new DirectMessageReader(msgFactory,
null)));
assertTrue("The message was not read completely.", fullyRead);
return resMsg;
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java
index 44f44be77f7..cb6c40576fe 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java
@@ -101,7 +101,7 @@ public class DirectByteBufferStreamImplByteOrderSelfTest {
@Override public MessageSerializer serializer(short type) {
return null;
}
- });
+ }, null);
stream.setBuffer(buff);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractCommunicationMessageSerializationTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractCommunicationMessageSerializationTest.java
index 86f577dccf2..6665d64a507 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractCommunicationMessageSerializationTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractCommunicationMessageSerializationTest.java
@@ -24,6 +24,8 @@ import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -272,6 +274,16 @@ public abstract class
AbstractCommunicationMessageSerializationTest {
return writeField(AffinityTopologyVersion.class);
}
+ /** {@inheritDoc} */
+ @Override public boolean writeCacheObject(CacheObject obj) {
+ return writeField(CacheObject.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeKeyCacheObject(KeyCacheObject obj) {
+ return writeField(KeyCacheObject.class);
+ }
+
/** {@inheritDoc} */
@Override public boolean writeMessage(Message val) {
return writeField(Message.class);
@@ -511,6 +523,20 @@ public abstract class
AbstractCommunicationMessageSerializationTest {
return null;
}
+ /** {@inheritDoc} */
+ @Override public CacheObject readCacheObject() {
+ readField(CacheObject.class);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public KeyCacheObject readKeyCacheObject() {
+ readField(KeyCacheObject.class);
+
+ return null;
+ }
+
/** {@inheritDoc} */
@Override public <T> T[] readObjectArray(MessageCollectionItemType
itemType, Class<T> itemCls) {
readField(Object[].class);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
index d26642f7423..ca704daa33b 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
@@ -20,8 +20,6 @@ package org.apache.ignite.internal.processors.cache;
import java.nio.ByteBuffer;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
@@ -160,25 +158,5 @@ public class IgniteIncompleteCacheObjectSelfTest extends
GridCommonAbstractTest
@Override public void prepareMarshal(final CacheObjectValueContext
ctx) throws IgniteCheckedException {
// No-op
}
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(final ByteBuffer buf, final
MessageWriter writer) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(final ByteBuffer buf, final
MessageReader reader) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- // No-op
- }
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index d8d9a19a444..777bea67a89 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -158,7 +158,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest
extends GridCommonAbst
e0.writeTo(buf, writer);
CacheContinuousQueryEntry e1 = new CacheContinuousQueryEntry();
- e1.readFrom(ByteBuffer.wrap(buf.array()), new
DirectMessageReader(msgFactory));
+ e1.readFrom(ByteBuffer.wrap(buf.array()), new
DirectMessageReader(msgFactory, null));
assertEquals(e0.cacheId(), e1.cacheId());
assertEquals(e0.eventType(), e1.eventType());
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
index 9090918f9be..d714f666aac 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
@@ -53,8 +53,6 @@ import
org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVers
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.metric.GridMetricManager;
import
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridTestKernalContext;
@@ -736,31 +734,5 @@ public class CacheFreeListSelfTest extends
GridCommonAbstractTest {
@Override public void prepareMarshal(CacheObjectValueContext ctx)
throws IgniteCheckedException {
assert false;
}
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer)
{
- assert false;
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader
reader) {
- assert false;
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- assert false;
-
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public void onAckReceived() {
- assert false;
- }
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 253abf97fcf..56852ea365a 100644
---
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -548,7 +548,7 @@ public class GridSpiTestContext implements IgniteSpiContext
{
}
@Override public MessageReader reader(UUID rmtNodeId,
MessageFactory msgFactory) {
- return new DirectMessageReader(msgFactory);
+ return new DirectMessageReader(msgFactory, null);
}
};
}
diff --git a/modules/core/src/test/resources/codegen/TestMessage.java
b/modules/core/src/test/resources/codegen/TestMessage.java
index 78b99dbede3..4e19fe0809c 100644
--- a/modules/core/src/test/resources/codegen/TestMessage.java
+++ b/modules/core/src/test/resources/codegen/TestMessage.java
@@ -21,6 +21,8 @@ import java.lang.String;
import java.util.UUID;
import java.util.BitSet;
import java.nio.ByteBuffer;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.lang.IgniteUuid;
@@ -65,6 +67,12 @@ public class TestMessage implements Message {
@Order(value = 11, method = "overridenFieldMethod")
private String field;
+ @Order(value = 12)
+ private KeyCacheObject keyCacheObject;
+
+ @Order(value = 13)
+ private CacheObject cacheObject;
+
public int id() {
return id;
}
@@ -161,6 +169,22 @@ public class TestMessage implements Message {
this.field = field;
}
+ public KeyCacheObject keyCacheObject() {
+ return keyCacheObject;
+ }
+
+ public void keyCacheObject(KeyCacheObject keyCacheObject) {
+ this.keyCacheObject = keyCacheObject;
+ }
+
+ public CacheObject cacheObject() {
+ return cacheObject;
+ }
+
+ public void cacheObject(CacheObject cacheObject) {
+ this.cacheObject = cacheObject;
+ }
+
public short directType() {
return 0;
}
diff --git a/modules/core/src/test/resources/codegen/TestMessageSerializer.java
b/modules/core/src/test/resources/codegen/TestMessageSerializer.java
index 3d5a0b7e1f5..8ca1f2be5d6 100644
--- a/modules/core/src/test/resources/codegen/TestMessageSerializer.java
+++ b/modules/core/src/test/resources/codegen/TestMessageSerializer.java
@@ -118,6 +118,18 @@ public class TestMessageSerializer implements
MessageSerializer {
writer.incrementState();
+ case 12:
+ if (!writer.writeKeyCacheObject(msg.keyCacheObject()))
+ return false;
+
+ writer.incrementState();
+
+ case 13:
+ if (!writer.writeCacheObject(msg.cacheObject()))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -226,6 +238,21 @@ public class TestMessageSerializer implements
MessageSerializer {
reader.incrementState();
+ case 12:
+ msg.keyCacheObject(reader.readKeyCacheObject());
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 13:
+ msg.cacheObject(reader.readCacheObject());
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
}
return true;
diff --git
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java
index 429baa914f0..c0d4634556e 100644
---
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java
+++
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java
@@ -69,7 +69,7 @@ public class GridH2CacheObject extends GridH2ValueMessage {
switch (reader.state()) {
case 0:
- obj = reader.readMessage();
+ obj = reader.readCacheObject();
if (!reader.isLastRead())
return false;
@@ -97,7 +97,7 @@ public class GridH2CacheObject extends GridH2ValueMessage {
switch (writer.state()) {
case 0:
- if (!writer.writeMessage(obj))
+ if (!writer.writeCacheObject(obj))
return false;
writer.incrementState();