Repository: ignite
Updated Branches:
  refs/heads/ignite-1607 a5103a87b -> 820c0ecd3
ignite-1607 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/820c0ecd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/820c0ecd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/820c0ecd Branch: refs/heads/ignite-1607 Commit: 820c0ecd3b96b2d8ae8787d7566a563bbaa7f6df Parents: a5103a8 Author: sboikov <[email protected]> Authored: Mon Oct 5 11:34:31 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Oct 5 11:59:30 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheMetricsImpl.java | 12 +- .../processors/cache/GridCacheProcessor.java | 13 - .../cache/distributed/dht/GridDhtTxLocal.java | 2 +- ...arOptimisticSerializableTxPrepareFuture.java | 9 +- .../near/GridNearOptimisticTxPrepareFuture.java | 8 +- .../cache/distributed/near/GridNearTxLocal.java | 2 +- .../transactions/IgniteTransactionsImpl.java | 6 - .../cache/transactions/IgniteTxHandler.java | 4 +- .../cache/transactions/IgniteTxManager.java | 299 +------- .../cache/version/GridCacheVersionManager.java | 49 +- .../apache/ignite/transactions/Transaction.java | 2 +- .../transactions/TransactionIsolation.java | 5 +- .../cache/CacheDeadlockFreeTxTest.java | 484 ------------- .../cache/CacheSerializableTxTest.java | 687 +++++++++++++++++++ 14 files changed, 736 insertions(+), 846 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/820c0ecd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index dfa0217..a60c22b 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -351,7 +351,7 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public int getTxCommitQueueSize() { - return cctx.tm().commitQueueSize(); + return 0; } /** {@inheritDoc} */ @@ -366,12 +366,12 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public int getTxPrepareQueueSize() { - return cctx.tm().prepareQueueSize(); + return 0; } /** {@inheritDoc} */ @Override public int getTxStartVersionCountsSize() { - return cctx.tm().startVersionCountsSize(); + return 0; } /** {@inheritDoc} */ @@ -396,17 +396,17 @@ public class CacheMetricsImpl implements CacheMetrics { /** {@inheritDoc} */ @Override public int getTxDhtCommitQueueSize() { - return cctx.isNear() && dhtCtx != null ? dhtCtx.tm().commitQueueSize() : -1; + return 0; } /** {@inheritDoc} */ @Override public int getTxDhtPrepareQueueSize() { - return cctx.isNear() && dhtCtx != null ? dhtCtx.tm().prepareQueueSize() : -1; + return 0; } /** {@inheritDoc} */ @Override public int getTxDhtStartVersionCountsSize() { - return cctx.isNear() && dhtCtx != null ? 
dhtCtx.tm().startVersionCountsSize() : -1; + return 0; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/820c0ecd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index daa4475..4bdf4e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -140,7 +140,6 @@ import static org.apache.ignite.internal.IgniteComponentType.JTA; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_TX_CONFIG; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; -import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; /** * Cache processor. 
@@ -411,15 +410,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { throw new IgniteCheckedException("Cannot start cache in PRIVATE or ISOLATED deployment mode: " + ctx.config().getDeploymentMode()); - if (!c.getTransactionConfiguration().isTxSerializableEnabled() && - c.getTransactionConfiguration().getDefaultTxIsolation() == SERIALIZABLE) - U.warn(log, - "Serializable transactions are disabled while default transaction isolation is SERIALIZABLE " + - "(most likely misconfiguration - either update 'isTxSerializableEnabled' or " + - "'defaultTxIsolationLevel' properties) for cache: " + U.maskName(cc.getName()), - "Serializable transactions are disabled while default transaction isolation is SERIALIZABLE " + - "for cache: " + U.maskName(cc.getName())); - if (cc.isWriteBehindEnabled()) { if (cfgStore == null) throw new IgniteCheckedException("Cannot enable write-behind (writer or store is not provided) " + @@ -632,9 +622,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { sharedCtx = createSharedContext(ctx, CU.startStoreSessionListeners(ctx, ctx.config().getCacheStoreSessionListenerFactories())); - ctx.performance().add("Disable serializable transactions (set 'txSerializableEnabled' to false)", - !ctx.config().getTransactionConfiguration().isTxSerializableEnabled()); - for (int i = 0; i < cfgs.length; i++) { if (ctx.config().isDaemon() && !CU.isMarshallerCache(cfgs[i].getName())) continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/820c0ecd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 4f8469f..2071275 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -595,7 +595,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (finish(false) || state() == UNKNOWN) fut.finish(); else - fut.onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this))); + fut.onError(new IgniteCheckedException("Failed to rollback transaction: " + CU.txString(this))); } catch (IgniteTxOptimisticCheckedException e) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/820c0ecd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 9056ae9..62fc40f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -82,7 +81,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre public GridNearOptimisticSerializableTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { super(cctx, tx); - assert 
tx.optimistic() : tx; + assert tx.optimistic() && tx.serializable() : tx; // Should wait for all mini futures completion before finishing tx. ignoreChildFailures(IgniteCheckedException.class); @@ -436,9 +435,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre return; } - prepare( - tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(), - tx.writeEntries()); + prepare(tx.readEntries(), tx.writeEntries()); markInitialized(); } @@ -519,7 +516,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearTxPre futId, tx.topologyVersion(), tx, - tx.optimistic() && tx.serializable() ? m.reads() : null, + m.reads(), m.writes(), m.near(), txMapping.transactionNodes(), http://git-wip-us.apache.org/repos/asf/ignite/blob/820c0ecd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 25028c4..4a0caa9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -82,7 +82,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { super(cctx, tx); - assert tx.optimistic() : tx; + assert tx.optimistic() && !tx.serializable() : tx; } /** {@inheritDoc} */ @@ -449,9 +449,7 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearTxPrepareFutureAd return; } - prepare( - tx.optimistic() && tx.serializable() ? tx.readEntries() : Collections.<IgniteTxEntry>emptyList(), - tx.writeEntries()); + prepare(Collections.<IgniteTxEntry>emptyList(), tx.writeEntries()); markInitialized(); } @@ -572,7 +570,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearTxPrepareFutureAd futId, tx.topologyVersion(), tx, - tx.optimistic() && tx.serializable() ? m.reads() : null, + null, m.writes(), m.near(), txMapping.transactionNodes(), http://git-wip-us.apache.org/repos/asf/ignite/blob/820c0ecd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index ef9f77e..721de47 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -753,7 +753,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (fut == null) { // Future must be created before any exception can be thrown. if (optimistic()) { - fut = isolation() == TransactionIsolation.SERIALIZABLE_TRY_LOCK ? + fut = serializable() ? 
new GridNearOptimisticSerializableTxPrepareFuture(cctx, this) : new GridNearOptimisticTxPrepareFuture(cctx, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/820c0ecd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index c0c2284..716676f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -147,12 +147,6 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { cctx.kernalContext().gateway().readLock(); try { - TransactionConfiguration cfg = cctx.gridConfig().getTransactionConfiguration(); - - if (!cfg.isTxSerializableEnabled() && isolation == SERIALIZABLE) - throw new IllegalArgumentException("SERIALIZABLE isolation level is disabled (to enable change " + - "'txSerializableEnabled' configuration property)"); - IgniteInternalTx tx = cctx.tm().userTx(sysCacheCtx); if (tx != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/820c0ecd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index c2cc629..baaf4aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -68,6 +68,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFutureCancelledException; +import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; @@ -415,7 +416,8 @@ public class IgniteTxHandler { if (tx.isRollbackOnly()) { try { - tx.rollback(); + if (tx.state() != TransactionState.ROLLED_BACK) + tx.rollback(); } catch (IgniteCheckedException e) { U.error(log, "Failed to rollback transaction: " + tx, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/820c0ecd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index d431cb6..477816d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -25,7 +25,6 @@ import java.util.Iterator; import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.Map; -import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; @@ -64,7 +63,6 @@ import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap; import org.apache.ignite.internal.util.future.GridCompoundFuture; 
import org.apache.ignite.internal.util.future.GridFutureAdapter; -import org.apache.ignite.internal.util.lang.GridFunc; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -78,7 +76,6 @@ import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT; @@ -131,16 +128,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** TX handler. */ private IgniteTxHandler txHandler; - /** All transactions. */ - private final Queue<IgniteInternalTx> committedQ = new ConcurrentLinkedDeque8<>(); - - /** Preparing transactions. */ - private final Queue<IgniteInternalTx> prepareQ = new ConcurrentLinkedDeque8<>(); - - /** Minimum start version. */ - private final ConcurrentNavigableMap<GridCacheVersion, AtomicInt> startVerCnts = - new ConcurrentSkipListMap<>(); - /** Committed local transactions. */ private final GridBoundedConcurrentOrderedMap<GridCacheVersion, Boolean> completedVers = new GridBoundedConcurrentOrderedMap<>(Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT)); @@ -308,41 +295,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * USE ONLY FOR MEMORY PROFILING DURING TESTS. 
*/ @Override public void printMemoryStats() { - IgniteInternalTx firstTx = committedQ.peek(); - - int committedSize = committedQ.size(); - - Map.Entry<GridCacheVersion, AtomicInt> startVerEntry = startVerCnts.firstEntry(); - - GridCacheVersion minStartVer = null; - long dur = 0; - - if (committedSize > 3000) { - minStartVer = new GridCacheVersion(Integer.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE, Integer.MAX_VALUE, 0); - - IgniteInternalTx stuck = null; - - for (IgniteInternalTx tx : txs()) - if (tx.startVersion().isLess(minStartVer)) { - minStartVer = tx.startVersion(); - dur = U.currentTimeMillis() - tx.startTime(); - - stuck = tx; - } - - X.println("Stuck transaction: " + stuck); - } - X.println(">>> "); X.println(">>> Transaction manager memory stats [grid=" + cctx.gridName() + ']'); X.println(">>> threadMapSize: " + threadMap.size()); - X.println(">>> idMap [size=" + idMap.size() + ", minStartVer=" + minStartVer + ", dur=" + dur + "ms]"); - X.println(">>> committedQueue [size=" + committedSize + - ", firstStartVersion=" + (firstTx == null ? "null" : firstTx.startVersion()) + - ", firstEndVersion=" + (firstTx == null ? "null" : firstTx.endVersion()) + ']'); - X.println(">>> prepareQueueSize: " + prepareQ.size()); - X.println(">>> startVerCntsSize [size=" + startVerCnts.size() + - ", firstVer=" + startVerEntry + ']'); + X.println(">>> idMap [size=" + idMap.size() + ']'); X.println(">>> completedVersSize: " + completedVers.size()); } @@ -361,27 +317,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * @return Committed queue size. - */ - public int commitQueueSize() { - return committedQ.size(); - } - - /** - * @return Prepare queue size. - */ - public int prepareQueueSize() { - return prepareQ.size(); - } - - /** - * @return Start version counts. - */ - public int startVersionCountsSize() { - return startVerCnts.size(); - } - - /** * @return Committed versions size. 
*/ public int completedVersionsSize() { @@ -493,42 +428,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return null; } - if (cctx.txConfig().isTxSerializableEnabled()) { - AtomicInt next = new AtomicInt(1); - - boolean loop = true; - - while (loop) { - AtomicInt prev = startVerCnts.putIfAbsent(tx.startVersion(), next); - - if (prev == null) - break; // Put succeeded - exit. - - // Previous value was 0, which means that it will be deleted - // by another thread in "decrementStartVersionCount(..)" method. - // In that case, we delete here too, so we can safely try again. - for (;;) { - int p = prev.get(); - - assert p >= 0 : p; - - if (p == 0) { - if (startVerCnts.remove(tx.startVersion(), prev)) - if (log.isDebugEnabled()) - log.debug("Removed count from onCreated callback: " + tx); - - break; // Retry outer loop. - } - - if (prev.compareAndSet(p, p + 1)) { - loop = false; // Increment succeeded - exit outer loop. - - break; - } - } - } - } - if (tx.timeout() > 0) { cctx.time().addTimeoutObject(tx); @@ -822,117 +721,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this); } - boolean txSerEnabled = cctx.txConfig().isTxSerializableEnabled(); - - // Clean up committed transactions queue. - if (tx.pessimistic() && tx.local()) { - if (tx.enforceSerializable() && txSerEnabled) { - for (Iterator<IgniteInternalTx> it = committedQ.iterator(); it.hasNext();) { - IgniteInternalTx committedTx = it.next(); - - assert committedTx != tx; - - // Clean up. - if (isSafeToForget(committedTx)) - it.remove(); - } - } - - // Nothing else to do in pessimistic mode. 
- return; - } - - if (txSerEnabled && tx.optimistic() && tx.enforceSerializable()) { - Set<IgniteTxKey> readSet = tx.readSet(); - Set<IgniteTxKey> writeSet = tx.writeSet(); - - GridCacheVersion startTn = tx.startVersion(); - - GridCacheVersion finishTn = cctx.versions().last(); - - // Add future to prepare queue only on first prepare call. - if (tx.markPreparing()) - prepareQ.offer(tx); - - // Check that our read set does not intersect with write set - // of all transactions that completed their write phase - // while our transaction was in read phase. - for (Iterator<IgniteInternalTx> it = committedQ.iterator(); it.hasNext();) { - IgniteInternalTx committedTx = it.next(); - - assert committedTx != tx; - - // Clean up. - if (isSafeToForget(committedTx)) { - it.remove(); - - continue; - } - - GridCacheVersion tn = committedTx.endVersion(); - - // We only care about transactions - // with tn > startTn and tn <= finishTn - if (tn.compareTo(startTn) <= 0 || tn.compareTo(finishTn) > 0) - continue; - - if (tx.serializable()) { - if (GridFunc.intersects(committedTx.writeSet(), readSet)) { - tx.setRollbackOnly(); - - throw new IgniteTxOptimisticCheckedException("Failed to prepare transaction " + - "(committed vs. read-set conflict): " + tx); - } - } - } - - // Check that our read and write sets do not intersect with write - // sets of all active transactions. - for (Iterator<IgniteInternalTx> iter = prepareQ.iterator(); iter.hasNext();) { - IgniteInternalTx prepareTx = iter.next(); - - if (prepareTx == tx) - // Skip yourself. - continue; - - // Optimistically remove completed transactions. - if (prepareTx.done()) { - iter.remove(); - - if (log.isDebugEnabled()) - log.debug("Removed finished transaction from active queue: " + prepareTx); - - continue; - } - - // Check if originating node left. 
- if (cctx.discovery().node(prepareTx.nodeId()) == null) { - iter.remove(); - - rollbackTx(prepareTx); - - if (log.isDebugEnabled()) - log.debug("Removed and rolled back transaction because sender node left grid: " + - CU.txString(prepareTx)); - - continue; - } - - if (tx.serializable() && !prepareTx.isRollbackOnly()) { - Set<IgniteTxKey> prepareWriteSet = prepareTx.writeSet(); - - if (GridFunc.intersects(prepareWriteSet, readSet, writeSet)) { - // Remove from active set. - iter.remove(); - - tx.setRollbackOnly(); - - throw new IgniteTxOptimisticCheckedException( - "Failed to prepare transaction (read-set/write-set conflict): " + tx); - } - } - } - } + if (tx.pessimistic() && tx.local()) + return; // Nothing else to do in pessimistic mode. // Optimistic. assert tx.optimistic() || !tx.local(); @@ -945,40 +735,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * @param tx Transaction to check. - * @return {@code True} if transaction can be discarded. - */ - private boolean isSafeToForget(IgniteInternalTx tx) { - Map.Entry<GridCacheVersion, AtomicInt> e = startVerCnts.firstEntry(); - - if (e == null) - return true; - - assert e.getValue().get() >= 0; - - return tx.endVersion().compareTo(e.getKey()) <= 0; - } - - /** - * Decrement start version count. - * - * @param tx Cache transaction. - */ - private void decrementStartVersionCount(IgniteInternalTx tx) { - AtomicInt cnt = startVerCnts.get(tx.startVersion()); - - assert cnt != null : "Failed to find start version count for transaction [startVerCnts=" + startVerCnts + - ", tx=" + tx + ']'; - - assert cnt.get() > 0; - - if (cnt.decrementAndGet() == 0) - if (startVerCnts.remove(tx.startVersion(), cnt)) - if (log.isDebugEnabled()) - log.debug("Removed start version for transaction: " + tx); - } - - /** * @param tx Transaction. */ private void removeObsolete(IgniteInternalTx tx) { @@ -1303,25 +1059,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 8. 
Assign transaction number at the end of transaction. tx.endVersion(cctx.versions().next(tx.topologyVersion())); - // 9. Clean start transaction number for this transaction. - if (cctx.txConfig().isTxSerializableEnabled()) - decrementStartVersionCount(tx); - - // 10. Add to committed queue only if it is possible - // that this transaction can affect other ones. - if (cctx.txConfig().isTxSerializableEnabled() && tx.enforceSerializable() && !isSafeToForget(tx)) - committedQ.add(tx); - - // 11. Remove from per-thread storage. + // 9. Remove from per-thread storage. clearThreadMap(tx); - // 12. Unregister explicit locks. + // 10. Unregister explicit locks. if (!tx.alternateVersions().isEmpty()) { for (GridCacheVersion ver : tx.alternateVersions()) idMap.remove(ver); } - // 13. Remove Near-2-DHT mappings. + // 11. Remove Near-2-DHT mappings. if (tx instanceof GridCacheMappedVersion) { GridCacheVersion mapped = ((GridCacheMappedVersion)tx).mappedVersion(); @@ -1329,10 +1076,10 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { mappedVers.remove(mapped); } - // 14. Clear context. + // 12. Clear context. resetContext(); - // 15. Update metrics. + // 14. Update metrics. if (!tx.dht() && tx.local()) { if (!tx.system()) cctx.txMetrics().onTxCommit(); @@ -1388,26 +1135,22 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 5. Remove obsolete entries. removeObsolete(tx); - // 6. Clean start transaction number for this transaction. - if (cctx.txConfig().isTxSerializableEnabled()) - decrementStartVersionCount(tx); - - // 7. Remove from per-thread storage. + // 6. Remove from per-thread storage. clearThreadMap(tx); - // 8. Unregister explicit locks. + // 7. Unregister explicit locks. if (!tx.alternateVersions().isEmpty()) for (GridCacheVersion ver : tx.alternateVersions()) idMap.remove(ver); - // 9. Remove Near-2-DHT mappings. + // 8. Remove Near-2-DHT mappings. 
if (tx instanceof GridCacheMappedVersion) mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion()); - // 10. Clear context. + // 9. Clear context. resetContext(); - // 11. Update metrics. + // 10. Update metrics. if (!tx.dht() && tx.local()) { if (!tx.system()) cctx.txMetrics().onTxRollback(); @@ -1452,23 +1195,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // 3. Notify evictions. notifyEvitions(tx); - // 4. Clean start transaction number for this transaction. - if (cctx.txConfig().isTxSerializableEnabled()) - decrementStartVersionCount(tx); - - // 5. Remove from per-thread storage. + // 4. Remove from per-thread storage. clearThreadMap(tx); - // 6. Unregister explicit locks. + // 5. Unregister explicit locks. if (!tx.alternateVersions().isEmpty()) for (GridCacheVersion ver : tx.alternateVersions()) idMap.remove(ver); - // 7. Remove Near-2-DHT mappings. + // 6. Remove Near-2-DHT mappings. if (tx instanceof GridCacheMappedVersion) mappedVers.remove(((GridCacheMappedVersion)tx).mappedVersion()); - // 8. Clear context. + // 7. Clear context. 
resetContext(); if (log.isDebugEnabled()) @@ -1630,7 +1369,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { long timeout; - if (tx.isolation() != TransactionIsolation.SERIALIZABLE_TRY_LOCK) { + boolean ser = tx.optimistic() && tx.serializable(); + + if (!ser) { long remainingTime = U.currentTimeMillis() - (tx.startTime() + tx.timeout()); // For serializable transactions, failure to acquire lock means http://git-wip-us.apache.org/repos/asf/ignite/blob/820c0ecd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java index 36f1c36..7a4be0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java @@ -53,9 +53,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { /** Last version. */ private volatile GridCacheVersion last; - /** Serializable transaction flag. */ - private boolean txSerEnabled; - /** Data center ID. 
*/ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") private byte dataCenterId; @@ -79,8 +76,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { /** {@inheritDoc} */ @Override public void start0() throws IgniteCheckedException { - txSerEnabled = cctx.gridConfig().getTransactionConfiguration().isTxSerializableEnabled(); - last = new GridCacheVersion(0, 0, order.get(), 0, dataCenterId); cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_METRICS_UPDATED); @@ -235,36 +230,18 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { int locNodeOrder = (int)cctx.localNode().order(); - if (txSerEnabled) { - synchronized (this) { - long ord = forLoad ? loadOrder.incrementAndGet() : order.incrementAndGet(); - - GridCacheVersion next = new GridCacheVersion( - (int)topVer, - globalTime, - ord, - locNodeOrder, - dataCenterId); - - last = next; - - return next; - } - } - else { - long ord = forLoad ? loadOrder.incrementAndGet() : order.incrementAndGet(); + long ord = forLoad ? loadOrder.incrementAndGet() : order.incrementAndGet(); - GridCacheVersion next = new GridCacheVersion( - (int)topVer, - globalTime, - ord, - locNodeOrder, - dataCenterId); + GridCacheVersion next = new GridCacheVersion( + (int)topVer, + globalTime, + ord, + locNodeOrder, + dataCenterId); - last = next; + last = next; - return next; - } + return next; } /** @@ -273,12 +250,6 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter { * @return Last generated version. 
*/ public GridCacheVersion last() { - if (txSerEnabled) { - synchronized (this) { - return last; - } - } - else - return last; + return last; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/820c0ecd/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java index a6e960d..6c4e894 100644 --- a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java +++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java @@ -54,7 +54,7 @@ import org.apache.ignite.lang.IgniteUuid; * Read access with this level happens the same way as with {@link TransactionIsolation#REPEATABLE_READ} level. * However, in {@link TransactionConcurrency#OPTIMISTIC} mode, if some transactions cannot be serially isolated * from each other, then one winner will be picked and the other transactions in conflict will result in - * {@link org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException} being thrown. + * {@link TransactionOptimisticException} being thrown. 
* </li> * </ul> * <p> http://git-wip-us.apache.org/repos/asf/ignite/blob/820c0ecd/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java index c43396c..c3be3c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java +++ b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionIsolation.java @@ -31,10 +31,7 @@ public enum TransactionIsolation { REPEATABLE_READ, /** Serializable isolation level. */ - SERIALIZABLE, - - /** TODO IGNITE-1607 */ - SERIALIZABLE_TRY_LOCK; + SERIALIZABLE; /** Enum values. */ private static final TransactionIsolation[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/820c0ecd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeadlockFreeTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeadlockFreeTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeadlockFreeTxTest.java deleted file mode 100644 index f156978..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheDeadlockFreeTxTest.java +++ /dev/null @@ -1,484 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import javax.cache.Cache; -import javax.cache.configuration.Factory; -import javax.cache.integration.CacheLoaderException; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteTransactions; -import org.apache.ignite.cache.CacheMode; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.cache.store.CacheStore; -import org.apache.ignite.cache.store.CacheStoreAdapter; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.NearCacheConfiguration; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; -import 
org.apache.ignite.transactions.Transaction; -import org.apache.ignite.transactions.TransactionOptimisticException; - -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMode.PARTITIONED; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; -import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; -import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE_TRY_LOCK; - -/** - * - */ -public class CacheDeadlockFreeTxTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final int SRVS = 3; - - /** */ - private static final int CLIENTS = 3; - - /** */ - private boolean client; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); - - cfg.setClientMode(client); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - startGridsMultiThreaded(SRVS); - - client = true; - - startGridsMultiThreaded(SRVS, CLIENTS); - - client = false; - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); - - stopAllGrids(); - } - - /** - * @throws Exception If failed. 
- */ - public void testTxRollbackIfLocked1() throws Exception { - Ignite ignite0 = ignite(0); - - final IgniteTransactions txs = ignite0.transactions(); - - final IgniteCache<Integer, Integer> cache = - ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true)); - - final Integer key = nearKey(cache); - - final CountDownLatch latch1 = new CountDownLatch(1); - final CountDownLatch latch2 = new CountDownLatch(1); - - IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - cache.put(key, 1); - - log.info("Locked key: " + key); - - latch1.countDown(); - - assertTrue(latch2.await(10, SECONDS)); - - tx.commit(); - } - - return null; - } - }, "lock-thread"); - - assertTrue(latch1.await(10, SECONDS)); - - try { - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) { - cache.put(key, 2); - - log.info("Commit"); - - tx.commit(); - } - - fail(); - } - catch (TransactionOptimisticException e) { - log.info("Expected exception: " + e); - } - - latch2.countDown(); - - fut.get(); - - assertEquals(1, (Object)cache.get(key)); - - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) { - cache.put(key, 2); - - tx.commit(); - } - - assertEquals(2, (Object)cache.get(key)); - } - - /** - * @throws Exception If failed. - */ - public void testTxRollbackIfLocked2() throws Exception { - rollbackIfLockedPartialLock(false); - } - - /** - * @throws Exception If failed. - */ - public void testTxRollbackIfLocked3() throws Exception { - rollbackIfLockedPartialLock(true); - } - - /** - * @param locKey If {@code true} gets lock for local key. - * @throws Exception If failed. 
- */ - public void rollbackIfLockedPartialLock(boolean locKey) throws Exception { - Ignite ignite0 = ignite(0); - - final IgniteTransactions txs = ignite0.transactions(); - - final IgniteCache<Integer, Integer> cache = - ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true)); - - final Integer key1 = primaryKey(ignite(1).cache(cache.getName())); - final Integer key2 = locKey ? primaryKey(cache) : primaryKey(ignite(2).cache(cache.getName())); - - final CountDownLatch latch1 = new CountDownLatch(1); - final CountDownLatch latch2 = new CountDownLatch(1); - - IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { - cache.put(key1, 1); - - log.info("Locked key: " + key1); - - latch1.countDown(); - - assertTrue(latch2.await(10, SECONDS)); - - log.info("Commit1"); - - tx.commit(); - } - - return null; - } - }, "lock-thread"); - - assertTrue(latch1.await(10, SECONDS)); - - try { - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) { - cache.put(key1, 2); - cache.put(key2, 2); - - log.info("Commit2"); - - tx.commit(); - } - - fail(); - } - catch (TransactionOptimisticException e) { - log.info("Expected exception: " + e); - } - - latch2.countDown(); - - fut.get(); - - assertEquals(1, (Object) cache.get(key1)); - assertNull(cache.get(key2)); - - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) { - cache.put(key1, 2); - cache.put(key2, 2); - - log.info("Commit3"); - - tx.commit(); - } - - assertEquals(2, (Object) cache.get(key2)); - assertEquals(2, (Object) cache.get(key2)); - } - - /** - * @throws Exception If failed. - */ - public void testConcurrentUpdateNoDeadlock() throws Exception { - concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, false); - } - - /** - * @throws Exception If failed. 
- */ - public void testConcurrentUpdateNoDeadlockNodeRestart() throws Exception { - concurrentUpdateNoDeadlock(Collections.singletonList(ignite(1)), 10, true); - } - - /** - * @throws Exception If failed. - */ - public void testConcurrentUpdateNoDeadlockClients() throws Exception { - concurrentUpdateNoDeadlock(clients(), 20, false); - } - - /** - * @throws Exception If failed. - */ - public void testConcurrentUpdateNoDeadlockClientsNodeRestart() throws Exception { - concurrentUpdateNoDeadlock(clients(), 20, true); - } - - /** - * @return Client nodes. - */ - private List<Ignite> clients() { - List<Ignite> clients = new ArrayList<>(); - - for (int i = 0; i < CLIENTS; i++) { - Ignite ignite = ignite(SRVS + i); - - assertTrue(ignite.configuration().isClientMode()); - - clients.add(ignite); - } - - return clients; - } - - /** - * @param updateNodes Nodes executing updates. - * @param threads Number of threads executing updates. - * @param restart If {@code true} restarts one node. - * @throws Exception If failed. 
- */ - private void concurrentUpdateNoDeadlock(final List<Ignite> updateNodes, - int threads, - final boolean restart) throws Exception { - assert updateNodes.size() > 0; - - final Ignite ignite0 = ignite(0); - - final String cacheName = - ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName(); - - final int KEYS = 100; - - final AtomicBoolean finished = new AtomicBoolean(); - - IgniteInternalFuture<Object> fut = null; - - try { - if (restart) { - fut = GridTestUtils.runAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - while (!finished.get()) { - stopGrid(0); - - U.sleep(300); - - Ignite ignite = startGrid(0); - - assertFalse(ignite.configuration().isClientMode()); - } - - return null; - } - }); - } - - for (int i = 0; i < 10; i++) { - log.info("Iteration: " + i); - - final long stopTime = U.currentTimeMillis() + 5_000; - - final AtomicInteger idx = new AtomicInteger(); - - IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { - int nodeIdx = idx.getAndIncrement() % updateNodes.size(); - - Ignite node = updateNodes.get(nodeIdx); - - log.info("Tx thread: " + node.name()); - - final IgniteTransactions txs = node.transactions(); - - final IgniteCache<Integer, Integer> cache = node.cache(cacheName); - - assertNotNull(cache); - - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - while (U.currentTimeMillis() < stopTime) { - final Map<Integer, Integer> keys = new LinkedHashMap<>(); - - for (int i = 0; i < KEYS / 2; i++) - keys.put(rnd.nextInt(KEYS), rnd.nextInt()); - - try { - if (restart) { - doInTransaction(node, OPTIMISTIC, REPEATABLE_READ, new Callable<Void>() { - @Override public Void call() throws Exception { - cache.putAll(keys); - - return null; - } - }); - } else { - try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE_TRY_LOCK)) { - cache.putAll(keys); - - tx.commit(); - } - } - } catch 
(TransactionOptimisticException ignore) { - // No-op. - } catch (Throwable e) { - log.error("Unexpected error: " + e, e); - - throw e; - } - } - - return null; - } - }, threads, "tx-thread"); - - updateFut.get(60, SECONDS); - - IgniteCache<Integer, Integer> cache = ignite(1).cache(cacheName); - - for (int key = 0; key < KEYS; key++) { - Integer val = cache.get(key); - - for (int node = 1; node < SRVS + CLIENTS; node++) - assertEquals(val, ignite(node).cache(cache.getName()).get(key)); - } - } - - finished.set(true); - - if (fut != null) - fut.get(); - } - finally { - finished.set(true); - } - } - - /** - * @param cacheMode Cache mode. - * @param syncMode Write synchronization mode. - * @param backups Number of backups. - * @param storeEnabled If {@code true} adds cache store. - * @param nearCache If {@code true} near cache is enabled. - * @return Cache configuration. - */ - private CacheConfiguration<Integer, Integer> cacheConfiguration( - CacheMode cacheMode, - CacheWriteSynchronizationMode syncMode, - int backups, - boolean storeEnabled, - boolean nearCache) { - CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); - - ccfg.setCacheMode(cacheMode); - ccfg.setAtomicityMode(TRANSACTIONAL); - ccfg.setBackups(backups); - ccfg.setWriteSynchronizationMode(syncMode); - - if (storeEnabled) { - ccfg.setCacheStoreFactory(new TestStoreFactory()); - ccfg.setWriteThrough(true); - } - - if (nearCache) - ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>()); - - return ccfg; - } - - /** - * - */ - private static class TestStoreFactory implements Factory<CacheStore<Integer, Integer>> { - /** {@inheritDoc} */ - @Override public CacheStore<Integer, Integer> create() { - return new CacheStoreAdapter<Integer, Integer>() { - @Override public Integer load(Integer key) throws CacheLoaderException { - return null; - } - - @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) { - // No-op. 
- } - - @Override public void delete(Object key) { - // No-op. - } - }; - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/820c0ecd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTxTest.java new file mode 100644 index 0000000..217e362 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTxTest.java @@ -0,0 +1,687 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.integration.CacheLoaderException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionOptimisticException; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static 
org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + +/** + * + */ +public class CacheSerializableTxTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int SRVS = 4; + + /** */ + private static final int CLIENTS = 3; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(SRVS); + + client = true; + + startGridsMultiThreaded(SRVS, CLIENTS); + + client = false; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testTxRollbackRead1() throws Exception { + txRollbackRead(true); + } + + /** + * @throws Exception If failed. + */ + public void testTxRollbackRead2() throws Exception { + txRollbackRead(false); + } + + /** + * @param noVal If {@code true} there is no cache value when read in tx. + * @throws Exception If failed. 
+ */ + private void txRollbackRead(boolean noVal) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + List<Integer> keys = new ArrayList<>(); + + keys.add(nearKey(cache)); + + for (Integer key : keys) { + log.info("Test key: " + key); + + Integer expVal = null; + + if (!noVal) { + expVal = -1; + + cache.put(key, expVal); + } + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertEquals(expVal, val); + + updateKey(cache, key, 1); + + log.info("Commit"); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + assertEquals(1, (Object) cache.get(key)); + } + } + finally { + ignite0.destroyCache(ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxRollbackReadWrite() throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + final IgniteCache<Integer, Integer> cache = + ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); + + final Integer key = nearKey(cache); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + Integer val = cache.get(key); + + assertNull(val); + + updateKey(cache, key, 1); + + cache.put(key, 2); + + log.info("Commit"); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + assertEquals(1, (Object)cache.get(key)); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, 2); + + tx.commit(); + } + + assertEquals(2, (Object) cache.get(key)); + } + + /** + * @throws Exception If failed. 
+ */ + public void testTxRollbackIfLocked1() throws Exception { + Ignite ignite0 = ignite(0); + + IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + final Integer key = nearKey(cache); + + CountDownLatch latch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut = lockKey(latch, cache, key); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, 2); + + log.info("Commit"); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + latch.countDown(); + + fut.get(); + + assertEquals(1, (Object)cache.get(key)); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key, 2); + + tx.commit(); + } + + assertEquals(2, (Object)cache.get(key)); + } + finally { + ignite0.destroyCache(ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testTxRollbackIfLocked2() throws Exception { + rollbackIfLockedPartialLock(false); + } + + /** + * @throws Exception If failed. + */ + public void testTxRollbackIfLocked3() throws Exception { + rollbackIfLockedPartialLock(true); + } + + /** + * @param locKey If {@code true} gets lock for local key. + * @throws Exception If failed. + */ + public void rollbackIfLockedPartialLock(boolean locKey) throws Exception { + Ignite ignite0 = ignite(0); + + final IgniteTransactions txs = ignite0.transactions(); + + for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { + logCacheInfo(ccfg); + + try { + IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg); + + final Integer key1 = primaryKey(ignite(1).cache(cache.getName())); + final Integer key2 = locKey ? 
primaryKey(cache) : primaryKey(ignite(2).cache(cache.getName())); + + CountDownLatch latch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut = lockKey(latch, cache, key1); + + try { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key1, 2); + cache.put(key2, 2); + + log.info("Commit2"); + + tx.commit(); + } + + fail(); + } + catch (TransactionOptimisticException e) { + log.info("Expected exception: " + e); + } + + latch.countDown(); + + fut.get(); + + assertEquals(1, (Object) cache.get(key1)); + assertNull(cache.get(key2)); + + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.put(key1, 2); + cache.put(key2, 2); + + log.info("Commit3"); + + tx.commit(); + } + + assertEquals(2, (Object) cache.get(key2)); + assertEquals(2, (Object) cache.get(key2)); + } + finally { + ignite0.destroyCache(ccfg.getName()); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdateNoDeadlock() throws Exception { + concurrentUpdateNoDeadlock(Collections.singletonList(ignite(0)), 10, false); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdateNoDeadlockNodeRestart() throws Exception { + concurrentUpdateNoDeadlock(Collections.singletonList(ignite(1)), 10, true); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdateNoDeadlockClients() throws Exception { + concurrentUpdateNoDeadlock(clients(), 20, false); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentUpdateNoDeadlockClientsNodeRestart() throws Exception { + concurrentUpdateNoDeadlock(clients(), 20, true); + } + + /** + * @return Client nodes. + */ + private List<Ignite> clients() { + List<Ignite> clients = new ArrayList<>(); + + for (int i = 0; i < CLIENTS; i++) { + Ignite ignite = ignite(SRVS + i); + + assertTrue(ignite.configuration().isClientMode()); + + clients.add(ignite); + } + + return clients; + } + + /** + * @param updateNodes Nodes executing updates. 
+ * @param threads Number of threads executing updates. + * @param restart If {@code true} restarts one node. + * @throws Exception If failed. + */ + private void concurrentUpdateNoDeadlock(final List<Ignite> updateNodes, + int threads, + final boolean restart) throws Exception { + assert updateNodes.size() > 0; + + final Ignite ignite0 = ignite(0); + + final String cacheName = + ignite0.createCache(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)).getName(); + + final int KEYS = 100; + + final AtomicBoolean finished = new AtomicBoolean(); + + IgniteInternalFuture<Object> fut = null; + + try { + if (restart) { + fut = GridTestUtils.runAsync(new Callable<Object>() { + @Override public Object call() throws Exception { + while (!finished.get()) { + stopGrid(0); + + U.sleep(300); + + Ignite ignite = startGrid(0); + + assertFalse(ignite.configuration().isClientMode()); + } + + return null; + } + }); + } + + for (int i = 0; i < 10; i++) { + log.info("Iteration: " + i); + + final long stopTime = U.currentTimeMillis() + 5_000; + + final AtomicInteger idx = new AtomicInteger(); + + IgniteInternalFuture<?> updateFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + int nodeIdx = idx.getAndIncrement() % updateNodes.size(); + + Ignite node = updateNodes.get(nodeIdx); + + log.info("Tx thread: " + node.name()); + + final IgniteTransactions txs = node.transactions(); + + final IgniteCache<Integer, Integer> cache = node.cache(cacheName); + + assertNotNull(cache); + + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + while (U.currentTimeMillis() < stopTime) { + final Map<Integer, Integer> keys = new LinkedHashMap<>(); + + for (int i = 0; i < KEYS / 2; i++) + keys.put(rnd.nextInt(KEYS), rnd.nextInt()); + + try { + if (restart) { + doInTransaction(node, OPTIMISTIC, REPEATABLE_READ, new Callable<Void>() { + @Override public Void call() throws Exception { + cache.putAll(keys); + + return null; + } + }); + 
} + else { + try (Transaction tx = txs.txStart(OPTIMISTIC, SERIALIZABLE)) { + cache.putAll(keys); + + tx.commit(); + } + } + } + catch (TransactionOptimisticException ignore) { + // No-op. + } + catch (Throwable e) { + log.error("Unexpected error: " + e, e); + + throw e; + } + } + + return null; + } + }, threads, "tx-thread"); + + updateFut.get(60, SECONDS); + + IgniteCache<Integer, Integer> cache = ignite(1).cache(cacheName); + + for (int key = 0; key < KEYS; key++) { + Integer val = cache.get(key); + + for (int node = 1; node < SRVS + CLIENTS; node++) + assertEquals(val, ignite(node).cache(cache.getName()).get(key)); + } + } + + finished.set(true); + + if (fut != null) + fut.get(); + } + finally { + finished.set(true); + } + } + + /** + * @return Cache configurations. + */ + private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() { + List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>(); + + // No store, no near. + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, false)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, false)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, false)); + + // Store, no near. + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, false)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, false)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, false)); + + // No store, near. + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, false, true)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, false, true)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, false, true)); + + // Store, near. + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 0, true, true)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 1, true, true)); + ccfgs.add(cacheConfiguration(PARTITIONED, FULL_SYNC, 2, true, true)); + + return ccfgs; + } + + /** + * @param ccfg Cache configuration. 
+ */ + private void logCacheInfo(CacheConfiguration<?, ?> ccfg) { + log.info("Test cache [mode=" + ccfg.getCacheMode() + + ", sync=" + ccfg.getWriteSynchronizationMode() + + ", backups=" + ccfg.getBackups() + + ", near=" + (ccfg.getNearConfiguration() != null) + + ", store=" + ccfg.isWriteThrough() + ']'); + } + + /** + * @param cache Cache. + * @param key Key. + * @param val Value. + * @throws Exception If failed. + */ + private void updateKey( + final IgniteCache<Integer, Integer> cache, + final Integer key, + final Integer val) throws Exception { + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, val); + + tx.commit(); + } + + return null; + } + }, "update-thread"); + + fut.get(); + } + + /** + * @param releaseLatch Release lock latch. + * @param cache Cache. + * @param key Key. + * @return Future. + * @throws Exception If failed. + */ + private IgniteInternalFuture<?> lockKey( + final CountDownLatch releaseLatch, + final IgniteCache<Integer, Integer> cache, + final Integer key) throws Exception { + final CountDownLatch lockLatch = new CountDownLatch(1); + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + IgniteTransactions txs = cache.unwrap(Ignite.class).transactions(); + + try (Transaction tx = txs.txStart(PESSIMISTIC, REPEATABLE_READ)) { + cache.put(key, 1); + + log.info("Locked key: " + key); + + lockLatch.countDown(); + + assertTrue(releaseLatch.await(100000, SECONDS)); + + log.info("Commit tx: " + key); + + tx.commit(); + } + + return null; + } + }, "lock-thread"); + + assertTrue(lockLatch.await(10, SECONDS)); + + return fut; + } + + /** + * @param cacheMode Cache mode. + * @param syncMode Write synchronization mode. 
+ * @param backups Number of backups. + * @param storeEnabled If {@code true} adds cache store. + * @param nearCache If {@code true} near cache is enabled. + * @return Cache configuration. + */ + private CacheConfiguration<Integer, Integer> cacheConfiguration( + CacheMode cacheMode, + CacheWriteSynchronizationMode syncMode, + int backups, + boolean storeEnabled, + boolean nearCache) { + CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>(); + + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setBackups(backups); + ccfg.setWriteSynchronizationMode(syncMode); + + if (storeEnabled) { + ccfg.setCacheStoreFactory(new TestStoreFactory()); + ccfg.setWriteThrough(true); + } + + if (nearCache) + ccfg.setNearConfiguration(new NearCacheConfiguration<Integer, Integer>()); + + return ccfg; + } + + /** + * + */ + private static class TestStoreFactory implements Factory<CacheStore<Integer, Integer>> { + /** {@inheritDoc} */ + @Override public CacheStore<Integer, Integer> create() { + return new CacheStoreAdapter<Integer, Integer>() { + @Override public Integer load(Integer key) throws CacheLoaderException { + return null; + } + + @Override public void write(Cache.Entry<? extends Integer, ? extends Integer> entry) { + // No-op. + } + + @Override public void delete(Object key) { + // No-op. + } + }; + } + } +}
