harshagudladona commented on issue #14070:
URL: https://github.com/apache/hudi/issues/14070#issuecomment-3407168975

   We cannot share the whole schema, but here is a snippet where the reader and writer schemas differ.
   WRITER:
   
   ```
   {
       "name" : "enrichments",
       "type" : [ "null", {
         "type" : "record",
         "name" : "Enrichments",
         "namespace" : "com.secureworks.platform.schemas.core.common.Types",
         "fields" : [ {
           "name" : "attack_technique_ids",
           "type" : [ "null", {
             "type" : "array",
             "items" : [ "null", "string" ]
           } ],
           "default" : null
         }, {
           "name" : "rule_id_to_techniques",
           "type" : [ "null", {
             "type" : "array",
             "items" : {
               "type" : "record",
               "name" : "KeyAndValues",
               "fields" : [ {
                 "name" : "key",
                 "type" : [ "null", "string" ],
                 "default" : null
               }, {
                 "name" : "values",
                 "type" : [ "null", {
                   "type" : "array",
                   "items" : [ "null", "string" ]
                 } ],
                 "default" : null
               } ]
             }
           } ],
           "default" : null
         } ]
       } ],
       "default" : null
     }
   ```
   
   READER:
   ```
   {
       "name" : "enrichments",
       "type" : [ "null", {
         "type" : "record",
         "name" : "enrichments",
         "namespace" : "",
         "fields" : [ {
           "name" : "attack_technique_ids",
           "type" : [ "null", {
             "type" : "array",
             "items" : "string"
           } ],
           "default" : null
         }, {
           "name" : "rule_id_to_techniques",
           "type" : [ "null", {
             "type" : "array",
             "items" : {
               "type" : "record",
               "name" : "array",
               "fields" : [ {
                 "name" : "key",
                 "type" : [ "null", "string" ],
                 "default" : null
               }, {  
                 "name" : "values",
                 "type" : [ "null", {
                   "type" : "array",
                   "items" : "string"
                 } ],
                 "default" : null
               } ]
             }
           } ],
           "default" : null
         } ]
       } ],
       "default" : null
     }
   ```
   
   Notice the difference: `"name" : "KeyAndValues"` in the writer versus `"name" : "array"` in the reader. The reason is that the reader schema is derived from a base file by converting the Parquet schema (`MessageType`) back into an Avro schema, and that round trip between Avro (`Schema`) and Parquet (`MessageType`) introduces differences like this that will never match. See the snippet below from `org.apache.parquet.avro.AvroSchemaConverter`:
   
   ```
       } else if (type.equals(Schema.Type.ARRAY)) {
         if (writeOldListStructure) {
           return ConversionPatterns.listType(repetition, fieldName,
               convertField("array", schema.getElementType(), REPEATED, 
schemaPath));
         } else {
           return ConversionPatterns.listOfElements(repetition, fieldName,
               convertField(AvroWriteSupport.LIST_ELEMENT_NAME, 
schema.getElementType(), schemaPath));
         }
       } 
   ```
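
   To make the naming concrete: with the legacy two-level list layout (`writeOldListStructure = true`), the repeated group in the Parquet footer is literally named `array`, so converting the footer back to Avro yields a record named `array`. Below is a sketch of what the footer shape for `rule_id_to_techniques` would look like under that layout (reconstructed for illustration, not copied from the actual file):

    ```
    optional group rule_id_to_techniques (LIST) {
      repeated group array {            // <-- this name becomes the Avro record name
        optional binary key (UTF8);
        optional group values (LIST) {
          repeated binary array (UTF8);
        }
      }
    }
    ```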
   
   What I tried in order to address this is the following:
   
   BEFORE:
   ```
     @Override
     public void runMerge(HoodieTable<?, ?, ?, ?> table,
                          HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws 
IOException {
       HoodieWriteConfig writeConfig = table.getConfig();
       HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
   
       HoodieRecord.HoodieRecordType recordType = 
table.getConfig().getRecordMerger().getRecordType();
       HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(
               table.getStorage().newInstance(mergeHandle.getOldFilePath(), 
table.getStorageConf().newInstance()))
           .getReaderFactory(recordType)
           .getFileReader(writeConfig, mergeHandle.getOldFilePath());
       HoodieFileReader bootstrapFileReader = null;
   
       Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
       Schema readerSchema = baseFileReader.getSchema();
   
       // In case Advanced Schema Evolution is enabled we might need to rewrite 
currently
       // persisted records to adhere to an evolved schema
       Option<Function<HoodieRecord, HoodieRecord>> 
schemaEvolutionTransformerOpt =
           composeSchemaEvolutionTransformer(readerSchema, writerSchema, 
baseFile, writeConfig, table.getMetaClient());
   
       // Check whether the writer schema is simply a projection of the file's 
one, ie
       //   - Its field-set is a proper subset (of the reader schema)
       //   - There's no schema evolution transformation necessary
       boolean isPureProjection = schemaEvolutionTransformerOpt.isEmpty()
           && isStrictProjectionOf(readerSchema, writerSchema);
     ...
   ```
   
   AFTER:
   ```
     @Override
     public void runMerge(HoodieTable<?, ?, ?, ?> table,
                          HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws 
IOException {
       HoodieWriteConfig writeConfig = table.getConfig();
       HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
   
       HoodieRecord.HoodieRecordType recordType = 
table.getConfig().getRecordMerger().getRecordType();
       HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(
               table.getStorage().newInstance(mergeHandle.getOldFilePath(), 
table.getStorageConf().newInstance()))
           .getReaderFactory(recordType)
           .getFileReader(writeConfig, mergeHandle.getOldFilePath());
       HoodieFileReader bootstrapFileReader = null;
       Schema readerSchema = baseFileReader.getSchema();
   
       Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
   
       // This is necessary to ensure the writer schema is compatible with the 
parquet schema
       // as the Avro schema used to write the parquet file might be different 
than the
       // writer schema (eg: default values added for missing fields, 
documentation etc)
       MessageType parquetSchema = new 
AvroSchemaConverter(table.getStorage().getConf().unwrapAs(Configuration.class)).convert(writerSchema);
       Schema parquetCompatibleWriteSchema = new 
AvroSchemaConverter(table.getStorage().getConf().unwrapAs(Configuration.class)).convert(parquetSchema);
   
       // In case Advanced Schema Evolution is enabled we might need to rewrite 
currently
       // persisted records to adhere to an evolved schema
       Option<Function<HoodieRecord, HoodieRecord>> 
schemaEvolutionTransformerOpt =
           composeSchemaEvolutionTransformer(readerSchema, writerSchema, 
baseFile, writeConfig, table.getMetaClient());
   
       // Check whether the writer schema is simply a projection of the file's 
one, ie
       //   - Its field-set is a proper subset (of the reader schema)
       //   - There's no schema evolution transformation necessary
       boolean isStrictProjection = isStrictProjectionOf(readerSchema, 
parquetCompatibleWriteSchema);
       boolean isPureProjection = schemaEvolutionTransformerOpt.isEmpty()
           && isStrictProjection;
    ```
   
   I used the same converter to make sure the writer schema is transformed into a Parquet-friendly version. This fixed the spurious failure of `isStrictProjectionOf`, thereby preventing an unnecessary record rewrite.
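
   The mismatch boils down to a name-sensitive comparison over structurally identical schemas. As a stand-alone illustration (a toy schema model in plain Java, not Hudi's actual `isStrictProjectionOf` implementation), a comparison that ignores record names treats the two shapes above as equal:

    ```java
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class StructuralMatch {

      // Toy schema node: a "record" carries a name and named fields, an "array"
      // carries an item type, and primitives are plain strings. This is a
      // hypothetical model for illustration, not Avro's Schema class.
      static Map<String, Object> record(String name, Map<String, Object> fields) {
        Map<String, Object> node = new LinkedHashMap<>();
        node.put("type", "record");
        node.put("name", name); // this is what differs between writer and reader
        node.put("fields", fields);
        return node;
      }

      static Map<String, Object> array(Object items) {
        Map<String, Object> node = new LinkedHashMap<>();
        node.put("type", "array");
        node.put("items", items);
        return node;
      }

      // Structural comparison that deliberately ignores record names -- the
      // property the AvroSchemaConverter round trip effectively restores.
      @SuppressWarnings("unchecked")
      static boolean structurallyEqual(Object a, Object b) {
        if (a instanceof String || b instanceof String) {
          return a.equals(b);
        }
        Map<String, Object> ma = (Map<String, Object>) a;
        Map<String, Object> mb = (Map<String, Object>) b;
        if (!ma.get("type").equals(mb.get("type"))) {
          return false;
        }
        if ("array".equals(ma.get("type"))) {
          return structurallyEqual(ma.get("items"), mb.get("items"));
        }
        // record: same field names, structurally equal field types; name ignored
        Map<String, Object> fa = (Map<String, Object>) ma.get("fields");
        Map<String, Object> fb = (Map<String, Object>) mb.get("fields");
        if (!fa.keySet().equals(fb.keySet())) {
          return false;
        }
        for (String field : fa.keySet()) {
          if (!structurallyEqual(fa.get(field), fb.get(field))) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("key", "string");
        fields.put("values", array("string"));

        // Writer names the element record "KeyAndValues"; the reader schema
        // reconstructed from the Parquet footer names it "array".
        Object writer = array(record("KeyAndValues", new LinkedHashMap<>(fields)));
        Object reader = array(record("array", new LinkedHashMap<>(fields)));

        System.out.println(writer.equals(reader));             // false: names differ
        System.out.println(structurallyEqual(writer, reader)); // true: same shape
      }
    }
    ```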
   
   
   Additionally, in `HoodieMergeHandle`:
   
   ```
     protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema 
schema, Properties prop, boolean shouldPreserveRecordMetadata) throws 
IOException {
       // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly 
point to the
       //       file holding this record even in cases when overall metadata is 
preserved
       MetadataValues metadataValues = new 
MetadataValues().setFileName(newFilePath.getName());
       HoodieRecord populatedRecord = record.prependMetaFields(schema, 
writeSchemaWithMetaFields, metadataValues, prop);
   
       if (shouldPreserveRecordMetadata) {
         fileWriter.write(key.getRecordKey(), populatedRecord, 
writeSchemaWithMetaFields);
       } else {
         fileWriter.writeWithMetadata(key, populatedRecord, 
writeSchemaWithMetaFields);
       }
     }
   ```
   
   

