This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0cb77908357 [HUDI-7042] Fix new filegroup reader (#10003)
0cb77908357 is described below
commit 0cb7790835775c39b5cf71683b95f7618c6c95cc
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed Nov 8 09:58:28 2023 +0530
[HUDI-7042] Fix new filegroup reader (#10003)
---
.../hudi/common/model/HoodieSparkRecord.java | 2 +
.../read/HoodieBaseFileGroupRecordBuffer.java | 2 +-
.../common/table/read/HoodieFileGroupReader.java | 2 +-
.../table/read/HoodieFileGroupRecordBuffer.java | 2 +-
...odieFileGroupReaderBasedParquetFileFormat.scala | 69 +++++++++++++++++++---
.../hudi/functional/TestMORDataSourceStorage.scala | 16 +++--
.../functional/TestPartialUpdateAvroPayload.scala | 23 +++++---
style/scalastyle.xml | 2 +-
8 files changed, 93 insertions(+), 25 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index 3d59ad27257..5cb8800411c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.catalyst.expressions.JoinedRow;
+import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.types.DataType;
@@ -447,6 +448,7 @@ public class HoodieSparkRecord extends HoodieRecord<InternalRow> {
|| schema != null && (
data instanceof HoodieInternalRow
|| data instanceof GenericInternalRow
+ || data instanceof SpecificInternalRow
|| SparkAdapterSupport$.MODULE$.sparkAdapter().isColumnarBatchRow(data));
ValidationUtils.checkState(isValid);
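For context, SpecificInternalRow is the mutable InternalRow implementation commonly produced by Spark's deserializers and generated code, which is why the validity check above now admits it alongside GenericInternalRow. A minimal standalone Scala sketch of constructing one (the schema and values are hypothetical, for illustration only):

import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
import org.apache.spark.sql.types.{DoubleType, LongType}

// Mutable row for a hypothetical (ts: Long, fare: Double) schema,
// i.e. the row type the relaxed check now accepts.
val row = new SpecificInternalRow(Seq(LongType, DoubleType))
row.setLong(0, 1699412908L)
row.setDouble(1, 27.7)
assert(row.getLong(0) == 1699412908L && row.getDouble(1) == 27.7)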
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index 4a1bd08e4ef..90ebf71dfb1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -80,7 +80,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
}
@Override
- public void setBaseFileIteraotr(ClosableIterator<T> baseFileIterator) {
+ public void setBaseFileIterator(ClosableIterator<T> baseFileIterator) {
this.baseFileIterator = baseFileIterator;
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index b655238412d..2850a77d709 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -154,7 +154,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
baseFilePath.get().getHadoopPath(), start, length,
readerState.baseFileAvroSchema, readerState.baseFileAvroSchema, hadoopConf)
: new EmptyIterator<>();
scanLogFiles();
- recordBuffer.setBaseFileIteraotr(baseFileIterator);
+ recordBuffer.setBaseFileIterator(baseFileIterator);
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
index 680bbf9d705..0bf27cfc71e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -100,7 +100,7 @@ public interface HoodieFileGroupRecordBuffer<T> {
*
* @param baseFileIterator
*/
- void setBaseFileIteraotr(ClosableIterator<T> baseFileIterator);
+ void setBaseFileIterator(ClosableIterator<T> baseFileIterator);
/**
* Check if next merged record exists.
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index d8278ea8218..3fb80509b61 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -33,8 +33,10 @@ import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, HoodieSparkUtils, HoodieTableSchema, HoodieTableState, MergeOnReadSnapshotRelation, HoodiePartitionFileSliceMapping, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
import org.apache.spark.sql.sources.Filter
@@ -42,6 +44,7 @@ import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
import org.apache.hudi.common.model.HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD
+import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.jdk.CollectionConverters.asScalaIteratorConverter
@@ -89,6 +92,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+ val outputSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
val requiredSchemaWithMandatory = generateRequiredSchemaWithMandatory(requiredSchema, dataSchema, partitionSchema)
val requiredSchemaSplits = requiredSchemaWithMandatory.fields.partition(f => HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION.contains(f.name))
val requiredMeta = StructType(requiredSchemaSplits._1)
@@ -117,7 +121,6 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
val logFiles = getLogFilesFromSlice(fileSlice)
if (requiredSchemaWithMandatory.isEmpty) {
val baseFile = createPartitionedFile(partitionValues, hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
- // TODO: Use FileGroupReader here: HUDI-6942.
baseFileReader(baseFile)
} else if (bootstrapFileOpt.isPresent) {
// TODO: Use FileGroupReader here: HUDI-6942.
@@ -134,9 +137,11 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
hoodieBaseFile,
logFiles,
requiredSchemaWithMandatory,
+ outputSchema,
+ partitionSchema,
broadcastedHadoopConf.value.value,
- file.start,
- file.length,
+ 0,
+ hoodieBaseFile.getFileLen,
shouldUseRecordPosition
)
}
@@ -180,6 +185,8 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
baseFile: HoodieBaseFile,
logFiles: List[HoodieLogFile],
requiredSchemaWithMandatory: StructType,
+ outputSchema: StructType,
+ partitionSchema: StructType,
hadoopConf: Configuration,
start: Long,
length: Long,
@@ -201,19 +208,65 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
length,
shouldUseRecordPosition)
reader.initRecordIterators()
- reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala
+ // Append partition values to rows and project to output schema
+ appendPartitionAndProject(
+ reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala,
+ requiredSchemaWithMandatory,
+ partitionSchema,
+ outputSchema,
+ partitionValues)
+ }
+
+ private def appendPartitionAndProject(iter: Iterator[InternalRow],
+ inputSchema: StructType,
+ partitionSchema: StructType,
+ to: StructType,
+ partitionValues: InternalRow): Iterator[InternalRow] = {
+ if (partitionSchema.isEmpty) {
+ projectSchema(iter, inputSchema, to)
+ } else {
+ val unsafeProjection = generateUnsafeProjection(StructType(inputSchema.fields ++ partitionSchema.fields), to)
+ val joinedRow = new JoinedRow()
+ iter.map(d => unsafeProjection(joinedRow(d, partitionValues)))
+ }
}
- def generateRequiredSchemaWithMandatory(requiredSchema: StructType,
- dataSchema: StructType,
- partitionSchema: StructType): StructType = {
+ private def projectSchema(iter: Iterator[InternalRow],
+ from: StructType,
+ to: StructType): Iterator[InternalRow] = {
+ val unsafeProjection = generateUnsafeProjection(from, to)
+ iter.map(d => unsafeProjection(d))
+ }
+
+ private def generateRequiredSchemaWithMandatory(requiredSchema: StructType,
+ dataSchema: StructType,
+ partitionSchema: StructType): StructType = {
+ // Helper method to get the StructField for nested fields
+ @tailrec
+ def findNestedField(schema: StructType, fieldParts: Array[String]): Option[StructField] = {
+ fieldParts.toList match {
+ case head :: Nil => schema.fields.find(_.name == head) // If it's the last part, find and return the field
+ case head :: tail => // If there are more parts, find the field and its nested fields
+ schema.fields.find(_.name == head) match {
+ case Some(StructField(_, nested: StructType, _, _)) => findNestedField(nested, tail.toArray)
+ case _ => None // The path is not valid
+ }
+ case _ => None // Empty path, should not happen if the input is correct
+ }
+ }
+
+ // If not MergeOnRead or if projection is compatible
if (isIncremental) {
StructType(dataSchema.toArray ++ partitionSchema.fields)
} else if (!isMOR || MergeOnReadSnapshotRelation.isProjectionCompatible(tableState)) {
val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
for (field <- mandatoryFields) {
if (requiredSchema.getFieldIndex(field).isEmpty) {
- val fieldToAdd = dataSchema.fields(dataSchema.getFieldIndex(field).get)
+ // Support for nested fields
+ val fieldParts = field.split("\\.")
+ val fieldToAdd = findNestedField(dataSchema, fieldParts).getOrElse(
+ throw new IllegalArgumentException(s"Field $field does not exist in the data schema")
+ )
added.append(fieldToAdd)
}
}
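To illustrate the row-assembly pattern in appendPartitionAndProject above (data columns joined with partition values via JoinedRow, then projected to the output schema), here is a small self-contained Scala sketch. It uses Spark's stock UnsafeProjection rather than Hudi's generateUnsafeProjection helper, and all column names and values are hypothetical:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow, UnsafeProjection}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical data columns read from the merged file group.
val dataSchema = StructType(Seq(StructField("rider", StringType), StructField("fare", DoubleType)))
val dataRow: InternalRow = new GenericInternalRow(Array[Any](UTF8String.fromString("rider-1"), 27.7))

// Partition values recovered from the file's partition path.
val partitionSchema = StructType(Seq(StructField("dt", StringType)))
val partitionRow: InternalRow = new GenericInternalRow(Array[Any](UTF8String.fromString("2023-11-08")))

// JoinedRow exposes the data columns first, then the partition columns, without copying.
val joined = new JoinedRow(dataRow, partitionRow)

// Identity projection over the combined schema; Hudi's generateUnsafeProjection
// additionally prunes and reorders fields to match the query's output schema.
val projection = UnsafeProjection.create(StructType(dataSchema.fields ++ partitionSchema.fields))
val out = projection(joined)
assert(out.numFields == 3 && out.getDouble(1) == 27.7)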
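Likewise, the nested-field lookup added to generateRequiredSchemaWithMandatory lets a dotted mandatory field such as fare.currency (the preCombine field exercised in the tests below) resolve against the data schema. A runnable Scala sketch of the same helper against a hypothetical schema:

import scala.annotation.tailrec
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}

@tailrec
def findNestedField(schema: StructType, fieldParts: Array[String]): Option[StructField] = {
  fieldParts.toList match {
    case head :: Nil => schema.fields.find(_.name == head) // last part: return the field itself
    case head :: tail => // more parts remain: descend into the nested struct
      schema.fields.find(_.name == head) match {
        case Some(StructField(_, nested: StructType, _, _)) => findNestedField(nested, tail.toArray)
        case _ => None // the path does not name a struct field
      }
    case _ => None // empty path
  }
}

val schema = StructType(Seq(
  StructField("ts", LongType),
  StructField("fare", StructType(Seq(
    StructField("amount", DoubleType),
    StructField("currency", StringType))))))

assert(findNestedField(schema, "fare.currency".split("\\.")).exists(_.dataType == StringType))
assert(findNestedField(schema, "fare.missing".split("\\.")).isEmpty)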
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
index 300c9ab877b..acf406c9fbc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSourceStorage.scala
@@ -19,7 +19,7 @@
package org.apache.hudi.functional
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieMetadataConfig, HoodieReaderConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
@@ -46,12 +46,12 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
@ParameterizedTest
@CsvSource(Array(
- "true,",
- "true,fare.currency",
- "false,",
- "false,fare.currency"
+ "true,,false",
+ "true,fare.currency,false",
+ "false,,false",
+ "false,fare.currency,true"
))
- def testMergeOnReadStorage(isMetadataEnabled: Boolean, preCombineField: String): Unit = {
+ def testMergeOnReadStorage(isMetadataEnabled: Boolean, preCombineField: String, useFileGroupReader: Boolean): Unit = {
val commonOpts = Map(
"hoodie.insert.shuffle.parallelism" -> "4",
"hoodie.upsert.shuffle.parallelism" -> "4",
@@ -70,6 +70,10 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
if (!StringUtils.isNullOrEmpty(preCombineField)) {
options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> preCombineField)
}
+ if (useFileGroupReader) {
+ options += (DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key() -> String.valueOf(useFileGroupReader))
+ options += (HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key() -> String.valueOf(useFileGroupReader))
+ }
val dataGen = new HoodieTestDataGenerator(0xDEEF)
val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
// Bulk Insert Operation
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
index 172d0a7f945..1e605b092bf 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartialUpdateAvroPayload.scala
@@ -18,17 +18,16 @@
package org.apache.hudi.functional
-import java.util.function.Consumer
-
import org.apache.hadoop.fs.FileSystem
import org.apache.hudi.HoodieConversionUtils.toJavaOption
-import org.apache.hudi.{DataSourceWriteOptions, QuickstartUtils}
import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
+import org.apache.hudi.common.config.HoodieReaderConfig
import org.apache.hudi.common.model.HoodieTableType
import org.apache.hudi.common.util
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.HoodieClientTestBase
import org.apache.hudi.util.JFunction
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, QuickstartUtils}
import org.apache.spark.sql._
import org.apache.spark.sql.functions.{lit, typedLit}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -36,8 +35,9 @@ import org.apache.spark.sql.types.{DoubleType, StringType}
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.EnumSource
+import org.junit.jupiter.params.provider.CsvSource
+import java.util.function.Consumer
import scala.collection.JavaConversions._
class TestPartialUpdateAvroPayload extends HoodieClientTestBase {
@@ -66,8 +66,14 @@ class TestPartialUpdateAvroPayload extends HoodieClientTestBase {
}
@ParameterizedTest
- @EnumSource(classOf[HoodieTableType])
- def testPartialUpdatesAvroPayloadPrecombine(hoodieTableType: HoodieTableType): Unit = {
+ @CsvSource(Array(
+ "COPY_ON_WRITE,false",
+ "MERGE_ON_READ,false",
+ "COPY_ON_WRITE,true",
+ "MERGE_ON_READ,true"
+ ))
+ def testPartialUpdatesAvroPayloadPrecombine(tableType: String, useFileGroupReader: Boolean): Unit = {
+ val hoodieTableType = HoodieTableType.valueOf(tableType)
val dataGenerator = new QuickstartUtils.DataGenerator()
val records = convertToStringList(dataGenerator.generateInserts(1))
val recordsRDD = spark.sparkContext.parallelize(records, 2)
@@ -116,7 +122,10 @@ class TestPartialUpdateAvroPayload extends HoodieClientTestBase {
.mode(SaveMode.Append)
.save(basePath)
- val finalDF = spark.read.format("hudi").load(basePath)
+ val finalDF = spark.read.format("hudi")
+ .option(DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), String.valueOf(useFileGroupReader))
+ .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), String.valueOf(useFileGroupReader))
+ .load(basePath)
assertEquals(finalDF.select("rider").collectAsList().get(0).getString(0),
upsert1DF.select("rider").collectAsList().get(0).getString(0))
assertEquals(finalDF.select("driver").collectAsList().get(0).getString(0),
upsert2DF.select("driver").collectAsList().get(0).getString(0))
assertEquals(finalDF.select("fare").collectAsList().get(0).getDouble(0),
upsert3DF.select("fare").collectAsList().get(0).getDouble(0))
diff --git a/style/scalastyle.xml b/style/scalastyle.xml
index bb4bcbeb60d..463ceebef30 100644
--- a/style/scalastyle.xml
+++ b/style/scalastyle.xml
@@ -62,7 +62,7 @@
</check>
<check level="error"
class="org.scalastyle.scalariform.ParameterNumberChecker" enabled="true">
<parameters>
- <parameter name="maxParameters"><![CDATA[10]]></parameter>
+ <parameter name="maxParameters"><![CDATA[12]]></parameter>
</parameters>
</check>
<check level="error" class="org.scalastyle.scalariform.MagicNumberChecker"
enabled="false">