Davis Zhang created HUDI-8438:
---------------------------------
Summary: Fix table schema in commit metadata of table services
Key: HUDI-8438
URL: https://issues.apache.org/jira/browse/HUDI-8438
Project: Apache Hudi
Issue Type: Improvement
Components: multi-writer
Reporter: Davis Zhang
Related Jira: https://issues.apache.org/jira/browse/HUDI-8219
In the Jira above we found an issue with how the table schema is resolved: the
resolver can read the latest completed instant, which may come from a table
service, and the schema in that instant's commit metadata can be stale.
The main reason is that table services do not go through
org.apache.hudi.client.transaction.SimpleSchemaConflictResolutionStrategy#resolveConcurrentSchemaEvolution
when they write commit metadata to complete an instant.
As a result,
org.apache.hudi.common.table.TableSchemaResolver#getTableAvroSchemaFromSchemaEvolutionTimeline
is used in
org.apache.hudi.client.transaction.SimpleSchemaConflictResolutionStrategy#resolveConcurrentSchemaEvolution
to skip instants from table services when fetching the table schema.
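The stale read and the current skip-based workaround can be sketched with a
toy timeline. This is only an illustration: the class, the integer schema
"version", and the action labels below are hypothetical stand-ins, not Hudi's
actual types or APIs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of a timeline; not Hudi's real classes.
final class StaleSchemaSketch {

    // One completed instant: its action and the schema "version" in its commit metadata.
    static final class Instant {
        final String action;
        final int schemaVersion;

        Instant(String action, int schemaVersion) {
            this.action = action;
            this.schemaVersion = schemaVersion;
        }
    }

    // Assumption for this sketch: table services are labelled by these two action names.
    static boolean isTableService(Instant instant) {
        return instant.action.equals("clustering") || instant.action.equals("compaction");
    }

    // Naive resolution: trust the latest completed instant, whoever wrote it.
    static Optional<Integer> latestSchema(List<Instant> timeline) {
        return timeline.isEmpty()
            ? Optional.empty()
            : Optional.of(timeline.get(timeline.size() - 1).schemaVersion);
    }

    // Workaround: walk backwards and skip instants written by table services.
    static Optional<Integer> latestSchemaSkippingTableServices(List<Instant> timeline) {
        for (int k = timeline.size() - 1; k >= 0; k--) {
            if (!isTableService(timeline.get(k))) {
                return Optional.of(timeline.get(k).schemaVersion);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Instant> timeline = Arrays.asList(
            new Instant("commit", 1),
            new Instant("commit", 2),       // a writer evolved the schema to version 2
            new Instant("clustering", 1));  // completed later, but still carries version 1
        System.out.println(latestSchema(timeline).get());                      // stale: 1
        System.out.println(latestSchemaSkippingTableServices(timeline).get()); // 2
    }
}
```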
To fix the issue:
For clustering, auto commit is hard-coded to false, so when it commits it goes
through a different commit code path at
org.apache.hudi.client.HoodieFlinkTableServiceClient#completeClustering. There
we need to read the latest table schema and use it in the commit metadata.
For compaction, in
SimpleSchemaConflictResolutionStrategy#resolveConcurrentSchemaEvolution we
should do the same thing: read the latest table schema and return it. We
don't need the full schema evolution logic, since compaction does not
evolve the schema and so cannot hit a schema conflict.
For clean, rollback, and archive, the completed instants are filtered out when
reading the table schema, so it's fine if they do not carry one in their
commit metadata.
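The proposed behavior for the three cases above could look roughly like the
following toy model. Everything here is hypothetical (class, method names,
action labels, integer schema "versions"); in Hudi the read would happen
inside the transaction, in the real commit code paths named above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of the proposed fix; not Hudi's real classes.
final class TableServiceCommitSketch {

    // A completed instant; clean/rollback/archive carry no schema in this model.
    static final class Instant {
        final String action;
        final Optional<Integer> schemaVersion;

        Instant(String action, Optional<Integer> schemaVersion) {
            this.action = action;
            this.schemaVersion = schemaVersion;
        }
    }

    // Latest schema recorded by any completed instant that carries one.
    static Optional<Integer> latestTableSchema(List<Instant> timeline) {
        for (int k = timeline.size() - 1; k >= 0; k--) {
            if (timeline.get(k).schemaVersion.isPresent()) {
                return timeline.get(k).schemaVersion;
            }
        }
        return Optional.empty();
    }

    // Complete a table service instant. Clustering and compaction re-read the
    // latest table schema at commit time instead of reusing the one they were
    // planned with; other services write no schema at all.
    static Instant complete(String action, List<Instant> timeline) {
        boolean carriesSchema = action.equals("clustering") || action.equals("compaction");
        Optional<Integer> schema = carriesSchema ? latestTableSchema(timeline) : Optional.empty();
        Instant done = new Instant(action, schema);
        timeline.add(done);
        return done;
    }

    public static void main(String[] args) {
        List<Instant> timeline = new ArrayList<>();
        timeline.add(new Instant("commit", Optional.of(1)));
        timeline.add(new Instant("commit", Optional.of(2)));
        System.out.println(complete("clustering", timeline).schemaVersion); // Optional[2]
        System.out.println(complete("clean", timeline).schemaVersion);      // Optional.empty
    }
}
```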
After fixing these, we should have:
In the timeline, for all completed instants of COMMIT, DELTA_COMMIT,
REPLACEMENT_COMMIT, and COMPACTION, the table schema "version" is monotonically
increasing: whoever comes later must use a more recent table schema instead of
a stale one.
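One way to state this invariant as a check, as a minimal sketch. It assumes
schema "versions" can be modelled as integers and reads "monotonically
increasing" as non-decreasing, since several consecutive instants may share
the same schema. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical check of the timeline invariant; not part of Hudi.
final class SchemaMonotonicitySketch {

    // versions: schema version of each completed schema-carrying instant,
    // listed in completion order.
    static boolean isNonDecreasing(List<Integer> versions) {
        for (int k = 1; k < versions.size(); k++) {
            if (versions.get(k) < versions.get(k - 1)) {
                return false; // a later instant used a staler schema
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isNonDecreasing(Arrays.asList(1, 2, 2, 3))); // true
        System.out.println(isNonDecreasing(Arrays.asList(1, 2, 1)));    // false
    }
}
```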
The getTableAvroSchemaFromSchemaEvolutionTimeline function call would be
replaced by a normal getTableAvroSchema call, since we no longer need to filter
out table service instants.
The code we need to change comes after
this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
--
This message was sent by Atlassian Jira
(v8.20.10#820010)