Re: How does HashPartitioner distribute data in Spark?

2017-06-25 Thread Russell Spitzer
A clearer explanation:

`parallelize` does not apply a partitioner. We can see this with a quick code example:

scala> val rdd1 = sc.parallelize(Seq(("aa", 1), ("aa", 2), ("aa", 3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.partitioner
res0: Option[org.apache.spark.Partitioner] = None

It has no partitioner because parallelize just packs the collection into
partitions positionally, without looking at the actual contents of the
collection.

scala> rdd1.foreachPartition(it => println(it.length))
1
0
1
1
0
0
0
0
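
Those empty partitions fall out of purely positional slicing. As a rough
sketch of the idea (a hypothetical simplification, not Spark's actual
ParallelCollectionRDD code), parallelize splits the sequence into contiguous
chunks by index alone:

// Hypothetical sketch: split a sequence into numSlices contiguous
// chunks by position; the values (and any keys) are never inspected.
def slice[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] =
  (0 until numSlices).map { i =>
    val start = ((i * seq.length.toLong) / numSlices).toInt
    val end   = (((i + 1) * seq.length.toLong) / numSlices).toInt
    seq.slice(start, end)
  }

// With 3 elements and 8 slices most chunks are empty:
// slice(Seq(("aa", 1), ("aa", 2), ("aa", 3)), 8).map(_.length)
// => Vector(0, 0, 1, 0, 0, 1, 0, 1)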

If we actually shuffle the data (using the repartition command, for example),
we get the expected redistribution:

scala> rdd1.repartition(8).foreachPartition(it => println(it.length))
0
0
0
0
0
0
0
3
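
To shuffle by the actual pair keys, rather than just redistributing records,
a pair RDD can be partitioned explicitly with a HashPartitioner. A minimal
sketch (since every key here is "aa", every record hashes to the same
partition):

import org.apache.spark.HashPartitioner

// partitionBy hashes each record's key, so identical keys land together
val byKey = rdd1.partitionBy(new HashPartitioner(8))
byKey.partitioner  // now Some(HashPartitioner), unlike the original rdd1
byKey.foreachPartition(it => println(it.length))
// exactly one partition holds all 3 records; the other seven are empty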


Re: How does HashPartitioner distribute data in Spark?

2017-06-24 Thread Russell Spitzer
Neither of your code examples invokes a repartitioning. Add a repartition
command.
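
For example, applied to the snippets from the original post (a sketch reusing
its countByPartition1/countByPartition2 definitions):

// force a shuffle before counting records per partition
countByPartition1(rdd1.repartition(8)).collect()
countByPartition2(rdd2.repartition(8)).collect()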


Re: How does HashPartitioner distribute data in Spark?

2017-06-24 Thread Vikash Pareek
Hi Vadim,

Thank you for your response.

I would like to know how the partitioner chooses the key. If we look at my
example, the following questions arise:
1. In case of rdd1, hash partitioning should calculate the hashcode of the
key (i.e. *"aa"* in this case), so shouldn't *all records go to a single
partition* instead of being uniformly distributed?
2. In case of rdd2, there is no key-value pair, so how is hash partitioning
going to work, i.e. *what is the key* used to calculate the hashcode?



Best Regards,


InfoObjects Inc. <http://www.infoobjects.com/>
Vikash Pareek
Team Lead, *InfoObjects Inc.*
Big Data Analytics

m: +91 8800206898 a: E5, Jhalana Institutional Area, Jaipur, Rajasthan 302004
w: www.linkedin.com/in/pvikash e: vikash.par...@infoobjects.com


Re: How does HashPartitioner distribute data in Spark?

2017-06-23 Thread Vadim Semenov
This is the code that chooses the partition for a key:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala#L85-L88

It's basically `math.abs(key.hashCode % numberOfPartitions)`.
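
In slightly more detail, the linked getPartition is roughly the following
paraphrase (not the verbatim source). Note the real code uses a non-negative
modulo rather than math.abs, since math.abs(Int.MinValue) is itself negative,
and null keys map to partition 0:

// Paraphrase of HashPartitioner.getPartition from the linked file
def getPartition(key: Any, numPartitions: Int): Int = key match {
  case null => 0  // null keys always go to partition 0
  case _ =>
    val rawMod = key.hashCode % numPartitions
    rawMod + (if (rawMod < 0) numPartitions else 0)  // force non-negative
}

For the examples in this thread, getPartition("aa", 8) returns the same index
for every record, which is why identical keys end up in a single partition
after a key-based shuffle.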


How does HashPartitioner distribute data in Spark?

2017-06-23 Thread Vikash Pareek
I am trying to understand how Spark partitioning works.

To understand this, I ran the following piece of code on Spark 1.6:

import org.apache.spark.rdd.RDD

def countByPartition1(rdd: RDD[(String, Int)]) = {
  rdd.mapPartitions(iter => Iterator(iter.length))
}
def countByPartition2(rdd: RDD[String]) = {
  rdd.mapPartitions(iter => Iterator(iter.length))
}

// RDD creation
val rdd1 = sc.parallelize(Array(("aa", 1), ("aa", 1), ("aa", 1), ("aa", 1)), 8)
countByPartition1(rdd1).collect()
>> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)

val rdd2 = sc.parallelize(Array("aa", "aa", "aa", "aa"), 8)
countByPartition2(rdd2).collect()
>> Array[Int] = Array(0, 1, 0, 1, 0, 1, 0, 1)
 
In both cases the data is distributed uniformly.
I have the following questions based on this observation:

 1. In case of rdd1, hash partitioning should calculate the hashcode of the
key (i.e. "aa" in this case), so shouldn't all records go to a single
partition instead of being uniformly distributed?
 2. In case of rdd2, there is no key-value pair, so how is hash partitioning
going to work, i.e. what is the key used to calculate the hashcode?

I have gone through @zero323's answer, but it does not answer these questions:
https://stackoverflow.com/questions/31424396/how-does-hashpartitioner-work




-----

__Vikash Pareek
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-does-HashPartitioner-distribute-data-in-Spark-tp28785.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
