need help to have a Java version of this scala script

2016-12-16 Thread Richard Xin
What I am trying to do: I need to add a column (possibly a complicated
transformation based on the value of another column) to a given dataframe.
Scala script:
val hContext = new HiveContext(sc)
import hContext.implicits._
val df = hContext.sql("select x,y,cluster_no from test.dc")
val len = udf((str: String) => str.length)
val twice = udf { (x: Int) => println(s"Computed: twice($x)"); x * 2 }
val triple = udf { (x: Int) => println(s"Computed: triple($x)"); x * 3}
val df1 = df.withColumn("name-len", len($"x"))
val df2 = df1.withColumn("twice", twice($"cluster_no"))
val df3 = df2.withColumn("triple", triple($"cluster_no"))
The Scala script above seems to work OK, but I am having trouble doing the same thing
the Java way (note that the transformation based on a column's value could be
complicated, not limited to simple add/subtract etc.). Is there a way to do this in
Java? Thanks.
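
For reference, a rough Java equivalent as a sketch (Spark 1.6-style API to match the
HiveContext above; "jsc" is assumed to be your JavaSparkContext, and the UDF bodies can
be arbitrarily complex Java):

import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.hive.HiveContext;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

HiveContext hContext = new HiveContext(jsc.sc());
DataFrame df = hContext.sql("select x,y,cluster_no from test.dc");

// Register each transformation as a named UDF, then refer to it with callUDF().
hContext.udf().register("len", (UDF1<String, Integer>) str -> str.length(), DataTypes.IntegerType);
hContext.udf().register("twice", (UDF1<Integer, Integer>) x -> x * 2, DataTypes.IntegerType);
hContext.udf().register("triple", (UDF1<Integer, Integer>) x -> x * 3, DataTypes.IntegerType);

DataFrame df1 = df.withColumn("name-len", callUDF("len", col("x")));
DataFrame df2 = df1.withColumn("twice", callUDF("twice", col("cluster_no")));
DataFrame df3 = df2.withColumn("triple", callUDF("triple", col("cluster_no")));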


Running Multiple Versions of Spark on the same cluster (YARN)

2016-12-16 Thread Jorge Machado
Hi Everyone, 

I have one question: on an HDP cluster that ships Spark 1.6.1, is it possible to also
run Spark 2.0.0 on it, for example by passing the Spark libs with --jars? The idea
behind it is to avoid having to use the default HDP installation and to be able to
deploy new versions of Spark quickly.

Thx

Jorge Machado








Regarding Connection Problem

2016-12-16 Thread Chintan Bhatt
Hi
I want to take the continuous output (avg. temperature) generated from node.js,
store it on Hadoop, and then retrieve it for visualization.
Please guide me on how to send the continuous output of node.js to Kafka.
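
The node.js side has its own Kafka client libraries, but for reference, the producer
half of that pipeline looks roughly like this with Kafka's Java producer (a sketch
only; the broker address, topic name, key and value are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Publish each new reading to a topic; a downstream consumer (Spark Streaming,
// Kafka Connect, Flume, etc.) can then land the data on HDFS for later visualization.
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("avg-temperature", "sensor-1", "23.4"));
producer.close();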

-- 
CHINTAN BHATT 



Spark GraphFrames generic question

2016-12-16 Thread Ankur Srivastava
Hi

I am working on two different use cases where the basic problem is same but 
scale is very different.

In case 1 we have two entity types that can have a many-to-many relation, and we want
to identify all subgraphs in the full graph and then further prune the graph to find
the best relation. There are close to 1 billion edges over a few hundred million
entities.

In case 2 there are more entity types, all of which can have many-to-many relations,
and the scale is much larger: close to 50 billion entities and many more edges. Again
we want to find the subgraphs and then prune them to find the best edges.

Is GraphFrames a good choice for these use cases, or should we use Spark just for
processing alongside some other graph database like Neo4j?

Thanks for any help!!

Thanks
Ankur

Sent from my iPhone



Re: Do we really need mesos or yarn? or is standalone sufficent?

2016-12-16 Thread vaquar khan
Hi Kant,

Hope the following information will help.

1)Cluster
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-standalone.html
http://spark.apache.org/docs/latest/hardware-provisioning.html


2) Yarn vs Mesos
https://www.linkedin.com/pulse/mesos-compare-yarn-vaquar-khan-?articleId=7126978319066345849

YARN is geared toward Hadoop and Mesos toward the wider enterprise, so using them
together can be even more beneficial. In short: YARN on Mesos.

  http://mesosphere.github.io/presentations/myriad-strata/#/


Regards,
Vaquar khan

On 16 Dec 2016 18:46, "kant kodali"  wrote:

Hi Saif,

What do you mean by small cluster? Any specific size?

Also can you shine some light on how YARN takes a win over mesos?




Thanks,
kant

On Fri, Dec 16, 2016 at 10:45 AM,  wrote:

> In my experience, Standalone works very well in small cluster where there
> isn’t anything else running.
>
>
>
> Bigger cluster or shared resources, YARN takes a win, surpassing the
> overhead of spawning containers as opposed to a background running worker.
>
>
>
> Best is if you try both, if standalone is good enough keep it till you
> need more. Otherwise, try YARN or MESOS depending on the rest of your
> components.
>
>
>
> 2cents
>
>
>
> Saif
>
>
>
> *From:* kant kodali [mailto:kanth...@gmail.com]
> *Sent:* Friday, December 16, 2016 3:14 AM
> *To:* user @spark
> *Subject:* Do we really need mesos or yarn? or is standalone sufficent?
>
>
>
> Do we really need mesos or yarn? or is standalone sufficient for
> production systems? I understand the difference but I don't know the
> capabilities of standalone cluster. does anyone have experience deploying
> standalone in the production?
>
>
>
>
>


Re: Do we really need mesos or yarn? or is standalone sufficent?

2016-12-16 Thread kant kodali
Hi Saif,

What do you mean by small cluster? Any specific size?

Also can you shine some light on how YARN takes a win over mesos?

Thanks,
kant

On Fri, Dec 16, 2016 at 10:45 AM,  wrote:

> In my experience, Standalone works very well in small cluster where there
> isn’t anything else running.
>
>
>
> Bigger cluster or shared resources, YARN takes a win, surpassing the
> overhead of spawning containers as opposed to a background running worker.
>
>
>
> Best is if you try both, if standalone is good enough keep it till you
> need more. Otherwise, try YARN or MESOS depending on the rest of your
> components.
>
>
>
> 2cents
>
>
>
> Saif
>
>
>
> *From:* kant kodali [mailto:kanth...@gmail.com]
> *Sent:* Friday, December 16, 2016 3:14 AM
> *To:* user @spark
> *Subject:* Do we really need mesos or yarn? or is standalone sufficent?
>
>
>
> Do we really need mesos or yarn? or is standalone sufficient for
> production systems? I understand the difference but I don't know the
> capabilities of standalone cluster. does anyone have experience deploying
> standalone in the production?
>
>
>
>
>


Re: Issue: Skew on Dataframes while Joining the dataset

2016-12-16 Thread vaquar khan
For that kind of issue, the Spark UI and its DAG visualization are always helpful.


https://databricks.com/blog/2015/06/22/understanding-your-spark-application-through-visualization.html

Regards,
Vaquar khan

On Fri, Dec 16, 2016 at 11:10 AM, Vikas K.  wrote:

> Unsubscribe.
>
> On Fri, Dec 16, 2016 at 9:21 PM, KhajaAsmath Mohammed <
> mdkhajaasm...@gmail.com> wrote:
>
>> Hi,
>>
>> I am facing an issue with join operation on dataframe. My job is running
>> for very long time( > 2 hrs ) without any result. can someone help me on
>> how to resolve.
>>
>> I tried re-partition with 13 but no luck.
>>
>>
>> val results_dataframe = sqlContext.sql("select gt.*,ct.* from 
>> PredictTempTable pt,ClusterTempTable ct,GamificationTempTable gt where 
>> gt.vin=pt.vin and pt.cluster=ct.cluster")
>> //val results_dataframe_partitioned=results_dataframe.coalesce(numPartitions)
>> val results_dataframe_partitioned=results_dataframe.repartition(13)
>>
>> [image: Inline image 1]
>>
>> Thanks,
>> Asmath
>>
>
>


-- 
Regards,
Vaquar Khan
+1 -224-436-0783

IT Architect / Lead Consultant
Greater Chicago


Re: How to get recent value in spark dataframe

2016-12-16 Thread Michael Armbrust
Oh and to get the null for missing years, you'd need to do an outer join
with a table containing all of the years you are interested in.

On Fri, Dec 16, 2016 at 3:24 PM, Michael Armbrust 
wrote:

> Are you looking for argmax? Here is an example
> 
> .
>
> On Wed, Dec 14, 2016 at 8:49 PM, Milin korath 
> wrote:
>
>> Hi
>>
>> I have a spark data frame with following structure
>>
>>  id  flag  price  date
>>   a     0    100   2015
>>   a     0     50   2015
>>   a     1    200   2014
>>   a     1    300   2013
>>   a     0    400   2012
>>
>> I need to create a data frame with recent value of flag 1 and updated in
>> the flag 0 rows.
>>
>>   id  flag  price  date  new_column
>>    a     0    100   2015   200
>>    a     0     50   2015   200
>>    a     1    200   2014   null
>>    a     1    300   2013   null
>>    a     0    400   2012   null
>>
>> We have 2 rows having flag=0. Consider the first row(flag=0),I will have
>> 2 values(200 and 300) and I am taking the recent one 200(2014). And the
>> last row I don't have any recent value for flag 1 so it is updated with
>> null.
>>
>> Looking for a solution using scala. Any help would be appreciated.Thanks
>>
>> Thanks
>> Milin
>>
>
>


Re: How to get recent value in spark dataframe

2016-12-16 Thread Michael Armbrust
Are you looking for argmax? Here is an example

.
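
For the archives, the "max of a struct" trick behind that link looks roughly like this
in the Java DataFrame API (a sketch only, using the column names from the example data
quoted below; in Spark 2.x DataFrame is Dataset<Row>). Because a struct is ordered by
its first field, max(struct(date, price)) keeps the price belonging to the latest date
for each id:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.max;
import static org.apache.spark.sql.functions.struct;

DataFrame latestFlag1 = df
    .filter(col("flag").equalTo(1))
    .groupBy(col("id"))
    .agg(max(struct(col("date"), col("price"))).alias("latest"))
    .select(col("id"), col("latest.price").alias("new_column"));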

On Wed, Dec 14, 2016 at 8:49 PM, Milin korath 
wrote:

> Hi
>
> I have a spark data frame with following structure
>
>  id  flag  price  date
>   a     0    100   2015
>   a     0     50   2015
>   a     1    200   2014
>   a     1    300   2013
>   a     0    400   2012
>
> I need to create a data frame with recent value of flag 1 and updated in
> the flag 0 rows.
>
>   id  flag  price  date  new_column
>    a     0    100   2015   200
>    a     0     50   2015   200
>    a     1    200   2014   null
>    a     1    300   2013   null
>    a     0    400   2012   null
>
> We have 2 rows having flag=0. Consider the first row(flag=0),I will have 2
> values(200 and 300) and I am taking the recent one 200(2014). And the last
> row I don't have any recent value for flag 1 so it is updated with null.
>
> Looking for a solution using scala. Any help would be appreciated.Thanks
>
> Thanks
> Milin
>


Re: Do we really need mesos or yarn? or is standalone sufficent?

2016-12-16 Thread Anant Chintamaneni
+1

Sent from my iPhone

> On Dec 16, 2016, at 10:45 AM,  
>  wrote:
> 
> In my experience, Standalone works very well in small cluster where there 
> isn’t anything else running.
>  
> Bigger cluster or shared resources, YARN takes a win, surpassing the overhead 
> of spawning containers as opposed to a background running worker.
>  
> Best is if you try both, if standalone is good enough keep it till you need 
> more. Otherwise, try YARN or MESOS depending on the rest of your components.
>  
> 2cents
>  
> Saif
>  
> From: kant kodali [mailto:kanth...@gmail.com] 
> Sent: Friday, December 16, 2016 3:14 AM
> To: user @spark
> Subject: Do we really need mesos or yarn? or is standalone sufficent?
>  
> Do we really need mesos or yarn? or is standalone sufficient for production 
> systems? I understand the difference but I don't know the capabilities of 
> standalone cluster. does anyone have experience deploying standalone in the 
> production?
>  
>  


RE: Do we really need mesos or yarn? or is standalone sufficent?

2016-12-16 Thread Saif.A.Ellafi
In my experience, Standalone works very well in a small cluster where there isn't
anything else running.

With a bigger cluster or shared resources, YARN takes the win, overcoming the overhead
of spawning containers as opposed to an always-running background worker.

Best is to try both: if standalone is good enough, keep it until you need more.
Otherwise, try YARN or Mesos depending on the rest of your components.

2cents

Saif

From: kant kodali [mailto:kanth...@gmail.com]
Sent: Friday, December 16, 2016 3:14 AM
To: user @spark
Subject: Do we really need mesos or yarn? or is standalone sufficent?

Do we really need mesos or yarn? or is standalone sufficient for production 
systems? I understand the difference but I don't know the capabilities of 
standalone cluster. does anyone have experience deploying standalone in the 
production?




Re: How to reflect dynamic registration udf?

2016-12-16 Thread Cheng Lian
Could you please provide more context about what you are trying to do here?

On Thu, Dec 15, 2016 at 6:27 PM 李斌松  wrote:

> How to reflect dynamic registration udf?
>
> java.lang.UnsupportedOperationException: Schema for type _$13 is not
> supported
> at
> org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
> at
> org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
> at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:145)
> at
> com.alibaba.spark.odps.driver.util.Utils$$anon$1.processMatch(Utils.scala:115)
> at
> io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec$1.lookForMatches(ScanSpec.java:759)
> at
> io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.callMatchProcessors(ScanSpec.java:446)
> at
> io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:368)
> at
> io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:59)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> final class sparkFunc(val name: String) extends StaticAnnotation{}
>
> def registerFunc(hiveContext: HiveContext): Unit = {
> info("register udf function")
>
> val ru = scala.reflect.runtime.universe
> val classLoaderMirror = ru.runtimeMirror(getClass.getClassLoader)
>
> new FastClasspathScanner("com.alibaba.spark.odps.driver.functions")
> .matchAllClasses(new ClassMatchProcessor() {
> override def processMatch(aClass: Class[_]): Unit = {
> val classMirror = classLoaderMirror.classSymbol(aClass)
> val annotation = classMirror.annotations.find(_.tpe =:= 
> ru.typeOf[sparkFunc]).getOrElse(null)
>
> try {
> if (annotation != null) {
> var funcName = 
> StringUtils.substringBetween(annotation.toString, "\"", "\"")
>
> if (chekClazz(aClass, classOf[Function1[_, _]])) {
> val func: Function1[_, _] = 
> createInstance[Function1[_, _]](aClass).get
> hiveContext.udf.register(funcName, func)
> } else if (chekClazz(aClass, classOf[Function2[_, _, 
> _]])) {
> val func: Function2[_, _, _] = 
> createInstance[Function2[_, _, _]](aClass).get
> hiveContext.udf.register(funcName, func)
> } else if (chekClazz(aClass, classOf[Function3[_, _, 
> _, _]])) {
> val func: Function3[_, _, _, _] = 
> createInstance[Function3[_, _, _, _]](aClass).get
> hiveContext.udf.register(funcName, func)
> } else {
> throw new RuntimeException("not support function")
> }
>
> info("== register function: {}", funcName)
> }
> } catch {
> case e: Exception => error(e.getMessage, e)
> }
> }
> }).scan()
> }
>
> private def chekClazz(sClass: Class[_], pClass: Class[_]): Boolean = {
> try {
> sClass.asSubclass(pClass)
> true
> } catch {
> case e: Exception => false
> }
> }
>
> private def createInstance[T: ClassTag](clazz: Class[_]): Try[T] = {
> Try {
> val constructor = clazz.getDeclaredConstructor()
> constructor.setAccessible(true)
> val obj = constructor.newInstance()
> val t = implicitly[ClassTag[T]].runtimeClass
> if (t.isInstance(obj)) {
> obj.asInstanceOf[T]
> } else throw new ClassCastException(clazz.getName + " is not a 
> subtype of " + t)
> } recover {
> case i: InvocationTargetException if i.getTargetException ne null ⇒ 
> throw i.getTargetException
> }
> }
>
>


Re: Issue: Skew on Dataframes while Joining the dataset

2016-12-16 Thread Vikas K.
Unsubscribe.

On Fri, Dec 16, 2016 at 9:21 PM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am facing an issue with join operation on dataframe. My job is running
> for very long time( > 2 hrs ) without any result. can someone help me on
> how to resolve.
>
> I tried re-partition with 13 but no luck.
>
>
> val results_dataframe = sqlContext.sql("select gt.*,ct.* from 
> PredictTempTable pt,ClusterTempTable ct,GamificationTempTable gt where 
> gt.vin=pt.vin and pt.cluster=ct.cluster")
> //val results_dataframe_partitioned=results_dataframe.coalesce(numPartitions)
> val results_dataframe_partitioned=results_dataframe.repartition(13)
>
> [image: Inline image 1]
>
> Thanks,
> Asmath
>


Unsubscribe

2016-12-16 Thread krishna ramachandran
Unsubscribe


Mesos Spark Fine Grained Execution - CPU count

2016-12-16 Thread Chawla,Sumit
Hi

I am using Spark 1.6 and have one query about the fine-grained model in Spark.
I have a simple Spark application which transforms A -> B. It's a single-stage
application that starts with 48 partitions. When the program starts running, the
Mesos UI shows 48 tasks and 48 CPUs allocated to the job. Now, as the tasks get
done, the number of active tasks starts decreasing. However, the number of CPUs
does not decrease proportionally. When the job was about to finish, there was a
single remaining task, yet the CPU count was still 20.

My questions are: why is there no one-to-one mapping between tasks and CPUs in
fine-grained mode? And how can these CPUs be released when the job is done, so
that other jobs can start?


Regards
Sumit Chawla


Re: Issue: Skew on Dataframes while Joining the dataset

2016-12-16 Thread KhajaAsmath Mohammed
Hi,

I was able to resolve this issue. The culprit was the SQL query; adding one more
join returned the records in much less time.

Thanks,
Asmath

On Fri, Dec 16, 2016 at 9:51 AM, KhajaAsmath Mohammed <
mdkhajaasm...@gmail.com> wrote:

> Hi,
>
> I am facing an issue with join operation on dataframe. My job is running
> for very long time( > 2 hrs ) without any result. can someone help me on
> how to resolve.
>
> I tried re-partition with 13 but no luck.
>
>
> val results_dataframe = sqlContext.sql("select gt.*,ct.* from 
> PredictTempTable pt,ClusterTempTable ct,GamificationTempTable gt where 
> gt.vin=pt.vin and pt.cluster=ct.cluster")
> //val results_dataframe_partitioned=results_dataframe.coalesce(numPartitions)
> val results_dataframe_partitioned=results_dataframe.repartition(13)
>
> [image: Inline image 1]
>
> Thanks,
> Asmath
>


Re: Spark Batch checkpoint

2016-12-16 Thread Chawla,Sumit
Sorry for hijacking this thread.

@Irving, how do you restart a Spark job from a checkpoint?

Regards
Sumit Chawla


On Fri, Dec 16, 2016 at 2:24 AM, Selvam Raman  wrote:

> Hi,
>
> Acutally my requiremnt is read the parquet file which is 100 partition.
> Then i use foreachpartition to read the data and process it.
>
> My sample code
>
> public static void main(String[] args) {
>
>
> SparkSession sparkSession = SparkSession.builder().appName("checkpoint
> verification").getOrCreate();
>
> sparkSession.implicits();
>
> sparkSession.sparkContext().setCheckpointDir("Checkpoint/Dec16");
>
> Dataset sampleData=sparkSession.read().parquet("filepath");
>
> sampleData.foreachPartition(new ForeachPartitionFunction(){
>
>
> /**
>
> *
>
> */
>
> private static final long serialVersionUID = 1L;
>
>
> @Override
>
> public void call(Iterator row) throws Exception
>
> {
>
>
> while(row.hasNext())
>
> {
>
> //Process data and insert into No-Sql DB
>
> }
>
> }
>
> });
>
> }
>
> }
>
>
>
> Now where can i apply rdd.checkpoint().
>
>
>
> Thanks,
>
> selvam
>
>
>
> On Thu, Dec 15, 2016 at 10:44 PM, Selvam Raman  wrote:
>
>> I am using java. I will try and let u know.
>> On Dec 15, 2016 8:45 PM, "Irving Duran"  wrote:
>>
>>> Not sure what programming language you are using, but in python you can
>>> do "sc.setCheckpointDir('~/apps/spark-2.0.1-bin-hadoop2.7/checkpoint/')".
>>> This will store checkpoints on that directory that I called checkpoint.
>>>
>>>
>>> Thank You,
>>>
>>> Irving Duran
>>>
>>> On Thu, Dec 15, 2016 at 10:33 AM, Selvam Raman  wrote:
>>>
 Hi,

 is there any provision in spark batch for checkpoint.

 I am having huge data, it takes more than 3 hours to process all data.
 I am currently having 100 partitions.

 if the job fails after two hours, lets say it has processed 70
 partition. should i start spark job from the beginning or is there way for
 checkpoint provision.

 Checkpoint,what i am expecting is start from 71 partition to till end.

 Please give me your suggestions.

 --
 Selvam Raman
 "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"

>>>
>>>
>
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Issue: Skew on Dataframes while Joining the dataset

2016-12-16 Thread KhajaAsmath Mohammed
Hi,

I am facing an issue with a join operation on dataframes. My job runs for a very
long time (> 2 hrs) without producing any result. Can someone help me figure out
how to resolve this?

I tried repartitioning to 13 partitions, but no luck.


val results_dataframe = sqlContext.sql("select gt.*,ct.* from
PredictTempTable pt,ClusterTempTable ct,GamificationTempTable gt where
gt.vin=pt.vin and pt.cluster=ct.cluster")
//val results_dataframe_partitioned=results_dataframe.coalesce(numPartitions)
val results_dataframe_partitioned=results_dataframe.repartition(13)

[image: Inline image 1]

Thanks,
Asmath


Re: Negative values of predictions in ALS.tranform

2016-12-16 Thread Manish Tripathi
Thanks a bunch. That's very helpful.

On Friday, December 16, 2016, Sean Owen  wrote:

> That all looks correct.
>
> On Thu, Dec 15, 2016 at 11:54 PM Manish Tripathi  > wrote:
>
>> ok. Thanks. So here is what I understood.
>>
>> Input data to Als.fit(implicitPrefs=True) is the actual strengths (count
>> data). So if I have a matrix of (user,item,views/purchases) I pass that as
>> the input and not the binarized one (preference). This signifies the
>> strength.
>>
>> 2) Since we also pass the alpha parameter to this Als.fit() method, Spark
>> internally creates the confidence matrix +1+alpha*input_data or some other
>> alpha factor.
>>
>> 3). The output which it gives is basically a factorization of 0/1 matrix
>> (binarized matrix from initial input data), hence the output also resembles
>> the preference matrix (0/1) suggesting the interaction. So typically it
>> should be between 0-1but if it is negative it means very less
>> preference/interaction
>>
>> *Does all the above sound correct?.*
>>
>> If yes, then one last question-
>>
>> 1). *For explicit dataset where we don't use implicitPref=True,* the
>> predicted ratings would be actual ratings like it can be 2.3,4.5 etc and
>> not the interaction measure. That is because in explicit we are not using
>> the confidence matrix and preference matrix concept and use the actual
>> rating data. So any output from Spark ALS for explicit data would be a
>> rating prediction.
>>
>> On Thu, Dec 15, 2016 at 3:46 PM, Sean Owen > > wrote:
>>
>>> No, input are weights or strengths. The output is a factorization of the
>>> binarization of that to 0/1, not probs or a factorization of the input.
>>> This explains the range of the output.
>>>
>>>
>>> On Thu, Dec 15, 2016, 23:43 Manish Tripathi >> > wrote:
>>>
 when you say *implicit ALS *is* factoring the 0/1 matrix. , are you
 saying for implicit feedback algorithm we need to pass the input data as
 the preference matrix i.e a matrix of 0 and 1?. *

 Then how will they calculate the confidence matrix which is basically
 =1+alpha*count matrix. If we don't pass the actual count of values (views
 etc) then how does Spark calculates the confidence matrix?.

 I was of the understanding that input data for
 als.fit(implicitPref=True) is the actual count matrix of the
 views/purchases?. Am I going wrong here if yes, then how is Spark
 calculating the confidence matrix if it doesn't have the actual count data.

 The original paper on which Spark algo is based needs the actual count
 data to create a confidence matrix and also needs the 0/1 matrix since the
 objective functions uses both the confidence matrix and 0/1 matrix to find
 the user and item factors.

 On Thu, Dec 15, 2016 at 3:38 PM, Sean Owen > wrote:

> No, you can't interpret the output as probabilities at all. In
> particular they may be negative. It is not predicting rating but
> interaction. Negative means very strongly not predicted to interact. No,
> implicit ALS *is* factoring the 0/1 matrix.
>
> On Thu, Dec 15, 2016, 23:31 Manish Tripathi  > wrote:
>
>> Ok. So we can kind of interpret the output as probabilities even
>> though it is not modeling probabilities. This is to be able to use it for
>> binaryclassification evaluator.
>>
>> So the way I understand is and as per the algo, the predicted matrix
>> is basically a dot product of user factor and item factor matrix.
>>
>> but in what circumstances the ratings predicted can be negative. I
>> can understand if the individual user factor vector and item factor 
>> vector
>> is having negative factor terms, then it can be negative. But practically
>> does negative make any sense? AS per algorithm the dot product is the
>> predicted rating. So rating shouldnt be negative for it to make any 
>> sense.
>> Also rating just between 0-1 is normalised rating? Typically rating we
>> expect to be like any real value 2.3,4.5 etc.
>>
>> Also please note, for implicit feedback ALS, we don't feed 0/1
>> matrix. We feed the count matrix (discrete count values) and am assuming
>> spark internally converts it into a preference matrix (1/0) and a
>> confidence matrix =1+alpha*count_matrix
>>
>>
>>
>>
>>
>> On Thu, Dec 15, 2016 at 2:56 PM, Sean Owen > > wrote:
>>
>>> No, ALS is not modeling probabilities. The outputs are reconstructions of
>>> a 0/1 matrix. Most values will be in [0,1], but, it's possible to get
>>> values outside that range.

Re: Why foreachPartition function make duplicate invocation to map function for every message ? (Spark 2.0.2)

2016-12-16 Thread Cody Koeninger
Please post a minimal complete code example of what you are talking about

On Thu, Dec 15, 2016 at 6:00 PM, Michael Nguyen
 wrote:
> I have the following sequence of Spark Java API calls (Spark 2.0.2):
>
> Kafka stream that is processed via a map function, which returns the string
> value from tuple2._2() for JavaDStream as in
>
> return tuple2._2();
>
> The returned JavaDStream is then processed by foreachPartition, which is
> wrapped by foreachRDD.
>
> foreachPartition's call function does Iterator on the RDD as in
> inputRDD.next ();
>
> When data is received, step 1 is executed, which is correct. However,
> inputRDD.next () in step 3 makes a duplicate call to the map function in
> step 1. So that map function is called twice for every message:
>
> -  the first time when the message is received from the Kafka stream, and
>
> - the second time when Iterator inputParams.next () is invoked from
> foreachPartition's call function.
>
> I also tried transforming the data in the map function as in
>
> public TestTransformedClass call(Tuple2  tuple2) for step 1
>
> public void call(Iterator  inputParams) for step 3
>
> and the same issue occurs. So this issue occurs, no matter whether this
> sequence of Spark API calls involves data transformation or not.
>
> Questions:
>
> Since the message was already processed in step 1, why does inputRDD.next ()
> in step 3 makes a duplicate call to the map function in step 1 ?
>
> How do I fix it to avoid duplicate invocation for every message ?
>
> Thanks.




Re: coalesce ending up very unbalanced - but why?

2016-12-16 Thread vaquar khan
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-rdd-partitions.html

Regards,
vaquar khan

On Wed, Dec 14, 2016 at 12:15 PM, Vaibhav Sinha 
wrote:

> Hi,
> I see a similar behaviour in an exactly similar scenario at my deployment
> as well. I am using scala, so the behaviour is not limited to pyspark.
> In my observation 9 out of 10 partitions (as in my case) are of similar
> size ~38 GB each and final one is significantly larger ~59 GB.
> Prime number of partitions is an interesting approach I will try that out.
>
> Best,
> Vaibhav.
>
> On 14 Dec 2016, 10:18 PM +0530, Dirceu Semighini Filho <
> dirceu.semigh...@gmail.com>, wrote:
>
> Hello,
> We have done some test in here, and it seems that when we use prime number
> of partitions the data is more spread.
> This has to be with the hashpartitioning and the Java Hash algorithm.
> I don't know how your data is and how is this in python, but if you (can)
> implement a partitioner, or change it from default, you will get a better
> result.
>
> Dirceu
>
> 2016-12-14 12:41 GMT-02:00 Adrian Bridgett :
>
>> Since it's pyspark it's just using the default hash partitioning I
>> believe.  Trying a prime number (71 so that there's enough CPUs) doesn't
>> seem to change anything.  Out of curiousity why did you suggest that?
>> Googling "spark coalesce prime" doesn't give me any clue :-)
>> Adrian
>>
>>
>> On 14/12/2016 13:58, Dirceu Semighini Filho wrote:
>>
>> Hi Adrian,
>> Which kind of partitioning are you using?
>> Have you already tried to coalesce it to a prime number?
>>
>>
>> 2016-12-14 11:56 GMT-02:00 Adrian Bridgett :
>>
>>> I realise that coalesce() isn't guaranteed to be balanced and adding a
>>> repartition() does indeed fix this (at the cost of a large shuffle.
>>>
>>> I'm trying to understand _why_ it's so uneven (hopefully it helps
>>> someone else too).   This is using spark v2.0.2 (pyspark).
>>>
>>> Essentially we're just reading CSVs into a DataFrame (which we persist
>>> serialised for some calculations), then writing it back out as PRQ.  To
>>> avoid too many PRQ files I've set a coalesce of 72 (9 boxes, 8 CPUs each).
>>>
>>> The writers end up with about 700-900MB each (not bad).  Except for one
>>> which is at 6GB before I killed it.
>>>
>>> Input data is 12000 gzipped CSV files in S3 (approx 30GB), named like
>>> this, almost all about 2MB each:
>>> s3://example-rawdata-prod/data/2016-12-13/v3.19.0/1481587209
>>> -i-da71c942-389.gz
>>> s3://example-rawdata-prod/data/2016-12-13/v3.19.0/1481587529
>>> -i-01d3dab021b760d29-334.gz
>>>
>>> (we're aware that this isn't an ideal naming convention from an S3
>>> performance PoV).
>>>
>>> The actual CSV file format is:
>>> UUID\tINT\tINT\... . (wide rows - about 300 columns)
>>>
>>> e.g.:
>>> 17f9c2a7-ddf6-42d3-bada-63b845cb33a51481587198750   11213
>>> 1d723493-5341-450d-a506-5c96ce0697f01481587198751   11212 ...
>>> 64cec96f-732c-44b8-a02e-098d5b63ad771481587198752   11211 ...
>>>
>>> The dataframe seems to be stored evenly on all the nodes (according to
>>> the storage tab) and all the blocks are the same size.   Most of the tasks
>>> are executed at NODE_LOCAL locality (although there are a few ANY).  The
>>> oversized task is NODE_LOCAL though.
>>>
>>> The reading and calculations all seem evenly spread, confused why the
>>> writes aren't as I'd expect the input partitions to be even, what's causing
>>> and what we can do?  Maybe it's possible for coalesce() to be a bit smarter
>>> in terms of which partitions it coalesces - balancing the size of the final
>>> partitions rather than the number of source partitions in each final
>>> partition.
>>>
>>> Thanks for any light you can shine!
>>>
>>> Adrian
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>> --
>> *Adrian Bridgett* |  Sysadmin Engineer, OpenSignal
>> 
>> _
>> Office: 3rd Floor, The Angel Office, 2 Angel Square, London, EC1V 1NY
>> Phone #: +44 777-377-8251 <+44%20777-377-8251>
>> Skype: abridgett  |  @adrianbridgett 
>>   |  LinkedIn link  
>> _
>>
>
>


-- 
Regards,
Vaquar Khan
+1 -224-436-0783

IT Architect / Lead Consultant
Greater Chicago


java.lang.RuntimeException: Stream '/jars/' not found

2016-12-16 Thread Hanumath Rao Maduri
Hello All,

I am trying to test an application on a standalone cluster. Here is my scenario:
I started a Spark master on node A and also 1 worker on the same node A.

I am trying to run the application from node B (which, I think, acts as the driver).

I have added the jars to the SparkConf using setJars("jar1","jar2").

When I start the application, I see the following info messages indicating that the
jars were added:

16/12/16 07:45:56 INFO SparkContext: Added JAR jar1.jar at
spark://nodeb:48151/jars/jar1.jar with timestamp 1481899556375

and

16/12/16 07:45:56 INFO SparkContext: Added JAR jar2.jar at
spark://nodeb:48151/jars/jar2.jar with timestamp 1481899556376



But I get the following exception from nodeA during netty fetch (I think)


16/12/16 07:46:00 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
nodeA): java.lang.RuntimeException: Stream '/jars/node2.jar' was not found

at
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:222)

at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:121)

at
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)

at
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)

at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)

at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)

at
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)

at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)

at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)

at
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)

at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)

at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)

at
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)

at
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)

at
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)

at
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)

at
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)

at
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)

at
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)

at
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)

at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)

at io.netty.util.concurrent.SingleThreadEven


Any help is much appreciated. I have been stuck at this point for quite some time.


Thanks,


unsuscribe

2016-12-16 Thread Javier Rey



Re: How to get recent value in spark dataframe

2016-12-16 Thread vaquar khan
Not sure about your flag 0/1 logic, but you can orderBy the data by time and take
the first value.

Regards,
Vaquar khan

On Wed, Dec 14, 2016 at 10:49 PM, Milin korath 
wrote:

> Hi
>
> I have a spark data frame with following structure
>
>  id  flag price date
>   a   0100  2015
>   a   050   2015
>   a   1200  2014
>   a   1300  2013
>   a   0400  2012
>
> I need to create a data frame with recent value of flag 1 and updated in
> the flag 0 rows.
>
>   id  flag price date new_column
>   a   0100  2015200
>   a   050   2015200
>   a   1200  2014null
>   a   1300  2013null
>   a   0400  2012null
>
> We have 2 rows having flag=0. Consider the first row(flag=0),I will have 2
> values(200 and 300) and I am taking the recent one 200(2014). And the last
> row I don't have any recent value for flag 1 so it is updated with null.
>
> Looking for a solution using scala. Any help would be appreciated.Thanks
>
> Thanks
> Milin
>



-- 
Regards,
Vaquar Khan
+1 -224-436-0783

IT Architect / Lead Consultant
Greater Chicago


Re: Spark dump in slave Node EMR

2016-12-16 Thread Selvam Raman
If I want to take it specifically for the task number which failed, is it
possible to take a heap dump?


"16/12/16 12:25:54 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
Container killed by YARN for exceeding memory limits. 20.0 GB of 19.8 GB
physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

16/12/16 12:25:54 ERROR YarnClusterScheduler: Lost executor 1 on
ip-.dev: Container killed by YARN for exceeding memory limits. 20.0 GB
of 19.8 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.
16/12/16 12:25:55 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID
9, ip.dev): ExecutorLostFailure (executor 1 exited caused by one of
the running tasks) Reason: Container killed by YARN for exceeding
memory limits. 20.0 GB of 19.8 GB physical memory used. Consider
boosting spark.yarn.executor.memoryOverhead.
16/12/16 12:25:55 INFO BlockManagerMasterEndpoint: Trying to remove
executor 1 from BlockManagerMaster.
16/12/16 12:25:55 INFO BlockManagerMaster: Removal of executor 1 requested
16/12/16 12:25:55 INFO YarnSchedulerBackend$YarnDriverEndpoint: Asked
to remove non-existent executor 1

"

thanks,
selvam R

On Fri, Dec 16, 2016 at 12:30 PM, Selvam Raman  wrote:

> Hi,
>
> how can i take heap dump in EMR slave node to analyze.
>
> I have one master and two slave.
>
> if i enter jps command in Master, i could see sparksubmit with pid.
>
> But i could not see anything in slave node.
>
> how can i take heap dump for spark job.
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>



-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Spark dump in slave Node EMR

2016-12-16 Thread Selvam Raman
Hi,

How can I take a heap dump on an EMR slave node to analyze?

I have one master and two slaves.

If I enter the jps command on the master, I can see SparkSubmit with its PID.

But I cannot see anything on the slave nodes.

How can I take a heap dump for the Spark job?

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Re: Gradle dependency problem with spark

2016-12-16 Thread Steve Loughran
FWIW, although the Guava dependency version declared by the underlying Hadoop is pretty
low, everything in org.apache.hadoop is set up to run against later versions. It just
sticks with the old one to avoid breaking anything downstream which does expect a low
version number. See HADOOP-10101 for the ongoing pain there, and complain on there if
you do find something in the Hadoop layer which can't handle later Guava versions.




On 16 Dec 2016, at 11:07, Sean Owen 
> wrote:

Yes, that's the problem. Guava isn't generally mutually compatible across more 
than a couple major releases. You may have to hunt for a version that happens 
to have the functionality that both dependencies want, and hope that exists. 
Spark should shade Guava at this point but doesn't mean that you won't hit this 
problem from transitive dependencies.

On Fri, Dec 16, 2016 at 11:05 AM kant kodali 
> wrote:
I replaced guava-14.0.1.jar  with guava-19.0.jar in SPARK_HOME/jars and seem to 
work ok but I am not sure if it is the right thing to do. My fear is that if 
Spark uses features from Guava that are only present in 14.0.1 but not in 19.0 
I guess my app will break.



On Fri, Dec 16, 2016 at 2:22 AM, kant kodali 
> wrote:
Hi Guys,

Here is the simplified version of my problem. I have the following problem and 
I new to gradle



dependencies {
compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.2'
compile group: 'com.github.brainlag', name: 'nsq-client', version: 
'1.0.0.RC2'
}

I took out the other dependencies for simplicity. The problem here is 
spark-core_2.11 uses com.google.guava:14.0.1 and nsq-client uses 
com.google.guava:19.0 so when I submit my fat uber jar using spark-submit I get 
the following error


Exception in thread "main" java.lang.NoSuchMethodError: 
com.google.common.collect.Sets.newConcurrentHashSet()Ljava/util/Set;
at com.github.brainlag.nsq.NSQProducer.(NSQProducer.java:22)
at com.hello.streamprocessing.app.SparkDriver2.main(SparkDriver2.java:37)

any help would be great. Also if you need more description you can find it 
here

Thanks!





Re: Handling Exception or Control in spark dataframe write()

2016-12-16 Thread Steve Loughran

> On 14 Dec 2016, at 18:10, bhayat  wrote:
> 
> Hello,
> 
> I am writing my RDD into parquet format but what i understand that write()
> method is still experimental and i do not know how i will deal with possible
> exceptions.
> 
> For example:
> 
> schemaXXX.write().mode(saveMode).parquet(parquetPathInHdfs);
> 
> In this example i do not know how i will handle exception if parquet path
> does not exist or host is not reachable.


the parent path will be created. You are more likely to see a problem if the 
final path does exist. 

> 
> Do you have any way to do it ?

generally, catch the IOExceptions raised and report them. The HDFS IPC layer 
has a fair amount of retry logic built in to handle transient outages of the 
namenode/datanodes (and long GC pauses, which look similar); when they give up 
you'll see an IOException of some kind or other. All other filesystem API calls 
tend to raise IOExceptions too. try/catch are your friends.

What is hard is: what do you do next? Retry? Give up? I don't think there's a 
clear consensus there
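
A minimal sketch of what that looks like around the write in the original question
(schemaXXX, saveMode and parquetPathInHdfs are the variables from that question; the
logger is assumed):

try {
    schemaXXX.write().mode(saveMode).parquet(parquetPathInHdfs);
} catch (Exception e) {
    // An AnalysisException (e.g. the output path already exists) surfaces directly;
    // I/O failures during the job usually arrive wrapped in a SparkException whose
    // cause is an IOException. Decide here whether to retry, clean up, or abort.
    log.error("Parquet write to " + parquetPathInHdfs + " failed", e);
    throw e;
}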

> 
> Thank you,
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Handling-Exception-or-Control-in-spark-dataframe-write-tp28210.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 





Re: Gradle dependency problem with spark

2016-12-16 Thread Sean Owen
Yes, that's the problem. Guava isn't generally mutually compatible across
more than a couple major releases. You may have to hunt for a version that
happens to have the functionality that both dependencies want, and hope
that exists. Spark should shade Guava at this point but doesn't mean that
you won't hit this problem from transitive dependencies.

On Fri, Dec 16, 2016 at 11:05 AM kant kodali  wrote:

> I replaced *guava-14.0.1.jar*  with *guava-19.0.jar in *SPARK_HOME/jars
> and seem to work ok but I am not sure if it is the right thing to do. My
> fear is that if Spark uses features from Guava that are only present in
> 14.0.1 but not in 19.0 I guess my app will break.
>
>
>
> On Fri, Dec 16, 2016 at 2:22 AM, kant kodali  wrote:
>
> Hi Guys,
>
> Here is the simplified version of my problem. I have the following problem
> and I new to gradle
>
>
> dependencies {
> compile group: 'org.apache.spark', name: 'spark-core_2.11', version: 
> '2.0.2'
> compile group: 'com.github.brainlag', name: 'nsq-client', version: 
> '1.0.0.RC2'
> }
>
>
> I took out the other dependencies for simplicity. The problem here
> is spark-core_2.11 uses com.google.guava:14.0.1 and nsq-client uses
> com.google.guava:19.0 so when I submit my fat uber jar using spark-submit I
> get the following error
>
> Exception in thread "main" java.lang.NoSuchMethodError: 
> com.google.common.collect.Sets.newConcurrentHashSet()Ljava/util/Set;
> at com.github.brainlag.nsq.NSQProducer.(NSQProducer.java:22)
> at com.hello.streamprocessing.app.SparkDriver2.main(SparkDriver2.java:37)
>
>
> any help would be great. Also if you need more description you can find it
> here
> 
>
> Thanks!
>
>
>


Re: Gradle dependency problem with spark

2016-12-16 Thread kant kodali
I replaced *guava-14.0.1.jar* with *guava-19.0.jar* in SPARK_HOME/jars and it
seems to work OK, but I am not sure if it is the right thing to do. My fear is
that if Spark uses features from Guava that are only present in 14.0.1 and not
in 19.0, my app will break.



On Fri, Dec 16, 2016 at 2:22 AM, kant kodali  wrote:

> Hi Guys,
>
> Here is the simplified version of my problem. I have the following problem
> and I new to gradle
>
>
> dependencies {
> compile group: 'org.apache.spark', name: 'spark-core_2.11', version: 
> '2.0.2'
> compile group: 'com.github.brainlag', name: 'nsq-client', version: 
> '1.0.0.RC2'
> }
>
>
> I took out the other dependencies for simplicity. The problem here
> is spark-core_2.11 uses com.google.guava:14.0.1 and nsq-client uses
> com.google.guava:19.0 so when I submit my fat uber jar using spark-submit I
> get the following error
>
> Exception in thread "main" java.lang.NoSuchMethodError: 
> com.google.common.collect.Sets.newConcurrentHashSet()Ljava/util/Set;
> at com.github.brainlag.nsq.NSQProducer.(NSQProducer.java:22)
> at com.hello.streamprocessing.app.SparkDriver2.main(SparkDriver2.java:37)
>
>
> any help would be great. Also if you need more description you can find it
> here
> 
>
> Thanks!
>
>


Re: Dataset encoders for further types?

2016-12-16 Thread Jakub Dubovsky
I will give that a try. Thanks!

On Fri, Dec 16, 2016 at 12:45 AM, Michael Armbrust 
wrote:

> I would have sworn there was a ticket, but I can't find it.  So here you
> go: https://issues.apache.org/jira/browse/SPARK-18891
>
> A work around until that is fixed would be for you to manually specify the 
> kryo
> encoder
> 
> .
>
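
For anyone who finds this thread later, that kryo workaround looks roughly like this
from the Java side (a sketch only, where MyRecord stands in for the class the built-in
encoders cannot handle, and sparkSession/records are assumed):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

// Kryo-serialize the whole object instead of deriving a column-level schema for it.
Encoder<MyRecord> enc = Encoders.kryo(MyRecord.class);
Dataset<MyRecord> ds = sparkSession.createDataset(records, enc);   // records: java.util.List<MyRecord>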
> On Thu, Dec 15, 2016 at 8:18 AM, Jakub Dubovsky <
> spark.dubovsky.ja...@gmail.com> wrote:
>
>> Hey,
>>
>> I want to ask whether there is any roadmap/plan for adding Encoders for
>> further types in next releases of Spark. Here is a list
>>  
>> of
>> currently supported types. We would like to use Datasets with our
>> internally defined case classes containing 
>> scala.collection.immutable.List(s).
>> This does not work now because these lists are converted to ArrayType
>> (Seq). This then fails a constructor lookup because of seq-is-not-a-list
>> error...
>>
>> This means that for now we are stuck with using RDDs.
>>
>> Thanks for any insights!
>>
>> Jakub Dubovsky
>>
>>
>


Re: Spark Batch checkpoint

2016-12-16 Thread Selvam Raman
Hi,

Actually my requirement is to read a parquet file which has 100 partitions.
Then I use foreachPartition to read the data and process it.

My sample code

public static void main(String[] args) {

    SparkSession sparkSession = SparkSession.builder()
            .appName("checkpoint verification")
            .getOrCreate();

    sparkSession.implicits();

    sparkSession.sparkContext().setCheckpointDir("Checkpoint/Dec16");

    Dataset<Row> sampleData = sparkSession.read().parquet("filepath");

    sampleData.foreachPartition(new ForeachPartitionFunction<Row>() {

        private static final long serialVersionUID = 1L;

        @Override
        public void call(Iterator<Row> row) throws Exception {
            while (row.hasNext()) {
                Row current = row.next();
                // Process data and insert into No-Sql DB
            }
        }
    });
}
}



Now, where can I apply rdd.checkpoint()?
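
One place it could go, as a rough sketch continuing the code above (assuming the usual
JavaRDD/Row imports). Note that checkpointing only saves the RDD and truncates its
lineage for recomputation; it does not let a failed batch job resume from, say,
partition 71, which is what I am really after:

JavaRDD<Row> rows = sampleData.javaRDD();
rows.checkpoint();                 // materialized to the checkpoint dir on the next action
rows.foreachPartition(it -> {
    while (it.hasNext()) {
        Row current = it.next();
        // Process data and insert into No-Sql DB
    }
});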



Thanks,

selvam



On Thu, Dec 15, 2016 at 10:44 PM, Selvam Raman  wrote:

> I am using java. I will try and let u know.
> On Dec 15, 2016 8:45 PM, "Irving Duran"  wrote:
>
>> Not sure what programming language you are using, but in python you can
>> do "sc.setCheckpointDir('~/apps/spark-2.0.1-bin-hadoop2.7/checkpoint/')".
>> This will store checkpoints on that directory that I called checkpoint.
>>
>>
>> Thank You,
>>
>> Irving Duran
>>
>> On Thu, Dec 15, 2016 at 10:33 AM, Selvam Raman  wrote:
>>
>>> Hi,
>>>
>>> is there any provision in spark batch for checkpoint.
>>>
>>> I am having huge data, it takes more than 3 hours to process all data. I
>>> am currently having 100 partitions.
>>>
>>> if the job fails after two hours, lets say it has processed 70
>>> partition. should i start spark job from the beginning or is there way for
>>> checkpoint provision.
>>>
>>> Checkpoint,what i am expecting is start from 71 partition to till end.
>>>
>>> Please give me your suggestions.
>>>
>>> --
>>> Selvam Raman
>>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>>
>>
>>


-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Gradle dependency problem with spark

2016-12-16 Thread kant kodali
Hi Guys,

Here is a simplified version of my problem; I am new to Gradle.


dependencies {
compile group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.0.2'
compile group: 'com.github.brainlag', name: 'nsq-client', version:
'1.0.0.RC2'
}


I took out the other dependencies for simplicity. The problem here
is spark-core_2.11 uses com.google.guava:14.0.1 and nsq-client uses
com.google.guava:19.0 so when I submit my fat uber jar using spark-submit I
get the following error

Exception in thread "main" java.lang.NoSuchMethodError:
com.google.common.collect.Sets.newConcurrentHashSet()Ljava/util/Set;
at com.github.brainlag.nsq.NSQProducer.<init>(NSQProducer.java:22)
at com.hello.streamprocessing.app.SparkDriver2.main(SparkDriver2.java:37)


any help would be great. Also if you need more description you can find it
here


Thanks!


Re: Negative values of predictions in ALS.tranform

2016-12-16 Thread Sean Owen
That all looks correct.
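
For reference, the implicit-feedback objective this thread is walking through, from the
Hu/Koren/Volinsky paper that implicitPrefs follows (r_ui are the raw counts, alpha the
confidence parameter):

p_{ui} = 1 \text{ if } r_{ui} > 0 \text{ else } 0, \qquad c_{ui} = 1 + \alpha\, r_{ui}

\min_{x,y} \sum_{u,i} c_{ui}\,\big(p_{ui} - x_u^{\top} y_i\big)^2
  + \lambda\Big(\sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2\Big)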

On Thu, Dec 15, 2016 at 11:54 PM Manish Tripathi 
wrote:

> ok. Thanks. So here is what I understood.
>
> Input data to Als.fit(implicitPrefs=True) is the actual strengths (count
> data). So if I have a matrix of (user,item,views/purchases) I pass that as
> the input and not the binarized one (preference). This signifies the
> strength.
>
> 2) Since we also pass the alpha parameter to this Als.fit() method, Spark
> internally creates the confidence matrix +1+alpha*input_data or some other
> alpha factor.
>
> 3). The output which it gives is basically a factorization of 0/1 matrix
> (binarized matrix from initial input data), hence the output also resembles
> the preference matrix (0/1) suggesting the interaction. So typically it
> should be between 0-1but if it is negative it means very less
> preference/interaction
>
> *Does all the above sound correct?.*
>
> If yes, then one last question-
>
> 1). *For explicit dataset where we don't use implicitPref=True,* the
> predicted ratings would be actual ratings like it can be 2.3,4.5 etc and
> not the interaction measure. That is because in explicit we are not using
> the confidence matrix and preference matrix concept and use the actual
> rating data. So any output from Spark ALS for explicit data would be a
> rating prediction.
>
> On Thu, Dec 15, 2016 at 3:46 PM, Sean Owen  wrote:
>
> No, input are weights or strengths. The output is a factorization of the
> binarization of that to 0/1, not probs or a factorization of the input.
> This explains the range of the output.
>
>
> On Thu, Dec 15, 2016, 23:43 Manish Tripathi  wrote:
>
> when you say *implicit ALS *is* factoring the 0/1 matrix. , are you
> saying for implicit feedback algorithm we need to pass the input data as
> the preference matrix i.e a matrix of 0 and 1?. *
>
> Then how will they calculate the confidence matrix which is basically
> =1+alpha*count matrix. If we don't pass the actual count of values (views
> etc) then how does Spark calculates the confidence matrix?.
>
> I was of the understanding that input data for als.fit(implicitPref=True)
> is the actual count matrix of the views/purchases?. Am I going wrong here
> if yes, then how is Spark calculating the confidence matrix if it doesn't
> have the actual count data.
>
> The original paper on which Spark algo is based needs the actual count
> data to create a confidence matrix and also needs the 0/1 matrix since the
> objective functions uses both the confidence matrix and 0/1 matrix to find
> the user and item factors.
>
> On Thu, Dec 15, 2016 at 3:38 PM, Sean Owen  wrote:
>
> No, you can't interpret the output as probabilities at all. In particular
> they may be negative. It is not predicting rating but interaction. Negative
> means very strongly not predicted to interact. No, implicit ALS *is*
> factoring the 0/1 matrix.
>
> On Thu, Dec 15, 2016, 23:31 Manish Tripathi  wrote:
>
> Ok. So we can kind of interpret the output as probabilities even though it
> is not modeling probabilities. This is to be able to use it for
> binaryclassification evaluator.
>
> So the way I understand is and as per the algo, the predicted matrix is
> basically a dot product of user factor and item factor matrix.
>
> but in what circumstances the ratings predicted can be negative. I can
> understand if the individual user factor vector and item factor vector is
> having negative factor terms, then it can be negative. But practically does
> negative make any sense? AS per algorithm the dot product is the predicted
> rating. So rating shouldnt be negative for it to make any sense. Also
> rating just between 0-1 is normalised rating? Typically rating we expect to
> be like any real value 2.3,4.5 etc.
>
> Also please note, for implicit feedback ALS, we don't feed 0/1 matrix. We
> feed the count matrix (discrete count values) and am assuming spark
> internally converts it into a preference matrix (1/0) and a confidence
> matrix =1+alpha*count_matrix
>
>
>
>
>
> On Thu, Dec 15, 2016 at 2:56 PM, Sean Owen  wrote:
>
> No, ALS is not modeling probabilities. The outputs are reconstructions of
> a 0/1 matrix. Most values will be in [0,1], but, it's possible to get
> values outside that range.
>
> On Thu, Dec 15, 2016 at 10:21 PM Manish Tripathi 
> wrote:
>
> Hi
>
> ran the ALS model for implicit feedback thing. Then I used the .transform
> method of the model to predict the ratings for the original dataset. My
> dataset is of the form (user,item,rating)
>
> I see something like below:
>
> predictions.show(5,truncate=False)
>
>
> Why is the last prediction value negative ?. Isn't the transform method
> giving the prediction(probability) of seeing the rating as 1?. I had counts
> data for rating (implicit feedback) and for validation dataset I binarized
> the rating (1 if >0 else 0). My training 

What is the deployment model for Spark Streaming? A specific example.

2016-12-16 Thread Russell Jurney
I have created a PySpark Streaming application that uses Spark ML to
classify flight delays into three categories: on-time, slightly late, very
late. After an hour or so something times out and the whole thing crashes.

The code and error are on a gist here:
https://gist.github.com/rjurney/17d471bc98fd1ec925c37d141017640d

While I am interested in why I am getting an exception, I am more interested in
understanding what the correct deployment model is... because long-running
processes will have new and varied errors and exceptions. Right now, with what
I've built, Spark is a highly dependable distributed system, but in streaming
mode the entire thing goes down if a single Python PID dies. This can't be how
apps are deployed in the wild, because it would never be very reliable, right?
But I don't see anything about this in the docs, so I am confused.

Note that I use this to run the app, maybe that is the problem?

ssc.start()
ssc.awaitTermination()


What is the actual deployment model for Spark Streaming? All I know to do
right now is to restart the PID. I'm new to Spark, and the docs don't
really explain this (that I can see).

Thanks!
-- 
Russell Jurney twitter.com/rjurney russell.jur...@gmail.com relato.io


Do we really need mesos or yarn? or is standalone sufficent?

2016-12-16 Thread kant kodali
Do we really need Mesos or YARN, or is standalone sufficient for production
systems? I understand the difference, but I don't know the capabilities of a
standalone cluster. Does anyone have experience deploying standalone in
production?