This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5d196fe6175 [HUDI-3639] Add Proper Incremental Records Filtering support into Hudi's custom RDD (#8668)
5d196fe6175 is described below
commit 5d196fe61757987af29b38e1b5cf38d7ca001924
Author: cxzl25 <[email protected]>
AuthorDate: Tue Jul 4 09:25:38 2023 +0800
[HUDI-3639] Add Proper Incremental Records Filtering support into Hudi's custom RDD (#8668)
* Add a filter operator for the incremental RDD
* Remove the hard-coded conf 'spark.sql.parquet.enableVectorizedReader' in the relations
---------
Co-authored-by: Danny Chan <[email protected]>
---
.../scala/org/apache/hudi/HoodieBaseRelation.scala | 2 -
.../org/apache/hudi/HoodieMergeOnReadRDD.scala | 39 +++++++++++++++++-
.../hudi/MergeOnReadIncrementalRelation.scala | 28 +++++++------
.../apache/hudi/MergeOnReadSnapshotRelation.scala | 5 ---
.../functional/TestParquetColumnProjection.scala | 48 +++++++++++++++++++---
5 files changed, 95 insertions(+), 27 deletions(-)
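For context on what this change enables, here is a minimal usage sketch of an incremental read that exercises the new record filtering. It assumes a SparkSession named spark and uses the DataSourceReadOptions keys from the test added below; the table path and instant times are made up:

    // Incremental query over a MOR table. With this commit, rows whose
    // _hoodie_commit_time falls outside the requested commit-time span are
    // filtered inside HoodieMergeOnReadRDD, even when the Parquet reader
    // cannot apply the pushed-down filters.
    val incrementalDf = spark.read.format("hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "20230704091500000") // hypothetical instant
      .option(DataSourceReadOptions.END_INSTANTTIME.key, "20230704093000000")   // hypothetical instant
      .load("/tmp/hudi/mor_table")                                              // hypothetical path
    incrementalDf.select("_hoodie_commit_time").distinct().show()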
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index a9ddbfa4503..a67d4463bf5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -467,8 +467,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
def imbueConfigs(sqlContext: SQLContext): Unit = {
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "true")
- // TODO(HUDI-3639) vectorized reader has to be disabled to make sure MORIncrementalRelation is working properly
- sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
}
/**
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index d7b60db4929..db538f110c9 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -23,6 +23,8 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.{BaseFileReader, projectReader}
import org.apache.hudi.HoodieMergeOnReadRDD.CONFIG_INSTANTIATION_LOCK
import org.apache.hudi.MergeOnReadSnapshotRelation.isProjectionCompatible
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes
import org.apache.spark.rdd.RDD
@@ -30,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext}
import java.io.Closeable
+import java.util.function.Predicate
case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition
@@ -64,6 +67,9 @@ private[hudi] case class HoodieMergeOnReadBaseFileReaders(fullSchemaReader: Base
* @param tableState table's state
* @param mergeType type of merge performed
* @param fileSplits target file-splits this RDD will be iterating over
+ * @param includeStartTime whether to include the commit with the commitTime
+ * @param startTimestamp start timestamp to filter records
+ * @param endTimestamp end timestamp to filter records
*/
class HoodieMergeOnReadRDD(@transient sc: SparkContext,
@transient config: Configuration,
@@ -72,7 +78,10 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
requiredSchema: HoodieTableSchema,
tableState: HoodieTableState,
mergeType: String,
- @transient fileSplits: Seq[HoodieMergeOnReadFileSplit])
+ @transient fileSplits: Seq[HoodieMergeOnReadFileSplit],
+ includeStartTime: Boolean = false,
+ startTimestamp: String = null,
+ endTimestamp: String = null)
extends RDD[InternalRow](sc, Nil) with HoodieUnsafeRDD {
protected val maxCompactionMemoryInBytes: Long = getMaxCompactionMemoryInBytes(new JobConf(config))
@@ -116,7 +125,33 @@ class HoodieMergeOnReadRDD(@transient sc: SparkContext,
Option(TaskContext.get()).foreach(_.addTaskCompletionListener[Unit](_ =>
iter.asInstanceOf[Closeable].close()))
}
- iter
+ val commitTimeMetadataFieldIdx = requiredSchema.structTypeSchema.fieldNames.indexOf(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
+ val needsFiltering = commitTimeMetadataFieldIdx >= 0 && !StringUtils.isNullOrEmpty(startTimestamp) && !StringUtils.isNullOrEmpty(endTimestamp)
+ if (needsFiltering) {
+ val filterT: Predicate[InternalRow] = getCommitTimeFilter(includeStartTime, commitTimeMetadataFieldIdx)
+ iter.filter(filterT.test)
+ }
+ else {
+ iter
+ }
+ }
+
+ private def getCommitTimeFilter(includeStartTime: Boolean, commitTimeMetadataFieldIdx: Int): Predicate[InternalRow] = {
+ if (includeStartTime) {
+ new Predicate[InternalRow] {
+ override def test(row: InternalRow): Boolean = {
+ val commitTime = row.getString(commitTimeMetadataFieldIdx)
+ commitTime >= startTimestamp && commitTime <= endTimestamp
+ }
+ }
+ } else {
+ new Predicate[InternalRow] {
+ override def test(row: InternalRow): Boolean = {
+ val commitTime = row.getString(commitTimeMetadataFieldIdx)
+ commitTime > startTimestamp && commitTime <= endTimestamp
+ }
+ }
+ }
}
private def pickBaseFileReader(): BaseFileReader = {
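As a reading aid, a standalone sketch of the bound semantics the predicate above implements; plain strings stand in for InternalRow, and the sample commit times are made up:

    // Hudi commit times are zero-padded timestamp strings, so lexical comparison
    // matches chronological order.
    def inSpan(commitTime: String, startTs: String, endTs: String, includeStartTime: Boolean): Boolean =
      if (includeStartTime) commitTime >= startTs && commitTime <= endTs // closed interval [startTs, endTs]
      else commitTime > startTs && commitTime <= endTs                   // half-open interval (startTs, endTs]

    assert(inSpan("002", startTs = "002", endTs = "003", includeStartTime = true))
    assert(!inSpan("002", startTs = "002", endTs = "003", includeStartTime = false))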
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 2bb67000753..a3163586ad4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -54,11 +54,6 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
this.copy(prunedDataSchema = Some(prunedSchema))
- override def imbueConfigs(sqlContext: SQLContext): Unit = {
- super.imbueConfigs(sqlContext)
- sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "false")
- }
-
override protected def timeline: HoodieTimeline = {
if (fullTableScan) {
handleHollowCommitIfNeeded(metaClient.getCommitsAndCompactionTimeline, metaClient, hollowCommitHandling)
@@ -81,8 +76,6 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
val optionalFilters = filters
val readers = createBaseFileReaders(tableSchema, requiredSchema, requestedColumns, requiredFilters, optionalFilters)
- // TODO(HUDI-3639) implement incremental span record filtering w/in RDD to make sure returned iterator is appropriately
- // filtered, since file-reader might not be capable to perform filtering
new HoodieMergeOnReadRDD(
sqlContext.sparkContext,
config = jobConf,
@@ -91,7 +84,10 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
requiredSchema = requiredSchema,
tableState = tableState,
mergeType = mergeType,
- fileSplits = fileSplits)
+ fileSplits = fileSplits,
+ includeStartTime = includeStartTime,
+ startTimestamp = startTs,
+ endTimestamp = endTs)
}
override protected def collectFileSplits(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
@@ -184,19 +180,25 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata)
}
+ protected lazy val (includeStartTime, startTs) = if (startInstantArchived) {
+ (false, startTimestamp)
+ } else {
+ (true, includedCommits.head.getTimestamp)
+ }
+ protected lazy val endTs: String = if (endInstantArchived) endTimestamp else includedCommits.last.getTimestamp
+
// Record filters making sure that only records w/in the requested bounds are being fetched as part of the
// scan collected by this relation
protected lazy val incrementalSpanRecordFilters: Seq[Filter] = {
val isNotNullFilter = IsNotNull(HoodieRecord.COMMIT_TIME_METADATA_FIELD)
- val largerThanFilter = if (startInstantArchived) {
- GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTimestamp)
+ val largerThanFilter = if (includeStartTime) {
+ GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTs)
} else {
- GreaterThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, includedCommits.head.getTimestamp)
+ GreaterThan(HoodieRecord.COMMIT_TIME_METADATA_FIELD, startTs)
}
- val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD,
- if (endInstantArchived) endTimestamp else includedCommits.last.getTimestamp)
+ val lessThanFilter = LessThanOrEqual(HoodieRecord.COMMIT_TIME_METADATA_FIELD, endTs)
Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
}
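For comparison with the row-level predicate in HoodieMergeOnReadRDD, a small sketch of the source-level filters this trait produces when the start instant is not archived (includeStartTime = true). The timestamps are hypothetical; the Filter classes come from org.apache.spark.sql.sources, which is what the relation already uses:

    import org.apache.spark.sql.sources.{Filter, GreaterThanOrEqual, IsNotNull, LessThanOrEqual}

    // includeStartTime = true, so the span is the closed interval [startTs, endTs].
    val startTs = "20230704092000000" // hypothetical commit time
    val endTs = "20230704093000000"   // hypothetical commit time
    val spanFilters: Seq[Filter] = Seq(
      IsNotNull("_hoodie_commit_time"),
      GreaterThanOrEqual("_hoodie_commit_time", startTs),
      LessThanOrEqual("_hoodie_commit_time", endTs))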
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 46ce935c260..e8468f0a7a1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -101,11 +101,6 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: SQLContext,
override def canPruneRelationSchema: Boolean =
super.canPruneRelationSchema && isProjectionCompatible(tableState)
- override def imbueConfigs(sqlContext: SQLContext): Unit = {
- super.imbueConfigs(sqlContext)
- sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.enableVectorizedReader", "true")
- }
-
protected override def composeRDD(fileSplits: Seq[HoodieMergeOnReadFileSplit],
tableSchema: HoodieTableSchema,
requiredSchema: HoodieTableSchema,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index a62ef840ab9..ee1edbcccb2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -25,16 +25,15 @@ import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLates
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieWriteConfig}
-import org.apache.hudi.keygen.NonpartitionedKeyGenerator
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness.getSparkSqlConf
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieMergeOnReadRDD, HoodieSparkUtils, HoodieUnsafeRDD}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, DefaultSource, HoodieBaseRelation, HoodieSparkUtils, HoodieUnsafeRDD}
import org.apache.parquet.hadoop.util.counters.BenchmarkCounter
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.{Dataset, HoodieUnsafeUtils, Row, SaveMode}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue, fail}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail}
import org.junit.jupiter.api.{Disabled, Tag, Test}
import scala.collection.JavaConverters._
@@ -330,6 +329,40 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
assertTrue(commitNum > 1)
}
+ @Test
+ def testMergeOnReadIncrementalRelationWithFilter(): Unit = {
+ val tablePath = s"$basePath/mor-with-logs-incr-filter"
+ val targetRecordsCount = 100
+
+ bootstrapMORTableWithDeltaLog(tablePath, targetRecordsCount, defaultWriteOpts, populateMetaFields = true, inlineCompact = true)
+
+ val hoodieMetaClient = HoodieTableMetaClient.builder().setConf(spark.sparkContext.hadoopConfiguration).setBasePath(tablePath).setLoadActiveTimelineOnLoad(true).build()
+ val completedCommits = hoodieMetaClient.getCommitsAndCompactionTimeline.filterCompletedInstants()
+ val startUnarchivedCommitTs = (completedCommits.nthInstant(1).get().getTimestamp.toLong - 1L).toString
+ val endUnarchivedCommitTs = completedCommits.nthInstant(3).get().getTimestamp //commit
+
+ val readOpts = defaultWriteOpts ++ Map(
+ "path" -> tablePath,
+ DataSourceReadOptions.QUERY_TYPE.key -> DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL,
+ DataSourceReadOptions.BEGIN_INSTANTTIME.key -> startUnarchivedCommitTs,
+ DataSourceReadOptions.END_INSTANTTIME.key -> endUnarchivedCommitTs
+ )
+
+ val inputDf = spark.read.format("hudi")
+ .options(readOpts)
+ .load()
+ // Make sure the filter is not applied at the row group level
+ spark.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "false")
+ try {
+ val rows = inputDf.select("_hoodie_commit_time").distinct().sort("_hoodie_commit_time").collect()
+ assertTrue(rows.length == 2)
+ assertFalse(rows.exists(_.getString(0) < startUnarchivedCommitTs))
+ assertFalse(rows.exists(_.getString(0) > endUnarchivedCommitTs))
+ } finally {
+ spark.sessionState.conf.setConfString("spark.sql.parquet.filterPushdown", "true")
+ }
+ }
+
// Test routine
private def runTest(tableState: TableState,
queryType: String,
@@ -441,7 +474,8 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
recordCount: Int,
opts: Map[String, String],
populateMetaFields: Boolean,
- dataGenOpt: Option[HoodieTestDataGenerator] = None): (List[HoodieRecord[_]], Schema) = {
+ dataGenOpt: Option[HoodieTestDataGenerator] = None,
+ inlineCompact: Boolean = false): (List[HoodieRecord[_]], Schema) = {
val dataGen = dataGenOpt.getOrElse(new HoodieTestDataGenerator(0x12345))
// Step 1: Bootstrap table w/ N records (t/h bulk-insert)
@@ -455,10 +489,14 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
// Step 2: Update M records out of those (t/h update)
val inputDF = toDataset(updatedRecords, HoodieTestDataGenerator.AVRO_SCHEMA)
+ val compactScheduleInline = if (inlineCompact) "false" else "true"
+ val compactInline = if (inlineCompact) "true" else "false"
+
inputDF.write.format("org.apache.hudi")
.options(opts)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
- .option(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key, "true")
+ .option(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key, compactScheduleInline)
+ .option(HoodieCompactionConfig.INLINE_COMPACT.key, compactInline)
.option(DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key, "false")
.option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key, "3")
.option(HoodieTableConfig.POPULATE_META_FIELDS.key, populateMetaFields.toString)
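A closing note on the test-helper change: when inlineCompact is true, the bootstrap helper switches from merely scheduling compaction to running it inline, roughly as in this sketch. The DataFrame and path are hypothetical; the option keys are the ones used above:

    // Run compaction inline after every 3 delta commits, instead of only scheduling it.
    inputDF.write.format("org.apache.hudi")
      .option(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key, "false")
      .option(HoodieCompactionConfig.INLINE_COMPACT.key, "true")
      .option(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key, "3")
      .mode(SaveMode.Append)
      .save("/tmp/hudi/mor_table") // hypothetical path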