vinothchandar commented on code in PR #12290:
URL: https://github.com/apache/hudi/pull/12290#discussion_r1850227135
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -184,10 +184,10 @@ public Option<BloomFilter> getBloomFilter(final String
partitionName, final Stri
}
@Override
- public Map<Pair<String, String>, BloomFilter> getBloomFilters(final
List<Pair<String, String>> partitionNameFileNameList)
+ public Map<Pair<String, String>, BloomFilter> getBloomFilters(final
List<Pair<String, String>> partitionNameFileNameList, final String
metadataPartitionName)
throws HoodieMetadataException {
- if
(!dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.BLOOM_FILTERS))
{
- LOG.error("Metadata bloom filter index is disabled!");
+ if
(!dataMetaClient.getTableConfig().getMetadataPartitions().contains(metadataPartitionName))
{
+ LOG.error("Metadata partition not found {}", metadataPartitionName);
Review Comment:
Why make the error message more generic instead of stating specifically
that the bloom filter index is what was disabled?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -312,6 +312,18 @@ private static void
constructColumnStatsMetadataPayload(HoodieMetadataPayload pa
}
}
+ public static boolean isGenericIndex(String metadataPartitionPath) {
Review Comment:
I actually find this confusing. Is a "generic index" something that either
uses a function to index, or is a secondary index?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -312,6 +312,18 @@ private static void
constructColumnStatsMetadataPayload(HoodieMetadataPayload pa
}
}
+ public static boolean isGenericIndex(String metadataPartitionPath) {
Review Comment:
Not fixable in this PR, I think.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java:
##########
@@ -150,21 +151,22 @@ public static HoodieData<HoodieRecord>
getFunctionalIndexRecordsUsingColumnStats
}
public static HoodieData<HoodieRecord>
getFunctionalIndexRecordsUsingBloomFilter(Dataset<Row> dataset, String
columnToIndex,
-
HoodieWriteConfig metadataWriteConfig, String instantTime) {
+
HoodieWriteConfig metadataWriteConfig, String instantTime, String
indexName) {
Review Comment:
Can we add a unit test for this method?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala:
##########
@@ -59,10 +59,20 @@ class FunctionalIndexSupport(spark: SparkSession,
): Option[Set[String]] = {
lazy val functionalIndexPartitionOpt =
getFunctionalIndexPartition(queryFilters)
if (isIndexAvailable && queryFilters.nonEmpty &&
functionalIndexPartitionOpt.nonEmpty) {
- val readInMemory = shouldReadInMemory(fileIndex, queryReferencedColumns,
inMemoryProjectionThreshold)
- val (prunedPartitions, prunedFileNames) =
getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices)
- val indexDf =
loadFunctionalIndexDataFrame(functionalIndexPartitionOpt.get, prunedPartitions,
readInMemory)
- Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
+ val indexPartition = functionalIndexPartitionOpt.get
+ val indexDefinition =
metaClient.getIndexMetadata.get().getIndexDefinitions.get(indexPartition)
+ if
(indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS))
{
+ val readInMemory = shouldReadInMemory(fileIndex,
queryReferencedColumns, inMemoryProjectionThreshold)
+ val (prunedPartitions, prunedFileNames) =
getPrunedPartitionsAndFileNames(prunedPartitionsAndFileSlices)
+ val indexDf = loadFunctionalIndexDataFrame(indexPartition,
prunedPartitions, readInMemory)
+ Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
+ } else if
(indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS))
{
+ val prunedPartitionAndFileNames =
getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices,
includeLogFiles = true)
+ // TODO: we should get the list of keys to pass from queryFilters
Review Comment:
Don't we need this? Please address it here or add a TODO that tracks it.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -226,6 +232,22 @@ public void buildIndexDefinition(String indexMetaPath,
}
}
+ /**
+ * Deletes index definition and writes to index definition file.
+ *
+ * @param indexName Name of the index
+ */
+ public void deleteIndexDefinition(String indexName) {
Review Comment:
Same as above — please add a unit test.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/FunctionalIndexSupport.scala:
##########
@@ -198,6 +209,44 @@ class FunctionalIndexSupport(spark: SparkSession,
columnStatsRecords
}
+
+ private def
getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices:
Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])],
Review Comment:
Can we make these package-private or protected so we can get some unit tests on them?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala:
##########
@@ -1197,13 +1198,168 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
}
}
+ @Test
+ def testBloomFiltersIndexWithChanges(): Unit = {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ val tableName = "test_bloom_filters_index_with_changes"
+ val hudiOpts = commonOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+ val sqlTableType = "cow"
+
+ spark.sql(
+ s"""
+ CREATE TABLE $tableName (
+ | ts BIGINT,
+ | id STRING,
+ | rider STRING,
+ | driver STRING,
+ | fare DOUBLE,
+ | city STRING,
+ | state STRING
+ |) USING HUDI
+ |options(
+ | primaryKey ='id',
+ | type = '$sqlTableType',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'id',
+ | hoodie.enable.data.skipping = 'true'
+ |)
+ |PARTITIONED BY (state)
+ |location '$basePath'
+ """.stripMargin)
+
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ spark.sql("set hoodie.enable.data.skipping=true")
+ spark.sql("set hoodie.metadata.enable=true")
+
+ spark.sql(s"""
+ |insert into $tableName(ts, id, rider, driver, fare, city, state)
VALUES
+ |
(1695159649,'trip1','rider-A','driver-K',19.10,'san_francisco','california'),
+ |
(1695414531,'trip6','rider-C','driver-K',17.14,'san_diego','california'),
+ | (1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas'),
+ | (1695516137,'trip4','rider-F','driver-P',34.15,'houston','texas')
+ |""".stripMargin)
+
+ spark.sql(
+ s"""
+ |insert into $tableName(ts, id, rider, driver, fare, city, state)
VALUES
+ |
(1695091554,'trip2','rider-C','driver-M',27.70,'sunnyvale','california'),
+ |
(1699349649,'trip5','rider-A','driver-Q',3.32,'san_diego','texas')
+ |""".stripMargin)
+
+ spark.sql(s"create index idx_bloom_$tableName on $tableName using
bloom_filters(city) options(numHashFunctions=1, fpp=0.00000000001)")
+
+ checkAnswer(s"select id, rider from $tableName where city =
'sunnyvale'")(
+ Seq("trip2", "rider-C")
+ )
+
+ if (true) {
Review Comment:
@lokeshj1703 this was left over from my debugging. :) Please remove it and clean up.
Also, add calls to `verifyQueryPredicate` to verify query correctness.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexPruning.scala:
##########
@@ -1197,13 +1198,168 @@ class TestSecondaryIndexPruning extends
SparkClientFunctionalTestHarness {
}
}
+ @Test
+ def testBloomFiltersIndexWithChanges(): Unit = {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ val tableName = "test_bloom_filters_index_with_changes"
+ val hudiOpts = commonOpts ++ Map(
+ DataSourceWriteOptions.TABLE_TYPE.key -> MOR_TABLE_TYPE_OPT_VAL,
+ DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true")
+ val sqlTableType = "cow"
+
+ spark.sql(
+ s"""
+ CREATE TABLE $tableName (
+ | ts BIGINT,
+ | id STRING,
+ | rider STRING,
+ | driver STRING,
+ | fare DOUBLE,
+ | city STRING,
+ | state STRING
+ |) USING HUDI
+ |options(
+ | primaryKey ='id',
+ | type = '$sqlTableType',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'id',
+ | hoodie.enable.data.skipping = 'true'
+ |)
+ |PARTITIONED BY (state)
+ |location '$basePath'
+ """.stripMargin)
+
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ spark.sql("set hoodie.enable.data.skipping=true")
+ spark.sql("set hoodie.metadata.enable=true")
+
+ spark.sql(s"""
+ |insert into $tableName(ts, id, rider, driver, fare, city, state)
VALUES
+ |
(1695159649,'trip1','rider-A','driver-K',19.10,'san_francisco','california'),
+ |
(1695414531,'trip6','rider-C','driver-K',17.14,'san_diego','california'),
+ | (1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas'),
+ | (1695516137,'trip4','rider-F','driver-P',34.15,'houston','texas')
+ |""".stripMargin)
+
+ spark.sql(
+ s"""
+ |insert into $tableName(ts, id, rider, driver, fare, city, state)
VALUES
+ |
(1695091554,'trip2','rider-C','driver-M',27.70,'sunnyvale','california'),
+ |
(1699349649,'trip5','rider-A','driver-Q',3.32,'san_diego','texas')
+ |""".stripMargin)
+
+ spark.sql(s"create index idx_bloom_$tableName on $tableName using
bloom_filters(city) options(numHashFunctions=1, fpp=0.00000000001)")
+
+ checkAnswer(s"select id, rider from $tableName where city =
'sunnyvale'")(
+ Seq("trip2", "rider-C")
+ )
+
+ if (true) {
+ val a = 1;
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @EnumSource(value = classOf[HoodieTableType])
Review Comment:
Same comment applies to this test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]