harshagudladona commented on issue #14070:
URL: https://github.com/apache/hudi/issues/14070#issuecomment-3407168975
We cannot share the whole schema, but here is a snippet where the reader and
writer schemas differ.
WRITER:
```
{
  "name" : "enrichments",
  "type" : [ "null", {
    "type" : "record",
    "name" : "Enrichments",
    "namespace" : "com.secureworks.platform.schemas.core.common.Types",
    "fields" : [ {
      "name" : "attack_technique_ids",
      "type" : [ "null", {
        "type" : "array",
        "items" : [ "null", "string" ]
      } ],
      "default" : null
    }, {
      "name" : "rule_id_to_techniques",
      "type" : [ "null", {
        "type" : "array",
        "items" : {
          "type" : "record",
          "name" : "KeyAndValues",
          "fields" : [ {
            "name" : "key",
            "type" : [ "null", "string" ],
            "default" : null
          }, {
            "name" : "values",
            "type" : [ "null", {
              "type" : "array",
              "items" : [ "null", "string" ]
            } ],
            "default" : null
          } ]
        }
      } ],
      "default" : null
    } ]
  } ],
  "default" : null
}
```
READER:
```
{
  "name" : "enrichments",
  "type" : [ "null", {
    "type" : "record",
    "name" : "enrichments",
    "namespace" : "",
    "fields" : [ {
      "name" : "attack_technique_ids",
      "type" : [ "null", {
        "type" : "array",
        "items" : "string"
      } ],
      "default" : null
    }, {
      "name" : "rule_id_to_techniques",
      "type" : [ "null", {
        "type" : "array",
        "items" : {
          "type" : "record",
          "name" : "array",
          "fields" : [ {
            "name" : "key",
            "type" : [ "null", "string" ],
            "default" : null
          }, {
            "name" : "values",
            "type" : [ "null", {
              "type" : "array",
              "items" : "string"
            } ],
            "default" : null
          } ]
        }
      } ],
      "default" : null
    } ]
  } ],
  "default" : null
}
```
Notice the difference: `"name" : "KeyAndValues"` in the writer versus `"name" : "array"` in the reader. The reason is that the reader schema derived from a base file is a conversion of the Parquet schema (`MessageType`) into an Avro schema. That conversion between Avro (`Schema`) and Parquet (`MessageType`) introduces differences like this, which will never match. See the snippet below from `org.apache.parquet.avro.AvroSchemaConverter`:
```
} else if (type.equals(Schema.Type.ARRAY)) {
  if (writeOldListStructure) {
    return ConversionPatterns.listType(repetition, fieldName,
        convertField("array", schema.getElementType(), REPEATED, schemaPath));
  } else {
    return ConversionPatterns.listOfElements(repetition, fieldName,
        convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType(), schemaPath));
  }
}
```
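The practical consequence is that a name-sensitive schema comparison reports a mismatch even when the two schemas are structurally identical. The toy sketch below (hypothetical `Node`/`Field` types standing in for Avro's real `Schema` class, not Hudi code) illustrates the difference between full equality and a shape-only comparison that ignores record names:

```java
import java.util.List;
import java.util.Objects;

public class SchemaShapeCheck {
  // Minimal schema model, only what the example needs: a node is either a
  // primitive (type only) or a record (name + fields).
  record Field(String name, Node schema) {}
  record Node(String type, String recordName, List<Field> fields) {
    static Node primitive(String type) { return new Node(type, null, List.of()); }
    static Node record(String name, List<Field> fields) { return new Node("record", name, fields); }
  }

  // Structural equality that deliberately ignores record names.
  static boolean sameShape(Node a, Node b) {
    if (!a.type().equals(b.type())) return false;
    if (a.fields().size() != b.fields().size()) return false;
    for (int i = 0; i < a.fields().size(); i++) {
      Field fa = a.fields().get(i);
      Field fb = b.fields().get(i);
      if (!fa.name().equals(fb.name()) || !sameShape(fa.schema(), fb.schema())) return false;
    }
    return true;
  }

  public static void main(String[] args) {
    // Writer names the record "KeyAndValues"; the reader schema recovered
    // from Parquet calls it "array" -- same fields, different name.
    Node writer = Node.record("KeyAndValues",
        List.of(new Field("key", Node.primitive("string")),
                new Field("values", Node.primitive("string"))));
    Node reader = Node.record("array",
        List.of(new Field("key", Node.primitive("string")),
                new Field("values", Node.primitive("string"))));

    System.out.println(sameShape(writer, reader));      // true: shapes match
    System.out.println(Objects.equals(writer, reader)); // false: record names differ
  }
}
```

A projection check built on full equality behaves like `Objects.equals` here, which is why structurally identical schemas can still be flagged as mismatched after the Parquet round trip.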
To address this, I tried the following:
BEFORE:
```
  @Override
  public void runMerge(HoodieTable<?, ?, ?, ?> table,
                       HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws IOException {
    HoodieWriteConfig writeConfig = table.getConfig();
    HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
    HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
    HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(
            table.getStorage().newInstance(mergeHandle.getOldFilePath(), table.getStorageConf().newInstance()))
        .getReaderFactory(recordType)
        .getFileReader(writeConfig, mergeHandle.getOldFilePath());
    HoodieFileReader bootstrapFileReader = null;

    Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
    Schema readerSchema = baseFileReader.getSchema();

    // In case Advanced Schema Evolution is enabled we might need to rewrite currently
    // persisted records to adhere to an evolved schema
    Option<Function<HoodieRecord, HoodieRecord>> schemaEvolutionTransformerOpt =
        composeSchemaEvolutionTransformer(readerSchema, writerSchema, baseFile, writeConfig, table.getMetaClient());

    // Check whether the writer schema is simply a projection of the file's one, ie
    //   - Its field-set is a proper subset (of the reader schema)
    //   - There's no schema evolution transformation necessary
    boolean isPureProjection = schemaEvolutionTransformerOpt.isEmpty()
        && isStrictProjectionOf(readerSchema, writerSchema);
    ...
```
AFTER:
```
  @Override
  public void runMerge(HoodieTable<?, ?, ?, ?> table,
                       HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws IOException {
    HoodieWriteConfig writeConfig = table.getConfig();
    HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
    HoodieRecord.HoodieRecordType recordType = table.getConfig().getRecordMerger().getRecordType();
    HoodieFileReader baseFileReader = HoodieIOFactory.getIOFactory(
            table.getStorage().newInstance(mergeHandle.getOldFilePath(), table.getStorageConf().newInstance()))
        .getReaderFactory(recordType)
        .getFileReader(writeConfig, mergeHandle.getOldFilePath());
    HoodieFileReader bootstrapFileReader = null;

    Schema readerSchema = baseFileReader.getSchema();
    Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();

    // This is necessary to ensure the writer schema is compatible with the parquet schema
    // as the Avro schema used to write the parquet file might be different than the
    // writer schema (eg: default values added for missing fields, documentation etc)
    MessageType parquetSchema =
        new AvroSchemaConverter(table.getStorage().getConf().unwrapAs(Configuration.class)).convert(writerSchema);
    Schema parquetCompatibleWriteSchema =
        new AvroSchemaConverter(table.getStorage().getConf().unwrapAs(Configuration.class)).convert(parquetSchema);

    // In case Advanced Schema Evolution is enabled we might need to rewrite currently
    // persisted records to adhere to an evolved schema
    Option<Function<HoodieRecord, HoodieRecord>> schemaEvolutionTransformerOpt =
        composeSchemaEvolutionTransformer(readerSchema, writerSchema, baseFile, writeConfig, table.getMetaClient());

    // Check whether the writer schema is simply a projection of the file's one, ie
    //   - Its field-set is a proper subset (of the reader schema)
    //   - There's no schema evolution transformation necessary
    boolean isStrictProjection = isStrictProjectionOf(readerSchema, parquetCompatibleWriteSchema);
    boolean isPureProjection = schemaEvolutionTransformerOpt.isEmpty()
        && isStrictProjection;
```
I used the same converter to make sure the writer schema is transformed into a
Parquet-friendly version. This fixed the false-positive failure of
`isStrictProjectionOf`, thereby preventing an unnecessary record rewrite.
Additionally, in `HoodieMergeHandle`:
```
  protected void writeToFile(HoodieKey key, HoodieRecord<T> record, Schema schema,
                             Properties prop, boolean shouldPreserveRecordMetadata) throws IOException {
    // NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
    //       file holding this record even in cases when overall metadata is preserved
    MetadataValues metadataValues = new MetadataValues().setFileName(newFilePath.getName());
    HoodieRecord populatedRecord =
        record.prependMetaFields(schema, writeSchemaWithMetaFields, metadataValues, prop);

    if (shouldPreserveRecordMetadata) {
      fileWriter.write(key.getRecordKey(), populatedRecord, writeSchemaWithMetaFields);
    } else {
      fileWriter.writeWithMetadata(key, populatedRecord, writeSchemaWithMetaFields);
    }
  }
```