Repository: ignite
Updated Branches:
  refs/heads/ignite-1607-read fdd6f1c9e -> 7572810fe
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7572810f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7572810f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7572810f Branch: refs/heads/ignite-1607-read Commit: 7572810fe78d026932e8c7a5d6767ffb297c84d4 Parents: fdd6f1c Author: sboikov <[email protected]> Authored: Thu Oct 8 14:52:48 2015 +0300 Committer: sboikov <[email protected]> Committed: Thu Oct 8 15:21:01 2015 +0300 ---------------------------------------------------------------------- .../transactions/IgniteTxLocalAdapter.java | 59 ++++++++++++-------- .../CacheSerializableTransactionsTest.java | 43 ++++++++++++++ 2 files changed, 80 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7572810f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 99b4c45..fb82ef6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -442,7 +442,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter for (KeyCacheObject key : keys) { while (true) { - GridCacheEntryEx entry = cacheCtx.cache().entryEx(key); + IgniteTxEntry txEntry = entry(cacheCtx.txKey(key)); + + GridCacheEntryEx entry = txEntry == null ? 
cacheCtx.cache().entryEx(key) : + txEntry.cached(); + + if (entry == null) + continue; try { T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned(this, @@ -469,6 +475,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter catch (GridCacheEntryRemovedException ignore) { if (log.isDebugEnabled()) log.debug("Got removed entry, will retry: " + key); + + if (txEntry != null) + txEntry.cached(cacheCtx.cache().entryEx(key)); } } } @@ -484,36 +493,42 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert ver != null : key; - c.apply(key, val, ver); + if (val != null) { + if (nextVer == null) + nextVer = cacheCtx.versions().next(); - if (nextVer == null) - nextVer = cacheCtx.versions().next(); + CacheObject cacheVal = cacheCtx.toCacheObject(val); - CacheObject cacheVal = cacheCtx.toCacheObject(val); + while (true) { + GridCacheEntryEx entry = cacheCtx.cache().entryEx(key); - while (true) { - GridCacheEntryEx entry = cacheCtx.cache().entryEx(key); + try { + boolean set = entry.versionedValue(cacheVal, ver, nextVer); - try { - boolean set = entry.versionedValue(cacheVal, ver, nextVer); + if (set) + ver = nextVer; - if (log.isDebugEnabled()) - log.debug("Set value loaded from store into entry [set=" + set + - ", curVer=" + ver + ", newVer=" + nextVer + ", " + - "entry=" + entry + ']'); + if (log.isDebugEnabled()) + log.debug("Set value loaded from store into entry [set=" + set + + ", curVer=" + ver + ", newVer=" + nextVer + ", " + + "entry=" + entry + ']'); - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry, (will retry): " + entry); - } - catch (IgniteCheckedException e) { - // Wrap errors (will be unwrapped). 
- throw new GridClosureException(e); + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry, (will retry): " + entry); + } + catch (IgniteCheckedException e) { + // Wrap errors (will be unwrapped). + throw new GridClosureException(e); + } } } + else + ver = IgniteTxEntry.READ_NEW_ENTRY_VER; + c.apply(key, val, ver); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/7572810f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index a620ee5..8ecd045 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -130,6 +130,49 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. 
*/ + public void testTxLoadFromStore() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + if (ccfg.getCacheStoreFactory() == null) + continue; + + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = testKeys(cache); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer storeVal = -1; + + storeMap.put(key, storeVal); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertEquals(storeVal, val); + + tx.commit(); + } + + checkValue(key, storeVal, cache.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ public void testTxCommitReadOnly1() throws Exception { Ignite ignite0 = ignite(0);
