Himangshu Ranjan Borah created SPARK-24910:
----------------------------------------------
Summary: Spark Bloom Filter Closure Serialization improvement for
very high volume of Data
Key: SPARK-24910
URL: https://issues.apache.org/jira/browse/SPARK-24910
Project: Spark
Issue Type: Improvement
Components: Spark Core, SQL
Affects Versions: 2.3.1
Reporter: Himangshu Ranjan Borah
I am proposing an improvement to the Bloom filter generation logic used in the
DataFrameStatFunctions Bloom filter API: use mapPartitions() instead of
aggregate() to avoid closure serialization, which fails for huge bit arrays.
Spark's stat functions' Bloom filter implementation uses aggregate/treeAggregate
operations, whose closure depends on the Bloom filter created in the driver.
Since Spark hard-codes the closure serializer to the Java serializer, closure
cleanup fails for very large Bloom filters (typically with the number of items
in the billions and fpp ~ 0.001). The Kryo serializer works fine at that scale,
but there were apparently issues with using Kryo for closure serialization, due
to which Spark 2.0 hard-coded it to Java. The call stack we typically get looks
like:
{noformat}
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
    at org.apache.spark.sql.DataFrameStatFunctions.buildBloomFilter(DataFrameStatFunctions.scala:554)
    at org.apache.spark.sql.DataFrameStatFunctions.bloomFilter(DataFrameStatFunctions.scala:505)
{noformat}
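For reference, the failure can be triggered with a call like the following (the column name and sizes are illustrative, not taken from an actual job):

{code:scala}
// Illustrative reproduction: building a Bloom filter for ~1 billion expected
// items at fpp = 0.001 through the DataFrameStatFunctions API hits the
// OutOfMemoryError above while the closure cleaner Java-serializes the task closure.
val filter = df.stat.bloomFilter("id", 1000000000L, 0.001)
{code}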
This issue can be overcome if we *don't* use the *aggregate()* operations for
Bloom filter generation and instead use a *mapPartitions()*-style operation,
where the Bloom filters are created inside the executors (by giving them enough
memory and controlling the number of executors and partitions) and the final
per-partition filters are returned to the driver. We then aggregate just the
Bloom filters, either in the driver itself or distributed via treeAggregate().
This way the Kryo serializer is used only for the returned Bloom filters; it
works fine on big datasets and scales according to the cluster specifications.
A sketch of the idea is given below. Please comment on this from your end, or
let me know of any upcoming improvements on this issue that you might already
be working on.
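A minimal sketch of the proposed approach, assuming the existing org.apache.spark.util.sketch.BloomFilter utilities; the method name and parameters are illustrative, not an existing Spark API:

{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.util.sketch.BloomFilter

// Hypothetical helper: each partition builds its own Bloom filter locally, so no
// driver-side filter has to be captured in the task closure at all.
def buildBloomFilterViaMapPartitions(df: DataFrame,
                                     col: String,
                                     expectedNumItems: Long,
                                     fpp: Double): BloomFilter = {
  val idx = df.schema.fieldIndex(col)
  val perPartition = df.rdd.mapPartitions { rows =>
    val filter = BloomFilter.create(expectedNumItems, fpp)
    rows.foreach(row => if (!row.isNullAt(idx)) filter.put(row.get(idx)))
    Iterator(filter)
  }
  // Only the per-partition filters travel back as task results, where they are
  // merged; the merge could also be distributed with treeAggregate() if needed.
  perPartition.reduce((a, b) => a.mergeInPlace(b))
}
{code}

Because the filters come back as task results rather than inside a task closure, they go through the regular data serializer (which can be Kryo) instead of the hard-coded Java closure serializer.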