nsivabalan commented on code in PR #12646:
URL: https://github.com/apache/hudi/pull/12646#discussion_r1925890902


##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java:
##########
@@ -91,18 +133,252 @@ public void testRecreateSchemaWhenDropPartitionColumns() {
 
   @Test
   public void testReadSchemaFromLogFile() throws IOException, 
URISyntaxException, InterruptedException {
-    String testDir = initTestDir("read_schema_from_log_file");
-    StoragePath partitionPath = new StoragePath(testDir, "partition1");
+    initPath("read_schema_from_log_file");
+    StoragePath partitionPath = new StoragePath(basePath, "partition1");
     Schema expectedSchema = getSimpleSchema();
     StoragePath logFilePath = writeLogFile(partitionPath, expectedSchema);
     assertEquals(expectedSchema, TableSchemaResolver.readSchemaFromLogFile(new 
HoodieHadoopStorage(
         logFilePath, HoodieTestUtils.getDefaultStorageConfWithDefaults()), 
logFilePath));
   }
 
-  private String initTestDir(String folderName) throws IOException {
-    java.nio.file.Path basePath = tempDir.resolve(folderName);
-    java.nio.file.Files.createDirectories(basePath);
-    return basePath.toString();
+  @Test
+  public void testGetTableSchemaFromLatestCommitMetadataV2() throws Exception {
+    Schema originalSchema = new 
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    String commitTime1 = "001";
+    HoodieInstant instant1 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, 
commitTime1, InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);
+    activeTimeline.createNewInstant(instant1);
+
+    Map<String, String> extraMetadata = new HashMap<>();
+    extraMetadata.put(HoodieCommitMetadata.SCHEMA_KEY, 
originalSchema.toString());
+
+    activeTimeline.saveAsComplete(instant1,
+        Option.of(getCommitMetadata(basePath, "partition1", commitTime1, 2, 
extraMetadata)));
+    metaClient.reloadActiveTimeline();
+
+    TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
+    Option<Schema> schemaOption = 
resolver.getTableAvroSchemaIfPresentV2(false);
+    assertTrue(schemaOption.isPresent());
+    assertEquals(originalSchema, schemaOption.get());
+  }
+
+  @Test
+  public void testGetTableCreateSchemaWithMetadata() throws IOException {
+    Schema originalSchema = new 
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+
+    // Set table create schema in table config
+    // Create commit metadata without schema information, which should be 
ignored.
+    Map<String, String> emptyMetadata = new HashMap<>();
+
+    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+    String commitTime1 = "001";
+    HoodieInstant instant1 = new HoodieInstant(INFLIGHT, COMMIT_ACTION, 
commitTime1, InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);
+    activeTimeline.createNewInstant(instant1);
+    activeTimeline.saveAsComplete(instant1, 
Option.of(getCommitMetadata(basePath, "partition1", commitTime1, 2, 
emptyMetadata)));
+    metaClient.reloadActiveTimeline();
+    metaClient.getTableConfig().setValue(HoodieTableConfig.CREATE_SCHEMA, 
originalSchema.toString());
+
+    TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
+    Option<Schema> schemaOption = 
resolver.getTableAvroSchemaIfPresentV2(false);
+    assertTrue(schemaOption.isPresent());
+    assertEquals(originalSchema, schemaOption.get());
+  }
+
+  @Test
+  public void testHandlePartitionColumnsIfNeeded() {
+    Schema originalSchema = new 
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
+
+    String[] partitionFields = new String[] {"partition_path"};
+    metaClient.getTableConfig().setValue(PARTITION_FIELDS, String.join(",", 
partitionFields));
+    
metaClient.getTableConfig().setValue(HoodieTableConfig.DROP_PARTITION_COLUMNS, 
"true");
+    metaClient.getTableConfig().setValue(HoodieTableConfig.CREATE_SCHEMA, 
originalSchema.toString());
+
+    TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
+    Option<Schema> schemaOption = 
resolver.getTableAvroSchemaIfPresentV2(false);
+    assertTrue(schemaOption.isPresent());
+
+    Schema resultSchema = schemaOption.get();
+    assertTrue(resultSchema.getFields().stream()
+        .anyMatch(f -> f.name().equals("partition_path")));
+  }
+
+  static class SchemaEvolutionTestCase {
+    private final String name;
+    private final HoodieTableType tableType;
+    private final List<HoodieInstant> inputInstants;
+    private final List<HoodieInstant> expectedInstants;
+    private final Option<HoodieInstant> clusteringInstants;
+
+    public SchemaEvolutionTestCase(
+        String name,
+        HoodieTableType tableType,
+        List<HoodieInstant> inputInstants,
+        List<HoodieInstant> expectedInstants) {
+      this.name = name;
+      this.tableType = tableType;
+      this.inputInstants = inputInstants;
+      this.expectedInstants = expectedInstants;
+      this.clusteringInstants = Option.empty();
+    }
+
+    public SchemaEvolutionTestCase(
+        String name,
+        HoodieTableType tableType,
+        List<HoodieInstant> inputInstants,
+        List<HoodieInstant> expectedInstants,
+        HoodieInstant clusteringInstants
+    ) {
+      this.name = name;
+      this.tableType = tableType;
+      this.inputInstants = inputInstants;
+      this.expectedInstants = expectedInstants;
+      this.clusteringInstants = Option.of(clusteringInstants);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%s (%s)", name, tableType);
+    }
+  }
+
+  static Stream<SchemaEvolutionTestCase> schemaEvolutionTestCases() {
+    return Stream.of(
+        // Empty timeline case
+        new SchemaEvolutionTestCase(
+            "Empty Timeline",
+            HoodieTableType.COPY_ON_WRITE,
+            new ArrayList<>(),
+            new ArrayList<>()
+        ),
+
+        // Empty timeline case
+        new SchemaEvolutionTestCase(
+            "Empty Timeline",
+            HoodieTableType.MERGE_ON_READ,
+            new ArrayList<>(),
+            new ArrayList<>()
+        ),
+
+        // Mixed actions case
+        new SchemaEvolutionTestCase(
+            "Mixed Actions",
+            HoodieTableType.COPY_ON_WRITE,
+            Arrays.asList(
+                new HoodieInstant(COMPLETED, COMMIT_ACTION, "001", 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+                new HoodieInstant(COMPLETED, REPLACE_COMMIT_ACTION, "002", 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+                new HoodieInstant(COMPLETED, COMMIT_ACTION, "003", 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+                // Table service actions that should be filtered out
+                new HoodieInstant(REQUESTED, CLEAN_ACTION, "004", 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+                new HoodieInstant(REQUESTED, ROLLBACK_ACTION, "005", 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+                new HoodieInstant(REQUESTED, CLUSTERING_ACTION, "006", 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+                new HoodieInstant(INFLIGHT, CLEAN_ACTION, "004", 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+                new HoodieInstant(INFLIGHT, ROLLBACK_ACTION, "005", 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+                new HoodieInstant(INFLIGHT, CLUSTERING_ACTION, "006", 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+                new HoodieInstant(COMPLETED, CLEAN_ACTION, "004", 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+                new HoodieInstant(COMPLETED, ROLLBACK_ACTION, "005", 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+                new HoodieInstant(COMPLETED, CLUSTERING_ACTION, "006", 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+                new HoodieInstant(COMPLETED, REPLACE_COMMIT_ACTION, "010", 
InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR) // clustering commits

Review Comment:
   do we have replacecommits which are not clustering instants? if not, can we 
add them please. and ensure schema evolves w/ it and getTableSchema returns the 
right one. 
   
   
   and can we also test the case where latest instant is a clustering instant 
and it has a diff schema. but getTableSchema returns the right one and matches 
the prev right instant's schema. 
   
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -222,6 +244,50 @@ private Option<Schema> 
getTableSchemaFromLatestCommitMetadata(boolean includeMet
     }
   }
 
+  public Option<Schema> getTableAvroSchemaIfPresentV2(boolean 
includeMetadataFields) {

Review Comment:
   whats the necessity to add new API. can you help me understand, why can't we 
fix the existing impl only? 



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -222,6 +244,50 @@ private Option<Schema> 
getTableSchemaFromLatestCommitMetadata(boolean includeMet
     }
   }
 
+  public Option<Schema> getTableAvroSchemaIfPresentV2(boolean 
includeMetadataFields) {

Review Comment:
   also, we don't need "IfPresent" clause. anyways we return Option<Schema> and 
so we can just name this method 
   ```
   getTableAvroSchema
   ```



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -514,4 +580,56 @@ public static Schema appendPartitionColumns(Schema 
dataSchema, Option<String[]>
   private Supplier<Exception> schemaNotFoundError() {
     return () -> new HoodieSchemaNotFoundException("No schema found for table 
at " + metaClient.getBasePath());
   }
+
+  /**
+   * WARNING: This method should be only used as part of HUDI-8438 before the 
jira owner fully accommodate all existing use
+   * cases.
+   *
+   * Get timeline in REVERSE order that only contains completed instants which 
POTENTIALLY evolve the table schema.
+   * For types of instants that are included and not reflecting table schema 
at their instant completion time please refer
+   * comments inside the code.
+   * */
+  HoodieTimeline getSchemaEvolutionTimelineInReverseOrder() {
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    Stream<HoodieInstant> timelineStream = timeline.getInstantsAsStream();
+    final Set<String> actions;
+    switch (metaClient.getTableType()) {
+      case COPY_ON_WRITE: {
+        actions = new HashSet<>(Arrays.asList(COMMIT_ACTION, 
REPLACE_COMMIT_ACTION));
+        break;
+      }
+      case MERGE_ON_READ: {
+        actions = new HashSet<>(Arrays.asList(DELTA_COMMIT_ACTION, 
REPLACE_COMMIT_ACTION));
+        break;
+      }
+      default:
+        throw new HoodieException("Unsupported table type :" + 
metaClient.getTableType());
+    }
+
+    // We only care committed instant when it comes to table schema.
+    TimelineLayout timelineLayout = metaClient.getTimelineLayout();
+    Comparator<HoodieInstant> reversedComparator = 
timelineLayout.getInstantComparator().requestedTimeOrderedComparator().reversed();
+
+    // The timeline still contains DELTA_COMMIT_ACTION/COMMIT_ACTION which 
might not contain a valid schema
+    // field in their commit metadata.
+    // Since the operations of filtering them out are expensive, we should do 
on-demand stream based
+    // filtering when we actually need the table schema.
+    Stream<HoodieInstant> reversedTimelineWithTableSchema = timelineStream
+        // Only focuses on those who could potentially evolve the table schema.
+        .filter(s -> actions.contains(s.getAction()))
+        // Further filtering out clustering operations as it does not evolve 
table schema.
+        .filter(s -> isaBoolean(s, timeline))
+        .filter(HoodieInstant::isCompleted)
+        // We reverse the order as the operation against this timeline would 
be very efficient if
+        // we always start from the tail.
+        .sorted(reversedComparator);
+    return timelineLayout.getTimelineFactory().createDefaultTimeline(
+        reversedTimelineWithTableSchema,
+        metaClient.getActiveTimeline()::getInstantDetails);
+  }
+
+  private boolean isaBoolean(HoodieInstant s, HoodieActiveTimeline timeline) {

Review Comment:
   isClusteringInstant is a good naming.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -514,4 +580,56 @@ public static Schema appendPartitionColumns(Schema 
dataSchema, Option<String[]>
   private Supplier<Exception> schemaNotFoundError() {
     return () -> new HoodieSchemaNotFoundException("No schema found for table 
at " + metaClient.getBasePath());
   }
+
+  /**
+   * WARNING: This method should be only used as part of HUDI-8438 before the 
jira owner fully accommodate all existing use
+   * cases.
+   *
+   * Get timeline in REVERSE order that only contains completed instants which 
POTENTIALLY evolve the table schema.
+   * For types of instants that are included and not reflecting table schema 
at their instant completion time please refer
+   * comments inside the code.
+   * */
+  HoodieTimeline getSchemaEvolutionTimelineInReverseOrder() {
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    Stream<HoodieInstant> timelineStream = timeline.getInstantsAsStream();
+    final Set<String> actions;
+    switch (metaClient.getTableType()) {
+      case COPY_ON_WRITE: {
+        actions = new HashSet<>(Arrays.asList(COMMIT_ACTION, 
REPLACE_COMMIT_ACTION));
+        break;
+      }
+      case MERGE_ON_READ: {
+        actions = new HashSet<>(Arrays.asList(DELTA_COMMIT_ACTION, 
REPLACE_COMMIT_ACTION));
+        break;
+      }
+      default:
+        throw new HoodieException("Unsupported table type :" + 
metaClient.getTableType());
+    }
+
+    // We only care committed instant when it comes to table schema.
+    TimelineLayout timelineLayout = metaClient.getTimelineLayout();
+    Comparator<HoodieInstant> reversedComparator = 
timelineLayout.getInstantComparator().requestedTimeOrderedComparator().reversed();
+
+    // The timeline still contains DELTA_COMMIT_ACTION/COMMIT_ACTION which 
might not contain a valid schema
+    // field in their commit metadata.
+    // Since the operations of filtering them out are expensive, we should do 
on-demand stream based
+    // filtering when we actually need the table schema.
+    Stream<HoodieInstant> reversedTimelineWithTableSchema = timelineStream
+        // Only focuses on those who could potentially evolve the table schema.
+        .filter(s -> actions.contains(s.getAction()))
+        // Further filtering out clustering operations as it does not evolve 
table schema.
+        .filter(s -> isaBoolean(s, timeline))
+        .filter(HoodieInstant::isCompleted)
+        // We reverse the order as the operation against this timeline would 
be very efficient if
+        // we always start from the tail.
+        .sorted(reversedComparator);
+    return timelineLayout.getTimelineFactory().createDefaultTimeline(
+        reversedTimelineWithTableSchema,
+        metaClient.getActiveTimeline()::getInstantDetails);
+  }
+
+  private boolean isaBoolean(HoodieInstant s, HoodieActiveTimeline timeline) {
+    return !s.getAction().equals(REPLACE_COMMIT_ACTION)
+      || !ClusteringUtils.isClusteringInstant(timeline, s, 
metaClient.getInstantGenerator());

Review Comment:
   in case of 1.0:
   we can identify clustering instant just by looking up the corresponding 
requested file. action will be "CLUSTERING". 
   in case of 0.x: 
   we have to deser to find if there is a plan. 
   
   we have two diff Timeline Implementations. So, lets ensure our deduction 
works for both 0.x i.e tbl version6 and 1.x i.e. tbl version 8. 
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -514,4 +580,56 @@ public static Schema appendPartitionColumns(Schema 
dataSchema, Option<String[]>
   private Supplier<Exception> schemaNotFoundError() {
     return () -> new HoodieSchemaNotFoundException("No schema found for table 
at " + metaClient.getBasePath());
   }
+
+  /**
+   * WARNING: This method should be only used as part of HUDI-8438 before the 
jira owner fully accommodate all existing use
+   * cases.
+   *
+   * Get timeline in REVERSE order that only contains completed instants which 
POTENTIALLY evolve the table schema.
+   * For types of instants that are included and not reflecting table schema 
at their instant completion time please refer
+   * comments inside the code.
+   * */
+  HoodieTimeline getSchemaEvolutionTimelineInReverseOrder() {
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    Stream<HoodieInstant> timelineStream = timeline.getInstantsAsStream();
+    final Set<String> actions;
+    switch (metaClient.getTableType()) {
+      case COPY_ON_WRITE: {
+        actions = new HashSet<>(Arrays.asList(COMMIT_ACTION, 
REPLACE_COMMIT_ACTION));
+        break;
+      }
+      case MERGE_ON_READ: {
+        actions = new HashSet<>(Arrays.asList(DELTA_COMMIT_ACTION, 
REPLACE_COMMIT_ACTION));
+        break;
+      }
+      default:
+        throw new HoodieException("Unsupported table type :" + 
metaClient.getTableType());
+    }
+
+    // We only care committed instant when it comes to table schema.
+    TimelineLayout timelineLayout = metaClient.getTimelineLayout();
+    Comparator<HoodieInstant> reversedComparator = 
timelineLayout.getInstantComparator().requestedTimeOrderedComparator().reversed();
+
+    // The timeline still contains DELTA_COMMIT_ACTION/COMMIT_ACTION which 
might not contain a valid schema
+    // field in their commit metadata.
+    // Since the operations of filtering them out are expensive, we should do 
on-demand stream based
+    // filtering when we actually need the table schema.
+    Stream<HoodieInstant> reversedTimelineWithTableSchema = timelineStream
+        // Only focuses on those who could potentially evolve the table schema.
+        .filter(s -> actions.contains(s.getAction()))
+        // Further filtering out clustering operations as it does not evolve 
table schema.
+        .filter(s -> isaBoolean(s, timeline))
+        .filter(HoodieInstant::isCompleted)
+        // We reverse the order as the operation against this timeline would 
be very efficient if
+        // we always start from the tail.
+        .sorted(reversedComparator);
+    return timelineLayout.getTimelineFactory().createDefaultTimeline(
+        reversedTimelineWithTableSchema,
+        metaClient.getActiveTimeline()::getInstantDetails);
+  }
+
+  private boolean isaBoolean(HoodieInstant s, HoodieActiveTimeline timeline) {

Review Comment:
   also, lets avoid naming w/ single letter. 
   
   ``` 
   instant or hoodieInstant for HoodieInstant
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to