This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f02679d8340 [fix][flaky-test] 
ElasticSearchClientTests.testBulkBlocking (#16920)
f02679d8340 is described below

commit f02679d83402ac07e06c48570505f343e350a491
Author: Cong Zhao <[email protected]>
AuthorDate: Tue Aug 9 14:07:31 2022 +0800

    [fix][flaky-test] ElasticSearchClientTests.testBulkBlocking (#16920)
---
 .../io/elasticsearch/ElasticSearchClientTests.java | 93 ++++++++++++----------
 1 file changed, 50 insertions(+), 43 deletions(-)

diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index 7ade25046f1..8a453dfe451 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -18,7 +18,20 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
 import eu.rekawek.toxiproxy.model.ToxicDirection;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.atomic.LongAdder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.schema.GenericObject;
@@ -34,20 +47,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.io.IOException;
-import java.util.Optional;
-import java.util.UUID;
-
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertThrows;
-import static org.testng.Assert.assertTrue;
-
 @Slf4j
 public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
     public final static String INDEX = "myindex";
@@ -78,8 +77,16 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
     }
 
     static class MockRecord<T> implements Record<T> {
-        int acked = 0;
-        int failed = 0;
+        LongAdder acked = new LongAdder();
+        LongAdder failed = new LongAdder();
+
+        public int getAcked() {
+            return acked.intValue();
+        }
+
+        public int getFailed() {
+            return failed.intValue();
+        }
 
         @Override
         public T getValue() {
@@ -88,12 +95,12 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
 
         @Override
         public void ack() {
-            acked++;
+            acked.increment();
         }
 
         @Override
         public void fail() {
-            failed++;
+            failed.increment();
         }
     }
 
@@ -152,13 +159,13 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
             try {
                 MockRecord<GenericObject> mockRecord = new MockRecord<>();
                 client.indexDocument(mockRecord, Pair.of("1", "{ \"a\":1}"));
-                assertEquals(mockRecord.acked, 1);
-                assertEquals(mockRecord.failed, 0);
+                assertEquals(mockRecord.getAcked(), 1);
+                assertEquals(mockRecord.getFailed(), 0);
                 assertEquals(client.getRestClient().totalHits(index), 1);
 
                 client.deleteDocument(mockRecord, "1");
-                assertEquals(mockRecord.acked, 2);
-                assertEquals(mockRecord.failed, 0);
+                assertEquals(mockRecord.getAcked(), 2);
+                assertEquals(mockRecord.getFailed(), 0);
                 assertEquals(client.getRestClient().totalHits(index), 0);
             } finally {
                 client.getRestClient().deleteIndex(index);
@@ -216,11 +223,11 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
             client.flush();
             assertNotNull(client.irrecoverableError.get());
             
assertTrue(client.irrecoverableError.get().getMessage().contains("mapper_parsing_exception"));
-            assertEquals(mockRecord.acked, 1);
-            assertEquals(mockRecord.failed, 1);
+            assertEquals(mockRecord.getAcked(), 1);
+            assertEquals(mockRecord.getFailed(), 1);
             assertThrows(Exception.class, () -> client.bulkIndex(mockRecord, 
Pair.of("3", "{\"a\":3}")));
-            assertEquals(mockRecord.acked, 1);
-            assertEquals(mockRecord.failed, 2);
+            assertEquals(mockRecord.getAcked(), 1);
+            assertEquals(mockRecord.getFailed(), 2);
         }
     }
 
@@ -239,8 +246,8 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
             client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}"));
             client.flush();
             assertNull(client.irrecoverableError.get());
-            assertEquals(mockRecord.acked, 1);
-            assertEquals(mockRecord.failed, 1);
+            assertEquals(mockRecord.getAcked(), 1);
+            assertEquals(mockRecord.getFailed(), 1);
         }
     }
 
@@ -266,8 +273,8 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
                     MockRecord<GenericObject> mockRecord = new MockRecord<>();
                     client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
                     client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
-                    assertEquals(mockRecord.acked, 2);
-                    assertEquals(mockRecord.failed, 0);
+                    assertEquals(mockRecord.getAcked(), 2);
+                    assertEquals(mockRecord.getFailed(), 0);
                     assertEquals(client.getRestClient().totalHits(index), 2);
 
                     log.info("starting the toxic");
@@ -276,13 +283,13 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
                     toxiproxy.removeToxicAfterDelay("elasticpause", 15000);
 
                     client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
-                    assertEquals(mockRecord.acked, 2);
-                    assertEquals(mockRecord.failed, 0);
+                    assertEquals(mockRecord.getAcked(), 2);
+                    assertEquals(mockRecord.getFailed(), 0);
                     assertEquals(client.getRestClient().totalHits(index), 2);
 
                     client.flush();
-                    assertEquals(mockRecord.acked, 3);
-                    assertEquals(mockRecord.failed, 0);
+                    assertEquals(mockRecord.getAcked(), 3);
+                    assertEquals(mockRecord.getFailed(), 0);
                     assertEquals(client.getRestClient().totalHits(index), 3);
                 } finally {
                     client.getRestClient().deleteIndex(index);
@@ -316,14 +323,14 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
                     }
 
                     Awaitility.await().untilAsserted(() -> {
-                        assertThat("acked record", mockRecord.acked, 
greaterThanOrEqualTo(4));
-                        assertEquals(mockRecord.failed, 0);
+                        assertThat("acked record", mockRecord.getAcked(), 
greaterThanOrEqualTo(4));
+                        assertEquals(mockRecord.getFailed(), 0);
                         assertThat("totalHits", 
client.getRestClient().totalHits(index), greaterThanOrEqualTo(4L));
                     });
                     client.flush();
                     Awaitility.await().untilAsserted(() -> {
-                        assertEquals(mockRecord.failed, 0);
-                        assertEquals(mockRecord.acked, 5);
+                        assertEquals(mockRecord.getFailed(), 0);
+                        assertEquals(mockRecord.getAcked(), 5);
                         assertEquals(client.getRestClient().totalHits(index), 
5);
                     });
 
@@ -344,8 +351,8 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
                     assertTrue(elapsed > 29000); // bulkIndex was blocking 
while elasticsearch was down or busy
 
                     Awaitility.await().untilAsserted(() -> {
-                        assertEquals(mockRecord.acked, 15);
-                        assertEquals(mockRecord.failed, 0);
+                        assertEquals(mockRecord.getAcked(), 15);
+                        assertEquals(mockRecord.getFailed(), 0);
                     });
 
                 } finally {
@@ -372,13 +379,13 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
                 client.bulkIndex(mockRecord, Pair.of("key" + i, "{\"a\":" + i 
+ "}"));
                 client.bulkDelete(mockRecord, "key" + i);
             }
-            assertEquals(mockRecord.acked, 10);
-            assertEquals(mockRecord.failed, 0);
+            assertEquals(mockRecord.getAcked(), 10);
+            assertEquals(mockRecord.getFailed(), 0);
             assertEquals(client.getRestClient().totalHits(index), 0);
             // no effect
             client.flush();
 
-            assertEquals(mockRecord.acked, 10);
+            assertEquals(mockRecord.getAcked(), 10);
         }
     }
 

Reply via email to