This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ca7efa2a3a [HUDI-7007] Add bloom_filters index support on read side 
(#11043)
6ca7efa2a3a is described below

commit 6ca7efa2a3a512a6a234110f3df00ecf0f991f0b
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Jun 1 17:12:10 2024 +0530

    [HUDI-7007] Add bloom_filters index support on read side (#11043)
---
 .../org/apache/hudi/BloomFiltersIndexSupport.scala |  90 +++++++
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |   7 +-
 .../org/apache/hudi/RecordLevelIndexSupport.scala  |  49 +---
 .../org/apache/hudi/SparkBaseIndexSupport.scala    |  42 +++-
 .../functional/TestBloomFiltersIndexSupport.scala  | 258 +++++++++++++++++++++
 5 files changed, 399 insertions(+), 47 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BloomFiltersIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BloomFiltersIndexSupport.scala
new file mode 100644
index 00000000000..4684a58966a
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BloomFiltersIndexSupport.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.metadata.HoodieTableMetadataUtil
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+/**
+ * Index support that prunes candidate base files on the read path by consulting the
+ * per-file bloom filters returned by the metadata table.
+ *
+ * Pruning only kicks in when [[filterQueriesWithRecordKey]] extracts at least one record key
+ * literal from the query filters; otherwise no candidate set is produced.
+ */
+class BloomFiltersIndexSupport(spark: SparkSession,
+                               metadataConfig: HoodieMetadataConfig,
+                               metaClient: HoodieTableMetaClient)
+  extends SparkBaseIndexSupport(spark, metadataConfig, metaClient) {
+
+  override def getIndexName: String = BloomFiltersIndexSupport.INDEX_NAME
+
+  /**
+   * Computes the names of base files that may contain any of the record keys referenced by the query.
+   *
+   * @param prunedPartitionsAndFileSlices partitions and file slices that survived earlier pruning
+   * @return the candidate base file names, or None when the query filters yield no record keys
+   */
+  override def computeCandidateFileNames(fileIndex: HoodieFileIndex,
+                                         queryFilters: Seq[Expression],
+                                         queryReferencedColumns: Seq[String],
+                                         prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
+                                         shouldPushDownFilesFilter: Boolean
+                                        ): Option[Set[String]] = {
+    val (_, recordKeys) = filterQueriesWithRecordKey(queryFilters)
+    if (recordKeys.nonEmpty) {
+      val prunedPartitionAndFileNames = getPrunedPartitionAndFileNames(prunedPartitionsAndFileSlices)
+      Some(getCandidateFilesForRecordKeys(prunedPartitionAndFileNames, recordKeys))
+    } else {
+      None
+    }
+  }
+
+  /**
+   * Flattens the pruned file slices into (partition path, base file name) pairs.
+   * File slices without a base file are dropped.
+   */
+  private def getPrunedPartitionAndFileNames(prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])]): Seq[(String, String)] = {
+    prunedPartitionsAndFileSlices.flatMap { case (partitionOpt, fileSlices) =>
+      // Slices that come without a partition path are looked up under the empty partition name.
+      val partitionName = partitionOpt.map(_.getPath).getOrElse("")
+      fileSlices.flatMap { fileSlice =>
+        Option(fileSlice.getBaseFile.orElse(null)).map(baseFile => (partitionName, baseFile.getFileName))
+      }
+    }
+  }
+
+  /**
+   * Keeps the files whose bloom filter reports that at least one of the record keys might be present.
+   *
+   * @param prunedPartitionAndFileNames (partition path, base file name) pairs to check
+   * @param recordKeys                  record key literals extracted from the query
+   * @return names of the files that cannot be ruled out
+   */
+  private def getCandidateFilesForRecordKeys(prunedPartitionAndFileNames: Seq[(String, String)],
+                                             recordKeys: List[String]): Set[String] = {
+    prunedPartitionAndFileNames.filter { case (partitionName, fileName) =>
+      // If the bloom filter is missing, assume the file might contain the record key (forall on None is true).
+      toScalaOption(metadataTable.getBloomFilter(partitionName, fileName))
+        .forall(bloomFilter => recordKeys.exists(key => bloomFilter.mightContain(key)))
+    }.map(_._2).toSet
+  }
+
+  /**
+   * Returns true if the metadata table is enabled and the table config lists the bloom filters metadata partition.
+   */
+  override def isIndexAvailable: Boolean = {
+    metadataConfig.isEnabled && metaClient.getTableConfig.getMetadataPartitions.contains(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)
+  }
+
+  override def invalidateCaches(): Unit = {
+    // no caches for this index type, do nothing
+  }
+
+}
+
+
+object BloomFiltersIndexSupport {
+  val INDEX_NAME: String = "BLOOM_FILTERS"
+}
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 416a7a95832..ebf0cb8992e 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -105,6 +105,7 @@ case class HoodieFileIndex(spark: SparkSession,
     new BucketIndexSupport(spark, metadataConfig, metaClient),
     new PartitionStatsIndexSupport(spark, schema, metadataConfig, metaClient),
     new FunctionalIndexSupport(spark, metadataConfig, metaClient),
+    new BloomFiltersIndexSupport(spark, metadataConfig, metaClient),
     new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
   )
 
@@ -407,6 +408,9 @@ case class HoodieFileIndex(spark: SparkSession,
   private def isPartitionStatsIndexEnabled: Boolean = 
indicesSupport.exists(idx =>
     idx.getIndexName == PartitionStatsIndexSupport.INDEX_NAME && 
idx.isIndexAvailable)
 
+  // True when an index support named BloomFiltersIndexSupport.INDEX_NAME is registered and reports itself available.
+  private def isBloomFiltersIndexEnabled: Boolean =
+    indicesSupport.exists { support =>
+      support.getIndexName == BloomFiltersIndexSupport.INDEX_NAME && support.isIndexAvailable
+    }
+
   private def isIndexEnabled: Boolean = indicesSupport.exists(idx => 
idx.isIndexAvailable)
 
   private def validateConfig(): Unit = {
@@ -414,7 +418,8 @@ case class HoodieFileIndex(spark: SparkSession,
       logWarning("Data skipping requires both Metadata Table and at least one 
of Column Stats Index, Record Level Index, or Functional Index" +
         " to be enabled as well! " + s"(isMetadataTableEnabled = 
$isMetadataTableEnabled, isColumnStatsIndexEnabled = $isColumnStatsIndexEnabled"
         + s", isRecordIndexApplicable = $isRecordIndexEnabled, 
isFunctionalIndexEnabled = $isFunctionalIndexEnabled, " +
-        s"isBucketIndexEnable = $isBucketIndexEnabled, 
isPartitionStatsIndexEnabled = $isPartitionStatsIndexEnabled)")
+        s"isBucketIndexEnable = $isBucketIndexEnabled, isPartitionStatsIndexEnabled = $isPartitionStatsIndexEnabled"
+        + s", isBloomFiltersIndexEnabled = $isBloomFiltersIndexEnabled)")
     }
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index ee8c6f9c86e..a0e51e410e4 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -24,15 +24,14 @@ import 
org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.metadata.HoodieTableMetadataUtil
 import org.apache.hudi.storage.StoragePath
-import org.apache.hudi.util.JFunction
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, In, Literal}
 
 import scala.collection.{JavaConverters, mutable}
 
 class RecordLevelIndexSupport (spark: SparkSession,
-                              metadataConfig: HoodieMetadataConfig,
-                              metaClient: HoodieTableMetaClient)
+                               metadataConfig: HoodieMetadataConfig,
+                               metaClient: HoodieTableMetaClient)
   extends SparkBaseIndexSupport (spark, metadataConfig, metaClient) {
 
 
@@ -47,7 +46,7 @@ class RecordLevelIndexSupport (spark: SparkSession,
     lazy val (_, recordKeys) = filterQueriesWithRecordKey(queryFilters)
     val allFiles = fileIndex.inputFiles.map(strPath => new 
StoragePath(strPath)).toSeq
     if (recordKeys.nonEmpty) {
-      Option.apply(getCandidateFiles(allFiles, recordKeys))
+      Option.apply(getCandidateFilesForRecordKeys(allFiles, recordKeys))
     } else {
       Option.empty
     }
@@ -64,7 +63,7 @@ class RecordLevelIndexSupport (spark: SparkSession,
    * @param recordKeys - List of record keys.
    * @return Sequence of file names which need to be queried
    */
-  def getCandidateFiles(allFiles: Seq[StoragePath], recordKeys: List[String]): 
Set[String] = {
+  private def getCandidateFilesForRecordKeys(allFiles: Seq[StoragePath], 
recordKeys: List[String]): Set[String] = {
     val recordKeyLocationsMap = 
metadataTable.readRecordIndex(JavaConverters.seqAsJavaListConverter(recordKeys).asJava)
     val fileIdToPartitionMap: mutable.Map[String, String] = mutable.Map.empty
     val candidateFiles: mutable.Set[String] = mutable.Set.empty
@@ -83,46 +82,6 @@ class RecordLevelIndexSupport (spark: SparkSession,
     candidateFiles.toSet
   }
 
-  /**
-   * Returns the configured record key for the table if it is a simple record 
key else returns empty option.
-   */
-  private def getRecordKeyConfig: Option[String] = {
-    val recordKeysOpt: org.apache.hudi.common.util.Option[Array[String]] = 
metaClient.getTableConfig.getRecordKeyFields
-    val recordKeyOpt = 
recordKeysOpt.map[String](JFunction.toJavaFunction[Array[String], String](arr =>
-      if (arr.length == 1) {
-        arr(0)
-      } else {
-        null
-      }))
-    Option.apply(recordKeyOpt.orElse(null))
-  }
-
-  /**
-   * Given query filters, it filters the EqualTo and IN queries on simple 
record key columns and returns a tuple of
-   * list of such queries and list of record key literals present in the query.
-   * If record index is not available, it returns empty list for record 
filters and record keys
-   * @param queryFilters The queries that need to be filtered.
-   * @return Tuple of List of filtered queries and list of record key literals 
that need to be matched
-   */
-  private def filterQueriesWithRecordKey(queryFilters: Seq[Expression]): 
(List[Expression], List[String]) = {
-    if (!isIndexAvailable) {
-      (List.empty, List.empty)
-    } else {
-      var recordKeyQueries: List[Expression] = List.empty
-      var recordKeys: List[String] = List.empty
-      for (query <- queryFilters) {
-        val recordKeyOpt = getRecordKeyConfig
-        RecordLevelIndexSupport.filterQueryWithRecordKey(query, 
recordKeyOpt).foreach({
-          case (exp: Expression, recKeys: List[String]) =>
-            recordKeys = recordKeys ++ recKeys
-            recordKeyQueries = recordKeyQueries :+ exp
-        })
-      }
-
-      Tuple2.apply(recordKeyQueries, recordKeys)
-    }
-  }
-
   /**
    * Return true if metadata table is enabled and record index metadata 
partition is available.
    */
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
index 2614e4bb37b..12d00132311 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -23,7 +23,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.FileSlice
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.metadata.{HoodieMetadataPayload, HoodieTableMetadata}
-
+import org.apache.hudi.util.JFunction
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql.catalyst.expressions.{And, Expression}
 import 
org.apache.spark.sql.hudi.DataSkippingUtils.translateIntoColumnStatsIndexFilterExpr
@@ -106,4 +106,44 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
     }
   }
 
+  /**
+   * Given query filters, it filters the EqualTo and IN queries on simple record key columns and returns a tuple of
+   * list of such queries and list of record key literals present in the query.
+   * If the index backing this support class is not available, it returns empty lists for both.
+   *
+   * @param queryFilters The queries that need to be filtered.
+   * @return Tuple of List of filtered queries and list of record key literals that need to be matched
+   */
+  protected def filterQueriesWithRecordKey(queryFilters: Seq[Expression]): (List[Expression], List[String]) = {
+    if (!isIndexAvailable) {
+      (List.empty, List.empty)
+    } else {
+      // The record key config does not depend on the individual filter, so resolve it once up front.
+      val recordKeyOpt = getRecordKeyConfig
+      // Keep only the filters that match on the record key, in their original order.
+      val matches = queryFilters.toList.flatMap { query =>
+        RecordLevelIndexSupport.filterQueryWithRecordKey(query, recordKeyOpt).toList
+      }
+      (matches.map(_._1), matches.flatMap(_._2))
+    }
+  }
+
+  /**
+   * Looks up the record key fields in the table config and yields the field name only when exactly one
+   * field is configured; in every other case the result is an empty option.
+   */
+  private def getRecordKeyConfig: Option[String] = {
+    val configuredKeyFields: org.apache.hudi.common.util.Option[Array[String]] =
+      metaClient.getTableConfig.getRecordKeyFields
+    // Collapse to the single key field, or to null so that the Option wrapper below turns out empty.
+    val pickSingleField = JFunction.toJavaFunction[Array[String], String] { fields =>
+      if (fields.length != 1) null else fields(0)
+    }
+    Option(configuredKeyFields.map[String](pickSingleField).orElse(null))
+  }
+
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBloomFiltersIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBloomFiltersIndexSupport.scala
new file mode 100644
index 00000000000..cc46f40523d
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBloomFiltersIndexSupport.scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.DataSourceWriteOptions._
+import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+import org.apache.hudi.common.model.{FileSlice, HoodieTableType}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.metadata.HoodieMetadataFileSystemView
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.util.{JFunction, JavaConversions}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieFileIndex}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, Literal}
+import org.apache.spark.sql.functions.{col, not}
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.EnumSource
+
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.stream.Collectors
+import scala.collection.JavaConverters._
+import scala.collection.{JavaConverters, mutable}
+
+/**
+ * Functional tests for the bloom filters index on the read path: writes data with the metadata table's
+ * bloom filter index enabled, checks that a bloom filter is stored for every written file, and verifies
+ * that [[HoodieFileIndex]] lists fewer files for a record key predicate when data skipping is on.
+ */
+class TestBloomFiltersIndexSupport extends HoodieSparkClientTestBase {
+
+  val sqlTempTable = "hudi_tbl_bloom"
+  var spark: SparkSession = _
+  var instantTime: AtomicInteger = _
+  val metadataOpts: Map[String, String] = Map(
+    HoodieMetadataConfig.ENABLE.key -> "true",
+    HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key -> "true",
+    HoodieMetadataConfig.BLOOM_FILTER_INDEX_FOR_COLUMNS.key -> "_row_key"
+  )
+  val commonOpts: Map[String, String] = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+    RECORDKEY_FIELD.key -> "_row_key",
+    PARTITIONPATH_FIELD.key -> "partition",
+    PRECOMBINE_FIELD.key -> "timestamp",
+    HoodieTableConfig.POPULATE_META_FIELDS.key -> "true"
+  ) ++ metadataOpts
+  // Expected table snapshot after each write, appended to by calculateMergedDf.
+  var mergedDfList: List[DataFrame] = List.empty
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    initPath()
+    initSparkContexts()
+    initHoodieStorage()
+    initTestDataGenerator()
+
+    setTableName("hoodie_test")
+    initMetaClient()
+
+    instantTime = new AtomicInteger(1)
+
+    spark = sqlContext.sparkSession
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    cleanupFileSystem()
+    cleanupSparkContexts()
+  }
+
+  /**
+   * Writes one insert batch for each table type and validates that bloom filters exist for the written files.
+   */
+  @ParameterizedTest
+  @EnumSource(classOf[HoodieTableType])
+  def testIndexInitialization(tableType: HoodieTableType): Unit = {
+    val hudiOpts = commonOpts + (DataSourceWriteOptions.TABLE_TYPE.key -> tableType.name())
+    doWriteAndValidateBloomFilters(
+      hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite)
+  }
+
+  /**
+   * Test case to do a write with updates and then validate file pruning using bloom filters.
+   */
+  @Test
+  def testBloomFiltersIndexFilePruning(): Unit = {
+    val hudiOpts = commonOpts + (DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+
+    doWriteAndValidateBloomFilters(
+      hudiOpts,
+      operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Overwrite,
+      shouldValidate = false)
+    doWriteAndValidateBloomFilters(
+      hudiOpts,
+      operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
+      saveMode = SaveMode.Append)
+
+    createTempTable(hudiOpts)
+    verifyQueryPredicate(hudiOpts, "_row_key")
+  }
+
+  // Registers the table at basePath as the temp view named by sqlTempTable.
+  private def createTempTable(hudiOpts: Map[String, String]): Unit = {
+    val readDf = spark.read.format("hudi").options(hudiOpts).load(basePath)
+    readDf.createOrReplaceTempView(sqlTempTable)
+  }
+
+  /**
+   * Builds an EqualTo predicate on `columnName` from one row of the latest expected snapshot and
+   * verifies file pruning for it.
+   */
+  def verifyQueryPredicate(hudiOpts: Map[String, String], columnName: String): Unit = {
+    // An explicit type argument avoids inferring Nothing for getAs before calling toString.
+    val sampleKeys = mergedDfList.last.limit(1).collect().map(row => row.getAs[Any](columnName).toString)
+    assertTrue(sampleKeys.nonEmpty, "Expected at least one written record to build the query predicate from")
+    val dataFilter = EqualTo(attribute(columnName), Literal(sampleKeys.head))
+    verifyFilePruning(hudiOpts, dataFilter)
+  }
+
+  // Nullable string attribute reference for the given column name.
+  private def attribute(columnName: String): AttributeReference = {
+    AttributeReference(columnName, StringType, true)()
+  }
+
+
+  /**
+   * Asserts that listing files with data skipping returns fewer files than the latest file slices hold,
+   * and that listing with data skipping disabled returns all of them.
+   */
+  private def verifyFilePruning(opts: Map[String, String], dataFilter: Expression): Unit = {
+    // Named distinctly so it does not shadow the class-level commonOpts.
+    val fileIndexOpts = opts + ("path" -> basePath)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+
+    // with data skipping
+    val skippingFileIndex = HoodieFileIndex(spark, metaClient, None, fileIndexOpts, includeLogFiles = true)
+    val filteredFilesCount = skippingFileIndex.listFiles(Seq(), Seq(dataFilter)).flatMap(s => s.files).size
+    // Nothing is written between the two checks, so the total is computed once and reused.
+    val latestDataFilesCount = getLatestDataFilesCount(opts)
+    assertTrue(filteredFilesCount < latestDataFilesCount,
+      s"Expected fewer than $latestDataFilesCount files with data skipping, but got $filteredFilesCount")
+
+    // with no data skipping
+    val noSkippingFileIndex = HoodieFileIndex(spark, metaClient, None,
+      fileIndexOpts + (DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "false"), includeLogFiles = true)
+    val filesCountWithNoSkipping = noSkippingFileIndex.listFiles(Seq(), Seq(dataFilter)).flatMap(s => s.files).size
+    assertTrue(filesCountWithNoSkipping == latestDataFilesCount,
+      s"Expected $latestDataFilesCount files without data skipping, but got $filesCountWithNoSkipping")
+  }
+
+  /**
+   * Counts the data files in the latest file slices as of the last instant on the active timeline:
+   * one per base file present, plus the log files when includeLogFiles is set.
+   */
+  private def getLatestDataFilesCount(opts: Map[String, String], includeLogFiles: Boolean = true): Long = {
+    var totalLatestDataFiles = 0L
+    getTableFileSystemView(opts).getAllLatestFileSlicesBeforeOrOn(metaClient.getActiveTimeline.lastInstant().get().getTimestamp)
+      .values()
+      .forEach(JFunction.toJavaConsumer[java.util.stream.Stream[FileSlice]]
+        (slices => slices.forEach(JFunction.toJavaConsumer[FileSlice](
+          slice => totalLatestDataFiles += (if (includeLogFiles) slice.getLogFiles.count() else 0)
+            + (if (slice.getBaseFile.isPresent) 1 else 0)))))
+    totalLatestDataFiles
+  }
+
+  private def getTableFileSystemView(opts: Map[String, String]): HoodieMetadataFileSystemView = {
+    new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline, metadataWriter(getWriteConfig(opts)).getTableMetadata)
+  }
+
+  /**
+   * Generates a batch for the given operation, writes it to basePath, records the expected snapshot,
+   * and optionally validates the bloom filters of the written files.
+   *
+   * @param hudiOpts       write options
+   * @param operation      write operation; upsert and insert-overwrite generate dedicated batches,
+   *                       anything else generates plain inserts
+   * @param saveMode       Spark save mode for the write
+   * @param shouldValidate whether to run validateBloomFiltersIndex after the write
+   */
+  private def doWriteAndValidateBloomFilters(hudiOpts: Map[String, String],
+                                             operation: String,
+                                             saveMode: SaveMode,
+                                             shouldValidate: Boolean = true): Unit = {
+    val latestBatch: mutable.Buffer[String] =
+      if (operation == UPSERT_OPERATION_OPT_VAL) {
+        // Updates and fresh inserts share one instant time.
+        val commitTime = getInstantTime()
+        val records = recordsToStrings(dataGen.generateUniqueUpdates(commitTime, 1))
+        records.addAll(recordsToStrings(dataGen.generateInserts(commitTime, 1)))
+        records.asScala
+      } else if (operation == INSERT_OVERWRITE_OPERATION_OPT_VAL) {
+        recordsToStrings(dataGen.generateInsertsForPartition(
+          getInstantTime(), 5, dataGen.getPartitionPaths.last)).asScala
+      } else {
+        recordsToStrings(dataGen.generateInserts(getInstantTime(), 5)).asScala
+      }
+    val latestBatchDf = spark.read.json(spark.sparkContext.parallelize(latestBatch.toSeq, 2))
+    latestBatchDf.cache()
+    latestBatchDf.write.format("org.apache.hudi")
+      .options(hudiOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, operation)
+      .mode(saveMode)
+      .save(basePath)
+
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    calculateMergedDf(latestBatchDf, operation)
+    if (shouldValidate) {
+      validateBloomFiltersIndex(hudiOpts)
+    }
+  }
+
+  // Next instant time, zero-padded to three digits.
+  private def getInstantTime(): String = {
+    f"${instantTime.incrementAndGet()}%03d"
+  }
+
+  /**
+   * Reads the table back and asserts that, for every row, the metadata table holds a bloom filter for the
+   * row's file and that the filter reports the row's record key as possibly present.
+   */
+  private def validateBloomFiltersIndex(hudiOpts: Map[String, String]): Unit = {
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val writeConfig = getWriteConfig(hudiOpts)
+    val metadata = metadataWriter(writeConfig).getTableMetadata
+    val readDf = spark.read.format("hudi").load(basePath)
+    val rowArr = readDf.collect()
+
+    assertTrue(rowArr.length > 0, "Expected the table to contain at least one row")
+
+    for (row <- rowArr) {
+      val recordKey: String = row.getAs("_hoodie_record_key")
+      val partitionPath: String = row.getAs("_hoodie_partition_path")
+      val fileName: String = row.getAs("_hoodie_file_name")
+      val bloomFilter = metadata.getBloomFilter(partitionPath, fileName)
+      assertTrue(bloomFilter.isPresent, "BloomFilter should be present for " + fileName)
+      assertTrue(bloomFilter.get().mightContain(recordKey),
+        "BloomFilter of " + fileName + " should report record key " + recordKey)
+    }
+  }
+
+  private def getWriteConfig(hudiOpts: Map[String, String]): HoodieWriteConfig = {
+    val props = TypedProperties.fromMap(JavaConverters.mapAsJavaMapConverter(hudiOpts).asJava)
+    HoodieWriteConfig.newBuilder()
+      .withProps(props)
+      .withPath(basePath)
+      .build()
+  }
+
+  /**
+   * Appends the expected table snapshot after applying `latestBatchDf` to mergedDfList.
+   *
+   * @return for insert-overwrite-table the previous snapshot, for insert-overwrite the previous rows of the
+   *         overwritten partitions, and an empty DataFrame otherwise
+   */
+  private def calculateMergedDf(latestBatchDf: DataFrame, operation: String): DataFrame = {
+    mergedDfList.lastOption match {
+      case None =>
+        mergedDfList = mergedDfList :+ latestBatchDf
+        sparkSession.emptyDataFrame
+      case Some(prevDf) if operation == INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL =>
+        mergedDfList = mergedDfList :+ latestBatchDf
+        prevDf
+      case Some(prevDf) if operation == INSERT_OVERWRITE_OPERATION_OPT_VAL =>
+        val overwrittenPartitions = latestBatchDf.select("partition")
+          .collectAsList().stream().map[String](JavaConversions.getFunction[Row, String](r => r.getString(0))).collect(Collectors.toList[String])
+        val latestSnapshot = prevDf
+          .filter(not(col("partition").isInCollection(overwrittenPartitions)))
+          .union(latestBatchDf)
+        mergedDfList = mergedDfList :+ latestSnapshot
+        prevDf.filter(col("partition").isInCollection(overwrittenPartitions))
+      case Some(prevDf) =>
+        // Drop the previous versions of the rows present in the new batch, then add the new batch.
+        val prevDfOld = prevDf.join(latestBatchDf, prevDf("_row_key") === latestBatchDf("_row_key")
+          && prevDf("partition") === latestBatchDf("partition") && prevDf("trip_type") === latestBatchDf("trip_type"), "leftanti")
+        val latestSnapshot = prevDfOld.union(latestBatchDf)
+        mergedDfList = mergedDfList :+ latestSnapshot
+        sparkSession.emptyDataFrame
+    }
+  }
+
+}

Reply via email to