This is an automated email from the ASF dual-hosted git repository.

shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 06301d50d95 IGNITE-26813 Refactor QueryStartResponse, FragmentMapping, 
ColocationGroup (#12454)
06301d50d95 is described below

commit 06301d50d95c39297ad357853b1fd3599a0d3784
Author: Vladimir Steshin <[email protected]>
AuthorDate: Fri Oct 31 11:25:35 2025 +0300

    IGNITE-26813 Refactor QueryStartResponse, FragmentMapping, ColocationGroup 
(#12454)
---
 .../query/calcite/exec/ExecutionServiceImpl.java   |   2 +-
 .../query/calcite/message/MessageType.java         |   9 +-
 .../query/calcite/message/QueryStartResponse.java  | 135 ++++----------
 .../query/calcite/metadata/ColocationGroup.java    | 198 ++++++++-------------
 .../calcite/metadata/FragmentDescription.java      |  14 +-
 .../query/calcite/metadata/FragmentMapping.java    | 125 ++++---------
 .../ignite/internal/GridJobExecuteRequest.java     |   2 +-
 7 files changed, 151 insertions(+), 334 deletions(-)

diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 4c4f8ff323e..23b6424db39 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -854,7 +854,7 @@ public class ExecutionServiceImpl<Row> extends 
AbstractService implements Execut
 
         if (!qry.isExchangeWithInitNodeStarted(ectx.fragmentId())) {
             try {
-                messageService().send(origNodeId, new 
QueryStartResponse(qry.id(), ectx.fragmentId()));
+                messageService().send(origNodeId, new 
QueryStartResponse(qry.id(), ectx.fragmentId(), null));
             }
             catch (IgniteCheckedException e) {
                 IgniteException wrpEx = new IgniteException("Failed to send 
reply. [nodeId=" + origNodeId + ']', e);
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
index a864eb92775..e8932e7dab9 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
@@ -18,10 +18,13 @@
 package org.apache.ignite.internal.processors.query.calcite.message;
 
 import java.util.function.Supplier;
+import org.apache.ignite.internal.codegen.ColocationGroupSerializer;
 import org.apache.ignite.internal.codegen.FragmentDescriptionSerializer;
+import org.apache.ignite.internal.codegen.FragmentMappingSerializer;
 import org.apache.ignite.internal.codegen.InboxCloseMessageSerializer;
 import 
org.apache.ignite.internal.codegen.QueryBatchAcknowledgeMessageSerializer;
 import org.apache.ignite.internal.codegen.QueryCloseMessageSerializer;
+import org.apache.ignite.internal.codegen.QueryStartResponseSerializer;
 import org.apache.ignite.internal.codegen.QueryTxEntrySerializer;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
 import 
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
@@ -36,7 +39,7 @@ public enum MessageType {
     QUERY_START_REQUEST(300, QueryStartRequest::new),
 
     /** */
-    QUERY_START_RESPONSE(301, QueryStartResponse::new),
+    QUERY_START_RESPONSE(301, QueryStartResponse::new, new 
QueryStartResponseSerializer()),
 
     /** */
     QUERY_ERROR_MESSAGE(302, ErrorMessage::new),
@@ -57,10 +60,10 @@ public enum MessageType {
     GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new),
 
     /** */
-    FRAGMENT_MAPPING(350, FragmentMapping::new),
+    FRAGMENT_MAPPING(350, FragmentMapping::new, new 
FragmentMappingSerializer()),
 
     /** */
-    COLOCATION_GROUP(351, ColocationGroup::new),
+    COLOCATION_GROUP(351, ColocationGroup::new, new 
ColocationGroupSerializer()),
 
     /** */
     FRAGMENT_DESCRIPTION(352, FragmentDescription::new, new 
FragmentDescriptionSerializer()),
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
index 340a8637330..5df60d2d780 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
@@ -17,52 +17,49 @@
 
 package org.apache.ignite.internal.processors.query.calcite.message;
 
-import java.nio.ByteBuffer;
 import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
+import org.jetbrains.annotations.Nullable;
 
 /**
  *
  */
-public class QueryStartResponse implements MarshalableMessage {
+public class QueryStartResponse implements CalciteMessage {
     /** */
-    private UUID queryId;
+    @Order(value = 0, method = "queryId")
+    private UUID qryId;
 
     /** */
+    @Order(1)
     private long fragmentId;
 
     /** */
-    @GridDirectTransient
-    private Throwable error;
-
-    /** */
-    private byte[] errBytes;
+    @Order(value = 2, method = "errorMessage")
+    private @Nullable ErrorMessage errMsg;
 
     /** */
     public QueryStartResponse() {}
 
     /** */
-    public QueryStartResponse(UUID queryId, long fragmentId) {
-        this(queryId, fragmentId, null);
-    }
-
-    /** */
-    public QueryStartResponse(UUID queryId, long fragmentId, Throwable error) {
-        this.queryId = queryId;
+    public QueryStartResponse(UUID qryId, long fragmentId, @Nullable Throwable 
error) {
+        this.qryId = qryId;
         this.fragmentId = fragmentId;
-        this.error = error;
+
+        if (error != null)
+            errMsg = new ErrorMessage(error);
     }
 
     /**
      * @return Query ID.
      */
     public UUID queryId() {
-        return queryId;
+        return qryId;
+    }
+
+    /** */
+    public void queryId(UUID qryId) {
+        this.qryId = qryId;
     }
 
     /**
@@ -72,92 +69,26 @@ public class QueryStartResponse implements 
MarshalableMessage {
         return fragmentId;
     }
 
+    /** */
+    public void fragmentId(long fragmentId) {
+        this.fragmentId = fragmentId;
+    }
+
     /**
      * @return Error.
      */
-    public Throwable error() {
-        return error;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        if (error != null)
-            errBytes = U.marshal(ctx, error);
+    public @Nullable Throwable error() {
+        return ErrorMessage.error(errMsg);
     }
 
-    /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) 
throws IgniteCheckedException {
-        if (errBytes != null)
-            error = U.unmarshal(ctx, errBytes, 
U.resolveClassLoader(ctx.gridConfig()));
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeByteArray(errBytes))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeLong(fragmentId))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeUuid(queryId))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
+    /** */
+    public @Nullable ErrorMessage errorMessage() {
+        return errMsg;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        switch (reader.state()) {
-            case 0:
-                errBytes = reader.readByteArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 1:
-                fragmentId = reader.readLong();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-            case 2:
-                queryId = reader.readUuid();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
+    /** */
+    public void errorMessage(@Nullable ErrorMessage errMsg) {
+        this.errMsg = errMsg;
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
index 6ec4a4e1276..0f78c8fe90b 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -17,7 +17,6 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -30,42 +29,38 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.Order;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import 
org.apache.ignite.internal.processors.query.calcite.message.MarshalableMessage;
+import 
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.GridIntIterator;
 import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
 
 /** */
-public class ColocationGroup implements MarshalableMessage {
+public class ColocationGroup implements CalciteMessage {
     /** */
-    private long[] sourceIds;
+    @Order(value = 0, method = "sourceIds")
+    private long[] srcIds;
 
     /** */
-    @GridDirectCollection(UUID.class)
+    @Order(1)
     private List<UUID> nodeIds;
 
     /** */
-    @GridDirectTransient
     private List<List<UUID>> assignments;
 
     /**
      * Flag, indacating that assignment is formed by original cache assignment 
for given topology.
      * In case of {@code true} value we can skip assignment marshalling and 
calc assignment on remote nodes.
      */
-    @GridDirectTransient
     private boolean primaryAssignment;
 
-    /** Marshalled assignments. */
+    /** Marshalled assignments serialization call holder. */
+    @Order(2)
     private int[] marshalledAssignments;
 
     /** */
@@ -79,8 +74,8 @@ public class ColocationGroup implements MarshalableMessage {
     }
 
     /** */
-    public static ColocationGroup forSourceId(long sourceId) {
-        return new ColocationGroup(new long[] {sourceId}, null, null);
+    public static ColocationGroup forSourceId(long srcId) {
+        return new ColocationGroup(new long[] {srcId}, null, null);
     }
 
     /** */
@@ -92,7 +87,7 @@ public class ColocationGroup implements MarshalableMessage {
                     .collect(Collectors.toList());
         }
 
-        return new ColocationGroup(sourceIds == null ? null : 
Arrays.copyOf(sourceIds, sourceIds.length),
+        return new ColocationGroup(srcIds == null ? null : 
Arrays.copyOf(srcIds, srcIds.length),
             Collections.singletonList(nodeId), locAssignments);
     }
 
@@ -101,15 +96,15 @@ public class ColocationGroup implements MarshalableMessage 
{
     }
 
     /** */
-    private ColocationGroup(long[] sourceIds, List<UUID> nodeIds, 
List<List<UUID>> assignments) {
-        this.sourceIds = sourceIds;
+    private ColocationGroup(long[] srcIds, List<UUID> nodeIds, 
List<List<UUID>> assignments) {
+        this.srcIds = srcIds;
         this.nodeIds = nodeIds;
         this.assignments = assignments;
     }
 
     /** */
-    private ColocationGroup(long[] sourceIds, List<UUID> nodeIds, 
List<List<UUID>> assignments, boolean primaryAssignment) {
-        this(sourceIds, nodeIds, assignments);
+    private ColocationGroup(long[] srcIds, List<UUID> nodeIds, 
List<List<UUID>> assignments, boolean primaryAssignment) {
+        this(srcIds, nodeIds, assignments);
 
         this.primaryAssignment = primaryAssignment;
     }
@@ -121,6 +116,21 @@ public class ColocationGroup implements MarshalableMessage 
{
         return nodeIds == null ? Collections.emptyList() : nodeIds;
     }
 
+    /** */
+    public void nodeIds(List<UUID> nodeIds) {
+        this.nodeIds = nodeIds;
+    }
+
+    /** */
+    public long[] sourceIds() {
+        return srcIds;
+    }
+
+    /** */
+    public void sourceIds(long[] srcIds) {
+        this.srcIds = srcIds;
+    }
+
     /**
      * @return List of partitions (index) and nodes (items) having an 
appropriate partition in
      * {@link GridDhtPartitionState#OWNING} state, calculated for distributed 
tables, involved in query execution.
@@ -136,12 +146,12 @@ public class ColocationGroup implements 
MarshalableMessage {
     }
 
     /** */
-    public boolean belongs(long sourceId) {
-        if (sourceIds == null)
+    public boolean belongs(long srcId) {
+        if (srcIds == null)
             return false;
 
-        for (long i : sourceIds) {
-            if (i == sourceId)
+        for (long i : srcIds) {
+            if (i == srcId)
                 return true;
         }
 
@@ -157,10 +167,10 @@ public class ColocationGroup implements 
MarshalableMessage {
      */
     public ColocationGroup colocate(ColocationGroup other) throws 
ColocationMappingException {
         long[] srcIds;
-        if (sourceIds == null || other.sourceIds == null)
-            srcIds = U.firstNotNull(sourceIds, other.sourceIds);
+        if (this.srcIds == null || other.srcIds == null)
+            srcIds = U.firstNotNull(this.srcIds, other.srcIds);
         else
-            srcIds = LongStream.concat(Arrays.stream(sourceIds), 
Arrays.stream(other.sourceIds)).distinct().toArray();
+            srcIds = LongStream.concat(Arrays.stream(this.srcIds), 
Arrays.stream(other.srcIds)).distinct().toArray();
 
         List<UUID> nodeIds;
         if (this.nodeIds == null || other.nodeIds == null)
@@ -241,7 +251,7 @@ public class ColocationGroup implements MarshalableMessage {
             assignments.add(first != null ? Collections.singletonList(first) : 
Collections.emptyList());
         }
 
-        return new ColocationGroup(sourceIds, new ArrayList<>(nodes), 
assignments, primaryAssignment);
+        return new ColocationGroup(srcIds, new ArrayList<>(nodes), 
assignments, primaryAssignment);
     }
 
     /** */
@@ -250,7 +260,7 @@ public class ColocationGroup implements MarshalableMessage {
             return this;
 
         // Make a shallow copy without cacheAssignment flag.
-        return new ColocationGroup(sourceIds, nodeIds, assignments, false);
+        return new ColocationGroup(srcIds, nodeIds, assignments, false);
     }
 
     /** */
@@ -277,7 +287,7 @@ public class ColocationGroup implements MarshalableMessage {
                     assignments.add(Collections.emptyList());
             }
 
-            return new ColocationGroup(sourceIds, new ArrayList<>(nodes), 
assignments);
+            return new ColocationGroup(srcIds, new ArrayList<>(nodes), 
assignments);
         }
 
         return this;
@@ -285,7 +295,7 @@ public class ColocationGroup implements MarshalableMessage {
 
     /** */
     public ColocationGroup mapToNodes(List<UUID> nodeIds) {
-        return !F.isEmpty(this.nodeIds) ? this : new 
ColocationGroup(sourceIds, nodeIds, null);
+        return !F.isEmpty(this.nodeIds) ? this : new ColocationGroup(srcIds, 
nodeIds, null);
     }
 
     /**
@@ -314,118 +324,54 @@ public class ColocationGroup implements 
MarshalableMessage {
         return MessageType.COLOCATION_GROUP;
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeIntArray(marshalledAssignments))
-                    return false;
-
-                writer.incrementState();
-
-            case 1:
-                if (!writer.writeCollection(nodeIds, 
MessageCollectionItemType.UUID))
-                    return false;
-
-                writer.incrementState();
-
-            case 2:
-                if (!writer.writeLongArray(sourceIds))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        switch (reader.state()) {
-            case 0:
-                marshalledAssignments = reader.readIntArray();
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
+    /** Significantly compacts and fastens UUIDs marshalling. */
+    public @Nullable int[] marshalledAssignments() {
+        if (assignments == null || primaryAssignment)
+            return null;
 
-            case 1:
-                nodeIds = 
reader.readCollection(MessageCollectionItemType.UUID);
+        Map<UUID, Integer> nodeIdxs = new HashMap<>();
 
-                if (!reader.isLastRead())
-                    return false;
+        for (int i = 0; i < nodeIds.size(); i++)
+            nodeIdxs.put(nodeIds.get(i), i);
 
-                reader.incrementState();
+        int bitsPerPart = Integer.SIZE - 
Integer.numberOfLeadingZeros(nodeIds.size());
 
-            case 2:
-                sourceIds = reader.readLongArray();
+        CompactedIntArray.Builder builder = 
CompactedIntArray.builder(bitsPerPart, assignments.size());
 
-                if (!reader.isLastRead())
-                    return false;
+        for (List<UUID> assignment : assignments) {
+            assert F.isEmpty(assignment) || assignment.size() == 1;
 
-                reader.incrementState();
+            if (F.isEmpty(assignment))
+                builder.add(nodeIds.size());
+            else {
+                Integer nodeIdx = nodeIdxs.get(assignment.get(0));
 
+                builder.add(nodeIdx);
+            }
         }
 
-        return true;
+        return builder.build().buffer();
     }
 
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) {
-        if (assignments != null && marshalledAssignments == null && 
!primaryAssignment) {
-            Map<UUID, Integer> nodeIdxs = new HashMap<>();
-
-            for (int i = 0; i < nodeIds.size(); i++)
-                nodeIdxs.put(nodeIds.get(i), i);
-
-            int bitsPerPart = Integer.SIZE - 
Integer.numberOfLeadingZeros(nodeIds.size());
-
-            CompactedIntArray.Builder builder = 
CompactedIntArray.builder(bitsPerPart, assignments.size());
+    /** Significantly compacts and fastens UUIDs unmarshalling. */
+    public void marshalledAssignments(@Nullable int[] marshalledAssignments) {
+        if (F.isEmpty(marshalledAssignments)) {
+            assignments = null;
 
-            for (List<UUID> assignment : assignments) {
-                assert F.isEmpty(assignment) || assignment.size() == 1;
-
-                if (F.isEmpty(assignment))
-                    builder.add(nodeIds.size());
-                else {
-                    Integer nodeIdx = nodeIdxs.get(assignment.get(0));
-
-                    builder.add(nodeIdx);
-                }
-            }
-
-            marshalledAssignments = builder.build().buffer();
+            return;
         }
-    }
 
-    /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) {
-        if (marshalledAssignments != null && assignments == null) {
-            int bitsPerPart = Integer.SIZE - 
Integer.numberOfLeadingZeros(nodeIds.size());
+        int bitsPerPart = Integer.SIZE - 
Integer.numberOfLeadingZeros(nodeIds.size());
 
-            CompactedIntArray compactedArr = CompactedIntArray.of(bitsPerPart, 
marshalledAssignments);
+        CompactedIntArray compactedArr = CompactedIntArray.of(bitsPerPart, 
marshalledAssignments);
 
-            assignments = new ArrayList<>(compactedArr.size());
+        assignments = new ArrayList<>(compactedArr.size());
 
-            for (GridIntIterator iter = compactedArr.iterator(); 
iter.hasNext(); ) {
-                int nodeIdx = iter.next();
+        for (GridIntIterator iter = compactedArr.iterator(); iter.hasNext(); ) 
{
+            int nodeIdx = iter.next();
 
-                assignments.add(nodeIdx >= nodeIds.size() ? 
Collections.emptyList() :
-                    Collections.singletonList(nodeIds.get(nodeIdx)));
-            }
+            assignments.add(nodeIdx >= nodeIds.size() ? 
Collections.emptyList() :
+                Collections.singletonList(nodeIds.get(nodeIdx)));
         }
     }
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
index 6127e741546..881019b5ed6 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
@@ -119,15 +119,9 @@ public class FragmentDescription implements 
MarshalableMessage {
 
     /** {@inheritDoc} */
     @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) {
-        if (mapping != null)
-            mapping.prepareMarshal(ctx);
-
-        if (target != null) {
+        if (target != null)
             target = target.explicitMapping();
 
-            target.prepareMarshal(ctx);
-        }
-
         if (remoteSources0 == null && remoteSources != null) {
             remoteSources0 = U.newHashMap(remoteSources.size());
 
@@ -138,12 +132,6 @@ public class FragmentDescription implements 
MarshalableMessage {
 
     /** {@inheritDoc} */
     @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) {
-        if (mapping != null)
-            mapping.prepareUnmarshal(ctx);
-
-        if (target != null)
-            target.prepareUnmarshal(ctx);
-
         if (remoteSources == null && remoteSources0 != null) {
             remoteSources = U.newHashMap(remoteSources0.size());
 
diff --git 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
index 16df8d7d3ef..5450d9fcaec 100644
--- 
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
+++ 
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
@@ -17,45 +17,40 @@
 
 package org.apache.ignite.internal.processors.query.calcite.metadata;
 
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import 
org.apache.ignite.internal.processors.query.calcite.message.MarshalableMessage;
+import org.apache.ignite.internal.Order;
+import 
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
 import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
 import org.apache.ignite.internal.processors.query.calcite.util.Commons;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import 
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
 
 /**
  *
  */
-public class FragmentMapping implements MarshalableMessage {
+public class FragmentMapping implements CalciteMessage {
     /** */
-    @GridDirectCollection(ColocationGroup.class)
-    private List<ColocationGroup> colocationGroups;
+    @Order(value = 0, method = "colocationGroups")
+    private List<ColocationGroup> colocationGrps;
 
     /** */
     public FragmentMapping() {
     }
 
     /** */
-    private FragmentMapping(ColocationGroup colocationGroup) {
-        this(F.asList(colocationGroup));
+    private FragmentMapping(ColocationGroup colocationGrp) {
+        this(F.asList(colocationGrp));
     }
 
     /** */
-    private FragmentMapping(List<ColocationGroup> colocationGroups) {
-        this.colocationGroups = colocationGroups;
+    private FragmentMapping(List<ColocationGroup> colocationGrps) {
+        this.colocationGrps = colocationGrps;
     }
 
     /** */
@@ -69,14 +64,14 @@ public class FragmentMapping implements MarshalableMessage {
     }
 
     /** */
-    public static FragmentMapping create(long sourceId) {
-        return new FragmentMapping(ColocationGroup.forSourceId(sourceId));
+    public static FragmentMapping create(long srcId) {
+        return new FragmentMapping(ColocationGroup.forSourceId(srcId));
     }
 
     /** */
-    public static FragmentMapping create(long sourceId, ColocationGroup group) 
{
+    public static FragmentMapping create(long srcId, ColocationGroup grp) {
         try {
-            return new 
FragmentMapping(ColocationGroup.forSourceId(sourceId).colocate(group));
+            return new 
FragmentMapping(ColocationGroup.forSourceId(srcId).colocate(grp));
         }
         catch (ColocationMappingException e) {
             throw new AssertionError(e); // Cannot happen
@@ -85,20 +80,20 @@ public class FragmentMapping implements MarshalableMessage {
 
     /** */
     public boolean colocated() {
-        return colocationGroups.isEmpty() || colocationGroups.size() == 1;
+        return colocationGrps.isEmpty() || colocationGrps.size() == 1;
     }
 
     /** */
     public FragmentMapping combine(FragmentMapping other) {
-        return new FragmentMapping(Commons.combine(colocationGroups, 
other.colocationGroups));
+        return new FragmentMapping(Commons.combine(colocationGrps, 
other.colocationGrps));
     }
 
     /** */
     public FragmentMapping colocate(FragmentMapping other) throws 
ColocationMappingException {
         assert colocated() && other.colocated();
 
-        ColocationGroup first = F.first(colocationGroups);
-        ColocationGroup second = F.first(other.colocationGroups);
+        ColocationGroup first = F.first(colocationGrps);
+        ColocationGroup second = F.first(other.colocationGrps);
 
         if (first == null && second == null)
             return this;
@@ -110,33 +105,38 @@ public class FragmentMapping implements 
MarshalableMessage {
 
     /** */
     public FragmentMapping local(UUID nodeId) throws 
ColocationMappingException {
-        if (colocationGroups.isEmpty())
+        if (colocationGrps.isEmpty())
             return create(nodeId).colocate(this);
 
-        return new FragmentMapping(Commons.transform(colocationGroups, c -> 
c.local(nodeId)));
+        return new FragmentMapping(Commons.transform(colocationGrps, c -> 
c.local(nodeId)));
     }
 
     /** */
     public List<UUID> nodeIds() {
-        return colocationGroups.stream()
+        return colocationGrps.stream()
             .flatMap(g -> g.nodeIds().stream())
             .distinct().collect(Collectors.toList());
     }
 
     /** */
     public List<ColocationGroup> colocationGroups() {
-        return Collections.unmodifiableList(colocationGroups);
+        return colocationGrps == null ? Collections.emptyList() : 
Collections.unmodifiableList(colocationGrps);
     }
 
     /** */
-    public FragmentMapping finalizeMapping(Supplier<List<UUID>> nodesSource) {
-        if (colocationGroups.isEmpty())
+    public void colocationGroups(List<ColocationGroup> colocationGrps) {
+        this.colocationGrps = colocationGrps;
+    }
+
+    /** */
+    public FragmentMapping finalizeMapping(Supplier<List<UUID>> nodesSrc) {
+        if (colocationGrps.isEmpty())
             return this;
 
-        List<ColocationGroup> colocationGrps = this.colocationGroups;
+        List<ColocationGroup> colocationGrps = this.colocationGrps;
 
         colocationGrps = Commons.transform(colocationGrps, 
ColocationGroup::finalizeMapping);
-        List<UUID> nodes = nodeIds(), nodes0 = nodes.isEmpty() ? 
nodesSource.get() : nodes;
+        List<UUID> nodes = nodeIds(), nodes0 = nodes.isEmpty() ? 
nodesSrc.get() : nodes;
         colocationGrps = Commons.transform(colocationGrps, g -> 
g.mapToNodes(nodes0));
 
         return new FragmentMapping(colocationGrps);
@@ -144,7 +144,7 @@ public class FragmentMapping implements MarshalableMessage {
 
     /** */
     public FragmentMapping filterByPartitions(int[] parts) throws 
ColocationMappingException {
-        List<ColocationGroup> colocationGrps = this.colocationGroups;
+        List<ColocationGroup> colocationGrps = this.colocationGrps;
 
         if (!F.isEmpty(parts) && colocationGrps.size() > 1)
             throw new ColocationMappingException("Execution of non-collocated 
query with partition parameter is not possible");
@@ -155,15 +155,15 @@ public class FragmentMapping implements 
MarshalableMessage {
     }
 
     /** */
-    public @NotNull ColocationGroup findGroup(long sourceId) {
-        List<ColocationGroup> grps = colocationGroups.stream()
-            .filter(c -> c.belongs(sourceId))
+    public @NotNull ColocationGroup findGroup(long srcId) {
+        List<ColocationGroup> grps = colocationGrps.stream()
+            .filter(c -> c.belongs(srcId))
             .collect(Collectors.toList());
 
         if (grps.isEmpty())
-            throw new IllegalStateException("Failed to find group with given 
id. [sourceId=" + sourceId + "]");
+            throw new IllegalStateException("Failed to find group with given 
id. [sourceId=" + srcId + "]");
         else if (grps.size() > 1)
-            throw new IllegalStateException("Multiple groups with the same id 
found. [sourceId=" + sourceId + "]");
+            throw new IllegalStateException("Multiple groups with the same id 
found. [sourceId=" + srcId + "]");
 
         return F.first(grps);
     }
@@ -174,63 +174,12 @@ public class FragmentMapping implements 
MarshalableMessage {
 
         srcIds.forEach(srcId -> explicitMappingGrps.add(findGroup(srcId)));
 
-        return new FragmentMapping(Commons.transform(colocationGroups,
+        return new FragmentMapping(Commons.transform(colocationGrps,
             g -> explicitMappingGrps.contains(g) ? g.explicitMapping() : g));
     }
 
-    /** {@inheritDoc} */
-    @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) {
-        colocationGroups.forEach(g -> g.prepareMarshal(ctx));
-    }
-
-    /** {@inheritDoc} */
-    @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) {
-        colocationGroups.forEach(g -> g.prepareUnmarshal(ctx));
-    }
-
     /** {@inheritDoc} */
     @Override public MessageType type() {
         return MessageType.FRAGMENT_MAPPING;
     }
-
-    /** {@inheritDoc} */
-    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
-        writer.setBuffer(buf);
-
-        if (!writer.isHeaderWritten()) {
-            if (!writer.writeHeader(directType()))
-                return false;
-
-            writer.onHeaderWritten();
-        }
-
-        switch (writer.state()) {
-            case 0:
-                if (!writer.writeCollection(colocationGroups, 
MessageCollectionItemType.MSG))
-                    return false;
-
-                writer.incrementState();
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
-        reader.setBuffer(buf);
-
-        switch (reader.state()) {
-            case 0:
-                colocationGroups = 
reader.readCollection(MessageCollectionItemType.MSG);
-
-                if (!reader.isLastRead())
-                    return false;
-
-                reader.incrementState();
-
-        }
-
-        return true;
-    }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
index 5c0739e0065..1ba24925810 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
@@ -267,7 +267,7 @@ public class GridJobExecuteRequest implements 
ExecutorAwareMessage {
 
         this.cpSpi = cpSpi == null || cpSpi.isEmpty() ? null : cpSpi;
     }
-    
+
     /**
      * @return Task session ID.
      */


Reply via email to