This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 530640f61a5 [HUDI-7055] Support reading only log files in file group reader-based Spark parquet file format (#10020)
530640f61a5 is described below
commit 530640f61a5e3f8103d0b66ff866a2de995156b7
Author: Y Ethan Guo <[email protected]>
AuthorDate: Wed Nov 8 18:45:45 2023 -0800
    [HUDI-7055] Support reading only log files in file group reader-based Spark parquet file format (#10020)
---
.../SparkFileFormatInternalRowReaderContext.scala | 16 +++--
.../table/read/TestHoodieFileGroupReaderBase.java | 71 ++++++++++++++++------
...odieFileGroupReaderBasedParquetFileFormat.scala | 39 +++++++-----
.../read/TestHoodieFileGroupReaderOnSpark.scala | 6 +-
4 files changed, 92 insertions(+), 40 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index af3d3fd239c..beca8852686 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -44,10 +44,12 @@ import scala.collection.mutable
*
 * This uses Spark parquet reader to read parquet data files or parquet log blocks.
*
- * @param baseFileReader A reader that transforms a {@link PartitionedFile} to an iterator of {@link InternalRow}.
+ * @param baseFileReader  A reader that transforms a {@link PartitionedFile} to an iterator of
+ *                        {@link InternalRow}. This is required for reading the base file and
+ *                        not required for reading a file group with only log files.
 * @param partitionValues The values for a partition in which the file group lives.
*/
-class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile => Iterator[InternalRow],
+class SparkFileFormatInternalRowReaderContext(baseFileReader: Option[PartitionedFile => Iterator[InternalRow]],
partitionValues: InternalRow)
extends BaseSparkInternalRowReaderContext {
lazy val sparkAdapter = SparkAdapterSupport.sparkAdapter
lazy val sparkFileReaderFactory = new HoodieSparkFileReaderFactory
@@ -62,11 +64,11 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile =>
val fileInfo = sparkAdapter.getSparkPartitionedFileUtils
.createPartitionedFile(partitionValues, filePath, start, length)
if (FSUtils.isLogFile(filePath)) {
-      val structType: StructType = HoodieInternalRowUtils.getCachedSchema(dataSchema)
+      val structType: StructType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
       val projection: UnsafeProjection = HoodieInternalRowUtils.getCachedUnsafeProjection(structType, structType)
new CloseableMappingIterator[InternalRow, UnsafeRow](
        sparkFileReaderFactory.newParquetFileReader(conf, filePath).asInstanceOf[HoodieSparkParquetReader]
- .getInternalRowIterator(dataSchema, dataSchema),
+ .getInternalRowIterator(dataSchema, requiredSchema),
new java.util.function.Function[InternalRow, UnsafeRow] {
override def apply(data: InternalRow): UnsafeRow = {
            // NOTE: We have to do [[UnsafeProjection]] of incoming [[InternalRow]] to convert
@@ -75,7 +77,11 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: PartitionedFile =>
}
}).asInstanceOf[ClosableIterator[InternalRow]]
} else {
- new CloseableInternalRowIterator(baseFileReader.apply(fileInfo))
+ if (baseFileReader.isEmpty) {
+        throw new IllegalArgumentException("Base file reader is missing when instantiating "
+ + "SparkFileFormatInternalRowReaderContext.");
+ }
+ new CloseableInternalRowIterator(baseFileReader.get.apply(fileInfo))
}
}
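For context only, a minimal usage sketch of the Option-based constructor introduced above (not part of this commit; the reader function and partition values are hypothetical placeholders): a file group with a base file passes the reader wrapped in Some(...), while a log-file-only file group passes None.

    import org.apache.hudi.SparkFileFormatInternalRowReaderContext
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.sql.execution.datasources.PartitionedFile

    // Hypothetical placeholders for illustration only.
    val baseFileReader: PartitionedFile => Iterator[InternalRow] = _ => Iterator.empty
    val partitionValues: InternalRow = new GenericInternalRow(Array.empty[Any])

    // File group with a base file: supply the base file reader.
    val withBaseFile = new SparkFileFormatInternalRowReaderContext(Some(baseFileReader), partitionValues)

    // File group with only log files: no base file reader is required.
    val logFilesOnly = new SparkFileFormatInternalRowReaderContext(None, partitionValues)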
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
index febc0d32466..439948a6cc9 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderBase.java
@@ -40,8 +40,9 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.Collections;
@@ -78,40 +79,71 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
Schema schema,
String fileGroupId);
- @Test
- public void testReadFileGroupInMergeOnReadTable() throws Exception {
- Map<String, String> writeConfigs = new HashMap<>();
-    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet");
-    writeConfigs.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
-    writeConfigs.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
- writeConfigs.put("hoodie.datasource.write.precombine.field", "timestamp");
- writeConfigs.put("hoodie.payload.ordering.field", "timestamp");
- writeConfigs.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
- writeConfigs.put("hoodie.insert.shuffle.parallelism", "4");
- writeConfigs.put("hoodie.upsert.shuffle.parallelism", "4");
- writeConfigs.put("hoodie.bulkinsert.shuffle.parallelism", "2");
- writeConfigs.put("hoodie.delete.shuffle.parallelism", "1");
- writeConfigs.put("hoodie.merge.small.file.group.candidates.limit", "0");
- writeConfigs.put("hoodie.compact.inline", "false");
+ @ParameterizedTest
+ @ValueSource(strings = {"avro", "parquet"})
+  public void testReadFileGroupInMergeOnReadTable(String logDataBlockFormat) throws Exception {
+ Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
+    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat);
    try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
// One commit; reading one file group containing a base file only
      commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), INSERT.value(), writeConfigs);
-      validateOutputFromFileGroupReader(getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), 0);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), true, 0);
      // Two commits; reading one file group containing a base file and a log file
      commitToTable(recordsToStrings(dataGen.generateUpdates("002", 100)), UPSERT.value(), writeConfigs);
-      validateOutputFromFileGroupReader(getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), 1);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), true, 1);
      // Three commits; reading one file group containing a base file and two log files
      commitToTable(recordsToStrings(dataGen.generateUpdates("003", 100)), UPSERT.value(), writeConfigs);
-      validateOutputFromFileGroupReader(getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), 2);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), true, 2);
}
}
+ @ParameterizedTest
+ @ValueSource(strings = {"avro", "parquet"})
+  public void testReadLogFilesOnlyInMergeOnReadTable(String logDataBlockFormat) throws Exception {
+ Map<String, String> writeConfigs = new HashMap<>(getCommonConfigs());
+    writeConfigs.put(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), logDataBlockFormat);
+ // Use InMemoryIndex to generate log only mor table
+ writeConfigs.put("hoodie.index.type", "INMEMORY");
+
+    try (HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(0xDEEF)) {
+ // One commit; reading one file group containing a base file only
+      commitToTable(recordsToStrings(dataGen.generateInserts("001", 100)), INSERT.value(), writeConfigs);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), false, 1);
+
+      // Two commits; reading one file group containing a base file and a log file
+      commitToTable(recordsToStrings(dataGen.generateUpdates("002", 100)), UPSERT.value(), writeConfigs);
+      validateOutputFromFileGroupReader(
+          getHadoopConf(), getBasePath(), dataGen.getPartitionPaths(), false, 2);
+ }
+ }
+
+ private Map<String, String> getCommonConfigs() {
+ Map<String, String> configMapping = new HashMap<>();
+    configMapping.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "_row_key");
+    configMapping.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition_path");
+ configMapping.put("hoodie.datasource.write.precombine.field", "timestamp");
+ configMapping.put("hoodie.payload.ordering.field", "timestamp");
+ configMapping.put(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, "hoodie_test");
+ configMapping.put("hoodie.insert.shuffle.parallelism", "4");
+ configMapping.put("hoodie.upsert.shuffle.parallelism", "4");
+ configMapping.put("hoodie.bulkinsert.shuffle.parallelism", "2");
+ configMapping.put("hoodie.delete.shuffle.parallelism", "1");
+ configMapping.put("hoodie.merge.small.file.group.candidates.limit", "0");
+ configMapping.put("hoodie.compact.inline", "false");
+ return configMapping;
+ }
+
private void validateOutputFromFileGroupReader(Configuration hadoopConf,
String tablePath,
String[] partitionPaths,
+ boolean containsBaseFile,
int expectedLogFileNum)
throws Exception {
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(hadoopConf).setBasePath(tablePath).build();
@@ -138,6 +170,7 @@ public abstract class TestHoodieFileGroupReaderBase<T> {
props.setProperty(PARTITION_FIELDS.key(),
metaClient.getTableConfig().getString(PARTITION_FIELDS));
}
String[] partitionValues = partitionPaths[0].isEmpty() ? new String[] {} :
new String[] {partitionPaths[0]};
+ assertEquals(containsBaseFile, fileSlice.getBaseFile().isPresent());
HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
getHoodieReaderContext(tablePath, partitionValues),
hadoopConf,
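As background for the new log-files-only test, a rough sketch of a Spark datasource write that yields such a file group (not taken from this commit; it assumes an existing DataFrame df and table path basePath, and mirrors the configs used in the test): the in-memory index sends inserts to log files, so no base file is created.

    import org.apache.hudi.common.config.HoodieStorageConfig

    // Sketch only: `df` and `basePath` are assumed to exist.
    df.write.format("hudi")
      .option("hoodie.table.name", "hoodie_test")
      .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
      .option("hoodie.datasource.write.recordkey.field", "_row_key")
      .option("hoodie.datasource.write.partitionpath.field", "partition_path")
      .option("hoodie.datasource.write.precombine.field", "timestamp")
      .option("hoodie.index.type", "INMEMORY") // inserts land in log files; no base file
      .option(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), "parquet") // or "avro"
      .mode("append")
      .save(basePath)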
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 495569b2ce8..4073d064dd5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -21,18 +21,15 @@ import kotlin.NotImplementedError
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
+import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, HoodieCDCFileGroupSplit}
+import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
-import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile, HoodieRecord}
+import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieFileGroupId, HoodieLogFile, HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.read.HoodieFileGroupReader
-import org.apache.hudi.cdc.{CDCFileGroupIterator, CDCRelation, HoodieCDCFileGroupSplit}
-import org.apache.hudi.common.config.TypedProperties
-import org.apache.hudi.common.model.HoodieFileGroupId
import org.apache.hudi.common.util.{Option => HOption}
-import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex, HoodiePartitionCDCFileGroupMapping,
-  HoodieSparkUtils, HoodieTableSchema, HoodieTableState, MergeOnReadSnapshotRelation, HoodiePartitionFileSliceMapping,
-  SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.{HoodieBaseRelation, HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, HoodieSparkUtils, HoodieTableSchema, HoodieTableState, MergeOnReadSnapshotRelation, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -42,7 +39,6 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.SerializableConfiguration
-import org.apache.hudi.common.model.HoodieRecord.COMMIT_TIME_METADATA_FIELD_ORD
import scala.annotation.tailrec
import scala.collection.JavaConverters._
@@ -110,8 +106,21 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
case fileSliceMapping: HoodiePartitionFileSliceMapping =>
          val filePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
if (FSUtils.isLogFile(filePath)) {
- // TODO: Use FileGroupReader here: HUDI-6942.
-            throw new NotImplementedError("Not support reading with only log files")
+ val partitionValues = fileSliceMapping.getPartitionValues
+            val fileSlice = fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
+ buildFileGroupIterator(
+ Option.empty[PartitionedFile => Iterator[InternalRow]],
+ partitionValues,
+ Option.empty[HoodieBaseFile],
+ getLogFilesFromSlice(fileSlice),
+ requiredSchemaWithMandatory,
+ outputSchema,
+ partitionSchema,
+ broadcastedHadoopConf.value.value,
+ -1,
+ -1,
+ shouldUseRecordPosition
+ )
} else {
            fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName)) match {
case Some(fileSlice) =>
@@ -132,9 +141,9 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
+ "since it has no log or data files")
}
buildFileGroupIterator(
- preMergeBaseFileReader,
+ Option(preMergeBaseFileReader),
partitionValues,
- hoodieBaseFile,
+ Option(hoodieBaseFile),
logFiles,
requiredSchemaWithMandatory,
outputSchema,
@@ -180,9 +189,9 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
props)
}
-  protected def buildFileGroupIterator(preMergeBaseFileReader: PartitionedFile => Iterator[InternalRow],
+  protected def buildFileGroupIterator(preMergeBaseFileReader: Option[PartitionedFile => Iterator[InternalRow]],
partitionValues: InternalRow,
- baseFile: HoodieBaseFile,
+ baseFile: Option[HoodieBaseFile],
logFiles: List[HoodieLogFile],
requiredSchemaWithMandatory: StructType,
outputSchema: StructType,
@@ -200,7 +209,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
hadoopConf,
tableState.tablePath,
tableState.latestCommitTimestamp.get,
- HOption.of(baseFile),
+ if (baseFile.nonEmpty) HOption.of(baseFile.get) else HOption.empty(),
HOption.of(logFiles.map(f => f.getPath.toString).asJava),
      HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, tableName),
metaClient.getTableConfig.getProps,
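A side note on the Scala Option to Hudi Option handoff above: the same conversion can be written with map/getOrElse; a small equivalent sketch (an alternative idiom, not what the commit uses):

    import org.apache.hudi.common.model.HoodieBaseFile
    import org.apache.hudi.common.util.{Option => HOption}

    // Equivalent to: if (baseFile.nonEmpty) HOption.of(baseFile.get) else HOption.empty()
    def toHOption(baseFile: Option[HoodieBaseFile]): HOption[HoodieBaseFile] =
      baseFile.map(f => HOption.of(f)).getOrElse(HOption.empty[HoodieBaseFile]())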
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index a1441be5ddc..087ae50ff02 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -21,6 +21,8 @@ package org.apache.hudi.common.table.read
import org.apache.avro.Schema
import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.DataSourceReadOptions.USE_NEW_HUDI_PARQUET_FILE_FORMAT
+import org.apache.hudi.common.config.HoodieReaderConfig.FILE_GROUP_READER_ENABLED
import org.apache.hudi.common.engine.HoodieReaderContext
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
@@ -98,7 +100,7 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
}
    val partitionValueRow = new GenericInternalRow(partitionValuesEncoded.toArray[Any])
-    new SparkFileFormatInternalRowReaderContext(recordReaderIterator, partitionValueRow)
+    new SparkFileFormatInternalRowReaderContext(Option(recordReaderIterator), partitionValueRow)
}
override def commitToTable(recordList: util.List[String], operation: String,
options: util.Map[String, String]): Unit = {
@@ -119,6 +121,8 @@ class TestHoodieFileGroupReaderOnSpark extends TestHoodieFileGroupReaderBase[Int
schema: Schema,
fileGroupId: String): Unit = {
val expectedDf = spark.read.format("hudi")
+ .option(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key(), "false")
+ .option(FILE_GROUP_READER_ENABLED.key(), "false")
.load(basePath)
.where(col(HoodieRecord.FILENAME_METADATA_FIELD).contains(fileGroupId))
assertEquals(expectedDf.count, actualRecordList.size)