This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 60ecc425ec0 [fix] [io] elastic-search sink connector not support JSON.String schema. (#20741)
60ecc425ec0 is described below

commit 60ecc425ec097c033e4624a732f13f2aff19f543
Author: Baodi Shi <[email protected]>
AuthorDate: Thu Jul 13 22:40:23 2023 +0800

    [fix] [io] elastic-search sink connector not support JSON.String schema. (#20741)
---
 .../pulsar/io/elasticsearch/ElasticSearchSink.java | 16 ++++-
 .../io/elasticsearch/ElasticSearchSinkTests.java   | 80 ++++++++++++++++++++++
 2 files changed, 94 insertions(+), 2 deletions(-)

diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
index 70c861825d0..c7d2c365632 100644
--- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
+++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSink.java
@@ -353,10 +353,22 @@ public class ElasticSearchSink implements Sink<GenericObject> {
         return node;
     }
 
-    public static JsonNode extractJsonNode(Schema<?> schema, Object val) {
+    public JsonNode extractJsonNode(Schema<?> schema, Object val) throws 
JsonProcessingException {
+        if (val == null) {
+            return null;
+        }
         switch (schema.getSchemaInfo().getType()) {
             case JSON:
-                return (JsonNode) ((GenericRecord) val).getNativeObject();
+                Object nativeObject = ((GenericRecord) val).getNativeObject();
+                if (nativeObject instanceof String) {
+                    try {
+                        return objectMapper.readTree((String) nativeObject);
+                    } catch (JsonProcessingException e) {
+                        log.error("Failed to read JSON string: {}", 
nativeObject, e);
+                        throw e;
+                    }
+                }
+                return (JsonNode) nativeObject;
             case AVRO:
                 org.apache.avro.generic.GenericRecord node = 
(org.apache.avro.generic.GenericRecord)
                         ((GenericRecord) val).getNativeObject();
diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
index 4975217c620..cc1da10dece 100644
--- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
+++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchSinkTests.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.io.elasticsearch;
 
 import co.elastic.clients.transport.ElasticsearchTransport;
+import com.fasterxml.jackson.core.JsonParseException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -251,6 +252,85 @@ public abstract class ElasticSearchSinkTests extends ElasticSearchTestBase {
         verify(mockRecord, times(1)).ack();
     }
 
+    @Test
+    public final void sendJsonStringSchemaTest() throws Exception {
+
+        when(mockRecord.getMessage()).thenAnswer(new 
Answer<Optional<Message<String>>>() {
+            @Override
+            public Optional<Message<String>> answer(InvocationOnMock 
invocation) throws Throwable {
+                final MessageImpl mock = mock(MessageImpl.class);
+                
when(mock.getData()).thenReturn("{\"a\":1}".getBytes(StandardCharsets.UTF_8));
+                return Optional.of(mock);
+            }
+        });
+
+        when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
+            public Optional<String> answer(InvocationOnMock invocation) throws 
Throwable {
+                return Optional.empty();
+            }
+        });
+
+        GenericRecord genericRecord = mock(GenericRecord.class);
+        when(genericRecord.getNativeObject()).thenReturn("{\"a\":1}");
+        when(genericRecord.getSchemaType()).thenReturn(SchemaType.JSON);
+        when(mockRecord.getValue()).thenAnswer(new Answer<GenericRecord>() {
+            public GenericRecord answer(InvocationOnMock invocation) throws 
Throwable {
+                return genericRecord;
+            }
+        });
+
+        when(mockRecord.getSchema()).thenAnswer(new Answer<Schema>() {
+            public Schema answer(InvocationOnMock invocation) throws Throwable 
{
+                return Schema.JSON(String.class);
+            }
+        });
+
+        map.put("indexName", "test-index");
+        map.put("schemaEnable", "true");
+        sink.open(map, mockSinkContext);
+        sink.write(mockRecord);
+        verify(mockRecord, times(1)).ack();
+    }
+
+    @Test(expectedExceptions = JsonParseException.class)
+    public final void sendJsonStringSchemaErrorTest() throws Exception {
+
+        when(mockRecord.getMessage()).thenAnswer(new 
Answer<Optional<Message<String>>>() {
+            @Override
+            public Optional<Message<String>> answer(InvocationOnMock 
invocation) throws Throwable {
+                final MessageImpl mock = mock(MessageImpl.class);
+                
when(mock.getData()).thenReturn("no-json-format".getBytes(StandardCharsets.UTF_8));
+                return Optional.of(mock);
+            }
+        });
+
+        when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
+            public Optional<String> answer(InvocationOnMock invocation) throws 
Throwable {
+                return Optional.empty();
+            }
+        });
+
+        GenericRecord genericRecord = mock(GenericRecord.class);
+        when(genericRecord.getNativeObject()).thenReturn("no-json-format");
+        when(genericRecord.getSchemaType()).thenReturn(SchemaType.JSON);
+        when(mockRecord.getValue()).thenAnswer(new Answer<GenericRecord>() {
+            public GenericRecord answer(InvocationOnMock invocation) throws 
Throwable {
+                return genericRecord;
+            }
+        });
+
+        when(mockRecord.getSchema()).thenAnswer(new Answer<Schema>() {
+            public Schema answer(InvocationOnMock invocation) throws Throwable 
{
+                return Schema.JSON(String.class);
+            }
+        });
+
+        map.put("indexName", "test-index");
+        map.put("schemaEnable", "true");
+        sink.open(map, mockSinkContext);
+        sink.write(mockRecord);
+    }
+
     @Test(enabled = true)
     public final void sendKeyIgnoreSingleField() throws Exception {
         final String index = "testkeyignore";

Reply via email to