[
https://issues.apache.org/jira/browse/SPARK-6792?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Charles Hayden updated SPARK-6792:
----------------------------------
Description:
Under some circumstances, pySpark groupByKey returns two or more rows with the
same group-by key.
It is not reproducible with a short example, but it can be seen in the following
program.
The preservesPartitioning argument is required to see the failure.
I ran this with cluster_url=local[4], but I think it will also show up with
cluster_url=local.
=====================================================
{noformat}
# The RDD.groupByKey sometimes gives two results with the same key value.
# This is incorrect: all results with a single key need to be grouped together.
# Report the spark version
from pyspark import SparkContext
import StringIO
import csv
sc = SparkContext()
print sc.version
def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.reader(input, delimiter='\t')
    return reader.next()
# Read data from movielens dataset
# This can be obtained from
# http://files.grouplens.org/datasets/movielens/ml-100k.zip
inputFile = 'u.data'
input = sc.textFile(inputFile)
data = input.map(loadRecord)
# Trim off unneeded fields
data = data.map(lambda row: row[0:2])
print 'Data Sample'
print data.take(10)
# Use join to filter the data
#
# map builds left key
# map builds right key
# join
# map throws away the key and gets result
# pick a couple of users
j = sc.parallelize([789, 939])
# key left
# conversion to str is required to show the error
keyed_j = j.map(lambda row: (str(row), None))
# key right
keyed_rdd = data.map(lambda row: (str(row[0]), row))
# join
joined = keyed_rdd.join(keyed_j)
# throw away key
# preservesPartitioning is required to show the error
res = joined.map(lambda row: row[1][0], preservesPartitioning=True)
#res = joined.map(lambda row: row[1][0]) # no error
print 'Filtered Sample'
print res.take(10)
#print res.count()
# Do the groupby
# There should be fewer rows
keyed_rdd = res.map(lambda row: (row[1], row), preservesPartitioning=True)
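# Suspected cause (an assumption, not established in this report): because
# preservesPartitioning=True was passed even though the maps above changed the
# key, keyed_rdd may still carry the partitioner from the join, which could let
# the groupByKey below skip the shuffle and group each partition separately.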
print 'Input Count', keyed_rdd.count()
grouped_rdd = keyed_rdd.groupByKey()
print 'Grouped Count', grouped_rdd.count()
# There are two rows with the same key !
print 'Group Output Sample'
print grouped_rdd.filter(lambda row: row[0] == '508').take(10)
{noformat}
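As a diagnostic sketch, assuming the program above has been run and that the duplicate key is the same '508' printed at the end, one way to check whether the two copies of the key live in different partitions of keyed_rdd is to tag each record with its partition index via mapPartitionsWithIndex:
{noformat}
# Diagnostic sketch (an assumption, not part of the original repro): tag each
# record of keyed_rdd with the index of the partition it lives in. If key '508'
# appears under more than one partition index, the re-keyed records were never
# co-located, which would be consistent with groupByKey emitting two groups
# for the same key.
def tag_partition(index, iterator):
    for key, value in iterator:
        yield (key, index)

print keyed_rdd.mapPartitionsWithIndex(tag_partition).filter(
    lambda kv: kv[0] == '508').collect()
{noformat}
If '508' does show up in more than one partition, that would also line up with the commented-out map (without preservesPartitioning) not showing the error.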
> pySpark groupByKey returns rows with the same key
> -------------------------------------------------
>
> Key: SPARK-6792
> URL: https://issues.apache.org/jira/browse/SPARK-6792
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 1.3.0
> Reporter: Charles Hayden
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]