Changed tx mini future IDs from IgniteUuid to int and removed some legacy code from tx processing.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/901be4f4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/901be4f4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/901be4f4 Branch: refs/heads/ignite-4768-1 Commit: 901be4f49440f7488781dd066bbef1cd2a85322f Parents: cbc472f Author: sboikov <[email protected]> Authored: Mon Mar 13 19:11:49 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Mar 13 19:11:49 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheIoManager.java | 2 + .../processors/cache/KeyCacheObjectImpl.java | 9 + .../distributed/GridDistributedBaseMessage.java | 2 +- .../distributed/GridDistributedLockRequest.java | 26 +- .../GridDistributedTxFinishRequest.java | 217 ++++++++++++---- .../GridDistributedTxFinishResponse.java | 75 +++++- .../distributed/GridDistributedTxMapping.java | 45 +--- .../GridDistributedTxPrepareRequest.java | 205 ++++++++------- .../GridDistributedTxPrepareResponse.java | 76 +++++- .../GridDistributedUnlockRequest.java | 18 +- .../distributed/dht/GridDhtLockRequest.java | 16 +- .../distributed/dht/GridDhtTxFinishFuture.java | 47 ++-- .../distributed/dht/GridDhtTxFinishRequest.java | 251 ++++++------------ .../dht/GridDhtTxFinishResponse.java | 70 +++--- .../cache/distributed/dht/GridDhtTxLocal.java | 34 ++- .../distributed/dht/GridDhtTxPrepareFuture.java | 130 +++++----- .../dht/GridDhtTxPrepareRequest.java | 138 ++++------ .../dht/GridDhtTxPrepareResponse.java | 117 ++++----- .../distributed/dht/GridDhtUnlockRequest.java | 15 +- .../colocated/GridDhtColocatedLockFuture.java | 18 +- .../distributed/near/GridNearGetRequest.java | 55 ++-- .../distributed/near/GridNearLockFuture.java | 20 +- .../distributed/near/GridNearLockRequest.java | 26 +- .../distributed/near/GridNearLockResponse.java | 12 +- ...arOptimisticSerializableTxPrepareFuture.java | 72 +++--- 
.../near/GridNearOptimisticTxPrepareFuture.java | 41 +-- .../GridNearPessimisticTxPrepareFuture.java | 67 +++-- .../near/GridNearSingleGetRequest.java | 46 +--- .../near/GridNearTxFinishFuture.java | 172 +++++++------ .../near/GridNearTxFinishRequest.java | 174 ++----------- .../near/GridNearTxFinishResponse.java | 36 +-- .../cache/distributed/near/GridNearTxLocal.java | 14 +- .../near/GridNearTxPrepareFutureAdapter.java | 19 +- .../near/GridNearTxPrepareRequest.java | 252 +++++++------------ .../near/GridNearTxPrepareResponse.java | 86 +++---- .../distributed/near/GridNearUnlockRequest.java | 20 +- .../distributed/near/IgniteTxMappingsImpl.java | 4 +- .../near/IgniteTxMappingsSingleImpl.java | 6 +- .../cache/transactions/IgniteTxEntry.java | 44 +--- .../cache/transactions/IgniteTxHandler.java | 92 ++++--- .../transactions/IgniteTxLocalAdapter.java | 4 +- 41 files changed, 1252 insertions(+), 1521 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index 1cd8fbe..99878ec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -557,6 +557,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridDhtTxPrepareRequest req = (GridDhtTxPrepareRequest)msg; GridDhtTxPrepareResponse res = new GridDhtTxPrepareResponse( + req.partition(), req.version(), req.futureId(), req.miniId(), @@ -697,6 +698,7 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter { GridNearTxPrepareRequest req = (GridNearTxPrepareRequest)msg; GridNearTxPrepareResponse res = new GridNearTxPrepareResponse( + req.partition(), req.version(), req.futureId(), req.miniId(), http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java index 4f8570c..48797b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/KeyCacheObjectImpl.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache; import java.nio.ByteBuffer; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.Nullable; @@ -200,4 +201,12 @@ public class KeyCacheObjectImpl extends CacheObjectAdapter implements KeyCacheOb return val.equals(other.val); } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(S.INCLUDE_SENSITIVE ? 
getClass().getSimpleName() : "KeyCacheObject", + "part", part, true, + "val", val, true, + "hasValBytes", valBytes != null, false); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java index ebbc9ae..630c79f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java @@ -142,7 +142,7 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem /** * @return Count of keys referenced in candidates array (needed only locally for optimization). 
*/ - public int keysCount() { + int keysCount() { return cnt; } http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index a671296..48c01f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -79,10 +79,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { @GridDirectCollection(KeyCacheObject.class) private List<KeyCacheObject> keys; - /** Partition IDs of keys to lock. */ - @GridDirectCollection(int.class) - protected List<Integer> partIds; - /** Array indicating whether value should be returned for a key. */ @GridToStringInclude private boolean[] retVals; @@ -226,13 +222,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { } /** - * @return Return flags. - */ - public boolean[] returnFlags() { - return retVals; - } - - /** * Sets skip store flag value. * * @param skipStore Skip store flag. 
@@ -289,15 +278,11 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { boolean retVal, GridCacheContext ctx ) throws IgniteCheckedException { - if (keys == null) { + if (keys == null) keys = new ArrayList<>(keysCount()); - partIds = new ArrayList<>(keysCount()); - } keys.add(key); - partIds.add(key.partition()); - retVals[idx] = retVal; idx++; @@ -312,7 +297,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { /** {@inheritDoc} */ @Override public int partition() { - return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; + return keys != null && !keys.isEmpty() ? keys.get(0).partition() : -1; } /** @@ -344,13 +329,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { GridCacheContext cctx = ctx.cacheContext(cacheId); finishUnmarshalCacheObjects(keys, cctx, ldr); - - if (partIds != null && !partIds.isEmpty()) { - assert partIds.size() == keys.size(); - - for (int i = 0; i < keys.size(); i++) - keys.get(i).partition(partIds.get(i)); - } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java index 3e47cc9..ab9f0ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java @@ -20,18 +20,24 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.io.Externalizable; import java.nio.ByteBuffer; 
import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAware; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + /** * Transaction completion message. */ @@ -39,6 +45,27 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i /** */ private static final long serialVersionUID = 0L; + /** */ + protected static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01; + + /** */ + protected static final int CHECK_COMMITTED_FLAG_MASK = 0x02; + + /** */ + protected static final int NEED_RETURN_VALUE_FLAG_MASK = 0x04; + + /** */ + protected static final int SYS_INVALIDATE_FLAG_MASK = 0x08; + + /** */ + protected static final int EXPLICIT_LOCK_FLAG_MASK = 0x10; + + /** */ + protected static final int STORE_ENABLED_FLAG_MASK = 0x20; + + /** Topology version. */ + private AffinityTopologyVersion topVer; + /** Future ID. 
*/ private IgniteUuid futId; @@ -54,14 +81,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i /** Commit flag. */ private boolean commit; - /** Sync commit flag. */ - @Deprecated - private boolean syncCommit; - - /** Sync commit flag. */ - @Deprecated - private boolean syncRollback; - /** Min version used as base for completed versions. */ private GridCacheVersion baseVer; @@ -74,6 +93,18 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i /** IO policy. */ private byte plc; + /** Subject ID. */ + private UUID subjId; + + /** Task name hash. */ + private int taskNameHash; + + /** */ + private byte flags; + + /** Write synchronization mode. */ + private CacheWriteSynchronizationMode syncMode; + /** Transient TX state. */ @GridDirectTransient private IgniteTxState txState; @@ -94,8 +125,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i * @param invalidate Invalidate flag. * @param sys System transaction flag. * @param plc IO policy. - * @param syncCommit Sync commit flag. - * @param syncRollback Sync rollback flag. + * @param syncMode Write synchronization mode. * @param baseVer Base version. * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. 
@@ -105,39 +135,93 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i public GridDistributedTxFinishRequest( GridCacheVersion xidVer, IgniteUuid futId, + @NotNull AffinityTopologyVersion topVer, @Nullable GridCacheVersion commitVer, long threadId, boolean commit, boolean invalidate, boolean sys, byte plc, - boolean syncCommit, - boolean syncRollback, + CacheWriteSynchronizationMode syncMode, GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers, + @Nullable UUID subjId, + int taskNameHash, int txSize, boolean addDepInfo ) { super(xidVer, 0, addDepInfo); + assert xidVer != null; + assert syncMode != null; this.futId = futId; + this.topVer = topVer; this.commitVer = commitVer; this.threadId = threadId; this.commit = commit; this.invalidate = invalidate; this.sys = sys; this.plc = plc; - this.syncCommit = syncCommit; - this.syncRollback = syncRollback; + this.syncMode = syncMode; this.baseVer = baseVer; + this.subjId = subjId; + this.taskNameHash = taskNameHash; this.txSize = txSize; completedVersions(committedVers, rolledbackVers); } /** + * @return Transaction write synchronization mode (can be null is message sent from old nodes). + */ + public final CacheWriteSynchronizationMode syncMode() { + return syncMode; + } + + /** + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. + */ + protected final void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } + + /** + * Reags flag mask. + * + * @param mask Mask to read. + * @return Flag value. + */ + protected final boolean isFlag(int mask) { + return (flags & mask) != 0; + } + + /** + * @return Subject ID. + */ + @Nullable public final UUID subjectId() { + return subjId; + } + + /** + * @return Task name hash. + */ + public final int taskNameHash() { + return taskNameHash; + } + + /** + * @return Topology version. 
+ */ + @Override public final AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** * @return System transaction flag. */ public boolean system() { @@ -188,27 +272,6 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i } /** - * @return Sync commit flag. - */ - public boolean syncCommit() { - return syncCommit; - } - - /** - * @param syncCommit Sync commit flag. - */ - public void syncCommit(boolean syncCommit) { - this.syncCommit = syncCommit; - } - - /** - * @return Sync rollback flag. - */ - public boolean syncRollback() { - return syncRollback; - } - - /** * @return Base version. */ public GridCacheVersion baseVersion() { @@ -227,7 +290,9 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i * @return {@code True} if reply is required. */ public boolean replyRequired() { - return commit ? syncCommit : syncRollback; + assert syncMode != null; + + return syncMode == FULL_SYNC; } /** {@inheritDoc} */ @@ -279,48 +344,66 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i writer.incrementState(); case 10: - if (!writer.writeIgniteUuid("futId", futId)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 11: - if (!writer.writeBoolean("invalidate", invalidate)) + if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); case 12: - if (!writer.writeByte("plc", plc)) + if (!writer.writeBoolean("invalidate", invalidate)) return false; writer.incrementState(); case 13: - if (!writer.writeBoolean("syncCommit", syncCommit)) + if (!writer.writeByte("plc", plc)) return false; writer.incrementState(); case 14: - if (!writer.writeBoolean("syncRollback", syncRollback)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 15: - if (!writer.writeBoolean("sys", sys)) + if (!writer.writeByte("syncMode", syncMode != null ? 
(byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 16: - if (!writer.writeLong("threadId", threadId)) + if (!writer.writeBoolean("sys", sys)) return false; writer.incrementState(); case 17: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 18: + if (!writer.writeLong("threadId", threadId)) + return false; + + writer.incrementState(); + + case 19: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 20: if (!writer.writeInt("txSize", txSize)) return false; @@ -367,7 +450,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i reader.incrementState(); case 10: - futId = reader.readIgniteUuid("futId"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -375,7 +458,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i reader.incrementState(); case 11: - invalidate = reader.readBoolean("invalidate"); + futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) return false; @@ -383,7 +466,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i reader.incrementState(); case 12: - plc = reader.readByte("plc"); + invalidate = reader.readBoolean("invalidate"); if (!reader.isLastRead()) return false; @@ -391,7 +474,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i reader.incrementState(); case 13: - syncCommit = reader.readBoolean("syncCommit"); + plc = reader.readByte("plc"); if (!reader.isLastRead()) return false; @@ -399,7 +482,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i reader.incrementState(); case 14: - syncRollback = reader.readBoolean("syncRollback"); + subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) return false; @@ -407,15 +490,19 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i 
reader.incrementState(); case 15: - sys = reader.readBoolean("sys"); + byte syncModeOrd; + + syncModeOrd = reader.readByte("syncMode"); if (!reader.isLastRead()) return false; + syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd); + reader.incrementState(); case 16: - threadId = reader.readLong("threadId"); + sys = reader.readBoolean("sys"); if (!reader.isLastRead()) return false; @@ -423,6 +510,30 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i reader.incrementState(); case 17: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 18: + threadId = reader.readLong("threadId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 19: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 20: txSize = reader.readInt("txSize"); if (!reader.isLastRead()) @@ -442,7 +553,7 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage i /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 18; + return 21; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java index 109d665..2c446c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java @@ -17,13 
+17,14 @@ package org.apache.ignite.internal.processors.cache.distributed; -import java.io.Externalizable; import java.nio.ByteBuffer; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.managers.communication.GridIoMessageFactory; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -41,25 +42,59 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { /** Future ID. */ private IgniteUuid futId; + /** */ + @GridToStringExclude + private byte flags; + + /** */ + private int part; + /** - * Empty constructor required by {@link Externalizable}. + * Empty constructor required by {@link GridIoMessageFactory}. */ public GridDistributedTxFinishResponse() { /* No-op. */ } /** + * @param part Partition. * @param txId Transaction id. * @param futId Future ID. */ - public GridDistributedTxFinishResponse(GridCacheVersion txId, IgniteUuid futId) { + public GridDistributedTxFinishResponse(int part, GridCacheVersion txId, IgniteUuid futId) { assert txId != null; assert futId != null; + this.part = part; this.txId = txId; this.futId = futId; } + /** {@inheritDoc} */ + @Override public final int partition() { + return part; + } + + /** + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. + */ + protected final void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } + + /** + * Reags flag mask. + * + * @param mask Mask to read. + * @return Flag value. 
+ */ + protected final boolean isFlag(int mask) { + return (flags & mask) != 0; + } + /** * * @return Transaction id. @@ -101,12 +136,24 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { switch (writer.state()) { case 3: - if (!writer.writeIgniteUuid("futId", futId)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); case 4: + if (!writer.writeIgniteUuid("futId", futId)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeInt("part", part)) + return false; + + writer.incrementState(); + + case 6: if (!writer.writeMessage("txId", txId)) return false; @@ -129,7 +176,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { switch (reader.state()) { case 3: - futId = reader.readIgniteUuid("futId"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -137,6 +184,22 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { reader.incrementState(); case 4: + futId = reader.readIgniteUuid("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + part = reader.readInt("part"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: txId = reader.readMessage("txId"); if (!reader.isLastRead()) @@ -156,7 +219,7 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 5; + return 7; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java 
index 8c9f181..f8cec50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@ -17,10 +17,6 @@ package org.apache.ignite.internal.processors.cache.distributed; -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashSet; @@ -33,19 +29,15 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; /** * Transaction node mapping. */ -public class GridDistributedTxMapping implements Externalizable { - /** */ - private static final long serialVersionUID = 0L; - +public class GridDistributedTxMapping { /** Mapped node. */ @GridToStringExclude - private ClusterNode node; + private ClusterNode primary; /** Entries. */ @GridToStringInclude @@ -67,17 +59,10 @@ public class GridDistributedTxMapping implements Externalizable { private boolean clientFirst; /** - * Empty constructor required for {@link Externalizable}. + * @param primary Primary node. */ - public GridDistributedTxMapping() { - // No-op. - } - - /** - * @param node Mapped node. - */ - public GridDistributedTxMapping(ClusterNode node) { - this.node = node; + public GridDistributedTxMapping(ClusterNode primary) { + this.primary = primary; entries = new LinkedHashSet<>(); } @@ -127,8 +112,8 @@ public class GridDistributedTxMapping implements Externalizable { /** * @return Node. 
*/ - public ClusterNode node() { - return node; + public ClusterNode primary() { + return primary; } /** @@ -235,21 +220,7 @@ public class GridDistributedTxMapping implements Externalizable { } /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(node); - - U.writeCollection(out, entries); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - node = (ClusterNode)in.readObject(); - - entries = U.readCollection(in); - } - - /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridDistributedTxMapping.class, this, "node", node.id()); + return S.toString(GridDistributedTxMapping.class, this, "node", primary.id()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index 5e1499c..acf6bc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -40,11 +40,11 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateAwa import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import 
org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -60,18 +60,30 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** */ private static final long serialVersionUID = 0L; - /** Version in which direct marshalling of tx nodes was introduced. */ - public static final IgniteProductVersion TX_NODES_DIRECT_MARSHALLABLE_SINCE = IgniteProductVersion.fromString("1.5.0"); + /** */ + private static final int NEED_RETURN_VALUE_FLAG_MASK = 0x01; + + /** */ + private static final int INVALIDATE_FLAG_MASK = 0x02; + + /** */ + private static final int ONE_PHASE_COMMIT_FLAG_MASK = 0x04; + + /** */ + private static final int LAST_REQ_FLAG_MASK = 0x08; + + /** */ + private static final int SYSTEM_TX_FLAG_MASK = 0x10; /** Collection to message converter. */ - public static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() { + private static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() { @Override public UUIDCollectionMessage apply(Collection<UUID> uuids) { return new UUIDCollectionMessage(uuids); } }; /** Message to collection converter. 
*/ - public static final C1<UUIDCollectionMessage, Collection<UUID>> MSG_TO_COL = new C1<UUIDCollectionMessage, Collection<UUID>>() { + private static final C1<UUIDCollectionMessage, Collection<UUID>> MSG_TO_COL = new C1<UUIDCollectionMessage, Collection<UUID>>() { @Override public Collection<UUID> apply(UUIDCollectionMessage msg) { return msg.uuids(); } @@ -97,10 +109,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage @GridToStringInclude private long timeout; - /** Invalidation flag. */ - @GridToStringInclude - private boolean invalidate; - /** Transaction read set. */ @GridToStringInclude @GridDirectCollection(IgniteTxEntry.class) @@ -135,15 +143,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage @GridDirectMap(keyType = UUID.class, valueType = UUIDCollectionMessage.class) private Map<UUID, UUIDCollectionMessage> txNodesMsg; - /** */ - private byte[] txNodesBytes; - - /** One phase commit flag. */ - private boolean onePhaseCommit; - - /** System flag. */ - private boolean sys; - /** IO policy. */ private byte plc; @@ -151,6 +150,10 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage @GridDirectTransient private IgniteTxState txState; + /** */ + @GridToStringExclude + private byte flags; + /** * Required by {@link Externalizable}. */ @@ -164,6 +167,8 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage * @param reads Read entries. * @param writes Write entries. * @param txNodes Transaction nodes mapping. + * @param retVal Return value flag. + * @param last Last request flag. * @param onePhaseCommit One phase commit flag. * @param addDepInfo Deployment info flag. 
*/ @@ -173,6 +178,8 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage @Nullable Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes, Map<UUID, Collection<UUID>> txNodes, + boolean retVal, + boolean last, boolean onePhaseCommit, boolean addDepInfo ) { @@ -182,16 +189,33 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage threadId = tx.threadId(); concurrency = tx.concurrency(); isolation = tx.isolation(); - invalidate = tx.isInvalidate(); txSize = tx.size(); - sys = tx.system(); plc = tx.ioPolicy(); this.timeout = timeout; this.reads = reads; this.writes = writes; this.txNodes = txNodes; - this.onePhaseCommit = onePhaseCommit; + + setFlag(tx.system(), SYSTEM_TX_FLAG_MASK); + setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK); + setFlag(tx.isInvalidate(), INVALIDATE_FLAG_MASK); + setFlag(onePhaseCommit, ONE_PHASE_COMMIT_FLAG_MASK); + setFlag(last, LAST_REQ_FLAG_MASK); + } + + /** + * @return Flag indicating whether transaction needs return value. + */ + public final boolean needReturnValue() { + return isFlag(NEED_RETURN_VALUE_FLAG_MASK); + } + + /** + * @param retVal Need return value. + */ + public final void needReturnValue(boolean retVal) { + setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK); } /** @@ -204,8 +228,8 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** * @return System flag. */ - public boolean system() { - return sys; + public final boolean system() { + return isFlag(SYSTEM_TX_FLAG_MASK); } /** @@ -253,7 +277,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage * @return Invalidate flag. */ public boolean isInvalidate() { - return invalidate; + return isFlag(INVALIDATE_FLAG_MASK); } /** @@ -316,7 +340,14 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage * @return One phase commit flag. 
*/ public boolean onePhaseCommit() { - return onePhaseCommit; + return isFlag(ONE_PHASE_COMMIT_FLAG_MASK); + } + + /** + * @return {@code True} if this is last prepare request for node. + */ + public boolean last() { + return isFlag(LAST_REQ_FLAG_MASK); } /** {@inheritDoc} */ @@ -351,15 +382,8 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage dhtVerVals = dhtVers.values(); } - // Marshal txNodes only if there is a node in topology with an older version. - if (ctx.exchange().minimumNodeVersion(topologyVersion()).compareTo(TX_NODES_DIRECT_MARSHALLABLE_SINCE) < 0) { - if (txNodes != null && txNodesBytes == null) - txNodesBytes = U.marshal(ctx, txNodes); - } - else { - if (txNodesMsg == null) - txNodesMsg = F.viewReadOnly(txNodes, COL_TO_MSG); - } + if (txNodesMsg == null) + txNodesMsg = F.viewReadOnly(txNodes, COL_TO_MSG); } /** {@inheritDoc} */ @@ -392,9 +416,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage if (txNodesMsg != null) txNodes = F.viewReadOnly(txNodesMsg, MSG_TO_COL); - - if (txNodesBytes != null && txNodes == null) - txNodes = U.unmarshal(ctx, txNodesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } /** {@inheritDoc} */ @@ -407,6 +428,26 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage return ctx.txPrepareMessageLogger(); } + /** + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. + */ + private void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } + + /** + * Reads flag mask. + * + * @param mask Mask to read. + * @return Flag value. 
+ */ + private boolean isFlag(int mask) { + return (flags & mask) != 0; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -441,7 +482,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage writer.incrementState(); case 10: - if (!writer.writeBoolean("invalidate", invalidate)) + if (!writer.writeByte("flags", flags)) return false; writer.incrementState(); @@ -453,66 +494,48 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage writer.incrementState(); case 12: - if (!writer.writeBoolean("onePhaseCommit", onePhaseCommit)) - return false; - - writer.incrementState(); - - case 13: if (!writer.writeByte("plc", plc)) return false; writer.incrementState(); - case 14: + case 13: if (!writer.writeCollection("reads", reads, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 15: - if (!writer.writeBoolean("sys", sys)) - return false; - - writer.incrementState(); - - case 16: + case 14: if (!writer.writeLong("threadId", threadId)) return false; writer.incrementState(); - case 17: + case 15: if (!writer.writeLong("timeout", timeout)) return false; writer.incrementState(); - case 18: - if (!writer.writeByteArray("txNodesBytes", txNodesBytes)) - return false; - - writer.incrementState(); - - case 19: + case 16: if (!writer.writeMap("txNodesMsg", txNodesMsg, MessageCollectionItemType.UUID, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 20: + case 17: if (!writer.writeInt("txSize", txSize)) return false; writer.incrementState(); - case 21: + case 18: if (!writer.writeMessage("writeVer", writeVer)) return false; writer.incrementState(); - case 22: + case 19: if (!writer.writeCollection("writes", writes, MessageCollectionItemType.MSG)) return false; @@ -563,7 +586,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 10: - invalidate 
= reader.readBoolean("invalidate"); + flags = reader.readByte("flags"); if (!reader.isLastRead()) return false; @@ -583,14 +606,6 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 12: - onePhaseCommit = reader.readBoolean("onePhaseCommit"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 13: plc = reader.readByte("plc"); if (!reader.isLastRead()) @@ -598,7 +613,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 14: + case 13: reads = reader.readCollection("reads", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -606,15 +621,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 15: - sys = reader.readBoolean("sys"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 16: + case 14: threadId = reader.readLong("threadId"); if (!reader.isLastRead()) @@ -622,7 +629,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 17: + case 15: timeout = reader.readLong("timeout"); if (!reader.isLastRead()) @@ -630,15 +637,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 18: - txNodesBytes = reader.readByteArray("txNodesBytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 19: + case 16: txNodesMsg = reader.readMap("txNodesMsg", MessageCollectionItemType.UUID, MessageCollectionItemType.MSG, false); if (!reader.isLastRead()) @@ -646,7 +645,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 20: + case 17: txSize = reader.readInt("txSize"); if (!reader.isLastRead()) @@ -654,7 +653,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage 
reader.incrementState(); - case 21: + case 18: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -662,7 +661,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); - case 22: + case 19: writes = reader.readCollection("writes", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -682,12 +681,26 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 23; + return 20; } /** {@inheritDoc} */ @Override public String toString() { + StringBuilder flags = new StringBuilder(); + + if (needReturnValue()) + flags.append("retVal"); + if (isInvalidate()) + flags.append("invalidate"); + if (onePhaseCommit()) + flags.append("onePhase"); + if (last()) + flags.append("last"); + if (system()) + flags.append("sys"); + return GridToStringBuilder.toString(GridDistributedTxPrepareRequest.class, this, + "flags", flags.toString(), "super", super.toString()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java index 76a5e31..53a1391 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareResponse.java @@ -51,6 +51,12 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage @GridDirectTransient private IgniteTxState txState; + /** */ + private int 
part; + + /** */ + private byte flags; + /** * Empty constructor (required by {@link Externalizable}). */ @@ -59,24 +65,54 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage } /** - * @param xid Transaction ID. + * @param part Partition. + * @param xid Lock or transaction ID. * @param addDepInfo Deployment info flag. */ - public GridDistributedTxPrepareResponse(GridCacheVersion xid, boolean addDepInfo) { + public GridDistributedTxPrepareResponse(int part, GridCacheVersion xid, boolean addDepInfo) { super(xid, 0, addDepInfo); + + this.part = part; } /** - * @param xid Lock ID. + * @param part Partition. + * @param xid Lock or transaction ID. * @param err Error. * @param addDepInfo Deployment info flag. */ - public GridDistributedTxPrepareResponse(GridCacheVersion xid, Throwable err, boolean addDepInfo) { + public GridDistributedTxPrepareResponse(int part, GridCacheVersion xid, Throwable err, boolean addDepInfo) { super(xid, 0, addDepInfo); + this.part = part; this.err = err; } + /** + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. + */ + protected final void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } + + /** + * Reads flag mask. + * + * @param mask Mask to read. + * @return Flag value. 
+ */ + protected final boolean isFlag(int mask) { + return (flags & mask) != 0; + } + + /** {@inheritDoc} */ + @Override public int partition() { + return part; + } + /** {@inheritDoc} */ @Override public Throwable error() { return err; @@ -106,8 +142,6 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage this.txState = txState; } - /** {@inheritDoc} - * @param ctx*/ /** {@inheritDoc} */ @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { return ctx.txPrepareMessageLogger(); @@ -150,6 +184,18 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage writer.incrementState(); + case 8: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeInt("part", part)) + return false; + + writer.incrementState(); + } return true; @@ -174,6 +220,22 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage reader.incrementState(); + case 8: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + part = reader.readInt("part"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(GridDistributedTxPrepareResponse.class); @@ -186,7 +248,7 @@ public class GridDistributedTxPrepareResponse extends GridDistributedBaseMessage /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 8; + return 10; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java index 5d70ec1..be7ecf8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java @@ -45,10 +45,6 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage { @GridDirectCollection(KeyCacheObject.class) private List<KeyCacheObject> keys; - /** Partition IDs. */ - @GridDirectCollection(int.class) - protected List<Integer> partIds; - /** * Empty constructor required by {@link Externalizable}. */ @@ -80,18 +76,15 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage { * @throws IgniteCheckedException If failed. */ public void addKey(KeyCacheObject key, GridCacheContext ctx) throws IgniteCheckedException { - if (keys == null) { + if (keys == null) keys = new ArrayList<>(keysCount()); - partIds = new ArrayList<>(keysCount()); - } keys.add(key); - partIds.add(key.partition()); } /** {@inheritDoc} */ @Override public int partition() { - return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; + return keys != null && !keys.isEmpty() ? 
keys.get(0).partition() : -1; } /** {@inheritDoc} @@ -107,13 +100,6 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage { super.finishUnmarshal(ctx, ldr); finishUnmarshalCacheObjects(keys, ctx.cacheContext(cacheId), ldr); - - if (partIds != null && !partIds.isEmpty()) { - assert partIds.size() == keys.size(); - - for (int i = 0; i < keys.size(); i++) - keys.get(i).partition(partIds.get(i)); - } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index 95c6dfc..50167d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -419,12 +419,6 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { writer.incrementState(); - case 30: - if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) - return false; - - writer.incrementState(); - } return true; @@ -521,14 +515,6 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 30: - partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - } return reader.afterMessageRead(GridDhtLockRequest.class); @@ -541,7 +527,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 31; + return 30; } /** {@inheritDoc} */ 
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 60e07b3..17e9047 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -179,7 +179,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; - if (f.futureId().equals(res.miniId())) { + if (f.futureId() == res.miniId()) { found = true; assert f.node().id().equals(nodeId); @@ -304,10 +304,12 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur boolean res = false; + int miniId = 0; + for (ClusterNode n : nodes) { assert !n.isLocal(); - MiniFuture fut = new MiniFuture(n); + MiniFuture fut = new MiniFuture(++miniId, n); add(fut); // Append new future. @@ -325,8 +327,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.system(), tx.ioPolicy(), tx.isSystemInvalidate(), - sync, - sync, + sync ? FULL_SYNC : tx.syncMode(), tx.completedBase(), tx.committedVersions(), tx.rolledbackVersions(), @@ -391,9 +392,11 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur boolean res = false; + int miniId = 0; + // Create mini futures. 
for (GridDistributedTxMapping dhtMapping : dhtMap.values()) { - ClusterNode n = dhtMapping.node(); + ClusterNode n = dhtMapping.primary(); assert !n.isLocal(); @@ -403,7 +406,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur // Nothing to send. continue; - MiniFuture fut = new MiniFuture(dhtMapping, nearMapping); + MiniFuture fut = new MiniFuture(++miniId, dhtMapping, nearMapping); add(fut); // Append new future. @@ -426,8 +429,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.system(), tx.ioPolicy(), tx.isSystemInvalidate(), - sync, - sync, + sync ? FULL_SYNC : tx.syncMode(), tx.completedBase(), tx.committedVersions(), tx.rolledbackVersions(), @@ -474,12 +476,12 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur } for (GridDistributedTxMapping nearMapping : nearMap.values()) { - if (!dhtMap.containsKey(nearMapping.node().id())) { + if (!dhtMap.containsKey(nearMapping.primary().id())) { if (nearMapping.empty()) // Nothing to send. continue; - MiniFuture fut = new MiniFuture(null, nearMapping); + MiniFuture fut = new MiniFuture(++miniId, null, nearMapping); add(fut); // Append new future. @@ -497,8 +499,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.system(), tx.ioPolicy(), tx.isSystemInvalidate(), - sync, - sync, + sync ? 
FULL_SYNC : tx.syncMode(), tx.completedBase(), tx.committedVersions(), tx.rolledbackVersions(), @@ -513,12 +514,12 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur req.writeVersion(tx.writeVersion()); try { - cctx.io().send(nearMapping.node(), req, tx.ioPolicy()); + cctx.io().send(nearMapping.primary(), req, tx.ioPolicy()); if (msgLog.isDebugEnabled()) { msgLog.debug("DHT finish fut, sent request near [txId=" + tx.nearXidVersion() + ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.node().id() + ']'); + ", node=" + nearMapping.primary().id() + ']'); } if (sync) @@ -534,7 +535,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur if (msgLog.isDebugEnabled()) { msgLog.debug("DHT finish fut, failed to send request near [txId=" + tx.nearXidVersion() + ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.node().id() + + ", node=" + nearMapping.primary().id() + ", err=" + e + ']'); } @@ -573,7 +574,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur private static final long serialVersionUID = 0L; /** */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); + private final int futId; /** DHT mapping. */ @GridToStringInclude @@ -588,19 +589,23 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur private ClusterNode node; /** + * @param futId Future ID. * @param node Node. */ - private MiniFuture(ClusterNode node) { + private MiniFuture(int futId, ClusterNode node) { + this.futId = futId; this.node = node; } /** + * @param futId Future ID. * @param dhtMapping Mapping. * @param nearMapping nearMapping. 
*/ - MiniFuture(GridDistributedTxMapping dhtMapping, GridDistributedTxMapping nearMapping) { - assert dhtMapping == null || nearMapping == null || dhtMapping.node().equals(nearMapping.node()); + MiniFuture(int futId, GridDistributedTxMapping dhtMapping, GridDistributedTxMapping nearMapping) { + assert dhtMapping == null || nearMapping == null || dhtMapping.primary().equals(nearMapping.primary()); + this.futId = futId; this.dhtMapping = dhtMapping; this.nearMapping = nearMapping; } @@ -608,7 +613,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** * @return Future ID. */ - IgniteUuid futureId() { + int futureId() { return futId; } @@ -616,7 +621,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * @return Node ID. */ public ClusterNode node() { - return node != null ? node : dhtMapping != null ? dhtMapping.node() : nearMapping.node(); + return node != null ? node : dhtMapping != null ? dhtMapping.primary() : nearMapping.primary(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index c618a18..d9b3ae7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -21,6 +21,7 @@ import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.Collection; import java.util.UUID; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; import 
org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishRequest; @@ -43,12 +44,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** */ private static final long serialVersionUID = 0L; - /** */ - public static final int WAIT_REMOTE_TX_FLAG_MASK = 0x01; - - /** */ - public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x02; - /** Near node ID. */ private UUID nearNodeId; @@ -56,22 +51,13 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { private TransactionIsolation isolation; /** Mini future ID. */ - private IgniteUuid miniId; - - /** System invalidation flag. */ - private boolean sysInvalidate; - - /** Topology version. */ - private AffinityTopologyVersion topVer; + private int miniId; /** Pending versions with order less than one for this message (needed for commit ordering). */ @GridToStringInclude @GridDirectCollection(GridCacheVersion.class) private Collection<GridCacheVersion> pendingVers; - /** Check committed flag. */ - private boolean checkCommitted; - /** Partition update counter. */ @GridToStringInclude @GridDirectCollection(Long.class) @@ -80,15 +66,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** One phase commit write version. */ private GridCacheVersion writeVer; - /** Subject ID. */ - private UUID subjId; - - /** Task name hash. */ - private int taskNameHash; - - /** */ - private byte flags; - /** * Empty constructor required for {@link Externalizable}. */ @@ -110,8 +87,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { * @param sys System flag. * @param plc IO policy. * @param sysInvalidate System invalidation flag. - * @param syncCommit Synchronous commit flag. - * @param syncRollback Synchronous rollback flag. + * @param syncMode Write synchronization mode. 
* @param baseVer Base version. * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. @@ -124,7 +100,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { public GridDhtTxFinishRequest( UUID nearNodeId, IgniteUuid futId, - IgniteUuid miniId, + int miniId, @NotNull AffinityTopologyVersion topVer, GridCacheVersion xidVer, GridCacheVersion commitVer, @@ -135,8 +111,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { boolean sys, byte plc, boolean sysInvalidate, - boolean syncCommit, - boolean syncRollback, + CacheWriteSynchronizationMode syncMode, GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers, @@ -151,35 +126,34 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { super( xidVer, futId, + topVer, commitVer, threadId, commit, invalidate, sys, plc, - syncCommit, - syncRollback, + syncMode, baseVer, committedVers, rolledbackVers, + subjId, + taskNameHash, txSize, addDepInfo); - assert miniId != null; + assert miniId != 0; assert nearNodeId != null; assert isolation != null; this.pendingVers = pendingVers; - this.topVer = topVer; this.nearNodeId = nearNodeId; this.isolation = isolation; this.miniId = miniId; - this.sysInvalidate = sysInvalidate; - this.subjId = subjId; - this.taskNameHash = taskNameHash; needReturnValue(retVal); waitRemoteTransactions(waitRemoteTxs); + systemInvalidate(sysInvalidate); } /** @@ -196,8 +170,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { * @param sys System flag. * @param plc IO policy. * @param sysInvalidate System invalidation flag. - * @param syncCommit Synchronous commit flag. - * @param syncRollback Synchronous rollback flag. + * @param syncMode Write synchronization mode. * @param baseVer Base version. * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. 
@@ -211,7 +184,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { public GridDhtTxFinishRequest( UUID nearNodeId, IgniteUuid futId, - IgniteUuid miniId, + int miniId, @NotNull AffinityTopologyVersion topVer, GridCacheVersion xidVer, GridCacheVersion commitVer, @@ -222,8 +195,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { boolean sys, byte plc, boolean sysInvalidate, - boolean syncCommit, - boolean syncRollback, + CacheWriteSynchronizationMode syncMode, GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers, @@ -236,9 +208,30 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { boolean retVal, boolean waitRemoteTxs ) { - this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc, - sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize, - subjId, taskNameHash, addDepInfo, retVal, waitRemoteTxs); + this(nearNodeId, + futId, + miniId, + topVer, + xidVer, + commitVer, + threadId, + isolation, + commit, + invalidate, + sys, + plc, + sysInvalidate, + syncMode, + baseVer, + committedVers, + rolledbackVers, + pendingVers, + txSize, + subjId, + taskNameHash, + addDepInfo, + retVal, + waitRemoteTxs); if (updateIdxs != null && !updateIdxs.isEmpty()) { partUpdateCnt = new GridLongList(updateIdxs.size()); @@ -258,25 +251,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** * @return Mini ID. */ - public IgniteUuid miniId() { + public int miniId() { return miniId; } /** - * @return Subject ID. - */ - @Nullable public UUID subjectId() { - return subjId; - } - - /** - * @return Task name hash. - */ - public int taskNameHash() { - return taskNameHash; - } - - /** * @return Transaction isolation. 
*/ public TransactionIsolation isolation() { @@ -294,7 +273,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { * @return System invalidate flag. */ public boolean isSystemInvalidate() { - return sysInvalidate; + return isFlag(SYS_INVALIDATE_FLAG_MASK); + } + + /** + * @param sysInvalidate System invalidation flag. + */ + private void systemInvalidate(boolean sysInvalidate) { + setFlag(sysInvalidate, SYS_INVALIDATE_FLAG_MASK); } /** @@ -312,63 +298,45 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { } /** - * @return Topology version. - */ - @Override public AffinityTopologyVersion topologyVersion() { - return topVer; - } - - /** * @return Check committed flag. */ public boolean checkCommitted() { - return checkCommitted; + return isFlag(CHECK_COMMITTED_FLAG_MASK); } /** * @param checkCommitted Check committed flag. */ public void checkCommitted(boolean checkCommitted) { - this.checkCommitted = checkCommitted; + setFlag(checkCommitted, CHECK_COMMITTED_FLAG_MASK); } /** * @return {@code True} */ public boolean waitRemoteTransactions() { - return (flags & WAIT_REMOTE_TX_FLAG_MASK) != 0; + return isFlag(WAIT_REMOTE_TX_FLAG_MASK); } /** * @param waitRemoteTxs Wait remote transactions flag. */ - public void waitRemoteTransactions(boolean waitRemoteTxs) { - if (waitRemoteTxs) - flags = (byte)(flags | WAIT_REMOTE_TX_FLAG_MASK); - else - flags &= ~WAIT_REMOTE_TX_FLAG_MASK; + private void waitRemoteTransactions(boolean waitRemoteTxs) { + setFlag(waitRemoteTxs, WAIT_REMOTE_TX_FLAG_MASK); } /** * @return Flag indicating whether transaction needs return value. */ public boolean needReturnValue() { - return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0; + return isFlag(NEED_RETURN_VALUE_FLAG_MASK); } /** * @param retVal Need return value. 
*/ public void needReturnValue(boolean retVal) { - if (retVal) - flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK); - else - flags &= ~NEED_RETURN_VALUE_FLAG_MASK; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtTxFinishRequest.class, this, super.toString()); + setFlag(retVal, NEED_RETURN_VALUE_FLAG_MASK); } /** {@inheritDoc} */ @@ -386,73 +354,37 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { } switch (writer.state()) { - case 18: - if (!writer.writeBoolean("checkCommitted", checkCommitted)) - return false; - - writer.incrementState(); - - case 19: - if (!writer.writeByte("flags", flags)) - return false; - - writer.incrementState(); - - case 20: - if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1)) - return false; - - writer.incrementState(); - case 21: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1)) return false; writer.incrementState(); case 22: - if (!writer.writeUuid("nearNodeId", nearNodeId)) + if (!writer.writeInt("miniId", miniId)) return false; writer.incrementState(); case 23: - if (!writer.writeMessage("partUpdateCnt", partUpdateCnt)) + if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; writer.incrementState(); case 24: - if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("partUpdateCnt", partUpdateCnt)) return false; writer.incrementState(); case 25: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 26: - if (!writer.writeBoolean("sysInvalidate", sysInvalidate)) - return false; - - writer.incrementState(); - - case 27: - if (!writer.writeInt("taskNameHash", taskNameHash)) - return false; - - writer.incrementState(); - - case 28: - if 
(!writer.writeMessage("topVer", topVer)) - return false; - - writer.incrementState(); - - case 29: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -474,23 +406,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { return false; switch (reader.state()) { - case 18: - checkCommitted = reader.readBoolean("checkCommitted"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 19: - flags = reader.readByte("flags"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 20: + case 21: byte isolationOrd; isolationOrd = reader.readByte("isolation"); @@ -502,16 +418,8 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 21: - miniId = reader.readIgniteUuid("miniId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - case 22: - nearNodeId = reader.readUuid("nearNodeId"); + miniId = reader.readInt("miniId"); if (!reader.isLastRead()) return false; @@ -519,7 +427,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 23: - partUpdateCnt = reader.readMessage("partUpdateCnt"); + nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) return false; @@ -527,7 +435,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 24: - pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG); + partUpdateCnt = reader.readMessage("partUpdateCnt"); if (!reader.isLastRead()) return false; @@ -535,7 +443,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); case 25: - subjId = reader.readUuid("subjId"); + pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -543,30 +451,6 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest { reader.incrementState(); case 26: - sysInvalidate = reader.readBoolean("sysInvalidate"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 27: - taskNameHash = reader.readInt("taskNameHash"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 28: - topVer = reader.readMessage("topVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 29: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -586,6 +470,11 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 30; + return 27; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtTxFinishRequest.class, this, super.toString()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java index 8fb1f4e..bc9503f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java @@ -39,8 +39,11 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { /** */ private static final long serialVersionUID = 0L; + /** Flag indicating if this is a check-committed response. */ + private static final int CHECK_COMMITTED_FLAG_MASK = 0x01; + /** Mini future ID. 
*/ - private IgniteUuid miniId; + private int miniId; /** Error. */ @GridDirectTransient @@ -49,9 +52,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { /** Serialized error. */ private byte[] checkCommittedErrBytes; - /** Flag indicating if this is a check-committed response. */ - private boolean checkCommitted; - /** Cache return value. */ private GridCacheReturn retVal; @@ -63,14 +63,15 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { } /** + * @param part Partition. * @param xid Xid version. * @param futId Future ID. * @param miniId Mini future ID. */ - public GridDhtTxFinishResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId) { - super(xid, futId); + public GridDhtTxFinishResponse(int part, GridCacheVersion xid, IgniteUuid futId, int miniId) { + super(part, xid, futId); - assert miniId != null; + assert miniId != 0; this.miniId = miniId; } @@ -78,7 +79,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { /** * @return Mini future ID. */ - public IgniteUuid miniId() { + public int miniId() { return miniId; } @@ -100,14 +101,14 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { * @return Check committed flag. */ public boolean checkCommitted() { - return checkCommitted; + return isFlag(CHECK_COMMITTED_FLAG_MASK); } /** * @param checkCommitted Check committed flag. 
*/ public void checkCommitted(boolean checkCommitted) { - this.checkCommitted = checkCommitted; + setFlag(checkCommitted, CHECK_COMMITTED_FLAG_MASK); } /** {@inheritDoc} */ @@ -158,11 +159,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { } /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtTxFinishResponse.class, this, super.toString()); - } - - /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -177,25 +173,19 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { } switch (writer.state()) { - case 5: - if (!writer.writeBoolean("checkCommitted", checkCommitted)) - return false; - - writer.incrementState(); - - case 6: + case 7: if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes)) return false; writer.incrementState(); - case 7: - if (!writer.writeIgniteUuid("miniId", miniId)) + case 8: + if (!writer.writeInt("miniId", miniId)) return false; writer.incrementState(); - case 8: + case 9: if (!writer.writeMessage("retVal", retVal)) return false; @@ -217,15 +207,7 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { return false; switch (reader.state()) { - case 5: - checkCommitted = reader.readBoolean("checkCommitted"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 6: + case 7: checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes"); if (!reader.isLastRead()) @@ -233,15 +215,15 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { reader.incrementState(); - case 7: - miniId = reader.readIgniteUuid("miniId"); + case 8: + miniId = reader.readInt("miniId"); if (!reader.isLastRead()) return false; reader.incrementState(); - case 8: + case 9: retVal = reader.readMessage("retVal"); if (!reader.isLastRead()) @@ -261,6 +243,18 @@ public class GridDhtTxFinishResponse extends 
GridDistributedTxFinishResponse { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 9; + return 10; + } + + /** {@inheritDoc} */ + @Override public String toString() { + StringBuilder flags = new StringBuilder(); + + if (checkCommitted()) + appendFlag(flags, "checkComm"); + + return S.toString(GridDhtTxFinishResponse.class, this, + "flags", flags.toString(), + "super", super.toString()); } }
