http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 775b61c..7e04292 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -558,7 +558,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig /** {@inheritDoc} */ @SuppressWarnings({"CatchGenericClass"}) - @Override public final void userCommit() throws IgniteCheckedException { + @Override public void userCommit() throws IgniteCheckedException { TransactionState state = state(); if (state != COMMITTING) { @@ -590,7 +590,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig WALPointer ptr = null; - Exception err = null; + IgniteCheckedException err = null; cctx.database().checkpointReadLock(); @@ -609,176 +609,175 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig UUID nodeId = txEntry.nodeId() == null ? this.nodeId : txEntry.nodeId(); - try { - while (true) { - try { - GridCacheEntryEx cached = txEntry.cached(); + while (true) { + try { + GridCacheEntryEx cached = txEntry.cached(); - // Must try to evict near entries before committing from - // transaction manager to make sure locks are held. - if (!evictNearEntry(txEntry, false)) { - if (cacheCtx.isNear() && cacheCtx.dr().receiveEnabled()) { - cached.markObsolete(xidVer); + // Must try to evict near entries before committing from + // transaction manager to make sure locks are held. + if (!evictNearEntry(txEntry, false)) { + if (cacheCtx.isNear() && cacheCtx.dr().receiveEnabled()) { + cached.markObsolete(xidVer); - break; - } + break; + } - if (cached.detached()) - break; + if (cached.detached()) + break; - boolean updateNearCache = updateNearCache(cacheCtx, txEntry.key(), topVer); + boolean updateNearCache = updateNearCache(cacheCtx, txEntry.key(), topVer); - boolean metrics = true; + boolean metrics = true; - if (!updateNearCache && cacheCtx.isNear() && txEntry.locallyMapped()) - metrics = false; + if (!updateNearCache && cacheCtx.isNear() && txEntry.locallyMapped()) + metrics = false; - boolean evt = !isNearLocallyMapped(txEntry, false); + boolean evt = !isNearLocallyMapped(txEntry, false); - if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters())) - txEntry.cached().unswap(false); + if (!F.isEmpty(txEntry.entryProcessors()) || !F.isEmpty(txEntry.filters())) + txEntry.cached().unswap(false); - IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry, - true, null); + IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(txEntry, + true, null); - GridCacheVersion dhtVer = null; + GridCacheVersion dhtVer = null; - // For near local transactions we must record DHT version - // in order to keep near entries on backup nodes until - // backup remote transaction completes. - if (cacheCtx.isNear()) { - if (txEntry.op() == CREATE || txEntry.op() == UPDATE || - txEntry.op() == DELETE || txEntry.op() == TRANSFORM) - dhtVer = txEntry.dhtVersion(); + // For near local transactions we must record DHT version + // in order to keep near entries on backup nodes until + // backup remote transaction completes. + if (cacheCtx.isNear()) { + if (txEntry.op() == CREATE || txEntry.op() == UPDATE || + txEntry.op() == DELETE || txEntry.op() == TRANSFORM) + dhtVer = txEntry.dhtVersion(); - if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) && - txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) { - ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry); + if ((txEntry.op() == CREATE || txEntry.op() == UPDATE) && + txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) { + ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry); - if (expiry != null) { - txEntry.cached().unswap(false); + if (expiry != null) { + txEntry.cached().unswap(false); - Duration duration = cached.hasValue() ? - expiry.getExpiryForUpdate() : expiry.getExpiryForCreation(); + Duration duration = cached.hasValue() ? + expiry.getExpiryForUpdate() : expiry.getExpiryForCreation(); - txEntry.ttl(CU.toTtl(duration)); - } + txEntry.ttl(CU.toTtl(duration)); } } + } - GridCacheOperation op = res.get1(); - CacheObject val = res.get2(); + GridCacheOperation op = res.get1(); + CacheObject val = res.get2(); - // Deal with conflicts. - GridCacheVersion explicitVer = txEntry.conflictVersion() != null ? - txEntry.conflictVersion() : writeVersion(); + // Deal with conflicts. + GridCacheVersion explicitVer = txEntry.conflictVersion() != null ? + txEntry.conflictVersion() : writeVersion(); - if ((op == CREATE || op == UPDATE) && - txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) { - ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry); + if ((op == CREATE || op == UPDATE) && + txEntry.conflictExpireTime() == CU.EXPIRE_TIME_CALCULATE) { + ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry); - if (expiry != null) { - Duration duration = cached.hasValue() ? - expiry.getExpiryForUpdate() : expiry.getExpiryForCreation(); + if (expiry != null) { + Duration duration = cached.hasValue() ? + expiry.getExpiryForUpdate() : expiry.getExpiryForCreation(); - long ttl = CU.toTtl(duration); + long ttl = CU.toTtl(duration); - txEntry.ttl(ttl); + txEntry.ttl(ttl); - if (ttl == CU.TTL_ZERO) - op = DELETE; - } + if (ttl == CU.TTL_ZERO) + op = DELETE; } + } - boolean conflictNeedResolve = cacheCtx.conflictNeedResolve(); - - GridCacheVersionConflictContext<?, ?> conflictCtx = null; - - if (conflictNeedResolve) { - IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext> conflictRes = - conflictResolve(op, txEntry, val, explicitVer, cached); + boolean conflictNeedResolve = cacheCtx.conflictNeedResolve(); - assert conflictRes != null; + GridCacheVersionConflictContext<?, ?> conflictCtx = null; - conflictCtx = conflictRes.get2(); + if (conflictNeedResolve) { + IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext> conflictRes = + conflictResolve(op, txEntry, val, explicitVer, cached); - if (conflictCtx.isUseOld()) - op = NOOP; - else if (conflictCtx.isUseNew()) { - txEntry.ttl(conflictCtx.ttl()); - txEntry.conflictExpireTime(conflictCtx.expireTime()); - } - else { - assert conflictCtx.isMerge(); + assert conflictRes != null; - op = conflictRes.get1(); - val = txEntry.context().toCacheObject(conflictCtx.mergeValue()); - explicitVer = writeVersion(); + conflictCtx = conflictRes.get2(); - txEntry.ttl(conflictCtx.ttl()); - txEntry.conflictExpireTime(conflictCtx.expireTime()); - } + if (conflictCtx.isUseOld()) + op = NOOP; + else if (conflictCtx.isUseNew()) { + txEntry.ttl(conflictCtx.ttl()); + txEntry.conflictExpireTime(conflictCtx.expireTime()); } - else - // Nullify explicit version so that innerSet/innerRemove will work as usual. - explicitVer = null; + else { + assert conflictCtx.isMerge(); - if (sndTransformedVals || conflictNeedResolve) { - assert sndTransformedVals && cacheCtx.isReplicated() || conflictNeedResolve; + op = conflictRes.get1(); + val = txEntry.context().toCacheObject(conflictCtx.mergeValue()); + explicitVer = writeVersion(); - txEntry.value(val, true, false); - txEntry.op(op); - txEntry.entryProcessors(null); - txEntry.conflictVersion(explicitVer); + txEntry.ttl(conflictCtx.ttl()); + txEntry.conflictExpireTime(conflictCtx.expireTime()); } + } + else + // Nullify explicit version so that innerSet/innerRemove will work as usual. + explicitVer = null; - if (dhtVer == null) - dhtVer = explicitVer != null ? explicitVer : writeVersion(); + if (sndTransformedVals || conflictNeedResolve) { + assert sndTransformedVals && cacheCtx.isReplicated() || conflictNeedResolve; - if (op == CREATE || op == UPDATE) { - assert val != null : txEntry; + txEntry.value(val, true, false); + txEntry.op(op); + txEntry.entryProcessors(null); + txEntry.conflictVersion(explicitVer); + } - GridCacheUpdateTxResult updRes = cached.innerSet( - this, - eventNodeId(), - txEntry.nodeId(), - val, - false, - false, - txEntry.ttl(), - evt, - metrics, - txEntry.keepBinary(), - txEntry.hasOldValue(), - txEntry.oldValue(), - topVer, - null, - cached.detached() ? DR_NONE : drType, - txEntry.conflictExpireTime(), - cached.isNear() ? null : explicitVer, - CU.subjectId(this, cctx), - resolveTaskName(), - dhtVer, - null, - mvccSnapshot()); - - if (updRes.success()) { - txEntry.updateCounter(updRes.updateCounter()); - - GridLongList waitTxs = updRes.mvccWaitTransactions(); - - updateWaitTxs(waitTxs); - } + if (dhtVer == null) + dhtVer = explicitVer != null ? explicitVer : writeVersion(); + + if (op == CREATE || op == UPDATE) { + assert val != null : txEntry; + + GridCacheUpdateTxResult updRes = cached.innerSet( + this, + eventNodeId(), + txEntry.nodeId(), + val, + false, + false, + txEntry.ttl(), + evt, + metrics, + txEntry.keepBinary(), + txEntry.hasOldValue(), + txEntry.oldValue(), + topVer, + null, + cached.detached() ? DR_NONE : drType, + txEntry.conflictExpireTime(), + cached.isNear() ? null : explicitVer, + CU.subjectId(this, cctx), + resolveTaskName(), + dhtVer, + null, + mvccSnapshot()); + + if (updRes.success()) { + txEntry.updateCounter(updRes.updateCounter()); + + GridLongList waitTxs = updRes.mvccWaitTransactions(); + + updateWaitTxs(waitTxs); + } - if (updRes.loggedPointer() != null) - ptr = updRes.loggedPointer(); + if (updRes.loggedPointer() != null) + ptr = updRes.loggedPointer(); - if (updRes.success() && updateNearCache) { - final CacheObject val0 = val; - final boolean metrics0 = metrics; - final GridCacheVersion dhtVer0 = dhtVer; + if (updRes.success() && updateNearCache) { + final CacheObject val0 = val; + final boolean metrics0 = metrics; + final GridCacheVersion dhtVer0 = dhtVer; - updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerSet( + updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerSet( null, eventNodeId(), nodeId, @@ -801,46 +800,46 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig dhtVer0, null, mvccSnapshot()) - ); - } + ); + } + } + else if (op == DELETE) { + GridCacheUpdateTxResult updRes = cached.innerRemove( + this, + eventNodeId(), + txEntry.nodeId(), + false, + evt, + metrics, + txEntry.keepBinary(), + txEntry.hasOldValue(), + txEntry.oldValue(), + topVer, + null, + cached.detached() ? DR_NONE : drType, + cached.isNear() ? null : explicitVer, + CU.subjectId(this, cctx), + resolveTaskName(), + dhtVer, + null, + mvccSnapshot()); + + if (updRes.success()) { + txEntry.updateCounter(updRes.updateCounter()); + + GridLongList waitTxs = updRes.mvccWaitTransactions(); + + updateWaitTxs(waitTxs); } - else if (op == DELETE) { - GridCacheUpdateTxResult updRes = cached.innerRemove( - this, - eventNodeId(), - txEntry.nodeId(), - false, - evt, - metrics, - txEntry.keepBinary(), - txEntry.hasOldValue(), - txEntry.oldValue(), - topVer, - null, - cached.detached() ? DR_NONE : drType, - cached.isNear() ? null : explicitVer, - CU.subjectId(this, cctx), - resolveTaskName(), - dhtVer, - null, - mvccSnapshot()); - - if (updRes.success()) { - txEntry.updateCounter(updRes.updateCounter()); - - GridLongList waitTxs = updRes.mvccWaitTransactions(); - - updateWaitTxs(waitTxs); - } - if (updRes.loggedPointer() != null) - ptr = updRes.loggedPointer(); + if (updRes.loggedPointer() != null) + ptr = updRes.loggedPointer(); - if (updRes.success() && updateNearCache) { - final boolean metrics0 = metrics; - final GridCacheVersion dhtVer0 = dhtVer; + if (updRes.success() && updateNearCache) { + final boolean metrics0 = metrics; + final GridCacheVersion dhtVer0 = dhtVer; - updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerRemove( + updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerRemove( null, eventNodeId(), nodeId, @@ -859,125 +858,78 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig dhtVer0, null, mvccSnapshot()) - ); - } + ); } - else if (op == RELOAD) { - cached.innerReload(); + } + else if (op == RELOAD) { + cached.innerReload(); - if (updateNearCache) - updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerReload()); + if (updateNearCache) + updateNearEntrySafely(cacheCtx, txEntry.key(), entry -> entry.innerReload()); + } + else if (op == READ) { + CacheGroupContext grp = cacheCtx.group(); + + if (grp.persistenceEnabled() && grp.walEnabled() && + cctx.snapshot().needTxReadLogging()) { + ptr = cctx.wal().log(new DataRecord(new DataEntry( + cacheCtx.cacheId(), + txEntry.key(), + val, + op, + nearXidVersion(), + writeVersion(), + 0, + txEntry.key().partition(), + txEntry.updateCounter()))); } - else if (op == READ) { - CacheGroupContext grp = cacheCtx.group(); - - if (grp.persistenceEnabled() && grp.walEnabled() && - cctx.snapshot().needTxReadLogging()) { - ptr = cctx.wal().log(new DataRecord(new DataEntry( - cacheCtx.cacheId(), - txEntry.key(), - val, - op, - nearXidVersion(), - writeVersion(), - 0, - txEntry.key().partition(), - txEntry.updateCounter()))); - } - ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry); + ExpiryPolicy expiry = cacheCtx.expiryForTxEntry(txEntry); - if (expiry != null) { - Duration duration = expiry.getExpiryForAccess(); + if (expiry != null) { + Duration duration = expiry.getExpiryForAccess(); - if (duration != null) - cached.updateTtl(null, CU.toTtl(duration)); - } - - if (log.isDebugEnabled()) - log.debug("Ignoring READ entry when committing: " + txEntry); + if (duration != null) + cached.updateTtl(null, CU.toTtl(duration)); } - else { - assert ownsLock(txEntry.cached()) : - "Transaction does not own lock for group lock entry during commit [tx=" + - this + ", txEntry=" + txEntry + ']'; - if (conflictCtx == null || !conflictCtx.isUseOld()) { - if (txEntry.ttl() != CU.TTL_NOT_CHANGED) - cached.updateTtl(null, txEntry.ttl()); - } - - if (log.isDebugEnabled()) - log.debug("Ignoring NOOP entry when committing: " + txEntry); - } + if (log.isDebugEnabled()) + log.debug("Ignoring READ entry when committing: " + txEntry); } + else { + assert ownsLock(txEntry.cached()) : + "Transaction does not own lock for group lock entry during commit [tx=" + + this + ", txEntry=" + txEntry + ']'; + + if (conflictCtx == null || !conflictCtx.isUseOld()) { + if (txEntry.ttl() != CU.TTL_NOT_CHANGED) + cached.updateTtl(null, txEntry.ttl()); + } - // Check commit locks after set, to make sure that - // we are not changing obsolete entries. - // (innerSet and innerRemove will throw an exception - // if an entry is obsolete). - if (txEntry.op() != READ) - checkCommitLocks(cached); - - // Break out of while loop. - break; - } - // If entry cached within transaction got removed. - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry during transaction commit (will retry): " + txEntry); - - txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topologyVersion())); + if (log.isDebugEnabled()) + log.debug("Ignoring NOOP entry when committing: " + txEntry); + } } - } - } - catch (Throwable ex) { - // We are about to initiate transaction rollback when tx has started to committing. - // Need to remove version from committed list. - cctx.tm().removeCommittedTx(this); - boolean isNodeStopping = X.hasCause(ex, NodeStoppingException.class); - boolean hasInvalidEnvironmentIssue = X.hasCause(ex, InvalidEnvironmentException.class); + // Check commit locks after set, to make sure that + // we are not changing obsolete entries. + // (innerSet and innerRemove will throw an exception + // if an entry is obsolete). + if (txEntry.op() != READ) + checkCommitLocks(cached); - IgniteCheckedException err0 = new IgniteTxHeuristicCheckedException("Failed to locally write to cache " + - "(all transaction entries will be invalidated, however there was a window when " + - "entries for this transaction were visible to others): " + this, ex); - - if (isNodeStopping) { - U.warn(log, "Failed to commit transaction, node is stopping [tx=" + this + - ", err=" + ex + ']'); - } - else if (hasInvalidEnvironmentIssue) { - U.warn(log, "Failed to commit transaction, node is in invalid state and will be stopped [tx=" + this + - ", err=" + ex + ']'); + // Break out of while loop. + break; } - else - U.error(log, "Commit failed.", err0); - - COMMIT_ERR_UPD.compareAndSet(this, null, err0); + // If entry cached within transaction got removed. + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry during transaction commit (will retry): " + txEntry); - state(UNKNOWN); - - if (hasInvalidEnvironmentIssue) - cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, ex)); - else if (!isNodeStopping) { // Skip fair uncommit in case of node stopping or invalidation. - try { - // Courtesy to minimize damage. - uncommit(); - } - catch (Throwable ex1) { - U.error(log, "Failed to uncommit transaction: " + this, ex1); - - if (ex1 instanceof Error) - throw ex1; - } + txEntry.cached(entryEx(cacheCtx, txEntry.txKey(), topologyVersion())); } - - if (ex instanceof Error) - throw ex; - - throw err0; } + } // Apply cache sizes only for primary nodes. Update counters were applied on prepare state. @@ -988,11 +940,32 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig if (ptr != null && !cctx.tm().logTxRecords()) cctx.wal().flush(ptr, false); } - catch (StorageException e) { - err = e; + catch (Throwable ex) { + // We are about to initiate transaction rollback when tx has started to committing. + // Need to remove version from committed list. + cctx.tm().removeCommittedTx(this); + + if (X.hasCause(ex, NodeStoppingException.class)) { + U.warn(log, "Failed to commit transaction, node is stopping [tx=" + CU.txString(this) + + ", err=" + ex + ']'); + + return; + } + + err = heuristicException(ex); + + COMMIT_ERR_UPD.compareAndSet(this, null, err); + + state(UNKNOWN); + + try { + uncommit(); + } + catch (Throwable e) { + err.addSuppressed(e); + } - throw new IgniteCheckedException("Failed to log transaction record " + - "(transaction will be rolled back): " + this, e); + throw err; } finally { cctx.database().checkpointReadUnlock();
http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java index f31f0e9..ceeb4e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java @@ -24,6 +24,7 @@ import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.FailureHandler; +import org.apache.ignite.failure.NoOpFailureHandler; import org.apache.ignite.failure.StopNodeOrHaltFailureHandler; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.GridProcessorAdapter; @@ -79,6 +80,13 @@ public class FailureProcessor extends GridProcessorAdapter { } /** + * @return @{code True} if a node will be stopped by current handler in near time. + */ + public boolean nodeStopping() { + return failureCtx != null && !(hnd instanceof NoOpFailureHandler); + } + + /** * This method is used to initialize local failure handler if {@link IgniteConfiguration} don't contain configured one. * * @return Default {@link FailureHandler} implementation. http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java index a7e6e8c..f68ecd6 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java @@ -41,7 +41,6 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject; import org.apache.ignite.internal.util.IgniteExceptionRegistry; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java index facea69..e69aff8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractSelfTest.java @@ -256,7 +256,7 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest { cfg.setIndexedTypes(idxTypes); if (cacheMode() == PARTITIONED) - cfg.setBackups(1); + cfg.setBackups(backups()); return cfg; } @@ -362,6 +362,13 @@ public abstract class GridCacheAbstractSelfTest extends GridCommonAbstractTest { } /** + * @return Backups. + */ + protected int backups() { + return 1; + } + + /** * @param idx Index of grid. * @return Default cache. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java index 5f2e2ed..3e59c2a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQuerySelfTest.java @@ -26,12 +26,10 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.Callable; import javax.cache.Cache; -import junit.framework.TestCase; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.IgniteTransactions; -import org.apache.ignite.Ignition; import org.apache.ignite.binary.BinaryObject; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.query.QueryCursor; @@ -47,6 +45,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.spi.indexing.IndexingQueryFilter; import org.apache.ignite.spi.indexing.IndexingSpi; import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -57,25 +56,19 @@ import org.jetbrains.annotations.Nullable; /** * Indexing Spi query only test */ -public class IndexingSpiQuerySelfTest extends TestCase { - public static final String CACHE_NAME = "test-cache"; +public class IndexingSpiQuerySelfTest extends GridCommonAbstractTest { + private IndexingSpi indexingSpi; /** {@inheritDoc} */ - @Override public void tearDown() throws Exception { - Ignition.stopAll(true); - } - - /** - * @return Configuration. - */ - protected IgniteConfiguration configuration() { - IgniteConfiguration cfg = new IgniteConfiguration(); + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); TcpDiscoverySpi disco = new TcpDiscoverySpi(); disco.setIpFinder(ipFinder); + cfg.setIndexingSpi(indexingSpi); cfg.setDiscoverySpi(disco); return cfg; @@ -86,17 +79,22 @@ public class IndexingSpiQuerySelfTest extends TestCase { return new CacheConfiguration<>(cacheName); } + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + /** * @throws Exception If failed. */ public void testSimpleIndexingSpi() throws Exception { - IgniteConfiguration cfg = configuration(); - - cfg.setIndexingSpi(new MyIndexingSpi()); + indexingSpi = new MyIndexingSpi(); - Ignite ignite = Ignition.start(cfg); + Ignite ignite = startGrid(0); - CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME); + CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME); IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg); @@ -113,13 +111,11 @@ public class IndexingSpiQuerySelfTest extends TestCase { * @throws Exception If failed. */ public void testIndexingSpiWithDisabledQueryProcessor() throws Exception { - IgniteConfiguration cfg = configuration(); - - cfg.setIndexingSpi(new MyIndexingSpi()); + indexingSpi = new MyIndexingSpi(); - Ignite ignite = Ignition.start(cfg); + Ignite ignite = startGrid(0); - CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME); + CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME); IgniteCache<Integer, Integer> cache = ignite.createCache(ccfg); @@ -136,13 +132,11 @@ public class IndexingSpiQuerySelfTest extends TestCase { * @throws Exception If failed. */ public void testBinaryIndexingSpi() throws Exception { - IgniteConfiguration cfg = configuration(); + indexingSpi = new MyBinaryIndexingSpi(); - cfg.setIndexingSpi(new MyBinaryIndexingSpi()); + Ignite ignite = startGrid(0); - Ignite ignite = Ignition.start(cfg); - - CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(CACHE_NAME); + CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME); IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg); @@ -168,13 +162,11 @@ public class IndexingSpiQuerySelfTest extends TestCase { public void testNonBinaryIndexingSpi() throws Exception { System.setProperty(IgniteSystemProperties.IGNITE_UNWRAP_BINARY_FOR_INDEXING_SPI, "true"); - IgniteConfiguration cfg = configuration(); - - cfg.setIndexingSpi(new MyIndexingSpi()); + indexingSpi = new MyIndexingSpi(); - Ignite ignite = Ignition.start(cfg); + Ignite ignite = startGrid(0); - CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(CACHE_NAME); + CacheConfiguration<PersonKey, Person> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME); IgniteCache<PersonKey, Person> cache = ignite.createCache(ccfg); @@ -198,13 +190,11 @@ public class IndexingSpiQuerySelfTest extends TestCase { */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") public void testIndexingSpiFailure() throws Exception { - IgniteConfiguration cfg = configuration(); - - cfg.setIndexingSpi(new MyBrokenIndexingSpi()); + indexingSpi = new MyBrokenIndexingSpi(); - Ignite ignite = Ignition.start(cfg); + Ignite ignite = startGrid(0); - CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(CACHE_NAME); + CacheConfiguration<Integer, Integer> ccfg = cacheConfiguration(DEFAULT_CACHE_NAME); ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java index e59deed..ca80b13 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/IndexingSpiQueryTxSelfTest.java @@ -17,6 +17,11 @@ package org.apache.ignite.internal.processors.cache.query; +import java.util.Collection; +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.Cache; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheAtomicityMode; @@ -37,61 +42,64 @@ import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; -import java.util.Collection; -import java.util.Iterator; -import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; -import javax.cache.Cache; - /** * Indexing Spi transactional query test */ public class IndexingSpiQueryTxSelfTest extends GridCacheAbstractSelfTest { - /** */ - private static AtomicInteger cnt; - /** {@inheritDoc} */ @Override protected int gridCount() { return 4; } /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - cnt = new AtomicInteger(); - - super.beforeTestsStarted(); - } - - /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true); - if (cnt.getAndIncrement() == 0) - cfg.setClientMode(true); - else { - cfg.setIndexingSpi(new MyBrokenIndexingSpi()); + cfg.setClientMode("client".equals(igniteInstanceName)); + cfg.setIndexingSpi(new MyBrokenIndexingSpi()); - CacheConfiguration ccfg = cacheConfiguration(igniteInstanceName); - ccfg.setName("test-cache"); - ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + CacheConfiguration ccfg = cacheConfiguration(igniteInstanceName); + ccfg.setName(DEFAULT_CACHE_NAME); + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); - ccfg.setIndexedTypes(Integer.class, Integer.class); + ccfg.setIndexedTypes(Integer.class, Integer.class); + + cfg.setCacheConfiguration(ccfg); - cfg.setCacheConfiguration(ccfg); - } return cfg; } + /** */ + public void testIndexingSpiWithTxClient() throws Exception { + IgniteEx client = startGrid("client"); + + assertNotNull(client.cache(DEFAULT_CACHE_NAME)); + + doTestIndexingSpiWithTx(client, 0); + } + + /** */ + public void testIndexingSpiWithTxLocal() throws Exception { + IgniteEx ignite = (IgniteEx)primaryNode(0, DEFAULT_CACHE_NAME); + + doTestIndexingSpiWithTx(ignite, 0); + } + + /** */ + public void testIndexingSpiWithTxNotLocal() throws Exception { + IgniteEx ignite = (IgniteEx)primaryNode(0, DEFAULT_CACHE_NAME); + + doTestIndexingSpiWithTx(ignite, 1); + } + /** * @throws Exception If failed. */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - public void testIndexingSpiWithTx() throws Exception { - IgniteEx ignite = grid(0); - - final IgniteCache<Integer, Integer> cache = ignite.cache("test-cache"); + private void doTestIndexingSpiWithTx(IgniteEx ignite, int key) throws Exception { + final IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); final IgniteTransactions txs = ignite.transactions(); @@ -104,7 +112,7 @@ public class IndexingSpiQueryTxSelfTest extends GridCacheAbstractSelfTest { Transaction tx; try (Transaction tx0 = tx = txs.txStart(concurrency, isolation)) { - cache.put(1, 1); + cache.put(key, key); tx0.commit(); } @@ -114,6 +122,8 @@ public class IndexingSpiQueryTxSelfTest extends GridCacheAbstractSelfTest { return null; } }, IgniteTxHeuristicCheckedException.class); + + checkFutures(); } } } @@ -135,7 +145,7 @@ public class IndexingSpiQueryTxSelfTest extends GridCacheAbstractSelfTest { /** {@inheritDoc} */ @Override public Iterator<Cache.Entry<?, ?>> query(@Nullable String cacheName, Collection<Object> params, @Nullable IndexingQueryFilter filters) throws IgniteSpiException { - return null; + return null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java index fe27e6e..01db747 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/AbstractTransactionIntergrityTest.java @@ -33,6 +33,7 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.cluster.ClusterNode; @@ -40,10 +41,10 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; import org.apache.ignite.failure.FailureHandler; import org.apache.ignite.failure.StopNodeFailureHandler; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; @@ -73,7 +74,7 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { private static final int DFLT_ACCOUNTS_CNT = 32; /** Count of threads and caches. */ - private static final int DFLT_TX_THREADS_CNT = 20; + private static final int DFLT_TX_THREADS_CNT = Runtime.getRuntime().availableProcessors(); /** Count of nodes to start. */ private static final int DFLT_NODES_CNT = 3; @@ -126,16 +127,6 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { return true; } - /** - * @return Flag enables cross-node transactions, - * when primary partitions participating in transaction spreaded across several cluster nodes. - */ - protected boolean crossNodeTransactions() { - // Commit error during cross node transactions breaks transaction integrity - // TODO: https://issues.apache.org/jira/browse/IGNITE-9086 - return false; - } - /** {@inheritDoc} */ @Override protected FailureHandler getFailureHandler(String igniteInstanceName) { return new StopNodeFailureHandler(); @@ -148,14 +139,15 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { cfg.setConsistentId(name); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); - cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); - cfg.setLocalHost("127.0.0.1"); cfg.setDataStorageConfiguration(new DataStorageConfiguration() .setDefaultDataRegionConfiguration(new DataRegionConfiguration() - .setMaxSize(256 * 1024 * 1024) - .setPersistenceEnabled(persistent())) - ); + .setPersistenceEnabled(persistent()) + .setMaxSize(50 * 1024 * 1024) + ) + .setWalSegmentSize(16 * 1024 * 1024) + .setPageSize(1024) + .setWalMode(WALMode.LOG_ONLY)); CacheConfiguration[] cacheConfigurations = new CacheConfiguration[txThreadsCount()]; @@ -178,6 +170,8 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { cfg.setCacheConfiguration(cacheConfigurations); + cfg.setFailureDetectionTimeout(30_000); + return cfg; } @@ -219,8 +213,11 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { /** * Test transfer amount. + * + * @param failoverScenario Scenario. + * @param colocatedAccounts {@code True} to use colocated on same primary node accounts. */ - public void doTestTransferAmount(FailoverScenario failoverScenario) throws Exception { + public void doTestTransferAmount(FailoverScenario failoverScenario, boolean colocatedAccounts) throws Exception { failoverScenario.beforeNodesStarted(); //given: started some nodes with client. @@ -230,26 +227,26 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { igniteClient.cluster().active(true); - int[] initAmount = new int[txThreadsCount()]; + int[] initAmounts = new int[txThreadsCount()]; completedTxs = new ConcurrentLinkedHashMap[txThreadsCount()]; //and: fill all accounts on all caches and calculate total amount for every cache. for (int cachePrefixIdx = 0; cachePrefixIdx < txThreadsCount(); cachePrefixIdx++) { IgniteCache<Integer, AccountState> cache = igniteClient.getOrCreateCache(cacheName(cachePrefixIdx)); - AtomicInteger coinsCounter = new AtomicInteger(); + AtomicInteger coinsCntr = new AtomicInteger(); try (Transaction tx = igniteClient.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { for (int accountId = 0; accountId < accountsCount(); accountId++) { - Set<Integer> initialAmount = generateCoins(coinsCounter, 5); + Set<Integer> initAmount = generateCoins(coinsCntr, 5); - cache.put(accountId, new AccountState(accountId, tx.xid(), initialAmount)); + cache.put(accountId, new AccountState(accountId, tx.xid(), initAmount)); } tx.commit(); } - initAmount[cachePrefixIdx] = coinsCounter.get(); + initAmounts[cachePrefixIdx] = coinsCntr.get(); completedTxs[cachePrefixIdx] = new ConcurrentLinkedHashMap(); } @@ -259,7 +256,8 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { ArrayList<Thread> transferThreads = new ArrayList<>(); for (int i = 0; i < txThreadsCount(); i++) { - transferThreads.add(new TransferAmountTxThread(firstTransactionDone, igniteClient, cacheName(i), i)); + transferThreads.add(new TransferAmountTxThread(firstTransactionDone, + igniteClient, cacheName(i), i, colocatedAccounts)); transferThreads.get(i).start(); } @@ -268,13 +266,12 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { failoverScenario.afterFirstTransaction(); - for (Thread thread : transferThreads) { + for (Thread thread : transferThreads) thread.join(); - } failoverScenario.afterTransactionsFinished(); - consistencyCheck(initAmount); + consistencyCheck(initAmounts); } /** @@ -385,11 +382,11 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { /** * @param txId Transaction id. - * @param coinsToRemove Coins to remove from current account. + * @param coinsToRmv Coins to remove from current account. * @return Account state with removed coins. */ - public AccountState removeCoins(IgniteUuid txId, Set<Integer> coinsToRemove) { - return new AccountState(accId, txId, Sets.difference(coins, coinsToRemove).immutableCopy()); + public AccountState removeCoins(IgniteUuid txId, Set<Integer> coinsToRmv) { + return new AccountState(accId, txId, Sets.difference(coins, coinsToRmv).immutableCopy()); } /** {@inheritDoc} */ @@ -418,11 +415,11 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { /** * @param coinsNum Coins number. */ - private Set<Integer> generateCoins(AtomicInteger coinsCounter, int coinsNum) { + private Set<Integer> generateCoins(AtomicInteger coinsCntr, int coinsNum) { Set<Integer> res = new HashSet<>(); for (int i = 0; i < coinsNum; i++) - res.add(coinsCounter.incrementAndGet()); + res.add(coinsCntr.incrementAndGet()); return res; } @@ -479,23 +476,35 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { private class TransferAmountTxThread extends Thread { /** */ private CountDownLatch firstTransactionLatch; + /** */ private IgniteEx ignite; + /** */ private String cacheName; + /** */ - private int txIndex; + private int workerIdx; + /** */ private Random random = new Random(); + /** */ + private final boolean colocatedAccounts; + /** * @param ignite Ignite. */ - private TransferAmountTxThread(CountDownLatch firstTransactionLatch, final IgniteEx ignite, String cacheName, int txIndex) { + private TransferAmountTxThread(CountDownLatch firstTransactionLatch, + final IgniteEx ignite, + String cacheName, + int workerIdx, + boolean colocatedAccounts) { this.firstTransactionLatch = firstTransactionLatch; this.ignite = ignite; this.cacheName = cacheName; - this.txIndex = txIndex; + this.workerIdx = workerIdx; + this.colocatedAccounts = colocatedAccounts; } /** {@inheritDoc} */ @@ -514,7 +523,6 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { /** * @throws IgniteException if fails */ - @SuppressWarnings("unchecked") private void updateInTransaction(IgniteCache<Integer, AccountState> cache) throws IgniteException { int accIdFrom; int accIdTo; @@ -526,11 +534,16 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { if (accIdFrom == accIdTo) continue; - ClusterNode primaryForAccFrom = ignite.cachex(cacheName).affinity().mapKeyToNode(accIdFrom); - ClusterNode primaryForAccTo = ignite.cachex(cacheName).affinity().mapKeyToNode(accIdTo); + Affinity<Object> affinity = ignite.affinity(cacheName); + + ClusterNode primaryForAccFrom = affinity.mapKeyToNode(accIdFrom); + assertNotNull(primaryForAccFrom); + + ClusterNode primaryForAccTo = affinity.mapKeyToNode(accIdTo); + assertNotNull(primaryForAccTo); // Allows only transaction between accounts that primary on the same node if corresponding flag is enabled. - if (!crossNodeTransactions() && !primaryForAccFrom.id().equals(primaryForAccTo.id())) + if (colocatedAccounts && !primaryForAccFrom.id().equals(primaryForAccTo.id())) continue; break; @@ -541,7 +554,10 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { try (Transaction tx = ignite.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) { acctFrom = cache.get(accIdFrom); + assertNotNull(acctFrom); + acctTo = cache.get(accIdTo); + assertNotNull(acctTo); Set<Integer> coinsToTransfer = acctFrom.coinsToTransfer(random); @@ -553,23 +569,8 @@ public class AbstractTransactionIntergrityTest extends GridCommonAbstractTest { tx.commit(); - completedTxs[txIndex].put(tx.xid(), new TxState(acctFrom, acctTo, nextFrom, nextTo, coinsToTransfer)); - } - } - - /** - * @param curr current - * @return random value - */ - private long getNextAccountId(long curr) { - long randomVal; - - do { - randomVal = random.nextInt(accountsCount()); + completedTxs[workerIdx].put(tx.xid(), new TxState(acctFrom, acctTo, nextFrom, nextTo, coinsToTransfer)); } - while (curr == randomVal); - - return randomVal; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java index 3260607..473eaf5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithPrimaryIndexCorruptionTest.java @@ -17,20 +17,26 @@ package org.apache.ignite.internal.processors.cache.transactions; +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Supplier; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteIllegalStateException; import org.apache.ignite.Ignition; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; import org.apache.ignite.internal.processors.cache.tree.SearchRow; import org.apache.ignite.testframework.GridTestUtils; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; + /** * Test cases that check transaction data integrity after transaction commit failed. */ @@ -45,81 +51,96 @@ public class TransactionIntegrityWithPrimaryIndexCorruptionTest extends Abstract super.afterTest(); } - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return 60 * 1000L; + /** */ + public void testPrimaryIndexCorruptionDuringCommitPrimaryColocatedThrowsError() throws Exception { + doTestTransferAmount0(true, true, () -> new AssertionError("Test")); } - /** - * Throws a test {@link AssertionError} during tx commit from {@link BPlusTree} and checks after that data is consistent. - */ - public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode1() throws Exception { - doTestTransferAmount(new IndexCorruptionFailoverScenario( - true, - (hnd, tree) -> hnd instanceof BPlusTree.Search, - failoverPredicate(true, () -> new AssertionError("Test"))) - ); + /** */ + public void testPrimaryIndexCorruptionDuringCommitPrimaryColocatedThrowsUnchecked() throws Exception { + doTestTransferAmount0(true, true, () -> new RuntimeException("Test")); } - /** - * Throws a test {@link RuntimeException} during tx commit from {@link BPlusTree} and checks after that data is consistent. - */ - public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode2() throws Exception { - doTestTransferAmount(new IndexCorruptionFailoverScenario( - true, - (hnd, tree) -> hnd instanceof BPlusTree.Search, - failoverPredicate(true, () -> new RuntimeException("Test"))) - ); + /** */ + public void testPrimaryIndexCorruptionDuringCommitPrimaryColocatedThrowsChecked() throws Exception { + doTestTransferAmount0(true, true, () -> new IgniteCheckedException("Test")); } - /** - * Throws a test {@link AssertionError} during tx commit from {@link BPlusTree} and checks after that data is consistent. - */ - public void testPrimaryIndexCorruptionDuringCommitOnBackupNode() throws Exception { - doTestTransferAmount(new IndexCorruptionFailoverScenario( - true, - (hnd, tree) -> hnd instanceof BPlusTree.Search, - failoverPredicate(false, () -> new AssertionError("Test"))) - ); + /** */ + public void testPrimaryIndexCorruptionDuringCommitPrimaryNonColocatedThrowsError() throws Exception { + doTestTransferAmount0(false, true, () -> new AssertionError("Test")); } - /** - * Throws a test {@link IgniteCheckedException} during tx commit from {@link BPlusTree} and checks after that data is consistent. - */ - public void testPrimaryIndexCorruptionDuringCommitOnPrimaryNode3() throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-9082"); + /** */ + public void testPrimaryIndexCorruptionDuringCommitPrimaryNonColocatedThrowsUnchecked() throws Exception { + doTestTransferAmount0(false, true, () -> new RuntimeException("Test")); + } - doTestTransferAmount(new IndexCorruptionFailoverScenario( - false, - (hnd, tree) -> hnd instanceof BPlusTree.Search, - failoverPredicate(true, () -> new IgniteCheckedException("Test"))) - ); + /** */ + public void testPrimaryIndexCorruptionDuringCommitPrimaryNonColocatedThrowsChecked() throws Exception { + doTestTransferAmount0(false, true, () -> new IgniteCheckedException("Test")); + } + + /** */ + public void testPrimaryIndexCorruptionDuringCommitBackupColocatedThrowsError() throws Exception { + doTestTransferAmount0(true, false, () -> new AssertionError("Test")); + } + + /** */ + public void testPrimaryIndexCorruptionDuringCommitBackupColocatedThrowsUnchecked() throws Exception { + doTestTransferAmount0(true, false, () -> new RuntimeException("Test")); + } + + /** */ + public void testPrimaryIndexCorruptionDuringCommitBackupColocatedThrowsChecked() throws Exception { + doTestTransferAmount0(true, false, () -> new IgniteCheckedException("Test")); + } + + /** */ + public void testPrimaryIndexCorruptionDuringCommitBackupNonColocatedThrowsError() throws Exception { + doTestTransferAmount0(false, false, () -> new AssertionError("Test")); + } + + /** */ + public void testPrimaryIndexCorruptionDuringCommitBackupNonColocatedThrowsUnchecked() throws Exception { + doTestTransferAmount0(false, false, () -> new RuntimeException("Test")); + } + + /** */ + public void testPrimaryIndexCorruptionDuringCommitBackupNonColocatedThrowsChecked() throws Exception { + doTestTransferAmount0(false, false, () -> new IgniteCheckedException("Test")); } /** * Creates failover predicate which generates error during transaction commmit. * - * @param failOnPrimary If {@code true} index should be failed on transaction primary node. + * @param failOnPrimary If {@code true} index should be failed on transaction primary node, otherwise on backup. * @param errorSupplier Supplier to create various errors. + * @param errorConsumer Consumer to track unexpected errors while committing. */ private BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate( boolean failOnPrimary, - Supplier<Throwable> errorSupplier + Supplier<Throwable> errorSupplier, + Consumer<Throwable> errorConsumer ) { return (ignite, row) -> { - int cacheId = row.cacheId(); - int partId = row.key().partition(); - - final ClusterNode locNode = ignite.localNode(); - final AffinityTopologyVersion curTopVer = ignite.context().discovery().topologyVersionEx(); - - // Throw exception if current node is primary for given row. - return ignite.cachesx(c -> c.context().cacheId() == cacheId) - .stream() - .filter(c -> c.context().affinity().primaryByPartition(locNode, partId, curTopVer) == failOnPrimary) - .map(c -> errorSupplier.get()) - .findFirst() - .orElse(null); + try { + int cacheId = row.cacheId(); + int partId = row.key().partition(); + + GridDhtPartitionTopology top = ignite.context().cache().cacheGroup(cacheId).topology(); + + GridDhtLocalPartition part = top.localPartition(partId); + + assertTrue("Illegal partition state for mapped tx: " + part, part != null && part.state() == OWNING); + + return part.primary(top.readyTopologyVersion()) == failOnPrimary ? errorSupplier.get() : null; + } + catch (Throwable e) { + errorConsumer.accept(e); + + throw e; + } }; } @@ -130,68 +151,68 @@ public class TransactionIntegrityWithPrimaryIndexCorruptionTest extends Abstract /** Failed node index. */ static final int failedNodeIdx = 1; - /** Is node stopping expected after failover. */ - private final boolean nodeStoppingExpected; - - /** Predicate that will choose an instance of {@link BPlusTree} and page operation - * to make further failover in this tree using {@link #failoverPredicate}. */ - private final BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPredicate; + /** + * Predicate that will choose an instance of {@link BPlusTree} and page operation to make further failover in + * this tree using {@link #failoverPred}. + */ + private final BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPred; /** Function that may return error during row insertion into {@link BPlusTree}. */ - private final BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate; + private final BiFunction<IgniteEx, SearchRow, Throwable> failoverPred; /** - * @param nodeStoppingExpected Node stopping expected. - * @param treeCorruptionPredicate Tree corruption predicate. - * @param failoverPredicate Failover predicate. + * @param treeCorruptionPred Tree corruption predicate. + * @param failoverPred Failover predicate. */ IndexCorruptionFailoverScenario( - boolean nodeStoppingExpected, - BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPredicate, - BiFunction<IgniteEx, SearchRow, Throwable> failoverPredicate + BiFunction<PageHandler, BPlusTree, Boolean> treeCorruptionPred, + BiFunction<IgniteEx, SearchRow, Throwable> failoverPred ) { - this.nodeStoppingExpected = nodeStoppingExpected; - this.treeCorruptionPredicate = treeCorruptionPredicate; - this.failoverPredicate = failoverPredicate; + this.treeCorruptionPred = treeCorruptionPred; + this.failoverPred = failoverPred; } /** {@inheritDoc} */ @Override public void beforeNodesStarted() { BPlusTree.pageHndWrapper = (tree, hnd) -> { - final IgniteEx locIgnite = (IgniteEx) Ignition.localIgnite(); + final IgniteEx locIgnite = (IgniteEx)Ignition.localIgnite(); - if (!locIgnite.name().endsWith(String.valueOf(failedNodeIdx))) + if (getTestIgniteInstanceIndex(locIgnite.name()) != failedNodeIdx) return hnd; - if (treeCorruptionPredicate.apply(hnd, tree)) { - log.info("Created corrupted tree handler for -> " + hnd + " " + tree); + if (treeCorruptionPred.apply(hnd, tree)) { + log.info("Created corrupted tree handler [nodeOrder=" + locIgnite.localNode().order() + ", hnd=" + hnd + + ", tree=" + tree + ']'); - PageHandler<Object, BPlusTree.Result> delegate = (PageHandler<Object, BPlusTree.Result>) hnd; + PageHandler<Object, BPlusTree.Result> delegate = (PageHandler<Object, BPlusTree.Result>)hnd; return new PageHandler<BPlusTree.Get, BPlusTree.Result>() { - @Override public BPlusTree.Result run(int cacheId, long pageId, long page, long pageAddr, PageIO io, Boolean walPlc, BPlusTree.Get arg, int lvl) throws IgniteCheckedException { - log.info("Invoked " + " " + cacheId + " " + arg.toString() + " for BTree (" + corruptionEnabled + ") -> " + arg.row() + " / " + arg.row().getClass()); + @Override public BPlusTree.Result run(int cacheId, long pageId, long page, long pageAddr, PageIO io, + Boolean walPlc, BPlusTree.Get arg, int lvl) throws IgniteCheckedException { + log.info("Invoked [cachedId=" + cacheId + ", hnd=" + arg.toString() + + ", corruption=" + corruptionEnabled + ", row=" + arg.row() + ", rowCls=" + arg.row().getClass() + ']'); if (corruptionEnabled && (arg.row() instanceof SearchRow)) { - SearchRow row = (SearchRow) arg.row(); + SearchRow row = (SearchRow)arg.row(); // Store cacheId to search row explicitly, as it can be zero if there is one cache in a group. - Throwable res = failoverPredicate.apply(locIgnite, new SearchRow(cacheId, row.key())); + Throwable res = failoverPred.apply(locIgnite, new SearchRow(cacheId, row.key())); if (res != null) { if (res instanceof Error) - throw (Error) res; + throw (Error)res; else if (res instanceof RuntimeException) - throw (RuntimeException) res; + throw (RuntimeException)res; else if (res instanceof IgniteCheckedException) - throw (IgniteCheckedException) res; + throw (IgniteCheckedException)res; } } return delegate.run(cacheId, pageId, page, pageAddr, io, walPlc, arg, lvl); } - @Override public boolean releaseAfterWrite(int cacheId, long pageId, long page, long pageAddr, BPlusTree.Get g, int lvl) { + @Override public boolean releaseAfterWrite(int cacheId, long pageId, long page, long pageAddr, + BPlusTree.Get g, int lvl) { return g.canRelease(pageId, lvl); } }; @@ -212,27 +233,68 @@ public class TransactionIntegrityWithPrimaryIndexCorruptionTest extends Abstract // Disable index corruption. BPlusTree.pageHndWrapper = (tree, hnd) -> hnd; - if (nodeStoppingExpected) { - // Wait until node with corrupted index will left cluster. - GridTestUtils.waitForCondition(() -> { - try { - grid(failedNodeIdx); - } - catch (IgniteIllegalStateException e) { - return true; - } + // Wait until node with corrupted index will left cluster. + GridTestUtils.waitForCondition(() -> { + try { + grid(failedNodeIdx); + } + catch (IgniteIllegalStateException e) { + return true; + } - return false; - }, getTestTimeout()); + return false; + }, getTestTimeout()); - // Failed node should be stopped. - GridTestUtils.assertThrows(log, () -> grid(failedNodeIdx), IgniteIllegalStateException.class, ""); + // Failed node should be stopped. + GridTestUtils.assertThrows(log, () -> grid(failedNodeIdx), IgniteIllegalStateException.class, null); - // Re-start failed node. - startGrid(failedNodeIdx); + // Re-start failed node. + startGrid(failedNodeIdx); - awaitPartitionMapExchange(); - } + awaitPartitionMapExchange(); + } + } + + /** + * Test transfer amount with extended error recording. + * + * @param colocatedAccount Colocated account. + * @param failOnPrimary {@code True} if fail on primary, else on backup. + * @param supplier Fail reason supplier. + * @throws Exception If failover predicate execution is failed. + */ + private void doTestTransferAmount0(boolean colocatedAccount, boolean failOnPrimary, Supplier<Throwable> supplier) throws Exception { + ErrorTracker errTracker = new ErrorTracker(); + + doTestTransferAmount( + new IndexCorruptionFailoverScenario( + (hnd, tree) -> hnd instanceof BPlusTree.Search, + failoverPredicate(failOnPrimary, supplier, errTracker)), + colocatedAccount + ); + + for (Throwable throwable : errTracker.errors()) + log.error("Recorded error", throwable); + + if (!errTracker.errors().isEmpty()) + fail("Test run has error"); + } + + /** */ + private static class ErrorTracker implements Consumer<Throwable> { + /** Queue. */ + private final Queue<Throwable> q = new ConcurrentLinkedQueue<>(); + + /** {@inheritDoc} */ + @Override public void accept(Throwable throwable) { + q.add(throwable); + } + + /** + * @return Recorded errors. + */ + public Collection<Throwable> errors() { + return q; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java index 25aae4b..551335f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TransactionIntegrityWithSystemWorkerDeathTest.java @@ -41,9 +41,7 @@ public class TransactionIntegrityWithSystemWorkerDeathTest extends AbstractTrans return false; } - /** - * - */ + /** */ public void testFailoverWithDiscoWorkerTermination() throws Exception { doTestTransferAmount(new FailoverScenario() { static final int failedNodeIdx = 1; @@ -83,7 +81,7 @@ public class TransactionIntegrityWithSystemWorkerDeathTest extends AbstractTrans awaitPartitionMapExchange(); } - }); + }, true); } /**
