This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new ca1ac020a3e IGNITE-25961 Use MessageSerializer for GridNearAtomicUpdateResponse and GridNearAtomicCheckUpdateRequest (#12201) ca1ac020a3e is described below commit ca1ac020a3e943ecc4e4b0b8fb0aa71184e9e9ce Author: Maksim Davydov <70368398+maksa...@users.noreply.github.com> AuthorDate: Thu Sep 18 12:41:02 2025 +0300 IGNITE-25961 Use MessageSerializer for GridNearAtomicUpdateResponse and GridNearAtomicCheckUpdateRequest (#12201) --- .../communication/GridIoMessageFactory.java | 6 +- .../processors/cache/GridCacheIdMessage.java | 4 + .../atomic/GridNearAtomicCheckUpdateRequest.java | 84 ++------- .../dht/atomic/GridNearAtomicUpdateResponse.java | 198 ++++++--------------- 4 files changed, 81 insertions(+), 211 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index f48f7c3a599..4a2ac144135 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -41,6 +41,8 @@ import org.apache.ignite.internal.codegen.GridDhtTxOnePhaseCommitAckRequestSeria import org.apache.ignite.internal.codegen.GridIntListSerializer; import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer; import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer; +import org.apache.ignite.internal.codegen.GridNearAtomicCheckUpdateRequestSerializer; +import org.apache.ignite.internal.codegen.GridNearAtomicUpdateResponseSerializer; import org.apache.ignite.internal.codegen.GridQueryCancelRequestSerializer; import org.apache.ignite.internal.codegen.GridQueryFailResponseSerializer; import org.apache.ignite.internal.codegen.GridQueryKillRequestSerializer; @@ -221,7 +223,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register((short)-53, SchemaOperationStatusMessage::new, new SchemaOperationStatusMessageSerializer()); factory.register((short)-52, GridIntList::new, new GridIntListSerializer()); factory.register((short)-51, NearCacheUpdates::new, new NearCacheUpdatesSerializer()); - factory.register((short)-50, GridNearAtomicCheckUpdateRequest::new); + factory.register((short)-50, GridNearAtomicCheckUpdateRequest::new, new GridNearAtomicCheckUpdateRequestSerializer()); factory.register((short)-49, UpdateErrors::new); factory.register((short)-48, GridDhtAtomicNearResponse::new); factory.register((short)-45, GridChangeGlobalStateMessageResponse::new); @@ -273,7 +275,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider { factory.register((short)38, GridDhtAtomicUpdateRequest::new); factory.register((short)39, GridDhtAtomicUpdateResponse::new); factory.register((short)40, GridNearAtomicFullUpdateRequest::new); - factory.register((short)41, GridNearAtomicUpdateResponse::new); + factory.register((short)41, GridNearAtomicUpdateResponse::new, new GridNearAtomicUpdateResponseSerializer()); factory.register((short)42, GridDhtForceKeysRequest::new); factory.register((short)43, GridDhtForceKeysResponse::new); factory.register((short)45, GridDhtPartitionDemandMessage::new); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java index be87ddff1f1..8a9c8a4da58 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIdMessage.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import java.nio.ByteBuffer; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -29,6 +30,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; public abstract class GridCacheIdMessage extends GridCacheMessage { /** Cache ID. */ @GridToStringInclude + @Order(3) protected int cacheId; /** @@ -52,6 +54,7 @@ public abstract class GridCacheIdMessage extends GridCacheMessage { /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + // TODO: Remove #writeTo() after all inheritors have migrated to the new ser/der scheme (IGNITE-25490). writer.setBuffer(buf); if (!super.writeTo(buf, writer)) @@ -78,6 +81,7 @@ public abstract class GridCacheIdMessage extends GridCacheMessage { /** {@inheritDoc} */ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + // TODO: Remove #readFrom() after all inheritors have migrated to the new ser/der scheme (IGNITE-25490). reader.setBuffer(buf); if (!super.readFrom(buf, reader)) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java index 39ba66599dc..78a11bf56d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicCheckUpdateRequest.java @@ -17,12 +17,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; -import java.nio.ByteBuffer; -import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; /** * @@ -32,13 +29,14 @@ public class GridNearAtomicCheckUpdateRequest extends GridCacheIdMessage { public static final int CACHE_MSG_IDX = nextIndexId(); /** */ - @GridDirectTransient private GridNearAtomicAbstractUpdateRequest updateReq; /** */ + @Order(value = 4, method = "partition") private int partId; /** */ + @Order(value = 5, method = "futureId") private long futId; /** @@ -69,6 +67,14 @@ public class GridNearAtomicCheckUpdateRequest extends GridCacheIdMessage { return futId; } + /** + * @param futId Future ID on near node. + */ + public void futureId(long futId) { + this.futId = futId; + } + + /** * @return Related update request. */ @@ -81,6 +87,13 @@ public class GridNearAtomicCheckUpdateRequest extends GridCacheIdMessage { return partId; } + /** + * @param partId Partition ID this message is targeted to or {@code -1} if it cannot be determined. + */ + public void partition(int partId) { + this.partId = partId; + } + /** {@inheritDoc} */ @Override public int lookupIndex() { return CACHE_MSG_IDX; @@ -96,67 +109,6 @@ public class GridNearAtomicCheckUpdateRequest extends GridCacheIdMessage { return -50; } - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 4: - if (!writer.writeLong(futId)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeInt(partId)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 4: - futId = reader.readLong(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - partId = reader.readInt(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridNearAtomicCheckUpdateRequest.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java index 086391bbe39..ff631a36d47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java @@ -17,14 +17,12 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; -import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -36,9 +34,6 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; /** @@ -49,35 +44,39 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements public static final int CACHE_MSG_IDX = nextIndexId(); /** Node ID this reply should be sent to. */ - @GridDirectTransient private UUID nodeId; /** Future ID. */ + @Order(value = 4, method = "futureId") private long futId; /** */ + @Order(value = 5, method = "errors") private UpdateErrors errs; /** Return value. */ @GridToStringInclude + @Order(value = 6, method = "returnValue") private GridCacheReturn ret; /** */ + @Order(value = 7, method = "remapTopologyVersion") private AffinityTopologyVersion remapTopVer; /** Data for near cache update. */ + @Order(8) private NearCacheUpdates nearUpdates; /** Partition ID. */ - private int partId = -1; + @Order(value = 9, method = "partition") + private int partId; /** */ - @GridDirectCollection(UUID.class) @GridToStringInclude + @Order(10) private List<UUID> mapping; /** */ - @GridDirectTransient private boolean nodeLeft; /** @@ -107,6 +106,8 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements this.partId = partId; this.nodeLeft = nodeLeft; this.addDepInfo = addDepInfo; + + assert partId >= 0; } /** @@ -156,6 +157,27 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements return futId; } + /** + * @param futId New future ID. + */ + public void futureId(long futId) { + this.futId = futId; + } + + /** + * @return Errs. + */ + public UpdateErrors errors() { + return errs; + } + + /** + * @param errs New errs. + */ + public void errors(UpdateErrors errs) { + this.errs = errs; + } + /** * Sets update error. * @@ -197,14 +219,14 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements /** * @param remapTopVer Topology version to remap update. */ - void remapTopologyVersion(AffinityTopologyVersion remapTopVer) { + public void remapTopologyVersion(AffinityTopologyVersion remapTopVer) { this.remapTopVer = remapTopVer; } /** * @return Topology version if update should be remapped. */ - @Nullable AffinityTopologyVersion remapTopologyVersion() { + @Nullable public AffinityTopologyVersion remapTopologyVersion() { return remapTopVer; } @@ -369,11 +391,32 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements ret.finishUnmarshal(cctx, ldr); } + /** + * @return Data for near cache update. + */ + public NearCacheUpdates nearUpdates() { + return nearUpdates; + } + + /** + * @param nearUpdates New data for near cache update. + */ + public void nearUpdates(NearCacheUpdates nearUpdates) { + this.nearUpdates = nearUpdates; + } + /** {@inheritDoc} */ @Override public int partition() { return partId; } + /** + * @param partId New partition ID. + */ + public void partition(int partId) { + this.partId = partId; + } + /** {@inheritDoc} */ @Override public boolean addDeploymentInfo() { return addDepInfo; @@ -384,137 +427,6 @@ public class GridNearAtomicUpdateResponse extends GridCacheIdMessage implements return ctx.atomicMessageLogger(); } - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 4: - if (!writer.writeMessage(errs)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeLong(futId)) - return false; - - writer.incrementState(); - - case 6: - if (!writer.writeCollection(mapping, MessageCollectionItemType.UUID)) - return false; - - writer.incrementState(); - - case 7: - if (!writer.writeMessage(nearUpdates)) - return false; - - writer.incrementState(); - - case 8: - if (!writer.writeInt(partId)) - return false; - - writer.incrementState(); - - case 9: - if (!writer.writeAffinityTopologyVersion(remapTopVer)) - return false; - - writer.incrementState(); - - case 10: - if (!writer.writeMessage(ret)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 4: - errs = reader.readMessage(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - futId = reader.readLong(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 6: - mapping = reader.readCollection(MessageCollectionItemType.UUID); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 7: - nearUpdates = reader.readMessage(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 8: - partId = reader.readInt(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: - remapTopVer = reader.readAffinityTopologyVersion(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 10: - ret = reader.readMessage(); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return true; - } - /** {@inheritDoc} */ @Override public short directType() { return 41;