This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7da43d618b55 [HUDI-9614] should only use metaclient to read index def 
file from spark driver (#13728)
7da43d618b55 is described below

commit 7da43d618b5586593843f5ba870243b5f21c8ae4
Author: Vamshi Krishna Kyatham 
<[email protected]>
AuthorDate: Fri Aug 22 06:22:43 2025 -0700

    [HUDI-9614] should only use metaclient to read index def file from spark 
driver (#13728)
---
 .../hudi/common/table/HoodieTableMetaClient.java   | 29 ++++---
 .../common/table/TestHoodieTableMetaClient.java    | 94 ++++++++++++++++++++++
 .../apache/hudi/io/TestMetadataWriterCommit.java   | 29 ++++++-
 3 files changed, 134 insertions(+), 18 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 1a1d072874e2..0d1d8a4fec7c 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -170,7 +170,7 @@ public class HoodieTableMetaClient implements Serializable {
   private FileSystemRetryConfig fileSystemRetryConfig = 
FileSystemRetryConfig.newBuilder().build();
   protected HoodieMetaserverConfig metaserverConfig;
   private HoodieTimeGeneratorConfig timeGeneratorConfig;
-  private Option<HoodieIndexMetadata> indexMetadataOpt = Option.empty();
+  private Option<HoodieIndexMetadata> indexMetadataOpt;
   private HoodieTableFormat tableFormat;
 
   /**
@@ -189,7 +189,7 @@ public class HoodieTableMetaClient implements Serializable {
     this.basePath = new StoragePath(basePath);
     this.metaPath = new StoragePath(basePath, METAFOLDER_NAME);
     this.tableConfig = new HoodieTableConfig(this.storage, metaPath);
-    this.indexMetadataOpt = getIndexMetadata();
+    this.indexMetadataOpt = readIndexDefFromStorage(this.storage, 
this.basePath, this.tableConfig);
     this.tableType = tableConfig.getTableType();
     Option<TimelineLayoutVersion> tableConfigVersion = 
tableConfig.getTimelineLayoutVersion();
     if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) {
@@ -272,6 +272,9 @@ public class HoodieTableMetaClient implements Serializable {
     checkState(indexMetadataOpt.isPresent(), "Index metadata is not present");
     indexMetadataOpt.get().getIndexDefinitions().remove(indexName);
     writeIndexMetadataToStorage();
+    if (indexMetadataOpt.get().getIndexDefinitions().isEmpty()) {
+      indexMetadataOpt = Option.empty();
+    }
   }
 
   /**
@@ -320,20 +323,16 @@ public class HoodieTableMetaClient implements 
Serializable {
    * Returns Option of {@link HoodieIndexMetadata} from index definition file 
if present, else returns empty Option.
    */
   public Option<HoodieIndexMetadata> getIndexMetadata() {
-    if (indexMetadataOpt.isPresent() && 
!indexMetadataOpt.get().getIndexDefinitions().isEmpty()) {
-      return indexMetadataOpt;
-    }
-    Option<HoodieIndexMetadata> indexDefOption = Option.empty();
-    if (tableConfig.getRelativeIndexDefinitionPath().isPresent() && 
StringUtils.nonEmpty(tableConfig.getRelativeIndexDefinitionPath().get())) {
-      indexDefOption = loadIndexDefFromStorage(basePath, 
tableConfig.getRelativeIndexDefinitionPath().get(), storage);
-    }
-    return indexDefOption;
+    return indexMetadataOpt;
   }
 
-  private static Option<HoodieIndexMetadata> loadIndexDefFromStorage(
-      StoragePath basePath, String relativeIndexDefinitionPath, HoodieStorage 
storage) {
-    StoragePath indexDefinitionPath =
-        new StoragePath(basePath, relativeIndexDefinitionPath);
+  private static Option<HoodieIndexMetadata> readIndexDefFromStorage(
+      HoodieStorage storage, StoragePath basePath, HoodieTableConfig 
tableConfig) {
+    Option<String> indexDefinitionPathOpt = 
tableConfig.getRelativeIndexDefinitionPath();
+    if (indexDefinitionPathOpt.isEmpty() || 
StringUtils.isNullOrEmpty(indexDefinitionPathOpt.get())) {
+      return Option.empty();
+    }
+    StoragePath indexDefinitionPath = new StoragePath(basePath, 
indexDefinitionPathOpt.get());
     try {
       Option<byte[]> bytesOpt = FileIOUtils.readDataFromPath(storage, 
indexDefinitionPath, true);
       if (bytesOpt.isPresent()) {
@@ -342,7 +341,7 @@ public class HoodieTableMetaClient implements Serializable {
         return Option.of(new HoodieIndexMetadata());
       }
     } catch (IOException e) {
-      throw new HoodieIOException("Could not load index definition at path: " 
+ relativeIndexDefinitionPath, e);
+      throw new HoodieIOException("Could not load index definition at path: " 
+ indexDefinitionPath, e);
     }
   }
 
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
index fd85ea258178..4f116020db8b 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
@@ -29,7 +29,9 @@ import 
org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.metadata.HoodieIndexVersion;
+import org.apache.hudi.storage.HoodieInstantWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.storage.StoragePath;
 
@@ -39,6 +41,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -283,4 +286,95 @@ class TestHoodieTableMetaClient extends 
HoodieCommonTestHarness {
         new String(FileIOUtils.readDataFromPath(metaClient.getStorage(), new 
StoragePath(metaClient.getIndexDefinitionPath())).get()));
     assertTrue(indexMetadata.getIndexDefinitions().isEmpty());
   }
+
+  @Test
+  void testReadIndexDefFromStorage() throws Exception {
+    final String basePath = tempDir.toAbsolutePath() + Path.SEPARATOR + "t8";
+
+    // No index definition path configured - should return empty
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.newTableBuilder()
+        .setTableType(HoodieTableType.COPY_ON_WRITE.name())
+        .setTableName("table")
+        .initTable(this.metaClient.getStorageConf(), basePath);
+
+    Method readIndexDefMethod = HoodieTableMetaClient.class
+        .getDeclaredMethod("readIndexDefFromStorage",
+            org.apache.hudi.storage.HoodieStorage.class,
+            StoragePath.class,
+            HoodieTableConfig.class);
+    readIndexDefMethod.setAccessible(true);
+
+    @SuppressWarnings("unchecked")
+    Option<HoodieIndexMetadata> result = (Option<HoodieIndexMetadata>) 
readIndexDefMethod.invoke(
+        null, metaClient.getStorage(), metaClient.getBasePath(), 
metaClient.getTableConfig());
+    assertTrue(result.isEmpty(), "Should return empty when no index definition 
path is configured");
+
+    // Empty index definition path - should return empty
+    
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key(),
 "");
+    @SuppressWarnings("unchecked")
+    Option<HoodieIndexMetadata> result2 = (Option<HoodieIndexMetadata>) 
readIndexDefMethod.invoke(
+        null, metaClient.getStorage(), metaClient.getBasePath(), 
metaClient.getTableConfig());
+    assertTrue(result2.isEmpty(), "Should return empty when index definition 
path is empty string");
+
+    // Valid path but file doesn't exist - should return empty 
HoodieIndexMetadata
+    String relativePath = ".hoodie/.index_defs/index.json";
+    
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key(),
 relativePath);
+    @SuppressWarnings("unchecked")
+    Option<HoodieIndexMetadata> result3 = (Option<HoodieIndexMetadata>) 
readIndexDefMethod.invoke(
+        null, metaClient.getStorage(), metaClient.getBasePath(), 
metaClient.getTableConfig());
+    assertTrue(result3.isPresent(), "Should return present Option when path is 
configured but file doesn't exist");
+    assertTrue(result3.get().getIndexDefinitions().isEmpty(), "Should return 
empty HoodieIndexMetadata when file doesn't exist");
+
+    // Valid path with existing file holding an empty JSON object ("{}") - should return empty 
HoodieIndexMetadata
+    StoragePath indexPath = new StoragePath(metaClient.getBasePath(), 
relativePath);
+    FileIOUtils.createFileInPath(metaClient.getStorage(), indexPath,
+        
Option.of(HoodieInstantWriter.convertByteArrayToWriter("{}".getBytes())));
+    @SuppressWarnings("unchecked")
+    Option<HoodieIndexMetadata> result4 = (Option<HoodieIndexMetadata>) 
readIndexDefMethod.invoke(
+        null, metaClient.getStorage(), metaClient.getBasePath(), 
metaClient.getTableConfig());
+    assertTrue(result4.isPresent(), "Should return present Option when file 
exists");
+    assertTrue(result4.get().getIndexDefinitions().isEmpty(), "Should return 
empty HoodieIndexMetadata for empty file");
+
+    // Valid path with valid index metadata - should return populated 
HoodieIndexMetadata
+    Map<String, Map<String, String>> columnsMap = new HashMap<>();
+    columnsMap.put("c1", Collections.emptyMap());
+    String indexName = 
MetadataPartitionType.EXPRESSION_INDEX.getPartitionPath() + "test_idx";
+    HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+        .withIndexName(indexName)
+        .withIndexType("column_stats")
+        .withIndexFunction("identity")
+        
.withVersion(HoodieIndexVersion.getCurrentVersion(HoodieTableVersion.current(), 
indexName))
+        .withSourceFields(new ArrayList<>(columnsMap.keySet()))
+        .withIndexOptions(Collections.emptyMap())
+        .build();
+
+    Map<String, HoodieIndexDefinition> indexDefMap = new HashMap<>();
+    indexDefMap.put(indexName, indexDefinition);
+    HoodieIndexMetadata validIndexMetadata = new 
HoodieIndexMetadata(indexDefMap);
+
+    FileIOUtils.createFileInPath(metaClient.getStorage(), indexPath,
+        
Option.of(HoodieInstantWriter.convertByteArrayToWriter(validIndexMetadata.toJson().getBytes())));
+    @SuppressWarnings("unchecked")
+    Option<HoodieIndexMetadata> result5 = (Option<HoodieIndexMetadata>) 
readIndexDefMethod.invoke(
+        null, metaClient.getStorage(), metaClient.getBasePath(), 
metaClient.getTableConfig());
+    assertTrue(result5.isPresent(), "Should return present Option when valid 
file exists");
+    assertFalse(result5.get().getIndexDefinitions().isEmpty(), "Should return 
populated HoodieIndexMetadata");
+    assertEquals(1, result5.get().getIndexDefinitions().size(), "Should have 
one index definition");
+    assertTrue(result5.get().getIndexDefinitions().containsKey(indexName), 
"Should contain the test index");
+    assertEquals("column_stats", 
result5.get().getIndexDefinitions().get(indexName).getIndexType(), "Index type 
should match");
+
+    // Invalid JSON file - should throw HoodieIOException
+    FileIOUtils.createFileInPath(metaClient.getStorage(), indexPath,
+        Option.of(HoodieInstantWriter.convertByteArrayToWriter("invalid 
json".getBytes())));
+    assertThrows(HoodieIOException.class, () -> {
+      try {
+        readIndexDefMethod.invoke(null, metaClient.getStorage(), 
metaClient.getBasePath(), metaClient.getTableConfig());
+      } catch (java.lang.reflect.InvocationTargetException e) {
+        if (e.getCause() instanceof HoodieIOException) {
+          throw (HoodieIOException) e.getCause();
+        }
+        throw new RuntimeException(e);
+      }
+    }, "Should throw HoodieIOException for invalid JSON");
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java
index 03bb758e091d..6ad06102511b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java
@@ -35,6 +35,11 @@ import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.InProcessTimeGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.metadata.HoodieIndexVersion;
+import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
@@ -48,6 +53,7 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+import java.util.Properties;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -185,15 +191,32 @@ public class TestMetadataWriterCommit extends 
BaseTestHandle {
         .build();
     config.setSchema(TRIP_EXAMPLE_SCHEMA);
 
+    // Just replicating the production code path to create the metadata table
+    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build();
+
+    HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+        .withIndexName("secondary_index_sec-rider")  // matches 
PARTITION_NAME_SECONDARY_INDEX_PREFIX + indexName
+        .withIndexType("secondary_index")
+        .withSourceFields(Collections.singletonList("rider"))
+        .withIndexOptions(Collections.emptyMap())
+        
.withVersion(HoodieIndexVersion.getCurrentVersion(metaClient.getTableConfig().getTableVersion(),
 MetadataPartitionType.SECONDARY_INDEX))
+        .build();
+
+    metaClient.buildIndexDefinition(indexDefinition);
+
+    Properties indexProps = new Properties();
+    
indexProps.setProperty(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key(), 
+        FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new 
StoragePath(metaClient.getIndexDefinitionPath())));
+    HoodieTableConfig.update(metaClient.getStorage(), 
metaClient.getMetaPath(), indexProps);
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
     // create mdt writer
     HoodieBackedTableMetadataWriter mdtWriter = 
(HoodieBackedTableMetadataWriter) 
SparkMetadataWriterFactory.createWithStreamingWrites(storageConf, config,
         HoodieFailedWritesCleaningPolicy.LAZY, context, Option.empty());
     HoodieTableMetaClient mdtMetaClient = 
HoodieTableMetaClient.builder().setBasePath(metaClient.getMetaPath() + 
"/metadata").setConf(storageConf).build();
     // 3 bootstrapped MDT partitions - files, record index and secondary index
     assertEquals(3, 
mdtMetaClient.getActiveTimeline().filterCompletedInstants().countInstants());
-
-    metaClient = 
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build();
-    
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH,
 metaClient.getIndexDefinitionPath());
     table = HoodieSparkTable.create(config, context, metaClient);
     dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
     // create a parquet file and obtain corresponding write status

Reply via email to