This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new bd27205ec5d IGNITE-27226 Use MessageSerializer for QueryStartRequest
(#12548)
bd27205ec5d is described below
commit bd27205ec5deec7ccb8b4849e79efc2c5175b22a
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Dec 4 14:56:48 2025 +0300
IGNITE-27226 Use MessageSerializer for QueryStartRequest (#12548)
---
.../query/calcite/exec/ExecutionServiceImpl.java | 4 +-
.../query/calcite/message/MessageType.java | 3 +-
.../query/calcite/message/QueryStartRequest.java | 283 +++++++--------------
3 files changed, 89 insertions(+), 201 deletions(-)
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
index 5fe024f934e..6b09e00c636 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/ExecutionServiceImpl.java
@@ -882,7 +882,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
);
final BaseQueryContext qctx = createQueryContext(
- msg.appAttrs() == null ? Contexts.empty() : Contexts.of(new
SessionContextImpl(msg.appAttrs())),
+ msg.applicationAttributes() == null ? Contexts.empty() :
Contexts.of(new SessionContextImpl(msg.applicationAttributes())),
msg.schema());
QueryPlan qryPlan = queryPlanCache().queryPlan(
@@ -903,7 +903,7 @@ public class ExecutionServiceImpl<Row> extends
AbstractService implements Execut
msg.fragmentDescription(),
handler,
qry.createMemoryTracker(memoryTracker,
cfg.getQueryMemoryQuota()),
- createIoTracker(nodeId, msg.originatingQryId()),
+ createIoTracker(nodeId, msg.originatingQueryId()),
msg.timeout(),
Commons.parametersMap(msg.parameters()),
msg.queryTransactionEntries()
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
index 4c27c63a66c..4d74ac8b54a 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
@@ -27,6 +27,7 @@ import
org.apache.ignite.internal.codegen.InboxCloseMessageSerializer;
import
org.apache.ignite.internal.codegen.QueryBatchAcknowledgeMessageSerializer;
import org.apache.ignite.internal.codegen.QueryBatchMessageSerializer;
import org.apache.ignite.internal.codegen.QueryCloseMessageSerializer;
+import org.apache.ignite.internal.codegen.QueryStartRequestSerializer;
import org.apache.ignite.internal.codegen.QueryStartResponseSerializer;
import org.apache.ignite.internal.codegen.QueryTxEntrySerializer;
import
org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
@@ -39,7 +40,7 @@ import
org.apache.ignite.plugin.extensions.communication.MessageSerializer;
*/
public enum MessageType {
/** */
- QUERY_START_REQUEST(300, QueryStartRequest::new),
+ QUERY_START_REQUEST(300, QueryStartRequest::new, new
QueryStartRequestSerializer()),
/** */
QUERY_START_RESPONSE(301, QueryStartResponse::new, new
QueryStartResponseSerializer()),
diff --git
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
index db2ae3063dd..e4c4c338a4c 100644
---
a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
+++
b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryStartRequest.java
@@ -17,21 +17,15 @@
package org.apache.ignite.internal.processors.query.calcite.message;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectCollection;
-import org.apache.ignite.internal.GridDirectMap;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import
org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.util.typedef.internal.U;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -39,43 +33,51 @@ import org.jetbrains.annotations.Nullable;
*/
public class QueryStartRequest implements MarshalableMessage,
ExecutionContextAware {
/** */
+ @Order(0)
private String schema;
/** */
+ @Order(value = 1, method = "queryId")
private UUID qryId;
/** */
+ @Order(value = 2, method = "originatingQueryId")
private long originatingQryId;
/** */
+ @Order(value = 3, method = "topologyVersion")
private AffinityTopologyVersion ver;
/** */
+ @Order(value = 4, method = "fragmentDescription")
private FragmentDescription fragmentDesc;
/** */
+ @Order(value = 5)
private String root;
/** Total count of fragments in query for this node. */
+ @Order(value = 6, method = "totalFragmentsCount")
private int totalFragmentsCnt;
/** */
- @GridDirectTransient
- private Object[] params;
+ private @Nullable Object[] params;
/** */
- private byte[] paramsBytes;
+ @Order(value = 7, method = "parametersBytes")
+ private @Nullable byte[] paramsBytes;
/** */
+ @Order(value = 8)
private long timeout;
/** */
- @GridDirectCollection(QueryTxEntry.class)
+ @Order(value = 9, method = "queryTransactionEntries")
private @Nullable Collection<QueryTxEntry> qryTxEntries;
/** */
- @GridDirectMap(keyType = String.class, valueType = String.class)
- private Map<String, String> appAttrs;
+ @Order(value = 10, method = "applicationAttributes")
+ private @Nullable Map<String, String> appAttrs;
/** */
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
@@ -87,10 +89,10 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
AffinityTopologyVersion ver,
FragmentDescription fragmentDesc,
int totalFragmentsCnt,
- Object[] params,
+ @Nullable Object[] params,
@Nullable byte[] paramsBytes,
long timeout,
- Collection<QueryTxEntry> qryTxEntries,
+ @Nullable Collection<QueryTxEntry> qryTxEntries,
@Nullable Map<String, String> appAttrs
) {
this.qryId = qryId;
@@ -117,18 +119,43 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
return schema;
}
+ /** */
+ public void schema(String schema) {
+ this.schema = schema;
+ }
+
/** {@inheritDoc} */
@Override public UUID queryId() {
return qryId;
}
+ /** */
+ public void queryId(UUID qryId) {
+ this.qryId = qryId;
+ }
+
/**
* @return Registered local query ID on originating node.
*/
- public long originatingQryId() {
+ public long originatingQueryId() {
return originatingQryId;
}
+ /** */
+ public void originatingQueryId(long originatingQryId) {
+ this.originatingQryId = originatingQryId;
+ }
+
+ /** */
+ public @Nullable byte[] parametersBytes() {
+ return paramsBytes;
+ }
+
+ /** */
+ public void parametersBytes(@Nullable byte[] paramsBytes) {
+ this.paramsBytes = paramsBytes;
+ }
+
/** {@inheritDoc} */
@Override public long fragmentId() {
return fragmentDesc.fragmentId();
@@ -141,6 +168,11 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
return fragmentDesc;
}
+ /** */
+ public void fragmentDescription(FragmentDescription fragmentDesc) {
+ this.fragmentDesc = fragmentDesc;
+ }
+
/**
* @return Topology version.
*/
@@ -148,6 +180,11 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
return ver;
}
+ /** */
+ public void topologyVersion(AffinityTopologyVersion ver) {
+ this.ver = ver;
+ }
+
/**
* @return Fragment plan.
*/
@@ -155,6 +192,11 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
return root;
}
+ /** */
+ public void root(String root) {
+ this.root = root;
+ }
+
/**
* @return Total count of fragments in query for this node.
*/
@@ -162,11 +204,16 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
return totalFragmentsCnt;
}
+ /** */
+ public void totalFragmentsCount(int totalFragmentsCnt) {
+ this.totalFragmentsCnt = totalFragmentsCnt;
+ }
+
/**
* @return Query parameters.
*/
@SuppressWarnings("AssignmentOrReturnOfFieldWithMutableType")
- public Object[] parameters() {
+ public @Nullable Object[] parameters() {
return params;
}
@@ -185,6 +232,16 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
return timeout;
}
+ /** */
+ public void timeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+ /** */
+ public void parametersBytes(long timeout) {
+ this.timeout = timeout;
+ }
+
/**
* @return Transaction entries to mixin on query processing.
*/
@@ -193,10 +250,20 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
}
/** */
- public Map<String, String> appAttrs() {
+ public void queryTransactionEntries(@Nullable Collection<QueryTxEntry>
qryTxEntries) {
+ this.qryTxEntries = qryTxEntries;
+ }
+
+ /** */
+ public @Nullable Map<String, String> applicationAttributes() {
return appAttrs;
}
+ /** */
+ public void applicationAttributes(@Nullable Map<String, String> appAttrs) {
+ this.appAttrs = appAttrs;
+ }
+
/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
if (paramsBytes == null && params != null)
@@ -225,186 +292,6 @@ public class QueryStartRequest implements
MarshalableMessage, ExecutionContextAw
}
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeMessage(fragmentDesc))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeLong(originatingQryId))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeByteArray(paramsBytes))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeUuid(qryId))
- return false;
-
- writer.incrementState();
-
- case 4:
- if (!writer.writeString(root))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeString(schema))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeLong(timeout))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeInt(totalFragmentsCnt))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeCollection(qryTxEntries,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- case 9:
- if (!writer.writeAffinityTopologyVersion(ver))
- return false;
-
- writer.incrementState();
-
- case 10:
- if (!writer.writeMap(appAttrs,
MessageCollectionItemType.STRING, MessageCollectionItemType.STRING))
- return false;
-
- writer.incrementState();
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- fragmentDesc = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- originatingQryId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- paramsBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 3:
- qryId = reader.readUuid();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 4:
- root = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- schema = reader.readString();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- timeout = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- totalFragmentsCnt = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- qryTxEntries =
reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 9:
- ver = reader.readAffinityTopologyVersion();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 10:
- appAttrs = reader.readMap(MessageCollectionItemType.STRING,
MessageCollectionItemType.STRING, false);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public MessageType type() {
return MessageType.QUERY_START_REQUEST;