This is an automated email from the ASF dual-hosted git repository.
irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new cebc76f IGNITE-12545 Introduce listener interface for components to
react to partition map exchange events - Fixes #7263.
cebc76f is described below
commit cebc76f1f5d7d077093ddaecbabcd8db2106c60a
Author: Ivan Rakov <[email protected]>
AuthorDate: Mon Jan 20 20:08:44 2020 +0300
IGNITE-12545 Introduce listener interface for components to react to
partition map exchange events - Fixes #7263.
---
.../cache/GridCachePartitionExchangeManager.java | 29 +++
.../preloader/GridDhtPartitionsExchangeFuture.java | 12 +
.../dht/preloader/PartitionsExchangeAware.java | 69 ++++++
.../distributed/PartitionsExchangeAwareTest.java | 272 +++++++++++++++++++++
.../testsuites/IgniteCacheMvccTestSuite6.java | 4 +
.../ignite/testsuites/IgniteCacheTestSuite6.java | 3 +
6 files changed, 389 insertions(+)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index eba10ff..0e4a5ff 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -96,6 +96,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.StopCachesOnClientReconnectExchangeTask;
import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager;
@@ -273,6 +274,9 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
/** Distributed latch manager. */
private ExchangeLatchManager latchMgr;
+ /** List of exchange aware components. */
+ private final List<PartitionsExchangeAware> exchangeAwareComps = new
ArrayList<>();
+
/** Histogram of PME durations. */
private volatile HistogramMetric durationHistogram;
@@ -1204,6 +1208,31 @@ public class GridCachePartitionExchangeManager<K, V>
extends GridCacheSharedMana
}
/**
+ * Registers component that will be notified on every partition map
exchange.
+ *
+ * @param comp Component to be registered.
+ */
+ public void registerExchangeAwareComponent(PartitionsExchangeAware comp) {
+ exchangeAwareComps.add(comp);
+ }
+
+ /**
+ * Removes exchange aware component from list of listeners.
+ *
+ * @param comp Component to be unregistered.
+ */
+ public void unregisterExchangeAwareComponent(PartitionsExchangeAware comp)
{
+ exchangeAwareComps.remove(comp);
+ }
+
+ /**
+ * @return List of registered exchange listeners.
+ */
+ public List<PartitionsExchangeAware> exchangeAwareComponents() {
+ return U.sealList(exchangeAwareComps);
+ }
+
+ /**
* Partition refresh callback for selected cache groups.
* For coordinator causes {@link GridDhtPartitionsFullMessage
FullMessages} send,
* for non coordinator - {@link GridDhtPartitionsSingleMessage
SingleMessages} send
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 46636b1..890e28b 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -884,6 +884,9 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
exchangeType = exchange;
+ for (PartitionsExchangeAware comp :
cctx.exchange().exchangeAwareComponents())
+ comp.onInitBeforeTopologyLock(this);
+
updateTopologies(crdNode);
timeBag.finishGlobalStage("Determine exchange type");
@@ -931,6 +934,9 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
}
}
+ for (PartitionsExchangeAware comp :
cctx.exchange().exchangeAwareComponents())
+ comp.onInitAfterTopologyLock(this);
+
if (exchLog.isInfoEnabled())
exchLog.info("Finished exchange init [topVer=" + topVer + ",
crd=" + crdNode + ']');
}
@@ -2319,6 +2325,9 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
if (err == null)
cctx.coordinators().onExchangeDone(events().discoveryCache());
+ for (PartitionsExchangeAware comp :
cctx.exchange().exchangeAwareComponents())
+ comp.onDoneBeforeTopologyUnlock(this);
+
// Create and destory caches and cache proxies.
cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
@@ -2423,6 +2432,9 @@ public class GridDhtPartitionsExchangeFuture extends
GridDhtTopologyFutureAdapte
}
}
+ for (PartitionsExchangeAware comp :
cctx.exchange().exchangeAwareComponents())
+ comp.onDoneAfterTopologyUnlock(this);
+
if (firstDiscoEvt instanceof DiscoveryCustomEvent)
((DiscoveryCustomEvent)firstDiscoEvt).customMessage(null);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsExchangeAware.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsExchangeAware.java
new file mode 100644
index 0000000..7318726
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/PartitionsExchangeAware.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
+
+/**
+ * Interface which allows a component to subscribe to partition map exchange
events
+ * (via {@link
GridCachePartitionExchangeManager#registerExchangeAwareComponent(PartitionsExchangeAware)}).
+ * Heavy computations shouldn't be performed in listener methods: aware
components are notified
+ * synchronously from the exchange thread.
+ * Runtime exceptions thrown by listener methods will trigger the failure handler
(because the exchange thread is critical).
+ * Please ensure that your implementation will never throw an exception if you
subscribe to exchange events for
+ * non-system-critical activities.
+ */
+public interface PartitionsExchangeAware {
+ /**
+ * Callback from exchange process initialization; called before topology
is locked.
+ *
+ * @param fut Partition map exchange future.
+ */
+ public default void
onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ // No-op.
+ }
+
+ /**
+ * Callback from exchange process initialization; called after topology is
locked.
+ * Guarantees that no more data updates will be performed on local node
until exchange process is completed.
+ *
+ * @param fut Partition map exchange future.
+ */
+ public default void
onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ // No-op.
+ }
+
+ /**
+ * Callback from exchange process completion; called before topology is
unlocked.
+ * Guarantees that no updates were performed on local node since exchange
process started.
+ *
+ * @param fut Partition map exchange future.
+ */
+ public default void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ // No-op.
+ }
+
+ /**
+ * Callback from exchange process completion; called after topology is
unlocked.
+ *
+ * @param fut Partition map exchange future.
+ */
+ public default void
onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ // No-op.
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PartitionsExchangeAwareTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PartitionsExchangeAwareTest.java
new file mode 100644
index 0000000..2070dcd
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/PartitionsExchangeAwareTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteInterruptedException;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test covering semantics of {@link PartitionsExchangeAware} interface for
components.
+ */
+public class PartitionsExchangeAwareTest extends GridCommonAbstractTest {
+ /** Nodes count. */
+ private static final int NODES_CNT = 2;
+
+ /** Ip finder. */
+ private static final TcpDiscoveryVmIpFinder IP_FINDER = new
TcpDiscoveryVmIpFinder(true);
+
+ /** Atomic cache name. */
+ private static final String ATOMIC_CACHE_NAME = "atomic";
+
+ /** Transactional cache name. */
+ private static final String TX_CACHE_NAME = "tx";
+
+ /** Timeout seconds. */
+ public static final int TIMEOUT_SECONDS = 10;
+
+ /** Initialize before lock reached latch. */
+ private CountDownLatch initBeforeLockReachedLatch;
+
+ /** Initialize before lock wait latch. */
+ private CountDownLatch initBeforeLockWaitLatch;
+
+ /** Initialize after lock reached latch. */
+ private CountDownLatch initAfterLockReachedLatch;
+
+ /** Initialize after lock wait latch. */
+ private CountDownLatch initAfterLockWaitLatch;
+
+ /** On done before unlock reached latch. */
+ private CountDownLatch onDoneBeforeUnlockReachedLatch;
+
+ /** On done before unlock wait latch. */
+ private CountDownLatch onDoneBeforeUnlockWaitLatch;
+
+ /** On done after unlock reached latch. */
+ private CountDownLatch onDoneAfterUnlockReachedLatch;
+
+ /** On done after unlock wait latch. */
+ private CountDownLatch onDoneAfterUnlockWaitLatch;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+ cfg.setDataStorageConfiguration(new
DataStorageConfiguration().setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration().setMaxSize(100 * 1024 * 1024)));
+
+ CacheConfiguration atomicCfg = new CacheConfiguration()
+ .setName(ATOMIC_CACHE_NAME)
+ .setAffinity(new RendezvousAffinityFunction(false, 16));
+
+ CacheConfiguration txCfg = new CacheConfiguration()
+ .setName(TX_CACHE_NAME)
+ .setAffinity(new RendezvousAffinityFunction(false, 16));
+
+ cfg.setCacheConfiguration(atomicCfg, txCfg);
+
+ return cfg;
+ }
+
+ /**
+ * Init before test.
+ */
+ @Before
+ public void init() {
+ initBeforeLockReachedLatch = new CountDownLatch(NODES_CNT);
+ initBeforeLockWaitLatch = new CountDownLatch(1);
+
+ initAfterLockReachedLatch = new CountDownLatch(NODES_CNT);
+ initAfterLockWaitLatch = new CountDownLatch(1);
+
+ onDoneBeforeUnlockReachedLatch = new CountDownLatch(NODES_CNT);
+ onDoneBeforeUnlockWaitLatch = new CountDownLatch(1);
+
+ onDoneAfterUnlockReachedLatch = new CountDownLatch(NODES_CNT);
+ onDoneAfterUnlockWaitLatch = new CountDownLatch(1);
+
+ stopAllGrids();
+ }
+
+ /**
+ * Cleanup after test.
+ */
+ @After
+ public void cleanUp() {
+ initBeforeLockWaitLatch.countDown();
+ initAfterLockWaitLatch.countDown();
+ onDoneBeforeUnlockWaitLatch.countDown();
+ onDoneAfterUnlockWaitLatch.countDown();
+
+ stopAllGrids();
+ }
+
+ /**
+ * Checks that updates are impossible during PME exactly from the moment
topologies are locked
+ * and until exchange future is completed.
+ *
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testPartitionsExchangeAware() throws Exception {
+ startGrids(2);
+
+ awaitPartitionMapExchange();
+
+ IgniteEx ig0 = grid(0);
+ IgniteEx ig1 = grid(1);
+
+ IgniteCache<Integer, Integer> atomicCache =
ig0.cache(ATOMIC_CACHE_NAME);
+ IgniteCache<Integer, Integer> txCache = ig1.cache(TX_CACHE_NAME);
+
+ PartitionsExchangeAware exchangeAware = new PartitionsExchangeAware() {
+ /** {@inheritDoc} */
+ @Override public void
onInitBeforeTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ try {
+ initBeforeLockReachedLatch.countDown();
+ initBeforeLockWaitLatch.await();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void
onInitAfterTopologyLock(GridDhtPartitionsExchangeFuture fut) {
+ try {
+ initAfterLockReachedLatch.countDown();
+ initAfterLockWaitLatch.await();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ try {
+ onDoneBeforeUnlockReachedLatch.countDown();
+ onDoneBeforeUnlockWaitLatch.await();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void
onDoneAfterTopologyUnlock(GridDhtPartitionsExchangeFuture fut) {
+ try {
+ onDoneAfterUnlockReachedLatch.countDown();
+ onDoneAfterUnlockWaitLatch.await();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteInterruptedException(e);
+ }
+ }
+ };
+
+
ig0.context().cache().context().exchange().registerExchangeAwareComponent(exchangeAware);
+
ig1.context().cache().context().exchange().registerExchangeAwareComponent(exchangeAware);
+
+ GridTestUtils.runAsync(() -> startGrid(2));
+
+ assertTrue(initBeforeLockReachedLatch.await(TIMEOUT_SECONDS,
TimeUnit.SECONDS));
+ assertUpdateIsPossible(atomicCache, txCache, true);
+
+ initBeforeLockWaitLatch.countDown();
+
+ assertTrue(initAfterLockReachedLatch.await(TIMEOUT_SECONDS,
TimeUnit.SECONDS));
+ assertUpdateIsPossible(atomicCache, txCache, false);
+
+ initAfterLockWaitLatch.countDown();
+
+ assertTrue(onDoneBeforeUnlockReachedLatch.await(TIMEOUT_SECONDS,
TimeUnit.SECONDS));
+ assertUpdateIsPossible(atomicCache, txCache, false);
+
+ onDoneBeforeUnlockWaitLatch.countDown();
+
+ assertTrue(onDoneAfterUnlockReachedLatch.await(TIMEOUT_SECONDS,
TimeUnit.SECONDS));
+ assertUpdateIsPossible(atomicCache, txCache, true);
+
+ onDoneAfterUnlockWaitLatch.countDown();
+
+ System.out.println("^^^^success");
+ }
+
+
+ /**
+ * Asserts that update operations do (or don't) hang according to the
passed flag.
+ *
+ * @param atomicCache Atomic cache.
+ * @param txCache Tx cache.
+ * @param updatePossible Flag whether update is possible.
+ */
+ private static void assertUpdateIsPossible(
+ IgniteCache<Integer, Integer> atomicCache,
+ IgniteCache<Integer, Integer> txCache,
+ boolean updatePossible
+ ) throws IgniteInterruptedCheckedException {
+ Map<Integer, Integer> putAllArg = new HashMap<>();
+ IntStream.of(100).forEach(i -> putAllArg.put(i, i));
+
+ final IgniteInternalFuture txUpdateFut = GridTestUtils.runAsync(new
Runnable() {
+ @Override public void run() {
+ txCache.putAll(putAllArg);
+ }
+ });
+
+ final IgniteInternalFuture atomicUpdateFut =
GridTestUtils.runAsync(new Runnable() {
+ @Override public void run() {
+ atomicCache.putAll(putAllArg);
+ }
+ });
+
+ assertEquals(updatePossible, GridTestUtils.waitForCondition(new
GridAbsPredicate() {
+ @Override public boolean apply() {
+ assertTrue(txUpdateFut.isDone() == atomicUpdateFut.isDone());
+
+ return atomicUpdateFut.isDone();
+ }
+ }, TIMEOUT_SECONDS * 1000));
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
index 7c30da0..8596a41 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheMvccTestSuite6.java
@@ -39,6 +39,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.ExchangeMergeStal
import
org.apache.ignite.internal.processors.cache.distributed.GridCachePartitionEvictionDuringReadThroughSelfTest;
import
org.apache.ignite.internal.processors.cache.distributed.IgniteCache150ClientsTest;
import
org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
+import
org.apache.ignite.internal.processors.cache.distributed.PartitionsExchangeAwareTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxLocalDhtMixedCacheModesTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticOnPartitionExchangeTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxOptimisticPrepareOnUnstableTopologyTest;
@@ -99,6 +100,9 @@ public class IgniteCacheMvccTestSuite6 {
suite.add(PartitionedMvccTxPessimisticCacheGetsDistributionTest.class);
suite.add(ReplicatedMvccTxPessimisticCacheGetsDistributionTest.class);
+ // This exchange test is irrelevant to MVCC.
+ suite.add(PartitionsExchangeAwareTest.class);
+
return suite;
}
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
index ee02b56..d8bf783 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite6.java
@@ -41,6 +41,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.IgniteCache150Cli
import
org.apache.ignite.internal.processors.cache.distributed.IgniteCacheThreadLocalTxTest;
import
org.apache.ignite.internal.processors.cache.distributed.IgniteOptimisticTxSuspendResumeTest;
import
org.apache.ignite.internal.processors.cache.distributed.IgnitePessimisticTxSuspendResumeTest;
+import
org.apache.ignite.internal.processors.cache.distributed.PartitionsExchangeAwareTest;
import org.apache.ignite.internal.processors.cache.transactions.TxLabelTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxLocalDhtMixedCacheModesTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxMultiCacheAsyncOpsTest;
@@ -144,6 +145,8 @@ public class IgniteCacheTestSuite6 {
GridTestUtils.addTestIfNeeded(suite,
TxRollbackOnMapOnInvalidTopologyTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
PartitionsExchangeAwareTest.class, ignoredTests);
+
return suite;
}
}