[ 
https://issues.apache.org/jira/browse/SPARK-12824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Paul Shearer updated SPARK-12824:
---------------------------------
    Description: 
Below is a simple `pyspark` script that tries to split an RDD into a dictionary 
containing several RDDs. 

As the **sample run** shows, the script only works if we do a `collect()` on 
the intermediate RDDs as they are created. Of course I would not want to do 
that in practice, since it doesn't scale.

What's really strange is, I'm not assigning the intermediate `collect()` 
results to any variable. So the difference in behavior is due solely to a 
hidden side-effect of the computation triggered by the `collect()` call. 

Spark is supposed to be a very functional framework with minimal side effects. 
Why is it only possible to get the desired behavior by triggering some 
mysterious side effect using `collect()`?

The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.

h3. spark_script.py

{noformat}
    from pprint import PrettyPrinter
    pp = PrettyPrinter(indent=4).pprint
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
    logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
    
    def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
        d = dict()
        for key_value in key_values:
            d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
            if collect_in_loop:
                d[key_value].collect()
        return d
    def print_results(d):
        for k in d:
            print k
            pp(d[k].collect())    
    
    rdd = sc.parallelize([
        {'color':'red','size':3},
        {'color':'red', 'size':7},
        {'color':'red', 'size':8},    
        {'color':'red', 'size':10},
        {'color':'green', 'size':9},
        {'color':'green', 'size':5},
        {'color':'green', 'size':50},    
        {'color':'blue', 'size':4},
        {'color':'purple', 'size':6}])
    key_field = 'color'
    key_values = ['red', 'green', 'blue', 'purple']
    
    print '### run WITH collect in loop: '
    d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
    print_results(d)
    print '### run WITHOUT collect in loop: '
    d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
    print_results(d)
{noformat}

h3. Sample run in IPython shell

{noformat}
    In [1]: execfile('spark_script.py')
    ### run WITH collect in loop: 
    blue
    [{   'color': 'blue', 'size': 4}]
    purple
    [{   'color': 'purple', 'size': 6}]
    green
    [   {   'color': 'green', 'size': 9},
        {   'color': 'green', 'size': 5},
        {   'color': 'green', 'size': 50}]
    red
    [   {   'color': 'red', 'size': 3},
        {   'color': 'red', 'size': 7},
        {   'color': 'red', 'size': 8},
        {   'color': 'red', 'size': 10}]
    ### run WITHOUT collect in loop: 
    blue
    [{   'color': 'purple', 'size': 6}]
    purple
    [{   'color': 'purple', 'size': 6}]
    green
    [{   'color': 'purple', 'size': 6}]
    red
    [{   'color': 'purple', 'size': 6}]
{noformat}

  was:
Below is a simple `pyspark` script that tries to split an RDD into a dictionary 
containing several RDDs. 

As the **sample run** shows, the script only works if we do a `collect()` on 
the intermediate RDDs as they are created. Of course I would not want to do 
that in practice, since it doesn't scale.

What's really strange is, I'm not assigning the intermediate `collect()` 
results to any variable. So the difference in behavior is due solely to a 
hidden side-effect of the computation triggered by the `collect()` call. 

Spark is supposed to be a very functional framework with minimal side effects. 
Why is it only possible to get the desired behavior by triggering some 
mysterious side effect using `collect()`?

The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.

spark_script.py
```
    from pprint import PrettyPrinter
    pp = PrettyPrinter(indent=4).pprint
    logger = sc._jvm.org.apache.log4j
    logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
    logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
    
    def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
        d = dict()
        for key_value in key_values:
            d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
            if collect_in_loop:
                d[key_value].collect()
        return d
    def print_results(d):
        for k in d:
            print k
            pp(d[k].collect())    
    
    rdd = sc.parallelize([
        {'color':'red','size':3},
        {'color':'red', 'size':7},
        {'color':'red', 'size':8},    
        {'color':'red', 'size':10},
        {'color':'green', 'size':9},
        {'color':'green', 'size':5},
        {'color':'green', 'size':50},    
        {'color':'blue', 'size':4},
        {'color':'purple', 'size':6}])
    key_field = 'color'
    key_values = ['red', 'green', 'blue', 'purple']
    
    print '### run WITH collect in loop: '
    d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
    print_results(d)
    print '### run WITHOUT collect in loop: '
    d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
    print_results(d)
```

Sample run in IPython shell

```
    In [1]: execfile('spark_script.py')
    ### run WITH collect in loop: 
    blue
    [{   'color': 'blue', 'size': 4}]
    purple
    [{   'color': 'purple', 'size': 6}]
    green
    [   {   'color': 'green', 'size': 9},
        {   'color': 'green', 'size': 5},
        {   'color': 'green', 'size': 50}]
    red
    [   {   'color': 'red', 'size': 3},
        {   'color': 'red', 'size': 7},
        {   'color': 'red', 'size': 8},
        {   'color': 'red', 'size': 10}]
    ### run WITHOUT collect in loop: 
    blue
    [{   'color': 'purple', 'size': 6}]
    purple
    [{   'color': 'purple', 'size': 6}]
    green
    [{   'color': 'purple', 'size': 6}]
    red
    [{   'color': 'purple', 'size': 6}]
```


> Failure to maintain consistent RDD references in pyspark
> --------------------------------------------------------
>
>                 Key: SPARK-12824
>                 URL: https://issues.apache.org/jira/browse/SPARK-12824
>             Project: Spark
>          Issue Type: Bug
>         Environment: Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
>            Reporter: Paul Shearer
>
> Below is a simple `pyspark` script that tries to split an RDD into a 
> dictionary containing several RDDs. 
> As the **sample run** shows, the script only works if we do a `collect()` on 
> the intermediate RDDs as they are created. Of course I would not want to do 
> that in practice, since it doesn't scale.
> What's really strange is, I'm not assigning the intermediate `collect()` 
> results to any variable. So the difference in behavior is due solely to a 
> hidden side-effect of the computation triggered by the `collect()` call. 
> Spark is supposed to be a very functional framework with minimal side 
> effects. Why is it only possible to get the desired behavior by triggering 
> some mysterious side effect using `collect()`?
> The run below is with Spark 1.5.2, Python 2.7.10, and IPython 4.0.0.
> h3. spark_script.py
> {noformat}
>     from pprint import PrettyPrinter
>     pp = PrettyPrinter(indent=4).pprint
>     logger = sc._jvm.org.apache.log4j
>     logger.LogManager.getLogger("org"). setLevel( logger.Level.ERROR )
>     logger.LogManager.getLogger("akka").setLevel( logger.Level.ERROR )
>     
>     def split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False):
>         d = dict()
>         for key_value in key_values:
>             d[key_value] = rdd.filter(lambda row: row[key_field] == key_value)
>             if collect_in_loop:
>                 d[key_value].collect()
>         return d
>     def print_results(d):
>         for k in d:
>             print k
>             pp(d[k].collect())    
>     
>     rdd = sc.parallelize([
>         {'color':'red','size':3},
>         {'color':'red', 'size':7},
>         {'color':'red', 'size':8},    
>         {'color':'red', 'size':10},
>         {'color':'green', 'size':9},
>         {'color':'green', 'size':5},
>         {'color':'green', 'size':50},    
>         {'color':'blue', 'size':4},
>         {'color':'purple', 'size':6}])
>     key_field = 'color'
>     key_values = ['red', 'green', 'blue', 'purple']
>     
>     print '### run WITH collect in loop: '
>     d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=True)
>     print_results(d)
>     print '### run WITHOUT collect in loop: '
>     d = split_RDD_by_key(rdd, key_field, key_values, collect_in_loop=False)
>     print_results(d)
> {noformat}
> h3. Sample run in IPython shell
> {noformat}
>     In [1]: execfile('spark_script.py')
>     ### run WITH collect in loop: 
>     blue
>     [{   'color': 'blue', 'size': 4}]
>     purple
>     [{   'color': 'purple', 'size': 6}]
>     green
>     [   {   'color': 'green', 'size': 9},
>         {   'color': 'green', 'size': 5},
>         {   'color': 'green', 'size': 50}]
>     red
>     [   {   'color': 'red', 'size': 3},
>         {   'color': 'red', 'size': 7},
>         {   'color': 'red', 'size': 8},
>         {   'color': 'red', 'size': 10}]
>     ### run WITHOUT collect in loop: 
>     blue
>     [{   'color': 'purple', 'size': 6}]
>     purple
>     [{   'color': 'purple', 'size': 6}]
>     green
>     [{   'color': 'purple', 'size': 6}]
>     red
>     [{   'color': 'purple', 'size': 6}]
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to