This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bb5abb6b0483 fix: Optimizing internal schema lookup in TableSchemaResolver (#18387)
bb5abb6b0483 is described below

commit bb5abb6b0483c043f38f7f32b73485f025f95b1d
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Mar 27 07:55:22 2026 -0700

    fix: Optimizing internal schema lookup in TableSchemaResolver (#18387)
---
 .../hudi/common/table/TableSchemaResolver.java     |   9 +-
 .../hudi/common/table/TestTableSchemaResolver.java | 125 +++++++++++++++++++++
 2 files changed, 131 insertions(+), 3 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 29a20e735c13..095a4ffef717 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -310,11 +310,14 @@ public class TableSchemaResolver {
    */
   public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata() {
     HoodieTimeline completedInstants = metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
-    HoodieTimeline timeline = completedInstants
+    // Walk backwards through timeline to find the first (most recent) instant that can update schema
+    // This avoids reading commit metadata for all instants
+    return Option.fromJavaOptional(completedInstants.getReverseOrderedInstants()
         .filter(instant -> { // consider only instants that can update/change schema.
           return WriteOperationType.canUpdateSchema(getCachedCommitMetadata(instant).getOperationType());
-        });
-    return timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata);
+        })
+        .findFirst())
+        .flatMap(this::getTableInternalSchemaFromCommitMetadata);
   }
 
   /**
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 4a26f8715e96..193bb0173dfd 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -206,4 +206,129 @@ class TestTableSchemaResolver {
     writer.close();
     return writer.getLogFile().getPath();
   }
+
+  @Test
+  void testGetTableInternalSchemaFromCommitMetadataFindsLatestSchemaUpdateInstant() throws IOException {
+    // Given: A timeline with multiple instants where some can update schema and some cannot
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class, RETURNS_DEEP_STUBS);
+    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+
+    HoodieInstant clusterInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, "001", InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+    HoodieInstant insertInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "002", InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+    HoodieInstant compactInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "003", InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+
+    HoodieCommitMetadata clusterMetadata = new HoodieCommitMetadata();
+    clusterMetadata.setOperationType(org.apache.hudi.common.model.WriteOperationType.CLUSTER);
+
+    HoodieCommitMetadata insertMetadata = new HoodieCommitMetadata();
+    insertMetadata.setOperationType(org.apache.hudi.common.model.WriteOperationType.INSERT);
+    // Create a valid InternalSchema
+    org.apache.hudi.internal.schema.InternalSchema internalSchema = new org.apache.hudi.internal.schema.InternalSchema(
+        org.apache.hudi.internal.schema.Types.RecordType.get(
+            org.apache.hudi.internal.schema.Types.Field.get(0, false, "id", org.apache.hudi.internal.schema.Types.IntType.get())));
+    insertMetadata.addMetadata(org.apache.hudi.internal.schema.utils.SerDeHelper.LATEST_SCHEMA,
+        org.apache.hudi.internal.schema.utils.SerDeHelper.toJson(internalSchema));
+
+    HoodieCommitMetadata compactMetadata = new HoodieCommitMetadata();
+    compactMetadata.setOperationType(org.apache.hudi.common.model.WriteOperationType.COMPACT);
+
+    HoodieTimeline timeline = mock(HoodieTimeline.class);
+    when(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()).thenReturn(timeline);
+    // Timeline in reverse order: 003 (compact), 002 (insert), 001 (cluster)
+    when(timeline.getReverseOrderedInstants()).thenReturn(Stream.of(compactInstant, insertInstant, clusterInstant));
+    when(timeline.readCommitMetadata(compactInstant)).thenReturn(compactMetadata);
+    when(timeline.readCommitMetadata(insertInstant)).thenReturn(insertMetadata);
+    when(timeline.readCommitMetadata(clusterInstant)).thenReturn(clusterMetadata);
+
+    // When: Get internal schema from commit metadata
+    Option<org.apache.hudi.internal.schema.InternalSchema> result = schemaResolver.getTableInternalSchemaFromCommitMetadata();
+
+    // Then: Should find the insert instant (002) which is the most recent schema-updating operation
+    assertTrue(result.isPresent());
+  }
+
+  @Test
+  void testGetTableInternalSchemaFromCommitMetadataSkipsNonSchemaUpdatingOperations() throws IOException {
+    // Given: A timeline with only non-schema-updating operations (CLUSTER, COMPACT, INDEX, LOG_COMPACT)
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class, RETURNS_DEEP_STUBS);
+    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+
+    HoodieInstant clusterInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, "001", InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+    HoodieInstant compactInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "002", InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+
+    HoodieCommitMetadata clusterMetadata = new HoodieCommitMetadata();
+    clusterMetadata.setOperationType(org.apache.hudi.common.model.WriteOperationType.CLUSTER);
+
+    HoodieCommitMetadata compactMetadata = new HoodieCommitMetadata();
+    compactMetadata.setOperationType(org.apache.hudi.common.model.WriteOperationType.COMPACT);
+
+    HoodieTimeline timeline = mock(HoodieTimeline.class);
+    when(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()).thenReturn(timeline);
+    when(timeline.getReverseOrderedInstants()).thenReturn(Stream.of(compactInstant, clusterInstant));
+    when(timeline.readCommitMetadata(compactInstant)).thenReturn(compactMetadata);
+    when(timeline.readCommitMetadata(clusterInstant)).thenReturn(clusterMetadata);
+
+    // When: Get internal schema from commit metadata
+    Option<org.apache.hudi.internal.schema.InternalSchema> result = schemaResolver.getTableInternalSchemaFromCommitMetadata();
+
+    // Then: Should return empty since no schema-updating operations exist
+    assertTrue(result.isEmpty());
+  }
+
+  @Test
+  void testGetTableInternalSchemaFromCommitMetadataHandlesEmptyTimeline() {
+    // Given: An empty timeline
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class, RETURNS_DEEP_STUBS);
+    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+
+    HoodieTimeline timeline = mock(HoodieTimeline.class);
+    when(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()).thenReturn(timeline);
+    when(timeline.getReverseOrderedInstants()).thenReturn(Stream.empty());
+
+    // When: Get internal schema from commit metadata
+    Option<org.apache.hudi.internal.schema.InternalSchema> result = schemaResolver.getTableInternalSchemaFromCommitMetadata();
+
+    // Then: Should return empty for empty timeline
+    assertTrue(result.isEmpty());
+  }
+
+  @Test
+  void testGetTableInternalSchemaFromCommitMetadataStopsAtFirstMatch() throws IOException {
+    // Given: A timeline with multiple schema-updating operations
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class, RETURNS_DEEP_STUBS);
+    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+
+    HoodieInstant insertInstant1 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "003", InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+    HoodieInstant insertInstant2 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "002", InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+    // This instant should never be read due to short-circuit behavior
+    HoodieInstant insertInstant3 = mock(HoodieInstant.class);
+
+    HoodieCommitMetadata insertMetadata1 = new HoodieCommitMetadata();
+    insertMetadata1.setOperationType(org.apache.hudi.common.model.WriteOperationType.INSERT);
+    // Create a valid InternalSchema
+    org.apache.hudi.internal.schema.InternalSchema internalSchema = new org.apache.hudi.internal.schema.InternalSchema(
+        org.apache.hudi.internal.schema.Types.RecordType.get(
+            org.apache.hudi.internal.schema.Types.Field.get(0, false, "id", org.apache.hudi.internal.schema.Types.IntType.get())));
+    insertMetadata1.addMetadata(org.apache.hudi.internal.schema.utils.SerDeHelper.LATEST_SCHEMA,
+        org.apache.hudi.internal.schema.utils.SerDeHelper.toJson(internalSchema));
+
+    HoodieCommitMetadata insertMetadata2 = new HoodieCommitMetadata();
+    insertMetadata2.setOperationType(org.apache.hudi.common.model.WriteOperationType.INSERT);
+
+    HoodieTimeline timeline = mock(HoodieTimeline.class);
+    when(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()).thenReturn(timeline);
+    // Timeline in reverse order: 003, 002, 001
+    when(timeline.getReverseOrderedInstants()).thenReturn(Stream.of(insertInstant1, insertInstant2, insertInstant3));
+    when(timeline.readCommitMetadata(insertInstant1)).thenReturn(insertMetadata1);
+    when(timeline.readCommitMetadata(insertInstant2)).thenReturn(insertMetadata2);
+    // Should not call readCommitMetadata for insertInstant3 due to findFirst() short-circuit
+
+    // When: Get internal schema from commit metadata
+    Option<org.apache.hudi.internal.schema.InternalSchema> result = schemaResolver.getTableInternalSchemaFromCommitMetadata();
+
+    // Then: Should find the first (most recent) schema-updating operation and stop
+    assertTrue(result.isPresent());
+    // Verify that insertInstant3 was never interacted with (proving short-circuit behavior)
+    verifyNoInteractions(insertInstant3);
+  }
 }

Reply via email to