parisni opened a new issue, #7117:
URL: https://github.com/apache/hudi/issues/7117

   hudi 0.12.1
   spark 3.2.1
   
   hudi has its own bloom filter implementation used mainly for fast lookup on the 
hudi key. Up to 0.11, hudi allows storing them in the metadata table, and also 
adding bloom filters on several columns. So far those bloom filters are not used at read time.
   
   Spark leverages parquet bloom filters at write time. They are then used at 
read time to skip files. These bloom filters can improve queries a lot. But so far, 
hudi doesn't support them:
   
   Basic parquet table => bloom works as expected: the accumulator is not used 
when the filtered values are outside the df values, but is incremented when values match 
   ```scala
   spark.sql("set parquet.filter.bloom.enabled=true")
   spark.sql("set parquet.filter.columnindex.enabled=false")
   spark.sql("set parquet.filter.stats.enabled=false")
   
   import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
   import org.apache.spark.sql.Row
   {spark.sql("select '1' as b, '2' c union select '2', 
'3'").write.mode("overwrite").option("parquet.bloom.filter.enabled#b", 
"true").parquet("/tmp/bloom")}
   
   class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
     private var _sum = 0
     override def isZero: Boolean = _sum == 0
     override def copy(): AccumulatorV2[Integer, Integer] = {
       val acc = new NumRowGroupsAcc()
       acc._sum = _sum
       acc
     }
     override def reset(): Unit = _sum = 0
     override def add(v: Integer): Unit = _sum += v
     override def merge(other: AccumulatorV2[Integer, Integer]): Unit = other 
match {
       case a: NumRowGroupsAcc => _sum += a._sum
       case _ => throw new UnsupportedOperationException(
         s"Cannot merge ${this.getClass.getName} with 
${other.getClass.getName}")
     }
     override def value: Integer = _sum
   }
   val accu = new NumRowGroupsAcc
   sc.register(accu)
   spark.read.format("parquet").load("/tmp/bloom").filter("b = 
'3'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
   println(accu)
   spark.read.format("parquet").load("/tmp/bloom").filter("b = 
'2'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
   println(accu)
   
   /**
   NumRowGroupsAcc(id: 5611, name: None, value: 0)
   NumRowGroupsAcc(id: 5611, name: None, value: 1)
   */
   ```
   
   Hudi parquet table => bloom does NOT work as expected: the accumulator is 
incremented both when values match and when they do not 
   ```scala
   spark.sql("set parquet.filter.bloom.enabled=true")
   spark.sql("set parquet.filter.columnindex.enabled=false")
   spark.sql("set parquet.filter.stats.enabled=false")
   import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
   
   val basePath = "/tmp/bloom_hudi/"
   val hiveOptions = Map[String, String](
     "hoodie.table.name"-> "hudi_test",
   "hoodie.datasource.write.recordkey.field"-> "a",
   "hoodie.datasource.write.partitionpath.field"-> "d",
   "hoodie.datasource.write.precombine.field"-> "c",
   "hoodie.datasource.write.table.name"-> "hudi_test",
   "hoodie.datasource.write.operation" -> "bulk_insert",
   "parquet.bloom.filter.enabled#b" -> "true"
   )
   spark.sql("select '0' as a, '1' as b, '2' c, 4 as d union select '1', '2', 
'3', 4").write.format("hudi").
   options(hiveOptions).
   mode("overwrite").
   save(basePath)
   
   class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
     private var _sum = 0
     override def isZero: Boolean = _sum == 0
     override def copy(): AccumulatorV2[Integer, Integer] = {
       val acc = new NumRowGroupsAcc()
       acc._sum = _sum
       acc
     }
     override def reset(): Unit = _sum = 0
     override def add(v: Integer): Unit = _sum += v
     override def merge(other: AccumulatorV2[Integer, Integer]): Unit = other 
match {
       case a: NumRowGroupsAcc => _sum += a._sum
       case _ => throw new UnsupportedOperationException(
         s"Cannot merge ${this.getClass.getName} with 
${other.getClass.getName}")
     }
     override def value: Integer = _sum
   }
   val accu = new NumRowGroupsAcc
   sc.register(accu)
   spark.read.format("hudi").load(basePath).filter("b = 
'3'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
   println(accu)
   spark.read.format("hudi").load(basePath).filter("b = 
'2'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
   println(accu)
   /**
   accu: NumRowGroupsAcc = NumRowGroupsAcc(id: 635, name: None, value: 1)
   accu: NumRowGroupsAcc = NumRowGroupsAcc(id: 635, name: None, value: 2)
   
   */
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to