xiarixiaoyao commented on code in PR #4910:
URL: https://github.com/apache/hudi/pull/4910#discussion_r849243770
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -246,12 +260,42 @@ protected void commit(HoodieTable table, String
commitActionType, String instant
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
// Finalize write
finalizeWrite(table, instantTime, stats);
+ // do save internal schema to support Implicitly add columns in write
process
+ if (!metadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+ && metadata.getExtraMetadata().containsKey(SCHEMA_KEY) &&
table.getConfig().getSchemaEvolutionEnable()) {
+ saveInternalSchema(table, instantTime, metadata);
+ }
// update Metadata table
writeTableMetadata(table, instantTime, commitActionType, metadata);
activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType,
instantTime),
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
}
+ // Save internal schema
+ private void saveInternalSchema(HoodieTable table, String instantTime,
HoodieCommitMetadata metadata) {
+ TableSchemaResolver schemaUtil = new
TableSchemaResolver(table.getMetaClient());
+ String historySchemaStr =
schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
+ FileBasedInternalSchemaStorageManager schemasManager = new
FileBasedInternalSchemaStorageManager(table.getMetaClient());
+ if (!historySchemaStr.isEmpty()) {
+ InternalSchema internalSchema =
InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
+ SerDeHelper.parseSchemas(historySchemaStr));
+ Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new
Schema.Parser().parse(config.getSchema()));
+ InternalSchema evolvedSchema =
AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema,
internalSchema);
+ if (evolvedSchema.equals(internalSchema)) {
+ metadata.addMetadata(SerDeHelper.LATEST_SCHEMA,
SerDeHelper.toJson(evolvedSchema));
+ //TODO save history schema by metaTable
+ schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr);
+ } else {
+ evolvedSchema.setSchemaId(Long.parseLong(instantTime));
+ String newSchemaStr = SerDeHelper.toJson(evolvedSchema);
+ metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, newSchemaStr);
+ schemasManager.persistHistorySchemaStr(instantTime,
SerDeHelper.inheritSchemas(evolvedSchema, historySchemaStr));
+ }
+ // update SCHEMA_KEY
Review Comment:
Hi danney, are you asking about line 292, i.e. why we use a new timeline
to save the history schema?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]