Repository: ignite Updated Branches: refs/heads/ignite-3479 eb141c6ea -> d92cfa435
ignite-3479 Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d92cfa43 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d92cfa43 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d92cfa43 Branch: refs/heads/ignite-3479 Commit: d92cfa435da5f72f316f252eb0be03eac1c5d71d Parents: eb141c6 Author: sboikov <[email protected]> Authored: Thu Sep 28 15:31:06 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Sep 28 17:38:14 2017 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 65 ++++++++++++--- .../distributed/dht/GridDhtTxPrepareFuture.java | 10 ++- .../GridNearPessimisticTxPrepareFuture.java | 1 + .../near/GridNearTxPrepareFutureAdapter.java | 2 +- .../cache/mvcc/CacheCoordinatorsProcessor.java | 85 ++++++++++++++------ .../processors/cache/mvcc/MvccQueryTracker.java | 40 +++++++-- .../mvcc/NewCoordinatorQueryAckRequest.java | 2 +- .../cache/mvcc/CacheMvccTransactionsTest.java | 63 +++++++++++---- 8 files changed, 205 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index f850ad3..097d90f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1719,9 +1719,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana IgniteTxManager tm = cctx.tm(); if (tm != null) { - U.warn(diagnosticLog, "Pending transactions:"); + boolean first = true; for (IgniteInternalTx tx : tm.activeTransactions()) { + if (first) { + U.warn(diagnosticLog, "Pending transactions:"); + + first = false; + } + if (exchTopVer != null) { U.warn(diagnosticLog, ">>> [txVer=" + tx.topologyVersionSnapshot() + ", exchWait=" + tm.needWaitTransaction(tx, exchTopVer) + @@ -1735,31 +1741,66 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridCacheMvccManager mvcc = cctx.mvcc(); if (mvcc != null) { - U.warn(diagnosticLog, "Pending explicit locks:"); + boolean first = true; + + for (GridCacheExplicitLockSpan lockSpan : mvcc.activeExplicitLocks()) { + if (first) { + U.warn(diagnosticLog, "Pending explicit locks:"); + + first = false; + } - for (GridCacheExplicitLockSpan lockSpan : mvcc.activeExplicitLocks()) U.warn(diagnosticLog, ">>> " + lockSpan); + } + + first = true; - U.warn(diagnosticLog, "Pending cache futures:"); + for (GridCacheFuture<?> fut : mvcc.activeFutures()) { + if (first) { + U.warn(diagnosticLog, "Pending cache futures:"); + + first = false; + } - for (GridCacheFuture<?> fut : mvcc.activeFutures()) dumpDiagnosticInfo(fut, diagCtx); + } + + first = true; - U.warn(diagnosticLog, "Pending atomic cache futures:"); + for (GridCacheFuture<?> fut : mvcc.atomicFutures()) { + if (first) { + U.warn(diagnosticLog, "Pending atomic cache futures:"); + + first = false; + } - for (GridCacheFuture<?> fut : mvcc.atomicFutures()) dumpDiagnosticInfo(fut, diagCtx); + } - U.warn(diagnosticLog, "Pending data streamer futures:"); + first = true; + + for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures()) { + if (first) { + U.warn(diagnosticLog, "Pending data streamer futures:"); + + first = false; + } - for (IgniteInternalFuture<?> fut : mvcc.dataStreamerFutures()) dumpDiagnosticInfo(fut, diagCtx); + } if (tm != null) { - U.warn(diagnosticLog, "Pending transaction deadlock detection futures:"); + first = true; + + for (IgniteInternalFuture<?> fut : tm.deadlockDetectionFutures()) { + if (first) { + U.warn(diagnosticLog, "Pending transaction deadlock detection futures:"); + + first = false; + } - for (IgniteInternalFuture<?> fut : tm.deadlockDetectionFutures()) dumpDiagnosticInfo(fut, diagCtx); + } } } @@ -1781,6 +1822,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana affDumpCnt++; } } + + cctx.kernalContext().coordinators().dumpDebugInfo(diagnosticLog, diagCtx); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 4eca4e8..723bd4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1239,6 +1239,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite IgniteInternalFuture<MvccCoordinatorVersion> waitCrdCntrFut = null; if (req.requestMvccCounter()) { + assert last; + assert tx.txState().mvccEnabled(cctx); MvccCoordinator crd = cctx.coordinators().currentCoordinator(); @@ -1286,17 +1288,17 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite fut.get(); sendPrepareRequests(); + + markInitialized(); } catch (Throwable e) { - U.error(log, "Failed to get coordinator counter: " + e, e); + U.error(log, "Failed to get mvcc version for tx [txId=" + tx.nearXidVersion() + + ", err=" + e + ']', e); GridNearTxPrepareResponse res = createPrepareResponse(e); onDone(res, res.error()); } - finally { - markInitialized(); - } } }); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 2001011..4a2aeb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -507,6 +507,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA return S.toString(GridNearPessimisticTxPrepareFuture.class, this, "innerFuts", futs, + "txId", tx.nearXidVersion(), "super", super.toString()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index ddc5826..987a751 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -160,7 +160,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends * @param txMapping Transaction mapping. */ final void checkOnePhase(GridDhtTxMapping txMapping) { - if (tx.storeWriteThrough()) + if (tx.storeWriteThrough() || tx.txState().mvccEnabled(cctx)) // TODO IGNITE-3479 (onePhase + mvcc) return; Map<UUID, Collection<UUID>> map = txMapping.transactionNodes(); http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java index 759a369..cfd6c4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/CacheCoordinatorsProcessor.java @@ -35,6 +35,7 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -264,7 +265,8 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { MSG_POLICY); } catch (IgniteCheckedException e) { - fut.onError(e); + if (verFuts.remove(fut.id) != null) + fut.onError(e); } return fut; @@ -328,7 +330,7 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } catch (IgniteCheckedException e) { if (verFuts.remove(fut.id) != null) - fut.onDone(e); + fut.onError(e); } return fut; @@ -354,13 +356,13 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { new CoordinatorWaitTxsRequest(fut.id, txs), MSG_POLICY); } - catch (ClusterTopologyCheckedException e) { - if (ackFuts.remove(fut.id) != null) - fut.onDone(); // No need to ack, finish without error. - } catch (IgniteCheckedException e) { - if (ackFuts.remove(fut.id) != null) - fut.onDone(e); + if (ackFuts.remove(fut.id) != null) { + if (e instanceof ClusterTopologyCheckedException) + fut.onDone(); // No need to wait, new coordinator will be assigned, finish without error. + else + fut.onDone(e); + } } return fut; @@ -385,13 +387,13 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { new CoordinatorTxAckRequest(fut.id, mvccVer.counter()), MSG_POLICY); } - catch (ClusterTopologyCheckedException e) { - if (ackFuts.remove(fut.id) != null) - fut.onDone(); // No need to ack, finish without error. - } catch (IgniteCheckedException e) { - if (ackFuts.remove(fut.id) != null) - fut.onDone(e); + if (ackFuts.remove(fut.id) != null) { + if (e instanceof ClusterTopologyCheckedException) + fut.onDone(); // No need to ack, finish without error. + else + fut.onDone(e); + } } return fut; @@ -964,6 +966,36 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { } /** + * @param log Logger. + * @param diagCtx Diagnostic request. + */ + public void dumpDebugInfo(IgniteLogger log, @Nullable IgniteDiagnosticPrepareContext diagCtx) { + boolean first = true; + + for (MvccVersionFuture verFur : verFuts.values()) { + if (first) { + U.warn(log, "Pending mvcc version futures: "); + + first = false; + } + + U.warn(log, ">>> " + verFur.toString()); + } + + first = true; + + for (WaitAckFuture waitAckFut : ackFuts.values()) { + if (first) { + U.warn(log, "Pending mvcc wait ack futures: "); + + first = false; + } + + U.warn(log, ">>> " + waitAckFut.toString()); + } + } + + /** * */ public class MvccVersionFuture extends GridFutureAdapter<MvccCoordinatorVersion> { @@ -1004,22 +1036,23 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { onDone(res); } + /** + * @param err Error. + */ void onError(IgniteCheckedException err) { - if (verFuts.remove(id) != null) { - if (lsnr != null) - lsnr.onMvccError(err); + if (lsnr != null) + lsnr.onMvccError(err); - onDone(err); - } + onDone(err); } /** * @param nodeId Failed node ID. */ - void onNodeLeft(UUID nodeId) { - if (crdId.equals(nodeId)) { - ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request coordinator version, " + - "coordinator failed: " + nodeId); + void onNodeLeft(UUID nodeId ) { + if (crdId.equals(nodeId) && verFuts.remove(id) != null) { + ClusterTopologyCheckedException err = new ClusterTopologyCheckedException("Failed to request mvcc " + + "version, coordinator failed: " + nodeId); onError(err); } @@ -1073,13 +1106,15 @@ public class CacheCoordinatorsProcessor extends GridProcessorAdapter { * @param nodeId Failed node ID. */ void onNodeLeft(UUID nodeId) { - if (crdId.equals(nodeId) && verFuts.remove(id) != null) + if (crdId.equals(nodeId) && ackFuts.remove(id) != null) onDone(); } /** {@inheritDoc} */ @Override public String toString() { - return "WaitAckFuture [crdId=" + crdId + ", id=" + id + ']'; + return "WaitAckFuture [crdId=" + crdId + + ", id=" + id + + ", ackTx=" + ackTx + ']'; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java index 095f630..a460820 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/MvccQueryTracker.java @@ -23,6 +23,8 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteInClosure; import org.jetbrains.annotations.Nullable; @@ -37,18 +39,20 @@ public class MvccQueryTracker { private MvccCoordinatorVersion mvccVer; /** */ + @GridToStringExclude private final GridCacheContext cctx; /** */ private final boolean canRemap; /** */ + @GridToStringExclude private final MvccQueryAware lsnr; /** - * @param cctx - * @param canRemap - * @param lsnr + * @param cctx Cache context. + * @param canRemap {@code True} if can wait for topology changes. + * @param lsnr Listener. */ public MvccQueryTracker(GridCacheContext cctx, boolean canRemap, MvccQueryAware lsnr) { assert cctx.mvccEnabled() : cctx.name(); @@ -58,6 +62,16 @@ public class MvccQueryTracker { this.lsnr = lsnr; } + /** + * @return Requested mvcc version. + */ + public MvccCoordinatorVersion mvccVersion() { + assert mvccVer != null : this; + + return mvccVer; + } + + /** {@inheritDoc} */ @Nullable public MvccCoordinatorVersion onMvccCoordinatorChange(MvccCoordinator newCrd) { synchronized (this) { if (mvccVer != null) { @@ -72,10 +86,9 @@ public class MvccQueryTracker { } } - public MvccCoordinatorVersion mvccVersion() { - return mvccVer; - } - + /** + * + */ public void onQueryDone() { MvccCoordinator mvccCrd0 = null; MvccCoordinatorVersion mvccVer0 = null; @@ -95,6 +108,9 @@ public class MvccQueryTracker { cctx.shared().coordinators().ackQueryDone(mvccCrd0, mvccVer0); } + /** + * @param topVer Topology version. + */ public void requestVersion(final AffinityTopologyVersion topVer) { MvccCoordinator mvccCrd0 = cctx.affinity().mvccCoordinator(topVer); @@ -130,6 +146,8 @@ public class MvccQueryTracker { try { MvccCoordinatorVersion rcvdVer = fut.get(); + assert rcvdVer != null; + boolean needRemap = false; synchronized (MvccQueryTracker.this) { @@ -168,6 +186,9 @@ public class MvccQueryTracker { }); } + /** + * @param topVer Current topology version. + */ private void waitNextTopology(AffinityTopologyVersion topVer) { assert canRemap; @@ -189,4 +210,9 @@ public class MvccQueryTracker { }); } } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(MvccQueryTracker.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java index 40b8e01..5631fed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/mvcc/NewCoordinatorQueryAckRequest.java @@ -54,7 +54,7 @@ public class NewCoordinatorQueryAckRequest implements MvccCoordinatorMessage { /** {@inheritDoc} */ @Override public boolean waitForCoordinatorInit() { - return true; + return false; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d92cfa43/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java index bec2725..074c4f8 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java @@ -1677,21 +1677,21 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testReadInProgressCoordinatorFailsSimple_FromServer() throws Exception { - readInProgressCoordinatorFails(false); + readInProgressCoordinatorFailsSimple(false); } /** * @throws Exception If failed. */ public void testReadInProgressCoordinatorFailsSimple_FromClient() throws Exception { - readInProgressCoordinatorFails(true); + readInProgressCoordinatorFailsSimple(true); } /** * @param fromClient {@code True} if read from client node, otherwise from server node. * @throws Exception If failed. */ - private void readInProgressCoordinatorFails(boolean fromClient) throws Exception { + private void readInProgressCoordinatorFailsSimple(boolean fromClient) throws Exception { testSpi = true; startGrids(4); @@ -1790,13 +1790,33 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ public void testReadInProgressCoordinatorFails() throws Exception { - startGrids(3); + readInProgressCoordinatorFails(false); + } + /** + * @throws Exception If failed. + */ + public void testReadInProgressCoordinatorFails_ReadDelay() throws Exception { + readInProgressCoordinatorFails(true); + } + + /** + * @param readDelay {@code True} if delays get requests. + * @throws Exception If failed. + */ + private void readInProgressCoordinatorFails(boolean readDelay) throws Exception { + final int COORD_NODES = 5; + final int SRV_NODES = 4; - startGridsMultiThreaded(3, 4); + if (readDelay) + testSpi = true; + + startGrids(COORD_NODES); + + startGridsMultiThreaded(COORD_NODES, SRV_NODES); client = true; - Ignite client = startGrid(7); + Ignite client = startGrid(COORD_NODES + SRV_NODES); final List<String> cacheNames = new ArrayList<>(); @@ -1807,12 +1827,16 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { for (int i = 0; i < KEYS; i++) vals.put(i, 0); + String[] exclude = new String[COORD_NODES]; + + for (int i = 0; i < COORD_NODES; i++) + exclude[i] = testNodeName(i); + for (CacheConfiguration ccfg : cacheConfigurations()) { ccfg.setName("cache-" + cacheNames.size()); - // First 3 server nodes are 'dedicated' coordinators. - ccfg.setNodeFilter(new TestCacheNodeExcludingFilter( - testNodeName(0), testNodeName(1), testNodeName(2))); + // First server nodes are 'dedicated' coordinators. + ccfg.setNodeFilter(new TestCacheNodeExcludingFilter(exclude)); cacheNames.add(ccfg.getName()); @@ -1825,6 +1849,17 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } } + if (readDelay) { + for (int i = COORD_NODES; i < COORD_NODES + SRV_NODES + 1; i++) { + TestRecordingCommunicationSpi.spi(ignite(i)).closure(new IgniteBiInClosure<ClusterNode, Message>() { + @Override public void apply(ClusterNode node, Message msg) { + if (msg instanceof GridNearGetRequest) + doSleep(ThreadLocalRandom.current().nextLong(50) + 1); + } + }); + } + } + final AtomicBoolean done = new AtomicBoolean(); try { @@ -1833,7 +1868,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { IgniteInternalFuture getFut = GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { @Override public Void call() throws Exception { try { - Ignite node = ignite(3 + (readNodeIdx.getAndIncrement() % 5)); + Ignite node = ignite(COORD_NODES + (readNodeIdx.getAndIncrement() % (SRV_NODES + 1))); int cnt = 0; @@ -1868,11 +1903,11 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { throw e; } } - }, 10, "get-thread"); + }, (SRV_NODES + 1) + 1, "get-thread"); IgniteInternalFuture putFut1 = GridTestUtils.runAsync(new Callable() { @Override public Void call() throws Exception { - Ignite node = ignite(3); + Ignite node = ignite(COORD_NODES); List<IgniteCache> caches = new ArrayList<>(); @@ -1909,7 +1944,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { IgniteInternalFuture putFut2 = GridTestUtils.runAsync(new Callable() { @Override public Void call() throws Exception { - Ignite node = ignite(3); + Ignite node = ignite(COORD_NODES); IgniteCache cache = node.cache(cacheNames.get(0)); @@ -1934,7 +1969,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest { } }, "put-thread"); - for (int i = 0; i < 3 && !getFut.isDone(); i++) { + for (int i = 0; i < COORD_NODES && !getFut.isDone(); i++) { U.sleep(3000); stopGrid(i);
