This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 79de91b5765764553fd82c23a581b9bdd1ecb0fd
Author: Cong Zhao <[email protected]>
AuthorDate: Tue Aug 9 14:07:31 2022 +0800

    [fix][flaky-test] ElasticSearchClientTests.testBulkBlocking (#16920)
    
    (cherry picked from commit f02679d83402ac07e06c48570505f343e350a491)
---
 .../broker/auth/AuthorizationWithAuthDataTest.java |  3 +-
 .../io/elasticsearch/ElasticSearchClientTests.java | 92 ++++++++++++----------
 2 files changed, 52 insertions(+), 43 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java
index ccb401f74b3..eb9ed006c13 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/AuthorizationWithAuthDataTest.java
@@ -25,6 +25,7 @@ import io.jsonwebtoken.SignatureAlgorithm;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
@@ -262,7 +263,7 @@ public class AuthorizationWithAuthDataTest extends 
MockedPulsarServiceBaseTest {
     @Test
     public void testAdmin() throws PulsarAdminException {
         admin.tenants().createTenant("test-tenant-1",
-                
TenantInfo.builder().allowedClusters(Set.of(configClusterName)).build());
+                
TenantInfo.builder().allowedClusters(Collections.singleton(configClusterName)).build());
         admin.namespaces().createNamespace("test-tenant-1/test-namespace-1");
         String partitionedTopic = UUID.randomUUID().toString();
         admin.topics().createPartitionedTopic(partitionedTopic,3);
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index 1a505dae31a..1c871159b50 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -18,6 +18,19 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.atomic.LongAdder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.schema.GenericObject;
@@ -32,20 +45,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import java.io.IOException;
-import java.util.Optional;
-import java.util.UUID;
-
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertThrows;
-import static org.testng.Assert.assertTrue;
-
 @Slf4j
 public class ElasticSearchClientTests extends ElasticSearchTestBase {
 
@@ -63,8 +62,16 @@ public class ElasticSearchClientTests extends 
ElasticSearchTestBase {
     }
 
     static class MockRecord<T> implements Record<T> {
-        int acked = 0;
-        int failed = 0;
+        LongAdder acked = new LongAdder();
+        LongAdder failed = new LongAdder();
+
+        public int getAcked() {
+            return acked.intValue();
+        }
+
+        public int getFailed() {
+            return failed.intValue();
+        }
 
         @Override
         public T getValue() {
@@ -73,12 +80,12 @@ public class ElasticSearchClientTests extends 
ElasticSearchTestBase {
 
         @Override
         public void ack() {
-            acked++;
+            acked.increment();
         }
 
         @Override
         public void fail() {
-            failed++;
+            failed.increment();
         }
     }
 
@@ -130,13 +137,13 @@ public class ElasticSearchClientTests extends 
ElasticSearchTestBase {
             try {
                 MockRecord<GenericObject> mockRecord = new MockRecord<>();
                 client.indexDocument(mockRecord, Pair.of("1", "{ \"a\":1}"));
-                assertEquals(mockRecord.acked, 1);
-                assertEquals(mockRecord.failed, 0);
+                assertEquals(mockRecord.getAcked(), 1);
+                assertEquals(mockRecord.getFailed(), 0);
                 assertEquals(client.totalHits(index), 1);
 
                 client.deleteDocument(mockRecord, "1");
-                assertEquals(mockRecord.acked, 2);
-                assertEquals(mockRecord.failed, 0);
+                assertEquals(mockRecord.getAcked(), 2);
+                assertEquals(mockRecord.getFailed(), 0);
                 assertEquals(client.totalHits(index), 0);
             } finally {
                 client.delete(index);
@@ -193,11 +200,11 @@ public class ElasticSearchClientTests extends 
ElasticSearchTestBase {
             client.flush();
             assertNotNull(client.irrecoverableError.get());
             
assertTrue(client.irrecoverableError.get().getMessage().contains("mapper_parsing_exception"));
-            assertEquals(mockRecord.acked, 1);
-            assertEquals(mockRecord.failed, 1);
+            assertEquals(mockRecord.getAcked(), 1);
+            assertEquals(mockRecord.getFailed(), 1);
             assertThrows(Exception.class, () -> client.bulkIndex(mockRecord, 
Pair.of("3", "{\"a\":3}")));
-            assertEquals(mockRecord.acked, 1);
-            assertEquals(mockRecord.failed, 2);
+            assertEquals(mockRecord.getAcked(), 1);
+            assertEquals(mockRecord.getFailed(), 2);
         }
     }
 
@@ -215,8 +222,8 @@ public class ElasticSearchClientTests extends 
ElasticSearchTestBase {
             client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}"));
             client.flush();
             assertNull(client.irrecoverableError.get());
-            assertEquals(mockRecord.acked, 1);
-            assertEquals(mockRecord.failed, 1);
+            assertEquals(mockRecord.getAcked(), 1);
+            assertEquals(mockRecord.getFailed(), 1);
         }
     }
 
@@ -239,29 +246,28 @@ public class ElasticSearchClientTests extends 
ElasticSearchTestBase {
                 MockRecord<GenericObject> mockRecord = new MockRecord<>();
                 client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
                 client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
-                assertEquals(mockRecord.acked, 2);
-                assertEquals(mockRecord.failed, 0);
+                assertEquals(mockRecord.getAcked(), 2);
+                assertEquals(mockRecord.getFailed(), 0);
                 assertEquals(client.totalHits(index), 2);
 
                 ChaosContainer<?> chaosContainer = 
ChaosContainer.pauseContainerForSeconds(container.getContainerName(), 15);
                 chaosContainer.start();
 
                 client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
-                assertEquals(mockRecord.acked, 2);
-                assertEquals(mockRecord.failed, 0);
+                assertEquals(mockRecord.getAcked(), 2);
+                assertEquals(mockRecord.getFailed(), 0);
                 assertEquals(client.totalHits(index), 2);
 
                 chaosContainer.stop();
                 client.flush();
-                assertEquals(mockRecord.acked, 3);
-                assertEquals(mockRecord.failed, 0);
+                assertEquals(mockRecord.getAcked(), 3);
+                assertEquals(mockRecord.getFailed(), 0);
                 assertEquals(client.totalHits(index), 3);
             } finally {
                 client.delete(index);
             }
         }
     }
-
     @Test
     public void testBulkBlocking() throws Exception {
         final String index = "indexblocking-" + UUID.randomUUID();
@@ -284,14 +290,14 @@ public class ElasticSearchClientTests extends 
ElasticSearchTestBase {
                 }
 
                 Awaitility.await().untilAsserted(() -> {
-                    assertThat("acked record", mockRecord.acked, 
greaterThanOrEqualTo(4));
-                    assertEquals(mockRecord.failed, 0);
+                    assertThat("acked record", mockRecord.getAcked(), 
greaterThanOrEqualTo(4));
+                    assertEquals(mockRecord.getFailed(), 0);
                     assertThat("totalHits", client.totalHits(index), 
greaterThanOrEqualTo(4L));
                 });
                 client.flush();
                 Awaitility.await().untilAsserted(() -> {
-                    assertEquals(mockRecord.failed, 0);
-                    assertEquals(mockRecord.acked, 5);
+                    assertEquals(mockRecord.getFailed(), 0);
+                    assertEquals(mockRecord.getAcked(), 5);
                     assertEquals(client.totalHits(index), 5);
                 });
 
@@ -310,9 +316,11 @@ public class ElasticSearchClientTests extends 
ElasticSearchTestBase {
                 assertTrue(elapsed > 29000); // bulkIndex was blocking while 
elasticsearch was down or busy
 
                 Thread.sleep(1000L);
-                assertEquals(mockRecord.acked, 15);
-                assertEquals(mockRecord.failed, 0);
-                assertEquals(client.records.size(), 0);
+                Awaitility.await().untilAsserted(() -> {
+                    assertEquals(mockRecord.getAcked(), 15);
+                    assertEquals(mockRecord.getFailed(), 0);
+                    assertEquals(client.records.size(), 0);
+                });
 
                 chaosContainer.stop();
             } finally {

Reply via email to