[ https://issues.apache.org/jira/browse/SPARK-3721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14185189#comment-14185189 ]
Daniel Darabos commented on SPARK-3721:
---------------------------------------
We are hitting all kinds of MaxInt and array size limits when broadcasting a
12GB beast from Scala too.
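For scale: a JVM byte array is indexed by a signed 32-bit Int, so any single
array (and hence any single serialized broadcast block) caps out far below
such a payload:
{noformat}
2**31 - 1 = 2147483647 bytes ~= 2.0 GiB, vs. a 12 GB payload (~6x over the limit)
{noformat}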
> Broadcast Variables above 2GB break in PySpark
> ----------------------------------------------
>
> Key: SPARK-3721
> URL: https://issues.apache.org/jira/browse/SPARK-3721
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 1.1.0
> Reporter: Brad Miller
> Assignee: Davies Liu
>
> The bug exhibits three distinct failure modes in PySpark, all of which seem
> to be related to broadcast variable size. Note that the tests below were run
> with Python 2.7.3 on all machines and used the Spark 1.1.0 binaries.
> **BLOCK 1** [no problem]
> {noformat}
> import cPickle
> from pyspark import SparkContext
> def check_pre_serialized(size):
>     msg = cPickle.dumps(range(2 ** size))
>     print 'serialized length:', len(msg)
>     bvar = sc.broadcast(msg)
>     print 'length recovered from broadcast variable:', len(bvar.value)
>     print 'correct value recovered:', msg == bvar.value
>     bvar.unpersist()
>
> def check_unserialized(size):
>     msg = range(2 ** size)
>     bvar = sc.broadcast(msg)
>     print 'correct value recovered:', msg == bvar.value
>     bvar.unpersist()
>
> SparkContext.setSystemProperty('spark.executor.memory', '15g')
> SparkContext.setSystemProperty('spark.cores.max', '5')
> sc = SparkContext('spark://crosby.research.intel-research.net:7077',
>                   'broadcast_bug')
> {noformat}
> **BLOCK 2** [no problem]
> {noformat}
> check_pre_serialized(20)
> > serialized length: 9374656
> > length recovered from broadcast variable: 9374656
> > correct value recovered: True
> {noformat}
> **BLOCK 3** [no problem]
> {noformat}
> check_unserialized(20)
> > correct value recovered: True
> {noformat}
> **BLOCK 4** [no problem]
> {noformat}
> check_pre_serialized(27)
> > serialized length: 1499501632
> > length recovered from broadcast variable: 1499501632
> > correct value recovered: True
> {noformat}
> **BLOCK 5** [no problem]
> {noformat}
> check_unserialized(27)
> > correct value recovered: True
> {noformat}
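> [Note: the size-27 pickle above sits just under the signed 32-bit boundary,
> and doubling it crosses that boundary, which presumably is what trips the
> next call:]
> {noformat}
> size 27: 1499501632 bytes        <  2**31 - 1 = 2147483647
> size 28: ~2 * 1499501632 ~= 2999003264 bytes  >  2**31 - 1
> {noformat}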
> **BLOCK 6** **[ERROR 1: unhandled error from cPickle.dumps inside
> sc.broadcast]**
> {noformat}
> check_pre_serialized(28)
> .....
> > /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
> > 354
> > 355 def dumps(self, obj):
> > --> 356 return cPickle.dumps(obj, 2)
> > 357
> > 358 loads = cPickle.loads
> >
> > SystemError: error return without exception set
> {noformat}
> **BLOCK 7** [no problem]
> {noformat}
> check_unserialized(28)
> > correct value recovered: True
> {noformat}
> **BLOCK 8** **[ERROR 2: no error occurs and *incorrect result* is returned]**
> {noformat}
> check_pre_serialized(29)
> > serialized length: 6331339840
> > length recovered from broadcast variable: 2036372544
> > correct value recovered: False
> {noformat}
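> [Note: the corrupted length is not random; it is exactly the true length
> wrapped at 32 bits, consistent with a length field silently overflowing:]
> {noformat}
> 6331339840 mod 2**32 = 6331339840 - 4294967296 = 2036372544
> {noformat}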
> **BLOCK 9** **[ERROR 3: unhandled error from zlib.compress inside
> sc.broadcast]**
> {noformat}
> check_unserialized(29)
> ......
> > /home/spark/greatest/python/pyspark/serializers.py in dumps(self, obj)
> > 418
> > 419 def dumps(self, obj):
> > --> 420 return zlib.compress(self.serializer.dumps(obj), 1)
> > 421
> > 422 def loads(self, obj):
> >
> > OverflowError: size does not fit in an int
> {noformat}
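> [Note: this failure can be reproduced without Spark. On the Python 2.7 used
> here, the zlib binding measures its input with a C int, so any input of
> 2**31 bytes or more overflows before compression even starts. A standalone
> sketch (needs ~2 GiB of RAM):]
> {noformat}
> import zlib
> # 2**31 bytes is one past the largest value a signed 32-bit int can hold,
> # so the argument check itself fails before any compression happens:
> zlib.compress('x' * (2 ** 31), 1)
> # OverflowError: size does not fit in an int
> {noformat}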
> **BLOCK 10** [ERROR 1]
> {noformat}
> check_pre_serialized(30)
> ...same as above...
> {noformat}
> **BLOCK 11** [ERROR 3]
> {noformat}
> check_unserialized(30)
> ...same as above...
> {noformat}
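> [Note: until the underlying limits are fixed, one possible client-side
> workaround is to split the serialized payload into pieces that stay below
> the 32-bit limits and broadcast each piece separately. A sketch only,
> assuming the driver can produce the serialized bytes at all;
> broadcast_chunked and value_chunked are hypothetical helpers, not part of
> the PySpark API:]
> {noformat}
> def broadcast_chunked(sc, data, chunk_size=1 << 28):
>     # Broadcast a large byte string as 256 MB pieces; each piece is
>     # pickled and compressed independently, staying under the int limits.
>     chunks = [data[i:i + chunk_size] for i in xrange(0, len(data), chunk_size)]
>     return [sc.broadcast(c) for c in chunks]
>
> def value_chunked(bvars):
>     # Reassemble the original byte string on the driver or in a task.
>     return ''.join(b.value for b in bvars)
> {noformat}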