This is an automated email from the ASF dual-hosted git repository. bogong pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 79de91b5765764553fd82c23a581b9bdd1ecb0fd Author: Cong Zhao <[email protected]> AuthorDate: Tue Aug 9 14:07:31 2022 +0800 [fix][flaky-test] ElasticSearchClientTests.testBulkBlocking (#16920) (cherry picked from commit f02679d83402ac07e06c48570505f343e350a491) --- .../broker/auth/AuthorizationWithAuthDataTest.java | 3 +- .../io/elasticsearch/ElasticSearchClientTests.java | 92 ++++++++++++---------- 2 files changed, 52 insertions(+), 43 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java index ccb401f74b3..eb9ed006c13 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java @@ -25,6 +25,7 @@ import io.jsonwebtoken.SignatureAlgorithm; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Base64; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.UUID; @@ -262,7 +263,7 @@ public class AuthorizationWithAuthDataTest extends MockedPulsarServiceBaseTest { @Test public void testAdmin() throws PulsarAdminException { admin.tenants().createTenant("test-tenant-1", - TenantInfo.builder().allowedClusters(Set.of(configClusterName)).build()); + TenantInfo.builder().allowedClusters(Collections.singleton(configClusterName)).build()); admin.namespaces().createNamespace("test-tenant-1/test-namespace-1"); String partitionedTopic = UUID.randomUUID().toString(); admin.topics().createPartitionedTopic(partitionedTopic,3); diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java index 1a505dae31a..1c871159b50 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java @@ -18,6 +18,19 @@ */ package org.apache.pulsar.io.elasticsearch; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertThrows; +import static org.testng.Assert.assertTrue; +import java.io.IOException; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.atomic.LongAdder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.schema.GenericObject; @@ -32,20 +45,6 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.io.IOException; -import java.util.Optional; -import java.util.UUID; - -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertThrows; -import static org.testng.Assert.assertTrue; - @Slf4j public class ElasticSearchClientTests extends ElasticSearchTestBase { @@ -63,8 +62,16 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase { } static class MockRecord<T> implements Record<T> { - int acked = 0; - int failed = 0; + LongAdder acked = new LongAdder(); + LongAdder failed = new LongAdder(); + + public int getAcked() { + return acked.intValue(); + } + + public int getFailed() { + return failed.intValue(); + } @Override public T getValue() { @@ -73,12 +80,12 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase { @Override public void ack() { - acked++; + acked.increment(); } @Override public void fail() { - failed++; + failed.increment(); } } @@ -130,13 +137,13 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase { try { MockRecord<GenericObject> mockRecord = new MockRecord<>(); client.indexDocument(mockRecord, Pair.of("1", "{ \"a\":1}")); - assertEquals(mockRecord.acked, 1); - assertEquals(mockRecord.failed, 0); + assertEquals(mockRecord.getAcked(), 1); + assertEquals(mockRecord.getFailed(), 0); assertEquals(client.totalHits(index), 1); client.deleteDocument(mockRecord, "1"); - assertEquals(mockRecord.acked, 2); - assertEquals(mockRecord.failed, 0); + assertEquals(mockRecord.getAcked(), 2); + assertEquals(mockRecord.getFailed(), 0); assertEquals(client.totalHits(index), 0); } finally { client.delete(index); @@ -193,11 +200,11 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase { client.flush(); assertNotNull(client.irrecoverableError.get()); assertTrue(client.irrecoverableError.get().getMessage().contains("mapper_parsing_exception")); - assertEquals(mockRecord.acked, 1); - assertEquals(mockRecord.failed, 1); + assertEquals(mockRecord.getAcked(), 1); + assertEquals(mockRecord.getFailed(), 1); assertThrows(Exception.class, () -> client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"))); - assertEquals(mockRecord.acked, 1); - assertEquals(mockRecord.failed, 2); + assertEquals(mockRecord.getAcked(), 1); + assertEquals(mockRecord.getFailed(), 2); } } @@ -215,8 +222,8 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase { client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}")); client.flush(); assertNull(client.irrecoverableError.get()); - assertEquals(mockRecord.acked, 1); - assertEquals(mockRecord.failed, 1); + assertEquals(mockRecord.getAcked(), 1); + assertEquals(mockRecord.getFailed(), 1); } } @@ -239,29 +246,28 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase { MockRecord<GenericObject> mockRecord = new MockRecord<>(); client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}")); client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}")); - assertEquals(mockRecord.acked, 2); - assertEquals(mockRecord.failed, 0); + assertEquals(mockRecord.getAcked(), 2); + assertEquals(mockRecord.getFailed(), 0); assertEquals(client.totalHits(index), 2); ChaosContainer<?> chaosContainer = ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 15); chaosContainer.start(); client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}")); - assertEquals(mockRecord.acked, 2); - assertEquals(mockRecord.failed, 0); + assertEquals(mockRecord.getAcked(), 2); + assertEquals(mockRecord.getFailed(), 0); assertEquals(client.totalHits(index), 2); chaosContainer.stop(); client.flush(); - assertEquals(mockRecord.acked, 3); - assertEquals(mockRecord.failed, 0); + assertEquals(mockRecord.getAcked(), 3); + assertEquals(mockRecord.getFailed(), 0); assertEquals(client.totalHits(index), 3); } finally { client.delete(index); } } } - @Test public void testBulkBlocking() throws Exception { final String index = "indexblocking-" + UUID.randomUUID(); @@ -284,14 +290,14 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase { } Awaitility.await().untilAsserted(() -> { - assertThat("acked record", mockRecord.acked, greaterThanOrEqualTo(4)); - assertEquals(mockRecord.failed, 0); + assertThat("acked record", mockRecord.getAcked(), greaterThanOrEqualTo(4)); + assertEquals(mockRecord.getFailed(), 0); assertThat("totalHits", client.totalHits(index), greaterThanOrEqualTo(4L)); }); client.flush(); Awaitility.await().untilAsserted(() -> { - assertEquals(mockRecord.failed, 0); - assertEquals(mockRecord.acked, 5); + assertEquals(mockRecord.getFailed(), 0); + assertEquals(mockRecord.getAcked(), 5); assertEquals(client.totalHits(index), 5); }); @@ -310,9 +316,11 @@ public class ElasticSearchClientTests extends ElasticSearchTestBase { assertTrue(elapsed > 29000); // bulkIndex was blocking while elasticsearch was down or busy Thread.sleep(1000L); - assertEquals(mockRecord.acked, 15); - assertEquals(mockRecord.failed, 0); - assertEquals(client.records.size(), 0); + Awaitility.await().untilAsserted(() -> { + assertEquals(mockRecord.getAcked(), 15); + assertEquals(mockRecord.getFailed(), 0); + assertEquals(client.records.size(), 0); + }); chaosContainer.stop(); } finally {
