Re: pyspark.sql.DataFrame write error to Postgres DB

2017-04-20 Thread Cinyoung Hur
Oops, sorry.
I meant MariaDB.
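For reference, a minimal sketch of the corrected write, assuming the target really is MariaDB and that the MariaDB Connector/J jar is on the Spark classpath. The driver class and URL prefix below are the standard MariaDB ones, not values taken from this thread; host, port and credentials are placeholders.

properties = {
    "user": "user",
    "password": "pass",
    # MariaDB Connector/J driver class (assumes mariadb-java-client is available)
    "driver": "org.mariadb.jdbc.Driver",
}
url = "jdbc:mariadb://<host>:3306/MYDB"

df.write.jdbc(url=url, table=table_name, mode="overwrite", properties=properties)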



2017-04-21 12:51 GMT+09:00 Takeshi Yamamuro :

> Why are you using a MySQL JDBC driver?
>
> // maropu
>
> On Fri, Apr 21, 2017 at 11:54 AM, Cinyoung Hur 
> wrote:
>
>> Hi,
>>
>> I tried to insert Dataframe into Postgres DB.
>>
>> But, I don't know what causes this error.
>>
>>
>> properties = {
>> "user": "user",
>> "password": "pass",
>> "driver": "com.mysql.jdbc.Driver",
>> }
>> url = "jdbc:mysql://ip 
>> address/MYDB?useServerPrepStmts=false=true"
>> df.write.jdbc(url=url, table=table_name, mode='overwrite', 
>> properties=properties)
>>
>>
>>
>> The schema of Dataframe is this.
>>
>> root
>>  |-- fom_tp_cd: string (nullable = true)
>>  |-- agg: integer (nullable = true)
>>  |-- sex_tp_cd: integer (nullable = true)
>>  |-- dgsbjt_cd: string (nullable = true)
>>  |-- recu_cl_cd: integer (nullable = true)
>>  |-- sick_set: string (nullable = true)
>>  |-- gnl_nm_set: string (nullable = true)
>>  |-- count: long (nullable = false)
>>
>>
>>
>> [Py4JJavaError traceback trimmed; it is reproduced in the original message
>> "pyspark.sql.DataFrame write error to Postgres DB" later in this digest. It
>> ends with: TypeError: sequence item 0: expected string, NoneType found]
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: Azure Event Hub with Pyspark

2017-04-20 Thread Denny Lee
As well, perhaps another option could be to use the Spark Connector to
DocumentDB (https://github.com/Azure/azure-documentdb-spark) if sticking
with Scala?
On Thu, Apr 20, 2017 at 21:46 Nan Zhu  wrote:

> DocDB does have a Java client, right? Is anything preventing you from using that?
>
> Get Outlook for iOS 
> --
> *From:* ayan guha 
> *Sent:* Thursday, April 20, 2017 9:24:03 PM
> *To:* Ashish Singh
> *Cc:* user
> *Subject:* Re: Azure Event Hub with Pyspark
>
> Hi
>
> Yes, it's Scala only. I am looking for a PySpark version, as I want to
> write to DocumentDB, which has good Python integration.
>
> Thanks in advance
>
> best
> Ayan
>
> On Fri, Apr 21, 2017 at 2:02 PM, Ashish Singh 
> wrote:
>
>> Hi ,
>>
>> You can try https://github.com/hdinsight/spark-eventhubs : it is an Event Hubs
>> receiver for Spark Streaming.
>> We are using it, but I think only a Scala version is available.
>>
>>
>> Thanks,
>> Ashish Singh
>>
>> On Fri, Apr 21, 2017 at 9:19 AM, ayan guha  wrote:
>>
>>>
>>> Hi
>>>
>>> I am not able to find any connector for connecting Spark Streaming
>>> with Azure Event Hub using PySpark.
>>>
>>> Does anyone know whether such a library/package exists?
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Azure Event Hub with Pyspark

2017-04-20 Thread Nan Zhu
DocDB does have a Java client, right? Is anything preventing you from using that?

Get Outlook for iOS

From: ayan guha 
Sent: Thursday, April 20, 2017 9:24:03 PM
To: Ashish Singh
Cc: user
Subject: Re: Azure Event Hub with Pyspark

Hi

Yes, it's Scala only. I am looking for a PySpark version, as I want to write to 
DocumentDB, which has good Python integration.

Thanks in advance

best
Ayan

On Fri, Apr 21, 2017 at 2:02 PM, Ashish Singh 
> wrote:
Hi ,

You can try https://github.com/hdinsight/spark-eventhubs : it is an Event Hubs 
receiver for Spark Streaming.
We are using it, but I think only a Scala version is available.


Thanks,
Ashish Singh

On Fri, Apr 21, 2017 at 9:19 AM, ayan guha 
> wrote:

Hi

I am not able to find any connector for connecting Spark Streaming with 
Azure Event Hub using PySpark.

Does anyone know whether such a library/package exists?

--
Best Regards,
Ayan Guha





--
Best Regards,
Ayan Guha


Re: Azure Event Hub with Pyspark

2017-04-20 Thread ayan guha
Hi

Yes, it's Scala only. I am looking for a PySpark version, as I want to write
to DocumentDB, which has good Python integration.

Thanks in advance

best
Ayan

On Fri, Apr 21, 2017 at 2:02 PM, Ashish Singh  wrote:

> Hi ,
>
> You can try https://github.com/hdinsight/spark-eventhubs : it is an Event Hubs
> receiver for Spark Streaming.
> We are using it, but I think only a Scala version is available.
>
>
> Thanks,
> Ashish Singh
>
> On Fri, Apr 21, 2017 at 9:19 AM, ayan guha  wrote:
>
>>
>> Hi
>>
>> I am not able to find any connector for connecting Spark Streaming
>> with Azure Event Hub using PySpark.
>>
>> Does anyone know whether such a library/package exists?
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>>
>


-- 
Best Regards,
Ayan Guha


Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-20 Thread Georg Heiler
Unfortunately I think this currently might require the old api.
Hemanth Gudela  wrote on Fri., 21 Apr. 2017 at
05:58:

> Idea #2 probably suits my needs better, because
>
> -  Streaming query does not have a source database connector yet
>
> -  My source database table is big, so in-memory table could be
> huge for driver to handle.
>
>
>
> Thanks for cool ideas, TD!
>
>
>
> Regards,
>
> Hemanth
>
>
>
> *From: *Tathagata Das 
> *Date: *Friday, 21 April 2017 at 0.03
> *To: *Hemanth Gudela 
> *Cc: *Georg Heiler , "user@spark.apache.org" <
> user@spark.apache.org>
>
>
> *Subject: *Re: Spark structured streaming: Is it possible to periodically
> refresh static data frame?
>
>
>
> Here are couple of ideas.
>
> 1. You can set up a Structured Streaming query to update in-memory table.
>
> Look at the memory sink in the programming guide -
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
>
> So you can query the latest table using a specified table name, and also
> join that table with another stream. However, note that this in-memory
> table is maintained in the driver, and so you have to be careful about the
> size of the table.
>
>
>
> 2. If you cannot define a streaming query on the slow-moving data due to the
> unavailability of a connector for that data source, then you can always
> define a batch DataFrame and register it as a view, and then run a
> background thread that periodically creates a new DataFrame with updated data
> and re-registers it as a view with the same name. Any streaming query that
> joins a streaming dataframe with the view will automatically start using
> the most updated data as soon as the view is updated.
>
>
>
> Hope this helps.
>
>
>
>
>
> On Thu, Apr 20, 2017 at 1:30 PM, Hemanth Gudela <
> hemanth.gud...@qvantel.com> wrote:
>
> Thanks Georg for your reply.
>
> But I’m not sure if I fully understood your answer.
>
>
>
> If you meant to join two streams (one reading Kafka, and another reading
> database table), then I think it’s not possible, because
>
> 1.   According to the documentation, Structured Streaming does not support a
> database as a streaming source
>
> 2.   Joining between two streams is not possible yet.
>
>
>
> Regards,
>
> Hemanth
>
>
>
> *From: *Georg Heiler 
> *Date: *Thursday, 20 April 2017 at 23.11
> *To: *Hemanth Gudela , "user@spark.apache.org"
> 
> *Subject: *Re: Spark structured streaming: Is it possible to periodically
> refresh static data frame?
>
>
>
> What about treating the static data as a (slow) stream as well?
>
>
>
> Hemanth Gudela  wrote on Thu., 20 Apr. 2017
> at 22:09:
>
> Hello,
>
>
>
> I am working on a use case where there is a need to join streaming data
> frame with a static data frame.
>
> The streaming data frame continuously gets data from Kafka topics, whereas
> static data frame fetches data from a database table.
>
>
>
> However, as the underlying database table is getting updated often, I must
> somehow manage to refresh my static data frame periodically to get the
> latest information from underlying database table.
>
>
>
> My questions:
>
> 1.   Is it possible to periodically refresh static data frame?
>
> 2.   If refreshing static data frame is not possible, is there a
> mechanism to automatically stop & restarting spark structured streaming
> job, so that every time the job restarts, the static data frame gets
> updated with latest information from underlying database table.
>
> 3.   If 1) and 2) are not possible, please suggest alternatives to
> achieve my requirement described above.
>
>
>
> Thanks,
>
> Hemanth
>
>
>


Re: Azure Event Hub with Pyspark

2017-04-20 Thread Ashish Singh
Hi ,

You can try https://github.com/hdinsight/spark-eventhubs : it is an Event Hubs
receiver for Spark Streaming.
We are using it, but I think only a Scala version is available.


Thanks,
Ashish Singh

On Fri, Apr 21, 2017 at 9:19 AM, ayan guha  wrote:

>
> Hi
>
> I am not able to find any connector for connecting Spark Streaming
> with Azure Event Hub using PySpark.
>
> Does anyone know whether such a library/package exists?
>
> --
> Best Regards,
> Ayan Guha
>
>


Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-20 Thread Hemanth Gudela
Idea #2 probably suits my needs better, because

-  Streaming query does not have a source database connector yet

-  My source database table is big, so in-memory table could be huge 
for driver to handle.

Thanks for cool ideas, TD!

Regards,
Hemanth

From: Tathagata Das 
Date: Friday, 21 April 2017 at 0.03
To: Hemanth Gudela 
Cc: Georg Heiler , "user@spark.apache.org" 

Subject: Re: Spark structured streaming: Is it possible to periodically refresh 
static data frame?

Here are couple of ideas.
1. You can set up a Structured Streaming query to update in-memory table.
Look at the memory sink in the programming guide - 
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
So you can query the latest table using a specified table name, and also join 
that table with another stream. However, note that this in-memory table is 
maintained in the driver, and so you have to be careful about the size of the 
table.

2. If you cannot define a streaming query on the slow-moving data due to the 
unavailability of a connector for that data source, then you can always 
define a batch DataFrame and register it as a view, and then run a background 
thread that periodically creates a new DataFrame with updated data and re-registers it 
as a view with the same name. Any streaming query that joins a streaming 
DataFrame with the view will automatically start using the most updated data as 
soon as the view is updated.

Hope this helps.


On Thu, Apr 20, 2017 at 1:30 PM, Hemanth Gudela 
> wrote:
Thanks Georg for your reply.
But I’m not sure if I fully understood your answer.

If you meant to join two streams (one reading Kafka, and another reading 
database table), then I think it’s not possible, because

1.   According to the documentation, Structured Streaming does not support a 
database as a streaming source

2.   Joining between two streams is not possible yet.

Regards,
Hemanth

From: Georg Heiler >
Date: Thursday, 20 April 2017 at 23.11
To: Hemanth Gudela 
>, 
"user@spark.apache.org" 
>
Subject: Re: Spark structured streaming: Is it possible to periodically refresh 
static data frame?

What about treating the static data as a (slow) stream as well?

Hemanth Gudela > wrote on Thu., 20 Apr. 2017 at 22:09:
Hello,

I am working on a use case where there is a need to join streaming data frame 
with a static data frame.
The streaming data frame continuously gets data from Kafka topics, whereas 
static data frame fetches data from a database table.

However, as the underlying database table is getting updated often, I must 
somehow manage to refresh my static data frame periodically to get the latest 
information from underlying database table.

My questions:

1.   Is it possible to periodically refresh static data frame?

2.   If refreshing static data frame is not possible, is there a mechanism 
to automatically stop & restarting spark structured streaming job, so that 
every time the job restarts, the static data frame gets updated with latest 
information from underlying database table.

3.   If 1) and 2) are not possible, please suggest alternatives to achieve 
my requirement described above.

Thanks,
Hemanth



Re: pyspark.sql.DataFrame write error to Postgres DB

2017-04-20 Thread Takeshi Yamamuro
Why are you using a MySQL JDBC driver?

// maropu

On Fri, Apr 21, 2017 at 11:54 AM, Cinyoung Hur 
wrote:

> Hi,
>
> I tried to insert Dataframe into Postgres DB.
>
> But, I don't know what causes this error.
>
>
> properties = {
> "user": "user",
> "password": "pass",
> "driver": "com.mysql.jdbc.Driver",
> }
> url = "jdbc:mysql://ip 
> address/MYDB?useServerPrepStmts=false=true"
> df.write.jdbc(url=url, table=table_name, mode='overwrite', 
> properties=properties)
>
>
>
> The schema of Dataframe is this.
>
> root
>  |-- fom_tp_cd: string (nullable = true)
>  |-- agg: integer (nullable = true)
>  |-- sex_tp_cd: integer (nullable = true)
>  |-- dgsbjt_cd: string (nullable = true)
>  |-- recu_cl_cd: integer (nullable = true)
>  |-- sick_set: string (nullable = true)
>  |-- gnl_nm_set: string (nullable = true)
>  |-- count: long (nullable = false)
>
>
>
> [Py4JJavaError traceback trimmed; it is reproduced in the original message
> "pyspark.sql.DataFrame write error to Postgres DB" later in this digest. It
> ends with: TypeError: sequence item 0: expected string, NoneType found]
>
>


-- 
---
Takeshi Yamamuro


Azure Event Hub with Pyspark

2017-04-20 Thread ayan guha
Hi

I am not able to find any connector for connecting Spark Streaming
with Azure Event Hub using PySpark.

Does anyone know whether such a library/package exists?

-- 
Best Regards,
Ayan Guha


pyspark.sql.DataFrame write error to Postgres DB

2017-04-20 Thread Cinyoung Hur
Hi,

I tried to insert Dataframe into Postgres DB.

But, I don't know what causes this error.


properties = {
"user": "user",
"password": "pass",
"driver": "com.mysql.jdbc.Driver",
}
url = "jdbc:mysql://ip
address/MYDB?useServerPrepStmts=false=true"
df.write.jdbc(url=url, table=table_name, mode='overwrite',
properties=properties)



The schema of Dataframe is this.

root
 |-- fom_tp_cd: string (nullable = true)
 |-- agg: integer (nullable = true)
 |-- sex_tp_cd: integer (nullable = true)
 |-- dgsbjt_cd: string (nullable = true)
 |-- recu_cl_cd: integer (nullable = true)
 |-- sick_set: string (nullable = true)
 |-- gnl_nm_set: string (nullable = true)
 |-- count: long (nullable = false)



Py4JJavaError Traceback (most recent call last)
 in ()
----> 1 result1.filter(result1["gnl_nm_set"] == "").count()

/usr/local/linewalks/spark/spark/python/pyspark/sql/dataframe.pyc in count(self)
    297         2
    298         """
--> 299         return int(self._jdf.count())
    300
    301     @ignore_unicode_prefix

/usr/local/linewalks/spark/spark/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934
    935         for temp_arg in temp_args:

/usr/local/linewalks/spark/spark/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/linewalks/spark/spark/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JJavaError(
    311                     "An error occurred while calling {0}{1}{2}.\n".
--> 312                     format(target_id, ".", name), value)
    313             else:
    314                 raise Py4JError(
Py4JJavaError: An error occurred while calling o1331.count.
: org.apache.spark.SparkException: Job aborted due to stage failure:
Task 178 in stage 324.0 failed 4 times, most recent failure: Lost task
178.3 in stage 324.0 (TID 14274, 7.linewalks.local):
org.apache.spark.api.python.PythonException: Traceback (most recent
call last):
  File 
"/home/hadoop/hdtmp/nm-local-dir/usercache/hadoop/appcache/application_1491889279272_0040/container_1491889279272_0040_01_03/pyspark.zip/pyspark/worker.py",
line 172, in main
process()
  File 
"/home/hadoop/hdtmp/nm-local-dir/usercache/hadoop/appcache/application_1491889279272_0040/container_1491889279272_0040_01_03/pyspark.zip/pyspark/worker.py",
line 167, in process
serializer.dump_stream(func(split_index, iterator), outfile)
  File 
"/home/hadoop/hdtmp/nm-local-dir/usercache/hadoop/appcache/application_1491889279272_0040/container_1491889279272_0040_01_03/pyspark.zip/pyspark/worker.py",
line 106, in 
func = lambda _, it: map(mapper, it)
  File 
"/home/hadoop/hdtmp/nm-local-dir/usercache/hadoop/appcache/application_1491889279272_0040/container_1491889279272_0040_01_03/pyspark.zip/pyspark/worker.py",
line 92, in 
mapper = lambda a: udf(*a)
  File 
"/home/hadoop/hdtmp/nm-local-dir/usercache/hadoop/appcache/application_1491889279272_0040/container_1491889279272_0040_01_03/pyspark.zip/pyspark/worker.py",
line 70, in 
return lambda *a: f(*a)
  File "", line 3, in 
TypeError: sequence item 0: expected string, NoneType found
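The TypeError at the end of this trace is raised inside the Python UDF invoked through worker.py (mapper -> udf), not by the JDBC write itself: it is what str.join raises when the sequence being joined contains a None. Below is a minimal, hedged sketch of a null-safe variant; the UDF body and the source column name "gnl_nm_list" are assumptions for illustration, since the original UDF is not shown in the thread.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Hypothetical reconstruction: join the codes while skipping None entries, so
# ",".join() never sees a NoneType.
join_codes = udf(
    lambda codes: ",".join(c for c in (codes or []) if c is not None),
    StringType())

# 'gnl_nm_list' is an illustrative name for the assumed source column.
result1 = result1.withColumn("gnl_nm_set", join_codes("gnl_nm_list"))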


Re: Re: Re: Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-20 Thread Ryan
Hi mo,

I don't think it needs a shuffle, because the bloom filter only depends on data
within each row group, not the whole dataset. But the HAR solution seems nice.
I had thought of combining small files together and storing the offsets; I wasn't
aware that HDFS already provides such functionality. And after some searching I
found that SequenceFile might be a comparable alternative to HAR that you may be interested in.
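For what it's worth, a small hedged sketch of the SequenceFile route mentioned above (paths and IDs are made up): PySpark maps str to Text and bytearray to BytesWritable, so (ID, BINARY) pairs can be packed into one SequenceFile per batch instead of one HDFS file per record.

# Write: pack (id, binary payload) pairs into a SequenceFile.
pairs = spark.sparkContext.parallelize([("id-001", bytearray(b"...payload..."))])
pairs.saveAsSequenceFile("/data/binary_store_seq")

# Read back and keep only the requested IDs (wanted_ids is a small driver-side set).
wanted_ids = {"id-001", "id-042"}
hits = (spark.sparkContext
        .sequenceFile("/data/binary_store_seq")
        .filter(lambda kv: kv[0] in wanted_ids))

Note that this keeps the file count down, but unlike the HAR plus id-to-path mapping, a plain sequenceFile() read still scans the whole archive.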

Thanks for all people involved. I've learnt a lot too :-)


On Thu, Apr 20, 2017 at 5:25 PM, 莫涛  wrote:

> Hi Ryan,
>
>
> The attachment is the event timeline on executors. They are always busy
> computing.
>
> More executors are helpful but that's not my job as a developer.
>
>
> 1. The bad performance could be caused by my poor implementation, as "checkID",
> being a user-defined function, would not be pushed down.
>
> 2. To make the group index work, I need to sort the data by id, which
> leads to a shuffle of 50 TB of data. That's somewhat crazy.
>
>
> I'm on my way to testing HAR, but the discussion has given me lots of insight
> into ORC.
>
> Thanks for your help!
>
>
> --
> *From:* Ryan 
> *Sent:* 17 April 2017 16:48:47
> *To:* 莫涛
> *Cc:* user
> *Subject:* Re: Re: Re: How to store 10M records in HDFS to speed up further
> filtering?
>
> How about the event timeline on the executors? It seems adding more executors
> could help.
>
> 1. I found a jira(https://issues.apache.org/jira/browse/SPARK-11621) that
> states the ppd should work. And I think "only for matched ones the binary
> data is read" is true if proper index is configured. The row group wouldn't
> be read if the predicate isn't satisfied due to index.
>
> 2. It is absolutely true the performance gain depends on the id
> distribution...
>
> On Mon, Apr 17, 2017 at 4:23 PM, 莫涛  wrote:
>
>> Hi Ryan,
>>
>>
>> The attachment is a screen shot for the spark job and this is the only
>> stage for this job.
>>
>> I've changed the partition size to 1GB by "--conf
>> spark.sql.files.maxPartitionBytes=1073741824".
>>
>>
>> 1. spark-orc seems not that smart. The input size is almost the whole
>> data. I guess "only for matched ones the binary data is read" is not
>> true as orc does not know the offset of each BINARY so things like seek
>> could not happen
>>
>> 2. I've tried orc and it does skip the partition that has no hit. This
>> could be a solution but the performance depends on the distribution of the
>> given ID list. No partition could be skipped in the worst case.
>>
>>
>> Mo Tao
>>
>>
>>
>> --
>> *From:* Ryan 
>> *Sent:* 17 April 2017 15:42:46
>> *To:* 莫涛
>> *Cc:* user
>> *Subject:* Re: Re: How to store 10M records in HDFS to speed up further
>> filtering?
>>
>> 1. Per my understanding, for ORC files, it should push down the filters,
>> which means all id columns will be scanned, but the binary data is read only
>> for matched ones. I haven't dug into the spark-orc reader though..
>>
>> 2. ORC itself has a row group index and a bloom filter index. You may try
>> configurations like 'orc.bloom.filter.columns' on the source table first.
>> From the Spark side, with mapPartitions, it's possible to build a sort of
>> index for each partition.
>>
>> And could you check how many tasks the filter stage has? Maybe there are
>> too few partitions..
>>
>> On Mon, Apr 17, 2017 at 3:01 PM, 莫涛  wrote:
>>
>>> Hi Ryan,
>>>
>>>
>>> 1. "expected qps and response time for the filter request"
>>>
>>> I expect that only the requested BINARY are scanned instead of all
>>> records, so the response time would be "10K * 5MB / disk read speed",
>>> or several times of this.
>>>
>>> In practice, our cluster has 30 SAS disks and scanning all the 10M *
>>> 5MB data takes about 6 hours now. It should become several minutes, as
>>> expected.
>>>
>>>
>>> 2. "build a search tree using ids within each partition to act like an
>>> index, or create a bloom filter to see if current partition would have any
>>> hit"
>>>
>>> Sounds like the thing I'm looking for!
>>>
>>> Could you kindly provide some links for reference? I found nothing in
>>> spark document about index or bloom filter working inside partition.
>>>
>>>
>>> Thanks very much!
>>>
>>>
>>> Mo Tao
>>>
>>> --
>>> *From:* Ryan 
>>> *Sent:* 17 April 2017 14:32:00
>>> *To:* 莫涛
>>> *Cc:* user
>>> *Subject:* Re: How to store 10M records in HDFS to speed up further
>>> filtering?
>>>
>>> you can build a search tree using ids within each partition to act like
>>> an index, or create a bloom filter to see if current partition would have
>>> any hit.
>>>
>>> What's your expected qps and response time for the filter request?
>>>
>>>
>>> On Mon, Apr 17, 2017 at 2:23 PM, MoTao  wrote:
>>>
 Hi all,

 I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
 average.
 In my daily application, I need to filter out 10K BINARY 

Please participate in a research survey on graphs

2017-04-20 Thread Siddhartha Sahu
Hi,

My name is Siddhartha Sahu and I am a Master's student at University of
Waterloo working on graph processing with Prof. Semih Salihoglu. As part of
my research, I am running a survey on how graphs are used in the industry
and academia.

If you work on any kind of graph technology, such as GraphX, please
participate in the survey:

*Survey link*:
https://dsg-uwaterloo.typeform.com/to/EBihxG

The survey will take about *5-10 minutes* to finish. Most of the questions
are *multiple-choice questions*, and you can *skip any number of questions*.

I would really appreciate if you filled out my survey :)
Also, kindly forward the survey to any one else you know who also works on
graph processing.

Our goal is to help researchers who work on graph processing understand the
types of data, computations, and software that are used in practice. We
hope the information we can gather from participation in this survey can
guide the research people like ourselves do in universities. We plan to
share the information we gather from the survey with the wider research
community through a publication.

This survey has been reviewed and received ethics clearance through a
University of Waterloo Research Ethics Committee. You will see that the
first page of the survey contains a long consent page as required by the
university.

Thank you very much in advance for your support.

Regards,
Siddhartha Sahu
s3s...@uwaterloo.ca


Re: Long Shuffle Read Blocked Time

2017-04-20 Thread Pradeep Gollakota
Hi All,

It appears that the bottleneck in my job was the EBS volumes. Very high i/o
wait times across the cluster. I was only using 1 volume. Increasing to 4
made it faster.

Thanks,
Pradeep

On Thu, Apr 20, 2017 at 3:12 PM, Pradeep Gollakota 
wrote:

> Hi All,
>
> I have a simple ETL job that reads some data, shuffles it and writes it
> back out. This is running on AWS EMR 5.4.0 using Spark 2.1.0.
>
> After Stage 0 completes and the job starts Stage 1, I see a huge slowdown
> in the job. The CPU usage is low on the cluster, as is the network I/O.
> From the Spark Stats, I see large values for the Shuffle Read Blocked Time.
> As an example, one of my tasks completed in 18 minutes, but spent 15
> minutes waiting for remote reads.
>
> I'm not sure why the shuffle is so slow. Are there things I can do to
> increase the performance of the shuffle?
>
> Thanks,
> Pradeep
>


Long Shuffle Read Blocked Time

2017-04-20 Thread Pradeep Gollakota
Hi All,

I have a simple ETL job that reads some data, shuffles it and writes it
back out. This is running on AWS EMR 5.4.0 using Spark 2.1.0.

After Stage 0 completes and the job starts Stage 1, I see a huge slowdown
in the job. The CPU usage is low on the cluster, as is the network I/O.
From the Spark Stats, I see large values for the Shuffle Read Blocked Time.
As an example, one of my tasks completed in 18 minutes, but spent 15
minutes waiting for remote reads.

I'm not sure why the shuffle is so slow. Are there things I can do to
increase the performance of the shuffle?

Thanks,
Pradeep


Re: Concurrent DataFrame.saveAsTable into non-existant tables fails the second job despite Mode.APPEND

2017-04-20 Thread Subhash Sriram
Would it be an option to just write the results of each job into separate 
tables and then run a UNION on all of them at the end into a final target 
table? Just thinking of an alternative!

Thanks,
Subhash

Sent from my iPhone
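A minimal sketch of the alternative suggested above, with placeholder names (df_job1, df_job2 and spark stand for the per-job DataFrames and the shared session; union requires the staging tables to share a schema):

# Each concurrent job writes its own staging table...
df_job1.write.mode("overwrite").saveAsTable("stage_job1")
df_job2.write.mode("overwrite").saveAsTable("stage_job2")

# ...and one sequential follow-up step unions them into the final target.
final = spark.table("stage_job1").union(spark.table("stage_job2"))
final.write.mode("append").saveAsTable("final_target")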

> On Apr 20, 2017, at 3:48 AM, Rick Moritz  wrote:
> 
> Hi List,
> 
> I'm wondering if the following behaviour should be considered a bug, or 
> whether it "works as designed":
> 
> I'm starting multiple concurrent (FIFO-scheduled) jobs in a single 
> SparkContext, some of which write into the same tables.
> When these tables already exist, it appears as though both jobs [at least 
> believe that they] successfully appended to the table (i.e., both jobs 
> terminate succesfully, but I haven't checked whether the data from both jobs 
> was actually written, or if one job overwrote the other's data, despite 
> Mode.APPEND). If the table does not exist, both jobs will attempt to create 
> the table, but whichever job's turn is second (or  later) will then fail with 
> a AlreadyExistsException (org.apache.spark.sql.AnalysisException: 
> org.apache.hadoop.hive.ql.metadata.HiveException: AlreadyExistsException).
> 
> I think the issue here is that both jobs don't register the table with the 
> metastore until they actually start writing to it, but determine early on 
> that they will need to create it. The slower job then obviously fails to 
> create the table, and instead of falling back to appending the data to the 
> existing table, it crashes out.
> 
> I would consider this a bit of a bug, but I'd like to make sure that it isn't 
> merely a case of me doing something stupid elsewhere, or indeed simply an 
> inherent architectural limitation of working with the metastore, before going 
> to Jira with this.
> 
> Also, I'm aware that running the jobs strictly sequentially would work around 
> the issue, but that would require reordering jobs before sending them off to 
> Spark, or kill efficiency.
> 
> Thanks for any feedback,
> 
> Rick




Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-20 Thread Tathagata Das
Here are a couple of ideas.
1. You can set up a Structured Streaming query to update an in-memory table.
Look at the memory sink in the programming guide -
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
So you can query the latest table using a specified table name, and also
join that table with another stream. However, note that this in-memory
table is maintained in the driver, and so you have to be careful about the
size of the table.
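A minimal sketch of this memory-sink idea, with made-up names (streaming_df is the incoming stream; as noted above, the resulting table lives in the driver):

query = (streaming_df.writeStream
         .format("memory")             # memory sink from the programming guide
         .queryName("latest_updates")  # exposed as a temp table with this name
         .outputMode("append")
         .start())

# The latest contents can then be queried or joined with another DataFrame:
spark.sql("SELECT * FROM latest_updates").show()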

2. If you cannot define a streaming query on the slow-moving data due to the
unavailability of a connector for that data source, then you can always
define a batch DataFrame and register it as a view, and then run a
background thread that periodically creates a new DataFrame with updated data and
re-registers it as a view with the same name. Any streaming query that
joins a streaming DataFrame with the view will automatically start using
the most updated data as soon as the view is updated.

Hope this helps.
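And a hedged sketch of the second idea; all names, the JDBC source and the 10-minute refresh interval are assumptions for illustration:

import threading

def refresh_static_view():
    # Re-read the slow-moving table and re-register it under the same view name.
    (spark.read
          .jdbc(url=jdbc_url, table="source_table", properties=jdbc_props)
          .createOrReplaceTempView("static_view"))
    threading.Timer(600, refresh_static_view).start()  # run again in 10 minutes

refresh_static_view()

# Streaming queries that join against the view pick up the refreshed data:
joined = streaming_df.join(spark.table("static_view"), "key")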


On Thu, Apr 20, 2017 at 1:30 PM, Hemanth Gudela 
wrote:

> Thanks Georg for your reply.
>
> But I’m not sure if I fully understood your answer.
>
>
>
> If you meant to join two streams (one reading Kafka, and another reading
> database table), then I think it’s not possible, because
>
> 1.   According to the documentation, Structured Streaming does not support a
> database as a streaming source
>
> 2.   Joining between two streams is not possible yet.
>
>
>
> Regards,
>
> Hemanth
>
>
>
> *From: *Georg Heiler 
> *Date: *Thursday, 20 April 2017 at 23.11
> *To: *Hemanth Gudela , "user@spark.apache.org"
> 
> *Subject: *Re: Spark structured streaming: Is it possible to periodically
> refresh static data frame?
>
>
>
> What about treating the static data as a (slow) stream as well?
>
>
>
> Hemanth Gudela  wrote on Thu., 20 Apr. 2017
> at 22:09:
>
> Hello,
>
>
>
> I am working on a use case where there is a need to join streaming data
> frame with a static data frame.
>
> The streaming data frame continuously gets data from Kafka topics, whereas
> static data frame fetches data from a database table.
>
>
>
> However, as the underlying database table is getting updated often, I must
> somehow manage to refresh my static data frame periodically to get the
> latest information from underlying database table.
>
>
>
> My questions:
>
> 1.   Is it possible to periodically refresh static data frame?
>
> 2.   If refreshing static data frame is not possible, is there a
> mechanism to automatically stop & restarting spark structured streaming
> job, so that every time the job restarts, the static data frame gets
> updated with latest information from underlying database table.
>
> 3.   If 1) and 2) are not possible, please suggest alternatives to
> achieve my requirement described above.
>
>
>
> Thanks,
>
> Hemanth
>
>


Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-20 Thread Hemanth Gudela
Thanks Georg for your reply.
But I’m not sure if I fully understood your answer.

If you meant to join two streams (one reading Kafka, and another reading 
database table), then I think it’s not possible, because

1.   According to the documentation, Structured Streaming does not support a 
database as a streaming source

2.   Joining between two streams is not possible yet.

Regards,
Hemanth

From: Georg Heiler 
Date: Thursday, 20 April 2017 at 23.11
To: Hemanth Gudela , "user@spark.apache.org" 

Subject: Re: Spark structured streaming: Is it possible to periodically refresh 
static data frame?

What about treating the static data as a (slow) stream as well?

Hemanth Gudela > wrote on Thu., 20 Apr. 2017 at 22:09:
Hello,

I am working on a use case where there is a need to join streaming data frame 
with a static data frame.
The streaming data frame continuously gets data from Kafka topics, whereas 
static data frame fetches data from a database table.

However, as the underlying database table is getting updated often, I must 
somehow manage to refresh my static data frame periodically to get the latest 
information from underlying database table.

My questions:

1.   Is it possible to periodically refresh static data frame?

2.   If refreshing static data frame is not possible, is there a mechanism 
to automatically stop & restarting spark structured streaming job, so that 
every time the job restarts, the static data frame gets updated with latest 
information from underlying database table.

3.   If 1) and 2) are not possible, please suggest alternatives to achieve 
my requirement described above.

Thanks,
Hemanth


Re: Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-20 Thread Georg Heiler
What about treating the static data as a (slow) stream as well?

Hemanth Gudela  wrote on Thu., 20 Apr. 2017
at 22:09:

> Hello,
>
>
>
> I am working on a use case where there is a need to join streaming data
> frame with a static data frame.
>
> The streaming data frame continuously gets data from Kafka topics, whereas
> static data frame fetches data from a database table.
>
>
>
> However, as the underlying database table is getting updated often, I must
> somehow manage to refresh my static data frame periodically to get the
> latest information from underlying database table.
>
>
>
> My questions:
>
> 1.   Is it possible to periodically refresh static data frame?
>
> 2.   If refreshing static data frame is not possible, is there a
> mechanism to automatically stop & restarting spark structured streaming
> job, so that every time the job restarts, the static data frame gets
> updated with latest information from underlying database table.
>
> 3.   If 1) and 2) are not possible, please suggest alternatives to
> achieve my requirement described above.
>
>
>
> Thanks,
>
> Hemanth
>


Spark structured streaming: Is it possible to periodically refresh static data frame?

2017-04-20 Thread Hemanth Gudela
Hello,

I am working on a use case where there is a need to join streaming data frame 
with a static data frame.
The streaming data frame continuously gets data from Kafka topics, whereas 
static data frame fetches data from a database table.

However, as the underlying database table is getting updated often, I must 
somehow manage to refresh my static data frame periodically to get the latest 
information from underlying database table.

My questions:

1.   Is it possible to periodically refresh static data frame?

2.   If refreshing static data frame is not possible, is there a mechanism 
to automatically stop & restarting spark structured streaming job, so that 
every time the job restarts, the static data frame gets updated with latest 
information from underlying database table.

3.   If 1) and 2) are not possible, please suggest alternatives to achieve 
my requirement described above.

Thanks,
Hemanth


Re: Any Idea about this error : IllegalArgumentException: File segment length cannot be negative ?

2017-04-20 Thread Victor Tso-Guillen
Along with Priya's email slightly earlier than this one, we also are seeing
this happen on Spark 1.5.2.

On Wed, Jul 13, 2016 at 1:26 AM Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> In Spark Streaming job, I see a Batch failed with following error. Haven't
> seen anything like this earlier.
>
> This has happened during Shuffle for one Batch (haven't reoccurred after
> that).. Just curious to know what can cause this error. I am running Spark
> 1.5.1
>
> Regards,
> Dibyendu
>
>
> Job aborted due to stage failure: Task 2801 in stage 9421.0 failed 4 times, 
> most recent failure: Lost task 2801.3 in stage 9421.0: 
> java.lang.IllegalArgumentException: requirement failed: File segment length 
> cannot be negative (got -68321)
>   at scala.Predef$.require(Predef.scala:233)
>   at org.apache.spark.storage.FileSegment.(FileSegment.scala:28)
>   at 
> org.apache.spark.storage.DiskBlockObjectWriter.fileSegment(DiskBlockObjectWriter.scala:216)
>   at 
> org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:684)
>   at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:80)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>   at org.apache.spark.scheduler.Task.run(Task.scala:88)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
>


Maximum Partitioner size

2017-04-20 Thread Patrick GRANDJEAN
Hi,
I have implemented a custom Partitioner (org.apache.spark.Partitioner) that 
contains a medium-sized object (some megabytes). Unfortunately Spark (2.1.0) 
fails with a StackOverflowError, and I suspect it is because of the size of the 
partitioner that needs to be serialized. My question is, what is the maximum 
size of a Partitioner accepted by Spark?
Thanks!





Re: how to add new column using regular expression within pyspark dataframe

2017-04-20 Thread Pushkar.Gujar
It can be as simple as:

from pyspark.sql.functions import split

flight.withColumn('hour',split(flight.duration,'h').getItem(0))


Thank you,
*Pushkar Gujar*
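Since the original question asks for the hour as a number, the extracted piece can also be cast to an integer. A small sketch follows ("hour_num" is just an illustrative column name), including an equivalent based on regexp_extract, which is closer to the re.match attempt quoted below:

from pyspark.sql.functions import split, regexp_extract

# "15h10m" -> split on 'h' -> "15" -> cast to int
flight = flight.withColumn("hour", split(flight.duration, "h").getItem(0).cast("int"))

# The same thing with a regular expression
flight = flight.withColumn("hour_num",
                           regexp_extract(flight.duration, r"(\d+)h", 1).cast("int"))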


On Thu, Apr 20, 2017 at 4:35 AM, Zeming Yu  wrote:

> Any examples?
>
> On 20 Apr. 2017 3:44 pm, "颜发才(Yan Facai)"  wrote:
>
>> How about using `withColumn` and UDF?
>>
>> example:
>> + https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78
>> 
>> + https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
>>
>>
>>
>> On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu  wrote:
>>
>>> I've got a dataframe with a column looking like this:
>>>
>>> display(flight.select("duration").show())
>>>
>>> +--------+
>>> |duration|
>>> +--------+
>>> |  15h10m|
>>> |   17h0m|
>>> |  21h25m|
>>> |  14h30m|
>>> |  24h50m|
>>> |  26h10m|
>>> |  14h30m|
>>> |   23h5m|
>>> |  21h30m|
>>> |  11h50m|
>>> |  16h10m|
>>> |  15h15m|
>>> |  21h25m|
>>> |  14h25m|
>>> |  14h40m|
>>> |   16h0m|
>>> |  24h20m|
>>> |  14h30m|
>>> |  14h25m|
>>> |  14h30m|
>>> +--------+
>>> only showing top 20 rows
>>>
>>>
>>>
>>> I need to extract the hour as a number and store it as an additional
>>> column within the same dataframe. What's the best way to do that?
>>>
>>>
>>> I tried the following, but it failed:
>>>
>>> import re
>>> def getHours(x):
>>>   return re.match('([0-9]+(?=h))', x)
>>> temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
>>> temp.select("duration").show()
>>>
>>>
>>> error message:
>>>
>>>
>>> ---------------------------------------------------------------------------
>>> Py4JJavaError                             Traceback (most recent call last)
>>>  in ()
>>>       2 def getHours(x):
>>>       3   return re.match('([0-9]+(?=h))', x)
>>> ----> 4 temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
>>>       5 temp.select("duration").show()
>>>
>>> [The remaining frames, heavily line-wrapped in the archive, run through
>>> pyspark\sql\session.py (toDF -> createDataFrame -> _createFromRDD ->
>>> _inferSchema), pyspark\rdd.py (first -> take), pyspark\context.py (runJob)
>>> and py4j's java_gateway.py; the traceback is cut off in the archive before
>>> the final error message.]
>>> 

[sparkR] [MLlib] : Is word2vec implemented in SparkR MLlib ?

2017-04-20 Thread Radhwane Chebaane
Hi,

I've been experimenting with the Spark *Word2vec* implementation in the
MLLib package with Scala and it was very nice.
I need to use the same algorithm in R leveraging the power of spark
distribution with SparkR.
I have been looking on the mailing list and Stackoverflow for any
*Word2vec* use-case
in SparkR but no luck.

Is there any implementation of *Word2vec* in *SparkR* ? Is there any
current work to support this feature in MLlib with R?

Thanks!
Radhwane Chebaane

-- 

Radhwane Chebaane
Distributed systems engineer, Mindlytix

Mail: radhw...@mindlytix.com
Mobile: +33 695 588 906

Skype: rad.cheb
LinkedIn



Re: Re: Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-20 Thread 莫涛
Hi Ryan,


The attachment is the event timeline on executors. They are always busy 
computing.

More executors are helpful but that's not my job as a developer.


1. The bad performance could be caused by my poor implementation, as "checkID", 
being a user-defined function, would not be pushed down.

2. To make the group index work, I need to sort the data by id, which leads to a 
shuffle of 50 TB of data. That's somewhat crazy.


I'm on my way to testing HAR, but the discussion has given me lots of insight 
into ORC.

Thanks for your help!



From: Ryan 
Sent: 17 April 2017 16:48:47
To: 莫涛
Cc: user
Subject: Re: Re: Re: How to store 10M records in HDFS to speed up further filtering?

How about the event timeline on the executors? It seems adding more executors 
could help.

1. I found a jira(https://issues.apache.org/jira/browse/SPARK-11621) that 
states the ppd should work. And I think "only for matched ones the binary data 
is read" is true if proper index is configured. The row group wouldn't be read 
if the predicate isn't satisfied due to index.

2. It is absolutely true the performance gain depends on the id distribution...

On Mon, Apr 17, 2017 at 4:23 PM, 莫涛 
> wrote:

Hi Ryan,


The attachment is a screen shot for the spark job and this is the only stage 
for this job.

I've changed the partition size to 1GB by "--conf 
spark.sql.files.maxPartitionBytes=1073741824".


1. spark-orc seems not that smart. The input size is almost the whole data. I 
guess "only for matched ones the binary data is read" is not true as orc does 
not know the offset of each BINARY so things like seek could not happen

2. I've tried orc and it does skip the partition that has no hit. This could be 
a solution but the performance depends on the distribution of the given ID 
list. No partition could be skipped in the worst case.


Mo Tao




From: Ryan >
Sent: 17 April 2017 15:42:46
To: 莫涛
Cc: user
Subject: Re: Re: How to store 10M records in HDFS to speed up further filtering?

1. Per my understanding, for ORC files, it should push down the filters, which 
means all id columns will be scanned, but the binary data is read only for 
matched ones. I haven't dug into the spark-orc reader though..

2. ORC itself has a row group index and a bloom filter index. You may try 
configurations like 'orc.bloom.filter.columns' on the source table first. From 
the Spark side, with mapPartitions, it's possible to build a sort of index for 
each partition.
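As an illustration of the 'orc.bloom.filter.columns' hint mentioned above, here is a sketch only: table and column names are placeholders, and whether the Spark 2.0 ORC reader actually exploits the bloom filter still depends on predicate pushdown kicking in for the query.

# Hive DDL issued through Spark: an ORC table with a bloom filter on the ID column.
spark.sql("""
    CREATE TABLE binary_store (ID STRING, BIN_DATA BINARY)
    STORED AS ORC
    TBLPROPERTIES ('orc.bloom.filter.columns'='ID',
                   'orc.bloom.filter.fpp'='0.05')
""")

# Enable ORC predicate pushdown on the read side.
spark.conf.set("spark.sql.orc.filterPushdown", "true")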

And could you check how many tasks the filter stage has? Maybe there are 
too few partitions..

On Mon, Apr 17, 2017 at 3:01 PM, 莫涛 
> wrote:

Hi Ryan,


1. "expected qps and response time for the filter request"

I expect that only the requested BINARY are scanned instead of all records, so 
the response time would be "10K * 5MB / disk read speed", or several times of 
this.

In practice, our cluster has 30 SAS disks and scanning all the 10M * 5MB data 
takes about 6 hours now. It should become several minutes, as expected.


2. "build a search tree using ids within each partition to act like an index, 
or create a bloom filter to see if current partition would have any hit"

Sounds like the thing I'm looking for!

Could you kindly provide some links for reference? I found nothing in spark 
document about index or bloom filter working inside partition.


Thanks very much!


Mo Tao


From: Ryan >
Sent: 17 April 2017 14:32:00
To: 莫涛
Cc: user
Subject: Re: How to store 10M records in HDFS to speed up further filtering?

you can build a search tree using ids within each partition to act like an 
index, or create a bloom filter to see if current partition would have any hit.

What's your expected qps and response time for the filter request?


On Mon, Apr 17, 2017 at 2:23 PM, MoTao 
> wrote:
Hi all,

I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
average.
In my daily application, I need to filter out 10K BINARY according to an ID
list.
How should I store the whole data to make the filtering faster?

I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
and column-based format (orc).
However, both of them require to scan almost ALL records, making the
filtering stage very very slow.
The code block for filtering looks like:

val IDSet: Set[String] = ...
val checkID = udf { ID: String => IDSet(ID) }
spark.read.orc("/path/to/whole/data")
  .filter(checkID($"ID"))
  .select($"ID", $"BINARY")
  .write...

Thanks for any advice!




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-speed-up-further-filtering-tp28605.html
Sent from the Apache Spark User List 

Re: Re: Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-20 Thread 莫涛
It's Hadoop Archive.


https://hadoop.apache.org/docs/r1.2.1/hadoop_archives.html



From: Alonso Isidoro Roman 
Sent: 20 April 2017 17:03:33
To: 莫涛
Cc: Jörn Franke; user@spark.apache.org
Subject: Re: Re: Re: How to store 10M records in HDFS to speed up further filtering?

Forgive my ignorance, but what does HAR mean? Is it an acronym for 
"highly available record"?

Thanks


Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2017-04-20 10:58 GMT+02:00 莫涛 >:

Hi Jörn,


HAR is a great idea!


For POC, I've archived 1M records and stored the id -> path mapping in text 
(for better readability).

Filtering 1K records takes only 2 minutes now (30 seconds to get the path list 
and 0.5 second per thread to read a record).

Such performance is exactly what I expected: "only the requested BINARY are 
scanned".

Moreover, HAR provides direct access to each record via the hdfs shell commands.


Thank you very much!


From: Jörn Franke >
Sent: 17 April 2017 22:37:48
To: 莫涛
Cc: user@spark.apache.org
Subject: Re: Re: How to store 10M records in HDFS to speed up further filtering?

Yes 5 mb is a difficult size, too small for HDFS too big for parquet/orc.
Maybe you can put the data in a HAR and store id, path in orc/parquet.

On 17. Apr 2017, at 10:52, 莫涛 > 
wrote:


Hi Jörn,


I do think a 5 MB column is odd but I don't have any other idea before asking 
this question. The binary data is a short video and the maximum size is no more 
than 50 MB.


Hadoop archive sounds very interesting and I'll try it first to check whether 
filtering is fast on it.


To my best knowledge, HBase works best for record around hundreds of KB and it 
requires extra work of the cluster administrator. So this would be the last 
option.


Thanks!


Mo Tao


From: Jörn Franke >
Sent: 17 April 2017 15:59:28
To: 莫涛
Cc: user@spark.apache.org
Subject: Re: How to store 10M records in HDFS to speed up further filtering?

You need to sort the data by id, otherwise a situation can occur where the index 
does not work. Aside from this, it sounds odd to put a 5 MB column using those 
formats. It will also not be very efficient.
What is in the 5 MB binary data?
You could use HAR or maybe Hbase to store this kind of data (if it does not get 
much larger than 5 MB).

> On 17. Apr 2017, at 08:23, MoTao 
> > wrote:
>
> Hi all,
>
> I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
> average.
> In my daily application, I need to filter out 10K BINARY according to an ID
> list.
> How should I store the whole data to make the filtering faster?
>
> I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
> and column-based format (orc).
> However, both of them require to scan almost ALL records, making the
> filtering stage very very slow.
> The code block for filtering looks like:
>
> val IDSet: Set[String] = ...
> val checkID = udf { ID: String => IDSet(ID) }
> spark.read.orc("/path/to/whole/data")
>  .filter(checkID($"ID"))
>  .select($"ID", $"BINARY")
>  .write...
>
> Thanks for any advice!
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-speed-up-further-filtering-tp28605.html
> Sent from the Apache Spark User List mailing list archive at 
> Nabble.com.
>
> -
> To unsubscribe e-mail: 
> user-unsubscr...@spark.apache.org
>



Re: Re: Re: How to store 10M records in HDFS to speed up further filtering?

2017-04-20 Thread Alonso Isidoro Roman
Forgive my ignorance, but what does HAR mean? Is it an acronym for
"highly available record"?

Thanks

Alonso Isidoro Roman
about.me/alonso.isidoro.roman


2017-04-20 10:58 GMT+02:00 莫涛 :

> Hi Jörn,
>
>
> HAR is a great idea!
>
>
> For POC, I've archived 1M records and stored the id -> path mapping in
> text (for better readability).
>
> Filtering 1K records takes only 2 minutes now (30 seconds to get the path
> list and 0.5 second per thread to read a record).
>
> Such performance is exactly what I expected: "only the requested BINARY
> are scanned".
>
> Moreover, HAR provides direct access to each record via the hdfs shell
> commands.
>
>
> Thank you very much!
> --
> *From:* Jörn Franke 
> *Sent:* 17 April 2017 22:37:48
> *To:* 莫涛
> *Cc:* user@spark.apache.org
> *Subject:* Re: Re: How to store 10M records in HDFS to speed up further
> filtering?
>
> Yes 5 mb is a difficult size, too small for HDFS too big for parquet/orc.
> Maybe you can put the data in a HAR and store id, path in orc/parquet.
>
> On 17. Apr 2017, at 10:52, 莫涛  wrote:
>
> Hi Jörn,
>
>
> I do think a 5 MB column is odd but I don't have any other idea before
> asking this question. The binary data is a short video and the maximum
> size is no more than 50 MB.
>
>
> Hadoop archive sounds very interesting and I'll try it first to check
> whether filtering is fast on it.
>
>
> To my best knowledge, HBase works best for record around hundreds of KB
> and it requires extra work of the cluster administrator. So this would be
> the last option.
>
>
> Thanks!
>
>
> Mo Tao
> --
> *From:* Jörn Franke 
> *Sent:* 17 April 2017 15:59:28
> *To:* 莫涛
> *Cc:* user@spark.apache.org
> *Subject:* Re: How to store 10M records in HDFS to speed up further filtering?
>
> You need to sort the data by id, otherwise a situation can occur where the
> index does not work. Aside from this, it sounds odd to put a 5 MB column
> in those formats. It will also not be very efficient.
> What is in the 5 MB binary data?
> You could use HAR or maybe Hbase to store this kind of data (if it does
> not get much larger than 5 MB).
>
> > On 17. Apr 2017, at 08:23, MoTao  wrote:
> >
> > Hi all,
> >
> > I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
> > average.
> > In my daily application, I need to filter out 10K BINARY according to an
> ID
> > list.
> > How should I store the whole data to make the filtering faster?
> >
> > I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
> > and column-based format (orc).
> > However, both of them require to scan almost ALL records, making the
> > filtering stage very very slow.
> > The code block for filtering looks like:
> >
> > val IDSet: Set[String] = ...
> > val checkID = udf { ID: String => IDSet(ID) }
> > spark.read.orc("/path/to/whole/data")
> >  .filter(checkID($"ID"))
> >  .select($"ID", $"BINARY")
> >  .write...
> >
> > Thanks for any advice!
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-
> speed-up-further-filtering-tp28605.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>
>


Reply: Reply: How to store 10M records in HDFS to speed up further filtering?

2017-04-20 Thread 莫涛
Hi Jörn,


HAR is a great idea!


For POC, I've archived 1M records and stored the id -> path mapping in text 
(for better readability).

Filtering 1K records takes only 2 minutes now (30 seconds to get the path list 
and 0.5 second per thread to read a record).

Such performance is exactly what I expected: "only the requested BINARY are 
scanned".

Moreover, HAR provides direct access to each record via hdfs shell commands.


Thank you very much!
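
For reference, a minimal sketch of this lookup-then-read pattern (the file names, the
tab-separated layout of the mapping, and the assumption that `sc.binaryFiles` accepts a
comma-separated path list the way `textFile` does are illustrative, not details from
this thread):

```python
# Sketch only: read the id -> HAR path mapping, keep the requested IDs,
# then read just those archive members. har:// is served by Hadoop's
# HarFileSystem, so Spark can read it like any other filesystem path.
wanted_ids = set(line.strip() for line in open("ids_to_fetch.txt"))   # e.g. the 1K IDs

paths = (sc.textFile("/path/to/id_to_har_path.txt")
           .map(lambda line: line.split("\t", 1))                     # -> (id, har path)
           .filter(lambda kv: kv[0] in wanted_ids)
           .map(lambda kv: kv[1])
           .collect())

# Only the selected records inside the archive are opened.
selected = sc.binaryFiles(",".join(paths))                            # RDD of (path, bytes)
```

The point is simply that only the selected archive members are touched, rather than
scanning all 10M records.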


From: Jörn Franke
Sent: 17 April 2017 22:37:48
To: 莫涛
Cc: user@spark.apache.org
Subject: Re: Reply: How to store 10M records in HDFS to speed up further filtering?

Yes, 5 MB is a difficult size: too small for HDFS, too big for parquet/orc.
Maybe you can put the data in a HAR and store (id, path) in orc/parquet.

On 17. Apr 2017, at 10:52, 莫涛 wrote:


Hi Jörn,


I do think a 5 MB column is odd but I don't have any other idea before asking 
this question. The binary data is a short video and the maximum size is no more 
than 50 MB.


Hadoop archive sounds very interesting and I'll try it first to check whether 
filtering is fast on it.


To the best of my knowledge, HBase works best for records of around hundreds of KB,
and it requires extra work from the cluster administrator. So this would be the last
option.


Thanks!


Mo Tao


From: Jörn Franke
Sent: 17 April 2017 15:59:28
To: 莫涛
Cc: user@spark.apache.org
Subject: Re: How to store 10M records in HDFS to speed up further filtering?

You need to sort the data by id, otherwise a situation can occur where the index
does not work. Aside from this, it sounds odd to put a 5 MB column in those
formats. It will also not be very efficient.
What is in the 5 MB binary data?
You could use HAR or maybe Hbase to store this kind of data (if it does not get 
much larger than 5 MB).

> On 17. Apr 2017, at 08:23, MoTao wrote:
>
> Hi all,
>
> I have 10M (ID, BINARY) record, and the size of each BINARY is 5MB on
> average.
> In my daily application, I need to filter out 10K BINARY according to an ID
> list.
> How should I store the whole data to make the filtering faster?
>
> I'm using DataFrame in Spark 2.0.0 and I've tried row-based format (avro)
> and column-based format (orc).
> However, both of them require to scan almost ALL records, making the
> filtering stage very very slow.
> The code block for filtering looks like:
>
> val IDSet: Set[String] = ...
> val checkID = udf { ID: String => IDSet(ID) }
> spark.read.orc("/path/to/whole/data")
>  .filter(checkID($"ID"))
>  .select($"ID", $"BINARY")
>  .write...
>
> Thanks for any advice!
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-store-10M-records-in-HDFS-to-speed-up-further-filtering-tp28605.html
> Sent from the Apache Spark User List mailing list archive at 
> Nabble.com.
>
> -
> To unsubscribe e-mail: 
> user-unsubscr...@spark.apache.org
>


Re: how to add new column using regular expression within pyspark dataframe

2017-04-20 Thread Zeming Yu
Any examples?

On 20 Apr. 2017 3:44 pm, "颜发才(Yan Facai)"  wrote:

> How about using `withColumn` and UDF?
>
> example:
> + https://gist.github.com/zoltanctoth/2deccd69e3d1cde1dd78
> 
> + https://ragrawal.wordpress.com/2015/10/02/spark-custom-udf-example/
>
>
>
> On Mon, Apr 17, 2017 at 8:25 PM, Zeming Yu  wrote:
>
>> I've got a dataframe with a column looking like this:
>>
>> display(flight.select("duration").show())
>>
>> ++
>> |duration|
>> ++
>> |  15h10m|
>> |   17h0m|
>> |  21h25m|
>> |  14h30m|
>> |  24h50m|
>> |  26h10m|
>> |  14h30m|
>> |   23h5m|
>> |  21h30m|
>> |  11h50m|
>> |  16h10m|
>> |  15h15m|
>> |  21h25m|
>> |  14h25m|
>> |  14h40m|
>> |   16h0m|
>> |  24h20m|
>> |  14h30m|
>> |  14h25m|
>> |  14h30m|
>> ++
>> only showing top 20 rows
>>
>>
>>
>> I need to extract the hour as a number and store it as an additional
>> column within the same dataframe. What's the best way to do that?
>>
>>
>> I tried the following, but it failed:
>>
>> import re
>> def getHours(x):
>>   return re.match('([0-9]+(?=h))', x)
>> temp = flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()
>> temp.select("duration").show()
>>
>>
>> error message:
>>
>>
>> ---Py4JJavaError
>>  Traceback (most recent call 
>> last) in ()  2 def getHours(x):   
>>3   return re.match('([0-9]+(?=h))', x)> 4 temp = 
>> flight.select("duration").rdd.map(lambda x:getHours(x[0])).toDF()  5 
>> temp.select("duration").show()
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>>  in toDF(self, schema, sampleRatio) 55 [Row(name=u'Alice', 
>> age=1)] 56 """---> 57 return 
>> sparkSession.createDataFrame(self, schema, sampleRatio) 58  59 
>> RDD.toDF = toDF
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>>  in createDataFrame(self, data, schema, samplingRatio, verifySchema)518  
>>519 if isinstance(data, RDD):--> 520 rdd, schema = 
>> self._createFromRDD(data.map(prepare), schema, samplingRatio)521 
>> else:522 rdd, schema = self._createFromLocal(map(prepare, 
>> data), schema)
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>>  in _createFromRDD(self, rdd, schema, samplingRatio)358 """
>> 359 if schema is None or isinstance(schema, (list, tuple)):--> 360   
>>   struct = self._inferSchema(rdd, samplingRatio)361 
>> converter = _create_converter(struct)362 rdd = 
>> rdd.map(converter)
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\session.py
>>  in _inferSchema(self, rdd, samplingRatio)329 :return: 
>> :class:`pyspark.sql.types.StructType`330 """--> 331 
>> first = rdd.first()332 if not first:333 raise 
>> ValueError("The first row in RDD is empty, "
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py 
>> in first(self)   1359 ValueError: RDD is empty   1360 """-> 
>> 1361 rs = self.take(1)   1362 if rs:   1363 
>> return rs[0]
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\rdd.py 
>> in take(self, num)   13411342 p = range(partsScanned, 
>> min(partsScanned + numPartsToTry, totalParts))-> 1343 res = 
>> self.context.runJob(self, takeUpToNumLeft, p)   13441345 
>> items += res
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\context.py
>>  in runJob(self, rdd, partitionFunc, partitions, allowLocal)963 
>> # SparkContext#runJob.964 mappedRDD = 
>> rdd.mapPartitions(partitionFunc)--> 965 port = 
>> self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
>> 966 return list(_load_from_socket(port, 
>> mappedRDD._jrdd_deserializer))967
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip\py4j\java_gateway.py
>>  in __call__(self, *args)   1131 answer = 
>> self.gateway_client.send_command(command)   1132 return_value = 
>> get_return_value(-> 1133 answer, self.gateway_client, 
>> self.target_id, self.name)   11341135 for temp_arg in temp_args:
>> C:\spark-2.1.0-bin-hadoop2.7\spark-2.1.0-bin-hadoop2.7\python\pyspark\sql\utils.py
>>  in deco(*a, **kw) 61 def deco(*a, **kw): 62 try:---> 63 
>> return f(*a, **kw) 64 except 
>> py4j.protocol.Py4JJavaError as e: 65 s = 
>> e.java_exception.toString()
>> 
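
A hedged sketch of the `withColumn` approach suggested above, using the `duration`
column from this thread (everything else is illustrative). One reason the `rdd`/`toDF()`
attempt above fails is that `re.match` returns a match object (or None) rather than a
plain value, which schema inference cannot handle; returning an `int`, or using the
built-in `regexp_extract`, avoids that:

```python
# Sketch only: extract the hour count from strings like "15h10m" as a new integer column.
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType
import re

# Option 1: built-in regexp_extract, no Python UDF required
flight = flight.withColumn(
    "hours", F.regexp_extract(F.col("duration"), r"(\d+)h", 1).cast(IntegerType()))

# Option 2: a UDF that returns a plain int (returning the raw match object would fail)
get_hours = F.udf(
    lambda s: int(re.match(r"(\d+)h", s).group(1)) if s and re.match(r"(\d+)h", s) else None,
    IntegerType())
flight = flight.withColumn("hours_udf", get_hours(F.col("duration")))

flight.select("duration", "hours", "hours_udf").show(5)
```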

Concurrent DataFrame.saveAsTable into non-existant tables fails the second job despite Mode.APPEND

2017-04-20 Thread Rick Moritz
Hi List,

I'm wondering if the following behaviour should be considered a bug, or
whether it "works as designed":

I'm starting multiple concurrent (FIFO-scheduled) jobs in a single
SparkContext, some of which write into the same tables.
When these tables already exist, it appears as though both jobs [at least
believe that they] successfully appended to the table (i.e., both jobs
terminate successfully, but I haven't checked whether the data from both
jobs was actually written, or if one job overwrote the other's data,
despite Mode.APPEND). If the table does not exist, both jobs will attempt
to create the table, but whichever job's turn comes second (or later) will
then fail with an AlreadyExistsException
(org.apache.spark.sql.AnalysisException:
org.apache.hadoop.hive.ql.metadata.HiveException: AlreadyExistsException).

I think the issue here is that neither job registers the table with the
metastore until it actually starts writing to it, but each determines early on
that it will need to create the table. The slower job then obviously fails to
create the table, and instead of falling back to appending the data to the
existing table it crashes out.

I would consider this a bit of a bug, but I'd like to make sure that it
isn't merely a case of me doing something stupid elsewhere, or indeed
simply an inherent architectural limitation of working with the metastore,
before going to Jira with this.
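
For what it's worth, a minimal reproduction sketch of the pattern described above (the
table name and data are hypothetical, and it assumes a Hive-enabled Spark 2.x session):
with the target table absent, the slower writer tends to fail at the metastore's
create-table step instead of falling back to an append.

```python
# Hypothetical reproduction sketch: two concurrent append writers racing to create
# the same, not-yet-existing table.
from threading import Thread
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

def append_job(tag):
    df = spark.range(0, 1000).selectExpr("id", "'%s' AS source" % tag)
    # Both writers use append mode; whichever one reaches the metastore second
    # may still hit AlreadyExistsException on table creation.
    df.write.mode("append").saveAsTable("concurrent_append_test")   # hypothetical table name

threads = [Thread(target=append_job, args=("job%d" % i,)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```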

Also, I'm aware that running the jobs strictly sequentially would work
around the issue, but that would require reordering jobs before sending
them off to Spark, or would kill efficiency.

Thanks for any feedback,

Rick


checkpoint on spark standalone

2017-04-20 Thread Vivek Mishra
Hi,
I am processing multiple CSV files of about 2 GB each with my Spark application,
which also does union and aggregation across all the input files. I am currently
stuck with the error below:
java.lang.StackOverflowError
at 
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression.dataType(interfaces.scala:81)
at 
org.apache.spark.sql.catalyst.expressions.Alias.toAttribute(namedExpressions.scala:149)
at 
org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$7.apply(basicOperators.scala:228)
at 
org.apache.spark.sql.catalyst.plans.logical.Aggregate$$anonfun$output$7.apply(basicOperators.scala:228)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.sql.catalyst.plans.logical.Aggregate.output(basicOperators.scala:228)
at 
org.apache.spark.sql.catalyst.planning.PhysicalOperation$$anonfun$unapply$1.apply(patterns.scala:38)
at 
org.apache.spark.sql.catalyst.planning.PhysicalOperation$$anonfun$unapply$1.apply(patterns.scala:38)
at scala.Option.getOrElse(Option.scala:120)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at 
org.apache.spark.sql.catalyst.plans.logical.Aggregate.output(basicOperators.scala:228)
at 
org.apache.spark.sql.catalyst.planning.PhysicalOperation$$anonfun$unapply$1.apply(patterns.scala:38)
at 
org.apache.spark.sql.catalyst.planning.PhysicalOperation$$anonfun$unapply$1.apply(patterns.scala:38)
at scala.Option.getOrElse(Option.scala:120)
at 
org.apache.spark.sql.catalyst.planning.PhysicalOperation$.unapply(patterns.scala:38)
at 
org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:44)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at 
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:336)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at 
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:334)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at 
org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:327)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at 
org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)



I understand that it could be due to RDD lineage and I tried to resolve it via
checkpointing, but no luck.

Any help?

Sincerely,
-Vivek
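
Without seeing the code, here is a hedged sketch of one common workaround, under the
assumption that the per-file DataFrames are combined with a long chain of `unionAll`
calls, which builds a very deep logical plan and can overflow the planner's stack on
Spark 1.x. Unioning pairwise keeps the plan depth logarithmic; raising the driver's JVM
thread stack size (`-Xss`) is another workaround often suggested for this symptom.

```python
# Hedged sketch: combine many per-file DataFrames with a balanced (pairwise) union
# instead of a single deep left-to-right chain of unionAll calls.
def tree_union(dfs):
    """Union a list of DataFrames pairwise, keeping plan depth O(log n)."""
    while len(dfs) > 1:
        paired = []
        for i in range(0, len(dfs), 2):
            if i + 1 < len(dfs):
                paired.append(dfs[i].unionAll(dfs[i + 1]))
            else:
                paired.append(dfs[i])      # odd one out carries over unchanged
        dfs = paired
    return dfs[0]

# Illustrative usage (csv_paths and the CSV reader options are assumptions):
# dfs = [sqlContext.read.format("com.databricks.spark.csv")
#                  .option("header", "true").load(p) for p in csv_paths]
# combined = tree_union(dfs)
# result = combined.groupBy("some_key").count()
```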











Re: Spark-shell's performance

2017-04-20 Thread Yan Facai
Hi, Hanson.
Perhaps I’m digressing here.
If I'm wrong or mistaken, please correct me.

SPARK_WORKER_* is the configuration for the whole cluster, and it's fine to
write those global variables in spark-env.sh.
However,
SPARK_DRIVER_* and SPARK_EXECUTOR_* are the configuration for the application
(your code), so it's probably better to pass those arguments to spark-shell
directly, like:
```bash
spark-shell --driver-memory 8G --executor-cores 4 --executor-memory 2G
```

Tuning the configuration for the application is a good start, and passing the
settings to spark-shell directly makes them easier to test.
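
If the job later moves out of the shell into a standalone application, the same
executor-level settings can also be supplied programmatically (a hedged sketch, not
from this thread; note that spark.driver.memory usually cannot be set this way,
because in client mode the driver JVM is already running):

```python
# Sketch: application-level equivalents of the flags above, set via SparkConf.
# spark.driver.memory is intentionally omitted; set it on the command line or
# in spark-defaults.conf instead.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("tuning-example")
        .set("spark.executor.memory", "2g")
        .set("spark.executor.cores", "4"))
sc = SparkContext(conf=conf)
```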

For more details see:
+ `spark-shell -h`
+ http://spark.apache.org/docs/latest/submitting-applications.html
+ http://spark.apache.org/docs/latest/spark-standalone.html




On Mon, Apr 17, 2017 at 6:18 PM, Richard Hanson  wrote:

> I am playing with some data using (standalone) spark-shell (Spark version
> 1.6.0) by executing `spark-shell`. The flow is simple, a bit like cp:
> basically moving 100k local files (the maximum size is 190 KB) to S3. Memory is
> configured as below
>
>
> export SPARK_DRIVER_MEMORY=8192M
> export SPARK_WORKER_CORES=1
> export SPARK_WORKER_MEMORY=8192M
> export SPARK_EXECUTOR_CORES=4
> export SPARK_EXECUTOR_MEMORY=2048M
>
>
> But the total time spent moving those files to S3 was roughly 30 minutes. The
> resident memory I found is roughly 3.820g (checking with top -p ).
> This seems to me to leave room for speeding it up, though this is only for
> testing purposes. So I would like to know whether there are any other parameters
> I can change to improve spark-shell's performance. Is the memory setup above
> correct?
>
>
> Thanks.
>