This is an automated email from the ASF dual-hosted git repository. mmuzaf pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new c325627 IGNITE-15208 Remove unnecessary rebalance order classes from exchange thread (#9285) c325627 is described below commit c32562791f777aa991634caa708845e8cc6d3e80 Author: Maxim Muzafarov <mmu...@apache.org> AuthorDate: Sat Aug 7 01:11:44 2021 +0300 IGNITE-15208 Remove unnecessary rebalance order classes from exchange thread (#9285) --- .../cache/GridCachePartitionExchangeManager.java | 205 ++++++--------------- .../processors/cache/GridCachePreloader.java | 26 +-- .../cache/GridCachePreloaderAdapter.java | 24 +-- .../dht/preloader/GridDhtPreloader.java | 24 +-- .../cache/CacheGroupsMetricsRebalanceTest.java | 36 +--- .../TxPartitionCounterStateConsistencyTest.java | 2 - 6 files changed, 82 insertions(+), 235 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 2d5626a..c4662a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -22,16 +22,17 @@ import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.NavigableMap; +import java.util.NavigableSet; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.TreeMap; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -48,6 +49,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteCompute; @@ -102,7 +104,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware; @@ -112,7 +113,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.lat import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; -import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; @@ -305,9 +305,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** Histogram of blocking PME durations. */ private volatile HistogramMetricImpl blockingDurationHistogram; - /** Delay before rebalancing code is start executing after exchange completion. For tests only. */ - private volatile long rebalanceDelay; - /** Metric that shows whether cluster is in fully rebalanced state. */ private volatile BooleanMetricImpl rebalanced; @@ -2648,13 +2645,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param delay Rebalance delay. - */ - public void rebalanceDelay(long delay) { - this.rebalanceDelay = delay; - } - - /** * For testing only. * * @return Current version to wait for. @@ -3289,10 +3279,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana continue; } - Map<Integer, GridDhtPreloaderAssignments> assignsMap = null; - - boolean forcePreload = false; - GridDhtPartitionExchangeId exchId; GridDhtPartitionsExchangeFuture exchFut = null; @@ -3335,8 +3321,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } else if (task instanceof ForceRebalanceExchangeTask) { - forcePreload = true; - timeout = 0; // Force refresh. exchId = ((ForceRebalanceExchangeTask)task).exchangeId(); @@ -3485,84 +3469,33 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } if (rebalanceRequired(exchFut)) { - if (rebalanceDelay > 0) - U.sleep(rebalanceDelay); + NavigableSet<CacheGroupContext> assignsSet = cctx.cache().cacheGroups().stream() + .collect(Collectors.toCollection(() -> new TreeSet<>(new CacheRebalanceOrderComparator()))); - assignsMap = new HashMap<>(); - - IgniteCacheSnapshotManager snp = cctx.snapshot(); - - for (final CacheGroupContext grp : cctx.cache().cacheGroups()) { - long delay = grp.config().getRebalanceDelay(); - - boolean disableRebalance = snp.partitionsAreFrozen(grp); - - GridDhtPreloaderAssignments assigns = null; - - // Don't delay for dummy reassigns to avoid infinite recursion. - if ((delay == 0 || forcePreload) && !disableRebalance) - assigns = grp.preloader().generateAssignments(exchId, exchFut); - - assignsMap.put(grp.groupId(), assigns); - - if (resVer == null && !grp.isLocal()) - resVer = grp.topology().readyTopologyVersion(); - } - } - - if (resVer == null) - resVer = exchId.topologyVersion(); - - if (!F.isEmpty(assignsMap)) { - int size = assignsMap.size(); - - NavigableMap<CacheRebalanceOrder, List<Integer>> orderMap = new TreeMap<>(); - - for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) { - int grpId = e.getKey(); - - CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - - CacheRebalanceOrder order = new CacheRebalanceOrder( - grp.config().getRebalanceOrder(), - grp.config().getRebalanceMode()); - - if (orderMap.get(order) == null) - orderMap.put(order, new ArrayList<Integer>(size)); - - orderMap.get(order).add(grpId); - } - - RebalanceFuture r = null; + RebalanceFuture next = null; GridCompoundFuture<Boolean, Boolean> rebFut = new GridCompoundFuture<>(); - ArrayList<String> rebList = new ArrayList<>(size); - GridCompoundFuture<Boolean, Boolean> forcedRebFut = null; if (task instanceof ForceRebalanceExchangeTask) forcedRebFut = ((ForceRebalanceExchangeTask)task).forcedRebalanceFuture(); - for (CacheRebalanceOrder order : orderMap.descendingKeySet()) { - for (Integer grpId : orderMap.get(order)) { - CacheGroupContext grp = cctx.cache().cacheGroup(grpId); + for (CacheGroupContext grp : assignsSet.descendingSet()) { + boolean disableRebalance = cctx.snapshot().partitionsAreFrozen(grp); - GridDhtPreloaderAssignments assigns = assignsMap.get(grpId); - - RebalanceFuture cur = grp.preloader().addAssignments(assigns, - forcePreload, - cnt, - r, - forcedRebFut, - rebFut); + if (disableRebalance) + continue; - if (cur != null) { - rebList.add(grp.cacheOrGroupName()); + RebalanceFuture cur = grp.preloader().prepare(exchId, + exchFut, + cnt, + next, + forcedRebFut, + rebFut); - r = cur; - } - } + if (cur != null) + next = cur; } rebFut.markInitialized(); @@ -3570,15 +3503,16 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (forcedRebFut != null) forcedRebFut.markInitialized(); - if (r != null) { - Collections.reverse(rebList); - - RebalanceFuture finalR = r; + if (next != null) { + RebalanceFuture finalR = next; // Waits until compatible rebalances are finished. // Start rebalancing cache groups chain. Each group will be rebalanced // sequentially one by one e.g.: // ignite-sys-cache -> cacheGroupR1 -> cacheGroupP2 -> cacheGroupR3 + List<String> rebList = assignsSet.stream().map(CacheGroupContext::cacheOrGroupName) + .collect(Collectors.toList()); + long rebId = cnt; rebFut.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { @@ -3594,6 +3528,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana }); } else { + resVer = resVer == null ? assignsSet.stream() + .filter(g -> !g.isLocal()) + .map(g -> g.topology().readyTopologyVersion()) + .filter(Objects::nonNull) + .findFirst() + .orElse(exchId.topologyVersion()) : resVer; + U.log(log, "Skipping rebalancing (nothing scheduled) " + "[top=" + resVer + ", force=" + (exchFut == null) + ", evt=" + exchId.discoveryEventName() + @@ -3602,7 +3543,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } else { U.log(log, "Skipping rebalancing (no affinity changes) " + - "[top=" + resVer + + "[top=" + resVer == null ? exchId.topologyVersion() : resVer + ", evt=" + exchId.discoveryEventName() + ", evtNode=" + exchId.nodeId() + ", client=" + cctx.kernalContext().clientNode() + ']'); @@ -3638,10 +3579,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * Rebalance is not required on a client node and is always required when the exchange future is null. - * In other cases, this method checks all caches and decides whether rebalancing is required or not - * for the specific exchange. * - * @param exchFut Exchange future. + * @param exchFut Exchange future or {@code null} if it is force rebalance task. * @return {@code True} if rebalance is required at least for one of cache groups. */ private boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) { @@ -3651,15 +3590,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (exchFut == null) return true; - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal()) - continue; - - if (grp.preloader().rebalanceRequired(exchFut)) - return true; - } - - return false; + return lastAffinityChangedTopologyVersion(exchFut.topologyVersion()).equals(exchFut.topologyVersion()); } } @@ -4028,61 +3959,29 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * Represents a cache rebalance order that takes into account both values: rebalance order itself and rebalance mode. * It is assumed SYNC caches should be rebalanced in the first place. */ - private static class CacheRebalanceOrder implements Comparable<CacheRebalanceOrder> { - /** Cache rebalance order. */ - private int order; - - /** Cache rebalance mode. */ - private CacheRebalanceMode mode; - - /** - * Creates a new instance of CacheRebalanceOrder. - * - * @param order Cache rebalance order. - * @param mode Cache rebalance mode. - */ - public CacheRebalanceOrder(int order, CacheRebalanceMode mode) { - this.order = order; - this.mode = mode; - } - + private static class CacheRebalanceOrderComparator implements Comparator<CacheGroupContext> { /** {@inheritDoc} */ - @Override public int compareTo(@NotNull CacheRebalanceOrder o) { - if (order == o.order) { - if (mode == o.mode) - return 0; - - switch (mode) { - case SYNC: return -1; - case ASYNC: return o.mode == CacheRebalanceMode.SYNC ? 1 : -1; - case NONE: return 1; + @Override public int compare(CacheGroupContext ctx1, CacheGroupContext ctx2) { + CacheConfiguration<?, ?> cfg1 = ctx1.config(); + CacheConfiguration<?, ?> cfg2 = ctx2.config(); + + if (cfg1.getRebalanceOrder() == cfg2.getRebalanceOrder()) { + if (cfg1.getRebalanceMode() == cfg2.getRebalanceMode()) + return ctx1.cacheOrGroupName().compareTo(ctx2.cacheOrGroupName()); + + switch (cfg1.getRebalanceMode()) { + case SYNC: + return -1; + case ASYNC: + return cfg2.getRebalanceMode() == CacheRebalanceMode.SYNC ? 1 : -1; + case NONE: + return 1; default: - throw new IllegalArgumentException("Unknown cache rebalance mode [mode=" + mode + ']'); + throw new IllegalArgumentException("Unknown cache rebalance mode [mode=" + cfg1.getRebalanceMode() + ']'); } } else - return (order < o.order) ? -1 : 1; - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - CacheRebalanceOrder order1 = (CacheRebalanceOrder)o; - - if (order != order1.order) - return false; - return mode == order1.mode; - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int result = order; - result = 31 * result + mode.hashCode(); - return result; + return (cfg1.getRebalanceOrder() < cfg2.getRebalanceOrder()) ? -1 : 1; } } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 4da69ca..82bf6d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -26,13 +26,11 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander.RebalanceFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.jetbrains.annotations.Nullable; @@ -66,33 +64,17 @@ public interface GridCachePreloader { public void onInitialExchangeComplete(@Nullable Throwable err); /** - * @param exchFut Completed exchange future. - * @return {@code True} if rebalance should be started (previous will be interrupted). - */ - public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut); - - /** * @param exchId Exchange ID. * @param exchFut Completed exchange future. Can be {@code null} if forced or reassigned generation occurs. - * @return Partition assignments which will be requested from supplier nodes. - */ - @Nullable public GridDhtPreloaderAssignments generateAssignments( - GridDhtPartitionExchangeId exchId, - @Nullable GridDhtPartitionsExchangeFuture exchFut); - - /** - * Adds assignments to preloader. - * - * @param assignments Assignments to add. - * @param forcePreload {@code True} if preload requested by {@link ForceRebalanceExchangeTask}. * @param rebalanceId Rebalance id created by exchange thread. - * @param next Rebalance's future follows after the current one. + * @param next Rebalance future follows after the current one. * @param forcedRebFut External future for forced rebalance. * @param compatibleRebFut Future for waiting for compatible rebalances. * @return Future if rebalance was planned or null. */ - public RebalanceFuture addAssignments(GridDhtPreloaderAssignments assignments, - boolean forcePreload, + public RebalanceFuture prepare( + GridDhtPartitionExchangeId exchId, + @Nullable GridDhtPartitionsExchangeFuture exchFut, long rebalanceId, final RebalanceFuture next, @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut, diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 31d209b..2078204 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -138,25 +138,27 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { // No-op. } - /** {@inheritDoc} */ - @Override public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) { - return true; - } - - /** {@inheritDoc} */ - @Override public GridDhtPreloaderAssignments generateAssignments( + /** + * @param exchId Exchange ID. + * @param exchFut Completed exchange future. Can be {@code null} if forced or reassigned generation occurs. + * @return Partition assignments which will be requested from supplier nodes. + */ + public GridDhtPreloaderAssignments generateAssignments( GridDhtPartitionExchangeId exchId, - GridDhtPartitionsExchangeFuture exchFut) { + GridDhtPartitionsExchangeFuture exchFut + ) { return null; } /** {@inheritDoc} */ - @Override public RebalanceFuture addAssignments(GridDhtPreloaderAssignments assignments, - boolean forcePreload, + @Override public RebalanceFuture prepare( + GridDhtPartitionExchangeId exchId, + GridDhtPartitionsExchangeFuture exchFut, long rebalanceId, RebalanceFuture next, @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut, - GridCompoundFuture<Boolean, Boolean> compatibleRebFut) { + GridCompoundFuture<Boolean, Boolean> compatibleRebFut + ) { return null; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 8c3b446..21f2652 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -159,17 +159,6 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - @Override public boolean rebalanceRequired(GridDhtPartitionsExchangeFuture exchFut) { - if (ctx.kernalContext().clientNode()) - return false; // No-op. - - AffinityTopologyVersion lastAffChangeTopVer = - ctx.exchange().lastAffinityChangedTopologyVersion(exchFut.topologyVersion()); - - return lastAffChangeTopVer.equals(exchFut.topologyVersion()); - } - - /** {@inheritDoc} */ @Override public GridDhtPreloaderAssignments generateAssignments( GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut @@ -378,15 +367,20 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - @Override public RebalanceFuture addAssignments( - GridDhtPreloaderAssignments assignments, - boolean forceRebalance, + @Override public RebalanceFuture prepare( + GridDhtPartitionExchangeId exchId, + @Nullable GridDhtPartitionsExchangeFuture exchFut, long rebalanceId, final RebalanceFuture next, @Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut, GridCompoundFuture<Boolean, Boolean> compatibleRebFut ) { - return demander.addAssignments(assignments, forceRebalance, rebalanceId, next, forcedRebFut, compatibleRebFut); + long delay = grp.config().getRebalanceDelay(); + boolean forceRebalance = forcedRebFut != null; + + // Don't delay for dummy reassigns to avoid infinite recursion. + return (delay == 0 || forceRebalance) ? demander.addAssignments(generateAssignments(exchId, exchFut), forceRebalance, + rebalanceId, next, forcedRebFut, compatibleRebFut) : null; } /** diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java index 4635830..b1fe03e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheGroupsMetricsRebalanceTest.java @@ -28,7 +28,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.ToLongFunction; import java.util.stream.Collectors; -import com.google.common.collect.Lists; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; @@ -48,7 +47,6 @@ import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.metric.impl.ObjectGauge; -import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.A; @@ -194,37 +192,13 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest { cache2.put(i, CACHE2 + "-" + i); } - final CountDownLatch startStopRebalanceLatch = new CountDownLatch(1); - final CountDownLatch finishStopRebalanceLatch = new CountDownLatch(1); - GridFutureAdapter<Void> stopRebalanceResFut = new GridFutureAdapter<>(); + ignite = startGrid(1); - ignite = startGrid(1, cfg -> { - cfg.setLocalEventListeners(Collections.singletonMap( - (IgnitePredicate<Event>)evt -> { - startStopRebalanceLatch.countDown(); - - try { - assertTrue(finishStopRebalanceLatch.await(getTestTimeout(), TimeUnit.SECONDS)); - - stopRebalanceResFut.onDone(); - } - catch (Throwable e) { - stopRebalanceResFut.onDone(e); - } - - return false; - }, - new int[] {EventType.EVT_CACHE_REBALANCE_STOPPED} - )); - }); - - assertTrue(startStopRebalanceLatch.await(getTestTimeout(), TimeUnit.SECONDS)); + awaitPartitionMapExchange(true, true, null, true); CacheMetrics metrics1 = ignite.cache(CACHE1).localMetrics(); CacheMetrics metrics2 = ignite.cache(CACHE2).localMetrics(); - finishStopRebalanceLatch.countDown(); - long rate1 = metrics1.getRebalancingKeysRate(); long rate2 = metrics2.getRebalancingKeysRate(); @@ -234,8 +208,6 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest { assertEquals(metrics1.getRebalancedKeys(), rate1); assertEquals(metrics2.getRebalancedKeys(), rate2); - - stopRebalanceResFut.get(getTestTimeout()); } /** @@ -245,7 +217,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest { public void testCacheGroupRebalance() throws Exception { IgniteEx ignite0 = startGrid(0); - List<String> cacheNames = Lists.newArrayList(CACHE4, CACHE5); + List<String> cacheNames = Arrays.asList(CACHE4, CACHE5); int allKeysCount = 0; @@ -370,7 +342,7 @@ public class CacheGroupsMetricsRebalanceTest extends GridCommonAbstractTest { IgniteEx ignite0 = startGrid(0); - List<String> cacheNames = Lists.newArrayList(CACHE4, CACHE5); + List<String> cacheNames = Arrays.asList(CACHE4, CACHE5); for (String cacheName : cacheNames) { ignite0.getOrCreateCache(cacheName).putAll(new Random().ints(KEYS_COUNT).distinct().boxed() diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java index e6f3bbaf..5c78e3f 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxPartitionCounterStateConsistencyTest.java @@ -1185,8 +1185,6 @@ public class TxPartitionCounterStateConsistencyTest extends TxPartitionCounterSt grid(0).resetLostPartitions(Collections.singleton(DEFAULT_CACHE_NAME)); } - prim.context().cache().context().exchange().rebalanceDelay(500); - Random r = new Random(); AtomicBoolean stop = new AtomicBoolean();