This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b183a59c8c3 IGNITE-26947 Use MessageSerializer for GridJobSiblingsResponse (#12499)
b183a59c8c3 is described below
commit b183a59c8c3e2660ec72d8dd846690a08d10510d
Author: Dmitry Werner <[email protected]>
AuthorDate: Thu Nov 6 18:25:21 2025 +0500
IGNITE-26947 Use MessageSerializer for GridJobSiblingsResponse (#12499)
---
.../ignite/internal/GridJobSiblingsResponse.java | 81 +++++++++-------------
.../communication/GridIoMessageFactory.java | 3 +-
.../processors/task/GridTaskProcessor.java | 11 +--
3 files changed, 40 insertions(+), 55 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsResponse.java
index 314e00d4b9c..9aeb780795f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobSiblingsResponse.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal;
-import java.nio.ByteBuffer;
import java.util.Collection;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.compute.ComputeJobSibling;
@@ -25,8 +24,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -34,10 +31,10 @@ import org.jetbrains.annotations.Nullable;
*/
public class GridJobSiblingsResponse implements Message {
/** */
- @GridDirectTransient
- private Collection<ComputeJobSibling> siblings;
+ private @Nullable Collection<ComputeJobSibling> siblings;
/** */
+ @Order(0)
private byte[] siblingsBytes;
/**
@@ -49,70 +46,56 @@ public class GridJobSiblingsResponse implements Message {
/**
* @param siblings Siblings.
- * @param siblingsBytes Serialized siblings.
*/
- public GridJobSiblingsResponse(@Nullable Collection<ComputeJobSibling>
siblings, @Nullable byte[] siblingsBytes) {
+ public GridJobSiblingsResponse(@Nullable Collection<ComputeJobSibling>
siblings) {
this.siblings = siblings;
- this.siblingsBytes = siblingsBytes;
}
/**
* @return Job siblings.
*/
- public Collection<ComputeJobSibling> jobSiblings() {
+ public @Nullable Collection<ComputeJobSibling> jobSiblings() {
return siblings;
}
/**
- * @param marsh Marshaller.
- * @throws IgniteCheckedException In case of error.
+ * @return Serialized siblings.
*/
- public void unmarshalSiblings(Marshaller marsh) throws
IgniteCheckedException {
- assert marsh != null;
-
- if (siblingsBytes != null)
- siblings = U.unmarshal(marsh, siblingsBytes, null);
+ public byte[] siblingsBytes() {
+ return siblingsBytes;
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray(siblingsBytes))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
+ /**
+ * @param siblingsBytes Serialized siblings.
+ */
+ public void siblingsBytes(byte[] siblingsBytes) {
+ this.siblingsBytes = siblingsBytes;
}
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- siblingsBytes = reader.readByteArray();
+ /**
+ * Marshals siblings to byte array.
+ *
+ * @param marsh Marshaller.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void marshalSiblings(Marshaller marsh) throws
IgniteCheckedException {
+ siblingsBytes = U.marshal(marsh, siblings);
+ }
- if (!reader.isLastRead())
- return false;
+ /**
+ * Unmarshals siblings from byte array.
+ *
+ * @param marsh Marshaller.
+ * @throws IgniteCheckedException In case of error.
+ */
+ public void unmarshalSiblings(Marshaller marsh) throws
IgniteCheckedException {
+ assert marsh != null;
- reader.incrementState();
+ if (siblingsBytes != null) {
+ siblings = U.unmarshal(marsh, siblingsBytes, null);
+ siblingsBytes = null;
}
-
- return true;
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index cf07a3fcfb8..c3804b5beae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -78,6 +78,7 @@ import
org.apache.ignite.internal.codegen.GridDistributedTxPrepareResponseSerial
import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobExecuteResponseSerializer;
import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer;
+import org.apache.ignite.internal.codegen.GridJobSiblingsResponseSerializer;
import
org.apache.ignite.internal.codegen.GridNearAtomicCheckUpdateRequestSerializer;
import
org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateFilterRequestSerializer;
import
org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateRequestSerializer;
@@ -313,7 +314,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)1, GridJobExecuteRequest::new);
factory.register((short)2, GridJobExecuteResponse::new, new
GridJobExecuteResponseSerializer());
factory.register((short)3, GridJobSiblingsRequest::new, new
GridJobSiblingsRequestSerializer());
- factory.register((short)4, GridJobSiblingsResponse::new);
+ factory.register((short)4, GridJobSiblingsResponse::new, new
GridJobSiblingsResponseSerializer());
factory.register((short)5, GridTaskCancelRequest::new, new
GridTaskCancelRequestSerializer());
factory.register((short)6, GridTaskSessionRequest::new, new
GridTaskSessionRequestSerializer());
factory.register((short)7, GridCheckpointRequest::new, new
GridCheckpointRequestSerializer());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index b6411017fc9..54775dcb63a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -1412,11 +1412,12 @@ public class GridTaskProcessor extends
GridProcessorAdapter implements IgniteCha
boolean loc = ctx.localNodeId().equals(nodeId);
- ctx.io().sendToCustomTopic(nodeId, topic,
- new GridJobSiblingsResponse(
- loc ? siblings : null,
- loc ? null : U.marshal(marsh, siblings)),
- SYSTEM_POOL);
+ GridJobSiblingsResponse resp = new
GridJobSiblingsResponse(siblings);
+
+ if (!loc)
+ resp.marshalSiblings(marsh);
+
+ ctx.io().sendToCustomTopic(nodeId, topic, resp,
SYSTEM_POOL);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send job sibling response.", e);