This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 06301d50d95 IGNITE-26813 Refactor QueryStartResponse, FragmentMapping,
ColocationGroup (#12454)
06301d50d95 is described below
commit 06301d50d95c39297ad357853b1fd3599a0d3784
Author: Vladimir Steshin <[email protected]>
AuthorDate: Fri Oct 31 11:25:35 2025 +0300
IGNITE-26813 Refactor QueryStartResponse, FragmentMapping, ColocationGroup
(#12454)
---
.../query/calcite/exec/ExecutionServiceImpl.java | 2 +-
.../query/calcite/message/MessageType.java | 9 +-
.../query/calcite/message/QueryStartResponse.java | 135 ++++----------
.../query/calcite/metadata/ColocationGroup.java | 198 ++++++++-------------
.../calcite/metadata/FragmentDescription.java | 14 +-
.../query/calcite/metadata/FragmentMapping.java | 125 ++++---------
.../ignite/internal/GridJobExecuteRequest.java | 2 +-
7 files changed, 151 insertions(+), 334 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 4c4f8ff323e..23b6424db39 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -854,7 +854,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
if (!qry.isExchangeWithInitNodeStarted(ectx.fragmentId())) {
try {
- messageService().send(origNodeId, new
QueryStartResponse(qry.id(), ectx.fragmentId()));
+ messageService().send(origNodeId, new
QueryStartResponse(qry.id(), ectx.fragmentId(), null));
}
catch (IgniteCheckedException e) {
IgniteException wrpEx = new IgniteException("Failed to send
reply. [nodeId=" + origNodeId + ']', e);
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
index a864eb92775..e8932e7dab9 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
@@ -18,10 +18,13 @@
package org.apache.ignite.internal.processors.query.calcite.message;
import java.util.function.Supplier;
+import org.apache.ignite.internal.codegen.ColocationGroupSerializer;
import org.apache.ignite.internal.codegen.FragmentDescriptionSerializer;
+import org.apache.ignite.internal.codegen.FragmentMappingSerializer;
import org.apache.ignite.internal.codegen.InboxCloseMessageSerializer;
import
org.apache.ignite.internal.codegen.QueryBatchAcknowledgeMessageSerializer;
import org.apache.ignite.internal.codegen.QueryCloseMessageSerializer;
+import org.apache.ignite.internal.codegen.QueryStartResponseSerializer;
import org.apache.ignite.internal.codegen.QueryTxEntrySerializer;
import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
@@ -36,7 +39,7 @@ public enum MessageType {
QUERY_START_REQUEST(300, QueryStartRequest::new),
/** */
- QUERY_START_RESPONSE(301, QueryStartResponse::new),
+ QUERY_START_RESPONSE(301, QueryStartResponse::new, new
QueryStartResponseSerializer()),
/** */
QUERY_ERROR_MESSAGE(302, ErrorMessage::new),
@@ -57,10 +60,10 @@ public enum MessageType {
GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new),
/** */
- FRAGMENT_MAPPING(350, FragmentMapping::new),
+ FRAGMENT_MAPPING(350, FragmentMapping::new, new
FragmentMappingSerializer()),
/** */
- COLOCATION_GROUP(351, ColocationGroup::new),
+ COLOCATION_GROUP(351, ColocationGroup::new, new
ColocationGroupSerializer()),
/** */
FRAGMENT_DESCRIPTION(352, FragmentDescription::new, new
FragmentDescriptionSerializer()),
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
index 340a8637330..5df60d2d780 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartResponse.java
@@ -17,52 +17,49 @@
package org.apache.ignite.internal.processors.query.calcite.message;
-import java.nio.ByteBuffer;
import java.util.UUID;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
+import org.jetbrains.annotations.Nullable;
/**
*
*/
-public class QueryStartResponse implements MarshalableMessage {
+public class QueryStartResponse implements CalciteMessage {
/** */
- private UUID queryId;
+ @Order(value = 0, method = "queryId")
+ private UUID qryId;
/** */
+ @Order(1)
private long fragmentId;
/** */
- @GridDirectTransient
- private Throwable error;
-
- /** */
- private byte[] errBytes;
+ @Order(value = 2, method = "errorMessage")
+ private @Nullable ErrorMessage errMsg;
/** */
public QueryStartResponse() {}
/** */
- public QueryStartResponse(UUID queryId, long fragmentId) {
- this(queryId, fragmentId, null);
- }
-
- /** */
- public QueryStartResponse(UUID queryId, long fragmentId, Throwable error) {
- this.queryId = queryId;
+ public QueryStartResponse(UUID qryId, long fragmentId, @Nullable Throwable
error) {
+ this.qryId = qryId;
this.fragmentId = fragmentId;
- this.error = error;
+
+ if (error != null)
+ errMsg = new ErrorMessage(error);
}
/**
* @return Query ID.
*/
public UUID queryId() {
- return queryId;
+ return qryId;
+ }
+
+ /** */
+ public void queryId(UUID qryId) {
+ this.qryId = qryId;
}
/**
@@ -72,92 +69,26 @@ public class QueryStartResponse implements
MarshalableMessage {
return fragmentId;
}
+ /** */
+ public void fragmentId(long fragmentId) {
+ this.fragmentId = fragmentId;
+ }
+
/**
* @return Error.
*/
- public Throwable error() {
- return error;
- }
-
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- if (error != null)
- errBytes = U.marshal(ctx, error);
+ public @Nullable Throwable error() {
+ return ErrorMessage.error(errMsg);
}
- /** {@inheritDoc} */
- @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- if (errBytes != null)
- error = U.unmarshal(ctx, errBytes,
U.resolveClassLoader(ctx.gridConfig()));
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray(errBytes))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeLong(fragmentId))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeUuid(queryId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
+ /** */
+ public @Nullable ErrorMessage errorMessage() {
+ return errMsg;
}
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- errBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- fragmentId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- queryId = reader.readUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
+ /** */
+ public void errorMessage(@Nullable ErrorMessage errMsg) {
+ this.errMsg = errMsg;
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
index 6ec4a4e1276..0f78c8fe90b 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/ColocationGroup.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -30,42 +29,38 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.Order;
import
org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
-import
org.apache.ignite.internal.processors.query.calcite.message.MarshalableMessage;
+import
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.GridIntIterator;
import org.apache.ignite.internal.util.GridIntList;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
/** */
-public class ColocationGroup implements MarshalableMessage {
+public class ColocationGroup implements CalciteMessage {
/** */
- private long[] sourceIds;
+ @Order(value = 0, method = "sourceIds")
+ private long[] srcIds;
/** */
- @GridDirectCollection(UUID.class)
+ @Order(1)
private List<UUID> nodeIds;
/** */
- @GridDirectTransient
private List<List<UUID>> assignments;
/**
* Flag, indacating that assignment is formed by original cache assignment
for given topology.
* In case of {@code true} value we can skip assignment marshalling and
calc assignment on remote nodes.
*/
- @GridDirectTransient
private boolean primaryAssignment;
- /** Marshalled assignments. */
+ /** Marshalled assignments serialization call holder. */
+ @Order(2)
private int[] marshalledAssignments;
/** */
@@ -79,8 +74,8 @@ public class ColocationGroup implements MarshalableMessage {
}
/** */
- public static ColocationGroup forSourceId(long sourceId) {
- return new ColocationGroup(new long[] {sourceId}, null, null);
+ public static ColocationGroup forSourceId(long srcId) {
+ return new ColocationGroup(new long[] {srcId}, null, null);
}
/** */
@@ -92,7 +87,7 @@ public class ColocationGroup implements MarshalableMessage {
.collect(Collectors.toList());
}
- return new ColocationGroup(sourceIds == null ? null :
Arrays.copyOf(sourceIds, sourceIds.length),
+ return new ColocationGroup(srcIds == null ? null :
Arrays.copyOf(srcIds, srcIds.length),
Collections.singletonList(nodeId), locAssignments);
}
@@ -101,15 +96,15 @@ public class ColocationGroup implements MarshalableMessage
{
}
/** */
- private ColocationGroup(long[] sourceIds, List<UUID> nodeIds,
List<List<UUID>> assignments) {
- this.sourceIds = sourceIds;
+ private ColocationGroup(long[] srcIds, List<UUID> nodeIds,
List<List<UUID>> assignments) {
+ this.srcIds = srcIds;
this.nodeIds = nodeIds;
this.assignments = assignments;
}
/** */
- private ColocationGroup(long[] sourceIds, List<UUID> nodeIds,
List<List<UUID>> assignments, boolean primaryAssignment) {
- this(sourceIds, nodeIds, assignments);
+ private ColocationGroup(long[] srcIds, List<UUID> nodeIds,
List<List<UUID>> assignments, boolean primaryAssignment) {
+ this(srcIds, nodeIds, assignments);
this.primaryAssignment = primaryAssignment;
}
@@ -121,6 +116,21 @@ public class ColocationGroup implements MarshalableMessage
{
return nodeIds == null ? Collections.emptyList() : nodeIds;
}
+ /** */
+ public void nodeIds(List<UUID> nodeIds) {
+ this.nodeIds = nodeIds;
+ }
+
+ /** */
+ public long[] sourceIds() {
+ return srcIds;
+ }
+
+ /** */
+ public void sourceIds(long[] srcIds) {
+ this.srcIds = srcIds;
+ }
+
/**
* @return List of partitions (index) and nodes (items) having an
appropriate partition in
* {@link GridDhtPartitionState#OWNING} state, calculated for distributed
tables, involved in query execution.
@@ -136,12 +146,12 @@ public class ColocationGroup implements
MarshalableMessage {
}
/** */
- public boolean belongs(long sourceId) {
- if (sourceIds == null)
+ public boolean belongs(long srcId) {
+ if (srcIds == null)
return false;
- for (long i : sourceIds) {
- if (i == sourceId)
+ for (long i : srcIds) {
+ if (i == srcId)
return true;
}
@@ -157,10 +167,10 @@ public class ColocationGroup implements
MarshalableMessage {
*/
public ColocationGroup colocate(ColocationGroup other) throws
ColocationMappingException {
long[] srcIds;
- if (sourceIds == null || other.sourceIds == null)
- srcIds = U.firstNotNull(sourceIds, other.sourceIds);
+ if (this.srcIds == null || other.srcIds == null)
+ srcIds = U.firstNotNull(this.srcIds, other.srcIds);
else
- srcIds = LongStream.concat(Arrays.stream(sourceIds),
Arrays.stream(other.sourceIds)).distinct().toArray();
+ srcIds = LongStream.concat(Arrays.stream(this.srcIds),
Arrays.stream(other.srcIds)).distinct().toArray();
List<UUID> nodeIds;
if (this.nodeIds == null || other.nodeIds == null)
@@ -241,7 +251,7 @@ public class ColocationGroup implements MarshalableMessage {
assignments.add(first != null ? Collections.singletonList(first) :
Collections.emptyList());
}
- return new ColocationGroup(sourceIds, new ArrayList<>(nodes),
assignments, primaryAssignment);
+ return new ColocationGroup(srcIds, new ArrayList<>(nodes),
assignments, primaryAssignment);
}
/** */
@@ -250,7 +260,7 @@ public class ColocationGroup implements MarshalableMessage {
return this;
// Make a shallow copy without cacheAssignment flag.
- return new ColocationGroup(sourceIds, nodeIds, assignments, false);
+ return new ColocationGroup(srcIds, nodeIds, assignments, false);
}
/** */
@@ -277,7 +287,7 @@ public class ColocationGroup implements MarshalableMessage {
assignments.add(Collections.emptyList());
}
- return new ColocationGroup(sourceIds, new ArrayList<>(nodes),
assignments);
+ return new ColocationGroup(srcIds, new ArrayList<>(nodes),
assignments);
}
return this;
@@ -285,7 +295,7 @@ public class ColocationGroup implements MarshalableMessage {
/** */
public ColocationGroup mapToNodes(List<UUID> nodeIds) {
- return !F.isEmpty(this.nodeIds) ? this : new
ColocationGroup(sourceIds, nodeIds, null);
+ return !F.isEmpty(this.nodeIds) ? this : new ColocationGroup(srcIds,
nodeIds, null);
}
/**
@@ -314,118 +324,54 @@ public class ColocationGroup implements
MarshalableMessage {
return MessageType.COLOCATION_GROUP;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeIntArray(marshalledAssignments))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeCollection(nodeIds,
MessageCollectionItemType.UUID))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeLongArray(sourceIds))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- marshalledAssignments = reader.readIntArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
+ /** Significantly compacts and fastens UUIDs marshalling. */
+ public @Nullable int[] marshalledAssignments() {
+ if (assignments == null || primaryAssignment)
+ return null;
- case 1:
- nodeIds =
reader.readCollection(MessageCollectionItemType.UUID);
+ Map<UUID, Integer> nodeIdxs = new HashMap<>();
- if (!reader.isLastRead())
- return false;
+ for (int i = 0; i < nodeIds.size(); i++)
+ nodeIdxs.put(nodeIds.get(i), i);
- reader.incrementState();
+ int bitsPerPart = Integer.SIZE -
Integer.numberOfLeadingZeros(nodeIds.size());
- case 2:
- sourceIds = reader.readLongArray();
+ CompactedIntArray.Builder builder =
CompactedIntArray.builder(bitsPerPart, assignments.size());
- if (!reader.isLastRead())
- return false;
+ for (List<UUID> assignment : assignments) {
+ assert F.isEmpty(assignment) || assignment.size() == 1;
- reader.incrementState();
+ if (F.isEmpty(assignment))
+ builder.add(nodeIds.size());
+ else {
+ Integer nodeIdx = nodeIdxs.get(assignment.get(0));
+ builder.add(nodeIdx);
+ }
}
- return true;
+ return builder.build().buffer();
}
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) {
- if (assignments != null && marshalledAssignments == null &&
!primaryAssignment) {
- Map<UUID, Integer> nodeIdxs = new HashMap<>();
-
- for (int i = 0; i < nodeIds.size(); i++)
- nodeIdxs.put(nodeIds.get(i), i);
-
- int bitsPerPart = Integer.SIZE -
Integer.numberOfLeadingZeros(nodeIds.size());
-
- CompactedIntArray.Builder builder =
CompactedIntArray.builder(bitsPerPart, assignments.size());
+ /** Significantly compacts and fastens UUIDs unmarshalling. */
+ public void marshalledAssignments(@Nullable int[] marshalledAssignments) {
+ if (F.isEmpty(marshalledAssignments)) {
+ assignments = null;
- for (List<UUID> assignment : assignments) {
- assert F.isEmpty(assignment) || assignment.size() == 1;
-
- if (F.isEmpty(assignment))
- builder.add(nodeIds.size());
- else {
- Integer nodeIdx = nodeIdxs.get(assignment.get(0));
-
- builder.add(nodeIdx);
- }
- }
-
- marshalledAssignments = builder.build().buffer();
+ return;
}
- }
- /** {@inheritDoc} */
- @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) {
- if (marshalledAssignments != null && assignments == null) {
- int bitsPerPart = Integer.SIZE -
Integer.numberOfLeadingZeros(nodeIds.size());
+ int bitsPerPart = Integer.SIZE -
Integer.numberOfLeadingZeros(nodeIds.size());
- CompactedIntArray compactedArr = CompactedIntArray.of(bitsPerPart,
marshalledAssignments);
+ CompactedIntArray compactedArr = CompactedIntArray.of(bitsPerPart,
marshalledAssignments);
- assignments = new ArrayList<>(compactedArr.size());
+ assignments = new ArrayList<>(compactedArr.size());
- for (GridIntIterator iter = compactedArr.iterator();
iter.hasNext(); ) {
- int nodeIdx = iter.next();
+ for (GridIntIterator iter = compactedArr.iterator(); iter.hasNext(); )
{
+ int nodeIdx = iter.next();
- assignments.add(nodeIdx >= nodeIds.size() ?
Collections.emptyList() :
- Collections.singletonList(nodeIds.get(nodeIdx)));
- }
+ assignments.add(nodeIdx >= nodeIds.size() ?
Collections.emptyList() :
+ Collections.singletonList(nodeIds.get(nodeIdx)));
}
}
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
index 6127e741546..881019b5ed6 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentDescription.java
@@ -119,15 +119,9 @@ public class FragmentDescription implements
MarshalableMessage {
/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) {
- if (mapping != null)
- mapping.prepareMarshal(ctx);
-
- if (target != null) {
+ if (target != null)
target = target.explicitMapping();
- target.prepareMarshal(ctx);
- }
-
if (remoteSources0 == null && remoteSources != null) {
remoteSources0 = U.newHashMap(remoteSources.size());
@@ -138,12 +132,6 @@ public class FragmentDescription implements
MarshalableMessage {
/** {@inheritDoc} */
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) {
- if (mapping != null)
- mapping.prepareUnmarshal(ctx);
-
- if (target != null)
- target.prepareUnmarshal(ctx);
-
if (remoteSources == null && remoteSources0 != null) {
remoteSources = U.newHashMap(remoteSources0.size());
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
index 16df8d7d3ef..5450d9fcaec 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/metadata/FragmentMapping.java
@@ -17,45 +17,40 @@
package org.apache.ignite.internal.processors.query.calcite.metadata;
-import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import
org.apache.ignite.internal.processors.query.calcite.message.MarshalableMessage;
+import org.apache.ignite.internal.Order;
+import
org.apache.ignite.internal.processors.query.calcite.message.CalciteMessage;
import org.apache.ignite.internal.processors.query.calcite.message.MessageType;
import org.apache.ignite.internal.processors.query.calcite.util.Commons;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
/**
*
*/
-public class FragmentMapping implements MarshalableMessage {
+public class FragmentMapping implements CalciteMessage {
/** */
- @GridDirectCollection(ColocationGroup.class)
- private List<ColocationGroup> colocationGroups;
+ @Order(value = 0, method = "colocationGroups")
+ private List<ColocationGroup> colocationGrps;
/** */
public FragmentMapping() {
}
/** */
- private FragmentMapping(ColocationGroup colocationGroup) {
- this(F.asList(colocationGroup));
+ private FragmentMapping(ColocationGroup colocationGrp) {
+ this(F.asList(colocationGrp));
}
/** */
- private FragmentMapping(List<ColocationGroup> colocationGroups) {
- this.colocationGroups = colocationGroups;
+ private FragmentMapping(List<ColocationGroup> colocationGrps) {
+ this.colocationGrps = colocationGrps;
}
/** */
@@ -69,14 +64,14 @@ public class FragmentMapping implements MarshalableMessage {
}
/** */
- public static FragmentMapping create(long sourceId) {
- return new FragmentMapping(ColocationGroup.forSourceId(sourceId));
+ public static FragmentMapping create(long srcId) {
+ return new FragmentMapping(ColocationGroup.forSourceId(srcId));
}
/** */
- public static FragmentMapping create(long sourceId, ColocationGroup group)
{
+ public static FragmentMapping create(long srcId, ColocationGroup grp) {
try {
- return new
FragmentMapping(ColocationGroup.forSourceId(sourceId).colocate(group));
+ return new
FragmentMapping(ColocationGroup.forSourceId(srcId).colocate(grp));
}
catch (ColocationMappingException e) {
throw new AssertionError(e); // Cannot happen
@@ -85,20 +80,20 @@ public class FragmentMapping implements MarshalableMessage {
/** */
public boolean colocated() {
- return colocationGroups.isEmpty() || colocationGroups.size() == 1;
+ return colocationGrps.isEmpty() || colocationGrps.size() == 1;
}
/** */
public FragmentMapping combine(FragmentMapping other) {
- return new FragmentMapping(Commons.combine(colocationGroups,
other.colocationGroups));
+ return new FragmentMapping(Commons.combine(colocationGrps,
other.colocationGrps));
}
/** */
public FragmentMapping colocate(FragmentMapping other) throws
ColocationMappingException {
assert colocated() && other.colocated();
- ColocationGroup first = F.first(colocationGroups);
- ColocationGroup second = F.first(other.colocationGroups);
+ ColocationGroup first = F.first(colocationGrps);
+ ColocationGroup second = F.first(other.colocationGrps);
if (first == null && second == null)
return this;
@@ -110,33 +105,38 @@ public class FragmentMapping implements
MarshalableMessage {
/** */
public FragmentMapping local(UUID nodeId) throws
ColocationMappingException {
- if (colocationGroups.isEmpty())
+ if (colocationGrps.isEmpty())
return create(nodeId).colocate(this);
- return new FragmentMapping(Commons.transform(colocationGroups, c ->
c.local(nodeId)));
+ return new FragmentMapping(Commons.transform(colocationGrps, c ->
c.local(nodeId)));
}
/** */
public List<UUID> nodeIds() {
- return colocationGroups.stream()
+ return colocationGrps.stream()
.flatMap(g -> g.nodeIds().stream())
.distinct().collect(Collectors.toList());
}
/** */
public List<ColocationGroup> colocationGroups() {
- return Collections.unmodifiableList(colocationGroups);
+ return colocationGrps == null ? Collections.emptyList() :
Collections.unmodifiableList(colocationGrps);
}
/** */
- public FragmentMapping finalizeMapping(Supplier<List<UUID>> nodesSource) {
- if (colocationGroups.isEmpty())
+ public void colocationGroups(List<ColocationGroup> colocationGrps) {
+ this.colocationGrps = colocationGrps;
+ }
+
+ /** */
+ public FragmentMapping finalizeMapping(Supplier<List<UUID>> nodesSrc) {
+ if (colocationGrps.isEmpty())
return this;
- List<ColocationGroup> colocationGrps = this.colocationGroups;
+ List<ColocationGroup> colocationGrps = this.colocationGrps;
colocationGrps = Commons.transform(colocationGrps,
ColocationGroup::finalizeMapping);
- List<UUID> nodes = nodeIds(), nodes0 = nodes.isEmpty() ?
nodesSource.get() : nodes;
+ List<UUID> nodes = nodeIds(), nodes0 = nodes.isEmpty() ?
nodesSrc.get() : nodes;
colocationGrps = Commons.transform(colocationGrps, g ->
g.mapToNodes(nodes0));
return new FragmentMapping(colocationGrps);
@@ -144,7 +144,7 @@ public class FragmentMapping implements MarshalableMessage {
/** */
public FragmentMapping filterByPartitions(int[] parts) throws
ColocationMappingException {
- List<ColocationGroup> colocationGrps = this.colocationGroups;
+ List<ColocationGroup> colocationGrps = this.colocationGrps;
if (!F.isEmpty(parts) && colocationGrps.size() > 1)
throw new ColocationMappingException("Execution of non-collocated
query with partition parameter is not possible");
@@ -155,15 +155,15 @@ public class FragmentMapping implements
MarshalableMessage {
}
/** */
- public @NotNull ColocationGroup findGroup(long sourceId) {
- List<ColocationGroup> grps = colocationGroups.stream()
- .filter(c -> c.belongs(sourceId))
+ public @NotNull ColocationGroup findGroup(long srcId) {
+ List<ColocationGroup> grps = colocationGrps.stream()
+ .filter(c -> c.belongs(srcId))
.collect(Collectors.toList());
if (grps.isEmpty())
- throw new IllegalStateException("Failed to find group with given
id. [sourceId=" + sourceId + "]");
+ throw new IllegalStateException("Failed to find group with given
id. [sourceId=" + srcId + "]");
else if (grps.size() > 1)
- throw new IllegalStateException("Multiple groups with the same id
found. [sourceId=" + sourceId + "]");
+ throw new IllegalStateException("Multiple groups with the same id
found. [sourceId=" + srcId + "]");
return F.first(grps);
}
@@ -174,63 +174,12 @@ public class FragmentMapping implements
MarshalableMessage {
srcIds.forEach(srcId -> explicitMappingGrps.add(findGroup(srcId)));
- return new FragmentMapping(Commons.transform(colocationGroups,
+ return new FragmentMapping(Commons.transform(colocationGrps,
g -> explicitMappingGrps.contains(g) ? g.explicitMapping() : g));
}
- /** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) {
- colocationGroups.forEach(g -> g.prepareMarshal(ctx));
- }
-
- /** {@inheritDoc} */
- @Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx) {
- colocationGroups.forEach(g -> g.prepareUnmarshal(ctx));
- }
-
/** {@inheritDoc} */
@Override public MessageType type() {
return MessageType.FRAGMENT_MAPPING;
}
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeCollection(colocationGroups,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- colocationGroups =
reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
index 5c0739e0065..1ba24925810 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
@@ -267,7 +267,7 @@ public class GridJobExecuteRequest implements
ExecutorAwareMessage {
this.cpSpi = cpSpi == null || cpSpi.isEmpty() ? null : cpSpi;
}
-
+
/**
* @return Task session ID.
*/