[ 
https://issues.apache.org/jira/browse/HUDI-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Davis Zhang updated HUDI-9597:
------------------------------
    Description: 
on branch 0.14, we have 

 
{code:java}
// Save internal schema
private void saveInternalSchema(HoodieTable table, String instantTime, 
HoodieCommitMetadata metadata) {
  TableSchemaResolver schemaUtil = new 
TableSchemaResolver(table.getMetaClient());
  String historySchemaStr = 
schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
  FileBasedInternalSchemaStorageManager schemasManager = new 
FileBasedInternalSchemaStorageManager(table.getMetaClient());
  if (!historySchemaStr.isEmpty() || 
Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key())))
 {
    InternalSchema internalSchema;
    Schema avroSchema = 
HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), 
config.allowOperationMetadataField());
    if (historySchemaStr.isEmpty()) {
      internalSchema = 
SerDeHelper.fromJson(config.getInternalSchema()).orElse(AvroInternalSchemaConverter.convert(avroSchema));
      internalSchema.setSchemaId(Long.parseLong(instantTime));
    } else {
      internalSchema = 
InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
          SerDeHelper.parseSchemas(historySchemaStr));
    }
    InternalSchema evolvedSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema); {code}
the AvroSchemaEvolutionUtils.reconcileSchema take 2 inputs

 

avroSchema - which is the writer schema populated in string format from the 
writer config

internalSchema - which is read from the branch of

 
{code:java}
internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
    SerDeHelper.parseSchemas(historySchemaStr)); {code}
which is reading some string from some file and build internal schema out of 
it, for each field inside the schema it comes with a field id 
org.apache.hudi.internal.schema.Types.Field#id

 

 

inside the AvroSchemaEvolutionUtils.reconcileSchema it converts avroSchema to 
another InternalSchema, let's call it incomingInternalSchema, each field is 
assigned with with org.apache.hudi.internal.schema.Types.Field#id when fields 
are created out of the avro schema.

 

There is a corner case in some setup that

incomingInternalSchema and internalSchema only differs in field id for some of 
the fields

what the code react to this is

even if field id difference will account as a schema change, as a result, it 
derives a evolvedSchema by combining the 2 schemas

 

But what happens is that the evolved schema pick the field id from the 
internalSchema. As a result, when we use the evolved schema for commit, in the 
next iteration, we again find the same id mismatch and do the same thing, which 
leads to a dead loop.

 

Given the fact that the new schema are written to some file and is read by hudi 
on every writes, file keeps bulking up and eventually leads to OOM when it 
tries to read the file again.

 

We need to walk through the hudi design on this part and come up with a proper 
long term fix for it.

 

a short term fix we can do is if evolved schema equals to the old 
internalSchema just the version number of the schema object differs (which 
means we bump up version number with 0 change to the schema itself), we return 
old schema. overriding the decision of schema evolution.

 

 

 

- 

 

 

  was:
on branch 0.14, we have 

 
{code:java}
// Save internal schema
private void saveInternalSchema(HoodieTable table, String instantTime, 
HoodieCommitMetadata metadata) {
  TableSchemaResolver schemaUtil = new 
TableSchemaResolver(table.getMetaClient());
  String historySchemaStr = 
schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
  FileBasedInternalSchemaStorageManager schemasManager = new 
FileBasedInternalSchemaStorageManager(table.getMetaClient());
  if (!historySchemaStr.isEmpty() || 
Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key())))
 {
    InternalSchema internalSchema;
    Schema avroSchema = 
HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), 
config.allowOperationMetadataField());
    if (historySchemaStr.isEmpty()) {
      internalSchema = 
SerDeHelper.fromJson(config.getInternalSchema()).orElse(AvroInternalSchemaConverter.convert(avroSchema));
      internalSchema.setSchemaId(Long.parseLong(instantTime));
    } else {
      internalSchema = 
InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
          SerDeHelper.parseSchemas(historySchemaStr));
    }
    InternalSchema evolvedSchema = 
AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema); {code}
the AvroSchemaEvolutionUtils.reconcileSchema take 2 inputs

 

avroSchema - which is the writer schema populated in string format from the 
writer config

internalSchema - which is read from the branch

 

 


> Schema reconcilation issue
> --------------------------
>
>                 Key: HUDI-9597
>                 URL: https://issues.apache.org/jira/browse/HUDI-9597
>             Project: Apache Hudi
>          Issue Type: Bug
>            Reporter: Davis Zhang
>            Priority: Major
>
> on branch 0.14, we have 
>  
> {code:java}
> // Save internal schema
> private void saveInternalSchema(HoodieTable table, String instantTime, 
> HoodieCommitMetadata metadata) {
>   TableSchemaResolver schemaUtil = new 
> TableSchemaResolver(table.getMetaClient());
>   String historySchemaStr = 
> schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
>   FileBasedInternalSchemaStorageManager schemasManager = new 
> FileBasedInternalSchemaStorageManager(table.getMetaClient());
>   if (!historySchemaStr.isEmpty() || 
> Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key())))
>  {
>     InternalSchema internalSchema;
>     Schema avroSchema = 
> HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), 
> config.allowOperationMetadataField());
>     if (historySchemaStr.isEmpty()) {
>       internalSchema = 
> SerDeHelper.fromJson(config.getInternalSchema()).orElse(AvroInternalSchemaConverter.convert(avroSchema));
>       internalSchema.setSchemaId(Long.parseLong(instantTime));
>     } else {
>       internalSchema = 
> InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
>           SerDeHelper.parseSchemas(historySchemaStr));
>     }
>     InternalSchema evolvedSchema = 
> AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema); {code}
> the AvroSchemaEvolutionUtils.reconcileSchema take 2 inputs
>  
> avroSchema - which is the writer schema populated in string format from the 
> writer config
> internalSchema - which is read from the branch of
>  
> {code:java}
> internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
>     SerDeHelper.parseSchemas(historySchemaStr)); {code}
> which is reading some string from some file and build internal schema out of 
> it, for each field inside the schema it comes with a field id 
> org.apache.hudi.internal.schema.Types.Field#id
>  
>  
> inside the AvroSchemaEvolutionUtils.reconcileSchema it converts avroSchema to 
> another InternalSchema, let's call it incomingInternalSchema, each field is 
> assigned with with org.apache.hudi.internal.schema.Types.Field#id when fields 
> are created out of the avro schema.
>  
> There is a corner case in some setup that
> incomingInternalSchema and internalSchema only differs in field id for some 
> of the fields
> what the code react to this is
> even if field id difference will account as a schema change, as a result, it 
> derives a evolvedSchema by combining the 2 schemas
>  
> But what happens is that the evolved schema pick the field id from the 
> internalSchema. As a result, when we use the evolved schema for commit, in 
> the next iteration, we again find the same id mismatch and do the same thing, 
> which leads to a dead loop.
>  
> Given the fact that the new schema are written to some file and is read by 
> hudi on every writes, file keeps bulking up and eventually leads to OOM when 
> it tries to read the file again.
>  
> We need to walk through the hudi design on this part and come up with a 
> proper long term fix for it.
>  
> a short term fix we can do is if evolved schema equals to the old 
> internalSchema just the version number of the schema object differs (which 
> means we bump up version number with 0 change to the schema itself), we 
> return old schema. overriding the decision of schema evolution.
>  
>  
>  
> - 
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to