This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 925267502fc [fix][io] Fix KinesisSink json flattening for AVRO's SchemaType.BYTES (#24132)
925267502fc is described below

commit 925267502fcb6c2d446a6aa0e73e02ccc8b7a762
Author: Christophe Bornet <cbor...@hotmail.com>
AuthorDate: Fri Mar 28 10:59:26 2025 +0100

    [fix][io] Fix KinesisSink json flattening for AVRO's SchemaType.BYTES (#24132)
---
 .../java/org/apache/pulsar/io/kinesis/Utils.java   | 10 +++++-----
 .../pulsar/io/kinesis/json/JsonConverter.java      | 23 +++++++++++++++-------
 .../org/apache/pulsar/io/kinesis/UtilsTest.java    | 20 +++++++++++++------
 .../pulsar/io/kinesis/json/JsonConverterTests.java |  4 ++--
 4 files changed, 37 insertions(+), 20 deletions(-)

diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
index 32d34705248..a52e42e9c6a 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -222,7 +222,7 @@ public class Utils {
         JsonRecord jsonRecord = new JsonRecord();
         GenericObject value = record.getValue();
         if (value != null) {
-            jsonRecord.setPayload(toJsonSerializable(record.getSchema(), 
value.getNativeObject()));
+            jsonRecord.setPayload(toJsonSerializable(record.getSchema(), 
value.getNativeObject(), flatten));
         }
         record.getKey().ifPresent(jsonRecord::setKey);
         record.getTopicName().ifPresent(jsonRecord::setTopicName);
@@ -242,7 +242,7 @@ public class Utils {
                 .orElseThrow(() -> new IllegalArgumentException("Record does 
not carry message information"));
     }
 
-    private static Object toJsonSerializable(Schema<?> schema, Object val) {
+    private static Object toJsonSerializable(Schema<?> schema, Object val, 
boolean convertBytesToString) {
         if (schema == null || schema.getSchemaInfo().getType().isPrimitive()) {
             return val;
         }
@@ -254,15 +254,15 @@ public class Utils {
                 Map<String, Object> jsonKeyValue = new HashMap<>();
                 if (keyValue.getKey() != null) {
                     jsonKeyValue.put("key", 
toJsonSerializable(keyValueSchema.getKeySchema(),
-                            keyValue.getKey().getNativeObject()));
+                            keyValue.getKey().getNativeObject(), 
convertBytesToString));
                 }
                 if (keyValue.getValue() != null) {
                     jsonKeyValue.put("value", 
toJsonSerializable(keyValueSchema.getValueSchema(),
-                            keyValue.getValue().getNativeObject()));
+                            keyValue.getValue().getNativeObject(), 
convertBytesToString));
                 }
                 return jsonKeyValue;
             case AVRO:
-                return 
JsonConverter.toJson((org.apache.avro.generic.GenericRecord) val);
+                return 
JsonConverter.toJson((org.apache.avro.generic.GenericRecord) val, 
convertBytesToString);
             case JSON:
                 return val;
             default:
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java
index 5308971e8d2..22412c39575 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java
@@ -26,6 +26,7 @@ import java.math.BigDecimal;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalTime;
+import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -45,18 +46,18 @@ public class JsonConverter {
     private static final Map<String, LogicalTypeConverter<?>> 
logicalTypeConverters = new HashMap<>();
     private static final JsonNodeFactory jsonNodeFactory = 
JsonNodeFactory.withExactBigDecimals(true);
 
-    public static JsonNode toJson(GenericRecord genericRecord) {
+    public static JsonNode toJson(GenericRecord genericRecord, boolean 
convertBytesToString) {
         if (genericRecord == null) {
             return null;
         }
         ObjectNode objectNode = jsonNodeFactory.objectNode();
         for (Schema.Field field : genericRecord.getSchema().getFields()) {
-            objectNode.set(field.name(), toJson(field.schema(), 
genericRecord.get(field.name())));
+            objectNode.set(field.name(), toJson(field.schema(), 
genericRecord.get(field.name()), convertBytesToString));
         }
         return objectNode;
     }
 
-    public static JsonNode toJson(Schema schema, Object value) {
+    public static JsonNode toJson(Schema schema, Object value, boolean 
convertBytesToString) {
         if (schema.getLogicalType() != null && 
logicalTypeConverters.containsKey(schema.getLogicalType().getName())) {
             return 
logicalTypeConverters.get(schema.getLogicalType().getName()).toJson(schema, 
value);
         }
@@ -77,8 +78,16 @@ public class JsonConverter {
             case BOOLEAN:
                 return jsonNodeFactory.booleanNode((Boolean) value);
             case BYTES:
+                // Workaround for 
https://github.com/wnameless/json-flattener/issues/91
+                if (convertBytesToString) {
+                    return 
jsonNodeFactory.textNode(Base64.getEncoder().encodeToString((byte[]) value));
+                }
                 return jsonNodeFactory.binaryNode((byte[]) value);
             case FIXED:
+                // Workaround for 
https://github.com/wnameless/json-flattener/issues/91
+                if (convertBytesToString) {
+                    return 
jsonNodeFactory.textNode(Base64.getEncoder().encodeToString(((GenericFixed) 
value).bytes()));
+                }
                 return jsonNodeFactory.binaryNode(((GenericFixed) 
value).bytes());
             case ENUM: // GenericEnumSymbol
             case STRING:
@@ -93,7 +102,7 @@ public class JsonConverter {
                     iterable = (Object[]) value;
                 }
                 for (Object elem : iterable) {
-                    JsonNode fieldValue = toJson(elementSchema, elem);
+                    JsonNode fieldValue = toJson(elementSchema, elem, 
convertBytesToString);
                     arrayNode.add(fieldValue);
                 }
                 return arrayNode;
@@ -102,7 +111,7 @@ public class JsonConverter {
                 Map<Object, Object> map = (Map<Object, Object>) value;
                 ObjectNode objectNode = jsonNodeFactory.objectNode();
                 for (Map.Entry<Object, Object> entry : map.entrySet()) {
-                    JsonNode jsonNode = toJson(schema.getValueType(), 
entry.getValue());
+                    JsonNode jsonNode = toJson(schema.getValueType(), 
entry.getValue(), convertBytesToString);
                     // can be a String or org.apache.avro.util.Utf8
                     final String entryKey = entry.getKey() == null ? null : 
entry.getKey().toString();
                     objectNode.set(entryKey, jsonNode);
@@ -110,13 +119,13 @@ public class JsonConverter {
                 return objectNode;
             }
             case RECORD:
-                return toJson((GenericRecord) value);
+                return toJson((GenericRecord) value, convertBytesToString);
             case UNION:
                 for (Schema s : schema.getTypes()) {
                     if (s.getType() == Schema.Type.NULL) {
                         continue;
                     }
-                    return toJson(s, value);
+                    return toJson(s, value, convertBytesToString);
                 }
                 // this case should not happen
                 return jsonNodeFactory.textNode(value.toString());
diff --git 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
index b0f9456b948..1eda566df04 100644
--- 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
+++ 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -29,6 +29,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -358,6 +360,7 @@ public class UtilsTest {
         RecordSchemaBuilder udtSchemaBuilder = SchemaBuilder.record("type1");
         
udtSchemaBuilder.field("a").type(SchemaType.STRING).optional().defaultValue(null);
         
udtSchemaBuilder.field("b").type(SchemaType.BOOLEAN).optional().defaultValue(null);
+        
udtSchemaBuilder.field("c").type(SchemaType.BYTES).optional().defaultValue(null);
         
udtSchemaBuilder.field("d").type(SchemaType.DOUBLE).optional().defaultValue(null);
         
udtSchemaBuilder.field("f").type(SchemaType.FLOAT).optional().defaultValue(null);
         
udtSchemaBuilder.field("i").type(SchemaType.INT32).optional().defaultValue(null);
@@ -366,12 +369,16 @@ public class UtilsTest {
         valueSchemaBuilder.field("e", 
udtGenericSchema).type(schemaType).optional().defaultValue(null);
         GenericSchema<GenericRecord> valueSchema = 
Schema.generic(valueSchemaBuilder.build(schemaType));
 
+        byte[] bytes = "10".getBytes(StandardCharsets.UTF_8);
         GenericRecord valueGenericRecord = valueSchema.newRecordBuilder()
                 .set("c", "1")
                 .set("d", 1)
                 .set("e", udtGenericSchema.newRecordBuilder()
                         .set("a", "a")
                         .set("b", true)
+                        // There's a bug in json-flattener that doesn't handle 
byte[] fields correctly.
+                        // But since we use AUTO_CONSUME, we won't get byte[] 
fields for JSON schema anyway.
+                        .set("c", schemaType == SchemaType.AVRO ? bytes : 
Base64.getEncoder().encodeToString(bytes))
                         .set("d", 1.0)
                         .set("f", 1.0f)
                         .set("i", 1)
@@ -434,16 +441,17 @@ public class UtilsTest {
         String json = Utils.serializeRecordToJsonExpandingValue(objectMapper, 
genericObjectRecord, false);
 
         assertEquals(json, 
"{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\","
-                + 
"\"payload\":{\"value\":{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0,"
-                + 
"\"i\":1,\"l\":10}},\"key\":{\"a\":\"1\",\"b\":1}},\"properties\":{\"prop-key\":\"prop-value\"},"
-                + "\"eventTime\":1648502845803}");
+                + 
"\"payload\":{\"value\":{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"c\":\"MTA=\",\"d\":1.0,"
+                + 
"\"f\":1.0,\"i\":1,\"l\":10}},\"key\":{\"a\":\"1\",\"b\":1}},"
+                + 
"\"properties\":{\"prop-key\":\"prop-value\"},\"eventTime\":1648502845803}");
 
         json = Utils.serializeRecordToJsonExpandingValue(objectMapper, 
genericObjectRecord, true);
 
         assertEquals(json, 
"{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\",\"payload.value.c\":\"1\","
-                + 
"\"payload.value.d\":1,\"payload.value.e.a\":\"a\",\"payload.value.e.b\":true,\"payload.value.e"
-                + 
".d\":1.0,\"payload.value.e.f\":1.0,\"payload.value.e.i\":1,\"payload.value.e.l\":10,\"payload.key"
-                + 
".a\":\"1\",\"payload.key.b\":1,\"properties.prop-key\":\"prop-value\",\"eventTime\":1648502845803}");
+                + 
"\"payload.value.d\":1,\"payload.value.e.a\":\"a\",\"payload.value.e.b\":true,"
+                + 
"\"payload.value.e.c\":\"MTA=\",\"payload.value.e.d\":1.0,\"payload.value.e.f\":1.0,"
+                + 
"\"payload.value.e.i\":1,\"payload.value.e.l\":10,\"payload.key.a\":\"1\",\"payload.key.b\":1,"
+                + 
"\"properties.prop-key\":\"prop-value\",\"eventTime\":1648502845803}");
     }
 
     @Test(dataProvider = "schemaType")
diff --git 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java
 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java
index 94fd73135b3..c3bbaa06d01 100644
--- 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java
+++ 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java
@@ -81,7 +81,7 @@ public class JsonConverterTests {
         genericRecord.put("arrayavro", new 
GenericData.Array<>(avroArraySchema, Arrays.asList("toto")));
         genericRecord.put("map", ImmutableMap.of("a",10));
         genericRecord.put("maputf8", ImmutableMap.of(new 
org.apache.avro.util.Utf8("a"),10));
-        JsonNode jsonNode = JsonConverter.toJson(genericRecord);
+        JsonNode jsonNode = JsonConverter.toJson(genericRecord, false);
         assertEquals(jsonNode.get("n"), NullNode.getInstance());
         assertEquals(jsonNode.get("l").asLong(), 1L);
         assertEquals(jsonNode.get("i").asInt(), 1);
@@ -135,7 +135,7 @@ public class JsonConverterTests {
         genericRecord.put("myuuid", myUuid.toString());
 
         GenericRecord genericRecord2 = deserialize(serialize(genericRecord, 
schema), schema);
-        JsonNode jsonNode = JsonConverter.toJson(genericRecord2);
+        JsonNode jsonNode = JsonConverter.toJson(genericRecord2, false);
         assertEquals(jsonNode.get("mydate").asInt(), 
calendar.toInstant().getEpochSecond());
         assertEquals(jsonNode.get("tsmillis").asInt(), 
(int)calendar.getTimeInMillis());
         assertEquals(jsonNode.get("tsmicros").asLong(), 
calendar.getTimeInMillis() * 1000);

Reply via email to