This is an automated email from the ASF dual-hosted git repository.
wombatu-kun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4bafe5a6075e test(trino): de-flake TestHudi*FileOperations by
asserting only synchronous reads (#19004)
4bafe5a6075e is described below
commit 4bafe5a6075ef27691ef126cb412759d83e4a611
Author: Vova Kolmakov <[email protected]>
AuthorDate: Thu Jun 18 14:41:52 2026 +0700
test(trino): de-flake TestHudi*FileOperations by asserting only synchronous
reads (#19004)
---
.../hudi/TestHudiAlluxioCacheFileOperations.java | 79 ++++++++++++++--------
.../hudi/TestHudiMemoryCacheFileOperations.java | 26 +++----
.../plugin/hudi/TestHudiNoCacheFileOperations.java | 26 +++----
3 files changed, 70 insertions(+), 61 deletions(-)
diff --git
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java
index 89507721afa5..911c5d4c3e7d 100644
---
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java
+++
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java
@@ -17,6 +17,7 @@ import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.Multiset;
+import io.airlift.units.Duration;
import io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer;
import io.trino.plugin.hudi.util.FileOperationUtils.FileOperation;
import io.trino.testing.AbstractTestQueryFramework;
@@ -36,7 +37,6 @@ import static
com.google.common.io.MoreFiles.deleteRecursively;
import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
import static
io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans;
import static
io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.HUDI_MULTI_FG_PT_V8_MOR;
-import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.DATA;
import static
io.trino.plugin.hudi.util.FileOperationUtils.FileType.INDEX_DEFINITION;
import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.LOG;
import static
io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE;
@@ -44,7 +44,9 @@ import static
io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TAB
import static
io.trino.plugin.hudi.util.FileOperationUtils.FileType.TABLE_PROPERTIES;
import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.TIMELINE;
import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
+import static io.trino.testing.assertions.Assert.assertEventually;
import static java.util.stream.Collectors.toCollection;
+import static org.assertj.core.api.Assertions.assertThat;
@ResourceLock("HUDI_CACHE_SYSTEM")
@Execution(ExecutionMode.SAME_THREAD)
@@ -66,10 +68,11 @@ public class TestHudiAlluxioCacheFileOperations
.put("fs.cache.directories",
cacheDirectory.toAbsolutePath().toString())
.put("fs.cache.max-sizes", "100MB")
.put("hudi.metadata.cache.enabled", "false")
- // Disable async table-statistics refresh: it reads the
metadata table on a
- // background executor whose spans can outlive the query and
leak into the next
- // test's measurement (the symmetric off-by-N flake).
Disabling it makes the
- // file-operation counts deterministic right after the query
returns.
+ // Disable the async table-statistics refresh: on the first
query it reads the index
+ // definitions and table-property files (and the metadata
table) on a background
+ // executor. Those non-metadata-table reads land in the
asserted set and their timing
+ // is non-deterministic, so we turn the refresh off and assert
only the synchronous
+ // planning-path reads.
.put("hudi.table-statistics-enabled", "false")
.buildOrThrow();
@@ -87,12 +90,6 @@ public class TestHudiAlluxioCacheFileOperations
assertFileSystemAccesses(
query,
ImmutableMultiset.<FileOperation>builder()
- .addCopies(new FileOperation("Alluxio.readCached",
DATA), 2)
- .addCopies(new FileOperation("Alluxio.readCached",
METADATA_TABLE), 27)
- .addCopies(new FileOperation("Alluxio.readCached",
TIMELINE), 4)
- .addCopies(new FileOperation("Alluxio.readCached",
LOG), 15)
- .addCopies(new FileOperation("InputFile.lastModified",
METADATA_TABLE), 4)
- .addCopies(new FileOperation("InputFile.length",
METADATA_TABLE), 10)
.addCopies(new FileOperation("InputFile.length",
TIMELINE), 2)
.addCopies(new FileOperation("InputFile.length", LOG),
1)
.addCopies(new FileOperation("InputFile.newStream",
INDEX_DEFINITION), 2)
@@ -103,12 +100,6 @@ public class TestHudiAlluxioCacheFileOperations
assertFileSystemAccesses(
query,
ImmutableMultiset.<FileOperation>builder()
- .addCopies(new FileOperation("Alluxio.readCached",
DATA), 2)
- .addCopies(new FileOperation("Alluxio.readCached",
METADATA_TABLE), 27)
- .addCopies(new FileOperation("Alluxio.readCached",
TIMELINE), 4)
- .addCopies(new FileOperation("Alluxio.readCached",
LOG), 15)
- .addCopies(new FileOperation("InputFile.lastModified",
METADATA_TABLE), 4)
- .addCopies(new FileOperation("InputFile.length",
METADATA_TABLE), 10)
.addCopies(new FileOperation("InputFile.length",
TIMELINE), 2)
.addCopies(new FileOperation("InputFile.length", LOG),
1)
.addCopies(new FileOperation("InputFile.newStream",
INDEX_DEFINITION), 2)
@@ -127,12 +118,6 @@ public class TestHudiAlluxioCacheFileOperations
assertFileSystemAccesses(query,
ImmutableMultiset.<FileOperation>builder()
- .addCopies(new FileOperation("Alluxio.readCached",
DATA), 6)
- .addCopies(new FileOperation("Alluxio.readCached",
METADATA_TABLE), 215)
- .addCopies(new FileOperation("Alluxio.readCached",
TIMELINE), 8)
- .addCopies(new FileOperation("Alluxio.readCached",
LOG), 30)
- .addCopies(new FileOperation("InputFile.lastModified",
METADATA_TABLE), 29)
- .addCopies(new FileOperation("InputFile.length",
METADATA_TABLE), 69)
.addCopies(new FileOperation("InputFile.length",
TIMELINE), 4)
.addCopies(new FileOperation("InputFile.length", LOG),
2)
.addCopies(new FileOperation("InputFile.newStream",
INDEX_DEFINITION), 4)
@@ -142,12 +127,6 @@ public class TestHudiAlluxioCacheFileOperations
assertFileSystemAccesses(query,
ImmutableMultiset.<FileOperation>builder()
- .addCopies(new FileOperation("Alluxio.readCached",
DATA), 6)
- .addCopies(new FileOperation("Alluxio.readCached",
METADATA_TABLE), 215)
- .addCopies(new FileOperation("Alluxio.readCached",
TIMELINE), 8)
- .addCopies(new FileOperation("Alluxio.readCached",
LOG), 30)
- .addCopies(new FileOperation("InputFile.lastModified",
METADATA_TABLE), 29)
- .addCopies(new FileOperation("InputFile.length",
METADATA_TABLE), 69)
.addCopies(new FileOperation("InputFile.length",
TIMELINE), 4)
.addCopies(new FileOperation("InputFile.length", LOG),
2)
.addCopies(new FileOperation("InputFile.newStream",
INDEX_DEFINITION), 4)
@@ -156,6 +135,40 @@ public class TestHudiAlluxioCacheFileOperations
.build());
}
+ @Test
+ public void testReadsServedFromAlluxioCache()
+ {
+ // The tests above intentionally do not assert exact Alluxio cache hit/miss counts: the cache
+ // write is asynchronous, so the per-query counts flake (a write from one query can still be in
+ // flight when the next query runs). This test instead gives count-independent coverage that the
+ // Alluxio cache is actually engaged: once the cache is warmed, at least one read is served from
+ // it (an "Alluxio.readCached" span). assertEventually re-runs the query until the asynchronous
+ // cache write has landed and a genuine hit is observed, and fails loudly at the deadline if the
+ // cache never serves a read.
+ @Language("SQL") String query = "SELECT * FROM " +
HUDI_MULTI_FG_PT_V8_MOR;
+ DistributedQueryRunner queryRunner = getDistributedQueryRunner();
+
+ // Warm the cache; the page write into Alluxio happens on a background
thread.
+ queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
+
+ assertEventually(
+ Duration.valueOf("30s"),
+ Duration.valueOf("500ms"),
+ () -> {
+
queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
+ assertThat(countCachedReads(queryRunner))
+ .as("Alluxio.readCached spans (cache hits)")
+ .isGreaterThanOrEqualTo(1);
+ });
+ }
+
+ private static long countCachedReads(QueryRunner queryRunner)
+ {
+ return getCacheOperationSpans(queryRunner).stream()
+ .filter(span -> span.getName().equals("Alluxio.readCached"))
+ .count();
+ }
+
private void assertFileSystemAccesses(@Language("SQL") String query,
Multiset<FileOperation> expectedCacheAccesses)
{
DistributedQueryRunner queryRunner = getDistributedQueryRunner();
@@ -169,6 +182,14 @@ public class TestHudiAlluxioCacheFileOperations
.stream()
.filter(span -> !span.getName().startsWith("InputFile.exists"))
.map(FileOperation::create)
+ // Metadata-table reads are issued from Hudi background pools (split loading, partition
+ // listing, table-statistics refresh) whose spans can outlive the synchronous query and
+ // land in the next query's measurement window, so their per-query counts are not
+ // deterministic. Alluxio cache hits/misses (Alluxio.*) depend on whether an earlier
+ // asynchronous cache write had already completed, so their counts are not deterministic
+ // either. Both are excluded; only synchronous foreground reads are asserted.
+ .filter(operation -> operation.fileType() != METADATA_TABLE)
+ .filter(operation ->
!operation.operationType().startsWith("Alluxio."))
.collect(toCollection(HashMultiset::create));
}
}
diff --git
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java
index 61867b1cde7d..8e1b2fe81911 100644
---
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java
+++
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java
@@ -56,10 +56,11 @@ public class TestHudiMemoryCacheFileOperations
.put("hudi.metadata-enabled", "true")
.put("hudi.metadata.cache.enabled", "true")
.put("fs.cache.enabled", "false")
- // Disable async table-statistics refresh: it reads the
metadata table on a
- // background executor whose spans can outlive the query and
leak into the next
- // test's measurement (the symmetric off-by-N flake).
Disabling it makes the
- // file-operation counts deterministic right after the query
returns.
+ // Disable the async table-statistics refresh: on the first
query it reads the index
+ // definitions and table-property files (and the metadata
table) on a background
+ // executor. Those non-metadata-table reads land in the
asserted set and their timing
+ // is non-deterministic, so we turn the refresh off and assert
only the synchronous
+ // planning-path reads.
.put("hudi.table-statistics-enabled", "false")
.buildOrThrow();
@@ -78,11 +79,8 @@ public class TestHudiMemoryCacheFileOperations
query,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new
FileOperation("FileSystemCache.cacheInput", DATA), 2)
- .addCopies(new
FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 4)
- .addCopies(new
FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 6)
.addCopies(new
FileOperation("FileSystemCache.cacheStream", TIMELINE), 2)
.addCopies(new
FileOperation("FileSystemCache.cacheStream", LOG), 1)
- .addCopies(new FileOperation("InputFile.lastModified",
METADATA_TABLE), 4)
.addCopies(new FileOperation("InputFile.newStream",
INDEX_DEFINITION), 2)
.add(new FileOperation("InputFile.newStream",
METADATA_TABLE_PROPERTIES))
.addCopies(new FileOperation("InputFile.newStream",
TABLE_PROPERTIES), 2)
@@ -92,11 +90,8 @@ public class TestHudiMemoryCacheFileOperations
query,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new
FileOperation("FileSystemCache.cacheInput", DATA), 2)
- .addCopies(new
FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 4)
- .addCopies(new
FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 6)
.addCopies(new
FileOperation("FileSystemCache.cacheStream", TIMELINE), 2)
.addCopies(new
FileOperation("FileSystemCache.cacheStream", LOG), 1)
- .addCopies(new FileOperation("InputFile.lastModified",
METADATA_TABLE), 4)
.addCopies(new FileOperation("InputFile.newStream",
INDEX_DEFINITION), 2)
.add(new FileOperation("InputFile.newStream",
METADATA_TABLE_PROPERTIES))
.addCopies(new FileOperation("InputFile.newStream",
TABLE_PROPERTIES), 2)
@@ -114,11 +109,8 @@ public class TestHudiMemoryCacheFileOperations
assertFileSystemAccesses(query,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new
FileOperation("FileSystemCache.cacheInput", DATA), 6)
- .addCopies(new
FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 29)
- .addCopies(new
FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 40)
.addCopies(new
FileOperation("FileSystemCache.cacheStream", TIMELINE), 4)
.addCopies(new
FileOperation("FileSystemCache.cacheStream", LOG), 2)
- .addCopies(new FileOperation("InputFile.lastModified",
METADATA_TABLE), 29)
.addCopies(new FileOperation("InputFile.newStream",
INDEX_DEFINITION), 4)
.addCopies(new FileOperation("InputFile.newStream",
METADATA_TABLE_PROPERTIES), 2)
.addCopies(new FileOperation("InputFile.newStream",
TABLE_PROPERTIES), 4)
@@ -127,11 +119,8 @@ public class TestHudiMemoryCacheFileOperations
assertFileSystemAccesses(query,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new
FileOperation("FileSystemCache.cacheInput", DATA), 6)
- .addCopies(new
FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 29)
- .addCopies(new
FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 40)
.addCopies(new
FileOperation("FileSystemCache.cacheStream", TIMELINE), 4)
.addCopies(new
FileOperation("FileSystemCache.cacheStream", LOG), 2)
- .addCopies(new FileOperation("InputFile.lastModified",
METADATA_TABLE), 29)
.addCopies(new FileOperation("InputFile.newStream",
INDEX_DEFINITION), 4)
.addCopies(new FileOperation("InputFile.newStream",
METADATA_TABLE_PROPERTIES), 2)
.addCopies(new FileOperation("InputFile.newStream",
TABLE_PROPERTIES), 4)
@@ -153,6 +142,11 @@ public class TestHudiMemoryCacheFileOperations
.filter(span -> !span.getName().startsWith("InputFile.exists"))
.filter(span ->
!isTrinoSchemaOrPermissions(getFileLocation(span)))
.map(FileOperation::create)
+ // Metadata-table reads are issued from Hudi background pools
(split loading, partition
+ // listing, table-statistics refresh) whose spans can outlive
the synchronous query and
+ // land in the next query's measurement window. Their
per-query counts are therefore
+ // non-deterministic, so they are excluded; only synchronous
foreground reads are asserted.
+ .filter(operation -> operation.fileType() != METADATA_TABLE)
.collect(toCollection(HashMultiset::create));
}
}
diff --git
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java
index 71518b9fc67b..55071241fff7 100644
---
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java
+++
b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java
@@ -56,10 +56,11 @@ public class TestHudiNoCacheFileOperations
.put("hudi.metadata-enabled", "true")
.put("hudi.metadata.cache.enabled", "false")
.put("fs.cache.enabled", "false")
- // Disable async table-statistics refresh: it reads the
metadata table on a
- // background executor whose spans can outlive the query and
leak into the next
- // test's measurement (the symmetric off-by-N flake).
Disabling it makes the
- // file-operation counts deterministic right after the query
returns.
+ // Disable the async table-statistics refresh: on the first
query it reads the index
+ // definitions and table-property files (and the metadata
table) on a background
+ // executor. Those non-metadata-table reads land in the
asserted set and their timing
+ // is non-deterministic, so we turn the refresh off and assert
only the synchronous
+ // planning-path reads.
.put("hudi.table-statistics-enabled", "false")
.buildOrThrow();
@@ -78,9 +79,6 @@ public class TestHudiNoCacheFileOperations
query,
ImmutableMultiset.<FileOperationUtils.FileOperation>builder()
.addCopies(new
FileOperationUtils.FileOperation("Input.readTail", DATA), 2)
- .addCopies(new
FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 4)
- .addCopies(new
FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 4)
- .addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 6)
.addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 2)
.addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", TIMELINE), 2)
.addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", LOG), 1)
@@ -92,10 +90,7 @@ public class TestHudiNoCacheFileOperations
query,
ImmutableMultiset.<FileOperationUtils.FileOperation>builder()
.addCopies(new
FileOperationUtils.FileOperation("Input.readTail", DATA), 2)
- .addCopies(new
FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 4)
- .addCopies(new
FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 4)
.addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 2)
- .addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 6)
.addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", TIMELINE), 2)
.addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", LOG), 1)
.add(new
FileOperationUtils.FileOperation("InputFile.newStream",
METADATA_TABLE_PROPERTIES))
@@ -114,10 +109,7 @@ public class TestHudiNoCacheFileOperations
assertFileSystemAccesses(query,
ImmutableMultiset.<FileOperationUtils.FileOperation>builder()
.addCopies(new
FileOperationUtils.FileOperation("Input.readTail", DATA), 6)
- .addCopies(new
FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 29)
- .addCopies(new
FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 29)
.addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 4)
- .addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 40)
.addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream",
METADATA_TABLE_PROPERTIES), 2)
.addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4)
.addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", TIMELINE), 4)
@@ -127,10 +119,7 @@ public class TestHudiNoCacheFileOperations
assertFileSystemAccesses(query,
ImmutableMultiset.<FileOperationUtils.FileOperation>builder()
.addCopies(new
FileOperationUtils.FileOperation("Input.readTail", DATA), 6)
- .addCopies(new
FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 29)
- .addCopies(new
FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 29)
.addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 4)
- .addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 40)
.addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream",
METADATA_TABLE_PROPERTIES), 2)
.addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4)
.addCopies(new
FileOperationUtils.FileOperation("InputFile.newStream", TIMELINE), 4)
@@ -153,6 +142,11 @@ public class TestHudiNoCacheFileOperations
.filter(span -> !span.getName().startsWith("InputFile.exists"))
.filter(span ->
!isTrinoSchemaOrPermissions(getFileLocation(span)))
.map(FileOperationUtils.FileOperation::create)
+ // Metadata-table reads are issued from Hudi background pools
(split loading, partition
+ // listing, table-statistics refresh) whose spans can outlive
the synchronous query and
+ // land in the next query's measurement window. Their
per-query counts are therefore
+ // non-deterministic, so they are excluded; only synchronous
foreground reads are asserted.
+ .filter(operation -> operation.fileType() != METADATA_TABLE)
.collect(toCollection(HashMultiset::create));
}
}