Re: possible bug spark/python/pyspark/rdd.py portable_hash()

2015-11-29 Thread Andy Davidson
Hi Felix and Ted

This is how I am starting spark

Should I file a bug?

Andy


export PYSPARK_PYTHON=python3.4
export PYSPARK_DRIVER_PYTHON=python3.4
export IPYTHON_OPTS="notebook --no-browser --port=7000 --log-level=WARN"

$SPARK_ROOT/bin/pyspark \
--master $MASTER_URL \
--total-executor-cores $numCores \
--driver-memory 2G \
--executor-memory 2G \
$extraPkgs \
$*


From:  Felix Cheung 
Date:  Saturday, November 28, 2015 at 12:11 AM
To:  Ted Yu 
Cc:  Andrew Davidson , "user @spark"

Subject:  Re: possible bug spark/python/pyspark/rdd.py portable_hash()

>
> Ah, it's there in spark-submit and pyspark.
> Seems like it should be added for spark_ec2
>
> _
> From: Ted Yu 
> Sent: Friday, November 27, 2015 11:50 AM
> Subject: Re: possible bug spark/python/pyspark/rdd.py portable_hash()
> To: Felix Cheung 
> Cc: Andy Davidson , user @spark
>
> ec2/spark-ec2 calls ./ec2/spark_ec2.py
>
> I don't see PYTHONHASHSEED defined in any of these scripts.
>
> Andy reported this for ec2 cluster.
>
> I think a JIRA should be opened.
>
> On Fri, Nov 27, 2015 at 11:01 AM, Felix Cheung
>  wrote:
>
>> May I ask how you are starting Spark?
>> It looks like PYTHONHASHSEED is being set:
>> https://github.com/apache/spark/search?utf8=%E2%9C%93=PYTHONHASHSEED
>>
>> Date: Thu, 26 Nov 2015 11:30:09 -0800
>> Subject: possible bug spark/python/pyspark/rdd.py portable_hash()
>> From: a...@santacruzintegration.com
>> To: user@spark.apache.org
>>
>> I am using spark-1.5.1-bin-hadoop2.6. I used
>> spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2 to create a cluster
>> and configured spark-env to use python3. I get an exception
>> 'Randomness of hash of string should be disabled via PYTHONHASHSEED'.
>> Is there any reason rdd.py should not just set PYTHONHASHSEED?
>>
>> Should I file a bug?
>>
>> Kind regards
>>
>> Andy
>>
>> details
>>
>> http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=subtract#pyspark.RDD.subtract
>>
>> Example does not work out of the box
>>
>> subtract(other, numPartitions=None)
>>
>> Return each value in self that is not contained in other.
>>
>> >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>> >>> y = sc.parallelize([("a", 3), ("c", None)])
>> >>> sorted(x.subtract(y).collect())
>> [('a', 1), ('b', 4), ('b', 5)]
>>
>> It raises
>>
>> if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
>>     raise Exception("Randomness of hash of string should be disabled via
>>                     PYTHONHASHSEED")
>>
>> The following script fixes the problem
>>
>> sudo printf "\n# set PYTHONHASHSEED so python3 will not generate
>> Exception 'Randomness of hash of string should be disabled via
>> PYTHONHASHSEED'\nexport PYTHONHASHSEED=123\n" >>
>> /root/spark/conf/spark-env.sh
>>
>> sudo pssh -i -h /root/spark-ec2/slaves cp
>> /root/spark/conf/spark-env.sh /root/spark/conf/spark-env.sh-`date
>> "+%Y-%m-%d:%H:%M"`
>>
>> sudo for i in `cat slaves` ; do scp spark-env.sh
>> root@$i:/root/spark/conf/spark-env.sh; done




Multiplication on decimals in a dataframe query

2015-11-29 Thread Philip Dodds
I hit a weird issue when I tried to multiply two decimals in a select
(either in Scala or as SQL), and I'm assuming I must be missing the point.

The issue is fairly easy to recreate with something like the following:


val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.sql.types.Decimal

case class Trade(quantity: Decimal,price: Decimal)

val data = Seq.fill(100) {
  val price = Decimal(20 + scala.util.Random.nextInt(10))
  val quantity = Decimal(20 + scala.util.Random.nextInt(10))
  Trade(quantity, price)
}

val trades = sc.parallelize(data).toDF()
trades.registerTempTable("trades")

trades.select(trades("price")*trades("quantity")).show

sqlContext.sql("select
price/quantity,price*quantity,price+quantity,price-quantity from
trades").show

The odd part is that if you run it you will see that the addition, division
and subtraction work, but the multiplication returns null.

Tested on 1.5.1/1.5.2 (Scala 2.10 and 2.11)

i.e.

+------------------+
|(price * quantity)|
+------------------+
|              null|
|              null|
|              null|
|              null|
|              null|
+------------------+

+--------------------+----+--------+--------+
|                 _c0| _c1|     _c2|     _c3|
+--------------------+----+--------+--------+
|0.952380952380952381|null|41.00...|-1.00...|
|1.380952380952380952|null|50.00...|    8.00|
|1.272727272727272727|null|50.00...|    6.00|
|                0.83|null|44.00...|-4.00...|
|                1.00|null|58.00...|   0E-18|
+--------------------+----+--------+--------+


Just keen to know what I did wrong?
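A hedged workaround sketch in PySpark, on the assumption that the nulls come
from precision overflow when multiplying two columns of the default unbounded
decimal type (38, 18): cast to a bounded precision before multiplying. The
SparkContext named sc and the DecimalType(10, 2) choice below are illustrative
only, not the poster's code.

# Sketch (assumption: the nulls come from precision overflow of the default
# unbounded Decimal(38, 18) columns). Casting to a bounded precision that
# leaves room for the product avoids the overflow.
from pyspark.sql import SQLContext
from pyspark.sql.types import DecimalType

sqlContext = SQLContext(sc)  # assumes an existing SparkContext named sc

df = sqlContext.createDataFrame([(21.0, 25.0), (24.0, 22.0)], ["quantity", "price"])

bounded = df.select(
    df["quantity"].cast(DecimalType(10, 2)).alias("quantity"),
    df["price"].cast(DecimalType(10, 2)).alias("price"))

# The product of two DecimalType(10, 2) columns stays well within the maximum
# precision, so no null is expected here.
bounded.select((bounded["price"] * bounded["quantity"]).alias("notional")).show()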


Cheers

P

-- 
Philip Dodds


Re: Debug Spark

2015-11-29 Thread Ndjido Ardo BAR
Spark Job server allows you to submit your apps to any kind of deployment
(Standalone, Cluster). I think that it could be suitable for your use case.
Check the following Github repo:
https://github.com/spark-jobserver/spark-jobserver

Ardo

On Sun, Nov 29, 2015 at 6:42 PM, Նարեկ Գալստեան 
wrote:

> A question regarding the topic,
>
> I am using Intellij to write spark applications and then have to ship the
> source code to my cluster on the cloud to compile and test
>
> is there a way to automatise the process using Intellij?
>
> Narek Galstyan
>
> Նարեկ Գալստյան
>
> On 29 November 2015 at 20:51, Ndjido Ardo BAR  wrote:
>
>> Masf, the following link sets the basics to start debugging your spark
>> apps in local mode:
>>
>>
>> https://medium.com/large-scale-data-processing/how-to-kick-start-spark-development-on-intellij-idea-in-4-steps-c7c8f5c2fe63#.675s86940
>>
>> Ardo
>>
>> On Sun, Nov 29, 2015 at 5:34 PM, Masf  wrote:
>>
>>> Hi Ardo
>>>
>>>
>>> Some tutorial to debug with Intellij?
>>>
>>> Thanks
>>>
>>> Regards.
>>> Miguel.
>>>
>>>
>>> On Sun, Nov 29, 2015 at 5:32 PM, Ndjido Ardo BAR 
>>> wrote:
>>>
 hi,

 IntelliJ is just great for that!

 cheers,
 Ardo.

 On Sun, Nov 29, 2015 at 5:18 PM, Masf  wrote:

> Hi
>
> Is it possible to debug spark locally with IntelliJ or another IDE?
>
> Thanks
>
> --
> Regards.
> Miguel Ángel
>


>>>
>>>
>>> --
>>>
>>>
>>> Saludos.
>>> Miguel Ángel
>>>
>>
>>
>


Re: How to work with a joined rdd in pyspark?

2015-11-29 Thread Gylfi
Hi. 

Your code is like this, right?
"joined_dataset = show_channel.join(show_views) joined_dataset.take(4)"

Well, joined_dataset is now an array (because you used .take(4)).
So it does not support any RDD operations.

Could that be the problem? 
Otherwise more code is needed to understand what's going on. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-work-with-a-joined-rdd-in-pyspark-tp25510p25511.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark and simulated annealing

2015-11-29 Thread Gylfi
1) Start by looking at ML-lib or KeystoneML 

2) If you can't find an impl., start by analyzing the access patterns and
data manipulations you will need to implement. 

3) Then figure out if it fits Spark structures... and when you realize it
doesn't, you start speculating on how you can twist or strong-arm it to fit
:)

4) You skipped 2), didn't you? OK, so do that now and go back to 3).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-and-simulated-annealing-tp25507p25513.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Debug Spark

2015-11-29 Thread Նարեկ Գալստեան
A question regarding the topic:

I am using IntelliJ to write Spark applications and then have to ship the
source code to my cluster on the cloud to compile and test.

Is there a way to automate the process using IntelliJ?

Narek Galstyan

Նարեկ Գալստյան

On 29 November 2015 at 20:51, Ndjido Ardo BAR  wrote:

> Masf, the following link sets the basics to start debugging your spark
> apps in local mode:
>
>
> https://medium.com/large-scale-data-processing/how-to-kick-start-spark-development-on-intellij-idea-in-4-steps-c7c8f5c2fe63#.675s86940
>
> Ardo
>
> On Sun, Nov 29, 2015 at 5:34 PM, Masf  wrote:
>
>> Hi Ardo
>>
>>
>> Some tutorial to debug with Intellij?
>>
>> Thanks
>>
>> Regards.
>> Miguel.
>>
>>
>> On Sun, Nov 29, 2015 at 5:32 PM, Ndjido Ardo BAR 
>> wrote:
>>
>>> hi,
>>>
>>> IntelliJ is just great for that!
>>>
>>> cheers,
>>> Ardo.
>>>
>>> On Sun, Nov 29, 2015 at 5:18 PM, Masf  wrote:
>>>
 Hi

 Is it possible to debug spark locally with IntelliJ or another IDE?

 Thanks

 --
 Regards.
 Miguel Ángel

>>>
>>>
>>
>>
>> --
>>
>>
>> Saludos.
>> Miguel Ángel
>>
>
>


Re: Debug Spark

2015-11-29 Thread Ndjido Ardo BAR
Masf, the following link sets the basics to start debugging your spark apps
in local mode:

https://medium.com/large-scale-data-processing/how-to-kick-start-spark-development-on-intellij-idea-in-4-steps-c7c8f5c2fe63#.675s86940

Ardo

On Sun, Nov 29, 2015 at 5:34 PM, Masf  wrote:

> Hi Ardo
>
>
> Some tutorial to debug with Intellij?
>
> Thanks
>
> Regards.
> Miguel.
>
>
> On Sun, Nov 29, 2015 at 5:32 PM, Ndjido Ardo BAR  wrote:
>
>> hi,
>>
>> IntelliJ is just great for that!
>>
>> cheers,
>> Ardo.
>>
>> On Sun, Nov 29, 2015 at 5:18 PM, Masf  wrote:
>>
>>> Hi
>>>
>>> Is it possible to debug spark locally with IntelliJ or another IDE?
>>>
>>> Thanks
>>>
>>> --
>>> Regards.
>>> Miguel Ángel
>>>
>>
>>
>
>
> --
>
>
> Saludos.
> Miguel Ángel
>


Re: How to work with a joined rdd in pyspark?

2015-11-29 Thread arnalone
Thanks for replying so fast!

It was not clear; my code is:
joined_dataset = show_channel.join(show_views)

For your information, the first lines are:
joined_dataset.take(4)

Out[93]:

[(u'PostModern_Cooking', (u'DEF', 1038)), (u'PostModern_Cooking', (u'DEF',
415)), (u'PostModern_Cooking', (u'DEF', 100)), (u'PostModern_Cooking',
(u'DEF', 597))]


I would like to sum views per show for channel = "ABC" 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-work-with-a-joined-rdd-in-pyspark-tp25510p25512.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Debug Spark

2015-11-29 Thread Danny Stephan
Hi,

You can use JDWP to debug everything that runs on top of the JVM, including Spark.

Specifically with IntelliJ, maybe this link can help you:

http://danosipov.com/?p=779 
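Not taken from the link above, just a generic sketch of the JDWP approach: have
the JVM listen for a debugger with the standard agent options, then attach
IntelliJ's "Remote" run configuration. The port and the executor-side setting
are example assumptions.

# Sketch: standard JDWP agent options that make a JVM listen for a remote
# debugger on port 5005; attach IntelliJ's "Remote" run configuration to it.
# Port and suspend behaviour are example choices.
from pyspark import SparkConf

jdwp = "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"

conf = (SparkConf()
        .setAppName("debug-example")
        # executor JVMs: this can be set programmatically
        .set("spark.executor.extraJavaOptions", jdwp))

# For the driver JVM in client mode, pass the same agent string on the command
# line instead, e.g. spark-submit --driver-java-options "<jdwp options>" app.py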

regards,
Danny


> On 29 Nov 2015, at 17:34, Masf  wrote the
> following:
> 
> Hi Ardo
> 
> 
> Some tutorial to debug with Intellij?
> 
> Thanks
> 
> Regards.
> Miguel.
> 
> 
> On Sun, Nov 29, 2015 at 5:32 PM, Ndjido Ardo BAR  > wrote:
> hi,
> 
> IntelliJ is just great for that!
> 
> cheers,
> Ardo.
> 
> On Sun, Nov 29, 2015 at 5:18 PM, Masf  > wrote:
> Hi
> 
> Is it possible to debug spark locally with IntelliJ or another IDE?
> 
> Thanks
> 
> -- 
> Regards.
> Miguel Ángel
> 
> 
> 
> 
> -- 
> 
> 
> Saludos.
> Miguel Ángel



Re: possible bug spark/python/pyspark/rdd.py portable_hash()

2015-11-29 Thread Ted Yu
I think you should file a bug. 

> On Nov 29, 2015, at 9:48 AM, Andy Davidson  
> wrote:
> 
> Hi Felix and Ted
> 
> This is how I am starting spark
> 
> Should I file a bug?
> 
> Andy
> 
> 
> export PYSPARK_PYTHON=python3.4
> export PYSPARK_DRIVER_PYTHON=python3.4
> export IPYTHON_OPTS="notebook --no-browser --port=7000 --log-level=WARN"  
> 
> $SPARK_ROOT/bin/pyspark \
> --master $MASTER_URL \
> --total-executor-cores $numCores \
> --driver-memory 2G \
> --executor-memory 2G \
> $extraPkgs \
> $*
> 
> From: Felix Cheung 
> Date: Saturday, November 28, 2015 at 12:11 AM
> To: Ted Yu 
> Cc: Andrew Davidson , "user @spark" 
> 
> Subject: Re: possible bug spark/python/pyspark/rdd.py portable_hash()
> 
> Ah, it's there in spark-submit and pyspark.
> Seems like it should be added for spark_ec2
> 
> 
> _
> From: Ted Yu 
> Sent: Friday, November 27, 2015 11:50 AM
> Subject: Re: possible bug spark/python/pyspark/rdd.py portable_hash()
> To: Felix Cheung 
> Cc: Andy Davidson , user @spark 
> 
> 
> 
> ec2/spark-ec2 calls ./ec2/spark_ec2.py 
> 
> I don't see PYTHONHASHSEED defined in any of these scripts.
> 
> Andy reported this for ec2 cluster.
> 
> I think a JIRA should be opened.
> 
> 
>> On Fri, Nov 27, 2015 at 11:01 AM, Felix Cheung  
>> wrote: 
>> May I ask how you are starting Spark? 
>> It looks like PYTHONHASHSEED is being set: 
>> https://github.com/apache/spark/search?utf8=%E2%9C%93=PYTHONHASHSEED 
>> 
>>   
>> Date: Thu, 26 Nov 2015 11:30:09 -0800 
>> Subject: possible bug spark/python/pyspark/rdd.py portable_hash() 
>> From: a...@santacruzintegration.com 
>> To: user@spark.apache.org 
>> 
>> I am using spark-1.5.1-bin-hadoop2.6. I used
>> spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2 to create a cluster and configured
>> spark-env to use python3. I get an exception 'Randomness of hash of
>> string should be disabled via PYTHONHASHSEED'. Is there any reason rdd.py
>> should not just set PYTHONHASHSEED?
>> 
>>  Should I file a bug?
>> 
>> Kind regards
>> 
>> Andy
>> 
>> details
>> 
>> http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=subtract#pyspark.RDD.subtract
>> 
>> Example does not work out of the box
>> 
>> subtract(other, numPartitions=None)
>> Return each value in self that is not contained in other.
>> 
>> >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>> >>> y = sc.parallelize([("a", 3), ("c", None)])
>> >>> sorted(x.subtract(y).collect())
>> [('a', 1), ('b', 4), ('b', 5)]
>> It raises 
>> 
>> if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
>>     raise Exception("Randomness of hash of string should be disabled via
>>                     PYTHONHASHSEED")
>> 
>> 
>> The following script fixes the problem 
>> 
>> sudo printf "\n# set PYTHONHASHSEED so python3 will not generate
>> Exception 'Randomness of hash of string should be disabled via
>> PYTHONHASHSEED'\nexport PYTHONHASHSEED=123\n" >>
>> /root/spark/conf/spark-env.sh
>> 
>>  sudo pssh -i -h /root/spark-ec2/slaves cp /root/spark/conf/spark-env.sh 
>> /root/spark/conf/spark-env.sh-`date "+%Y-%m-%d:%H:%M"` 
>> 
>> sudo for i in `cat slaves` ; do scp spark-env.sh
>> root@$i:/root/spark/conf/spark-env.sh; done
> 
> 
> 


Re: General question on using StringIndexer in SparkML

2015-11-29 Thread Vishnu Viswanath
Thanks for the reply Yanbo.

I understand that the model will be trained using the indexer map created
during the training stage.

But since I am getting a new set of data during prediction, I have to do
StringIndexing on the new data also. Right now I am using a new StringIndexer
for this purpose; is there any way that I can reuse the indexer used for the
training stage?

Note: I have a pipeline with a StringIndexer in it, and I am fitting my
training data to it and building the model. Then later, when I get the new
data for prediction, I am using the same pipeline to fit the data again and do
the prediction.

Thanks and Regards,
Vishnu Viswanath


On Sun, Nov 29, 2015 at 8:14 AM, Yanbo Liang  wrote:

> Hi Vishnu,
>
> The string-to-index map is generated at the model training step and
> used at the model prediction step.
> It means that the string-to-index map will not change at
> prediction time. You will use the original trained model when you do
> prediction.
>
> 2015-11-29 4:33 GMT+08:00 Vishnu Viswanath :
> > Hi All,
> >
> > I have a general question on using StringIndexer.
> > StringIndexer gives an index to each label in the feature starting from
> 0 (
> > 0 for least frequent word).
> >
> > Suppose I am building a model, and I use StringIndexer for transforming
> on
> > of my column.
> > e.g., suppose A was most frequent word followed by B and C.
> >
> > So the StringIndexer will generate
> >
> > A  0.0
> > B  1.0
> > C  2.0
> >
> > After building the model, I am going to do some prediction using this
> model,
> > So I do the same transformation on my new data which I need to predict.
> And
> > suppose the new dataset has C as the most frequent word, followed by B
> and
> > A. So the StringIndexer will assign index as
> >
> > C 0.0
> > B 1.0
> > A 2.0
> >
> > These indexes are different from what we used for modeling. So won’t this
> > give me a wrong prediction if I use StringIndexer?
> >
> > --
> > Thanks and Regards,
> > Vishnu Viswanath,
> > www.vishnuviswanath.com
>



-- 
Thanks and Regards,
Vishnu Viswanath,
*www.vishnuviswanath.com *


RE: possible bug spark/python/pyspark/rdd.py portable_hash()

2015-11-29 Thread Felix Cheung
Actually, upon closer look, PYTHONHASHSEED should be set (in the worker) when
you create a SparkContext:

https://github.com/apache/spark/blob/master/python/pyspark/context.py#L166

And it should also be set from spark-submit or pyspark.

Can you check sys.version and os.environ.get("PYTHONHASHSEED")?
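For example, a quick sketch (assuming an active SparkContext named sc) to check
both values on the driver and on the workers:

import os
import sys

# On the driver
print(sys.version)
print(os.environ.get("PYTHONHASHSEED"))

# On the workers (assumes an active SparkContext named sc)
print(sc.parallelize([0])
        .map(lambda _: (sys.version, os.environ.get("PYTHONHASHSEED")))
        .collect())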
 
Date: Sun, 29 Nov 2015 09:48:19 -0800
Subject: Re: possible bug spark/python/pyspark/rdd.py portable_hash()
From: a...@santacruzintegration.com
To: felixcheun...@hotmail.com; yuzhih...@gmail.com
CC: user@spark.apache.org

Hi Felix and Ted
This is how I am starting spark
Should I file a bug?
Andy

export PYSPARK_PYTHON=python3.4
export PYSPARK_DRIVER_PYTHON=python3.4
export IPYTHON_OPTS="notebook --no-browser --port=7000 --log-level=WARN"  
$SPARK_ROOT/bin/pyspark \
--master $MASTER_URL \
--total-executor-cores $numCores \
--driver-memory 2G \
--executor-memory 2G \
$extraPkgs \
$*
From:  Felix Cheung 
Date:  Saturday, November 28, 2015 at 12:11 AM
To:  Ted Yu 
Cc:  Andrew Davidson , "user @spark" 

Subject:  Re: possible bug spark/python/pyspark/rdd.py portable_hash()


Ah, it's there in spark-submit and pyspark.
Seems like it should be added for spark_ec2


_
From: Ted Yu 
Sent: Friday, November 27, 2015 11:50 AM
Subject: Re: possible bug spark/python/pyspark/rdd.py portable_hash()
To: Felix Cheung 
Cc: Andy Davidson , user @spark 


ec2/spark-ec2 calls ./ec2/spark_ec2.py

I don't see PYTHONHASHSEED defined in any of these scripts.

Andy reported this for ec2 cluster.

I think a JIRA should be opened.

On Fri, Nov 27, 2015 at 11:01 AM, Felix Cheung  wrote:
May I ask how you are starting Spark?
It looks like PYTHONHASHSEED is being set:
https://github.com/apache/spark/search?utf8=%E2%9C%93=PYTHONHASHSEED

Date: Thu, 26 Nov 2015 11:30:09 -0800
Subject: possible bug spark/python/pyspark/rdd.py portable_hash()
From: a...@santacruzintegration.com
To: user@spark.apache.org

I am using spark-1.5.1-bin-hadoop2.6. I used
spark-1.5.1-bin-hadoop2.6/ec2/spark-ec2 to create a cluster
and configured spark-env to use python3. I get an exception
'Randomness of hash of string should be disabled via PYTHONHASHSEED'.
Is there any reason rdd.py should not just set PYTHONHASHSEED?

Should I file a bug?

Kind regards

Andy

details

http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=subtract#pyspark.RDD.subtract

Example does not work out of the box

subtract(other, numPartitions=None)
Return each value in self that is not contained in other.

>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtract(y).collect())
[('a', 1), ('b', 4), ('b', 5)]

It raises

if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
    raise Exception("Randomness of hash of string should be disabled via
                    PYTHONHASHSEED")

The following script fixes the problem

sudo printf "\n# set PYTHONHASHSEED so python3 will not generate
Exception 'Randomness of hash of string should be disabled via
PYTHONHASHSEED'\nexport PYTHONHASHSEED=123\n" >> /root/spark/conf/spark-env.sh

sudo pssh -i -h /root/spark-ec2/slaves cp /root/spark/conf/spark-env.sh
/root/spark/conf/spark-env.sh-`date "+%Y-%m-%d:%H:%M"`

sudo for i in `cat slaves` ; do scp spark-env.sh
root@$i:/root/spark/conf/spark-env.sh; done

Re: Retrieve best parameters from CrossValidator

2015-11-29 Thread Yanbo Liang
Hi Ben,

We can get the best model from CrossValidatorModel.bestModel; furthermore, we
can use the write function of CrossValidatorModel to implement model
persistence and use the best model elsewhere (after the 1.6 release). So I
think exposing the best model parameters as a public API is not really
necessary.
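For reference, a rough sketch of pulling the winner out of a fitted
CrossValidatorModel (PySpark shown; cv_model, training_df and test_df are
assumed names, and exactly which parameters the best model exposes can vary by
version):

# Sketch: inspect and reuse the winning model of a fitted CrossValidatorModel.
# Assumes cv_model was produced by CrossValidator.fit(training_df); the params
# visible on the best model can vary by Spark version.
best_model = cv_model.bestModel               # model trained with the best param combination
print(best_model.extractParamMap())           # parameters carried by that model
predictions = best_model.transform(test_df)   # reuse it directly for scoring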

2015-11-29 7:36 GMT+08:00 BenFradet :

> Hi everyone,
>
> Is there a better way to retrieve the best model parameters obtained from
> cross validation than inspecting the logs issued while calling the fit
> method (with the call here:
>
> https://github.com/apache/spark/blob/branch-1.5/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala#L106
> )?
>
> Wouldn't it be useful to expose this to the end user through the
> crossValidatorModel?
>
> Thanks for your response.
>
> Best,
> Ben.
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Retrieve-best-parameters-from-CrossValidator-tp25508.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Retrieve best parameters from CrossValidator

2015-11-29 Thread Benjamin Fradet
Hi Yanbo,

Thanks for your answer, I'm looking forward to 1.6 then.

On Sun, Nov 29, 2015 at 3:44 PM, Yanbo Liang  wrote:

> Hi Ben,
>
> We can get the best model from CrossValidatorModel.BestModel, further
> more we can use the write function of CrossValidatorModel
> 
> to implement model persistent and use the best model at other place (after
> 1.6 release). So I think to expose the best model parameters as public API
> is not very necessary.
>
> 2015-11-29 7:36 GMT+08:00 BenFradet :
>
>> Hi everyone,
>>
>> Is there a better way to retrieve the best model parameters obtained from
>> cross validation than inspecting the logs issued while calling the fit
>> method (with the call here:
>>
>> https://github.com/apache/spark/blob/branch-1.5/mllib/src/main/scala/org/apache/spark/ml/tuning/CrossValidator.scala#L106
>> )?
>>
>> Wouldn't it be useful to expose this to the end user through the
>> crossValidatorModel?
>>
>> Thanks for your response.
>>
>> Best,
>> Ben.
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Retrieve-best-parameters-from-CrossValidator-tp25508.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
Ben Fradet.


Re: Spark Streaming on mesos

2015-11-29 Thread Timothy Chen
Hi Renjie,

You can set the number of cores per executor with spark.executor.cores in
fine-grained mode.

If you want coarse-grained mode to support that, it will be supported in the
near term, as the coarse-grained scheduler is getting revamped now.
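A minimal PySpark sketch of that fine-grained setting (the Mesos master URL
below is a placeholder, and the numbers are example choices):

# Sketch: fine-grained Mesos mode with a fixed number of cores per executor.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setMaster("mesos://zk://host1:2181/mesos")   # placeholder master URL
        .setAppName("streaming-example")
        .set("spark.mesos.coarse", "false")           # fine-grained mode
        .set("spark.executor.cores", "4"))            # cores per executor
sc = SparkContext(conf=conf)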

Tim

> On Nov 28, 2015, at 7:31 PM, Renjie Liu  wrote:
> 
> Hi, Nagaraj:
>  Thanks for the response, but this does not solve my problem. 
> I think executor memory should be proportional to number of cores, or number 
> of core 
> in each executor should be the same. 
>> On Sat, Nov 28, 2015 at 1:48 AM Nagaraj Chandrashekar 
>>  wrote:
>> Hi Renjie, 
>> 
>> I have not setup Spark Streaming on Mesos but there is something called 
>> reservations in Mesos.  It supports both Static and Dynamic reservations.  
>> Both types of reservations must have role defined. You may want to explore 
>> these options.   Excerpts from the Apache Mesos documentation. 
>> 
>> Cheers
>> Nagaraj C
>> Reservation
>> Mesos provides mechanisms to reserve resources in specific slaves. The 
>> concept was first introduced with static reservation in 0.14.0 which enabled 
>> operators to specify the reserved resources on slave startup. This was 
>> extended with dynamic reservation in 0.23.0 which enabled operators and 
>> authorized frameworks to dynamically reserve resources in the cluster.
>> 
>> No breaking changes were introduced with dynamic reservation, which means 
>> the existing static reservation mechanism continues to be fully supported.
>> 
>> In both types of reservations, resources are reserved for a role.
>> 
>> Static Reservation (since 0.14.0)
>> An operator can configure a slave with resources reserved for a role. The 
>> reserved resources are specified via the --resources flag. For example, 
>> suppose we have 12 CPUs and 6144 MB of RAM available on a slave and that we 
>> want to reserve 8 CPUs and 4096 MB of RAM for the ads role. We start the 
>> slave like so:
>> 
>> $ mesos-slave \
>>   --master=: \
>>   --resources="cpus:4;mem:2048;cpus(ads):8;mem(ads):4096"
>> We now have 8 CPUs and 4096 MB of RAM reserved for ads on this slave.
>> 
>> 
>> 
>> From: Renjie Liu 
>> Date: Friday, November 27, 2015 at 9:57 PM
>> To: "user@spark.apache.org" 
>> Subject: Spark Streaming on mesos
>> 
>> Hi, all:
>> I'm trying to run spark streaming on mesos and it seems that none of the 
>> scheduler is suitable for that. Fine grain scheduler will start an executor 
>> for each task so it will significantly increase the latency. While coarse 
>> grained mode can only set the max core numbers and executor memory but 
>> there's no way to set the number of cores for each executor. Has anyone 
>> deployed spark streaming on mesos? And what's your settings?
>> -- 
>> Liu, Renjie
>> Software Engineer, MVAD
> 
> -- 
> Liu, Renjie
> Software Engineer, MVAD


Re: Help with Couchbase connector error

2015-11-29 Thread Eyal Sharon
Thanks guys , that was very helpful



On Thu, Nov 26, 2015 at 10:29 PM, Shixiong Zhu  wrote:

> Hey Eyal, I just checked the Couchbase Spark connector jar. The target
> version of some of the classes is Java 8 (class file version 52.0). You can create a ticket in
> https://issues.couchbase.com/projects/SPARKC
>
> Best Regards,
> Shixiong Zhu
>
> 2015-11-26 9:03 GMT-08:00 Ted Yu :
>
>> StoreMode is from Couchbase connector.
>>
>> Where did you obtain the connector ?
>>
>> See also
>> http://stackoverflow.com/questions/1096148/how-to-check-the-jdk-version-used-to-compile-a-class-file
>>
>> On Thu, Nov 26, 2015 at 8:55 AM, Eyal Sharon  wrote:
>>
>>> Hi ,
>>> Great , that gave some directions. But can you elaborate more?  or share
>>> some post
>>> I am currently running JDK 7 , and  my Couchbase too
>>>
>>> Thanks !
>>>
>>> On Thu, Nov 26, 2015 at 6:02 PM, Ted Yu  wrote:
>>>
 This implies version mismatch between the JDK used to build your jar
 and the one at runtime.

 When building, target JDK 1.7

 There're plenty of posts on the web for dealing with such error.

 Cheers

 On Thu, Nov 26, 2015 at 7:31 AM, Eyal Sharon  wrote:

> Hi,
>
> I am trying to set a connection to Couchbase. I am at the very
> beginning, and I got stuck on   this exception
>
> Exception in thread "main" java.lang.UnsupportedClassVersionError:
> com/couchbase/spark/StoreMode : Unsupported major.minor version 52.0
>
>
> Here is the simple code fragment
>
>   val sc = new SparkContext(cfg)
>
>   val doc1 = JsonDocument.create("doc1", JsonObject.create().put("some", 
> "content"))
>   val doc2 = JsonArrayDocument.create("doc2", JsonArray.from("more", 
> "content", "in", "here"))
>
>
>   val data = sc
> .parallelize(Seq(doc1, doc2))
> .saveToCouchbase()
> }
>
>
> Any help will be a bless
>
>
> Thanks!
>
>
>


>>>
>>>
>>
>>
>



Re: storing result of aggregation of spark streaming

2015-11-29 Thread Ted Yu
There is hbase connector:
 https://github.com/nerdammer/spark-hbase-connector

In the upcoming hbase 2.0 release, hbase-spark module provides support for
Spark directly.

Cheers

On Sat, Nov 28, 2015 at 10:25 PM, Michael Spector 
wrote:

> Hi Amir,
>
> You can store results of stream transformation in Cassandra using:
> https://github.com/datastax/spark-cassandra-connector
>
> Regards,
> Michael
>
> On Sun, Nov 29, 2015 at 1:41 AM, Amir Rahnama 
> wrote:
>
>> Hi,
>>
>> I am gonna store the results of my stream job into a db, which one of
>> databases has the native support (if any)?
>>
>> --
>> Thanks and Regards,
>>
>> Amir Hossein Rahnama
>>
>> *Tel: +46 (0) 761 681 102*
>> Website: www.ambodi.com
>> Twitter: @_ambodi 
>>
>
>


Debug Spark

2015-11-29 Thread Masf
Hi

Is it possible to debug spark locally with IntelliJ or another IDE?

Thanks

-- 
Regards.
Miguel Ángel


Re: Debug Spark

2015-11-29 Thread Ndjido Ardo BAR
hi,

IntelliJ is just great for that!

cheers,
Ardo.

On Sun, Nov 29, 2015 at 5:18 PM, Masf  wrote:

> Hi
>
> Is it possible to debug spark locally with IntelliJ or another IDE?
>
> Thanks
>
> --
> Regards.
> Miguel Ángel
>


Re: Debug Spark

2015-11-29 Thread Masf
Hi Ardo


Some tutorial to debug with Intellij?

Thanks

Regards.
Miguel.


On Sun, Nov 29, 2015 at 5:32 PM, Ndjido Ardo BAR  wrote:

> hi,
>
> IntelliJ is just great for that!
>
> cheers,
> Ardo.
>
> On Sun, Nov 29, 2015 at 5:18 PM, Masf  wrote:
>
>> Hi
>>
>> Is it possible to debug spark locally with IntelliJ or another IDE?
>>
>> Thanks
>>
>> --
>> Regards.
>> Miguel Ángel
>>
>
>


-- 


Saludos.
Miguel Ángel


Re: General question on using StringIndexer in SparkML

2015-11-29 Thread Jeff Zhang
StringIndexer is an estimator which would train a model to be used both in
training & prediction. So it is consistent between training & prediction.

You may want to read this section of spark ml doc
http://spark.apache.org/docs/latest/ml-guide.html#how-it-works
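A small sketch of that pattern in PySpark (column names, the classifier and the
train_df/new_df DataFrames are illustrative only): fit the pipeline once on the
training data, then reuse the fitted PipelineModel on new data so the same
string-to-index mapping is applied.

# Sketch of the estimator/transformer pattern: fit once, reuse the fitted model.
# Column names, the classifier and the DataFrames are illustrative only.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import LogisticRegression

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
lr = LogisticRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[indexer, lr])

model = pipeline.fit(train_df)       # the StringIndexer learns its mapping here, once

# At prediction time, do not call pipeline.fit() on the new data; transform
# with the already-fitted model so the original mapping is reused.
predictions = model.transform(new_df)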



On Mon, Nov 30, 2015 at 12:52 AM, Vishnu Viswanath <
vishnu.viswanat...@gmail.com> wrote:

> Thanks for the reply Yanbo.
>
> I understand that the model will be trained using the indexer map created
> during the training stage.
>
> But since I am getting a new set of data during prediction, and I have to
> do StringIndexing on the new data also,
> Right now I am using a new StringIndexer for this purpose, or is there any
> way that I can reuse the Indexer used for training stage.
>
> Note: I am having a pipeline with StringIndexer in it, and I am fitting my
> train data in it and building the model. Then later when i get the new data
> for prediction, I am using the same pipeline to fit the data again and do
> the prediction.
>
> Thanks and Regards,
> Vishnu Viswanath
>
>
> On Sun, Nov 29, 2015 at 8:14 AM, Yanbo Liang  wrote:
>
>> Hi Vishnu,
>>
>> The string and indexer map is generated at model training step and
>> used at model prediction step.
>> It means that the string and indexer map will not changed when
>> prediction. You will use the original trained model when you do
>> prediction.
>>
>> 2015-11-29 4:33 GMT+08:00 Vishnu Viswanath > >:
>> > Hi All,
>> >
>> > I have a general question on using StringIndexer.
>> > StringIndexer gives an index to each label in the feature starting from
>> 0 (
>> > 0 for least frequent word).
>> >
>> > Suppose I am building a model, and I use StringIndexer for transforming
>> on
>> > of my column.
>> > e.g., suppose A was most frequent word followed by B and C.
>> >
>> > So the StringIndexer will generate
>> >
>> > A  0.0
>> > B  1.0
>> > C  2.0
>> >
>> > After building the model, I am going to do some prediction using this
>> model,
>> > So I do the same transformation on my new data which I need to predict.
>> And
>> > suppose the new dataset has C as the most frequent word, followed by B
>> and
>> > A. So the StringIndexer will assign index as
>> >
>> > C 0.0
>> > B 1.0
>> > A 2.0
>> >
>> > These indexes are different from what we used for modeling. So won’t
>> this
>> > give me a wrong prediction if I use StringIndexer?
>> >
>> > --
>> > Thanks and Regards,
>> > Vishnu Viswanath,
>> > www.vishnuviswanath.com
>>
>
>
>
> --
> Thanks and Regards,
> Vishnu Viswanath,
> *www.vishnuviswanath.com *
>



-- 
Best Regards

Jeff Zhang


Re: Spark Streaming on mesos

2015-11-29 Thread Renjie Liu
Hi, Tim:
Fine-grained mode is not suitable for streaming applications since it needs to
start up an executor each time. When will the revamp get released? In the
coming 1.6.0?

On Sun, Nov 29, 2015 at 6:16 PM Timothy Chen  wrote:

> Hi Renjie,
>
> You can set number of cores per executor with spark executor cores in fine
> grain mode.
>
> If you want coarse grain mode to support that it will
> Be supported in the near term as he coarse grain scheduler is getting
> revamped now.
>
> Tim
>
> On Nov 28, 2015, at 7:31 PM, Renjie Liu  wrote:
>
> Hi, Nagaraj:
>  Thanks for the response, but this does not solve my problem.
> I think executor memory should be proportional to number of cores, or
> number of core
> in each executor should be the same.
> On Sat, Nov 28, 2015 at 1:48 AM Nagaraj Chandrashekar <
> nchandrashe...@innominds.com> wrote:
>
>> Hi Renjie,
>>
>> I have not setup Spark Streaming on Mesos but there is something called
>> reservations in Mesos.  It supports both Static and Dynamic reservations.
>> Both types of reservations must have role defined. You may want to explore
>> these options.   Excerpts from the Apache Mesos documentation.
>>
>> Cheers
>> Nagaraj C
>> Reservation
>>
>> Mesos provides mechanisms to reserve resources in specific slaves. The
>> concept was first introduced with static reservation in 0.14.0 which
>> enabled operators to specify the reserved resources on slave startup. This
>> was extended with dynamic reservation in 0.23.0 which enabled operators
>> and authorized frameworks to dynamically reserve resources in the
>> cluster.
>>
>> No breaking changes were introduced with dynamic reservation, which means
>> the existing static reservation mechanism continues to be fully supported.
>>
>> In both types of reservations, resources are reserved for a role.
>> Static Reservation (since 0.14.0)
>>
>> An operator can configure a slave with resources reserved for a role. The
>> reserved resources are specified via the --resources flag. For example,
>> suppose we have 12 CPUs and 6144 MB of RAM available on a slave and that we
>> want to reserve 8 CPUs and 4096 MB of RAM for the ads role. We start the
>> slave like so:
>>
>> $ mesos-slave \
>>   --master=: \
>>   --resources="cpus:4;mem:2048;cpus(ads):8;mem(ads):4096"
>>
>> We now have 8 CPUs and 4096 MB of RAM reserved for ads on this slave.
>>
>>
>> From: Renjie Liu 
>> Date: Friday, November 27, 2015 at 9:57 PM
>> To: "user@spark.apache.org" 
>> Subject: Spark Streaming on mesos
>>
>> Hi, all:
>> I'm trying to run spark streaming on mesos and it seems that none of the
>> scheduler is suitable for that. Fine grain scheduler will start an executor
>> for each task so it will significantly increase the latency. While coarse
>> grained mode can only set the max core numbers and executor memory but
>> there's no way to set the number of cores for each executor. Has anyone
>> deployed spark streaming on mesos? And what's your settings?
>> --
>> Liu, Renjie
>> Software Engineer, MVAD
>>
> --
> Liu, Renjie
> Software Engineer, MVAD
>
> --
Liu, Renjie
Software Engineer, MVAD


Re: How to work with a joined rdd in pyspark?

2015-11-29 Thread Gylfi
Hi. 

Can't you do a filter, to get only the ABC shows, map that into a keyed
instance of the show, 
and then do a reduceByKey to sum up the views? 

Something like this in Scala code (filter for the channel, then map to new
pairs of (show, view count)):

val myAnswer = joined_dataset.filter( _._2._1 == "ABC" )
  .map( x => (x._1, x._2._2) )
  .reduceByKey( (a, b) => a + b )

This should give you an RDD of one record per show and the summed view count
but only for shows on ABC right? 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-work-with-a-joined-rdd-in-pyspark-tp25510p25514.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to work with a joined rdd in pyspark?

2015-11-29 Thread arnalone
Yes, that's what I am trying to do, but I do not manage to "point" at the
channel field to filter on "ABC" and then, in the map step, to get only shows
and views.
In Scala you do it with (_._2._1 == "ABC") and (_._1, _._2._2), but I can't
find the right syntax in Python to do the same :(



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-work-with-a-joined-rdd-in-pyspark-tp25510p25516.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: General question on using StringIndexer in SparkML

2015-11-29 Thread Vishnu Viswanath
Thank you Jeff.

On Sun, Nov 29, 2015 at 7:36 PM, Jeff Zhang  wrote:

> StringIndexer is an estimator which would train a model to be used both in
> training & prediction. So it is consistent between training & prediction.
>
> You may want to read this section of spark ml doc
> http://spark.apache.org/docs/latest/ml-guide.html#how-it-works
>
>
>
> On Mon, Nov 30, 2015 at 12:52 AM, Vishnu Viswanath <
> vishnu.viswanat...@gmail.com> wrote:
>
>> Thanks for the reply Yanbo.
>>
>> I understand that the model will be trained using the indexer map created
>> during the training stage.
>>
>> But since I am getting a new set of data during prediction, and I have to
>> do StringIndexing on the new data also,
>> Right now I am using a new StringIndexer for this purpose, or is there
>> any way that I can reuse the Indexer used for training stage.
>>
>> Note: I am having a pipeline with StringIndexer in it, and I am fitting
>> my train data in it and building the model. Then later when i get the new
>> data for prediction, I am using the same pipeline to fit the data again and
>> do the prediction.
>>
>> Thanks and Regards,
>> Vishnu Viswanath
>>
>>
>> On Sun, Nov 29, 2015 at 8:14 AM, Yanbo Liang  wrote:
>>
>>> Hi Vishnu,
>>>
>>> The string and indexer map is generated at model training step and
>>> used at model prediction step.
>>> It means that the string and indexer map will not changed when
>>> prediction. You will use the original trained model when you do
>>> prediction.
>>>
>>> 2015-11-29 4:33 GMT+08:00 Vishnu Viswanath >> >:
>>> > Hi All,
>>> >
>>> > I have a general question on using StringIndexer.
>>> > StringIndexer gives an index to each label in the feature starting
>>> from 0 (
>>> > 0 for least frequent word).
>>> >
>>> > Suppose I am building a model, and I use StringIndexer for
>>> transforming on
>>> > of my column.
>>> > e.g., suppose A was most frequent word followed by B and C.
>>> >
>>> > So the StringIndexer will generate
>>> >
>>> > A  0.0
>>> > B  1.0
>>> > C  2.0
>>> >
>>> > After building the model, I am going to do some prediction using this
>>> model,
>>> > So I do the same transformation on my new data which I need to
>>> predict. And
>>> > suppose the new dataset has C as the most frequent word, followed by B
>>> and
>>> > A. So the StringIndexer will assign index as
>>> >
>>> > C 0.0
>>> > B 1.0
>>> > A 2.0
>>> >
>>> > These indexes are different from what we used for modeling. So won’t
>>> this
>>> > give me a wrong prediction if I use StringIndexer?
>>> >
>>> > --
>>> > Thanks and Regards,
>>> > Vishnu Viswanath,
>>> > www.vishnuviswanath.com
>>>
>>
>>
>>
>> --
>> Thanks and Regards,
>> Vishnu Viswanath,
>> *www.vishnuviswanath.com *
>>
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: DateTime Support - Hive Parquet

2015-11-29 Thread Cheng Lian
Oh sorry, you're right. Implicit conversion doesn't affect the schema 
inference process.


Just checked that Joda is already a direct dependency of Spark. So I think
it's probably fine to add support for recognizing Joda DateTime as SQL
TimestampType. Would you mind filing a JIRA ticket for it? A PR is also
welcome!


Cheng

On 11/24/15 8:05 PM, Bryan wrote:


Cheng,

I am using Scala. I have an implicit conversion from Joda DateTime to
Timestamp. My tables are defined with Timestamp. However, explicit
conversion appears to be required. Do you have an example of
implicit conversion for this case? Do you convert on insert or on RDD
to DF conversion?


Regards,

Bryan Jeffrey

Sent from Outlook Mail


From: Cheng Lian
Sent: Tuesday, November 24, 2015 6:49 AM
To: Bryan; user
Subject: Re: DateTime Support - Hive Parquet

I see, then this is actually irrelevant to Parquet. I guess we can
support Joda DateTime in Spark SQL reflective schema inference to have
this, provided that this is a frequent use case and Spark SQL already
has Joda as a direct dependency.


On the other hand, if you are using Scala, you can write a simple 
implicit conversion method to avoid all the manual conversions.


Cheng

On 11/24/15 7:25 PM, Bryan wrote:

Cheng,

That’s exactly what I was hoping for – native support for writing
DateTime objects. As it stands Spark 1.5.2 seems to leave no
option but to do manual conversion (to nanos, Timestamp, etc)
prior to writing records to hive.

Regards,

Bryan Jeffrey

Sent from Outlook Mail


From: Cheng Lian
Sent: Tuesday, November 24, 2015 1:42 AM
To: Bryan Jeffrey; user
Subject: Re: DateTime Support - Hive Parquet

Hey Bryan,

What do you mean by "DateTime properties"? Hive and Spark SQL both
support DATE and TIMESTAMP types, but there's no DATETIME type. So I
assume you are referring to the Java class DateTime (possibly the one in
joda)? Could you please provide a sample snippet that illustrates your
requirement?

Cheng

On 11/23/15 9:40 PM, Bryan Jeffrey wrote:

> All,
>
> I am attempting to write objects that include DateTime properties to
> a persistent table using Spark 1.5.2 / HiveContext.  In 1.4.1 I was
> forced to convert the DateTime properties to Timestamp properties.  I
> was under the impression that this issue was fixed in the default Hive
> supported with 1.5.2 - however, I am still seeing the associated errors.
>
> Is there a bug I can follow to determine when DateTime will be
> supported for Parquet?
>
> Regards,
>
> Bryan Jeffrey




Re: Parquet files not getting coalesced to smaller number of files

2015-11-29 Thread Cheng Lian
RDD.coalesce(n) returns a new RDD rather than modifying the original 
RDD. So what you need is:


metricsToBeSaved.coalesce(1500).saveAsNewAPIHadoopFile(...)

Cheng

On 11/29/15 12:21 PM, SRK wrote:

Hi,

I have the following code that saves the parquet files in my hourly batch to
HDFS. My idea is to coalesce the files to 1500 smaller files. The first run
gives me 1500 files in HDFS, but on subsequent runs the number of files keeps
increasing even though I coalesce.

It's not getting coalesced to 1500 files as I want. I also have an example
that I am using at the end. Please let me know if there is a different and
more efficient way of doing this.


val job = Job.getInstance()

var filePath = "path"

val metricsPath: Path = new Path(filePath)

// Check if inputFile exists
val fs: FileSystem = FileSystem.get(job.getConfiguration)

if (fs.exists(metricsPath)) {
  fs.delete(metricsPath, true)
}

// Configure the ParquetOutputFormat to use Avro as the serialization format
ParquetOutputFormat.setWriteSupportClass(job, classOf[AvroWriteSupport])
// You need to pass the schema to AvroParquet when you are writing objects but
// not when you are reading them. The schema is saved in the Parquet file for
// future readers to use.
AvroParquetOutputFormat.setSchema(job, Metrics.SCHEMA$)

// Create a PairRDD with all keys set to null and wrap each Metrics in
// serializable objects
val metricsToBeSaved = metrics.map(metricRecord => (null, new
  SerializableMetrics(new Metrics(metricRecord._1, metricRecord._2._1,
  metricRecord._2._2))))

metricsToBeSaved.coalesce(1500)
// Save the RDD to a Parquet file in our temporary output directory
metricsToBeSaved.saveAsNewAPIHadoopFile(filePath, classOf[Void], classOf[Metrics],
  classOf[ParquetOutputFormat[Metrics]], job.getConfiguration)


https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SparkParquetExample.scala



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Parquet-files-not-getting-coalesced-to-smaller-number-of-files-tp25509.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: partition RDD of images

2015-11-29 Thread Gylfi
Look at KeystoneML, there is an image processing pipeline there



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/partition-RDD-of-images-tp25515p25518.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to work with a joined rdd in pyspark?

2015-11-29 Thread Gylfi
Can't you just access it by element, like with [0] and [1] ? 
http://www.tutorialspoint.com/python/python_tuples.htm
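For example, a quick PySpark sketch against the joined (show, (channel, views))
records shown earlier in the thread:

# Sketch: PySpark equivalent of the Scala snippet earlier in the thread,
# indexing into the (show, (channel, views)) tuples with [0] and [1].
abc_views = (joined_dataset
             .filter(lambda kv: kv[1][0] == "ABC")    # keep only ABC records
             .map(lambda kv: (kv[0], kv[1][1]))       # (show, views)
             .reduceByKey(lambda a, b: a + b))        # sum views per show

print(abc_views.take(4))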





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-work-with-a-joined-rdd-in-pyspark-tp25510p25517.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark sql throw java.lang.ArrayIndexOutOfBoundsException when use table.*

2015-11-29 Thread our...@cnsuning.com
hi all,
The following Spark SQL throws java.lang.ArrayIndexOutOfBoundsException when I
run it on Spark standalone or YARN.
The SQL:
select ta.* 
from bi_td.dm_price_seg_td tb 
join bi_sor.sor_ord_detail_tf ta 
on 1 = 1 
where ta.sale_dt = '20140514' 
and ta.sale_price >= tb.pri_from 
and ta.sale_price < tb.pri_to limit 10 ; 

But the result is correct when not using *, as follows:
select ta.sale_dt 
from bi_td.dm_price_seg_td tb 
join bi_sor.sor_ord_detail_tf ta 
on 1 = 1 
where ta.sale_dt = '20140514' 
and ta.sale_price >= tb.pri_from 
and ta.sale_price < tb.pri_to limit 10 ; 

The standalone version is 1.4.0 and the Spark-on-YARN version is 1.5.2.
error log :
   
15/11/30 14:19:59 ERROR SparkSQLDriver: Failed in [select ta.* 
from bi_td.dm_price_seg_td tb 
join bi_sor.sor_ord_detail_tf ta 
on 1 = 1 
where ta.sale_dt = '20140514' 
and ta.sale_price >= tb.pri_from 
and ta.sale_price < tb.pri_to limit 10 ] 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 
3, namenode2-sit.cnsuning.com): java.lang.ArrayIndexOutOfBoundsException 

Driver stacktrace: 
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
 
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270) 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
 
at scala.Option.foreach(Option.scala:236) 
at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
 
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
 
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
 
at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1850) 
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215) 
at 
org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207) 
at 
org.apache.spark.sql.hive.HiveContext$QueryExecution.stringResult(HiveContext.scala:587)
 
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
 
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:308)
 
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:376) 
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:311) 
at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:409) 
at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:425) 
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:166)
 
at 
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 
at java.lang.reflect.Method.invoke(Method.java:606) 
at 
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
 
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.ArrayIndexOutOfBoundsException 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in 
stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 
3, namenode2-sit.cnsuning.com): java.lang.ArrayIndexOutOfBoundsException 

Driver stacktrace: 
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
 
at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
 
at