tandonraghav opened a new issue #3078:
URL: https://github.com/apache/hudi/issues/3078


   
   **Describe the problem you faced**
   
We have a stream of partial records (Mongo CDC oplogs) and want to update only 
the keys that are present in each partial record, keeping the other field 
values intact.
   
   A sample record from our CDC Kafka topic:
   
   
````{"op":"u","doc_id":"606ffc3c10f9138e2a6b6csdc","shard_id":"shard01","ts":{"$timestamp":{"i":1,"t":1617951883}},"db_name":"test","collection":"Users","o":{"$v":1,"$set":{"key2":"value2"}},"o2":{"_id":{"$oid":"606ffc3c10f9138e2a6b6csdc"}}}````
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a custom `PAYLOAD_CLASS_OPT_KEY` as described below.
   2. Push some (partial) records using a Spark DataFrame and persist them to Hudi.
   3. Evolve the schema by adding a new field to the existing schema, and persist 
again via a Spark DataFrame.
   4. `combineAndGetUpdateValue` is not called when the schema has evolved, which 
leaves the other field values as NULL, since only the partial record is passed 
and the combine logic lives in the custom class. This behaviour is not observed 
when the schema remains constant.
   
   **Expected behavior**
   
   Irrespective of schema evolution, compaction should always go through 
`combineAndGetUpdateValue` of the configured payload class.
   
   **Environment Description**
   
   * Hudi version : 0.9.0-SNAPSHOT
   
   * Spark version : 2.4
   
   * Hive version : 
   
   * Hadoop version :
   
   * Storage (HDFS/S3/GCS..) : HDFS
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   Custom Payload class-
   
   ````
   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.avro.generic.IndexedRecord;
   import org.apache.hudi.avro.HoodieAvroUtils;
   import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
   import org.apache.hudi.common.util.Option;

   import java.io.IOException;
   import java.util.List;
   import java.util.Properties;

   public class MongoHudiCDCPayload extends OverwriteWithLatestAvroPayload {

       public MongoHudiCDCPayload(GenericRecord record, Comparable orderingVal) {
           super(record, orderingVal);
       }

       public MongoHudiCDCPayload(Option<GenericRecord> record) {
           super(record);
       }

       @Override
       public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties)
               throws IOException {
           // An empty payload signals a delete.
           if (this.recordBytes.length == 0) {
               return Option.empty();
           }
           GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(this.recordBytes, schema);
           GenericRecord currentRecord = (GenericRecord) currentValue;

           // Overlay only the non-null fields of the incoming partial record
           // onto the stored record, leaving all other fields intact.
           List<Schema.Field> fields = incomingRecord.getSchema().getFields();
           fields.forEach((field) -> {
               Object value = incomingRecord.get(field.name());
               if (value != null) {
                   currentRecord.put(field.name(), value);
               }
           });
           return Option.of(currentRecord);
       }
   }
   ````
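The merge semantics the payload class intends can be illustrated without Hudi or Avro. This is a minimal sketch using plain maps (class and method names here are illustrative, not Hudi APIs): non-null fields from the incoming partial record overwrite the stored record, while a field added by schema evolution keeps its current value unless the partial record sets it.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-map stand-in for the Avro record overlay in combineAndGetUpdateValue.
public class PartialMergeDemo {

    // Overlay the non-null fields of `incoming` onto a copy of `current`.
    static Map<String, Object> merge(Map<String, Object> current, Map<String, Object> incoming) {
        Map<String, Object> merged = new HashMap<>(current);
        incoming.forEach((k, v) -> {
            if (v != null) {
                merged.put(k, v);
            }
        });
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> current = new HashMap<>();
        current.put("_id", "606ffc3c10f9138e2a6b6csdc");
        current.put("key1", "value1");
        current.put("key2", "old");
        current.put("newField", null); // added by schema evolution, never set

        Map<String, Object> partial = new HashMap<>();
        partial.put("_id", "606ffc3c10f9138e2a6b6csdc");
        partial.put("key2", "value2"); // the $set from the oplog sample above

        Map<String, Object> merged = merge(current, partial);
        System.out.println(merged.get("key1"));     // value1 (kept)
        System.out.println(merged.get("key2"));     // value2 (updated)
        System.out.println(merged.get("newField")); // null (untouched)
    }
}
```

The reported bug is precisely that this overlay is skipped after schema evolution, so the result behaves like `partial` alone rather than `merge(current, partial)`.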
   
   
   

