Re: Is there synchronous way to predict against model for real time data

2016-12-15 Thread vincent gromakowski
Something like this?
https://spark-summit.org/eu-2015/events/real-time-anomaly-detection-with-spark-ml-and-akka/
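
If the model is a simple one such as the logistic regression in the question, another option (not taken from the talk linked above) is to export the trained weights and intercept and score each request directly inside the web application, with no Spark job on the request path. A minimal sketch in plain Python; the weights, intercept and feature values are made-up placeholders, the function names are only illustrative, and the 0.5 threshold is an assumption:

```python
import math


def predict_probability(weights, intercept, features):
    """Binary logistic regression score: sigmoid(w . x + b)."""
    margin = sum(w * x for w, x in zip(weights, features)) + intercept
    return 1.0 / (1.0 + math.exp(-margin))


def predict_label(weights, intercept, features, threshold=0.5):
    """Return 1 when the score exceeds the threshold, otherwise 0."""
    score = predict_probability(weights, intercept, features)
    return 1 if score > threshold else 0


# Hypothetical coefficients, as if exported from a trained model.
WEIGHTS = [0.8, -1.2, 0.05]
INTERCEPT = -0.3

# Hypothetical attributes of the customer who just logged in.
customer = [1.0, 0.0, 12.0]
label = predict_label(WEIGHTS, INTERCEPT, customer)
```

The call is an ordinary function call, so the page can wait for the result before rendering. The model would still be retrained offline in Spark and the coefficients refreshed periodically.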

On 16 Dec 2016 1:08 AM, "suyogchoudh...@gmail.com" <
suyogchoudh...@gmail.com> wrote:

> Hi,
>
> I have question about, how can I real time make decision using a model I
> have created with Spark ML.
>
> 1. I have some data and created model using it.
> // Train the model
>
> val model = new
> LogisticRegressionWithLBFGS().setNumClasses(2).run(trainingData)
>
> 2. I believe, I can use spark streaming to get real time feed and then
> predict result against model created in step1
>
> 3. My question is, how can I do it in synchronous way?
>
> For e.g. lets say if some customer logs in to my site, then according to
> his
> data, I want to personalize his site. I want to send his attributes to
> model
> and get prediction before rendering anything on page.
>
> How can I do this synchronously?
>
> Regards,
>
> Suyog
>
>
>


Re: When will multiple aggregations be supported in Structured Streaming?

2016-12-15 Thread Lawrence Wagerfield
We have a stream of products, each with an ID, and each product has a price 
which may be updated.

We want a running count of the number of products over £30.

Schema: Product(productID: Int, price: Int)

To handle these updates, we currently have…

——

val products = session.readStream.schema(productSchema).csv(productDataPath)
val productsVersioned = products.groupBy("productId").agg(first("price"))
val productsOver30 = productsVersioned.filter("price > 30").agg(count("productId"))
productsOver30.writeStream
        .outputMode("complete")
        .format("console")
        .start()
        .awaitTermination()

——

However, the ‘productsOver30’ part introduces the second aggregation.
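
The intended result can be written down without Spark. The following is only a plain-Python sketch of the semantics, assuming the most recent price seen for a product replaces its earlier one; the sample events and the function name are made up, and it is not a workaround for the Structured Streaming limitation:

```python
def count_over_threshold(events, threshold=30):
    """Yield the running count of products whose latest price exceeds threshold."""
    latest_price = {}
    count = 0
    for product_id, price in events:
        # Was this product counted before this update arrived?
        was_over = product_id in latest_price and latest_price[product_id] > threshold
        is_over = price > threshold
        count += int(is_over) - int(was_over)
        latest_price[product_id] = price
        yield count


# Hypothetical (productId, price) updates, in arrival order.
events = [(1, 25), (2, 40), (1, 35), (2, 10)]
running = list(count_over_threshold(events))
```

The first step (latest price per product) and the second step (count of products over the threshold) are the two aggregations that the streaming query would need to chain.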

On 15 Dec 2016, 22:28 +, Michael Armbrust , wrote:
> What is your use case?
>
> > On Thu, Dec 15, 2016 at 10:43 AM, ljwagerfield 
> >  wrote:
> > > The current version of Spark (2.0.2) only supports one aggregation per
> > > structured stream (and will throw an exception if multiple aggregations 
> > > are
> > > applied).
> > >
> > > Roughly when will Spark support multiple aggregations?
> > >
> > >
> > >
> > > --
> > > View this message in context: 
> > > http://apache-spark-user-list.1001560.n3.nabble.com/When-will-multiple-aggregations-be-supported-in-Structured-Streaming-tp28219.html
> > > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> > >
> > > -
> > > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> > >
>


Re: When will multiple aggregations be supported in Structured Streaming?

2016-12-15 Thread Mattz
I had the same question. My use case is to take a streaming source, perform
a few steps (some aggregations and transformations), and send the result to
multiple outputs.

On Fri, Dec 16, 2016 at 3:58 AM, Michael Armbrust 
wrote:

> What is your use case?
>
> On Thu, Dec 15, 2016 at 10:43 AM, ljwagerfield <
> lawre...@dmz.wagerfield.com> wrote:
>
>> The current version of Spark (2.0.2) only supports one aggregation per
>> structured stream (and will throw an exception if multiple aggregations
>> are
>> applied).
>>
>> Roughly when will Spark support multiple aggregations?
>>
>>
>>
>


Can't use RandomForestClassificationModel.predict(Vector v) in scala

2016-12-15 Thread Patrick Chen
Hi All,
I'm using Spark 2.0's RandomForestClassificationModel from Java.
After training the model, I can use the code below to predict:

RandomForestClassificationModel rfModel =
    RandomForestClassificationModel.load(modelPath);
Vector v =
    Vectors.sparse(FeatureIndex.TOTAL_INDEX.getIndex(), indexes, values);
double result = rfModel.predict(v);

But when I switched to Scala, I could no longer use the predict method in
Classifier.
How should I use RandomForestClassificationModel in Scala?

Thanks
BR

Patrick


Re: Spark Dataframe: Save to hdfs is taking long time

2016-12-15 Thread KhajaAsmath Mohammed
I am trying to save the files as Parquet.

On Thu, Dec 15, 2016 at 10:41 PM, Felix Cheung 
wrote:

> What is the format?
>
>
> --
> *From:* KhajaAsmath Mohammed 
> *Sent:* Thursday, December 15, 2016 7:54:27 PM
> *To:* user @spark
> *Subject:* Spark Dataframe: Save to hdfs is taking long time
>
> Hi,
>
> I am facing an issue while saving the dataframe back to HDFS. It's taking a
> long time to run.
>
> val results_dataframe = sqlContext.sql("select gt.*,ct.* from 
> PredictTempTable pt,ClusterTempTable ct,GamificationTempTable gt where 
> gt.vin=pt.vin and pt.cluster=ct.cluster")
> results_dataframe.coalesce(numPartitions)
> results_dataframe.persist(StorageLevel.MEMORY_AND_DISK)
>
> dataFrame.write.mode(saveMode).format(format)
>   .option(Codec, compressCodec) //"org.apache.hadoop.io.compress.snappyCodec"
>   .save(outputPath)
>
> It was taking long time and total number of records for  this dataframe is 
> 4903764
>
> I even increased number of partitions from 10 to 20, still no luck. Can 
> anyone help me in resolving this performance issue
>
> Thanks,
>
> Asmath
>
>


Re: Spark Dataframe: Save to hdfs is taking long time

2016-12-15 Thread Felix Cheung
What is the format?



From: KhajaAsmath Mohammed 
Sent: Thursday, December 15, 2016 7:54:27 PM
To: user @spark
Subject: Spark Dataframe: Save to hdfs is taking long time

Hi,

I am facing an issue while saving the dataframe back to HDFS. It's taking a long
time to run.


val results_dataframe = sqlContext.sql("select gt.*,ct.* from PredictTempTable 
pt,ClusterTempTable ct,GamificationTempTable gt where gt.vin=pt.vin and 
pt.cluster=ct.cluster")
results_dataframe.coalesce(numPartitions)
results_dataframe.persist(StorageLevel.MEMORY_AND_DISK)

dataFrame.write.mode(saveMode).format(format)
  .option(Codec, compressCodec) //"org.apache.hadoop.io.compress.snappyCodec"
  .save(outputPath)

It was taking long time and total number of records for  this dataframe is 
4903764

I even increased number of partitions from 10 to 20, still no luck. Can anyone 
help me in resolving this performance issue

Thanks,

Asmath


Re: How to load edge with properties file using GraphX

2016-12-15 Thread Felix Cheung
Have you checked out https://github.com/graphframes/graphframes?

It might be easier to work with DataFrame.



From: zjp_j...@163.com 
Sent: Thursday, December 15, 2016 7:23:57 PM
To: user
Subject: How to load edge with properties file using GraphX

Hi,
   I want to load an edge file and a vertex attribute (attriInfos) file in the
formats shown below. How can I use these two files to create a Graph?
  edge file -> "SrcId,DestId,properties..."   vertex attriInfos file -> "VID,
properties..."

   I learned that there is a GraphLoader object that can load an edge file
with no properties, after which the vertex properties can be joined to create
the Graph. So the issue is how to then attach the edge properties.

   Thanks.


zjp_j...@163.com


Spark Dataframe: Save to hdfs is taking long time

2016-12-15 Thread KhajaAsmath Mohammed
Hi,

I am facing an issue while saving the dataframe back to HDFS. It's taking a
long time to run.

val results_dataframe = sqlContext.sql("select gt.*,ct.* from
PredictTempTable pt,ClusterTempTable ct,GamificationTempTable gt where
gt.vin=pt.vin and pt.cluster=ct.cluster")
results_dataframe.coalesce(numPartitions)
results_dataframe.persist(StorageLevel.MEMORY_AND_DISK)

dataFrame.write.mode(saveMode).format(format)
  .option(Codec, compressCodec) //"org.apache.hadoop.io.compress.snappyCodec"
  .save(outputPath)

It is taking a long time, and the total number of records in this
dataframe is 4903764.

I even increased the number of partitions from 10 to 20, but still no luck.
Can anyone help me resolve this performance issue?

Thanks,

Asmath


How to load edge with properties file using GraphX

2016-12-15 Thread zjp_j...@163.com
Hi,
   I want to load an edge file and a vertex attribute (attriInfos) file in the
formats shown below. How can I use these two files to create a Graph?
  edge file -> "SrcId,DestId,properties..."   vertex attriInfos file -> "VID,
properties..."

   I learned that there is a GraphLoader object that can load an edge file
with no properties, after which the vertex properties can be joined to create
the Graph. So the issue is how to then attach the edge properties.
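
The parsing side of this can be sketched independently of GraphX. The snippet below is plain Python, assuming comma-separated lines in the two formats described above with integer IDs; the sample lines and function names are invented. In GraphX the analogous step would be mapping each line to an edge that carries the parsed properties as its attribute, which is not shown here:

```python
def parse_edge(line):
    """Split 'SrcId,DestId,prop1,prop2,...' into (src, dst, [props])."""
    src, dst, *props = [field.strip() for field in line.split(",")]
    return int(src), int(dst), props


def parse_vertex(line):
    """Split 'VID,prop1,prop2,...' into (vid, [props])."""
    vid, *props = [field.strip() for field in line.split(",")]
    return int(vid), props


# Hypothetical file contents.
edge_lines = ["1,2,friend,0.9", "2,3,colleague,0.4"]
vertex_lines = ["1,alice", "2,bob", "3,carol"]

edges = [parse_edge(line) for line in edge_lines]
vertices = dict(parse_vertex(line) for line in vertex_lines)
```

Keeping the properties on the edge tuple itself, rather than joining them afterwards, avoids a second pass over the edge file.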

   Thanks.



zjp_j...@163.com


How to register UDFs dynamically via reflection?

2016-12-15 Thread 李斌松
How can I register UDFs dynamically via reflection? The code below fails with:

java.lang.UnsupportedOperationException: Schema for type _$13 is not supported
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:153)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
    at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
    at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
    at org.apache.spark.sql.UDFRegistration.register(UDFRegistration.scala:145)
    at com.alibaba.spark.odps.driver.util.Utils$$anon$1.processMatch(Utils.scala:115)
    at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec$1.lookForMatches(ScanSpec.java:759)
    at io.github.lukehutch.fastclasspathscanner.scanner.ScanSpec.callMatchProcessors(ScanSpec.java:446)
    at io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:368)
    at io.github.lukehutch.fastclasspathscanner.scanner.Scanner.call(Scanner.java:59)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

final class sparkFunc(val name: String) extends StaticAnnotation {}

def registerFunc(hiveContext: HiveContext): Unit = {
  info("register udf function")

  val ru = scala.reflect.runtime.universe
  val classLoaderMirror = ru.runtimeMirror(getClass.getClassLoader)

  // Scan the functions package and register every class annotated with @sparkFunc.
  new FastClasspathScanner("com.alibaba.spark.odps.driver.functions")
    .matchAllClasses(new ClassMatchProcessor() {
      override def processMatch(aClass: Class[_]): Unit = {
        val classMirror = classLoaderMirror.classSymbol(aClass)
        val annotation = classMirror.annotations
          .find(_.tpe =:= ru.typeOf[sparkFunc]).getOrElse(null)

        try {
          if (annotation != null) {
            var funcName =
              StringUtils.substringBetween(annotation.toString, "\"", "\"")

            if (chekClazz(aClass, classOf[Function1[_, _]])) {
              val func: Function1[_, _] =
                createInstance[Function1[_, _]](aClass).get
              hiveContext.udf.register(funcName, func)
            } else if (chekClazz(aClass, classOf[Function2[_, _, _]])) {
              val func: Function2[_, _, _] =
                createInstance[Function2[_, _, _]](aClass).get
              hiveContext.udf.register(funcName, func)
            } else if (chekClazz(aClass, classOf[Function3[_, _, _, _]])) {
              val func: Function3[_, _, _, _] =
                createInstance[Function3[_, _, _, _]](aClass).get
              hiveContext.udf.register(funcName, func)
            } else {
              throw new RuntimeException("not support function")
            }

            info("== register function: {}", funcName)
          }
        } catch {
          case e: Exception => error(e.getMessage, e)
        }
      }
    }).scan()
}

// True if sClass can be treated as a subclass of pClass.
private def chekClazz(sClass: Class[_], pClass: Class[_]): Boolean = {
  try {
    sClass.asSubclass(pClass)
    true
  } catch {
    case e: Exception => false
  }
}

// Instantiate clazz through its no-argument constructor and cast it to T.
private def createInstance[T: ClassTag](clazz: Class[_]): Try[T] = {
  Try {
    val constructor = clazz.getDeclaredConstructor()
    constructor.setAccessible(true)
    val obj = constructor.newInstance()
    val t = implicitly[ClassTag[T]].runtimeClass
    if (t.isInstance(obj)) {
      obj.asInstanceOf[T]
    } else throw new ClassCastException(clazz.getName + " is not a subtype of " + t)
  } recover {
    case i: InvocationTargetException if i.getTargetException ne null =>
      throw i.getTargetException
  }
}


how to see cpu time of application on yarn mode

2016-12-15 Thread 黄晓萌
Hi,
I run Spark jobs in YARN mode and want to collect the total CPU time of the
application.
I can't find the CPU time in the Spark history server.
Is there a way to see the CPU time?




--

Thanks,
Xiaomeng

Re: Belief propagation algorithm is open sourced

2016-12-15 Thread Ulanov, Alexander
Hi Bertrand,


We only do inference. We do not do structure or parameter estimation (or 
learning) - that for the MRF would be estimation of the factors, and the 
structure of the graphical model. The parameters can be estimated using maximum 
likelihood if data is available for all the nodes, or by EM if there are hidden 
nodes. We of course don't implement MLE, or EM.


Assuming the model parameters are already available, we can do inference for 
both Bayesian and Markov models.

So to answer the question below, we don't do "learning", we do "inference" 
using BP.

We were using both LibDAI and our own implementation of BP for GraphLab as
a reference.


Best regards, Manish Marwah & Alexander


From: Bertrand Dechoux 
Sent: Thursday, December 15, 2016 1:03:49 AM
To: Bryan Cutler
Cc: Ulanov, Alexander; user; dev
Subject: Re: Belief propagation algorithm is open sourced

Nice! I am especially interested in Bayesian Networks, which are only one of 
the many models that can be expressed by a factor graph representation. Do you 
do Bayesian Networks learning at scale (parameters and structure) with latent 
variables? Are you using publicly available tools for that? Which ones?

LibDAI, which created the supported format, "supports parameter learning of 
conditional probability tables by Expectation Maximization" according to the 
documentation. Is it your reference tool?

Bertrand

On Thu, Dec 15, 2016 at 5:21 AM, Bryan Cutler wrote:
I'll check it out, thanks for sharing Alexander!

On Dec 13, 2016 4:58 PM, "Ulanov, Alexander" wrote:

Dear Spark developers and users,


HPE has open sourced the implementation of the belief propagation (BP) 
algorithm for Apache Spark, a popular message passing algorithm for performing 
inference in probabilistic graphical models. It provides exact inference for 
graphical models without loops. While inference for graphical models with loops 
is approximate, in practice it is shown to work well. The implementation is 
generic and operates on factor graph representation of graphical models. It 
handles factors of any order, and variable domains of any size. It is 
implemented with Apache Spark GraphX, and thus can scale to large scale models. 
Further, it supports computations in log scale for numerical stability. Large 
scale applications of BP include fraud detection in banking transactions and 
malicious site detection in computer networks.


Source code: https://github.com/HewlettPackard/sandpiper
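
As a small illustration of the exact-inference claim for loop-free models (a plain-Python toy, unrelated to the sandpiper code base or its API; the factor values are invented), sum-product on a two-variable factor graph gives the same marginal as brute-force enumeration:

```python
def normalize(values):
    """Scale non-negative values so they sum to 1."""
    total = sum(values)
    return [v / total for v in values]


# Hypothetical factors over two binary variables A and B.
phi_a = [0.6, 0.4]            # unary factor on A
phi_b = [0.5, 0.5]            # unary factor on B
psi_ab = [[0.9, 0.1],         # pairwise factor, indexed psi_ab[a][b]
          [0.2, 0.8]]

# Sum-product message from A to B: sum A out of phi_a * psi_ab.
msg_a_to_b = [sum(phi_a[a] * psi_ab[a][b] for a in range(2)) for b in range(2)]

# Belief at B: its own factor times the incoming message, normalized.
belief_b = normalize([phi_b[b] * msg_a_to_b[b] for b in range(2)])

# Brute force for comparison: build the full joint and marginalize over A.
joint = [[phi_a[a] * phi_b[b] * psi_ab[a][b] for b in range(2)] for a in range(2)]
marginal_b = normalize([joint[0][b] + joint[1][b] for b in range(2)])
```

On a graph this small the two computations are the same arithmetic; the benefit of message passing shows up on larger trees, where the joint table grows exponentially and the messages do not.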


Best regards, Alexander



Is there synchronous way to predict against model for real time data

2016-12-15 Thread suyogchoudh...@gmail.com
Hi,

I have a question about how to make real-time decisions using a model I
have created with Spark ML.

1. I have some data and created a model from it.
// Train the model

val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainingData)

2. I believe I can use Spark Streaming to get a real-time feed and then
predict results against the model created in step 1.

3. My question is, how can I do it in a synchronous way?

For example, say a customer logs in to my site; then, according to his
data, I want to personalize the site. I want to send his attributes to the model
and get a prediction before rendering anything on the page.

How can I do this synchronously?

Regards,

Suyog



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-there-synchronous-way-to-predict-against-model-for-real-time-data-tp28222.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Why does foreachPartition make a duplicate invocation of the map function for every message? (Spark 2.0.2)

2016-12-15 Thread Michael Nguyen
I have the following sequence of Spark Java API calls (Spark 2.0.2):

   1. A Kafka stream is processed via a map function, which returns the
      string value from tuple2._2() for the JavaDStream, as in

      return tuple2._2();

   2. The returned JavaDStream is then processed by foreachPartition, which is
      wrapped by foreachRDD.

   3. foreachPartition's call function iterates over the RDD, as in
      inputRDD.next ();

When data is received, step 1 is executed, which is correct. However,
inputRDD.next () in step 3 makes a duplicate call to the map function in
step 1. So that map function is called twice for every message:

-  the first time when the message is received from the Kafka stream, and

- the second time when Iterator inputParams.next () is invoked from
foreachPartition's call function.

I also tried transforming the data in the map function as in

public TestTransformedClass call(Tuple2  tuple2) for step 1

public void call(Iterator  inputParams) for step 3

and the same issue occurs. So this issue occurs, no matter whether this
sequence of Spark API calls involves data transformation or not.

Questions:

   1. Since the message was already processed in step 1, why does
      inputRDD.next () in step 3 make a duplicate call to the map function in
      step 1?

   2. How do I fix it to avoid the duplicate invocation for every message?

Thanks.
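
One possible explanation, which nothing in this message confirms, is that the mapped stream is not cached and is consumed by more than one action per batch (for example a print or count in addition to foreachPartition); a lazily evaluated, uncached dataset re-runs its map function for each action. The toy class below is plain Python, not the Spark API, and only illustrates that behaviour. In Spark the corresponding step would be calling cache() on the mapped stream before reusing it.

```python
class LazyMapped:
    """Toy stand-in for a lazily evaluated mapped dataset (not Spark)."""

    def __init__(self, source, func):
        self.source = source
        self.func = func
        self.cached = None

    def cache(self):
        # Eager here for simplicity: compute once and keep the results.
        self.cached = [self.func(item) for item in self.source]
        return self

    def __iter__(self):
        if self.cached is not None:
            return iter(self.cached)
        # Without a cache, every traversal re-runs the map function.
        return (self.func(item) for item in self.source)


calls = []


def extract_value(pair):
    """Map function: record the call, then return the message value."""
    calls.append(pair)
    return pair[1]


messages = [("k1", "a"), ("k2", "b")]

uncached = LazyMapped(messages, extract_value)
list(uncached)   # first action
list(uncached)   # second action re-runs the map
calls_without_cache = len(calls)

calls.clear()
cached = LazyMapped(messages, extract_value).cache()
list(cached)
list(cached)
calls_with_cache = len(calls)
```

With two messages and two actions, the uncached version invokes the map function four times and the cached version twice.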


Re: Negative values of predictions in ALS.transform

2016-12-15 Thread Manish Tripathi
OK, thanks. So here is what I understood.

1) The input data to Als.fit(implicitPrefs=True) is the actual strengths (count
data). So if I have a matrix of (user, item, views/purchases), I pass that as
the input and not the binarized one (preference). This signifies the
strength.

2) Since we also pass the alpha parameter to this Als.fit() method, Spark
internally creates the confidence matrix 1+alpha*input_data, or uses some other
alpha factor.

3) The output it gives is basically a factorization of the 0/1 matrix
(the binarized matrix from the initial input data), hence the output also
resembles the preference matrix (0/1) suggesting the interaction. So typically it
should be between 0 and 1, but if it is negative it means very little
preference/interaction.

*Does all the above sound correct?*

If yes, then one last question:

1) *For an explicit dataset, where we don't use implicitPrefs=True,* the
predicted ratings would be actual ratings, e.g. 2.3, 4.5, etc., and
not the interaction measure. That is because in the explicit case we are not
using the confidence matrix and preference matrix concept, and use the actual
rating data instead. So any output from Spark ALS for explicit data would be a
rating prediction.

On Thu, Dec 15, 2016 at 3:46 PM, Sean Owen  wrote:

> No, input are weights or strengths. The output is a factorization of the
> binarization of that to 0/1, not probs or a factorization of the input.
> This explains the range of the output.
>
>
> On Thu, Dec 15, 2016, 23:43 Manish Tripathi  wrote:
>
>> when you say *implicit ALS *is* factoring the 0/1 matrix. , are you
>> saying for implicit feedback algorithm we need to pass the input data as
>> the preference matrix i.e a matrix of 0 and 1?. *
>>
>> Then how will they calculate the confidence matrix which is basically
>> =1+alpha*count matrix. If we don't pass the actual count of values (views
>> etc) then how does Spark calculates the confidence matrix?.
>>
>> I was of the understanding that input data for als.fit(implicitPref=True)
>> is the actual count matrix of the views/purchases?. Am I going wrong here
>> if yes, then how is Spark calculating the confidence matrix if it doesn't
>> have the actual count data.
>>
>> The original paper on which Spark algo is based needs the actual count
>> data to create a confidence matrix and also needs the 0/1 matrix since the
>> objective functions uses both the confidence matrix and 0/1 matrix to find
>> the user and item factors.
>>
>> On Thu, Dec 15, 2016 at 3:38 PM, Sean Owen  wrote:
>>
>> No, you can't interpret the output as probabilities at all. In particular
>> they may be negative. It is not predicting rating but interaction. Negative
>> means very strongly not predicted to interact. No, implicit ALS *is*
>> factoring the 0/1 matrix.
>>
>> On Thu, Dec 15, 2016, 23:31 Manish Tripathi  wrote:
>>
>> Ok. So we can kind of interpret the output as probabilities even though
>> it is not modeling probabilities. This is to be able to use it for
>> binaryclassification evaluator.
>>
>> So the way I understand is and as per the algo, the predicted matrix is
>> basically a dot product of user factor and item factor matrix.
>>
>> but in what circumstances the ratings predicted can be negative. I can
>> understand if the individual user factor vector and item factor vector is
>> having negative factor terms, then it can be negative. But practically does
>> negative make any sense? AS per algorithm the dot product is the predicted
>> rating. So rating shouldnt be negative for it to make any sense. Also
>> rating just between 0-1 is normalised rating? Typically rating we expect to
>> be like any real value 2.3,4.5 etc.
>>
>> Also please note, for implicit feedback ALS, we don't feed 0/1 matrix. We
>> feed the count matrix (discrete count values) and am assuming spark
>> internally converts it into a preference matrix (1/0) and a confidence
>> matrix =1+alpha*count_matrix
>>
>>
>>
>>
>>
>> On Thu, Dec 15, 2016 at 2:56 PM, Sean Owen  wrote:
>>
>> No, ALS is not modeling probabilities. The outputs are reconstructions of
>> a 0/1 matrix. Most values will be in [0,1], but, it's possible to get
>> values outside that range.
>>
>> On Thu, Dec 15, 2016 at 10:21 PM Manish Tripathi 
>> wrote:
>>
>> Hi
>>
>> ran the ALS model for implicit feedback thing. Then I used the .transform
>> method of the model to predict the ratings for the original dataset. My
>> dataset is of the form (user,item,rating)
>>
>> I see something like below:
>>
>> predictions.show(5,truncate=False)
>>
>>
>> Why is the last prediction value negative ?. Isn't the transform method
>> giving the prediction(probability) of seeing the rating as 1?. I had counts
>> data for rating (implicit feedback) and for validation dataset I binarized
>> the rating (1 if >0 else 0). My training data has rating positive (it's
>> basically the count of views to a video).
>>
>> 

Re: Negative values of predictions in ALS.transform

2016-12-15 Thread Sean Owen
No, the inputs are weights or strengths. The output is a factorization of the
binarization of that to 0/1, not probabilities or a factorization of the input.
This explains the range of the output.
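
To make the distinction concrete, here is a plain-Python sketch of the quantities discussed in this thread, assuming the usual implicit-feedback formulation: preference p = 1 if the count is positive, else 0, and confidence c = 1 + alpha * count. The counts and factor vectors are invented, and the function names are illustrative, not Spark API:

```python
ALPHA = 40.0


def preference(count):
    """Binarized target that the factorization tries to reconstruct."""
    return 1.0 if count > 0 else 0.0


def confidence(count, alpha=ALPHA):
    """Weight given to that target in the objective."""
    return 1.0 + alpha * count


def predict(user_factors, item_factors):
    """Prediction is the dot product of the two factor vectors."""
    return sum(u * v for u, v in zip(user_factors, item_factors))


# Hypothetical view counts for three items.
views = [0, 1, 5]
prefs = [preference(r) for r in views]
confs = [confidence(r) for r in views]

# Hypothetical learned factors; nothing constrains them to be non-negative.
user = [0.9, -0.4]
liked_item = [1.0, 0.2]
other_item = [-0.1, 0.3]
score_liked = predict(user, liked_item)
score_other = predict(user, other_item)
```

The counts only set how strongly each 0/1 target is weighted; the prediction is an unconstrained dot product, so a value below 0 (as for other_item here) or above 1 is possible.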

On Thu, Dec 15, 2016, 23:43 Manish Tripathi  wrote:

> when you say *implicit ALS *is* factoring the 0/1 matrix. , are you
> saying for implicit feedback algorithm we need to pass the input data as
> the preference matrix i.e a matrix of 0 and 1?. *
>
> Then how will they calculate the confidence matrix which is basically
> =1+alpha*count matrix. If we don't pass the actual count of values (views
> etc) then how does Spark calculates the confidence matrix?.
>
> I was of the understanding that input data for als.fit(implicitPref=True)
> is the actual count matrix of the views/purchases?. Am I going wrong here
> if yes, then how is Spark calculating the confidence matrix if it doesn't
> have the actual count data.
>
> The original paper on which Spark algo is based needs the actual count
> data to create a confidence matrix and also needs the 0/1 matrix since the
> objective functions uses both the confidence matrix and 0/1 matrix to find
> the user and item factors.
>
> On Thu, Dec 15, 2016 at 3:38 PM, Sean Owen  wrote:
>
> No, you can't interpret the output as probabilities at all. In particular
> they may be negative. It is not predicting rating but interaction. Negative
> means very strongly not predicted to interact. No, implicit ALS *is*
> factoring the 0/1 matrix.
>
> On Thu, Dec 15, 2016, 23:31 Manish Tripathi  wrote:
>
> Ok. So we can kind of interpret the output as probabilities even though it
> is not modeling probabilities. This is to be able to use it for
> binaryclassification evaluator.
>
> So the way I understand is and as per the algo, the predicted matrix is
> basically a dot product of user factor and item factor matrix.
>
> but in what circumstances the ratings predicted can be negative. I can
> understand if the individual user factor vector and item factor vector is
> having negative factor terms, then it can be negative. But practically does
> negative make any sense? AS per algorithm the dot product is the predicted
> rating. So rating shouldnt be negative for it to make any sense. Also
> rating just between 0-1 is normalised rating? Typically rating we expect to
> be like any real value 2.3,4.5 etc.
>
> Also please note, for implicit feedback ALS, we don't feed 0/1 matrix. We
> feed the count matrix (discrete count values) and am assuming spark
> internally converts it into a preference matrix (1/0) and a confidence
> matrix =1+alpha*count_matrix
>
>
>
>
>
> On Thu, Dec 15, 2016 at 2:56 PM, Sean Owen  wrote:
>
> No, ALS is not modeling probabilities. The outputs are reconstructions of
> a 0/1 matrix. Most values will be in [0,1], but, it's possible to get
> values outside that range.
>
> On Thu, Dec 15, 2016 at 10:21 PM Manish Tripathi 
> wrote:
>
> Hi
>
> ran the ALS model for implicit feedback thing. Then I used the .transform
> method of the model to predict the ratings for the original dataset. My
> dataset is of the form (user,item,rating)
>
> I see something like below:
>
> predictions.show(5,truncate=False)
>
>
> Why is the last prediction value negative ?. Isn't the transform method
> giving the prediction(probability) of seeing the rating as 1?. I had counts
> data for rating (implicit feedback) and for validation dataset I binarized
> the rating (1 if >0 else 0). My training data has rating positive (it's
> basically the count of views to a video).
>
> I used following to train:
>
> * als = ALS(rank=x, maxIter=15, regParam=y, implicitPrefs=True,alpha=40.0)*
>
> *model=als.fit(self.train)*
>
> What does negative prediction mean here and is it ok to have that?
>
>
>
>


Re: Dataset encoders for further types?

2016-12-15 Thread Michael Armbrust
I would have sworn there was a ticket, but I can't find it.  So here you
go: https://issues.apache.org/jira/browse/SPARK-18891

A workaround until that is fixed would be for you to manually specify the kryo
encoder.

On Thu, Dec 15, 2016 at 8:18 AM, Jakub Dubovsky <
spark.dubovsky.ja...@gmail.com> wrote:

> Hey,
>
> I want to ask whether there is any roadmap/plan for adding Encoders for
> further types in next releases of Spark. Here is a list of
> currently supported types. We would like to use Datasets with our
> internally defined case classes containing scala.collection.immutable.List(s).
> This does not work now because these lists are converted to ArrayType
> (Seq). This then fails a constructor lookup because of seq-is-not-a-list
> error...
>
> This means that for now we are stuck with using RDDs.
>
> Thanks for any insights!
>
> Jakub Dubovsky
>
>


Re: Negative values of predictions in ALS.transform

2016-12-15 Thread Manish Tripathi
When you say "implicit ALS *is* factoring the 0/1 matrix", are you saying
that for the implicit feedback algorithm we need to pass the input data as the
preference matrix, i.e. a matrix of 0s and 1s?

Then how is the confidence matrix, which is basically
1+alpha*count_matrix, calculated? If we don't pass the actual counts (views
etc.), then how does Spark calculate the confidence matrix?

My understanding was that the input data for als.fit(implicitPrefs=True)
is the actual count matrix of the views/purchases. Am I going wrong here?
If yes, then how is Spark calculating the confidence matrix if it doesn't
have the actual count data?

The original paper on which the Spark algorithm is based needs the actual count
data to create a confidence matrix, and also needs the 0/1 matrix, since the
objective function uses both the confidence matrix and the 0/1 matrix to find
the user and item factors.

On Thu, Dec 15, 2016 at 3:38 PM, Sean Owen  wrote:

> No, you can't interpret the output as probabilities at all. In particular
> they may be negative. It is not predicting rating but interaction. Negative
> means very strongly not predicted to interact. No, implicit ALS *is*
> factoring the 0/1 matrix.
>
> On Thu, Dec 15, 2016, 23:31 Manish Tripathi  wrote:
>
>> Ok. So we can kind of interpret the output as probabilities even though
>> it is not modeling probabilities. This is to be able to use it for
>> binaryclassification evaluator.
>>
>> So the way I understand is and as per the algo, the predicted matrix is
>> basically a dot product of user factor and item factor matrix.
>>
>> but in what circumstances the ratings predicted can be negative. I can
>> understand if the individual user factor vector and item factor vector is
>> having negative factor terms, then it can be negative. But practically does
>> negative make any sense? AS per algorithm the dot product is the predicted
>> rating. So rating shouldnt be negative for it to make any sense. Also
>> rating just between 0-1 is normalised rating? Typically rating we expect to
>> be like any real value 2.3,4.5 etc.
>>
>> Also please note, for implicit feedback ALS, we don't feed 0/1 matrix. We
>> feed the count matrix (discrete count values) and am assuming spark
>> internally converts it into a preference matrix (1/0) and a confidence
>> matrix =1+alpha*count_matrix
>>
>>
>>
>>
>>
>> On Thu, Dec 15, 2016 at 2:56 PM, Sean Owen  wrote:
>>
>> No, ALS is not modeling probabilities. The outputs are reconstructions of
>> a 0/1 matrix. Most values will be in [0,1], but, it's possible to get
>> values outside that range.
>>
>> On Thu, Dec 15, 2016 at 10:21 PM Manish Tripathi 
>> wrote:
>>
>> Hi
>>
>> ran the ALS model for implicit feedback thing. Then I used the .transform
>> method of the model to predict the ratings for the original dataset. My
>> dataset is of the form (user,item,rating)
>>
>> I see something like below:
>>
>> predictions.show(5,truncate=False)
>>
>>
>> Why is the last prediction value negative ?. Isn't the transform method
>> giving the prediction(probability) of seeing the rating as 1?. I had counts
>> data for rating (implicit feedback) and for validation dataset I binarized
>> the rating (1 if >0 else 0). My training data has rating positive (it's
>> basically the count of views to a video).
>>
>> I used following to train:
>>
>> * als = ALS(rank=x, maxIter=15, regParam=y,
>> implicitPrefs=True,alpha=40.0)*
>>
>> *model=als.fit(self.train)*
>>
>> What does negative prediction mean here and is it ok to have that?
>>
>>
>>


Re: spark reshape hive table and save to parquet

2016-12-15 Thread Anton Kravchenko
Hi Divya,

Thanks, it is exactly what I am looking for!

Anton
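
For readers following the thread, the reshape from the example quoted below amounts to keeping one key column and packing the remaining columns into a single array column. A plain-Python sketch of that row-level logic, with no Spark involved; the function name and the dict-per-row representation are assumptions made for illustration:

```python
def reshape_row(row, key_column, packed_name):
    """Keep key_column and pack every other column, in order, into one list."""
    packed = [value for name, value in row.items() if name != key_column]
    return {key_column: row[key_column], packed_name: packed}


# The two input rows from the example, as dicts keyed by column name.
rows = [
    {"col1": 1, "col2": 2, "col3": 3},
    {"col1": 4, "col2": 5, "col3": 6},
]
reshaped = [reshape_row(row, "col1", "newcol2") for row in rows]
```

Because the packed columns are selected by exclusion rather than listed by name, the same function covers a table with ~1000 columns.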

On Wed, Dec 14, 2016 at 6:01 PM, Divya Gehlot 
wrote:

> you can use udfs to do it
> http://stackoverflow.com/questions/31615657/how-to-add-a-new-struct-column-to-a-dataframe
>
> Hope it will help.
>
>
> Thanks,
> Divya
>
> On 9 December 2016 at 00:53, Anton Kravchenko <
> kravchenko.anto...@gmail.com> wrote:
>
>> Hello,
>>
>> I wonder if there is a way (preferably efficient) in Spark to reshape
>> hive table and save it to parquet.
>>
>> Here is a minimal example, input hive table:
>> col1 col2 col3
>> 1 2 3
>> 4 5 6
>>
>> output parquet:
>> col1 newcol2
>> 1 [2 3]
>> 4 [5 6]
>>
>> p.s. The real input hive table has ~1000 columns.
>>
>> Thank you,
>> Anton
>>
>
>


Re: Negative values of predictions in ALS.tranform

2016-12-15 Thread Sean Owen
No, you can't interpret the output as probabilities at all. In particular
they may be negative. It is not predicting rating but interaction. Negative
means very strongly not predicted to interact. No, implicit ALS *is*
factoring the 0/1 matrix.
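
As a rough illustration of why a score can fall below zero, here is a minimal sketch in plain Python. It is not Spark's implementation, and the factor values are invented for the example; it only assumes that a prediction is the dot product of a user factor vector and an item factor vector, which nothing constrains to [0,1].

```python
# Hypothetical latent factors: real ALS factors are learned from data,
# these numbers are made up purely for illustration.
def predict(user_factors, item_factors):
    """Score a (user, item) pair as the dot product of their factor vectors."""
    return sum(u * v for u, v in zip(user_factors, item_factors))


user = [0.8, -0.5, 0.1]
similar_item = [0.9, -0.4, 0.2]     # points the same way as the user vector
opposite_item = [-0.3, 0.6, 0.1]    # points away from the user vector

print(predict(user, similar_item))   # positive score: interaction predicted
print(predict(user, opposite_item))  # negative score: strongly not predicted
```

The score is only useful for ordering items against each other, not as a probability.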

On Thu, Dec 15, 2016, 23:31 Manish Tripathi  wrote:

> Ok. So we can kind of interpret the output as probabilities even though it
> is not modeling probabilities. This is to be able to use it for
> binaryclassification evaluator.
>
> So the way I understand is and as per the algo, the predicted matrix is
> basically a dot product of user factor and item factor matrix.
>
> but in what circumstances the ratings predicted can be negative. I can
> understand if the individual user factor vector and item factor vector is
> having negative factor terms, then it can be negative. But practically does
> negative make any sense? AS per algorithm the dot product is the predicted
> rating. So rating shouldnt be negative for it to make any sense. Also
> rating just between 0-1 is normalised rating? Typically rating we expect to
> be like any real value 2.3,4.5 etc.
>
> Also please note, for implicit feedback ALS, we don't feed 0/1 matrix. We
> feed the count matrix (discrete count values) and am assuming spark
> internally converts it into a preference matrix (1/0) and a confidence
> matrix =1+alpha*count_matrix
>
>
>
>
>
> On Thu, Dec 15, 2016 at 2:56 PM, Sean Owen  wrote:
>
> No, ALS is not modeling probabilities. The outputs are reconstructions of
> a 0/1 matrix. Most values will be in [0,1], but, it's possible to get
> values outside that range.
>
> On Thu, Dec 15, 2016 at 10:21 PM Manish Tripathi 
> wrote:
>
> Hi
>
> ran the ALS model for implicit feedback thing. Then I used the .transform
> method of the model to predict the ratings for the original dataset. My
> dataset is of the form (user,item,rating)
>
> I see something like below:
>
> predictions.show(5,truncate=False)
>
>
> Why is the last prediction value negative ?. Isn't the transform method
> giving the prediction(probability) of seeing the rating as 1?. I had counts
> data for rating (implicit feedback) and for validation dataset I binarized
> the rating (1 if >0 else 0). My training data has rating positive (it's
> basically the count of views to a video).
>
> I used following to train:
>
> * als = ALS(rank=x, maxIter=15, regParam=y, implicitPrefs=True,alpha=40.0)*
>
> *model=als.fit(self.train)*
>
> What does negative prediction mean here and is it ok to have that?
>
>
>


Re: Negative values of predictions in ALS.tranform

2016-12-15 Thread Manish Tripathi
OK. So we can kind of interpret the output as probabilities even though it
is not modeling probabilities. This is so that it can be used with a
binary classification evaluator.

As I understand it, and as per the algorithm, the predicted matrix is
basically the product of the user factor matrix and the item factor matrix.

But in what circumstances can the predicted ratings be negative? I can
understand that if the individual user factor vector and item factor vector
have negative terms, then the prediction can be negative. But practically,
does a negative value make any sense? As per the algorithm, the dot product
is the predicted rating, so the rating shouldn't be negative for it to make
any sense. Also, is a rating between 0 and 1 a normalised rating? Typically
we expect a rating to be any real value, like 2.3, 4.5, etc.

Also please note that for implicit feedback ALS we don't feed a 0/1 matrix.
We feed the count matrix (discrete count values), and I am assuming Spark
internally converts it into a preference matrix (1/0) and a confidence
matrix = 1 + alpha * count_matrix.
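
The conversion assumed above can be sketched in plain Python. This only restates the assumption in this message (preference is 1 when the count is positive, confidence is 1 + alpha * count); the function name is hypothetical and this is not taken from Spark's source.

```python
def preference_and_confidence(count, alpha=40.0):
    """Map a raw view count to a (preference, confidence) pair.

    Assumed formulation from the message above:
    preference = 1 if count > 0 else 0, confidence = 1 + alpha * count.
    """
    preference = 1.0 if count > 0 else 0.0
    confidence = 1.0 + alpha * count
    return preference, confidence


for views in (0, 1, 5):
    print(views, preference_and_confidence(views))
```

Under this assumption the model is fitted against the 0/1 preferences, and the counts only change how heavily each cell is weighted.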





On Thu, Dec 15, 2016 at 2:56 PM, Sean Owen  wrote:

> No, ALS is not modeling probabilities. The outputs are reconstructions of
> a 0/1 matrix. Most values will be in [0,1], but, it's possible to get
> values outside that range.
>
> On Thu, Dec 15, 2016 at 10:21 PM Manish Tripathi 
> wrote:
>
>> Hi
>>
>> ran the ALS model for implicit feedback thing. Then I used the .transform
>> method of the model to predict the ratings for the original dataset. My
>> dataset is of the form (user,item,rating)
>>
>> I see something like below:
>>
>> predictions.show(5,truncate=False)
>>
>>
>> Why is the last prediction value negative ?. Isn't the transform method
>> giving the prediction(probability) of seeing the rating as 1?. I had counts
>> data for rating (implicit feedback) and for validation dataset I binarized
>> the rating (1 if >0 else 0). My training data has rating positive (it's
>> basically the count of views to a video).
>>
>> I used following to train:
>>
>> * als = ALS(rank=x, maxIter=15, regParam=y,
>> implicitPrefs=True,alpha=40.0)*
>>
>> *model=als.fit(self.train)*
>>
>> What does negative prediction mean here and is it ok to have that?
>>
>


Financial fraud detection using streaming RDBMS data into Spark & Hbase

2016-12-15 Thread Mich Talebzadeh
I am not talking about Credit Card fraud etc.

In complex fraud cases like the one at UBS, the rogue trader manipulated the
figures over a period of time. Although there is a lot of talk about using
elaborate set-ups to predict unusual behaviour etc., in my opinion it all
boils down to how the figures are manipulated at the database level!

The bottom line is that money has to leave the account and be paid out to be
real money, so to speak. In most cases this type of fraud happens because
someone is pretty familiar with the front office and settlement work but
ignorant of how a transactional database works.

In short, a transactional DB only keeps the latest updates. However, it is
now possible to get the DML data out of the log by use of replication
technologies (I am not talking about simple CDC). If we start sending
replicated transactional logs (as SQL statements) out of the database for
all updates and store them in Hbase, then one can go through the Hbase data
with Spark.

In the past this was prohibitive using database audit, as it had a heavy
price on RDBMS performance. However, with Big Data this can be done through
time series/immutable inserts (updates are simply flagged as updates and
time stamped), each stored as a new row in Hbase.

I thought about it a bit and I think it can provide results far quicker
and better than searching for a needle in a haystack with the stuff
promoted by various companies these days.

Your thoughts?


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


Re: How to control saveAsTable() warehouse path?

2016-12-15 Thread epettijohn
I don't profess to be an expert on this, but I did face the same problem.  A
couple of possibilities:

1.  If your default Hive database is stored in "/tmp/hive/warehouse/...",
then that could be the issue.  I recommend creating a database on s3a and
then storing the table there ( .saveAsTable('s3db.test_table',... )

2.  Have you tried setting spark.sql.warehouse.dir?  I know the docs say
that it uses the hive metastore dir first, but I've had some luck with
setting that one.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-control-saveAsTable-warehouse-path-tp23340p28221.html



Re: spark on yarn can't load kafka dependency jar

2016-12-15 Thread Mich Talebzadeh
Try this, it should work. And yes, they are comma separated.

spark-streaming-kafka_2.10-1.5.1.jar

Dr Mich Talebzadeh






On 15 December 2016 at 22:49, neil90  wrote:

> Don't the jars need to be comma sperated when you pass?
>
> i.e. --jars "hdfs://zzz:8020/jars/kafka_2.10-0.8.2.2.jar",
> /opt/bigdevProject/sparkStreaming_jar4/sparkStreaming.jar
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/spark-on-yarn-can-t-load-kafka-
> dependency-jar-tp28216p28220.html
>
>


Re: Negative values of predictions in ALS.tranform

2016-12-15 Thread Sean Owen
No, ALS is not modeling probabilities. The outputs are reconstructions of a
0/1 matrix. Most values will be in [0,1], but, it's possible to get values
outside that range.

On Thu, Dec 15, 2016 at 10:21 PM Manish Tripathi 
wrote:

> Hi
>
> ran the ALS model for implicit feedback thing. Then I used the .transform
> method of the model to predict the ratings for the original dataset. My
> dataset is of the form (user,item,rating)
>
> I see something like below:
>
> predictions.show(5,truncate=False)
>
>
> Why is the last prediction value negative ?. Isn't the transform method
> giving the prediction(probability) of seeing the rating as 1?. I had counts
> data for rating (implicit feedback) and for validation dataset I binarized
> the rating (1 if >0 else 0). My training data has rating positive (it's
> basically the count of views to a video).
>
> I used following to train:
>
> * als = ALS(rank=x, maxIter=15, regParam=y, implicitPrefs=True,alpha=40.0)*
>
> *model=als.fit(self.train)*
>
> What does negative prediction mean here and is it ok to have that?
>


Re: spark on yarn can't load kafka dependency jar

2016-12-15 Thread neil90
Don't the jars need to be comma separated when you pass them?

i.e. --jars "hdfs://zzz:8020/jars/kafka_2.10-0.8.2.2.jar",
/opt/bigdevProject/sparkStreaming_jar4/sparkStreaming.jar 
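
A small sketch of the point about commas, in plain Python. The helper name and the second dependency path are hypothetical; the only thing taken as given is that spark-submit's --jars option expects a single comma-separated value with no spaces.

```python
def build_spark_submit(app_jar, dependency_jars, master="yarn"):
    """Build a spark-submit argument list with --jars as one comma-joined value."""
    return [
        "spark-submit",
        "--master", master,
        "--jars", ",".join(dependency_jars),  # commas, no spaces
        app_jar,
    ]


command = build_spark_submit(
    "/opt/bigdevProject/sparkStreaming_jar4/sparkStreaming.jar",
    [
        "hdfs://zzz:8020/jars/kafka_2.10-0.8.2.2.jar",
        "hdfs://zzz:8020/jars/another-dependency.jar",  # hypothetical path
    ],
)
print(" ".join(command))
```

Passing the list to subprocess as separate arguments avoids shell quoting problems around the comma-joined value.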



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-on-yarn-can-t-load-kafka-dependency-jar-tp28216p28220.html



Re: Spark Batch checkpoint

2016-12-15 Thread Selvam Raman
I am using Java. I will try it and let you know.
On Dec 15, 2016 8:45 PM, "Irving Duran"  wrote:

> Not sure what programming language you are using, but in python you can do
> "sc.setCheckpointDir('~/apps/spark-2.0.1-bin-hadoop2.7/checkpoint/')".
> This will store checkpoints on that directory that I called checkpoint.
>
>
> Thank You,
>
> Irving Duran
>
> On Thu, Dec 15, 2016 at 10:33 AM, Selvam Raman  wrote:
>
>> Hi,
>>
>> is there any provision in spark batch for checkpoint.
>>
>> I am having huge data, it takes more than 3 hours to process all data. I
>> am currently having 100 partitions.
>>
>> if the job fails after two hours, lets say it has processed 70 partition.
>> should i start spark job from the beginning or is there way for checkpoint
>> provision.
>>
>> Checkpoint,what i am expecting is start from 71 partition to till end.
>>
>> Please give me your suggestions.
>>
>> --
>> Selvam Raman
>> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>>
>
>


Re: Cached Tables SQL Performance Worse than Uncached

2016-12-15 Thread Mich Talebzadeh
How many tables are involved in the SQL join and how do you cache them?

If you unpersist the DF(s) and run the same SQL query (in the same
session), what do you see with explain?

HTH



Dr Mich Talebzadeh






On 15 December 2016 at 22:27, Mich Talebzadeh 
wrote:

> How many tables are involved in the SQL join and how do you cache them?
>
> If you do unpersist on the DF and run the sdame
>
> Dr Mich Talebzadeh
>
>
>
>
>
>
> On 15 December 2016 at 22:14, Warren Kim 
> wrote:
>
>> Playing with TPC-H and comparing performance between cached (serialized
>> in-memory tables) and uncached (DF from parquet) results in various
>> SQL queries performing much worse, duration-wise.
>>
>>
>> I see some physical plans have an extra layer of shuffle/sort/merge under
>> cached scenario.
>>
>>
>> I could do some filtering by key to optimize, but I'm just curious as to
>> why out-of-the-box planning is more complex and slower when tables are
>> cached to mem.
>>
>>
>> Thanks!
>>
>
>


Re: When will multiple aggregations be supported in Structured Streaming?

2016-12-15 Thread Michael Armbrust
What is your use case?

On Thu, Dec 15, 2016 at 10:43 AM, ljwagerfield 
wrote:

> The current version of Spark (2.0.2) only supports one aggregation per
> structured stream (and will throw an exception if multiple aggregations are
> applied).
>
> Roughly when will Spark support multiple aggregations?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/When-will-multiple-aggregations-be-
> supported-in-Structured-Streaming-tp28219.html
>
>


Re: [DataFrames] map function - 2.0

2016-12-15 Thread Michael Armbrust
Experimental in Spark really just means that we are not promising binary
compatibility for those functions in the 2.x release line.  For Datasets in
particular, we want a few releases to make sure the APIs don't have any
major gaps before removing the experimental tag.

On Thu, Dec 15, 2016 at 1:17 PM, Ninad Shringarpure 
wrote:

> Hi Team,
>
> When going through Dataset class for Spark 2.0 it comes across that both
> overloaded map functions with encoder and without are marked as
> experimental.
>
> Is there a reason and issues that developers whould be aware of when using
> this for production applications. Also is there a "non-experimental" way of
> using map function on Dataframe in Spark 2.0
>
> Thanks,
> Ninad
>


Re: Cached Tables SQL Performance Worse than Uncached

2016-12-15 Thread Mich Talebzadeh
How many tables are involved in the SQL join and how do you cache them?

If you do unpersist on the DF and run the same

Dr Mich Talebzadeh






On 15 December 2016 at 22:14, Warren Kim 
wrote:

> Playing with TPC-H and comparing performance between cached (serialized
> in-memory tables) and uncached (DF from parquet) results in various
> SQL queries performing much worse, duration-wise.
>
>
> I see some physical plans have an extra layer of shuffle/sort/merge under
> cached scenario.
>
>
> I could do some filtering by key to optimize, but I'm just curious as to
> why out-of-the-box planning is more complex and slower when tables are
> cached to mem.
>
>
> Thanks!
>


Re: Cached Tables SQL Performance Worse than Uncached

2016-12-15 Thread Michael Armbrust
It's hard to comment on performance without seeing query plans.  I'd suggest
posting the result of an explain.

On Thu, Dec 15, 2016 at 2:14 PM, Warren Kim 
wrote:

> Playing with TPC-H and comparing performance between cached (serialized
> in-memory tables) and uncached (DF from parquet) results in various
> SQL queries performing much worse, duration-wise.
>
>
> I see some physical plans have an extra layer of shuffle/sort/merge under
> cached scenario.
>
>
> I could do some filtering by key to optimize, but I'm just curious as to
> why out-of-the-box planning is more complex and slower when tables are
> cached to mem.
>
>
> Thanks!
>


Negative values of predictions in ALS.tranform

2016-12-15 Thread Manish Tripathi
Hi

I ran the ALS model for implicit feedback. Then I used the .transform
method of the model to predict the ratings for the original dataset. My
dataset is of the form (user, item, rating).

I see something like below:

predictions.show(5,truncate=False)


Why is the last prediction value negative? Isn't the transform method
giving the prediction (probability) of seeing the rating as 1? I had count
data for the rating (implicit feedback), and for the validation dataset I
binarized the rating (1 if >0 else 0). My training data has positive ratings
(it's basically the count of views of a video).

I used following to train:

als = ALS(rank=x, maxIter=15, regParam=y, implicitPrefs=True, alpha=40.0)

model = als.fit(self.train)

What does negative prediction mean here and is it ok to have that?


Cached Tables SQL Performance Worse than Uncached

2016-12-15 Thread Warren Kim
Playing with TPC-H and comparing performance between cached (serialized 
in-memory tables) and uncached (DF from parquet) results in various SQL queries 
performing much worse, duration-wise.


I see some physical plans have an extra layer of shuffle/sort/merge under 
cached scenario.


I could do some filtering by key to optimize, but I'm just curious as to why 
out-of-the-box planning is more complex and slower when tables are cached to 
mem.


Thanks!


PowerIterationClustering Benchmark

2016-12-15 Thread Lydia Ickler
Hi all,

I have a question regarding the PowerIterationClusteringExample.
I have adjusted the code so that it reads a file via
sc.textFile("path/to/input"), which works fine.

Now I want to benchmark the algorithm using different numbers of nodes to see
how well the implementation scales. As a testbed I have up to 32 nodes
available, each with 16 cores, running Spark 2.0.2 on YARN.
For my smallest input data set (16 MB) the runtime does not really change
whether I use 1, 2, 4, 8, 16 or 32 nodes (always ~1.5 minutes).
I see the same behavior for my largest data set (2.3 GB): the runtime stays
around 1 h whether I use 16 or 32 nodes.

I was expecting that when I, e.g., double the number of nodes, the runtime
would shrink.
To set up my cluster environment I tried different suggestions from
this paper: https://hal.inria.fr/hal-01347638v1/document


Has anyone experienced the same? Or does anyone have suggestions as to what
might have gone wrong?

Thanks in advance!
Lydia




Re: Is restarting of SparkContext allowed?

2016-12-15 Thread Marcelo Vanzin
(-dev, +user. dev is for Spark development, not for questions about
using Spark.)

You haven't posted code here or the actual error. But you might be
running into SPARK-15754. Or into other issues with yarn-client mode
and "--principal / --keytab" (those have known issues in client mode).

If you have the above fix, you should be able to run the SparkContext
in client mode inside a UGI.doAs() block, after you login the user,
and later stop the context and start a new one. (And don't use
"--principal" / "--keytab" in that case.)


On Thu, Dec 15, 2016 at 1:46 PM, Alexey Klimov  wrote:
> Hello, my question is the continuation of a problem I described here.
>
> I've done some investigation and found out that nameNode.getDelegationToken
> is called while constructing a SparkContext even if a delegation token is
> already present in the token list of the currently logged-in user in the
> UserGroupInformation object. The problem doesn't occur when the waiting time
> before constructing a new context is less than 10 seconds, because the RPC
> connection to the namenode isn't reset, due to the
> ipc.client.connection.maxidletime property.
>
> As a workaround of this problem I do login from keytab before every
> constructing of SparkContext, which basically just resets token list of
> current logged user (as well as whole user structure) and the problem seems
> to be gone. Still I'm not really sure that it is correct way to deal with
> SparkContext.
>
> Having found a reason of the problem, I've got 2 assumptions now:
> First - SparkContext was designed to be restarted during JVM run and
> behaviour above is just a bug.
> Second - it wasn't and I'm just using SparkContext in a wrong manner.
>
> Since I haven't found any related bug in Jira and any solution on the
> internet (as well as too many users facing this error) I tend to think that
> it is rather a not allowed usage of SparkContext.
>
> Is that correct?
>
>
>
> --
> View this message in context: 
> http://apache-spark-developers-list.1001551.n3.nabble.com/Is-restarting-of-SparkContext-allowed-tp20240.html
>



-- 
Marcelo




Re: Output Side Effects for different chain of operations

2016-12-15 Thread Chawla,Sumit
I am already creating these files on the slaves.  How can I create an RDD from
the files on these slaves?

Regards
Sumit Chawla


On Thu, Dec 15, 2016 at 11:42 AM, Reynold Xin  wrote:

> You can just write some files out directly (and idempotently) in your
> map/mapPartitions functions. It is just a function that you can run
> arbitrary code after all.
>
>
> On Thu, Dec 15, 2016 at 11:33 AM, Chawla,Sumit 
> wrote:
>
>> Any suggestions on this one?
>>
>> Regards
>> Sumit Chawla
>>
>>
>> On Tue, Dec 13, 2016 at 8:31 AM, Chawla,Sumit 
>> wrote:
>>
>>> Hi All
>>>
>>> I have a workflow with different steps in my program. Lets say these are
>>> steps A, B, C, D.  Step B produces some temp files on each executor node.
>>> How can i add another step E which consumes these files?
>>>
>>> I understand the easiest choice is  to copy all these temp files to any
>>> shared location, and then step E can create another RDD from it and work on
>>> that.  But i am trying to avoid this copy.  I was wondering if there is any
>>> way i can queue up these files for E as they are getting generated on
>>> executors.  Is there any possibility of creating a dummy RDD in start of
>>> program, and then push these files into this RDD from each executor.
>>>
>>> I take my inspiration from the concept of Side Outputs in Google
>>> Dataflow:
>>>
>>> https://cloud.google.com/dataflow/model/par-do#emitting-to-s
>>> ide-outputs-in-your-dofn
>>>
>>>
>>>
>>> Regards
>>> Sumit Chawla
>>>
>>>
>>
>


Is there synchronous way to predict against model for real time data

2016-12-15 Thread suyog choudhari
Hi,

I have a question about how to make real-time decisions using a model I
have created with Spark ML.

1. I have some data and created a model using it.

// Train the model

val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(trainingData)

2. I believe I can use Spark Streaming to get a real-time feed and then
predict results against the model created in step 1.

3. My question is, how can I do it in a synchronous way?

For example, let's say a customer logs in to my site; then, according to
his data, I want to personalize his site. I want to send his attributes to
the model and get a prediction before rendering anything on the page.

How can I do this synchronously?
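
For illustration only, one possible shape of a synchronous call, sketched in plain Python under assumptions: a binary logistic regression model reduces to a weight vector and an intercept, so if those values are exported from the trained model, a single request could be scored in-process without launching a Spark job. The function names, weights and features below are hypothetical.

```python
import math


def predict_probability(weights, intercept, features):
    """Logistic score for one feature vector: sigmoid(intercept + w . x)."""
    margin = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-margin))


def predict_label(weights, intercept, features, threshold=0.5):
    """Return 1 when the score exceeds the threshold, otherwise 0."""
    return 1 if predict_probability(weights, intercept, features) > threshold else 0


# Hypothetical values standing in for an exported model and one customer.
weights = [2.0, -1.0]
intercept = -0.5
customer = [1.0, 0.5]

print(predict_label(weights, intercept, customer))
```

Because this is a plain function call, it returns before the page is rendered; retraining can still happen offline in Spark.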

Regards,

Suyog


[DataFrames] map function - 2.0

2016-12-15 Thread Ninad Shringarpure
Hi Team,

When going through the Dataset class for Spark 2.0, I noticed that both
overloaded map functions (with and without an encoder) are marked as
experimental.

Is there a reason for this, and are there issues that developers should be
aware of when using them in production applications? Also, is there a
"non-experimental" way of using the map function on a DataFrame in Spark 2.0?

Thanks,
Ninad


Re: Spark Batch checkpoint

2016-12-15 Thread Irving Duran
Not sure what programming language you are using, but in Python you can do
"sc.setCheckpointDir('~/apps/spark-2.0.1-bin-hadoop2.7/checkpoint/')".  This
will store checkpoints in that directory, which I called checkpoint.


Thank You,

Irving Duran

On Thu, Dec 15, 2016 at 10:33 AM, Selvam Raman  wrote:

> Hi,
>
> is there any provision in spark batch for checkpoint.
>
> I am having huge data, it takes more than 3 hours to process all data. I
> am currently having 100 partitions.
>
> if the job fails after two hours, lets say it has processed 70 partition.
> should i start spark job from the beginning or is there way for checkpoint
> provision.
>
> Checkpoint,what i am expecting is start from 71 partition to till end.
>
> Please give me your suggestions.
>
> --
> Selvam Raman
> "லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"
>


Re: Output Side Effects for different chain of operations

2016-12-15 Thread Reynold Xin
You can just write some files out directly (and idempotently) in your
map/mapPartitions functions. It is just a function in which you can run
arbitrary code, after all.
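
A minimal sketch of what "idempotently" could mean here, in plain Python on a local file system. The function name and file naming scheme are hypothetical: the output path depends only on the partition id, and the file is written to a temporary name and then renamed, so a retried task replaces the earlier output instead of adding a second file.

```python
import os
import tempfile


def write_partition(output_dir, partition_id, records):
    """Write one partition's records to a path derived only from its id."""
    final_path = os.path.join(output_dir, "part-%05d" % partition_id)
    # Write to a temporary file first so readers never see a partial file.
    fd, tmp_path = tempfile.mkstemp(dir=output_dir)
    with os.fdopen(fd, "w") as handle:
        for record in records:
            handle.write("%s\n" % record)
    # Rename over the final path: a retried task overwrites the old output.
    os.replace(tmp_path, final_path)
    return final_path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as out_dir:
        write_partition(out_dir, 3, ["a", "b"])
        write_partition(out_dir, 3, ["a", "b"])  # simulated task retry
        print(sorted(os.listdir(out_dir)))
```

In Spark, a function like this could be called from mapPartitionsWithIndex, with output_dir pointing at storage that the consuming step can reach.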


On Thu, Dec 15, 2016 at 11:33 AM, Chawla,Sumit 
wrote:

> Any suggestions on this one?
>
> Regards
> Sumit Chawla
>
>
> On Tue, Dec 13, 2016 at 8:31 AM, Chawla,Sumit 
> wrote:
>
>> Hi All
>>
>> I have a workflow with different steps in my program. Lets say these are
>> steps A, B, C, D.  Step B produces some temp files on each executor node.
>> How can i add another step E which consumes these files?
>>
>> I understand the easiest choice is  to copy all these temp files to any
>> shared location, and then step E can create another RDD from it and work on
>> that.  But i am trying to avoid this copy.  I was wondering if there is any
>> way i can queue up these files for E as they are getting generated on
>> executors.  Is there any possibility of creating a dummy RDD in start of
>> program, and then push these files into this RDD from each executor.
>>
>> I take my inspiration from the concept of Side Outputs in Google Dataflow:
>>
>> https://cloud.google.com/dataflow/model/par-do#emitting-to-
>> side-outputs-in-your-dofn
>>
>>
>>
>> Regards
>> Sumit Chawla
>>
>>
>


Re: Output Side Effects for different chain of operations

2016-12-15 Thread Chawla,Sumit
Any suggestions on this one?

Regards
Sumit Chawla


On Tue, Dec 13, 2016 at 8:31 AM, Chawla,Sumit 
wrote:

> Hi All
>
> I have a workflow with different steps in my program. Lets say these are
> steps A, B, C, D.  Step B produces some temp files on each executor node.
> How can i add another step E which consumes these files?
>
> I understand the easiest choice is  to copy all these temp files to any
> shared location, and then step E can create another RDD from it and work on
> that.  But i am trying to avoid this copy.  I was wondering if there is any
> way i can queue up these files for E as they are getting generated on
> executors.  Is there any possibility of creating a dummy RDD in start of
> program, and then push these files into this RDD from each executor.
>
> I take my inspiration from the concept of Side Outputs in Google Dataflow:
>
> https://cloud.google.com/dataflow/model/par-do#
> emitting-to-side-outputs-in-your-dofn
>
>
>
> Regards
> Sumit Chawla
>
>


When will multiple aggregations be supported in Structured Streaming?

2016-12-15 Thread ljwagerfield
The current version of Spark (2.0.2) only supports one aggregation per
structured stream (and will throw an exception if multiple aggregations are
applied).

Roughly when will Spark support multiple aggregations?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/When-will-multiple-aggregations-be-supported-in-Structured-Streaming-tp28219.html



Spark Batch checkpoint

2016-12-15 Thread Selvam Raman
Hi,

Is there any provision for checkpointing in Spark batch?

I have a huge amount of data, and it takes more than 3 hours to process all
of it. I currently have 100 partitions.

If the job fails after two hours, let's say it has processed 70 partitions.
Should I start the Spark job from the beginning, or is there a way to use a
checkpoint?

With a checkpoint, what I am expecting is to restart from partition 71 and
run to the end.

Please give me your suggestions.

-- 
Selvam Raman
"லஞ்சம் தவிர்த்து நெஞ்சம் நிமிர்த்து"


Dataset encoders for further types?

2016-12-15 Thread Jakub Dubovsky
Hey,

I want to ask whether there is any roadmap/plan for adding Encoders for
further types in upcoming releases of Spark. Here is a list of
currently supported types. We would like to use Datasets with our
internally defined case classes containing
scala.collection.immutable.List(s). This does not work now because these
lists are converted to ArrayType (Seq). This then fails a constructor
lookup because of a seq-is-not-a-list error...

This means that for now we are stuck with using RDDs.

Thanks for any insights!

Jakub Dubovsky


Re: Few questions on reliability of accumulators value.

2016-12-15 Thread Steve Loughran

On 12 Dec 2016, at 19:57, Daniel Siegmann wrote:

Accumulators are generally unreliable and should not be used. The answer to (2) 
and (4) is yes. The answer to (3) is both.

Here's a more in-depth explanation: 
http://imranrashid.com/posts/Spark-Accumulators/


That's a really nice article.

Accumulators work for generating statistics of things, such as bytes 
read/written (used internally for this), network errors ignored, etc. These 
things stay correct on retries: if you read more bytes, the byte counter should 
increase.

Where they are dangerous is when they are treated as a real output of work, an 
answer to some query, albeit just a side effect. People have been doing that 
with MR counters since Hadoop 0.1x, so there's no need to feel bad about 
trying; everyone tries at one point. In Hadoop 1.x, trying to create too many 
counters would actually overload the entire job tracker; at some point a 
per-job limit went in for that reason; it's still in the MR code to keep costs 
down.

Spark's accumulators only use up your cluster's storage + extra data on the 
heartbeats, but because of retries it's less an accumulator of results, and 
more an accumulator of 'things that happened during one or more executions of a 
function against an RDD'

You should really be generating all output as the output of a series of 
functional operations on RDDs.
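
The retry effect can be shown with a small simulation in plain Python. This does not use Spark's accumulator API; the Counter class and the failure point are invented for the example. A side-effect counter is bumped once per record, the first attempt fails partway through, and the retry processes every record again.

```python
class Counter:
    """Stand-in for a side-effect counter shared across task attempts."""

    def __init__(self):
        self.value = 0

    def add(self, amount):
        self.value += amount


def run_task(records, counter, fail_at=None):
    """Double each record, counting records seen; optionally fail midway."""
    output = []
    for index, record in enumerate(records):
        if fail_at is not None and index == fail_at:
            raise RuntimeError("simulated executor failure")
        counter.add(1)
        output.append(record * 2)
    return output


def run_with_retry(records, counter):
    """First attempt fails at the third record; the retry runs to the end."""
    try:
        return run_task(records, counter, fail_at=2)
    except RuntimeError:
        return run_task(records, counter)


records = [1, 2, 3, 4]
seen = Counter()
result = run_with_retry(records, seen)
print(len(result), seen.value)  # 4 records in the result, but 6 counted
```

The returned list is correct because it comes only from the successful attempt, while the counter also includes work done by the failed one.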


spark use /tmp directory instead of directory from spark.local.dir

2016-12-15 Thread AlexModestov
Hello!
I want to use another directory instead of /tmp for all data.
I set spark.local.dir and -Djava.io.tmpdir=/... but I see that Spark still
uses /tmp for some data.
What is Spark doing there? And what should I do so that Spark uses only my
directories?
Thank you!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-use-tmp-directory-instead-of-directory-from-spark-local-dir-tp28217.html



"remember" vs "window" in Spark Streaming

2016-12-15 Thread Mattz
Hello,

Can someone please help me understand the different scenarios in which I
could use "remember" vs "window" in Spark Streaming?

Thanks!


Re: Belief propagation algorithm is open sourced

2016-12-15 Thread Bertrand Dechoux
Nice! I am especially interested in Bayesian Networks, which are only one
of the many models that can be expressed by a factor graph representation.
Do you do Bayesian network learning at scale (parameters and structure)
with latent variables? Are you using publicly available tools for that?
Which ones?

LibDAI, which created the supported format, "supports parameter learning of
conditional probability tables by Expectation Maximization" according to
the documentation. Is it your reference tool?

Bertrand

On Thu, Dec 15, 2016 at 5:21 AM, Bryan Cutler wrote:

> I'll check it out, thanks for sharing Alexander!
>
> On Dec 13, 2016 4:58 PM, "Ulanov, Alexander" wrote:
>
>> Dear Spark developers and users,
>>
>>
>> HPE has open sourced the implementation of the belief propagation (BP)
>> algorithm for Apache Spark, a popular message passing algorithm for
>> performing inference in probabilistic graphical models. It provides exact
>> inference for graphical models without loops. While inference for graphical
>> models with loops is approximate, in practice it is shown to work well. The
>> implementation is generic and operates on factor graph representation of
>> graphical models. It handles factors of any order, and variable domains of
>> any size. It is implemented with Apache Spark GraphX, and thus can scale to
>> large scale models. Further, it supports computations in log scale for
>> numerical stability. Large scale applications of BP include fraud detection
>> in banking transactions and malicious site detection in computer
>> networks.
>>
>>
>> Source code: https://github.com/HewlettPackard/sandpiper
>>
>>
>> Best regards, Alexander
>>
>