This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new eb8e829b876d [HUDI-9612] Update schema resolver to inspect files with
inserts/updates (#13586)
eb8e829b876d is described below
commit eb8e829b876d1e670d5651bd6f258ddced6b6df1
Author: Tim Brown <[email protected]>
AuthorDate: Wed Jul 23 22:48:25 2025 -0400
[HUDI-9612] Update schema resolver to inspect files with inserts/updates
(#13586)
* Update schema resolver to inspect files with inserts/updates
* move inspection internal to TableSchemaResolver, optimistically inspect
if previously loaded commit metadata has insert/update to avoid extra search
---
.../hudi/common/table/TableSchemaResolver.java | 122 +++++++++++++--------
.../hudi/common/table/TestTableSchemaResolver.java | 83 ++++++++++++++
.../hudi/hive/testutils/HiveTestCluster.java | 1 +
.../apache/hudi/hive/testutils/HiveTestUtil.java | 3 +
4 files changed, 161 insertions(+), 48 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index c9458c948d82..a1385572091d 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -58,10 +58,12 @@ import javax.annotation.concurrent.ThreadSafe;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Iterator;
+import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
+import java.util.stream.Stream;
import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.containsFieldInSchema;
@@ -99,7 +101,7 @@ public class TableSchemaResolver {
private final Lazy<ConcurrentHashMap<HoodieInstant, HoodieCommitMetadata>>
commitMetadataCache;
private volatile HoodieInstant latestCommitWithValidSchema = null;
- private volatile HoodieInstant latestCommitWithValidData = null;
+ private volatile HoodieInstant latestCommitWithInsertOrUpdate = null;
public TableSchemaResolver(HoodieTableMetaClient metaClient) {
this.metaClient = metaClient;
@@ -247,29 +249,28 @@ public class TableSchemaResolver {
* Fetches the schema for a table from any of the table's data files
*/
private Option<Schema> getTableParquetSchemaFromDataFile() {
- Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata
= getLatestCommitMetadataWithValidData();
- try {
- switch (metaClient.getTableType()) {
- case COPY_ON_WRITE:
- case MERGE_ON_READ:
- // For COW table, data could be written in either Parquet or Orc
format currently;
- // For MOR table, data could be written in either Parquet, Orc,
Hfile or Delta-log format currently;
- //
- // Determine the file format based on the file name, and then
extract schema from it.
- if (instantAndCommitMetadata.isPresent()) {
- HoodieCommitMetadata commitMetadata =
instantAndCommitMetadata.get().getRight();
- Iterator<String> filePaths =
commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator();
- return Option.of(fetchSchemaFromFiles(filePaths));
- } else {
- LOG.warn("Could not find any data file written for commit, so
could not get schema for table {}", metaClient.getBasePath());
- return Option.empty();
- }
- default:
- LOG.error("Unknown table type {}", metaClient.getTableType());
- throw new InvalidTableException(metaClient.getBasePath().toString());
- }
- } catch (IOException e) {
- throw new HoodieException("Failed to read data schema", e);
+ Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata
= getLatestCommitMetadataWithInsertOrUpdate();
+ switch (metaClient.getTableType()) {
+ case COPY_ON_WRITE:
+ case MERGE_ON_READ:
+ // For COW table, data could be written in either Parquet or Orc
format currently;
+ // For MOR table, data could be written in either Parquet, Orc, Hfile
or Delta-log format currently;
+ //
+ // Determine the file format based on the file name, and then extract
schema from it.
+ if (instantAndCommitMetadata.isPresent()) {
+ HoodieCommitMetadata commitMetadata =
instantAndCommitMetadata.get().getRight();
+ // inspect non-empty files for schema
+ Stream<StoragePath> filePaths =
commitMetadata.getPartitionToWriteStats().values().stream().flatMap(Collection::stream)
+ .filter(writeStat -> writeStat.getNumInserts() > 0 ||
writeStat.getNumUpdateWrites() > 0)
+ .map(writeStat -> new StoragePath(metaClient.getBasePath(),
writeStat.getPath()));
+ return Option.of(fetchSchemaFromFiles(filePaths));
+ } else {
+ LOG.warn("Could not find any data file written for commit, so could
not get schema for table {}", metaClient.getBasePath());
+ return Option.empty();
+ }
+ default:
+ LOG.error("Unknown table type {}", metaClient.getTableType());
+ throw new InvalidTableException(metaClient.getBasePath().toString());
}
}
@@ -427,26 +428,50 @@ public class TableSchemaResolver {
.map(instant -> Pair.of(instant,
commitMetadataCache.get().get(instant)));
}
- private Option<Pair<HoodieInstant, HoodieCommitMetadata>>
getLatestCommitMetadataWithValidData() {
- if (latestCommitWithValidData == null) {
- Option<Pair<HoodieInstant, HoodieCommitMetadata>>
instantAndCommitMetadata =
- metaClient.getActiveTimeline().getLastCommitMetadataWithValidData();
- if (instantAndCommitMetadata.isPresent()) {
- HoodieInstant instant = instantAndCommitMetadata.get().getLeft();
- HoodieCommitMetadata metadata =
instantAndCommitMetadata.get().getRight();
+ private Option<Pair<HoodieInstant, HoodieCommitMetadata>>
getLatestCommitMetadataWithInsertOrUpdate() {
+ if (latestCommitWithValidSchema != null &&
commitMetadataCache.get().containsKey(latestCommitWithValidSchema)) {
+ HoodieCommitMetadata commitMetadata =
commitMetadataCache.get().get(latestCommitWithValidSchema);
+ if (commitHasInsertOrUpdate(commitMetadata)) {
+ latestCommitWithInsertOrUpdate = latestCommitWithValidSchema;
+ }
+ }
+ if (latestCommitWithInsertOrUpdate == null) {
+ getLatestCommitWithInsertOrUpdate().ifPresent(instantAndCommitMetadata
-> {
+ HoodieInstant instant = instantAndCommitMetadata.getLeft();
+ HoodieCommitMetadata metadata = instantAndCommitMetadata.getRight();
synchronized (this) {
- if (latestCommitWithValidData == null) {
- latestCommitWithValidData = instant;
+ if (latestCommitWithInsertOrUpdate == null) {
+ latestCommitWithInsertOrUpdate = instant;
}
commitMetadataCache.get().putIfAbsent(instant, metadata);
}
- }
+ });
}
- return Option.ofNullable(latestCommitWithValidData)
+ return Option.ofNullable(latestCommitWithInsertOrUpdate)
.map(instant -> Pair.of(instant,
commitMetadataCache.get().get(instant)));
}
+ private Option<Pair<HoodieInstant, HoodieCommitMetadata>>
getLatestCommitWithInsertOrUpdate() {
+ HoodieTimeline commitsTimeline =
metaClient.getCommitsTimeline().filterCompletedInstants();
+ return Option.fromJavaOptional(commitsTimeline.getReverseOrderedInstants()
+ .map(instant -> {
+ try {
+ HoodieCommitMetadata commitMetadata =
commitsTimeline.readCommitMetadata(instant);
+ return Pair.of(instant, commitMetadata);
+ } catch (IOException e) {
+ throw new HoodieIOException(String.format("Failed to fetch
HoodieCommitMetadata for instant (%s)", instant), e);
+ }
+ })
+ .filter(pair -> commitHasInsertOrUpdate(pair.getRight()))
+ .findFirst());
+ }
+
+ private boolean commitHasInsertOrUpdate(HoodieCommitMetadata commitMetadata)
{
+ return
commitMetadata.getPartitionToWriteStats().values().stream().flatMap(Collection::stream)
+ .anyMatch(writeStat -> writeStat.getNumInserts() > 0 ||
writeStat.getNumUpdateWrites() > 0);
+ }
+
private HoodieCommitMetadata getCachedCommitMetadata(HoodieInstant instant) {
return commitMetadataCache.get()
.computeIfAbsent(instant, (missingInstant) -> {
@@ -459,19 +484,20 @@ public class TableSchemaResolver {
});
}
- private Schema fetchSchemaFromFiles(Iterator<String> filePaths) throws
IOException {
- Schema schema = null;
- while (filePaths.hasNext() && schema == null) {
- StoragePath filePath = new StoragePath(filePaths.next());
- if (FSUtils.isLogFile(filePath)) {
- // this is a log file
- schema = readSchemaFromLogFile(filePath);
- } else {
- schema = HoodieIOFactory.getIOFactory(metaClient.getStorage())
-
.getFileFormatUtils(filePath).readAvroSchema(metaClient.getStorage(), filePath);
+ private Schema fetchSchemaFromFiles(Stream<StoragePath> filePaths) {
+ return filePaths.map(filePath -> {
+ try {
+ if (FSUtils.isLogFile(filePath)) {
+ // this is a log file
+ return readSchemaFromLogFile(filePath);
+ } else {
+ return HoodieIOFactory.getIOFactory(metaClient.getStorage())
+
.getFileFormatUtils(filePath).readAvroSchema(metaClient.getStorage(), filePath);
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to read schema from file: " +
filePath, e);
}
- }
- return schema;
+ }).filter(Objects::nonNull).findFirst().orElse(null);
}
public static Schema appendPartitionColumns(Schema dataSchema,
Option<String[]> partitionFields) {
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 62942b5b742b..ddefc8689340 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -19,29 +19,43 @@
package org.apache.hudi.common.table;
import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.mockito.MockedStatic;
import java.io.IOException;
import java.net.URISyntaxException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import static
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;
import static
org.apache.hudi.common.testutils.HoodieCommonTestHarness.getDataBlock;
@@ -49,6 +63,13 @@ import static
org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
/**
* Tests {@link TableSchemaResolver}.
@@ -99,6 +120,68 @@ class TestTableSchemaResolver {
logFilePath, HoodieTestUtils.getDefaultStorageConfWithDefaults()),
logFilePath));
}
+ @Test
+ void testHasOperationFieldFileInspectionOrdering() throws IOException {
+ StorageConfiguration conf = new HadoopStorageConfiguration(false);
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
+ when(metaClient.getStorageConf()).thenReturn(conf);
+
+ when(metaClient.getTableType()).thenReturn(HoodieTableType.MERGE_ON_READ);
+ when(metaClient.getBasePath()).thenReturn(new
StoragePath("/tmp/hudi_table"));
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+ HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+ // create 3 base files and 2 log files
+ HoodieWriteStat baseFileWriteStat =
buildWriteStat("partition1/baseFile1.parquet", 10, 100);
+ HoodieWriteStat logFileWriteStat = buildWriteStat("partition1/" +
FSUtils.makeLogFileName("001", ".log", "100", 2, "1-0-1"), 0, 10);
+ // we don't expect any interactions with this write stat since the code
should exit as soon as a valid schema is found
+ HoodieWriteStat baseFileWriteStat2 = mock(HoodieWriteStat.class);
+ // files that only have deletes will be skipped
+ HoodieWriteStat logFileWithOnlyDeletes = buildWriteStat("partition1/" +
FSUtils.makeLogFileName("002", ".log", "100", 2, "1-0-1"), 0, 0);
+ HoodieWriteStat baseFileWithOnlyDeletes =
buildWriteStat("partition2/baseFile2.parquet", 0, 0);
+
+ commitMetadata.addWriteStat("partition1", baseFileWriteStat);
+ commitMetadata.addWriteStat("partition1", logFileWithOnlyDeletes);
+ commitMetadata.addWriteStat("partition1", logFileWriteStat);
+ commitMetadata.addWriteStat("partition1", baseFileWriteStat2);
+ commitMetadata.addWriteStat("partition2", baseFileWithOnlyDeletes);
+ HoodieInstant instant = new HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, "001",
InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+ HoodieTimeline timeline = mock(HoodieTimeline.class);
+
when(metaClient.getCommitsTimeline().filterCompletedInstants()).thenReturn(timeline);
+ when(timeline.getReverseOrderedInstants()).thenReturn(Stream.of(instant));
+ when(timeline.readCommitMetadata(instant)).thenReturn(commitMetadata);
+
+ // mock calls to read schema
+ try (MockedStatic<HoodieIOFactory> ioFactoryMockedStatic =
mockStatic(HoodieIOFactory.class);
+ MockedStatic<TableSchemaResolver> tableSchemaResolverMockedStatic =
mockStatic(TableSchemaResolver.class)) {
+ // return null for first parquet file to force iteration to inspect the
next file
+ Schema schema = Schema.createRecord("test_schema", null,
"test_namespace", false);
+ schema.setFields(Arrays.asList(new Schema.Field("int_field",
Schema.create(Schema.Type.INT)), new Schema.Field("_hoodie_operation",
Schema.create(Schema.Type.STRING))));
+
+ // mock parquet file schema reading to return null for the first base
file to force iteration
+ HoodieIOFactory ioFactory = mock(HoodieIOFactory.class);
+ FileFormatUtils fileFormatUtils = mock(FileFormatUtils.class);
+ StoragePath parquetPath = new
StoragePath("/tmp/hudi_table/partition1/baseFile1.parquet");
+
when(ioFactory.getFileFormatUtils(parquetPath)).thenReturn(fileFormatUtils);
+ when(fileFormatUtils.readAvroSchema(any(),
eq(parquetPath))).thenReturn(null);
+ ioFactoryMockedStatic.when(() ->
HoodieIOFactory.getIOFactory(any())).thenReturn(ioFactory);
+ // mock log file schema reading to return the expected schema
+ tableSchemaResolverMockedStatic.when(() ->
TableSchemaResolver.readSchemaFromLogFile(any(), eq(new
StoragePath("/tmp/hudi_table/" + logFileWriteStat.getPath()))))
+ .thenReturn(schema);
+
+ assertTrue(schemaResolver.hasOperationField());
+ }
+ verifyNoInteractions(baseFileWriteStat2);
+ }
+
+ private static HoodieWriteStat buildWriteStat(String path, int numInserts,
int numUpdateWrites) {
+ HoodieWriteStat logFileWriteStat = new HoodieWriteStat();
+ logFileWriteStat.setPath(path);
+ logFileWriteStat.setNumInserts(numInserts);
+ logFileWriteStat.setNumUpdateWrites(numUpdateWrites);
+ logFileWriteStat.setNumDeletes(1);
+ return logFileWriteStat;
+ }
+
private String initTestDir(String folderName) throws IOException {
java.nio.file.Path basePath = tempDir.resolve(folderName);
java.nio.file.Files.createDirectories(basePath);
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestCluster.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestCluster.java
index bbfce1faa82e..840865ef31a9 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestCluster.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestCluster.java
@@ -209,6 +209,7 @@ public class HiveTestCluster implements BeforeAllCallback,
AfterAllCallback, Bef
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(fileId);
writeStat.setPath(filePath.toString());
+ writeStat.setNumInserts(10);
writeStats.add(writeStat);
}
return writeStats;
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
index d063804b4895..76d0a0eabd7d 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/testutils/HiveTestUtil.java
@@ -521,6 +521,7 @@ public class HiveTestUtil {
HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat();
writeStat.setFileId(dataFile.getFileId());
writeStat.setPath(logFile.getPath().toString());
+ writeStat.setNumUpdateWrites(10);
commitMetadata.addWriteStat(partitionPath, writeStat);
}
}
@@ -541,6 +542,7 @@ public class HiveTestUtil {
HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat();
writeStat.setFileId(dataFile.getFileId());
writeStat.setPath(logFile.getPath().toString());
+ writeStat.setNumUpdateWrites(10);
commitMetadata.addWriteStat(partitionPath, writeStat);
}
}
@@ -611,6 +613,7 @@ public class HiveTestUtil {
HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setFileId(fileId);
writeStat.setPath(filePath.toString());
+ writeStat.setNumInserts(10);
writeStats.add(writeStat);
}
return writeStats;