ignite-4705
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eef1d310 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eef1d310 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eef1d310 Branch: refs/heads/ignite-4705 Commit: eef1d3108178f17293175c0e6eb04707c89ca876 Parents: c5c5eb5 Author: sboikov <[email protected]> Authored: Thu Feb 16 17:36:24 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Feb 16 17:36:24 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 4 ++ .../processors/cache/GridCacheMessage.java | 2 +- .../GridDhtAtomicAbstractUpdateFuture.java | 22 ++++-- .../GridDhtAtomicAbstractUpdateRequest.java | 74 +++++++++++++++++++- .../dht/atomic/GridDhtAtomicCache.java | 10 +-- .../GridDhtAtomicDeferredUpdateResponse.java | 7 +- .../dht/atomic/GridDhtAtomicNearResponse.java | 38 +++++----- .../GridDhtAtomicSingleUpdateRequest.java | 48 ++----------- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 52 ++------------ .../distributed/dht/atomic/UpdateErrors.java | 6 ++ 10 files changed, 137 insertions(+), 126 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index f9952b3..0f7371d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicNearResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; @@ -361,6 +362,9 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { if (depEnabled) cctx.deploy().ignoreOwnership(true); + if (!cacheMsg.partitionExchangeMessage()) + log.info("Cache message: " + cacheMsg); + unmarshall(nodeId, cacheMsg); if (cacheMsg.classError() != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java index b9fb56a..3ec5323 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java @@ -50,7 +50,7 @@ public abstract class GridCacheMessage implements Message { private static final long serialVersionUID = 0L; /** Maximum number of cache lookup indexes. */ - public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 5; + public static final int MAX_CACHE_MSG_LOOKUP_INDEX = 6; /** Cache message index field name. */ public static final String CACHE_MSG_INDEX_FIELD_NAME = "CACHE_MSG_IDX"; http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index d494d98..1c83163 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -353,8 +354,10 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte /** * Sends requests to remote nodes. + * + * @param ret Cache operation return value. */ - final void map() { + final void map(GridCacheReturn ret) { boolean fullSync = updateReq.writeSynchronizationMode() == FULL_SYNC; if (!F.isEmpty(mappings)) { @@ -369,6 +372,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte for (GridDhtAtomicAbstractUpdateRequest req : mappings.values()) { try { req.dhtNodes(dhtNodes); + req.setResult(ret.success()); cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); @@ -392,14 +396,18 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte registerResponse(req.nodeId()); } } - } - else - onDone(); - // Send response right away if no ACKs from backup is required. - // Backups will send ACKs anyway, future will be completed after all backups have replied. - if (!fullSync) + // Send response right away if no ACKs from backup is required. + // Backups will send ACKs anyway, future will be completed after all backups have replied. + if (!fullSync) + completionCb.apply(updateReq, updateRes); + } + else { + // Reply. completionCb.apply(updateReq, updateRes); + + onDone(); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java index 3edbf8c..30c07e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateRequest.java @@ -41,6 +41,21 @@ import org.jetbrains.annotations.Nullable; * */ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable { + /** Skip store flag bit mask. */ + public static final int DHT_ATOMIC_SKIP_STORE_FLAG_MASK = 0x01; + + /** Keep binary flag. */ + public static final int DHT_ATOMIC_KEEP_BINARY_FLAG_MASK = 0x02; + + /** Near cache key flag. */ + public static final int DHT_ATOMIC_NEAR_FLAG_MASK = 0x04; + + /** */ + public static final int DHT_ATOMIC_HAS_RESULT_MASK = 0x08; + + /** */ + public static final int DHT_ATOMIC_RESULT_SUCCESS_MASK = 0x10; + /** Message index. */ public static final int CACHE_MSG_IDX = nextIndexId(); @@ -58,6 +73,9 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag /** */ private long nearFutId; + /** Additional flags. */ + protected byte flags; + /** */ @GridDirectCollection(UUID.class) private List<UUID> dhtNodes; @@ -85,6 +103,15 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag } /** + * @param success Success flag. + */ + public void setResult(boolean success) { + setFlag(true, DHT_ATOMIC_HAS_RESULT_MASK); + + setFlag(success, DHT_ATOMIC_RESULT_SUCCESS_MASK); + } + + /** * @return Near node ID. */ public UUID nearNodeId() { @@ -118,6 +145,13 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag } /** + * @return Flags. + */ + public final byte flags() { + return flags; + } + + /** * @return Keep binary flag. */ public abstract boolean keepBinary(); @@ -333,9 +367,29 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag */ @Nullable public abstract Object[] invokeArguments(); + /** + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. + */ + protected final void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } + + /** + * Reags flag mask. + * + * @param mask Mask to read. + * @return Flag value. + */ + protected final boolean isFlag(int mask) { + return (flags & mask) != 0; + } + /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 6; + return 7; } /** {@inheritDoc} */ @@ -360,12 +414,18 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag writer.incrementState(); case 4: - if (!writer.writeLong("nearFutId", nearFutId)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 5: + if (!writer.writeLong("nearFutId", nearFutId)) + return false; + + writer.incrementState(); + + case 6: if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; @@ -396,7 +456,7 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag reader.incrementState(); case 4: - nearFutId = reader.readLong("nearFutId"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -404,6 +464,14 @@ public abstract class GridDhtAtomicAbstractUpdateRequest extends GridCacheMessag reader.incrementState(); case 5: + nearFutId = reader.readLong("nearFutId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 3b81ee7..2f6e320 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -421,7 +421,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); - ctx.io().addHandler(ctx.cacheId(), GridDhtAtomicNearResponse.class, new CI2<UUID, GridDhtAtomicNearResponse>() { + ctx.io().addHandler(ctx.cacheId(), + GridDhtAtomicNearResponse.class, + new CI2<UUID, GridDhtAtomicNearResponse>() { @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) { processDhtAtomicNearResponse(uuid, msg); } @@ -1969,7 +1971,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { else { // If there are backups, map backup update future. if (dhtFut != null) - dhtFut.map(); + dhtFut.map(res.returnValue()); // Otherwise, complete the call. else completionCb.apply(req, res); @@ -3226,9 +3228,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { GridCacheVersion ver = req.writeVersion(); GridDhtAtomicNearResponse nearRes = ctx.config().getWriteSynchronizationMode() == FULL_SYNC ? - new GridDhtAtomicNearResponse(req.nearFutureId(), req.dhtNodes()) : null; + new GridDhtAtomicNearResponse(ctx.cacheId(), req.nearFutureId(), req.dhtNodes(), req.flags()) : null; - Boolean replicate = ctx.isDrEnabled(); + boolean replicate = ctx.isDrEnabled(); boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java index b662476..bd2bae0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java @@ -25,7 +25,6 @@ import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; @@ -42,7 +41,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem public static final int CACHE_MSG_IDX = nextIndexId(); /** ACK future versions. */ - @GridDirectCollection(GridCacheVersion.class) + @GridDirectCollection(Long.class) private Collection<Long> futIds; /** {@inheritDoc} */ @@ -105,7 +104,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem switch (writer.state()) { case 3: - if (!writer.writeCollection("futIds", futIds, MessageCollectionItemType.MSG)) + if (!writer.writeCollection("futIds", futIds, MessageCollectionItemType.LONG)) return false; writer.incrementState(); @@ -127,7 +126,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem switch (reader.state()) { case 3: - futIds = reader.readCollection("futIds", MessageCollectionItemType.MSG); + futIds = reader.readCollection("futIds", MessageCollectionItemType.LONG); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java index 628e1dc..4110b5d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java @@ -26,10 +26,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.*; + /** * TODO IGNITE-4705: no not send mapping if it == affinity? */ @@ -41,12 +44,6 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { public static final int CACHE_MSG_IDX = nextIndexId(); /** */ - private static final int HAS_RESULT_MASK = 0x1; - - /** */ - private static final int RESULT_SUCCESS_MASK = 0x2; - - /** */ private long futId; /** */ @@ -67,12 +64,16 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { } /** + * @param cacheId Cache ID. * @param futId Future ID. * @param mapping Update mapping. + * @param flags Flags. */ - public GridDhtAtomicNearResponse(long futId, List<UUID> mapping) { + public GridDhtAtomicNearResponse(int cacheId, long futId, List<UUID> mapping, byte flags) { + this.cacheId = cacheId; this.futId = futId; this.mapping = mapping; + this.flags = flags; } /** @@ -87,28 +88,19 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { } /** - * @param success Success flag. - */ - public void setResult(boolean success) { - setFlag(true, HAS_RESULT_MASK); - - setFlag(success, RESULT_SUCCESS_MASK); - } - - /** * @return Operation result. */ public GridCacheReturn result() { assert hasResult(); - return new GridCacheReturn(true, isFlag(RESULT_SUCCESS_MASK)); + return new GridCacheReturn(true, isFlag(DHT_ATOMIC_RESULT_SUCCESS_MASK)); } /** * @return {@code True} if response contains operation result. */ public boolean hasResult() { - return isFlag(HAS_RESULT_MASK); + return isFlag(DHT_ATOMIC_HAS_RESULT_MASK); } /** @@ -144,6 +136,11 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { } /** {@inheritDoc} */ + @Override public int lookupIndex() { + return CACHE_MSG_IDX; + } + + /** {@inheritDoc} */ @Override public byte directType() { return -45; } @@ -265,4 +262,9 @@ public class GridDhtAtomicNearResponse extends GridCacheMessage { return reader.afterMessageRead(GridDhtAtomicNearResponse.class); } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtAtomicNearResponse.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java index e46c843..678f3f9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java @@ -48,9 +48,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat /** */ private static final long serialVersionUID = 0L; - /** Near cache key flag. */ - private static final int NEAR_FLAG_MASK = 0x80; - /** Future ID on primary. */ protected long futId; @@ -69,9 +66,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat /** Task name hash. */ protected int taskNameHash; - /** Additional flags. */ - protected byte flags; - /** Key to update. */ @GridToStringInclude protected KeyCacheObject key; @@ -226,7 +220,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat /** {@inheritDoc} */ @Override public boolean skipStore() { - return isFlag(SKIP_STORE_FLAG_MASK); + return isFlag(DHT_ATOMIC_SKIP_STORE_FLAG_MASK); } /** {@inheritDoc} */ @@ -364,21 +358,21 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat /** {@inheritDoc} */ @Override public boolean keepBinary() { - return isFlag(KEEP_BINARY_FLAG_MASK); + return isFlag(DHT_ATOMIC_KEEP_BINARY_FLAG_MASK); } /** * */ private boolean near() { - return isFlag(NEAR_FLAG_MASK); + return isFlag(DHT_ATOMIC_NEAR_FLAG_MASK); } /** * */ private void near(boolean near) { - setFlag(near, NEAR_FLAG_MASK); + setFlag(near, DHT_ATOMIC_NEAR_FLAG_MASK); } /** {@inheritDoc} */ @@ -425,12 +419,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat } switch (writer.state()) { - case 6: - if (!writer.writeByte("flags", flags)) - return false; - - writer.incrementState(); - case 7: if (!writer.writeLong("futId", futId)) return false; @@ -513,14 +501,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat return false; switch (reader.state()) { - case 6: - flags = reader.readByte("flags"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - case 7: futId = reader.readLong("futId"); @@ -658,26 +638,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat return 18; } - /** - * Sets flag mask. - * - * @param flag Set or clear. - * @param mask Mask. - */ - private void setFlag(boolean flag, int mask) { - flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); - } - - /** - * Reags flag mask. - * - * @param mask Mask to read. - * @return Flag value. - */ - private boolean isFlag(int mask) { - return (flags & mask) != 0; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtAtomicSingleUpdateRequest.class, this, "super", super.toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 9da6b2e..7a210ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -148,9 +148,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque /** Keep binary flag. */ private boolean keepBinary; - /** Additional flags. */ - private byte flags; - /** * Empty constructor required by {@link Externalizable}. */ @@ -522,7 +519,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque /** {@inheritDoc} */ @Override public boolean skipStore() { - return isFlag(SKIP_STORE_FLAG_MASK); + return isFlag(DHT_ATOMIC_SKIP_STORE_FLAG_MASK); } /** {@inheritDoc} */ @@ -612,26 +609,20 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque } switch (writer.state()) { - case 6: - if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) - return false; - - writer.incrementState(); - case 7: - if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) return false; writer.incrementState(); case 8: - if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) + if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 9: - if (!writer.writeByte("flags", flags)) + if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); @@ -766,16 +757,8 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque return false; switch (reader.state()) { - case 6: - conflictExpireTimes = reader.readMessage("conflictExpireTimes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - case 7: - conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); + conflictExpireTimes = reader.readMessage("conflictExpireTimes"); if (!reader.isLastRead()) return false; @@ -783,7 +766,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); case 8: - entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); + conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -791,7 +774,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); case 9: - flags = reader.readByte("flags"); + entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); if (!reader.isLastRead()) return false; @@ -975,27 +958,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque return 29; } - /** - * Sets flag mask. - * - * @param flag Set or clear. - * @param mask Mask. - */ - private void setFlag(boolean flag, int mask) { - flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); - } - - /** - * Reags flag mask. - * - * @param mask Mask to read. - * @return Flag value. - */ - private boolean isFlag(int mask) { - return (flags & mask) != 0; - } - - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtAtomicUpdateRequest.class, this, "super", super.toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/eef1d310/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java index 106612c..4d12198 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/UpdateErrors.java @@ -28,6 +28,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; @@ -184,4 +185,9 @@ public class UpdateErrors implements Message { @Override public void onAckReceived() { // No-op. } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(UpdateErrors.class, this); + } }
