vinothchandar commented on code in PR #12290:
URL: https://github.com/apache/hudi/pull/12290#discussion_r1850227135


##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -184,10 +184,10 @@ public Option<BloomFilter> getBloomFilter(final String 
partitionName, final Stri
   }
 
   @Override
-  public Map<Pair<String, String>, BloomFilter> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+  public Map<Pair<String, String>, BloomFilter> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList, final String 
metadataPartitionName)
       throws HoodieMetadataException {
-    if 
(!dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.BLOOM_FILTERS))
 {
-      LOG.error("Metadata bloom filter index is disabled!");
+    if 
(!dataMetaClient.getTableConfig().getMetadataPartitions().contains(metadataPartitionName))
 {
+      LOG.error("Metadata partition not found {}", metadataPartitionName);

Review Comment:
   why make the error message more generic vs stating specifically. that bloom 
filter index is what was disabled.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -312,6 +312,18 @@ private static void 
constructColumnStatsMetadataPayload(HoodieMetadataPayload pa
     }
   }
 
+  public static boolean isGenericIndex(String metadataPartitionPath) {

Review Comment:
   actually find this confusing. a generic index is sth that is either using a 
function to index or a secondary index?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -312,6 +312,18 @@ private static void 
constructColumnStatsMetadataPayload(HoodieMetadataPayload pa
     }
   }
 
+  public static boolean isGenericIndex(String metadataPartitionPath) {

Review Comment:
   not fixable in this PR i think



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -150,21 +151,22 @@ public static HoodieData<HoodieRecord> 
getFunctionalIndexRecordsUsingColumnStats
   }
 
   public static HoodieData<HoodieRecord> 
getFunctionalIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String 
columnToIndex,
-                                                                               
    HoodieWriteConfig metadataWriteConfig, String instantTime) {
+                                                                               
    HoodieWriteConfig metadataWriteConfig, String instantTime, String 
indexName) {

Review Comment:
   can we UT this method



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala:
##########
@@ -59,10 +59,20 @@ class FunctionalIndexSupport(spark: SparkSession,
                                         ): Option[Set[String]] = {
     lazy val functionalIndexPartitionOpt = 
getFunctionalIndexPartition(queryFilters)
     if (isIndexAvailable && queryFilters.nonEmpty && 
functionalIndexPartitionOpt.nonEmpty) {
-      val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns, 
inMemoryProjectionThreshold)
-      val (prunedPartitions, prunedFileNames) = 
getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices)
-      val indexDf = 
loadFunctionalIndexDataFrame(functionalIndexPartitionOpt.get, prunedPartitions, 
readInMemory)
-      Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
+      val indexPartition = functionalIndexPartitionOpt.get
+      val indexDefinition = 
metaClient.getIndexMetadata.get().getIndexDefinitions.get(indexPartition)
+      if 
(indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS))
 {
+        val readInMemory = shouldReadInMemory(fileIndex, 
queryReferencedColumns, inMemoryProjectionThreshold)
+        val (prunedPartitions, prunedFileNames) = 
getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices)
+        val indexDf = loadFunctionalIndexDataFrame(indexPartition, 
prunedPartitions, readInMemory)
+        Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
+      } else if 
(indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS))
 {
+        val prunedPartitionAndFileNames = 
getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices, 
includeLogFiles = true)
+        // TODO: we should get the list of keys to pass from queryFilters

Review Comment:
   don't we need this



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -226,6 +232,22 @@ public void buildIndexDefinition(String indexMetaPath,
     }
   }
 
+  /**
+   * Deletes index definition and writes to index definition file.
+   *
+   * @param indexName Name of the index
+   */
+  public void deleteIndexDefinition(String indexName) {

Review Comment:
   same. UT please



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala:
##########
@@ -198,6 +209,44 @@ class FunctionalIndexSupport(spark: SparkSession,
 
     columnStatsRecords
   }
+
+  private def 
getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices: 
Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],

Review Comment:
   can we make these package private/protected to get some UTs on them?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala:
##########
@@ -1197,13 +1198,168 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     }
   }
 
+  @Test
+  def testBloomFiltersIndexWithChanges(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      val tableName = "test_bloom_filters_index_with_changes"
+      val hudiOpts = commonOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+      val sqlTableType = "cow"
+
+      spark.sql(
+        s"""
+           CREATE TABLE $tableName (
+           |    ts BIGINT,
+           |    id STRING,
+           |    rider STRING,
+           |    driver STRING,
+           |    fare DOUBLE,
+           |    city STRING,
+           |    state STRING
+           |) USING HUDI
+           |options(
+           |    primaryKey ='id',
+           |    type = '$sqlTableType',
+           |    hoodie.metadata.enable = 'true',
+           |    hoodie.datasource.write.recordkey.field = 'id',
+           |    hoodie.enable.data.skipping = 'true'
+           |)
+           |PARTITIONED BY (state)
+           |location '$basePath'
+       """.stripMargin)
+
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.metadata.enable=true")
+
+      spark.sql(s"""
+           |insert into $tableName(ts, id, rider, driver, fare, city, state) 
VALUES
+           |  
(1695159649,'trip1','rider-A','driver-K',19.10,'san_francisco','california'),
+           |  
(1695414531,'trip6','rider-C','driver-K',17.14,'san_diego','california'),
+           |  (1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas'),
+           |  (1695516137,'trip4','rider-F','driver-P',34.15,'houston','texas')
+           |""".stripMargin)
+
+      spark.sql(
+        s"""
+           |insert into $tableName(ts, id, rider, driver, fare, city, state) 
VALUES
+           |  
(1695091554,'trip2','rider-C','driver-M',27.70,'sunnyvale','california'),
+           |  
(1699349649,'trip5','rider-A','driver-Q',3.32,'san_diego','texas')
+           |""".stripMargin)
+
+      spark.sql(s"create index idx_bloom_$tableName on $tableName using 
bloom_filters(city) options(numHashFunctions=1, fpp=0.00000000001)")
+
+      checkAnswer(s"select id, rider from $tableName where city = 
'sunnyvale'")(
+        Seq("trip2", "rider-C")
+      )
+
+      if (true) {

Review Comment:
   @lokeshj1703 this was from my debugging.. :) please remove and cleanup.. Add 
calls to `verifyQueryPredicate` to verify query correctness



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala:
##########
@@ -1197,13 +1198,168 @@ class TestSecondaryIndexPruning extends 
SparkClientFunctionalTestHarness {
     }
   }
 
+  @Test
+  def testBloomFiltersIndexWithChanges(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      val tableName = "test_bloom_filters_index_with_changes"
+      val hudiOpts = commonOpts ++ Map(
+        DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+        DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+      val sqlTableType = "cow"
+
+      spark.sql(
+        s"""
+           CREATE TABLE $tableName (
+           |    ts BIGINT,
+           |    id STRING,
+           |    rider STRING,
+           |    driver STRING,
+           |    fare DOUBLE,
+           |    city STRING,
+           |    state STRING
+           |) USING HUDI
+           |options(
+           |    primaryKey ='id',
+           |    type = '$sqlTableType',
+           |    hoodie.metadata.enable = 'true',
+           |    hoodie.datasource.write.recordkey.field = 'id',
+           |    hoodie.enable.data.skipping = 'true'
+           |)
+           |PARTITIONED BY (state)
+           |location '$basePath'
+       """.stripMargin)
+
+      spark.sql("set hoodie.parquet.small.file.limit=0")
+      spark.sql("set hoodie.enable.data.skipping=true")
+      spark.sql("set hoodie.metadata.enable=true")
+
+      spark.sql(s"""
+           |insert into $tableName(ts, id, rider, driver, fare, city, state) 
VALUES
+           |  
(1695159649,'trip1','rider-A','driver-K',19.10,'san_francisco','california'),
+           |  
(1695414531,'trip6','rider-C','driver-K',17.14,'san_diego','california'),
+           |  (1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas'),
+           |  (1695516137,'trip4','rider-F','driver-P',34.15,'houston','texas')
+           |""".stripMargin)
+
+      spark.sql(
+        s"""
+           |insert into $tableName(ts, id, rider, driver, fare, city, state) 
VALUES
+           |  
(1695091554,'trip2','rider-C','driver-M',27.70,'sunnyvale','california'),
+           |  
(1699349649,'trip5','rider-A','driver-Q',3.32,'san_diego','texas')
+           |""".stripMargin)
+
+      spark.sql(s"create index idx_bloom_$tableName on $tableName using 
bloom_filters(city) options(numHashFunctions=1, fpp=0.00000000001)")
+
+      checkAnswer(s"select id, rider from $tableName where city = 
'sunnyvale'")(
+        Seq("trip2", "rider-C")
+      )
+
+      if (true) {
+        val a = 1;
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])

Review Comment:
   same for this test



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to