xiarixiaoyao commented on code in PR #4910:
URL: https://github.com/apache/hudi/pull/4910#discussion_r849186256


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -246,12 +260,42 @@ protected void commit(HoodieTable table, String commitActionType, String instant
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     // Finalize write
     finalizeWrite(table, instantTime, stats);
+    // save the internal schema to support implicitly adding columns during the write process
+    if (!metadata.getExtraMetadata().containsKey(SerDeHelper.LATEST_SCHEMA)
+        && metadata.getExtraMetadata().containsKey(SCHEMA_KEY) && table.getConfig().getSchemaEvolutionEnable()) {
+      saveInternalSchema(table, instantTime, metadata);
+    }
     // update Metadata table
     writeTableMetadata(table, instantTime, commitActionType, metadata);
     activeTimeline.saveAsComplete(new HoodieInstant(true, commitActionType, instantTime),
         Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
   }
 
+  // Save the internal schema recorded for this commit
+  private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCommitMetadata metadata) {
+    TableSchemaResolver schemaUtil = new TableSchemaResolver(table.getMetaClient());
+    String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElse("");
+    FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
+    if (!historySchemaStr.isEmpty()) {
+      InternalSchema internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
+          SerDeHelper.parseSchemas(historySchemaStr));
+      Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(new Schema.Parser().parse(config.getSchema()));
+      InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.evolveSchemaFromNewAvroSchema(avroSchema, internalSchema);
+      if (evolvedSchema.equals(internalSchema)) {
+        // No schema change: record the current schema and carry the history forward unchanged
+        metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
+        // TODO: save the history schema via the metadata table
+        schemasManager.persistHistorySchemaStr(instantTime, historySchemaStr);
+      } else {
+        // Schema changed: register the evolved schema under this instant and append it to the history
+        evolvedSchema.setSchemaId(Long.parseLong(instantTime));
+        String newSchemaStr = SerDeHelper.toJson(evolvedSchema);
+        metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, newSchemaStr);
+        schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(evolvedSchema, historySchemaStr));
+      }
+      // update SCHEMA_KEY

Review Comment:
   Answers:
   1) I think DDL should be an independent operation and should not intersect with the original commit.
   2) Yes, we plan to do that, but before we start we need Flink to support full schema evolution; otherwise, the gap between the Flink module and the other modules will keep growing.
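
   For context on how the persisted history is consumed, the lookup side can be sketched as below. This is a minimal sketch, not code from this PR; the import paths are assumed from hudi-common's internal-schema package, and `historySchemaJson` is a hypothetical stand-in for the string persisted by `persistHistorySchemaStr` above.

   ```java
   import org.apache.hudi.internal.schema.InternalSchema;
   import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
   import org.apache.hudi.internal.schema.utils.SerDeHelper;

   public class SchemaLookupSketch {
     // Resolve the internal schema that was active at a given commit instant,
     // using the same helper chain the diff writes with: parse the persisted
     // schema history, then search it by instant time (schema ids are the
     // commit instants that produced them).
     public static InternalSchema schemaAt(String historySchemaJson, String instantTime) {
       return InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
           SerDeHelper.parseSchemas(historySchemaJson));
     }
   }
   ```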


