This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 36cbb274437 IGNITE-26956 Use MessageSerializer for GenericValueMessage
and QueryBatchMessage (#12500)
36cbb274437 is described below
commit 36cbb274437befaa4bac0f6d77aed85f7f077924
Author: Dmitry Werner <[email protected]>
AuthorDate: Fri Nov 7 19:24:39 2025 +0500
IGNITE-26956 Use MessageSerializer for GenericValueMessage and
QueryBatchMessage (#12500)
---
.../query/calcite/message/GenericValueMessage.java | 65 ++------
.../query/calcite/message/MessageType.java | 6 +-
.../query/calcite/message/QueryBatchMessage.java | 175 +++++++--------------
3 files changed, 77 insertions(+), 169 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
index b517befb770..e2521de6743 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/GenericValueMessage.java
@@ -17,23 +17,20 @@
package org.apache.ignite.internal.processors.query.calcite.message;
-import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
*/
public final class GenericValueMessage implements ValueMessage {
/** */
- @GridDirectTransient
private Object val;
/** */
+ @Order(0)
private byte[] serialized;
/** */
@@ -51,6 +48,20 @@ public final class GenericValueMessage implements
ValueMessage {
return val;
}
+ /**
+ * @return Serialized value.
+ */
+ public byte[] serialized() {
+ return serialized;
+ }
+
+ /**
+ * @param serialized Serialized value.
+ */
+ public void serialized(byte[] serialized) {
+ this.serialized = serialized;
+ }
+
/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
if (val != null && serialized == null)
@@ -59,50 +70,8 @@ public final class GenericValueMessage implements
ValueMessage {
/** {@inheritDoc} */
@Override public void prepareUnmarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
- if (serialized != null && val == null) {
+ if (serialized != null && val == null)
val = U.unmarshal(ctx, serialized,
U.resolveClassLoader(ctx.gridConfig()));
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray(serialized))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- serialized = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
}
/** {@inheritDoc} */
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
index e2424054d55..4c27c63a66c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
@@ -22,8 +22,10 @@ import
org.apache.ignite.internal.codegen.CalciteErrorMessageSerializer;
import org.apache.ignite.internal.codegen.ColocationGroupSerializer;
import org.apache.ignite.internal.codegen.FragmentDescriptionSerializer;
import org.apache.ignite.internal.codegen.FragmentMappingSerializer;
+import org.apache.ignite.internal.codegen.GenericValueMessageSerializer;
import org.apache.ignite.internal.codegen.InboxCloseMessageSerializer;
import
org.apache.ignite.internal.codegen.QueryBatchAcknowledgeMessageSerializer;
+import org.apache.ignite.internal.codegen.QueryBatchMessageSerializer;
import org.apache.ignite.internal.codegen.QueryCloseMessageSerializer;
import org.apache.ignite.internal.codegen.QueryStartResponseSerializer;
import org.apache.ignite.internal.codegen.QueryTxEntrySerializer;
@@ -46,7 +48,7 @@ public enum MessageType {
QUERY_ERROR_MESSAGE(302, CalciteErrorMessage::new, new
CalciteErrorMessageSerializer()),
/** */
- QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new),
+ QUERY_BATCH_MESSAGE(303, QueryBatchMessage::new, new
QueryBatchMessageSerializer()),
/** */
QUERY_ACKNOWLEDGE_MESSAGE(304, QueryBatchAcknowledgeMessage::new, new
QueryBatchAcknowledgeMessageSerializer()),
@@ -58,7 +60,7 @@ public enum MessageType {
QUERY_CLOSE_MESSAGE(306, QueryCloseMessage::new, new
QueryCloseMessageSerializer()),
/** */
- GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new),
+ GENERIC_VALUE_MESSAGE(307, GenericValueMessage::new, new
GenericValueMessageSerializer()),
/** */
FRAGMENT_MAPPING(350, FragmentMapping::new, new
FragmentMappingSerializer()),
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
index 61bd2196c48..b325b11085c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryBatchMessage.java
@@ -17,43 +17,42 @@
package org.apache.ignite.internal.processors.query.calcite.message;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
*
*/
public class QueryBatchMessage implements MarshalableMessage,
ExecutionContextAware {
/** */
+ @Order(value = 0, method = "queryId")
private UUID qryId;
/** */
+ @Order(1)
private long fragmentId;
/** */
+ @Order(2)
private long exchangeId;
/** */
+ @Order(3)
private int batchId;
/** */
+ @Order(4)
private boolean last;
/** */
- @GridDirectTransient
private List<Object> rows;
/** */
- @GridDirectCollection(ValueMessage.class)
+ @Order(value = 5, method = "messageRows")
private List<ValueMessage> mRows;
/** */
@@ -75,11 +74,25 @@ public class QueryBatchMessage implements
MarshalableMessage, ExecutionContextAw
return qryId;
}
+ /**
+ * @param qryId Query ID.
+ */
+ public void queryId(UUID qryId) {
+ this.qryId = qryId;
+ }
+
/** {@inheritDoc} */
@Override public long fragmentId() {
return fragmentId;
}
+ /**
+ * @param fragmentId Fragment ID.
+ */
+ public void fragmentId(long fragmentId) {
+ this.fragmentId = fragmentId;
+ }
+
/**
* @return Exchange ID.
*/
@@ -87,6 +100,13 @@ public class QueryBatchMessage implements
MarshalableMessage, ExecutionContextAw
return exchangeId;
}
+ /**
+ * @param exchangeId Exchange ID.
+ */
+ public void exchangeId(long exchangeId) {
+ this.exchangeId = exchangeId;
+ }
+
/**
* @return Batch ID.
*/
@@ -94,6 +114,13 @@ public class QueryBatchMessage implements
MarshalableMessage, ExecutionContextAw
return batchId;
}
+ /**
+ * @param batchId Batch ID.
+ */
+ public void batchId(int batchId) {
+ this.batchId = batchId;
+ }
+
/**
* @return Last batch flag.
*/
@@ -101,6 +128,13 @@ public class QueryBatchMessage implements
MarshalableMessage, ExecutionContextAw
return last;
}
+ /**
+ * @param last Last batch flag.
+ */
+ public void last(boolean last) {
+ this.last = last;
+ }
+
/**
* @return Rows.
*/
@@ -108,6 +142,20 @@ public class QueryBatchMessage implements
MarshalableMessage, ExecutionContextAw
return rows;
}
+ /**
+ * @return Message rows.
+ */
+ public List<ValueMessage> messageRows() {
+ return mRows;
+ }
+
+ /**
+ * @param mRows Message rows.
+ */
+ public void messageRows(List<ValueMessage> mRows) {
+ this.mRows = mRows;
+ }
+
/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
if (mRows != null || rows == null)
@@ -142,117 +190,6 @@ public class QueryBatchMessage implements
MarshalableMessage, ExecutionContextAw
}
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeInt(batchId))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeLong(exchangeId))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeLong(fragmentId))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeBoolean(last))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeCollection(mRows,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeUuid(qryId))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- batchId = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- exchangeId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- fragmentId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 3:
- last = reader.readBoolean();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- mRows = reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- qryId = reader.readUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public MessageType type() {
return MessageType.QUERY_BATCH_MESSAGE;