This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 55fe68efb52 [HUDI-7059] Enable filter pushdown for positional merging
(#10167)
55fe68efb52 is described below
commit 55fe68efb527754d4b6e349f723b3279c663b2c5
Author: Lin Liu <[email protected]>
AuthorDate: Wed Dec 20 23:03:54 2023 -0800
[HUDI-7059] Enable filter pushdown for positional merging (#10167)
Changes:
1. Add row index column to file readers for Spark3.5.
2. Support data filters for different spark versions.
3. Read record positions from records for Spark3.5.
---
.../hudi/BaseSparkInternalRowReaderContext.java | 9 ++
.../hudi/common/engine/HoodieReaderContext.java | 9 ++
.../HoodiePositionBasedFileGroupRecordBuffer.java | 2 +
...odieFileGroupReaderBasedParquetFileFormat.scala | 83 ++++++++++--
...odieFileGroupReaderBasedParquetFileFormat.scala | 96 ++++++++++++++
.../TestSpark35RecordPositionMetadataColumn.scala | 147 +++++++++++++++++++++
6 files changed, 332 insertions(+), 14 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index a4d14d1eb4a..86bcaabcb17 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -115,6 +115,15 @@ public abstract class BaseSparkInternalRowReaderContext
extends HoodieReaderCont
return internalRow.copy();
}
+ @Override
+ public long extractRecordPosition(InternalRow record, Schema recordSchema,
String fieldName, long providedPositionIfNeeded) {
+ Object position = getFieldValueFromInternalRow(record, recordSchema,
fieldName);
+ if (position != null) {
+ return (long) position;
+ }
+ return providedPositionIfNeeded;
+ }
+
private Object getFieldValueFromInternalRow(InternalRow row, Schema
recordSchema, String fieldName) {
StructType structType = getCachedSchema(recordSchema);
scala.Option<HoodieUnsafeRowUtils.NestedFieldPath> cachedNestedFieldPath =
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 8daf3f441f7..1d81007c375 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -210,4 +210,13 @@ public abstract class HoodieReaderContext<T> {
* @return a function that takes in a record and returns the record with
reordered columns
*/
public abstract UnaryOperator<T> projectRecord(Schema from, Schema to);
+
+ /**
+ * Extracts the record position value from the record itself.
+ *
+ * @return the record position in the base file.
+ */
+ public long extractRecordPosition(T record, Schema schema, String fieldName,
long providedPositionIfNeeded) {
+ return providedPositionIfNeeded;
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 4bb70c1aa3d..de63b0fb2e3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -50,6 +50,7 @@ import static
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STR
* {@link #hasNext} method is called.
*/
public class HoodiePositionBasedFileGroupRecordBuffer<T> extends
HoodieBaseFileGroupRecordBuffer<T> {
+ private static final String ROW_INDEX_COLUMN_NAME = "row_index";
private long nextRecordPosition = 0L;
public HoodiePositionBasedFileGroupRecordBuffer(HoodieReaderContext<T>
readerContext,
@@ -169,6 +170,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T>
extends HoodieBaseFileG
// Handle merging.
while (baseFileIterator.hasNext()) {
T baseRecord = baseFileIterator.next();
+ nextRecordPosition = readerContext.extractRecordPosition(baseRecord,
readerSchema, ROW_INDEX_COLUMN_NAME, nextRecordPosition);
Pair<Option<T>, Map<String, Object>> logRecordInfo =
records.remove(nextRecordPosition++);
Map<String, Object> metadata = readerContext.generateMetadataForRecord(
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 57fdaf80e74..82a38a58841 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.cdc.{CDCFileGroupIterator,
CDCRelation, HoodieCDCFileGrou
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model._
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.table.read.HoodieFileGroupReader
import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex,
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping,
HoodieSparkUtils, HoodieTableSchema, HoodieTableState,
MergeOnReadSnapshotRelation, SparkAdapterSupport,
SparkFileFormatInternalRowReaderContext}
@@ -34,9 +34,10 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
+import
org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat.{ROW_INDEX,
ROW_INDEX_TEMPORARY_COLUMN_NAME, getAppliedFilters, getAppliedRequiredSchema,
getLogFilesFromSlice, getRecordKeyRelatedFilters}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{LongType, Metadata, MetadataBuilder,
StringType, StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
import scala.annotation.tailrec
@@ -277,16 +278,21 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
filters ++ requiredFilters, options, new Configuration(hadoopConf))
m.put(generateKey(dataSchema, requiredSchema), baseFileReader)
- //file reader for reading a hudi base file that needs to be merged with
log files
- val preMergeBaseFileReader = if (isMOR) {
- // Add support for reading files using inline file system.
- super.buildReaderWithPartitionValues(sparkSession, dataSchema,
StructType(Seq.empty), requiredSchemaWithMandatory,
- if (shouldUseRecordPosition) requiredFilters else
recordKeyRelatedFilters ++ requiredFilters,
- options, new Configuration(hadoopConf))
- } else {
- _: PartitionedFile => Iterator.empty
- }
- m.put(generateKey(dataSchema, requiredSchemaWithMandatory),
preMergeBaseFileReader)
+ // File reader for reading a Hoodie base file that needs to be merged with
log files
+ // Add support for reading files using inline file system.
+ val appliedRequiredSchema: StructType = getAppliedRequiredSchema(
+ requiredSchemaWithMandatory, shouldUseRecordPosition,
ROW_INDEX_TEMPORARY_COLUMN_NAME)
+ val appliedFilters = getAppliedFilters(
+ requiredFilters, recordKeyRelatedFilters, shouldUseRecordPosition)
+ val preMergeBaseFileReader = super.buildReaderWithPartitionValues(
+ sparkSession,
+ dataSchema,
+ StructType(Nil),
+ appliedRequiredSchema,
+ appliedFilters,
+ options,
+ new Configuration(hadoopConf))
+ m.put(generateKey(dataSchema, appliedRequiredSchema),
preMergeBaseFileReader)
val cdcFileReader = super.buildReaderWithPartitionValues(
sparkSession,
@@ -351,12 +357,61 @@ class
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
protected def generateKey(dataSchema: StructType, requestedSchema:
StructType): Long = {
AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema,
sanitizedTableName).hashCode() +
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema,
sanitizedTableName).hashCode()
}
+}
+
+object HoodieFileGroupReaderBasedParquetFileFormat {
+ // From "ParquetFileFormat.scala": The names of the field for record
position.
+ private val ROW_INDEX = "row_index"
+ private val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
+
+ // From "namedExpressions.scala": Used to construct the record position field
metadata.
+ private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY =
"__file_source_generated_metadata_col"
+ private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
+ private val METADATA_COL_ATTR_KEY = "__metadata_col"
- protected def getRecordKeyRelatedFilters(filters: Seq[Filter],
recordKeyColumn: String): Seq[Filter] = {
+ def getRecordKeyRelatedFilters(filters: Seq[Filter], recordKeyColumn:
String): Seq[Filter] = {
filters.filter(f => f.references.exists(c =>
c.equalsIgnoreCase(recordKeyColumn)))
}
- protected def getLogFilesFromSlice(fileSlice: FileSlice):
List[HoodieLogFile] = {
+ def getLogFilesFromSlice(fileSlice: FileSlice): List[HoodieLogFile] = {
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
}
+
+ def getFieldMetadata(name: String, internalName: String): Metadata = {
+ new MetadataBuilder()
+ .putString(METADATA_COL_ATTR_KEY, name)
+ .putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true)
+ .putString(FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY, internalName)
+ .build()
+ }
+
+ def getAppliedRequiredSchema(requiredSchema: StructType,
+ shouldUseRecordPosition: Boolean,
+ recordPositionColumn: String): StructType = {
+ if (shouldAddRecordPositionColumn(shouldUseRecordPosition)) {
+ val metadata = getFieldMetadata(recordPositionColumn,
ROW_INDEX_TEMPORARY_COLUMN_NAME)
+ val rowIndexField = StructField(recordPositionColumn, LongType, nullable
= false, metadata)
+ StructType(requiredSchema.fields :+ rowIndexField)
+ } else {
+ requiredSchema
+ }
+ }
+
+ def getAppliedFilters(requiredFilters: Seq[Filter],
+ recordKeyRelatedFilters: Seq[Filter],
+ shouldUseRecordPosition: Boolean): Seq[Filter] = {
+ if (shouldAddRecordKeyFilters(shouldUseRecordPosition)) {
+ requiredFilters ++ recordKeyRelatedFilters
+ } else {
+ requiredFilters
+ }
+ }
+
+ def shouldAddRecordPositionColumn(shouldUseRecordPosition: Boolean): Boolean
= {
+ HoodieSparkUtils.gteqSpark3_5 && shouldUseRecordPosition
+ }
+
+ def shouldAddRecordKeyFilters(shouldUseRecordPosition: Boolean): Boolean = {
+ (!shouldUseRecordPosition) || HoodieSparkUtils.gteqSpark3_5
+ }
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestHoodieFileGroupReaderBasedParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestHoodieFileGroupReaderBasedParquetFileFormat.scala
new file mode 100644
index 00000000000..5a5456cb173
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestHoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.execution.datasources.parquet
+
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import
org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat
+import org.apache.spark.sql.sources.{EqualTo, GreaterThan, IsNotNull}
+import org.apache.spark.sql.types.{LongType, StringType, StructField,
StructType}
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+class TestHoodieFileGroupReaderBasedParquetFileFormat extends
SparkClientFunctionalTestHarness {
+ @Test
+ def testGetRecordKeyRelatedFilters(): Unit = {
+ val filters = Seq(
+ IsNotNull("non_key_column"),
+ EqualTo("non_key_column", 1)
+ )
+ val filtersWithoutKeyColumn =
HoodieFileGroupReaderBasedParquetFileFormat.getRecordKeyRelatedFilters(
+ filters, "key_column");
+ assertEquals(0, filtersWithoutKeyColumn.size)
+
+ val filtersWithKeys = Seq(
+ EqualTo("key_column", 1),
+ GreaterThan("non_key_column", 2)
+ )
+ val filtersWithKeyColumn =
HoodieFileGroupReaderBasedParquetFileFormat.getRecordKeyRelatedFilters(
+ filtersWithKeys, "key_column")
+ assertEquals(1, filtersWithKeyColumn.size)
+ assertEquals("key_column", filtersWithKeyColumn.head.references.head)
+ }
+
+ @Test
+ def testGetAppliedRequiredSchema(): Unit = {
+ val fields = Array(
+ StructField("column_a", LongType, nullable = false),
+ StructField("column_b", StringType, nullable = false))
+ val requiredSchema = StructType(fields)
+
+ val appliedSchema: StructType =
HoodieFileGroupReaderBasedParquetFileFormat.getAppliedRequiredSchema(
+ requiredSchema, shouldUseRecordPosition = true, "row_index")
+ if (HoodieSparkUtils.gteqSpark3_5) {
+ assertEquals(3, appliedSchema.fields.length)
+ } else {
+ assertEquals(2, appliedSchema.fields.length)
+ }
+
+ val schemaWithoutRowIndexColumn =
HoodieFileGroupReaderBasedParquetFileFormat.getAppliedRequiredSchema(
+ requiredSchema, shouldUseRecordPosition = false, "row_index")
+ assertEquals(2, schemaWithoutRowIndexColumn.fields.length)
+ }
+
+ @Test
+ def testGetAppliedFilters(): Unit = {
+ val filters = Seq(
+ IsNotNull("non_key_column"),
+ EqualTo("non_key_column", 1)
+ )
+ val keyRelatedFilters = Seq(
+ EqualTo("key_column", 2)
+ )
+
+ val appliedFilters =
HoodieFileGroupReaderBasedParquetFileFormat.getAppliedFilters(
+ filters, keyRelatedFilters, shouldUseRecordPosition = true
+ )
+ if (!HoodieSparkUtils.gteqSpark3_5) {
+ assertEquals(2, appliedFilters.size)
+ } else {
+ assertEquals(3, appliedFilters.size)
+ }
+
+ val appliedFiltersWithoutUsingRecordPosition =
HoodieFileGroupReaderBasedParquetFileFormat.getAppliedFilters(
+ filters, keyRelatedFilters, shouldUseRecordPosition = false
+ )
+ assertEquals(3, appliedFiltersWithoutUsingRecordPosition.size)
+ }
+}
+
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
new file mode 100644
index 00000000000..6065eb683a7
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.testutils.HoodieTestTable
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.util.CloseableInternalRowIterator
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
+import org.apache.spark.sql.catalyst.InternalRow
+import
org.apache.spark.sql.execution.datasources.parquet.{HoodieFileGroupReaderBasedParquetFileFormat,
ParquetFileFormat}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField,
StructType}
+import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals,
assertTrue}
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+class TestSpark35RecordPositionMetadataColumn extends
SparkClientFunctionalTestHarness {
+ private val PARQUET_FORMAT = "parquet"
+ private val ROW_INDEX_COLUMN = "_tmp_metadata_row_index"
+ private val SPARK_MERGER = "org.apache.hudi.HoodieSparkRecordMerger"
+
+ @BeforeEach
+ def setUp(): Unit = {
+ val _spark = spark
+ import _spark.implicits._
+
+ val userToCountryDF = Seq(
+ (1, "US", "1001"),
+ (2, "China", "1003"),
+ (3, "US", "1002"),
+ (4, "Singapore", "1004"))
+ .toDF("userid", "country", "ts")
+
+ // Create the file with record positions.
+ userToCountryDF.write.format("hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "userid")
+ .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
+ .option(HoodieWriteConfig.TBL_NAME.key, "user_to_country")
+ .option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, SPARK_MERGER)
+ .option(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key, "true")
+ .option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key,
PARQUET_FORMAT)
+ .option(
+ DataSourceWriteOptions.TABLE_TYPE.key(),
+ HoodieTableType.MERGE_ON_READ.name())
+ .save(basePath)
+ }
+
+ @Test
+ def testRecordPositionColumn(): Unit = {
+ val _spark = spark
+ // Prepare the schema
+ val dataSchema = new StructType(
+ Array(
+ StructField("userid", IntegerType, nullable = false),
+ StructField("country", StringType, nullable = false),
+ StructField("ts", StringType, nullable = false)
+ )
+ )
+ val requiredSchema = HoodieFileGroupReaderBasedParquetFileFormat
+ .getAppliedRequiredSchema(
+ dataSchema,
+ shouldUseRecordPosition = true,
+ ROW_INDEX_COLUMN)
+
+ // Confirm if the schema is as expected.
+ if (HoodieSparkUtils.gteqSpark3_5) {
+ assertEquals(4, requiredSchema.fields.length)
+ assertEquals(
+ "StructField(_tmp_metadata_row_index,LongType,false)",
+ requiredSchema.fields(3).toString)
+ }
+
+ // Prepare the file and Parquet file reader.
+ _spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
+ val metaClient = getHoodieMetaClient(
+ _spark.sparkContext.hadoopConfiguration, basePath)
+ val fileReader = new ParquetFileFormat().buildReaderWithPartitionValues(
+ _spark,
+ dataSchema,
+ StructType(Nil),
+ requiredSchema,
+ Nil,
+ Map.empty,
+ new Configuration(spark().sparkContext.hadoopConfiguration))
+ val allBaseFiles = HoodieTestTable.of(metaClient).listAllBaseFiles
+ assertTrue(allBaseFiles.nonEmpty)
+
+ // Make sure we can read all the positions out from base file.
+ // Here we don't add filters since enabling filter push-down
+ // for parquet file is tricky.
+ if (HoodieSparkUtils.gteqSpark3_5) {
+ val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
+ .createPartitionedFile(
+ InternalRow.empty,
+ allBaseFiles.head.getPath,
+ 0,
+ allBaseFiles.head.getLen)
+ val iterator = new
CloseableInternalRowIterator(fileReader.apply(fileInfo))
+ var rowIndices: Set[Long] = Set()
+ while (iterator.hasNext) {
+ val row = iterator.next()
+ rowIndices += row.getLong(3)
+ }
+ iterator.close()
+ val expectedRowIndices: Set[Long] = Set(0L, 1L, 2L, 3L)
+ assertEquals(expectedRowIndices, rowIndices)
+ }
+ }
+
+ @Test
+ def testUseFileGroupReaderDirectly(): Unit = {
+ val _spark = spark
+ import _spark.implicits._
+
+ // Read the records out with record positions.
+ val allRecords = _spark.read.format("hudi")
+ .option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key, "true")
+ .load(basePath)
+
+ // Ensure the number of outcomes are correct for all Spark versions
+ // including Spark3.5.
+ val usRecords = allRecords
+ .select("userid")
+ .filter("country = 'US'").map(_.getInt(0)).collect()
+ assertArrayEquals(Array[Int](1, 3), usRecords)
+ }
+}