This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f02679d8340 [fix][flaky-test] ElasticSearchClientTests.testBulkBlocking (#16920)
f02679d8340 is described below
commit f02679d83402ac07e06c48570505f343e350a491
Author: Cong Zhao <[email protected]>
AuthorDate: Tue Aug 9 14:07:31 2022 +0800
[fix][flaky-test] ElasticSearchClientTests.testBulkBlocking (#16920)
---
.../io/elasticsearch/ElasticSearchClientTests.java | 93 ++++++++++++----------
1 file changed, 50 insertions(+), 43 deletions(-)
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index 7ade25046f1..8a453dfe451 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -18,7 +18,20 @@
*/
package org.apache.pulsar.io.elasticsearch;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
import eu.rekawek.toxiproxy.model.ToxicDirection;
+import java.io.IOException;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.atomic.LongAdder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.schema.GenericObject;
@@ -34,20 +47,6 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.io.IOException;
-import java.util.Optional;
-import java.util.UUID;
-
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertThrows;
-import static org.testng.Assert.assertTrue;
-
@Slf4j
public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
public final static String INDEX = "myindex";
@@ -78,8 +77,16 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
}
static class MockRecord<T> implements Record<T> {
- int acked = 0;
- int failed = 0;
+ LongAdder acked = new LongAdder();
+ LongAdder failed = new LongAdder();
+
+ public int getAcked() {
+ return acked.intValue();
+ }
+
+ public int getFailed() {
+ return failed.intValue();
+ }
@Override
public T getValue() {
@@ -88,12 +95,12 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
@Override
public void ack() {
- acked++;
+ acked.increment();
}
@Override
public void fail() {
- failed++;
+ failed.increment();
}
}
@@ -152,13 +159,13 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
try {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.indexDocument(mockRecord, Pair.of("1", "{ \"a\":1}"));
- assertEquals(mockRecord.acked, 1);
- assertEquals(mockRecord.failed, 0);
+ assertEquals(mockRecord.getAcked(), 1);
+ assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.getRestClient().totalHits(index), 1);
client.deleteDocument(mockRecord, "1");
- assertEquals(mockRecord.acked, 2);
- assertEquals(mockRecord.failed, 0);
+ assertEquals(mockRecord.getAcked(), 2);
+ assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.getRestClient().totalHits(index), 0);
} finally {
client.getRestClient().deleteIndex(index);
@@ -216,11 +223,11 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
client.flush();
assertNotNull(client.irrecoverableError.get());
assertTrue(client.irrecoverableError.get().getMessage().contains("mapper_parsing_exception"));
- assertEquals(mockRecord.acked, 1);
- assertEquals(mockRecord.failed, 1);
+ assertEquals(mockRecord.getAcked(), 1);
+ assertEquals(mockRecord.getFailed(), 1);
assertThrows(Exception.class, () -> client.bulkIndex(mockRecord,
Pair.of("3", "{\"a\":3}")));
- assertEquals(mockRecord.acked, 1);
- assertEquals(mockRecord.failed, 2);
+ assertEquals(mockRecord.getAcked(), 1);
+ assertEquals(mockRecord.getFailed(), 2);
}
}
@@ -239,8 +246,8 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}"));
client.flush();
assertNull(client.irrecoverableError.get());
- assertEquals(mockRecord.acked, 1);
- assertEquals(mockRecord.failed, 1);
+ assertEquals(mockRecord.getAcked(), 1);
+ assertEquals(mockRecord.getFailed(), 1);
}
}
@@ -266,8 +273,8 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":2}"));
- assertEquals(mockRecord.acked, 2);
- assertEquals(mockRecord.failed, 0);
+ assertEquals(mockRecord.getAcked(), 2);
+ assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.getRestClient().totalHits(index), 2);
log.info("starting the toxic");
@@ -276,13 +283,13 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
toxiproxy.removeToxicAfterDelay("elasticpause", 15000);
client.bulkIndex(mockRecord, Pair.of("3", "{\"a\":3}"));
- assertEquals(mockRecord.acked, 2);
- assertEquals(mockRecord.failed, 0);
+ assertEquals(mockRecord.getAcked(), 2);
+ assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.getRestClient().totalHits(index), 2);
client.flush();
- assertEquals(mockRecord.acked, 3);
- assertEquals(mockRecord.failed, 0);
+ assertEquals(mockRecord.getAcked(), 3);
+ assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.getRestClient().totalHits(index), 3);
} finally {
client.getRestClient().deleteIndex(index);
@@ -316,14 +323,14 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
}
Awaitility.await().untilAsserted(() -> {
- assertThat("acked record", mockRecord.acked,
greaterThanOrEqualTo(4));
- assertEquals(mockRecord.failed, 0);
+ assertThat("acked record", mockRecord.getAcked(),
greaterThanOrEqualTo(4));
+ assertEquals(mockRecord.getFailed(), 0);
assertThat("totalHits",
client.getRestClient().totalHits(index), greaterThanOrEqualTo(4L));
});
client.flush();
Awaitility.await().untilAsserted(() -> {
- assertEquals(mockRecord.failed, 0);
- assertEquals(mockRecord.acked, 5);
+ assertEquals(mockRecord.getFailed(), 0);
+ assertEquals(mockRecord.getAcked(), 5);
assertEquals(client.getRestClient().totalHits(index),
5);
});
@@ -344,8 +351,8 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
assertTrue(elapsed > 29000); // bulkIndex was blocking
while elasticsearch was down or busy
Awaitility.await().untilAsserted(() -> {
- assertEquals(mockRecord.acked, 15);
- assertEquals(mockRecord.failed, 0);
+ assertEquals(mockRecord.getAcked(), 15);
+ assertEquals(mockRecord.getFailed(), 0);
});
} finally {
@@ -372,13 +379,13 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
client.bulkIndex(mockRecord, Pair.of("key" + i, "{\"a\":" + i
+ "}"));
client.bulkDelete(mockRecord, "key" + i);
}
- assertEquals(mockRecord.acked, 10);
- assertEquals(mockRecord.failed, 0);
+ assertEquals(mockRecord.getAcked(), 10);
+ assertEquals(mockRecord.getFailed(), 0);
assertEquals(client.getRestClient().totalHits(index), 0);
// no effect
client.flush();
- assertEquals(mockRecord.acked, 10);
+ assertEquals(mockRecord.getAcked(), 10);
}
}