xiarixiaoyao commented on code in PR #4910:
URL: https://github.com/apache/hudi/pull/4910#discussion_r849243770


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -246,12 +260,42 @@ protected void commit(HoodieTable table, String 
commitActionType, String instant
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     // Finalize write
     finalizeWrite(table, instantTime, stats);
+    // do save internal schema to support Implicitly add columns in write 
process
+    if (!metadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+        && metadata.getExtraMetadata().containsKey(SCHEMA_KEY) && 
table.getConfig().getSchemaEvolutionEnable()) {
+      saveInternalSchema(table, instantTime, metadata);
+    }
     // update Metadata table
     writeTableMetadata(table, instantTime, commitActionType, metadata);
     activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, 
instantTime),
         Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
   }
 
+  // Save internal schema
+  private void saveInternalSchema(HoodieTable table, String instantTime, 
HoodieCommitMetadata metadata) {
+    TableSchemaResolver schemaUtil = new 
TableSchemaResolver(table.getMetaClient());
+    String historySchemaStr = 
schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
+    FileBasedInternalSchemaStorageManager schemasManager = new 
FileBasedInternalSchemaStorageManager(table.getMetaClient());
+    if (!historySchemaStr.isEmpty()) {
+      InternalSchema internalSchema = 
InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
+          SerDeHelper.parseSchemas(historySchemaStr));
+      Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new 
Schema.Parser().parse(config.getSchema()));
+      InternalSchema evolvedSchema = 
AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, 
internalSchema);
+      if (evolvedSchema.equals(internalSchema)) {
+        metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, 
SerDeHelper.toJson(evolvedSchema));
+        //TODO save history schema by metaTable
+        schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr);
+      } else {
+        evolvedSchema.setSchemaId(Long.parseLong(instantTime));
+        String newSchemaStr = SerDeHelper.toJson(evolvedSchema);
+        metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, newSchemaStr);
+        schemasManager.persistHistorySchemaStr(instantTime, 
SerDeHelper.inheritSchemas(evolvedSchema, historySchemaStr));
+      }
+      // update SCHEMA_KEY

Review Comment:
   
   Hi danney, are you asking about line 292, i.e. why we use a new timeline
to save the history schema?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to