This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit d22798a3d303a27272ffd91ea3d251e76089b018 Author: Yang Yang <[email protected]> AuthorDate: Thu Dec 2 10:32:55 2021 +0800 [es-sink] Use topic name as the index name if indexName is not configured (#13064) ### Motivation According to the [document](https://github.com/apache/pulsar/blob/master/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java#L53), the default index name should be the topic name, but the code still uses only the configured `indexName` when generating write requests, which makes it a required config in practice. ### Modifications - Use the method `indexName()` for the index name of write requests, which will be the topic name if `indexName` is not configured. - Extract methods `makeIndexRequest` and `makeDeleteRequest` for better code reuse and testability. - Add unit tests to check the index name of generated index/delete requests. (cherry picked from commit 14ea189cb96a4acd2c39ac9c049dfd2fbcf93de7) --- .../io/elasticsearch/ElasticSearchClient.java | 40 ++++++++++---------- .../io/elasticsearch/ElasticSearchClientTests.java | 43 ++++++++++++++++++++++ 2 files changed, 63 insertions(+), 20 deletions(-) diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java index af66795..b9e756e 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java @@ -254,16 +254,27 @@ public class ElasticSearchClient implements AutoCloseable { } } + IndexRequest makeIndexRequest(Record<GenericObject> record, Pair<String, String> idAndDoc) throws IOException { + IndexRequest indexRequest = Requests.indexRequest(indexName(record.getTopicName())); + if (!Strings.isNullOrEmpty(idAndDoc.getLeft())) + 
indexRequest.id(idAndDoc.getLeft()); + indexRequest.type(config.getTypeName()); + indexRequest.source(idAndDoc.getRight(), XContentType.JSON); + return indexRequest; + } + + DeleteRequest makeDeleteRequest(Record<GenericObject> record, String id) throws IOException { + DeleteRequest deleteRequest = Requests.deleteRequest(indexName(record.getTopicName())); + deleteRequest.id(id); + deleteRequest.type(config.getTypeName()); + return deleteRequest; + } + public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws Exception { try { checkNotFailed(); checkIndexExists(record.getTopicName()); - IndexRequest indexRequest = Requests.indexRequest(config.getIndexName()); - if (!Strings.isNullOrEmpty(idAndDoc.getLeft())) - indexRequest.id(idAndDoc.getLeft()); - indexRequest.type(config.getTypeName()); - indexRequest.source(idAndDoc.getRight(), XContentType.JSON); - + IndexRequest indexRequest = makeIndexRequest(record, idAndDoc); records.put(indexRequest, record); bulkProcessor.add(indexRequest); } catch(Exception e) { @@ -284,12 +295,7 @@ public class ElasticSearchClient implements AutoCloseable { try { checkNotFailed(); checkIndexExists(record.getTopicName()); - IndexRequest indexRequest = Requests.indexRequest(config.getIndexName()); - if (!Strings.isNullOrEmpty(idAndDoc.getLeft())) - indexRequest.id(idAndDoc.getLeft()); - indexRequest.type(config.getTypeName()); - indexRequest.source(idAndDoc.getRight(), XContentType.JSON); - IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT); + IndexResponse indexResponse = client.index(makeIndexRequest(record, idAndDoc), RequestOptions.DEFAULT); if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED) || indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) { record.ack(); @@ -309,10 +315,7 @@ public class ElasticSearchClient implements AutoCloseable { try { checkNotFailed(); checkIndexExists(record.getTopicName()); - DeleteRequest deleteRequest = 
Requests.deleteRequest(config.getIndexName()); - deleteRequest.id(id); - deleteRequest.type(config.getTypeName()); - + DeleteRequest deleteRequest = makeDeleteRequest(record, id); records.put(deleteRequest, record); bulkProcessor.add(deleteRequest); } catch(Exception e) { @@ -333,10 +336,7 @@ public class ElasticSearchClient implements AutoCloseable { try { checkNotFailed(); checkIndexExists(record.getTopicName()); - DeleteRequest deleteRequest = Requests.deleteRequest(config.getIndexName()); - deleteRequest.id(id); - deleteRequest.type(config.getTypeName()); - DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT); + DeleteResponse deleteResponse = client.delete(makeDeleteRequest(record, id), RequestOptions.DEFAULT); log.debug("delete result=" + deleteResponse.getResult()); if (deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED) || deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) { diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java index fb927c5..a831d5c 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java @@ -24,7 +24,10 @@ import org.apache.pulsar.client.api.schema.GenericObject; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.elasticsearch.testcontainers.ChaosContainer; import org.awaitility.Awaitility; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.index.IndexRequest; import org.junit.AfterClass; +import org.mockito.Mockito; import org.testcontainers.elasticsearch.ElasticsearchContainer; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -34,6 +37,8 @@ import 
java.util.Optional; import java.util.UUID; import static org.junit.Assert.*; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; import static org.hamcrest.MatcherAssert.assertThat; @@ -79,6 +84,44 @@ public class ElasticSearchClientTests { } @Test + public void testIndexRequest() throws Exception { + String index = "myindex-" + UUID.randomUUID(); + Record<GenericObject> record = Mockito.mock(Record.class); + String topicName = "topic-" + UUID.randomUUID(); + when(record.getTopicName()).thenReturn(Optional.of(topicName)); + try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig() + .setElasticSearchUrl("http://" + container.getHttpHostAddress()) + .setIndexName(index))) { + IndexRequest request = client.makeIndexRequest(record, Pair.of("1", "{ \"a\":1}")); + assertEquals(request.index(), index); + } + try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig() + .setElasticSearchUrl("http://" + container.getHttpHostAddress()))) { + IndexRequest request = client.makeIndexRequest(record, Pair.of("1", "{ \"a\":1}")); + assertEquals(request.index(), topicName); + } + } + + @Test + public void testDeleteRequest() throws Exception { + String index = "myindex-" + UUID.randomUUID(); + Record<GenericObject> record = Mockito.mock(Record.class); + String topicName = "topic-" + UUID.randomUUID(); + when(record.getTopicName()).thenReturn(Optional.of(topicName)); + try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig() + .setElasticSearchUrl("http://" + container.getHttpHostAddress()) + .setIndexName(index))) { + DeleteRequest request = client.makeDeleteRequest(record, "1"); + assertEquals(request.index(), index); + } + try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig() + .setElasticSearchUrl("http://" + container.getHttpHostAddress()))) { + 
DeleteRequest request = client.makeDeleteRequest(record, "1"); + assertEquals(request.index(), topicName); + } + } + + @Test public void testIndexDelete() throws Exception { String index = "myindex-" + UUID.randomUUID(); try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig()
