bvaradar commented on a change in pull request #4910:
URL: https://github.com/apache/hudi/pull/4910#discussion_r831620873
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -237,6 +251,30 @@ protected void commit(HoodieTable table, String commitActionType, String instant
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Finalize write
finalizeWrite(table, instantTime, stats);
+ // do save internal schema to support Implicitly add columns in write process
+ if (!metadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+ && metadata.getExtraMetadata().containsKey(SCHEMA_KEY) && table.getConfig().getSchemaEvolutionEnable()) {
+ TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
+ String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
+ FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
+ if (!historySchemaStr.isEmpty()) {
+ InternalSchema internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
+ SerDeHelper.parseSchemas(historySchemaStr));
+ Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
+ InternalSchema evolutionSchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, internalSchema);
Review comment:
nit: rename evolutionSchema to evolvedSchema.
##########
File path:
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
##########
@@ -237,6 +251,30 @@ protected void commit(HoodieTable table, String commitActionType, String instant
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Finalize write
finalizeWrite(table, instantTime, stats);
+ // do save internal schema to support Implicitly add columns in write process
+ if (!metadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+ && metadata.getExtraMetadata().containsKey(SCHEMA_KEY) && table.getConfig().getSchemaEvolutionEnable()) {
+ TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
+ String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
+ FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
+ if (!historySchemaStr.isEmpty()) {
+ InternalSchema internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
+ SerDeHelper.parseSchemas(historySchemaStr));
+ Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
+ InternalSchema evolutionSchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, internalSchema);
+ if (evolutionSchema.equals(internalSchema)) {
+ metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolutionSchema));
+ schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr);
Review comment:
If the schema is unchanged, can we avoid writing the history schema? To
resolve a query version that has no entry of its own, we would need to change
InternalSchemaUtils.searchSchema to look for the nearest version id smaller
than the query version.
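
A rough sketch of the lookup described here, not the actual Hudi code. It assumes the history can be held in a TreeMap keyed by version id; NearestSchemaLookup is a hypothetical stand-in for InternalSchemaUtils.searchSchema, and plain strings stand in for InternalSchema. It also assumes an exact match should win when one exists, so it uses a floor lookup (nearest id at or below the query version).

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

public class NearestSchemaLookup {

  /**
   * Hypothetical replacement for an exact-match search: returns the schema
   * stored under the largest version id that is <= versionId, or empty when
   * every stored version is newer than the query.
   */
  public static Optional<String> searchSchema(long versionId, TreeMap<Long, String> history) {
    // floorEntry returns the entry with the greatest key <= versionId, or null.
    Map.Entry<Long, String> entry = history.floorEntry(versionId);
    return entry == null ? Optional.empty() : Optional.of(entry.getValue());
  }

  public static void main(String[] args) {
    // Only commits that changed the schema persist an entry (assumed behavior).
    TreeMap<Long, String> history = new TreeMap<>();
    history.put(100L, "schema-v1");
    history.put(300L, "schema-v2");

    // Commit 200 did not change the schema, so it resolves to the entry at 100.
    System.out.println(searchSchema(200L, history).orElse("none"));
    // Commit 300 has its own entry.
    System.out.println(searchSchema(300L, history).orElse("none"));
    // Commit 50 predates every stored schema.
    System.out.println(searchSchema(50L, history).orElse("none"));
  }
}
```

With a lookup like this, commits that leave the schema unchanged would not need to call persistHistorySchemaStr at all.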