xiarixiaoyao commented on code in PR #4910:
URL: https://github.com/apache/hudi/pull/4910#discussion_r849186256
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -246,12 +260,42 @@ protected void commit(HoodieTable table, String commitActionType, String instant
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     // Finalize write
     finalizeWrite(table, instantTime, stats);
+    // save the internal schema to support implicitly adding columns during the write process
+    if (!metadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+        && metadata.getExtraMetadata().containsKey(SCHEMA_KEY) && table.getConfig().getSchemaEvolutionEnable()) {
+      saveInternalSchema(table, instantTime, metadata);
+    }
     // update Metadata table
     writeTableMetadata(table, instantTime, commitActionType, metadata);
     activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
         Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
   }
+  // Save the internal schema
+  private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCommitMetadata metadata) {
+    TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
+    String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
+    FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
+    if (!historySchemaStr.isEmpty()) {
+      InternalSchema internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
+          SerDeHelper.parseSchemas(historySchemaStr));
+      Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
+      InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, internalSchema);
+      if (evolvedSchema.equals(internalSchema)) {
+        metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
+        // TODO: save history schema via the metadata table
+        schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr);
+      } else {
+        evolvedSchema.setSchemaId(Long.parseLong(instantTime));
+        String newSchemaStr = SerDeHelper.toJson(evolvedSchema);
+        metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, newSchemaStr);
+        schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(evolvedSchema, historySchemaStr));
+      }
+      // update SCHEMA_KEY
Review Comment:
Answers:
1) I think DDL should be an independent operation and should not intersect with the original commit.
2) Yes, we plan to do that, but before we start we need Flink to support full schema evolution; otherwise the gap between the Flink module and the other modules will keep growing.
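
For illustration, here is a minimal self-contained sketch of the evolve-or-reuse logic in saveInternalSchema: schemas are keyed by commit instant time, the schema in effect at a given instant is the one with the largest id not exceeding it (the lookup InternalSchemaUtils.searchSchema performs), and a new schema id is registered only when the write schema actually differs. All names below are hypothetical stand-ins using plain strings, not the real Hudi API.

import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

// Hypothetical sketch: schema history keyed by commit instant time.
public class SchemaHistorySketch {

  // schemaId (the commit instant time as a long) -> schema definition
  private final TreeMap<Long, String> history = new TreeMap<>();

  // Latest schema whose id is <= instantTime, or null if none exists yet.
  public String schemaAt(long instantTime) {
    Map.Entry<Long, String> entry = history.floorEntry(instantTime);
    return entry == null ? null : entry.getValue();
  }

  // Reuse the current schema when the write schema is unchanged;
  // otherwise register the write schema under the new instant time.
  public String commitSchema(long instantTime, String writeSchema) {
    String current = schemaAt(instantTime);
    if (Objects.equals(writeSchema, current)) {
      return current; // no evolution: history persisted as-is
    }
    history.put(instantTime, writeSchema); // evolution: new id = instant time
    return writeSchema;
  }

  public static void main(String[] args) {
    SchemaHistorySketch sketch = new SchemaHistorySketch();
    sketch.commitSchema(100L, "{a:int}");
    sketch.commitSchema(200L, "{a:int,b:string}"); // an implicitly added column
    System.out.println(sketch.schemaAt(150L)); // prints {a:int}
    System.out.println(sketch.schemaAt(250L)); // prints {a:int,b:string}
  }
}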
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]