This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 925267502fc [fix][io] Fix KinesisSink json flattening for AVRO's SchemaType.BYTES (#24132) 925267502fc is described below commit 925267502fcb6c2d446a6aa0e73e02ccc8b7a762 Author: Christophe Bornet <cbor...@hotmail.com> AuthorDate: Fri Mar 28 10:59:26 2025 +0100 [fix][io] Fix KinesisSink json flattening for AVRO's SchemaType.BYTES (#24132) --- .../java/org/apache/pulsar/io/kinesis/Utils.java | 10 +++++----- .../pulsar/io/kinesis/json/JsonConverter.java | 23 +++++++++++++++------- .../org/apache/pulsar/io/kinesis/UtilsTest.java | 20 +++++++++++++------ .../pulsar/io/kinesis/json/JsonConverterTests.java | 4 ++-- 4 files changed, 37 insertions(+), 20 deletions(-) diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java index 32d34705248..a52e42e9c6a 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java @@ -222,7 +222,7 @@ public class Utils { JsonRecord jsonRecord = new JsonRecord(); GenericObject value = record.getValue(); if (value != null) { - jsonRecord.setPayload(toJsonSerializable(record.getSchema(), value.getNativeObject())); + jsonRecord.setPayload(toJsonSerializable(record.getSchema(), value.getNativeObject(), flatten)); } record.getKey().ifPresent(jsonRecord::setKey); record.getTopicName().ifPresent(jsonRecord::setTopicName); @@ -242,7 +242,7 @@ public class Utils { .orElseThrow(() -> new IllegalArgumentException("Record does not carry message information")); } - private static Object toJsonSerializable(Schema<?> schema, Object val) { + private static Object toJsonSerializable(Schema<?> schema, Object val, boolean convertBytesToString) { if (schema == null || schema.getSchemaInfo().getType().isPrimitive()) { return val; } @@ -254,15 +254,15 @@ public class Utils { Map<String, Object> jsonKeyValue = new HashMap<>(); if (keyValue.getKey() != null) { jsonKeyValue.put("key", toJsonSerializable(keyValueSchema.getKeySchema(), - keyValue.getKey().getNativeObject())); + keyValue.getKey().getNativeObject(), convertBytesToString)); } if (keyValue.getValue() != null) { jsonKeyValue.put("value", toJsonSerializable(keyValueSchema.getValueSchema(), - keyValue.getValue().getNativeObject())); + keyValue.getValue().getNativeObject(), convertBytesToString)); } return jsonKeyValue; case AVRO: - return JsonConverter.toJson((org.apache.avro.generic.GenericRecord) val); + return JsonConverter.toJson((org.apache.avro.generic.GenericRecord) val, convertBytesToString); case JSON: return val; default: diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java index 5308971e8d2..22412c39575 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/json/JsonConverter.java @@ -26,6 +26,7 @@ import java.math.BigDecimal; import java.time.Instant; import java.time.LocalDate; import java.time.LocalTime; +import java.util.Base64; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -45,18 +46,18 @@ public class JsonConverter { private static final Map<String, LogicalTypeConverter<?>> logicalTypeConverters = new HashMap<>(); private static final JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true); - public static JsonNode toJson(GenericRecord genericRecord) { + public static JsonNode toJson(GenericRecord genericRecord, boolean convertBytesToString) { if (genericRecord == null) { return null; } ObjectNode objectNode = jsonNodeFactory.objectNode(); for (Schema.Field field : genericRecord.getSchema().getFields()) { - objectNode.set(field.name(), toJson(field.schema(), genericRecord.get(field.name()))); + objectNode.set(field.name(), toJson(field.schema(), genericRecord.get(field.name()), convertBytesToString)); } return objectNode; } - public static JsonNode toJson(Schema schema, Object value) { + public static JsonNode toJson(Schema schema, Object value, boolean convertBytesToString) { if (schema.getLogicalType() != null && logicalTypeConverters.containsKey(schema.getLogicalType().getName())) { return logicalTypeConverters.get(schema.getLogicalType().getName()).toJson(schema, value); } @@ -77,8 +78,16 @@ public class JsonConverter { case BOOLEAN: return jsonNodeFactory.booleanNode((Boolean) value); case BYTES: + // Workaround for https://github.com/wnameless/json-flattener/issues/91 + if (convertBytesToString) { + return jsonNodeFactory.textNode(Base64.getEncoder().encodeToString((byte[]) value)); + } return jsonNodeFactory.binaryNode((byte[]) value); case FIXED: + // Workaround for https://github.com/wnameless/json-flattener/issues/91 + if (convertBytesToString) { + return jsonNodeFactory.textNode(Base64.getEncoder().encodeToString(((GenericFixed) value).bytes())); + } return jsonNodeFactory.binaryNode(((GenericFixed) value).bytes()); case ENUM: // GenericEnumSymbol case STRING: @@ -93,7 +102,7 @@ public class JsonConverter { iterable = (Object[]) value; } for (Object elem : iterable) { - JsonNode fieldValue = toJson(elementSchema, elem); + JsonNode fieldValue = toJson(elementSchema, elem, convertBytesToString); arrayNode.add(fieldValue); } return arrayNode; @@ -102,7 +111,7 @@ public class JsonConverter { Map<Object, Object> map = (Map<Object, Object>) value; ObjectNode objectNode = jsonNodeFactory.objectNode(); for (Map.Entry<Object, Object> entry : map.entrySet()) { - JsonNode jsonNode = toJson(schema.getValueType(), entry.getValue()); + JsonNode jsonNode = toJson(schema.getValueType(), entry.getValue(), convertBytesToString); // can be a String or org.apache.avro.util.Utf8 final String entryKey = entry.getKey() == null ? null : entry.getKey().toString(); objectNode.set(entryKey, jsonNode); @@ -110,13 +119,13 @@ public class JsonConverter { return objectNode; } case RECORD: - return toJson((GenericRecord) value); + return toJson((GenericRecord) value, convertBytesToString); case UNION: for (Schema s : schema.getTypes()) { if (s.getType() == Schema.Type.NULL) { continue; } - return toJson(s, value); + return toJson(s, value, convertBytesToString); } // this case should not happen return jsonNodeFactory.textNode(value.toString()); diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java index b0f9456b948..1eda566df04 100644 --- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java @@ -29,6 +29,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.Gson; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -358,6 +360,7 @@ public class UtilsTest { RecordSchemaBuilder udtSchemaBuilder = SchemaBuilder.record("type1"); udtSchemaBuilder.field("a").type(SchemaType.STRING).optional().defaultValue(null); udtSchemaBuilder.field("b").type(SchemaType.BOOLEAN).optional().defaultValue(null); + udtSchemaBuilder.field("c").type(SchemaType.BYTES).optional().defaultValue(null); udtSchemaBuilder.field("d").type(SchemaType.DOUBLE).optional().defaultValue(null); udtSchemaBuilder.field("f").type(SchemaType.FLOAT).optional().defaultValue(null); udtSchemaBuilder.field("i").type(SchemaType.INT32).optional().defaultValue(null); @@ -366,12 +369,16 @@ public class UtilsTest { valueSchemaBuilder.field("e", udtGenericSchema).type(schemaType).optional().defaultValue(null); GenericSchema<GenericRecord> valueSchema = Schema.generic(valueSchemaBuilder.build(schemaType)); + byte[] bytes = "10".getBytes(StandardCharsets.UTF_8); GenericRecord valueGenericRecord = valueSchema.newRecordBuilder() .set("c", "1") .set("d", 1) .set("e", udtGenericSchema.newRecordBuilder() .set("a", "a") .set("b", true) + // There's a bug in json-flattener that doesn't handle byte[] fields correctly. + // But since we use AUTO_CONSUME, we won't get byte[] fields for JSON schema anyway. + .set("c", schemaType == SchemaType.AVRO ? bytes : Base64.getEncoder().encodeToString(bytes)) .set("d", 1.0) .set("f", 1.0f) .set("i", 1) @@ -434,16 +441,17 @@ public class UtilsTest { String json = Utils.serializeRecordToJsonExpandingValue(objectMapper, genericObjectRecord, false); assertEquals(json, "{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\"," - + "\"payload\":{\"value\":{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"d\":1.0,\"f\":1.0," - + "\"i\":1,\"l\":10}},\"key\":{\"a\":\"1\",\"b\":1}},\"properties\":{\"prop-key\":\"prop-value\"}," - + "\"eventTime\":1648502845803}"); + + "\"payload\":{\"value\":{\"c\":\"1\",\"d\":1,\"e\":{\"a\":\"a\",\"b\":true,\"c\":\"MTA=\",\"d\":1.0," + + "\"f\":1.0,\"i\":1,\"l\":10}},\"key\":{\"a\":\"1\",\"b\":1}}," + + "\"properties\":{\"prop-key\":\"prop-value\"},\"eventTime\":1648502845803}"); json = Utils.serializeRecordToJsonExpandingValue(objectMapper, genericObjectRecord, true); assertEquals(json, "{\"topicName\":\"data-ks1.table1\",\"key\":\"message-key\",\"payload.value.c\":\"1\"," - + "\"payload.value.d\":1,\"payload.value.e.a\":\"a\",\"payload.value.e.b\":true,\"payload.value.e" - + ".d\":1.0,\"payload.value.e.f\":1.0,\"payload.value.e.i\":1,\"payload.value.e.l\":10,\"payload.key" - + ".a\":\"1\",\"payload.key.b\":1,\"properties.prop-key\":\"prop-value\",\"eventTime\":1648502845803}"); + + "\"payload.value.d\":1,\"payload.value.e.a\":\"a\",\"payload.value.e.b\":true," + + "\"payload.value.e.c\":\"MTA=\",\"payload.value.e.d\":1.0,\"payload.value.e.f\":1.0," + + "\"payload.value.e.i\":1,\"payload.value.e.l\":10,\"payload.key.a\":\"1\",\"payload.key.b\":1," + + "\"properties.prop-key\":\"prop-value\",\"eventTime\":1648502845803}"); } @Test(dataProvider = "schemaType") diff --git a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java index 94fd73135b3..c3bbaa06d01 100644 --- a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java +++ b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/json/JsonConverterTests.java @@ -81,7 +81,7 @@ public class JsonConverterTests { genericRecord.put("arrayavro", new GenericData.Array<>(avroArraySchema, Arrays.asList("toto"))); genericRecord.put("map", ImmutableMap.of("a",10)); genericRecord.put("maputf8", ImmutableMap.of(new org.apache.avro.util.Utf8("a"),10)); - JsonNode jsonNode = JsonConverter.toJson(genericRecord); + JsonNode jsonNode = JsonConverter.toJson(genericRecord, false); assertEquals(jsonNode.get("n"), NullNode.getInstance()); assertEquals(jsonNode.get("l").asLong(), 1L); assertEquals(jsonNode.get("i").asInt(), 1); @@ -135,7 +135,7 @@ public class JsonConverterTests { genericRecord.put("myuuid", myUuid.toString()); GenericRecord genericRecord2 = deserialize(serialize(genericRecord, schema), schema); - JsonNode jsonNode = JsonConverter.toJson(genericRecord2); + JsonNode jsonNode = JsonConverter.toJson(genericRecord2, false); assertEquals(jsonNode.get("mydate").asInt(), calendar.toInstant().getEpochSecond()); assertEquals(jsonNode.get("tsmillis").asInt(), (int)calendar.getTimeInMillis()); assertEquals(jsonNode.get("tsmicros").asLong(), calendar.getTimeInMillis() * 1000);