This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7da43d618b55 [HUDI-9614] should only use metaclient to read index def
file from spark driver (#13728)
7da43d618b55 is described below
commit 7da43d618b5586593843f5ba870243b5f21c8ae4
Author: Vamshi Krishna Kyatham
<[email protected]>
AuthorDate: Fri Aug 22 06:22:43 2025 -0700
[HUDI-9614] should only use metaclient to read index def file from spark
driver (#13728)
---
.../hudi/common/table/HoodieTableMetaClient.java | 29 ++++---
.../common/table/TestHoodieTableMetaClient.java | 94 ++++++++++++++++++++++
.../apache/hudi/io/TestMetadataWriterCommit.java | 29 ++++++-
3 files changed, 134 insertions(+), 18 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 1a1d072874e2..0d1d8a4fec7c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -170,7 +170,7 @@ public class HoodieTableMetaClient implements Serializable {
private FileSystemRetryConfig fileSystemRetryConfig =
FileSystemRetryConfig.newBuilder().build();
protected HoodieMetaserverConfig metaserverConfig;
private HoodieTimeGeneratorConfig timeGeneratorConfig;
- private Option<HoodieIndexMetadata> indexMetadataOpt = Option.empty();
+ private Option<HoodieIndexMetadata> indexMetadataOpt;
private HoodieTableFormat tableFormat;
/**
@@ -189,7 +189,7 @@ public class HoodieTableMetaClient implements Serializable {
this.basePath = new StoragePath(basePath);
this.metaPath = new StoragePath(basePath, METAFOLDER_NAME);
this.tableConfig = new HoodieTableConfig(this.storage, metaPath);
- this.indexMetadataOpt = getIndexMetadata();
+ this.indexMetadataOpt = readIndexDefFromStorage(this.storage,
this.basePath, this.tableConfig);
this.tableType = tableConfig.getTableType();
Option<TimelineLayoutVersion> tableConfigVersion =
tableConfig.getTimelineLayoutVersion();
if (layoutVersion.isPresent() && tableConfigVersion.isPresent()) {
@@ -272,6 +272,9 @@ public class HoodieTableMetaClient implements Serializable {
checkState(indexMetadataOpt.isPresent(), "Index metadata is not present");
indexMetadataOpt.get().getIndexDefinitions().remove(indexName);
writeIndexMetadataToStorage();
+ if (indexMetadataOpt.get().getIndexDefinitions().isEmpty()) {
+ indexMetadataOpt = Option.empty();
+ }
}
/**
@@ -320,20 +323,16 @@ public class HoodieTableMetaClient implements Serializable {
* Returns Option of {@link HoodieIndexMetadata} from index definition file
if present, else returns empty Option.
*/
public Option<HoodieIndexMetadata> getIndexMetadata() {
- if (indexMetadataOpt.isPresent() &&
!indexMetadataOpt.get().getIndexDefinitions().isEmpty()) {
- return indexMetadataOpt;
- }
- Option<HoodieIndexMetadata> indexDefOption = Option.empty();
- if (tableConfig.getRelativeIndexDefinitionPath().isPresent() &&
StringUtils.nonEmpty(tableConfig.getRelativeIndexDefinitionPath().get())) {
- indexDefOption = loadIndexDefFromStorage(basePath,
tableConfig.getRelativeIndexDefinitionPath().get(), storage);
- }
- return indexDefOption;
+ return indexMetadataOpt;
}
- private static Option<HoodieIndexMetadata> loadIndexDefFromStorage(
- StoragePath basePath, String relativeIndexDefinitionPath, HoodieStorage
storage) {
- StoragePath indexDefinitionPath =
- new StoragePath(basePath, relativeIndexDefinitionPath);
+ private static Option<HoodieIndexMetadata> readIndexDefFromStorage(
+ HoodieStorage storage, StoragePath basePath, HoodieTableConfig
tableConfig) {
+ Option<String> indexDefinitionPathOpt =
tableConfig.getRelativeIndexDefinitionPath();
+ if (indexDefinitionPathOpt.isEmpty() ||
StringUtils.isNullOrEmpty(indexDefinitionPathOpt.get())) {
+ return Option.empty();
+ }
+ StoragePath indexDefinitionPath = new StoragePath(basePath,
indexDefinitionPathOpt.get());
try {
Option<byte[]> bytesOpt = FileIOUtils.readDataFromPath(storage,
indexDefinitionPath, true);
if (bytesOpt.isPresent()) {
@@ -342,7 +341,7 @@ public class HoodieTableMetaClient implements Serializable {
return Option.of(new HoodieIndexMetadata());
}
} catch (IOException e) {
- throw new HoodieIOException("Could not load index definition at path: "
+ relativeIndexDefinitionPath, e);
+ throw new HoodieIOException("Could not load index definition at path: "
+ indexDefinitionPath, e);
}
}
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
index fd85ea258178..4f116020db8b 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableMetaClient.java
@@ -29,7 +29,9 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.metadata.HoodieIndexVersion;
+import org.apache.hudi.storage.HoodieInstantWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.storage.StoragePath;
@@ -39,6 +41,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -283,4 +286,95 @@ class TestHoodieTableMetaClient extends HoodieCommonTestHarness {
new String(FileIOUtils.readDataFromPath(metaClient.getStorage(), new
StoragePath(metaClient.getIndexDefinitionPath())).get()));
assertTrue(indexMetadata.getIndexDefinitions().isEmpty());
}
+
+ @Test
+ void testReadIndexDefFromStorage() throws Exception {
+ final String basePath = tempDir.toAbsolutePath() + Path.SEPARATOR + "t8";
+
+ // No index definition path configured - should return empty
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.newTableBuilder()
+ .setTableType(HoodieTableType.COPY_ON_WRITE.name())
+ .setTableName("table")
+ .initTable(this.metaClient.getStorageConf(), basePath);
+
+ Method readIndexDefMethod = HoodieTableMetaClient.class
+ .getDeclaredMethod("readIndexDefFromStorage",
+ org.apache.hudi.storage.HoodieStorage.class,
+ StoragePath.class,
+ HoodieTableConfig.class);
+ readIndexDefMethod.setAccessible(true);
+
+ @SuppressWarnings("unchecked")
+ Option<HoodieIndexMetadata> result = (Option<HoodieIndexMetadata>)
readIndexDefMethod.invoke(
+ null, metaClient.getStorage(), metaClient.getBasePath(),
metaClient.getTableConfig());
+ assertTrue(result.isEmpty(), "Should return empty when no index definition
path is configured");
+
+ // Empty index definition path - should return empty
+
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key(),
"");
+ @SuppressWarnings("unchecked")
+ Option<HoodieIndexMetadata> result2 = (Option<HoodieIndexMetadata>)
readIndexDefMethod.invoke(
+ null, metaClient.getStorage(), metaClient.getBasePath(),
metaClient.getTableConfig());
+ assertTrue(result2.isEmpty(), "Should return empty when index definition
path is empty string");
+
+ // Valid path but file doesn't exist - should return empty
HoodieIndexMetadata
+ String relativePath = ".hoodie/.index_defs/index.json";
+
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key(),
relativePath);
+ @SuppressWarnings("unchecked")
+ Option<HoodieIndexMetadata> result3 = (Option<HoodieIndexMetadata>)
readIndexDefMethod.invoke(
+ null, metaClient.getStorage(), metaClient.getBasePath(),
metaClient.getTableConfig());
+ assertTrue(result3.isPresent(), "Should return present Option when path is
configured but file doesn't exist");
+ assertTrue(result3.get().getIndexDefinitions().isEmpty(), "Should return
empty HoodieIndexMetadata when file doesn't exist");
+
+ // Valid path with existing empty file - should return empty
HoodieIndexMetadata
+ StoragePath indexPath = new StoragePath(metaClient.getBasePath(),
relativePath);
+ FileIOUtils.createFileInPath(metaClient.getStorage(), indexPath,
+
Option.of(HoodieInstantWriter.convertByteArrayToWriter("{}".getBytes())));
+ @SuppressWarnings("unchecked")
+ Option<HoodieIndexMetadata> result4 = (Option<HoodieIndexMetadata>)
readIndexDefMethod.invoke(
+ null, metaClient.getStorage(), metaClient.getBasePath(),
metaClient.getTableConfig());
+ assertTrue(result4.isPresent(), "Should return present Option when file
exists");
+ assertTrue(result4.get().getIndexDefinitions().isEmpty(), "Should return
empty HoodieIndexMetadata for empty file");
+
+ // Valid path with valid index metadata - should return populated
HoodieIndexMetadata
+ Map<String, Map<String, String>> columnsMap = new HashMap<>();
+ columnsMap.put("c1", Collections.emptyMap());
+ String indexName =
MetadataPartitionType.EXPRESSION_INDEX.getPartitionPath() + "test_idx";
+ HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+ .withIndexName(indexName)
+ .withIndexType("column_stats")
+ .withIndexFunction("identity")
+
.withVersion(HoodieIndexVersion.getCurrentVersion(HoodieTableVersion.current(),
indexName))
+ .withSourceFields(new ArrayList<>(columnsMap.keySet()))
+ .withIndexOptions(Collections.emptyMap())
+ .build();
+
+ Map<String, HoodieIndexDefinition> indexDefMap = new HashMap<>();
+ indexDefMap.put(indexName, indexDefinition);
+ HoodieIndexMetadata validIndexMetadata = new
HoodieIndexMetadata(indexDefMap);
+
+ FileIOUtils.createFileInPath(metaClient.getStorage(), indexPath,
+
Option.of(HoodieInstantWriter.convertByteArrayToWriter(validIndexMetadata.toJson().getBytes())));
+ @SuppressWarnings("unchecked")
+ Option<HoodieIndexMetadata> result5 = (Option<HoodieIndexMetadata>)
readIndexDefMethod.invoke(
+ null, metaClient.getStorage(), metaClient.getBasePath(),
metaClient.getTableConfig());
+ assertTrue(result5.isPresent(), "Should return present Option when valid
file exists");
+ assertFalse(result5.get().getIndexDefinitions().isEmpty(), "Should return
populated HoodieIndexMetadata");
+ assertEquals(1, result5.get().getIndexDefinitions().size(), "Should have
one index definition");
+ assertTrue(result5.get().getIndexDefinitions().containsKey(indexName),
"Should contain the test index");
+ assertEquals("column_stats",
result5.get().getIndexDefinitions().get(indexName).getIndexType(), "Index type
should match");
+
+ // Invalid JSON file - should throw HoodieIOException
+ FileIOUtils.createFileInPath(metaClient.getStorage(), indexPath,
+ Option.of(HoodieInstantWriter.convertByteArrayToWriter("invalid
json".getBytes())));
+ assertThrows(HoodieIOException.class, () -> {
+ try {
+ readIndexDefMethod.invoke(null, metaClient.getStorage(),
metaClient.getBasePath(), metaClient.getTableConfig());
+ } catch (java.lang.reflect.InvocationTargetException e) {
+ if (e.getCause() instanceof HoodieIOException) {
+ throw (HoodieIOException) e.getCause();
+ }
+ throw new RuntimeException(e);
+ }
+ }, "Should throw HoodieIOException for invalid JSON");
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java
index 03bb758e091d..6ad06102511b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java
@@ -35,6 +35,11 @@ import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.metadata.HoodieIndexVersion;
+import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
@@ -48,6 +53,7 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -185,15 +191,32 @@ public class TestMetadataWriterCommit extends BaseTestHandle {
.build();
config.setSchema(TRIP_EXAMPLE_SCHEMA);
+ // Just replicating the production code path to create the metadata table
+ metaClient =
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build();
+
+ HoodieIndexDefinition indexDefinition = HoodieIndexDefinition.newBuilder()
+ .withIndexName("secondary_index_sec-rider") // matches
PARTITION_NAME_SECONDARY_INDEX_PREFIX + indexName
+ .withIndexType("secondary_index")
+ .withSourceFields(Collections.singletonList("rider"))
+ .withIndexOptions(Collections.emptyMap())
+
.withVersion(HoodieIndexVersion.getCurrentVersion(metaClient.getTableConfig().getTableVersion(),
MetadataPartitionType.SECONDARY_INDEX))
+ .build();
+
+ metaClient.buildIndexDefinition(indexDefinition);
+
+ Properties indexProps = new Properties();
+
indexProps.setProperty(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH.key(),
+ FSUtils.getRelativePartitionPath(metaClient.getBasePath(), new
StoragePath(metaClient.getIndexDefinitionPath())));
+ HoodieTableConfig.update(metaClient.getStorage(),
metaClient.getMetaPath(), indexProps);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+
// create mdt writer
HoodieBackedTableMetadataWriter mdtWriter =
(HoodieBackedTableMetadataWriter)
SparkMetadataWriterFactory.createWithStreamingWrites(storageConf, config,
HoodieFailedWritesCleaningPolicy.LAZY, context, Option.empty());
HoodieTableMetaClient mdtMetaClient =
HoodieTableMetaClient.builder().setBasePath(metaClient.getMetaPath() +
"/metadata").setConf(storageConf).build();
// 3 bootstrapped MDT partitions - files, record index and secondary index
assertEquals(3,
mdtMetaClient.getActiveTimeline().filterCompletedInstants().countInstants());
-
- metaClient =
HoodieTableMetaClient.builder().setBasePath(basePath).setConf(storageConf).build();
-
metaClient.getTableConfig().setValue(HoodieTableConfig.RELATIVE_INDEX_DEFINITION_PATH,
metaClient.getIndexDefinitionPath());
table = HoodieSparkTable.create(config, context, metaClient);
dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
// create a parquet file and obtain corresponding write status