http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 7199ede..bff69bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -72,13 +72,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa private IgniteUuid nearFutId; /** Near future ID. */ - private IgniteUuid nearMiniId; + private int nearMiniId; /** Near future ID. */ private IgniteUuid nearFinFutId; /** Near future ID. */ - private IgniteUuid nearFinMiniId; + private int nearFinMiniId; /** Near XID. */ private GridCacheVersion nearXidVer; @@ -121,7 +121,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa UUID nearNodeId, GridCacheVersion nearXidVer, IgniteUuid nearFutId, - IgniteUuid nearMiniId, + int nearMiniId, long nearThreadId, boolean implicit, boolean implicitSingle, @@ -159,7 +159,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa assert nearNodeId != null; assert nearFutId != null; - assert nearMiniId != null; + assert nearMiniId != 0; assert nearXidVer != null; this.nearNodeId = nearNodeId; @@ -255,16 +255,9 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } /** - * @return Near future mini ID. - */ - public IgniteUuid nearFinishMiniId() { - return nearFinMiniId; - } - - /** * @param nearFinMiniId Near future mini ID. */ - public void nearFinishMiniId(IgniteUuid nearFinMiniId) { + public void nearFinishMiniId(int nearFinMiniId) { this.nearFinMiniId = nearFinMiniId; } @@ -394,7 +387,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa @Nullable Collection<IgniteTxEntry> writes, Map<IgniteTxKey, GridCacheVersion> verMap, long msgId, - IgniteUuid nearMiniId, + int nearMiniId, Map<UUID, Collection<UUID>> txNodes, boolean last ) { @@ -417,7 +410,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa needReturnValue()))) { GridDhtTxPrepareFuture f = prepFut; - assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + + assert f.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " + "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']'; if (timeout == -1) @@ -427,7 +420,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } } else { - assert fut.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + + assert fut.nearMiniId() == nearMiniId : "Wrong near mini id on existing future " + "[futMiniId=" + fut.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + fut + ']'; // Prepare was called explicitly. @@ -626,7 +619,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa "Invalid state [nearFinFutId=" + nearFinFutId + ", isInvalidate=" + isInvalidate() + ", commit=" + commit + ", sysInvalidate=" + isSystemInvalidate() + ", state=" + state() + ']'; - assert nearMiniId != null; + assert nearMiniId != 0; return super.finish(commit); } @@ -641,8 +634,13 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa return; } - GridNearTxFinishResponse res = new GridNearTxFinishResponse(nearXidVer, threadId, nearFinFutId, - nearFinMiniId, err); + GridNearTxFinishResponse res = new GridNearTxFinishResponse( + -1, + nearXidVer, + threadId, + nearFinFutId, + nearFinMiniId, + err); try { cctx.io().send(nearNodeId, res, ioPolicy());
http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 1227ba9..56884ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -177,7 +177,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter private boolean trackable = true; /** Near mini future id. */ - private IgniteUuid nearMiniId; + private int nearMiniId; /** DHT versions map. */ private Map<IgniteTxKey, GridCacheVersion> dhtVerMap; @@ -223,7 +223,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter GridCacheSharedContext cctx, final GridDhtTxLocalAdapter tx, long timeout, - IgniteUuid nearMiniId, + int nearMiniId, Map<IgniteTxKey, GridCacheVersion> dhtVerMap, boolean last, boolean retVal @@ -263,7 +263,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** * @return Near mini future id. */ - public IgniteUuid nearMiniId() { + int nearMiniId() { return nearMiniId; } @@ -562,7 +562,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @return Mini future. */ @SuppressWarnings("ForLoopReplaceableByForEach") - private MiniFuture miniFuture(IgniteUuid miniId) { + private MiniFuture miniFuture(int miniId) { // We iterate directly over the futs collection here to avoid copy. synchronized (sync) { int size = futuresCountNoLock(); @@ -576,7 +576,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter MiniFuture mini = (MiniFuture)fut; - if (mini.futureId().equals(miniId)) { + if (mini.futureId() == miniId) { if (!mini.isDone()) return mini; else @@ -856,9 +856,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter assert F.isEmpty(tx.invalidPartitions()); GridNearTxPrepareResponse res = new GridNearTxPrepareResponse( + -1, tx.nearXidVersion(), tx.colocated() ? tx.xid() : tx.nearFutureId(), - nearMiniId == null ? tx.xid() : nearMiniId, + nearMiniId, tx.xidVersion(), tx.writeVersion(), ret, @@ -1233,6 +1234,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter return; if (last) { + int miniId = 0; + assert tx.transactionNodes() != null; final long timeout = timeoutObj != null ? timeoutObj.timeout : 0; @@ -1241,7 +1244,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) { assert !dhtMapping.empty(); - ClusterNode n = dhtMapping.node(); + ClusterNode n = dhtMapping.primary(); assert !n.isLocal(); @@ -1257,7 +1260,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (tx.remainingTime() == -1) return; - MiniFuture fut = new MiniFuture(n.id(), dhtMapping, nearMapping); + MiniFuture fut = new MiniFuture(n.id(), ++miniId, dhtMapping, nearMapping); add(fut); // Append new future. @@ -1367,11 +1370,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { - if (!tx.dhtMap().containsKey(nearMapping.node().id())) { + if (!tx.dhtMap().containsKey(nearMapping.primary().id())) { if (tx.remainingTime() == -1) return; - MiniFuture fut = new MiniFuture(nearMapping.node().id(), null, nearMapping); + MiniFuture fut = new MiniFuture(nearMapping.primary().id(), ++miniId, null, nearMapping); add(fut); // Append new future. @@ -1417,12 +1420,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter assert req.transactionNodes() != null; try { - cctx.io().send(nearMapping.node(), req, tx.ioPolicy()); + cctx.io().send(nearMapping.primary(), req, tx.ioPolicy()); if (msgLog.isDebugEnabled()) { msgLog.debug("DHT prepare fut, sent request near [txId=" + tx.nearXidVersion() + ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.node().id() + ']'); + ", node=" + nearMapping.primary().id() + ']'); } } catch (ClusterTopologyCheckedException ignored) { @@ -1433,7 +1436,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (msgLog.isDebugEnabled()) { msgLog.debug("DHT prepare fut, failed to send request near [txId=" + tx.nearXidVersion() + ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.node().id() + ']'); + ", node=" + nearMapping.primary().id() + ']'); } fut.onResult(e); @@ -1442,7 +1445,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (msgLog.isDebugEnabled()) { msgLog.debug("DHT prepare fut, failed to send request near, ignore [txId=" + tx.nearXidVersion() + ", dhtTxId=" + tx.xidVersion() + - ", node=" + nearMapping.node().id() + + ", node=" + nearMapping.primary().id() + ", err=" + e + ']'); } } @@ -1479,27 +1482,37 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter while (true) { try { - Collection<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion()); + List<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion()); + + assert dhtNodes.size() > 0 && dhtNodes.get(0).id().equals(cctx.localNodeId()) : dhtNodes; if (log.isDebugEnabled()) log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) + ", entry=" + entry + ']'); - // Exclude local node. - map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap); + for (int i = 1; i < dhtNodes.size(); i++) { + ClusterNode node = dhtNodes.get(i); + + addMapping(entry, node, dhtMap); + } Collection<UUID> readers = cached.readers(); if (!F.isEmpty(readers)) { - Collection<ClusterNode> nearNodes = - cctx.discovery().nodes(readers, F0.not(F.idForNodeId(tx.nearNodeId()))); + for (UUID readerId : readers) { + if (readerId.equals(tx.nearNodeId())) + continue; - if (log.isDebugEnabled()) - log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) + - ", entry=" + entry + ']'); + ClusterNode readerNode = cctx.discovery().node(readerId); + + if (readerNode == null || dhtNodes.contains(readerNode)) + continue; + + if (log.isDebugEnabled()) + log.debug("Mapping entry to near node [node=" + readerNode + ", entry=" + entry + ']'); - // Exclude DHT nodes. - map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap); + addMapping(entry, readerNode, nearMap); + } } else if (log.isDebugEnabled()) log.debug("Entry has no near readers: " + entry); @@ -1516,39 +1529,35 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** * @param entry Entry. - * @param nodes Nodes. + * @param n Node. * @param globalMap Map. */ - private void map( + private void addMapping( IgniteTxEntry entry, - Iterable<ClusterNode> nodes, + ClusterNode n, Map<UUID, GridDistributedTxMapping> globalMap ) { - if (nodes != null) { - for (ClusterNode n : nodes) { - GridDistributedTxMapping global = globalMap.get(n.id()); + GridDistributedTxMapping global = globalMap.get(n.id()); - if (!F.isEmpty(entry.entryProcessors())) { - GridDhtPartitionState state = entry.context().topology().partitionState(n.id(), - entry.cached().partition()); + if (!F.isEmpty(entry.entryProcessors())) { + GridDhtPartitionState state = entry.context().topology().partitionState(n.id(), + entry.cached().partition()); - if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) { - T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue(); + if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.EVICTED) { + T2<GridCacheOperation, CacheObject> procVal = entry.entryProcessorCalculatedValue(); - assert procVal != null : entry; + assert procVal != null : entry; - entry.op(procVal.get1()); - entry.value(procVal.get2(), true, false); - entry.entryProcessors(null); - } - } - - if (global == null) - globalMap.put(n.id(), global = new GridDistributedTxMapping(n)); - - global.add(entry); + entry.op(procVal.get1()); + entry.value(procVal.get2(), true, false); + entry.entryProcessors(null); } } + + if (global == null) + globalMap.put(n.id(), global = new GridDistributedTxMapping(n)); + + global.add(entry); } /** @@ -1602,7 +1611,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter private static final long serialVersionUID = 0L; /** */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); + private final int futId; /** Node ID. */ private UUID nodeId; @@ -1617,17 +1626,20 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** * @param nodeId Node ID. + * @param futId Future ID. * @param dhtMapping Mapping. * @param nearMapping nearMapping. */ MiniFuture( UUID nodeId, + int futId, GridDistributedTxMapping dhtMapping, GridDistributedTxMapping nearMapping ) { - assert dhtMapping == null || nearMapping == null || dhtMapping.node().equals(nearMapping.node()); + assert dhtMapping == null || nearMapping == null || dhtMapping.primary().equals(nearMapping.primary()); this.nodeId = nodeId; + this.futId = futId; this.dhtMapping = dhtMapping; this.nearMapping = nearMapping; } @@ -1635,7 +1647,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** * @return Future ID. */ - IgniteUuid futureId() { + int futureId() { return futId; } @@ -1643,7 +1655,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @return Node ID. */ public ClusterNode node() { - return dhtMapping != null ? dhtMapping.node() : nearMapping.node(); + return dhtMapping != null ? dhtMapping.primary() : nearMapping.primary(); } /** @@ -1689,7 +1701,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter try { GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached(); - cached.removeReader(nearMapping.node().id(), res.messageId()); + cached.removeReader(nearMapping.primary().id(), res.messageId()); break; } @@ -1709,22 +1721,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } // Process invalid partitions (no need to remap). - // Keep this loop for backward compatibility. - if (!F.isEmpty(res.invalidPartitions())) { - for (Iterator<IgniteTxEntry> it = dhtMapping.entries().iterator(); it.hasNext();) { - IgniteTxEntry entry = it.next(); - - if (res.invalidPartitions().contains(entry.cached().partition())) { - it.remove(); - - if (log.isDebugEnabled()) - log.debug("Removed mapping for entry from dht mapping [key=" + entry.key() + - ", tx=" + tx + ", dhtMapping=" + dhtMapping + ']'); - } - } - } - - // Process invalid partitions (no need to remap). if (!F.isEmpty(res.invalidPartitionsByCacheId())) { Map<Integer, int[]> invalidPartsMap = res.invalidPartitionsByCacheId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index a8f2087..8898803 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -52,9 +52,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** */ private static final long serialVersionUID = 0L; - /** */ - public static final int NEED_RETURN_VALUE_FLAG_MASK = 0x01; - /** Max order. */ private UUID nearNodeId; @@ -62,7 +59,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { private IgniteUuid futId; /** Mini future ID. */ - private IgniteUuid miniId; + private int miniId; /** Topology version. */ private AffinityTopologyVersion topVer; @@ -91,9 +88,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** Near transaction ID. */ private GridCacheVersion nearXidVer; - /** {@code True} if this is last prepare request for node. */ - private boolean last; - /** Subject ID. */ private UUID subjId; @@ -103,9 +97,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** Preload keys. */ private BitSet preloadKeys; - /** */ - private byte flags; - /** * Empty constructor required for {@link Externalizable}. */ @@ -129,7 +120,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { */ public GridDhtTxPrepareRequest( IgniteUuid futId, - IgniteUuid miniId, + int miniId, AffinityTopologyVersion topVer, GridDhtTxLocalAdapter tx, long timeout, @@ -143,17 +134,24 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { int taskNameHash, boolean addDepInfo, boolean retVal) { - super(tx, timeout, null, dhtWrites, txNodes, onePhaseCommit, addDepInfo); + super(tx, + timeout, + null, + dhtWrites, + txNodes, + retVal, + last, + onePhaseCommit, + addDepInfo); assert futId != null; - assert miniId != null; + assert miniId != 0; this.topVer = topVer; this.futId = futId; this.nearWrites = nearWrites; this.miniId = miniId; this.nearXidVer = nearXidVer; - this.last = last; this.subjId = subjId; this.taskNameHash = taskNameHash; @@ -165,30 +163,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { } /** - * @return Flag indicating whether transaction needs return value. - */ - public boolean needReturnValue() { - return (flags & NEED_RETURN_VALUE_FLAG_MASK) != 0; - } - - /** - * @param retVal Need return value. - */ - public void needReturnValue(boolean retVal) { - if (retVal) - flags = (byte)(flags | NEED_RETURN_VALUE_FLAG_MASK); - else - flags &= ~NEED_RETURN_VALUE_FLAG_MASK; - } - - /** - * @return {@code True} if this is last prepare request for node. - */ - public boolean last() { - return last; - } - - /** * @return Near transaction ID. */ public GridCacheVersion nearXidVersion() { @@ -227,7 +201,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { * @param idx Entry index to set invalidation flag. * @param invalidate Invalidation flag value. */ - public void invalidateNearEntry(int idx, boolean invalidate) { + void invalidateNearEntry(int idx, boolean invalidate) { invalidateNearEntries.set(idx, invalidate); } @@ -244,7 +218,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { * * @param idx Key index. */ - public void markKeyForPreload(int idx) { + void markKeyForPreload(int idx) { if (preloadKeys == null) preloadKeys = new BitSet(); @@ -271,7 +245,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** * @return Mini future ID. */ - public IgniteUuid miniId() { + public int miniId() { return miniId; } @@ -374,85 +348,73 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { } switch (writer.state()) { - case 23: - if (!writer.writeByte("flags", flags)) - return false; - - writer.incrementState(); - - case 24: + case 20: if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); - case 25: + case 21: if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries)) return false; writer.incrementState(); - case 26: - if (!writer.writeBoolean("last", last)) - return false; - - writer.incrementState(); - - case 27: - if (!writer.writeIgniteUuid("miniId", miniId)) + case 22: + if (!writer.writeInt("miniId", miniId)) return false; writer.incrementState(); - case 28: + case 23: if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; writer.incrementState(); - case 29: + case 24: if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 30: + case 25: if (!writer.writeMessage("nearXidVer", nearXidVer)) return false; writer.incrementState(); - case 31: + case 26: if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 32: + case 27: if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 33: + case 28: if (!writer.writeBitSet("preloadKeys", preloadKeys)) return false; writer.incrementState(); - case 34: + case 29: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 35: + case 30: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 36: + case 31: if (!writer.writeMessage("topVer", topVer)) return false; @@ -474,15 +436,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { return false; switch (reader.state()) { - case 23: - flags = reader.readByte("flags"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 24: + case 20: futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) @@ -490,7 +444,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 25: + case 21: invalidateNearEntries = reader.readBitSet("invalidateNearEntries"); if (!reader.isLastRead()) @@ -498,23 +452,15 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 26: - last = reader.readBoolean("last"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 27: - miniId = reader.readIgniteUuid("miniId"); + case 22: + miniId = reader.readInt("miniId"); if (!reader.isLastRead()) return false; reader.incrementState(); - case 28: + case 23: nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) @@ -522,7 +468,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 29: + case 24: nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -530,7 +476,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 30: + case 25: nearXidVer = reader.readMessage("nearXidVer"); if (!reader.isLastRead()) @@ -538,7 +484,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 31: + case 26: ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -546,7 +492,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 32: + case 27: ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -554,7 +500,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 33: + case 28: preloadKeys = reader.readBitSet("preloadKeys"); if (!reader.isLastRead()) @@ -562,7 +508,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 34: + case 29: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -570,7 +516,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 35: + case 30: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -578,7 +524,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 36: + case 31: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -598,6 +544,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 37; + return 32; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java index 2eba9f1..fba68ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareResponse.java @@ -58,16 +58,11 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { private IgniteUuid futId; /** Mini future ID. */ - private IgniteUuid miniId; - - /** Invalid partitions. */ - @GridToStringInclude - @GridDirectCollection(int.class) - private Collection<Integer> invalidParts; + private int miniId; /** Invalid partitions by cache ID. */ @GridDirectMap(keyType = Integer.class, valueType = int[].class) - private Map<Integer, int[]> invalidPartsByCacheId; + private Map<Integer, int[]> invalidParts; /** Preload entries. */ @GridDirectCollection(GridCacheEntryInfo.class) @@ -81,34 +76,46 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { } /** + * @param part Partition. * @param xid Xid version. * @param futId Future ID. * @param miniId Mini future ID. * @param addDepInfo Deployment info flag. */ - public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, boolean addDepInfo) { - super(xid, addDepInfo); + public GridDhtTxPrepareResponse( + int part, + GridCacheVersion xid, + IgniteUuid futId, + int miniId, + boolean addDepInfo) { + super(part, xid, addDepInfo); assert futId != null; - assert miniId != null; + assert miniId != 0; this.futId = futId; this.miniId = miniId; } /** + * @param part Partition. * @param xid Xid version. * @param futId Future ID. * @param miniId Mini future ID. * @param err Error. * @param addDepInfo Deployment enabled. */ - public GridDhtTxPrepareResponse(GridCacheVersion xid, IgniteUuid futId, IgniteUuid miniId, Throwable err, + public GridDhtTxPrepareResponse( + int part, + GridCacheVersion xid, + IgniteUuid futId, + int miniId, + Throwable err, boolean addDepInfo) { - super(xid, err, addDepInfo); + super(part, xid, err, addDepInfo); assert futId != null; - assert miniId != null; + assert miniId != 0; this.futId = futId; this.miniId = miniId; @@ -117,7 +124,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { /** * @return Evicted readers. */ - public Collection<IgniteTxKey> nearEvicted() { + Collection<IgniteTxKey> nearEvicted() { return nearEvicted; } @@ -138,36 +145,22 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { /** * @return Mini future ID. */ - public IgniteUuid miniId() { + public int miniId() { return miniId; } /** - * @return Invalid partitions. - */ - public Collection<Integer> invalidPartitions() { - return invalidParts; - } - - /** - * @param invalidParts Invalid partitions. - */ - public void invalidPartitions(Collection<Integer> invalidParts) { - this.invalidParts = invalidParts; - } - - /** * @return Map from cacheId to an array of invalid partitions. */ - public Map<Integer, int[]> invalidPartitionsByCacheId() { - return invalidPartsByCacheId; + Map<Integer, int[]> invalidPartitionsByCacheId() { + return invalidParts; } /** * @param invalidPartsByCacheId Map from cache ID to an array of invalid partitions. */ public void invalidPartitionsByCacheId(Map<Integer, Set<Integer>> invalidPartsByCacheId) { - this.invalidPartsByCacheId = CU.convertInvalidPartitions(invalidPartsByCacheId); + this.invalidParts = CU.convertInvalidPartitions(invalidPartsByCacheId); } /** @@ -175,7 +168,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { * * @return Collection of entry infos need to be preloaded. */ - public Collection<GridCacheEntryInfo> preloadEntries() { + Collection<GridCacheEntryInfo> preloadEntries() { return preloadEntries == null ? Collections.<GridCacheEntryInfo>emptyList() : preloadEntries; } @@ -193,8 +186,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { preloadEntries.add(info); } - /** {@inheritDoc} - * @param ctx*/ + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); @@ -237,11 +229,6 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { } /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtTxPrepareResponse.class, this, "super", super.toString()); - } - - /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -256,37 +243,31 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { } switch (writer.state()) { - case 8: + case 10: if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); - case 9: - if (!writer.writeCollection("invalidParts", invalidParts, MessageCollectionItemType.INT)) - return false; - - writer.incrementState(); - - case 10: - if (!writer.writeMap("invalidPartsByCacheId", invalidPartsByCacheId, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR)) + case 11: + if (!writer.writeMap("invalidParts", invalidParts, MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR)) return false; writer.incrementState(); - case 11: - if (!writer.writeIgniteUuid("miniId", miniId)) + case 12: + if (!writer.writeInt("miniId", miniId)) return false; writer.incrementState(); - case 12: + case 13: if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 13: + case 14: if (!writer.writeCollection("preloadEntries", preloadEntries, MessageCollectionItemType.MSG)) return false; @@ -308,7 +289,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { return false; switch (reader.state()) { - case 8: + case 10: futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) @@ -316,31 +297,23 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { reader.incrementState(); - case 9: - invalidParts = reader.readCollection("invalidParts", MessageCollectionItemType.INT); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 10: - invalidPartsByCacheId = reader.readMap("invalidPartsByCacheId", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false); + case 11: + invalidParts = reader.readMap("invalidParts", MessageCollectionItemType.INT, MessageCollectionItemType.INT_ARR, false); if (!reader.isLastRead()) return false; reader.incrementState(); - case 11: - miniId = reader.readIgniteUuid("miniId"); + case 12: + miniId = reader.readInt("miniId"); if (!reader.isLastRead()) return false; reader.incrementState(); - case 12: + case 13: nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -348,7 +321,7 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { reader.incrementState(); - case 13: + case 14: preloadEntries = reader.readCollection("preloadEntries", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -368,6 +341,12 @@ public class GridDhtTxPrepareResponse extends GridDistributedTxPrepareResponse { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 14; + return 15; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtTxPrepareResponse.class, this, + "super", super.toString()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java index 3737295..752df54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtUnlockRequest.java @@ -119,11 +119,6 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest { writer.incrementState(); - case 9: - if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) - return false; - - writer.incrementState(); } return true; @@ -148,14 +143,6 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest { reader.incrementState(); - case 9: - partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - } return reader.afterMessageRead(GridDhtUnlockRequest.class); @@ -168,6 +155,6 @@ public class GridDhtUnlockRequest extends GridDistributedUnlockRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 10; + return 9; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 79ca108..0ce380d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -160,6 +160,9 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** Keep binary. */ private final boolean keepBinary; + /** */ + private int miniId; + /** * @param cctx Registry. * @param keys Keys to lock. @@ -485,7 +488,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture * @return Mini future. */ @SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"}) - private MiniFuture miniFuture(IgniteUuid miniId) { + private MiniFuture miniFuture(int miniId) { // We iterate directly over the futs collection here to avoid copy. synchronized (sync) { int size = futuresCountNoLock(); @@ -499,7 +502,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture MiniFuture mini = (MiniFuture)fut; - if (mini.futureId().equals(miniId)) { + if (mini.futureId() == miniId) { if (!mini.isDone()) return mini; else @@ -1049,7 +1052,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture if (node.isLocal()) lockLocally(mappedKeys, req.topologyVersion()); else { - final MiniFuture fut = new MiniFuture(node, mappedKeys); + final MiniFuture fut = new MiniFuture(node, mappedKeys, ++miniId); req.miniId(fut.futureId()); @@ -1393,7 +1396,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture private static final long serialVersionUID = 0L; /** */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); + private final int futId; /** Node ID. */ @GridToStringExclude @@ -1409,19 +1412,22 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** * @param node Node. * @param keys Keys. + * @param futId Mini future ID. */ MiniFuture( ClusterNode node, - Collection<KeyCacheObject> keys + Collection<KeyCacheObject> keys, + int futId ) { this.node = node; this.keys = keys; + this.futId = futId; } /** * @return Future ID. */ - IgniteUuid futureId() { + int futureId() { return futId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index 7ca2635..79c71b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@ -73,10 +73,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep @GridDirectCollection(KeyCacheObject.class) private List<KeyCacheObject> keys; - /** Partition IDs. */ - @GridDirectCollection(int.class) - private List<Integer> partIds; - /** */ @GridDirectCollection(boolean.class) private Collection<Boolean> flags; @@ -154,12 +150,10 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep this.keys = new ArrayList<>(keys.size()); flags = new ArrayList<>(keys.size()); - partIds = new ArrayList<>(keys.size()); for (Map.Entry<KeyCacheObject, Boolean> entry : keys.entrySet()) { this.keys.add(entry.getKey()); flags.add(entry.getValue()); - partIds.add(entry.getKey().partition()); } this.readThrough = readThrough; @@ -259,7 +253,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep /** {@inheritDoc} */ @Override public int partition() { - return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; + return keys != null && !keys.isEmpty() ? keys.get(0).partition() : -1; } /** @@ -302,13 +296,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep while (keysIt.hasNext()) keyMap.put(keysIt.next(), flagsIt.next()); } - - if (partIds != null && !partIds.isEmpty()) { - assert partIds.size() == keys.size(); - - for (int i = 0; i < keys.size(); i++) - keys.get(i).partition(partIds.get(i)); - } } /** {@inheritDoc} */ @@ -368,48 +355,42 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep writer.incrementState(); case 9: - if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) - return false; - - writer.incrementState(); - - case 10: if (!writer.writeBoolean("readThrough", readThrough)) return false; writer.incrementState(); - case 11: + case 10: if (!writer.writeBoolean("reload", reload)) return false; writer.incrementState(); - case 12: + case 11: if (!writer.writeBoolean("skipVals", skipVals)) return false; writer.incrementState(); - case 13: + case 12: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 14: + case 13: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 15: + case 14: if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); - case 16: + case 15: if (!writer.writeMessage("ver", ver)) return false; @@ -480,14 +461,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); case 9: - partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 10: readThrough = reader.readBoolean("readThrough"); if (!reader.isLastRead()) @@ -495,7 +468,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); - case 11: + case 10: reload = reader.readBoolean("reload"); if (!reader.isLastRead()) @@ -503,7 +476,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); - case 12: + case 11: skipVals = reader.readBoolean("skipVals"); if (!reader.isLastRead()) @@ -511,7 +484,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); - case 13: + case 12: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -519,7 +492,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); - case 14: + case 13: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -527,7 +500,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); - case 15: + case 14: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -535,7 +508,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep reader.incrementState(); - case 16: + case 15: ver = reader.readMessage("ver"); if (!reader.isLastRead()) @@ -555,7 +528,7 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 17; + return 16; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index d3e3a15..ffc84d8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -164,6 +164,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean /** Keep binary context flag. */ private final boolean keepBinary; + /** */ + private int miniId; + /** * @param cctx Registry. * @param keys Keys to lock. @@ -532,7 +535,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean * @return Mini future. */ @SuppressWarnings({"ForLoopReplaceableByForEach", "IfMayBeConditional"}) - private MiniFuture miniFuture(IgniteUuid miniId) { + private MiniFuture miniFuture(int miniId) { // We iterate directly over the futs collection here to avoid copy. synchronized (sync) { int size = futuresCountNoLock(); @@ -546,7 +549,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean MiniFuture mini = (MiniFuture)fut; - if (mini.futureId().equals(miniId)) { + if (mini.futureId() == miniId) { if (!mini.isDone()) return mini; else @@ -1178,7 +1181,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean req.filter(filter, cctx); if (node.isLocal()) { - req.miniId(IgniteUuid.randomUuid()); + req.miniId(-1); if (log.isDebugEnabled()) log.debug("Before locally locking near request: " + req); @@ -1316,7 +1319,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean fut)); } else { - final MiniFuture fut = new MiniFuture(node, mappedKeys); + final MiniFuture fut = new MiniFuture(node, mappedKeys, ++miniId); req.miniId(fut.futureId()); @@ -1489,7 +1492,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean private static final long serialVersionUID = 0L; /** */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); + private final int futId; /** Node ID. */ @GridToStringExclude @@ -1505,19 +1508,22 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean /** * @param node Node. * @param keys Keys. + * @param futId Mini future ID. */ MiniFuture( ClusterNode node, - Collection<KeyCacheObject> keys + Collection<KeyCacheObject> keys, + int futId ) { this.node = node; this.keys = keys; + this.futId = futId; } /** * @return Future ID. */ - IgniteUuid futureId() { + int futureId() { return futId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index 9e12153..229961e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -50,7 +50,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { private AffinityTopologyVersion topVer; /** Mini future ID. */ - private IgniteUuid miniId; + private int miniId; /** Filter. */ private CacheEntryPredicate[] filter; @@ -256,14 +256,14 @@ public class GridNearLockRequest extends GridDistributedLockRequest { /** * @return Mini future ID. */ - public IgniteUuid miniId() { + public int miniId() { return miniId; } /** * @param miniId Mini future Id. */ - public void miniId(IgniteUuid miniId) { + public void miniId(int miniId) { this.miniId = miniId; } @@ -423,7 +423,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { writer.incrementState(); case 28: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeInt("miniId", miniId)) return false; writer.incrementState(); @@ -464,12 +464,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest { writer.incrementState(); - case 35: - if (!writer.writeCollection("partIds", partIds, MessageCollectionItemType.INT)) - return false; - - writer.incrementState(); - } return true; @@ -551,7 +545,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); case 28: - miniId = reader.readIgniteUuid("miniId"); + miniId = reader.readInt("miniId"); if (!reader.isLastRead()) return false; @@ -606,14 +600,6 @@ public class GridNearLockRequest extends GridDistributedLockRequest { reader.incrementState(); - case 35: - partIds = reader.readCollection("partIds", MessageCollectionItemType.INT); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - } return reader.afterMessageRead(GridNearLockRequest.class); @@ -626,7 +612,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 36; + return 35; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java index e48a098..b10591d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockResponse.java @@ -47,7 +47,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { private Collection<GridCacheVersion> pending; /** */ - private IgniteUuid miniId; + private int miniId; /** DHT versions. */ @GridToStringInclude @@ -85,7 +85,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { int cacheId, GridCacheVersion lockVer, IgniteUuid futId, - IgniteUuid miniId, + int miniId, boolean filterRes, int cnt, Throwable err, @@ -94,7 +94,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { ) { super(cacheId, lockVer, futId, cnt, err, addDepInfo); - assert miniId != null; + assert miniId != 0; this.miniId = miniId; this.clientRemapVer = clientRemapVer; @@ -134,7 +134,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { /** * @return Mini future ID. */ - public IgniteUuid miniId() { + public int miniId() { return miniId; } @@ -233,7 +233,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { writer.incrementState(); case 14: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeInt("miniId", miniId)) return false; writer.incrementState(); @@ -293,7 +293,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse { reader.incrementState(); case 14: - miniId = reader.readIgniteUuid("miniId"); + miniId = reader.readInt("miniId"); if (!reader.isLastRead()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index dbc8096..80508dc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -56,9 +56,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteReducer; -import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; @@ -70,9 +68,6 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING; */ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter { /** */ - public static final IgniteProductVersion SER_TX_SINCE = IgniteProductVersion.fromString("1.5.0"); - - /** */ @GridToStringExclude private KeyLockFuture keyLockFut; @@ -80,6 +75,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim @GridToStringExclude private ClientRemapFuture remapFut; + /** */ + private int miniId; + /** * @param cctx Context. * @param tx Transaction. @@ -153,7 +151,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim if (isMini(fut)) { MiniFuture f = (MiniFuture) fut; - if (f.node().id().equals(nodeId)) { + if (f.primary().id().equals(nodeId)) { ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Remote node left grid: " + nodeId); @@ -186,7 +184,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim if (e instanceof IgniteTxOptimisticCheckedException || e instanceof IgniteTxTimeoutCheckedException) { if (m != null) - tx.removeMapping(m.node().id()); + tx.removeMapping(m.primary().id()); } ERR_UPD.compareAndSet(this, null, e); @@ -227,7 +225,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim * @return Mini future. */ @SuppressWarnings("ForLoopReplaceableByForEach") - private MiniFuture miniFuture(IgniteUuid miniId) { + private MiniFuture miniFuture(int miniId) { // We iterate directly over the futs collection here to avoid copy. synchronized (sync) { int size = futuresCountNoLock(); @@ -241,7 +239,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim MiniFuture mini = (MiniFuture)fut; - if (mini.futureId().equals(miniId)) { + if (mini.futureId() == miniId) { if (!mini.isDone()) return mini; else @@ -339,15 +337,15 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim assert topVer.topologyVersion() > 0; - txMapping = new GridDhtTxMapping(); + GridDhtTxMapping txMapping = new GridDhtTxMapping(); Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>(); for (IgniteTxEntry write : writes) - map(write, topVer, mappings, remap, topLocked); + map(write, topVer, mappings, txMapping, remap, topLocked); for (IgniteTxEntry read : reads) - map(read, topVer, mappings, remap, topLocked); + map(read, topVer, mappings, txMapping, remap, topLocked); if (keyLockFut != null) keyLockFut.onAllKeysAdded(); @@ -365,12 +363,12 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim tx.transactionNodes(txMapping.transactionNodes()); - checkOnePhase(); + checkOnePhase(txMapping); for (GridDistributedTxMapping m : mappings.values()) { assert !m.empty(); - add(new MiniFuture(this, m)); + add(new MiniFuture(this, m, ++miniId)); } Collection<IgniteInternalFuture<?>> futs = (Collection)futures(); @@ -385,7 +383,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim MiniFuture fut = (MiniFuture)fut0; - IgniteCheckedException err = prepare(fut); + IgniteCheckedException err = prepare(fut, txMapping); if (err != null) { while (it.hasNext()) { @@ -396,7 +394,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim fut = (MiniFuture)fut0; - tx.removeMapping(fut.mapping().node().id()); + tx.removeMapping(fut.mapping().primary().id()); fut.onResult(new IgniteCheckedException("Failed to prepare transaction.", err)); } @@ -421,10 +419,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim * @param fut Mini future. * @return Prepare error if any. */ - @Nullable private IgniteCheckedException prepare(final MiniFuture fut) { + @Nullable private IgniteCheckedException prepare(final MiniFuture fut, GridDhtTxMapping txMapping) { GridDistributedTxMapping m = fut.mapping(); - final ClusterNode n = m.node(); + final ClusterNode primary = m.primary(); long timeout = tx.remainingTime(); @@ -477,8 +475,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim req.miniId(fut.futureId()); // If this is the primary node for the keys. - if (n.isLocal()) { - IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req); + if (primary.isLocal()) { + IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(primary.id(), + tx, + req); prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) { @@ -493,7 +493,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim } else { try { - cctx.io().send(n, req, tx.ioPolicy()); + cctx.io().send(primary, req, tx.ioPolicy()); } catch (ClusterTopologyCheckedException e) { e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); @@ -523,6 +523,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim IgniteTxEntry entry, AffinityTopologyVersion topVer, Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping, + GridDhtTxMapping txMapping, boolean remap, boolean topLocked ) { @@ -544,13 +545,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim ", primary=" + U.toShortString(primary) + ", topVer=" + topVer + ']'); } - if (primary.version().compareTo(SER_TX_SINCE) < 0) { - onDone(new IgniteCheckedException("Optimistic serializable transactions can be used only with node " + - "version starting from " + SER_TX_SINCE)); - - return; - } - // Must re-initialize cached entry while holding topology lock. if (cacheCtx.isNear()) entry.cached(cacheCtx.nearTx().entryExx(entry.key(), topVer)); @@ -626,8 +620,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @Override public String apply(IgniteInternalFuture<?> f) { - return "[node=" + ((MiniFuture)f).node().id() + - ", loc=" + ((MiniFuture)f).node().isLocal() + + return "[node=" + ((MiniFuture)f).primary().id() + + ", loc=" + ((MiniFuture)f).primary().isLocal() + ", done=" + f.isDone() + "]"; } }, @@ -654,7 +648,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim /** * Constructor. */ - public ClientRemapFuture() { + ClientRemapFuture() { super(new ClientRemapFutureReducer()); } } @@ -697,7 +691,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes"); /** */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); + private final int futId; /** Parent future. */ private final GridNearOptimisticSerializableTxPrepareFuture parent; @@ -713,24 +707,26 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim /** * @param parent Parent future. * @param m Mapping. + * @param futId Mini future ID. */ - MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, GridDistributedTxMapping m) { + MiniFuture(GridNearOptimisticSerializableTxPrepareFuture parent, GridDistributedTxMapping m, int futId) { this.parent = parent; this.m = m; + this.futId = futId; } /** * @return Future ID. */ - IgniteUuid futureId() { + int futureId() { return futId; } /** - * @return Node ID. + * @return Primary node. */ - public ClusterNode node() { - return m.node(); + public ClusterNode primary() { + return m.primary(); } /** @@ -795,7 +791,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim assert parent.cctx.kernalContext().clientNode(); assert m.clientFirst(); - parent.tx.removeMapping(m.node().id()); + parent.tx.removeMapping(m.primary().id()); ClientRemapFuture remapFut0 = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/901be4f4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index a2cb182..6189b38 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -58,7 +58,6 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; -import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionTimeoutException; import org.jetbrains.annotations.Nullable; @@ -75,6 +74,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa @GridToStringExclude private KeyLockFuture keyLockFut; + /** */ + private int miniId; + + /** */ + private GridDhtTxMapping txMapping; + /** * @param cctx Context. * @param tx Transaction. @@ -232,7 +237,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @return Mini future. */ @SuppressWarnings("ForLoopReplaceableByForEach") - private MiniFuture miniFuture(IgniteUuid miniId) { + private MiniFuture miniFuture(int miniId) { // We iterate directly over the futs collection here to avoid copy. synchronized (sync) { int size = futuresCountNoLock(); @@ -246,7 +251,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa MiniFuture mini = (MiniFuture)fut; - if (mini.futureId().equals(miniId)) { + if (mini.futureId() == miniId) { if (!mini.isDone()) return mini; else @@ -352,7 +357,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa GridDistributedTxMapping mapping = map(write, topVer, null, topLocked, remap); - if (mapping.node().isLocal()) { + if (mapping.primary().isLocal()) { if (write.context().isNear()) tx.nearLocallyMapped(true); else if (write.context().isColocated()) @@ -377,7 +382,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa tx.transactionNodes(txMapping.transactionNodes()); - checkOnePhase(); + checkOnePhase(txMapping); proceedPrepare(mapping, null); } @@ -414,12 +419,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa updated.last(true); - GridDistributedTxMapping prev = map.put(updated.node().id(), updated); + GridDistributedTxMapping prev = map.put(updated.primary().id(), updated); if (prev != null) prev.last(false); - if (updated.node().isLocal()) { + if (updated.primary().isLocal()) { if (write.context().isNear()) tx.nearLocallyMapped(true); else if (write.context().isColocated()) @@ -446,7 +451,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa tx.transactionNodes(txMapping.transactionNodes()); - checkOnePhase(); + checkOnePhase(txMapping); proceedPrepare(mappings); } @@ -480,7 +485,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa try { assert !m.empty(); - final ClusterNode n = m.node(); + final ClusterNode n = m.primary(); long timeout = tx.remainingTime(); @@ -521,7 +526,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } } - final MiniFuture fut = new MiniFuture(this, m, mappings); + final MiniFuture fut = new MiniFuture(this, m, ++miniId, mappings); req.miniId(fut.futureId()); @@ -639,7 +644,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } } - if (cur == null || !cur.node().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) { + if (cur == null || !cur.primary().id().equals(primary.id()) || cur.near() != cacheCtx.isNear()) { boolean clientFirst = cur == null && !topLocked && cctx.kernalContext().clientNode(); cur = new GridDistributedTxMapping(primary); @@ -771,7 +776,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa private final GridNearOptimisticTxPrepareFuture parent; /** */ - private final IgniteUuid futId = IgniteUuid.randomUuid(); + private final int futId; /** Keys. */ @GridToStringInclude @@ -787,19 +792,23 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa /** * @param parent Parent. * @param m Mapping. + * @param futId Mini future ID. * @param mappings Queue of mappings to proceed with. */ - MiniFuture(GridNearOptimisticTxPrepareFuture parent, GridDistributedTxMapping m, + MiniFuture(GridNearOptimisticTxPrepareFuture parent, + GridDistributedTxMapping m, + int futId, Queue<GridDistributedTxMapping> mappings) { this.parent = parent; this.m = m; + this.futId = futId; this.mappings = mappings; } /** * @return Future ID. */ - IgniteUuid futureId() { + int futureId() { return futId; } @@ -807,7 +816,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @return Node ID. */ public ClusterNode node() { - return m.node(); + return m.primary(); } /** @@ -840,7 +849,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa void onNodeLeft(ClusterTopologyCheckedException e, boolean discoThread) { if (msgLog.isDebugEnabled()) { msgLog.debug("Near optimistic prepare fut, mini future node left [txId=" + parent.tx.nearXidVersion() + - ", node=" + m.node().id() + ']'); + ", node=" + m.primary().id() + ']'); } if (isDone())
