Re: spark streaming with kinesis

2016-11-20 Thread Takeshi Yamamuro
"1 userid data" is ambiguous though (user-input data? stream? shard?),
since a kinesis worker fetch data from shards that the worker has an
ownership of, IIUC user-input data in a shard are transferred into an
assigned worker as long as you get no failure.

// maropu
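
On the second point in the question quoted below (repartitioning the stream by
key): JavaDStream/DStream only exposes repartition(numPartitions), but a
partitioner can be applied per batch by keying the stream and using transform
(transformToPair on the Java API). A rough Scala sketch, assuming a
hypothetical Event type with a userId field (not from this thread):

import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.dstream.DStream

// Hypothetical record type and key -- replace with your own payload/extractor.
case class Event(userId: String, payload: String)

def partitionByUser(stream: DStream[Event], numPartitions: Int): DStream[(String, Event)] =
  stream
    .map(e => (e.userId, e))                                       // key each record by userid
    .transform(_.partitionBy(new HashPartitioner(numPartitions)))  // same userid -> same partition within a batch

Note this only groups a userid into one partition within each batch; it does
not pin that userid to the same executor across batches.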

On Mon, Nov 21, 2016 at 1:59 PM, Shushant Arora 
wrote:

> Hi
>
> Thanks.
> Have a doubt on spark streaming kinesis consumer. Say I have a batch time
> of 500 ms and kiensis stream is partitioned on userid(uniformly
> distributed).But since IdleTimeBetweenReadsInMillis is set to 1000ms so
> Spark receiver nodes will fetch the data at interval of 1 second and store
> in InputDstream.
>
> 1. When worker executors will fetch the data from receiver at after every
> 500 ms does its gurantee that 1 userid data will go to one partition and
> that to one worker only always ?
> 2.If not - can I repartition stream data before processing? If yes how-
> since JavaDStream has only one method repartition which takes number of
> partitions and not the partitioner function ?So it will randomly
> repartition the Dstream data.
>
> Thanks
>
>
>
>
>
>
>
>
> On Tue, Nov 15, 2016 at 8:23 AM, Takeshi Yamamuro 
> wrote:
>
>> Seems it is not a good design to frequently restart workers in a minute
>> because
>> their initialization and shutdown take much time as you said
>> (e.g., interconnection overheads with dynamodb and graceful shutdown).
>>
>> Anyway, since this is a kind of questions about the aws kinesis library,
>> so
>> you'd better to ask aws guys in their forum or something.
>>
>> // maropu
>>
>>
>> On Mon, Nov 14, 2016 at 11:20 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> 1.No, I want to implement low level consumer on kinesis stream.
>>> so need to stop the worker once it read the latest sequence number sent
>>> by driver.
>>>
>>> 2.What is the cost of frequent register and deregister of worker node.
>>> Is that when worker's shutdown is called it will terminate run method but
>>> leasecoordinator will wait for 2seconds before releasing the lease. So I
>>> cannot deregister a worker in less than 2 seconds ?
>>>
>>> Thanks!
>>>
>>>
>>>
>>> On Mon, Nov 14, 2016 at 7:36 PM, Takeshi Yamamuro >> > wrote:
>>>
 Is "aws kinesis get-shard-iterator --shard-iterator-type LATEST" not
 enough for your usecase?

 On Mon, Nov 14, 2016 at 10:23 PM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> Thanks!
> Is there a way to get the latest sequence number of all shards of a
> kinesis stream?
>
>
>
> On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro <
> linguin@gmail.com> wrote:
>
>> Hi,
>>
>> The time interval can be controlled by `IdleTimeBetweenReadsInMillis`
>> in KinesisClientLibConfiguration though,
>> it is not configurable in the current implementation.
>>
>> The detail can be found in;
>> https://github.com/apache/spark/blob/master/external/kinesis
>> -asl/src/main/scala/org/apache/spark/streaming/kinesis/Kines
>> isReceiver.scala#L152
>>
>> // maropu
>>
>>
>> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> *Hi *
>>>
>>> *is **spark.streaming.blockInterval* for kinesis input stream is
>>> hardcoded to 1 sec or is it configurable ? Time interval at which 
>>> receiver
>>> fetched data from kinesis .
>>>
>>> Means stream batch interval cannot be less than 
>>> *spark.streaming.blockInterval
>>> and this should be configrable , Also is there any minimum value for
>>> streaming batch interval ?*
>>>
>>> *Thanks*
>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


 --
 ---
 Takeshi Yamamuro

>>>
>>>
>>
>>
>> --
>> ---
>> Takeshi Yamamuro
>>
>
>


-- 
---
Takeshi Yamamuro


Re: Re: Multiple streaming aggregations in structured streaming

2016-11-20 Thread Reynold Xin
Can you use the approximate count distinct?


On Sun, Nov 20, 2016 at 11:51 PM, Xinyu Zhang  wrote:

>
> MapWithState is also very useful.
> I want to calculate UV in real time, but "distinct count" and "multiple
> streaming aggregations" are not supported.
> Is there any method to calculate real-time UV in the current version?
>
>
>
> At 2016-11-19 06:01:45, "Michael Armbrust"  wrote:
>
> Doing this generally is pretty hard.  We will likely support algebraic
> aggregate eventually, but this is not currently slotted for 2.2.  Instead I
> think we will add something like mapWithState that lets users compute
> arbitrary stateful things.  What is your use case?
>
>
> On Wed, Nov 16, 2016 at 6:58 PM, wszxyh  wrote:
>
>> Hi
>>
>> Multiple streaming aggregations are not yet supported. When will it be
>> supported? Is it in the plan?
>>
>> Thanks
>>
>>
>>
>>
>
>
>
>
>


Re:Re: Multiple streaming aggregations in structured streaming

2016-11-20 Thread Xinyu Zhang


MapWithState is also very useful. 
I want to calculate UV in real time, but "distinct count" and "multiple 
streaming aggregations" are not supported.
Is there any method to calculate real-time UV in the current version?




At 2016-11-19 06:01:45, "Michael Armbrust"  wrote:

Doing this generally is pretty hard.  We will likely support algebraic 
aggregate eventually, but this is not currently slotted for 2.2.  Instead I 
think we will add something like mapWithState that lets users compute arbitrary 
stateful things.  What is your use case?




On Wed, Nov 16, 2016 at 6:58 PM, wszxyh  wrote:

Hi


Multiple streaming aggregations are not yet supported. When will it be 
supported? Is it in the plan?


Thanks




 




RE: DataFrame select non-existing column

2016-11-20 Thread Mendelson, Assaf
Nested columns are in fact syntactic sugar.
You basically have a column called pass, whose type is a struct with a field
called mobile.
After you read the parquet file you can check the schema (df.schema) and look
at what it contains: loop through the top-level fields and see if you have a
pass column, then take its type and check whether it has a mobile element.
This would be something like this (writing it without testing, so it probably
will have small mistakes):

df_schema = df.schema
found_pass = False
found_mobile = False
for f in df_schema:                  # iterate over the top-level fields
    if f.name == 'pass':
        found_pass = True
        for g in f.dataType:         # pass is a StructType, so iterate its nested fields
            if g.name == 'mobile':
                found_mobile = True
                break

Assaf
-Original Message-
From: Kristoffer Sjögren [mailto:sto...@gmail.com] 
Sent: Sunday, November 20, 2016 4:13 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: DataFrame select non-existing column

The problem is that I do not know which data frames have the pass.mobile column.
I just list an HDFS directory which contains the parquet files; some files have
the column and some don't. I really don't want to have conditional logic that
inspects the schema. But maybe that's the only option?

Maybe I misunderstand you, but the following code fails with the same error as 
before.

DataFrame dataFrame = ctx.read().parquet(localPath)
.select("pass")
.withColumn("mobile", col("pass.mobile"));


The flatten option works for my use case. But the problem is that there seems 
to be no way of dropping nested columns, i.e.
drop("pass.auction")


On Sun, Nov 20, 2016 at 10:55 AM, Mendelson, Assaf  
wrote:
> The issue is that you already have a struct called pass. What you did was add 
> a new columned called "pass.mobile" instead of adding the element to pass - 
> The schema for pass element is the same as before.
> When you do select pass.mobile, it finds the pass structure and checks for 
> mobile in it.
>
> You can do it the other way around: set the name to be: pass_mobile. Add it 
> as before with lit(0) for those that dataframes that do not have the mobile 
> field and do something like withColumn("pass_mobile", df["pass.modile"]) for 
> those that do.
> Another option is to use do something like df.select("pass.*") to flatten the 
> pass structure and work on that (then you can do withColumn("mobile",...) 
> instead of "pass.mobile") but this would change the schema.
>
>
> -Original Message-
> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
> Sent: Saturday, November 19, 2016 4:57 PM
> To: Mendelson, Assaf
> Cc: user
> Subject: Re: DataFrame select non-existing column
>
> Thanks. Here's my code example [1] and the printSchema() output [2].
>
> This code still fails with the following message: "No such struct field 
> mobile in auction, geo"
>
> By looking at the schema, it seems that pass.mobile did not get nested, which 
> is the way it needs to be for my use case. Is nested columns not supported by 
> withColumn()?
>
> [1]
>
> DataFrame df = ctx.read().parquet(localPath).withColumn("pass.mobile", 
> lit(0L)); dataFrame.printSchema(); dataFrame.select("pass.mobile");
>
> [2]
>
> root
>  |-- pass: struct (nullable = true)
>  ||-- auction: struct (nullable = true)
>  |||-- id: integer (nullable = true)
>  ||-- geo: struct (nullable = true)
>  |||-- postalCode: string (nullable = true)
>  |-- pass.mobile: long (nullable = false)
>
> On Sat, Nov 19, 2016 at 7:45 AM, Mendelson, Assaf  
> wrote:
>> In pyspark for example you would do something like:
>>
>> df.withColumn("newColName",pyspark.sql.functions.lit(None))
>>
>> Assaf.
>> -Original Message-
>> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
>> Sent: Friday, November 18, 2016 9:19 PM
>> To: Mendelson, Assaf
>> Cc: user
>> Subject: Re: DataFrame select non-existing column
>>
>> Thanks for your answer. I have been searching the API for doing that but I 
>> could not find how to do it?
>>
>> Could you give me a code snippet?
>>
>> On Fri, Nov 18, 2016 at 8:03 PM, Mendelson, Assaf  
>> wrote:
>>> You can always add the columns to old dataframes giving them null (or some 
>>> literal) as a preprocessing.
>>>
>>> -Original Message-
>>> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
>>> Sent: Friday, November 18, 2016 4:32 PM
>>> To: user
>>> Subject: DataFrame select non-existing column
>>>
>>> Hi
>>>
>>> We have evolved a DataFrame by adding a few columns but cannot write select 
>>> statements on these columns for older data that doesn't have them since 
>>> they fail with a AnalysisException with message "No such struct field".
>>>
>>> We also tried dropping columns but this doesn't work for nested columns.
>>>
>>> Any non-hacky ways to get around this?
>>>
>>> Cheers,
>>> -Kristoffer
>>>

RE: Join Query

2016-11-20 Thread Shreya Agarwal

Replication join = broadcast join. Look for that term on Google. Many examples.

Semi join can be done on DataFrames/Datasets by passing "leftsemi" as the third
(join-type) parameter of the join function.

Not sure about the other two.

Sent from my Windows 10 phone
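
A compact illustration of those two (a sketch only, assuming Spark 2.x
DataFrames and hypothetical parquet inputs joined on a column named "key"):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

val spark = SparkSession.builder().appName("join-examples").getOrCreate()
// Hypothetical inputs -- substitute your own datasets.
val big   = spark.read.parquet("/data/big")
val small = spark.read.parquet("/data/small")

// Replication (broadcast) join: the small side is shipped to every executor.
val replicated = big.join(broadcast(small), Seq("key"))

// Semi join: keep rows of `big` that have a match in `small`, without small's columns.
val semi = big.join(small, Seq("key"), "leftsemi")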

From: Aakash Basu
Sent: Thursday, November 17, 2016 3:17 PM
To: user@spark.apache.org
Subject: Join Query

Hi,



Conceptually I can understand below spark joins, when it comes to 
implementation I don’t find much information in Google. Please help me with 
code/pseudo code for below joins using java-spark or scala-spark.

Replication Join:
Given two datasets, where one is small enough to fit into the 
memory, perform a Replicated join using Spark.
Note: Need a program to justify this fits for Replication Join.

Semi-Join:
Given a huge dataset, do a semi-join using spark. Note that, 
with semi-join, one dataset needs to do Filter and projection to fit into the 
cache.
Note: Need a program to justify this fits for Semi-Join.


Composite Join:
Given a dataset whereby a dataset is still too big after 
filtering and cannot fit into the memory. Perform composite join on a 
pre-sorted and pre-partitioned data using spark.
Note: Need a program to justify this fits for composite Join.


Repartition join:
Join two datasets by performing Repartition join in spark.
Note: Need a program to justify this fits for repartition Join.





Thanks,
Aakash.


RE: HDPCD SPARK Certification Queries

2016-11-20 Thread Shreya Agarwal
Replication join = broadcast join. Look for that term on Google. Many examples.

Semi join can be done on DataFrames/Datasets by passing "leftsemi" as the third
(join-type) parameter of the join function.

Not sure about the other two.

Sent from my Windows 10 phone

From: Aakash Basu
Sent: Thursday, November 17, 2016 3:41 PM
To: user@spark.apache.org
Subject: HDPCD SPARK Certification Queries

Hi all,


I want to know more about this examination - 
http://hortonworks.com/training/certification/exam-objectives/#hdpcdspark


If anyone's there who appeared for the examination, can you kindly help?

1) What are the kind of questions that come,

2) Samples,

3) All the other details.

Thanks,
Aakash.


Re: spark streaming with kinesis

2016-11-20 Thread Shushant Arora
Hi

Thanks.
I have a doubt about the Spark Streaming Kinesis consumer. Say I have a batch
time of 500 ms and the Kinesis stream is partitioned on userid (uniformly
distributed). Since IdleTimeBetweenReadsInMillis is set to 1000 ms, the Spark
receiver nodes will fetch the data at 1-second intervals and store it in the
InputDStream.

1. When the worker executors fetch the data from the receiver after every
500 ms, is it guaranteed that one userid's data will always go to one
partition, and to only one worker?
2. If not, can I repartition the stream data before processing? If yes, how,
since JavaDStream has only one repartition method, which takes a number of
partitions and not a partitioner function? So it will randomly repartition
the DStream data.

Thanks








On Tue, Nov 15, 2016 at 8:23 AM, Takeshi Yamamuro 
wrote:

> Seems it is not a good design to frequently restart workers in a minute
> because
> their initialization and shutdown take much time as you said
> (e.g., interconnection overheads with dynamodb and graceful shutdown).
>
> Anyway, since this is a kind of questions about the aws kinesis library, so
> you'd better to ask aws guys in their forum or something.
>
> // maropu
>
>
> On Mon, Nov 14, 2016 at 11:20 PM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> 1.No, I want to implement low level consumer on kinesis stream.
>> so need to stop the worker once it read the latest sequence number sent
>> by driver.
>>
>> 2.What is the cost of frequent register and deregister of worker node. Is
>> that when worker's shutdown is called it will terminate run method but
>> leasecoordinator will wait for 2seconds before releasing the lease. So I
>> cannot deregister a worker in less than 2 seconds ?
>>
>> Thanks!
>>
>>
>>
>> On Mon, Nov 14, 2016 at 7:36 PM, Takeshi Yamamuro 
>> wrote:
>>
>>> Is "aws kinesis get-shard-iterator --shard-iterator-type LATEST" not
>>> enough for your usecase?
>>>
>>> On Mon, Nov 14, 2016 at 10:23 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 Thanks!
 Is there a way to get the latest sequence number of all shards of a
 kinesis stream?



 On Mon, Nov 14, 2016 at 5:43 PM, Takeshi Yamamuro <
 linguin@gmail.com> wrote:

> Hi,
>
> The time interval can be controlled by `IdleTimeBetweenReadsInMillis`
> in KinesisClientLibConfiguration though,
> it is not configurable in the current implementation.
>
> The detail can be found in;
> https://github.com/apache/spark/blob/master/external/kinesis
> -asl/src/main/scala/org/apache/spark/streaming/kinesis/Kines
> isReceiver.scala#L152
>
> // maropu
>
>
> On Sun, Nov 13, 2016 at 12:08 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> *Hi *
>>
>> *is **spark.streaming.blockInterval* for kinesis input stream is
>> hardcoded to 1 sec or is it configurable ? Time interval at which 
>> receiver
>> fetched data from kinesis .
>>
>> Means stream batch interval cannot be less than 
>> *spark.streaming.blockInterval
>> and this should be configrable , Also is there any minimum value for
>> streaming batch interval ?*
>>
>> *Thanks*
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


>>>
>>>
>>> --
>>> ---
>>> Takeshi Yamamuro
>>>
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Spark driver not reusing HConnection

2016-11-20 Thread Mukesh Jha
Any ideas folks?

On Fri, Nov 18, 2016 at 3:37 PM, Mukesh Jha  wrote:

> Hi
>
> I'm accessing multiple regions (~5k) of an HBase table using spark's
> newAPIHadoopRDD. But the driver is trying to calculate the region size of
> all the regions.
> It is not even reusing the HConnection and is creating a new connection for
> every request (see below), which is taking lots of time.
>
> Is there a better approach to do this?
>
>
> 8 Nov 2016 22:25:22,759] [INFO Driver] RecoverableZooKeeper: Process
> identifier=*hconnection-0x1e7824af* connecting to ZooKeeper ensemble=
> hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
> [18 Nov 2016 22:25:22,759] [INFO Driver] ZooKeeper: Initiating client
> connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
> hbase28.cloud.com:2181 sessionTimeout=6 watcher=hconnection-0x1e7824af0x0,
> quorum=hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28
> .cloud.com:2181, baseZNode=/hbase
> [18 Nov 2016 22:25:22,761] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
> ClientCnxn: Opening socket connection to server
> hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
> using SASL (unknown error)
> [18 Nov 2016 22:25:22,763] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
> ClientCnxn: Socket connection established, initiating session, client: /
> 10.193.138.145:47891, server: hbase24.cloud.com/10.193.150.217:2181
> [18 Nov 2016 22:25:22,766] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
> ClientCnxn: Session establishment complete on server
> hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e95,
> negotiated timeout = 6
> [18 Nov 2016 22:25:22,766] [INFO Driver] RegionSizeCalculator: Calculating
> region sizes for table "message".
> [18 Nov 2016 22:25:27,867] [INFO Driver] 
> ConnectionManager$HConnectionImplementation:
> Closing master protocol: MasterService
> [18 Nov 2016 22:25:27,868] [INFO Driver] 
> ConnectionManager$HConnectionImplementation:
> Closing zookeeper sessionid=0x2564f6f013e0e95
> [18 Nov 2016 22:25:27,869] [INFO Driver] ZooKeeper: Session:
> 0x2564f6f013e0e95 closed
> [18 Nov 2016 22:25:27,869] [INFO Driver-EventThread] ClientCnxn:
> EventThread shut down
> [18 Nov 2016 22:25:27,880] [INFO Driver] RecoverableZooKeeper: Process
> identifier=*hconnection-0x6a8a1efa* connecting to ZooKeeper ensemble=
> hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28.cloud.com:2181
> [18 Nov 2016 22:25:27,880] [INFO Driver] ZooKeeper: Initiating client
> connection, connectString=hbase19.cloud.com:2181,hbase24.cloud.com:2181,
> hbase28.cloud.com:2181 sessionTimeout=6 watcher=hconnection-0x6a8a1efa0x0,
> quorum=hbase19.cloud.com:2181,hbase24.cloud.com:2181,hbase28
> .cloud.com:2181, baseZNode=/hbase
> [18 Nov 2016 22:25:27,883] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
> ClientCnxn: Opening socket connection to server
> hbase24.cloud.com/10.193.150.217:2181. Will not attempt to authenticate
> using SASL (unknown error)
> [18 Nov 2016 22:25:27,885] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
> ClientCnxn: Socket connection established, initiating session, client: /
> 10.193.138.145:47894, server: hbase24.cloud.com/10.193.150.217:2181
> [18 Nov 2016 22:25:27,887] [INFO Driver-SendThread(hbase24.cloud.com:2181)]
> ClientCnxn: Session establishment complete on server
> hbase24.cloud.com/10.193.150.217:2181, sessionid = 0x2564f6f013e0e97,
> negotiated timeout = 6
> [18 Nov 2016 22:25:27,888] [INFO Driver] RegionSizeCalculator: Calculating
> region sizes for table "message".
> 
>
> --
> Thanks & Regards,
>
> *Mukesh Jha *
>



-- 


Thanks & Regards,

*Mukesh Jha *
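
One possible mitigation for the region-size pass, offered as an assumption to
verify against the HBase version in use: the scan comes from HBase's
RegionSizeCalculator inside TableInputFormat, and it can usually be switched
off via hbase.regionsizecalculator.enable, e.g.:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "message")
// Skip the per-region size calculation the driver runs while computing splits.
hbaseConf.setBoolean("hbase.regionsizecalculator.enable", false)

// sc is an existing SparkContext
val rdd = sc.newAPIHadoopRDD(
  hbaseConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])

This only targets the region-size calculation step shown in the log, not the
connection churn itself.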


Re: dataframe data visualization

2016-11-20 Thread ayan guha
Zeppelin with Spark thrift server?

On Mon, Nov 21, 2016 at 1:47 PM, Saisai Shao  wrote:

> You might take a look at this project (https://github.com/vegas-viz/Vegas),
> it has Spark integration.
>
> Thanks
> Saisai
>
> On Mon, Nov 21, 2016 at 10:23 AM, wenli.o...@alibaba-inc.com <
> wenli.o...@alibaba-inc.com> wrote:
>
>> Hi anyone,
>>
>> is there any easy way for me to do data visualization in spark using
>> scala when data is in dataframe format? Thanks.
>>
>> Wayne Ouyang
>
>
>


-- 
Best Regards,
Ayan Guha


Re: Linear regression + Janino Exception

2016-11-20 Thread janardhan shetty
Seems like this is associated to :
https://issues.apache.org/jira/browse/SPARK-16845

On Sun, Nov 20, 2016 at 6:09 PM, janardhan shetty 
wrote:

> Hi,
>
> I am trying to execute Linear regression algorithm for Spark 2.02 and
> hitting the below error when I am fitting my training set:
>
> val lrModel = lr.fit(train)
>
>
> It happened on 2.0.0 as well. Any resolution steps is appreciated.
>
> *Error Snippet: *
> 16/11/20 18:03:45 *ERROR CodeGenerator: failed to compile:
> org.codehaus.janino.JaninoRuntimeException: Code of method
> "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V"
> of class
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection"
> grows beyond 64 KB*
> /* 001 */ public java.lang.Object generate(Object[] references) {
> /* 002 */   return new SpecificUnsafeProjection(references);
> /* 003 */ }
> /* 004 */
> /* 005 */ class SpecificUnsafeProjection extends
> org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
> /* 006 */
> /* 007 */   private Object[] references;
> /* 008 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF
> scalaUDF;
> /* 009 */   private scala.Function1 catalystConverter;
> /* 010 */   private scala.Function1 converter;
> /* 011 */   private scala.Function1 udf;
> /* 012 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF
> scalaUDF1;
> /* 013 */   private scala.Function1 catalystConverter1;
> /* 014 */   private scala.Function1 converter1;
> /* 015 */   private scala.Function1 udf1;
> /* 016 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF
> scalaUDF2;
> /* 017 */   private scala.Function1 catalystConverter2;
>
>
>


Re: dataframe data visualization

2016-11-20 Thread Saisai Shao
You might take a look at this project (https://github.com/vegas-viz/Vegas),
it has Spark integration.

Thanks
Saisai

On Mon, Nov 21, 2016 at 10:23 AM, wenli.o...@alibaba-inc.com <
wenli.o...@alibaba-inc.com> wrote:

> Hi anyone,
>
> is there any easy way for me to do data visualization in spark using scala
> when data is in dataframe format? Thanks.
>
> Wayne Ouyang


dataframe data visualization

2016-11-20 Thread wenli.o...@alibaba-inc.com
Hi anyone,

is there any easy way for me to do data visualization in spark using scala when 
data is in dataframe format? Thanks.

Wayne Ouyang



Linear regression + Janino Exception

2016-11-20 Thread janardhan shetty
Hi,

I am trying to execute Linear regression algorithm for Spark 2.02 and
hitting the below error when I am fitting my training set:

val lrModel = lr.fit(train)


It happened on 2.0.0 as well. Any resolution steps is appreciated.

*Error Snippet: *
16/11/20 18:03:45 *ERROR CodeGenerator: failed to compile:
org.codehaus.janino.JaninoRuntimeException: Code of method
"(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V"
of class
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection"
grows beyond 64 KB*
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends
org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF
scalaUDF;
/* 009 */   private scala.Function1 catalystConverter;
/* 010 */   private scala.Function1 converter;
/* 011 */   private scala.Function1 udf;
/* 012 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF
scalaUDF1;
/* 013 */   private scala.Function1 catalystConverter1;
/* 014 */   private scala.Function1 converter1;
/* 015 */   private scala.Function1 udf1;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.ScalaUDF
scalaUDF2;
/* 017 */   private scala.Function1 catalystConverter2;


Fwd: Yarn resource utilization with Spark pipe()

2016-11-20 Thread Sameer Choudhary
Hi,

I am working on a Spark 1.6.2 application on a YARN-managed EMR cluster that
uses RDD's pipe method to process my data. I start a lightweight daemon
process that starts a process for each task via pipes. This is to ensure
that I don't run into https://issues.apache.org/jira/browse/SPARK-671.

I'm running into Spark job failure due to task failures across the cluster.
Following are the questions that I think would help in understanding the
issue:
- How does resource allocation in PySpark work? How does YARN and SPARK
track the memory consumed by python processes launched on the worker nodes?
- As an example, let's say SPARK started n tasks on a worker node. These n
tasks start n processes via pipe. Memory for executors is already reserved
during application launch. As the processes run their memory footprint
grows and eventually there is not enough memory on the box. In this case,
how will YARN and Spark behave? Will the executors be killed, or will my
processes be killed, eventually killing the task? I think this could lead
to cascading task failures across the cluster as retry attempts also fail,
eventually leading to termination of the Spark job. Is there a way to avoid
this?
- When we define the number of executors in SparkConf, are they distributed
evenly across my nodes? One approach to get around this problem would be to
limit the number of executors YARN can launch on each host, so that we
manage the memory for the piped processes outside of YARN. Is there a way to
avoid this?

Thanks,
Sameer
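
One knob that is relevant here, offered as a hedged sketch rather than a full
answer: if the processes launched via pipe() stay inside the executor's
container process tree, YARN's physical-memory check counts them against the
container, and the usual way to leave headroom for them in Spark 1.6 is the
executor memory overhead (values below are illustrative only):

import org.apache.spark.{SparkConf, SparkContext}

// Reserve extra container memory, beyond the executor JVM heap, for the piped children.
val conf = new SparkConf()
  .setAppName("pipe-job")
  .set("spark.executor.memory", "4g")
  .set("spark.yarn.executor.memoryOverhead", "2048") // in MB for Spark 1.6
val sc = new SparkContext(conf)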


Re: How do I access the nested field in a dataframe, spark Streaming app... Please help.

2016-11-20 Thread shyla deshpande
Thanks Jon, great Learning resource.
Thanks Pandees, addresses[0].city would work, but I want all the cities,
not just the one from addresses[0].
Finally, I wrote the following function to get the cities.

def getCities(addresses: Seq[Address]): String = {
  var cities: String = ""
  if (addresses.size > 0) {
    cities = (for (a <- addresses) yield a.city.getOrElse("")).mkString(",")
    // cities = addresses.foldLeft("")((str, addr) => str + addr.city.getOrElse(""))
  }
  cities
}

Great help. Thanks again
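
A UDF-free alternative that should give the same information (a sketch,
assuming Spark 2.0's support for pulling a struct field out of an array
column, and the ds2 / spark.implicits._ from the code quoted below):

import org.apache.spark.sql.functions.concat_ws

// addresses.city resolves to an array of all the cities in the row;
// concat_ws flattens it into a comma-separated string and skips nulls.
val namesAndCities = ds2.select($"name", concat_ws(",", $"addresses.city").as("cities"))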


On Sun, Nov 20, 2016 at 1:10 PM, Jon Gregg  wrote:

> In these cases it might help to just flatten the DataFrame.  Here's a
> helper function from the tutorial (scroll down to the "Flattening" header:
> https://docs.cloud.databricks.com/docs/latest/databricks_
> guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/
> 02%20Introduction%20to%20DataFrames%20-%20scala.html
>
>
> On Sun, Nov 20, 2016 at 1:24 PM, pandees waran  wrote:
>
>> have you tried using "." access method?
>>
>> e.g:
>> ds1.select("name","addresses[0].element.city")
>>
>> On Sun, Nov 20, 2016 at 9:59 AM, shyla deshpande <
>> deshpandesh...@gmail.com> wrote:
>>
>>> The following my dataframe schema
>>>
>>> root
>>>  |-- name: string (nullable = true)
>>>  |-- addresses: array (nullable = true)
>>>  ||-- element: struct (containsNull = true)
>>>  |||-- street: string (nullable = true)
>>>  |||-- city: string (nullable = true)
>>>
>>> I want to output name and city. The following is my spark streaming app
>>> which outputs name and addresses, but I want name and cities in the output.
>>>
>>> object PersonConsumer {
>>>   import org.apache.spark.sql.{SQLContext, SparkSession}
>>>   import com.example.protos.demo._
>>>
>>>   def main(args : Array[String]) {
>>>
>>> val spark = SparkSession.builder.
>>>   master("local")
>>>   .appName("spark session example")
>>>   .getOrCreate()
>>>
>>> import spark.implicits._
>>>
>>> val ds1 = spark.readStream.format("kafka").
>>>   option("kafka.bootstrap.servers","localhost:9092").
>>>   option("subscribe","person").load()
>>>
>>> val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value"
>>> )).map(Person.parseFrom(_)).select($"name", $"addresses")
>>>
>>> ds2.printSchema()
>>>
>>> val query = ds2.writeStream
>>>   .outputMode("append")
>>>   .format("console")
>>>   .start()
>>>
>>> query.awaitTermination()
>>>   }
>>> }
>>>
>>> Appreciate your help. Thanks.
>>>
>>
>>
>>
>> --
>> Thanks,
>> Pandeeswaran
>>
>
>


Re: How do I access the nested field in a dataframe, spark Streaming app... Please help.

2016-11-20 Thread Jon Gregg
In these cases it might help to just flatten the DataFrame.  Here's a
helper function from the tutorial (scroll down to the "Flattening" header:
https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/02%20Introduction%20to%20DataFrames%20-%20scala.html



On Sun, Nov 20, 2016 at 1:24 PM, pandees waran  wrote:

> have you tried using "." access method?
>
> e.g:
> ds1.select("name","addresses[0].element.city")
>
> On Sun, Nov 20, 2016 at 9:59 AM, shyla deshpande  > wrote:
>
>> The following my dataframe schema
>>
>> root
>>  |-- name: string (nullable = true)
>>  |-- addresses: array (nullable = true)
>>  ||-- element: struct (containsNull = true)
>>  |||-- street: string (nullable = true)
>>  |||-- city: string (nullable = true)
>>
>> I want to output name and city. The following is my spark streaming app
>> which outputs name and addresses, but I want name and cities in the output.
>>
>> object PersonConsumer {
>>   import org.apache.spark.sql.{SQLContext, SparkSession}
>>   import com.example.protos.demo._
>>
>>   def main(args : Array[String]) {
>>
>> val spark = SparkSession.builder.
>>   master("local")
>>   .appName("spark session example")
>>   .getOrCreate()
>>
>> import spark.implicits._
>>
>> val ds1 = spark.readStream.format("kafka").
>>   option("kafka.bootstrap.servers","localhost:9092").
>>   option("subscribe","person").load()
>>
>> val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value"
>> )).map(Person.parseFrom(_)).select($"name", $"addresses")
>>
>> ds2.printSchema()
>>
>> val query = ds2.writeStream
>>   .outputMode("append")
>>   .format("console")
>>   .start()
>>
>> query.awaitTermination()
>>   }
>> }
>>
>> Appreciate your help. Thanks.
>>
>
>
>
> --
> Thanks,
> Pandeeswaran
>


Re: Flume integration

2016-11-20 Thread ayan guha
Hi

While I am following this discussion with interest, I am trying to
comprehend any architectural benefit of a spark sink.

Is there any feature in flume makes it more suitable to ingest stream data
than sppark streaming, so that we should chain them? For example does it
help durability or reliability of the source?

Or, it is a more tactical choice based on connector availability or such?

To me, flume is important component to ingest streams to hdfs or hive
directly ie it plays on the batch side of lambda architecture pattern.
On 20 Nov 2016 22:30, "Mich Talebzadeh"  wrote:

> Hi Ian,
>
> Has this been resolved?
>
> How about data to Flume and then Kafka and Kafka streaming into Spark?
>
> Thanks
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
> On 13 July 2016 at 11:13, Ian Brooks  wrote:
>
>> Hi,
>>
>>
>>
>> I'm currently trying to implement a prototype Spark application that gets
>> data from Flume and processes it. I'm using the pull based method mentioned
>> in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
>>
>>
>>
>> The is initially working fine for getting data from Flume, however the
>> Spark client doesn't appear to be letting Flume know that the data has been
>> received, so Flume doesn't remove it from the batch.
>>
>>
>>
>> After 100 requests Flume stops allowing any new data and logs
>>
>>
>>
>> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
>> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
>> Error while processing transaction.
>> org.apache.flume.ChannelException: Take list for MemoryTransaction,
>> capacity 100 full, consider committing more frequently, increasing
>> capacity, or increasing thread count
>>at org.apache.flume.channel.MemoryChannel$MemoryTransaction.
>> doTake(MemoryChannel.java:96)
>>
>>
>>
>> My code to pull the data from Flume is
>>
>>
>>
>> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>>
>> Duration batchInterval = new Duration(1);
>>
>> final String checkpointDir = "/tmp/";
>>
>>
>>
>> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>> batchInterval);
>>
>> ssc.checkpoint(checkpointDir);
>>
>> JavaReceiverInputDStream flumeStream =
>> FlumeUtils.createPollingStream(ssc, host, port);
>>
>>
>>
>> // Transform each flume avro event to a process-able format
>>
>> JavaDStream transformedEvents = flumeStream.map(new
>> Function() {
>>
>>
>>
>> @Override
>>
>> public String call(SparkFlumeEvent flumeEvent) throws Exception {
>>
>> String flumeEventStr = flumeEvent.event().toString();
>>
>> avroData avroData = new avroData();
>>
>> Gson gson = new GsonBuilder().create();
>>
>> avroData = gson.fromJson(flumeEventStr, avroData.class);
>>
>> HashMap body = avroData.getBody();
>>
>> String data = body.get("bytes");
>>
>> return data;
>>
>> }
>>
>> });
>>
>>
>>
>> ...
>>
>>
>>
>> ssc.start();
>>
>> ssc.awaitTermination();
>>
>> ssc.close();
>>
>> }
>>
>>
>>
>> Is there something specific I should be doing to let the Flume server
>> know the batch has been received and processed?
>>
>>
>> --
>>
>> Ian Brooks
>>
>>
>>
>
>


Re: How do I access the nested field in a dataframe, spark Streaming app... Please help.

2016-11-20 Thread pandees waran
have you tried using "." access method?

e.g:
ds1.select("name","addresses[0].element.city")

On Sun, Nov 20, 2016 at 9:59 AM, shyla deshpande 
wrote:

> The following my dataframe schema
>
> root
>  |-- name: string (nullable = true)
>  |-- addresses: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- street: string (nullable = true)
>  |||-- city: string (nullable = true)
>
> I want to output name and city. The following is my spark streaming app
> which outputs name and addresses, but I want name and cities in the output.
>
> object PersonConsumer {
>   import org.apache.spark.sql.{SQLContext, SparkSession}
>   import com.example.protos.demo._
>
>   def main(args : Array[String]) {
>
> val spark = SparkSession.builder.
>   master("local")
>   .appName("spark session example")
>   .getOrCreate()
>
> import spark.implicits._
>
> val ds1 = spark.readStream.format("kafka").
>   option("kafka.bootstrap.servers","localhost:9092").
>   option("subscribe","person").load()
>
> val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value"
> )).map(Person.parseFrom(_)).select($"name", $"addresses")
>
> ds2.printSchema()
>
> val query = ds2.writeStream
>   .outputMode("append")
>   .format("console")
>   .start()
>
> query.awaitTermination()
>   }
> }
>
> Appreciate your help. Thanks.
>



-- 
Thanks,
Pandeeswaran


RE: Error in running twitter streaming job

2016-11-20 Thread Marco Mistroni
Hi
Start by running it locally. See how it compares.
Debug
Then move to cluster
Debugging stuff running on cluster is a pain as there can be tons of reasons
Isolate the problem locally
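
For the local run, a minimal sketch of the usual change (an assumption:
setting the master in code instead of via spark-submit; a receiver-based
streaming app needs at least two local threads, one for the receiver and one
for processing):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("twitter sentiment analysis")
  .setMaster("local[2]")   // >= 2 threads so the receiver does not starve processing
val ssc = new StreamingContext(conf, Seconds(5))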

On 20 Nov 2016 5:04 pm, "Kappaganthu, Sivaram (ES)" <
sivaram.kappagan...@adp.com> wrote:

> Thank You for helping Mistroni.
>
>
>
> I took this example from the link http://stdatalabs.blogspot.in/
> 2016/09/spark-streaming-part-1-real-time.html .
>
>
>
> I also took top 10 elements in the row. I just ran this example with Yarn
> master. It ran fine and giving me good results. But in Spark master, it is
> taking so much time and went on hold. Please help.
>
>
>
> Thanks,
>
> SIvaram
>
>
>
> *From:* Marco Mistroni [mailto:mmistr...@gmail.com]
> *Sent:* Sunday, November 20, 2016 6:34 PM
> *To:* Kappaganthu, Sivaram (ES)
> *Cc:* user@spark.apache.org
> *Subject:* Re: Error in running twitter streaming job
>
>
>
> HI
>
>  is that from Spark Streaming Course  by Frank Kane? anyway, i have nearly
> same example (where  i only pick up top 10) and works for me
>
> so i am guessing perhaps you are not setting up twitter credentials?
>
> Did you try to see if in the output you are fetching them correctly?
>
> hth
>
>  marco
>
>
>
>
>
> On Sun, Nov 20, 2016 at 11:09 AM, Kappaganthu, Sivaram (ES) <
> sivaram.kappagan...@adp.com> wrote:
>
> Hi,
>
>
>
> I am trying to run a twitter streaming job with the below code  and not
> getting any output. Could someone please help in resolving the issue.
>
>
>
>
>
> package org.test.scala
>
>
>
> import org.apache.spark.SparkConf
>
> import org.apache.spark.SparkContext
>
> import org.apache.spark.streaming._
>
> import org.apache.spark.SparkContext._
>
> import org.apache.spark.streaming.twitter._
>
>
>
>
>
>
>
> object twitterAnalysis {
>
>   val conf = new SparkConf().setAppName("twitter sentiment analysis")
>
>   val sc = new SparkContext(conf)
>
>
>
>def main(args: Array[String]): Unit = {
>
>
>
> sc.setLogLevel("WARN")
>
>// val Array(consumerKey,consumerSecret,accessToken,accessSecret) =
> args.take(4)
>
> val filters = args
>
>
>
> for(i <- filters) { println("recieved arg" + i) }
>
>
>
> /* setting up the Twitter4j library parameters */
>
>
>
>
>
> System.setProperty("twitter4j.oauth.consumerKey", mykey)
>
> System.setProperty("twitter4j.oauth.consumerSecret", mysecret)
>
> System.setProperty("twitter4j.oauth.accessToken", acesskey)
>
> System.setProperty("twitter4j.oauth.accessTokenSecret", accesssecret)
>
>
>
>
>
> val ssc = new StreamingContext(sc,Seconds(5))
>
>
>
> val stream = TwitterUtils.createStream(ssc,None,filters)
>
>
>
> //stream.print()
>
>
>
> val hashtags = stream.flatMap( status => status.getText.split("
> ").filter( _.startsWith("#")) )
>
>
>
> println("here are the elements of HashTags")
>
>// hashtags.print()
>
>
>
> val topcounts60 = hashtags.map((_,1)).reduceByKeyAndWindow(_ + _
> ,Seconds(60))
>
>  .map { case (topic,count) => (count,topic) }
>
>  .transform(_.sortByKey(false))
>
>
>
>
>
> val topcount10 = hashtags.map((_,1)).reduceByKeyAndWindow(_+_,
> Seconds(10))
>
>  .map{ case(topic,count) => (count,topic) }
>
>  .transform(_.sortByKey(false))
>
>
>
>
>
> topcounts60.foreachRDD(rdd => {
>
> val toplist = rdd.take(10)
>
> println("\n Popular topics in last 10 seconds ( %s total)
> ".format(rdd.count()))
>
> toplist.foreach { case (count,tag)  => println("%s (%s
> tweets)".format(tag, count)) }
>
>
>
> })
>
>ssc.start
>
> ssc.awaitTermination
>
>
>
>   }
>
>
>
> }
>
>
>
>
>
> Upon hitting this with command bin/spark-submit –class
> org.test.scala.twitterAnalysis –master spark://localhost.localdomain:7077
> –jars  jar name, the job is getting hold and not
> giving me any output. Below are the terminal output and  STDERR log
>
>
>
> Terminal output
>
>
>
> [edureka@localhost spark-1.5.2]$ bin/spark-submit --class
> org.test.scala.twitterAnalysis --master spark://localhost.localdomain:7077
> --jars file:///home/edureka/TransferredJars/twitter-0.0.1-
> SNAPSHOT-jar-with-dependencies.jar  
> /home/edureka/TransferredJars/twitter-0.0.1-SNAPSHOT.jar
> #apple #spark #currency #iphone
>
> 16/11/20 15:48:31 INFO spark.SparkContext: Running Spark version 1.5.2
>
> 16/11/20 15:48:32 WARN util.NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
>
> 16/11/20 15:48:32 WARN util.Utils: Your hostname, localhost.localdomain
> resolves to a loopback address: 127.0.0.1; using 192.168.0.107 instead (on
> interface eth3)
>
> 16/11/20 15:48:32 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind
> to another address
>
> 16/11/20 15:48:32 INFO spark.SecurityManager: Changing view acls to:
> edureka
>
> 16/11/20 15:48:32 INFO spark.SecurityManager: Changing modify acls to:
> edureka
>
> 16/11/20 15:48:32 INFO 

How do I access the nested field in a dataframe, spark Streaming app... Please help.

2016-11-20 Thread shyla deshpande
The following my dataframe schema

root
 |-- name: string (nullable = true)
 |-- addresses: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- street: string (nullable = true)
 |||-- city: string (nullable = true)

I want to output name and city. The following is my spark streaming app
which outputs name and addresses, but I want name and cities in the output.

object PersonConsumer {
  import org.apache.spark.sql.{SQLContext, SparkSession}
  import com.example.protos.demo._

  def main(args : Array[String]) {

val spark = SparkSession.builder.
  master("local")
  .appName("spark session example")
  .getOrCreate()

import spark.implicits._

val ds1 = spark.readStream.format("kafka").
  option("kafka.bootstrap.servers","localhost:9092").
  option("subscribe","person").load()

val ds2 = ds1.map(row=>
row.getAs[Array[Byte]]("value")).map(Person.parseFrom(_)).select($"name",
$"addresses")

ds2.printSchema()

val query = ds2.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
  }
}

Appreciate your help. Thanks.


Re: Flume integration

2016-11-20 Thread Mich Talebzadeh
Thanks Ian.

Was your source of Flume IBM/MQ by any chance?



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 20 November 2016 at 16:40, Ian Brooks  wrote:

> Hi Mich,
>
>
>
> Yes, i managed to resolve this one. The issue was because the way
> described in the docs doesn't work properly as in order for the Flume part
> to be notified you need to set the storageLevel on the PollingStream like
>
>
>
> JavaReceiverInputDStream flumeStream = 
> FlumeUtils.createPollingStream(ssc,
> addresses, StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 10);
>
>
>
>
>
> After setting this, the data is correclty maked as processed by the SPARK
> reveiver and the Flume sink is notified.
>
>
>
> -Ian
>
>
>
>
>
> > Hi Ian,
>
> >
>
> > Has this been resolved?
>
> >
>
> > How about data to Flume and then Kafka and Kafka streaming into Spark?
>
> >
>
> > Thanks
>
> >
>
> > Dr Mich Talebzadeh
>
> >
>
> >
>
> >
>
> > LinkedIn *
>
> > https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCd
> OABU
>
> > rV8Pw
>
> >  OAB
>
> > UrV8Pw>*
>
> >
>
> >
>
> >
>
> > http://talebzadehmich.wordpress.com
>
> >
>
> >
>
> > *Disclaimer:* Use it at your own risk. Any and all responsibility for any
>
> > loss, damage or destruction of data or any other property which may arise
>
> > from relying on this email's technical content is explicitly disclaimed.
>
> > The author will in no case be liable for any monetary damages arising
> from
>
> > such loss, damage or destruction.
>
> >
>
> > On 13 July 2016 at 11:13, Ian Brooks  wrote:
>
> > > Hi,
>
> > >
>
> > >
>
> > >
>
> > > I'm currently trying to implement a prototype Spark application that
> gets
>
> > > data from Flume and processes it. I'm using the pull based method
>
> > > mentioned
>
> > > in https://spark.apache.org/docs/1.6.1/streaming-flume-
> integration.html
>
> > >
>
> > >
>
> > >
>
> > > The is initially working fine for getting data from Flume, however the
>
> > > Spark client doesn't appear to be letting Flume know that the data has
>
> > > been
>
> > > received, so Flume doesn't remove it from the batch.
>
> > >
>
> > >
>
> > >
>
> > > After 100 requests Flume stops allowing any new data and logs
>
> > >
>
> > >
>
> > >
>
> > > 08 Jul 2016 14:59:00,265 WARN [Spark Sink Processor Thread - 5]
>
> > > (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80) -
>
> > > Error while processing transaction.
>
> > > org.apache.flume.ChannelException: Take list for MemoryTransaction,
>
> > > capacity 100 full, consider committing more frequently, increasing
>
> > > capacity, or increasing thread count
>
> > >
>
> > > at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(
>
> > >
>
> > > MemoryChannel.java:96)
>
> > >
>
> > >
>
> > >
>
> > > My code to pull the data from Flume is
>
> > >
>
> > >
>
> > >
>
> > > SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>
> > >
>
> > > Duration batchInterval = new Duration(1);
>
> > >
>
> > > final String checkpointDir = "/tmp/";
>
> > >
>
> > >
>
> > >
>
> > > final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
>
> > > batchInterval);
>
> > >
>
> > > ssc.checkpoint(checkpointDir);
>
> > >
>
> > > JavaReceiverInputDStream flumeStream =
>
> > > FlumeUtils.createPollingStream(ssc, host, port);
>
> > >
>
> > >
>
> > >
>
> > > // Transform each flume avro event to a process-able format
>
> > >
>
> > > JavaDStream transformedEvents = flumeStream.map(new
>
> > > Function() {
>
> > >
>
> > >
>
> > >
>
> > > @Override
>
> > >
>
> > > public String call(SparkFlumeEvent flumeEvent) throws Exception {
>
> > >
>
> > > String flumeEventStr = flumeEvent.event().toString();
>
> > >
>
> > > avroData avroData = new avroData();
>
> > >
>
> > > Gson gson = new GsonBuilder().create();
>
> > >
>
> > > avroData = gson.fromJson(flumeEventStr, avroData.class);
>
> > >
>
> > > HashMap body = avroData.getBody();
>
> > >
>
> > > String data = body.get("bytes");
>
> > >
>
> > > return data;
>
> > >
>
> > > }
>
> > >
>
> > > });
>
> > >
>
> > >
>
> > >
>
> > > ...
>
> > >
>
> > >
>
> > >
>
> > > ssc.start();
>
> > >
>
> > > ssc.awaitTermination();
>
> > >
>
> > > ssc.close();
>
> > >
>
> > > }
>
> > >
>
> > >
>
> > >
>
> > > Is there something specific I should be doing to let the Flume server
> know
>
> > > the batch has been 

Re: covert local tsv file to orc file on distributed cloud storage(openstack).

2016-11-20 Thread Steve Loughran

On 19 Nov 2016, at 17:21, vr spark 
> wrote:

Hi,
I am looking for scala or python code samples to covert local tsv file to orc 
file and store on distributed cloud storage(openstack).

So, need these 3 samples. Please suggest.

1. read tsv
2. convert to orc
3. store on distributed cloud storage


thanks
VR

all options, 9 lines of code, assuming a spark context has already been setup 
with the permissions to write to AWS, and the relevant JARs for S3A to work on 
the CP. The read operation is inefficient as to determine the schema it scans 
the (here, remote) file twice; that may be OK for an example, but I wouldn't do 
that in production. The source is a real file belonging to amazon; dest a 
bucket of mine.

More details at: 
http://www.slideshare.net/steve_l/apache-spark-and-object-stores


val csvdata = spark.read.options(Map(
  "header" -> "true",
  "ignoreLeadingWhiteSpace" -> "true",
  "ignoreTrailingWhiteSpace" -> "true",
  "timestampFormat" -> "-MM-dd HH:mm:ss.SSSZZZ",
  "inferSchema" -> "true",
  "mode" -> "FAILFAST"))
.csv("s3a://landsat-pds/scene_list.gz")
csvdata.write.mode("overwrite").orc("s3a://hwdev-stevel-demo2/landsatOrc")


Re: Flume integration

2016-11-20 Thread Ian Brooks
Hi Mich,

Yes, I managed to resolve this one. The issue was that the way described in
the docs doesn't work properly: in order for the Flume side to be notified,
you need to set the storageLevel on the PollingStream, like

JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
    FlumeUtils.createPollingStream(ssc, addresses,
        StorageLevel.MEMORY_AND_DISK_SER_2(), 100, 10);


After setting this, the data is correctly marked as processed by the Spark
receiver and the Flume sink is notified.

-Ian


> Hi Ian,
> 
> Has this been resolved?
> 
> How about data to Flume and then Kafka and Kafka streaming into Spark?
> 
> Thanks
> 
> Dr Mich Talebzadeh
> 
> 
> 
> LinkedIn *
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABU
> rV8Pw
>  UrV8Pw>*
> 
> 
> 
> http://talebzadehmich.wordpress.com
> 
> 
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
> 

Re: Usage of mllib api in ml

2016-11-20 Thread Marco Mistroni
Hi
If it is RDD based, can't u use dataframe.rdd? (Though I don't know if u
will have an RDD of vectors; u might need to convert each row to a vector
yourself.)
Hth
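
A minimal sketch of that bridge (assuming a predictions DataFrame from a
fitted classifier, with double-typed "prediction" and "label" columns):

import org.apache.spark.mllib.evaluation.MulticlassMetrics

// Drop down to the RDD API just for the evaluation step.
val predictionAndLabels = predictions
  .select("prediction", "label")
  .rdd
  .map(row => (row.getDouble(0), row.getDouble(1)))

val metrics = new MulticlassMetrics(predictionAndLabels)
println(metrics.confusionMatrix)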

On 20 Nov 2016 4:29 pm, "janardhan shetty"  wrote:

> Hi Marco and Yanbo,
>
> It is not the usage of MulticlassClassificationEvaluator. Probably I was
> not clear.  Let me explain:
>
> I am trying to use confusionMatrix which is not present in
> MulticlassClassificationEvaluator ml version where as it is present in
> MulticlassMetrics of mllib.
> How to leverage RDD version  using ml dataframes ?
>
> *mllib*: MulticlassMetrics
> *ml*: MulticlassClassificationEvaluator
>
> On Sun, Nov 20, 2016 at 4:52 AM, Marco Mistroni 
> wrote:
>
>> Hi
>>   you can also have a look at this example,
>>
>> https://github.com/sryza/aas/blob/master/ch04-rdf/src/main/s
>> cala/com/cloudera/datascience/rdf/RunRDF.scala#L220
>>
>> kr
>>  marco
>>
>> On Sun, Nov 20, 2016 at 9:09 AM, Yanbo Liang  wrote:
>>
>>> You can refer this example(http://spark.apache.or
>>> g/docs/latest/ml-tuning.html#example-model-selection-via-cro
>>> ss-validation) which use BinaryClassificationEvaluator, and it should
>>> be very straightforward to switch to MulticlassClassificationEvaluator.
>>>
>>> Thanks
>>> Yanbo
>>>
>>> On Sat, Nov 19, 2016 at 9:03 AM, janardhan shetty <
>>> janardhan...@gmail.com> wrote:
>>>
 Hi,

 I am trying to use the evaluation metrics offered by mllib
 multiclassmetrics in ml dataframe setting.
 Is there any examples how to use it?

>>>
>>>
>>
>


Re: Usage of mllib api in ml

2016-11-20 Thread janardhan shetty
Hi Marco and Yanbo,

It is not the usage of MulticlassClassificationEvaluator. Probably I was
not clear.  Let me explain:

I am trying to use confusionMatrix, which is not present in the ml
MulticlassClassificationEvaluator, whereas it is present in mllib's
MulticlassMetrics.
How can I leverage the RDD version while using ml DataFrames?

*mllib*: MulticlassMetrics
*ml*: MulticlassClassificationEvaluator

On Sun, Nov 20, 2016 at 4:52 AM, Marco Mistroni  wrote:

> Hi
>   you can also have a look at this example,
>
> https://github.com/sryza/aas/blob/master/ch04-rdf/src/main/
> scala/com/cloudera/datascience/rdf/RunRDF.scala#L220
>
> kr
>  marco
>
> On Sun, Nov 20, 2016 at 9:09 AM, Yanbo Liang  wrote:
>
>> You can refer this example(http://spark.apache.or
>> g/docs/latest/ml-tuning.html#example-model-selection-via-cross-validation)
>> which use BinaryClassificationEvaluator, and it should be very
>> straightforward to switch to MulticlassClassificationEvaluator.
>>
>> Thanks
>> Yanbo
>>
>> On Sat, Nov 19, 2016 at 9:03 AM, janardhan shetty > > wrote:
>>
>>> Hi,
>>>
>>> I am trying to use the evaluation metrics offered by mllib
>>> multiclassmetrics in ml dataframe setting.
>>> Is there any examples how to use it?
>>>
>>
>>
>


Re: Will spark cache table once even if I call read/cache on the same table multiple times

2016-11-20 Thread Yong Zhang
If you have 2 different RDDs (as the 2 different references and RDD IDs shown in
your example), then YES, Spark will cache 2 copies of exactly the same thing in
memory.


There is no way for Spark to compare them and know that they hold the same
content. You defined them as 2 RDDs, so they are different RDDs and will be
cached individually.


Yong
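
For named tables specifically, a way to avoid tracking this by hand (a sketch;
behaviour assumed from the SQLContext catalog API in Spark 1.6/2.0, where
caching is keyed by the table name rather than by a DataFrame reference):

// Cache through the catalog instead of holding on to DataFrame references.
sqlContext.cacheTable("employee")        // asking again for an already-cached table does not cache a second copy
val alreadyCached = sqlContext.isCached("employee")

val emp1 = sqlContext.table("employee")  // subsequent reads of the table resolve to the cached data
val emp2 = sqlContext.table("employee")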



From: Taotao.Li 
Sent: Sunday, November 20, 2016 6:18 AM
To: Rabin Banerjee
Cc: Yong Zhang; user; Mich Talebzadeh; Tathagata Das
Subject: Re: Will spark cache table once even if I call read/cache on the same 
table multiple times

hi, you can check my stackoverflow question : 
http://stackoverflow.com/questions/36195105/what-happens-if-i-cache-the-same-rdd-twice-in-spark/36195812#36195812

On Sat, Nov 19, 2016 at 3:16 AM, Rabin Banerjee 
> wrote:
Hi Yong,

  But every time  val tabdf = sqlContext.table(tablename) is called tabdf.rdd 
is having a new id which can be checked by calling 
tabdf.rdd.id .
And,
https://github.com/apache/spark/blob/b6de0c98c70960a97b07615b0b08fbd8f900fbe7/core/src/main/scala/org/apache/spark/SparkContext.scala#L268

Spark is maintaining the Map if [RDD_ID,RDD] , as RDD id is changing , will 
spark cache same data again and again ??

For example ,

val tabdf = sqlContext.table("employee")
tabdf.cache()
tabdf.someTransformation.someAction
println(tabledf.rdd.id)
val tabdf1 = sqlContext.table("employee")
tabdf1.cache() <= Will spark again go to disk read and load data into memory or 
look into cache ?
tabdf1.someTransformation.someAction
println(tabledf1.rdd.id)

Regards,
R Banerjee




On Fri, Nov 18, 2016 at 9:14 PM, Yong Zhang 
> wrote:

That's correct, as long as you don't change the StorageLevel.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L166



Yong


From: Rabin Banerjee 
>
Sent: Friday, November 18, 2016 10:36 AM
To: user; Mich Talebzadeh; Tathagata Das
Subject: Will spark cache table once even if I call read/cache on the same 
table multiple times

Hi All ,

  I am working in a project where code is divided into multiple reusable module 
. I am not able to understand spark persist/cache on that context.

My Question is Will spark cache table once even if I call read/cache on the 
same table multiple times ??

 Sample Code ::

  TableReader::

   def getTableDF(tablename:String,persist:Boolean = false) : DataFrame = {
 val tabdf = sqlContext.table(tablename)
 if(persist) {
 tabdf.cache()
}
  return tableDF
}

 Now
Module1::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction

Module2::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction



ModuleN::
 val emp = TableReader.getTable("employee")
 emp.someTransformation.someAction

Will spark cache emp table once , or it will cache every time I am calling ?? 
Shall I maintain a global hashmap to handle that ? something like 
Map[String,DataFrame] ??

 Regards,
Rabin Banerjee







--
___
Quant | Engineer | Boy
___
blog:
http://litaotao.github.io
github: www.github.com/litaotao


Re: DataFrame select non-existing column

2016-11-20 Thread Kristoffer Sjögren
The problem is that I do not know which data frames have the
pass.mobile column. I just list an HDFS directory which contains the
parquet files; some files have the column and some don't. I really
don't want to have conditional logic that inspects the schema. But
maybe that's the only option?

Maybe I misunderstand you, but the following code fails with the same
error as before.

DataFrame dataFrame = ctx.read().parquet(localPath)
.select("pass")
.withColumn("mobile", col("pass.mobile"));


The flatten option works for my use case. But the problem is that
there seems to be no way of dropping nested columns, i.e.
drop("pass.auction")


On Sun, Nov 20, 2016 at 10:55 AM, Mendelson, Assaf
 wrote:
> The issue is that you already have a struct called pass. What you did was add 
> a new columned called "pass.mobile" instead of adding the element to pass - 
> The schema for pass element is the same as before.
> When you do select pass.mobile, it finds the pass structure and checks for 
> mobile in it.
>
> You can do it the other way around: set the name to be: pass_mobile. Add it 
> as before with lit(0) for those that dataframes that do not have the mobile 
> field and do something like withColumn("pass_mobile", df["pass.modile"]) for 
> those that do.
> Another option is to use do something like df.select("pass.*") to flatten the 
> pass structure and work on that (then you can do withColumn("mobile",...) 
> instead of "pass.mobile") but this would change the schema.
>
>
> -Original Message-
> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
> Sent: Saturday, November 19, 2016 4:57 PM
> To: Mendelson, Assaf
> Cc: user
> Subject: Re: DataFrame select non-existing column
>
> Thanks. Here's my code example [1] and the printSchema() output [2].
>
> This code still fails with the following message: "No such struct field 
> mobile in auction, geo"
>
> By looking at the schema, it seems that pass.mobile did not get nested, which 
> is the way it needs to be for my use case. Is nested columns not supported by 
> withColumn()?
>
> [1]
>
> DataFrame dataFrame = ctx.read().parquet(localPath).withColumn("pass.mobile", lit(0L));
> dataFrame.printSchema();
> dataFrame.select("pass.mobile");
>
> [2]
>
> root
>  |-- pass: struct (nullable = true)
>  |    |-- auction: struct (nullable = true)
>  |    |    |-- id: integer (nullable = true)
>  |    |-- geo: struct (nullable = true)
>  |    |    |-- postalCode: string (nullable = true)
>  |-- pass.mobile: long (nullable = false)
>
> On Sat, Nov 19, 2016 at 7:45 AM, Mendelson, Assaf  
> wrote:
>> In pyspark for example you would do something like:
>>
>> df.withColumn("newColName",pyspark.sql.functions.lit(None))
>>
>> Assaf.
>> -Original Message-
>> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
>> Sent: Friday, November 18, 2016 9:19 PM
>> To: Mendelson, Assaf
>> Cc: user
>> Subject: Re: DataFrame select non-existing column
>>
>> Thanks for your answer. I have been searching the API for doing that but I 
>> could not find how to do it?
>>
>> Could you give me a code snippet?
>>
>> On Fri, Nov 18, 2016 at 8:03 PM, Mendelson, Assaf  
>> wrote:
>>> You can always add the columns to old dataframes giving them null (or some 
>>> literal) as a preprocessing.
>>>
>>> -Original Message-
>>> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
>>> Sent: Friday, November 18, 2016 4:32 PM
>>> To: user
>>> Subject: DataFrame select non-existing column
>>>
>>> Hi
>>>
>>> We have evolved a DataFrame by adding a few columns but cannot write select 
>>> statements on these columns for older data that doesn't have them, since 
>>> they fail with an AnalysisException with the message "No such struct field".
>>>
>>> We also tried dropping columns but this doesn't work for nested columns.
>>>
>>> Any non-hacky ways to get around this?
>>>
>>> Cheers,
>>> -Kristoffer
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Usage of mllib api in ml

2016-11-20 Thread Marco Mistroni
Hi
  you can also have a look at this example,

https://github.com/sryza/aas/blob/master/ch04-rdf/src/main/scala/com/cloudera/datascience/rdf/RunRDF.scala#L220

kr
 marco

On Sun, Nov 20, 2016 at 9:09 AM, Yanbo Liang  wrote:

> You can refer to this example (http://spark.apache.org/docs/latest/ml-tuning.
> html#example-model-selection-via-cross-validation), which uses
> BinaryClassificationEvaluator, and it should be very straightforward to
> switch to MulticlassClassificationEvaluator.
>
> Thanks
> Yanbo
>
> On Sat, Nov 19, 2016 at 9:03 AM, janardhan shetty 
> wrote:
>
>> Hi,
>>
> I am trying to use the evaluation metrics offered by mllib's
> MulticlassMetrics in an ml DataFrame setting.
> Are there any examples of how to use it?
>>
>
>


Re: Flume integration

2016-11-20 Thread Mich Talebzadeh
Hi Ian,

Has this been resolved?

How about sending data to Flume, then to Kafka, and streaming from Kafka into Spark?
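
For what it's worth, a minimal sketch of the Kafka-to-Spark leg of such a pipeline (Scala, assuming the Kafka 0.8 direct API from spark-streaming-kafka; the broker address and topic name are made up):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(new SparkConf().setAppName("FlumeKafkaSpark"), Seconds(2))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // made-up broker list
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("flume-events"))                         // made-up topic name
stream.map(_._2).print()  // the value is whatever Flume forwarded into Kafka
ssc.start()
ssc.awaitTermination()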

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 13 July 2016 at 11:13, Ian Brooks  wrote:

> Hi,
>
>
>
> I'm currently trying to implement a prototype Spark application that gets
> data from Flume and processes it. I'm using the pull based method mentioned
> in https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
>
>
>
> This is initially working fine for getting data from Flume; however, the
> Spark client doesn't appear to be letting Flume know that the data has been
> received, so Flume doesn't remove it from the batch.
>
>
>
> After 100 requests Flume stops allowing any new data and logs
>
>
>
> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
> Error while processing transaction.
> org.apache.flume.ChannelException: Take list for MemoryTransaction,
> capacity 100 full, consider committing more frequently, increasing
> capacity, or increasing thread count
>at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(
> MemoryChannel.java:96)
>
>
>
> My code to pull the data from Flume is
>
>
>
> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>
> Duration batchInterval = new Duration(1);
>
> final String checkpointDir = "/tmp/";
>
>
>
> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
> batchInterval);
>
> ssc.checkpoint(checkpointDir);
>
> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
> FlumeUtils.createPollingStream(ssc, host, port);
>
>
>
> // Transform each flume avro event to a process-able format
>
> JavaDStream<String> transformedEvents = flumeStream.map(new
> Function<SparkFlumeEvent, String>() {
>
>
>
> @Override
>
> public String call(SparkFlumeEvent flumeEvent) throws Exception {
>
> String flumeEventStr = flumeEvent.event().toString();
>
> avroData avroData = new avroData();
>
> Gson gson = new GsonBuilder().create();
>
> avroData = gson.fromJson(flumeEventStr, avroData.class);
>
> HashMap<String, String> body = avroData.getBody();
>
> String data = body.get("bytes");
>
> return data;
>
> }
>
> });
>
>
>
> ...
>
>
>
> ssc.start();
>
> ssc.awaitTermination();
>
> ssc.close();
>
> }
>
>
>
> Is there something specific I should be doing to let the Flume server know
> the batch has been received and processed?
>
>
> --
>
> Ian Brooks
>
>
>


Re: Will spark cache table once even if I call read/cache on the same table multiple times

2016-11-20 Thread Taotao.Li
hi, you can check my stackoverflow question :
http://stackoverflow.com/questions/36195105/what-happens-if-i-cache-the-same-rdd-twice-in-spark/36195812#36195812

On Sat, Nov 19, 2016 at 3:16 AM, Rabin Banerjee <
dev.rabin.baner...@gmail.com> wrote:

> Hi Yong,
>
>   But every time val tabdf = sqlContext.table(tablename) is called, tabdf.rdd
> has a new id, which can be checked by calling tabdf.rdd.id.
> And,
> https://github.com/apache/spark/blob/b6de0c98c70960a97b07615b0b08fb
> d8f900fbe7/core/src/main/scala/org/apache/spark/SparkContext.scala#L268
>
> Spark maintains a Map of [RDD_ID, RDD]; as the RDD id changes,
> will Spark cache the same data again and again?
>
> For example ,
>
> val tabdf = sqlContext.table("employee")
> tabdf.cache()
> tabdf.someTransformation.someAction
> println(tabledf.rdd.id)
> val tabdf1 = sqlContext.table("employee")
> tabdf1.cache() <= *Will Spark again read from disk and load the data into
> memory, or look in the cache?*
> tabdf1.someTransformation.someAction
> println(tabledf1.rdd.id)
>
> Regards,
> R Banerjee
>
>
>
>
> On Fri, Nov 18, 2016 at 9:14 PM, Yong Zhang  wrote:
>
>> That's correct, as long as you don't change the StorageLevel.
>>
>>
>> https://github.com/apache/spark/blob/master/core/src/main/
>> scala/org/apache/spark/rdd/RDD.scala#L166
>>
>>
>>
>> Yong
>>
>> --
>> *From:* Rabin Banerjee 
>> *Sent:* Friday, November 18, 2016 10:36 AM
>> *To:* user; Mich Talebzadeh; Tathagata Das
>> *Subject:* Will spark cache table once even if I call read/cache on the
>> same table multiple times
>>
>> Hi All ,
>>
>>   I am working on a project where the code is divided into multiple reusable
>> modules. I am not able to understand Spark persist/cache in that context.
>>
>> My question is: will Spark cache a table only once, even if I call read/cache on
>> the same table multiple times?
>>
>>  Sample Code ::
>>
>>   TableReader::
>>
>>def getTableDF(tablename:String,persist:Boolean = false) : DataFrame
>> = {
>>  val tabdf = sqlContext.table(tablename)
>>  if(persist) {
>>  tabdf.cache()
>> }
>>   return tabdf
>> }
>>
>>  Now
>> Module1::
>>  val emp = TableReader.getTableDF("employee")
>>  emp.someTransformation.someAction
>>
>> Module2::
>>  val emp = TableReader.getTableDF("employee")
>>  emp.someTransformation.someAction
>>
>> 
>>
>> ModuleN::
>>  val emp = TableReader.getTableDF("employee")
>>  emp.someTransformation.someAction
>>
>> Will Spark cache the emp table once, or will it cache it every time I am
>> calling it? Shall I maintain a global hashmap to handle that, something
>> like Map[String, DataFrame]?
>>
>>  Regards,
>> Rabin Banerjee
>>
>>
>>
>>
>


-- 
*___*
Quant | Engineer | Boy
*___*
*blog*:http://litaotao.github.io

*github*: www.github.com/litaotao


Using Flume as Input Stream to Spark

2016-11-20 Thread Mich Talebzadeh
Hi,

For streaming data I have used both Kafka and Twitter, as Spark has
receivers for both and the procedures for Kafka streaming are well established.

However, I have not used Flume as a feed to Spark Streaming, and to the best
of my knowledge I have not seen any discussion of this in this forum.

I was wondering if this is a tried and tested approach, as opposed to an
experimental one? For example, this Spark doc talks
about Flume integration.
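
For reference, a minimal sketch of the pull-based (polling) approach described in that doc (Scala; the agent host and port are made up, and it assumes a SparkSink is configured on the Flume agent):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val ssc = new StreamingContext(new SparkConf().setAppName("FlumeToSpark"), Seconds(2))
// Pull-based: a SparkSink runs inside the Flume agent and Spark polls it for batches.
val flumeStream = FlumeUtils.createPollingStream(ssc, "flume-agent-host", 41414)
flumeStream.map(e => new String(e.event.getBody.array())).print()
ssc.start()
ssc.awaitTermination()

The streaming docs describe this pull-based variant as the one with the stronger reliability and fault-tolerance guarantees, compared with the push-based Avro sink approach.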

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.


RE: DataFrame select non-existing column

2016-11-20 Thread Mendelson, Assaf
The issue is that you already have a struct called pass. What you did was add a 
new column called "pass.mobile" instead of adding the element to pass - the 
schema for the pass element is the same as before.
When you do select pass.mobile, it finds the pass structure and checks for 
mobile in it.

You can do it the other way around: set the name to pass_mobile. Add it as 
before with lit(0) for those dataframes that do not have the mobile field, 
and do something like withColumn("pass_mobile", df["pass.mobile"]) for those 
that do.
Another option is to do something like df.select("pass.*") to flatten the 
pass structure and work on that (then you can do withColumn("mobile",...) 
instead of "pass.mobile"), but this would change the schema.


-Original Message-
From: Kristoffer Sjögren [mailto:sto...@gmail.com] 
Sent: Saturday, November 19, 2016 4:57 PM
To: Mendelson, Assaf
Cc: user
Subject: Re: DataFrame select non-existing column

Thanks. Here's my code example [1] and the printSchema() output [2].

This code still fails with the following message: "No such struct field mobile 
in auction, geo"

By looking at the schema, it seems that pass.mobile did not get nested, which 
is the way it needs to be for my use case. Are nested columns not supported by 
withColumn()?

[1]

DataFrame dataFrame = ctx.read().parquet(localPath).withColumn("pass.mobile", lit(0L));
dataFrame.printSchema();
dataFrame.select("pass.mobile");

[2]

root
 |-- pass: struct (nullable = true)
 |    |-- auction: struct (nullable = true)
 |    |    |-- id: integer (nullable = true)
 |    |-- geo: struct (nullable = true)
 |    |    |-- postalCode: string (nullable = true)
 |-- pass.mobile: long (nullable = false)

On Sat, Nov 19, 2016 at 7:45 AM, Mendelson, Assaf  
wrote:
> In pyspark for example you would do something like:
>
> df.withColumn("newColName",pyspark.sql.functions.lit(None))
>
> Assaf.
> -Original Message-
> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
> Sent: Friday, November 18, 2016 9:19 PM
> To: Mendelson, Assaf
> Cc: user
> Subject: Re: DataFrame select non-existing column
>
> Thanks for your answer. I have been searching the API for doing that but I 
> could not find how to do it?
>
> Could you give me a code snippet?
>
> On Fri, Nov 18, 2016 at 8:03 PM, Mendelson, Assaf  
> wrote:
>> You can always add the columns to old dataframes giving them null (or some 
>> literal) as a preprocessing.
>>
>> -Original Message-
>> From: Kristoffer Sjögren [mailto:sto...@gmail.com]
>> Sent: Friday, November 18, 2016 4:32 PM
>> To: user
>> Subject: DataFrame select non-existing column
>>
>> Hi
>>
>> We have evolved a DataFrame by adding a few columns but cannot write select 
>> statements on these columns for older data that doesn't have them, since they 
>> fail with an AnalysisException with the message "No such struct field".
>>
>> We also tried dropping columns but this doesn't work for nested columns.
>>
>> Any non-hacky ways to get around this?
>>
>> Cheers,
>> -Kristoffer
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>


Re: Usage of mllib api in ml

2016-11-20 Thread Yanbo Liang
You can refer to this example (
http://spark.apache.org/docs/latest/ml-tuning.html#example-model-selection-via-cross-validation),
which uses BinaryClassificationEvaluator, and it should be very
straightforward to switch to MulticlassClassificationEvaluator.
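
For a rough idea, a minimal sketch of that switch (Scala; it assumes a predictions DataFrame from model.transform with the default column names, and the f1 metric is just an example):

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.mllib.evaluation.MulticlassMetrics

// predictions is assumed to be the DataFrame returned by model.transform(testData),
// with the default "label" and "prediction" columns.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("f1")
val f1 = evaluator.evaluate(predictions)

// If the extra metrics of mllib's MulticlassMetrics are still wanted (e.g. the
// confusion matrix), drop down to an RDD of (prediction, label) pairs:
val metrics = new MulticlassMetrics(
  predictions.select("prediction", "label").rdd.map(r => (r.getDouble(0), r.getDouble(1))))
println(metrics.confusionMatrix)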

Thanks
Yanbo

On Sat, Nov 19, 2016 at 9:03 AM, janardhan shetty 
wrote:

> Hi,
>
> I am trying to use the evaluation metrics offered by mllib's
> MulticlassMetrics in an ml DataFrame setting.
> Are there any examples of how to use it?
>