Davis-Zhang-Onehouse commented on code in PR #12646:
URL: https://github.com/apache/hudi/pull/12646#discussion_r1928081544


##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -514,4 +580,56 @@ public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]>
   private Supplier<Exception> schemaNotFoundError() {
     return () -> new HoodieSchemaNotFoundException("No schema found for table at " + metaClient.getBasePath());
   }
+
+  /**
+   * WARNING: This method should be only used as part of HUDI-8438 before the jira owner fully accommodate all existing use
+   * cases.
+   *
+   * Get timeline in REVERSE order that only contains completed instants which POTENTIALLY evolve the table schema.
+   * For types of instants that are included and not reflecting table schema at their instant completion time please refer
+   * comments inside the code.
+   * */
+  HoodieTimeline getSchemaEvolutionTimelineInReverseOrder() {
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    Stream<HoodieInstant> timelineStream = timeline.getInstantsAsStream();
+    final Set<String> actions;
+    switch (metaClient.getTableType()) {
+      case COPY_ON_WRITE: {
+        actions = new HashSet<>(Arrays.asList(COMMIT_ACTION, REPLACE_COMMIT_ACTION));
+        break;
+      }
+      case MERGE_ON_READ: {
+        actions = new HashSet<>(Arrays.asList(DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION));
+        break;
+      }
+      default:
+        throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
+    }
+
+    // We only care committed instant when it comes to table schema.
+    TimelineLayout timelineLayout = metaClient.getTimelineLayout();
+    Comparator<HoodieInstant> reversedComparator = timelineLayout.getInstantComparator().requestedTimeOrderedComparator().reversed();
+
+    // The timeline still contains DELTA_COMMIT_ACTION/COMMIT_ACTION which might not contain a valid schema
+    // field in their commit metadata.
+    // Since the operations of filtering them out are expensive, we should do on-demand stream based
+    // filtering when we actually need the table schema.
+    Stream<HoodieInstant> reversedTimelineWithTableSchema = timelineStream
+        // Only focuses on those who could potentially evolve the table schema.
+        .filter(s -> actions.contains(s.getAction()))
+        // Further filtering out clustering operations as it does not evolve table schema.
+        .filter(s -> isaBoolean(s, timeline))
+        .filter(HoodieInstant::isCompleted)
+        // We reverse the order as the operation against this timeline would be very efficient if
+        // we always start from the tail.
+        .sorted(reversedComparator);
+    return timelineLayout.getTimelineFactory().createDefaultTimeline(
+        reversedTimelineWithTableSchema,
+        metaClient.getActiveTimeline()::getInstantDetails);
+  }
+
+  private boolean isaBoolean(HoodieInstant s, HoodieActiveTimeline timeline) {
+    return !s.getAction().equals(REPLACE_COMMIT_ACTION)
+      || !ClusteringUtils.isClusteringInstant(timeline, s, metaClient.getInstantGenerator());

Review Comment:
   As discussed, that's extra work. I will follow up in a separate ticket 
when I port the concurrent schema evolution detection change later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
