This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push:
new 60ecc425ec0 [fix] [io] elastic-search sink connector not support JSON.String schema. (#20741)
60ecc425ec0 is described below
commit 60ecc425ec097c033e4624a732f13f2aff19f543
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jul 13 22:40:23 2023 +0800
[fix] [io] elastic-search sink connector not support JSON.String schema. (#20741)
---
.../pulsar/io/elasticsearch/ElasticSearchSink.java | 16 ++++-
.../io/elasticsearch/ElasticSearchSinkTests.java | 80 ++++++++++++++++++++++
2 files changed, 94 insertions(+), 2 deletions(-)
diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 70c861825d0..c7d2c365632 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -353,10 +353,22 @@ public class ElasticSearchSink implements Sink<GenericObject> {
return node;
}
- public static JsonNode extractJsonNode(Schema<?> schema, Object val) {
+ public JsonNode extractJsonNode(Schema<?> schema, Object val) throws
JsonProcessingException {
+ if (val == null) {
+ return null;
+ }
switch (schema.getSchemaInfo().getType()) {
case JSON:
- return (JsonNode) ((GenericRecord) val).getNativeObject();
+ Object nativeObject = ((GenericRecord) val).getNativeObject();
+ if (nativeObject instanceof String) {
+ try {
+ return objectMapper.readTree((String) nativeObject);
+ } catch (JsonProcessingException e) {
+ log.error("Failed to read JSON string: {}",
nativeObject, e);
+ throw e;
+ }
+ }
+ return (JsonNode) nativeObject;
case AVRO:
org.apache.avro.generic.GenericRecord node =
(org.apache.avro.generic.GenericRecord)
((GenericRecord) val).getNativeObject();
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 4975217c620..cc1da10dece 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.io.elasticsearch;
import co.elastic.clients.transport.ElasticsearchTransport;
+import com.fasterxml.jackson.core.JsonParseException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
@@ -251,6 +252,85 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
verify(mockRecord, times(1)).ack();
}
+ @Test
+ public final void sendJsonStringSchemaTest() throws Exception {
+
+ when(mockRecord.getMessage()).thenAnswer(new
Answer<Optional<Message<String>>>() {
+ @Override
+ public Optional<Message<String>> answer(InvocationOnMock
invocation) throws Throwable {
+ final MessageImpl mock = mock(MessageImpl.class);
+
when(mock.getData()).thenReturn("{\"a\":1}".getBytes(StandardCharsets.UTF_8));
+ return Optional.of(mock);
+ }
+ });
+
+ when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
+ public Optional<String> answer(InvocationOnMock invocation) throws
Throwable {
+ return Optional.empty();
+ }
+ });
+
+ GenericRecord genericRecord = mock(GenericRecord.class);
+ when(genericRecord.getNativeObject()).thenReturn("{\"a\":1}");
+ when(genericRecord.getSchemaType()).thenReturn(SchemaType.JSON);
+ when(mockRecord.getValue()).thenAnswer(new Answer<GenericRecord>() {
+ public GenericRecord answer(InvocationOnMock invocation) throws
Throwable {
+ return genericRecord;
+ }
+ });
+
+ when(mockRecord.getSchema()).thenAnswer(new Answer<Schema>() {
+ public Schema answer(InvocationOnMock invocation) throws Throwable
{
+ return Schema.JSON(String.class);
+ }
+ });
+
+ map.put("indexName", "test-index");
+ map.put("schemaEnable", "true");
+ sink.open(map, mockSinkContext);
+ sink.write(mockRecord);
+ verify(mockRecord, times(1)).ack();
+ }
+
+ @Test(expectedExceptions = JsonParseException.class)
+ public final void sendJsonStringSchemaErrorTest() throws Exception {
+
+ when(mockRecord.getMessage()).thenAnswer(new
Answer<Optional<Message<String>>>() {
+ @Override
+ public Optional<Message<String>> answer(InvocationOnMock
invocation) throws Throwable {
+ final MessageImpl mock = mock(MessageImpl.class);
+
when(mock.getData()).thenReturn("no-json-format".getBytes(StandardCharsets.UTF_8));
+ return Optional.of(mock);
+ }
+ });
+
+ when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
+ public Optional<String> answer(InvocationOnMock invocation) throws
Throwable {
+ return Optional.empty();
+ }
+ });
+
+ GenericRecord genericRecord = mock(GenericRecord.class);
+ when(genericRecord.getNativeObject()).thenReturn("no-json-format");
+ when(genericRecord.getSchemaType()).thenReturn(SchemaType.JSON);
+ when(mockRecord.getValue()).thenAnswer(new Answer<GenericRecord>() {
+ public GenericRecord answer(InvocationOnMock invocation) throws
Throwable {
+ return genericRecord;
+ }
+ });
+
+ when(mockRecord.getSchema()).thenAnswer(new Answer<Schema>() {
+ public Schema answer(InvocationOnMock invocation) throws Throwable
{
+ return Schema.JSON(String.class);
+ }
+ });
+
+ map.put("indexName", "test-index");
+ map.put("schemaEnable", "true");
+ sink.open(map, mockSinkContext);
+ sink.write(mockRecord);
+ }
+
@Test(enabled = true)
public final void sendKeyIgnoreSingleField() throws Exception {
final String index = "testkeyignore";