'Single' operations optimizations for tx cache.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3b28e0b5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3b28e0b5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3b28e0b5 Branch: refs/heads/ignite-single-op-tx Commit: 3b28e0b53ba5ea5f4f2aefd7e28fe819bbababf4 Parents: 29b5b4a Author: sboikov <[email protected]> Authored: Mon Nov 16 12:59:03 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Nov 16 12:59:03 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 11 +- .../distributed/GridDistributedTxMapping.java | 78 --------- .../GridDistributedTxRemoteAdapter.java | 99 ++++-------- .../cache/distributed/dht/GridDhtTxLocal.java | 19 +-- .../distributed/dht/GridDhtTxLocalAdapter.java | 8 +- .../cache/distributed/dht/GridDhtTxMapping.java | 134 ++-------------- .../distributed/dht/GridDhtTxPrepareFuture.java | 16 +- .../cache/distributed/dht/GridDhtTxRemote.java | 18 ++- ...arOptimisticSerializableTxPrepareFuture.java | 1 - .../near/GridNearOptimisticTxPrepareFuture.java | 88 +++++++++-- .../GridNearPessimisticTxPrepareFuture.java | 1 - .../near/GridNearTxFinishFuture.java | 4 +- .../cache/distributed/near/GridNearTxLocal.java | 97 ++++++------ .../near/GridNearTxPrepareRequest.java | 9 -- .../distributed/near/GridNearTxRemote.java | 16 +- .../cache/transactions/IgniteInternalTx.java | 8 - .../cache/transactions/IgniteTxAdapter.java | 7 - .../cache/transactions/IgniteTxHandler.java | 7 +- .../IgniteTxImplicitSingleStateImpl.java | 95 +++++++++++ .../transactions/IgniteTxLocalAdapter.java | 79 +++------- .../cache/transactions/IgniteTxRemoteEx.java | 6 +- .../transactions/IgniteTxRemoteStateImpl.java | 158 +++++++++++++++++++ .../cache/transactions/IgniteTxState.java | 32 +++- .../cache/transactions/IgniteTxStateImpl.java | 104 ++++++++++++ 24 files changed, 639 insertions(+), 456 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index df9f5c4..0786a50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -74,6 +74,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; /** @@ -4014,7 +4015,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme IgniteInternalTx tx = cctx.tm().localTxx(); - return tx == null || !tx.removed(txKey()); + if (tx != null) { + IgniteTxEntry e = tx.entry(txKey()); + + boolean rmvd = e != null && e.op() == DELETE; + + return !rmvd; + } + + return true; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java index 2d2d935..8c9f181 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@ -24,9 +24,7 @@ import java.io.ObjectOutput; import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashSet; -import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -59,15 +57,9 @@ public class GridDistributedTxMapping implements Externalizable { /** DHT version. */ private GridCacheVersion dhtVer; - /** Copy on remove flag. */ - private boolean readOnly; - /** {@code True} if this is last mapping for node. */ private boolean last; - /** IDs of backup nodes receiving last prepare request during this mapping. */ - private Collection<UUID> lastBackups; - /** {@code True} if mapping is for near caches, {@code false} otherwise. */ private boolean near; @@ -91,20 +83,6 @@ public class GridDistributedTxMapping implements Externalizable { } /** - * @return IDs of backup nodes receiving last prepare request during this mapping. - */ - @Nullable public Collection<UUID> lastBackups() { - return lastBackups; - } - - /** - * @param lastBackups IDs of backup nodes receiving last prepare request during this mapping. - */ - public void lastBackups(@Nullable Collection<UUID> lastBackups) { - this.lastBackups = lastBackups; - } - - /** * @return {@code True} if this is last mapping for node. */ public boolean last() { @@ -161,17 +139,6 @@ public class GridDistributedTxMapping implements Externalizable { } /** - * @param entries Mapped entries. - * @param readOnly Flag indicating that passed in collection is read-only. - */ - public void entries(Collection<IgniteTxEntry> entries, boolean readOnly) { - this.entries = entries; - - // Set copy on remove flag as passed in collection is unmodifiable. - this.readOnly = true; - } - - /** * @return {@code True} if lock is explicit. */ public boolean explicitLock() { @@ -221,8 +188,6 @@ public class GridDistributedTxMapping implements Externalizable { * @param entry Adds entry. */ public void add(IgniteTxEntry entry) { - ensureModifiable(); - entries.add(entry); } @@ -231,48 +196,16 @@ public class GridDistributedTxMapping implements Externalizable { * @return {@code True} if entry was removed. */ public boolean removeEntry(IgniteTxEntry entry) { - ensureModifiable(); - return entries.remove(entry); } /** - * @param parts Evicts partitions from mapping. - */ - public void evictPartitions(@Nullable int[] parts) { - if (!F.isEmpty(parts)) { - ensureModifiable(); - - evictPartitions(parts, entries); - } - } - - /** - * @param parts Partitions. - * @param c Collection. - */ - private void evictPartitions(int[] parts, Collection<IgniteTxEntry> c) { - assert parts != null; - - for (Iterator<IgniteTxEntry> it = c.iterator(); it.hasNext();) { - IgniteTxEntry e = it.next(); - - GridCacheEntryEx cached = e.cached(); - - if (U.containsIntArray(parts, cached.partition())) - it.remove(); - } - } - - /** * @param keys Keys to evict readers for. */ public void evictReaders(@Nullable Collection<IgniteTxKey> keys) { if (keys == null || keys.isEmpty()) return; - ensureModifiable(); - evictReaders(keys, entries); } @@ -293,17 +226,6 @@ public class GridDistributedTxMapping implements Externalizable { } /** - * Copies collection of entries if it is read-only. - */ - private void ensureModifiable() { - if (readOnly) { - entries = new LinkedHashSet<>(entries); - - readOnly = false; - } - } - - /** * Whether empty or not. * * @return Empty or not. http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index bf37b3b..be59dd1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -47,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteEx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteStateImpl; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState; -import org.apache.ignite.internal.processors.cache.transactions.IgniteTxStateImpl; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; @@ -88,14 +87,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** */ private static final long serialVersionUID = 0L; - /** Read set. */ - @GridToStringInclude - protected Map<IgniteTxKey, IgniteTxEntry> readMap; - - /** Write map. */ - @GridToStringInclude - protected Map<IgniteTxKey, IgniteTxEntry> writeMap; - /** Remote thread ID. */ @GridToStringInclude private long rmtThreadId; @@ -114,7 +105,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** */ @GridToStringInclude - protected IgniteTxState txState; + protected IgniteTxRemoteStateImpl txState; /** * Empty constructor required for {@link Externalizable}. @@ -173,8 +164,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter this.rmtThreadId = rmtThreadId; this.invalidate = invalidate; - txState = new IgniteTxRemoteStateImpl(); - commitVersion(commitVer); // Must set started flag after concurrency and isolation. @@ -210,14 +199,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter * @return Checks if transaction has no entries. */ @Override public boolean empty() { - return readMap.isEmpty() && writeMap.isEmpty(); - } - - /** {@inheritDoc} */ - @Override public boolean removed(IgniteTxKey key) { - IgniteTxEntry e = writeMap.get(key); - - return e != null && e.op() == DELETE; + return txState.empty(); } /** {@inheritDoc} */ @@ -227,12 +209,12 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() { - return writeMap; + return txState.writeMap(); } /** {@inheritDoc} */ @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() { - return readMap; + return txState.readMap(); } /** {@inheritDoc} */ @@ -254,12 +236,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public IgniteTxEntry entry(IgniteTxKey key) { - IgniteTxEntry e = writeMap == null ? null : writeMap.get(key); - - if (e == null) - e = readMap == null ? null : readMap.get(key); - - return e; + return txState.entry(key); } /** @@ -268,8 +245,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter * @param key key to be removed. */ public void clearEntry(IgniteTxKey key) { - readMap.remove(key); - writeMap.remove(key); + txState.clearEntry(key); } /** @@ -277,13 +253,19 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. */ - @Override public void doneRemote(GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers, - Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers) { + @Override public void doneRemote(GridCacheVersion baseVer, + Collection<GridCacheVersion> committedVers, + Collection<GridCacheVersion> rolledbackVers, + Collection<GridCacheVersion> pendingVers) { + Map<IgniteTxKey, IgniteTxEntry> readMap = txState.readMap(); + if (readMap != null && !readMap.isEmpty()) { for (IgniteTxEntry txEntry : readMap.values()) doneRemote(txEntry, baseVer, committedVers, rolledbackVers, pendingVers); } + Map<IgniteTxKey, IgniteTxEntry> writeMap = txState.writeMap(); + if (writeMap != null && !writeMap.isEmpty()) { for (IgniteTxEntry txEntry : writeMap.values()) doneRemote(txEntry, baseVer, committedVers, rolledbackVers, pendingVers); @@ -299,8 +281,10 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter * @param rolledbackVers Rolled back versions relative to base version. * @param pendingVers Pending versions. */ - private void doneRemote(IgniteTxEntry txEntry, GridCacheVersion baseVer, - Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers, + private void doneRemote(IgniteTxEntry txEntry, + GridCacheVersion baseVer, + Collection<GridCacheVersion> committedVers, + Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers) { while (true) { GridDistributedCacheEntry entry = (GridDistributedCacheEntry)txEntry.cached(); @@ -360,36 +344,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter @Override public boolean setWriteValue(IgniteTxEntry e) { checkInternal(e.txKey()); - IgniteTxEntry entry = writeMap.get(e.txKey()); - - if (entry == null) { - IgniteTxEntry rmv = readMap.remove(e.txKey()); - - if (rmv != null) { - e.cached(rmv.cached()); - - writeMap.put(e.txKey(), e); - } - // If lock is explicit. - else { - e.cached(e.context().cache().entryEx(e.key())); - - // explicit lock. - writeMap.put(e.txKey(), e); - } - } - else { - // Copy values. - entry.value(e.value(), e.hasWriteValue(), e.hasReadValue()); - entry.entryProcessors(e.entryProcessors()); - entry.op(e.op()); - entry.ttl(e.ttl()); - entry.explicitVersion(e.explicitVersion()); - - // Conflict resolution stuff. - entry.conflictVersion(e.conflictVersion()); - entry.conflictExpireTime(e.conflictExpireTime()); - } + txState.setWriteValue(e); addExplicit(e); @@ -398,7 +353,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public boolean hasWriteKey(IgniteTxKey key) { - return writeMap.containsKey(key); + return txState.hasWriteKey(key); } /** {@inheritDoc} */ @@ -409,27 +364,27 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public Set<IgniteTxKey> readSet() { - return readMap.keySet(); + return txState.readSet(); } /** {@inheritDoc} */ @Override public Set<IgniteTxKey> writeSet() { - return writeMap.keySet(); + return txState.writeSet(); } /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> allEntries() { - return F.concat(false, writeEntries(), readEntries()); + return txState.allEntries(); } /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> writeEntries() { - return writeMap.values(); + return txState.writeEntries(); } /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> readEntries() { - return readMap.values(); + return txState.readEntries(); } /** @@ -468,7 +423,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter @SuppressWarnings({"CatchGenericClass"}) private void commitIfLocked() throws IgniteCheckedException { if (state() == COMMITTING) { - for (IgniteTxEntry txEntry : writeMap.values()) { + for (IgniteTxEntry txEntry : writeEntries()) { assert txEntry != null : "Missing transaction entry for tx: " + this; while (true) { @@ -503,6 +458,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter if (commitAllowed.compareAndSet(false, true)) { IgniteCheckedException err = null; + Map<IgniteTxKey, IgniteTxEntry> writeMap = txState.writeMap(); + if (!F.isEmpty(writeMap)) { // Register this transaction as completed prior to write-phase to // ensure proper lock ordering for removed entries. http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 44f34aa..46304a1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -307,8 +307,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa 0, nearMiniId, null, - true, - null); + true); } // For pessimistic mode we don't distribute prepare request. @@ -322,8 +321,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa nearMiniId, Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), true, - needReturnValue(), - null))) + needReturnValue()))) return prepFut.get(); } else @@ -378,7 +376,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa * @param nearMiniId Near mini future ID. * @param txNodes Transaction nodes mapping. * @param last {@code True} if this is last prepare request. - * @param lastBackups IDs of backup nodes receiving last prepare request. * @return Future that will be completed when locks are acquired. */ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync( @@ -388,8 +385,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa long msgId, IgniteUuid nearMiniId, Map<UUID, Collection<UUID>> txNodes, - boolean last, - Collection<UUID> lastBackups + boolean last ) { // In optimistic mode prepare still can be called explicitly from salvageTx. GridDhtTxPrepareFuture fut = prepFut.get(); @@ -404,8 +400,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa nearMiniId, verMap, last, - needReturnValue(), - lastBackups))) { + needReturnValue()))) { GridDhtTxPrepareFuture f = prepFut.get(); assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + @@ -443,13 +438,15 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } try { - if (reads != null) + if (reads != null) { for (IgniteTxEntry e : reads) addEntry(msgId, e); + } - if (writes != null) + if (writes != null) { for (IgniteTxEntry e : writes) addEntry(msgId, e); + } userPrepare(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 0869b90..4ee9211 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -435,7 +435,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { if (log.isDebugEnabled()) log.debug("Removing mapping for entry [nodeId=" + nodeId + ", entry=" + entry + ']'); - IgniteTxEntry txEntry = txMap.get(entry.txKey()); + IgniteTxEntry txEntry = entry(entry.txKey()); if (txEntry == null) return false; @@ -469,7 +469,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { List<GridDhtCacheEntry> entries = mapping.getValue(); for (GridDhtCacheEntry entry : entries) { - IgniteTxEntry txEntry = txMap.get(entry.txKey()); + IgniteTxEntry txEntry = entry(entry.txKey()); if (txEntry != null) { if (m == null) @@ -529,7 +529,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); try { - IgniteTxEntry existing = txMap.get(e.txKey()); + IgniteTxEntry existing = entry(e.txKey()); if (existing != null) { // Must keep NOOP operation if received READ because it means that the lock was sent to a backup node. @@ -569,7 +569,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { existing.explicitVersion(dhtVer); } - txMap.put(existing.txKey(), existing); + txState.addEntry(existing); if (log.isDebugEnabled()) log.debug("Added entry to transaction: " + existing); http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java index 55cbe96..9ec35b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java @@ -17,18 +17,15 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.util.GridLeanMap; +import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.typedef.F; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.util.typedef.internal.U; /** * DHT transaction mapping. @@ -37,12 +34,6 @@ public class GridDhtTxMapping { /** Transaction nodes mapping (primary node -> related backup nodes). */ private final Map<UUID, Collection<UUID>> txNodes = new GridLeanMap<>(); - /** */ - private final List<TxMapping> mappings = new ArrayList<>(); - - /** */ - private TxMapping last; - /** * Adds information about next mapping. * @@ -50,24 +41,26 @@ public class GridDhtTxMapping { */ @SuppressWarnings("ConstantConditions") public void addMapping(List<ClusterNode> nodes) { - ClusterNode primary = F.first(nodes); + assert !F.isEmpty(nodes) : nodes; - Collection<ClusterNode> backups = F.view(nodes, F.notEqualTo(primary)); + ClusterNode primary = nodes.get(0); - if (last == null || !last.primary.equals(primary.id())) { - last = new TxMapping(primary, backups); + int size = nodes.size(); - mappings.add(last); - } - else - last.add(backups); + if (size > 1) { + Collection<UUID> backups = txNodes.get(primary.id()); - Collection<UUID> storedBackups = txNodes.get(last.primary); + if (backups == null) { + backups = U.newHashSet(size - 1); - if (storedBackups == null) - txNodes.put(last.primary, storedBackups = new HashSet<>()); + txNodes.put(primary.id(), backups); + } - storedBackups.addAll(last.backups); + for (int i = 1; i < size; i++) + backups.add(nodes.get(i).id()); + } + else + txNodes.put(primary.id(), new GridLeanSet<UUID>()); } /** @@ -76,99 +69,4 @@ public class GridDhtTxMapping { public Map<UUID, Collection<UUID>> transactionNodes() { return txNodes; } - - /** - * For each mapping sets flags indicating if mapping is last for node. - * - * @param mappings Mappings. - */ - public void initLast(Collection<GridDistributedTxMapping> mappings) { - assert this.mappings.size() == mappings.size(); - - int idx = 0; - - for (GridDistributedTxMapping map : mappings) { - TxMapping mapping = this.mappings.get(idx); - - map.lastBackups(lastBackups(mapping, idx)); - - boolean last = true; - - for (int i = idx + 1; i < this.mappings.size(); i++) { - TxMapping nextMap = this.mappings.get(i); - - if (nextMap.primary.equals(mapping.primary)) { - last = false; - - break; - } - } - - map.last(last); - - idx++; - } - } - - /** - * @param mapping Mapping. - * @param idx Mapping index. - * @return IDs of backup nodes receiving last prepare request during this mapping. - */ - @Nullable private Collection<UUID> lastBackups(TxMapping mapping, int idx) { - Collection<UUID> res = null; - - for (UUID backup : mapping.backups) { - boolean foundNext = false; - - for (int i = idx + 1; i < mappings.size(); i++) { - TxMapping nextMap = mappings.get(i); - - if (nextMap.primary.equals(mapping.primary) && nextMap.backups.contains(backup)) { - foundNext = true; - - break; - } - } - - if (!foundNext) { - if (res == null) - res = new ArrayList<>(mapping.backups.size()); - - res.add(backup); - } - } - - return res; - } - - /** - */ - private static class TxMapping { - /** */ - private final UUID primary; - - /** */ - private final Set<UUID> backups; - - /** - * @param primary Primary node. - * @param backups Backup nodes. - */ - private TxMapping(ClusterNode primary, Iterable<ClusterNode> backups) { - this.primary = primary.id(); - - this.backups = new HashSet<>(); - - add(backups); - } - - /** - * @param backups Backup nodes. - */ - private void add(Iterable<ClusterNode> backups) { - for (ClusterNode n : backups) - this.backups.add(n.id()); - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 1d6f633..b6b8c33 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -165,9 +165,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** {@code True} if this is last prepare operation for node. */ private boolean last; - /** IDs of backup nodes receiving last prepare request during this prepare. */ - private Collection<UUID> lastBackups; - /** Needs return value flag. */ private boolean retVal; @@ -197,7 +194,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @param dhtVerMap DHT versions map. * @param last {@code True} if this is last prepare operation for node. * @param retVal Return value flag. - * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare. */ public GridDhtTxPrepareFuture( GridCacheSharedContext cctx, @@ -205,8 +201,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter IgniteUuid nearMiniId, Map<IgniteTxKey, GridCacheVersion> dhtVerMap, boolean last, - boolean retVal, - Collection<UUID> lastBackups + boolean retVal ) { super(REDUCER); @@ -214,7 +209,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter this.tx = tx; this.dhtVerMap = dhtVerMap; this.last = last; - this.lastBackups = lastBackups; futId = IgniteUuid.randomUuid(); @@ -864,14 +858,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } /** - * @param backupId Backup node ID. - * @return {@code True} if backup node receives last prepare request for this transaction. - */ - private boolean lastBackup(UUID backupId) { - return lastBackups != null && lastBackups.contains(backupId); - } - - /** * Checks if this transaction needs previous value for the given tx entry. Will use passed in map to store * required key or will create new map if passed in map is {@code null}. * http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index e268a88..569356c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteStateImpl; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.typedef.T2; @@ -137,9 +138,9 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { this.nearXidVer = nearXidVer; this.txNodes = txNodes; - readMap = Collections.emptyMap(); - - writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1); + txState = new IgniteTxRemoteStateImpl( + Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(), + new ConcurrentLinkedHashMap<IgniteTxKey, IgniteTxEntry>(U.capacity(txSize), 0.75f, 1)); topologyVersion(topVer); } @@ -207,8 +208,9 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { this.nearNodeId = nearNodeId; this.rmtFutId = rmtFutId; - readMap = Collections.emptyMap(); - writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1); + txState = new IgniteTxRemoteStateImpl( + Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(), + new ConcurrentLinkedHashMap<IgniteTxKey, IgniteTxEntry>(U.capacity(txSize), 0.75f, 1)); topologyVersion(topVer); } @@ -280,6 +282,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { @Override public void addInvalidPartition(GridCacheContext cacheCtx, int part) { super.addInvalidPartition(cacheCtx, part); + Map<IgniteTxKey, IgniteTxEntry> writeMap = txState.writeMap(); + for (Iterator<IgniteTxEntry> it = writeMap.values().iterator(); it.hasNext();) { IgniteTxEntry e = it.next(); @@ -312,7 +316,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { // Initialize cache entry. entry.cached(cached); - writeMap.put(entry.txKey(), entry); + txState.addWriteEntry(entry.txKey(), entry); addExplicit(entry); } @@ -356,7 +360,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { txEntry.entryProcessors(entryProcessors); - writeMap.put(key, txEntry); + txState.addWriteEntry(key, txEntry); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 53bd818..29774a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -422,7 +422,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim m.near(), txMapping.transactionNodes(), m.last(), - m.lastBackups(), tx.onePhaseCommit(), tx.needReturnValue() && tx.implicit(), tx.implicitSingle(), http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 8574b28..36ad7d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -19,7 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.ArrayDeque; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -265,7 +267,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa return; } - prepare(tx.writeEntries(), topLocked); + IgniteTxEntry singleWrite = tx.singleWrite(); + + if (singleWrite != null) + prepareSingle(singleWrite, topLocked); + else + prepare(tx.writeEntries(), topLocked); markInitialized(); } @@ -278,6 +285,46 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } /** + * @param write Write. + * @param topLocked {@code True} if thread already acquired lock preventing topology change. + */ + private void prepareSingle(IgniteTxEntry write, boolean topLocked) { + AffinityTopologyVersion topVer = tx.topologyVersion(); + + assert topVer.topologyVersion() > 0; + + txMapping = new GridDhtTxMapping(); + + GridDistributedTxMapping mapping = map(write, topVer, null, topLocked); + + if (mapping.node().isLocal()) { + if (write.context().isNear()) + tx.nearLocallyMapped(true); + else if (write.context().isColocated()) + tx.colocatedLocallyMapped(true); + } + + if (isDone()) { + if (log.isDebugEnabled()) + log.debug("Abandoning (re)map because future is done: " + this); + + return; + } + + tx.addEntryMapping(mapping); + + cctx.mvcc().recheckPendingLocks(); + + mapping.last(true); + + tx.transactionNodes(txMapping.transactionNodes()); + + checkOnePhase(); + + proceedPrepare(mapping, null); + } + + /** * @param writes Write entries. * @param topLocked {@code True} if thread already acquired lock preventing topology change. * @throws IgniteCheckedException If failed. @@ -292,17 +339,26 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa txMapping = new GridDhtTxMapping(); - Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>(); + Map<UUID, GridDistributedTxMapping> map = new HashMap<>(); // Assign keys to primary nodes. GridDistributedTxMapping cur = null; + Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>(); + for (IgniteTxEntry write : writes) { GridDistributedTxMapping updated = map(write, topVer, cur, topLocked); if (cur != updated) { mappings.offer(updated); + updated.last(true); + + GridDistributedTxMapping prev = map.put(updated.node().id(), updated); + + if (prev != null) + prev.last(false); + if (updated.node().isLocal()) { if (write.context().isNear()) tx.nearLocallyMapped(true); @@ -325,8 +381,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa cctx.mvcc().recheckPendingLocks(); - txMapping.initLast(mappings); - tx.transactionNodes(txMapping.transactionNodes()); checkOnePhase(); @@ -340,14 +394,24 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @param mappings Queue of mappings. */ private void proceedPrepare(final Queue<GridDistributedTxMapping> mappings) { - if (isDone()) - return; - final GridDistributedTxMapping m = mappings.poll(); if (m == null) return; + proceedPrepare(m, mappings); + } + + /** + * Continues prepare after previous mapping successfully finished. + * + * @param m Mapping. + * @param mappings Queue of mappings. + */ + private void proceedPrepare(GridDistributedTxMapping m, @Nullable final Queue<GridDistributedTxMapping> mappings) { + if (isDone()) + return; + assert !m.empty(); final ClusterNode n = m.node(); @@ -361,7 +425,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa m.near(), txMapping.transactionNodes(), m.last(), - m.lastBackups(), tx.onePhaseCommit(), tx.needReturnValue() && tx.implicit(), tx.implicitSingle(), @@ -442,7 +505,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa ) { GridCacheContext cacheCtx = entry.context(); - List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer); + List<ClusterNode> nodes; + + GridCacheEntryEx cached0 = entry.cached(); + + if (cached0.isDht()) + nodes = cacheCtx.affinity().nodes(cached0.partition(), topVer); + else + nodes = cacheCtx.affinity().nodes(entry.key(), topVer); txMapping.addMapping(nodes); http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 11d31b2..1554a62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -200,7 +200,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA m.near(), txMapping.transactionNodes(), true, - txMapping.transactionNodes().get(node.id()), tx.onePhaseCommit(), tx.needReturnValue() && tx.implicit(), tx.implicitSingle(), http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 7b1ff05..4d43e9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -534,7 +534,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu mapping.dhtVersion(xidVer, xidVer); - tx.readyNearLocks(mapping, Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList(), + tx.readyNearLocks(mapping, + Collections.<GridCacheVersion>emptyList(), + Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 472a081..12c9958 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -512,7 +512,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (m == null) mappings.put(node.id(), m = new GridDistributedTxMapping(node)); - IgniteTxEntry txEntry = txMap.get(key); + IgniteTxEntry txEntry = entry(key); assert txEntry != null; @@ -526,26 +526,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { } /** - * Adds keys mapping. - * - * @param n Mapped node. - * @param mappedKeys Mapped keys. + * @return Non {@code null} entry if tx has only one write entry. */ - private void addKeyMapping(ClusterNode n, Iterable<IgniteTxKey> mappedKeys) { - GridDistributedTxMapping m = mappings.get(n.id()); - - if (m == null) - mappings.put(n.id(), m = new GridDistributedTxMapping(n)); - - for (IgniteTxKey key : mappedKeys) { - IgniteTxEntry txEntry = txMap.get(key); - - assert txEntry != null; - - txEntry.nodeId(n.id()); - - m.add(txEntry); - } + @Nullable IgniteTxEntry singleWrite() { + return txState.singleWrite(); } /** @@ -553,30 +537,36 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { */ void addEntryMapping(@Nullable Collection<GridDistributedTxMapping> maps) { if (!F.isEmpty(maps)) { - for (GridDistributedTxMapping map : maps) { - ClusterNode n = map.node(); + for (GridDistributedTxMapping map : maps) + addEntryMapping(map); - GridDistributedTxMapping m = mappings.get(n.id()); - - if (m == null) { - m = F.addIfAbsent(mappings, n.id(), new GridDistributedTxMapping(n)); + if (log.isDebugEnabled()) { + log.debug("Added mappings to transaction [locId=" + cctx.localNodeId() + + ", mappings=" + maps + + ", tx=" + this + ']'); + } + } + } - m.near(map.near()); + /** + * @param map Mapping. + */ + void addEntryMapping(GridDistributedTxMapping map) { + ClusterNode n = map.node(); - if (map.explicitLock()) - m.markExplicitLock(); - } + GridDistributedTxMapping m = mappings.get(n.id()); - assert m != null; + if (m == null) { + m = F.addIfAbsent(mappings, n.id(), new GridDistributedTxMapping(n)); - for (IgniteTxEntry entry : map.entries()) - m.add(entry); - } + m.near(map.near()); - if (log.isDebugEnabled()) - log.debug("Added mappings to transaction [locId=" + cctx.localNodeId() + ", mappings=" + maps + - ", tx=" + this + ']'); + if (map.explicitLock()) + m.markExplicitLock(); } + + for (IgniteTxEntry entry : map.entries()) + m.add(entry); } /** @@ -615,8 +605,23 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) { - Collection<IgniteTxEntry> entries = F.concat(false, mapping.writes(), mapping.reads()); + readyNearLocks(mapping.writes(), mapping.dhtVersion(), pendingVers, committedVers, rolledbackVers); + readyNearLocks(mapping.reads(), mapping.dhtVersion(), pendingVers, committedVers, rolledbackVers); + } + /** + * @param entries Entries. + * @param dhtVer DHT version. + * @param pendingVers Pending versions. + * @param committedVers Committed versions. + * @param rolledbackVers Rolled back versions. + */ + void readyNearLocks(Collection<IgniteTxEntry> entries, + GridCacheVersion dhtVer, + Collection<GridCacheVersion> pendingVers, + Collection<GridCacheVersion> committedVers, + Collection<GridCacheVersion> rolledbackVers) + { for (IgniteTxEntry txEntry : entries) { while (true) { GridCacheContext cacheCtx = txEntry.cached().context(); @@ -629,8 +634,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { // Handle explicit locks. GridCacheVersion explicit = txEntry.explicitVersion(); - if (explicit == null) - entry.readyNearLock(xidVer, mapping.dhtVersion(), committedVers, rolledbackVers, pendingVers); + if (explicit == null) { + entry.readyNearLock(xidVer, + dhtVer, + committedVers, + rolledbackVers, + pendingVers); + } break; } @@ -863,7 +873,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { * @param writes Write entries. * @param txNodes Transaction nodes mapping. * @param last {@code True} if this is last prepare request. - * @param lastBackups IDs of backup nodes receiving last prepare request. * @return Future that will be completed when locks are acquired. */ @SuppressWarnings("TypeMayBeWeakened") @@ -871,8 +880,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { @Nullable Collection<IgniteTxEntry> reads, @Nullable Collection<IgniteTxEntry> writes, Map<UUID, Collection<UUID>> txNodes, - boolean last, - Collection<UUID> lastBackups + boolean last ) { if (state() != PREPARING) { if (timedOut()) @@ -893,8 +901,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { IgniteUuid.randomUuid(), Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), last, - needReturnValue() && implicit(), - lastBackups); + needReturnValue() && implicit()); try { // At this point all the entries passed in must be enlisted in transaction because this is an http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 456d726..798635a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -99,7 +99,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { * @param near {@code True} if mapping is for near caches. * @param txNodes Transaction nodes mapping. * @param last {@code True} if this last prepare request for node. - * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare. * @param onePhaseCommit One phase commit flag. * @param retVal Return value flag. * @param implicitSingle Implicit single flag. @@ -118,7 +117,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { boolean near, Map<UUID, Collection<UUID>> txNodes, boolean last, - Collection<UUID> lastBackups, boolean onePhaseCommit, boolean retVal, boolean implicitSingle, @@ -137,7 +135,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { this.topVer = topVer; this.near = near; this.last = last; - this.lastBackups = lastBackups; this.retVal = retVal; this.implicitSingle = implicitSingle; this.explicitLock = explicitLock; @@ -153,12 +150,6 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { return firstClientReq; } - /** - * @return IDs of backup nodes receiving last prepare request during this prepare. - */ - public Collection<UUID> lastBackups() { - return lastBackups; - } /** * @return {@code True} if this last prepare request for node. http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java index 87c68b2..c4b1037 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxRemote.java @@ -34,10 +34,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteStateImpl; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; @@ -127,10 +129,10 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { this.nearNodeId = nearNodeId; - readMap = Collections.emptyMap(); + int writeSize = writeEntries != null ? Math.max(txSize, writeEntries.size()) : txSize; - writeMap = new LinkedHashMap<>( - writeEntries != null ? Math.max(txSize, writeEntries.size()) : txSize, 1.0f); + txState = new IgniteTxRemoteStateImpl(Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(), + U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(writeSize)); if (writeEntries != null) { for (IgniteTxEntry entry : writeEntries) { @@ -198,8 +200,8 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { this.nearXidVer = nearXidVer; this.nearNodeId = nearNodeId; - readMap = new LinkedHashMap<>(1, 1.0f); - writeMap = new LinkedHashMap<>(txSize, 1.0f); + txState = new IgniteTxRemoteStateImpl(U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(1), + U.<IgniteTxKey, IgniteTxEntry>newLinkedHashMap(txSize)); } /** {@inheritDoc} */ @@ -322,7 +324,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { // Initialize cache entry. entry.cached(cached); - writeMap.put(entry.txKey(), entry); + txState.addWriteEntry(entry.txKey(), entry); addExplicit(entry); @@ -391,7 +393,7 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter { drVer, skipStore); - writeMap.put(key, txEntry); + txState.addWriteEntry(key, txEntry); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 3b5c962..f5f99f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -669,14 +669,6 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { public boolean serializable(); /** - * Checks whether given key has been removed within transaction. - * - * @param key Key to check. - * @return {@code True} if key has been removed. - */ - public boolean removed(IgniteTxKey key); - - /** * Gets allowed remaining time for this transaction. * * @return Remaining time. http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 0d62fcd..109eddf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.ObjectStreamException; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -50,7 +49,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; -import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.processors.cache.version.GridCachePlainVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; @@ -2107,11 +2105,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public boolean removed(IgniteTxKey key) { - return false; - } - - /** {@inheritDoc} */ @Override public long remainingTime() throws IgniteTxTimeoutCheckedException { return 0; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 570aa48..014aa8a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -216,8 +216,7 @@ public class IgniteTxHandler { req.reads(), req.writes(), req.transactionNodes(), - req.last(), - req.lastBackups()); + req.last()); if (locTx.isRollbackOnly()) locTx.rollbackAsync(); @@ -398,7 +397,6 @@ public class IgniteTxHandler { if (req.onePhaseCommit()) { assert req.last(); - assert F.isEmpty(req.lastBackups()) || req.lastBackups().size() <= 1; tx.onePhaseCommit(true); } @@ -413,8 +411,7 @@ public class IgniteTxHandler { req.messageId(), req.miniId(), req.transactionNodes(), - req.last(), - req.lastBackups()); + req.last()); if (tx.isRollbackOnly()) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java index d4222b7..85a9561 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java @@ -19,6 +19,8 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Collection; import java.util.Collections; +import java.util.Map; +import java.util.Set; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -39,6 +41,12 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxStateAdapter { /** */ private GridCacheContext cacheCtx; + /** */ + private IgniteTxEntry entry; + + /** */ + private boolean init; + /** {@inheritDoc} */ @Override public void addActiveCache(GridCacheContext ctx, IgniteTxLocalAdapter tx) throws IgniteCheckedException { @@ -150,4 +158,91 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxStateAdapter { if (cacheCtx != null) onTxEnd(cacheCtx, tx, commit); } + + /** {@inheritDoc} */ + @Override public IgniteTxEntry entry(IgniteTxKey key) { + if (entry != null && entry.txKey().equals(key)) + return entry; + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean hasWriteKey(IgniteTxKey key) { + return entry != null && entry.txKey().equals(key); + } + + /** {@inheritDoc} */ + @Override public Set<IgniteTxKey> readSet() { + return Collections.emptySet(); + } + + /** {@inheritDoc} */ + @Override public Set<IgniteTxKey> writeSet() { + return entry != null ? Collections.singleton(entry.txKey()) : Collections.<IgniteTxKey>emptySet(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> writeEntries() { + return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> readEntries() { + return Collections.emptyList(); + } + + /** {@inheritDoc} */ + @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() { + return entry != null ? Collections.singletonMap(entry.txKey(), entry) : + Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(); + } + + /** {@inheritDoc} */ + @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() { + return Collections.emptyMap(); + } + + /** {@inheritDoc} */ + @Override public boolean empty() { + return entry == null; + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> allEntries() { + return entry != null ? Collections.singletonList(entry) : Collections.<IgniteTxEntry>emptyList(); + } + + /** {@inheritDoc} */ + @Override public boolean init(int txSize) { + if (!init) { + init = true; + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean initialized() { + return init; + } + + /** {@inheritDoc} */ + @Override public void addEntry(IgniteTxEntry entry) { + assert this.entry == null : "Entry already set [cur=" + this.entry + ", new=" + entry + ']'; + + this.entry = entry; + } + + /** {@inheritDoc} */ + @Override public void seal() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public IgniteTxEntry singleWrite() { + return entry; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 25d5c22..88e4fa2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -121,18 +121,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** */ private static final long serialVersionUID = 0L; - /** Per-transaction read map. */ - @GridToStringInclude - protected Map<IgniteTxKey, IgniteTxEntry> txMap; - - /** Read view on transaction map. */ - @GridToStringExclude - protected IgniteTxMap readView; - - /** Write view on transaction map. */ - @GridToStringExclude - protected IgniteTxMap writeView; - /** Minimal version encountered (either explicit lock or XID of this transaction). */ protected GridCacheVersion minVer; @@ -252,7 +240,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public boolean empty() { - return txMap.isEmpty(); + return txState.empty(); } /** {@inheritDoc} */ @@ -273,6 +261,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { assert false; + return false; } @@ -290,75 +279,61 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public boolean isStarted() { - return txMap != null; + return txState.initialized(); } /** {@inheritDoc} */ @Override public boolean hasWriteKey(IgniteTxKey key) { - return writeView.containsKey(key); + return txState.hasWriteKey(key); } /** * @return Transaction read set. */ @Override public Set<IgniteTxKey> readSet() { - return txMap == null ? Collections.<IgniteTxKey>emptySet() : readView.keySet(); + return txState.readSet(); } /** * @return Transaction write set. */ @Override public Set<IgniteTxKey> writeSet() { - return txMap == null ? Collections.<IgniteTxKey>emptySet() : writeView.keySet(); - } - - /** {@inheritDoc} */ - @Override public boolean removed(IgniteTxKey key) { - if (txMap == null) - return false; - - IgniteTxEntry e = txMap.get(key); - - return e != null && e.op() == DELETE; + return txState.writeSet(); } /** {@inheritDoc} */ @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() { - return readView == null ? Collections.<IgniteTxKey, IgniteTxEntry>emptyMap() : readView; + return txState.readMap(); } /** {@inheritDoc} */ @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() { - return writeView == null ? Collections.<IgniteTxKey, IgniteTxEntry>emptyMap() : writeView; + return txState.writeMap(); } /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> allEntries() { - return txMap == null ? Collections.<IgniteTxEntry>emptySet() : txMap.values(); + return txState.allEntries(); } /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> readEntries() { - return readView == null ? Collections.<IgniteTxEntry>emptyList() : readView.values(); + return txState.readEntries(); } /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> writeEntries() { - return writeView == null ? Collections.<IgniteTxEntry>emptyList() : writeView.values(); + return txState.writeEntries(); } /** {@inheritDoc} */ @Nullable @Override public IgniteTxEntry entry(IgniteTxKey key) { - return txMap == null ? null : txMap.get(key); + return txState.entry(key); } /** {@inheritDoc} */ @Override public void seal() { - if (readView != null) - readView.seal(); - - if (writeView != null) - writeView.seal(); + txState.seal(); } /** {@inheritDoc} */ @@ -413,7 +388,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter KeyCacheObject key, CacheEntryPredicate[] filter ) throws GridCacheFilterFailedException { - IgniteTxEntry e = txMap == null ? null : txMap.get(cacheCtx.txKey(key)); + IgniteTxEntry e = entry(cacheCtx.txKey(key)); if (e != null) { // We should look at tx entry previous value. If this is a user peek then previous @@ -858,7 +833,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter checkValid(); - boolean empty = F.isEmpty(near() ? txMap : writeMap()); + Collection<IgniteTxEntry> commitEntries = near() ? allEntries() : writeEntries(); + + boolean empty = F.isEmpty(commitEntries); // Register this transaction as completed prior to write-phase to // ensure proper lock ordering for removed entries. @@ -878,7 +855,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /* * Commit to cache. Note that for 'near' transaction we loop through all the entries. */ - for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) { + for (IgniteTxEntry txEntry : commitEntries) { GridCacheContext cacheCtx = txEntry.context(); GridDrType drType = cacheCtx.isDrEnabled() ? DR_PRIMARY : DR_NONE; @@ -3195,7 +3172,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * */ private void onException() { - for (IgniteTxEntry txEntry : txMap.values()) { + for (IgniteTxEntry txEntry : allEntries()) { GridCacheEntryEx cached0 = txEntry.cached(); if (cached0 != null) @@ -3442,16 +3419,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @return {@code True} if transaction was successfully started. */ public boolean init() { - if (txMap == null) { - txMap = U.newLinkedHashMap(txSize > 0 ? txSize : 16); + return !txState.init(txSize) || cctx.tm().onStarted(this); - readView = new IgniteTxMap(txMap, CU.reads()); - writeView = new IgniteTxMap(txMap, CU.writes()); - - return cctx.tm().onStarted(this); - } - - return true; } /** @@ -3462,7 +3431,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * cache (e.g. they have different stores). */ protected final void addActiveCache(GridCacheContext cacheCtx) throws IgniteCheckedException { - txState().addActiveCache(cacheCtx, this); + txState.addActiveCache(cacheCtx, this); } /** @@ -3537,7 +3506,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter "Invalid tx state for adding entry [op=" + op + ", val=" + val + ", entry=" + entry + ", filter=" + Arrays.toString(filter) + ", txCtx=" + cctx.tm().txContextVersion() + ", tx=" + this + ']'; - IgniteTxEntry old = txMap.get(key); + IgniteTxEntry old = entry(key); // Keep old filter if already have one (empty filter is always overridden). if (!filtersSet || !F.isEmptyOrNulls(filter)) { @@ -3601,7 +3570,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (!hasDrTtl) txEntry.expiry(expiryPlc); - txMap.put(key, txEntry); + txState.addEntry(txEntry); if (log.isDebugEnabled()) log.debug("Created transaction entry: " + txEntry); @@ -3663,7 +3632,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(IgniteTxLocalAdapter.class, this, "super", super.toString(), - "size", (txMap == null ? 0 : txMap.size())); + "size", allEntries().size()); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java index 9660e4e..a8cd16c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java @@ -35,8 +35,10 @@ public interface IgniteTxRemoteEx extends IgniteInternalTx { * @param rolledbackVers Rolled back version. * @param pendingVers Pending versions. */ - public void doneRemote(GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers, - Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers); + public void doneRemote(GridCacheVersion baseVer, + Collection<GridCacheVersion> committedVers, + Collection<GridCacheVersion> rolledbackVers, + Collection<GridCacheVersion> pendingVers); /** * @param e Sets write value for pessimistic transactions. http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java index 98f3841..fb64655 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java @@ -18,18 +18,40 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Collection; +import java.util.Map; +import java.util.Set; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.jetbrains.annotations.Nullable; /** * */ public class IgniteTxRemoteStateImpl implements IgniteTxState { + /** Read set. */ + @GridToStringInclude + protected Map<IgniteTxKey, IgniteTxEntry> readMap; + + /** Write map. */ + @GridToStringInclude + protected Map<IgniteTxKey, IgniteTxEntry> writeMap; + + /** + * @param readMap Read map. + * @param writeMap Write map. + */ + public IgniteTxRemoteStateImpl(Map<IgniteTxKey, IgniteTxEntry> readMap, + Map<IgniteTxKey, IgniteTxEntry> writeMap) { + this.readMap = readMap; + this.writeMap = writeMap; + } + /** {@inheritDoc} */ @Override public boolean implicitSingle() { return false; @@ -102,4 +124,140 @@ public class IgniteTxRemoteStateImpl implements IgniteTxState { @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) { assert false; } + + /** {@inheritDoc} */ + @Override public IgniteTxEntry entry(IgniteTxKey key) { + IgniteTxEntry e = writeMap == null ? null : writeMap.get(key); + + if (e == null) + e = readMap == null ? null : readMap.get(key); + + return e; + } + + /** {@inheritDoc} */ + @Override public boolean hasWriteKey(IgniteTxKey key) { + return writeMap.containsKey(key); + } + + /** {@inheritDoc} */ + @Override public Set<IgniteTxKey> readSet() { + return readMap.keySet(); + } + + /** {@inheritDoc} */ + @Override public Set<IgniteTxKey> writeSet() { + return writeMap.keySet(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> writeEntries() { + return writeMap.values(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> readEntries() { + return readMap.values(); + } + + /** {@inheritDoc} */ + @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() { + return writeMap; + } + + /** {@inheritDoc} */ + @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() { + return readMap; + } + + /** {@inheritDoc} */ + @Override public boolean empty() { + return readMap.isEmpty() && writeMap.isEmpty(); + } + + /** + * @param e Entry. + */ + public void setWriteValue(IgniteTxEntry e) { + IgniteTxEntry entry = writeMap.get(e.txKey()); + + if (entry == null) { + IgniteTxEntry rmv = readMap.remove(e.txKey()); + + if (rmv != null) { + e.cached(rmv.cached()); + + writeMap.put(e.txKey(), e); + } + // If lock is explicit. + else { + e.cached(e.context().cache().entryEx(e.key())); + + // explicit lock. + writeMap.put(e.txKey(), e); + } + } + else { + // Copy values. + entry.value(e.value(), e.hasWriteValue(), e.hasReadValue()); + entry.entryProcessors(e.entryProcessors()); + entry.op(e.op()); + entry.ttl(e.ttl()); + entry.explicitVersion(e.explicitVersion()); + + // Conflict resolution stuff. + entry.conflictVersion(e.conflictVersion()); + entry.conflictExpireTime(e.conflictExpireTime()); + } + } + + /** + * @param key Key. + * @param e Entry. + */ + public void addWriteEntry(IgniteTxKey key, IgniteTxEntry e) { + writeMap.put(key, e); + } + + /** + * Clears entry from transaction as it never happened. + * + * @param key key to be removed. + */ + public void clearEntry(IgniteTxKey key) { + readMap.remove(key); + writeMap.remove(key); + } + + /** {@inheritDoc} */ + @Override public Collection<IgniteTxEntry> allEntries() { + return F.concat(false, writeEntries(), readEntries()); + } + + /** {@inheritDoc} */ + @Override public boolean init(int txSize) { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean initialized() { + assert false; + + return false; + } + + /** {@inheritDoc} */ + @Override public void addEntry(IgniteTxEntry entry) { + assert false; + } + + /** {@inheritDoc} */ + @Override public void seal() { + assert false; + } + + /** {@inheritDoc} */ + @Override public IgniteTxEntry singleWrite() { + return null; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3b28e0b5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java index 5938453..1eece47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.Collection; +import java.util.Map; +import java.util.Set; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -54,5 +56,33 @@ public interface IgniteTxState { public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit); - // public IgniteTxEntry entry(IgniteTxKey key); + public IgniteTxEntry entry(IgniteTxKey key); + + public boolean hasWriteKey(IgniteTxKey key); + + public Set<IgniteTxKey> readSet(); + + public Set<IgniteTxKey> writeSet(); + + public Collection<IgniteTxEntry> writeEntries(); + + public Collection<IgniteTxEntry> readEntries(); + + public Map<IgniteTxKey, IgniteTxEntry> writeMap(); + + public Map<IgniteTxKey, IgniteTxEntry> readMap(); + + public boolean empty(); + + public Collection<IgniteTxEntry> allEntries(); + + public boolean init(int txSize); + + public boolean initialized(); + + public void addEntry(IgniteTxEntry entry); + + public void seal(); + + @Nullable public IgniteTxEntry singleWrite(); }
