This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d4d50069462 IGNITE-26576 Refactor GridNearAtomicAbstractUpdateRequest
and GridNearAtomicSingleUpdateRequest (#12395)
d4d50069462 is described below
commit d4d5006946280bdbdb2018e35454b66ae6371e4d
Author: Vladimir Steshin <[email protected]>
AuthorDate: Thu Oct 16 13:23:20 2025 +0300
IGNITE-26576 Refactor GridNearAtomicAbstractUpdateRequest and
GridNearAtomicSingleUpdateRequest (#12395)
---
.../CacheWriteSynchronizationModeMessage.java | 94 +++++++++++++++++
.../communication/GridCacheOperationMessage.java | 101 +++++++++++++++++++
.../communication/GridIoMessageFactory.java | 12 ++-
.../GridNearAtomicAbstractSingleUpdateRequest.java | 103 -------------------
.../GridNearAtomicAbstractUpdateRequest.java | 111 +++++++++++++++------
.../atomic/GridNearAtomicFullUpdateRequest.java | 14 +--
.../GridNearAtomicSingleUpdateFilterRequest.java | 58 ++---------
.../atomic/GridNearAtomicSingleUpdateRequest.java | 57 ++++++++++-
.../CacheWriteSynchroizationModeMessageTest.java | 81 +++++++++++++++
.../GridCacheOperationModeMessageTest.java | 97 ++++++++++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 6 +-
11 files changed, 538 insertions(+), 196 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CacheWriteSynchronizationModeMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CacheWriteSynchronizationModeMessage.java
new file mode 100644
index 00000000000..3769c41d049
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/CacheWriteSynchronizationModeMessage.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/** Message wrapper that carries a {@link CacheWriteSynchronizationMode} value as a one-byte code. */
+public class CacheWriteSynchronizationModeMessage implements Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 503;
+
+ /** Cache write synchronization mode value; kept in sync with {@link #code}. */
+ @Nullable private CacheWriteSynchronizationMode cacheWriteSyncMode;
+
+ /** Code of cache write synchronization mode; {@code -1} stands for a {@code null} mode. */
+ @Order(0)
+ private byte code = -1;
+
+ /** Default constructor; leaves the mode {@code null} and the code {@code -1}. */
+ public CacheWriteSynchronizationModeMessage() {
+ // No-op.
+ }
+
+ /** @param mode Cache write synchronization mode to wrap, may be {@code null}. */
+ public CacheWriteSynchronizationModeMessage(@Nullable
CacheWriteSynchronizationMode mode) {
+ cacheWriteSyncMode = mode;
+ code = encode(mode);
+ }
+
+ /** @param mode Cache write synchronization mode to encode; {@code null} is encoded as {@code -1}. */
+ private static byte encode(@Nullable CacheWriteSynchronizationMode mode) {
+ if (mode == null)
+ return -1;
+
+ switch (mode) {
+ case FULL_SYNC: return 0;
+ case FULL_ASYNC: return 1;
+ case PRIMARY_SYNC: return 2;
+ }
+
+ throw new IllegalArgumentException("Unknown cache write
synchronization mode: " + mode);
+ }
+
+ /** @param code Code of cache write synchronization mode to decode; {@code -1} is decoded as {@code null}. */
+ @Nullable private static CacheWriteSynchronizationMode decode(byte code) {
+ switch (code) {
+ case -1: return null;
+ case 0: return CacheWriteSynchronizationMode.FULL_SYNC;
+ case 1: return CacheWriteSynchronizationMode.FULL_ASYNC;
+ case 2: return CacheWriteSynchronizationMode.PRIMARY_SYNC;
+ }
+
+ throw new IllegalArgumentException("Unknown cache write
synchronization mode code: " + code);
+ }
+
+ /** @param code Code of cache write synchronization mode; the wrapped mode value is re-derived from it. */
+ public void code(byte code) {
+ this.code = code;
+ cacheWriteSyncMode = decode(code);
+ }
+
+ /** @return Code of cache write synchronization mode. */
+ public byte code() {
+ return code;
+ }
+
+ /** @return Cache write synchronization mode value, or {@code null} if none was set. */
+ @Nullable public CacheWriteSynchronizationMode value() {
+ return cacheWriteSyncMode;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridCacheOperationMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridCacheOperationMessage.java
new file mode 100644
index 00000000000..e187c654919
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridCacheOperationMessage.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.internal.Order;
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/** Message wrapper that carries a {@link GridCacheOperation} value as a one-byte code. */
+public class GridCacheOperationMessage implements Message {
+ /** Type code. */
+ public static final short TYPE_CODE = 504;
+
+ /** Cache operation; kept in sync with {@link #code}. */
+ @Nullable private GridCacheOperation cacheOperation;
+
+ /** Cache operation code; {@code -1} stands for a {@code null} operation. */
+ @Order(0)
+ private byte code = -1;
+
+ /** Default constructor; leaves the operation {@code null} and the code {@code -1}. */
+ public GridCacheOperationMessage() {
+ // No-op.
+ }
+
+ /** @param cacheOperation Cache operation to wrap, may be {@code null}. */
+ public GridCacheOperationMessage(@Nullable GridCacheOperation
cacheOperation) {
+ this.cacheOperation = cacheOperation;
+ code = encode(cacheOperation);
+ }
+
+ /** @param operation Cache operation to encode; {@code null} is encoded as {@code -1}. */
+ private static byte encode(@Nullable GridCacheOperation operation) {
+ if (operation == null)
+ return -1;
+
+ switch (operation) {
+ case READ: return 0;
+ case CREATE: return 1;
+ case UPDATE: return 2;
+ case DELETE: return 3;
+ case TRANSFORM: return 4;
+ case RELOAD: return 5;
+ case NOOP: return 6;
+ }
+
+ throw new IllegalArgumentException("Unknown cache operation: " +
operation);
+ }
+
+ /** @param code Cache operation code to decode to a cache operation
value. */
+ @Nullable private static GridCacheOperation decode(byte code) {
+ switch (code) {
+ case -1: return null;
+ case 0: return GridCacheOperation.READ;
+ case 1: return GridCacheOperation.CREATE;
+ case 2: return GridCacheOperation.UPDATE;
+ case 3: return GridCacheOperation.DELETE;
+ case 4: return GridCacheOperation.TRANSFORM;
+ case 5: return GridCacheOperation.RELOAD;
+ case 6: return GridCacheOperation.NOOP;
+ }
+
+ throw new IllegalArgumentException("Unknown cache operation code: " +
code);
+ }
+
+ /** @param code Cache operation code; the wrapped operation value is re-derived from it. */
+ public void code(byte code) {
+ this.code = code;
+ cacheOperation = decode(code);
+ }
+
+ /** @return Cache operation code. */
+ public byte code() {
+ return code;
+ }
+
+ /** @return Cache operation value, or {@code null} if none was set. */
+ @Nullable public GridCacheOperation value() {
+ return cacheOperation;
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return TYPE_CODE;
+ }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 98cd6a74605..3cdfc15a6da 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -32,9 +32,11 @@ import
org.apache.ignite.internal.codegen.CacheEvictionEntrySerializer;
import org.apache.ignite.internal.codegen.CacheGroupAffinityMessageSerializer;
import
org.apache.ignite.internal.codegen.CachePartitionPartialCountersMapSerializer;
import org.apache.ignite.internal.codegen.CacheVersionedValueSerializer;
+import
org.apache.ignite.internal.codegen.CacheWriteSynchronizationModeMessageSerializer;
import org.apache.ignite.internal.codegen.ErrorMessageSerializer;
import
org.apache.ignite.internal.codegen.GenerateEncryptionKeyRequestSerializer;
import org.apache.ignite.internal.codegen.GridCacheEntryInfoSerializer;
+import org.apache.ignite.internal.codegen.GridCacheOperationMessageSerializer;
import org.apache.ignite.internal.codegen.GridCacheSqlQuerySerializer;
import org.apache.ignite.internal.codegen.GridCacheTtlUpdateRequestSerializer;
import org.apache.ignite.internal.codegen.GridCacheTxRecoveryRequestSerializer;
@@ -62,6 +64,8 @@ import
org.apache.ignite.internal.codegen.GridDistributedTxPrepareRequestSeriali
import org.apache.ignite.internal.codegen.GridJobCancelRequestSerializer;
import org.apache.ignite.internal.codegen.GridJobSiblingsRequestSerializer;
import
org.apache.ignite.internal.codegen.GridNearAtomicCheckUpdateRequestSerializer;
+import
org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateFilterRequestSerializer;
+import
org.apache.ignite.internal.codegen.GridNearAtomicSingleUpdateRequestSerializer;
import
org.apache.ignite.internal.codegen.GridNearAtomicUpdateResponseSerializer;
import org.apache.ignite.internal.codegen.GridNearGetRequestSerializer;
import org.apache.ignite.internal.codegen.GridNearLockRequestSerializer;
@@ -361,9 +365,9 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register((short)118, CacheContinuousQueryBatchAck::new, new
CacheContinuousQueryBatchAckSerializer());
// [120..123] - DR
- factory.register((short)125, GridNearAtomicSingleUpdateRequest::new);
+ factory.register((short)125, GridNearAtomicSingleUpdateRequest::new,
new GridNearAtomicSingleUpdateRequestSerializer());
factory.register((short)126,
GridNearAtomicSingleUpdateInvokeRequest::new);
- factory.register((short)127,
GridNearAtomicSingleUpdateFilterRequest::new);
+ factory.register((short)127,
GridNearAtomicSingleUpdateFilterRequest::new, new
GridNearAtomicSingleUpdateFilterRequestSerializer());
factory.register((short)128, CacheGroupAffinityMessage::new, new
CacheGroupAffinityMessageSerializer());
factory.register((short)129, WalStateAckMessage::new, new
WalStateAckMessageSerializer());
factory.register((short)130,
UserManagementOperationFinishedMessage::new, new
UserManagementOperationFinishedMessageSerializer());
@@ -403,12 +407,16 @@ public class GridIoMessageFactory implements
MessageFactoryProvider {
factory.register(StatisticsRequest.TYPE_CODE, StatisticsRequest::new);
factory.register(StatisticsResponse.TYPE_CODE,
StatisticsResponse::new);
+ // Enums
factory.register(CachePartitionPartialCountersMap.TYPE_CODE,
CachePartitionPartialCountersMap::new,
new CachePartitionPartialCountersMapSerializer());
factory.register(IgniteDhtDemandedPartitionsMap.TYPE_CODE,
IgniteDhtDemandedPartitionsMap::new,
new IgniteDhtDemandedPartitionsMapSerializer());
factory.register(TransactionIsolationMessage.TYPE_CODE,
TransactionIsolationMessage::new,
new TransactionIsolationMessageSerializer());
+ factory.register(CacheWriteSynchronizationModeMessage.TYPE_CODE,
CacheWriteSynchronizationModeMessage::new,
+ new CacheWriteSynchronizationModeMessageSerializer());
+ factory.register(GridCacheOperationMessage.TYPE_CODE,
GridCacheOperationMessage::new, new GridCacheOperationMessageSerializer());
// [-3..119] [124..129] [-23..-28] [-36..-55] [183..188] - this
// [120..123] - DR
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
deleted file mode 100644
index 6e26691e0c3..00000000000
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-
-import java.util.UUID;
-import javax.cache.expiry.ExpiryPolicy;
-import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.GridCacheOperation;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-/**
- *
- */
-public abstract class GridNearAtomicAbstractSingleUpdateRequest extends
GridNearAtomicAbstractUpdateRequest {
- /** */
- private static final CacheEntryPredicate[] NO_FILTER = new
CacheEntryPredicate[0];
-
- /**
- * Empty constructor.
- */
- protected GridNearAtomicAbstractSingleUpdateRequest() {
- // No-op.
- }
-
- /**
- * Constructor.
- * @param cacheId Cache ID.
- * @param nodeId Node ID.
- * @param futId Future ID.
- * @param topVer Topology version.
- * @param syncMode Synchronization mode.
- * @param op Cache update operation.
- * @param taskNameHash Task name hash code.
- * @param flags Flags.
- * @param addDepInfo Deployment info flag.
- */
- protected GridNearAtomicAbstractSingleUpdateRequest(
- int cacheId,
- UUID nodeId,
- long futId,
- @NotNull AffinityTopologyVersion topVer,
- CacheWriteSynchronizationMode syncMode,
- GridCacheOperation op,
- int taskNameHash,
- byte flags,
- boolean addDepInfo
- ) {
- super(cacheId,
- nodeId,
- futId,
- topVer,
- syncMode,
- op,
- taskNameHash,
- flags,
- addDepInfo);
- }
-
- /**
- * @return Expiry policy.
- */
- @Override public ExpiryPolicy expiry() {
- return null;
- }
-
- /**
- * @return Optional arguments for entry processor.
- */
- @Override @Nullable public Object[] invokeArguments() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public CacheEntryPredicate[] filter() {
- return NO_FILTER;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(GridNearAtomicAbstractSingleUpdateRequest.class,
this,
- "nodeId", nodeId, "futId", futId, "topVer", topVer,
- "parent", super.toString());
- }
-}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
index 04c9b164d34..615361ac977 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java
@@ -24,7 +24,9 @@ import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
-import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.Order;
+import
org.apache.ignite.internal.managers.communication.CacheWriteSynchronizationModeMessage;
+import
org.apache.ignite.internal.managers.communication.GridCacheOperationMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -73,36 +75,40 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
private static final int AFFINITY_MAPPING_FLAG_MASK = 0x80;
/** Target node ID. */
- @GridDirectTransient
protected UUID nodeId;
/** Future version. */
+ @Order(value = 4, method = "futureId")
protected long futId;
/** Topology version. */
+ @Order(value = 5, method = "topologyVersion")
protected AffinityTopologyVersion topVer;
- /** Write synchronization mode. */
- protected CacheWriteSynchronizationMode syncMode;
+ /** Cache operation wrapper message. */
+ @Order(value = 6, method = "cacheOperationMessage")
+ protected GridCacheOperationMessage opMsg;
- /** Update operation. */
- protected GridCacheOperation op;
+ /** Write synchronization mode wrapper message. */
+ @Order(value = 7, method = "writeSynchronizationModeMessage")
+ protected CacheWriteSynchronizationModeMessage syncModeMsg;
/** Task name hash. */
+ @Order(8)
protected int taskNameHash;
/** Compressed boolean flags. Make sure 'toString' is updated when add new
flag. */
@GridToStringExclude
+ @Order(9)
protected byte flags;
/** */
- @GridDirectTransient
private GridNearAtomicUpdateResponse res;
/**
*
*/
- public GridNearAtomicAbstractUpdateRequest() {
+ protected GridNearAtomicAbstractUpdateRequest() {
// No-op.
}
@@ -134,8 +140,8 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
this.nodeId = nodeId;
this.futId = futId;
this.topVer = topVer;
- this.syncMode = syncMode;
- this.op = op;
+ this.opMsg = new GridCacheOperationMessage(op);
+ this.syncModeMsg = new CacheWriteSynchronizationModeMessage(syncMode);
this.taskNameHash = taskNameHash;
this.flags = flags;
this.addDepInfo = addDepInfo;
@@ -206,6 +212,11 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
return isFlag(NEAR_CACHE_FLAG_MASK);
}
+ /** Sets new topology version. */
+ public void topologyVersion(AffinityTopologyVersion topVer) {
+ this.topVer = topVer;
+ }
+
/** {@inheritDoc} */
@Override public final AffinityTopologyVersion topologyVersion() {
return topVer;
@@ -251,9 +262,9 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
* @return {@code True} if update is processed in {@link
CacheWriteSynchronizationMode#FULL_SYNC} mode.
*/
boolean fullSync() {
- assert syncMode != null;
+ assert syncModeMsg != null && writeSynchronizationMode() != null;
- return syncMode == CacheWriteSynchronizationMode.FULL_SYNC;
+ return writeSynchronizationMode() ==
CacheWriteSynchronizationMode.FULL_SYNC;
}
/**
@@ -263,11 +274,37 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
return taskNameHash;
}
+ /**
+ * Sets task name hash code.
+ */
+ public void taskNameHash(int taskNameHash) {
+ this.taskNameHash = taskNameHash;
+ }
+
+ /**
+ * @return Compressed boolean flags.
+ */
+ public byte flags() {
+ return flags;
+ }
+
+ /**
+ * @param flags New compressed boolean flags.
+ */
+ public void flags(byte flags) {
+ this.flags = flags;
+ }
+
/**
* @return Update opreation.
*/
- public GridCacheOperation operation() {
- return op;
+ @Nullable public GridCacheOperation operation() {
+ return opMsg.value();
+ }
+
+ /** @return Write synchronization mode. */
+ @Nullable public CacheWriteSynchronizationMode writeSynchronizationMode() {
+ return syncModeMsg.value();
}
/**
@@ -285,10 +322,32 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
}
/**
- * @return Write synchronization mode.
+ * Sets near node future ID.
*/
- public final CacheWriteSynchronizationMode writeSynchronizationMode() {
- return syncMode;
+ public void futureId(long futId) {
+ this.futId = futId;
+ }
+
+ /** @return The cache operation wrapper message. */
+ public GridCacheOperationMessage cacheOperationMessage() {
+ return opMsg;
+ }
+
+ /** Sets the cache operation wrapper message. */
+ public void cacheOperationMessage(GridCacheOperationMessage cacheOpMsg) {
+ this.opMsg = cacheOpMsg;
+ }
+
+ /**
+ * @return The write synchronization mode wrapper message.
+ */
+ public final CacheWriteSynchronizationModeMessage
writeSynchronizationModeMessage() {
+ return syncModeMsg;
+ }
+
+ /** Sets the write synchronization mode wrapper message. */
+ public void
writeSynchronizationModeMessage(CacheWriteSynchronizationModeMessage
writeSyncModeMsg) {
+ this.syncModeMsg = writeSyncModeMsg;
}
/**
@@ -513,6 +572,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
*/
public abstract KeyCacheObject key(int idx);
+ // TODO: remove after IGNITE-26599, IGNITE-26577
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -541,13 +601,13 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
writer.incrementState();
case 6:
- if (!writer.writeByte(op != null ? (byte)op.ordinal() : -1))
+ if (!writer.writeMessage(opMsg))
return false;
writer.incrementState();
case 7:
- if (!writer.writeByte(syncMode != null ?
(byte)syncMode.ordinal() : -1))
+ if (!writer.writeMessage(syncModeMsg))
return false;
writer.incrementState();
@@ -569,6 +629,7 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
return true;
}
+ // TODO: remove after IGNITE-26599, IGNITE-26577
/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf);
@@ -594,27 +655,19 @@ public abstract class GridNearAtomicAbstractUpdateRequest
extends GridCacheIdMes
reader.incrementState();
case 6:
- byte opOrd;
-
- opOrd = reader.readByte();
+ opMsg = reader.readMessage();
if (!reader.isLastRead())
return false;
- op = GridCacheOperation.fromOrdinal(opOrd);
-
reader.incrementState();
case 7:
- byte syncModeOrd;
-
- syncModeOrd = reader.readByte();
+ syncModeMsg = reader.readMessage();
if (!reader.isLastRead())
return false;
- syncMode =
CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd);
-
reader.incrementState();
case 8:
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
index 557b0e5a3f6..7ff682f3cbc 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicFullUpdateRequest.java
@@ -175,13 +175,13 @@ public class GridNearAtomicFullUpdateRequest extends
GridNearAtomicAbstractUpdat
@Nullable GridCacheVersion conflictVer) {
EntryProcessor<Object, Object, Object> entryProc = null;
- if (op == TRANSFORM) {
+ if (operation() == TRANSFORM) {
assert val instanceof EntryProcessor : val;
entryProc = (EntryProcessor<Object, Object, Object>)val;
}
- assert val != null || op == DELETE;
+ assert val != null || operation() == DELETE;
keys.add(key);
@@ -256,19 +256,19 @@ public class GridNearAtomicFullUpdateRequest extends
GridNearAtomicAbstractUpdat
/** {@inheritDoc} */
@Override public List<?> values() {
- return op == TRANSFORM ? entryProcessors : vals;
+ return operation() == TRANSFORM ? entryProcessors : vals;
}
/** {@inheritDoc} */
@Override public CacheObject value(int idx) {
- assert op == UPDATE : op;
+ assert operation() == UPDATE : operation();
return vals.get(idx);
}
/** {@inheritDoc} */
@Override public EntryProcessor<Object, Object, Object> entryProcessor(int
idx) {
- assert op == TRANSFORM : op;
+ assert operation() == TRANSFORM : operation();
return entryProcessors.get(idx);
}
@@ -360,7 +360,7 @@ public class GridNearAtomicFullUpdateRequest extends
GridNearAtomicAbstractUpdat
filter = null;
}
- if (op == TRANSFORM) {
+ if (operation() == TRANSFORM) {
// force addition of deployment info for entry processors if P2P
is enabled globally.
if (!addDepInfo && ctx.deploymentEnabled())
addDepInfo = true;
@@ -393,7 +393,7 @@ public class GridNearAtomicFullUpdateRequest extends
GridNearAtomicAbstractUpdat
}
}
- if (op == TRANSFORM) {
+ if (operation() == TRANSFORM) {
if (entryProcessors == null)
entryProcessors = unmarshalCollection(entryProcessorsBytes,
ctx, ldr);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
index 518738cea93..edb273bc565 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFilterRequest.java
@@ -17,20 +17,17 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.typedef.internal.S;
-import
org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
-import org.apache.ignite.plugin.extensions.communication.MessageReader;
-import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -39,6 +36,7 @@ import org.jetbrains.annotations.Nullable;
*/
public class GridNearAtomicSingleUpdateFilterRequest extends
GridNearAtomicSingleUpdateRequest {
/** Filter. */
+ @Order(12)
private CacheEntryPredicate[] filter;
/**
@@ -91,6 +89,11 @@ public class GridNearAtomicSingleUpdateFilterRequest extends
GridNearAtomicSingl
this.filter = filter;
}
+ /** */
+ public void filter(CacheEntryPredicate[] filter) {
+ this.filter = filter;
+ }
+
/** {@inheritDoc} */
@Override @Nullable public CacheEntryPredicate[] filter() {
return filter;
@@ -132,53 +135,6 @@ public class GridNearAtomicSingleUpdateFilterRequest
extends GridNearAtomicSingl
}
}
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!super.writeTo(buf, writer))
- return false;
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 12:
- if (!writer.writeObjectArray(filter,
MessageCollectionItemType.MSG))
- return false;
-
- writer.incrementState();
-
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- if (!super.readFrom(buf, reader))
- return false;
-
- switch (reader.state()) {
- case 12:
- filter = reader.readObjectArray(MessageCollectionItemType.MSG,
CacheEntryPredicate.class);
-
- if (!reader.isLastRead())
- return false;
-
- reader.incrementState();
-
- }
-
- return true;
- }
-
/** {@inheritDoc} */
@Override public short directType() {
return 127;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index fb15a113c5d..d6cc7552e6c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -21,14 +21,18 @@ import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -45,12 +49,14 @@ import static
org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
/**
*
*/
-public class GridNearAtomicSingleUpdateRequest extends
GridNearAtomicAbstractSingleUpdateRequest {
+public class GridNearAtomicSingleUpdateRequest extends
GridNearAtomicAbstractUpdateRequest {
/** Key to update. */
@GridToStringInclude
+ @Order(10)
protected KeyCacheObject key;
/** Value to update. */
+ @Order(value = 11, method = "value")
protected CacheObject val;
/**
@@ -115,8 +121,8 @@ public class GridNearAtomicSingleUpdateRequest extends
GridNearAtomicAbstractSin
long conflictTtl,
long conflictExpireTime,
@Nullable GridCacheVersion conflictVer) {
- assert op != TRANSFORM;
- assert val != null || op == DELETE;
+ assert operation() != TRANSFORM;
+ assert val != null || operation() == DELETE;
assert conflictTtl < 0 : conflictTtl;
assert conflictExpireTime < 0 : conflictExpireTime;
assert conflictVer == null : conflictVer;
@@ -142,6 +148,20 @@ public class GridNearAtomicSingleUpdateRequest extends
GridNearAtomicAbstractSin
return Collections.singletonList(key);
}
+ /**
+ * @param key Key to update; replaces the current value of the {@code key} field.
+ */
+ public void key(KeyCacheObject key) {
+ this.key = key;
+ }
+
+ /**
+ * @return Key to update, as currently held in the {@code key} field.
+ */
+ public KeyCacheObject key() {
+ return key;
+ }
+
/** {@inheritDoc} */
@Override public KeyCacheObject key(int idx) {
assert idx == 0 : idx;
@@ -154,6 +174,20 @@ public class GridNearAtomicSingleUpdateRequest extends
GridNearAtomicAbstractSin
return Collections.singletonList(val);
}
+ /**
+ * @return Cache object value to update, as currently held in the {@code val} field.
+ */
+ public CacheObject value() {
+ return val;
+ }
+
+ /**
+ * @param val Cache object value to update; replaces the current value of the {@code val} field.
+ */
+ public void value(CacheObject val) {
+ this.val = val;
+ }
+
/** {@inheritDoc} */
@Override public CacheObject value(int idx) {
assert idx == 0 : idx;
@@ -161,6 +195,21 @@ public class GridNearAtomicSingleUpdateRequest extends
GridNearAtomicAbstractSin
return val;
}
+ /** {@inheritDoc} This implementation always returns the result of {@link GridCacheUtils#empty0()}. */
+ @Nullable @Override public CacheEntryPredicate[] filter() {
+ return GridCacheUtils.empty0();
+ }
+
+ /** {@inheritDoc} This implementation always returns {@code null}. */
+ @Override public ExpiryPolicy expiry() {
+ return null;
+ }
+
+ /** {@inheritDoc} This implementation always returns {@code null}. */
+ @Override @Nullable public Object[] invokeArguments() {
+ return null;
+ }
+
/** {@inheritDoc} */
@Override public EntryProcessor<Object, Object, Object> entryProcessor(int
idx) {
assert idx == 0 : idx;
@@ -225,6 +274,7 @@ public class GridNearAtomicSingleUpdateRequest extends
GridNearAtomicAbstractSin
val.finishUnmarshal(cctx.cacheObjectContext(), ldr);
}
+ // TODO: remove after IGNITE-26599
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
@@ -257,6 +307,7 @@ public class GridNearAtomicSingleUpdateRequest extends
GridNearAtomicAbstractSin
return true;
}
+ // TODO: remove after IGNITE-26599
/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CacheWriteSynchroizationModeMessageTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CacheWriteSynchroizationModeMessageTest.java
new file mode 100644
index 00000000000..50bd6370c63
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/CacheWriteSynchroizationModeMessageTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+
+import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/** Tests the code/value conversions of {@link CacheWriteSynchronizationModeMessage}. */
+public class CacheWriteSynchroizationModeMessageTest {
+ /** Checks the code reported for {@code null} and for every {@link CacheWriteSynchronizationMode}. */
+ @Test
+ public void testCacheWriteSynchroizationCode() {
+ assertEquals(-1, new
CacheWriteSynchronizationModeMessage(null).code());
+ assertEquals(0, new
CacheWriteSynchronizationModeMessage(CacheWriteSynchronizationMode.FULL_SYNC).code());
+ assertEquals(1, new
CacheWriteSynchronizationModeMessage(CacheWriteSynchronizationMode.FULL_ASYNC).code());
+ assertEquals(2, new
CacheWriteSynchronizationModeMessage(CacheWriteSynchronizationMode.PRIMARY_SYNC).code());
+
+ for (CacheWriteSynchronizationMode op :
CacheWriteSynchronizationMode.values()) {
+ assertTrue(new CacheWriteSynchronizationModeMessage(op).code() >=
0);
+ assertTrue(new CacheWriteSynchronizationModeMessage(op).code() <
3);
+ }
+ }
+
+ /** Checks that codes -1..2 map to {@code null} and the three modes, and that code 3 is rejected. */
+ @Test
+ public void testCacheWriteSynchronizationFromCode() {
+ CacheWriteSynchronizationModeMessage msg = new
CacheWriteSynchronizationModeMessage(null);
+
+ msg.code((byte)-1);
+ assertNull(msg.value());
+
+ msg.code((byte)0);
+ assertSame(CacheWriteSynchronizationMode.FULL_SYNC, msg.value());
+
+ msg.code((byte)1);
+ assertSame(CacheWriteSynchronizationMode.FULL_ASYNC, msg.value());
+
+ msg.code((byte)2);
+ assertSame(CacheWriteSynchronizationMode.PRIMARY_SYNC, msg.value());
+
+ Throwable t = assertThrowsWithCause(() -> msg.code((byte)3),
IllegalArgumentException.class);
+ assertEquals("Unknown cache write synchronization mode code: 3",
t.getMessage());
+ }
+
+ /** Checks that every mode, and {@code null}, survives a value-to-code-to-value round trip. */
+ @Test
+ public void testConversionConsistency() {
+ for (CacheWriteSynchronizationMode op :
F.concat(CacheWriteSynchronizationMode.values(),
(CacheWriteSynchronizationMode)null)) {
+ CacheWriteSynchronizationModeMessage msg = new
CacheWriteSynchronizationModeMessage(op);
+
+ assertEquals(op, msg.value());
+
+ CacheWriteSynchronizationModeMessage newMsg = new
CacheWriteSynchronizationModeMessage();
+ newMsg.code(msg.code());
+
+ assertEquals(msg.value(), newMsg.value());
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCacheOperationModeMessageTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCacheOperationModeMessageTest.java
new file mode 100644
index 00000000000..c120a692e59
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridCacheOperationModeMessageTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.internal.processors.cache.GridCacheOperation;
+import org.apache.ignite.internal.util.typedef.F;
+import org.junit.Test;
+
+import static
org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/** Tests the code/value conversions of {@link GridCacheOperationMessage}. */
+public class GridCacheOperationModeMessageTest {
+ /** Checks the code reported for {@code null} and for every {@link GridCacheOperation}. */
+ @Test
+ public void testCacheOperationModeCode() {
+ assertEquals(-1, new GridCacheOperationMessage(null).code());
+ assertEquals(0, new
GridCacheOperationMessage(GridCacheOperation.READ).code());
+ assertEquals(1, new
GridCacheOperationMessage(GridCacheOperation.CREATE).code());
+ assertEquals(2, new
GridCacheOperationMessage(GridCacheOperation.UPDATE).code());
+ assertEquals(3, new
GridCacheOperationMessage(GridCacheOperation.DELETE).code());
+ assertEquals(4, new
GridCacheOperationMessage(GridCacheOperation.TRANSFORM).code());
+ assertEquals(5, new
GridCacheOperationMessage(GridCacheOperation.RELOAD).code());
+ assertEquals(6, new
GridCacheOperationMessage(GridCacheOperation.NOOP).code());
+
+ for (GridCacheOperation op : GridCacheOperation.values()) {
+ assertTrue(new GridCacheOperationMessage(op).code() >= 0);
+ assertTrue(new GridCacheOperationMessage(op).code() < 7);
+ }
+ }
+
+ /** Checks that codes -1..6 map to {@code null} and the seven operations, and that code 7 is rejected. */
+ @Test
+ public void testCacheOperationModeFromCode() {
+ GridCacheOperationMessage msg = new GridCacheOperationMessage(null);
+
+ msg.code((byte)-1);
+ assertNull(msg.value());
+
+ msg.code((byte)0);
+ assertSame(GridCacheOperation.READ, msg.value());
+
+ msg.code((byte)1);
+ assertSame(GridCacheOperation.CREATE, msg.value());
+
+ msg.code((byte)2);
+ assertSame(GridCacheOperation.UPDATE, msg.value());
+
+ msg.code((byte)3);
+ assertSame(GridCacheOperation.DELETE, msg.value());
+
+ msg.code((byte)4);
+ assertSame(GridCacheOperation.TRANSFORM, msg.value());
+
+ msg.code((byte)5);
+ assertSame(GridCacheOperation.RELOAD, msg.value());
+
+ msg.code((byte)6);
+ assertSame(GridCacheOperation.NOOP, msg.value());
+
+ Throwable t = assertThrowsWithCause(() -> msg.code((byte)7),
IllegalArgumentException.class);
+ assertEquals("Unknown cache operation code: 7", t.getMessage());
+ }
+
+ /** Checks that every operation, and {@code null}, survives a value-to-code-to-value round trip. */
+ @Test
+ public void testConversionConsistency() {
+ for (GridCacheOperation op : F.concat(GridCacheOperation.values(),
(GridCacheOperation)null)) {
+ GridCacheOperationMessage msg = new GridCacheOperationMessage(op);
+
+ assertEquals(op, msg.value());
+
+ GridCacheOperationMessage newMsg = new GridCacheOperationMessage();
+ newMsg.code(msg.code());
+
+ assertEquals(msg.value(), newMsg.value());
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index b840db2b284..aadd5c46f84 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -37,7 +37,9 @@ import
org.apache.ignite.internal.IgniteLocalNodeMapBeforeStartTest;
import org.apache.ignite.internal.IgniteSlowClientDetectionSelfTest;
import org.apache.ignite.internal.TransactionsMXBeanImplTest;
import org.apache.ignite.internal.codegen.MessageProcessorTest;
+import
org.apache.ignite.internal.managers.communication.CacheWriteSynchroizationModeMessageTest;
import org.apache.ignite.internal.managers.communication.ErrorMessageSelfTest;
+import
org.apache.ignite.internal.managers.communication.GridCacheOperationModeMessageTest;
import
org.apache.ignite.internal.managers.communication.TransactionIsolationMessageTest;
import
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentV2Test;
import
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentV2TestNoOptimizations;
@@ -147,7 +149,9 @@ import org.junit.runners.Suite;
MessageProcessorTest.class,
ErrorMessageSelfTest.class,
- TransactionIsolationMessageTest.class
+ TransactionIsolationMessageTest.class,
+ GridCacheOperationModeMessageTest.class,
+ CacheWriteSynchroizationModeMessageTest.class
})
public class IgniteBasicTestSuite {
}