This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new eb8e829b876d [HUDI-9612] Update schema resolver to inspect files with 
inserts/updates (#13586)
eb8e829b876d is described below

commit eb8e829b876d1e670d5651bd6f258ddced6b6df1
Author: Tim Brown <[email protected]>
AuthorDate: Wed Jul 23 22:48:25 2025 -0400

    [HUDI-9612] Update schema resolver to inspect files with inserts/updates 
(#13586)
    
    * Update schema resolver to inspect files with inserts/updates
    * Move inspection internal to TableSchemaResolver; optimistically check
      whether previously loaded commit metadata has an insert/update to avoid
      an extra search
---
 .../hudi/common/table/TableSchemaResolver.java     | 122 +++++++++++++--------
 .../hudi/common/table/TestTableSchemaResolver.java |  83 ++++++++++++++
 .../hudi/hive/testutils/HiveTestCluster.java       |   1 +
 .../apache/hudi/hive/testutils/HiveTestUtil.java   |   3 +
 4 files changed, 161 insertions(+), 48 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index c9458c948d82..a1385572091d 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -58,10 +58,12 @@ import javax.annotation.concurrent.ThreadSafe;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
+import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
+import java.util.stream.Stream;
 
 import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
 import static org.apache.hudi.avro.AvroSchemaUtils.containsFieldInSchema;
@@ -99,7 +101,7 @@ public class TableSchemaResolver {
   private final Lazy<ConcurrentHashMap<HoodieInstant, HoodieCommitMetadata>> 
commitMetadataCache;
 
   private volatile HoodieInstant latestCommitWithValidSchema = null;
-  private volatile HoodieInstant latestCommitWithValidData = null;
+  private volatile HoodieInstant latestCommitWithInsertOrUpdate = null;
 
   public TableSchemaResolver(HoodieTableMetaClient metaClient) {
     this.metaClient = metaClient;
@@ -247,29 +249,28 @@ public class TableSchemaResolver {
    * Fetches the schema for a table from any of the table's data files
    */
   private Option<Schema> getTableParquetSchemaFromDataFile() {
-    Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata 
= getLatestCommitMetadataWithValidData();
-    try {
-      switch (metaClient.getTableType()) {
-        case COPY_ON_WRITE:
-        case MERGE_ON_READ:
-          // For COW table, data could be written in either Parquet or Orc 
format currently;
-          // For MOR table, data could be written in either Parquet, Orc, 
Hfile or Delta-log format currently;
-          //
-          // Determine the file format based on the file name, and then 
extract schema from it.
-          if (instantAndCommitMetadata.isPresent()) {
-            HoodieCommitMetadata commitMetadata = 
instantAndCommitMetadata.get().getRight();
-            Iterator<String> filePaths = 
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator();
-            return Option.of(fetchSchemaFromFiles(filePaths));
-          } else {
-            LOG.warn("Could not find any data file written for commit, so 
could not get schema for table {}", metaClient.getBasePath());
-            return Option.empty();
-          }
-        default:
-          LOG.error("Unknown table type {}", metaClient.getTableType());
-          throw new InvalidTableException(metaClient.getBasePath().toString());
-      }
-    } catch (IOException e) {
-      throw new HoodieException("Failed to read data schema", e);
+    Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata 
= getLatestCommitMetadataWithInsertOrUpdate();
+    switch (metaClient.getTableType()) {
+      case COPY_ON_WRITE:
+      case MERGE_ON_READ:
+        // For COW table, data could be written in either Parquet or Orc 
format currently;
+        // For MOR table, data could be written in either Parquet, Orc, Hfile 
or Delta-log format currently;
+        //
+        // Determine the file format based on the file name, and then extract 
schema from it.
+        if (instantAndCommitMetadata.isPresent()) {
+          HoodieCommitMetadata commitMetadata = 
instantAndCommitMetadata.get().getRight();
+          // inspect non-empty files for schema
+          Stream<StoragePath> filePaths = 
commitMetadata.getPartitionToWriteStats().values().stream().flatMap(Collection::stream)
+              .filter(writeStat -> writeStat.getNumInserts() > 0 || 
writeStat.getNumUpdateWrites() > 0)
+              .map(writeStat -> new StoragePath(metaClient.getBasePath(), 
writeStat.getPath()));
+          return Option.of(fetchSchemaFromFiles(filePaths));
+        } else {
+          LOG.warn("Could not find any data file written for commit, so could 
not get schema for table {}", metaClient.getBasePath());
+          return Option.empty();
+        }
+      default:
+        LOG.error("Unknown table type {}", metaClient.getTableType());
+        throw new InvalidTableException(metaClient.getBasePath().toString());
     }
   }
 
@@ -427,26 +428,50 @@ public class TableSchemaResolver {
         .map(instant -> Pair.of(instant, 
commitMetadataCache.get().get(instant)));
   }
 
-  private Option<Pair<HoodieInstant, HoodieCommitMetadata>> 
getLatestCommitMetadataWithValidData() {
-    if (latestCommitWithValidData == null) {
-      Option<Pair<HoodieInstant, HoodieCommitMetadata>> 
instantAndCommitMetadata =
-          metaClient.getActiveTimeline().getLastCommitMetadataWithValidData();
-      if (instantAndCommitMetadata.isPresent()) {
-        HoodieInstant instant = instantAndCommitMetadata.get().getLeft();
-        HoodieCommitMetadata metadata = 
instantAndCommitMetadata.get().getRight();
+  private Option<Pair<HoodieInstant, HoodieCommitMetadata>> 
getLatestCommitMetadataWithInsertOrUpdate() {
+    if (latestCommitWithValidSchema != null && 
commitMetadataCache.get().containsKey(latestCommitWithValidSchema)) {
+      HoodieCommitMetadata commitMetadata = 
commitMetadataCache.get().get(latestCommitWithValidSchema);
+      if (commitHasInsertOrUpdate(commitMetadata)) {
+        latestCommitWithInsertOrUpdate = latestCommitWithValidSchema;
+      }
+    }
+    if (latestCommitWithInsertOrUpdate == null) {
+      getLatestCommitWithInsertOrUpdate().ifPresent(instantAndCommitMetadata 
-> {
+        HoodieInstant instant = instantAndCommitMetadata.getLeft();
+        HoodieCommitMetadata metadata = instantAndCommitMetadata.getRight();
         synchronized (this) {
-          if (latestCommitWithValidData == null) {
-            latestCommitWithValidData = instant;
+          if (latestCommitWithInsertOrUpdate == null) {
+            latestCommitWithInsertOrUpdate = instant;
           }
           commitMetadataCache.get().putIfAbsent(instant, metadata);
         }
-      }
+      });
     }
 
-    return Option.ofNullable(latestCommitWithValidData)
+    return Option.ofNullable(latestCommitWithInsertOrUpdate)
         .map(instant -> Pair.of(instant, 
commitMetadataCache.get().get(instant)));
   }
 
+  private Option<Pair<HoodieInstant, HoodieCommitMetadata>> 
getLatestCommitWithInsertOrUpdate() {
+    HoodieTimeline commitsTimeline = 
metaClient.getCommitsTimeline().filterCompletedInstants();
+    return Option.fromJavaOptional(commitsTimeline.getReverseOrderedInstants()
+        .map(instant -> {
+          try {
+            HoodieCommitMetadata commitMetadata = 
commitsTimeline.readCommitMetadata(instant);
+            return Pair.of(instant, commitMetadata);
+          } catch (IOException e) {
+            throw new HoodieIOException(String.format("Failed to fetch 
HoodieCommitMetadata for instant (%s)", instant), e);
+          }
+        })
+        .filter(pair -> commitHasInsertOrUpdate(pair.getRight()))
+        .findFirst());
+  }
+
+  private boolean commitHasInsertOrUpdate(HoodieCommitMetadata commitMetadata) 
{
+    return 
commitMetadata.getPartitionToWriteStats().values().stream().flatMap(Collection::stream)
+        .anyMatch(writeStat -> writeStat.getNumInserts() > 0 || 
writeStat.getNumUpdateWrites() > 0);
+  }
+
   private HoodieCommitMetadata getCachedCommitMetadata(HoodieInstant instant) {
     return commitMetadataCache.get()
         .computeIfAbsent(instant, (missingInstant) -> {
@@ -459,19 +484,20 @@ public class TableSchemaResolver {
         });
   }
 
-  private Schema fetchSchemaFromFiles(Iterator<String> filePaths) throws 
IOException {
-    Schema schema = null;
-    while (filePaths.hasNext() && schema == null) {
-      StoragePath filePath = new StoragePath(filePaths.next());
-      if (FSUtils.isLogFile(filePath)) {
-        // this is a log file
-        schema = readSchemaFromLogFile(filePath);
-      } else {
-        schema = HoodieIOFactory.getIOFactory(metaClient.getStorage())
-            
.getFileFormatUtils(filePath).readAvroSchema(metaClient.getStorage(), filePath);
+  private Schema fetchSchemaFromFiles(Stream<StoragePath> filePaths) {
+    return filePaths.map(filePath -> {
+      try {
+        if (FSUtils.isLogFile(filePath)) {
+          // this is a log file
+          return readSchemaFromLogFile(filePath);
+        } else {
+          return HoodieIOFactory.getIOFactory(metaClient.getStorage())
+              
.getFileFormatUtils(filePath).readAvroSchema(metaClient.getStorage(), filePath);
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Failed to read schema from file: " + 
filePath, e);
       }
-    }
-    return schema;
+    }).filter(Objects::nonNull).findFirst().orElse(null);
   }
 
   public static Schema appendPartitionColumns(Schema dataSchema, 
Option<String[]> partitionFields) {
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 62942b5b742b..ddefc8689340 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -19,29 +19,43 @@
 package org.apache.hudi.common.table;
 
 import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.MockedStatic;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;
 import static 
org.apache.hudi.common.testutils.HoodieCommonTestHarness.getDataBlock;
@@ -49,6 +63,13 @@ import static 
org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests {@link TableSchemaResolver}.
@@ -99,6 +120,68 @@ class TestTableSchemaResolver {
         logFilePath, HoodieTestUtils.getDefaultStorageConfWithDefaults()), 
logFilePath));
   }
 
+  @Test
+  void testHasOperationFieldFileInspectionOrdering() throws IOException {
+    StorageConfiguration conf = new HadoopStorageConfiguration(false);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class, 
RETURNS_DEEP_STUBS);
+    when(metaClient.getStorageConf()).thenReturn(conf);
+
+    when(metaClient.getTableType()).thenReturn(HoodieTableType.MERGE_ON_READ);
+    when(metaClient.getBasePath()).thenReturn(new 
StoragePath("/tmp/hudi_table"));
+    TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+    // create 3 base files and 2 log files
+    HoodieWriteStat baseFileWriteStat = 
buildWriteStat("partition1/baseFile1.parquet", 10, 100);
+    HoodieWriteStat logFileWriteStat = buildWriteStat("partition1/" + 
FSUtils.makeLogFileName("001", ".log", "100", 2, "1-0-1"), 0, 10);
+    // we don't expect any interactions with this write stat since the code 
should exit as soon as a valid schema is found
+    HoodieWriteStat baseFileWriteStat2 = mock(HoodieWriteStat.class);
+    // files that only have deletes will be skipped
+    HoodieWriteStat logFileWithOnlyDeletes = buildWriteStat("partition1/" + 
FSUtils.makeLogFileName("002", ".log", "100", 2, "1-0-1"), 0, 0);
+    HoodieWriteStat baseFileWithOnlyDeletes = 
buildWriteStat("partition2/baseFile2.parquet", 0, 0);
+
+    commitMetadata.addWriteStat("partition1", baseFileWriteStat);
+    commitMetadata.addWriteStat("partition1", logFileWithOnlyDeletes);
+    commitMetadata.addWriteStat("partition1", logFileWriteStat);
+    commitMetadata.addWriteStat("partition1", baseFileWriteStat2);
+    commitMetadata.addWriteStat("partition2", baseFileWithOnlyDeletes);
+    HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, "001", 
InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+    HoodieTimeline timeline = mock(HoodieTimeline.class);
+    
when(metaClient.getCommitsTimeline().filterCompletedInstants()).thenReturn(timeline);
+    when(timeline.getReverseOrderedInstants()).thenReturn(Stream.of(instant));
+    when(timeline.readCommitMetadata(instant)).thenReturn(commitMetadata);
+
+    // mock calls to read schema
+    try (MockedStatic<HoodieIOFactory> ioFactoryMockedStatic = 
mockStatic(HoodieIOFactory.class);
+         MockedStatic<TableSchemaResolver> tableSchemaResolverMockedStatic = 
mockStatic(TableSchemaResolver.class)) {
+      // schema that the mocked log file read returns; it includes the _hoodie_operation field
+      Schema schema = Schema.createRecord("test_schema", null, 
"test_namespace", false);
+      schema.setFields(Arrays.asList(new Schema.Field("int_field", 
Schema.create(Schema.Type.INT)), new Schema.Field("_hoodie_operation", 
Schema.create(Schema.Type.STRING))));
+
+      // mock parquet file schema reading to return null for the first base 
file to force iteration
+      HoodieIOFactory ioFactory = mock(HoodieIOFactory.class);
+      FileFormatUtils fileFormatUtils = mock(FileFormatUtils.class);
+      StoragePath parquetPath = new 
StoragePath("/tmp/hudi_table/partition1/baseFile1.parquet");
+      
when(ioFactory.getFileFormatUtils(parquetPath)).thenReturn(fileFormatUtils);
+      when(fileFormatUtils.readAvroSchema(any(), 
eq(parquetPath))).thenReturn(null);
+      ioFactoryMockedStatic.when(() -> 
HoodieIOFactory.getIOFactory(any())).thenReturn(ioFactory);
+      // mock log file schema reading to return the expected schema
+      tableSchemaResolverMockedStatic.when(() -> 
TableSchemaResolver.readSchemaFromLogFile(any(), eq(new 
StoragePath("/tmp/hudi_table/" + logFileWriteStat.getPath()))))
+          .thenReturn(schema);
+
+      assertTrue(schemaResolver.hasOperationField());
+    }
+    verifyNoInteractions(baseFileWriteStat2);
+  }
+
+  private static HoodieWriteStat buildWriteStat(String path, int numInserts, 
int numUpdateWrites) {
+    HoodieWriteStat logFileWriteStat = new HoodieWriteStat();
+    logFileWriteStat.setPath(path);
+    logFileWriteStat.setNumInserts(numInserts);
+    logFileWriteStat.setNumUpdateWrites(numUpdateWrites);
+    logFileWriteStat.setNumDeletes(1);
+    return logFileWriteStat;
+  }
+
   private String initTestDir(String folderName) throws IOException {
     java.nio.file.Path basePath = tempDir.resolve(folderName);
     java.nio.file.Files.createDirectories(basePath);
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestCluster.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestCluster.java
index bbfce1faa82e..840865ef31a9 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestCluster.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestCluster.java
@@ -209,6 +209,7 @@ public class HiveTestCluster implements BeforeAllCallback, 
AfterAllCallback, Bef
       HoodieWriteStat writeStat = new HoodieWriteStat();
       writeStat.setFileId(fileId);
       writeStat.setPath(filePath.toString());
+      writeStat.setNumInserts(10);
       writeStats.add(writeStat);
     }
     return writeStats;
diff --git 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index d063804b4895..76d0a0eabd7d 100644
--- 
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++ 
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -521,6 +521,7 @@ public class HiveTestUtil {
         HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat();
         writeStat.setFileId(dataFile.getFileId());
         writeStat.setPath(logFile.getPath().toString());
+        writeStat.setNumUpdateWrites(10);
         commitMetadata.addWriteStat(partitionPath, writeStat);
       }
     }
@@ -541,6 +542,7 @@ public class HiveTestUtil {
         HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat();
         writeStat.setFileId(dataFile.getFileId());
         writeStat.setPath(logFile.getPath().toString());
+        writeStat.setNumUpdateWrites(10);
         commitMetadata.addWriteStat(partitionPath, writeStat);
       }
     }
@@ -611,6 +613,7 @@ public class HiveTestUtil {
       HoodieWriteStat writeStat = new HoodieWriteStat();
       writeStat.setFileId(fileId);
       writeStat.setPath(filePath.toString());
+      writeStat.setNumInserts(10);
       writeStats.add(writeStat);
     }
     return writeStats;

Reply via email to