This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a165bda1d03 [fix][io] ElasticSearch sink: align null fields behaviour (#18577)
a165bda1d03 is described below

commit a165bda1d03e370f5efe1173134fb94e12b584b5
Author: Nicolò Boschi <boschi1...@gmail.com>
AuthorDate: Wed Nov 23 14:04:37 2022 +0100

    [fix][io] ElasticSearch sink: align null fields behaviour (#18577)
---
 pom.xml                                            |  4 +--
 .../elastic/ElasticSearchJavaRestClient.java       | 12 ++++----
 .../io/elasticsearch/ElasticSearchClientTests.java | 35 +++++++++++++++++++---
 .../io/elasticsearch/ElasticSearchTestBase.java    |  4 +--
 .../io/sinks/ElasticSearch7SinkTester.java         |  7 ++++-
 .../io/sinks/ElasticSearch8SinkTester.java         |  7 ++++-
 .../integration/io/sinks/OpenSearchSinkTester.java |  6 +++-
 7 files changed, 59 insertions(+), 16 deletions(-)

diff --git a/pom.xml b/pom.xml
index f67a61f8958..d02329a841d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -176,7 +176,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <hdfs-offload-version3>3.3.3</hdfs-offload-version3>
     <json-smart.version>2.4.7</json-smart.version>
     <opensearch.version>1.2.4</opensearch.version>
-    <elasticsearch-java.version>8.1.0</elasticsearch-java.version>
+    <elasticsearch-java.version>8.5.2</elasticsearch-java.version>
     <trino.version>363</trino.version>
     <scala.binary.version>2.13</scala.binary.version>
     <debezium.version>1.9.7.Final</debezium.version>
@@ -236,7 +236,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <netty-reactive-streams.version>2.0.6</netty-reactive-streams.version>
 
     <!-- test dependencies -->
-    <testcontainers.version>1.17.2</testcontainers.version>
+    <testcontainers.version>1.17.6</testcontainers.version>
     <hamcrest.version>2.2</hamcrest.version>
 
     <!-- Set docker-java.version to the version of docker-java used in 
Testcontainers -->
diff --git 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
index 50876704ff2..4749ea2e2d3 100644
--- 
a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
+++ 
b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java
@@ -36,7 +36,9 @@ import 
co.elastic.clients.elasticsearch.indices.RefreshRequest;
 import co.elastic.clients.json.jackson.JacksonJsonpMapper;
 import co.elastic.clients.transport.ElasticsearchTransport;
 import co.elastic.clients.transport.rest_client.RestClientTransport;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.Map;
@@ -53,8 +55,9 @@ import org.elasticsearch.client.RestClientBuilder;
 public class ElasticSearchJavaRestClient extends RestClient {
 
     private final ElasticsearchClient client;
-    private final ObjectMapper objectMapper = new ObjectMapper();
-
+    private final ObjectMapper objectMapper = new ObjectMapper()
+            .configure(SerializationFeature.INDENT_OUTPUT, false)
+            .setSerializationInclusion(JsonInclude.Include.ALWAYS);
     private BulkProcessor bulkProcessor;
     private ElasticsearchTransport transport;
 
@@ -87,8 +90,7 @@ public class ElasticSearchJavaRestClient extends RestClient {
                         log.warn("Node host={} failed", node.getHost());
                     }
                 });
-        transport = new RestClientTransport(builder.build(),
-                new JacksonJsonpMapper());
+        transport = new RestClientTransport(builder.build(), new 
JacksonJsonpMapper(objectMapper));
         client = new ElasticsearchClient(transport);
         if (elasticSearchConfig.isBulkEnabled()) {
             bulkProcessor = new ElasticBulkProcessor(elasticSearchConfig, 
client, bulkProcessorListener);
@@ -117,7 +119,7 @@ public class ElasticSearchJavaRestClient extends RestClient 
{
                 .build();
         try {
             final CreateIndexResponse createIndexResponse = 
client.indices().create(createIndexRequest);
-            if ((createIndexResponse.acknowledged() != null && 
createIndexResponse.acknowledged())
+            if ((createIndexResponse.acknowledged())
                     && createIndexResponse.shardsAcknowledged()) {
                 return true;
             }
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
index dc3ca8c34e7..6d9928c0426 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.assertThrows;
 import static org.testng.Assert.assertTrue;
 import eu.rekawek.toxiproxy.model.ToxicDirection;
 import java.io.IOException;
+import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.atomic.LongAdder;
@@ -193,7 +194,7 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
     @Test
     public void testTopicToIndexName() throws IOException {
         try (ElasticSearchClient client = new ElasticSearchClient(new 
ElasticSearchConfig()
-                .setElasticSearchUrl("http://" + container.getHttpHostAddress())); ) {
+                .setElasticSearchUrl("http://" + container.getHttpHostAddress()));) {
             assertEquals(client.topicToIndexName("data-ks1.table1"), 
"data-ks1.table1");
             
assertEquals(client.topicToIndexName("persistent://public/default/testesjson"), 
"testesjson");
             assertEquals(client.topicToIndexName("default/testesjson"), 
"testesjson");
@@ -211,7 +212,7 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
     public void testMalformedDocFails() throws Exception {
         String index = "indexmalformed-" + UUID.randomUUID();
         ElasticSearchConfig config = new ElasticSearchConfig()
-                .setElasticSearchUrl("http://"+container.getHttpHostAddress())
+                .setElasticSearchUrl("http://" + container.getHttpHostAddress())
                 .setIndexName(index)
                 .setBulkEnabled(true)
                 .setBulkFlushIntervalInMs(-1L)
@@ -235,7 +236,7 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
     public void testMalformedDocIgnore() throws Exception {
         String index = "indexmalformed2-" + UUID.randomUUID();
         ElasticSearchConfig config = new ElasticSearchConfig()
-                .setElasticSearchUrl("http://"+container.getHttpHostAddress())
+                .setElasticSearchUrl("http://" + container.getHttpHostAddress())
                 .setIndexName(index)
                 .setBulkEnabled(true)
                 .setBulkFlushIntervalInMs(-1)
@@ -366,7 +367,7 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
     public void testBulkIndexAndDelete() throws Exception {
         final String index = "indexbulktest-" + UUID.randomUUID();
         ElasticSearchConfig config = new ElasticSearchConfig()
-                .setElasticSearchUrl("http://"+container.getHttpHostAddress())
+                .setElasticSearchUrl("http://" + container.getHttpHostAddress())
                 .setIndexName(index)
                 .setBulkEnabled(true)
                 .setBulkActions(10)
@@ -389,4 +390,30 @@ public abstract class ElasticSearchClientTests extends 
ElasticSearchTestBase {
         }
     }
 
+    @Test
+    public void testIndexKeepNulls() throws Exception {
+        final String index = "indexnulls";
+        ElasticSearchConfig config = new ElasticSearchConfig()
+                .setElasticSearchUrl("http://" + container.getHttpHostAddress())
+                .setIndexName(index);
+
+        try (ElasticSearchClient client = new ElasticSearchClient(config)) {
+            MockRecord<GenericObject> mockRecord = new MockRecord<>();
+            client.indexDocument(mockRecord, Pair.of("key0", 
"{\"a\":1,\"b\":null}"));
+            final Map<String, Object> sourceAsMap;
+            if (elasticImageName.equals(ELASTICSEARCH_8)) {
+                final ElasticSearchJavaRestClient restClient = 
(ElasticSearchJavaRestClient) client.getRestClient();
+                sourceAsMap =
+                        restClient.search(index, 
"*:*").hits().hits().get(0).source();
+            } else {
+                final OpenSearchHighLevelRestClient restClient = 
(OpenSearchHighLevelRestClient) client.getRestClient();
+                sourceAsMap =
+                        restClient.search(index, 
"*:*").getHits().getHits()[0].getSourceAsMap();
+            }
+            assertEquals(sourceAsMap.get("a"), 1);
+            assertTrue(sourceAsMap.containsKey("b"));
+            assertNull(sourceAsMap.get("b"));
+        }
+    }
+
 }
diff --git 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
index 5f2bb1e5d75..4c6fd020fa3 100644
--- 
a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
+++ 
b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java
@@ -39,10 +39,10 @@ import org.testcontainers.utility.DockerImageName;
 public abstract class ElasticSearchTestBase {
 
     public static final String ELASTICSEARCH_8 = 
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
-            .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.1.0");
+            .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
 
     public static final String ELASTICSEARCH_7 = 
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7"))
-            
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64");
+            .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");
 
     public static final String OPENSEARCH = 
Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
             .orElse("opensearchproject/opensearch:1.2.4");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
index 17c17e7f496..65b38c677bf 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java
@@ -18,18 +18,23 @@
  */
 package org.apache.pulsar.tests.integration.io.sinks;
 
+import java.util.Optional;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 
 public class ElasticSearch7SinkTester extends ElasticSearchSinkTester {
 
+    public static final String ELASTICSEARCH_7 = 
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7"))
+            .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");
+
+
     public ElasticSearch7SinkTester(boolean schemaEnable) {
         super(schemaEnable);
     }
 
     @Override
     protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
-        return new 
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64")
+        return new ElasticsearchContainer(ELASTICSEARCH_7)
                 .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m");
     }
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
index 02990ae4533..bb52c4ff03f 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java
@@ -18,18 +18,23 @@
  */
 package org.apache.pulsar.tests.integration.io.sinks;
 
+import java.util.Optional;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testcontainers.elasticsearch.ElasticsearchContainer;
 
 public class ElasticSearch8SinkTester extends ElasticSearchSinkTester {
 
+    public static final String ELASTICSEARCH_8 = 
Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
+            .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
+
+
     public ElasticSearch8SinkTester(boolean schemaEnable) {
         super(schemaEnable);
     }
 
     @Override
     protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
-        return new 
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:8.1.0")
+        return new ElasticsearchContainer(ELASTICSEARCH_8)
                 .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
                 .withEnv("xpack.security.enabled", "false")
                 .withEnv("xpack.security.http.ssl.enabled", "false");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
index 12caddb52e0..1e10cc4189c 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.tests.integration.io.sinks;
 
+import java.util.Optional;
 import org.apache.http.HttpHost;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.awaitility.Awaitility;
@@ -36,6 +37,9 @@ import static org.testng.Assert.assertTrue;
 
 public class OpenSearchSinkTester extends ElasticSearchSinkTester {
 
+    public static final String OPENSEARCH = 
Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
+            .orElse("opensearchproject/opensearch:1.2.4");
+
     private RestHighLevelClient elasticClient;
 
 
@@ -45,7 +49,7 @@ public class OpenSearchSinkTester extends 
ElasticSearchSinkTester {
 
     @Override
     protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
-        DockerImageName dockerImageName = 
DockerImageName.parse("opensearchproject/opensearch:1.2.4")
+        DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
                 
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
         return new ElasticsearchContainer(dockerImageName)
                 .withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")

Reply via email to