yihua commented on code in PR #11978:
URL: https://github.com/apache/hudi/pull/11978#discussion_r1797239113


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java:
##########
@@ -96,11 +97,122 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
       });
       LOG.info("Successfully resolved conflicts, if any");
 
+      // Resolve schema.
+      Schema schemaOfCommitMetadata = resolveConcurrentSchemaEvolution(
+          table, config, lastCompletedTxnOwnerInstant, new TableSchemaResolver(table.getMetaClient()));
+      if (thisCommitMetadata.isPresent()) {
+        thisCommitMetadata.get().addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaOfCommitMetadata.toString());
+      }
+      thisOperation.getCommitMetadataOption().get().addMetadata(
+          HoodieCommitMetadata.SCHEMA_KEY, schemaOfCommitMetadata.toString());
       return thisOperation.getCommitMetadataOption();
     }
     return thisCommitMetadata;
   }
 
+  /**
+   * Resolve concurrent schema evolution. If it is resolvable, return the schema to be set in the commit metadata.
+   * Otherwise, throw a {@link HoodieWriteConflictException}.
+   *
+   * @param table The Hoodie table.
+   * @param config The Hoodie write configuration.
+   * @param lastCompletedTxnOwnerInstant The last completed transaction owner instant.
+   * @param schemaResolver The table schema resolver.
+   * @throws HoodieWriteConflictException If there is a concurrent schema evolution.
+   */
+  static Schema resolveConcurrentSchemaEvolution(
+      HoodieTable table,
+      HoodieWriteConfig config,
+      Option<HoodieInstant> lastCompletedTxnOwnerInstant,
+      TableSchemaResolver schemaResolver) {
+    HoodieInstant lastCompletedInstantsAtTxnStart = lastCompletedTxnOwnerInstant.isPresent() ? lastCompletedTxnOwnerInstant.get() : null;
+    HoodieInstant lastCompletedInstantsAtTxnValidation = table.getMetaClient().reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants()
+        .lastInstant()
+        .orElse(null);
+    // Populate configs regardless of what's the case we are trying to handle.
+    Schema schemaOfTxn = new Schema.Parser().parse(config.getWriteSchema());
+    Schema schemaAtTxnStart = null;
+    Schema schemaAtTxnValidation = null;
+    try {
+      if (lastCompletedInstantsAtTxnStart != null) {

Review Comment:
   Let's update the logic according to the RFC to make it easy to follow.
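A minimal sketch of the three-way comparison that `schemaOfTxn`, `schemaAtTxnStart` and `schemaAtTxnValidation` appear to set up, written as explicit cases so each branch can be checked against the RFC. The case rules are an assumption, not taken from the RFC: schemas are modeled as plain strings rather than Avro `Schema` objects, `null` means "no committed schema yet", and `SchemaConflictException` is a hypothetical stand-in for `HoodieWriteConflictException`.

```java
import java.util.Objects;

// Hypothetical stand-in for HoodieWriteConflictException.
final class SchemaConflictException extends RuntimeException {
  SchemaConflictException(String message) {
    super(message);
  }
}

// Hypothetical sketch: schemas are plain strings, null means "no committed schema yet".
final class SchemaResolutionSketch {
  private SchemaResolutionSketch() {
  }

  static String resolve(String schemaOfTxn, String schemaAtTxnStart, String schemaAtTxnValidation) {
    // Case 1: no concurrent commit changed the table schema, so the writer's schema is used.
    if (Objects.equals(schemaAtTxnStart, schemaAtTxnValidation)) {
      return schemaOfTxn;
    }
    // Case 2: a concurrent commit changed the table schema, but this writer either kept the
    // schema it started with or arrived at the same schema, so the latest table schema is kept.
    if (Objects.equals(schemaOfTxn, schemaAtTxnStart)
        || Objects.equals(schemaOfTxn, schemaAtTxnValidation)) {
      return schemaAtTxnValidation;
    }
    // Case 3: both sides evolved the schema in different ways; refuse to commit.
    throw new SchemaConflictException("Concurrent schema evolution: txn=" + schemaOfTxn
        + ", atStart=" + schemaAtTxnStart + ", atValidation=" + schemaAtTxnValidation);
  }
}
```

In the real method the same case analysis would operate on Avro `Schema` objects, presumably with a schema-equality or compatibility check in place of string equality.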



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java:
##########
@@ -96,11 +97,122 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
       });
       LOG.info("Successfully resolved conflicts, if any");
 
+      // Resolve schema.

Review Comment:
   This comment does not seem to be useful.
   ```suggestion
   ```



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java:
##########
@@ -96,11 +97,122 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
       });
       LOG.info("Successfully resolved conflicts, if any");
 
+      // Resolve schema.
+      Schema schemaOfCommitMetadata = resolveConcurrentSchemaEvolution(
+          table, config, lastCompletedTxnOwnerInstant, new TableSchemaResolver(table.getMetaClient()));
+      if (thisCommitMetadata.isPresent()) {
+        thisCommitMetadata.get().addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaOfCommitMetadata.toString());
+      }
+      thisOperation.getCommitMetadataOption().get().addMetadata(
+          HoodieCommitMetadata.SCHEMA_KEY, schemaOfCommitMetadata.toString());
       return thisOperation.getCommitMetadataOption();
     }
     return thisCommitMetadata;
   }
 
+  /**
+   * Resolve concurrent schema evolution. If it is resolvable, return the schema to be set in the commit metadata.
+   * Otherwise, throw a {@link HoodieWriteConflictException}.
+   *
+   * @param table The Hoodie table.
+   * @param config The Hoodie write configuration.
+   * @param lastCompletedTxnOwnerInstant The last completed transaction owner instant.
+   * @param schemaResolver The table schema resolver.
+   * @throws HoodieWriteConflictException If there is a concurrent schema evolution.
+   */
+  static Schema resolveConcurrentSchemaEvolution(
+      HoodieTable table,
+      HoodieWriteConfig config,
+      Option<HoodieInstant> lastCompletedTxnOwnerInstant,
+      TableSchemaResolver schemaResolver) {
+    HoodieInstant lastCompletedInstantsAtTxnStart = lastCompletedTxnOwnerInstant.isPresent() ? lastCompletedTxnOwnerInstant.get() : null;
+    HoodieInstant lastCompletedInstantsAtTxnValidation = table.getMetaClient().reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants()

Review Comment:
   Could we reuse `ConcurrentOperation` instances for both schema and data conflict detection?
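One way to read this suggestion, sketched under assumptions: collect the concurrent operations once and pass the same list to both the data check and the schema check, instead of resolving instants separately for schema detection. `ConcurrentOp` and `SharedConcurrentOps` are hypothetical simplifications of Hudi's `ConcurrentOperation`, with schemas as strings and file groups as string IDs; the schema rule ("both sides changed the start schema, to different results") is illustrative and not Hudi's actual rule.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for Hudi's ConcurrentOperation.
final class ConcurrentOp {
  final String instantTime;
  final String schema;          // schema recorded in the concurrent commit's metadata
  final Set<String> fileGroups; // IDs of file groups the concurrent commit wrote

  ConcurrentOp(String instantTime, String schema, Set<String> fileGroups) {
    this.instantTime = instantTime;
    this.schema = schema;
    this.fileGroups = fileGroups;
  }
}

final class SharedConcurrentOps {
  private SharedConcurrentOps() {
  }

  static Set<String> setOf(String... ids) {
    return new HashSet<>(Arrays.asList(ids));
  }

  // Data conflict: this txn and a concurrent op wrote at least one common file group.
  static boolean hasFileGroupConflict(Set<String> myFileGroups, List<ConcurrentOp> ops) {
    for (ConcurrentOp op : ops) {
      if (!Collections.disjoint(myFileGroups, op.fileGroups)) {
        return true;
      }
    }
    return false;
  }

  // Schema conflict: this txn and a concurrent op both changed the start schema, to different results.
  static boolean hasSchemaConflict(String schemaAtStart, String mySchema, List<ConcurrentOp> ops) {
    boolean iEvolved = !mySchema.equals(schemaAtStart);
    for (ConcurrentOp op : ops) {
      boolean opEvolved = !op.schema.equals(schemaAtStart);
      if (iEvolved && opEvolved && !op.schema.equals(mySchema)) {
        return true;
      }
    }
    return false;
  }
}
```

The list of `ConcurrentOp` would be built once per commit attempt (for example from the instants completed since the transaction started) and handed to both methods, so the two checks cannot disagree about which operations were concurrent.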



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java:
##########
@@ -96,11 +97,122 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
       });
       LOG.info("Successfully resolved conflicts, if any");
 
+      // Resolve schema.
+      Schema schemaOfCommitMetadata = resolveConcurrentSchemaEvolution(
+          table, config, lastCompletedTxnOwnerInstant, new TableSchemaResolver(table.getMetaClient()));
+      if (thisCommitMetadata.isPresent()) {
+        thisCommitMetadata.get().addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaOfCommitMetadata.toString());
+      }
+      thisOperation.getCommitMetadataOption().get().addMetadata(
+          HoodieCommitMetadata.SCHEMA_KEY, schemaOfCommitMetadata.toString());

Review Comment:
   Let's move this before the file group conflict resolution.
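A sketch of the suggested ordering, assuming both checks can be expressed as predicates: the schema check runs first, and file-group conflict resolution only runs if the schema is resolvable. `ConflictCheckOrder` and `WriteConflict` are hypothetical names; `executedSteps` exists only to make the order observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for HoodieWriteConflictException.
final class WriteConflict extends RuntimeException {
  WriteConflict(String message) {
    super(message);
  }
}

final class ConflictCheckOrder {
  // Records which checks ran, only to make the ordering observable.
  final List<String> executedSteps = new ArrayList<>();

  // Schema resolution first; file-group conflict resolution only if the schema is resolvable.
  void validate(BooleanSupplier schemaConflict, BooleanSupplier fileGroupConflict) {
    executedSteps.add("schema");
    if (schemaConflict.getAsBoolean()) {
      throw new WriteConflict("Concurrent schema evolution detected");
    }
    executedSteps.add("fileGroups");
    if (fileGroupConflict.getAsBoolean()) {
      throw new WriteConflict("Concurrent writes to the same file groups detected");
    }
  }
}
```

With this order, a schema conflict aborts the commit before any file group is inspected.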



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java:
##########
@@ -96,11 +97,122 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
       });
       LOG.info("Successfully resolved conflicts, if any");
 
+      // Resolve schema.
+      Schema schemaOfCommitMetadata = resolveConcurrentSchemaEvolution(
+          table, config, lastCompletedTxnOwnerInstant, new TableSchemaResolver(table.getMetaClient()));
+      if (thisCommitMetadata.isPresent()) {
+        thisCommitMetadata.get().addMetadata(HoodieCommitMetadata.SCHEMA_KEY, schemaOfCommitMetadata.toString());
+      }
+      thisOperation.getCommitMetadataOption().get().addMetadata(
+          HoodieCommitMetadata.SCHEMA_KEY, schemaOfCommitMetadata.toString());
       return thisOperation.getCommitMetadataOption();
     }
     return thisCommitMetadata;
   }
 
+  /**
+   * Resolve concurrent schema evolution. If it is resolvable, return the schema to be set in the commit metadata.
+   * Otherwise, throw a {@link HoodieWriteConflictException}.
+   *
+   * @param table The Hoodie table.
+   * @param config The Hoodie write configuration.
+   * @param lastCompletedTxnOwnerInstant The last completed transaction owner instant.
+   * @param schemaResolver The table schema resolver.
+   * @throws HoodieWriteConflictException If there is a concurrent schema evolution.
+   */
+  static Schema resolveConcurrentSchemaEvolution(
+      HoodieTable table,
+      HoodieWriteConfig config,
+      Option<HoodieInstant> lastCompletedTxnOwnerInstant,
+      TableSchemaResolver schemaResolver) {
+    HoodieInstant lastCompletedInstantsAtTxnStart = lastCompletedTxnOwnerInstant.isPresent() ? lastCompletedTxnOwnerInstant.get() : null;
+    HoodieInstant lastCompletedInstantsAtTxnValidation = table.getMetaClient().reloadActiveTimeline().getCommitsTimeline().filterCompletedInstants()

Review Comment:
   The timeline is already reloaded so there is no need to do it again.
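A sketch of the single-reload pattern, assuming the caller passes the already-reloaded timeline into schema resolution instead of the method calling `reloadActiveTimeline()` a second time. `FakeMetaClient` and `SingleReload` are hypothetical; the reload counter only makes the saving visible. Sharing one snapshot also means both checks see the same set of completed instants.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical meta client; reloadTimeline() plays the role of reloadActiveTimeline().
final class FakeMetaClient {
  private final List<String> completedInstants = new ArrayList<>();
  int reloadCount = 0;

  void complete(String instantTime) {
    completedInstants.add(instantTime);
  }

  // Returns an immutable snapshot of completed instants and counts the (expensive) reload.
  List<String> reloadTimeline() {
    reloadCount++;
    return Collections.unmodifiableList(new ArrayList<>(completedInstants));
  }
}

final class SingleReload {
  private SingleReload() {
  }

  // Schema resolution takes the snapshot as a parameter instead of reloading it again.
  static Optional<String> lastCompletedAtValidation(List<String> reloadedTimeline) {
    return reloadedTimeline.isEmpty()
        ? Optional.empty()
        : Optional.of(reloadedTimeline.get(reloadedTimeline.size() - 1));
  }

  // Reloads once, then reuses the same snapshot for both conflict checks.
  static Optional<String> resolveConflicts(FakeMetaClient metaClient) {
    List<String> timeline = metaClient.reloadTimeline();
    // ... file-group conflict checks would read `timeline` here ...
    return lastCompletedAtValidation(timeline);
  }
}
```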


