hudi-agent commented on code in PR #18892:
URL: https://github.com/apache/hudi/pull/18892#discussion_r3431457562


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deser/KafkaAvroSchemaDeserializer.java:
##########
@@ -57,6 +58,21 @@ public void configure(Map<String, ?> configs, boolean isKey) {
     }
   }
 
+  @Override
+  public Object deserialize(String topic, byte[] bytes) {
+    return this.deserialize(topic, false, bytes, sourceSchema);
+  }
+
+  @Override
+  public Object deserialize(String topic, byte[] bytes, Schema readerSchema) {
+    return this.deserialize(topic, false, bytes, sourceSchema);
+  }
+
+  @Override
+  public Object deserialize(String topic, Headers headers, byte[] bytes) {
+    return super.deserialize(topic, false, bytes, sourceSchema);

Review Comment:
   🤖 Was the use of `super.deserialize(...)` here (vs `this.deserialize(...)` 
in the two overloads above) intentional? It's functionally equivalent today 
since the existing protected override just delegates to `super`, but the 
inconsistency means this overload would silently bypass the protected override 
if it ever gains additional logic (logging, metrics, retry, etc.). Mind 
switching it to `this.deserialize(topic, false, bytes, sourceSchema)` for 
consistency?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deser/TestKafkaAvroSchemaDeserializer.java:
##########
@@ -125,4 +129,205 @@ public void testKafkaAvroSchemaDeserializer() {
     assertEquals(HoodieSchema.fromAvroSchema(actualRec.getSchema()), 
evolSchema);
     assertNull(genericRecord.get("age"));
   }
+
+  private Schema loadSchemaFromResource(String resourcePath) throws 
IOException {
+    try (InputStream is = 
getClass().getClassLoader().getResourceAsStream(resourcePath)) {
+      return new Schema.Parser().parse(is);
+    }
+  }
+
+  private static Schema getRecordTypeFromUnion(Schema unionSchema) {
+    return unionSchema.getTypes().stream()
+        .filter(s -> s.getType() == Schema.Type.RECORD)
+        .findFirst()
+        .orElseThrow(() -> new IllegalArgumentException("No record type in union"));
+  }
+
+  private GenericRecord createCdcSourceRecord(Schema envelopeSchema) {
+    Schema sourceSchema = envelopeSchema.getField("source").schema();
+    GenericRecord source = new GenericData.Record(sourceSchema);
+    source.put("version", "2.3.0.Final");
+    source.put("connector", "mysql");
+    source.put("name", "cdc");
+    source.put("ts_ms", 1700000000000L);
+    source.put("snapshot", "false");
+    source.put("db", "testdb");
+    source.put("sequence", null);
+    source.put("table", "item");
+    source.put("server_id", 1L);
+    source.put("gtid", null);
+    source.put("file", "binlog.000001");
+    source.put("pos", 12345L);
+    source.put("row", 0);
+    source.put("thread", null);
+    source.put("query", null);
+    return source;
+  }
+
+  private GenericRecord createCdcValueRecord(Schema envelopeSchema) {
+    Schema valueSchema = 
getRecordTypeFromUnion(envelopeSchema.getField("before").schema());
+    GenericRecord value = new GenericData.Record(valueSchema);
+    value.put("id", ByteBuffer.wrap(new byte[]{1, 2, 3, 4}));
+    value.put("account_id", 42);
+    value.put("title", "Test Item");
+    value.put("query", "test query");
+    value.put("request_query_id", null);
+    value.put("page", null);
+    value.put("request_page_id", null);
+    value.put("source_rank_id", null);
+    value.put("property_id", 100);
+    value.put("created", "2024-01-01T00:00:00Z");
+    value.put("created_by", 1);
+    value.put("updated", "2024-01-02T00:00:00Z");
+    value.put("updated_by", 2);
+    value.put("version_id", null);
+    value.put("related_urls", null);
+    value.put("assignee", 5);
+    value.put("status", "IN_PROGRESS");
+    value.put("tags", null);
+    value.put("deleted", false);
+    value.put("profile_id", null);
+    return value;
+  }
+
+  private GenericRecord createCdcValueRecordBefore(Schema envelopeSchema) {
+    Schema valueSchema = 
getRecordTypeFromUnion(envelopeSchema.getField("before").schema());
+    GenericRecord value = new GenericData.Record(valueSchema);
+    value.put("id", ByteBuffer.wrap(new byte[]{1, 2, 3, 4}));
+    value.put("account_id", 42);
+    value.put("title", "Old Item Title");
+    value.put("query", "old query");
+    value.put("request_query_id", null);
+    value.put("page", "https://example.com/old");
+    value.put("request_page_id", null);
+    value.put("source_rank_id", 10);
+    value.put("property_id", 100);
+    value.put("created", "2024-01-01T00:00:00Z");
+    value.put("created_by", 1);
+    value.put("updated", "2024-01-02T00:00:00Z");
+    value.put("updated_by", 1);
+    value.put("version_id", "v1");
+    value.put("related_urls", "[\"https://example.com\"]");
+    value.put("assignee", 5);
+    value.put("status", "TO_DO");
+    value.put("tags", null);
+    value.put("deleted", false);
+    value.put("profile_id", null);
+    return value;
+  }
+
+  private GenericRecord createCdcEnvelopeRecord(Schema envelopeSchema, 
GenericRecord beforeRecord, GenericRecord afterRecord) {
+    GenericRecord envelope = new GenericData.Record(envelopeSchema);
+    envelope.put("before", beforeRecord);
+    envelope.put("after", afterRecord);
+    envelope.put("source", createCdcSourceRecord(envelopeSchema));
+    envelope.put("op", "u");
+    envelope.put("ts_ms", 1700000000000L);
+    envelope.put("transaction", null);
+    return envelope;
+  }
+
+  /**
+   * Tests deserialization of a CDC (Debezium) envelope schema with schema 
evolution
+   * across all deserialize method overloads. The old schema lacks 4 fields
+   * (notes, search_engine_id, locale_id, language_id) in the nested Value 
record.
+   * When deserializing old records with the evolved schema, those new fields 
should default to null.
+   *
+   * Exercises:
+   * - deserialize(String topic, Boolean isKey, byte[] payload, Schema 
readerSchema)
+   * - deserialize(String topic, byte[] bytes)
+   * - deserialize(String topic, byte[] bytes, Schema readerSchema)
+   * - deserialize(String topic, Headers headers, byte[] bytes)
+   */
+  @Test
+  public void testKafkaAvroSchemaDeserializerWithCdcEnvelopeEvolution() throws 
IOException {
+    Schema cdcOldSchema = 
loadSchemaFromResource("schema/cdc_envelope_old.avsc");
+    Schema cdcNewSchema = 
loadSchemaFromResource("schema/cdc_envelope_new.avsc");
+
+    // Create and serialize records with old schema (no 
notes/search_engine_id/locale_id/language_id)
+    GenericRecord oldBefore = createCdcValueRecordBefore(cdcOldSchema);
+    GenericRecord oldAfter = createCdcValueRecord(cdcOldSchema);
+    GenericRecord oldEnvelope = createCdcEnvelopeRecord(cdcOldSchema, 
oldBefore, oldAfter);
+    byte[] bytesOldRecord = avroSerializer.serialize(topic, oldEnvelope);
+
+    // Create and serialize records with evolved schema (new fields populated)
+    GenericRecord newBefore = createCdcValueRecordBefore(cdcNewSchema);
+    GenericRecord newAfter = createCdcValueRecord(cdcNewSchema);
+    newAfter.put("notes", "[{\"note\": \"test\"}]");
+    newAfter.put("search_engine_id", "[\"default\"]");
+    newAfter.put("locale_id", 1);
+    newAfter.put("language_id", 2);
+    GenericRecord newEnvelope = createCdcEnvelopeRecord(cdcNewSchema, 
newBefore, newAfter);
+    byte[] bytesNewRecord = avroSerializer.serialize(topic, newEnvelope);
+
+    // Configure deserializer with evolved schema
+    config.put(AvroKafkaSource.KAFKA_AVRO_VALUE_DESERIALIZER_SCHEMA, 
cdcNewSchema.toString());
+    KafkaAvroSchemaDeserializer deserializer = new 
KafkaAvroSchemaDeserializer(schemaRegistry, new HashMap(config));
+    deserializer.configure(new HashMap(config), false);
+
+    // === 1. deserialize(String, Boolean, byte[], Schema) — the existing 
override ===
+    IndexedRecord evolvedDeserialized = (IndexedRecord) 
deserializer.deserialize(topic, false, bytesOldRecord, cdcNewSchema);
+    GenericRecord evolvedEnvelope = (GenericRecord) evolvedDeserialized;
+    assertEquals(cdcNewSchema, evolvedEnvelope.getSchema());
+    assertEquals("u", evolvedEnvelope.get("op").toString());
+    // Validate before record
+    GenericRecord beforeRecord = (GenericRecord) evolvedEnvelope.get("before");
+    assertEquals("Old Item Title", beforeRecord.get("title").toString());
+    assertEquals(42, beforeRecord.get("account_id"));
+    assertEquals("TO_DO", beforeRecord.get("status").toString());
+    assertNull(beforeRecord.get(20));

Review Comment:
   🤖 nit: `get(20)` through `get(23)` is very hard to follow without the schema 
in hand — could you use the field names directly (`get("notes")`, 
`get("search_engine_id")`, etc.) to make it self-documenting and 
schema-order-independent?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to