This is an automated email from the ASF dual-hosted git repository.
timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 689449726b1 IGNITE-26781: Use MessageSerializer for TxLockList,
CommittedVersion, GridNearSingleGetResponse (#12441)
689449726b1 is described below
commit 689449726b169d6163e16e1db1e120eea158c07f
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Oct 23 15:25:35 2025 +0300
IGNITE-26781: Use MessageSerializer for TxLockList, CommittedVersion,
GridNearSingleGetResponse (#12441)
---
.../communication/GridIoMessageFactory.java | 6 +-
.../distributed/dht/atomic/GridDhtAtomicCache.java | 4 +-
.../near/GridNearSingleGetResponse.java | 192 +++++++--------------
.../cache/transactions/IgniteTxManager.java | 36 ++--
.../cache/transactions/TxDeadlockDetection.java | 6 +-
.../processors/cache/transactions/TxLockList.java | 59 ++-----
6 files changed, 109 insertions(+), 194 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index ce66cbf0954..12fe9e90bf3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.codegen.GridNearGetResponseSerializer;
import org.apache.ignite.internal.codegen.GridNearLockRequestSerializer;
import org.apache.ignite.internal.codegen.GridNearLockResponseSerializer;
import org.apache.ignite.internal.codegen.GridNearSingleGetRequestSerializer;
+import org.apache.ignite.internal.codegen.GridNearSingleGetResponseSerializer;
import org.apache.ignite.internal.codegen.GridNearTxFinishRequestSerializer;
import org.apache.ignite.internal.codegen.GridNearTxFinishResponseSerializer;
import org.apache.ignite.internal.codegen.GridNearTxPrepareRequestSerializer;
@@ -116,6 +117,7 @@ import org.apache.ignite.internal.codegen.SnapshotFilesRequestMessageSerializer;
import org.apache.ignite.internal.codegen.TcpInverseConnectionResponseMessageSerializer;
import org.apache.ignite.internal.codegen.TransactionAttributesAwareRequestSerializer;
import org.apache.ignite.internal.codegen.TransactionIsolationMessageSerializer;
+import org.apache.ignite.internal.codegen.TxLockListSerializer;
import org.apache.ignite.internal.codegen.TxLockSerializer;
import org.apache.ignite.internal.codegen.TxLocksRequestSerializer;
import org.apache.ignite.internal.codegen.TxLocksResponseSerializer;
@@ -281,7 +283,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register((short)-43, IgniteIoTestMessage::new);
factory.register((short)-36, GridDhtAtomicSingleUpdateRequest::new);
factory.register((short)-27, GridDhtTxOnePhaseCommitAckRequest::new, new GridDhtTxOnePhaseCommitAckRequestSerializer());
- factory.register((short)-26, TxLockList::new);
+ factory.register((short)-26, TxLockList::new, new TxLockListSerializer());
factory.register((short)-25, TxLock::new, new TxLockSerializer());
factory.register((short)-24, TxLocksRequest::new, new TxLocksRequestSerializer());
factory.register((short)-23, TxLocksResponse::new, new TxLocksResponseSerializer());
@@ -382,7 +384,7 @@ public class GridIoMessageFactory implements MessageFactoryProvider {
factory.register((short)114, GridDhtPartitionSupplyMessage::new, new GridDhtPartitionSupplyMessageSerializer());
factory.register((short)115, UUIDCollectionMessage::new, new UUIDCollectionMessageSerializer());
factory.register((short)116, GridNearSingleGetRequest::new, new GridNearSingleGetRequestSerializer());
- factory.register((short)117, GridNearSingleGetResponse::new);
+ factory.register((short)117, GridNearSingleGetResponse::new, new GridNearSingleGetResponseSerializer());
factory.register((short)118, CacheContinuousQueryBatchAck::new, new CacheContinuousQueryBatchAckSerializer());
// [120..123] - DR
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 5f36506ce79..d14cca8e63d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -829,9 +829,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args)
throws IgniteCheckedException {
- IgniteInternalFuture<EntryProcessorResult<T>> invokeFut = invoke0(false, key, entryProcessor, args);
-
- return invokeFut.get();
+ return invoke0(false, key, entryProcessor, args).get();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
index a78d44418c5..d51a7b85c84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.processors.cache.distributed.near;
-import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -28,10 +28,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheIdMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
@@ -45,22 +42,23 @@ public class GridNearSingleGetResponse extends GridCacheIdMessage implements Gri
public static final int CONTAINS_VAL_FLAG_MASK = 0x2;
/** Future ID. */
+ @Order(value = 4, method = "futureId")
private long futId;
- /** */
+ /** Result. */
+ @Order(value = 5, method = "result")
private Message res;
- /** */
+ /** Topology version. */
+ @Order(value = 6, method = "topologyVersion")
private AffinityTopologyVersion topVer;
- /** Error. */
- @GridDirectTransient
- private IgniteCheckedException err;
-
- /** Serialized error. */
- private byte[] errBytes;
+ /** Error message. */
+ @Order(value = 7, method = "errorMessage")
+ private ErrorMessage errMsg;
- /** */
+ /** Flags. */
+ @Order(8)
private byte flags;
/**
@@ -99,13 +97,27 @@ public class GridNearSingleGetResponse extends GridCacheIdMessage implements Gri
/**
* @param err Error.
*/
- public void error(IgniteCheckedException err) {
- this.err = err;
+ public void error(Throwable err) {
+ this.errMsg = new ErrorMessage(err);
}
/** {@inheritDoc} */
- @Override public IgniteCheckedException error() {
- return err;
+ @Override public Throwable error() {
+ return ErrorMessage.error(errMsg);
+ }
+
+ /**
+ * @return Error message.
+ */
+ public ErrorMessage errorMessage() {
+ return errMsg;
+ }
+
+ /**
+ * @param errMsg Error message.
+ */
+ public void errorMessage(ErrorMessage errMsg) {
+ this.errMsg = errMsg;
}
/**
@@ -115,6 +127,13 @@ public class GridNearSingleGetResponse extends GridCacheIdMessage implements Gri
return topVer != null ? topVer : super.topologyVersion();
}
+ /**
+ * @param topVer Topology version.
+ */
+ public void topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
+
/**
* @return {@code True} if invalid partitions error occurred.
*/
@@ -136,6 +155,20 @@ public class GridNearSingleGetResponse extends GridCacheIdMessage implements Gri
flags |= CONTAINS_VAL_FLAG_MASK;
}
+ /**
+ * @return Flags.
+ */
+ public byte flags() {
+ return flags;
+ }
+
+ /**
+ * @param flags Flags.
+ */
+ public void flags(byte flags) {
+ this.flags = flags;
+ }
+
/**
* @return Result.
*/
@@ -143,6 +176,13 @@ public class GridNearSingleGetResponse extends GridCacheIdMessage implements Gri
return res;
}
+ /**
+ * @param res Result.
+ */
+ public void result(Message res) {
+ this.res = res;
+ }
+
/**
* @return Future ID.
*/
@@ -150,6 +190,13 @@ public class GridNearSingleGetResponse extends GridCacheIdMessage implements Gri
return futId;
}
+ /**
+ * @param futId Future ID.
+ */
+ public void futureId(long futId) {
+ this.futId = futId;
+ }
+
/** {@inheritDoc} */
@Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx) throws IgniteCheckedException {
super.prepareMarshal(ctx);
@@ -164,9 +211,6 @@ public class GridNearSingleGetResponse extends GridCacheIdMessage implements Gri
else if (res instanceof GridCacheEntryInfo)
((GridCacheEntryInfo)res).marshal(cctx);
}
-
- if (err != null && errBytes == null)
- errBytes = U.marshal(ctx, err);
}
/** {@inheritDoc} */
@@ -183,112 +227,6 @@ public class GridNearSingleGetResponse extends GridCacheIdMessage implements Gri
else if (res instanceof GridCacheEntryInfo)
((GridCacheEntryInfo)res).unmarshal(cctx, ldr);
}
-
- if (errBytes != null && err == null)
- err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 4:
- if (!writer.writeByteArray(errBytes))
- return false;
-
- writer.incrementState();
-
- case 5:
- if (!writer.writeByte(flags))
- return false;
-
- writer.incrementState();
-
- case 6:
- if (!writer.writeLong(futId))
- return false;
-
- writer.incrementState();
-
- case 7:
- if (!writer.writeMessage(res))
- return false;
-
- writer.incrementState();
-
- case 8:
- if (!writer.writeAffinityTopologyVersion(topVer))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 4:
- errBytes = reader.readByteArray();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 5:
- flags = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 6:
- futId = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 7:
- res = reader.readMessage();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 8:
- topVer = reader.readAffinityTopologyVersion();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 5fb6fb51365..6f814705de4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.processors.cache.transactions;
-import java.io.Externalizable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -1227,13 +1226,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
if (committed == null)
committed = new ArrayList<>();
- committed.add(e.getKey());
+ committed.add(returnVersion(e.getKey()));
}
else {
if (rolledback == null)
rolledback = new ArrayList<>();
- rolledback.add(e.getKey());
+ rolledback.add(returnVersion(e.getKey()));
}
}
@@ -1242,6 +1241,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
rolledback == null ? Collections.emptyList() : rolledback);
}
+ /**
+ * Hides internal {@link CommittedVersion}.
+ *
+ * @return Cache version.
+ */
+ private static GridCacheVersion returnVersion(GridCacheVersion ver) {
+ return ver instanceof CommittedVersion ? ((CommittedVersion)ver).originVer : ver;
+ }
+
/**
* Peeks completed versions history map to find out whether transaction was committed or rolled back
* in the recent past.
@@ -3247,20 +3255,21 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
}
/**
- *
+ * Near version container. Is not for resending or serialization. Should not be exposed outside.
*/
private static class CommittedVersion extends GridCacheVersion {
/** */
private static final long serialVersionUID = 0L;
- /** Corresponding near version. Transient. */
- private GridCacheVersion nearVer;
+ /** Transient corresponding near version. */
+ private final GridCacheVersion nearVer;
- /**
- * Empty constructor required by {@link Externalizable}.
- */
+ /** */
+ private final GridCacheVersion originVer;
+
+ /** */
public CommittedVersion() {
- // No-op.
+ throw new UnsupportedOperationException("Near committed version container is not a message to send or serialize.");
}
/**
@@ -3270,10 +3279,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
private CommittedVersion(GridCacheVersion ver, GridCacheVersion nearVer) {
super(ver.topologyVersion(), ver.order(), ver.nodeOrder(), ver.dataCenterId());
+ assert ver != null;
assert nearVer != null;
+ originVer = ver;
this.nearVer = nearVer;
}
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ throw new UnsupportedOperationException("Near committed version container is not a message to send or serialize.");
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
index f65485dc7b8..1febf364220 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java
@@ -330,7 +330,7 @@ public class TxDeadlockDetection {
@SuppressWarnings("ForLoopReplaceableByForEach")
private void mapTxKeys(@Nullable Set<IgniteTxKey> txKeys, Map<IgniteTxKey, TxLockList> txLocks) {
for (Map.Entry<IgniteTxKey, TxLockList> e : txLocks.entrySet()) {
- List<TxLock> locks = e.getValue().txLocks();
+ List<TxLock> locks = e.getValue().transactionLocks();
for (int i = 0; i < locks.size(); i++) {
TxLock txLock = locks.get(i);
@@ -426,7 +426,7 @@ public class TxDeadlockDetection {
TxLockList lockList = e.getValue();
if (lockList != null && !lockList.isEmpty()) {
- for (TxLock lock : lockList.txLocks()) {
+ for (TxLock lock : lockList.transactionLocks()) {
if (lock.owner() || lock.candiate()) {
if (txs.get(lock.txId()) == null)
txs.put(lock.txId(), new T2<>(lock.nearNodeId(), lock.threadId()));
@@ -466,7 +466,7 @@ public class TxDeadlockDetection {
GridCacheVersion txOwner = null;
- for (TxLock lock : e.getValue().txLocks()) {
+ for (TxLock lock : e.getValue().transactionLocks()) {
if (lock.owner() && txOwner == null) {
// Actually we can get lock list with more than one owner. In this case ignore all owners
// except first because likely the first owner was cause of deadlock.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLockList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLockList.java
index 1a3db893456..4bd02c6491e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLockList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLockList.java
@@ -17,16 +17,12 @@
package org.apache.ignite.internal.processors.cache.transactions;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.ignite.internal.GridDirectCollection;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
/**
* List of transaction locks for particular key.
@@ -34,7 +30,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
public class TxLockList implements Message {
/** Tx locks. */
@GridToStringInclude
- @GridDirectCollection(value = TxLock.class)
+ @Order(value = 0, method = "transactionLocks")
private List<TxLock> txLocks = new ArrayList<>();
/**
@@ -47,10 +43,17 @@ public class TxLockList implements Message {
/**
* @return Lock list.
*/
- public List<TxLock> txLocks() {
+ public List<TxLock> transactionLocks() {
return txLocks;
}
+ /**
+ * @param txLocks Lock list.
+ */
+ public void transactionLocks(List<TxLock> txLocks) {
+ this.txLocks = txLocks;
+ }
+
/**
* @param txLock Tx lock.
*/
@@ -70,50 +73,8 @@ public class TxLockList implements Message {
return S.toString(TxLockList.class, this);
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeCollection(txLocks, MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- txLocks = reader.readCollection(MessageCollectionItemType.MSG);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return -26;
}
-
}