parisni opened a new issue, #7117:
URL: https://github.com/apache/hudi/issues/7117
hudi 0.12.1
spark 3.2.1
Hudi has its own bloom filter implementation, used mainly for fast lookups on the
Hudi key. Up to 0.11, Hudi allows storing these bloom filters in the metadata
table, and also adding bloom filters on several columns. So far those bloom filters are not used at read time.
Spark leverages parquet bloom filters at write time. They are then used at
read time to skip files. These bloom filters can improve queries a lot. But so far,
Hudi does not support them:
Basic parquet table => bloom filters work as expected: the accumulator is not incremented
when the filtered value is outside the dataframe's values, but is incremented when values match
```scala
// Enable parquet bloom-filter pushdown and disable the other row-group
// pruning mechanisms (column index, min/max stats) so that any file
// skipping observed below can only come from the bloom filters.
spark.sql("set parquet.filter.bloom.enabled=true")
spark.sql("set parquet.filter.columnindex.enabled=false")
spark.sql("set parquet.filter.stats.enabled=false")
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
import org.apache.spark.sql.Row
// Write a two-row plain parquet dataset, requesting a bloom filter on
// column "b" via the parquet writer option.
// NOTE: the statement below is wrapped by email formatting; it is a
// single expression.
{spark.sql("select '1' as b, '2' c union select '2',
'3'").write.mode("overwrite").option("parquet.bloom.filter.enabled#b",
"true").parquet("/tmp/bloom")}
/**
 * Integer-summing Spark accumulator. It stays at its initial (zero) value
 * unless `add` is invoked from an executor, which lets the repro observe
 * whether a filtered scan actually touched any data.
 */
class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
  private var count = 0

  // Zero means nothing has ever been added (or `reset` was called).
  override def isZero: Boolean = count == 0

  override def copy(): AccumulatorV2[Integer, Integer] = {
    val clone = new NumRowGroupsAcc()
    clone.count = count
    clone
  }

  override def reset(): Unit = count = 0

  override def add(v: Integer): Unit = count += v

  // Only accumulators of this exact concrete type can be merged.
  override def merge(other: AccumulatorV2[Integer, Integer]): Unit =
    other match {
      case that: NumRowGroupsAcc =>
        count += that.count
      case _ =>
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }

  override def value: Integer = count
}
// Register the accumulator, then touch every row of each filtered scan.
val accu = new NumRowGroupsAcc
sc.register(accu)
// Filter on '3', a value absent from column "b": bloom pruning should
// skip all files, so the accumulator's internal state stays untouched.
spark.read.format("parquet").load("/tmp/bloom").filter("b =
'3'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
println(accu)
// Filter on '2', a value present in column "b": files must be read.
spark.read.format("parquet").load("/tmp/bloom").filter("b =
'2'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
println(accu)
/** Observed output — value 0 for the no-match filter proves files were
skipped by the bloom filter:
NumRowGroupsAcc(id: 5611, name: None, value: 0)
NumRowGroupsAcc(id: 5611, name: None, value: 1)
*/
```
Hudi parquet table => bloom filters do NOT work as expected: the accumulator is
incremented both when values match and when they do not
```scala
// Same pruning setup as the plain-parquet case: bloom pushdown on,
// column-index and stats pruning off, so only bloom filters can skip files.
spark.sql("set parquet.filter.bloom.enabled=true")
spark.sql("set parquet.filter.columnindex.enabled=false")
spark.sql("set parquet.filter.stats.enabled=false")
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
val basePath = "/tmp/bloom_hudi/"
// Hudi write configuration: record key "a", partition path "d",
// precombine field "c", bulk_insert operation, and a parquet bloom filter
// requested on column "b" (same writer option as the plain-parquet case).
val hiveOptions = Map[String, String](
"hoodie.table.name"-> "hudi_test",
"hoodie.datasource.write.recordkey.field"-> "a",
"hoodie.datasource.write.partitionpath.field"-> "d",
"hoodie.datasource.write.precombine.field"-> "c",
"hoodie.datasource.write.table.name"-> "hudi_test",
"hoodie.datasource.write.operation" -> "bulk_insert",
"parquet.bloom.filter.enabled#b" -> "true"
)
// Write two rows through the Hudi datasource with the options above.
// NOTE: the statement below is wrapped by email formatting; it is a
// single expression.
spark.sql("select '0' as a, '1' as b, '2' c, 4 as d union select '1', '2',
'3', 4").write.format("hudi").
options(hiveOptions).
mode("overwrite").
save(basePath)
// Integer-summing accumulator (same definition as in the plain-parquet
// repro earlier in this issue): value stays zero unless `add` runs.
class NumRowGroupsAcc extends AccumulatorV2[Integer, Integer] {
private var _sum = 0
// Zero means nothing has been added (or the accumulator was reset).
override def isZero: Boolean = _sum == 0
override def copy(): AccumulatorV2[Integer, Integer] = {
val acc = new NumRowGroupsAcc()
acc._sum = _sum
acc
}
override def reset(): Unit = _sum = 0
override def add(v: Integer): Unit = _sum += v
// Only accumulators of this exact concrete type can be merged.
override def merge(other: AccumulatorV2[Integer, Integer]): Unit = other
match {
case a: NumRowGroupsAcc => _sum += a._sum
case _ => throw new UnsupportedOperationException(
s"Cannot merge ${this.getClass.getName} with
${other.getClass.getName}")
}
override def value: Integer = _sum
}
// Register the accumulator, then touch every row of each filtered scan.
val accu = new NumRowGroupsAcc
sc.register(accu)
// Filter on '3', a value absent from column "b": with working bloom
// pruning the files would be skipped and the value would stay unchanged.
spark.read.format("hudi").load(basePath).filter("b =
'3'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
println(accu)
// Filter on '2', a value present in column "b": files must be read.
spark.read.format("hudi").load(basePath).filter("b =
'2'").foreachPartition((it: Iterator[Row]) => it.foreach(_ => accu.add(0)))
println(accu)
/** Observed output — the value grows on BOTH scans, showing the
no-match filter did not skip any files through Hudi:
accu: NumRowGroupsAcc = NumRowGroupsAcc(id: 635, name: None, value: 1)
accu: NumRowGroupsAcc = NumRowGroupsAcc(id: 635, name: None, value: 2)
*/
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]