This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d96b6cfaae IGNITE-26280 Remove Message interface from CacheObject 
(#12297)
9d96b6cfaae is described below

commit 9d96b6cfaae59b206b41892175a14e161747a979
Author: Maksim Timonin <[email protected]>
AuthorDate: Fri Sep 5 15:17:15 2025 +0500

    IGNITE-26280 Remove Message interface from CacheObject (#12297)
---
 .../query/calcite/message/QueryTxEntry.java        |   8 +-
 .../internal/MessageSerializerGenerator.java       |  18 ++
 .../internal/binary/BinaryEnumObjectImpl.java      |  81 ---------
 .../ignite/internal/binary/BinaryObjectImpl.java   |  65 -------
 .../internal/binary/BinaryObjectOffheapImpl.java   |  22 ---
 .../apache/ignite/internal/binary/BinaryUtils.java |  11 --
 .../internal/direct/DirectMessageReader.java       |  35 +++-
 .../internal/direct/DirectMessageWriter.java       |  20 +++
 .../direct/stream/DirectByteBufferStream.java      | 196 +++++++++++++++++++++
 .../managers/communication/GridIoManager.java      |   2 +-
 .../communication/GridIoMessageFactory.java        |   9 -
 .../cache/CacheEntryPredicateContainsValue.java    |   4 +-
 .../processors/cache/CacheInvokeDirectResult.java  |   8 +-
 .../internal/processors/cache/CacheObject.java     |   3 +-
 .../processors/cache/CacheObjectAdapter.java       |  43 -----
 .../processors/cache/CacheObjectByteArrayImpl.java |  53 ------
 .../internal/processors/cache/CacheObjectImpl.java |  10 --
 .../internal/processors/cache/GridCacheReturn.java |   4 +-
 .../processors/cache/KeyCacheObjectImpl.java       |  62 -------
 .../binary/CacheObjectBinaryProcessorImpl.java     |  24 ++-
 .../distributed/GridCacheTtlUpdateRequest.java     |   8 +-
 .../distributed/GridDistributedLockRequest.java    |   4 +-
 .../distributed/GridDistributedLockResponse.java   |   4 +-
 .../distributed/GridDistributedUnlockRequest.java  |   4 +-
 .../cache/distributed/dht/GridDhtCacheAdapter.java |   2 +-
 .../cache/distributed/dht/GridDhtLockRequest.java  |   4 +-
 .../dht/GridPartitionedSingleGetFuture.java        |   2 +-
 .../atomic/GridDhtAtomicSingleUpdateRequest.java   |  12 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java     |  20 +--
 .../dht/atomic/GridDhtAtomicUpdateResponse.java    |   4 +-
 .../atomic/GridNearAtomicFullUpdateRequest.java    |   8 +-
 .../atomic/GridNearAtomicSingleUpdateRequest.java  |   8 +-
 .../cache/distributed/dht/atomic/UpdateErrors.java |   4 +-
 .../dht/preloader/GridDhtForceKeysRequest.java     |   4 +-
 .../dht/preloader/GridDhtForceKeysResponse.java    |   4 +-
 .../cache/distributed/near/GridNearGetRequest.java |   4 +-
 .../distributed/near/GridNearSingleGetRequest.java |   4 +-
 .../cache/query/GridCacheQueryRequest.java         |   4 +-
 .../continuous/CacheContinuousQueryEntry.java      |  12 +-
 .../cache/transactions/IgniteTxEntry.java          |   4 +-
 .../cache/transactions/TxEntryValueHolder.java     |   4 +-
 .../cacheobject/IgniteCacheObjectProcessor.java    |   8 +-
 .../processors/datastreamer/DataStreamerEntry.java |   8 +-
 .../communication/MessageCollectionItemType.java   |   8 +-
 .../extensions/communication/MessageReader.java    |  16 ++
 .../extensions/communication/MessageWriter.java    |  18 ++
 .../internal/GridAffinityNoCacheSelfTest.java      |  22 ---
 .../direct/DirectMarshallingMessagesTest.java      |   2 +-
 ...irectByteBufferStreamImplByteOrderSelfTest.java |   2 +-
 ...tractCommunicationMessageSerializationTest.java |  26 +++
 .../cache/IgniteIncompleteCacheObjectSelfTest.java |  22 ---
 ...niteCacheContinuousQueryImmutableEntryTest.java |   2 +-
 .../processors/database/CacheFreeListSelfTest.java |  28 ---
 .../ignite/testframework/GridSpiTestContext.java   |   2 +-
 .../src/test/resources/codegen/TestMessage.java    |  24 +++
 .../resources/codegen/TestMessageSerializer.java   |  27 +++
 .../query/h2/twostep/msg/GridH2CacheObject.java    |   4 +-
 57 files changed, 493 insertions(+), 528 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
index 2bbc238c6ac..87b01d0ba04 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
@@ -137,13 +137,13 @@ public class QueryTxEntry implements CalciteMessage {
                 writer.incrementState();
 
             case 1:
-                if (!writer.writeMessage(key))
+                if (!writer.writeKeyCacheObject(key))
                     return false;
 
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeMessage(val))
+                if (!writer.writeCacheObject(val))
                     return false;
 
                 writer.incrementState();
@@ -173,7 +173,7 @@ public class QueryTxEntry implements CalciteMessage {
                 reader.incrementState();
 
             case 1:
-                key = reader.readMessage();
+                key = reader.readKeyCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
@@ -181,7 +181,7 @@ public class QueryTxEntry implements CalciteMessage {
                 reader.incrementState();
 
             case 2:
-                val = reader.readMessage();
+                val = reader.readCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
 
b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
index 1e26d38c451..73dff879e1a 100644
--- 
a/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
+++ 
b/modules/codegen2/src/main/java/org/apache/ignite/internal/MessageSerializerGenerator.java
@@ -364,6 +364,12 @@ class MessageSerializerGenerator {
                     "MessageCollectionItemType." + 
messageCollectionItemType(typeArgs.get(1)));
             }
 
+            else if (assignableFrom(type, 
type("org.apache.ignite.internal.processors.cache.KeyCacheObject")))
+                returnFalseIfWriteFailed(write, "writer.writeKeyCacheObject", 
getExpr);
+
+            else if (assignableFrom(type, 
type("org.apache.ignite.internal.processors.cache.CacheObject")))
+                returnFalseIfWriteFailed(write, "writer.writeCacheObject", 
getExpr);
+
             else if (assignableFrom(type, type(MESSAGE_INTERFACE)))
                 returnFalseIfWriteFailed(write, "writer.writeMessage", 
getExpr);
 
@@ -487,6 +493,12 @@ class MessageSerializerGenerator {
                     "MessageCollectionItemType." + 
messageCollectionItemType(typeArgs.get(1)), "false");
             }
 
+            else if (assignableFrom(type, 
type("org.apache.ignite.internal.processors.cache.KeyCacheObject")))
+                returnFalseIfReadFailed(name, "reader.readKeyCacheObject");
+
+            else if (assignableFrom(type, 
type("org.apache.ignite.internal.processors.cache.CacheObject")))
+                returnFalseIfReadFailed(name, "reader.readCacheObject");
+
             else if (assignableFrom(type, type(MESSAGE_INTERFACE)))
                 returnFalseIfReadFailed(name, "reader.readMessage");
 
@@ -547,6 +559,12 @@ class MessageSerializerGenerator {
             if (sameType(type, 
"org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion"))
                 return "AFFINITY_TOPOLOGY_VERSION";
 
+            if (sameType(type, 
"org.apache.ignite.internal.processors.cache.KeyCacheObject"))
+                return "KEY_CACHE_OBJECT";
+
+            if (sameType(type, 
"org.apache.ignite.internal.processors.cache.CacheObject"))
+                return "CACHE_OBJECT";
+
             PrimitiveType primitiveType = unboxedType(type);
 
             if (primitiveType != null)
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
index 032287b5f9b..0101ef34fd6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryEnumObjectImpl.java
@@ -37,8 +37,6 @@ import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.marshaller.Marshallers;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -386,85 +384,6 @@ class BinaryEnumObjectImpl implements BinaryObjectEx, 
Externalizable, CacheObjec
         // No-op.
     }
 
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 119;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeString(clsName))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeInt(ord))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeInt(typeId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        switch (reader.state()) {
-            case 0:
-                clsName = reader.readString();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                ord = reader.readInt();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                typeId = reader.readInt();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
     /** {@inheritDoc} */
     @Override public int size() {
         if (valBytes == null) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
index 6e22af7f01d..6e4bd14078a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
@@ -45,8 +45,6 @@ import 
org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProce
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.marshaller.Marshallers;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -775,11 +773,6 @@ final class BinaryObjectImpl extends BinaryObjectExImpl 
implements Externalizabl
         return ((BinaryReaderExImpl)reader(null, false)).getOrCreateSchema();
     }
 
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         if (detachAllowed) {
@@ -807,64 +800,6 @@ final class BinaryObjectImpl extends BinaryObjectExImpl 
implements Externalizabl
         start = in.readInt();
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByteArray(valBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeInt(part))
-                    return false;
-
-                writer.incrementState();
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        switch (reader.state()) {
-            case 0:
-                valBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                part = reader.readInt();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 113;
-    }
-
     /**
      * Runs value deserialization regardless of whether obj already has the 
deserialized value.
      * Will set obj if descriptor is configured to keep deserialized values.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
index 2945626641e..57e3d9c8eba 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
@@ -40,8 +40,6 @@ import 
org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.marshaller.Marshallers;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -481,21 +479,6 @@ class BinaryObjectOffheapImpl extends BinaryObjectExImpl 
implements Externalizab
         throw new UnsupportedOperationException();
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        throw new UnsupportedOperationException();
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        throw new UnsupportedOperationException();
-    }
-
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         throw new UnsupportedOperationException(); // To make sure it is not 
marshalled.
@@ -506,11 +489,6 @@ class BinaryObjectOffheapImpl extends BinaryObjectExImpl 
implements Externalizab
         throw new UnsupportedOperationException(); // To make sure it is not 
marshalled.
     }
 
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
     /** {@inheritDoc} */
     @Override public int size() {
         return length();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
index f82dbdc243a..643442d6aa8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
@@ -55,9 +55,7 @@ import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.function.BiConsumer;
 import java.util.function.Function;
-import java.util.function.Supplier;
 import java.util.function.ToIntFunction;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteCheckedException;
@@ -83,7 +81,6 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.marshaller.Marshallers;
 import org.apache.ignite.platform.PlatformType;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -1684,14 +1681,6 @@ public class BinaryUtils {
         return new BinaryEnumObjectImpl(ctx, arr);
     }
 
-    /**
-     * @param register Register method.
-     */
-    public static void registerMessages(BiConsumer<Short, Supplier<Message>> 
register) {
-        register.accept((short)113, BinaryObjectImpl::new);
-        register.accept((short)119, BinaryEnumObjectImpl::new);
-    }
-
     /** */
     public static BinaryObjectEx binaryObject(BinaryContext ctx, byte[] arr, 
int start) {
         return new BinaryObjectImpl(ctx, arr, start);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
index 999ec90d19d..77d8a10f8d1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageReader.java
@@ -26,6 +26,9 @@ import 
org.apache.ignite.internal.direct.state.DirectMessageState;
 import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
 import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteOutClosure;
@@ -49,11 +52,12 @@ public class DirectMessageReader implements MessageReader {
 
     /**
      * @param msgFactory Message factory.
+     * @param cacheObjProc Cache object processor.
      */
-    public DirectMessageReader(final MessageFactory msgFactory) {
+    public DirectMessageReader(final MessageFactory msgFactory, 
IgniteCacheObjectProcessor cacheObjProc) {
         state = new DirectMessageState<>(StateItem.class, new 
IgniteOutClosure<StateItem>() {
             @Override public StateItem apply() {
-                return new StateItem(msgFactory);
+                return new StateItem(msgFactory, cacheObjProc);
             }
         });
     }
@@ -305,6 +309,28 @@ public class DirectMessageReader implements MessageReader {
         return msg;
     }
 
+    /** {@inheritDoc} */
+    @Override public CacheObject readCacheObject() {
+        DirectByteBufferStream stream = state.item().stream;
+
+        CacheObject val = stream.readCacheObject();
+
+        lastRead = stream.lastFinished();
+
+        return val;
+    }
+
+    /** {@inheritDoc} */
+    @Override public KeyCacheObject readKeyCacheObject() {
+        DirectByteBufferStream stream = state.item().stream;
+
+        KeyCacheObject key = stream.readKeyCacheObject();
+
+        lastRead = stream.lastFinished();
+
+        return key;
+    }
+
     /** {@inheritDoc} */
     @Override public <T> T[] readObjectArray(MessageCollectionItemType 
itemType, Class<T> itemCls) {
         DirectByteBufferStream stream = state.item().stream;
@@ -385,9 +411,10 @@ public class DirectMessageReader implements MessageReader {
 
         /**
          * @param msgFactory Message factory.
+         * @param cacheObjProc Cache object processor.
          */
-        public StateItem(MessageFactory msgFactory) {
-            stream = new DirectByteBufferStream(msgFactory);
+        public StateItem(MessageFactory msgFactory, IgniteCacheObjectProcessor 
cacheObjProc) {
+            stream = new DirectByteBufferStream(msgFactory, cacheObjProc);
         }
 
         /** {@inheritDoc} */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index bfc76e0ddcc..0b68dbd3fd5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -26,6 +26,8 @@ import 
org.apache.ignite.internal.direct.state.DirectMessageState;
 import org.apache.ignite.internal.direct.state.DirectMessageStateItem;
 import org.apache.ignite.internal.direct.stream.DirectByteBufferStream;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteOutClosure;
@@ -283,6 +285,24 @@ public class DirectMessageWriter implements MessageWriter {
         return stream.lastFinished();
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean writeCacheObject(@Nullable CacheObject obj) {
+        DirectByteBufferStream stream = state.item().stream;
+
+        stream.writeCacheObject(obj);
+
+        return stream.lastFinished();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeKeyCacheObject(KeyCacheObject obj) {
+        DirectByteBufferStream stream = state.item().stream;
+
+        stream.writeKeyCacheObject(obj);
+
+        return stream.lastFinished();
+    }
+
     /** {@inheritDoc} */
     @Override public <T> boolean writeObjectArray(T[] arr, 
MessageCollectionItemType itemType) {
         DirectByteBufferStream stream = state.item().stream;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
index ae92154d2c1..c6da5fe96da 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStream.java
@@ -27,8 +27,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.RandomAccess;
 import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -212,6 +216,10 @@ public class DirectByteBufferStream {
     @GridToStringExclude
     private final MessageFactory msgFactory;
 
+    /** Is required to instantiate {@link CacheObject} while reading messages. 
*/
+    @GridToStringExclude
+    private final IgniteCacheObjectProcessor cacheObjProc;
+
     /** */
     @GridToStringExclude
     protected ByteBuffer buf;
@@ -313,10 +321,41 @@ public class DirectByteBufferStream {
     private int topVerMinor;
 
     /**
+     * This field represents a phase of reading or writing {@code CacheObject} 
object enabling ser/des mechanism to keep
+     * track of fields that are already read/written.
+     */
+    private byte cacheObjState;
+
+    /** */
+    private byte[] cacheObjArr;
+
+    /** */
+    private int keyCacheObjPart;
+
+    /** */
+    private byte cacheObjType;
+
+    /**
+     * Constructror for stream used for writing messages.
+     *
      * @param msgFactory Message factory.
      */
     public DirectByteBufferStream(MessageFactory msgFactory) {
         this.msgFactory = msgFactory;
+
+        // Is not used while writing messages.
+        cacheObjProc = null;
+    }
+
+    /**
+     * Constructror for stream used for reading messages.
+     *
+     * @param msgFactory Message factory.
+     * @param cacheObjProc Cache object processor.
+     */
+    public DirectByteBufferStream(MessageFactory msgFactory, 
IgniteCacheObjectProcessor cacheObjProc) {
+        this.msgFactory = msgFactory;
+        this.cacheObjProc = cacheObjProc;
     }
 
     /**
@@ -751,6 +790,78 @@ public class DirectByteBufferStream {
             writeInt(-1);
     }
 
+    /**
+     * @param obj Cache object.
+     */
+    public void writeCacheObject(CacheObject obj) {
+        try {
+            if (obj != null) {
+                switch (cacheObjState) {
+                    case 0:
+                        writeByte(obj.cacheObjectType());
+
+                        if (!lastFinished)
+                            return;
+
+                        cacheObjState++;
+
+                    case 1:
+                        writeByteArray(obj.valueBytes(null));
+
+                        if (!lastFinished)
+                            return;
+
+                        cacheObjState = 0;
+                }
+            }
+            else
+                writeByte((byte)-1);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @param keyObj Key cache object.
+     */
+    public void writeKeyCacheObject(KeyCacheObject keyObj) {
+        try {
+            if (keyObj != null) {
+                switch (cacheObjState) {
+                    case 0:
+                        writeByte(keyObj.cacheObjectType());
+
+                        if (!lastFinished)
+                            return;
+
+                        cacheObjState++;
+
+                    case 1:
+                        writeByteArray(keyObj.valueBytes(null));
+
+                        if (!lastFinished)
+                            return;
+
+                        cacheObjState++;
+
+                    case 2:
+                        writeInt(keyObj.partition());
+
+                        if (!lastFinished)
+                            return;
+
+                        cacheObjState = 0;
+                }
+            }
+            else
+                writeByte((byte)-1);
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
     /**
      * @param msg Message.
      * @param writer Writer.
@@ -1313,6 +1424,74 @@ public class DirectByteBufferStream {
         return new AffinityTopologyVersion(topVerMajor, topVerMinor);
     }
 
+    /**
+     * @return Value.
+     */
+    public KeyCacheObject readKeyCacheObject() {
+        switch (cacheObjState) {
+            case 0:
+                cacheObjType = readByte();
+
+                if (!lastFinished || cacheObjType == (byte)-1)
+                    return null;
+
+                cacheObjState++;
+
+            case 1:
+                cacheObjArr = readByteArray();
+
+                if (!lastFinished)
+                    return null;
+
+                cacheObjState++;
+
+            case 2:
+                keyCacheObjPart = readInt();
+
+                if (!lastFinished)
+                    return null;
+
+                cacheObjState = 0;
+        }
+
+        try {
+            KeyCacheObject key = cacheObjProc.toKeyCacheObject(null, 
cacheObjType, cacheObjArr);
+
+            if (keyCacheObjPart != -1)
+                key.partition(keyCacheObjPart);
+
+            return key;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * @return Value.
+     */
+    public CacheObject readCacheObject() {
+        switch (cacheObjState) {
+            case 0:
+                cacheObjType = readByte();
+
+                if (!lastFinished || cacheObjType == (byte)-1)
+                    return null;
+
+                cacheObjState++;
+
+            case 1:
+                cacheObjArr = readByteArray();
+
+                if (!lastFinished)
+                    return null;
+
+                cacheObjState = 0;
+        }
+
+        return cacheObjProc.toCacheObject(null, cacheObjType, cacheObjArr);
+    }
+
     /**
      * @param reader Reader.
      * @return Message.
@@ -1862,6 +2041,17 @@ public class DirectByteBufferStream {
                 writeAffinityTopologyVersion((AffinityTopologyVersion)val);
 
                 break;
+
+            case KEY_CACHE_OBJECT:
+                writeKeyCacheObject((KeyCacheObject)val);
+
+                break;
+
+            case CACHE_OBJECT:
+                writeCacheObject((CacheObject)val);
+
+                break;
+
             case MSG:
                 try {
                     if (val != null)
@@ -1951,6 +2141,12 @@ public class DirectByteBufferStream {
             case AFFINITY_TOPOLOGY_VERSION:
                 return readAffinityTopologyVersion();
 
+            case KEY_CACHE_OBJECT:
+                return readKeyCacheObject();
+
+            case CACHE_OBJECT:
+                return readCacheObject();
+
             case MSG:
                 return readMessage(reader);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 04f156b759e..f2f23c443c3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -440,7 +440,7 @@ public class GridIoManager extends 
GridManagerAdapter<CommunicationSpi<Object>>
                 }
 
                 @Override public MessageReader reader(UUID rmtNodeId, 
MessageFactory msgFactory) {
-                    return new DirectMessageReader(msgFactory);
+                    return new DirectMessageReader(msgFactory, 
ctx.cacheObjects());
                 }
             };
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 43bae86aba7..207190bfd75 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.GridJobSiblingsResponse;
 import org.apache.ignite.internal.GridTaskCancelRequest;
 import org.apache.ignite.internal.GridTaskSessionRequest;
 import org.apache.ignite.internal.IgniteDiagnosticMessage;
-import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.codegen.CacheEvictionEntrySerializer;
 import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
 import 
org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer;
@@ -80,12 +79,9 @@ import 
org.apache.ignite.internal.processors.cache.CacheEntryPredicateContainsVa
 import 
org.apache.ignite.internal.processors.cache.CacheEntrySerializablePredicate;
 import org.apache.ignite.internal.processors.cache.CacheEvictionEntry;
 import org.apache.ignite.internal.processors.cache.CacheInvokeDirectResult;
-import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
-import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import 
org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
-import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.WalStateAckMessage;
 import 
org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
 import 
org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
@@ -304,8 +300,6 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)86, GridCacheVersion::new, new 
GridCacheVersionSerializer());
         factory.register((short)87, GridDhtPartitionExchangeId::new, new 
GridDhtPartitionExchangeIdSerializer());
         factory.register((short)88, GridCacheReturn::new);
-        factory.register((short)89, CacheObjectImpl::new);
-        factory.register((short)90, KeyCacheObjectImpl::new);
         factory.register((short)91, GridCacheEntryInfo::new, new 
GridCacheEntryInfoSerializer());
         factory.register((short)92, CacheEntryInfoCollection::new);
         factory.register((short)93, CacheInvokeDirectResult::new);
@@ -320,7 +314,6 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)102, CacheVersionedValue::new, new 
CacheVersionedValueSerializer());
         factory.register((short)103, GridCacheRawVersionedEntry::new);
         factory.register((short)104, GridCacheVersionEx::new, new 
GridCacheVersionExSerializer());
-        factory.register((short)105, CacheObjectByteArrayImpl::new);
         factory.register((short)106, GridQueryCancelRequest::new);
         factory.register((short)107, GridQueryFailResponse::new, new 
GridQueryFailResponseSerializer());
         factory.register((short)108, GridQueryNextPageRequest::new, new 
GridQueryNextPageRequestSerializer());
@@ -332,8 +325,6 @@ public class GridIoMessageFactory implements 
MessageFactoryProvider {
         factory.register((short)116, GridNearSingleGetRequest::new);
         factory.register((short)117, GridNearSingleGetResponse::new);
         factory.register((short)118, CacheContinuousQueryBatchAck::new);
-        // 119 - BinaryEnumObjectImpl
-        BinaryUtils.registerMessages(factory::register);
 
         // [120..123] - DR
         factory.register((short)125, GridNearAtomicSingleUpdateRequest::new);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java
index d7b9ff6251c..5a39c212b9b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java
@@ -105,7 +105,7 @@ public class CacheEntryPredicateContainsValue extends 
CacheEntryPredicateAdapter
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeMessage(val))
+                if (!writer.writeCacheObject(val))
                     return false;
 
                 writer.incrementState();
@@ -124,7 +124,7 @@ public class CacheEntryPredicateContainsValue extends 
CacheEntryPredicateAdapter
 
         switch (reader.state()) {
             case 0:
-                val = reader.readMessage();
+                val = reader.readCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
index 1a7ab5e1823..d26382b762c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheInvokeDirectResult.java
@@ -206,13 +206,13 @@ public class CacheInvokeDirectResult implements Message, 
Serializable {
                 writer.incrementState();
 
             case 1:
-                if (!writer.writeMessage(key))
+                if (!writer.writeKeyCacheObject(key))
                     return false;
 
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeMessage(res))
+                if (!writer.writeCacheObject(res))
                     return false;
 
                 writer.incrementState();
@@ -236,7 +236,7 @@ public class CacheInvokeDirectResult implements Message, 
Serializable {
                 reader.incrementState();
 
             case 1:
-                key = reader.readMessage();
+                key = reader.readKeyCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
@@ -244,7 +244,7 @@ public class CacheInvokeDirectResult implements Message, 
Serializable {
                 reader.incrementState();
 
             case 2:
-                res = reader.readMessage();
+                res = reader.readCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
index cfb4d5c3431..eefd03e0841 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObject.java
@@ -19,13 +19,12 @@ package org.apache.ignite.internal.processors.cache;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
-public interface CacheObject extends Message {
+public interface CacheObject {
     /** */
     public static final byte TYPE_REGULAR = 1;
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
index ff995a1008a..a06dee96191 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectAdapter.java
@@ -28,8 +28,6 @@ import org.apache.ignite.internal.pagemem.PageUtils;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 /**
  *
@@ -154,47 +152,6 @@ public abstract class CacheObjectAdapter implements 
CacheObject, Externalizable
         return objectPutSize(valBytes.length);
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        switch (reader.state()) {
-            case 0:
-                valBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByteArray(valBytes))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(S.includeSensitive() ? getClass().getSimpleName() : 
"CacheObject",
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
index c9f0cb0dcc3..7a0fc222d33 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectByteArrayImpl.java
@@ -26,8 +26,6 @@ import java.util.Arrays;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -123,52 +121,6 @@ public class CacheObjectByteArrayImpl implements 
CacheObject, Externalizable {
         // No-op.
     }
 
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByteArray(val))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        switch (reader.state()) {
-            case 0:
-                val = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
         val = U.readByteArray(in);
@@ -179,11 +131,6 @@ public class CacheObjectByteArrayImpl implements 
CacheObject, Externalizable {
         U.writeByteArray(out, val);
     }
 
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 105;
-    }
-
     /** {@inheritDoc} */
     @Override public String toString() {
         return "CacheObjectByteArrayImpl [arrLen=" + (val != null ? val.length 
: 0) + ']';
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
index cfe2effd4fe..641fc45c589 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectImpl.java
@@ -126,16 +126,6 @@ public class CacheObjectImpl extends CacheObjectAdapter {
             val = valueFromValueBytes(ctx, ldr);
     }
 
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 89;
-    }
-
     /** {@inheritDoc} */
     @Override public int hashCode() {
         assert false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
index fbc4a777db6..b1b7574a2e4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheReturn.java
@@ -415,7 +415,7 @@ public class GridCacheReturn implements Externalizable, 
Message {
                 writer.incrementState();
 
             case 1:
-                if (!writer.writeMessage(cacheObj))
+                if (!writer.writeCacheObject(cacheObj))
                     return false;
 
                 writer.incrementState();
@@ -457,7 +457,7 @@ public class GridCacheReturn implements Externalizable, 
Message {
                 reader.incrementState();
 
             case 1:
-                cacheObj = reader.readMessage();
+                cacheObj = reader.readCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
index d81f26b2b7c..4d3220dd5da 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java
@@ -17,13 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.nio.ByteBuffer;
 import java.util.Objects;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -49,8 +46,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter 
implements KeyCacheOb
      * @param part Partition.
      */
     public KeyCacheObjectImpl(Object val, byte[] valBytes, int part) {
-        assert val != null;
-
         this.val = val;
         this.valBytes = valBytes;
         this.part = part;
@@ -106,11 +101,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter 
implements KeyCacheOb
         return (T)val;
     }
 
-    /** {@inheritDoc} */
-    @Override public void onAckReceived() {
-        // No-op.
-    }
-
     /** {@inheritDoc} */
     @Override public CacheObject prepareForCache(CacheObjectValueContext ctx) {
         return this;
@@ -123,58 +113,6 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter 
implements KeyCacheOb
         return IgniteUtils.hashCode(val);
     }
 
-    /** {@inheritDoc} */
-    @Override public short directType() {
-        return 90;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        if (!super.readFrom(buf, reader))
-            return false;
-
-        switch (reader.state()) {
-            case 1:
-                part = reader.readInt();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!super.writeTo(buf, writer))
-            return false;
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 1:
-                if (!writer.writeInt(part))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
     /** {@inheritDoc} */
     @Override public void prepareMarshal(CacheObjectValueContext ctx) throws 
IgniteCheckedException {
         if (valBytes == null)
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index bf3dff665c2..651f9b8a38f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -180,6 +180,9 @@ public class CacheObjectBinaryProcessorImpl extends 
GridProcessorAdapter impleme
     /** Cached affinity key field names. */
     private final ConcurrentHashMap<Integer, T1<BinaryField>> affKeyFields = 
new ConcurrentHashMap<>();
 
+    /** Dummy {@code CacheObjectValueContext} used for mocking. */
+    private CacheObjectValueContext fakeCacheObjCtx;
+
     /*
      * Static initializer
      */
@@ -326,6 +329,8 @@ public class CacheObjectBinaryProcessorImpl extends 
GridProcessorAdapter impleme
 
         if (!ctx.clientNode())
             metadataFileStore.restoreMetadata();
+
+        fakeCacheObjCtx = new CacheObjectContext(ctx, null, null, false, 
false, false, false, false);
     }
 
     /** {@inheritDoc} */
@@ -1293,10 +1298,19 @@ public class CacheObjectBinaryProcessorImpl extends 
GridProcessorAdapter impleme
     }
 
     /** {@inheritDoc} */
-    @Override public CacheObject toCacheObject(CacheObjectContext ctx, byte 
type, byte[] bytes) {
+    @Override public CacheObject toCacheObject(@Nullable CacheObjectContext 
ctx, byte type, byte[] bytes) {
         switch (type) {
             case CacheObject.TYPE_BINARY:
-                return (CacheObject)BinaryUtils.binaryObject(binaryContext(), 
bytes, ctx);
+                CacheObjectValueContext coctx = ctx;
+
+                // CacheObjectContext is actually required only if 
transformation is enabled.
+                // In this case CacheObjectContext#kernalContext is used only.
+                if (coctx == null && this.ctx.transformer() != null)
+                    coctx = fakeCacheObjCtx;
+
+                return coctx == null
+                    ? (CacheObject)BinaryUtils.binaryObject(binaryContext(), 
bytes)
+                    : (CacheObject)BinaryUtils.binaryObject(binaryContext(), 
bytes, coctx);
 
             case CacheObject.TYPE_BINARY_ENUM:
                 return (CacheObject)BinaryUtils.binaryEnum(binaryContext(), 
bytes);
@@ -1312,7 +1326,7 @@ public class CacheObjectBinaryProcessorImpl extends 
GridProcessorAdapter impleme
     }
 
     /** {@inheritDoc} */
-    @Override public KeyCacheObject toKeyCacheObject(CacheObjectContext ctx, 
byte type, byte[] bytes)
+    @Override public KeyCacheObject toKeyCacheObject(@Nullable 
CacheObjectContext ctx, byte type, byte[] bytes)
         throws IgniteCheckedException {
         switch (type) {
             case CacheObject.TYPE_BINARY:
@@ -1322,7 +1336,9 @@ public class CacheObjectBinaryProcessorImpl extends 
GridProcessorAdapter impleme
                 throw new IllegalArgumentException("Byte arrays cannot be used 
as cache keys.");
 
             case CacheObject.TYPE_REGULAR:
-                return new 
KeyCacheObjectImpl(ctx.kernalContext().cacheObjects().unmarshal(ctx, bytes, 
null), bytes, -1);
+                return ctx == null
+                    ? new KeyCacheObjectImpl(null, bytes, -1)
+                    : new 
KeyCacheObjectImpl(ctx.kernalContext().cacheObjects().unmarshal(ctx, bytes, 
null), bytes, -1);
         }
 
         throw new IllegalArgumentException("Invalid object type: " + type);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
index 72a94d75dd4..c822a5fab2f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTtlUpdateRequest.java
@@ -211,13 +211,13 @@ public class GridCacheTtlUpdateRequest extends 
GridCacheIdMessage {
 
         switch (writer.state()) {
             case 4:
-                if (!writer.writeCollection(keys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(keys, 
MessageCollectionItemType.KEY_CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeCollection(nearKeys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(nearKeys, 
MessageCollectionItemType.KEY_CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -260,7 +260,7 @@ public class GridCacheTtlUpdateRequest extends 
GridCacheIdMessage {
 
         switch (reader.state()) {
             case 4:
-                keys = reader.readCollection(MessageCollectionItemType.MSG);
+                keys = 
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -268,7 +268,7 @@ public class GridCacheTtlUpdateRequest extends 
GridCacheIdMessage {
                 reader.incrementState();
 
             case 5:
-                nearKeys = 
reader.readCollection(MessageCollectionItemType.MSG);
+                nearKeys = 
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index dee04c6f2ef..aea28b76a93 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -394,7 +394,7 @@ public class GridDistributedLockRequest extends 
GridDistributedBaseMessage {
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeCollection(keys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(keys, 
MessageCollectionItemType.KEY_CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -501,7 +501,7 @@ public class GridDistributedLockRequest extends 
GridDistributedBaseMessage {
                 reader.incrementState();
 
             case 14:
-                keys = reader.readCollection(MessageCollectionItemType.MSG);
+                keys = 
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
index b502d1f1bde..e37db1622df 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java
@@ -230,7 +230,7 @@ public class GridDistributedLockResponse extends 
GridDistributedBaseMessage {
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeCollection(vals, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(vals, 
MessageCollectionItemType.CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -265,7 +265,7 @@ public class GridDistributedLockResponse extends 
GridDistributedBaseMessage {
                 reader.incrementState();
 
             case 10:
-                vals = reader.readCollection(MessageCollectionItemType.MSG);
+                vals = 
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
index 324c569ac14..1da28eb179e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
@@ -116,7 +116,7 @@ public class GridDistributedUnlockRequest extends 
GridDistributedBaseMessage {
 
         switch (writer.state()) {
             case 8:
-                if (!writer.writeCollection(keys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(keys, 
MessageCollectionItemType.KEY_CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -135,7 +135,7 @@ public class GridDistributedUnlockRequest extends 
GridDistributedBaseMessage {
 
         switch (reader.state()) {
             case 8:
-                keys = reader.readCollection(MessageCollectionItemType.MSG);
+                keys = 
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 0a545287ee9..a08b939b4fa 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1216,7 +1216,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
                             else if (req.needVersion())
                                 res0 = new CacheVersionedValue(info.value(), 
info.version());
                             else
-                                res0 = info.value();
+                                res0 = new CacheVersionedValue(info.value(), 
null);
                         }
 
                         res = new GridNearSingleGetResponse(
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
index bb8aabc2ba2..fd46c5a0017 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java
@@ -337,7 +337,7 @@ public class GridDhtLockRequest extends 
GridDistributedLockRequest {
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeObjectArray(ownedKeys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeObjectArray(ownedKeys, 
MessageCollectionItemType.KEY_CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -410,7 +410,7 @@ public class GridDhtLockRequest extends 
GridDistributedLockRequest {
                 reader.incrementState();
 
             case 24:
-                ownedKeys = 
reader.readObjectArray(MessageCollectionItemType.MSG, KeyCacheObject.class);
+                ownedKeys = 
reader.readObjectArray(MessageCollectionItemType.KEY_CACHE_OBJECT, 
KeyCacheObject.class);
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index 12b5e371a4f..f237ea4005d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -629,7 +629,7 @@ public class GridPartitionedSingleGetFuture extends 
GridCacheFutureAdapter<Objec
                 setResult(verVal.value(), verVal.version());
             }
             else
-                setResult((CacheObject)res0, null);
+                setResult(res0 == null ? null : 
((CacheVersionedValue)res0).value(), null);
         }
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index acecac9f605..33fd4d522bc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -370,13 +370,13 @@ public class GridDhtAtomicSingleUpdateRequest extends 
GridDhtAtomicAbstractUpdat
 
         switch (writer.state()) {
             case 12:
-                if (!writer.writeMessage(key))
+                if (!writer.writeKeyCacheObject(key))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeMessage(prevVal))
+                if (!writer.writeCacheObject(prevVal))
                     return false;
 
                 writer.incrementState();
@@ -388,7 +388,7 @@ public class GridDhtAtomicSingleUpdateRequest extends 
GridDhtAtomicAbstractUpdat
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeMessage(val))
+                if (!writer.writeCacheObject(val))
                     return false;
 
                 writer.incrementState();
@@ -407,7 +407,7 @@ public class GridDhtAtomicSingleUpdateRequest extends 
GridDhtAtomicAbstractUpdat
 
         switch (reader.state()) {
             case 12:
-                key = reader.readMessage();
+                key = reader.readKeyCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
@@ -415,7 +415,7 @@ public class GridDhtAtomicSingleUpdateRequest extends 
GridDhtAtomicAbstractUpdat
                 reader.incrementState();
 
             case 13:
-                prevVal = reader.readMessage();
+                prevVal = reader.readCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
@@ -431,7 +431,7 @@ public class GridDhtAtomicSingleUpdateRequest extends 
GridDhtAtomicAbstractUpdat
                 reader.incrementState();
 
             case 15:
-                val = reader.readMessage();
+                val = reader.readCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 219ee3acaaf..9c56300a83b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -585,7 +585,7 @@ public class GridDhtAtomicUpdateRequest extends 
GridDhtAtomicAbstractUpdateReque
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeCollection(keys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(keys, 
MessageCollectionItemType.KEY_CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -603,7 +603,7 @@ public class GridDhtAtomicUpdateRequest extends 
GridDhtAtomicAbstractUpdateReque
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeCollection(nearKeys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(nearKeys, 
MessageCollectionItemType.KEY_CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -615,7 +615,7 @@ public class GridDhtAtomicUpdateRequest extends 
GridDhtAtomicAbstractUpdateReque
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeCollection(nearVals, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(nearVals, 
MessageCollectionItemType.CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -627,7 +627,7 @@ public class GridDhtAtomicUpdateRequest extends 
GridDhtAtomicAbstractUpdateReque
                 writer.incrementState();
 
             case 24:
-                if (!writer.writeCollection(prevVals, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(prevVals, 
MessageCollectionItemType.CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -645,7 +645,7 @@ public class GridDhtAtomicUpdateRequest extends 
GridDhtAtomicAbstractUpdateReque
                 writer.incrementState();
 
             case 27:
-                if (!writer.writeCollection(vals, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(vals, 
MessageCollectionItemType.CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -704,7 +704,7 @@ public class GridDhtAtomicUpdateRequest extends 
GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 17:
-                keys = reader.readCollection(MessageCollectionItemType.MSG);
+                keys = 
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -728,7 +728,7 @@ public class GridDhtAtomicUpdateRequest extends 
GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 20:
-                nearKeys = 
reader.readCollection(MessageCollectionItemType.MSG);
+                nearKeys = 
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -744,7 +744,7 @@ public class GridDhtAtomicUpdateRequest extends 
GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 22:
-                nearVals = 
reader.readCollection(MessageCollectionItemType.MSG);
+                nearVals = 
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -760,7 +760,7 @@ public class GridDhtAtomicUpdateRequest extends 
GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 24:
-                prevVals = 
reader.readCollection(MessageCollectionItemType.MSG);
+                prevVals = 
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -784,7 +784,7 @@ public class GridDhtAtomicUpdateRequest extends 
GridDhtAtomicAbstractUpdateReque
                 reader.incrementState();
 
             case 27:
-                vals = reader.readCollection(MessageCollectionItemType.MSG);
+                vals = 
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
index 5a17d498f93..eacb7872beb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java
@@ -188,7 +188,7 @@ public class GridDhtAtomicUpdateResponse extends 
GridCacheIdMessage implements G
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeCollection(nearEvicted, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(nearEvicted, 
MessageCollectionItemType.KEY_CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -229,7 +229,7 @@ public class GridDhtAtomicUpdateResponse extends 
GridCacheIdMessage implements G
                 reader.incrementState();
 
             case 6:
-                nearEvicted = 
reader.readCollection(MessageCollectionItemType.MSG);
+                nearEvicted = 
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 46fcd96ab89..fdb25125b2c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -469,13 +469,13 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeCollection(keys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(keys, 
MessageCollectionItemType.KEY_CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeCollection(vals, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(vals, 
MessageCollectionItemType.CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -550,7 +550,7 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
                 reader.incrementState();
 
             case 17:
-                keys = reader.readCollection(MessageCollectionItemType.MSG);
+                keys = 
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
@@ -558,7 +558,7 @@ public class GridNearAtomicFullUpdateRequest extends 
GridNearAtomicAbstractUpdat
                 reader.incrementState();
 
             case 18:
-                vals = reader.readCollection(MessageCollectionItemType.MSG);
+                vals = 
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 606c59bc1c2..fb15a113c5d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -241,13 +241,13 @@ public class GridNearAtomicSingleUpdateRequest extends 
GridNearAtomicAbstractSin
 
         switch (writer.state()) {
             case 10:
-                if (!writer.writeMessage(key))
+                if (!writer.writeKeyCacheObject(key))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeMessage(val))
+                if (!writer.writeCacheObject(val))
                     return false;
 
                 writer.incrementState();
@@ -266,7 +266,7 @@ public class GridNearAtomicSingleUpdateRequest extends 
GridNearAtomicAbstractSin
 
         switch (reader.state()) {
             case 10:
-                key = reader.readMessage();
+                key = reader.readKeyCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
@@ -274,7 +274,7 @@ public class GridNearAtomicSingleUpdateRequest extends 
GridNearAtomicAbstractSin
                 reader.incrementState();
 
             case 11:
-                val = reader.readMessage();
+                val = reader.readCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
index 20d7e13a851..c3e228b7ad5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java
@@ -158,7 +158,7 @@ public class UpdateErrors implements Message {
                 writer.incrementState();
 
             case 1:
-                if (!writer.writeCollection(failedKeys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(failedKeys, 
MessageCollectionItemType.KEY_CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -182,7 +182,7 @@ public class UpdateErrors implements Message {
                 reader.incrementState();
 
             case 1:
-                failedKeys = 
reader.readCollection(MessageCollectionItemType.MSG);
+                failedKeys = 
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
index e62bbfa2eb9..75fafb77df8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysRequest.java
@@ -169,7 +169,7 @@ public class GridDhtForceKeysRequest extends 
GridCacheIdMessage implements GridC
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeCollection(keys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(keys, 
MessageCollectionItemType.KEY_CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -208,7 +208,7 @@ public class GridDhtForceKeysRequest extends 
GridCacheIdMessage implements GridC
                 reader.incrementState();
 
             case 5:
-                keys = reader.readCollection(MessageCollectionItemType.MSG);
+                keys = 
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
index 71de9f3a307..1611b58e1db 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysResponse.java
@@ -234,7 +234,7 @@ public class GridDhtForceKeysResponse extends 
GridCacheIdMessage implements Grid
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeCollection(missedKeys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(missedKeys, 
MessageCollectionItemType.KEY_CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -285,7 +285,7 @@ public class GridDhtForceKeysResponse extends 
GridCacheIdMessage implements Grid
                 reader.incrementState();
 
             case 8:
-                missedKeys = 
reader.readCollection(MessageCollectionItemType.MSG);
+                missedKeys = 
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
index 7a17010e6e4..863d0aac5d5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java
@@ -369,7 +369,7 @@ public class GridNearGetRequest extends GridCacheIdMessage 
implements GridCacheD
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeCollection(keys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(keys, 
MessageCollectionItemType.KEY_CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -456,7 +456,7 @@ public class GridNearGetRequest extends GridCacheIdMessage 
implements GridCacheD
                 reader.incrementState();
 
             case 8:
-                keys = reader.readCollection(MessageCollectionItemType.MSG);
+                keys = 
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
index ce7413ff835..f3c2bf02281 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java
@@ -309,7 +309,7 @@ public class GridNearSingleGetRequest extends 
GridCacheIdMessage implements Grid
                 reader.incrementState();
 
             case 8:
-                key = reader.readMessage();
+                key = reader.readKeyCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
@@ -385,7 +385,7 @@ public class GridNearSingleGetRequest extends 
GridCacheIdMessage implements Grid
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeMessage(key))
+                if (!writer.writeKeyCacheObject(key))
                     return false;
 
                 writer.incrementState();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
index 4a94a4dace1..2bb1840db43 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryRequest.java
@@ -819,7 +819,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 writer.incrementState();
 
             case 26:
-                if (!writer.writeCollection(skipKeys, 
MessageCollectionItemType.MSG))
+                if (!writer.writeCollection(skipKeys, 
MessageCollectionItemType.KEY_CACHE_OBJECT))
                     return false;
 
                 writer.incrementState();
@@ -1017,7 +1017,7 @@ public class GridCacheQueryRequest extends 
GridCacheIdMessage implements GridCac
                 reader.incrementState();
 
             case 26:
-                skipKeys = 
reader.readCollection(MessageCollectionItemType.MSG);
+                skipKeys = 
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 40b64f74bc3..5f72a0e6a6f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -417,19 +417,19 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMessage(isFiltered() ? null : key))
+                if (!writer.writeKeyCacheObject(isFiltered() ? null : key))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeMessage(isFiltered() ? null : newVal))
+                if (!writer.writeCacheObject(isFiltered() ? null : newVal))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeMessage(isFiltered() ? null : oldVal))
+                if (!writer.writeCacheObject(isFiltered() ? null : oldVal))
                     return false;
 
                 writer.incrementState();
@@ -495,7 +495,7 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 4:
-                key = reader.readMessage();
+                key = reader.readKeyCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
@@ -503,7 +503,7 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 5:
-                newVal = reader.readMessage();
+                newVal = reader.readCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
@@ -511,7 +511,7 @@ public class CacheContinuousQueryEntry implements 
GridCacheDeployable, Message {
                 reader.incrementState();
 
             case 6:
-                oldVal = reader.readMessage();
+                oldVal = reader.readCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
index 6a72111c1e4..ca0a664c697 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java
@@ -1145,7 +1145,7 @@ public class IgniteTxEntry implements 
GridPeerDeployAware, Message {
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeMessage(key))
+                if (!writer.writeKeyCacheObject(key))
                     return false;
 
                 writer.incrementState();
@@ -1247,7 +1247,7 @@ public class IgniteTxEntry implements 
GridPeerDeployAware, Message {
                 reader.incrementState();
 
             case 7:
-                key = reader.readMessage();
+                key = reader.readKeyCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java
index 4adf4ac7754..7b846ca9005 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxEntryValueHolder.java
@@ -188,7 +188,7 @@ public class TxEntryValueHolder implements Message {
                 writer.incrementState();
 
             case 2:
-                if (!writer.writeMessage(hasWriteVal ? val : null))
+                if (!writer.writeCacheObject(hasWriteVal ? val : null))
                     return false;
 
                 writer.incrementState();
@@ -224,7 +224,7 @@ public class TxEntryValueHolder implements Message {
                 reader.incrementState();
 
             case 2:
-                val = reader.readMessage();
+                val = reader.readCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
index 6fb70b200bf..eb8311bff43 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cacheobject/IgniteCacheObjectProcessor.java
@@ -166,20 +166,20 @@ public interface IgniteCacheObjectProcessor extends 
GridProcessor {
         boolean failIfUnregistered);
 
     /**
-     * @param ctx Cache context.
+     * @param ctx Optional cache context. If {@code null} then skip 
umarshalling the byte array.
      * @param type Object type.
      * @param bytes Object bytes.
      * @return Cache object.
      */
-    public CacheObject toCacheObject(CacheObjectContext ctx, byte type, byte[] 
bytes);
+    public CacheObject toCacheObject(@Nullable CacheObjectContext ctx, byte 
type, byte[] bytes);
 
     /**
-     * @param ctx Cache context.
+     * @param ctx Optional cache context. If {@code null} then skip 
umarshalling the byte array.
      * @param type Object type.
      * @param bytes Object bytes.
      * @return Cache object.
      */
-    public KeyCacheObject toKeyCacheObject(CacheObjectContext ctx, byte type, 
byte[] bytes) throws IgniteCheckedException;
+    public KeyCacheObject toKeyCacheObject(@Nullable CacheObjectContext ctx, 
byte type, byte[] bytes) throws IgniteCheckedException;
 
     /**
      * @param ctx Cache context.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java
index b09af8b4009..e6654cfb202 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerEntry.java
@@ -113,13 +113,13 @@ public class DataStreamerEntry implements 
Map.Entry<KeyCacheObject, CacheObject>
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeMessage(key))
+                if (!writer.writeKeyCacheObject(key))
                     return false;
 
                 writer.incrementState();
 
             case 1:
-                if (!writer.writeMessage(val))
+                if (!writer.writeCacheObject(val))
                     return false;
 
                 writer.incrementState();
@@ -135,7 +135,7 @@ public class DataStreamerEntry implements 
Map.Entry<KeyCacheObject, CacheObject>
 
         switch (reader.state()) {
             case 0:
-                key = reader.readMessage();
+                key = reader.readKeyCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
@@ -143,7 +143,7 @@ public class DataStreamerEntry implements 
Map.Entry<KeyCacheObject, CacheObject>
                 reader.incrementState();
 
             case 1:
-                val = reader.readMessage();
+                val = reader.readCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java
 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java
index bbb79143b5d..90e7c81d18a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageCollectionItemType.java
@@ -87,7 +87,13 @@ public enum MessageCollectionItemType {
     MSG,
 
     /** Topology version. */
-    AFFINITY_TOPOLOGY_VERSION;
+    AFFINITY_TOPOLOGY_VERSION,
+
+    /** Key cache object. */
+    KEY_CACHE_OBJECT,
+
+    /** Cache object. */
+    CACHE_OBJECT;
 
     /** Enum values. */
     private static final MessageCollectionItemType[] VALS = values();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
index 6d0ac9d191c..b0f55d1a851 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
@@ -24,6 +24,8 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.lang.IgniteUuid;
 
 /**
@@ -194,6 +196,20 @@ public interface MessageReader {
      */
     public <T extends Message> T readMessage();
 
+    /**
+     * Reads {@link CacheObject}.
+     *
+     * @return Cache object.
+     */
+    public CacheObject readCacheObject();
+
+    /**
+     * Reads {@link KeyCacheObject}.
+     *
+     * @return Key cache object.
+     */
+    public KeyCacheObject readKeyCacheObject();
+
     /**
      * Reads array of objects.
      *
diff --git 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
index 5a13c41bff7..356766bdfc0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageWriter.java
@@ -23,6 +23,8 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.lang.IgniteUuid;
 
 /**
@@ -241,6 +243,22 @@ public interface MessageWriter {
      */
     public boolean writeMessage(Message val);
 
+    /**
+     * Writes {@link CacheObject}.
+     *
+     * @param obj Cache object.
+     * @return Whether value was fully written.
+     */
+    public boolean writeCacheObject(CacheObject obj);
+
+    /**
+     * Writes {@link KeyCacheObject}.
+     *
+     * @param obj Key cache object.
+     * @return Whether value was fully written.
+     */
+    public boolean writeKeyCacheObject(KeyCacheObject obj);
+
     /**
      * Writes array of objects.
      *
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
index a0ee15ca451..5d914886a46 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridAffinityNoCacheSelfTest.java
@@ -28,8 +28,6 @@ import 
org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
 import 
org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl;
 import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
@@ -295,25 +293,5 @@ public class GridAffinityNoCacheSelfTest extends 
GridCommonAbstractTest {
         @Override public void prepareMarshal(CacheObjectValueContext ctx) 
throws IgniteCheckedException {
             throw new UnsupportedOperationException();
         }
-
-        /** {@inheritDoc} */
-        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) 
{
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf, MessageReader 
reader) {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onAckReceived() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public short directType() {
-            throw new UnsupportedOperationException();
-        }
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
index 1dfb759b593..3c07575f8b4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/direct/DirectMarshallingMessagesTest.java
@@ -77,7 +77,7 @@ public class DirectMarshallingMessagesTest extends 
GridCommonAbstractTest {
         T resMsg = (T)msgFactory.create(type);
 
         boolean fullyRead = loopBuffer(buf, buf.position(),
-            buf0 -> resMsg.readFrom(buf0, new 
DirectMessageReader(msgFactory)));
+            buf0 -> resMsg.readFrom(buf0, new DirectMessageReader(msgFactory, 
null)));
         assertTrue("The message was not read completely.", fullyRead);
 
         return resMsg;
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java
index 44f44be77f7..cb6c40576fe 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/direct/stream/DirectByteBufferStreamImplByteOrderSelfTest.java
@@ -101,7 +101,7 @@ public class DirectByteBufferStreamImplByteOrderSelfTest {
             @Override public MessageSerializer serializer(short type) {
                 return null;
             }
-        });
+        }, null);
 
         stream.setBuffer(buff);
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractCommunicationMessageSerializationTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractCommunicationMessageSerializationTest.java
index 86f577dccf2..6665d64a507 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractCommunicationMessageSerializationTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/AbstractCommunicationMessageSerializationTest.java
@@ -24,6 +24,8 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
@@ -272,6 +274,16 @@ public abstract class 
AbstractCommunicationMessageSerializationTest {
             return writeField(AffinityTopologyVersion.class);
         }
 
+        /** {@inheritDoc} */
+        @Override public boolean writeCacheObject(CacheObject obj) {
+            return writeField(CacheObject.class);
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean writeKeyCacheObject(KeyCacheObject obj) {
+            return writeField(KeyCacheObject.class);
+        }
+
         /** {@inheritDoc} */
         @Override public boolean writeMessage(Message val) {
             return writeField(Message.class);
@@ -511,6 +523,20 @@ public abstract class 
AbstractCommunicationMessageSerializationTest {
             return null;
         }
 
+        /** {@inheritDoc} */
+        @Override public CacheObject readCacheObject() {
+            readField(CacheObject.class);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public KeyCacheObject readKeyCacheObject() {
+            readField(KeyCacheObject.class);
+
+            return null;
+        }
+
         /** {@inheritDoc} */
         @Override public <T> T[] readObjectArray(MessageCollectionItemType 
itemType, Class<T> itemCls) {
             readField(Object[].class);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
index d26642f7423..ca704daa33b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteIncompleteCacheObjectSelfTest.java
@@ -20,8 +20,6 @@ package org.apache.ignite.internal.processors.cache;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
 import org.junit.Assert;
@@ -160,25 +158,5 @@ public class IgniteIncompleteCacheObjectSelfTest extends 
GridCommonAbstractTest
         @Override public void prepareMarshal(final CacheObjectValueContext 
ctx) throws IgniteCheckedException {
             // No-op
         }
-
-        /** {@inheritDoc} */
-        @Override public boolean writeTo(final ByteBuffer buf, final 
MessageWriter writer) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean readFrom(final ByteBuffer buf, final 
MessageReader reader) {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public short directType() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onAckReceived() {
-            // No-op
-        }
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
index d8d9a19a444..777bea67a89 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/IgniteCacheContinuousQueryImmutableEntryTest.java
@@ -158,7 +158,7 @@ public class IgniteCacheContinuousQueryImmutableEntryTest 
extends GridCommonAbst
         e0.writeTo(buf, writer);
 
         CacheContinuousQueryEntry e1 = new CacheContinuousQueryEntry();
-        e1.readFrom(ByteBuffer.wrap(buf.array()), new 
DirectMessageReader(msgFactory));
+        e1.readFrom(ByteBuffer.wrap(buf.array()), new 
DirectMessageReader(msgFactory, null));
 
         assertEquals(e0.cacheId(), e1.cacheId());
         assertEquals(e0.eventType(), e1.eventType());
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
index 9090918f9be..d714f666aac 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/CacheFreeListSelfTest.java
@@ -53,8 +53,6 @@ import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.CacheVers
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.metric.GridMetricManager;
 import 
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.spi.metric.noop.NoopMetricExporterSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
@@ -736,31 +734,5 @@ public class CacheFreeListSelfTest extends 
GridCommonAbstractTest {
         @Override public void prepareMarshal(CacheObjectValueContext ctx) 
throws IgniteCheckedException {
             assert false;
         }
-
-        /** {@inheritDoc} */
-        @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) 
{
-            assert false;
-
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean readFrom(ByteBuffer buf, MessageReader 
reader) {
-            assert false;
-
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public short directType() {
-            assert false;
-
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onAckReceived() {
-            assert false;
-        }
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 253abf97fcf..56852ea365a 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -548,7 +548,7 @@ public class GridSpiTestContext implements IgniteSpiContext 
{
                 }
 
                 @Override public MessageReader reader(UUID rmtNodeId, 
MessageFactory msgFactory) {
-                    return new DirectMessageReader(msgFactory);
+                    return new DirectMessageReader(msgFactory, null);
                 }
             };
         }
diff --git a/modules/core/src/test/resources/codegen/TestMessage.java 
b/modules/core/src/test/resources/codegen/TestMessage.java
index 78b99dbede3..4e19fe0809c 100644
--- a/modules/core/src/test/resources/codegen/TestMessage.java
+++ b/modules/core/src/test/resources/codegen/TestMessage.java
@@ -21,6 +21,8 @@ import java.lang.String;
 import java.util.UUID;
 import java.util.BitSet;
 import java.nio.ByteBuffer;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.lang.IgniteUuid;
@@ -65,6 +67,12 @@ public class TestMessage implements Message {
     @Order(value = 11, method = "overridenFieldMethod")
     private String field;
 
+    @Order(value = 12)
+    private KeyCacheObject keyCacheObject;
+
+    @Order(value = 13)
+    private CacheObject cacheObject;
+
     public int id() {
         return id;
     }
@@ -161,6 +169,22 @@ public class TestMessage implements Message {
         this.field = field;
     }
 
+    public KeyCacheObject keyCacheObject() {
+        return keyCacheObject;
+    }
+
+    public void keyCacheObject(KeyCacheObject keyCacheObject) {
+        this.keyCacheObject = keyCacheObject;
+    }
+
+    public CacheObject cacheObject() {
+        return cacheObject;
+    }
+
+    public void cacheObject(CacheObject cacheObject) {
+        this.cacheObject = cacheObject;
+    }
+
     public short directType() {
         return 0;
     }
diff --git a/modules/core/src/test/resources/codegen/TestMessageSerializer.java 
b/modules/core/src/test/resources/codegen/TestMessageSerializer.java
index 3d5a0b7e1f5..8ca1f2be5d6 100644
--- a/modules/core/src/test/resources/codegen/TestMessageSerializer.java
+++ b/modules/core/src/test/resources/codegen/TestMessageSerializer.java
@@ -118,6 +118,18 @@ public class TestMessageSerializer implements 
MessageSerializer {
 
                 writer.incrementState();
 
+            case 12:
+                if (!writer.writeKeyCacheObject(msg.keyCacheObject()))
+                    return false;
+
+                writer.incrementState();
+
+            case 13:
+                if (!writer.writeCacheObject(msg.cacheObject()))
+                    return false;
+
+                writer.incrementState();
+
         }
 
         return true;
@@ -226,6 +238,21 @@ public class TestMessageSerializer implements 
MessageSerializer {
 
                 reader.incrementState();
 
+            case 12:
+                msg.keyCacheObject(reader.readKeyCacheObject());
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
+                msg.cacheObject(reader.readCacheObject());
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
         }
 
         return true;
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java
index 429baa914f0..c0d4634556e 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2CacheObject.java
@@ -69,7 +69,7 @@ public class GridH2CacheObject extends GridH2ValueMessage {
 
         switch (reader.state()) {
             case 0:
-                obj = reader.readMessage();
+                obj = reader.readCacheObject();
 
                 if (!reader.isLastRead())
                     return false;
@@ -97,7 +97,7 @@ public class GridH2CacheObject extends GridH2ValueMessage {
 
         switch (writer.state()) {
             case 0:
-                if (!writer.writeMessage(obj))
+                if (!writer.writeCacheObject(obj))
                     return false;
 
                 writer.incrementState();

Reply via email to