This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f1daa75525c [fix][io] Fix es index creation (#22654) (#22701)
f1daa75525c is described below

commit f1daa75525c99bd6093d8ac28b6e635bdb9b8011
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon May 13 20:13:50 2024 +0800

    [fix][io] Fix es index creation (#22654) (#22701)
    
    Signed-off-by: Zixuan Liu <[email protected]>
---
 .../elastic/ElasticSearchJavaRestClient.java       |  4 +-
 .../io/elasticsearch/ElasticSearchSinkTests.java   | 53 +++++++++++++---------
 2 files changed, 33 insertions(+), 24 deletions(-)

diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
index afda5ba0e74..133daa8cd6a 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -144,7 +144,7 @@ public class ElasticSearchJavaRestClient extends RestClient 
{
     public boolean deleteDocument(String index, String documentId) throws 
IOException {
         final DeleteRequest req = new
                 DeleteRequest.Builder()
-                .index(config.getIndexName())
+                .index(index)
                 .id(documentId)
                 .build();
 
@@ -156,7 +156,7 @@ public class ElasticSearchJavaRestClient extends RestClient 
{
     public boolean indexDocument(String index, String documentId, String 
documentSource) throws IOException {
         final Map mapped = objectMapper.readValue(documentSource, Map.class);
         final IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
-                .index(config.getIndexName())
+                .index(index)
                 .document(mapped)
                 .id(documentId)
                 .build();
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 62592f5f09b..4a16caf3ede 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -18,39 +18,41 @@
  */
 package org.apache.pulsar.io.elasticsearch;
 
-import co.elastic.clients.transport.ElasticsearchTransport;
-import com.fasterxml.jackson.core.JsonParseException;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.GenericObject;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericSchema;
-import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
-import org.apache.pulsar.client.api.schema.SchemaBuilder;
-import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.common.schema.KeyValue;
-import org.apache.pulsar.common.schema.KeyValueEncodingType;
-import org.apache.pulsar.common.schema.SchemaType;
-
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
-
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import com.fasterxml.jackson.core.JsonParseException;
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
-import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
@@ -72,10 +74,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.fail;
-
 public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
 
     private static ElasticsearchContainer container;
@@ -149,6 +147,7 @@ public abstract class ElasticSearchSinkTests extends 
ElasticSearchTestBase {
         });
 
         
when(mockRecord.getSchema()).thenAnswer((Answer<Schema<KeyValue<String, 
UserProfile>>>) invocation -> kvSchema);
+        when(mockRecord.getEventTime()).thenAnswer(invocation -> 
Optional.of(System.currentTimeMillis()));
     }
 
     @AfterMethod(alwaysRun = true)
@@ -206,6 +205,16 @@ public abstract class ElasticSearchSinkTests extends 
ElasticSearchTestBase {
         verify(mockRecord, times(100)).ack();
     }
 
+    @Test
+    public final void send1WithFormattedIndexTest() throws Exception {
+        map.put("indexName", "test-formatted-index-%{+yyyy-MM-dd}");
+        sink.open(map, mockSinkContext);
+        send(1);
+        verify(mockRecord, times(1)).ack();
+        String value = getHitIdAtIndex("test-formatted-index-*", 0);
+        assertTrue(StringUtils.isNotBlank(value));
+    }
+
     @Test
     public final void sendNoSchemaTest() throws Exception {
 

Reply via email to