This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 430e600d88f8db6d30717b7b2a9f1dabb2d1070c Author: YANGLiiN <[email protected]> AuthorDate: Mon Jul 5 22:50:15 2021 -0500 use Awaitility replace Thread.sleep (#11017) ### Motivation Currently the test cases use Thread.sleep in the pulsar-zookeeper-utils module. ### Modifications Use Awaitility to replace Thread.sleep. (cherry picked from commit 490b2be8f3c2c9594ce459e073a6491fe96069a5) --- pulsar-zookeeper-utils/pom.xml | 6 +++ .../zookeeper/ZkBookieRackAffinityMappingTest.java | 18 +++---- ...kIsolatedBookieEnsemblePlacementPolicyTest.java | 48 ++++++++++++------- .../pulsar/zookeeper/ZookeeperCacheTest.java | 56 ++++++++-------------- 4 files changed, 67 insertions(+), 61 deletions(-) diff --git a/pulsar-zookeeper-utils/pom.xml b/pulsar-zookeeper-utils/pom.xml index 4b9bed47c69..a0bcb757e0e 100644 --- a/pulsar-zookeeper-utils/pom.xml +++ b/pulsar-zookeeper-utils/pom.xml @@ -124,6 +124,12 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>${project.groupId}</groupId> <artifactId>pulsar-common</artifactId> diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java index 2c276ed1e73..ea7f970ca1e 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkBookieRackAffinityMappingTest.java @@ -21,14 +21,12 @@ package org.apache.pulsar.zookeeper; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; - import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; - +import java.util.Arrays; import java.util.HashMap; import java.util.List; 
import java.util.Map; - import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.util.ZkUtils; @@ -39,6 +37,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -158,7 +157,8 @@ public class ZkBookieRackAffinityMappingTest { ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - Thread.sleep(100); + Awaitility.await() + .until(() -> localZkc.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) != null); racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3")); assertEquals(racks.get(0), "/rack0"); @@ -198,11 +198,12 @@ public class ZkBookieRackAffinityMappingTest { secondaryBookieGroup.put(BOOKIE3, BookieInfo.builder().rack("rack0").build()); bookieMapping.put("group2", secondaryBookieGroup); - localZkc.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - -1); + byte[] data = jsonMapper.writeValueAsBytes(bookieMapping); + localZkc.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data,-1); // wait for the zk to notify and update the mappings - Thread.sleep(100); + Awaitility.await().until(() -> Arrays + .equals(data, localZkc.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false, null))); racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3")); assertEquals(racks.get(0), "/rack0"); @@ -211,7 +212,8 @@ public class ZkBookieRackAffinityMappingTest { localZkc.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, "{}".getBytes(), -1); - 
Thread.sleep(100); + Awaitility.await().until(() -> Arrays.equals("{}".getBytes(), + localZkc.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false, null))); racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3")); assertEquals(racks.get(0), null); diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java index 496a8583f8b..02df09b64e2 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java @@ -24,6 +24,7 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.util.HashedWheelTimer; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -47,6 +48,7 @@ import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -107,7 +109,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - Thread.sleep(100); + Awaitility.await() + .until(() -> localZkc.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) != null); ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy(); 
ClientConfiguration bkClientConf = new ClientConfiguration(); @@ -141,10 +144,11 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { secondaryBookieGroup.put(BOOKIE4, BookieInfo.builder().rack("rack0").build()); bookieMapping.put("group2", secondaryBookieGroup); - localZkc.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - -1); + byte[] data = jsonMapper.writeValueAsBytes(bookieMapping); + localZkc.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data, -1); - Thread.sleep(100); + Awaitility.await().until(() -> Arrays + .equals(data, localZkc.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false, null))); ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), null).getResult(); @@ -197,7 +201,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - Thread.sleep(100); + Awaitility.await().until(() -> Arrays.equals(data.getBytes(StandardCharsets.UTF_8), + localZkc.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false, null))); List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>()).getResult(); assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1).toBookieId())); @@ -227,10 +232,12 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { bookieMapping.put("group1", mainBookieGroup); bookieMapping.put("group2", secondaryBookieGroup); - ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, - jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + byte[] data = jsonMapper.writeValueAsBytes(bookieMapping); + ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data, + ZooDefs.Ids.OPEN_ACL_UNSAFE, 
CreateMode.PERSISTENT); - Thread.sleep(100); + Awaitility.await().until(() -> Arrays.equals(data, + localZkc.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false, null))); ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy(); ClientConfiguration bkClientConf = new ClientConfiguration(); @@ -256,11 +263,12 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { bookieMapping.put("group1", mainBookieGroup); bookieMapping.put("group2", secondaryBookieGroup); - localZkc.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), - -1); + byte[] data2 = jsonMapper.writeValueAsBytes(bookieMapping); + localZkc.setData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data2, -1); // wait for the zk to notify and update the mappings - Thread.sleep(100); + Awaitility.await().until(() -> Arrays + .equals(data2, localZkc.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false, null))); ensemble = isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(), new HashSet<>()).getResult(); assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1).toBookieId())); @@ -269,7 +277,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1); - Thread.sleep(100); + Awaitility.await() + .until(() -> localZkc.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) == null); isolationPolicy.newEnsemble(1, 1, 1, Collections.emptyMap(), new HashSet<>()); } @@ -285,7 +294,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - Thread.sleep(100); + Awaitility.await() + .until(() -> localZkc.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) != null); ZkIsolatedBookieEnsemblePlacementPolicy 
isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy(); ClientConfiguration bkClientConf = new ClientConfiguration(); @@ -332,7 +342,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - Thread.sleep(100); + Awaitility.await() + .until(() -> localZkc.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) != null); ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy(); ClientConfiguration bkClientConf = new ClientConfiguration(); @@ -379,7 +390,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - Thread.sleep(100); + Awaitility.await() + .until(() -> localZkc.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) != null); ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy(); ClientConfiguration bkClientConf = new ClientConfiguration(); @@ -423,7 +435,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - Thread.sleep(100); + Awaitility.await() + .until(() -> localZkc.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) != null); ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy(); ClientConfiguration bkClientConf = new ClientConfiguration(); @@ -473,7 +486,8 @@ public class ZkIsolatedBookieEnsemblePlacementPolicyTest { ZkUtils.createFullPathOptimistic(localZkc, 
ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - Thread.sleep(100); + Awaitility.await() + .until(() -> localZkc.exists(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false) != null); // prepare a custom placement policy and put it into the custom metadata. The isolation policy should decode // from the custom metadata and apply it to the get black list method. diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java index a69dc9206ef..acbea3d29f4 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java @@ -19,8 +19,8 @@ package org.apache.pulsar.zookeeper; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -28,15 +28,12 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import static org.testng.AssertJUnit.assertNotNull; import static org.testng.AssertJUnit.assertNull; - import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.MoreExecutors; - import io.netty.util.concurrent.DefaultThreadFactory; - +import java.time.Duration; import java.util.Collections; -import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; import java.util.UUID; @@ -47,7 +44,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicInteger; import 
java.util.function.Predicate; - import lombok.Cleanup; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; @@ -60,15 +56,15 @@ import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event; import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; +import org.awaitility.Awaitility; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class ZookeeperCacheTest { private static final Logger log = LoggerFactory.getLogger(ZookeeperCacheTest.class); @@ -121,7 +117,7 @@ public class ZookeeperCacheTest { zkClient.setData("/my_test", newValue.getBytes(), -1); // Wait for the watch to be triggered - Thread.sleep(100); + Awaitility.await().until(() -> zkClient.exists("/my_test", false) != null); assertEquals(zkCache.get("/my_test").get(), newValue); @@ -166,18 +162,15 @@ public class ZookeeperCacheTest { zkClient.create("/test/z2", new byte[0], null, CreateMode.PERSISTENT); // Wait for cache to be updated in background - while (notificationCount.get() < 2) { - Thread.sleep(1); - } + Awaitility.await().until(() -> notificationCount.get() >= 2); assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1", "z2"))); assertEquals(cache.get("/test"), new TreeSet<String>(Lists.newArrayList("z1", "z2"))); assertEquals(notificationCount.get(), 2); zkClient.delete("/test/z2", -1); - while (notificationCount.get() < 3) { - Thread.sleep(1); - } + + Awaitility.await().until(() -> notificationCount.get() >= 3); assertEquals(cache.get(), new TreeSet<String>(Lists.newArrayList("z1"))); assertEquals(cache.get(), new 
TreeSet<String>(Lists.newArrayList("z1"))); @@ -221,9 +214,8 @@ public class ZookeeperCacheTest { zkClient.create("/test/z1", new byte[0], null, CreateMode.PERSISTENT); // Wait for cache to be updated in background - while (notificationCount.get() < 1) { - Thread.sleep(1); - } + Awaitility.await().until(() -> notificationCount.get() >= 1); + final int recvNotifications = notificationCount.get(); @@ -232,9 +224,7 @@ public class ZookeeperCacheTest { assertTrue(recvNotifications == 1 || recvNotifications == 2); zkClient.delete("/test/z1", -1); - while (notificationCount.get() < (recvNotifications + 1)) { - Thread.sleep(1); - } + Awaitility.await().until(() -> notificationCount.get() >= recvNotifications + 1); assertTrue(cache.get().isEmpty()); assertTrue(cache.get().isEmpty()); @@ -256,14 +246,14 @@ public class ZookeeperCacheTest { public void testExistsCache() throws Exception { // Check existence after creation of the node zkClient.create("/test", new byte[0], null, CreateMode.PERSISTENT); - Thread.sleep(20); + Awaitility.await().until(() -> zkClient.exists("/test", false) != null); ZooKeeperCache zkCacheService = new LocalZooKeeperCache(zkClient, 30, executor); boolean exists = zkCacheService.exists("/test"); Assert.assertTrue(exists, "/test should exists in the cache"); // Check existence after deletion if the node zkClient.delete("/test", -1); - Thread.sleep(20); + Awaitility.await().until(() -> zkClient.exists("/test", false) == null); boolean shouldNotExist = zkCacheService.exists("/test"); Assert.assertFalse(shouldNotExist, "/test should not exist in the cache"); } @@ -273,7 +263,7 @@ public class ZookeeperCacheTest { zkClient.create("/test", new byte[0], null, CreateMode.PERSISTENT); zkClient.create("/test/c1", new byte[0], null, CreateMode.PERSISTENT); zkClient.create("/test/c2", new byte[0], null, CreateMode.PERSISTENT); - Thread.sleep(20); + Awaitility.await().until(() -> zkClient.exists("/test/c2", false) != null); ZooKeeperCache zkCacheService = new 
LocalZooKeeperCache(zkClient, 30, executor); boolean exists = zkCacheService.exists("/test"); Assert.assertTrue(exists, "/test should exists in the cache"); @@ -347,9 +337,7 @@ public class ZookeeperCacheTest { zkClient.create("/my_test2", value.getBytes(), null, CreateMode.PERSISTENT); // Wait for the watch to be triggered - while (notificationCount.get() < 1) { - Thread.sleep(1); - } + Awaitility.await().until(() -> notificationCount.get() >= 1); // retrieve the data from the cache and verify it is the updated/new data assertEquals(zkCache.get("/my_test").get(), newValue); @@ -497,7 +485,7 @@ public class ZookeeperCacheTest { } // (2) sleep to let cache to be invalidated async - Thread.sleep(1000); + Awaitility.await().until(()->zkCache.getAsync(key1).get().isPresent()); // (3) now, cache should be invalidate failed-future and should refetch the data assertEquals(zkCache.getAsync(key1).get().get(), value); @@ -515,7 +503,7 @@ public class ZookeeperCacheTest { // global-Zk session is connected now zkCacheService.zkSession.set(zkSession); // (5) sleep to let cache to be invalidated async - Thread.sleep(1000); + Awaitility.await().until(()->zkCache.getAsync(key1).get().isPresent()); // (6) now, cache should be invalidate failed-future and should refetch the data assertEquals(zkCache.getAsync(key1).get().get(), value); } @@ -622,11 +610,7 @@ public class ZookeeperCacheTest { private static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis) throws Exception { - for (int i = 0; i < retryCount; i++) { - if (predicate.test(null) || i == (retryCount - 1)) { - break; - } - Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i)); - } + Awaitility.await().between(Duration.ZERO, Duration.ofMillis(intSleepTimeInMillis * retryCount)) + .until(() -> predicate.test(null)); } }
