Re: spark streaming socket read issue

2017-06-30 Thread Shixiong(Ryan) Zhu
Could you show the code that starts the StreamingQuery from the Dataset? If
you don't call `writeStream.start(...)`, it won't run anything.
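
For illustration, a minimal Scala sketch of a complete query (the host comes from the code below; the port value and the console sink are placeholders, since the original port did not come through):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("socket-read").getOrCreate()

// Source: the same socket source as in the snippet below.
val lines = spark.readStream
  .format("socket")
  .option("host", "10.176.110.112")
  .option("port", "9999")   // placeholder port
  .load()

// Without this writeStream ... start() call, nothing is ever executed.
val query = lines.writeStream
  .format("console")
  .outputMode("append")
  .start()

query.awaitTermination()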

On Fri, Jun 30, 2017 at 6:47 AM, pradeepbill  wrote:

> hi there, I have a spark streaming issue that I am not able to figure out:
> the code below reads from a socket, but I don't see any input going into the
> job. I have nc -l  running and it is dumping data, but I am not sure why my
> spark job is not able to read data from 10.176.110.112:. Please
> advise.
>
> Dataset d = sparkSession.readStream().format("socket")
> .option("host",
> "10.176.110.112").option("port", ).load();
>
>
> thanks
> Pradeep
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/spark-streaming-socket-read-issue-tp28813.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Interesting Stateful Streaming question

2017-06-30 Thread Michael Armbrust
This does sound like a good use case for that feature.  Note that Spark
2.2 adds a similar [flat]MapGroupsWithState operation to Structured
Streaming.  Stay tuned for a blog post on that!
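
For example, here is a rough Scala sketch of that kind of per-message state tracking using the Spark 2.2 mapGroupsWithState API (the Event/Progress types, the socket source and the s1-s3 target set are illustrative placeholders, not from the original question):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.GroupState

case class Event(messageId: String, state: String)
case class Progress(seen: Seq[String])

val spark = SparkSession.builder.appName("state-tracking").getOrCreate()
import spark.implicits._

// Placeholder source producing "messageId,state" lines.
val events = spark.readStream
  .format("socket").option("host", "localhost").option("port", "9999").load()
  .as[String]
  .map { line => val Array(id, st) = line.split(","); Event(id, st) }

// Remember which states each message has visited so far and report whether
// it has now visited all of s1, s2 and s3.
def track(id: String, updates: Iterator[Event], state: GroupState[Progress]): (String, Boolean) = {
  val seen = (state.getOption.map(_.seen).getOrElse(Seq.empty[String]) ++ updates.map(_.state)).distinct
  state.update(Progress(seen))
  (id, Seq("s1", "s2", "s3").forall(seen.contains))
}

val perMessage = events.groupByKey(_.messageId).mapGroupsWithState(track _)

val query = perMessage.writeStream.outputMode("update").format("console").start()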

On Thu, Jun 29, 2017 at 6:11 PM, kant kodali  wrote:

> Is mapWithState an answer for this ? https://databricks.com/blog/
> 2016/02/01/faster-stateful-stream-processing-in-apache-
> spark-streaming.html
>
> On Thu, Jun 29, 2017 at 11:55 AM, kant kodali  wrote:
>
>> Hi All,
>>
>> Here is a problem, and I am wondering if Spark Streaming is the right tool
>> for it?
>>
>> I have a stream of messages m1, m2, m3, ... and each of those messages can be
>> in state s1, s2, s3, ..., sn (you can imagine the number of states is about
>> 100). I want to compute some metrics over messages that visit all the states
>> from s1 to sn, but these state transitions can happen over an indefinite
>> amount of time. A simple example would be to count all messages that visited
>> states s1, s2 and s3. In other words, the transition function should know
>> that, say, message m1 has visited states s1 and s2 but not s3 yet, and once
>> message m1 visits s3, increment the counter by 1.
>>
>> If it makes anything easier, I can say a message has to visit s1 before
>> visiting s2, and s2 before visiting s3, and so on, but I would like to know
>> about both the ordered and unordered cases.
>>
>> Thanks!
>>
>>
>


Re: Withcolumn date with sysdate

2017-06-30 Thread Pralabh Kumar
Put the default value inside lit():

df.withColumn("date", lit("constant value"))
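
For example, a small Scala sketch (the DataFrame here is only a stand-in for yours):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_date, lit}

val spark = SparkSession.builder.appName("default-columns").getOrCreate()
val df = spark.range(3).toDF("id")   // stand-in DataFrame

// A plain value has to be wrapped in lit() so it becomes a Column.
val withConstant = df.withColumn("date", lit("constant value"))

// current_date() already returns a Column, so it can be used directly
// for a sysdate-style default.
val withSysdate = df.withColumn("date", current_date())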

On Fri, Jun 30, 2017 at 10:20 PM, sudhir k  wrote:

> Can we add a column to a dataframe with a default value like sysdate? I am
> calling my udf but it is throwing an error: col expected.
>
> On the spark shell,
> df.withColumn("date", current_date()) works; I need something similar for a
> Scala program which I can build into a jar.
>
>
> Thanks,
> Sudhir
> --
> Sent from Gmail Mobile
>


Withcolumn date with sysdate

2017-06-30 Thread sudhir k
Can we add a column to a dataframe with a default value like sysdate? I am
calling my udf but it is throwing an error: col expected.

On the spark shell,
df.withColumn("date", current_date()) works; I need something similar for a
Scala program which I can build into a jar.


Thanks,
Sudhir
-- 
Sent from Gmail Mobile


Re: PySpark working with Generators

2017-06-30 Thread Jörn Franke
In this case I do not see many benefits to using Spark. Is the data volume
high?
Alternatively, I recommend converting the proprietary format into a format
Spark understands and then using that format in Spark.
Another alternative would be to write a custom Spark data source. Even your
proprietary format should then be able to be put on HDFS.
That being said, I do not recommend using more cores outside Spark's control.
The reason is that Spark thinks those cores are free and makes the wrong
allocation of executors/tasks. This will slow down all applications on Spark.

May I ask what the format is called?

> On 30. Jun 2017, at 16:05, Saatvik Shah  wrote:
> 
> Hi Mahesh and Ayan,
> 
> The files I'm working with are in a very complex proprietary format, for which I
> only have access to a reader function, as I described earlier, which only
> accepts a path on a local file system.
> This rules out sc.wholeTextFiles - since I cannot pass the contents returned by
> wholeTextFiles to a function (API call) expecting a local file path.
> For similar reasons, I cannot use HDFS and am bound to using a highly
> available Network File System arrangement currently.
> Any suggestions, given these constraints? Or any incorrect assumptions you
> think I've made?
> 
> Thanks and Regards,
> Saatvik Shah
> 
>  
> 
>> On Fri, Jun 30, 2017 at 12:50 AM, Mahesh Sawaiker 
>>  wrote:
>> Wouldn’t this work if you load the files in hdfs and let the partitions be 
>> equal to the amount of parallelism you want?
>> 
>>  
>> 
>> From: Saatvik Shah [mailto:saatvikshah1...@gmail.com] 
>> Sent: Friday, June 30, 2017 8:55 AM
>> To: ayan guha
>> Cc: user
>> Subject: Re: PySpark working with Generators
>> 
>>  
>> 
>> Hey Ayan,
>> 
>>  
>> 
>> This isn't a typical text file - it's a proprietary data format for which a
>> native Spark reader is not available.
>> 
>>  
>> 
>> Thanks and Regards,
>> 
>> Saatvik Shah
>> 
>>  
>> 
>> On Thu, Jun 29, 2017 at 6:48 PM, ayan guha  wrote:
>> 
>> If your files are in the same location you can use sc.wholeTextFiles. If not,
>> sc.textFile accepts a list of file paths.
>> 
>>  
>> 
>> On Fri, 30 Jun 2017 at 5:59 am, saatvikshah1994  
>> wrote:
>> 
>> Hi,
>> 
>> I have this file reading function called /foo/ which reads contents into
>> a list of lists, or into a generator of lists of lists, representing the same
>> file.
>>
>> When reading as a complete chunk (1 record array) I do something like:
>> rdd = file_paths_rdd.map(lambda x: foo(x,"wholeFile")).flatMap(lambda x:x)
>>
>> I'd like to now do something similar but with the generator, so that I can
>> work with more cores and lower memory. I'm not sure how to tackle this,
>> since generators cannot be pickled and thus I'm not sure how to distribute
>> the work of reading each file_path over the RDD?
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-working-with-Generators-tp28810.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> 
>> --
>> 
>> Best Regards,
>> Ayan Guha
>> 
>>  
>> 


Re: PySpark working with Generators

2017-06-30 Thread Saatvik Shah
Hi Mahesh and Ayan,

The files I'm working with are in a very complex proprietary format, for which
I only have access to a reader function, as I described earlier, which
only accepts a path on a local file system.
This rules out sc.wholeTextFiles - since I cannot pass the contents returned by
wholeTextFiles to a function (API call) expecting a local file path.
For similar reasons, I cannot use HDFS and am bound to using a highly
available Network File System arrangement currently.
Any suggestions, given these constraints? Or any incorrect assumptions you
think I've made?

Thanks and Regards,
Saatvik Shah



On Fri, Jun 30, 2017 at 12:50 AM, Mahesh Sawaiker <
mahesh_sawai...@persistent.com> wrote:

> Wouldn’t this work if you load the files in hdfs and let the partitions be
> equal to the amount of parallelism you want?
>
>
>
> *From:* Saatvik Shah [mailto:saatvikshah1...@gmail.com]
> *Sent:* Friday, June 30, 2017 8:55 AM
> *To:* ayan guha
> *Cc:* user
> *Subject:* Re: PySpark working with Generators
>
>
>
> Hey Ayan,
>
>
>
> This isn't a typical text file - it's a proprietary data format for which a
> native Spark reader is not available.
>
>
>
> Thanks and Regards,
>
> Saatvik Shah
>
>
>
> On Thu, Jun 29, 2017 at 6:48 PM, ayan guha  wrote:
>
> If your files are in the same location you can use sc.wholeTextFiles. If not,
> sc.textFile accepts a list of file paths.
>
>
>
> On Fri, 30 Jun 2017 at 5:59 am, saatvikshah1994 
> wrote:
>
> Hi,
>
> I have this file reading function called /foo/ which reads contents into
> a list of lists, or into a generator of lists of lists, representing the same
> file.
>
> When reading as a complete chunk (1 record array) I do something like:
> rdd = file_paths_rdd.map(lambda x: foo(x,"wholeFile")).flatMap(lambda x:x)
>
> I'd like to now do something similar but with the generator, so that I can
> work with more cores and lower memory. I'm not sure how to tackle this,
> since generators cannot be pickled and thus I'm not sure how to distribute
> the work of reading each file_path over the RDD?
>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/PySpark-working-with-Generators-tp28810.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
>
> Best Regards,
> Ayan Guha
>
>
>


spark streaming socket read issue

2017-06-30 Thread pradeepbill
hi there, I have a spark streaming issue that I am not able to figure out:
the code below reads from a socket, but I don't see any input going into the
job. I have nc -l  running and it is dumping data, but I am not sure why my
spark job is not able to read data from 10.176.110.112:. Please advise.

Dataset d = sparkSession.readStream().format("socket")
.option("host", 
"10.176.110.112").option("port", ).load();


thanks
Pradeep




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-socket-read-issue-tp28813.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: about broadcast join of base table in spark sql

2017-06-30 Thread Yong Zhang
Or, since you already use the DataFrame API instead of SQL, you can add the
broadcast function to force it.


https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/sql/functions.html#broadcast(org.apache.spark.sql.DataFrame)
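
For example, a sketch using the DataFrames named in the original question (whether the planner actually performs the broadcast can still depend on the join type and plan):

import org.apache.spark.sql.functions.broadcast

// Mark A as a broadcast candidate regardless of its size estimate.
val joined = broadcast(A).join(B, A("key1") === B("key2"), "left")
joined.explain()  // check the physical plan for BroadcastHashJoin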


Yong

From: Bryan Jeffrey 
Sent: Friday, June 30, 2017 6:57 AM
To: d...@spark.org; user@spark.apache.org; paleyl
Subject: Re: about broadcast join of base table in spark sql

Hello.

If you want to allow a broadcast join with larger broadcasts, you can set
spark.sql.autoBroadcastJoinThreshold to a higher value. This will cause the
plan to allow the broadcast join despite 'A' being larger than the default threshold.

Get Outlook for Android



From: paleyl
Sent: Wednesday, June 28, 10:42 PM
Subject: about broadcast join of base table in spark sql
To: d...@spark.org, user@spark.apache.org


Hi All,


Recently I met a problem with broadcast join: I want to left join tables A and B;
A is the smaller one and the left table, so I wrote

A = A.join(B,A("key1") === B("key2"),"left")

but I found that A is not broadcast, as the shuffle size is still very
large.

I guess this is a designed mechanism in Spark, so could anyone please tell me
why it is designed like this? I am just very curious.


Best,


Paley





Re: about broadcast join of base table in spark sql

2017-06-30 Thread Bryan Jeffrey
Hello.

If you want to allow a broadcast join with larger broadcasts, you can set
spark.sql.autoBroadcastJoinThreshold to a higher value. This will cause the
plan to allow the broadcast join despite 'A' being larger than the default threshold.
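
For example (a sketch, assuming `spark` is the SparkSession; the 100 MB figure is only an illustration):

// Raise the broadcast threshold (in bytes) so a larger table still qualifies.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100L * 1024 * 1024)

// Setting it to -1 disables automatic broadcast joins altogether.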




Get Outlook for Android


From: paleyl
Sent: Wednesday, June 28, 10:42 PM
Subject: about broadcast join of base table in spark sql
To: d...@spark.org, user@spark.apache.org

Hi All,

Recently I met a problem with broadcast join: I want to left join tables A and B;
A is the smaller one and the left table, so I wrote

A = A.join(B,A("key1") === B("key2"),"left")

but I found that A is not broadcast, as the shuffle size is still very large.

I guess this is a designed mechanism in Spark, so could anyone please tell me
why it is designed like this? I am just very curious.

Best,

Paley

Spark querying parquet data partitioned in S3

2017-06-30 Thread Francisco Blaya
We have got data stored in S3 partitioned by several columns. Let's say
following this hierarchy:
s3://bucket/data/column1=X/column2=Y/parquet-files

We run a Spark job on an EMR cluster (1 master, 3 slaves) and realised the
following:

A) - When we declare the initial dataframe to be the whole dataset (val df
= sqlContext.read.parquet("s3://bucket/data/")), the driver splits the
job into several tasks (259) that are performed by the executors, and we
believe the driver gets back the parquet metadata.

Question: The above takes about 25 minutes for our dataset. We believe it
should be a lazy query (as we are not performing any actions), however it
looks like something is happening: all the executors are reading from S3.
We have tried mergeData=false and setting the schema explicitly via
.schema(someSchema). Is there any way to speed this up?

B) - When we declare the initial dataframe to be scoped by the first column
(val df = sqlContext.read.parquet("s3://bucket/data/column1=X")), then it
seems that all the work (getting the parquet metadata) is done by the
driver and there is no job submitted to Spark.

Question: Why does (A) send the work to executors but (B) does not?

The above is for EMR 5.5.0, Hadoop 2.7.3 and Spark 2.1.0.
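
For reference, a Scala sketch of the two read patterns described above (someSchema stands for the explicit schema mentioned earlier):

// (A) Reading the root of the partitioned layout: partition discovery has to
// list every column1=X/column2=Y directory under s3://bucket/data/ up front.
val dfA = sqlContext.read.schema(someSchema).parquet("s3://bucket/data/")

// (B) Reading a single partition directory: only that subtree is listed.
val dfB = sqlContext.read.schema(someSchema).parquet("s3://bucket/data/column1=X")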



Fwd: about broadcast join of base table in spark sql

2017-06-30 Thread paleyl
Hi All,

Recently I met a problem with broadcast join: I want to left join tables A
and B; A is the smaller one and the left table, so I wrote
A = A.join(B,A("key1") === B("key2"),"left")
but I found that A is not broadcast, as the shuffle size is still very
large.
I guess this is a designed mechanism in Spark, so could anyone please tell
me why it is designed like this? I am just very curious.

Best,

Paley