This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dcf1ea15d4b [improve][io] Use `SinkContext.fatal` in elastic search
connector (#21204)
dcf1ea15d4b is described below
commit dcf1ea15d4b0c0b67669f791cc0b04a705ba17c8
Author: Zike Yang <[email protected]>
AuthorDate: Mon Sep 25 21:59:33 2023 +0800
[improve][io] Use `SinkContext.fatal` in elastic search connector (#21204)
### Motivation
We have introduced the `SinkContext.fatal` method in
https://github.com/apache/pulsar/pull/21143
This PR uses the `fatal` method to handle Elasticsearch sink connector
exceptions correctly.
### Modifications
- Use `SinkContext.fatal` to report fatal connector exceptions
- Add a state (`Open`, `Failed`, `Closed`) to `ElasticSearchClient`
---
.../io/elasticsearch/ElasticSearchClient.java | 37 +++++----
.../pulsar/io/elasticsearch/ElasticSearchSink.java | 93 ++++++++++------------
.../io/elasticsearch/ElasticSearchAuthTests.java | 17 ++--
.../elasticsearch/ElasticSearchClientSslTests.java | 4 +-
.../io/elasticsearch/ElasticSearchClientTests.java | 54 ++++++++-----
.../io/elasticsearch/ElasticSearchSinkTests.java | 18 ++++-
.../opensearch/OpenSearchClientSslTests.java | 8 +-
7 files changed, 132 insertions(+), 99 deletions(-)
diff --git
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
index d6fb5bb705d..3b2359f16e8 100644
---
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
+++
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
@@ -33,6 +33,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
import org.apache.pulsar.io.elasticsearch.client.RestClient;
import org.apache.pulsar.io.elasticsearch.client.RestClientFactory;
@@ -53,11 +54,19 @@ public class ElasticSearchClient implements AutoCloseable {
final Set<String> indexCache = new HashSet<>();
final Map<String, String> topicToIndexCache = new HashMap<>();
- final AtomicReference<Exception> irrecoverableError = new
AtomicReference<>();
+ final AtomicReference<State> state = new AtomicReference<>(State.Open);
+
private final IndexNameFormatter indexNameFormatter;
- public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) {
+ enum State {
+ Open, Failed, Closed
+ }
+
+ final SinkContext sinkContext;
+
+ public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig,
SinkContext sinkContext) {
this.config = elasticSearchConfig;
+ this.sinkContext = sinkContext;
if (this.config.getIndexName() != null) {
this.indexNameFormatter = new
IndexNameFormatter(this.config.getIndexName());
} else {
@@ -94,18 +103,15 @@ public class ElasticSearchClient implements AutoCloseable {
};
this.backoffRetry = new
RandomExponentialRetry(elasticSearchConfig.getMaxRetryTimeInSec());
this.client = retry(() -> RestClientFactory.createClient(config,
bulkListener), -1, "client creation");
+ state.set(State.Open);
}
void failed(Exception e) {
- if (irrecoverableError.compareAndSet(null, e)) {
- log.error("Irrecoverable error:", e);
+ if (state.compareAndSet(State.Open, State.Failed)) {
+ sinkContext.fatal(e);
}
}
- boolean isFailed() {
- return irrecoverableError.get() != null;
- }
-
void checkForIrrecoverableError(Record<?> record,
BulkProcessor.BulkOperationResult result) {
if (!result.isError()) {
return;
@@ -145,7 +151,7 @@ public class ElasticSearchClient implements AutoCloseable {
public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws
Exception {
try {
- checkNotFailed();
+ checkState();
checkIndexExists(record);
final String indexName = indexName(record);
final String documentId = idAndDoc.getLeft();
@@ -174,7 +180,7 @@ public class ElasticSearchClient implements AutoCloseable {
*/
public boolean indexDocument(Record<GenericObject> record, Pair<String,
String> idAndDoc) throws Exception {
try {
- checkNotFailed();
+ checkState();
checkIndexExists(record);
final String indexName = indexName(record);
@@ -197,7 +203,7 @@ public class ElasticSearchClient implements AutoCloseable {
public void bulkDelete(Record<GenericObject> record, String id) throws
Exception {
try {
- checkNotFailed();
+ checkState();
checkIndexExists(record);
final String indexName = indexName(record);
@@ -224,7 +230,7 @@ public class ElasticSearchClient implements AutoCloseable {
*/
public boolean deleteDocument(Record<GenericObject> record, String id)
throws Exception {
try {
- checkNotFailed();
+ checkState();
checkIndexExists(record);
final String indexName = indexName(record);
final boolean deleted = client.deleteDocument(indexName, id);
@@ -254,6 +260,7 @@ public class ElasticSearchClient implements AutoCloseable {
client.close();
client = null;
}
+ state.compareAndSet(State.Open, State.Closed);
}
@VisibleForTesting
@@ -261,9 +268,9 @@ public class ElasticSearchClient implements AutoCloseable {
this.client = client;
}
- private void checkNotFailed() throws Exception {
- if (irrecoverableError.get() != null) {
- throw irrecoverableError.get();
+ private void checkState() {
+ if (state.get() != State.Open) {
+ throw new IllegalStateException(String.format("Elasticsearch
client is in %s state", state.get().name()));
}
}
diff --git
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index f76f985f721..6d38775bd05 100644
---
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -76,7 +76,7 @@ public class ElasticSearchSink implements Sink<GenericObject>
{
public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
elasticSearchConfig = ElasticSearchConfig.load(config, sinkContext);
elasticSearchConfig.validate();
- elasticsearchClient = new ElasticSearchClient(elasticSearchConfig);
+ elasticsearchClient = new ElasticSearchClient(elasticSearchConfig,
sinkContext);
if (!Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
primaryFields =
Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
}
@@ -110,60 +110,55 @@ public class ElasticSearchSink implements
Sink<GenericObject> {
@Override
public void write(Record<GenericObject> record) throws Exception {
- if (!elasticsearchClient.isFailed()) {
- Pair<String, String> idAndDoc = extractIdAndDocument(record);
- try {
- if (log.isDebugEnabled()) {
- log.debug("index doc {} {}", idAndDoc.getLeft(),
idAndDoc.getRight());
- }
- if (idAndDoc.getRight() == null) {
- switch (elasticSearchConfig.getNullValueAction()) {
- case DELETE:
- if (idAndDoc.getLeft() != null) {
- if (elasticSearchConfig.isBulkEnabled()) {
- elasticsearchClient.bulkDelete(record,
idAndDoc.getLeft());
- } else {
- elasticsearchClient.deleteDocument(record,
idAndDoc.getLeft());
- }
+ Pair<String, String> idAndDoc = extractIdAndDocument(record);
+ try {
+ if (log.isDebugEnabled()) {
+ log.debug("index doc {} {}", idAndDoc.getLeft(),
idAndDoc.getRight());
+ }
+ if (idAndDoc.getRight() == null) {
+ switch (elasticSearchConfig.getNullValueAction()) {
+ case DELETE:
+ if (idAndDoc.getLeft() != null) {
+ if (elasticSearchConfig.isBulkEnabled()) {
+ elasticsearchClient.bulkDelete(record,
idAndDoc.getLeft());
+ } else {
+ elasticsearchClient.deleteDocument(record,
idAndDoc.getLeft());
}
- break;
- case IGNORE:
- break;
- case FAIL:
- elasticsearchClient.failed(
- new
PulsarClientException.InvalidMessageException("Unexpected null message value"));
- throw elasticsearchClient.irrecoverableError.get();
- }
- } else {
- if (elasticSearchConfig.isBulkEnabled()) {
- elasticsearchClient.bulkIndex(record, idAndDoc);
- } else {
- elasticsearchClient.indexDocument(record, idAndDoc);
- }
- }
- } catch (JsonProcessingException jsonProcessingException) {
- switch (elasticSearchConfig.getMalformedDocAction()) {
+ }
+ break;
case IGNORE:
break;
- case WARN:
- log.warn("Ignoring malformed document messageId={}",
-
record.getMessage().map(Message::getMessageId).orElse(null),
- jsonProcessingException);
- elasticsearchClient.failed(jsonProcessingException);
- throw jsonProcessingException;
case FAIL:
- log.error("Malformed document messageId={}",
-
record.getMessage().map(Message::getMessageId).orElse(null),
- jsonProcessingException);
- elasticsearchClient.failed(jsonProcessingException);
- throw jsonProcessingException;
+ elasticsearchClient.failed(
+ new
PulsarClientException.InvalidMessageException("Unexpected null message value"));
+ }
+ } else {
+ if (elasticSearchConfig.isBulkEnabled()) {
+ elasticsearchClient.bulkIndex(record, idAndDoc);
+ } else {
+ elasticsearchClient.indexDocument(record, idAndDoc);
}
- } catch (Exception e) {
- log.error("write error for {} {}:", idAndDoc.getLeft(),
idAndDoc.getRight(), e);
- throw e;
}
- } else {
- throw new IllegalStateException("Elasticsearch client is in FAILED
status");
+ } catch (JsonProcessingException jsonProcessingException) {
+ switch (elasticSearchConfig.getMalformedDocAction()) {
+ case IGNORE:
+ break;
+ case WARN:
+ log.warn("Ignoring malformed document messageId={}",
+
record.getMessage().map(Message::getMessageId).orElse(null),
+ jsonProcessingException);
+ elasticsearchClient.failed(jsonProcessingException);
+ break;
+ case FAIL:
+ log.error("Malformed document messageId={}",
+
record.getMessage().map(Message::getMessageId).orElse(null),
+ jsonProcessingException);
+ elasticsearchClient.failed(jsonProcessingException);
+ break;
+ }
+ } catch (Exception e) {
+ log.error("write error for {} {}:", idAndDoc.getLeft(),
idAndDoc.getRight(), e);
+ throw e;
}
}
diff --git
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAuthTests.java
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAuthTests.java
index 7c56bfc23c9..1deb82ad1de 100644
---
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAuthTests.java
+++
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAuthTests.java
@@ -22,6 +22,7 @@ import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
@@ -79,7 +80,7 @@ public abstract class ElasticSearchAuthTests extends
ElasticSearchTestBase {
config.setMaxRetries(1);
config.setBulkEnabled(true);
// ensure auth is needed
- try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));) {
expectThrows(ElasticSearchConnectionException.class, () -> {
client.createIndexIfNeeded(indexName);
});
@@ -87,7 +88,7 @@ public abstract class ElasticSearchAuthTests extends
ElasticSearchTestBase {
config.setPassword(ELASTICPWD);
- try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));) {
ensureCalls(client, indexName);
}
}
@@ -106,7 +107,7 @@ public abstract class ElasticSearchAuthTests extends
ElasticSearchTestBase {
config.setPassword(ELASTICPWD);
String token;
- try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));) {
token = createAuthToken(client, "elastic", ELASTICPWD);
}
@@ -114,14 +115,14 @@ public abstract class ElasticSearchAuthTests extends
ElasticSearchTestBase {
config.setPassword(null);
// ensure auth is needed
- try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));) {
expectThrows(ElasticSearchConnectionException.class, () -> {
client.createIndexIfNeeded(indexName);
});
}
config.setToken(token);
- try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));) {
ensureCalls(client, indexName);
}
}
@@ -139,7 +140,7 @@ public abstract class ElasticSearchAuthTests extends
ElasticSearchTestBase {
config.setPassword(ELASTICPWD);
String apiKey;
- try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));) {
apiKey = createApiKey(client);
}
@@ -147,14 +148,14 @@ public abstract class ElasticSearchAuthTests extends
ElasticSearchTestBase {
config.setPassword(null);
// ensure auth is needed
- try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));) {
expectThrows(ElasticSearchConnectionException.class, () -> {
client.createIndexIfNeeded(indexName);
});
}
config.setApiKey(apiKey);
- try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));) {
ensureCalls(client, indexName);
}
}
diff --git
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
index 5598a88d410..5e0b2a029b8 100644
---
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
+++
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.elasticsearch;
+import org.apache.pulsar.io.core.SinkContext;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.MountableFile;
@@ -26,6 +27,7 @@ import org.testng.annotations.Test;
import java.io.IOException;
import java.time.Duration;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -183,7 +185,7 @@ public abstract class ElasticSearchClientSslTests extends
ElasticSearchTestBase
}
private void testClientWithConfig(ElasticSearchConfig config) throws
IOException {
- try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));) {
testIndexExists(client);
}
}
diff --git
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index 6d9928c0426..31ad27ec051 100644
---
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.io.elasticsearch;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
@@ -32,16 +35,17 @@ import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
import
org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestClient;
import
org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient;
import
org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer;
import org.awaitility.Awaitility;
-import org.mockito.Mockito;
import org.testcontainers.containers.Network;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testng.annotations.AfterClass;
@@ -109,7 +113,7 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
public void testClientInstance() throws Exception {
try (ElasticSearchClient client = new ElasticSearchClient(new
ElasticSearchConfig()
.setElasticSearchUrl("http://" +
container.getHttpHostAddress())
- .setIndexName(INDEX))) {
+ .setIndexName(INDEX), mock(SinkContext.class));) {
if (elasticImageName.equals(OPENSEARCH) ||
elasticImageName.equals(ELASTICSEARCH_7)) {
assertTrue(client.getRestClient() instanceof
OpenSearchHighLevelRestClient);
} else {
@@ -121,23 +125,23 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
@Test
public void testIndexName() throws Exception {
String index = "myindex-" + UUID.randomUUID();
- Record<GenericObject> record = Mockito.mock(Record.class);
+ Record<GenericObject> record = mock(Record.class);
String topicName = "topic-" + UUID.randomUUID();
when(record.getTopicName()).thenReturn(Optional.of(topicName));
try (ElasticSearchClient client = new ElasticSearchClient(new
ElasticSearchConfig()
.setElasticSearchUrl("http://" +
container.getHttpHostAddress())
- .setIndexName(index))) {
+ .setIndexName(index), mock(SinkContext.class))) {
assertEquals(client.indexName(record), index);
}
try (ElasticSearchClient client = new ElasticSearchClient(new
ElasticSearchConfig()
- .setElasticSearchUrl("http://" +
container.getHttpHostAddress()))) {
+ .setElasticSearchUrl("http://" +
container.getHttpHostAddress()), mock(SinkContext.class))) {
assertEquals(client.indexName(record), topicName);
}
String indexBase = "myindex-" + UUID.randomUUID();
index = indexBase + "-%{+yyyy-MM-dd}";
try (ElasticSearchClient client = new ElasticSearchClient(new
ElasticSearchConfig()
.setElasticSearchUrl("http://" +
container.getHttpHostAddress())
- .setIndexName(index))) {
+ .setIndexName(index), mock(SinkContext.class))) {
assertThrows(IllegalStateException.class, () -> {
client.indexName(record);
});
@@ -145,7 +149,7 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
when(record.getEventTime()).thenReturn(Optional.of(1645182000000L));
try (ElasticSearchClient client = new ElasticSearchClient(new
ElasticSearchConfig()
.setElasticSearchUrl("http://" +
container.getHttpHostAddress())
- .setIndexName(index))) {
+ .setIndexName(index), mock(SinkContext.class))) {
assertEquals(client.indexName(record), indexBase + "-2022-02-18");
}
}
@@ -155,7 +159,7 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
String index = "myindex-" + UUID.randomUUID();
try (ElasticSearchClient client = new ElasticSearchClient(new
ElasticSearchConfig()
.setElasticSearchUrl("http://" +
container.getHttpHostAddress())
- .setIndexName(index));) {
+ .setIndexName(index), mock(SinkContext.class));) {
assertTrue(client.createIndexIfNeeded(index));
try {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
@@ -179,7 +183,7 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
String index = "mynewindex-" + UUID.randomUUID();
try (ElasticSearchClient client = new ElasticSearchClient(new
ElasticSearchConfig()
.setElasticSearchUrl("http://" +
container.getHttpHostAddress())
- .setIndexName(index));) {
+ .setIndexName(index), mock(SinkContext.class));) {
assertFalse(client.indexExists(index));
assertTrue(client.createIndexIfNeeded(index));
try {
@@ -194,7 +198,7 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
@Test
public void testTopicToIndexName() throws IOException {
try (ElasticSearchClient client = new ElasticSearchClient(new
ElasticSearchConfig()
- .setElasticSearchUrl("http://" +
container.getHttpHostAddress()));) {
+ .setElasticSearchUrl("http://" +
container.getHttpHostAddress()), mock(SinkContext.class));) {
assertEquals(client.topicToIndexName("data-ks1.table1"),
"data-ks1.table1");
assertEquals(client.topicToIndexName("persistent://public/default/testesjson"),
"testesjson");
assertEquals(client.topicToIndexName("default/testesjson"),
"testesjson");
@@ -217,13 +221,19 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
.setBulkEnabled(true)
.setBulkFlushIntervalInMs(-1L)
.setMalformedDocAction(ElasticSearchConfig.MalformedDocAction.FAIL);
- try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+ SinkContext sinkContext = mock(SinkContext.class);
+ AtomicReference<Throwable> irrecoverableError = new
AtomicReference<>();
+ doAnswer(invocation -> {
+ irrecoverableError.compareAndSet(null, invocation.getArgument(0));
+ return null;
+ }).when(sinkContext).fatal(any(Throwable.class));
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
sinkContext);) {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}"));
client.flush();
- assertNotNull(client.irrecoverableError.get());
-
assertTrue(client.irrecoverableError.get().getMessage().contains("mapper_parsing_exception"));
+ assertNotNull(irrecoverableError.get());
+
assertTrue(irrecoverableError.get().getMessage().contains("mapper_parsing_exception"));
assertEquals(mockRecord.getAcked(), 1);
assertEquals(mockRecord.getFailed(), 1);
assertThrows(Exception.class, () -> client.bulkIndex(mockRecord,
Pair.of("3", "{\"a\":3}")));
@@ -241,12 +251,18 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
.setBulkEnabled(true)
.setBulkFlushIntervalInMs(-1)
.setMalformedDocAction(ElasticSearchConfig.MalformedDocAction.IGNORE);
- try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+ SinkContext sinkContext = mock(SinkContext.class);
+ AtomicReference<Throwable> irrecoverableError = new
AtomicReference<>();
+ doAnswer(invocation -> {
+ irrecoverableError.set(invocation.getArgument(0));
+ return null;
+ }).when(sinkContext).fatal(any(Throwable.class));
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));) {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}"));
client.flush();
- assertNull(client.irrecoverableError.get());
+ assertNull(irrecoverableError.get());
assertEquals(mockRecord.getAcked(), 1);
assertEquals(mockRecord.getFailed(), 1);
}
@@ -268,7 +284,7 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
// disabled, we want to have full control over flush()
method
.setBulkFlushIntervalInMs(-1);
- try (ElasticSearchClient client = new
ElasticSearchClient(config);) {
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));) {
try {
assertTrue(client.createIndexIfNeeded(index));
MockRecord<GenericObject> mockRecord = new MockRecord<>();
@@ -314,7 +330,7 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
.setBulkConcurrentRequests(2)
.setRetryBackoffInMs(100)
.setBulkFlushIntervalInMs(10000);
- try (ElasticSearchClient client = new
ElasticSearchClient(config);) {
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));) {
assertTrue(client.createIndexIfNeeded(index));
try {
@@ -373,7 +389,7 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
.setBulkActions(10)
.setBulkFlushIntervalInMs(-1L);
- try (ElasticSearchClient client = new ElasticSearchClient(config)) {
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class))) {
assertTrue(client.createIndexIfNeeded(index));
MockRecord<GenericObject> mockRecord = new MockRecord<>();
for (int i = 0; i < 5; i++) {
@@ -397,7 +413,7 @@ public abstract class ElasticSearchClientTests extends
ElasticSearchTestBase {
.setElasticSearchUrl("http://" +
container.getHttpHostAddress())
.setIndexName(index);
- try (ElasticSearchClient client = new ElasticSearchClient(config)) {
+ try (ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class))) {
MockRecord<GenericObject> mockRecord = new MockRecord<>();
client.indexDocument(mockRecord, Pair.of("key0",
"{\"a\":1,\"b\":null}"));
final Map<String, Object> sourceAsMap;
diff --git
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 0ce29683f86..8e828e9a395 100644
---
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.io.elasticsearch;
import co.elastic.clients.transport.ElasticsearchTransport;
import com.fasterxml.jackson.core.JsonParseException;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -33,6 +33,8 @@ import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -73,6 +75,7 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.fail;
@@ -89,6 +92,7 @@ public abstract class ElasticSearchSinkTests extends
ElasticSearchTestBase {
@Mock
protected SinkContext mockSinkContext;
+ AtomicReference<Throwable> irrecoverableError = new AtomicReference<>();
protected Map<String, Object> map;
protected ElasticSearchSink sink;
@@ -135,6 +139,11 @@ public abstract class ElasticSearchSinkTests extends
ElasticSearchTestBase {
mockRecord = mock(Record.class);
mockSinkContext = mock(SinkContext.class);
+ irrecoverableError.set(null);
+ doAnswer(invocation -> {
+ irrecoverableError.set(invocation.getArgument(0));
+ return null;
+ }).when(mockSinkContext).fatal(any(Throwable.class));
when(mockRecord.getValue()).thenAnswer((Answer<GenericObject>)
invocation -> new GenericObject() {
@Override
@@ -422,7 +431,7 @@ public abstract class ElasticSearchSinkTests extends
ElasticSearchTestBase {
assertEquals(json,
"{\"name\":null,\"userName\":\"boby\",\"email\":null}");
}
- @Test(expectedExceptions =
PulsarClientException.InvalidMessageException.class)
+ @Test
public void testNullValueFailure() throws Exception {
String index = "testnullvaluefail";
map.put("indexName", index);
@@ -431,6 +440,7 @@ public abstract class ElasticSearchSinkTests extends
ElasticSearchTestBase {
sink.open(map, mockSinkContext);
MockRecordNullValue mockRecordNullValue = new MockRecordNullValue();
sink.write(mockRecordNullValue);
+ assertNotNull(irrecoverableError.get());
}
@Test
@@ -480,7 +490,7 @@ public abstract class ElasticSearchSinkTests extends
ElasticSearchTestBase {
assertEquals(sink.getElasticsearchClient().getRestClient().totalHits(index),
1L);
sink.write(new MockRecordNullValue());
assertEquals(sink.getElasticsearchClient().getRestClient().totalHits(index),
action.equals(ElasticSearchConfig.NullValueAction.DELETE) ? 0L : 1L);
- assertNull(sink.getElasticsearchClient().irrecoverableError.get());
+ assertNull(irrecoverableError.get());
}
@Test
@@ -517,7 +527,7 @@ public abstract class ElasticSearchSinkTests extends
ElasticSearchTestBase {
sink.close();
verify(restHighLevelClient).close();
- verify(internalBulkProcessor).awaitClose(Mockito.anyLong(),
Mockito.any(TimeUnit.class));
+ verify(internalBulkProcessor).awaitClose(Mockito.anyLong(),
any(TimeUnit.class));
verify(client).close();
verify(restClient).close();
} else {
diff --git
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java
index de694613685..72bebfe2bbf 100644
---
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java
+++
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.io.elasticsearch.opensearch;
+import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.elasticsearch.ElasticSearchClient;
import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
import org.apache.pulsar.io.elasticsearch.ElasticSearchSslConfig;
@@ -32,6 +33,7 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -81,7 +83,7 @@ public class OpenSearchClientSslTests extends
ElasticSearchTestBase {
.setEnabled(true)
.setTruststorePath(sslResourceDir +
"/truststore.jks")
.setTruststorePassword("changeit"));
- ElasticSearchClient client = new ElasticSearchClient(config);
+ ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));
testIndexExists(client);
}
}
@@ -107,7 +109,7 @@ public class OpenSearchClientSslTests extends
ElasticSearchTestBase {
.setHostnameVerification(true)
.setTruststorePath(sslResourceDir +
"/truststore.jks")
.setTruststorePassword("changeit"));
- ElasticSearchClient client = new ElasticSearchClient(config);
+ ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));
testIndexExists(client);
}
}
@@ -133,7 +135,7 @@ public class OpenSearchClientSslTests extends
ElasticSearchTestBase {
.setTruststorePassword("changeit")
.setKeystorePath(sslResourceDir + "/keystore.jks")
.setKeystorePassword("changeit"));
- ElasticSearchClient client = new ElasticSearchClient(config);
+ ElasticSearchClient client = new ElasticSearchClient(config,
mock(SinkContext.class));
testIndexExists(client);
}
}