This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 14ea189  [es-sink] Use topic name as the index name if indexName is 
not configured (#13064)
14ea189 is described below

commit 14ea189cb96a4acd2c39ac9c049dfd2fbcf93de7
Author: Yang Yang <[email protected]>
AuthorDate: Thu Dec 2 10:32:55 2021 +0800

    [es-sink] Use topic name as the index name if indexName is not configured 
(#13064)
    
    ### Motivation
    
    According to the 
[document](https://github.com/apache/pulsar/blob/master/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java#L53),
 the index name should default to the topic name, but the code uses only the 
configured `indexName` when generating write requests, which makes it a 
required config in practice.
    
    ### Modifications
    
    - Use the method `indexName()` to resolve the index name of write requests; 
it falls back to the topic name if `indexName` is not configured.
    - Extract methods `makeIndexRequest` and `makeDeleteRequest` for better 
code reuse and testability.
    - Add unit tests to check the index name of generated index/delete requests.
---
 .../io/elasticsearch/ElasticSearchClient.java      | 40 ++++++++++----------
 .../io/elasticsearch/ElasticSearchClientTests.java | 43 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
index af66795..b9e756e 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
@@ -254,16 +254,27 @@ public class ElasticSearchClient implements AutoCloseable 
{
         }
     }
 
+    IndexRequest makeIndexRequest(Record<GenericObject> record, Pair<String, 
String> idAndDoc) throws IOException {
+        IndexRequest indexRequest = 
Requests.indexRequest(indexName(record.getTopicName()));
+        if (!Strings.isNullOrEmpty(idAndDoc.getLeft()))
+            indexRequest.id(idAndDoc.getLeft());
+        indexRequest.type(config.getTypeName());
+        indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
+        return indexRequest;
+    }
+
+    DeleteRequest makeDeleteRequest(Record<GenericObject> record, String id) 
throws IOException {
+        DeleteRequest deleteRequest = 
Requests.deleteRequest(indexName(record.getTopicName()));
+        deleteRequest.id(id);
+        deleteRequest.type(config.getTypeName());
+        return deleteRequest;
+    }
+
     public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws 
Exception {
         try {
             checkNotFailed();
             checkIndexExists(record.getTopicName());
-            IndexRequest indexRequest = 
Requests.indexRequest(config.getIndexName());
-            if (!Strings.isNullOrEmpty(idAndDoc.getLeft()))
-                indexRequest.id(idAndDoc.getLeft());
-            indexRequest.type(config.getTypeName());
-            indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
-
+            IndexRequest indexRequest = makeIndexRequest(record, idAndDoc);
             records.put(indexRequest, record);
             bulkProcessor.add(indexRequest);
         } catch(Exception e) {
@@ -284,12 +295,7 @@ public class ElasticSearchClient implements AutoCloseable {
         try {
             checkNotFailed();
             checkIndexExists(record.getTopicName());
-            IndexRequest indexRequest = 
Requests.indexRequest(config.getIndexName());
-            if (!Strings.isNullOrEmpty(idAndDoc.getLeft()))
-                indexRequest.id(idAndDoc.getLeft());
-            indexRequest.type(config.getTypeName());
-            indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
-            IndexResponse indexResponse = client.index(indexRequest, 
RequestOptions.DEFAULT);
+            IndexResponse indexResponse = 
client.index(makeIndexRequest(record, idAndDoc), RequestOptions.DEFAULT);
             if 
(indexResponse.getResult().equals(DocWriteResponse.Result.CREATED) ||
                     
indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) {
                 record.ack();
@@ -309,10 +315,7 @@ public class ElasticSearchClient implements AutoCloseable {
         try {
             checkNotFailed();
             checkIndexExists(record.getTopicName());
-            DeleteRequest deleteRequest = 
Requests.deleteRequest(config.getIndexName());
-            deleteRequest.id(id);
-            deleteRequest.type(config.getTypeName());
-
+            DeleteRequest deleteRequest = makeDeleteRequest(record, id);
             records.put(deleteRequest, record);
             bulkProcessor.add(deleteRequest);
         } catch(Exception e) {
@@ -333,10 +336,7 @@ public class ElasticSearchClient implements AutoCloseable {
         try {
             checkNotFailed();
             checkIndexExists(record.getTopicName());
-            DeleteRequest deleteRequest = 
Requests.deleteRequest(config.getIndexName());
-            deleteRequest.id(id);
-            deleteRequest.type(config.getTypeName());
-            DeleteResponse deleteResponse = client.delete(deleteRequest, 
RequestOptions.DEFAULT);
+            DeleteResponse deleteResponse = 
client.delete(makeDeleteRequest(record, id), RequestOptions.DEFAULT);
             log.debug("delete result=" + deleteResponse.getResult());
             if 
(deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED) ||
                     
deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index fb927c5..a831d5c 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -24,7 +24,10 @@ import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.elasticsearch.testcontainers.ChaosContainer;
 import org.awaitility.Awaitility;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
 import org.junit.AfterClass;
+import org.mockito.Mockito;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -34,6 +37,8 @@ import java.util.Optional;
 import java.util.UUID;
 
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -79,6 +84,44 @@ public class ElasticSearchClientTests {
     }
 
     @Test
+    public void testIndexRequest() throws Exception {
+        String index = "myindex-" + UUID.randomUUID();
+        Record<GenericObject> record = Mockito.mock(Record.class);
+        String topicName = "topic-" + UUID.randomUUID();
+        when(record.getTopicName()).thenReturn(Optional.of(topicName));
+        try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSearchConfig()
+                .setElasticSearchUrl("http://" + 
container.getHttpHostAddress())
+                .setIndexName(index))) {
+            IndexRequest request = client.makeIndexRequest(record, 
Pair.of("1", "{ \"a\":1}"));
+            assertEquals(request.index(), index);
+        }
+        try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSearchConfig()
+                .setElasticSearchUrl("http://" + 
container.getHttpHostAddress()))) {
+            IndexRequest request = client.makeIndexRequest(record, 
Pair.of("1", "{ \"a\":1}"));
+            assertEquals(request.index(), topicName);
+        }
+    }
+
+    @Test
+    public void testDeleteRequest() throws Exception {
+        String index = "myindex-" + UUID.randomUUID();
+        Record<GenericObject> record = Mockito.mock(Record.class);
+        String topicName = "topic-" + UUID.randomUUID();
+        when(record.getTopicName()).thenReturn(Optional.of(topicName));
+        try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSearchConfig()
+                .setElasticSearchUrl("http://" + 
container.getHttpHostAddress())
+                .setIndexName(index))) {
+            DeleteRequest request = client.makeDeleteRequest(record, "1");
+            assertEquals(request.index(), index);
+        }
+        try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSearchConfig()
+                .setElasticSearchUrl("http://" + 
container.getHttpHostAddress()))) {
+            DeleteRequest request = client.makeDeleteRequest(record, "1");
+            assertEquals(request.index(), topicName);
+        }
+    }
+
+    @Test
     public void testIndexDelete() throws Exception {
         String index = "myindex-" + UUID.randomUUID();
         try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSearchConfig()

Reply via email to