This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2a9ed0499aa IGNITE-26659 Use MessageSerializer for QueryTxEntry (#12404)
2a9ed0499aa is described below
commit 2a9ed0499aa6f3484e779984d4dfe4345e1758d6
Author: Denis <[email protected]>
AuthorDate: Mon Oct 13 17:16:30 2025 +1000
IGNITE-26659 Use MessageSerializer for QueryTxEntry (#12404)
---
.../query/calcite/message/MessageType.java | 3 +-
.../query/calcite/message/QueryTxEntry.java | 123 ++++++---------------
2 files changed, 37 insertions(+), 89 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
index 9492d66d650..a86ec3dcf65 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/MessageType.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite.message;
import java.util.function.Supplier;
import org.apache.ignite.internal.codegen.FragmentDescriptionSerializer;
+import org.apache.ignite.internal.codegen.QueryTxEntrySerializer;
import org.apache.ignite.internal.processors.query.calcite.metadata.ColocationGroup;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentDescription;
import org.apache.ignite.internal.processors.query.calcite.metadata.FragmentMapping;
@@ -62,7 +63,7 @@ public enum MessageType {
FRAGMENT_DESCRIPTION(352, FragmentDescription::new, new FragmentDescriptionSerializer()),
/** */
- QUERY_TX_ENTRY(353, QueryTxEntry::new);
+ QUERY_TX_ENTRY(353, QueryTxEntry::new, new QueryTxEntrySerializer());
/** */
private final int directType;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
index 87b01d0ba04..a2b8feac693 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/message/QueryTxEntry.java
@@ -17,39 +17,42 @@
package org.apache.ignite.internal.processors.query.calcite.message;
-import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.Comparator;
import java.util.function.Function;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.TransactionConfiguration;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
* Class to pass to remote nodes transaction changes.
*
* @see TransactionConfiguration#setTxAwareQueriesEnabled(boolean)
* @see ExecutionContext#transactionChanges(Collection)
- * @see ExecutionContext#transactionChanges(int, int[], Function)
+ * @see ExecutionContext#transactionChanges(int, int[], Function, Comparator)
* @see QueryStartRequest#queryTransactionEntries()
*/
public class QueryTxEntry implements CalciteMessage {
/** Cache id. */
+ @Order(0)
private int cacheId;
/** Entry key. */
+ @Order(1)
private KeyCacheObject key;
/** Entry value. */
+ @Order(value = 2, method = "value")
private CacheObject val;
/** Entry version. */
+ @Order(value = 3, method = "version")
private GridCacheVersion ver;
/**
@@ -77,21 +80,49 @@ public class QueryTxEntry implements CalciteMessage {
return cacheId;
}
+ /**
+ * @param cacheId New cache id.
+ */
+ public void cacheId(int cacheId) {
+ this.cacheId = cacheId;
+ }
+
/** @return Entry key. */
public KeyCacheObject key() {
return key;
}
+ /**
+ * @param key New entry key.
+ */
+ public void key(KeyCacheObject key) {
+ this.key = key;
+ }
+
/** @return Entry value. */
public CacheObject value() {
return val;
}
+ /**
+ * @param val New entry value.
+ */
+ public void value(CacheObject val) {
+ this.val = val;
+ }
+
/** @return Entry version. */
public GridCacheVersion version() {
return ver;
}
+ /**
+ * @param ver New entry version.
+ */
+ public void version(GridCacheVersion ver) {
+ this.ver = ver;
+ }
+
/** */
public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
CacheObjectContext coctx = ctx.cacheContext(cacheId).cacheObjectContext();
@@ -110,94 +141,10 @@ public class QueryTxEntry implements CalciteMessage {
if (val != null)
val.finishUnmarshal(coctx, ldr);
-
}
/** {@inheritDoc} */
@Override public MessageType type() {
return MessageType.QUERY_TX_ENTRY;
}
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeInt(cacheId))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeKeyCacheObject(key))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeCacheObject(val))
- return false;
-
- writer.incrementState();
-
- case 3:
- if (!writer.writeMessage(ver))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- cacheId = reader.readInt();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 1:
- key = reader.readKeyCacheObject();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 2:
- val = reader.readCacheObject();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 3:
- ver = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
}