This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 14ea189 [es-sink] Use topic name as the index name if indexName is
not configured (#13064)
14ea189 is described below
commit 14ea189cb96a4acd2c39ac9c049dfd2fbcf93de7
Author: Yang Yang <[email protected]>
AuthorDate: Thu Dec 2 10:32:55 2021 +0800
[es-sink] Use topic name as the index name if indexName is not configured
(#13064)
### Motivation
According to the
[documentation](https://github.com/apache/pulsar/blob/master/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java#L53),
the index name should default to the topic name. However, the code uses only
the configured `indexName` when generating write requests, which makes
`indexName` a required config in practice.
### Modifications
- Use the method `indexName()` to resolve the index name for write requests;
it falls back to the topic name if `indexName` is not configured.
- Extract methods `makeIndexRequest` and `makeDeleteRequest` for better
code reuse and testability.
- Add unit tests to check the index name of generated index/delete requests.
---
.../io/elasticsearch/ElasticSearchClient.java | 40 ++++++++++----------
.../io/elasticsearch/ElasticSearchClientTests.java | 43 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 20 deletions(-)
diff --git
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
index af66795..b9e756e 100644
---
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
+++
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java
@@ -254,16 +254,27 @@ public class ElasticSearchClient implements AutoCloseable
{
}
}
+ IndexRequest makeIndexRequest(Record<GenericObject> record, Pair<String,
String> idAndDoc) throws IOException {
+ IndexRequest indexRequest =
Requests.indexRequest(indexName(record.getTopicName()));
+ if (!Strings.isNullOrEmpty(idAndDoc.getLeft()))
+ indexRequest.id(idAndDoc.getLeft());
+ indexRequest.type(config.getTypeName());
+ indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
+ return indexRequest;
+ }
+
+ DeleteRequest makeDeleteRequest(Record<GenericObject> record, String id)
throws IOException {
+ DeleteRequest deleteRequest =
Requests.deleteRequest(indexName(record.getTopicName()));
+ deleteRequest.id(id);
+ deleteRequest.type(config.getTypeName());
+ return deleteRequest;
+ }
+
public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws
Exception {
try {
checkNotFailed();
checkIndexExists(record.getTopicName());
- IndexRequest indexRequest =
Requests.indexRequest(config.getIndexName());
- if (!Strings.isNullOrEmpty(idAndDoc.getLeft()))
- indexRequest.id(idAndDoc.getLeft());
- indexRequest.type(config.getTypeName());
- indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
-
+ IndexRequest indexRequest = makeIndexRequest(record, idAndDoc);
records.put(indexRequest, record);
bulkProcessor.add(indexRequest);
} catch(Exception e) {
@@ -284,12 +295,7 @@ public class ElasticSearchClient implements AutoCloseable {
try {
checkNotFailed();
checkIndexExists(record.getTopicName());
- IndexRequest indexRequest =
Requests.indexRequest(config.getIndexName());
- if (!Strings.isNullOrEmpty(idAndDoc.getLeft()))
- indexRequest.id(idAndDoc.getLeft());
- indexRequest.type(config.getTypeName());
- indexRequest.source(idAndDoc.getRight(), XContentType.JSON);
- IndexResponse indexResponse = client.index(indexRequest,
RequestOptions.DEFAULT);
+ IndexResponse indexResponse =
client.index(makeIndexRequest(record, idAndDoc), RequestOptions.DEFAULT);
if
(indexResponse.getResult().equals(DocWriteResponse.Result.CREATED) ||
indexResponse.getResult().equals(DocWriteResponse.Result.UPDATED)) {
record.ack();
@@ -309,10 +315,7 @@ public class ElasticSearchClient implements AutoCloseable {
try {
checkNotFailed();
checkIndexExists(record.getTopicName());
- DeleteRequest deleteRequest =
Requests.deleteRequest(config.getIndexName());
- deleteRequest.id(id);
- deleteRequest.type(config.getTypeName());
-
+ DeleteRequest deleteRequest = makeDeleteRequest(record, id);
records.put(deleteRequest, record);
bulkProcessor.add(deleteRequest);
} catch(Exception e) {
@@ -333,10 +336,7 @@ public class ElasticSearchClient implements AutoCloseable {
try {
checkNotFailed();
checkIndexExists(record.getTopicName());
- DeleteRequest deleteRequest =
Requests.deleteRequest(config.getIndexName());
- deleteRequest.id(id);
- deleteRequest.type(config.getTypeName());
- DeleteResponse deleteResponse = client.delete(deleteRequest,
RequestOptions.DEFAULT);
+ DeleteResponse deleteResponse =
client.delete(makeDeleteRequest(record, id), RequestOptions.DEFAULT);
log.debug("delete result=" + deleteResponse.getResult());
if
(deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED) ||
deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
diff --git
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index fb927c5..a831d5c 100644
---
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -24,7 +24,10 @@ import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.elasticsearch.testcontainers.ChaosContainer;
import org.awaitility.Awaitility;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.index.IndexRequest;
import org.junit.AfterClass;
+import org.mockito.Mockito;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -34,6 +37,8 @@ import java.util.Optional;
import java.util.UUID;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -79,6 +84,44 @@ public class ElasticSearchClientTests {
}
@Test
+ public void testIndexRequest() throws Exception {
+ String index = "myindex-" + UUID.randomUUID();
+ Record<GenericObject> record = Mockito.mock(Record.class);
+ String topicName = "topic-" + UUID.randomUUID();
+ when(record.getTopicName()).thenReturn(Optional.of(topicName));
+ try (ElasticSearchClient client = new ElasticSearchClient(new
ElasticSearchConfig()
+ .setElasticSearchUrl("http://" +
container.getHttpHostAddress())
+ .setIndexName(index))) {
+ IndexRequest request = client.makeIndexRequest(record,
Pair.of("1", "{ \"a\":1}"));
+ assertEquals(request.index(), index);
+ }
+ try (ElasticSearchClient client = new ElasticSearchClient(new
ElasticSearchConfig()
+ .setElasticSearchUrl("http://" +
container.getHttpHostAddress()))) {
+ IndexRequest request = client.makeIndexRequest(record,
Pair.of("1", "{ \"a\":1}"));
+ assertEquals(request.index(), topicName);
+ }
+ }
+
+ @Test
+ public void testDeleteRequest() throws Exception {
+ String index = "myindex-" + UUID.randomUUID();
+ Record<GenericObject> record = Mockito.mock(Record.class);
+ String topicName = "topic-" + UUID.randomUUID();
+ when(record.getTopicName()).thenReturn(Optional.of(topicName));
+ try (ElasticSearchClient client = new ElasticSearchClient(new
ElasticSearchConfig()
+ .setElasticSearchUrl("http://" +
container.getHttpHostAddress())
+ .setIndexName(index))) {
+ DeleteRequest request = client.makeDeleteRequest(record, "1");
+ assertEquals(request.index(), index);
+ }
+ try (ElasticSearchClient client = new ElasticSearchClient(new
ElasticSearchConfig()
+ .setElasticSearchUrl("http://" +
container.getHttpHostAddress()))) {
+ DeleteRequest request = client.makeDeleteRequest(record, "1");
+ assertEquals(request.index(), topicName);
+ }
+ }
+
+ @Test
public void testIndexDelete() throws Exception {
String index = "myindex-" + UUID.randomUUID();
try (ElasticSearchClient client = new ElasticSearchClient(new
ElasticSearchConfig()