This is an automated email from the ASF dual-hosted git repository. voonhous pushed a commit to branch bump-trino-plugin-to-1.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 8441f80e4b295123c0fd0ab706d4ae8ef1098fb6 Author: voon <[email protected]> AuthorDate: Wed May 13 12:17:55 2026 +0800 chore: Bump hudi-trino-plugin to 1.1.0 --- hudi-trino-plugin/pom.xml | 25 ++- .../java/io/trino/plugin/hudi/HudiMetadata.java | 2 + .../java/io/trino/plugin/hudi/HudiPageSource.java | 76 ++++++--- .../trino/plugin/hudi/HudiPageSourceProvider.java | 37 ++--- .../java/io/trino/plugin/hudi/HudiSplitSource.java | 8 +- .../java/io/trino/plugin/hudi/HudiTableHandle.java | 15 +- .../main/java/io/trino/plugin/hudi/HudiUtil.java | 121 +++++++++++--- .../plugin/hudi/io/HudiTrinoFileReaderFactory.java | 25 ++- .../hudi/io/InlineSeekableDataInputStream.java | 2 +- .../query/index/HudiColumnStatsIndexSupport.java | 10 +- .../index/HudiPartitionStatsIndexSupport.java | 9 +- .../query/index/HudiRecordLevelIndexSupport.java | 4 +- .../query/index/HudiSecondaryIndexSupport.java | 4 +- .../plugin/hudi/reader/HudiTrinoReaderContext.java | 126 ++++---------- .../trino/plugin/hudi/reader/HudiTrinoRecord.java | 183 --------------------- .../plugin/hudi/stats/TableMetadataReader.java | 86 +++------- .../plugin/hudi/stats/TableStatisticsReader.java | 4 +- .../hudi/TestHudiAlluxioCacheFileOperations.java | 78 +++------ .../hudi/TestHudiMemoryCacheFileOperations.java | 77 +++------ .../plugin/hudi/TestHudiNoCacheFileOperations.java | 77 +++------ .../io/trino/plugin/hudi/TestHudiSmokeTest.java | 2 +- .../plugin/hudi/split/TestHudiSplitFactory.java | 1 + .../hudi/testing/TpchHudiTablesInitializer.java | 10 +- .../plugin/hudi/util/FileOperationAssertions.java | 143 ++++++++++++++++ 24 files changed, 524 insertions(+), 601 deletions(-) diff --git a/hudi-trino-plugin/pom.xml b/hudi-trino-plugin/pom.xml index c67ab6a07f51..5809c60e98c5 100644 --- a/hudi-trino-plugin/pom.xml +++ b/hudi-trino-plugin/pom.xml @@ -16,13 +16,15 @@ <properties> <air.compiler.fail-warnings>true</air.compiler.fail-warnings> - <dep.hudi.version>1.0.2</dep.hudi.version> + 
<dep.hudi.version>1.1.0</dep.hudi.version> <trino.parquet.version>1.15.2</trino.parquet.version> </properties> <dependencies> <dependency> - <!--Used to test execution in task executor after de-serializing--> + <!-- Required for compilation: HoodieRecord implements KryoSerializable, + so the compiler needs Kryo on classpath to verify the class hierarchy + when loading HoodieRecord for static imports --> <groupId>com.esotericsoftware</groupId> <artifactId>kryo</artifactId> <version>4.0.2</version> @@ -169,6 +171,7 @@ <groupId>org.apache.hudi</groupId> <artifactId>hudi-io</artifactId> <version>${dep.hudi.version}</version> + <classifier>shaded</classifier> <exclusions> <exclusion> <groupId>com.google.protobuf</groupId> @@ -470,13 +473,31 @@ <!-- org.apache.hudi:hudi-client-common and org.apache.hudi:hudi-java-client log4j.properties duplicates --> <ignoredResourcePattern>log4j.properties</ignoredResourcePattern> <ignoredResourcePattern>log4j-surefire.properties</ignoredResourcePattern> + <ignoredResource>google/protobuf/.*</ignoredResource> </ignoredResourcePatterns> + <ignoredClassPatterns> + <!-- org.apache.hudi:hudi-hadoop-common bundles parquet classes that conflict with org.apache.parquet:parquet-common --> + <!-- ParquetConfiguration class is shaded/bundled inside the hudi-hadoop-common JAR, not a transitive dependency, so Maven exclusions won't work --> + <ignoredClassPattern>org.apache.parquet.conf.ParquetConfiguration</ignoredClassPattern> + </ignoredClassPatterns> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <configuration> + <ignoredUnusedDeclaredDependencies> + <!-- Kryo is not directly used but required for compilation: + HoodieRecord implements KryoSerializable, so the compiler + needs Kryo to verify the class hierarchy --> + 
<ignoredUnusedDeclaredDependency>com.esotericsoftware:kryo</ignoredUnusedDeclaredDependency> + </ignoredUnusedDeclaredDependencies> + </configuration> + </plugin> </plugins> </build> </project> diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java index 917c94ea43d9..2613be54ff09 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiMetadata.java @@ -91,6 +91,7 @@ import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY; import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY; import static io.trino.plugin.hudi.HudiUtil.buildTableMetaClient; import static io.trino.plugin.hudi.HudiUtil.getLatestTableSchema; +import static io.trino.plugin.hudi.HudiUtil.getOrderingColumnHandles; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.StandardErrorCode.QUERY_REJECTED; import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE; @@ -173,6 +174,7 @@ public class HudiMetadata table.getStorage().getLocation(), hoodieTableType, getPartitionKeyColumnHandles(table, typeManager), + Lazy.lazily(() -> getOrderingColumnHandles(table, typeManager, lazyMetaClient, NANOSECONDS)), ImmutableSet.of(), TupleDomain.all(), TupleDomain.all(), diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSource.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSource.java index e81887414425..5186341b0a50 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSource.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSource.java @@ -23,6 +23,7 @@ import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.metrics.Metrics; import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.common.table.read.HoodieFileGroupReader; +import 
org.apache.hudi.common.util.collection.ClosableIterator; import java.io.IOException; import java.util.List; @@ -41,6 +42,7 @@ public class HudiPageSource PageBuilder pageBuilder; HudiAvroSerializer avroSerializer; List<HiveColumnHandle> columnHandles; + ClosableIterator<IndexedRecord> recordIterator; public HudiPageSource( ConnectorPageSource pageSource, @@ -51,11 +53,29 @@ public class HudiPageSource { this.pageSource = pageSource; this.fileGroupReader = fileGroupReader; - this.initFileGroupReader(); this.readerContext = readerContext; this.columnHandles = columnHandles; this.pageBuilder = new PageBuilder(columnHandles.stream().map(HiveColumnHandle::getType).toList()); this.avroSerializer = new HudiAvroSerializer(columnHandles, synthesizedColumnHandler); + try { + this.recordIterator = fileGroupReader.getClosableIterator(); + } + catch (IOException e) { + // Clean up resources on initialization failure + try { + fileGroupReader.close(); + } + catch (IOException closeException) { + e.addSuppressed(closeException); + } + try { + pageSource.close(); + } + catch (IOException closeException) { + e.addSuppressed(closeException); + } + throw new RuntimeException("Failed to initialize file group reader!", e); + } } @Override @@ -79,25 +99,15 @@ public class HudiPageSource @Override public boolean isFinished() { - try { - return !fileGroupReader.hasNext(); - } - catch (IOException e) { - throw new RuntimeException(e); - } + return !recordIterator.hasNext(); } @Override public Page getNextPage() { checkState(pageBuilder.isEmpty(), "PageBuilder is not empty at the beginning of a new page"); - try { - while (fileGroupReader.hasNext()) { - avroSerializer.buildRecordInPage(pageBuilder, fileGroupReader.next()); - } - } - catch (IOException e) { - throw new RuntimeException(e); + while (recordIterator.hasNext()) { + avroSerializer.buildRecordInPage(pageBuilder, recordIterator.next()); } Page newPage = pageBuilder.build(); @@ -115,8 +125,32 @@ public class HudiPageSource public 
void close() throws IOException { - fileGroupReader.close(); - pageSource.close(); + IOException closeException = null; + + recordIterator.close(); + + try { + fileGroupReader.close(); + } + catch (IOException e) { + closeException = e; + } + + try { + pageSource.close(); + } + catch (IOException e) { + if (closeException == null) { + closeException = e; + } + else { + closeException.addSuppressed(e); + } + } + + if (closeException != null) { + throw closeException; + } } @Override @@ -130,14 +164,4 @@ public class HudiPageSource { return pageSource.getMetrics(); } - - protected void initFileGroupReader() - { - try { - this.fileGroupReader.initRecordIterators(); - } - catch (IOException e) { - throw new RuntimeException("Failed to initialize file group reader!", e); - } - } } diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java index 9b8411fdf907..1553b8389a0b 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiPageSourceProvider.java @@ -38,8 +38,6 @@ import io.trino.plugin.hive.ReaderColumns; import io.trino.plugin.hive.parquet.ParquetReaderConfig; import io.trino.plugin.hudi.file.HudiBaseFile; import io.trino.plugin.hudi.reader.HudiTrinoReaderContext; -import io.trino.plugin.hudi.storage.HudiTrinoStorage; -import io.trino.plugin.hudi.storage.TrinoStorageConfiguration; import io.trino.plugin.hudi.util.SynthesizedColumnHandler; import io.trino.spi.TrinoException; import io.trino.spi.connector.ColumnHandle; @@ -57,7 +55,6 @@ import org.apache.avro.generic.IndexedRecord; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.read.HoodieFileGroupReader; -import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; 
import org.apache.hudi.storage.StoragePath; import org.apache.parquet.column.ColumnDescriptor; @@ -104,7 +101,7 @@ import static io.trino.plugin.hudi.HudiUtil.buildTableMetaClient; import static io.trino.plugin.hudi.HudiUtil.constructSchema; import static io.trino.plugin.hudi.HudiUtil.convertToFileSlice; import static io.trino.plugin.hudi.HudiUtil.getLatestTableSchema; -import static io.trino.plugin.hudi.HudiUtil.prependHudiMetaColumns; +import static io.trino.plugin.hudi.HudiUtil.prependHudiMetaAndOrderingColumns; import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toUnmodifiableList; @@ -184,7 +181,7 @@ public class HudiPageSourceProvider // The `columns` list could be empty when count(*) is issued, // prepending hoodie meta columns for Hudi split with log files // to allow a non-empty dataPageSource to be returned - List<HiveColumnHandle> hudiMetaAndDataColumnHandles = prependHudiMetaColumns(dataColumnHandles); + List<HiveColumnHandle> hudiMetaAndDataColumnHandles = prependHudiMetaAndOrderingColumns(hudiTableHandle, dataColumnHandles); TrinoFileSystem fileSystem = fileSystemFactory.create(session); ConnectorPageSource dataPageSource = createPageSource( @@ -219,8 +216,9 @@ public class HudiPageSourceProvider // TODO: Move this into HudiTableHandle HoodieTableMetaClient metaClient = buildTableMetaClient( fileSystemFactory.create(session), hudiTableHandle.getSchemaTableName().toString(), hudiTableHandle.getBasePath()); - HudiTrinoReaderContext readerContext = new HudiTrinoReaderContext( + metaClient.getStorageConf(), + metaClient.getTableConfig(), dataPageSource, dataColumnHandles, hudiMetaAndDataColumnHandles, @@ -232,21 +230,18 @@ public class HudiPageSourceProvider // Construct an Avro schema for log file reader Schema requestedSchema = constructSchema(dataSchema, hudiMetaAndDataColumnHandles.stream().map(HiveColumnHandle::getName).toList()); HoodieFileGroupReader<IndexedRecord> 
fileGroupReader = - new HoodieFileGroupReader<>( - readerContext, - new HudiTrinoStorage(fileSystemFactory.create(session), new TrinoStorageConfiguration()), - hudiTableHandle.getBasePath(), - hudiTableHandle.getLatestCommitTime(), - convertToFileSlice(hudiSplit, hudiTableHandle.getBasePath()), - dataSchema, - requestedSchema, - Option.empty(), - metaClient, - metaClient.getTableConfig().getProps(), - start, - length, - false); - + HoodieFileGroupReader.<IndexedRecord>newBuilder() + .withReaderContext(readerContext) + .withHoodieTableMetaClient(metaClient) + .withFileSlice(convertToFileSlice(hudiSplit, hudiTableHandle.getBasePath())) + .withDataSchema(dataSchema) + .withRequestedSchema(requestedSchema) + .withLatestCommitTime(hudiTableHandle.getLatestCommitTime()) + .withProps(metaClient.getTableConfig().getProps()) + .withShouldUseRecordPosition(false) + .withStart(start) + .withLength(length) + .build(); return new HudiPageSource( dataPageSource, fileGroupReader, diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java index 278ca2e463c7..6ea49d9baaf8 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiSplitSource.java @@ -44,6 +44,8 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.metadata.FileSystemBackedTableMetadata; +import org.apache.hudi.metadata.HoodieBackedTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.util.Lazy; @@ -104,9 +106,9 @@ public class HudiSplitSource HoodieTableMetaClient metaClient = tableHandle.getMetaClient(); HoodieEngineContext engineContext = new 
HoodieLocalEngineContext(metaClient.getStorage().getConf()); - HoodieTableMetadata tableMetadata = HoodieTableMetadata.create( - engineContext, - tableHandle.getMetaClient().getStorage(), metadataConfig, metaClient.getBasePath().toString(), true); + HoodieTableMetadata tableMetadata = enableMetadataTable && tableHandle.getMetaClient().getTableConfig().isMetadataTableAvailable() ? + new HoodieBackedTableMetadata(engineContext, tableHandle.getMetaClient().getStorage(), metadataConfig, metaClient.getBasePath().toString(), true) : + new FileSystemBackedTableMetadata(engineContext, tableHandle.getMetaClient().getStorage(), metaClient.getBasePath().toString()); log.info("Loaded table metadata for table: %s in %s ms", tableHandle.getSchemaTableName(), timer.endTimer()); return tableMetadata; }); diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java index 75ae962286a4..f9f828bcca55 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java @@ -49,6 +49,7 @@ public class HudiTableHandle private final String basePath; private final HoodieTableType tableType; private final List<HiveColumnHandle> partitionColumns; + private final Lazy<List<HiveColumnHandle>> lazyOrderingColumns; // Used only for validation when config property hudi.query-partition-filter-required is enabled private final Set<HiveColumnHandle> constraintColumns; private final TupleDomain<HiveColumnHandle> partitionPredicates; @@ -66,12 +67,13 @@ public class HudiTableHandle @JsonProperty("basePath") String basePath, @JsonProperty("tableType") HoodieTableType tableType, @JsonProperty("partitionColumns") List<HiveColumnHandle> partitionColumns, + @JsonProperty("orderingColumns") List<HiveColumnHandle> orderingColumns, @JsonProperty("partitionPredicates") TupleDomain<HiveColumnHandle> 
partitionPredicates, @JsonProperty("regularPredicates") TupleDomain<HiveColumnHandle> regularPredicates, @JsonProperty("tableSchemaStr") String tableSchemaStr, @JsonProperty("latestCommitTime") String latestCommitTime) { - this(Optional.empty(), Optional.empty(), schemaName, tableName, basePath, tableType, partitionColumns, ImmutableSet.of(), + this(Optional.empty(), Optional.empty(), schemaName, tableName, basePath, tableType, partitionColumns, Lazy.lazily(() -> orderingColumns), ImmutableSet.of(), partitionPredicates, regularPredicates, buildTableSchema(tableSchemaStr), () -> latestCommitTime); } @@ -83,6 +85,7 @@ public class HudiTableHandle String basePath, HoodieTableType tableType, List<HiveColumnHandle> partitionColumns, + Lazy<List<HiveColumnHandle>> lazyOrderingColumns, Set<HiveColumnHandle> constraintColumns, TupleDomain<HiveColumnHandle> partitionPredicates, TupleDomain<HiveColumnHandle> regularPredicates, @@ -96,6 +99,7 @@ public class HudiTableHandle basePath, tableType, partitionColumns, + lazyOrderingColumns, constraintColumns, partitionPredicates, regularPredicates, @@ -120,6 +124,7 @@ public class HudiTableHandle String basePath, HoodieTableType tableType, List<HiveColumnHandle> partitionColumns, + Lazy<List<HiveColumnHandle>> lazyOrderingColumns, Set<HiveColumnHandle> constraintColumns, TupleDomain<HiveColumnHandle> partitionPredicates, TupleDomain<HiveColumnHandle> regularPredicates, @@ -133,6 +138,7 @@ public class HudiTableHandle this.basePath = requireNonNull(basePath, "basePath is null"); this.tableType = requireNonNull(tableType, "tableType is null"); this.partitionColumns = requireNonNull(partitionColumns, "partitionColumns is null"); + this.lazyOrderingColumns = requireNonNull(lazyOrderingColumns, "lazyOrderingColumns is null"); this.constraintColumns = requireNonNull(constraintColumns, "constraintColumns is null"); this.partitionPredicates = requireNonNull(partitionPredicates, "partitionPredicates is null"); this.regularPredicates = 
requireNonNull(regularPredicates, "regularPredicates is null"); @@ -248,6 +254,12 @@ public class HudiTableHandle return regularPredicates; } + @JsonProperty + public List<HiveColumnHandle> getOrderingColumns() + { + return lazyOrderingColumns.get(); + } + public SchemaTableName getSchemaTableName() { return schemaTableName(schemaName, tableName); @@ -266,6 +278,7 @@ public class HudiTableHandle basePath, tableType, partitionColumns, + lazyOrderingColumns, constraintColumns, partitionPredicates.intersect(partitionTupleDomain), regularPredicates.intersect(regularTupleDomain), diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java index 8eb10de4cc4c..52e594bbc4be 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/HudiUtil.java @@ -22,10 +22,13 @@ import io.trino.cache.EvictableCacheBuilder; import io.trino.filesystem.FileIterator; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystem; +import io.trino.metastore.Column; import io.trino.metastore.HivePartition; import io.trino.metastore.HiveType; +import io.trino.metastore.Table; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hive.HivePartitionKey; +import io.trino.plugin.hive.HiveTimestampPrecision; import io.trino.plugin.hive.avro.AvroHiveFileUtils; import io.trino.plugin.hudi.storage.HudiTrinoStorage; import io.trino.plugin.hudi.storage.TrinoStorageConfiguration; @@ -35,9 +38,12 @@ import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.NullableValue; import io.trino.spi.predicate.TupleDomain; +import io.trino.spi.type.TypeManager; import io.trino.spi.type.VarcharType; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; +import org.apache.hudi.common.config.RecordMergeMode; +import 
org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; @@ -47,18 +53,25 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.TableNotFoundException; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.util.Lazy; import java.io.IOException; import java.io.UncheckedIOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -66,7 +79,11 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static io.airlift.slice.SizeOf.estimatedSizeOf; +import static io.trino.plugin.hive.HiveColumnHandle.ColumnType.REGULAR; +import static io.trino.plugin.hive.HiveColumnHandle.createBaseColumn; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_METADATA; +import static io.trino.plugin.hive.util.HiveTypeUtil.getType; +import static io.trino.plugin.hive.util.HiveTypeUtil.typeSupported; import static io.trino.plugin.hive.util.HiveUtil.checkCondition; import static io.trino.plugin.hive.util.HiveUtil.parsePartitionValue; import static io.trino.plugin.hive.util.SerdeConstants.LIST_COLUMNS; @@ -77,11 +94,14 @@ import static io.trino.plugin.hudi.HudiErrorCode.HUDI_META_CLIENT_ERROR; import 
static io.trino.plugin.hudi.HudiErrorCode.HUDI_SCHEMA_ERROR; import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNSUPPORTED_FILE_FORMAT; import static java.lang.Math.toIntExact; +import static org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_FIELD; import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD; -import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_META_FIELD_ORD; public final class HudiUtil { + public static final List<String> HOODIE_META_COLUMNS = + CollectionUtils.createImmutableList(RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD); + private static final Logger log = Logger.get(HudiUtil.class); private static final Cache<Schema, Map<String, Schema.Field>> SCHEMA_FIELD_CACHE = EvictableCacheBuilder.newBuilder() @@ -291,7 +311,6 @@ public final class HudiUtil * <li>First, attempts an exact match on the column name.</li> * <li>If not found, falls back to a case-insensitive match using a cached lookup table</li> * </ul> - * <p> * * @param columnName Column name to search for. * @param schema Avro {@link Schema} in which to search. 
@@ -321,29 +340,33 @@ public final class HudiUtil "Failed to get column " + columnName + " from table schema"); } - public static List<HiveColumnHandle> prependHudiMetaColumns(List<HiveColumnHandle> dataColumns) + public static List<HiveColumnHandle> prependHudiMetaAndOrderingColumns(HudiTableHandle tableHandle, List<HiveColumnHandle> dataColumns) { - //For efficient lookup - Set<String> dataColumnNames = dataColumns.stream() + Set<String> existingColumns = dataColumns.stream() .map(HiveColumnHandle::getName) - .collect(Collectors.toSet()); - - // If Hudi record key meta column is already present, return the original list - if (dataColumnNames.contains(RECORD_KEY_METADATA_FIELD)) { - return dataColumns; - } + .collect(Collectors.toCollection(HashSet::new)); List<HiveColumnHandle> columns = new ArrayList<>(); - // Create and prepend the new HiveColumnHandle for the record key column - columns.add(new HiveColumnHandle( - RECORD_KEY_METADATA_FIELD, - RECORD_KEY_META_FIELD_ORD, - HiveType.HIVE_STRING, - VarcharType.VARCHAR, - Optional.empty(), - HiveColumnHandle.ColumnType.REGULAR, - Optional.empty())); + // Add missing Hudi meta columns first + for (int i = 0; i < HOODIE_META_COLUMNS.size(); i++) { + String metaColumn = HOODIE_META_COLUMNS.get(i); + if (existingColumns.add(metaColumn)) { // add() returns false if already present + columns.add(new HiveColumnHandle( + metaColumn, + i, + HiveType.HIVE_STRING, + VarcharType.VARCHAR, + Optional.empty(), + HiveColumnHandle.ColumnType.REGULAR, + Optional.empty())); + } + } + + // Add missing ordering columns next + tableHandle.getOrderingColumns().stream() + .filter(col -> existingColumns.add(col.getName())) + .forEach(columns::add); // Add all the original data columns after the new meta columns columns.addAll(dataColumns); @@ -389,4 +412,62 @@ public final class HudiUtil throw new TrinoException(HUDI_FILESYSTEM_ERROR, e); } } + + public static List<HiveColumnHandle> getOrderingColumnHandles(Table table, TypeManager 
typeManager, Lazy<HoodieTableMetaClient> lazyMetaClient, HiveTimestampPrecision timestampPrecision) + { + RecordMergeMode recordMergeMode = lazyMetaClient.get().getTableConfig().getRecordMergeMode(); + if (Objects.isNull(recordMergeMode) || recordMergeMode.equals(RecordMergeMode.COMMIT_TIME_ORDERING)) { + // if commit time ordering is enabled, return empty list + return Collections.emptyList(); + } + + ImmutableList.Builder<HiveColumnHandle> columns = ImmutableList.builder(); + List<String> orderingColumnNames = lazyMetaClient.get().getTableConfig().getOrderingFields(); + + int hiveColumnIndex = 0; + for (Column field : table.getDataColumns()) { + // ignore unsupported types rather than failing + if (orderingColumnNames.contains(field.getName())) { + HiveType hiveType = field.getType(); + if (typeSupported(hiveType.getTypeInfo(), table.getStorage().getStorageFormat())) { + columns.add(createBaseColumn(field.getName(), hiveColumnIndex, hiveType, getType(hiveType, typeManager, timestampPrecision), REGULAR, field.getComment())); + } + } + hiveColumnIndex++; + } + + return columns.build(); + } + + /** + * Converts the given {@link HoodiePairData} into a {@link Map}. 
+ * <p> + * Special handling is applied for null keys: + * <ul> + * <li>If a key is null, it is stored in the map as a {@code null} entry.</li> + * <li>If multiple entries share the same key (including null), the latest value overwrites the previous one.</li> + * </ul> + * + * @param pairData the HoodiePairData containing key-value pairs + * @param <K> the type of keys maintained by the resulting map + * @param <V> the type of mapped values + * @return a {@link Map} containing all key-value pairs from the input data + */ + public static <K, V> Map<K, V> collectAsMap(HoodiePairData<K, V> pairData) + { + // Map each pair to (Option<Pair.key>, V) to handle null keys uniformly + // If there are multiple entries sharing the same key, use the incoming one + return pairData.mapToPair(pair -> + Pair.of( + Option.ofNullable(pair.getKey()), + pair.getValue())) + .collectAsList() + .stream() + .collect(HashMap::new, + (map, pair) -> { + K key = pair.getKey().orElse(null); + map.put(key, pair.getValue()); + }, + HashMap::putAll); + } } diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/HudiTrinoFileReaderFactory.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/HudiTrinoFileReaderFactory.java index 7e8bb967ffc7..3571de43d79e 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/HudiTrinoFileReaderFactory.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/HudiTrinoFileReaderFactory.java @@ -16,12 +16,14 @@ package io.trino.plugin.hudi.io; import org.apache.avro.Schema; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.util.Option; +import org.apache.hudi.io.storage.HFileReaderFactory; import org.apache.hudi.io.storage.HoodieAvroBootstrapFileReader; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader; import org.apache.hudi.storage.HoodieStorage; import 
org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; import java.io.IOException; @@ -45,7 +47,11 @@ public class HudiTrinoFileReaderFactory Option<Schema> schemaOption) throws IOException { - return new HoodieNativeAvroHFileReader(storage, path, schemaOption); + HFileReaderFactory readerFactory = HFileReaderFactory.builder() + .withStorage(storage).withProps(hoodieConfig.getProps()) + .withPath(path).build(); + return HoodieNativeAvroHFileReader.builder() + .readerFactory(readerFactory).path(path).schema(schemaOption).build(); } @Override @@ -56,7 +62,22 @@ public class HudiTrinoFileReaderFactory Option<Schema> schemaOption) throws IOException { - return new HoodieNativeAvroHFileReader(this.storage, content, schemaOption); + HFileReaderFactory readerFactory = HFileReaderFactory.builder() + .withStorage(storage).withProps(hoodieConfig.getProps()) + .withContent(content).build(); + return HoodieNativeAvroHFileReader.builder() + .readerFactory(readerFactory).path(path).schema(schemaOption).build(); + } + + @Override + protected HoodieFileReader newHFileFileReader(HoodieConfig hoodieConfig, StoragePathInfo pathInfo, Option<Schema> schemaOption) + throws IOException + { + HFileReaderFactory readerFactory = HFileReaderFactory.builder() + .withStorage(storage).withProps(hoodieConfig.getProps()) + .withPath(pathInfo.getPath()).build(); + return HoodieNativeAvroHFileReader.builder() + .readerFactory(readerFactory).path(pathInfo.getPath()).schema(schemaOption).build(); } @Override diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java index f75ec7a55f90..b67b0012905b 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/io/InlineSeekableDataInputStream.java @@ -26,7 +26,7 @@ import 
java.io.IOException; * Example InlineFS URL: * <pre> * inlinefs://tests_7af7f087-c807-4f5e-a759-65fd9c21063b/hudi_multi_fg_pt_v8_mor/.hoodie/metadata/column_stats/ - * .col-stats-0001-0_20250429145946675.log.1_1-120-382/local/?start_offset=8036&length=6959 + * .col-stats-0001-0_20250429145946675.log.1_1-120-382/local/?start_offset=8036&length=6959 * </pre> * <p> * Key behaviors: diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiColumnStatsIndexSupport.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiColumnStatsIndexSupport.java index 0d1c9eaa6cd0..7626272db612 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiColumnStatsIndexSupport.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiColumnStatsIndexSupport.java @@ -30,13 +30,14 @@ import io.trino.spi.type.Type; import io.trino.spi.type.VarcharType; import org.apache.avro.generic.GenericRecord; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieIndexDefinition; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.util.HoodieTimer; -import org.apache.hudi.common.util.hash.ColumnIndexID; +import org.apache.hudi.metadata.ColumnStatsIndexPrefixRawKey; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.hudi.util.Lazy; @@ -92,9 +93,9 @@ public class HudiColumnStatsIndexSupport } else { // Get filter columns - List<String> encodedTargetColumnNames = regularColumns + List<ColumnStatsIndexPrefixRawKey> rawKeys = regularColumns .stream() - .map(col -> new ColumnIndexID(col).asBase64EncodedString()).collect(Collectors.toList()); + 
.map(ColumnStatsIndexPrefixRawKey::new).toList(); Map<String, Type> columnTypes = regularColumnPredicates.getDomains().get().entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getType())); @@ -107,7 +108,8 @@ public class HudiColumnStatsIndexSupport } Map<String, Map<String, Domain>> domainsWithStats = - lazyTableMetadata.get().getRecordsByKeyPrefixes(encodedTargetColumnNames, + lazyTableMetadata.get().getRecordsByKeyPrefixes( + HoodieListData.lazy(rawKeys), HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, true) .collectAsList() .stream() diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiPartitionStatsIndexSupport.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiPartitionStatsIndexSupport.java index 3491d10cccfe..7d1e6907d976 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiPartitionStatsIndexSupport.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiPartitionStatsIndexSupport.java @@ -21,10 +21,11 @@ import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.Type; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.model.HoodieIndexDefinition; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.util.HoodieTimer; -import org.apache.hudi.common.util.hash.ColumnIndexID; +import org.apache.hudi.metadata.ColumnStatsIndexPrefixRawKey; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataUtil; import org.apache.hudi.util.Lazy; @@ -66,15 +67,15 @@ public class HudiPartitionStatsIndexSupport List<String> regularColumns = new ArrayList<>(filteredRegularPredicates.getDomains().get().keySet()); // Get columns to filter on - List<String> encodedTargetColumnNames = regularColumns.stream() - 
.map(col -> new ColumnIndexID(col).asBase64EncodedString()).toList(); + List<ColumnStatsIndexPrefixRawKey> columnStatsIndexPrefixRawKeys = regularColumns.stream() + .map(ColumnStatsIndexPrefixRawKey::new).toList(); Map<String, Type> columnTypes = regularColumnPredicates.getDomains().get().entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getType())); // Map of domains with partition stats keyed by partition name and column name Map<String, Map<String, Domain>> domainsWithStats = lazyMetadataTable.get().getRecordsByKeyPrefixes( - encodedTargetColumnNames, + HoodieListData.eager(columnStatsIndexPrefixRawKeys), HoodieTableMetadataUtil.PARTITION_NAME_PARTITION_STATS, true) .collectAsList() .stream() diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiRecordLevelIndexSupport.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiRecordLevelIndexSupport.java index 78f503f4b21e..e438e5686ebb 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiRecordLevelIndexSupport.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiRecordLevelIndexSupport.java @@ -22,6 +22,7 @@ import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.Domain; import io.trino.spi.predicate.TupleDomain; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieRecordGlobalLocation; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -47,6 +48,7 @@ import java.util.stream.Collectors; import static io.trino.plugin.hudi.HudiErrorCode.HUDI_BAD_DATA; import static io.trino.plugin.hudi.HudiSessionProperties.getRecordIndexWaitTimeout; +import static io.trino.plugin.hudi.HudiUtil.collectAsMap; import static java.util.concurrent.TimeUnit.MILLISECONDS; public class HudiRecordLevelIndexSupport @@ -94,7 +96,7 @@ 
public class HudiRecordLevelIndexSupport // Perform index lookup in metadataTable // TODO: document here what this map is keyed by - Map<String, HoodieRecordGlobalLocation> recordIndex = lazyTableMetadata.get().readRecordIndex(recordKeys); + Map<String, HoodieRecordGlobalLocation> recordIndex = collectAsMap(lazyTableMetadata.get().readRecordIndexLocationsWithKeys(HoodieListData.eager(recordKeys))); if (recordIndex.isEmpty()) { log.debug("Record level index lookup took %s ms but returned no locations for the given keys %s for table %s", timer.endTimer(), recordKeys, schemaTableName); diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiSecondaryIndexSupport.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiSecondaryIndexSupport.java index 23121902f72c..eddf4d48f547 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiSecondaryIndexSupport.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/query/index/HudiSecondaryIndexSupport.java @@ -19,10 +19,12 @@ import io.trino.plugin.hudi.util.TupleDomainUtils; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.TupleDomain; +import org.apache.hudi.common.data.HoodieListData; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieIndexDefinition; import org.apache.hudi.common.model.HoodieRecordGlobalLocation; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.HoodieDataUtils; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.metadata.HoodieTableMetadataUtil; @@ -85,7 +87,7 @@ public class HudiSecondaryIndexSupport // Perform index lookup in metadataTable // TODO: document here what this map is keyed by - Map<String, HoodieRecordGlobalLocation> recordKeyLocationsMap = 
lazyTableMetadata.get().readSecondaryIndex(secondaryKeys, indexName); + Map<String, HoodieRecordGlobalLocation> recordKeyLocationsMap = HoodieDataUtils.dedupeAndCollectAsMap(lazyTableMetadata.get().readSecondaryIndexLocationsWithKeys(HoodieListData.eager(secondaryKeys), indexName)); if (recordKeyLocationsMap.isEmpty()) { log.debug("Took %s ms, but secondary index lookup returned no locations for the given keys for table %s", timer.endTimer(), schemaTableName); // Return all original fileSlices diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoReaderContext.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoReaderContext.java index 36c5342920f4..1f7584c3c168 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoReaderContext.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoReaderContext.java @@ -19,29 +19,25 @@ import io.trino.plugin.hudi.util.SynthesizedColumnHandler; import io.trino.spi.Page; import io.trino.spi.connector.ConnectorPageSource; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericData.Record; -import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.IndexedRecord; +import org.apache.hudi.avro.AvroRecordContext; import org.apache.hudi.common.config.RecordMergeMode; import org.apache.hudi.common.engine.HoodieReaderContext; -import org.apache.hudi.common.model.HoodieAvroIndexedRecord; -import org.apache.hudi.common.model.HoodieAvroRecordMerger; -import org.apache.hudi.common.model.HoodieEmptyRecord; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodiePreCombineAvroRecordMerger; import org.apache.hudi.common.model.HoodieRecordMerger; -import org.apache.hudi.common.table.read.BufferedRecord; +import org.apache.hudi.common.table.HoodieTableConfig; import 
org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ClosableIterator; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.storage.HoodieStorage; +import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.storage.StoragePathInfo; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.UnaryOperator; public class HudiTrinoReaderContext extends HoodieReaderContext<IndexedRecord> @@ -53,11 +49,14 @@ public class HudiTrinoReaderContext List<HiveColumnHandle> columnHandles; public HudiTrinoReaderContext( + StorageConfiguration storageConfiguration, + HoodieTableConfig tableConfig, ConnectorPageSource pageSource, List<HiveColumnHandle> dataHandles, List<HiveColumnHandle> columnHandles, SynthesizedColumnHandler synthesizedColumnHandler) { + super(storageConfiguration, tableConfig, Option.empty(), Option.empty(), new AvroRecordContext(tableConfig, tableConfig.getPayloadClass())); this.pageSource = pageSource; this.avroSerializer = new HudiAvroSerializer(columnHandles, synthesizedColumnHandler); this.dataHandles = dataHandles; @@ -77,6 +76,23 @@ public class HudiTrinoReaderContext Schema dataSchema, Schema requiredSchema, HoodieStorage storage) + { + return createRecordIterator(); + } + + @Override + public ClosableIterator<IndexedRecord> getFileRecordIterator( + StoragePathInfo storagePathInfo, + long start, + long length, + Schema dataSchema, + Schema requiredSchema, + HoodieStorage storage) + { + return createRecordIterator(); + } + + private ClosableIterator<IndexedRecord> createRecordIterator() { return new ClosableIterator<>() { @@ -130,98 +146,14 @@ public class HudiTrinoReaderContext } @Override - public IndexedRecord convertAvroRecord(IndexedRecord record) + protected Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String 
mergeImplClasses) { - return record; + return Option.of(new HoodiePreCombineAvroRecordMerger()); } @Override - public GenericRecord convertToAvroRecord(IndexedRecord record, Schema schema) - { - GenericRecord ret = new GenericData.Record(schema); - for (Schema.Field field : schema.getFields()) { - ret.put(field.name(), record.get(field.pos())); - } - return ret; - } - - @Override - public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode, String mergeStrategyId, String mergeImplClasses) - { - return Option.of(HoodieAvroRecordMerger.INSTANCE); - } - - @Override - public Object getValue(IndexedRecord record, Schema schema, String fieldName) - { - if (colToPosMap.containsKey(fieldName)) { - return record.get(colToPosMap.get(fieldName)); - } - else { - // record doesn't have the queried field, return null - return null; - } - } - - @Override - public IndexedRecord seal(IndexedRecord record) - { - // TODO: this can rely on colToPos map directly instead of schema - Schema schema = record.getSchema(); - IndexedRecord newRecord = new Record(schema); - List<Schema.Field> fields = schema.getFields(); - for (Schema.Field field : fields) { - int pos = schema.getField(field.name()).pos(); - newRecord.put(pos, record.get(pos)); - } - return newRecord; - } - - @Override - public IndexedRecord toBinaryRow(Schema schema, IndexedRecord record) - { - return record; - } - - @Override - public ClosableIterator<IndexedRecord> mergeBootstrapReaders( - ClosableIterator closableIterator, Schema schema, - ClosableIterator closableIterator1, Schema schema1) + public ClosableIterator<IndexedRecord> mergeBootstrapReaders(ClosableIterator<IndexedRecord> skeletonFileIterator, Schema skeletonRequiredSchema, ClosableIterator<IndexedRecord> dataFileIterator, Schema dataRequiredSchema, List<Pair<String, Object>> requiredPartitionFieldAndValues) { return null; } - - @Override - public UnaryOperator<IndexedRecord> projectRecord( - Schema from, - Schema to, - Map<String, String> 
renamedColumns) - { - List<Schema.Field> toFields = to.getFields(); - int[] projection = new int[toFields.size()]; - for (int i = 0; i < projection.length; i++) { - projection[i] = from.getField(toFields.get(i).name()).pos(); - } - - return fromRecord -> { - IndexedRecord toRecord = new Record(to); - for (int i = 0; i < projection.length; i++) { - toRecord.put(i, fromRecord.get(projection[i])); - } - return toRecord; - }; - } - - @Override - public HoodieRecord<IndexedRecord> constructHoodieRecord( - BufferedRecord<IndexedRecord> bufferedRecord) - { - if (bufferedRecord.isDelete()) { - return new HoodieEmptyRecord<>( - new HoodieKey(bufferedRecord.getRecordKey(), null), - HoodieRecord.HoodieRecordType.AVRO); - } - - return new HoodieAvroIndexedRecord(bufferedRecord.getRecord()); - } } diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoRecord.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoRecord.java deleted file mode 100644 index 7fa0f71399ed..000000000000 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/reader/HudiTrinoRecord.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package io.trino.plugin.hudi.reader; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import org.apache.avro.Schema; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hudi.common.model.HoodieAvroIndexedRecord; -import org.apache.hudi.common.model.HoodieKey; -import org.apache.hudi.common.model.HoodieOperation; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.model.MetadataValues; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.keygen.BaseKeyGenerator; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.Map; -import java.util.Properties; - -public class HudiTrinoRecord - extends HoodieRecord<IndexedRecord> -{ - public HudiTrinoRecord() - { - } - - @Override - public HoodieRecord<IndexedRecord> newInstance() - { - return null; - } - - @Override - public HoodieRecord<IndexedRecord> newInstance(HoodieKey hoodieKey, HoodieOperation hoodieOperation) - { - return null; - } - - @Override - public HoodieRecord<IndexedRecord> newInstance(HoodieKey hoodieKey) - { - return null; - } - - @Override - public Comparable<?> doGetOrderingValue(Schema schema, Properties properties) - { - return null; - } - - @Override - public HoodieRecordType getRecordType() - { - return null; - } - - @Override - public String getRecordKey(Schema schema, Option<BaseKeyGenerator> option) - { - return ""; - } - - @Override - public String getRecordKey(Schema schema, String s) - { - return ""; - } - - @Override - protected void writeRecordPayload(IndexedRecord page, Kryo kryo, Output output) - { - } - - @Override - protected IndexedRecord readRecordPayload(Kryo kryo, Input input) - { - return null; - } - - @Override - public Object[] getColumnValues(Schema schema, String[] strings, boolean b) - { - return new Object[0]; - } - - @Override - public HoodieRecord 
joinWith(HoodieRecord hoodieRecord, Schema schema) - { - return null; - } - - @Override - public HoodieRecord prependMetaFields(Schema schema, Schema schema1, - MetadataValues metadataValues, Properties properties) - { - return null; - } - - @Override - public HoodieRecord rewriteRecordWithNewSchema(Schema schema, Properties properties, - Schema schema1, Map<String, String> map) - { - return null; - } - - @Override - public boolean isDelete(Schema schema, Properties properties) - throws IOException - { - return false; - } - - @Override - public boolean shouldIgnore(Schema schema, Properties properties) - throws IOException - { - return false; - } - - @Override - public HoodieRecord<IndexedRecord> copy() - { - return null; - } - - @Override - public Option<Map<String, String>> getMetadata() - { - return null; - } - - @Override - public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(Schema schema, Properties properties, - Option<Pair<String, String>> option, Boolean aBoolean, Option<String> option1, - Boolean aBoolean1, Option<Schema> option2) - throws IOException - { - return null; - } - - @Override - public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema schema, Properties properties, - Option<BaseKeyGenerator> option) - { - return null; - } - - @Override - public HoodieRecord truncateRecordKey(Schema schema, Properties properties, String s) - throws IOException - { - return null; - } - - @Override - public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema schema, Properties properties) - throws IOException - { - return null; - } - - @Override - public ByteArrayOutputStream getAvroBytes(Schema schema, Properties properties) - throws IOException - { - return null; - } -} diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableMetadataReader.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableMetadataReader.java index e87122de0795..a431465d938a 100644 --- 
a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableMetadataReader.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableMetadataReader.java @@ -16,21 +16,14 @@ package io.trino.plugin.hudi.stats; import org.apache.hudi.avro.model.HoodieMetadataColumnStats; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.model.HoodieColumnRangeMetadata; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.collection.Pair; -import org.apache.hudi.common.util.hash.ColumnIndexID; -import org.apache.hudi.common.util.hash.FileIndexID; -import org.apache.hudi.common.util.hash.PartitionIndexID; import org.apache.hudi.exception.HoodieMetadataException; import org.apache.hudi.metadata.HoodieBackedTableMetadata; -import org.apache.hudi.metadata.HoodieMetadataMetrics; -import org.apache.hudi.metadata.HoodieMetadataPayload; -import org.apache.hudi.metadata.HoodieTableMetadataUtil; -import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.stats.HoodieColumnRangeMetadata; +import org.apache.hudi.stats.ValueMetadata; import org.apache.hudi.storage.HoodieStorage; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -55,60 +48,29 @@ public class TableMetadataReader * @return a map from column name to their corresponding {@link HoodieColumnRangeMetadata} * @throws HoodieMetadataException if an error occurs while fetching the column statistics */ - Map<String, HoodieColumnRangeMetadata> getColumnStats(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames) + public Map<String, HoodieColumnRangeMetadata> getColumnsRange(List<Pair<String, String>> partitionNameFileNameList, List<String> columnNames) throws HoodieMetadataException { - return 
computeFileToColumnStatsMap(computeColumnStatsLookupKeys(partitionNameFileNameList, columnNames)); - } - - /** - * @param partitionNameFileNameList a list of partition and file name pairs for which column stats need to be retrieved - * @param columnNames list of column names for which stats are needed - * @return a list of column stats keys to look up in the metadata table col_stats partition. - */ - private List<String> computeColumnStatsLookupKeys( - final List<Pair<String, String>> partitionNameFileNameList, - final List<String> columnNames) - { - return columnNames.stream() - .flatMap(columnName -> partitionNameFileNameList.stream() - .map(partitionNameFileNamePair -> HoodieMetadataPayload.getColumnStatsIndexKey( - new PartitionIndexID(HoodieTableMetadataUtil.getColumnStatsIndexPartitionIdentifier(partitionNameFileNamePair.getLeft())), - new FileIndexID(partitionNameFileNamePair.getRight()), - new ColumnIndexID(columnName)))) - .toList(); - } - - /** - * @param columnStatsLookupKeys a map from column stats key to partition and file name pair - * @return a map from column name to merged HoodieMetadataColumnStats - */ - private Map<String, HoodieColumnRangeMetadata> computeFileToColumnStatsMap(List<String> columnStatsLookupKeys) - { - HoodieTimer timer = HoodieTimer.start(); - Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords = - getRecordsByKeys(columnStatsLookupKeys, MetadataPartitionType.COLUMN_STATS.getPartitionPath()); - metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_COLUMN_STATS_METADATA_STR, timer.endTimer())); - return hoodieRecords.values().stream() - .collect(Collectors.groupingBy( - r -> r.getData().getColumnStatMetadata().get().getColumnName(), - Collectors.mapping(r -> r.getData().getColumnStatMetadata().get(), Collectors.toList()))) + Map<Pair<String, String>, List<HoodieMetadataColumnStats>> columnStatsMap = getColumnStats(partitionNameFileNameList, columnNames); + return 
columnStatsMap.values().stream().flatMap(Collection::stream).collect(Collectors.groupingBy( + HoodieMetadataColumnStats::getColumnName, + Collectors.mapping(colStats -> colStats, Collectors.toList()))) .entrySet().stream() - .collect(Collectors.toMap( - Map.Entry::getKey, - e -> { - long valueCount = 0L; - long nullCount = 0L; - long totalSize = 0L; - long totalUncompressedSize = 0L; - for (HoodieMetadataColumnStats stats : e.getValue()) { - valueCount += stats.getValueCount(); - nullCount += stats.getNullCount(); - totalSize += stats.getTotalSize(); - totalUncompressedSize += stats.getTotalUncompressedSize(); - } - return HoodieColumnRangeMetadata.create( - "", e.getKey(), null, null, nullCount, valueCount, totalSize, totalUncompressedSize); - })); + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> { + long valueCount = 0L; + long nullCount = 0L; + long totalSize = 0L; + long totalUncompressedSize = 0L; + for (HoodieMetadataColumnStats stats : e.getValue()) { + valueCount += stats.getValueCount(); + nullCount += stats.getNullCount(); + totalSize += stats.getTotalSize(); + totalUncompressedSize += stats.getTotalUncompressedSize(); + } + return HoodieColumnRangeMetadata.create( + "", e.getKey(), null, null, nullCount, valueCount, totalSize, totalUncompressedSize, ValueMetadata.NULL_METADATA); + })); } } diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableStatisticsReader.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableStatisticsReader.java index 55d424db40ca..5501905dcd00 100644 --- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableStatisticsReader.java +++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/stats/TableStatisticsReader.java @@ -24,11 +24,11 @@ import io.trino.spi.statistics.TableStatistics; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieLocalEngineContext; -import 
org.apache.hudi.common.model.HoodieColumnRangeMetadata; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.stats.HoodieColumnRangeMetadata; import java.util.List; import java.util.Map; @@ -113,6 +113,6 @@ public class TableStatisticsReader .stream().flatMap(entry -> entry.getValue() .map(baseFile -> Pair.of(entry.getKey(), baseFile.getFileName()))) .toList(); - return tableMetadata.getColumnStats(filePaths, columnNames); + return tableMetadata.getColumnsRange(filePaths, columnNames); } } diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java index 1308cff269a9..5fe42eb55a46 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiAlluxioCacheFileOperations.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.hudi; -import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultiset; import com.google.common.collect.Multiset; @@ -21,7 +20,6 @@ import io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer; import io.trino.plugin.hudi.util.FileOperationUtils.FileOperation; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.QueryRunner; import org.intellij.lang.annotations.Language; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; @@ -34,15 +32,13 @@ import java.util.Map; import static com.google.common.io.MoreFiles.deleteRecursively; import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; -import static 
io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans; import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.HUDI_MULTI_FG_PT_V8_MOR; +import static io.trino.plugin.hudi.util.FileOperationAssertions.assertAlluxioFileSystemAccesses; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.DATA; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.INDEX_DEFINITION; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE_PROPERTIES; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.TABLE_PROPERTIES; -import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; -import static java.util.stream.Collectors.toCollection; @ResourceLock("HUDI_CACHE_SYSTEM") @Execution(ExecutionMode.SAME_THREAD) @@ -78,29 +74,18 @@ public class TestHudiAlluxioCacheFileOperations throws InterruptedException { @Language("SQL") String query = "SELECT * FROM " + HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'"; - assertFileSystemAccesses( - query, - ImmutableMultiset.<FileOperation>builder() - .addCopies(new FileOperation("Alluxio.readCached", DATA), 2) - .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 27) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 4) - .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 10) - .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) - .add(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES)) - .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2) - .build()); + Multiset<FileOperation> expectedFileOperations = ImmutableMultiset.<FileOperation>builder() + .addCopies(new FileOperation("Alluxio.readCached", DATA), 2) + .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 20) + .addCopies(new 
FileOperation("InputFile.lastModified", METADATA_TABLE), 5) + .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 11) + .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) + .add(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES)) + .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2) + .build(); + assertAlluxioFileSystemAccesses(getDistributedQueryRunner(), query, expectedFileOperations); - assertFileSystemAccesses( - query, - ImmutableMultiset.<FileOperation>builder() - .addCopies(new FileOperation("Alluxio.readCached", DATA), 2) - .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 27) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 4) - .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 10) - .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) - .add(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES)) - .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2) - .build()); + assertAlluxioFileSystemAccesses(getDistributedQueryRunner(), query, expectedFileOperations); } @Test @@ -112,45 +97,30 @@ public class TestHudiAlluxioCacheFileOperations "INNER JOIN " + HUDI_MULTI_FG_PT_V8_MOR + " t2 ON t1.id = t2.id " + "WHERE t2.price <= 102"; - assertFileSystemAccesses(query, + assertAlluxioFileSystemAccesses( + getDistributedQueryRunner(), + query, ImmutableMultiset.<FileOperation>builder() .addCopies(new FileOperation("Alluxio.readCached", DATA), 6) - .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 288) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 39) - .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 93) + .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 222) + .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 60) + .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 
114) .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 5) .addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 3) .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 5) .build()); - assertFileSystemAccesses(query, + assertAlluxioFileSystemAccesses( + getDistributedQueryRunner(), + query, ImmutableMultiset.<FileOperation>builder() .addCopies(new FileOperation("Alluxio.readCached", DATA), 6) - .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 215) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29) - .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 69) + .addCopies(new FileOperation("Alluxio.readCached", METADATA_TABLE), 166) + .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 45) + .addCopies(new FileOperation("InputFile.length", METADATA_TABLE), 85) .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) .addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2) .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4) .build()); } - - private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<FileOperation> expectedCacheAccesses) - throws InterruptedException - { - DistributedQueryRunner queryRunner = getDistributedQueryRunner(); - queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query); - // Allow time for table stats computation to finish before validation. 
- Thread.sleep(1000L); - assertMultisetsEqual(getFileOperations(queryRunner), expectedCacheAccesses); - } - - public static Multiset<FileOperation> getFileOperations(QueryRunner queryRunner) - { - return getCacheOperationSpans(queryRunner) - .stream() - .filter(span -> !span.getName().startsWith("InputFile.exists")) - .map(FileOperation::create) - .collect(toCollection(HashMultiset::create)); - } } diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java index 3f6d97f16919..7c61d260cab5 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiMemoryCacheFileOperations.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.hudi; -import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultiset; import com.google.common.collect.Multiset; @@ -21,7 +20,6 @@ import io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer; import io.trino.plugin.hudi.util.FileOperationUtils.FileOperation; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.QueryRunner; import org.intellij.lang.annotations.Language; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; @@ -30,16 +28,13 @@ import org.junit.jupiter.api.parallel.ResourceLock; import java.util.Map; -import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation; -import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.isTrinoSchemaOrPermissions; import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.HUDI_MULTI_FG_PT_V8_MOR; +import static io.trino.plugin.hudi.util.FileOperationAssertions.assertFileSystemAccesses; import static 
io.trino.plugin.hudi.util.FileOperationUtils.FileType.DATA; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.INDEX_DEFINITION; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE_PROPERTIES; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.TABLE_PROPERTIES; -import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; -import static java.util.stream.Collectors.toCollection; @ResourceLock("HUDI_CACHE_SYSTEM") @Execution(ExecutionMode.SAME_THREAD) @@ -68,29 +63,18 @@ public class TestHudiMemoryCacheFileOperations throws InterruptedException { @Language("SQL") String query = "SELECT * FROM " + HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'"; - assertFileSystemAccesses( - query, - ImmutableMultiset.<FileOperation>builder() - .addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 2) - .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 4) - .addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 6) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 4) - .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) - .add(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES)) - .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2) - .build()); + Multiset<FileOperation> expectedFileOperations = ImmutableMultiset.<FileOperation>builder() + .addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 2) + .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 5) + .addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 6) + .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 5) + .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) + .add(new FileOperation("InputFile.newStream", 
METADATA_TABLE_PROPERTIES)) + .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2) + .build(); + assertFileSystemAccesses(getDistributedQueryRunner(), query, expectedFileOperations); - assertFileSystemAccesses( - query, - ImmutableMultiset.<FileOperation>builder() - .addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 2) - .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 4) - .addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 6) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 4) - .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) - .add(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES)) - .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2) - .build()); + assertFileSystemAccesses(getDistributedQueryRunner(), query, expectedFileOperations); } @Test @@ -102,47 +86,30 @@ public class TestHudiMemoryCacheFileOperations "INNER JOIN " + HUDI_MULTI_FG_PT_V8_MOR + " t2 ON t1.id = t2.id " + "WHERE t2.price <= 102"; - assertFileSystemAccesses(query, + assertFileSystemAccesses( + getDistributedQueryRunner(), + query, ImmutableMultiset.<FileOperation>builder() .addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 6) - .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 39) + .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 60) .addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 54) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 39) + .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 60) .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 5) .addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 3) .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 5) .build()); - assertFileSystemAccesses(query, + 
assertFileSystemAccesses( + getDistributedQueryRunner(), + query, ImmutableMultiset.<FileOperation>builder() .addCopies(new FileOperation("FileSystemCache.cacheInput", DATA), 6) - .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 29) + .addCopies(new FileOperation("FileSystemCache.cacheLength", METADATA_TABLE), 45) .addCopies(new FileOperation("FileSystemCache.cacheStream", METADATA_TABLE), 40) - .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 29) + .addCopies(new FileOperation("InputFile.lastModified", METADATA_TABLE), 45) .addCopies(new FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) .addCopies(new FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2) .addCopies(new FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4) .build()); } - - private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<FileOperation> expectedCacheAccesses) - throws InterruptedException - { - DistributedQueryRunner queryRunner = getDistributedQueryRunner(); - queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query); - // Allow time for table stats computation to finish before validation. 
- Thread.sleep(1000L); - assertMultisetsEqual(getFileOperations(queryRunner), expectedCacheAccesses); - } - - private static Multiset<FileOperation> getFileOperations(QueryRunner queryRunner) - { - return queryRunner.getSpans().stream() - .filter(span -> span.getName().startsWith("Input.") || span.getName().startsWith("InputFile.") || span.getName().startsWith("FileSystemCache.")) - .filter(span -> !span.getName().startsWith("InputFile.newInput")) - .filter(span -> !span.getName().startsWith("InputFile.exists")) - .filter(span -> !isTrinoSchemaOrPermissions(getFileLocation(span))) - .map(FileOperation::create) - .collect(toCollection(HashMultiset::create)); - } } diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java index 416914d3f7e2..c2fa041a99bd 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiNoCacheFileOperations.java @@ -13,7 +13,6 @@ */ package io.trino.plugin.hudi; -import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultiset; import com.google.common.collect.Multiset; @@ -21,7 +20,6 @@ import io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer; import io.trino.plugin.hudi.util.FileOperationUtils; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; -import io.trino.testing.QueryRunner; import org.intellij.lang.annotations.Language; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.parallel.Execution; @@ -30,16 +28,13 @@ import org.junit.jupiter.api.parallel.ResourceLock; import java.util.Map; -import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation; -import static 
io.trino.filesystem.tracing.CacheFileSystemTraceUtils.isTrinoSchemaOrPermissions; import static io.trino.plugin.hudi.testing.ResourceHudiTablesInitializer.TestingTable.HUDI_MULTI_FG_PT_V8_MOR; +import static io.trino.plugin.hudi.util.FileOperationAssertions.assertFileSystemAccesses; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.DATA; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.INDEX_DEFINITION; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.METADATA_TABLE_PROPERTIES; import static io.trino.plugin.hudi.util.FileOperationUtils.FileType.TABLE_PROPERTIES; -import static io.trino.testing.MultisetAssertions.assertMultisetsEqual; -import static java.util.stream.Collectors.toCollection; @ResourceLock("HUDI_CACHE_SYSTEM") @Execution(ExecutionMode.SAME_THREAD) @@ -68,29 +63,18 @@ public class TestHudiNoCacheFileOperations throws InterruptedException { @Language("SQL") String query = "SELECT * FROM " + HUDI_MULTI_FG_PT_V8_MOR + " WHERE country='SG'"; - assertFileSystemAccesses( - query, - ImmutableMultiset.<FileOperationUtils.FileOperation>builder() - .addCopies(new FileOperationUtils.FileOperation("Input.readTail", DATA), 2) - .addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 4) - .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 4) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 6) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) - .add(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES)) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2) - .build()); + Multiset<FileOperationUtils.FileOperation> expectedFileOperations = ImmutableMultiset.<FileOperationUtils.FileOperation>builder() + 
.addCopies(new FileOperationUtils.FileOperation("Input.readTail", DATA), 2) + .addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 5) + .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 5) + .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 6) + .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) + .add(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES)) + .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2) + .build(); + assertFileSystemAccesses(getDistributedQueryRunner(), query, expectedFileOperations); - assertFileSystemAccesses( - query, - ImmutableMultiset.<FileOperationUtils.FileOperation>builder() - .addCopies(new FileOperationUtils.FileOperation("Input.readTail", DATA), 2) - .addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 4) - .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 4) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 2) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 6) - .add(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES)) - .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 2) - .build()); + assertFileSystemAccesses(getDistributedQueryRunner(), query, expectedFileOperations); } @Test @@ -102,47 +86,30 @@ public class TestHudiNoCacheFileOperations "INNER JOIN " + HUDI_MULTI_FG_PT_V8_MOR + " t2 ON t1.id = t2.id " + "WHERE t2.price <= 102"; - assertFileSystemAccesses(query, + assertFileSystemAccesses( + getDistributedQueryRunner(), + query, ImmutableMultiset.<FileOperationUtils.FileOperation>builder() .addCopies(new FileOperationUtils.FileOperation("Input.readTail", DATA), 6) - 
.addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 39) - .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 39) + .addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 60) + .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 60) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 5) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 54) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 3) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 5) .build()); - assertFileSystemAccesses(query, + assertFileSystemAccesses( + getDistributedQueryRunner(), + query, ImmutableMultiset.<FileOperationUtils.FileOperation>builder() .addCopies(new FileOperationUtils.FileOperation("Input.readTail", DATA), 6) - .addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 29) - .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 29) + .addCopies(new FileOperationUtils.FileOperation("InputFile.lastModified", METADATA_TABLE), 45) + .addCopies(new FileOperationUtils.FileOperation("InputFile.length", METADATA_TABLE), 45) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", INDEX_DEFINITION), 4) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE), 40) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", METADATA_TABLE_PROPERTIES), 2) .addCopies(new FileOperationUtils.FileOperation("InputFile.newStream", TABLE_PROPERTIES), 4) .build()); } - - private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<FileOperationUtils.FileOperation> expectedCacheAccesses) - throws InterruptedException - { - DistributedQueryRunner queryRunner = 
getDistributedQueryRunner(); - queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query); - // Allow time for table stats computation to finish before validation. - Thread.sleep(1000L); - assertMultisetsEqual(getFileOperations(queryRunner), expectedCacheAccesses); - } - - private static Multiset<FileOperationUtils.FileOperation> getFileOperations(QueryRunner queryRunner) - { - return queryRunner.getSpans().stream() - .filter(span -> span.getName().startsWith("Input.") || span.getName().startsWith("InputFile.") || span.getName().startsWith("FileSystemCache.")) - .filter(span -> !span.getName().startsWith("InputFile.newInput")) - .filter(span -> !span.getName().startsWith("InputFile.exists")) - .filter(span -> !isTrinoSchemaOrPermissions(getFileLocation(span))) - .map(FileOperationUtils.FileOperation::create) - .collect(toCollection(HashMultiset::create)); - } } diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java index d1c26fbf5edd..dbb7195d38ef 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java @@ -754,8 +754,8 @@ public class TestHudiSmokeTest .withPartitionStatsIndexEnabled(false) .withResolveColumnNameCasingEnabled(true) .build(); - MaterializedResult totalRes = getQueryRunner().execute(session, "SELECT * FROM " + HUDI_COW_TABLE_WITH_FIELD_NAMES_IN_CAPS); MaterializedResult prunedRes = getQueryRunner().execute(session, "SELECT * FROM " + HUDI_COW_TABLE_WITH_FIELD_NAMES_IN_CAPS + " WHERE id='1'"); + MaterializedResult totalRes = getQueryRunner().execute(session, "SELECT * FROM " + HUDI_COW_TABLE_WITH_FIELD_NAMES_IN_CAPS); int totalSplits = totalRes.getStatementStats().get().getTotalSplits(); int totalRows = totalRes.getRowCount(); int prunedSplits = prunedRes.getStatementStats().get().getTotalSplits(); diff --git 
a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java index 9c4f4a6176fc..266a5397af41 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/split/TestHudiSplitFactory.java @@ -181,6 +181,7 @@ public class TestHudiSplitFactory "/test/path", HoodieTableType.MERGE_ON_READ, ImmutableList.of(), + ImmutableList.of(), TupleDomain.all(), TupleDomain.all(), "", diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java index 10d1b9873ad3..640980c41936 100644 --- a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/testing/TpchHudiTablesInitializer.java @@ -45,6 +45,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hudi.client.HoodieJavaWriteClient; +import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.common.HoodieJavaEngineContext; import org.apache.hudi.common.bootstrap.index.NoOpBootstrapIndex; import org.apache.hudi.common.config.HoodieMetadataConfig; @@ -54,7 +55,6 @@ import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.marker.MarkerType; -import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieArchivalConfig; import org.apache.hudi.config.HoodieIndexConfig; @@ -64,12 +64,10 @@ import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; import 
org.intellij.lang.annotations.Language; import java.io.IOException; -import java.time.Instant; import java.time.LocalDate; import java.time.temporal.ChronoField; import java.util.ArrayList; import java.util.Collection; -import java.util.Date; import java.util.List; import java.util.Map; import java.util.Optional; @@ -168,9 +166,9 @@ public class TpchHudiTablesInitializer .map(MaterializedRow::getFields) .map(recordConverter::toRecord) .collect(Collectors.toList()); - String timestamp = HoodieInstantTimeGenerator.formatDate(Date.from(Instant.now())); - writeClient.startCommitWithTime(timestamp); - writeClient.insert(records, timestamp); + String instantTime = writeClient.startCommit(); + List<WriteStatus> writeStatuses = writeClient.insert(records, instantTime); + writeClient.commit(instantTime, writeStatuses); } } diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/util/FileOperationAssertions.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/util/FileOperationAssertions.java new file mode 100644 index 000000000000..9523ff61e366 --- /dev/null +++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/util/FileOperationAssertions.java @@ -0,0 +1,143 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package io.trino.plugin.hudi.util;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+import io.airlift.log.Logger;
+import io.trino.plugin.hudi.util.FileOperationUtils.FileOperation;
+import io.trino.testing.QueryRunner;
+import org.intellij.lang.annotations.Language;
+
+import java.util.Comparator;
+
+import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getCacheOperationSpans;
+import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.getFileLocation;
+import static io.trino.filesystem.tracing.CacheFileSystemTraceUtils.isTrinoSchemaOrPermissions;
+import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
+import static java.util.stream.Collectors.toCollection;
+
+/**
+ * Shared test assertions that run a query, collect the file-system tracing spans
+ * recorded by the {@link QueryRunner}, and compare the resulting multiset of
+ * {@link FileOperation}s against an expected multiset.
+ */
+public final class FileOperationAssertions
+{
+    private static final Logger log = Logger.get(FileOperationAssertions.class);
+
+    // Pause between running the query and reading the spans.
+    private static final long TABLE_STATS_SETTLE_MILLIS = 1000L;
+
+    private FileOperationAssertions() {}
+
+    /**
+     * Runs {@code query} and asserts that the recorded file system accesses match
+     * {@code expectedCacheAccesses}.
+     * Spans are selected with {@link #getFileOperations(QueryRunner)}, i.e. by manual
+     * filtering of Input/InputFile/FileSystemCache spans.
+     * Logs a detailed comparison at WARN level for debugging test failures.
+     *
+     * @param queryRunner runner used to execute the query and to read the spans from
+     * @param query SQL text to execute with the runner's default session
+     * @param expectedCacheAccesses expected operations together with their counts
+     * @throws InterruptedException if interrupted while waiting before validation
+     */
+    public static void assertFileSystemAccesses(
+            QueryRunner queryRunner,
+            @Language("SQL") String query,
+            Multiset<FileOperation> expectedCacheAccesses)
+            throws InterruptedException
+    {
+        executeAndAwaitTableStats(queryRunner, query);
+        logAndAssert(queryRunner, getFileOperations(queryRunner), expectedCacheAccesses);
+    }
+
+    /**
+     * Runs {@code query} and asserts that the recorded file system accesses match
+     * {@code expectedCacheAccesses} for Alluxio cache tests.
+     * Spans are selected with {@link #getAlluxioFileOperations(QueryRunner)}, which is
+     * based on getCacheOperationSpans.
+     * Logs a detailed comparison at WARN level for debugging test failures.
+     *
+     * @param queryRunner runner used to execute the query and to read the spans from
+     * @param query SQL text to execute with the runner's default session
+     * @param expectedCacheAccesses expected operations together with their counts
+     * @throws InterruptedException if interrupted while waiting before validation
+     */
+    public static void assertAlluxioFileSystemAccesses(
+            QueryRunner queryRunner,
+            @Language("SQL") String query,
+            Multiset<FileOperation> expectedCacheAccesses)
+            throws InterruptedException
+    {
+        executeAndAwaitTableStats(queryRunner, query);
+        logAndAssert(queryRunner, getAlluxioFileOperations(queryRunner), expectedCacheAccesses);
+    }
+
+    /**
+     * Gets file operations from the query runner spans using manual filtering.
+     * Keeps spans whose name starts with {@code Input.}, {@code InputFile.} or
+     * {@code FileSystemCache.}, then drops {@code InputFile.newInput} and
+     * {@code InputFile.exists} spans as well as spans whose file location is a Trino
+     * schema or permissions file.
+     *
+     * @param queryRunner runner whose recorded spans are inspected
+     * @return a new mutable multiset of the matching file operations
+     */
+    public static Multiset<FileOperation> getFileOperations(QueryRunner queryRunner)
+    {
+        return queryRunner.getSpans().stream()
+                .filter(span -> span.getName().startsWith("Input.") || span.getName().startsWith("InputFile.") || span.getName().startsWith("FileSystemCache."))
+                .filter(span -> !span.getName().startsWith("InputFile.newInput"))
+                .filter(span -> !span.getName().startsWith("InputFile.exists"))
+                .filter(span -> !isTrinoSchemaOrPermissions(getFileLocation(span)))
+                .map(FileOperation::create)
+                .collect(toCollection(HashMultiset::create));
+    }
+
+    /**
+     * Gets file operations for Alluxio cache tests using getCacheOperationSpans,
+     * dropping {@code InputFile.exists} spans.
+     *
+     * @param queryRunner runner whose recorded spans are inspected
+     * @return a new mutable multiset of the matching file operations
+     */
+    public static Multiset<FileOperation> getAlluxioFileOperations(QueryRunner queryRunner)
+    {
+        return getCacheOperationSpans(queryRunner)
+                .stream()
+                .filter(span -> !span.getName().startsWith("InputFile.exists"))
+                .map(FileOperation::create)
+                .collect(toCollection(HashMultiset::create));
+    }
+
+    /**
+     * Executes the query with the runner's default session and then pauses so that
+     * table stats computation can finish before the spans are read.
+     */
+    private static void executeAndAwaitTableStats(QueryRunner queryRunner, @Language("SQL") String query)
+            throws InterruptedException
+    {
+        queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
+        // Allow time for table stats computation to finish before validation.
+        Thread.sleep(TABLE_STATS_SETTLE_MILLIS);
+    }
+
+    /**
+     * Logs the actual/expected comparison and then asserts that both multisets are equal.
+     */
+    private static void logAndAssert(
+            QueryRunner queryRunner,
+            Multiset<FileOperation> actualCacheAccesses,
+            Multiset<FileOperation> expectedCacheAccesses)
+    {
+        printFileAccessDebugInfo(queryRunner, actualCacheAccesses, expectedCacheAccesses);
+        assertMultisetsEqual(actualCacheAccesses, expectedCacheAccesses);
+    }
+
+    /**
+     * Logs, at WARN level, the locations of the {@code InputFile.lastModified} and
+     * {@code InputFile.length} spans, the actual and expected operation counts, and
+     * the count-aware differences between the two multisets.
+     */
+    private static void printFileAccessDebugInfo(
+            QueryRunner queryRunner,
+            Multiset<FileOperation> actualCacheAccesses,
+            Multiset<FileOperation> expectedCacheAccesses)
+    {
+        // Log all file paths accessed for debugging
+        log.warn("=== All File Paths Accessed ===");
+        queryRunner.getSpans().stream()
+                .filter(span -> span.getName().equals("InputFile.lastModified") || span.getName().equals("InputFile.length"))
+                .forEach(span -> log.warn("%s: %s", span.getName(), getFileLocation(span)));
+
+        // Log actual and expected cache accesses
+        log.warn("=== Actual Cache Accesses ===");
+        logSortedMultiset(actualCacheAccesses);
+
+        log.warn("=== Expected Cache Accesses ===");
+        logSortedMultiset(expectedCacheAccesses);
+
+        // Calculate and log differences, taking occurrence counts into account
+        Multiset<FileOperation> extraInActual = subtractOccurrences(actualCacheAccesses, expectedCacheAccesses);
+        Multiset<FileOperation> missingFromActual = subtractOccurrences(expectedCacheAccesses, actualCacheAccesses);
+
+        if (!extraInActual.isEmpty()) {
+            log.warn("=== Extra in Actual (not expected) ===");
+            logSortedMultiset(extraInActual);
+        }
+
+        if (!missingFromActual.isEmpty()) {
+            log.warn("=== Missing from Actual (expected but not found) ===");
+            logSortedMultiset(missingFromActual);
+        }
+    }
+
+    /**
+     * Returns a copy of {@code source} with one occurrence removed for every occurrence
+     * in {@code toSubtract}.
+     * {@code Multiset.removeAll} is deliberately not used: it removes every occurrence of
+     * any shared element, which would hide operations that differ only in their count.
+     */
+    private static Multiset<FileOperation> subtractOccurrences(
+            Multiset<FileOperation> source,
+            Multiset<FileOperation> toSubtract)
+    {
+        Multiset<FileOperation> remainder = HashMultiset.create(source);
+        for (Multiset.Entry<FileOperation> entry : toSubtract.entrySet()) {
+            // Removes at most getCount() occurrences; fewer present means all are removed
+            remainder.remove(entry.getElement(), entry.getCount());
+        }
+        return remainder;
+    }
+
+    /**
+     * Logs each distinct operation with its count at WARN level, ordered by the
+     * operation's string form.
+     */
+    private static void logSortedMultiset(Multiset<FileOperation> multiset)
+    {
+        multiset.entrySet().stream()
+                .sorted(Comparator.comparing(a -> a.getElement().toString()))
+                .forEach(entry -> log.warn("%s: %s", entry.getElement(), entry.getCount()));
+    }
+}
