This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 4e2d7d6a1f2 IGNITE-27862 Use MessageSerializer for DataStreamerRequest
(#12747)
4e2d7d6a1f2 is described below
commit 4e2d7d6a1f213f5591b9d43324fdc6233e9a789e
Author: Alexey Abashev <[email protected]>
AuthorDate: Tue Mar 3 13:12:49 2026 +0300
IGNITE-27862 Use MessageSerializer for DataStreamerRequest (#12747)
---
.../communication/GridIoMessageFactory.java | 3 +-
.../datastreamer/DataStreamerRequest.java | 317 +++------------------
2 files changed, 38 insertions(+), 282 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 3396b8dad0b..0e4e507b524 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -278,6 +278,7 @@ import
org.apache.ignite.internal.processors.continuous.GridContinuousMessage;
import
org.apache.ignite.internal.processors.continuous.GridContinuousMessageSerializer;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerRequest;
+import
org.apache.ignite.internal.processors.datastreamer.DataStreamerRequestSerializer;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerResponse;
import
org.apache.ignite.internal.processors.datastreamer.DataStreamerResponseSerializer;
import
org.apache.ignite.internal.processors.marshaller.MissingMappingRequestMessage;
@@ -434,7 +435,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)58, GridCacheQueryRequest::new, new
GridCacheQueryRequestSerializer());
factory.register((short)59, GridCacheQueryResponse::new, new
GridCacheQueryResponseSerializer());
factory.register((short)61, GridContinuousMessage::new, new
GridContinuousMessageSerializer());
- factory.register((short)62, DataStreamerRequest::new);
+ factory.register((short)62, DataStreamerRequest::new, new
DataStreamerRequestSerializer());
factory.register((short)63, DataStreamerResponse::new, new
DataStreamerResponseSerializer());
factory.register((short)76, GridTaskResultRequest::new, new
GridTaskResultRequestSerializer());
factory.register((short)77, GridTaskResultResponse::new, new
GridTaskResultResponseSerializer());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
index 577dc470caf..d14d23de8e9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerRequest.java
@@ -17,80 +17,90 @@
package org.apache.ignite.internal.processors.datastreamer;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.configuration.DeploymentMode;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectMap;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-import static
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType.IGNITE_UUID;
-
/**
*
*/
public class DataStreamerRequest implements Message {
/** */
- private long reqId;
+ @Order(0)
+ long reqId;
/** */
- private byte[] resTopicBytes;
+ // TODO: Refactor bytes serialization - IGNITE-27977
+ @Order(1)
+ byte[] resTopicBytes;
/** Cache name. */
- private String cacheName;
+ @Order(2)
+ String cacheName;
/** */
- private byte[] updaterBytes;
+ // TODO: Refactor bytes serialization - IGNITE-27977
+ @Order(3)
+ byte[] updaterBytes;
/** Entries to update. */
- @GridDirectCollection(DataStreamerEntry.class)
- private Collection<DataStreamerEntry> entries;
+ @Order(4)
+ Collection<DataStreamerEntry> entries;
/** {@code True} to ignore deployment ownership. */
- private boolean ignoreDepOwnership;
+ @Order(5)
+ boolean ignoreDepOwnership;
/** */
- private boolean skipStore;
+ @Order(6)
+ boolean skipStore;
/** Keep binary flag. */
- private boolean keepBinary;
+ @Order(7)
+ boolean keepBinary;
/** */
- private DeploymentMode depMode;
+ // TODO: DeploymentMode enum is serialized as byte ordinal - consider
refactoring to use enum serialization
+ @Order(8)
+ DeploymentMode depMode;
/** */
- private String sampleClsName;
+ @Order(9)
+ String sampleClsName;
/** */
- private String userVer;
+ @Order(10)
+ String userVer;
/** Node class loader participants. */
@GridToStringInclude
- @GridDirectMap(keyType = UUID.class, valueType = IgniteUuid.class)
- private Map<UUID, IgniteUuid> ldrParticipants;
+ @Order(11)
+ Map<UUID, IgniteUuid> ldrParticipants;
/** */
- private IgniteUuid clsLdrId;
+ @Order(12)
+ IgniteUuid clsLdrId;
/** */
- private boolean forceLocDep;
+ @Order(13)
+ boolean forceLocDep;
/** Topology version. */
- private AffinityTopologyVersion topVer;
+ @Order(14)
+ AffinityTopologyVersion topVer;
/** */
- private int partId;
+ @Order(15)
+ int partId;
/**
* Empty constructor.
@@ -272,261 +282,6 @@ public class DataStreamerRequest implements Message {
return S.toString(DataStreamerRequest.class, this);
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeString(cacheName))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeIgniteUuid(clsLdrId))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeByte(depMode != null ?
(byte)depMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeCollection(entries,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeBoolean(forceLocDep))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeBoolean(ignoreDepOwnership))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeBoolean(keepBinary))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeMap(ldrParticipants,
MessageCollectionItemType.UUID, IGNITE_UUID))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeInt(partId))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeLong(reqId))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeByteArray(resTopicBytes))
- return false;
-
- writer.incrementState();
-
- case 11:
- if (!writer.writeString(sampleClsName))
- return false;
-
- writer.incrementState();
-
- case 12:
- if (!writer.writeBoolean(skipStore))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeAffinityTopologyVersion(topVer))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeByteArray(updaterBytes))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeString(userVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- cacheName = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- clsLdrId = reader.readIgniteUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- byte depModeOrd;
-
- depModeOrd = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- depMode = DeploymentMode.fromOrdinal(depModeOrd);
-
- reader.incrementState();
-
- case 3:
- entries = reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- forceLocDep = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- ignoreDepOwnership = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- keepBinary = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- ldrParticipants =
reader.readMap(MessageCollectionItemType.UUID, IGNITE_UUID, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- partId = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- reqId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- resTopicBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 11:
- sampleClsName = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 12:
- skipStore = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- topVer = reader.readAffinityTopologyVersion();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- updaterBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- userVer = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 62;