This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6ca7efa2a3a [HUDI-7007] Add bloom_filters index support on read side
(#11043)
6ca7efa2a3a is described below
commit 6ca7efa2a3a512a6a234110f3df00ecf0f991f0b
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Jun 1 17:12:10 2024 +0530
[HUDI-7007] Add bloom_filters index support on read side (#11043)
---
.../org/apache/hudi/BloomFiltersIndexSupport.scala | 90 +++++++
.../scala/org/apache/hudi/HoodieFileIndex.scala | 7 +-
.../org/apache/hudi/RecordLevelIndexSupport.scala | 49 +---
.../org/apache/hudi/SparkBaseIndexSupport.scala | 42 +++-
.../functional/TestBloomFiltersIndexSupport.scala | 258 +++++++++++++++++++++
5 files changed, 399 insertions(+), 47 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BloomFiltersIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BloomFiltersIndexSupport.scala
new file mode 100644
index 00000000000..4684a58966a
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BloomFiltersIndexSupport.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.metadata.HoodieTableMetadataUtil
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+/**
+ * File-pruning support backed by the bloom filters partition of the metadata table.
+ *
+ * When record keys can be extracted from the query filters, the bloom filter of every base file
+ * in the already pruned file slices is probed with those keys, and only the files whose filter
+ * reports a possible match (or that have no bloom filter at all) are returned as candidates.
+ */
+class BloomFiltersIndexSupport(spark: SparkSession,
+                               metadataConfig: HoodieMetadataConfig,
+                               metaClient: HoodieTableMetaClient) extends SparkBaseIndexSupport(spark, metadataConfig, metaClient) {
+
+  override def getIndexName: String = BloomFiltersIndexSupport.INDEX_NAME
+
+  /**
+   * Computes the base file names that may contain one of the record keys referenced by the query.
+   *
+   * @param queryFilters                  filters of the query; record keys are extracted from them
+   * @param prunedPartitionsAndFileSlices file slices that survived partition pruning
+   * @return `None` when no record keys could be extracted from the filters, otherwise the names
+   *         of the base files that cannot be ruled out
+   */
+  override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                         queryFilters: Seq[Expression],
+                                         queryReferencedColumns: Seq[String],
+                                         prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                         shouldPushDownFilesFilter: Boolean
+                                        ): Option[Set[String]] = {
+    // The keys are inspected on the very next line, so a plain (non-lazy) val is sufficient.
+    val (_, recordKeys) = filterQueriesWithRecordKey(queryFilters)
+    if (recordKeys.nonEmpty) {
+      val prunedPartitionAndFileNames = getPrunedPartitionAndFileNames(prunedPartitionsAndFileSlices)
+      Option.apply(getCandidateFilesForRecordKeys(prunedPartitionAndFileNames, recordKeys))
+    } else {
+      Option.empty
+    }
+  }
+
+  /**
+   * Flattens the pruned file slices into (partition path, base file name) pairs.
+   * A missing partition path is represented by the empty string; file slices without a base file are left out.
+   */
+  private def getPrunedPartitionAndFileNames(prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])]): Seq[(String, String)] = {
+    // NOTE(review): slices without a base file never show up in the candidate set - confirm that the
+    // caller does not end up pruning such slices because of that.
+    prunedPartitionsAndFileSlices.flatMap { case (partitionPathOpt, fileSlices) =>
+      val partitionPath = partitionPathOpt.map(_.getPath).getOrElse("")
+      fileSlices.flatMap { fileSlice =>
+        toScalaOption(fileSlice.getBaseFile).map(baseFile => (partitionPath, baseFile.getFileName))
+      }
+    }
+  }
+
+  /**
+   * Keeps the files whose bloom filter reports that at least one of the record keys might be present.
+   *
+   * @param prunedPartitionAndFileNames (partition path, base file name) pairs to probe
+   * @param recordKeys                  record keys extracted from the query filters
+   * @return names of the files that cannot be ruled out
+   */
+  private def getCandidateFilesForRecordKeys(prunedPartitionAndFileNames: Seq[(String, String)], recordKeys: List[String]): Set[String] = {
+    prunedPartitionAndFileNames.filter { case (partitionPath, fileName) =>
+      // If no bloom filter is returned for the file, assume the file might contain the record key.
+      toScalaOption(metadataTable.getBloomFilter(partitionPath, fileName))
+        .forall(bloomFilter => recordKeys.exists(bloomFilter.mightContain))
+    }.map(_._2).toSet
+  }
+
+  /**
+   * Returns true if the metadata table is enabled in the config and the table config lists the
+   * bloom filters metadata partition.
+   */
+  override def isIndexAvailable: Boolean = {
+    metadataConfig.isEnabled && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)
+  }
+
+  override def invalidateCaches(): Unit = {
+    // no caches for this index type, do nothing
+  }
+
+}
+
+
+/** Companion object holding the index name constant of the bloom filters index support. */
+object BloomFiltersIndexSupport {
+  val INDEX_NAME: String = "BLOOM_FILTERS"
+}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 416a7a95832..ebf0cb8992e 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -105,6 +105,7 @@ case class HoodieFileIndex(spark: SparkSession,
new BucketIndexSupport(spark, metadataConfig, metaClient),
new PartitionStatsIndexSupport(spark, schema, metadataConfig, metaClient),
new FunctionalIndexSupport(spark, metadataConfig, metaClient),
+ new BloomFiltersIndexSupport(spark, metadataConfig, metaClient),
new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
)
@@ -407,6 +408,9 @@ case class HoodieFileIndex(spark: SparkSession,
private def isPartitionStatsIndexEnabled: Boolean =
indicesSupport.exists(idx =>
idx.getIndexName == PartitionStatsIndexSupport.INDEX_NAME &&
idx.isIndexAvailable)
+  /** True when one of the registered index supports is the bloom filters one and reports itself available. */
+  private def isBloomFiltersIndexEnabled: Boolean =
+    indicesSupport.exists { support =>
+      val isBloomFiltersSupport = support.getIndexName == BloomFiltersIndexSupport.INDEX_NAME
+      isBloomFiltersSupport && support.isIndexAvailable
+    }
+
private def isIndexEnabled: Boolean = indicesSupport.exists(idx =>
idx.isIndexAvailable)
private def validateConfig(): Unit = {
@@ -414,7 +418,8 @@ case class HoodieFileIndex(spark: SparkSession,
logWarning("Data skipping requires both Metadata Table and at least one
of Column Stats Index, Record Level Index, or Functional Index" +
" to be enabled as well! " + s"(isMetadataTableEnabled =
$isMetadataTableEnabled, isColumnStatsIndexEnabled = $isColumnStatsIndexEnabled"
+ s", isRecordIndexApplicable = $isRecordIndexEnabled,
isFunctionalIndexEnabled = $isFunctionalIndexEnabled, " +
- s"isBucketIndexEnable = $isBucketIndexEnabled,
isPartitionStatsIndexEnabled = $isPartitionStatsIndexEnabled)")
+        s"isBucketIndexEnable = $isBucketIndexEnabled, isPartitionStatsIndexEnabled = $isPartitionStatsIndexEnabled"
+        + s", isBloomFiltersIndexEnabled = $isBloomFiltersIndexEnabled)")
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index ee8c6f9c86e..a0e51e410e4 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -24,15 +24,14 @@ import
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.metadata.HoodieTableMetadataUtil
import org.apache.hudi.storage.StoragePath
-import org.apache.hudi.util.JFunction
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Expression, In, Literal}
import scala.collection.{JavaConverters, mutable}
class RecordLevelIndexSupport (spark: SparkSession,
- metadataConfig: HoodieMetadataConfig,
- metaClient: HoodieTableMetaClient)
+ metadataConfig: HoodieMetadataConfig,
+ metaClient: HoodieTableMetaClient)
extends SparkBaseIndexSupport (spark, metadataConfig, metaClient) {
@@ -47,7 +46,7 @@ class RecordLevelIndexSupport (spark: SparkSession,
lazy val (_, recordKeys) = filterQueriesWithRecordKey(queryFilters)
val allFiles = fileIndex.inputFiles.map(strPath => new
StoragePath(strPath)).toSeq
if (recordKeys.nonEmpty) {
- Option.apply(getCandidateFiles(allFiles, recordKeys))
+ Option.apply(getCandidateFilesForRecordKeys(allFiles, recordKeys))
} else {
Option.empty
}
@@ -64,7 +63,7 @@ class RecordLevelIndexSupport (spark: SparkSession,
* @param recordKeys - List of record keys.
* @return Sequence of file names which need to be queried
*/
- def getCandidateFiles(allFiles: Seq[StoragePath], recordKeys: List[String]):
Set[String] = {
+ private def getCandidateFilesForRecordKeys(allFiles: Seq[StoragePath],
recordKeys: List[String]): Set[String] = {
val recordKeyLocationsMap =
metadataTable.readRecordIndex(JavaConverters.seqAsJavaListConverter(recordKeys).asJava)
val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
val candidateFiles: mutable.Set[String] = mutable.Set.empty
@@ -83,46 +82,6 @@ class RecordLevelIndexSupport (spark: SparkSession,
candidateFiles.toSet
}
- /**
- * Returns the configured record key for the table if it is a simple record
key else returns empty option.
- */
- private def getRecordKeyConfig: Option[String] = {
- val recordKeysOpt: org.apache.hudi.common.util.Option[Array[String]] =
metaClient.getTableConfig.getRecordKeyFields
- val recordKeyOpt =
recordKeysOpt.map[String](JFunction.toJavaFunction[Array[String], String](arr =>
- if (arr.length == 1) {
- arr(0)
- } else {
- null
- }))
- Option.apply(recordKeyOpt.orElse(null))
- }
-
- /**
- * Given query filters, it filters the EqualTo and IN queries on simple
record key columns and returns a tuple of
- * list of such queries and list of record key literals present in the query.
- * If record index is not available, it returns empty list for record
filters and record keys
- * @param queryFilters The queries that need to be filtered.
- * @return Tuple of List of filtered queries and list of record key literals
that need to be matched
- */
- private def filterQueriesWithRecordKey(queryFilters: Seq[Expression]):
(List[Expression], List[String]) = {
- if (!isIndexAvailable) {
- (List.empty, List.empty)
- } else {
- var recordKeyQueries: List[Expression] = List.empty
- var recordKeys: List[String] = List.empty
- for (query <- queryFilters) {
- val recordKeyOpt = getRecordKeyConfig
- RecordLevelIndexSupport.filterQueryWithRecordKey(query,
recordKeyOpt).foreach({
- case (exp: Expression, recKeys: List[String]) =>
- recordKeys = recordKeys ++ recKeys
- recordKeyQueries = recordKeyQueries :+ exp
- })
- }
-
- Tuple2.apply(recordKeyQueries, recordKeys)
- }
- }
-
/**
* Return true if metadata table is enabled and record index metadata
partition is available.
*/
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
index 2614e4bb37b..12d00132311 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
-
+import org.apache.hudi.util.JFunction
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.catalyst.expressions.{And, Expression}
import
org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
@@ -106,4 +106,44 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
}
}
+  /**
+   * Given query filters, it filters the EqualTo and IN queries on simple record key columns and returns a tuple of
+   * list of such queries and list of record key literals present in the query.
+   * If the index backing the concrete subclass is not available (see `isIndexAvailable`), it returns empty lists
+   * for both the record filters and the record keys.
+   *
+   * @param queryFilters The queries that need to be filtered.
+   * @return Tuple of List of filtered queries and list of record key literals that need to be matched
+   */
+  protected def filterQueriesWithRecordKey(queryFilters: Seq[Expression]): (List[Expression], List[String]) = {
+    if (!isIndexAvailable) {
+      (List.empty, List.empty)
+    } else {
+      // The record key configuration does not depend on the individual filter, so it is resolved once.
+      val recordKeyOpt = getRecordKeyConfig
+      // Filters that do not match contribute nothing; the matching ones keep their original order.
+      val (recordKeyQueries, recordKeysPerQuery) = queryFilters.toList
+        .flatMap(query => RecordLevelIndexSupport.filterQueryWithRecordKey(query, recordKeyOpt))
+        .unzip
+      (recordKeyQueries, recordKeysPerQuery.flatten)
+    }
+  }
+
+  /**
+   * Returns the record key field configured for the table when exactly one field is configured,
+   * otherwise returns an empty option.
+   */
+  private def getRecordKeyConfig: Option[String] = {
+    // null stands for "not a single-field record key"; the final Option(...) wrapper turns it into None.
+    val pickSingleKey = JFunction.toJavaFunction[Array[String], String] { keyFields =>
+      if (keyFields.length == 1) keyFields(0) else null
+    }
+    val configuredKeyFields: org.apache.hudi.common.util.Option[Array[String]] = metaClient.getTableConfig.getRecordKeyFields
+    Option(configuredKeyFields.map[String](pickSingleKey).orElse(null))
+  }
+
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBloomFiltersIndexSupport.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBloomFiltersIndexSupport.scala
new file mode 100644
index 00000000000..cc46f40523d
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBloomFiltersIndexSupport.scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.util.{JFunction, JavaConversions}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions,
HoodieFileIndex}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Expression, Literal}
+import org.apache.spark.sql.functions.{col, not}
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.stream.Collectors
+import scala.collection.JavaConverters._
+import scala.collection.{JavaConverters, mutable}
+
+class TestBloomFiltersIndexSupport extends HoodieSparkClientTestBase {
+
+  // Name of the temp view registered by createTempTable.
+  val sqlTempTable: String = "hudi_tbl_bloom"
+  // Assigned in setUp().
+  var spark: SparkSession = _
+  // Counter behind getInstantTime(); assigned in setUp().
+  var instantTime: AtomicInteger = _
+  // Metadata table options: metadata enabled, bloom filter index enabled, bloom filter columns set to `_row_key`.
+  val metadataOpts: Map[String, String] = Map(
+    HoodieMetadataConfig.ENABLE.key -> "true",
+    HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key -> "true",
+    HoodieMetadataConfig.BLOOM_FILTER_INDEX_FOR_COLUMNS.key -> "_row_key"
+  )
+  // Write options shared by every test, combined with the metadata options above.
+  val commonOpts: Map[String, String] = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    RECORDKEY_FIELD.key -> "_row_key",
+    PARTITIONPATH_FIELD.key -> "partition",
+    PRECOMBINE_FIELD.key -> "timestamp",
+    HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+  ) ++ metadataOpts
+  // Grows by one DataFrame per write; see calculateMergedDf.
+  var mergedDfList: List[DataFrame] = List.empty
+
+  /** Runs the base-class initialisers, then resets the instant counter and captures the Spark session. */
+  @BeforeEach
+  override def setUp(): Unit = {
+    // Initialisers inherited from the test base class; the order is kept as is.
+    initPath()
+    initSparkContexts()
+    initHoodieStorage()
+    initTestDataGenerator()
+
+    setTableName("hoodie_test")
+    initMetaClient()
+
+    // State owned by this test class.
+    instantTime = new AtomicInteger(1)
+    spark = sqlContext.sparkSession
+  }
+
+  /** Cleans up the file system first and the Spark contexts afterwards. */
+  @AfterEach
+  override def tearDown(): Unit = {
+    cleanupFileSystem()
+    cleanupSparkContexts()
+  }
+
+  /** Inserts one batch into a table of the given type and validates the bloom filters afterwards. */
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testIndexInitialization(tableType: HoodieTableType): Unit = {
+    val tableTypeOpt = DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name()
+    doWriteAndValidateBloomFilters(
+      commonOpts + tableTypeOpt,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+  }
+
+  /**
+   * Test case to do an insert followed by an upsert and then validate file pruning using bloom filters.
+   */
+  @Test
+  def testBloomFiltersIndexFilePruning(): Unit = {
+    val hudiOpts = commonOpts + (DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+
+    // First batch: plain insert, bloom filter validation skipped.
+    doWriteAndValidateBloomFilters(
+      hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      shouldValidate = false)
+    // Second batch: upsert, validated (shouldValidate defaults to true).
+    doWriteAndValidateBloomFilters(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+
+    createTempTable(hudiOpts)
+    verifyQueryPredicate(hudiOpts, "_row_key")
+  }
+
+  /** Loads the table at `basePath` with the given options and registers it as the `sqlTempTable` temp view. */
+  private def createTempTable(hudiOpts: Map[String, String]): Unit = {
+    spark.read
+      .format("hudi")
+      .options(hudiOpts)
+      .load(basePath)
+      .createOrReplaceTempView(sqlTempTable)
+  }
+
+  /**
+   * Takes the `columnName` value of one row of the latest merged DataFrame and verifies file pruning
+   * for an equality filter on that value.
+   */
+  def verifyQueryPredicate(hudiOpts: Map[String, String], columnName: String): Unit = {
+    val sampledValues = mergedDfList.last.limit(1).collect().map(row => row.getAs(columnName).toString)
+    val equalityFilter = EqualTo(attribute(columnName), Literal(sampledValues(0)))
+    verifyFilePruning(hudiOpts, equalityFilter)
+  }
+
+  /** Builds a nullable string-typed attribute reference with the given name. */
+  private def attribute(partition: String): AttributeReference =
+    AttributeReference(partition, StringType, nullable = true)()
+
+
+  /**
+   * Lists files for `dataFilter` twice: with the given options, where fewer files than the latest data
+   * files must be returned, and with data skipping switched off, where the counts must be equal.
+   */
+  private def verifyFilePruning(opts: Map[String, String], dataFilter: Expression): Unit = {
+    val optsWithPath = opts + ("path" -> basePath)
+    val optsWithoutSkipping = optsWithPath + (DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false")
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+
+    // with data skipping
+    val skippingFileIndex = HoodieFileIndex(spark, metaClient, None, optsWithPath, includeLogFiles = true)
+    val filteredFilesCount = skippingFileIndex.listFiles(Seq(), Seq(dataFilter)).flatMap(_.files).size
+    assertTrue(filteredFilesCount < getLatestDataFilesCount(opts))
+
+    // with no data skipping
+    val nonSkippingFileIndex = HoodieFileIndex(spark, metaClient, None, optsWithoutSkipping, includeLogFiles = true)
+    val filesCountWithNoSkipping = nonSkippingFileIndex.listFiles(Seq(), Seq(dataFilter)).flatMap(_.files).size
+    assertTrue(filesCountWithNoSkipping == getLatestDataFilesCount(opts))
+  }
+
+  /**
+   * Counts the files of the latest file slices as of the last instant on the active timeline:
+   * one per present base file, plus the log files when `includeLogFiles` is set.
+   */
+  private def getLatestDataFilesCount(opts: Map[String, String], includeLogFiles: Boolean = true): Long = {
+    var totalLatestDataFiles = 0L
+    val fileSystemView = getTableFileSystemView(opts)
+    val lastInstantTime = metaClient.getActiveTimeline.lastInstant().get().getTimestamp
+    // Adds the files of a single slice to the running total.
+    val countSlice = JFunction.toJavaConsumer[FileSlice] { slice =>
+      val logFileCount: Long = if (includeLogFiles) slice.getLogFiles.count() else 0L
+      val baseFileCount = if (slice.getBaseFile.isPresent) 1 else 0
+      totalLatestDataFiles += logFileCount + baseFileCount
+    }
+    fileSystemView.getAllLatestFileSlicesBeforeOrOn(lastInstantTime)
+      .values()
+      .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]](slices => slices.forEach(countSlice)))
+    totalLatestDataFiles
+  }
+
+  /** Builds a metadata file system view from the current meta client, its active timeline and the table metadata. */
+  private def getTableFileSystemView(opts: Map[String, String]): HoodieMetadataFileSystemView = {
+    // Captured in the same order in which the original constructor arguments were evaluated.
+    val client = metaClient
+    val activeTimeline = metaClient.getActiveTimeline
+    val tableMetadata = metadataWriter(getWriteConfig(opts)).getTableMetadata
+    new HoodieMetadataFileSystemView(client, activeTimeline, tableMetadata)
+  }
+
+  /**
+   * Generates a batch for the given operation, writes it to `basePath`, records the merged DataFrame and,
+   * unless `shouldValidate` is false, validates the bloom filters index.
+   */
+  private def doWriteAndValidateBloomFilters(hudiOpts: Map[String, String],
+                                             operation: String,
+                                             saveMode: SaveMode,
+                                             shouldValidate: Boolean = true): Unit = {
+    val latestBatch: mutable.Buffer[String] =
+      if (operation == UPSERT_OPERATION_OPT_VAL) {
+        // One unique update plus one insert, both generated for the same instant time.
+        val batchInstantTime = getInstantTime()
+        val records = recordsToStrings(dataGen.generateUniqueUpdates(batchInstantTime, 1))
+        records.addAll(recordsToStrings(dataGen.generateInserts(batchInstantTime, 1)))
+        records.asScala
+      } else if (operation == INSERT_OVERWRITE_OPERATION_OPT_VAL) {
+        // Five inserts into the last partition path of the data generator.
+        recordsToStrings(dataGen.generateInsertsForPartition(
+          getInstantTime(), 5, dataGen.getPartitionPaths.last)).asScala
+      } else {
+        recordsToStrings(dataGen.generateInserts(getInstantTime(), 5)).asScala
+      }
+
+    val latestBatchDf = spark.read.json(spark.sparkContext.parallelize(latestBatch.toSeq, 2))
+    latestBatchDf.cache()
+    latestBatchDf.write
+      .format("org.apache.hudi")
+      .options(hudiOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, operation)
+      .mode(saveMode)
+      .save(basePath)
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    calculateMergedDf(latestBatchDf, operation)
+    if (shouldValidate) {
+      validateBloomFiltersIndex(hudiOpts)
+    }
+  }
+
+  /** Advances the instant counter and returns its new value, zero-padded to at least three digits. */
+  private def getInstantTime(): String = {
+    // Integer.valueOf replaces the deprecated `new Integer(int)` constructor; the formatted result is the same.
+    String.format("%03d", Integer.valueOf(instantTime.incrementAndGet()))
+  }
+
+  /**
+   * Reads the table back and asserts, for every row, that a bloom filter exists for the row's file
+   * and that it might contain the row's record key.
+   */
+  private def validateBloomFiltersIndex(hudiOpts: Map[String, String]): Unit = {
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val tableMetadata = metadataWriter(getWriteConfig(hudiOpts)).getTableMetadata
+    val rows = spark.read.format("hudi").load(basePath).collect()
+
+    assertTrue(rows.length > 0)
+
+    rows.foreach { row =>
+      val recordKey = row.getAs[String]("_hoodie_record_key")
+      val partitionPath = row.getAs[String]("_hoodie_partition_path")
+      val fileName = row.getAs[String]("_hoodie_file_name")
+      val bloomFilter = tableMetadata.getBloomFilter(partitionPath, fileName)
+      assertTrue(bloomFilter.isPresent, "BloomFilter should be present for " + fileName)
+      assertTrue(bloomFilter.get().mightContain(recordKey))
+    }
+  }
+
+  /** Builds a write config for `basePath` from the given options. */
+  private def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {
+    val javaOpts = JavaConverters.mapAsJavaMapConverter(hudiOpts).asJava
+    val writeConfigBuilder = HoodieWriteConfig.newBuilder()
+      .withProps(TypedProperties.fromMap(javaOpts))
+      .withPath(basePath)
+    writeConfigBuilder.build()
+  }
+
+  /**
+   * Appends the snapshot expected after writing `latestBatchDf` to `mergedDfList`.
+   *
+   * @return the previous snapshot for an insert-overwrite-table, the rows of the previous snapshot in the
+   *         overwritten partitions for an insert-overwrite, and an empty DataFrame otherwise
+   */
+  private def calculateMergedDf(latestBatchDf: DataFrame, operation: String): DataFrame = {
+    mergedDfList.lastOption match {
+      case None =>
+        // First write: the batch itself is the snapshot.
+        mergedDfList = mergedDfList :+ latestBatchDf
+        sparkSession.emptyDataFrame
+
+      case Some(prevDf) if operation == INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL =>
+        mergedDfList = mergedDfList :+ latestBatchDf
+        prevDf
+
+      case Some(prevDf) if operation == INSERT_OVERWRITE_OPERATION_OPT_VAL =>
+        val overwrittenPartitions = latestBatchDf.select("partition")
+          .collectAsList().stream().map[String](JavaConversions.getFunction[Row, String](r => r.getString(0))).collect(Collectors.toList[String])
+        val latestSnapshot = prevDf
+          .filter(not(col("partition").isInCollection(overwrittenPartitions)))
+          .union(latestBatchDf)
+        mergedDfList = mergedDfList :+ latestSnapshot
+        prevDf.filter(col("partition").isInCollection(overwrittenPartitions))
+
+      case Some(prevDf) =>
+        // Drop the previous rows matched by the batch on key, partition and trip type, then add the batch.
+        val sameRecord = prevDf("_row_key") === latestBatchDf("_row_key") &&
+          prevDf("partition") === latestBatchDf("partition") &&
+          prevDf("trip_type") === latestBatchDf("trip_type")
+        val prevDfOld = prevDf.join(latestBatchDf, sameRecord, "leftanti")
+        mergedDfList = mergedDfList :+ prevDfOld.union(latestBatchDf)
+        sparkSession.emptyDataFrame
+    }
+  }
+
+}