Repository: ignite Updated Branches: refs/heads/ignite-1607-read 190310da0 -> b1d07144f
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1d07144 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1d07144 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1d07144 Branch: refs/heads/ignite-1607-read Commit: b1d07144f9725dea579c16c47805009c9e4f60f1 Parents: 190310d Author: sboikov <[email protected]> Authored: Mon Oct 12 09:58:34 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Oct 12 14:27:34 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 10 +- ...arOptimisticSerializableTxPrepareFuture.java | 187 ++++++++++++------- .../near/GridNearTxFinishFuture.java | 19 +- .../CacheSerializableTransactionsTest.java | 98 ++++++++++ .../processors/cache/IgniteTxAbstractTest.java | 42 +---- .../IgniteTxMultiThreadedAbstractTest.java | 15 +- ...CachePartitionedTxMultiThreadedSelfTest.java | 8 - 7 files changed, 263 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d07144/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 9378017..fd0fb92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -756,9 +756,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme // Cache version for optimistic check. startVer = ver; - if (retVer) - resVer = isNear() ? ((GridNearCacheEntry)this).dhtVersion() : startVer; - GridCacheMvcc mvcc = mvccExtras(); owner = mvcc == null ? null : mvcc.anyOwner(); @@ -876,6 +873,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (ret != null && expiryPlc != null) updateTtl(expiryPlc); + + if (retVer) { + resVer = isNear() ? ((GridNearCacheEntry)this).dhtVersion() : startVer; + + if (resVer == null) + ret = null; + } } if (ret != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d07144/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 04c4851..6bd21b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; @@ -76,8 +77,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING; public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPrepareFutureAdapter implements GridCacheMvccFuture<IgniteInternalTx> { /** */ - @GridToStringInclude - private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); + private KeyLockFuture keyLockFut = new KeyLockFuture(); /** */ private final AtomicReference<ClientRemapFuture> remapFutRef; @@ -103,10 +103,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre log.debug("Transaction future received owner changed callback: " + entry); if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) { - lockKeys.remove(entry.txKey()); - - // This will check for locks. - onDone(); + keyLockFut.onKeyLocked(entry.txKey()); return true; } @@ -151,10 +148,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre } /** - * @param nodeId Failed node ID. + * @param m Failed mapping. * @param e Error. */ - void onError(@Nullable UUID nodeId, Throwable e) { + void onError(@Nullable GridDistributedTxMapping m, Throwable e) { if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) { if (tx.onePhaseCommit()) { tx.markForBackupCheck(); @@ -165,28 +162,12 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre } } - if (e instanceof IgniteTxOptimisticCheckedException && nodeId != null) - tx.removeMapping(nodeId); + if (e instanceof IgniteTxOptimisticCheckedException && m != null) + tx.removeMapping(m.node().id()); err.compareAndSet(null, e); - } - - /** - * @return {@code True} if all locks are owned. - */ - private boolean checkLocks() { - boolean locked = lockKeys.isEmpty(); - if (locked) { - if (log.isDebugEnabled()) - log.debug("All locks are acquired for near prepare future: " + this); - } - else { - if (log.isDebugEnabled()) - log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']'); - } - - return locked; + keyLockFut.onDone(e); } /** {@inheritDoc} */ @@ -199,7 +180,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre if (f.futureId().equals(res.miniId())) { assert f.node().id().equals(nodeId); - f.onResult(nodeId, res); + f.onResult(res); } } } @@ -208,14 +189,11 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre /** {@inheritDoc} */ @Override public boolean onDone(IgniteInternalTx t, Throwable err) { - this.err.compareAndSet(null, err); - - err = this.err.get(); - - // If locks were not acquired yet, delay completion. - if (isDone() || (err == null && !checkLocks())) + if (isDone()) return false; + this.err.compareAndSet(null, err); + return onComplete(); } @@ -490,10 +468,15 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>(); for (IgniteTxEntry read : reads) - map(read, topVer, mappings, false); + map(read, topVer, mappings, false, remap); for (IgniteTxEntry write : writes) - map(write, topVer, mappings, true); + map(write, topVer, mappings, true, remap); + + keyLockFut.onAllKeysAdded(); + + if (!remap) + add(keyLockFut); if (isDone()) { if (log.isDebugEnabled()) @@ -518,28 +501,32 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre add(fut); } - Collection<MiniFuture> futs = (Collection)futures(); + Collection<IgniteInternalFuture<?>> futs = (Collection)futures(); - Iterator<MiniFuture> it = futs.iterator(); + Iterator<IgniteInternalFuture<?>> it = futs.iterator(); while (it.hasNext()) { - MiniFuture fut = it.next(); + IgniteInternalFuture<?> fut0 = it.next(); - if (skipFuture(remap, fut)) + if (skipFuture(remap, fut0)) continue; + MiniFuture fut = (MiniFuture)fut0; + IgniteCheckedException err = prepare(fut); if (err != null) { while (it.hasNext()) { - fut = it.next(); + fut0 = it.next(); - if (skipFuture(remap, fut)) + if (skipFuture(remap, fut0)) continue; + fut = (MiniFuture)fut0; + tx.removeMapping(fut.mapping().node().id()); - fut.onResult(err); + fut.onResult(new IgniteCheckedException("Failed to prepare transaction.", err)); } break; @@ -554,8 +541,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre * @param fut Future. * @return {@code True} if skip future during remap. */ - private boolean skipFuture(boolean remap, MiniFuture fut) { - return remap && fut.rcvRes.get(); + private boolean skipFuture(boolean remap, IgniteInternalFuture<?> fut) { + return !(isMini(fut)) || (remap && ((MiniFuture)fut).rcvRes.get()); } /** @@ -613,7 +600,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) { try { - fut.onResult(n.id(), prepFut.get()); + fut.onResult(prepFut.get()); } catch (IgniteCheckedException e) { fut.onResult(e); @@ -647,12 +634,14 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre * @param topVer Topology version. * @param curMapping Current mapping. * @param waitLock Wait lock flag. + * @param remap Remap flag. */ private void map( IgniteTxEntry entry, AffinityTopologyVersion topVer, Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> curMapping, - boolean waitLock + boolean waitLock, + boolean remap ) { GridCacheContext cacheCtx = entry.context(); @@ -678,9 +667,9 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre else entry.cached(cacheCtx.local().entryEx(entry.key(), topVer)); - if (cacheCtx.isNear() || cacheCtx.isLocal()) { + if (!remap && (cacheCtx.isNear() || cacheCtx.isLocal())) { if (waitLock && entry.explicitVersion() == null) - lockKeys.add(entry.txKey()); + keyLockFut.addLockKey(entry.txKey()); } IgniteBiTuple<ClusterNode, Boolean> key = F.t(primary, cacheCtx.isNear()); @@ -735,16 +724,23 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre /** {@inheritDoc} */ @Override public String toString() { - Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { - @Override public String apply(IgniteInternalFuture<?> f) { - return "[node=" + ((MiniFuture)f).node().id() + - ", loc=" + ((MiniFuture)f).node().isLocal() + - ", done=" + f.isDone() + "]"; - } - }); + Collection<String> futs = F.viewReadOnly(futures(), + new C1<IgniteInternalFuture<?>, String>() { + @Override public String apply(IgniteInternalFuture<?> f) { + return "[node=" + ((MiniFuture)f).node().id() + + ", loc=" + ((MiniFuture)f).node().isLocal() + + ", done=" + f.isDone() + "]"; + } + }, + new P1<IgniteInternalFuture<?>>() { + @Override public boolean apply(IgniteInternalFuture<?> f) { + return isMini(f); + } + }); return S.toString(GridNearOptimisticSerializableTxPrepareFuture.class, this, "innerFuts", futs, + "keyLockFut", keyLockFut, "tx", tx, "super", super.toString()); } @@ -829,7 +825,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre */ void onResult(Throwable e) { if (rcvRes.compareAndSet(false, true)) { - onError(m.node().id(), e); + onError(m, e); if (log.isDebugEnabled()) log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']'); @@ -860,17 +856,16 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre } /** - * @param nodeId Failed node ID. * @param res Result callback. */ - void onResult(UUID nodeId, final GridNearTxPrepareResponse res) { + void onResult(final GridNearTxPrepareResponse res) { if (isDone()) return; if (rcvRes.compareAndSet(false, true)) { if (res.error() != null) { // Fail the whole compound future. - onError(nodeId, res.error()); + onError(m, res.error()); onDone(res.error()); } @@ -884,11 +879,11 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre ClientRemapFuture remapFut = new ClientRemapFuture(); if (remapFutRef.compareAndSet(null, remapFut)) { - Collection<MiniFuture> futs = (Collection)futures(); + Collection<IgniteInternalFuture<?>> futs = (Collection)futures(); - for (MiniFuture fut : futs) { - if (fut != this) - remapFut.add(fut); + for (IgniteInternalFuture<?> fut : futs) { + if (isMini(fut) && fut != this) + remapFut.add((MiniFuture)fut); } remapFut.markInitialized(); @@ -975,4 +970,68 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); } } + + /** + * Keys lock future. + */ + private class KeyLockFuture extends GridFutureAdapter<GridNearTxPrepareResponse> { + /** */ + @GridToStringInclude + private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); + + /** */ + private volatile boolean allKeysAdded; + + /** + * @param key Key to track for locking. + */ + private void addLockKey(IgniteTxKey key) { + assert !allKeysAdded; + + lockKeys.add(key); + } + + /** + * @param key Locked keys. + */ + private void onKeyLocked(IgniteTxKey key) { + lockKeys.remove(key); + + checkLocks(); + } + + /** + * Moves future to the ready state. + */ + private void onAllKeysAdded() { + allKeysAdded = true; + + checkLocks(); + } + + /** + * @return {@code True} if all locks are owned. + */ + private boolean checkLocks() { + boolean locked = lockKeys.isEmpty(); + + if (locked && allKeysAdded) { + if (log.isDebugEnabled()) + log.debug("All locks are acquired for near prepare future: " + this); + + onDone((GridNearTxPrepareResponse)null); + } + else { + if (log.isDebugEnabled()) + log.debug("Still waiting for locks [fut=" + this + ", keys=" + lockKeys + ']'); + } + + return locked; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(KeyLockFuture.class, this, super.toString()); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d07144/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 85311cc..2c1a79f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -266,9 +266,18 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } if (tx.onePhaseCommit()) { - finishOnePhase(); + try { + boolean commit = this.commit && err == null; + + tx.finish(commit); + + finishOnePhase(commit); - tx.tmFinish(commit && err == null); + tx.tmFinish(commit); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to finish transaction: " + tx, e); + } } Throwable th = this.err.get(); @@ -509,9 +518,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } /** - * + * @param commit Commit flag. */ - private void finishOnePhase() { + private void finishOnePhase(boolean commit) { // No need to send messages as transaction was already committed on remote node. // Finish local mapping only as we need send commit message to backups. for (GridDistributedTxMapping m : mappings.values()) { @@ -521,6 +530,8 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu // Add new future. if (fut != null) add(fut); + + break; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d07144/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 0285016..1302698 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -381,6 +381,73 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testTxRollback() throws Exception { + Ignite ignite0 = ignite(0); + Ignite ignite1 = ignite(1); + + final IgniteTransactions txs0 = ignite0.transactions(); + final IgniteTransactions txs1 = ignite1.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache0 = ignite0.createCache(ccfg); + IgniteCache<Integer, Integer> cache1 = ignite1.cache(ccfg.getName()); + + List<Integer> keys = testKeys(cache0); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer expVal = null; + + for (int i = 0; i < 100; i++) { + try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache0.get(key); + + assertEquals(expVal, val); + + cache0.put(key, i); + + tx.rollback(); + } + + try (Transaction tx = txs0.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache0.get(key); + + assertEquals(expVal, val); + + cache0.put(key, i); + + tx.commit(); + + expVal = i; + } + + try (Transaction tx = txs1.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache1.get(key); + + assertEquals(expVal, val); + + cache1.put(key, val); + + tx.commit(); + } + } + + checkValue(key, expVal, cache0.getName()); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ public void testTxCommitReadOnlyGetAll() throws Exception { Ignite ignite0 = ignite(0); @@ -428,6 +495,37 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + public void testTxCommitReadWriteTwoNodes() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + Integer key0 = primaryKey(ignite(0).cache(null)); + Integer key1 = primaryKey(ignite(1).cache(null)); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key0, key0); + + cache.get(key1); + + tx.commit(); + } + } + finally { + destroyCache(ignite0, ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ public void testTxConflictRead1() throws Exception { txConflictRead(true); } http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d07144/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java index fcf46cf..dff0344 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxAbstractTest.java @@ -175,9 +175,7 @@ abstract class IgniteTxAbstractTest extends GridCommonAbstractTest { for (int i = 0; i < iterations(); i++) { IgniteCache<Integer, String> cache = jcache(gridIdx); - Transaction tx = ignite(gridIdx).transactions().txStart(concurrency, isolation, 0, 0); - - try { + try (Transaction tx = ignite(gridIdx).transactions().txStart(concurrency, isolation, 0, 0)) { int prevKey = -1; for (Integer key : getKeys()) { @@ -236,46 +234,22 @@ abstract class IgniteTxAbstractTest extends GridCommonAbstractTest { debug("Committed transaction [i=" + i + ", tx=" + tx + ']'); } catch (TransactionOptimisticException e) { - if (concurrency != OPTIMISTIC || isolation != SERIALIZABLE) { - error("Received invalid optimistic failure.", e); + if (!(concurrency == OPTIMISTIC && isolation == SERIALIZABLE)) { + log.error("Unexpected error: " + e, e); throw e; } - - if (isTestDebug()) - info("Optimistic transaction failure (will rollback) [i=" + i + ", msg=" + e.getMessage() + - ", tx=" + tx.xid() + ']'); - - try { - tx.rollback(); - } - catch (IgniteException ex) { - error("Failed to rollback optimistic failure: " + tx, ex); - - throw ex; - } } - catch (Exception e) { - error("Transaction failed (will rollback): " + tx, e); - - tx.rollback(); + catch (Throwable e) { + log.error("Unexpected error: " + e, e); throw e; } - catch (Error e) { - error("Error when executing transaction (will rollback): " + tx, e); - - tx.rollback(); + } - throw e; - } - finally { - Transaction t = ignite(gridIdx).transactions().tx(); + Transaction tx = ignite(gridIdx).transactions().tx(); - assert t == null : "Thread should not have transaction upon completion ['t==tx'=" + (t == tx) + - ", t=" + t + (t != tx ? "tx=" + tx : "tx=''") + ']'; - } - } + assertNull("Thread should not have transaction upon completion", tx); if (printMemoryStats()) { if (cntr.getAndIncrement() % 100 == 0) http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d07144/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java index 191feb6..f13ba8c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxMultiThreadedAbstractTest.java @@ -250,8 +250,8 @@ public abstract class IgniteTxMultiThreadedAbstractTest extends IgniteTxAbstract break; } - catch(TransactionOptimisticException e) { - log.info("Got error, will retry: " + e); + catch (TransactionOptimisticException e) { + // Retry. } } } @@ -300,8 +300,17 @@ public abstract class IgniteTxMultiThreadedAbstractTest extends IgniteTxAbstract assertEquals((long)THREADS * ITERATIONS, total); + // Try to update one more time to make sure cache is in consistent state. + try (Transaction tx = grid(0).transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + long val = cache.get(key); + + cache.put(key, val); + + tx.commit(); + } + for (int i = 0; i < gridCount(); i++) - assertEquals(total, (Object)cache.get(key)); + assertEquals(total, grid(i).cache(null).get(key)); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/b1d07144/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java index 346bd34..f76361a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedTxMultiThreadedSelfTest.java @@ -40,11 +40,6 @@ public class GridCachePartitionedTxMultiThreadedSelfTest extends IgniteTxMultiTh private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); /** {@inheritDoc} */ - @Override public void testOptimisticSerializableCommitMultithreaded() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-806"); - } - - /** {@inheritDoc} */ @SuppressWarnings({"ConstantConditions"}) @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration c = super.getConfiguration(gridName); @@ -53,9 +48,6 @@ public class GridCachePartitionedTxMultiThreadedSelfTest extends IgniteTxMultiTh CacheConfiguration cc = defaultCacheConfiguration(); - // TODO IGNITE-1607 add test with near cache. - cc.setNearConfiguration(null); - cc.setCacheMode(PARTITIONED); cc.setBackups(1);
