Himangshu Ranjan Borah created SPARK-24910:
----------------------------------------------

             Summary: Spark Bloom Filter closure serialization improvement for 
very high volumes of data
                 Key: SPARK-24910
                 URL: https://issues.apache.org/jira/browse/SPARK-24910
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core, SQL
    Affects Versions: 2.3.1
            Reporter: Himangshu Ranjan Borah


I am proposing an improvement to the Bloom Filter generation logic used in 
DataFrameStatFunctions' Bloom Filter API: use mapPartitions() instead of 
aggregate() to avoid closure serialization, which fails for huge BitArrays.
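
For context, the failing call is the standard {{DataFrameStatFunctions.bloomFilter}} API; a minimal invocation (the DataFrame, column name, and sizing below are only illustrative) looks like:

{code:scala}
// Illustrative only: any DataFrame with a very large expected item count and a
// low fpp (e.g. ~billions of items, fpp ~ 0.001) hits the driver-side
// closure-serialization OutOfMemoryError shown in the stack trace below.
val bloomFilter = df.stat.bloomFilter("id", 2000000000L, 0.001)
{code}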

Spark's stat functions' Bloom Filter implementation uses aggregate/treeAggregate 
operations whose closure depends on the Bloom Filter created in the driver. 
Since Spark hard-codes the closure serializer to the Java serializer, closure 
cleanup fails for very large Bloom Filters (typically with the number of items 
~ billions and fpp ~ 0.001). The Kryo serializer works fine at that scale, but 
there were apparently issues with using Kryo for closure serialization, due to 
which Spark 2.0 hard-coded it to Java. The call stack we typically get looks 
like:

{noformat}
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2292)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
    at org.apache.spark.sql.DataFrameStatFunctions.buildBloomFilter(DataFrameStatFunctions.scala:554)
    at org.apache.spark.sql.DataFrameStatFunctions.bloomFilter(DataFrameStatFunctions.scala:505)
{noformat}

This issue can be overcome if we *don't* use the *aggregate()* operations for 
the Bloom Filter generation and instead use a *mapPartitions()*-style operation: 
create the Bloom Filters inside the executors (by giving them enough memory and 
controlling the number of executors and partitions) and return one final Bloom 
Filter per partition to the driver, after which we can merge just those filters, 
either in the driver itself or in a distributed fashion using treeAggregate(). 
This way the Kryo serializer is needed only for the returned per-partition Bloom 
Filters; it works fine on big datasets and scales with the cluster 
specifications. A sketch of this approach is shown below. Please comment on this 
from your end, or let me know of any upcoming improvements on this issue that 
you might already be working on.
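
As a rough illustration (not a patch), a minimal sketch of the proposed approach, assuming the existing public {{org.apache.spark.util.sketch.BloomFilter}} API and a {{Dataset[Long]}} of keys (the helper name is hypothetical), could look like:

{code:scala}
import org.apache.spark.sql.Dataset
import org.apache.spark.util.sketch.BloomFilter

// Sketch only: build one Bloom Filter per partition inside the executors, so
// the task closure captures only (expectedNumItems, fpp) and never a
// driver-side BitArray; the big filters travel back as task results (which can
// go through the configured Kryo serializer), not as part of the closure.
def bloomFilterViaMapPartitions(
    items: Dataset[Long],
    expectedNumItems: Long,
    fpp: Double): BloomFilter = {
  val perPartition = items.rdd.mapPartitions { iter =>
    val filter = BloomFilter.create(expectedNumItems, fpp)
    iter.foreach(x => filter.putLong(x))
    Iterator.single(filter)
  }
  // Merge the small number of per-partition filters; all were created with the
  // same parameters, so mergeInPlace() is compatible. reduce() finishes the
  // merge in the driver; treeReduce/treeAggregate could distribute it instead
  // for very wide clusters.
  perPartition.reduce((a, b) => a.mergeInPlace(b))
}
{code}

With this shape, the driver only handles a handful of partition-sized filters at merge time instead of serializing one huge filter into every task closure.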


