This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 898d7befa1d IGNITE-26773 Use MessageSerializer for
GridDhtAtomicSingleUpdateRequest (#12436)
898d7befa1d is described below
commit 898d7befa1da3071c0047576cd1a35fb8d8c8b76
Author: Didar Shayarov <[email protected]>
AuthorDate: Thu Nov 13 11:20:20 2025 +0300
IGNITE-26773 Use MessageSerializer for GridDhtAtomicSingleUpdateRequest
(#12436)
---
.../communication/GridIoMessageFactory.java | 3 +-
.../atomic/GridDhtAtomicAbstractUpdateFuture.java | 11 +-
.../atomic/GridDhtAtomicAbstractUpdateRequest.java | 132 +++++++-------
.../distributed/dht/atomic/GridDhtAtomicCache.java | 7 -
.../atomic/GridDhtAtomicSingleUpdateFuture.java | 4 -
.../atomic/GridDhtAtomicSingleUpdateRequest.java | 193 ++++++++-------------
.../dht/atomic/GridDhtAtomicUpdateFuture.java | 4 +-
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 68 ++++----
8 files changed, 182 insertions(+), 240 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 9028c55a0c8..5dfabd85788 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -55,6 +55,7 @@ import
org.apache.ignite.internal.codegen.GridDeploymentResponseSerializer;
import
org.apache.ignite.internal.codegen.GridDhtAffinityAssignmentRequestSerializer;
import
org.apache.ignite.internal.codegen.GridDhtAtomicDeferredUpdateResponseSerializer;
import org.apache.ignite.internal.codegen.GridDhtAtomicNearResponseSerializer;
+import
org.apache.ignite.internal.codegen.GridDhtAtomicSingleUpdateRequestSerializer;
import
org.apache.ignite.internal.codegen.GridDhtAtomicUpdateResponseSerializer;
import org.apache.ignite.internal.codegen.GridDhtForceKeysRequestSerializer;
import org.apache.ignite.internal.codegen.GridDhtForceKeysResponseSerializer;
@@ -307,7 +308,7 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)-48, GridDhtAtomicNearResponse::new, new
GridDhtAtomicNearResponseSerializer());
factory.register((short)-45,
GridChangeGlobalStateMessageResponse::new, new
GridChangeGlobalStateMessageResponseSerializer());
factory.register((short)-43, IgniteIoTestMessage::new);
- factory.register((short)-36, GridDhtAtomicSingleUpdateRequest::new);
+ factory.register((short)-36, GridDhtAtomicSingleUpdateRequest::new,
new GridDhtAtomicSingleUpdateRequestSerializer());
factory.register((short)-27, GridDhtTxOnePhaseCommitAckRequest::new,
new GridDhtTxOnePhaseCommitAckRequestSerializer());
factory.register((short)-26, TxLockList::new, new
TxLockListSerializer());
factory.register((short)-25, TxLock::new, new TxLockSerializer());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 4034878c6e0..9831dc13598 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -171,8 +170,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture
extends GridCacheFutureA
if (log.isDebugEnabled())
log.debug("Mapping entry to DHT nodes [nodes=" + nodeIds(dhtNodes)
+ ", entry=" + entry + ']');
- CacheWriteSynchronizationMode syncMode =
updateReq.writeSynchronizationMode();
-
addDhtKey(entry.key(), dhtNodes);
for (int i = 0; i < dhtNodes.size(); i++) {
@@ -188,7 +185,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture
extends GridCacheFutureA
node.id(),
futId,
writeVer,
- syncMode,
topVer,
ttl,
conflictExpireTime,
@@ -245,8 +241,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture
extends GridCacheFutureA
boolean readRepairRecovery) {
assert readers != null;
- CacheWriteSynchronizationMode syncMode =
updateReq.writeSynchronizationMode();
-
addNearKey(entry.key(), readers);
AffinityTopologyVersion topVer = updateReq.topologyVersion();
@@ -278,7 +272,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture
extends GridCacheFutureA
node.id(),
futId,
writeVer,
- syncMode,
topVer,
ttl,
expireTime,
@@ -435,7 +428,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture
extends GridCacheFutureA
/**
* @param nearNode Near node.
- * @param sndRes {@code True} if allow to send result from DHT nodes.
+ * @param sndRes {@code True} if allow sending result from DHT nodes.
* @param ret Return value.
*/
private void sendDhtRequests(ClusterNode nearNode, GridCacheReturn ret,
boolean sndRes) {
@@ -513,7 +506,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture
extends GridCacheFutureA
* @param nodeId Node ID.
* @param futId Future ID.
* @param writeVer Update version.
- * @param syncMode Write synchronization mode.
* @param topVer Topology version.
* @param ttl TTL.
* @param conflictExpireTime Conflict expire time.
@@ -525,7 +517,6 @@ public abstract class GridDhtAtomicAbstractUpdateFuture
extends GridCacheFutureA
UUID nodeId,
long futId,
GridCacheVersion writeVer,
- CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
long ttl,
long conflictExpireTime,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
index 108b2810b5b..49fc5c2af21 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java
@@ -21,8 +21,8 @@ import java.nio.ByteBuffer;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
@@ -53,9 +53,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
/** */
static final int DHT_ATOMIC_HAS_RESULT_MASK = 0x08;
- /** */
- private static final int DHT_ATOMIC_REPLY_WITHOUT_DELAY = 0x10;
-
/** */
protected static final int DHT_ATOMIC_OBSOLETE_NEAR_KEY_FLAG_MASK = 0x20;
@@ -69,18 +66,19 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
public static final int CACHE_MSG_IDX = nextIndexId();
/** Future ID on primary. */
+ @Order(value = 4, method = "futureId")
protected long futId;
/** Write version. */
+ @Order(value = 5, method = "writeVersion")
protected GridCacheVersion writeVer;
- /** Write synchronization mode. */
- protected CacheWriteSynchronizationMode syncMode;
-
/** Topology version. */
+ @Order(value = 6, method = "topologyVersion")
protected AffinityTopologyVersion topVer;
/** Task name hash. */
+ @Order(7)
protected int taskNameHash;
/** Node ID. */
@@ -92,12 +90,15 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
private boolean onRes;
/** */
+ @Order(8)
private UUID nearNodeId;
/** */
+ @Order(value = 9, method = "nearFutureId")
private long nearFutId;
/** Additional flags. */
+ @Order(10)
protected byte flags;
/**
@@ -117,7 +118,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
UUID nodeId,
long futId,
GridCacheVersion writeVer,
- CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
int taskNameHash,
boolean addDepInfo,
@@ -131,7 +131,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
this.nodeId = nodeId;
this.futId = futId;
this.writeVer = writeVer;
- this.syncMode = syncMode;
this.topVer = topVer;
this.taskNameHash = taskNameHash;
this.addDepInfo = addDepInfo;
@@ -149,6 +148,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
return topVer;
}
+ /**
+ * @param topVer New topology version.
+ */
+ public void topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
+
/**
* @param nearNodeId Near node ID.
* @param nearFutId Future ID on near node.
@@ -160,20 +166,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
this.nearFutId = nearFutId;
}
- /**
- * @return {@code True} if backups should reply immediately.
- */
- boolean replyWithoutDelay() {
- return isFlag(DHT_ATOMIC_REPLY_WITHOUT_DELAY);
- }
-
- /**
- * @param replyWithoutDelay {@code True} if backups should reply
immediately.
- */
- void replyWithoutDelay(boolean replyWithoutDelay) {
- setFlag(replyWithoutDelay, DHT_ATOMIC_REPLY_WITHOUT_DELAY);
- }
-
/**
* @param res Result flag.
*/
@@ -195,6 +187,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
return nearNodeId;
}
+ /**
+ * @param nearNodeId New near node id.
+ */
+ public void nearNodeId(UUID nearNodeId) {
+ this.nearNodeId = nearNodeId;
+ }
+
/** {@inheritDoc} */
@Override public int lookupIndex() {
return CACHE_MSG_IDX;
@@ -214,6 +213,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
return flags;
}
+ /**
+ * @param flags New additional flags.
+ */
+ public void flags(byte flags) {
+ this.flags = flags;
+ }
+
/**
* @return Keep binary flag.
*/
@@ -267,14 +273,14 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
public abstract boolean forceTransformBackups();
/** {@inheritDoc} */
- @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) {
+ @Override public IgniteLogger messageLogger(GridCacheSharedContext<?, ?>
ctx) {
return ctx.atomicMessageLogger();
}
/**
* @param key Key to add.
* @param val Value, {@code null} if should be removed.
- * @param entryProcessor Entry processor.
+ * @param entryProc Entry processor.
* @param ttl TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
@@ -285,7 +291,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
*/
public abstract void addWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
+ EntryProcessor<Object, Object, Object> entryProc,
long ttl,
long conflictExpireTime,
@Nullable GridCacheVersion conflictVer,
@@ -297,13 +303,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
/**
* @param key Key to add.
* @param val Value, {@code null} if should be removed.
- * @param entryProcessor Entry processor.
+ * @param entryProc Entry processor.
* @param ttl TTL.
* @param expireTime Expire time.
*/
public abstract void addNearWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
+ EntryProcessor<Object, Object, Object> entryProc,
long ttl,
long expireTime);
@@ -319,6 +325,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
return taskNameHash;
}
+ /**
+ * @param taskNameHash New task name hash.
+ */
+ public void taskNameHash(int taskNameHash) {
+ this.taskNameHash = taskNameHash;
+ }
+
/**
* @return Future ID on primary node.
*/
@@ -326,6 +339,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
return futId;
}
+ /**
+ * @param futId New future ID on primary node.
+ */
+ public void futureId(long futId) {
+ this.futId = futId;
+ }
+
/**
* @return Future ID on near node.
*/
@@ -333,6 +353,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
return nearFutId;
}
+ /**
+ * @param nearFutId New near future id.
+ */
+ public void nearFutureId(long nearFutId) {
+ this.nearFutId = nearFutId;
+ }
+
/**
* @return Write version.
*/
@@ -341,10 +368,10 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
}
/**
- * @return Cache write synchronization mode.
+ * @param writeVer New write version.
*/
- public final CacheWriteSynchronizationMode writeSynchronizationMode() {
- return syncMode;
+ public void writeVersion(GridCacheVersion writeVer) {
+ this.writeVer = writeVer;
}
/**
@@ -462,7 +489,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
}
/**
- * Reags flag mask.
+ * Reads flag mask.
*
* @param mask Mask to read.
* @return Flag value.
@@ -471,6 +498,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
return (flags & mask) != 0;
}
+ // TODO: remove after IGNITE-26774
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -511,24 +539,18 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
writer.incrementState();
case 8:
- if (!writer.writeByte(syncMode != null ?
(byte)syncMode.ordinal() : -1))
- return false;
-
- writer.incrementState();
-
- case 9:
if (!writer.writeInt(taskNameHash))
return false;
writer.incrementState();
- case 10:
+ case 9:
if (!writer.writeAffinityTopologyVersion(topVer))
return false;
writer.incrementState();
- case 11:
+ case 10:
if (!writer.writeMessage(writeVer))
return false;
@@ -580,18 +602,6 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
reader.incrementState();
case 8:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte();
-
- if (!reader.isLastRead())
- return false;
-
- syncMode =
CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
- reader.incrementState();
-
- case 9:
taskNameHash = reader.readInt();
if (!reader.isLastRead())
@@ -599,7 +609,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
reader.incrementState();
- case 10:
+ case 9:
topVer = reader.readAffinityTopologyVersion();
if (!reader.isLastRead())
@@ -607,7 +617,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
reader.incrementState();
- case 11:
+ case 10:
writeVer = reader.readMessage();
if (!reader.isLastRead())
@@ -622,20 +632,18 @@ public abstract class GridDhtAtomicAbstractUpdateRequest
extends GridCacheIdMess
/** {@inheritDoc} */
@Override public String toString() {
- StringBuilder flags = new StringBuilder();
+ StringBuilder flagsStr = new StringBuilder();
if (skipStore())
- appendFlag(flags, "skipStore");
+ appendFlag(flagsStr, "skipStore");
if (keepBinary())
- appendFlag(flags, "keepBinary");
+ appendFlag(flagsStr, "keepBinary");
if (isFlag(DHT_ATOMIC_NEAR_FLAG_MASK))
- appendFlag(flags, "near");
+ appendFlag(flagsStr, "near");
if (hasResult())
- appendFlag(flags, "hasRes");
- if (replyWithoutDelay())
- appendFlag(flags, "resNoDelay");
+ appendFlag(flagsStr, "hasRes");
return S.toString(GridDhtAtomicAbstractUpdateRequest.class, this,
- "flags", flags.toString());
+ "flags", flagsStr.toString());
}
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index a309b7fad0a..5876bdb50b3 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -3455,13 +3455,6 @@ public class GridDhtAtomicCache<K, V> extends
GridDhtCacheAdapter<K, V> {
if (nearRes != null)
sendDhtNearResponse(req, nearRes);
- if (dhtRes == null && req.replyWithoutDelay()) {
- dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(),
- req.partition(),
- req.futureId(),
- ctx.deploymentEnabled());
- }
-
if (dhtRes != null)
sendDhtPrimaryResponse(nodeId, req, dhtRes);
else
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
index 10de8e0aa57..538759cd3aa 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java
@@ -19,7 +19,6 @@ package
org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.util.List;
import java.util.UUID;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -77,7 +76,6 @@ class GridDhtAtomicSingleUpdateFuture extends
GridDhtAtomicAbstractUpdateFuture
UUID nodeId,
long futId,
GridCacheVersion writeVer,
- CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
long ttl,
long conflictExpireTime,
@@ -90,7 +88,6 @@ class GridDhtAtomicSingleUpdateFuture extends
GridDhtAtomicAbstractUpdateFuture
nodeId,
futId,
writeVer,
- syncMode,
topVer,
updateReq.taskNameHash(),
cctx.deploymentEnabled(),
@@ -104,7 +101,6 @@ class GridDhtAtomicSingleUpdateFuture extends
GridDhtAtomicAbstractUpdateFuture
nodeId,
futId,
writeVer,
- syncMode,
topVer,
updateReq.taskNameHash(),
null,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index dfced709aba..4571d02ce05 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -17,11 +17,10 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.nio.ByteBuffer;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -32,8 +31,6 @@ import
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -45,17 +42,21 @@ import static
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdateRequest {
/** Key to update. */
@GridToStringInclude
+ @Order(11)
protected KeyCacheObject key;
/** Value to update. */
@GridToStringInclude
+ @Order(value = 12, method = "value")
protected CacheObject val;
/** Previous value. */
@GridToStringInclude
+ @Order(value = 13, method = "previousValue")
protected CacheObject prevVal;
/** Partition. */
+ @Order(value = 14, method = "updateCounter")
protected long updateCntr;
/**
@@ -72,7 +73,6 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
* @param nodeId Node ID.
* @param futId Future ID.
* @param writeVer Write version for cache values.
- * @param syncMode Cache write synchronization mode.
* @param topVer Topology version.
* @param taskNameHash Task name hash code.
* @param addDepInfo Deployment info.
@@ -85,7 +85,6 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
UUID nodeId,
long futId,
GridCacheVersion writeVer,
- CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
int taskNameHash,
boolean addDepInfo,
@@ -97,7 +96,6 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
nodeId,
futId,
writeVer,
- syncMode,
topVer,
taskNameHash,
addDepInfo,
@@ -109,7 +107,7 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
/**
* @param key Key to add.
* @param val Value, {@code null} if should be removed.
- * @param entryProcessor Entry processor.
+ * @param entryProc Entry processor.
* @param ttl TTL (optional).
* @param conflictExpireTime Conflict expire time (optional).
* @param conflictVer Conflict version (optional).
@@ -120,7 +118,7 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
*/
@Override public void addWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
+ EntryProcessor<Object, Object, Object> entryProc,
long ttl,
long conflictExpireTime,
@Nullable GridCacheVersion conflictVer,
@@ -128,7 +126,7 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
@Nullable CacheObject prevVal,
long updateCntr,
GridCacheOperation cacheOp) {
- assert entryProcessor == null;
+ assert entryProc == null;
assert ttl <= 0 : ttl;
assert conflictExpireTime <= 0 : conflictExpireTime;
assert conflictVer == null : conflictVer;
@@ -165,16 +163,16 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
/**
* @param key Key to add.
* @param val Value, {@code null} if should be removed.
- * @param entryProcessor Entry processor.
+ * @param entryProc Entry processor.
* @param ttl TTL.
* @param expireTime Expire time.
*/
@Override public void addNearWriteValue(KeyCacheObject key,
@Nullable CacheObject val,
- EntryProcessor<Object, Object, Object> entryProcessor,
+ EntryProcessor<Object, Object, Object> entryProc,
long ttl,
long expireTime) {
- assert entryProcessor == null;
+ assert entryProc == null;
assert ttl <= 0 : ttl;
assert key.partition() >= 0 : key;
@@ -212,6 +210,20 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
return near() ? null : key;
}
+ /**
+ * @return Key to update.
+ */
+ public KeyCacheObject key() {
+ return key;
+ }
+
+ /**
+ * @param key New key to update.
+ */
+ public void key(KeyCacheObject key) {
+ this.key = key;
+ }
+
/** {@inheritDoc} */
@Override public int obsoleteNearKeysSize() {
return isFlag(DHT_ATOMIC_OBSOLETE_NEAR_KEY_FLAG_MASK) ? 1 : 0;
@@ -240,6 +252,20 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
return updateCntr;
}
+ /**
+ * @return Update counter.
+ */
+ public long updateCounter() {
+ return updateCntr;
+ }
+
+ /**
+ * @param updateCntr Update counter.
+ */
+ public void updateCounter(long updateCntr) {
+ this.updateCntr = updateCntr;
+ }
+
/** {@inheritDoc} */
@Override public KeyCacheObject nearKey(int idx) {
assert idx == 0 : idx;
@@ -254,6 +280,20 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
return near() ? null : val;
}
+ /**
+ * @return Cache object value.
+ */
+ public CacheObject value() {
+ return val;
+ }
+
+ /**
+ * @param val Cache object value to update.
+ */
+ public void value(CacheObject val) {
+ this.val = val;
+ }
+
/** {@inheritDoc} */
@Override @Nullable public CacheObject previousValue(int idx) {
assert idx == 0 : idx;
@@ -261,6 +301,20 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
return prevVal;
}
+ /**
+ * @return Previous value.
+ */
+ public CacheObject previousValue() {
+ return prevVal;
+ }
+
+ /**
+ * @param prevVal New previous value.
+ */
+ public void previousValue(CacheObject prevVal) {
+ this.prevVal = prevVal;
+ }
+
/** {@inheritDoc} */
@Override @Nullable public CacheObject nearValue(int idx) {
assert idx == 0 : idx;
@@ -277,9 +331,7 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
/** {@inheritDoc} */
@Override @Nullable public EntryProcessor<Object, Object, Object>
nearEntryProcessor(int idx) {
- assert idx == 0 : idx;
-
- return null;
+ return entryProcessor(idx);
}
/** {@inheritDoc} */
@@ -298,9 +350,7 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public long nearTtl(int idx) {
- assert idx == 0 : idx;
-
- return CU.TTL_NOT_CHANGED;
+ return ttl(idx);
}
/** {@inheritDoc} */
@@ -312,9 +362,7 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public long nearExpireTime(int idx) {
- assert idx == 0 : idx;
-
- return CU.EXPIRE_TIME_CALCULATE;
+ return conflictExpireTime(idx);
}
/** {@inheritDoc} */
@@ -323,10 +371,10 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
}
/** {@inheritDoc} */
- @Override public void prepareMarshal(GridCacheSharedContext ctx) throws
IgniteCheckedException {
+ @Override public void prepareMarshal(GridCacheSharedContext<?, ?> ctx)
throws IgniteCheckedException {
super.prepareMarshal(ctx);
- GridCacheContext cctx = ctx.cacheContext(cacheId);
+ GridCacheContext<?, ?> cctx = ctx.cacheContext(cacheId);
prepareMarshalObject(key, cctx);
@@ -336,10 +384,10 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
}
/** {@inheritDoc} */
- @Override public void finishUnmarshal(GridCacheSharedContext ctx,
ClassLoader ldr) throws IgniteCheckedException {
+ @Override public void finishUnmarshal(GridCacheSharedContext<?, ?> ctx,
ClassLoader ldr) throws IgniteCheckedException {
super.finishUnmarshal(ctx, ldr);
- GridCacheContext cctx = ctx.cacheContext(cacheId);
+ GridCacheContext<?, ?> cctx = ctx.cacheContext(cacheId);
finishUnmarshalObject(key, cctx, ldr);
@@ -348,101 +396,12 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
finishUnmarshalObject(prevVal, cctx, ldr);
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 12:
- if (!writer.writeKeyCacheObject(key))
- return false;
-
- writer.incrementState();
-
- case 13:
- if (!writer.writeCacheObject(prevVal))
- return false;
-
- writer.incrementState();
-
- case 14:
- if (!writer.writeLong(updateCntr))
- return false;
-
- writer.incrementState();
-
- case 15:
- if (!writer.writeCacheObject(val))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 12:
- key = reader.readKeyCacheObject();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 13:
- prevVal = reader.readCacheObject();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 14:
- updateCntr = reader.readLong();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- case 15:
- val = reader.readCacheObject();
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/**
* @param obj CacheObject to marshal
* @param ctx context
* @throws IgniteCheckedException if error
*/
- private void prepareMarshalObject(CacheObject obj, GridCacheContext ctx)
throws IgniteCheckedException {
+ private void prepareMarshalObject(CacheObject obj, GridCacheContext<?, ?>
ctx) throws IgniteCheckedException {
if (obj != null)
obj.prepareMarshal(ctx.cacheObjectContext());
}
@@ -453,7 +412,7 @@ public class GridDhtAtomicSingleUpdateRequest extends
GridDhtAtomicAbstractUpdat
* @param ldr class loader
* @throws IgniteCheckedException if error
*/
- private void finishUnmarshalObject(@Nullable CacheObject obj,
GridCacheContext ctx,
+ private void finishUnmarshalObject(@Nullable CacheObject obj,
GridCacheContext<?, ?> ctx,
ClassLoader ldr) throws IgniteCheckedException {
if (obj != null)
obj.finishUnmarshal(ctx.cacheObjectContext(), ldr);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 17754c61232..7debeffa16f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -19,7 +19,7 @@ package
org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.util.List;
import java.util.UUID;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -75,7 +75,6 @@ class GridDhtAtomicUpdateFuture extends
GridDhtAtomicAbstractUpdateFuture {
UUID nodeId,
long futId,
GridCacheVersion writeVer,
- CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
long ttl,
long conflictExpireTime,
@@ -87,7 +86,6 @@ class GridDhtAtomicUpdateFuture extends
GridDhtAtomicAbstractUpdateFuture {
nodeId,
futId,
writeVer,
- syncMode,
topVer,
updateReq.taskNameHash(),
null,
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 40c075189f1..28415c85490 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.UUID;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -138,7 +137,6 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
* @param futId Future ID.
* @param writeVer Write version for cache values.
* @param invokeArgs Optional arguments for entry processor.
- * @param syncMode Cache write synchronization mode.
* @param topVer Topology version.
* @param keepBinary Keep binary flag.
* @param skipStore Skip store flag.
@@ -152,7 +150,6 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
UUID nodeId,
long futId,
GridCacheVersion writeVer,
- CacheWriteSynchronizationMode syncMode,
@NotNull AffinityTopologyVersion topVer,
int taskNameHash,
Object[] invokeArgs,
@@ -166,7 +163,6 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
nodeId,
futId,
writeVer,
- syncMode,
topVer,
taskNameHash,
addDepInfo,
@@ -551,97 +547,97 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
}
switch (writer.state()) {
- case 12:
+ case 11:
if (!writer.writeGridLongList(conflictExpireTimes))
return false;
writer.incrementState();
- case 13:
+ case 12:
if (!writer.writeCollection(conflictVers,
MessageCollectionItemType.MSG))
return false;
writer.incrementState();
- case 14:
+ case 13:
if (!writer.writeCollection(entryProcessorsBytes,
MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
- case 15:
+ case 14:
if (!writer.writeBoolean(forceTransformBackups))
return false;
writer.incrementState();
- case 16:
+ case 15:
if (!writer.writeObjectArray(invokeArgsBytes,
MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
- case 17:
+ case 16:
if (!writer.writeCollection(keys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
- case 18:
+ case 17:
if (!writer.writeCollection(nearEntryProcessorsBytes,
MessageCollectionItemType.BYTE_ARR))
return false;
writer.incrementState();
- case 19:
+ case 18:
if (!writer.writeGridLongList(nearExpireTimes))
return false;
writer.incrementState();
- case 20:
+ case 19:
if (!writer.writeCollection(nearKeys,
MessageCollectionItemType.KEY_CACHE_OBJECT))
return false;
writer.incrementState();
- case 21:
+ case 20:
if (!writer.writeGridLongList(nearTtls))
return false;
writer.incrementState();
- case 22:
+ case 21:
if (!writer.writeCollection(nearVals,
MessageCollectionItemType.CACHE_OBJECT))
return false;
writer.incrementState();
- case 23:
+ case 22:
if (!writer.writeCollection(obsoleteIndexes,
MessageCollectionItemType.INT))
return false;
writer.incrementState();
- case 24:
+ case 23:
if (!writer.writeCollection(prevVals,
MessageCollectionItemType.CACHE_OBJECT))
return false;
writer.incrementState();
- case 25:
+ case 24:
if (!writer.writeGridLongList(ttls))
return false;
writer.incrementState();
- case 26:
+ case 25:
if (!writer.writeGridLongList(updateCntrs))
return false;
writer.incrementState();
- case 27:
+ case 26:
if (!writer.writeCollection(vals,
MessageCollectionItemType.CACHE_OBJECT))
return false;
@@ -660,7 +656,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
return false;
switch (reader.state()) {
- case 12:
+ case 11:
conflictExpireTimes = reader.readGridLongList();
if (!reader.isLastRead())
@@ -668,7 +664,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 13:
+ case 12:
conflictVers =
reader.readCollection(MessageCollectionItemType.MSG);
if (!reader.isLastRead())
@@ -676,7 +672,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 14:
+ case 13:
entryProcessorsBytes =
reader.readCollection(MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
@@ -684,7 +680,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 15:
+ case 14:
forceTransformBackups = reader.readBoolean();
if (!reader.isLastRead())
@@ -692,7 +688,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 16:
+ case 15:
invokeArgsBytes =
reader.readObjectArray(MessageCollectionItemType.BYTE_ARR, byte[].class);
if (!reader.isLastRead())
@@ -700,7 +696,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 17:
+ case 16:
keys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
@@ -708,7 +704,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 18:
+ case 17:
nearEntryProcessorsBytes =
reader.readCollection(MessageCollectionItemType.BYTE_ARR);
if (!reader.isLastRead())
@@ -716,7 +712,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 19:
+ case 18:
nearExpireTimes = reader.readGridLongList();
if (!reader.isLastRead())
@@ -724,7 +720,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 20:
+ case 19:
nearKeys =
reader.readCollection(MessageCollectionItemType.KEY_CACHE_OBJECT);
if (!reader.isLastRead())
@@ -732,7 +728,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 21:
+ case 20:
nearTtls = reader.readGridLongList();
if (!reader.isLastRead())
@@ -740,7 +736,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 22:
+ case 21:
nearVals =
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
if (!reader.isLastRead())
@@ -748,7 +744,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 23:
+ case 22:
obsoleteIndexes =
reader.readCollection(MessageCollectionItemType.INT);
if (!reader.isLastRead())
@@ -756,7 +752,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 24:
+ case 23:
prevVals =
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
if (!reader.isLastRead())
@@ -764,7 +760,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 25:
+ case 24:
ttls = reader.readGridLongList();
if (!reader.isLastRead())
@@ -772,7 +768,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 26:
+ case 25:
updateCntrs = reader.readGridLongList();
if (!reader.isLastRead())
@@ -780,7 +776,7 @@ public class GridDhtAtomicUpdateRequest extends
GridDhtAtomicAbstractUpdateReque
reader.incrementState();
- case 27:
+ case 26:
vals =
reader.readCollection(MessageCollectionItemType.CACHE_OBJECT);
if (!reader.isLastRead())