This is an automated email from the ASF dual-hosted git repository.
zixuan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f1daa75525c [fix][io] Fix es index creation (#22654) (#22701)
f1daa75525c is described below
commit f1daa75525c99bd6093d8ac28b6e635bdb9b8011
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon May 13 20:13:50 2024 +0800
[fix][io] Fix es index creation (#22654) (#22701)
Signed-off-by: Zixuan Liu <[email protected]>
---
.../elastic/ElasticSearchJavaRestClient.java | 4 +-
.../io/elasticsearch/ElasticSearchSinkTests.java | 53 +++++++++++++---------
2 files changed, 33 insertions(+), 24 deletions(-)
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
index afda5ba0e74..133daa8cd6a 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -144,7 +144,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
public boolean deleteDocument(String index, String documentId) throws IOException {
final DeleteRequest req = new DeleteRequest.Builder()
- .index(config.getIndexName())
+ .index(index)
.id(documentId)
.build();
@@ -156,7 +156,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
public boolean indexDocument(String index, String documentId, String documentSource) throws IOException {
final Map mapped = objectMapper.readValue(documentSource, Map.class);
final IndexRequest<Object> indexRequest = new IndexRequest.Builder<>()
- .index(config.getIndexName())
+ .index(index)
.document(mapped)
.id(documentId)
.build();
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 62592f5f09b..4a16caf3ede 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -18,39 +18,41 @@
*/
package org.apache.pulsar.io.elasticsearch;
-import co.elastic.clients.transport.ElasticsearchTransport;
-import com.fasterxml.jackson.core.JsonParseException;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.GenericObject;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.client.api.schema.GenericSchema;
-import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
-import org.apache.pulsar.client.api.schema.SchemaBuilder;
-import org.apache.pulsar.client.impl.MessageImpl;
-import org.apache.pulsar.common.schema.KeyValue;
-import org.apache.pulsar.common.schema.KeyValueEncodingType;
-import org.apache.pulsar.common.schema.SchemaType;
-
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
-
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import co.elastic.clients.transport.ElasticsearchTransport;
+import com.fasterxml.jackson.core.JsonParseException;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
import java.util.List;
import java.util.Locale;
-import java.io.IOException;
-import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.client.api.schema.GenericRecord;
+import org.apache.pulsar.client.api.schema.GenericSchema;
+import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
+import org.apache.pulsar.client.api.schema.SchemaBuilder;
+import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.KeyValueEncodingType;
+import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
@@ -72,10 +74,6 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.fail;
-
public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
private static ElasticsearchContainer container;
@@ -149,6 +147,7 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
});
when(mockRecord.getSchema()).thenAnswer((Answer<Schema<KeyValue<String, UserProfile>>>) invocation -> kvSchema);
+ when(mockRecord.getEventTime()).thenAnswer(invocation -> Optional.of(System.currentTimeMillis()));
}
@AfterMethod(alwaysRun = true)
@@ -206,6 +205,16 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
verify(mockRecord, times(100)).ack();
}
+ @Test
+ public final void send1WithFormattedIndexTest() throws Exception {
+ map.put("indexName", "test-formatted-index-%{+yyyy-MM-dd}");
+ sink.open(map, mockSinkContext);
+ send(1);
+ verify(mockRecord, times(1)).ack();
+ String value = getHitIdAtIndex("test-formatted-index-*", 0);
+ assertTrue(StringUtils.isNotBlank(value));
+ }
+
@Test
public final void sendNoSchemaTest() throws Exception {