This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new efcedf6b0d4 [fix][io] Fix es index creation (#22654)
efcedf6b0d4 is described below
commit efcedf6b0d4217db7e47efef3420eb61da282c50
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon May 6 20:35:51 2024 +0800
[fix][io] Fix es index creation (#22654)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../client/elastic/ElasticSearchJavaRestClient.java | 4 ++--
.../pulsar/io/elasticsearch/ElasticSearchSinkTests.java | 13 +++++++++++++
2 files changed, 15 insertions(+), 2 deletions(-)
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
index afda5ba0e74..133daa8cd6a 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -144,7 +144,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
public boolean deleteDocument(String index, String documentId) throws IOException {
final DeleteRequest req = new DeleteRequest.Builder()
- .index(config.getIndexName())
+ .index(index)
.id(documentId)
.build();
@@ -156,7 +156,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
public boolean indexDocument(String index, String documentId, String documentSource) throws IOException {
final Map mapped = objectMapper.readValue(documentSource, Map.class);
final IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
- .index(config.getIndexName())
+ .index(index)
.document(mapped)
.id(documentId)
.build();
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 9a2cb4ab565..f1da6fd0c7e 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import co.elastic.clients.transport.ElasticsearchTransport;
import com.fasterxml.jackson.core.JsonParseException;
@@ -43,6 +44,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericObject;
@@ -152,6 +154,7 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
});
when(mockRecord.getSchema()).thenAnswer((Answer<Schema<KeyValue<String, UserProfile>>>) invocation -> kvSchema);
+ when(mockRecord.getEventTime()).thenAnswer(invocation -> Optional.of(System.currentTimeMillis()));
}
@AfterMethod(alwaysRun = true)
@@ -209,6 +212,16 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
verify(mockRecord, times(100)).ack();
}
+ @Test
+ public final void send1WithFormattedIndexTest() throws Exception {
+ map.put("indexName", "test-formatted-index-%{+yyyy-MM-dd}");
+ sink.open(map, mockSinkContext);
+ send(1);
+ verify(mockRecord, times(1)).ack();
+ String value = getHitIdAtIndex("test-formatted-index-*", 0);
+ assertTrue(StringUtils.isNotBlank(value));
+ }
+
@Test
public final void sendNoSchemaTest() throws Exception {