hudi-agent commented on code in PR #18892:
URL: https://github.com/apache/hudi/pull/18892#discussion_r3431457562
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java:
##########
@@ -57,6 +58,21 @@ public void configure(Map<String, ?> configs, boolean isKey)
{
}
}
+ @Override
+ public Object deserialize(String topic, byte[] bytes) {
+ return this.deserialize(topic, false, bytes, sourceSchema);
+ }
+
+ @Override
+ public Object deserialize(String topic, byte[] bytes, Schema readerSchema) {
+ return this.deserialize(topic, false, bytes, sourceSchema);
+ }
+
+ @Override
+ public Object deserialize(String topic, Headers headers, byte[] bytes) {
+ return super.deserialize(topic, false, bytes, sourceSchema);
Review Comment:
🤖 Was the use of `super.deserialize(...)` here (vs `this.deserialize(...)`
in the two overloads above) intentional? It's functionally equivalent today
since the existing protected override just delegates to `super`, but the
inconsistency means this overload would silently bypass the protected override
if it ever gains additional logic (logging, metrics, retry, etc.). Mind
switching it to `this.deserialize(topic, false, bytes, sourceSchema)` for
consistency?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java:
##########
@@ -125,4 +129,205 @@ public void testKafkaAvroSchemaDeserializer() {
assertEquals(HoodieSchema.fromAvroSchema(actualRec.getSchema()),
evolSchema);
assertNull(genericRecord.get("age"));
}
+
+ private Schema loadSchemaFromResource(String resourcePath) throws
IOException {
+ try (InputStream is =
getClass().getClassLoader().getResourceAsStream(resourcePath)) {
+ return new Schema.Parser().parse(is);
+ }
+ }
+
+ private static Schema getRecordTypeFromUnion(Schema unionSchema) {
+ return unionSchema.getTypes().stream()
+ .filter(s -> s.getType() == Schema.Type.RECORD)
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("No record type in
union"));
+ }
+
+ private GenericRecord createCdcSourceRecord(Schema envelopeSchema) {
+ Schema sourceSchema = envelopeSchema.getField("source").schema();
+ GenericRecord source = new GenericData.Record(sourceSchema);
+ source.put("version", "2.3.0.Final");
+ source.put("connector", "mysql");
+ source.put("name", "cdc");
+ source.put("ts_ms", 1700000000000L);
+ source.put("snapshot", "false");
+ source.put("db", "testdb");
+ source.put("sequence", null);
+ source.put("table", "item");
+ source.put("server_id", 1L);
+ source.put("gtid", null);
+ source.put("file", "binlog.000001");
+ source.put("pos", 12345L);
+ source.put("row", 0);
+ source.put("thread", null);
+ source.put("query", null);
+ return source;
+ }
+
+ private GenericRecord createCdcValueRecord(Schema envelopeSchema) {
+ Schema valueSchema =
getRecordTypeFromUnion(envelopeSchema.getField("before").schema());
+ GenericRecord value = new GenericData.Record(valueSchema);
+ value.put("id", ByteBuffer.wrap(new byte[]{1, 2, 3, 4}));
+ value.put("account_id", 42);
+ value.put("title", "Test Item");
+ value.put("query", "test query");
+ value.put("request_query_id", null);
+ value.put("page", null);
+ value.put("request_page_id", null);
+ value.put("source_rank_id", null);
+ value.put("property_id", 100);
+ value.put("created", "2024-01-01T00:00:00Z");
+ value.put("created_by", 1);
+ value.put("updated", "2024-01-02T00:00:00Z");
+ value.put("updated_by", 2);
+ value.put("version_id", null);
+ value.put("related_urls", null);
+ value.put("assignee", 5);
+ value.put("status", "IN_PROGRESS");
+ value.put("tags", null);
+ value.put("deleted", false);
+ value.put("profile_id", null);
+ return value;
+ }
+
+ private GenericRecord createCdcValueRecordBefore(Schema envelopeSchema) {
+ Schema valueSchema =
getRecordTypeFromUnion(envelopeSchema.getField("before").schema());
+ GenericRecord value = new GenericData.Record(valueSchema);
+ value.put("id", ByteBuffer.wrap(new byte[]{1, 2, 3, 4}));
+ value.put("account_id", 42);
+ value.put("title", "Old Item Title");
+ value.put("query", "old query");
+ value.put("request_query_id", null);
+ value.put("page", "https://example.com/old");
+ value.put("request_page_id", null);
+ value.put("source_rank_id", 10);
+ value.put("property_id", 100);
+ value.put("created", "2024-01-01T00:00:00Z");
+ value.put("created_by", 1);
+ value.put("updated", "2024-01-02T00:00:00Z");
+ value.put("updated_by", 1);
+ value.put("version_id", "v1");
+ value.put("related_urls", "[\"https://example.com\"]");
+ value.put("assignee", 5);
+ value.put("status", "TO_DO");
+ value.put("tags", null);
+ value.put("deleted", false);
+ value.put("profile_id", null);
+ return value;
+ }
+
+ private GenericRecord createCdcEnvelopeRecord(Schema envelopeSchema,
GenericRecord beforeRecord, GenericRecord afterRecord) {
+ GenericRecord envelope = new GenericData.Record(envelopeSchema);
+ envelope.put("before", beforeRecord);
+ envelope.put("after", afterRecord);
+ envelope.put("source", createCdcSourceRecord(envelopeSchema));
+ envelope.put("op", "u");
+ envelope.put("ts_ms", 1700000000000L);
+ envelope.put("transaction", null);
+ return envelope;
+ }
+
+ /**
+ * Tests deserialization of a CDC (Debezium) envelope schema with schema
evolution
+ * across all deserialize method overloads. The old schema lacks 4 fields
+ * (notes, search_engine_id, locale_id, language_id) in the nested Value
record.
+ * When deserializing old records with the evolved schema, those new fields
should default to null.
+ *
+ * Exercises:
+ * - deserialize(String topic, Boolean isKey, byte[] payload, Schema
readerSchema)
+ * - deserialize(String topic, byte[] bytes)
+ * - deserialize(String topic, byte[] bytes, Schema readerSchema)
+ * - deserialize(String topic, Headers headers, byte[] bytes)
+ */
+ @Test
+ public void testKafkaAvroSchemaDeserializerWithCdcEnvelopeEvolution() throws
IOException {
+ Schema cdcOldSchema =
loadSchemaFromResource("schema/cdc_envelope_old.avsc");
+ Schema cdcNewSchema =
loadSchemaFromResource("schema/cdc_envelope_new.avsc");
+
+ // Create and serialize records with old schema (no
notes/search_engine_id/locale_id/language_id)
+ GenericRecord oldBefore = createCdcValueRecordBefore(cdcOldSchema);
+ GenericRecord oldAfter = createCdcValueRecord(cdcOldSchema);
+ GenericRecord oldEnvelope = createCdcEnvelopeRecord(cdcOldSchema,
oldBefore, oldAfter);
+ byte[] bytesOldRecord = avroSerializer.serialize(topic, oldEnvelope);
+
+ // Create and serialize records with evolved schema (new fields populated)
+ GenericRecord newBefore = createCdcValueRecordBefore(cdcNewSchema);
+ GenericRecord newAfter = createCdcValueRecord(cdcNewSchema);
+ newAfter.put("notes", "[{\"note\": \"test\"}]");
+ newAfter.put("search_engine_id", "[\"default\"]");
+ newAfter.put("locale_id", 1);
+ newAfter.put("language_id", 2);
+ GenericRecord newEnvelope = createCdcEnvelopeRecord(cdcNewSchema,
newBefore, newAfter);
+ byte[] bytesNewRecord = avroSerializer.serialize(topic, newEnvelope);
+
+ // Configure deserializer with evolved schema
+ config.put(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA,
cdcNewSchema.toString());
+ KafkaAvroSchemaDeserializer deserializer = new
KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
+ deserializer.configure(new HashMap(config), false);
+
+ // === 1. deserialize(String, Boolean, byte[], Schema) — the existing
override ===
+ IndexedRecord evolvedDeserialized = (IndexedRecord)
deserializer.deserialize(topic, false, bytesOldRecord, cdcNewSchema);
+ GenericRecord evolvedEnvelope = (GenericRecord) evolvedDeserialized;
+ assertEquals(cdcNewSchema, evolvedEnvelope.getSchema());
+ assertEquals("u", evolvedEnvelope.get("op").toString());
+ // Validate before record
+ GenericRecord beforeRecord = (GenericRecord) evolvedEnvelope.get("before");
+ assertEquals("Old Item Title", beforeRecord.get("title").toString());
+ assertEquals(42, beforeRecord.get("account_id"));
+ assertEquals("TO_DO", beforeRecord.get("status").toString());
+ assertNull(beforeRecord.get(20));
Review Comment:
🤖 nit: `get(20)` through `get(23)` is very hard to follow without the schema
in hand — could you use the field names directly (`get("notes")`,
`get("search_engine_id")`, etc.) to make it self-documenting and
schema-order-independent?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org