This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 0d7cc87d687 [HUDI-7840] Add position merging to the new file group reader (#11413)
0d7cc87d687 is described below
commit 0d7cc87d687bd235bac099e481535cb9f223b501
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Jun 7 19:47:42 2024 -0400
[HUDI-7840] Add position merging to the new file group reader (#11413)
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Sagar Sumit <[email protected]>
---
.../SparkFileFormatInternalRowReaderContext.scala | 202 +++++++++++++++----
.../hudi/common/engine/HoodieReaderContext.java | 22 +++
.../common/table/read/HoodieFileGroupReader.java | 6 +-
.../HoodiePositionBasedFileGroupRecordBuffer.java | 4 +-
.../read/HoodiePositionBasedSchemaHandler.java | 75 ++++++++
...odieFileGroupReaderBasedParquetFileFormat.scala | 2 +-
...stSparkFileFormatInternalRowReaderContext.scala | 72 +++++++
...stHoodiePositionBasedFileGroupRecordBuffer.java | 214 +++++++++++++++++++++
.../functional/TestFiltersInFileGroupReader.java | 109 +++++++++++
.../read/TestHoodieFileGroupReaderOnSpark.scala | 2 +-
.../TestSpark35RecordPositionMetadataColumn.scala | 143 ++++++++++++++
11 files changed, 812 insertions(+), 39 deletions(-)
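
For readers skimming the diff: the heart of this change is the position-based merge of the bootstrap skeleton-file and data-file iterators in SparkFileFormatInternalRowReaderContext. Both sides carry a temporary row-index column (_tmp_metadata_row_index), and the reader advances whichever iterator is behind until the positions line up, so rows dropped by filter pushdown on one side are simply skipped on the other. The following is a minimal, illustrative Scala sketch of that alignment loop over plain iterators of (row, position) pairs; the `positionalMerge` helper is hypothetical, and the actual code below uses CachingIterator, JoinedRow and schema projections instead.

// Illustrative only: align two position-sorted iterators and pair up matching rows.
// `skeleton` and `data` are assumed to be sorted by their row position.
def positionalMerge[A, B](skeleton: Iterator[(A, Long)],
                          data: Iterator[(B, Long)]): Iterator[(A, B)] = new Iterator[(A, B)] {
  private var nextPair: Option[(A, B)] = None

  private def advance(): Unit = {
    nextPair = None
    if (skeleton.hasNext && data.hasNext) {
      var sPair = skeleton.next()
      var dPair = data.next()
      var exhausted = false
      // Skip whichever side is behind; a row filtered out on one side has no partner on the other.
      while (sPair._2 != dPair._2 && !exhausted) {
        if (sPair._2 < dPair._2) {
          if (skeleton.hasNext) sPair = skeleton.next() else exhausted = true
        } else {
          if (data.hasNext) dPair = data.next() else exhausted = true
        }
      }
      if (sPair._2 == dPair._2) nextPair = Some((sPair._1, dPair._1))
    }
  }

  advance()
  override def hasNext: Boolean = nextPair.isDefined
  override def next(): (A, B) = { val result = nextPair.get; advance(); result }
}
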
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 640f1219fbf..715e2d9a9ab 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -22,10 +22,14 @@ package org.apache.hudi
import org.apache.avro.Schema
import org.apache.avro.generic.IndexedRecord
import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.SparkFileFormatInternalRowReaderContext.{filterIsSafeForBootstrap, getAppliedRequiredSchema}
+import org.apache.hudi.avro.AvroSchemaUtils
 import org.apache.hudi.common.engine.HoodieReaderContext
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
 import org.apache.hudi.common.util.ValidationUtils.checkState
-import org.apache.hudi.common.util.collection.{ClosableIterator, CloseableMappingIterator}
+import org.apache.hudi.common.util.collection.{CachingIterator, ClosableIterator, CloseableMappingIterator}
 import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory, HoodieSparkParquetReader}
 import org.apache.hudi.storage.{HoodieStorage, StorageConfiguration, StoragePath}
 import org.apache.hudi.util.CloseableInternalRowIterator
@@ -37,7 +41,7 @@ import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, SparkParquetReader}
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{LongType, MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import scala.collection.mutable
@@ -53,12 +57,20 @@ import scala.collection.mutable
  *                         not required for reading a file group with only log files.
  * @param recordKeyColumn column name for the recordkey
  * @param filters spark filters that might be pushed down into the reader
+ * @param requiredFilters filters that are required and should always be used, even in merging situations
  */
 class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetReader,
                                               recordKeyColumn: String,
-                                              filters: Seq[Filter]) extends BaseSparkInternalRowReaderContext {
+                                              filters: Seq[Filter],
+                                              requiredFilters: Seq[Filter]) extends BaseSparkInternalRowReaderContext {
lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
+  private lazy val bootstrapSafeFilters: Seq[Filter] = filters.filter(filterIsSafeForBootstrap) ++ requiredFilters
   private val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = mutable.Map()
+ private lazy val allFilters = filters ++ requiredFilters
+
+ override def supportsParquetRowIndex: Boolean = {
+ HoodieSparkUtils.gteqSpark3_5
+ }
override def getFileRecordIterator(filePath: StoragePath,
start: Long,
@@ -66,6 +78,10 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
                                      dataSchema: Schema,
                                      requiredSchema: Schema,
                                      storage: HoodieStorage): ClosableIterator[InternalRow] = {
+    val hasRowIndexField = AvroSchemaUtils.containsFieldInSchema(requiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME)
+    if (hasRowIndexField) {
+      assert(supportsParquetRowIndex())
+    }
val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
if (FSUtils.isLogFile(filePath)) {
      val projection = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType)
@@ -84,8 +100,20 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
      // each row if they are given. That is the only usage of the partition values in the reader.
      val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
        .createPartitionedFile(InternalRow.empty, filePath, start, length)
+      val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType, hasRowIndexField)
      new CloseableInternalRowIterator(parquetFileReader.read(fileInfo,
-        structType, StructType(Seq.empty), Seq.empty, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]]))
+        readSchema, StructType(Seq.empty), readFilters, storage.getConf.asInstanceOf[StorageConfiguration[Configuration]]))
+ }
+ }
+
+  private def getSchemaAndFiltersForRead(structType: StructType, hasRowIndexField: Boolean): (StructType, Seq[Filter]) = {
+ val schemaForRead = getAppliedRequiredSchema(structType, hasRowIndexField)
+ if (!getHasLogFiles && !getNeedsBootstrapMerge) {
+ (schemaForRead, allFilters)
+ } else if (!getHasLogFiles && hasRowIndexField) {
+ (schemaForRead, bootstrapSafeFilters)
+ } else {
+ (schemaForRead, requiredFilters)
}
}
@@ -116,45 +144,153 @@ class SparkFileFormatInternalRowReaderContext(parquetFileReader: SparkParquetRea
                                      skeletonRequiredSchema: Schema,
                                      dataFileIterator: ClosableIterator[InternalRow],
                                      dataRequiredSchema: Schema): ClosableIterator[InternalRow] = {
-    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]],
-      dataFileIterator.asInstanceOf[ClosableIterator[Any]])
+    doBootstrapMerge(skeletonFileIterator.asInstanceOf[ClosableIterator[Any]], skeletonRequiredSchema,
+      dataFileIterator.asInstanceOf[ClosableIterator[Any]], dataRequiredSchema)
}
-  protected def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any], dataFileIterator: ClosableIterator[Any]): ClosableIterator[InternalRow] = {
-    new ClosableIterator[Any] {
-      val combinedRow = new JoinedRow()
+  private def doBootstrapMerge(skeletonFileIterator: ClosableIterator[Any],
+                               skeletonRequiredSchema: Schema,
+                               dataFileIterator: ClosableIterator[Any],
+                               dataRequiredSchema: Schema): ClosableIterator[InternalRow] = {
+    if (supportsParquetRowIndex()) {
+      assert(AvroSchemaUtils.containsFieldInSchema(skeletonRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME))
+      assert(AvroSchemaUtils.containsFieldInSchema(dataRequiredSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME))
+      val rowIndexColumn = new java.util.HashSet[String]()
+      rowIndexColumn.add(ROW_INDEX_TEMPORARY_COLUMN_NAME)
+      //always remove the row index column from the skeleton because the data file will also have the same column
+      val skeletonProjection = projectRecord(skeletonRequiredSchema,
+        AvroSchemaUtils.removeFieldsFromSchema(skeletonRequiredSchema, rowIndexColumn))
-      override def hasNext: Boolean = {
-        //If the iterators are out of sync it is probably due to filter pushdown
-        checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
-          "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!")
-        dataFileIterator.hasNext && skeletonFileIterator.hasNext
+      //If we need to do position based merging with log files we will leave the row index column at the end
+      val dataProjection = if (getHasLogFiles && getShouldMergeUseRecordPosition) {
+        getIdentityProjection
+      } else {
+        projectRecord(dataRequiredSchema,
+          AvroSchemaUtils.removeFieldsFromSchema(dataRequiredSchema, rowIndexColumn))
}
- override def next(): Any = {
- (skeletonFileIterator.next(), dataFileIterator.next()) match {
- case (s: ColumnarBatch, d: ColumnarBatch) =>
- val numCols = s.numCols() + d.numCols()
- val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
- for (i <- 0 until numCols) {
- if (i < s.numCols()) {
- vecs(i) = s.column(i)
+ //row index will always be the last column
+ val skeletonRowIndex = skeletonRequiredSchema.getFields.size() - 1
+ val dataRowIndex = dataRequiredSchema.getFields.size() - 1
+
+ //Always use internal row for positional merge because
+ //we need to iterate row by row when merging
+ new CachingIterator[InternalRow] {
+ val combinedRow = new JoinedRow()
+
+ private def getNextSkeleton: (InternalRow, Long) = {
+          val nextSkeletonRow = skeletonFileIterator.next().asInstanceOf[InternalRow]
+ (nextSkeletonRow, nextSkeletonRow.getLong(skeletonRowIndex))
+ }
+
+ private def getNextData: (InternalRow, Long) = {
+ val nextDataRow = dataFileIterator.next().asInstanceOf[InternalRow]
+ (nextDataRow, nextDataRow.getLong(dataRowIndex))
+ }
+
+ override def close(): Unit = {
+ skeletonFileIterator.close()
+ dataFileIterator.close()
+ }
+
+ override protected def doHasNext(): Boolean = {
+ if (!dataFileIterator.hasNext || !skeletonFileIterator.hasNext) {
+ false
+ } else {
+ var nextSkeleton = getNextSkeleton
+ var nextData = getNextData
+ while (nextSkeleton._2 != nextData._2) {
+ if (nextSkeleton._2 > nextData._2) {
+ if (!dataFileIterator.hasNext) {
+ return false
+ } else {
+ nextData = getNextData
+ }
} else {
- vecs(i) = d.column(i - s.numCols())
+ if (!skeletonFileIterator.hasNext) {
+ return false
+ } else {
+ nextSkeleton = getNextSkeleton
+ }
}
}
- assert(s.numRows() == d.numRows())
- sparkAdapter.makeColumnarBatch(vecs, s.numRows())
-          case (_: ColumnarBatch, _: InternalRow) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
-          case (_: InternalRow, _: ColumnarBatch) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
-          case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+            nextRecord = combinedRow(skeletonProjection.apply(nextSkeleton._1), dataProjection.apply(nextData._1))
+ true
+ }
}
}
+ } else {
+ new ClosableIterator[Any] {
+ val combinedRow = new JoinedRow()
- override def close(): Unit = {
- skeletonFileIterator.close()
- dataFileIterator.close()
- }
- }.asInstanceOf[ClosableIterator[InternalRow]]
+ override def hasNext: Boolean = {
+          //If the iterators are out of sync it is probably due to filter pushdown
+          checkState(dataFileIterator.hasNext == skeletonFileIterator.hasNext,
+            "Bootstrap data-file iterator and skeleton-file iterator have to be in-sync!")
+ dataFileIterator.hasNext && skeletonFileIterator.hasNext
+ }
+
+ override def next(): Any = {
+ (skeletonFileIterator.next(), dataFileIterator.next()) match {
+ case (s: ColumnarBatch, d: ColumnarBatch) =>
+ //This will not be used until [HUDI-7693] is implemented
+ val numCols = s.numCols() + d.numCols()
+ val vecs: Array[ColumnVector] = new Array[ColumnVector](numCols)
+ for (i <- 0 until numCols) {
+ if (i < s.numCols()) {
+ vecs(i) = s.column(i)
+ } else {
+ vecs(i) = d.column(i - s.numCols())
+ }
+ }
+ assert(s.numRows() == d.numRows())
+ sparkAdapter.makeColumnarBatch(vecs, s.numRows())
+            case (_: ColumnarBatch, _: InternalRow) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
+            case (_: InternalRow, _: ColumnarBatch) => throw new IllegalStateException("InternalRow ColumnVector mismatch")
+ case (s: InternalRow, d: InternalRow) => combinedRow(s, d)
+ }
+ }
+
+ override def close(): Unit = {
+ skeletonFileIterator.close()
+ dataFileIterator.close()
+ }
+ }.asInstanceOf[ClosableIterator[InternalRow]]
+ }
}
}
+
+object SparkFileFormatInternalRowReaderContext {
+ // From "namedExpressions.scala": Used to construct to record position field
metadata.
+ private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY =
"__file_source_generated_metadata_col"
+ private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
+ private val METADATA_COL_ATTR_KEY = "__metadata_col"
+
+  def getAppliedRequiredSchema(requiredSchema: StructType, shouldAddRecordPosition: Boolean): StructType = {
+ if (shouldAddRecordPosition) {
+ val metadata = new MetadataBuilder()
+ .putString(METADATA_COL_ATTR_KEY, ROW_INDEX_TEMPORARY_COLUMN_NAME)
+ .putBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY, value = true)
+        .putString(FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY, ROW_INDEX_TEMPORARY_COLUMN_NAME)
+        .build()
+      val rowIndexField = StructField(ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, nullable = false, metadata)
+      StructType(requiredSchema.fields.filterNot(isIndexTempColumn) :+ rowIndexField)
+ } else {
+ requiredSchema
+ }
+ }
+
+ /**
+ * Only valid if there is support for RowIndexField and no log files
+   * Filters are safe for bootstrap if meta col filters are independent from data col filters.
+   */
+  def filterIsSafeForBootstrap(filter: Filter): Boolean = {
+    val metaRefCount = filter.references.count(c => HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(c.toLowerCase))
+ metaRefCount == filter.references.length || metaRefCount == 0
+ }
+
+ private def isIndexTempColumn(field: StructField): Boolean = {
+ field.name.equals(ROW_INDEX_TEMPORARY_COLUMN_NAME)
+ }
+
+}
\ No newline at end of file
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 81db7ef9e70..218e0eb4b03 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -58,6 +58,7 @@ public abstract class HoodieReaderContext<T> {
private Boolean hasLogFiles = null;
private Boolean hasBootstrapBaseFile = null;
private Boolean needsBootstrapMerge = null;
+ private Boolean shouldMergeUseRecordPosition = null;
// Getter and Setter for schemaHandler
public HoodieFileGroupReaderSchemaHandler<T> getSchemaHandler() {
@@ -122,6 +123,15 @@ public abstract class HoodieReaderContext<T> {
this.needsBootstrapMerge = needsBootstrapMerge;
}
+ // Getter and Setter for useRecordPosition
+ public boolean getShouldMergeUseRecordPosition() {
+ return shouldMergeUseRecordPosition;
+ }
+
+  public void setShouldMergeUseRecordPosition(boolean shouldMergeUseRecordPosition) {
+ this.shouldMergeUseRecordPosition = shouldMergeUseRecordPosition;
+ }
+
  // These internal key names are only used in memory for record metadata and merging,
// and should not be persisted to storage.
public static final String INTERNAL_META_RECORD_KEY = "_0";
@@ -301,9 +311,21 @@ public abstract class HoodieReaderContext<T> {
* @return the record position in the base file.
*/
  public long extractRecordPosition(T record, Schema schema, String fieldName, long providedPositionIfNeeded) {
+ if (supportsParquetRowIndex()) {
+ Object position = getValue(record, schema, fieldName);
+ if (position != null) {
+ return (long) position;
+ } else {
+ throw new IllegalStateException("Record position extraction failed");
+ }
+ }
return providedPositionIfNeeded;
}
+ public boolean supportsParquetRowIndex() {
+ return false;
+ }
+
/**
* Constructs engine specific delete record.
*/
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index fe450bb0165..396da4166a7 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -106,10 +106,12 @@ public final class HoodieFileGroupReader<T> implements Closeable {
     readerContext.setRecordMerger(this.recordMerger);
     readerContext.setTablePath(tablePath);
     readerContext.setLatestCommitTime(latestCommitTime);
+    readerContext.setShouldMergeUseRecordPosition(shouldUseRecordPosition);
     readerContext.setHasLogFiles(!this.logFiles.isEmpty());
     readerContext.setHasBootstrapBaseFile(hoodieBaseFileOption.isPresent() && hoodieBaseFileOption.get().getBootstrapBaseFile().isPresent());
-    readerContext.setSchemaHandler(new HoodieFileGroupReaderSchemaHandler<>(readerContext, dataSchema,
-        requestedSchema, internalSchemaOpt, tableConfig));
+    readerContext.setSchemaHandler(readerContext.supportsParquetRowIndex()
+        ? new HoodiePositionBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig)
+        : new HoodieFileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig));
     this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
this.recordBuffer = this.logFiles.isEmpty()
? null
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
index 29f05b015ed..bc25bb96f5e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java
@@ -53,7 +53,7 @@ import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STR
  * {@link #hasNext} method is called.
  */
 public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileGroupRecordBuffer<T> {
-  public static final String ROW_INDEX_COLUMN_NAME = "row_index";
+  private static final String ROW_INDEX_COLUMN_NAME = "row_index";
   public static final String ROW_INDEX_TEMPORARY_COLUMN_NAME = "_tmp_metadata_" + ROW_INDEX_COLUMN_NAME;
private long nextRecordPosition = 0L;
@@ -180,7 +180,7 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieBaseFileG
     // Handle merging.
     while (baseFileIterator.hasNext()) {
       T baseRecord = baseFileIterator.next();
-      nextRecordPosition = readerContext.extractRecordPosition(baseRecord, readerSchema, ROW_INDEX_COLUMN_NAME, nextRecordPosition);
+      nextRecordPosition = readerContext.extractRecordPosition(baseRecord, readerSchema, ROW_INDEX_TEMPORARY_COLUMN_NAME, nextRecordPosition);
       Pair<Option<T>, Map<String, Object>> logRecordInfo = records.remove(nextRecordPosition++);
if (hasNextBaseRecord(baseRecord, logRecordInfo)) {
return true;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
new file mode 100644
index 00000000000..87c4266e350
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedSchemaHandler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read;
+
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.InternalSchema;
+
+import org.apache.avro.Schema;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchemaDedupNested;
+
+/**
+ * This class is responsible for handling the schema for the file group reader that supports positional merge.
+ */
+public class HoodiePositionBasedSchemaHandler<T> extends HoodieFileGroupReaderSchemaHandler<T> {
+ public HoodiePositionBasedSchemaHandler(HoodieReaderContext<T> readerContext,
+ Schema dataSchema,
+ Schema requestedSchema,
+                                          Option<InternalSchema> internalSchemaOpt,
+                                          HoodieTableConfig hoodieTableConfig) {
+    super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, hoodieTableConfig);
+ }
+
+ @Override
+ protected Schema prepareRequiredSchema() {
+ Schema preMergeSchema = super.prepareRequiredSchema();
+    return readerContext.getShouldMergeUseRecordPosition() && readerContext.getHasLogFiles()
+ ? addPositionalMergeCol(preMergeSchema)
+ : preMergeSchema;
+ }
+
+ @Override
+  public Pair<List<Schema.Field>,List<Schema.Field>> getBootstrapRequiredFields() {
+    Pair<List<Schema.Field>,List<Schema.Field>> dataAndMetaCols = super.getBootstrapRequiredFields();
+    if (readerContext.supportsParquetRowIndex()) {
+      if (!dataAndMetaCols.getLeft().isEmpty() && !dataAndMetaCols.getRight().isEmpty()) {
+ dataAndMetaCols.getLeft().add(getPositionalMergeField());
+ dataAndMetaCols.getRight().add(getPositionalMergeField());
+ }
+ }
+ return dataAndMetaCols;
+ }
+
+ private static Schema addPositionalMergeCol(Schema input) {
+    return appendFieldsToSchemaDedupNested(input, Collections.singletonList(getPositionalMergeField()));
+ }
+
+ private static Schema.Field getPositionalMergeField() {
+    return new Schema.Field(HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME,
+ Schema.create(Schema.Type.LONG), "", -1L);
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 0412d202148..c9e001dde59 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -149,7 +149,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
             val hoodieBaseFile = fileSlice.getBaseFile.get()
             baseFileReader(createPartitionedFile(fileSliceMapping.getPartitionValues, hoodieBaseFile.getStoragePath, 0, hoodieBaseFile.getFileLen))
           } else {
-            val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, tableState.recordKeyField, filters)
+            val readerContext = new SparkFileFormatInternalRowReaderContext(parquetFileReader.value, tableState.recordKeyField, filters, requiredFilters)
val storageConf = broadcastedStorageConf.value
val metaClient: HoodieTableMetaClient = HoodieTableMetaClient
.builder().setConf(storageConf).setBasePath(tableState.tablePath).build
diff --git a/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
new file mode 100644
index 00000000000..abf9d238dd6
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.execution.datasources.parquet
+
+import org.apache.hudi.SparkFileFormatInternalRowReaderContext
+import org.apache.hudi.SparkFileFormatInternalRowReaderContext.filterIsSafeForBootstrap
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.spark.sql.sources.{And, IsNotNull, Or}
+import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Test
+
+class TestSparkFileFormatInternalRowReaderContext extends SparkClientFunctionalTestHarness {
+
+ @Test
+ def testBootstrapFilters(): Unit = {
+    val recordKeyField = HoodieRecord.HoodieMetadataField.RECORD_KEY_METADATA_FIELD.getFieldName
+    val commitTimeField = HoodieRecord.HoodieMetadataField.COMMIT_TIME_METADATA_FIELD.getFieldName
+
+ val recordKeyFilter = IsNotNull(recordKeyField)
+ assertTrue(filterIsSafeForBootstrap(recordKeyFilter))
+ val commitTimeFilter = IsNotNull(commitTimeField)
+ assertTrue(filterIsSafeForBootstrap(commitTimeFilter))
+
+ val dataFieldFilter = IsNotNull("someotherfield")
+ assertTrue(filterIsSafeForBootstrap(dataFieldFilter))
+
+ val legalComplexFilter = Or(recordKeyFilter, commitTimeFilter)
+ assertTrue(filterIsSafeForBootstrap(legalComplexFilter))
+
+ val illegalComplexFilter = Or(recordKeyFilter, dataFieldFilter)
+ assertFalse(filterIsSafeForBootstrap(illegalComplexFilter))
+
+ val illegalNestedFilter = And(legalComplexFilter, illegalComplexFilter)
+ assertFalse(filterIsSafeForBootstrap(illegalNestedFilter))
+
+ val legalNestedFilter = And(legalComplexFilter, recordKeyFilter)
+ assertTrue(filterIsSafeForBootstrap(legalNestedFilter))
+ }
+
+ @Test
+ def testGetAppliedRequiredSchema(): Unit = {
+ val fields = Array(
+ StructField("column_a", LongType, nullable = false),
+ StructField("column_b", StringType, nullable = false))
+ val requiredSchema = StructType(fields)
+
+    val appliedSchema: StructType = SparkFileFormatInternalRowReaderContext.getAppliedRequiredSchema(
+      requiredSchema, true)
+    assertEquals(3, appliedSchema.fields.length)
+    assertTrue(appliedSchema.fields.map(f => f.name).contains(ROW_INDEX_TEMPORARY_COLUMN_NAME))
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
new file mode 100644
index 00000000000..e59e65bea3e
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestHoodiePositionBasedFileGroupRecordBuffer.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi;
+
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.read.HoodiePositionBasedFileGroupRecordBuffer;
+import org.apache.hudi.common.table.read.HoodiePositionBasedSchemaHandler;
+import org.apache.hudi.common.table.read.TestHoodieFileGroupReaderOnSpark;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
+import static org.apache.hudi.common.model.WriteOperationType.INSERT;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.createMetaClient;
+import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodiePositionBasedFileGroupRecordBuffer extends TestHoodieFileGroupReaderOnSpark {
+  private final HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF);
+ private HoodieTableMetaClient metaClient;
+ private Schema avroSchema;
+ private HoodiePositionBasedFileGroupRecordBuffer<InternalRow> buffer;
+ private String partitionPath;
+
+ public void prepareBuffer(boolean useCustomMerger) throws Exception {
+ Map<String, String> writeConfigs = new HashMap<>();
+    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
+    writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
+    writeConfigs.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
+ writeConfigs.put("hoodie.datasource.write.precombine.field", "timestamp");
+ writeConfigs.put("hoodie.payload.ordering.field", "timestamp");
+ writeConfigs.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
+ writeConfigs.put("hoodie.insert.shuffle.parallelism", "4");
+ writeConfigs.put("hoodie.upsert.shuffle.parallelism", "4");
+ writeConfigs.put("hoodie.bulkinsert.shuffle.parallelism", "2");
+ writeConfigs.put("hoodie.delete.shuffle.parallelism", "1");
+ writeConfigs.put("hoodie.merge.small.file.group.candidates.limit", "0");
+ writeConfigs.put("hoodie.compact.inline", "false");
+ writeConfigs.put(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), "true");
+    commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), INSERT.value(), writeConfigs);
+
+ String[] partitionPaths = dataGen.getPartitionPaths();
+ String[] partitionValues = new String[1];
+ partitionPath = partitionPaths[0];
+ partitionValues[0] = partitionPath;
+
+ metaClient = createMetaClient(getStorageConf(), getBasePath());
+ avroSchema = new TableSchemaResolver(metaClient).getTableAvroSchema();
+    Option<String[]> partitionFields = metaClient.getTableConfig().getPartitionFields();
+    Option<String> partitionNameOpt = StringUtils.isNullOrEmpty(partitionPaths[0])
+        ? Option.empty() : Option.of(partitionPaths[0]);
+
+    HoodieReaderContext ctx = getHoodieReaderContext(getBasePath(), avroSchema, getStorageConf());
+ ctx.setTablePath(metaClient.getBasePathV2().toString());
+ ctx.setLatestCommitTime(metaClient.createNewInstantTime());
+ ctx.setShouldMergeUseRecordPosition(true);
+ ctx.setHasBootstrapBaseFile(false);
+ ctx.setHasLogFiles(true);
+ ctx.setNeedsBootstrapMerge(false);
+    ctx.setRecordMerger(useCustomMerger ? new CustomMerger() : new HoodieSparkRecordMerger());
+    ctx.setSchemaHandler(new HoodiePositionBasedSchemaHandler<>(ctx, avroSchema, avroSchema,
+        Option.empty(), metaClient.getTableConfig()));
+ buffer = new HoodiePositionBasedFileGroupRecordBuffer<>(
+ ctx,
+ metaClient,
+ partitionNameOpt,
+ partitionFields,
+ ctx.getRecordMerger(),
+ new TypedProperties(),
+ 1024 * 1024 * 1000,
+ metaClient.getTempFolderPath(),
+ ExternalSpillableMap.DiskMapType.ROCKS_DB,
+ false);
+ }
+
+ public Map<HoodieLogBlock.HeaderMetadataType, String> getHeader() {
+ Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, avroSchema.toString());
+ header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+ return header;
+ }
+
+  public List<DeleteRecord> getDeleteRecords() throws IOException, URISyntaxException {
+ SchemaTestUtil testUtil = new SchemaTestUtil();
+ List<IndexedRecord> records = testUtil.generateHoodieTestRecords(0, 100);
+
+    List<DeleteRecord> deletedRecords = records.stream()
+        .map(s -> (DeleteRecord.create(((GenericRecord) s).get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString(),
+            ((GenericRecord) s).get(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString())))
+        .collect(Collectors.toList()).subList(0, 50);
+ return deletedRecords;
+ }
+
+  public HoodieDeleteBlock getDeleteBlockWithPositions() throws IOException, URISyntaxException {
+ List<DeleteRecord> deletedRecords = getDeleteRecords();
+ List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+
+ long position = 0;
+ for (DeleteRecord dr : deletedRecords) {
+ deleteRecordList.add(Pair.of(dr, position++));
+ }
+ return new HoodieDeleteBlock(deleteRecordList, true, getHeader());
+ }
+
+  public HoodieDeleteBlock getDeleteBlockWithoutPositions() throws IOException, URISyntaxException {
+ List<DeleteRecord> deletedRecords = getDeleteRecords();
+ List<Pair<DeleteRecord, Long>> deleteRecordList = new ArrayList<>();
+
+ for (DeleteRecord dr : deletedRecords) {
+ deleteRecordList.add(Pair.of(dr, -1L));
+ }
+ return new HoodieDeleteBlock(deleteRecordList, true, getHeader());
+ }
+
+ @Test
+ public void testProcessDeleteBlockWithPositions() throws Exception {
+ prepareBuffer(false);
+ HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions();
+ buffer.processDeleteBlock(deleteBlock);
+ assertEquals(50, buffer.getLogRecords().size());
+ // With record positions, we do not need the record keys.
+    assertNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
+ }
+
+ @Test
+ public void testProcessDeleteBlockWithCustomMerger() throws Exception {
+ prepareBuffer(true);
+ HoodieDeleteBlock deleteBlock = getDeleteBlockWithPositions();
+ buffer.processDeleteBlock(deleteBlock);
+ assertEquals(50, buffer.getLogRecords().size());
+    assertNotNull(buffer.getLogRecords().get(0L).getRight().get(INTERNAL_META_RECORD_KEY));
+ }
+
+ @Test
+ public void testProcessDeleteBlockWithoutPositions() throws Exception {
+ prepareBuffer(false);
+ HoodieDeleteBlock deleteBlock = getDeleteBlockWithoutPositions();
+    Exception exception = assertThrows(
+        HoodieValidationException.class, () -> buffer.processDeleteBlock(deleteBlock));
+    assertTrue(exception.getMessage().contains("No record position info is found"));
+ }
+
+ public static class CustomMerger implements HoodieRecordMerger {
+ @Override
+ public String getMergingStrategy() {
+ return "random_strategy";
+ }
+
+ @Override
+    public Option<Pair<HoodieRecord, Schema>> merge(
+        HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props
+ ) throws IOException {
+ throw new IOException("Not implemented");
+ }
+
+ @Override
+ public HoodieRecord.HoodieRecordType getRecordType() {
+ return HoodieRecord.HoodieRecordType.SPARK;
+ }
+ }
+}
+
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java
new file mode 100644
index 00000000000..b8ca6373237
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestFiltersInFileGroupReader.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional;
+
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.internal.SQLConf;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Map;
+
+
+/**
+ * Ensure that parquet filters are not being pushed down when they shouldn't be
+ */
+@Tag("functional")
+public class TestFiltersInFileGroupReader extends TestBootstrapReadBase {
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testFiltersInFileFormat(boolean mergeUseRecordPositions) {
+ this.bootstrapType = "mixed";
+ this.dashPartitions = true;
+ this.tableType = HoodieTableType.MERGE_ON_READ;
+ this.nPartitions = 2;
+ this.nInserts = 100000;
+ this.nUpdates = 20000;
+    sparkSession.conf().set(SQLConf.PARQUET_RECORD_FILTER_ENABLED().key(), "true");
+ setupDirs();
+
+ //do bootstrap
+ Map<String, String> options = setBootstrapOptions();
+ Dataset<Row> bootstrapDf = sparkSession.emptyDataFrame();
+ bootstrapDf.write().format("hudi")
+ .options(options)
+ .mode(SaveMode.Overwrite)
+ .save(bootstrapTargetPath);
+ runComparison(mergeUseRecordPositions);
+
+
+ options = basicOptions();
+    options.put(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), String.valueOf(mergeUseRecordPositions));
+    options.put(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key(), String.valueOf(mergeUseRecordPositions));
+
+ doUpdate(options, "001");
+ runComparison(mergeUseRecordPositions);
+
+ doInsert(options, "002");
+ runComparison(mergeUseRecordPositions);
+
+ doDelete(options, "003");
+ runComparison(mergeUseRecordPositions);
+ }
+
+  protected void runComparison(boolean mergeUseRecordPositions) {
+    compareDf(createDf(hudiBasePath, true, mergeUseRecordPositions), createDf(hudiBasePath, false, false));
+    compareDf(createDf(bootstrapTargetPath, true, mergeUseRecordPositions), createDf(bootstrapTargetPath, false, false));
+    compareDf(createDf2(hudiBasePath, true, mergeUseRecordPositions), createDf2(hudiBasePath, false, false));
+    compareDf(createDf2(bootstrapTargetPath, true, mergeUseRecordPositions), createDf2(bootstrapTargetPath, false, false));
+ }
+
+  protected Dataset<Row> createDf(String tableBasePath, Boolean fgReaderEnabled, Boolean mergeUseRecordPositions) {
+    //The chances of a uuid containing 00 with the 8-4-4-4-12 format is around 90%
+    //for bootstrap, _hoodie_record_key is in the skeleton while begin_lat is in the data
+    //We have a record key filter so that tests MORs filter pushdown with position based merging
+    return sparkSession.read().format("hudi")
+        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), fgReaderEnabled)
+        .option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), mergeUseRecordPositions)
+ .load(tableBasePath)
+ .drop("city_to_state")
+ .where("begin_lat > 0.5 and _hoodie_record_key LIKE '%00%'");
+ }
+
+  protected Dataset<Row> createDf2(String tableBasePath, Boolean fgReaderEnabled, Boolean mergeUseRecordPositions) {
+    //The chances of a uuid containing 00 with the 8-4-4-4-12 format is around 90%
+    //for bootstrap, _hoodie_record_key is in the skeleton while begin_lat is in the data
+    //We have a record key filter so that tests MORs filter pushdown with position based merging
+    return sparkSession.read().format("hudi")
+        .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), fgReaderEnabled)
+        .option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key(), mergeUseRecordPositions)
+ .load(tableBasePath)
+ .drop("city_to_state")
+ .where("begin_lat > 0.5 or _hoodie_record_key LIKE '%00%'");
+ }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 49277884eb1..747fcb9a2eb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -84,7 +84,7 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
     val reader = sparkAdapter.createParquetFileReader(vectorized = false, spark.sessionState.conf, Map.empty, storageConf.unwrapAs(classOf[Configuration]))
     val metaClient = HoodieTableMetaClient.builder().setConf(storageConf).setBasePath(tablePath).build
     val recordKeyField = new HoodieSparkRecordMerger().getMandatoryFieldsForMerging(metaClient.getTableConfig)(0)
-    new SparkFileFormatInternalRowReaderContext(reader, recordKeyField, Seq.empty)
+    new SparkFileFormatInternalRowReaderContext(reader, recordKeyField, Seq.empty, Seq.empty)
   }

   override def commitToTable(recordList: util.List[String], operation: String, options: util.Map[String, String]): Unit = {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
new file mode 100644
index 00000000000..61b7b0ded04
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestSpark35RecordPositionMetadataColumn.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table.read
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.SparkAdapterSupport.sparkAdapter
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
+import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.common.testutils.HoodieTestTable
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.hadoop.fs.HadoopFSUtils
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
+import org.apache.hudi.util.CloseableInternalRowIterator
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils, SparkFileFormatInternalRowReaderContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
+import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse}
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+class TestSpark35RecordPositionMetadataColumn extends SparkClientFunctionalTestHarness {
+ private val PARQUET_FORMAT = "parquet"
+ private val SPARK_MERGER = "org.apache.hudi.HoodieSparkRecordMerger"
+
+ @BeforeEach
+ def setUp(): Unit = {
+ val _spark = spark
+ import _spark.implicits._
+
+ val userToCountryDF = Seq(
+ (1, "US", "1001"),
+ (2, "China", "1003"),
+ (3, "US", "1002"),
+ (4, "Singapore", "1004"))
+ .toDF("userid", "country", "ts")
+
+ // Create the file with record positions.
+ userToCountryDF.write.format("hudi")
+ .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "userid")
+ .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
+ .option(HoodieWriteConfig.TBL_NAME.key, "user_to_country")
+ .option(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, SPARK_MERGER)
+ .option(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key, "true")
+      .option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key, PARQUET_FORMAT)
+ .option(
+ DataSourceWriteOptions.TABLE_TYPE.key(),
+ HoodieTableType.MERGE_ON_READ.name())
+ .save(basePath)
+ }
+
+ @Test
+ def testRecordPositionColumn(): Unit = {
+ val _spark = spark
+ // Prepare the schema
+ val dataSchema = new StructType(
+ Array(
+ StructField("userid", IntegerType, nullable = false),
+ StructField("country", StringType, nullable = false),
+ StructField("ts", StringType, nullable = false)
+ )
+ )
+
+ // Prepare the file and Parquet file reader.
+ _spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
+
+    val hadoopConf = new Configuration(spark().sparkContext.hadoopConfiguration)
+    val props = Map("spark.sql.parquet.enableVectorizedReader" -> "false")
+    _spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
+    val reader = sparkAdapter.createParquetFileReader(vectorized = false, _spark.sessionState.conf, props, hadoopConf)
+
+    val metaClient = getHoodieMetaClient(HadoopFSUtils.getStorageConfWithCopy(_spark.sparkContext.hadoopConfiguration), basePath)
+ val allBaseFiles = HoodieTestTable.of(metaClient).listAllBaseFiles
+ assertFalse(allBaseFiles.isEmpty)
+
+    val requiredSchema = SparkFileFormatInternalRowReaderContext.getAppliedRequiredSchema(dataSchema,
+      new SparkFileFormatInternalRowReaderContext(reader, "userid", Seq.empty, Seq.empty).supportsParquetRowIndex)
+
+ // Confirm if the schema is as expected.
+ if (HoodieSparkUtils.gteqSpark3_5) {
+ assertEquals(4, requiredSchema.fields.length)
+ assertEquals(
+ "StructField(_tmp_metadata_row_index,LongType,false)",
+ requiredSchema.fields(3).toString)
+ }
+
+ // Make sure we can read all the positions out from base file.
+ // Here we don't add filters since enabling filter push-down
+ // for parquet file is tricky.
+ if (HoodieSparkUtils.gteqSpark3_5) {
+ val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
+ .createPartitionedFile(
+ InternalRow.empty,
+ allBaseFiles.get(0).getPath,
+ 0,
+ allBaseFiles.get(0).getLength)
+      val iterator = new CloseableInternalRowIterator(reader.read(fileInfo, requiredSchema,
+        StructType(Seq.empty), Seq.empty, new HadoopStorageConfiguration(hadoopConf)))
+ var rowIndices: Set[Long] = Set()
+ while (iterator.hasNext) {
+ val row = iterator.next()
+ rowIndices += row.getLong(3)
+ }
+ iterator.close()
+ val expectedRowIndices: Set[Long] = Set(0L, 1L, 2L, 3L)
+ assertEquals(expectedRowIndices, rowIndices)
+ }
+ }
+
+ @Test
+ def testUseFileGroupReaderDirectly(): Unit = {
+ val _spark = spark
+ import _spark.implicits._
+
+ // Read the records out with record positions.
+ val allRecords = _spark.read.format("hudi")
+ .option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key, "true")
+ .load(basePath)
+
+ // Ensure the number of outcomes are correct for all Spark versions
+ // including Spark3.5.
+ val usRecords = allRecords
+ .select("userid")
+ .filter("country = 'US'").map(_.getInt(0)).collect()
+ assertArrayEquals(Array[Int](1, 3), usRecords)
+ }
+}
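
For context on how this feature is switched on end to end, the knobs exercised by the tests above are HoodieWriteConfig.WRITE_RECORD_POSITIONS on the write path and HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS on the read path. A hedged usage sketch follows; the table path, the `spark` session and the input `df` are placeholders and not part of this commit.

// Write a MOR table with record positions recorded in the base files, then read it
// back with position-based merging enabled in the new file group reader.
import org.apache.hudi.common.config.HoodieReaderConfig
import org.apache.hudi.config.HoodieWriteConfig

val tablePath = "/tmp/hudi_position_merge_demo"  // placeholder location

df.write.format("hudi")
  .option("hoodie.table.name", "position_merge_demo")
  .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
  .option(HoodieWriteConfig.WRITE_RECORD_POSITIONS.key, "true")
  .mode("append")
  .save(tablePath)

val merged = spark.read.format("hudi")
  .option(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS.key, "true")
  .load(tablePath)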