This is an automated email from the ASF dual-hosted git repository. nicoloboschi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new a165bda1d03 [fix][io] ElasticSearch sink: align null fields behaviour (#18577) a165bda1d03 is described below commit a165bda1d03e370f5efe1173134fb94e12b584b5 Author: Nicolò Boschi <boschi1...@gmail.com> AuthorDate: Wed Nov 23 14:04:37 2022 +0100 [fix][io] ElasticSearch sink: align null fields behaviour (#18577) --- pom.xml | 4 +-- .../elastic/ElasticSearchJavaRestClient.java | 12 ++++---- .../io/elasticsearch/ElasticSearchClientTests.java | 35 +++++++++++++++++++--- .../io/elasticsearch/ElasticSearchTestBase.java | 4 +-- .../io/sinks/ElasticSearch7SinkTester.java | 7 ++++- .../io/sinks/ElasticSearch8SinkTester.java | 7 ++++- .../integration/io/sinks/OpenSearchSinkTester.java | 6 +++- 7 files changed, 59 insertions(+), 16 deletions(-) diff --git a/pom.xml b/pom.xml index f67a61f8958..d02329a841d 100644 --- a/pom.xml +++ b/pom.xml @@ -176,7 +176,7 @@ flexible messaging model and an intuitive client API.</description> <hdfs-offload-version3>3.3.3</hdfs-offload-version3> <json-smart.version>2.4.7</json-smart.version> <opensearch.version>1.2.4</opensearch.version> - <elasticsearch-java.version>8.1.0</elasticsearch-java.version> + <elasticsearch-java.version>8.5.2</elasticsearch-java.version> <trino.version>363</trino.version> <scala.binary.version>2.13</scala.binary.version> <debezium.version>1.9.7.Final</debezium.version> @@ -236,7 +236,7 @@ flexible messaging model and an intuitive client API.</description> <netty-reactive-streams.version>2.0.6</netty-reactive-streams.version> <!-- test dependencies --> - <testcontainers.version>1.17.2</testcontainers.version> + <testcontainers.version>1.17.6</testcontainers.version> <hamcrest.version>2.2</hamcrest.version> <!-- Set docker-java.version to the version of docker-java used in Testcontainers --> diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java index 50876704ff2..4749ea2e2d3 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/elastic/ElasticSearchJavaRestClient.java @@ -36,7 +36,9 @@ import co.elastic.clients.elasticsearch.indices.RefreshRequest; import co.elastic.clients.json.jackson.JacksonJsonpMapper; import co.elastic.clients.transport.ElasticsearchTransport; import co.elastic.clients.transport.rest_client.RestClientTransport; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.util.Map; @@ -53,8 +55,9 @@ import org.elasticsearch.client.RestClientBuilder; public class ElasticSearchJavaRestClient extends RestClient { private final ElasticsearchClient client; - private final ObjectMapper objectMapper = new ObjectMapper(); - + private final ObjectMapper objectMapper = new ObjectMapper() + .configure(SerializationFeature.INDENT_OUTPUT, false) + .setSerializationInclusion(JsonInclude.Include.ALWAYS); private BulkProcessor bulkProcessor; private ElasticsearchTransport transport; @@ -87,8 +90,7 @@ public class ElasticSearchJavaRestClient extends RestClient { log.warn("Node host={} failed", node.getHost()); } }); - transport = new RestClientTransport(builder.build(), - new JacksonJsonpMapper()); + transport = new RestClientTransport(builder.build(), new JacksonJsonpMapper(objectMapper)); client = new ElasticsearchClient(transport); if (elasticSearchConfig.isBulkEnabled()) { bulkProcessor = new ElasticBulkProcessor(elasticSearchConfig, client, bulkProcessorListener); @@ -117,7 +119,7 @@ public class ElasticSearchJavaRestClient extends RestClient { .build(); try { final CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest); - if ((createIndexResponse.acknowledged() != null && createIndexResponse.acknowledged()) + if ((createIndexResponse.acknowledged()) && createIndexResponse.shardsAcknowledged()) { return true; } diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java index dc3ca8c34e7..6d9928c0426 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClientTests.java @@ -29,6 +29,7 @@ import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; import eu.rekawek.toxiproxy.model.ToxicDirection; import java.io.IOException; +import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.atomic.LongAdder; @@ -193,7 +194,7 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase { @Test public void testTopicToIndexName() throws IOException { try (ElasticSearchClient client = new ElasticSearchClient(new ElasticSearchConfig() - .setElasticSearchUrl("http://" + container.getHttpHostAddress())); ) { + .setElasticSearchUrl("http://" + container.getHttpHostAddress()));) { assertEquals(client.topicToIndexName("data-ks1.table1"), "data-ks1.table1"); assertEquals(client.topicToIndexName("persistent://public/default/testesjson"), "testesjson"); assertEquals(client.topicToIndexName("default/testesjson"), "testesjson"); @@ -211,7 +212,7 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase { public void testMalformedDocFails() throws Exception { String index = "indexmalformed-" + UUID.randomUUID(); ElasticSearchConfig config = new ElasticSearchConfig() - .setElasticSearchUrl("http://"+container.getHttpHostAddress()) + .setElasticSearchUrl("http://" + container.getHttpHostAddress()) .setIndexName(index) .setBulkEnabled(true) .setBulkFlushIntervalInMs(-1L) @@ -235,7 +236,7 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase { public void testMalformedDocIgnore() throws Exception { String index = "indexmalformed2-" + UUID.randomUUID(); ElasticSearchConfig config = new ElasticSearchConfig() - .setElasticSearchUrl("http://"+container.getHttpHostAddress()) + .setElasticSearchUrl("http://" + container.getHttpHostAddress()) .setIndexName(index) .setBulkEnabled(true) .setBulkFlushIntervalInMs(-1) @@ -366,7 +367,7 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase { public void testBulkIndexAndDelete() throws Exception { final String index = "indexbulktest-" + UUID.randomUUID(); ElasticSearchConfig config = new ElasticSearchConfig() - .setElasticSearchUrl("http://"+container.getHttpHostAddress()) + .setElasticSearchUrl("http://" + container.getHttpHostAddress()) .setIndexName(index) .setBulkEnabled(true) .setBulkActions(10) @@ -389,4 +390,30 @@ public abstract class ElasticSearchClientTests extends ElasticSearchTestBase { } } + @Test + public void testIndexKeepNulls() throws Exception { + final String index = "indexnulls"; + ElasticSearchConfig config = new ElasticSearchConfig() + .setElasticSearchUrl("http://" + container.getHttpHostAddress()) + .setIndexName(index); + + try (ElasticSearchClient client = new ElasticSearchClient(config)) { + MockRecord<GenericObject> mockRecord = new MockRecord<>(); + client.indexDocument(mockRecord, Pair.of("key0", "{\"a\":1,\"b\":null}")); + final Map<String, Object> sourceAsMap; + if (elasticImageName.equals(ELASTICSEARCH_8)) { + final ElasticSearchJavaRestClient restClient = (ElasticSearchJavaRestClient) client.getRestClient(); + sourceAsMap = + restClient.search(index, "*:*").hits().hits().get(0).source(); + } else { + final OpenSearchHighLevelRestClient restClient = (OpenSearchHighLevelRestClient) client.getRestClient(); + sourceAsMap = + restClient.search(index, "*:*").getHits().getHits()[0].getSourceAsMap(); + } + assertEquals(sourceAsMap.get("a"), 1); + assertTrue(sourceAsMap.containsKey("b")); + assertNull(sourceAsMap.get("b")); + } + } + } diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java index 5f2bb1e5d75..4c6fd020fa3 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchTestBase.java @@ -39,10 +39,10 @@ import org.testcontainers.utility.DockerImageName; public abstract class ElasticSearchTestBase { public static final String ELASTICSEARCH_8 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8")) - .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.1.0"); + .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1"); public static final String ELASTICSEARCH_7 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7")) - .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64"); + .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7"); public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE")) .orElse("opensearchproject/opensearch:1.2.4"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java index 17c17e7f496..65b38c677bf 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch7SinkTester.java @@ -18,18 +18,23 @@ */ package org.apache.pulsar.tests.integration.io.sinks; +import java.util.Optional; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testcontainers.elasticsearch.ElasticsearchContainer; public class ElasticSearch7SinkTester extends ElasticSearchSinkTester { + public static final String ELASTICSEARCH_7 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7")) + .orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7"); + + public ElasticSearch7SinkTester(boolean schemaEnable) { super(schemaEnable); } @Override protected ElasticsearchContainer createSinkService(PulsarCluster cluster) { - return new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.16.3-amd64") + return new ElasticsearchContainer(ELASTICSEARCH_7) .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m"); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java index 02990ae4533..bb52c4ff03f 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/ElasticSearch8SinkTester.java @@ -18,18 +18,23 @@ */ package org.apache.pulsar.tests.integration.io.sinks; +import java.util.Optional; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testcontainers.elasticsearch.ElasticsearchContainer; public class ElasticSearch8SinkTester extends ElasticSearchSinkTester { + public static final String ELASTICSEARCH_8 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8")) + .orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1"); + + public ElasticSearch8SinkTester(boolean schemaEnable) { super(schemaEnable); } @Override protected ElasticsearchContainer createSinkService(PulsarCluster cluster) { - return new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:8.1.0") + return new ElasticsearchContainer(ELASTICSEARCH_8) .withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m") .withEnv("xpack.security.enabled", "false") .withEnv("xpack.security.http.ssl.enabled", "false"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java index 12caddb52e0..1e10cc4189c 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/OpenSearchSinkTester.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.tests.integration.io.sinks; +import java.util.Optional; import org.apache.http.HttpHost; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.awaitility.Awaitility; @@ -36,6 +37,9 @@ import static org.testng.Assert.assertTrue; public class OpenSearchSinkTester extends ElasticSearchSinkTester { + public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE")) + .orElse("opensearchproject/opensearch:1.2.4"); + private RestHighLevelClient elasticClient; @@ -45,7 +49,7 @@ public class OpenSearchSinkTester extends ElasticSearchSinkTester { @Override protected ElasticsearchContainer createSinkService(PulsarCluster cluster) { - DockerImageName dockerImageName = DockerImageName.parse("opensearchproject/opensearch:1.2.4") + DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH) .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch"); return new ElasticsearchContainer(dockerImageName) .withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")