This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dcf1ea15d4b [improve][io] Use `SinkContext.fatal` in elastic search 
connector (#21204)
dcf1ea15d4b is described below

commit dcf1ea15d4b0c0b67669f791cc0b04a705ba17c8
Author: Zike Yang <[email protected]>
AuthorDate: Mon Sep 25 21:59:33 2023 +0800

    [improve][io] Use `SinkContext.fatal` in elastic search connector (#21204)
    
    ### Motivation
    
    We have introduced the `SinkContext.fatal` method in 
https://github.com/apache/pulsar/pull/21143
    This PR uses this fatal method to handle the elastic sink connector 
exception correctly.
    
    ### Modifications
    
    - Use `SinkContext.fatal` to throw connector exception
    - Add state to the ElasticSearchClient
---
 .../io/elasticsearch/ElasticSearchClient.java      | 37 +++++----
 .../pulsar/io/elasticsearch/ElasticSearchSink.java | 93 ++++++++++------------
 .../io/elasticsearch/ElasticSearchAuthTests.java   | 17 ++--
 .../elasticsearch/ElasticSearchClientSslTests.java |  4 +-
 .../io/elasticsearch/ElasticSearchClientTests.java | 54 ++++++++-----
 .../io/elasticsearch/ElasticSearchSinkTests.java   | 18 ++++-
 .../opensearch/OpenSearchClientSslTests.java       |  8 +-
 7 files changed, 132 insertions(+), 99 deletions(-)

diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
index d6fb5bb705d..3b2359f16e8 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
@@ -33,6 +33,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
 import org.apache.pulsar.io.elasticsearch.client.RestClient;
 import org.apache.pulsar.io.elasticsearch.client.RestClientFactory;
@@ -53,11 +54,19 @@ public class ElasticSearchClient implements AutoCloseable {
     final Set<String> indexCache = new HashSet<>();
     final Map<String, String> topicToIndexCache = new HashMap<>();
 
-    final AtomicReference<Exception> irrecoverableError = new 
AtomicReference<>();
+    final AtomicReference<State> state = new AtomicReference<>(State.Open);
+
     private final IndexNameFormatter indexNameFormatter;
 
-    public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) {
+    enum State {
+        Open, Failed, Closed
+    }
+
+    final SinkContext sinkContext;
+
+    public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig, 
SinkContext sinkContext) {
         this.config = elasticSearchConfig;
+        this.sinkContext = sinkContext;
         if (this.config.getIndexName() != null) {
             this.indexNameFormatter = new 
IndexNameFormatter(this.config.getIndexName());
         } else {
@@ -94,18 +103,15 @@ public class ElasticSearchClient implements AutoCloseable {
         };
         this.backoffRetry = new 
RandomExponentialRetry(elasticSearchConfig.getMaxRetryTimeInSec());
         this.client = retry(() -> RestClientFactory.createClient(config, 
bulkListener), -1, "client creation");
+        state.set(State.Open);
     }
 
     void failed(Exception e) {
-        if (irrecoverableError.compareAndSet(null, e)) {
-            log.error("Irrecoverable error:", e);
+        if (state.compareAndSet(State.Open, State.Failed)) {
+            sinkContext.fatal(e);
         }
     }
 
-    boolean isFailed() {
-        return irrecoverableError.get() != null;
-    }
-
     void checkForIrrecoverableError(Record<?> record, 
BulkProcessor.BulkOperationResult result) {
         if (!result.isError()) {
             return;
@@ -145,7 +151,7 @@ public class ElasticSearchClient implements AutoCloseable {
 
     public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws 
Exception {
         try {
-            checkNotFailed();
+            checkState();
             checkIndexExists(record);
             final String indexName = indexName(record);
             final String documentId = idAndDoc.getLeft();
@@ -174,7 +180,7 @@ public class ElasticSearchClient implements AutoCloseable {
      */
     public boolean indexDocument(Record<GenericObject> record, Pair<String, 
String> idAndDoc) throws Exception {
         try {
-            checkNotFailed();
+            checkState();
             checkIndexExists(record);
 
             final String indexName = indexName(record);
@@ -197,7 +203,7 @@ public class ElasticSearchClient implements AutoCloseable {
 
     public void bulkDelete(Record<GenericObject> record, String id) throws 
Exception {
         try {
-            checkNotFailed();
+            checkState();
             checkIndexExists(record);
 
             final String indexName = indexName(record);
@@ -224,7 +230,7 @@ public class ElasticSearchClient implements AutoCloseable {
      */
     public boolean deleteDocument(Record<GenericObject> record, String id) 
throws Exception {
         try {
-            checkNotFailed();
+            checkState();
             checkIndexExists(record);
             final String indexName = indexName(record);
             final boolean deleted = client.deleteDocument(indexName, id);
@@ -254,6 +260,7 @@ public class ElasticSearchClient implements AutoCloseable {
             client.close();
             client = null;
         }
+        state.compareAndSet(State.Open, State.Closed);
     }
 
     @VisibleForTesting
@@ -261,9 +268,9 @@ public class ElasticSearchClient implements AutoCloseable {
         this.client = client;
     }
 
-    private void checkNotFailed() throws Exception {
-        if (irrecoverableError.get() != null) {
-            throw irrecoverableError.get();
+    private void checkState() {
+        if (state.get() != State.Open) {
+            throw new IllegalStateException(String.format("Elasticsearch 
client is in %s state", state.get().name()));
         }
     }
 
diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index f76f985f721..6d38775bd05 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -76,7 +76,7 @@ public class ElasticSearchSink implements Sink<GenericObject> 
{
     public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
         elasticSearchConfig = ElasticSearchConfig.load(config, sinkContext);
         elasticSearchConfig.validate();
-        elasticsearchClient = new ElasticSearchClient(elasticSearchConfig);
+        elasticsearchClient = new ElasticSearchClient(elasticSearchConfig, 
sinkContext);
         if (!Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
             primaryFields = 
Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
         }
@@ -110,60 +110,55 @@ public class ElasticSearchSink implements 
Sink<GenericObject> {
 
     @Override
     public void write(Record<GenericObject> record) throws Exception {
-        if (!elasticsearchClient.isFailed()) {
-            Pair<String, String> idAndDoc = extractIdAndDocument(record);
-            try {
-                if (log.isDebugEnabled()) {
-                    log.debug("index doc {} {}", idAndDoc.getLeft(), 
idAndDoc.getRight());
-                }
-                if (idAndDoc.getRight() == null) {
-                    switch (elasticSearchConfig.getNullValueAction()) {
-                        case DELETE:
-                            if (idAndDoc.getLeft() != null) {
-                                if (elasticSearchConfig.isBulkEnabled()) {
-                                    elasticsearchClient.bulkDelete(record, 
idAndDoc.getLeft());
-                                } else {
-                                    elasticsearchClient.deleteDocument(record, 
idAndDoc.getLeft());
-                                }
+        Pair<String, String> idAndDoc = extractIdAndDocument(record);
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("index doc {} {}", idAndDoc.getLeft(), 
idAndDoc.getRight());
+            }
+            if (idAndDoc.getRight() == null) {
+                switch (elasticSearchConfig.getNullValueAction()) {
+                    case DELETE:
+                        if (idAndDoc.getLeft() != null) {
+                            if (elasticSearchConfig.isBulkEnabled()) {
+                                elasticsearchClient.bulkDelete(record, 
idAndDoc.getLeft());
+                            } else {
+                                elasticsearchClient.deleteDocument(record, 
idAndDoc.getLeft());
                             }
-                            break;
-                        case IGNORE:
-                            break;
-                        case FAIL:
-                            elasticsearchClient.failed(
-                                    new 
PulsarClientException.InvalidMessageException("Unexpected null message value"));
-                            throw elasticsearchClient.irrecoverableError.get();
-                    }
-                } else {
-                    if (elasticSearchConfig.isBulkEnabled()) {
-                        elasticsearchClient.bulkIndex(record, idAndDoc);
-                    } else {
-                        elasticsearchClient.indexDocument(record, idAndDoc);
-                    }
-                }
-            } catch (JsonProcessingException jsonProcessingException) {
-                switch (elasticSearchConfig.getMalformedDocAction()) {
+                        }
+                        break;
                     case IGNORE:
                         break;
-                    case WARN:
-                        log.warn("Ignoring malformed document messageId={}",
-                                
record.getMessage().map(Message::getMessageId).orElse(null),
-                                jsonProcessingException);
-                        elasticsearchClient.failed(jsonProcessingException);
-                        throw jsonProcessingException;
                     case FAIL:
-                        log.error("Malformed document messageId={}",
-                                
record.getMessage().map(Message::getMessageId).orElse(null),
-                                jsonProcessingException);
-                        elasticsearchClient.failed(jsonProcessingException);
-                        throw jsonProcessingException;
+                        elasticsearchClient.failed(
+                                new 
PulsarClientException.InvalidMessageException("Unexpected null message value"));
+                }
+            } else {
+                if (elasticSearchConfig.isBulkEnabled()) {
+                    elasticsearchClient.bulkIndex(record, idAndDoc);
+                } else {
+                    elasticsearchClient.indexDocument(record, idAndDoc);
                 }
-            } catch (Exception e) {
-                log.error("write error for {} {}:", idAndDoc.getLeft(), 
idAndDoc.getRight(), e);
-                throw e;
             }
-        } else {
-            throw new IllegalStateException("Elasticsearch client is in FAILED 
status");
+        } catch (JsonProcessingException jsonProcessingException) {
+            switch (elasticSearchConfig.getMalformedDocAction()) {
+                case IGNORE:
+                    break;
+                case WARN:
+                    log.warn("Ignoring malformed document messageId={}",
+                            
record.getMessage().map(Message::getMessageId).orElse(null),
+                            jsonProcessingException);
+                    elasticsearchClient.failed(jsonProcessingException);
+                    break;
+                case FAIL:
+                    log.error("Malformed document messageId={}",
+                            
record.getMessage().map(Message::getMessageId).orElse(null),
+                            jsonProcessingException);
+                    elasticsearchClient.failed(jsonProcessingException);
+                    break;
+            }
+        } catch (Exception e) {
+            log.error("write error for {} {}:", idAndDoc.getLeft(), 
idAndDoc.getRight(), e);
+            throw e;
         }
     }
 
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAuthTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAuthTests.java
index 7c56bfc23c9..1deb82ad1de 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAuthTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchAuthTests.java
@@ -22,6 +22,7 @@ import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeMethod;
@@ -79,7 +80,7 @@ public abstract class ElasticSearchAuthTests extends 
ElasticSearchTestBase {
         config.setMaxRetries(1);
         config.setBulkEnabled(true);
         // ensure auth is needed
-        try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+        try (ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));) {
             expectThrows(ElasticSearchConnectionException.class, () -> {
                 client.createIndexIfNeeded(indexName);
             });
@@ -87,7 +88,7 @@ public abstract class ElasticSearchAuthTests extends 
ElasticSearchTestBase {
 
         config.setPassword(ELASTICPWD);
 
-        try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+        try (ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));) {
             ensureCalls(client, indexName);
         }
     }
@@ -106,7 +107,7 @@ public abstract class ElasticSearchAuthTests extends 
ElasticSearchTestBase {
 
         config.setPassword(ELASTICPWD);
         String token;
-        try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+        try (ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));) {
             token = createAuthToken(client, "elastic", ELASTICPWD);
         }
 
@@ -114,14 +115,14 @@ public abstract class ElasticSearchAuthTests extends 
ElasticSearchTestBase {
         config.setPassword(null);
 
         // ensure auth is needed
-        try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+        try (ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));) {
             expectThrows(ElasticSearchConnectionException.class, () -> {
                 client.createIndexIfNeeded(indexName);
             });
         }
 
         config.setToken(token);
-        try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+        try (ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));) {
             ensureCalls(client, indexName);
         }
     }
@@ -139,7 +140,7 @@ public abstract class ElasticSearchAuthTests extends 
ElasticSearchTestBase {
 
         config.setPassword(ELASTICPWD);
         String apiKey;
-        try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+        try (ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));) {
             apiKey = createApiKey(client);
         }
 
@@ -147,14 +148,14 @@ public abstract class ElasticSearchAuthTests extends 
ElasticSearchTestBase {
         config.setPassword(null);
 
         // ensure auth is needed
-        try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+        try (ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));) {
             expectThrows(ElasticSearchConnectionException.class, () -> {
                 client.createIndexIfNeeded(indexName);
             });
         }
 
         config.setApiKey(apiKey);
-        try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+        try (ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));) {
             ensureCalls(client, indexName);
         }
     }
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
index 5598a88d410..5e0b2a029b8 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientSslTests.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
+import org.apache.pulsar.io.core.SinkContext;
 import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testcontainers.utility.MountableFile;
@@ -26,6 +27,7 @@ import org.testng.annotations.Test;
 import java.io.IOException;
 import java.time.Duration;
 
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
@@ -183,7 +185,7 @@ public abstract class ElasticSearchClientSslTests extends 
ElasticSearchTestBase
     }
 
     private void testClientWithConfig(ElasticSearchConfig config) throws 
IOException {
-        try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+        try (ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));) {
             testIndexExists(client);
         }
     }
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index 6d9928c0426..31ad27ec051 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -20,6 +20,9 @@ package org.apache.pulsar.io.elasticsearch;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -32,16 +35,17 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
 import 
org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestClient;
 import 
org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient;
 import 
org.apache.pulsar.io.elasticsearch.testcontainers.ElasticToxiproxiContainer;
 import org.awaitility.Awaitility;
-import org.mockito.Mockito;
 import org.testcontainers.containers.Network;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testng.annotations.AfterClass;
@@ -109,7 +113,7 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
     public void testClientInstance() throws Exception {
         try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSearchConfig()
                 .setElasticSearchUrl("http://"; + 
container.getHttpHostAddress())
-                .setIndexName(INDEX))) {
+                .setIndexName(INDEX), mock(SinkContext.class));) {
             if (elasticImageName.equals(OPENSEARCH) || 
elasticImageName.equals(ELASTICSEARCH_7)) {
                 assertTrue(client.getRestClient() instanceof 
OpenSearchHighLevelRestClient);
             } else {
@@ -121,23 +125,23 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
     @Test
     public void testIndexName() throws Exception {
         String index = "myindex-" + UUID.randomUUID();
-        Record<GenericObject> record = Mockito.mock(Record.class);
+        Record<GenericObject> record = mock(Record.class);
         String topicName = "topic-" + UUID.randomUUID();
         when(record.getTopicName()).thenReturn(Optional.of(topicName));
         try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSearchConfig()
                 .setElasticSearchUrl("http://"; + 
container.getHttpHostAddress())
-                .setIndexName(index))) {
+                .setIndexName(index), mock(SinkContext.class))) {
             assertEquals(client.indexName(record), index);
         }
         try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSearchConfig()
-                .setElasticSearchUrl("http://"; + 
container.getHttpHostAddress()))) {
+                .setElasticSearchUrl("http://"; + 
container.getHttpHostAddress()), mock(SinkContext.class))) {
             assertEquals(client.indexName(record), topicName);
         }
         String indexBase = "myindex-" + UUID.randomUUID();
         index = indexBase + "-%{+yyyy-MM-dd}";
         try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSearchConfig()
                 .setElasticSearchUrl("http://"; + 
container.getHttpHostAddress())
-                .setIndexName(index))) {
+                .setIndexName(index), mock(SinkContext.class))) {
             assertThrows(IllegalStateException.class, () -> {
                 client.indexName(record);
             });
@@ -145,7 +149,7 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
         when(record.getEventTime()).thenReturn(Optional.of(1645182000000L));
         try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSearchConfig()
                 .setElasticSearchUrl("http://"; + 
container.getHttpHostAddress())
-                .setIndexName(index))) {
+                .setIndexName(index), mock(SinkContext.class))) {
             assertEquals(client.indexName(record), indexBase + "-2022-02-18");
         }
     }
@@ -155,7 +159,7 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
         String index = "myindex-" + UUID.randomUUID();
         try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSearchConfig()
                 .setElasticSearchUrl("http://"; + 
container.getHttpHostAddress())
-                .setIndexName(index));) {
+                .setIndexName(index), mock(SinkContext.class));) {
             assertTrue(client.createIndexIfNeeded(index));
             try {
                 MockRecord<GenericObject> mockRecord = new MockRecord<>();
@@ -179,7 +183,7 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
         String index = "mynewindex-" + UUID.randomUUID();
         try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSearchConfig()
                 .setElasticSearchUrl("http://"; + 
container.getHttpHostAddress())
-                .setIndexName(index));) {
+                .setIndexName(index), mock(SinkContext.class));) {
             assertFalse(client.indexExists(index));
             assertTrue(client.createIndexIfNeeded(index));
             try {
@@ -194,7 +198,7 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
     @Test
     public void testTopicToIndexName() throws IOException {
         try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSearchConfig()
-                .setElasticSearchUrl("http://"; + 
container.getHttpHostAddress()));) {
+                .setElasticSearchUrl("http://"; + 
container.getHttpHostAddress()), mock(SinkContext.class));) {
             assertEquals(client.topicToIndexName("data-ks1.table1"), 
"data-ks1.table1");
             
assertEquals(client.topicToIndexName("persistent://public/default/testesjson"), 
"testesjson");
             assertEquals(client.topicToIndexName("default/testesjson"), 
"testesjson");
@@ -217,13 +221,19 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
                 .setBulkEnabled(true)
                 .setBulkFlushIntervalInMs(-1L)
                 
.setMalformedDocAction(ElasticSearchConfig.MalformedDocAction.FAIL);
-        try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+        SinkContext sinkContext = mock(SinkContext.class);
+        AtomicReference<Throwable> irrecoverableError = new 
AtomicReference<>();
+        doAnswer(invocation -> {
+            irrecoverableError.compareAndSet(null, invocation.getArgument(0));
+            return null;
+        }).when(sinkContext).fatal(any(Throwable.class));
+        try (ElasticSearchClient client = new ElasticSearchClient(config, 
sinkContext);) {
             MockRecord<GenericObject> mockRecord = new MockRecord<>();
             client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
             client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}"));
             client.flush();
-            assertNotNull(client.irrecoverableError.get());
-            
assertTrue(client.irrecoverableError.get().getMessage().contains("mapper_parsing_exception"));
+            assertNotNull(irrecoverableError.get());
+            
assertTrue(irrecoverableError.get().getMessage().contains("mapper_parsing_exception"));
             assertEquals(mockRecord.getAcked(), 1);
             assertEquals(mockRecord.getFailed(), 1);
             assertThrows(Exception.class, () -> client.bulkIndex(mockRecord, 
Pair.of("3", "{\"a\":3}")));
@@ -241,12 +251,18 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
                 .setBulkEnabled(true)
                 .setBulkFlushIntervalInMs(-1)
                 
.setMalformedDocAction(ElasticSearchConfig.MalformedDocAction.IGNORE);
-        try (ElasticSearchClient client = new ElasticSearchClient(config);) {
+        SinkContext sinkContext = mock(SinkContext.class);
+        AtomicReference<Throwable> irrecoverableError = new 
AtomicReference<>();
+        doAnswer(invocation -> {
+            irrecoverableError.set(invocation.getArgument(0));
+            return null;
+        }).when(sinkContext).fatal(any(Throwable.class));
+        try (ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));) {
             MockRecord<GenericObject> mockRecord = new MockRecord<>();
             client.bulkIndex(mockRecord, Pair.of("1", "{\"a\":1}"));
             client.bulkIndex(mockRecord, Pair.of("2", "{\"a\":\"toto\"}"));
             client.flush();
-            assertNull(client.irrecoverableError.get());
+            assertNull(irrecoverableError.get());
             assertEquals(mockRecord.getAcked(), 1);
             assertEquals(mockRecord.getFailed(), 1);
         }
@@ -268,7 +284,7 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
                     // disabled, we want to have full control over flush() 
method
                     .setBulkFlushIntervalInMs(-1);
 
-            try (ElasticSearchClient client = new 
ElasticSearchClient(config);) {
+            try (ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));) {
                 try {
                     assertTrue(client.createIndexIfNeeded(index));
                     MockRecord<GenericObject> mockRecord = new MockRecord<>();
@@ -314,7 +330,7 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
                     .setBulkConcurrentRequests(2)
                     .setRetryBackoffInMs(100)
                     .setBulkFlushIntervalInMs(10000);
-            try (ElasticSearchClient client = new 
ElasticSearchClient(config);) {
+            try (ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));) {
                 assertTrue(client.createIndexIfNeeded(index));
 
                 try {
@@ -373,7 +389,7 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
                 .setBulkActions(10)
                 .setBulkFlushIntervalInMs(-1L);
 
-        try (ElasticSearchClient client = new ElasticSearchClient(config)) {
+        try (ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class))) {
             assertTrue(client.createIndexIfNeeded(index));
             MockRecord<GenericObject> mockRecord = new MockRecord<>();
             for (int i = 0; i < 5; i++) {
@@ -397,7 +413,7 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
                 .setElasticSearchUrl("http://"; + 
container.getHttpHostAddress())
                 .setIndexName(index);
 
-        try (ElasticSearchClient client = new ElasticSearchClient(config)) {
+        try (ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class))) {
             MockRecord<GenericObject> mockRecord = new MockRecord<>();
             client.indexDocument(mockRecord, Pair.of("key0", 
"{\"a\":1,\"b\":null}"));
             final Map<String, Object> sourceAsMap;
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 0ce29683f86..8e828e9a395 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -20,8 +20,8 @@ package org.apache.pulsar.io.elasticsearch;
 
 import co.elastic.clients.transport.ElasticsearchTransport;
 import com.fasterxml.jackson.core.JsonParseException;
+import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -33,6 +33,8 @@ import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaType;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -73,6 +75,7 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.fail;
 
@@ -89,6 +92,7 @@ public abstract class ElasticSearchSinkTests extends 
ElasticSearchTestBase {
 
     @Mock
     protected SinkContext mockSinkContext;
+    AtomicReference<Throwable> irrecoverableError = new AtomicReference<>();
     protected Map<String, Object> map;
     protected ElasticSearchSink sink;
 
@@ -135,6 +139,11 @@ public abstract class ElasticSearchSinkTests extends 
ElasticSearchTestBase {
 
         mockRecord = mock(Record.class);
         mockSinkContext = mock(SinkContext.class);
+        irrecoverableError.set(null);
+        doAnswer(invocation -> {
+            irrecoverableError.set(invocation.getArgument(0));
+            return null;
+        }).when(mockSinkContext).fatal(any(Throwable.class));
 
         when(mockRecord.getValue()).thenAnswer((Answer<GenericObject>) 
invocation -> new GenericObject() {
             @Override
@@ -422,7 +431,7 @@ public abstract class ElasticSearchSinkTests extends 
ElasticSearchTestBase {
         assertEquals(json, 
"{\"name\":null,\"userName\":\"boby\",\"email\":null}");
     }
 
-    @Test(expectedExceptions = 
PulsarClientException.InvalidMessageException.class)
+    @Test
     public void testNullValueFailure() throws Exception {
         String index = "testnullvaluefail";
         map.put("indexName", index);
@@ -431,6 +440,7 @@ public abstract class ElasticSearchSinkTests extends 
ElasticSearchTestBase {
         sink.open(map, mockSinkContext);
         MockRecordNullValue mockRecordNullValue = new MockRecordNullValue();
         sink.write(mockRecordNullValue);
+        assertNotNull(irrecoverableError.get());
     }
 
     @Test
@@ -480,7 +490,7 @@ public abstract class ElasticSearchSinkTests extends 
ElasticSearchTestBase {
         
assertEquals(sink.getElasticsearchClient().getRestClient().totalHits(index), 
1L);
         sink.write(new MockRecordNullValue());
         
assertEquals(sink.getElasticsearchClient().getRestClient().totalHits(index), 
action.equals(ElasticSearchConfig.NullValueAction.DELETE) ? 0L : 1L);
-        assertNull(sink.getElasticsearchClient().irrecoverableError.get());
+        assertNull(irrecoverableError.get());
     }
 
     @Test
@@ -517,7 +527,7 @@ public abstract class ElasticSearchSinkTests extends 
ElasticSearchTestBase {
 
                 sink.close();
                 verify(restHighLevelClient).close();
-                verify(internalBulkProcessor).awaitClose(Mockito.anyLong(), 
Mockito.any(TimeUnit.class));
+                verify(internalBulkProcessor).awaitClose(Mockito.anyLong(), 
any(TimeUnit.class));
                 verify(client).close();
                 verify(restClient).close();
             } else {
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java
index de694613685..72bebfe2bbf 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/opensearch/OpenSearchClientSslTests.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.io.elasticsearch.opensearch;
 
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.elasticsearch.ElasticSearchClient;
 import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
 import org.apache.pulsar.io.elasticsearch.ElasticSearchSslConfig;
@@ -32,6 +33,7 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
@@ -81,7 +83,7 @@ public class OpenSearchClientSslTests extends 
ElasticSearchTestBase {
                             .setEnabled(true)
                             .setTruststorePath(sslResourceDir + 
"/truststore.jks")
                             .setTruststorePassword("changeit"));
-            ElasticSearchClient client = new ElasticSearchClient(config);
+            ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));
             testIndexExists(client);
         }
     }
@@ -107,7 +109,7 @@ public class OpenSearchClientSslTests extends 
ElasticSearchTestBase {
                             .setHostnameVerification(true)
                             .setTruststorePath(sslResourceDir + 
"/truststore.jks")
                             .setTruststorePassword("changeit"));
-            ElasticSearchClient client = new ElasticSearchClient(config);
+            ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));
             testIndexExists(client);
         }
     }
@@ -133,7 +135,7 @@ public class OpenSearchClientSslTests extends 
ElasticSearchTestBase {
                             .setTruststorePassword("changeit")
                             .setKeystorePath(sslResourceDir + "/keystore.jks")
                             .setKeystorePassword("changeit"));
-            ElasticSearchClient client = new ElasticSearchClient(config);
+            ElasticSearchClient client = new ElasticSearchClient(config, 
mock(SinkContext.class));
             testIndexExists(client);
         }
     }


Reply via email to