This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 97a88255717 [fix][pulsar-io] Fix memory leak in ElasticSearch sink (#15125)
97a88255717 is described below

commit 97a882557179071b6e039e5fd1d76b2f18dfa8c1
Author: Nicolò Boschi <[email protected]>
AuthorDate: Wed May 4 17:35:44 2022 +0200

    [fix][pulsar-io] Fix memory leak in ElasticSearch sink (#15125)
---
 .../io/elasticsearch/ElasticSearchClient.java      | 21 ++------
 .../io/elasticsearch/client/BulkProcessor.java     |  5 +-
 .../client/elastic/ElasticBulkProcessor.java       | 40 +++++++-------
 .../opensearch/OpenSearchHighLevelRestClient.java  | 63 +++++++++++++++++-----
 .../io/elasticsearch/ElasticSearchClientTests.java | 28 +++++++++-
 .../io/elasticsearch/ElasticSearchSinkTests.java   | 43 +++++++++++++--
 6 files changed, 146 insertions(+), 54 deletions(-)

diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
index 21ec35c7158..dcbdfbc64fb 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
@@ -28,9 +28,6 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
@@ -56,9 +53,7 @@ public class ElasticSearchClient implements AutoCloseable {
     final Set<String> indexCache = new HashSet<>();
     final Map<String, String> topicToIndexCache = new HashMap<>();
 
-    final ConcurrentMap<Long, Record> records = new ConcurrentHashMap<>();
     final AtomicReference<Exception> irrecoverableError = new 
AtomicReference<>();
-    final AtomicLong bulkOperationIdGenerator = new AtomicLong();
     private final IndexNameFormatter indexNameFormatter;
 
     public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) {
@@ -69,9 +64,6 @@ public class ElasticSearchClient implements AutoCloseable {
             this.indexNameFormatter = null;
         }
         final BulkProcessor.Listener bulkListener = new 
BulkProcessor.Listener() {
-            private Record 
removeAndGetRecordForOperation(BulkProcessor.BulkOperationRequest operation) {
-                return records.remove(operation.getOperationId());
-            }
 
             @Override
             public void afterBulk(long executionId, 
List<BulkProcessor.BulkOperationRequest> bulkOperationList,
@@ -81,7 +73,7 @@ public class ElasticSearchClient implements AutoCloseable {
                 }
                 int index = 0;
                 for (BulkProcessor.BulkOperationResult result: results) {
-                    final Record record = 
removeAndGetRecordForOperation(bulkOperationList.get(index++));
+                    final Record record = 
bulkOperationList.get(index++).getPulsarRecord();
                     if (result.isError()) {
                         record.fail();
                         checkForIrrecoverableError(result);
@@ -95,7 +87,7 @@ public class ElasticSearchClient implements AutoCloseable {
             public void afterBulk(long executionId, 
List<BulkProcessor.BulkOperationRequest> bulkOperationList, Throwable 
throwable) {
                 log.warn("Bulk request id={} failed:", executionId, throwable);
                 for (BulkProcessor.BulkOperationRequest operation: 
bulkOperationList) {
-                    final Record record = 
removeAndGetRecordForOperation(operation);
+                    final Record record = operation.getPulsarRecord();
                     record.fail();
                 }
             }
@@ -150,15 +142,12 @@ public class ElasticSearchClient implements AutoCloseable {
             final String documentId = idAndDoc.getLeft();
             final String documentSource = idAndDoc.getRight();
 
-            final long operationId = 
bulkOperationIdGenerator.incrementAndGet();
             final BulkProcessor.BulkIndexRequest bulkIndexRequest = 
BulkProcessor.BulkIndexRequest.builder()
                     .index(indexName)
                     .documentId(documentId)
                     .documentSource(documentSource)
-                    .requestId(operationId)
+                    .record(record)
                     .build();
-
-            records.put(operationId, record);
             client.getBulkProcessor().appendIndexRequest(bulkIndexRequest);
         } catch (Exception e) {
             log.debug("index failed id=" + idAndDoc.getLeft(), e);
@@ -203,14 +192,12 @@ public class ElasticSearchClient implements AutoCloseable {
             checkIndexExists(record);
 
             final String indexName = indexName(record);
-            final long operationId = 
bulkOperationIdGenerator.incrementAndGet();
             final BulkProcessor.BulkDeleteRequest bulkDeleteRequest = 
BulkProcessor.BulkDeleteRequest.builder()
                     .index(indexName)
                     .documentId(id)
-                    .requestId(operationId)
+                    .record(record)
                     .build();
 
-            records.put(operationId, record);
             client.getBulkProcessor().appendDeleteRequest(bulkDeleteRequest);
         } catch (Exception e) {
             log.debug("delete failed id: {}", id, e);
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/BulkProcessor.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/BulkProcessor.java
index 39d7ea8391b..709d4212390 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/BulkProcessor.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/BulkProcessor.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.List;
 import lombok.Builder;
 import lombok.Getter;
+import org.apache.pulsar.functions.api.Record;
 
 /**
  * Processor for "bulk" call to the Elastic REST Endpoint.
@@ -32,7 +33,7 @@ public interface BulkProcessor extends Closeable {
     @Builder
     @Getter
     class BulkOperationRequest {
-        private long operationId;
+        private Record pulsarRecord;
     }
 
     @Builder
@@ -57,6 +58,7 @@ public interface BulkProcessor extends Closeable {
     @Builder
     @Getter
     class BulkIndexRequest {
+        private Record record;
         private long requestId;
         private String index;
         private String documentId;
@@ -66,6 +68,7 @@ public interface BulkProcessor extends Closeable {
     @Builder
     @Getter
     class BulkDeleteRequest {
+        private Record record;
         private long requestId;
         private String index;
         private String documentId;
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java
index 70edd353fc4..3754c0c7acb 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.java
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
 import org.apache.pulsar.io.elasticsearch.RandomExponentialRetry;
 import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
@@ -56,7 +57,7 @@ public class ElasticBulkProcessor implements BulkProcessor {
     private final AtomicLong executionIdGen = new AtomicLong();
     private final int bulkActions;
     private final long bulkSize;
-    private final List<BulkOperationWithId> pendingOperations = new 
ArrayList<>();
+    private final List<BulkOperationWithPulsarRecord> pendingOperations = new 
ArrayList<>();
     private final BulkRequestHandler bulkRequestHandler;
     private volatile boolean closed = false;
     private final ReentrantLock lock;
@@ -99,7 +100,7 @@ public class ElasticBulkProcessor implements BulkProcessor {
         if (config.getBulkSizeInMb() > 0) {
             sourceLength = 
request.getDocumentSource().getBytes(StandardCharsets.UTF_8).length;
         }
-        add(BulkOperationWithId.indexOperation(indexOperation, 
request.getRequestId(), sourceLength));
+        add(BulkOperationWithPulsarRecord.indexOperation(indexOperation, 
request.getRecord(), sourceLength));
     }
 
     @Override
@@ -108,7 +109,7 @@ public class ElasticBulkProcessor implements BulkProcessor {
                 .index(request.getIndex())
                 .id(request.getDocumentId())
                 .build();
-        add(BulkOperationWithId.deleteOperation(deleteOperation, 
request.getRequestId()));
+        add(BulkOperationWithPulsarRecord.deleteOperation(deleteOperation, 
request.getRecord()));
     }
 
     protected void ensureOpen() {
@@ -169,7 +170,7 @@ public class ElasticBulkProcessor implements BulkProcessor {
         execute(false);
     }
 
-    public void add(BulkOperationWithId bulkOperation) {
+    public void add(BulkOperationWithPulsarRecord bulkOperation) {
         lock.lock();
         try {
             ensureOpen();
@@ -202,7 +203,7 @@ public class ElasticBulkProcessor implements BulkProcessor {
         }
     }
 
-    public static class BulkOperationWithId extends BulkOperation {
+    public static class BulkOperationWithPulsarRecord extends BulkOperation {
 
         /**
          * REQUEST_OVERHEAD:
@@ -211,29 +212,30 @@ public class ElasticBulkProcessor implements BulkProcessor {
          */
         private static final int REQUEST_OVERHEAD = 50;
 
-        public static BulkOperationWithId indexOperation(IndexOperation 
indexOperation,
-                                                         long operationId,
-                                                         long sourceLength) {
+        public static BulkOperationWithPulsarRecord 
indexOperation(IndexOperation indexOperation,
+                                                                   Record 
pulsarRecord,
+                                                                   long 
sourceLength) {
             long estimatedSizeInBytes = REQUEST_OVERHEAD + sourceLength;
-            return new BulkOperationWithId(indexOperation, operationId, 
estimatedSizeInBytes);
+            return new BulkOperationWithPulsarRecord(indexOperation, 
pulsarRecord, estimatedSizeInBytes);
         }
 
-        public static BulkOperationWithId deleteOperation(DeleteOperation 
indexOperation,
-                                                          long operationId) {
-            return new BulkOperationWithId(indexOperation, operationId, 
REQUEST_OVERHEAD);
+        public static BulkOperationWithPulsarRecord 
deleteOperation(DeleteOperation indexOperation,
+                                                                    Record 
pulsarRecord) {
+            return new BulkOperationWithPulsarRecord(indexOperation, 
pulsarRecord, REQUEST_OVERHEAD);
         }
 
-        private final long operationId;
+        private final Record pulsarRecord;
         private final long estimatedSizeInBytes;
 
-        public BulkOperationWithId(BulkOperationVariant value, long 
operationId, long estimatedSizeInBytes) {
+        public BulkOperationWithPulsarRecord(BulkOperationVariant value,
+                                             Record pulsarRecord, long 
estimatedSizeInBytes) {
             super(value);
-            this.operationId = operationId;
+            this.pulsarRecord = pulsarRecord;
             this.estimatedSizeInBytes = estimatedSizeInBytes;
         }
 
-        public long getOperationId() {
-            return operationId;
+        public Record getPulsarRecord() {
+            return pulsarRecord;
         }
 
         public long getEstimatedSizeInBytes() {
@@ -328,9 +330,9 @@ public class ElasticBulkProcessor implements BulkProcessor {
 
         private List<BulkOperationRequest> convertBulkRequest(BulkRequest 
bulkRequest) {
             return bulkRequest.operations().stream().map(op -> {
-                        BulkOperationWithId opWithId = (BulkOperationWithId) 
op;
+                        BulkOperationWithPulsarRecord opWithRecord = 
(BulkOperationWithPulsarRecord) op;
                         return BulkOperationRequest.builder()
-                                .operationId(opWithId.getOperationId())
+                                .pulsarRecord(opWithRecord.getPulsarRecord())
                                 .build();
                     }).collect(Collectors.toList());
         }
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
index b48f103c4f1..1a939774c4c 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/opensearch/OpenSearchHighLevelRestClient.java
@@ -23,18 +23,15 @@ import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.HttpHost;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
 import org.apache.pulsar.io.elasticsearch.RandomExponentialRetry;
 import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
 import org.apache.pulsar.io.elasticsearch.client.RestClient;
-import org.elasticsearch.client.Node;
-import org.opensearch.action.DocWriteRequest;
 import org.opensearch.action.DocWriteResponse;
 import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.opensearch.action.admin.indices.refresh.RefreshRequest;
@@ -65,9 +62,43 @@ import org.opensearch.search.builder.SearchSourceBuilder;
 @Slf4j
 public class OpenSearchHighLevelRestClient extends RestClient implements 
BulkProcessor {
 
+    private interface DocWriteRequestWithPulsarRecord {
+        Record getPulsarRecord();
+    }
+
+    private static class IndexRequestWithPulsarRecord extends IndexRequest 
implements DocWriteRequestWithPulsarRecord {
+        private Record pulsarRecord;
+
+        public IndexRequestWithPulsarRecord(String index, Record pulsarRecord) 
{
+            super(index);
+            this.pulsarRecord = pulsarRecord;
+        }
+
+        @Override
+        public Record getPulsarRecord() {
+            return pulsarRecord;
+        }
+    }
+
+
+    private static class DeleteRequestWithPulsarRecord
+            extends DeleteRequest
+            implements DocWriteRequestWithPulsarRecord {
+        private Record pulsarRecord;
+
+        public DeleteRequestWithPulsarRecord(String index, Record 
pulsarRecord) {
+            super(index);
+            this.pulsarRecord = pulsarRecord;
+        }
+
+        @Override
+        public Record getPulsarRecord() {
+            return pulsarRecord;
+        }
+    }
+
     private RestHighLevelClient client;
     private org.opensearch.action.bulk.BulkProcessor internalBulkProcessor;
-    private final ConcurrentMap<DocWriteRequest<?>, Long> bulkRequestMappings 
= new ConcurrentHashMap<>();
 
     public OpenSearchHighLevelRestClient(ElasticSearchConfig 
elasticSearchConfig,
                                          BulkProcessor.Listener 
bulkProcessorListener) {
@@ -83,7 +114,8 @@ public class OpenSearchHighLevelRestClient extends RestClient implements BulkPro
                         .setSocketTimeout(config.getSocketTimeoutInMs()))
                 .setHttpClientConfigCallback(this.configCallback)
                 .setFailureListener(new 
org.opensearch.client.RestClient.FailureListener() {
-                    public void onFailure(Node node) {
+                    @Override
+                    public void onFailure(org.opensearch.client.Node node) {
                         log.warn("Node host={} failed", node.getHost());
                     }
                 });
@@ -101,9 +133,17 @@ public class OpenSearchHighLevelRestClient extends RestClient implements BulkPro
                                 private 
List<BulkProcessor.BulkOperationRequest>
                                         convertBulkRequest(BulkRequest 
bulkRequest) {
                                     return 
bulkRequest.requests().stream().map(docWriteRequest -> {
-                                        long requestId = 
bulkRequestMappings.get(docWriteRequest);
+                                        final Record pulsarRecord;
+                                        if (docWriteRequest instanceof 
DocWriteRequestWithPulsarRecord) {
+                                            DocWriteRequestWithPulsarRecord 
requestWithId =
+                                                    
(DocWriteRequestWithPulsarRecord) docWriteRequest;
+                                            pulsarRecord = 
requestWithId.getPulsarRecord();
+                                        } else {
+                                            throw new 
UnsupportedOperationException("Unexpected bulk request of type: "
+                                                    + 
docWriteRequest.getClass());
+                                        }
                                         return 
BulkProcessor.BulkOperationRequest.builder()
-                                                .operationId(requestId)
+                                                .pulsarRecord(pulsarRecord)
                                                 .build();
                                     }).collect(Collectors.toList());
                                 }
@@ -235,25 +275,22 @@ public class OpenSearchHighLevelRestClient extends RestClient implements BulkPro
         return this;
     }
 
-
     @Override
     public void appendIndexRequest(BulkProcessor.BulkIndexRequest request) 
throws IOException {
-        IndexRequest indexRequest = Requests.indexRequest(request.getIndex());
+        IndexRequest indexRequest = new 
IndexRequestWithPulsarRecord(request.getIndex(), request.getRecord());
         if (!Strings.isNullOrEmpty(request.getDocumentId())) {
             indexRequest.id(request.getDocumentId());
         }
         indexRequest.type(config.getTypeName());
         indexRequest.source(request.getDocumentSource(), XContentType.JSON);
-        bulkRequestMappings.put(indexRequest, request.getRequestId());
         internalBulkProcessor.add(indexRequest);
     }
 
     @Override
     public void appendDeleteRequest(BulkProcessor.BulkDeleteRequest request) 
throws IOException {
-        DeleteRequest deleteRequest = 
Requests.deleteRequest(request.getIndex());
+        DeleteRequest deleteRequest = new 
DeleteRequestWithPulsarRecord(request.getIndex(), request.getRecord());
         deleteRequest.id(request.getDocumentId());
         deleteRequest.type(config.getTypeName());
-        bulkRequestMappings.put(deleteRequest, request.getRequestId());
         internalBulkProcessor.add(deleteRequest);
     }
 
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index 7ac86cd2ef0..7ade25046f1 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -346,7 +346,6 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
                     Awaitility.await().untilAsserted(() -> {
                         assertEquals(mockRecord.acked, 15);
                         assertEquals(mockRecord.failed, 0);
-                        assertEquals(client.records.size(), 0);
                     });
 
                 } finally {
@@ -356,4 +355,31 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase {
         }
     }
 
+    @Test
+    public void testBulkIndexAndDelete() throws Exception {
+        final String index = "indexbulktest-" + UUID.randomUUID();
+        ElasticSearchConfig config = new ElasticSearchConfig()
+                .setElasticSearchUrl("http://"+container.getHttpHostAddress())
+                .setIndexName(index)
+                .setBulkEnabled(true)
+                .setBulkActions(10)
+                .setBulkFlushIntervalInMs(-1L);
+
+        try (ElasticSearchClient client = new ElasticSearchClient(config)) {
+            assertTrue(client.createIndexIfNeeded(index));
+            MockRecord<GenericObject> mockRecord = new MockRecord<>();
+            for (int i = 0; i < 5; i++) {
+                client.bulkIndex(mockRecord, Pair.of("key" + i, "{\"a\":" + i 
+ "}"));
+                client.bulkDelete(mockRecord, "key" + i);
+            }
+            assertEquals(mockRecord.acked, 10);
+            assertEquals(mockRecord.failed, 0);
+            assertEquals(client.getRestClient().totalHits(index), 0);
+            // no effect
+            client.flush();
+
+            assertEquals(mockRecord.acked, 10);
+        }
+    }
+
 }
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 6ffed0f4277..5c0651f3720 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -19,11 +19,13 @@
 package org.apache.pulsar.io.elasticsearch;
 
 import co.elastic.clients.transport.ElasticsearchTransport;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.impl.MessageImpl;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaType;
@@ -35,11 +37,13 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
-
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
@@ -60,8 +64,6 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import java.util.Locale;
-import java.util.concurrent.TimeUnit;
 
 import static org.testng.Assert.assertNull;
 
@@ -207,6 +209,41 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
     }
 
     @Test
+    public final void sendNoSchemaTest() throws Exception {
+
+        when(mockRecord.getMessage()).thenAnswer(new 
Answer<Optional<Message<String>>>() {
+            @Override
+            public Optional<Message<String>> answer(InvocationOnMock 
invocation) throws Throwable {
+                final MessageImpl mock = mock(MessageImpl.class);
+                
when(mock.getData()).thenReturn("{\"a\":1}".getBytes(StandardCharsets.UTF_8));
+                return Optional.of(mock);
+            }
+        });
+
+        when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
+            public Optional<String> answer(InvocationOnMock invocation) throws 
Throwable {
+                return null;
+            }});
+
+
+        when(mockRecord.getValue()).thenAnswer(new Answer<String>() {
+            public String answer(InvocationOnMock invocation) throws Throwable 
{
+                return "hello";
+            }});
+
+        when(mockRecord.getSchema()).thenAnswer(new Answer<Schema>() {
+            public Schema answer(InvocationOnMock invocation) throws Throwable 
{
+                return Schema.STRING;
+            }});
+
+        map.put("indexName", "test-index");
+        map.put("schemaEnable", "false");
+        sink.open(map, mockSinkContext);
+        sink.write(mockRecord);
+        verify(mockRecord, times(1)).ack();
+    }
+
+    @Test(enabled = true)
     public final void sendKeyIgnoreSingleField() throws Exception {
         final String index = "testkeyignore";
         map.put("indexName", index);

Reply via email to