Charles Hayden created SPARK-6792:
-------------------------------------

             Summary: pySpark groupByKey returns rows with the same key
                 Key: SPARK-6792
                 URL: https://issues.apache.org/jira/browse/SPARK-6792
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 1.3.0
            Reporter: Charles Hayden


Under some circumstances, pySpark groupByKey returns two or more rows with the 
same key.

It is not reproducible with a short example, but it can be seen in the following 
program.
The preservesPartitioning argument is required to trigger the failure.
I ran this with cluster_url=local[4], but I think it will also show up with 
cluster_url=local.
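
For reference, this is the contract being violated, shown on a toy input (a 
standalone sketch of the expected behavior; as noted above, a case this small 
does not reproduce the bug):

from pyspark import SparkContext
sc = SparkContext()
pairs = sc.parallelize([('a', 1), ('a', 2), ('b', 3)])
# Every distinct key should appear in exactly one output row.
print sorted((k, sorted(vals)) for k, vals in pairs.groupByKey().collect())
# expected output: [('a', [1, 2]), ('b', [3])]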
=====================================================

# The RDD.groupByKey sometimes gives two results with the same key value.
# This is incorrect: all results with a single key need to be grouped together.

# Report the spark version
from pyspark import SparkContext
import StringIO
import csv
sc = SparkContext()
print sc.version


def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.reader(input, delimiter='\t')
    return reader.next()
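
# Quick sanity check of the parser (my addition; the sample line follows the
# u.data layout and the values are illustrative, not taken from the report):
#   print loadRecord('196\t242\t3\t881250949')
#   -> ['196', '242', '3', '881250949']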

# Read data from the MovieLens 100k dataset, which can be obtained from
# http://files.grouplens.org/datasets/movielens/ml-100k.zip
# u.data fields (tab-separated): user id, item id, rating, timestamp
inputFile = 'u.data'
input = sc.textFile(inputFile)
data = input.map(loadRecord)
# Trim off unneeded fields
data = data.map(lambda row: row[0:2])
print 'Data Sample'
print data.take(10)


# Use join to filter the data
#
# map builds left key
# map builds right key
# join
# map throws away the key and gets result

# pick a couple of users
j = sc.parallelize([789, 939])
# key left
# conversion to str is required to show the error
keyed_j = j.map(lambda row: (str(row), None))
# key right
keyed_rdd = data.map(lambda row: (str(row[0]), row))
# join
joined = keyed_rdd.join(keyed_j)
# throw away key
# preservesPartitioning is required to show the error
res = joined.map(lambda row: row[1][0], preservesPartitioning=True)
#res = joined.map(lambda row: row[1][0])      # no error

print 'Filtered Sample'
print res.take(10)
#print res.count()


# Do the groupby
# There should be fewer rows
keyed_rdd = res.map(lambda row: (row[1], row), preservesPartitioning=True)
print 'Input Count', keyed_rdd.count()
grouped_rdd = keyed_rdd.groupByKey()
print 'Grouped Count', grouped_rdd.count()


# There are two rows with the same key!
print 'Group Output Sample'
print grouped_rdd.filter(lambda row: row[0] == '508').take(10)
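

# The duplication can also be checked programmatically (a sketch added for
# this report, reusing keyed_rdd and grouped_rdd from above; the two counts
# should be equal when groupByKey behaves correctly):
# distinct() triggers its own shuffle, so this count should be reliable.
distinct_keys = keyed_rdd.keys().distinct().count()
print 'Distinct Keys', distinct_keys
print 'Counts match:', grouped_rdd.count() == distinct_keys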



