[
https://issues.apache.org/jira/browse/HUDI-9597?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Davis Zhang updated HUDI-9597:
------------------------------
Description:
on branch 0.14, we have
{code:java}
// Save internal schema
private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCommitMetadata metadata) {
  TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
  String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
  FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
  if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
    InternalSchema internalSchema;
    Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField());
    if (historySchemaStr.isEmpty()) {
      internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElse(AvroInternalSchemaConverter.convert(avroSchema));
      internalSchema.setSchemaId(Long.parseLong(instantTime));
    } else {
      internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime), SerDeHelper.parseSchemas(historySchemaStr));
    }
    InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema);
    // ... (excerpt ends here; the rest of the method is not shown)
{code}
AvroSchemaEvolutionUtils.reconcileSchema takes 2 inputs:
avroSchema - the writer schema, built from the schema string in the writer config
internalSchema - the stored schema which, in the case at hand, comes from this branch:
{code:java}
internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime), SerDeHelper.parseSchemas(historySchemaStr));
{code}
This branch reads the serialized schema history string from a file and builds an InternalSchema out of it. Each field in that schema carries a field id, org.apache.hudi.internal.schema.Types.Field#id.

Inside AvroSchemaEvolutionUtils.reconcileSchema, avroSchema is converted to another InternalSchema, let's call it incomingInternalSchema. Each of its fields is assigned an org.apache.hudi.internal.schema.Types.Field#id when the fields are created from the Avro schema.
There is a corner case in some setups where incomingInternalSchema and internalSchema differ only in the field ids of some of the fields.
The code treats even a field id difference as a schema change. As a result, it derives an evolvedSchema by combining the 2 schemas.
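
To make the corner case concrete, here is a minimal sketch of two schemas that agree on every field name and type but not on field ids. The `Field` class and both comparison helpers are hypothetical stand-ins written for this illustration; they are not Hudi's `Types.Field` or its actual comparison logic.

```java
import java.util.Arrays;
import java.util.List;

public class FieldIdMismatchSketch {
    // Hypothetical stand-in for a schema field; NOT Hudi's Types.Field.
    static final class Field {
        final int id;
        final String name;
        final String type;

        Field(int id, String name, String type) {
            this.id = id;
            this.name = name;
            this.type = type;
        }
    }

    static boolean sameNameAndType(Field x, Field y) {
        return x.name.equals(y.name) && x.type.equals(y.type);
    }

    // Strict comparison: ids count, so an id-only difference looks like a change.
    static boolean sameIncludingIds(List<Field> a, List<Field> b) {
        if (a.size() != b.size()) {
            return false;
        }
        for (int i = 0; i < a.size(); i++) {
            if (a.get(i).id != b.get(i).id || !sameNameAndType(a.get(i), b.get(i))) {
                return false;
            }
        }
        return true;
    }

    // Lenient comparison: only names and types count.
    static boolean sameIgnoringIds(List<Field> a, List<Field> b) {
        if (a.size() != b.size()) {
            return false;
        }
        for (int i = 0; i < a.size(); i++) {
            if (!sameNameAndType(a.get(i), b.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Stored schema: "name" was given id 5 at some point in the table's history.
        List<Field> stored = Arrays.asList(
            new Field(0, "id", "long"), new Field(5, "name", "string"));
        // Incoming schema: ids assigned fresh while converting the Avro schema.
        List<Field> incoming = Arrays.asList(
            new Field(0, "id", "long"), new Field(1, "name", "string"));

        System.out.println(sameIncludingIds(stored, incoming)); // prints false
        System.out.println(sameIgnoringIds(stored, incoming));  // prints true
    }
}
```

The specific ids (5 versus 1) are made up; the point is only that a strict comparison reports a change where the user-visible schema is identical.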
However, the evolved schema picks its field ids from internalSchema. As a result, when the evolved schema is used for the commit, the next iteration finds the same id mismatch and does the same thing again, which leads to an endless loop.
Because each new schema is written to a file that Hudi reads on every write, the file keeps growing and eventually leads to an OOM when Hudi tries to read it again.
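
The growth described above can be modeled with a small simulation. This is only a sketch of the described behavior under simplifying assumptions: a schema is reduced to a version id plus an array of field ids, the history is an in-memory list rather than a file, and `reconcile` is a hypothetical stand-in, not `AvroSchemaEvolutionUtils.reconcileSchema`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SchemaHistoryGrowthSketch {
    // Hypothetical, heavily simplified schema: a version id plus field ids.
    static final class Schema {
        final long versionId;
        final int[] fieldIds;

        Schema(long versionId, int[] fieldIds) {
            this.versionId = versionId;
            this.fieldIds = fieldIds;
        }
    }

    // Models the reported behavior: an id mismatch counts as a change, but the
    // "evolved" schema keeps the stored ids, so the mismatch never goes away.
    static Schema reconcile(int[] incomingIds, Schema stored, long instantTime) {
        if (Arrays.equals(incomingIds, stored.fieldIds)) {
            return stored;
        }
        return new Schema(instantTime, stored.fieldIds.clone());
    }

    // Runs the given number of commits and returns the resulting history size.
    static int commitRepeatedly(int commits) {
        List<Schema> history = new ArrayList<Schema>();
        history.add(new Schema(0L, new int[] {0, 5}));
        int[] incomingIds = {0, 1}; // the writer's ids never match the stored ones

        for (long instantTime = 1; instantTime <= commits; instantTime++) {
            Schema latest = history.get(history.size() - 1);
            Schema evolved = reconcile(incomingIds, latest, instantTime);
            if (evolved != latest) {
                history.add(evolved); // one more entry on every single commit
            }
        }
        return history.size();
    }

    public static void main(String[] args) {
        System.out.println(commitRepeatedly(1000)); // prints 1001
    }
}
```

In this model every commit appends one entry that differs from its predecessor only in version id, which matches the unbounded file growth reported above.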
We need to walk through the Hudi design for this part and come up with a proper long-term fix.
A short-term fix: if the evolved schema equals the old internalSchema and only the version number of the schema object differs (which means the version number was bumped with no change to the schema itself), return the old schema, overriding the schema evolution decision.
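
A minimal sketch of the proposed short-term guard follows. The `Schema` class, its string-encoded fields, and the `chooseSchemaToCommit` helper are all assumptions made for illustration; where such a check would live in Hudi, and how `InternalSchema` equality treats the schema id, are not confirmed here.

```java
import java.util.Arrays;
import java.util.List;

public class ShortTermFixSketch {
    // Hypothetical schema: a version id plus fields encoded as "id:name:type".
    static final class Schema {
        final long versionId;
        final List<String> fields;

        Schema(long versionId, List<String> fields) {
            this.versionId = versionId;
            this.fields = fields;
        }
    }

    // True when the two schemas have identical fields (ids included),
    // regardless of their version ids.
    static boolean sameExceptVersion(Schema a, Schema b) {
        return a.fields.equals(b.fields);
    }

    // The guard: if "evolution" only bumped the version, keep the old schema.
    static Schema chooseSchemaToCommit(Schema oldSchema, Schema evolvedSchema) {
        return sameExceptVersion(oldSchema, evolvedSchema) ? oldSchema : evolvedSchema;
    }

    public static void main(String[] args) {
        Schema oldSchema = new Schema(100L, Arrays.asList("0:id:long", "5:name:string"));
        Schema bumpedOnly = new Schema(200L, Arrays.asList("0:id:long", "5:name:string"));
        Schema realChange = new Schema(300L,
            Arrays.asList("0:id:long", "5:name:string", "6:email:string"));

        System.out.println(chooseSchemaToCommit(oldSchema, bumpedOnly).versionId); // prints 100
        System.out.println(chooseSchemaToCommit(oldSchema, realChange).versionId); // prints 300
    }
}
```

With a guard of this shape, a version-only bump would no longer add an entry to the schema history, while a genuine change (such as the added field above) would still go through.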
> Schema reconcilation issue
> --------------------------
>
> Key: HUDI-9597
> URL: https://issues.apache.org/jira/browse/HUDI-9597
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Davis Zhang
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)