This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new efcedf6b0d4 [fix][io] Fix es index creation (#22654)
efcedf6b0d4 is described below

commit efcedf6b0d4217db7e47efef3420eb61da282c50
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon May 6 20:35:51 2024 +0800

    [fix][io] Fix es index creation (#22654)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../client/elastic/ElasticSearchJavaRestClient.java         |  4 ++--
 .../pulsar/io/elasticsearch/ElasticSearchSinkTests.java     | 13 +++++++++++++
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
index afda5ba0e74..133daa8cd6a 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -144,7 +144,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
     public boolean deleteDocument(String index, String documentId) throws IOException {
         final DeleteRequest req = new
                 DeleteRequest.Builder()
-                .index(config.getIndexName())
+                .index(index)
                 .id(documentId)
                 .build();
 
@@ -156,7 +156,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
     public boolean indexDocument(String index, String documentId, String documentSource) throws IOException {
         final Map mapped = objectMapper.readValue(documentSource, Map.class);
         final IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
-                .index(config.getIndexName())
+                .index(index)
                 .document(mapped)
                 .id(documentId)
                 .build();
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 9a2cb4ab565..f1da6fd0c7e 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 import co.elastic.clients.transport.ElasticsearchTransport;
 import com.fasterxml.jackson.core.JsonParseException;
@@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.GenericObject;
@@ -152,6 +154,7 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
         });
 
         when(mockRecord.getSchema()).thenAnswer((Answer<Schema<KeyValue<String, UserProfile>>>) invocation -> kvSchema);
+        when(mockRecord.getEventTime()).thenAnswer(invocation -> Optional.of(System.currentTimeMillis()));
     }
 
     @AfterMethod(alwaysRun = true)
@@ -209,6 +212,16 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
         verify(mockRecord, times(100)).ack();
     }
 
+    @Test
+    public final void send1WithFormattedIndexTest() throws Exception {
+        map.put("indexName", "test-formatted-index-%{+yyyy-MM-dd}");
+        sink.open(map, mockSinkContext);
+        send(1);
+        verify(mockRecord, times(1)).ack();
+        String value = getHitIdAtIndex("test-formatted-index-*", 0);
+        assertTrue(StringUtils.isNotBlank(value));
+    }
+
     @Test
     public final void sendNoSchemaTest() throws Exception {
 

Reply via email to