Repository: ignite Updated Branches: refs/heads/ignite-1607-read [created] a273299bb
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a273299b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a273299b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a273299b Branch: refs/heads/ignite-1607-read Commit: a273299bb970d359b43e065d664c9284021ad3c6 Parents: 920d747 Author: sboikov <[email protected]> Authored: Mon Oct 5 17:17:40 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Oct 5 17:17:40 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheEntryEx.java | 5 +- .../processors/cache/GridCacheMvcc.java | 6 ++ .../distributed/GridDistributedCacheEntry.java | 2 +- .../distributed/dht/GridDhtCacheEntry.java | 6 +- .../distributed/dht/GridDhtLockFuture.java | 1 + .../cache/local/GridLocalCacheEntry.java | 3 +- .../cache/transactions/IgniteTxEntry.java | 36 ++++++++++-- .../transactions/IgniteTxLocalAdapter.java | 61 +------------------- .../cache/transactions/IgniteTxManager.java | 6 +- .../CacheSerializableTransactionsTest.java | 2 +- .../processors/cache/GridCacheTestEntryEx.java | 2 +- .../loadtests/hashmap/GridHashMapLoadTest.java | 2 +- 12 files changed, 57 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a273299b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 430590a..fb4fcdd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -510,12 +510,13 @@ public interface GridCacheEntryEx { * * @param tx Cache transaction. * @param timeout Timeout for lock acquisition. + * @param serReadVer Optional read entry version for optimistic serializable transaction. * @return {@code True} if lock was acquired, {@code false} otherwise. * @throws GridCacheEntryRemovedException If this entry is obsolete. * @throws GridDistributedLockCancelledException If lock has been cancelled. */ - public boolean tmLock(IgniteInternalTx tx, long timeout) throws GridCacheEntryRemovedException, - GridDistributedLockCancelledException; + public boolean tmLock(IgniteInternalTx tx, long timeout, @Nullable GridCacheVersion serReadVer) + throws GridCacheEntryRemovedException, GridDistributedLockCancelledException; /** * Unlocks acquired lock. http://git-wip-us.apache.org/repos/asf/ignite/blob/a273299b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java index c2102bd..337be06 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java @@ -455,6 +455,7 @@ public final class GridCacheMvcc { /*nearVer*/null, threadId, ver, + null, timeout, reenter, tx, @@ -469,6 +470,7 @@ public final class GridCacheMvcc { * @param nearVer Near version. * @param threadId Thread ID. * @param ver Lock version. + * @param serReadVer Optional read entry version for optimistic serializable transaction. * @param timeout Lock acquisition timeout. * @param reenter Reentry flag ({@code true} if reentry is allowed). * @param tx Transaction flag. @@ -483,6 +485,7 @@ public final class GridCacheMvcc { @Nullable GridCacheVersion nearVer, long threadId, GridCacheVersion ver, + @Nullable GridCacheVersion serReadVer, long timeout, boolean reenter, boolean tx, @@ -510,6 +513,9 @@ public final class GridCacheMvcc { if (owner == null || owner.threadId() != threadId) return null; } + + if (serReadVer != null && !serReadVer.equals(ver)) + return null; } UUID locNodeId = cctx.nodeId(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a273299b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java index d4f0d6c..bed3275 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java @@ -742,7 +742,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { } /** {@inheritDoc} */ - @Override public boolean tmLock(IgniteInternalTx tx, long timeout) + @Override public boolean tmLock(IgniteInternalTx tx, long timeout, GridCacheVersion serReadVer) throws GridCacheEntryRemovedException, GridDistributedLockCancelledException { if (tx.local()) // Null is returned if timeout is negative and there is other lock owner. http://git-wip-us.apache.org/repos/asf/ignite/blob/a273299b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index bf22c7d..82af820 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -163,6 +163,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { * @param topVer Topology version. * @param threadId Owning thread ID. * @param ver Lock version. + * @param serReadVer Optional read entry version for optimistic serializable transaction. * @param timeout Timeout to acquire lock. * @param reenter Reentry flag. * @param tx Tx flag. @@ -177,6 +178,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { AffinityTopologyVersion topVer, long threadId, GridCacheVersion ver, + @Nullable GridCacheVersion serReadVer, long timeout, boolean reenter, boolean tx, @@ -212,6 +214,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { nearVer, threadId, ver, + serReadVer, timeout, reenter, tx, @@ -250,7 +253,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override public boolean tmLock(IgniteInternalTx tx, long timeout) + @Override public boolean tmLock(IgniteInternalTx tx, long timeout, GridCacheVersion serReadVer) throws GridCacheEntryRemovedException, GridDistributedLockCancelledException { if (tx.local()) { GridDhtTxLocalAdapter dhtTx = (GridDhtTxLocalAdapter)tx; @@ -262,6 +265,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { tx.topologyVersion(), tx.threadId(), tx.xidVersion(), + serReadVer, timeout, /*reenter*/false, /*tx*/true, http://git-wip-us.apache.org/repos/asf/ignite/blob/a273299b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 2c16534..f9adfe8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -400,6 +400,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> topVer, threadId, lockVer, + null, timeout, /*reenter*/false, inTx(), http://git-wip-us.apache.org/repos/asf/ignite/blob/a273299b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java index cacac13..383ba69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java @@ -191,7 +191,8 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { } /** {@inheritDoc} */ - @Override public boolean tmLock(IgniteInternalTx tx, long timeout) throws GridCacheEntryRemovedException { + @Override public boolean tmLock(IgniteInternalTx tx, long timeout, GridCacheVersion serReadVer) + throws GridCacheEntryRemovedException { GridCacheMvccCandidate cand = addLocal( tx.threadId(), tx.xidVersion(), http://git-wip-us.apache.org/repos/asf/ignite/blob/a273299b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index 3c33d19..d68efec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -175,6 +175,9 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { */ private byte flags; + /** */ + private GridCacheVersion serReadVer; + /** * Required by {@link Externalizable} */ @@ -822,6 +825,13 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { this.entryProcessorCalcVal = entryProcessorCalcVal; } + /** + * @return + */ + public GridCacheVersion serializableReadVersion() { + return serReadVer; + } + /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -884,18 +894,24 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { writer.incrementState(); case 8: - if (!writer.writeByteArray("transformClosBytes", transformClosBytes)) + if (!writer.writeMessage("serReadVer", serReadVer)) return false; writer.incrementState(); case 9: - if (!writer.writeLong("ttl", ttl)) + if (!writer.writeByteArray("transformClosBytes", transformClosBytes)) return false; writer.incrementState(); case 10: + if (!writer.writeLong("ttl", ttl)) + return false; + + writer.incrementState(); + + case 11: if (!writer.writeMessage("val", val)) return false; @@ -979,7 +995,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 8: - transformClosBytes = reader.readByteArray("transformClosBytes"); + serReadVer = reader.readMessage("serReadVer"); if (!reader.isLastRead()) return false; @@ -987,7 +1003,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 9: - ttl = reader.readLong("ttl"); + transformClosBytes = reader.readByteArray("transformClosBytes"); if (!reader.isLastRead()) return false; @@ -995,6 +1011,14 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); case 10: + ttl = reader.readLong("ttl"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: val = reader.readMessage("val"); if (!reader.isLastRead()) @@ -1014,7 +1038,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 12; } /** {@inheritDoc} */ @@ -1038,4 +1062,4 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { return GridToStringBuilder.toString(IgniteTxEntry.class, this, "xidVer", tx == null ? "null" : tx.xidVersion()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a273299b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index aa0ffe8..826172e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1509,7 +1509,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @param cacheCtx Cache context. * @param map Return map. * @param missedMap Missed keys. - * @param redos Keys to retry. * @param deserializePortable Deserialize portable flag. * @param skipVals Skip values flag. * @param keepCacheObjects Keep cache objects flag. @@ -1520,14 +1519,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter final GridCacheContext cacheCtx, final Map<K, V> map, final Map<KeyCacheObject, GridCacheVersion> missedMap, - @Nullable final Collection<KeyCacheObject> redos, final boolean deserializePortable, final boolean skipVals, final boolean keepCacheObjects, final boolean skipStore ) { - assert redos != null || pessimistic(); - if (log.isDebugEnabled()) log.debug("Loading missed values for missed map: " + missedMap); @@ -1863,7 +1859,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter return checkMissed(cacheCtx, retMap, missed, - null, deserializePortable, skipVals, keepCacheObjects, @@ -1912,8 +1907,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter else { assert optimistic() || readCommitted() || skipVals; - final Collection<KeyCacheObject> redos = new ArrayList<>(); - if (!missed.isEmpty()) { if (!readCommitted()) for (Iterator<KeyCacheObject> it = missed.keySet().iterator(); it.hasNext(); ) { @@ -1932,64 +1925,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter IgniteInternalFuture<Map<K, V>> fut0 = checkMissed(cacheCtx, retMap, missed, - redos, deserializePortable, skipVals, keepCacheObjects, skipStore); - return new GridEmbeddedFuture<>( - // First future. - fut0, - // Closure that returns another future, based on result from first. - new PMC<Map<K, V>>() { - @Override public IgniteInternalFuture<Map<K, V>> postMiss(Map<K, V> map) { - if (redos.isEmpty()) - return new GridFinishedFuture<>( - Collections.<K, V>emptyMap()); - - if (log.isDebugEnabled()) - log.debug("Starting to future-recursively get values for keys: " + redos); - - // Future recursion. - return getAllAsync(cacheCtx, - redos, - null, - deserializePortable, - skipVals, - true, - skipStore); - } - }, - // Finalize. - new FinishClosure<Map<K, V>>() { - @Override Map<K, V> finish(Map<K, V> loaded) { - for (Map.Entry<K, V> entry : loaded.entrySet()) { - KeyCacheObject cacheKey = (KeyCacheObject)entry.getKey(); - - IgniteTxEntry txEntry = entry(cacheCtx.txKey(cacheKey)); - - CacheObject val = (CacheObject)entry.getValue(); - - if (!readCommitted()) - txEntry.readValue(val); - - if (!F.isEmpty(txEntry.entryProcessors())) - val = txEntry.applyEntryProcessors(val); - - cacheCtx.addResult(retMap, - cacheKey, - val, - skipVals, - keepCacheObjects, - deserializePortable, - false); - } - - return retMap; - } - } - ); + return fut0; } return new GridFinishedFuture<>(retMap); http://git-wip-us.apache.org/repos/asf/ignite/blob/a273299b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 477816d..34c5377 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1396,7 +1396,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { assert !entry1.detached() : "Expected non-detached entry for near transaction " + "[locNodeId=" + cctx.localNodeId() + ", entry=" + entry1 + ']'; - if (!entry1.tmLock(tx, timeout)) { + GridCacheVersion serReadVer = txEntry1.serializableReadVersion(); + + assert serReadVer == null || ser : txEntry1; + + if (!entry1.tmLock(tx, timeout, serReadVer)) { // Unlock locks locked so far. for (IgniteTxEntry txEntry2 : entries) { if (txEntry2 == txEntry1) http://git-wip-us.apache.org/repos/asf/ignite/blob/a273299b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 68899e7..3936d1d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -121,7 +121,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void _testTxRollbackRead2() throws Exception { + public void testTxRollbackRead2() throws Exception { txRollbackRead(false); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a273299b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index 1fef4d5..b754d80 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -552,7 +552,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr } /** @inheritDoc */ - @Override public boolean tmLock(IgniteInternalTx tx, long timeout) { + @Override public boolean tmLock(IgniteInternalTx tx, long timeout, GridCacheVersion serReadVer) { assert false; return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/a273299b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java index cf2ff41..bbff5a8 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridHashMapLoadTest.java @@ -81,7 +81,7 @@ public class GridHashMapLoadTest extends GridCommonAbstractTest { map.put(key, new GridCacheMapEntry(ctx, ctx.toCacheKeyObject(key), key.hashCode(), ctx.toCacheObject(val), null, 1) { - @Override public boolean tmLock(IgniteInternalTx tx, long timeout) { + @Override public boolean tmLock(IgniteInternalTx tx, long timeout, GridCacheVersion serReadVer) { return false; }
