This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 55fe68efb52 [HUDI-7059] Eanble filter pushdown for positional merging 
(#10167)
55fe68efb52 is described below

commit 55fe68efb527754d4b6e349f723b3279c663b2c5
Author: Lin Liu <[email protected]>
AuthorDate: Wed Dec 20 23:03:54 2023 -0800

    [HUDI-7059] Eanble filter pushdown for positional merging (#10167)
    
    Changes:
    1. Add row index column to file readers for Spark3.5.
    2. Support data filters for different spark versions.
    3. Read record positions from records for Spark3.5.
---
 .../hudi/BaseSparkInternalRowReaderContext.java    |   9 ++
 .../hudi/common/engine/HoodieReaderContext.java    |   9 ++
 .../HoodiePositionBasedFileGroupRecordBuffer.java  |   2 +
 ...odieFileGroupReaderBasedParquetFileFormat.scala |  83 ++++++++++--
 ...odieFileGroupReaderBasedParquetFileFormat.scala |  96 ++++++++++++++
 .../TestSpark35RecordPositionMetadataColumn.scala  | 147 +++++++++++++++++++++
 6 files changed, 332 insertions(+), 14 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index a4d14d1eb4a..86bcaabcb17 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -115,6 +115,15 @@ public abstract class BaseSparkInternalRowReaderContext 
extends HoodieReaderCont
     return internalRow.copy();
   }
 
+  @Override
+  public long extractRecordPosition(InternalRow record, Schema recordSchema, 
String fieldName, long providedPositionIfNeeded) {
+    Object position = getFieldValueFromInternalRow(record, recordSchema, 
fieldName);
+    if (position != null) {
+      return (long) position;
+    }
+    return providedPositionIfNeeded;
+  }
+
   private Object getFieldValueFromInternalRow(InternalRow row, Schema 
recordSchema, String fieldName) {
     StructType structType = getCachedSchema(recordSchema);
     scala.Option<HoodieUnsafeRowUtils.NestedFieldPath> cachedNestedFieldPath =
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 8daf3f441f7..1d81007c375 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -210,4 +210,13 @@ public abstract class HoodieReaderContext<T> {
    * @return a function that takes in a record and returns the record with 
reordered columns
    */
   public abstract UnaryOperator<T> projectRecord(Schema from, Schema to);
+
+  /**
+   * Extracts the record position value from the record itself.
+   *
+   * @return the record position in the base file.
+   */
+  public long extractRecordPosition(T record, Schema schema, String fieldName, 
long providedPositionIfNeeded) {
+    return providedPositionIfNeeded;
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 4bb70c1aa3d..de63b0fb2e3 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -50,6 +50,7 @@ import static 
org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STR
  * {@link #hasNext} method is called.
  */
 public class HoodiePositionBasedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupRecordBuffer<T> {
+  private static final String ROW_INDEX_COLUMN_NAME = "row_index";
   private long nextRecordPosition = 0L;
 
   public HoodiePositionBasedFileGroupRecordBuffer(HoodieReaderContext<T> 
readerContext,
@@ -169,6 +170,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> 
extends HoodieBaseFileG
     // Handle merging.
     while (baseFileIterator.hasNext()) {
       T baseRecord = baseFileIterator.next();
+      nextRecordPosition = readerContext.extractRecordPosition(baseRecord, 
readerSchema, ROW_INDEX_COLUMN_NAME, nextRecordPosition);
       Pair<Option<T>, Map<String, Object>> logRecordInfo = 
records.remove(nextRecordPosition++);
 
       Map<String, Object> metadata = readerContext.generateMetadataForRecord(
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 57fdaf80e74..82a38a58841 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.cdc.{CDCFileGroupIterator, 
CDCRelation, HoodieCDCFileGrou
 import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model._
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.read.HoodieFileGroupReader
 import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, 
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, 
HoodieSparkUtils, HoodieTableSchema, HoodieTableState, 
MergeOnReadSnapshotRelation, SparkAdapterSupport, 
SparkFileFormatInternalRowReaderContext}
@@ -34,9 +34,10 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.execution.datasources.PartitionedFile
+import 
org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat.{ROW_INDEX,
 ROW_INDEX_TEMPORARY_COLUMN_NAME, getAppliedFilters, getAppliedRequiredSchema, 
getLogFilesFromSlice, getRecordKeyRelatedFilters}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types.{LongType, Metadata, MetadataBuilder, 
StringType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
 import scala.annotation.tailrec
@@ -277,16 +278,21 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       filters ++ requiredFilters, options, new Configuration(hadoopConf))
     m.put(generateKey(dataSchema, requiredSchema), baseFileReader)
 
-    //file reader for reading a hudi base file that needs to be merged with 
log files
-    val preMergeBaseFileReader = if (isMOR) {
-      // Add support for reading files using inline file system.
-      super.buildReaderWithPartitionValues(sparkSession, dataSchema, 
StructType(Seq.empty), requiredSchemaWithMandatory,
-        if (shouldUseRecordPosition) requiredFilters else 
recordKeyRelatedFilters ++ requiredFilters,
-        options, new Configuration(hadoopConf))
-    } else {
-      _: PartitionedFile => Iterator.empty
-    }
-    m.put(generateKey(dataSchema, requiredSchemaWithMandatory), 
preMergeBaseFileReader)
+    // File reader for reading a Hoodie base file that needs to be merged with 
log files
+    // Add support for reading files using inline file system.
+    val appliedRequiredSchema: StructType = getAppliedRequiredSchema(
+      requiredSchemaWithMandatory, shouldUseRecordPosition, 
ROW_INDEX_TEMPORARY_COLUMN_NAME)
+    val appliedFilters = getAppliedFilters(
+      requiredFilters, recordKeyRelatedFilters, shouldUseRecordPosition)
+    val preMergeBaseFileReader = super.buildReaderWithPartitionValues(
+      sparkSession,
+      dataSchema,
+      StructType(Nil),
+      appliedRequiredSchema,
+      appliedFilters,
+      options,
+      new Configuration(hadoopConf))
+    m.put(generateKey(dataSchema, appliedRequiredSchema), 
preMergeBaseFileReader)
 
     val cdcFileReader = super.buildReaderWithPartitionValues(
       sparkSession,
@@ -351,12 +357,61 @@ class 
HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
   protected def generateKey(dataSchema: StructType, requestedSchema: 
StructType): Long = {
     AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, 
sanitizedTableName).hashCode() + 
AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, 
sanitizedTableName).hashCode()
   }
+}
+
+object HoodieFileGroupReaderBasedParquetFileFormat {
+  // From "ParquetFileFormat.scala": The names of the field for record 
position.
+  private val ROW_INDEX = "row_index"
+  private val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
+
+  // From "namedExpressions.scala": Used to construct to record position field 
metadata.
+  private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY = 
"__file_source_generated_metadata_col"
+  private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
+  private val METADATA_COL_ATTR_KEY = "__metadata_col"
 
-  protected def getRecordKeyRelatedFilters(filters: Seq[Filter], 
recordKeyColumn: String): Seq[Filter] = {
+  def getRecordKeyRelatedFilters(filters: Seq[Filter], recordKeyColumn: 
String): Seq[Filter] = {
     filters.filter(f => f.references.exists(c => 
c.equalsIgnoreCase(recordKeyColumn)))
   }
 
-  protected def getLogFilesFromSlice(fileSlice: FileSlice): 
List[HoodieLogFile] = {
+  def getLogFilesFromSlice(fileSlice: FileSlice): List[HoodieLogFile] = {
     
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
   }
+
+  def getFieldMetadata(name: String, internalName: String): Metadata = {
+    new MetadataBuilder()
+      .putString(METADATA_COL_ATTR_KEY, name)
+      .putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true)
+      .putString(FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY, internalName)
+      .build()
+  }
+
+  def getAppliedRequiredSchema(requiredSchema: StructType,
+                               shouldUseRecordPosition: Boolean,
+                               recordPositionColumn: String): StructType = {
+    if (shouldAddRecordPositionColumn(shouldUseRecordPosition)) {
+      val metadata = getFieldMetadata(recordPositionColumn, 
ROW_INDEX_TEMPORARY_COLUMN_NAME)
+      val rowIndexField = StructField(recordPositionColumn, LongType, nullable 
= false, metadata)
+      StructType(requiredSchema.fields :+ rowIndexField)
+    } else {
+      requiredSchema
+    }
+  }
+
+  def getAppliedFilters(requiredFilters: Seq[Filter],
+                        recordKeyRelatedFilters: Seq[Filter],
+                        shouldUseRecordPosition: Boolean): Seq[Filter] = {
+    if (shouldAddRecordKeyFilters(shouldUseRecordPosition)) {
+      requiredFilters ++ recordKeyRelatedFilters
+    } else {
+      requiredFilters
+    }
+  }
+
+  def shouldAddRecordPositionColumn(shouldUseRecordPosition: Boolean): Boolean 
= {
+    HoodieSparkUtils.gteqSpark3_5 && shouldUseRecordPosition
+  }
+
+  def shouldAddRecordKeyFilters(shouldUseRecordPosition: Boolean): Boolean = {
+    (!shouldUseRecordPosition) || HoodieSparkUtils.gteqSpark3_5
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestHoodieFileGroupReaderBasedParquetFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestHoodieFileGroupReaderBasedParquetFileFormat.scala
new file mode 100644
index 00000000000..5a5456cb173
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestHoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.execution.datasources.parquet
+
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import 
org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat
+import org.apache.spark.sql.sources.{EqualTo, GreaterThan, IsNotNull}
+import org.apache.spark.sql.types.{LongType, StringType, StructField, 
StructType}
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+class TestHoodieFileGroupReaderBasedParquetFileFormat extends 
SparkClientFunctionalTestHarness {
+  @Test
+  def testGetRecordKeyRelatedFilters(): Unit = {
+    val filters = Seq(
+      IsNotNull("non_key_column"),
+      EqualTo("non_key_column", 1)
+    )
+    val filtersWithoutKeyColumn = 
HoodieFileGroupReaderBasedParquetFileFormat.getRecordKeyRelatedFilters(
+      filters, "key_column");
+    assertEquals(0, filtersWithoutKeyColumn.size)
+
+    val filtersWithKeys = Seq(
+      EqualTo("key_column", 1),
+      GreaterThan("non_key_column", 2)
+    )
+    val filtersWithKeyColumn = 
HoodieFileGroupReaderBasedParquetFileFormat.getRecordKeyRelatedFilters(
+      filtersWithKeys, "key_column")
+    assertEquals(1, filtersWithKeyColumn.size)
+    assertEquals("key_column", filtersWithKeyColumn.head.references.head)
+  }
+
+  @Test
+  def testGetAppliedRequiredSchema(): Unit = {
+    val fields = Array(
+      StructField("column_a", LongType, nullable = false),
+      StructField("column_b", StringType, nullable = false))
+    val requiredSchema = StructType(fields)
+
+    val appliedSchema: StructType = 
HoodieFileGroupReaderBasedParquetFileFormat.getAppliedRequiredSchema(
+      requiredSchema, shouldUseRecordPosition = true, "row_index")
+    if (HoodieSparkUtils.gteqSpark3_5) {
+      assertEquals(3, appliedSchema.fields.length)
+    } else {
+      assertEquals(2, appliedSchema.fields.length)
+    }
+
+    val schemaWithoutRowIndexColumn = 
HoodieFileGroupReaderBasedParquetFileFormat.getAppliedRequiredSchema(
+      requiredSchema, shouldUseRecordPosition = false, "row_index")
+    assertEquals(2, schemaWithoutRowIndexColumn.fields.length)
+  }
+
+  @Test
+  def testGetAppliedFilters(): Unit = {
+    val filters = Seq(
+      IsNotNull("non_key_column"),
+      EqualTo("non_key_column", 1)
+    )
+    val keyRelatedFilters = Seq(
+      EqualTo("key_column", 2)
+    )
+
+    val appliedFilters = 
HoodieFileGroupReaderBasedParquetFileFormat.getAppliedFilters(
+      filters, keyRelatedFilters, shouldUseRecordPosition = true
+    )
+    if (!HoodieSparkUtils.gteqSpark3_5) {
+      assertEquals(2, appliedFilters.size)
+    } else {
+      assertEquals(3, appliedFilters.size)
+    }
+
+    val appliedFiltersWithoutUsingRecordPosition = 
HoodieFileGroupReaderBasedParquetFileFormat.getAppliedFilters(
+      filters, keyRelatedFilters, shouldUseRecordPosition = false
+    )
+    assertEquals(3, appliedFiltersWithoutUsingRecordPosition.size)
+  }
+}
+
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
new file mode 100644
index 00000000000..6065eb683a7
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.testutils.HoodieTestTable
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.util.CloseableInternalRowIterator
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils}
+import org.apache.spark.sql.catalyst.InternalRow
+import 
org.apache.spark.sql.execution.datasources.parquet.{HoodieFileGroupReaderBasedParquetFileFormat,
 ParquetFileFormat}
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, 
StructType}
+import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, 
assertTrue}
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+class TestSpark35RecordPositionMetadataColumn extends 
SparkClientFunctionalTestHarness {
+  private val PARQUET_FORMAT = "parquet"
+  private val ROW_INDEX_COLUMN = "_tmp_metadata_row_index"
+  private val SPARK_MERGER = "org.apache.hudi.HoodieSparkRecordMerger"
+
+  @BeforeEach
+  def setUp(): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+
+    val userToCountryDF = Seq(
+      (1, "US", "1001"),
+      (2, "China", "1003"),
+      (3, "US", "1002"),
+      (4, "Singapore", "1004"))
+      .toDF("userid", "country", "ts")
+
+    // Create the file with record positions.
+    userToCountryDF.write.format("hudi")
+      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "userid")
+      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
+      .option(HoodieWriteConfig.TBL_NAME.key, "user_to_country")
+      .option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, SPARK_MERGER)
+      .option(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key, "true")
+      .option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key, 
PARQUET_FORMAT)
+      .option(
+        DataSourceWriteOptions.TABLE_TYPE.key(),
+        HoodieTableType.MERGE_ON_READ.name())
+      .save(basePath)
+  }
+
+  @Test
+  def testRecordPositionColumn(): Unit = {
+    val _spark = spark
+    // Prepare the schema
+    val dataSchema = new StructType(
+      Array(
+        StructField("userid", IntegerType, nullable = false),
+        StructField("country", StringType, nullable = false),
+        StructField("ts", StringType, nullable = false)
+      )
+    )
+    val requiredSchema = HoodieFileGroupReaderBasedParquetFileFormat
+      .getAppliedRequiredSchema(
+        dataSchema,
+        shouldUseRecordPosition = true,
+        ROW_INDEX_COLUMN)
+
+    // Confirm if the schema is as expected.
+    if (HoodieSparkUtils.gteqSpark3_5) {
+      assertEquals(4, requiredSchema.fields.length)
+      assertEquals(
+        "StructField(_tmp_metadata_row_index,LongType,false)",
+        requiredSchema.fields(3).toString)
+    }
+
+    // Prepare the file and Parquet file reader.
+    _spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
+    val metaClient = getHoodieMetaClient(
+      _spark.sparkContext.hadoopConfiguration, basePath)
+    val fileReader = new ParquetFileFormat().buildReaderWithPartitionValues(
+      _spark,
+      dataSchema,
+      StructType(Nil),
+      requiredSchema,
+      Nil,
+      Map.empty,
+      new Configuration(spark().sparkContext.hadoopConfiguration))
+    val allBaseFiles = HoodieTestTable.of(metaClient).listAllBaseFiles
+    assertTrue(allBaseFiles.nonEmpty)
+
+    // Make sure we can read all the positions out from base file.
+    // Here we don't add filters since enabling filter push-down
+    // for parquet file is tricky.
+    if (HoodieSparkUtils.gteqSpark3_5) {
+      val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
+        .createPartitionedFile(
+          InternalRow.empty,
+          allBaseFiles.head.getPath,
+          0,
+          allBaseFiles.head.getLen)
+      val iterator = new 
CloseableInternalRowIterator(fileReader.apply(fileInfo))
+      var rowIndices: Set[Long] = Set()
+      while (iterator.hasNext) {
+        val row = iterator.next()
+        rowIndices += row.getLong(3)
+      }
+      iterator.close()
+      val expectedRowIndices: Set[Long] = Set(0L, 1L, 2L, 3L)
+      assertEquals(expectedRowIndices, rowIndices)
+    }
+  }
+
+  @Test
+  def testUseFileGroupReaderDirectly(): Unit = {
+    val _spark = spark
+    import _spark.implicits._
+
+    // Read the records out with record positions.
+    val allRecords = _spark.read.format("hudi")
+      .option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key, "true")
+      .load(basePath)
+
+    // Ensure the number of outcomes are correct for all Spark versions
+    // including Spark3.5.
+    val usRecords = allRecords
+      .select("userid")
+      .filter("country = 'US'").map(_.getInt(0)).collect()
+    assertArrayEquals(Array[Int](1, 3), usRecords)
+  }
+}

Reply via email to