How to do multiple join in pyspark

2017-02-26 Thread lovemoon
My code is below:


cfg = SparkConf().setAppName('MyApp')
spark = SparkSession.builder.config(conf=cfg).getOrCreate()


rdd1 = spark.createDataFrame([(1, 'a'), (2, 'b'), (4, 'c')], ['idx', 'val'])
rdd1.registerTempTable('rdd1')
rdd2 = spark.createDataFrame([(1, 2, 100), (1, 3, 200), (2, 3, 300)], ['key1', 
'key2', 'val'])
rdd2.registerTempTable('rdd2')


what_i_want = spark.sql("""
select
*
from rdd2 a
left outer join rdd1 b
on a.key1 = b.idx
left outer join rdd1 c
on a.key2 = c.idx
""")
what_i_want.show()


try_to_use_API = rdd2.join(rdd1, on=[rdd2['key1'] == rdd1['idx']], 
how='left_outer') \
.join(rdd1, on=[rdd2['key2'] == rdd1['idx']], how='left_outer')
try_to_use_API.show()


But try_to_use_API does not work and raises an error:
pyspark.sql.utils.AnalysisException: u'Both sides of this join are outside the 
broadcasting threshold and computing it could be prohibitively expensive. To 
explicitly enable it, please set spark.sql.crossJoin.enabled = true;'
How can I fix this?
Thanks  
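
For reference, one way to express the same two left joins with the DataFrame
API is to alias the reused DataFrame so each join condition is unambiguous.
This is only a sketch based on the snippet above (variable names are
illustrative), not a verified fix for the AnalysisException reported here:

from pyspark.sql import functions as F

a = rdd2.alias('a')
b = rdd1.alias('b')
c = rdd1.alias('c')

api_version = (a
    .join(b, F.col('a.key1') == F.col('b.idx'), 'left_outer')
    .join(c, F.col('a.key2') == F.col('c.idx'), 'left_outer'))
api_version.show()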

Re: Spark runs out of memory with small file

2017-02-26 Thread Pavel Plotnikov
Hi, Henry

In the first example the dict d always contains only one value because the_id
stays the same; in the second case the dict grows very quickly.
So I suggest first splitting your whole-file string into lines, then
repartitioning, and then applying your custom logic.

Example:

def splitf(s):
    return s.split("\n")

rdd.flatMap(splitf).repartition(1000).map(your_function)

Best,
Pavel

On Mon, 27 Feb 2017, 06:28 Henry Tremblay,  wrote:

> Not sure where you want me to put yield. My first try caused an error in
> Spark that it could not pickle generator objects.
>
> On 02/26/2017 03:25 PM, ayan guha wrote:
>
> Hi
>
> We are doing similar stuff, but with large number of small-ish files. What
> we do is write a function to parse a complete file, similar to your parse
> file. But we use yield, instead of return and flatmap on top of it. Can you
> give it a try and let us know if it works?
>
> On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers  wrote:
>
> using wholeFiles to process formats that can not be split per line is not
> "old"
>
> and there are plenty of problems for which RDD is still better suited than
> Dataset or DataFrame currently (this might change in near future when
> Dataset gets some crucial optimizations fixed).
>
> On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
> Hi Henry,
>
> Those guys in Databricks training are nuts and still use Spark 1.x for
> their exams. Learning SPARK is a VERY VERY VERY old way of solving problems
> using SPARK.
>
> The core engine of SPARK, which even I understand, has gone through
> several fundamental changes.
>
> Just try reading the file using dataframes and try using SPARK 2.1.
>
> In other words it may be of tremendous benefit if you were learning to
> solve problems which exists rather than problems which does not exist any
> more.
>
> Please let me know in case I can be of any further help.
>
> Regards,
> Gourav
>
> On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay 
> wrote:
>
> The file is so small that a stand alone python script, independent of
> spark, can process the file in under a second.
>
> Also, the following fails:
>
> 1. Read the whole file in with wholeFiles
>
> 2. use flatMap to get 50,000 rows that look like: Row(id="path",
> line="line")
>
> 3. Save the results as CSV to HDFS
>
> 4. Read the files (there are 20) from HDFS into a df using
> sqlContext.read.csv()
>
> 5. Convert the df to an rdd.
>
> 6 Create key value pairs with the key being the file path and the value
> being the line.
>
> 7 Iterate through values
>
> What happens is Spark either runs out of memory, or, in my last try with a
> slight variation, just hangs for 12 hours.
>
> Henry
>
> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>
> Hi, Tremblay.
> Your file is .gz format, which is not splittable for hadoop. Perhaps the
> file is loaded by only one executor.
> How many executors do you start?
> Perhaps repartition method could solve it, I guess.
>
>
> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay 
> wrote:
>
> I am reading in a single small file from hadoop with wholeText. If I
> process each line and create a row with two cells, the first cell equal to
> the name of the file, the second cell equal to the line. That code runs
> fine.
>
> But if I just add two lines of code and change the first cell based on
> parsing a line, spark runs out of memory. Any idea why such a simple
> process that would succeed quickly in a non spark application fails?
>
> Thanks!
>
> Henry
>
> CODE:
>
> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
> 3816096
> /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz
>
>
> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
> In [2]: rdd1.count()
> Out[2]: 1
>
>
> In [4]: def process_file(s):
>...: text = s[1]
>...: the_id = s[0]
>...: d = {}
>...: l =  text.split("\n")
>...: final = []
>...: for line in l:
>...: d[the_id] = line
>...: final.append(Row(**d))
>...: return final
>...:
>
> In [5]: rdd2 = rdd1.map(process_file)
>
> In [6]: rdd2.count()
> Out[6]: 1
>
> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>
> In [8]: rdd3.count()
> Out[8]: 508310
>
> In [9]: rdd3.take(1)
> Out[9]: [Row(hdfs://ip-172-31-35-67.us
> -west-2.compute.internal:8020/mnt/temp/CC-MAIN-2017011609512
> 3-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]
>
> In [10]: def process_file(s):
> ...: text = s[1]
> ...: d = {}
> ...: l =  text.split("\n")
> ...: final = []
> ...: the_id = "init"
> ...: for line in l:
> ...: if line[0:15] == 'WARC-Record-ID:':
> ...: the_id = line[15:]
> ...: d[the_id] = line
> ...: final.append(Row(**d))
> ...: return final
>
> In [12]: rdd2 = rdd1.map(process_file)
>
> In [13]: rdd2.count()
> 

Thrift server does not respect hive.server2.enable.doAs=true

2017-02-26 Thread yuyong . zhai



Re: Custom log4j.properties on AWS EMR

2017-02-26 Thread Prithish
Steve, I tried that, but it didn't work. Any other ideas?
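
For reference, another approach sometimes used on YARN (shown here only as an
untested sketch, with placeholder paths) is to ship the properties file with
--files and point log4j at the shipped copy:

spark-submit --files /local/path/log4j.properties \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j.properties" \
  ...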

On Mon, Feb 27, 2017 at 1:42 AM, Steve Loughran 
wrote:

> try giving a resource of a file in the JAR, e.g add a file
> "log4j-debugging.properties into the jar, and give a config option of
> -Dlog4j.configuration=/log4j-debugging.properties   (maybe also try
> without the "/")
>
>
> On 26 Feb 2017, at 16:31, Prithish  wrote:
>
> Hoping someone can answer this.
>
> I am unable to override and use a Custom log4j.properties on Amazon EMR. I
> am running Spark on EMR (Yarn) and have tried all the below combinations in
> the Spark-Submit to try and use the custom log4j.
>
> In Client mode
> --driver-java-options "-Dlog4j.configuration=hdfs://
> host:port/user/hadoop/log4j.properties"
>
> In Cluster mode
> --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=hdfs://host:
> port/user/hadoop/log4j.properties"
>
> I have also tried picking from local filesystem using file: instead
> of hdfs. None of this seem to work. However, I can get this working when
> running on my local Yarn setup.
>
> Any ideas?
>
> I have also posted on Stackoverflow (link below)
> http://stackoverflow.com/questions/42452622/custom-
> log4j-properties-on-aws-emr
>
>
>


Getting unrecoverable exception: java.lang.NullPointerException when trying to find wordcount in kafka topic

2017-02-26 Thread Mina Aslani
Hi,

I am trying to submit a job to Spark to count the number of words in a specific
Kafka topic, but I see the exception below when I check the log:

. failed with unrecoverable exception: java.lang.NullPointerException


The command that I run follows:

./scripts/dm-spark-submit.sh  --class
org.apache.spark.examples.streaming.DirectKafkaWordCount zookeeper:2181
 my-topic --executor-memory 1G --total-executor-cores 2
/spark-apps/spark-examples-SNAPSHOT.jar

Please note that my Spark, Kafka, and ZooKeeper are running in different
containers named spark, kafka, and zookeeper.

I tried the Java streaming word count as well, and the same error occurs.

Any idea about the cause of the error?



Kindest regards,
Mina


Re: Spark runs out of memory with small file

2017-02-26 Thread Henry Tremblay
I'm not sure where you want me to put the yield. My first try caused an error in
Spark saying that it could not pickle generator objects.
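
For what it's worth, here is a minimal sketch of the yield-based variant being
discussed, assuming rdd1 is the wholeTextFiles RDD from the earlier messages;
the field names id and line are taken from the step-by-step description, not
from the actual code:

from pyspark.sql import Row

def process_file(pair):
    # pair is (file path, whole file text); yield one Row per line
    # instead of collecting them into a list.
    path, text = pair
    the_id = "init"
    for line in text.split("\n"):
        if line.startswith('WARC-Record-ID:'):
            the_id = line[15:]
        yield Row(id=the_id, line=line)

# flatMap (not map) consumes the generator inside each task, so Spark
# never has to pickle a generator object.
rdd3 = rdd1.flatMap(process_file)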



On 02/26/2017 03:25 PM, ayan guha wrote:

Hi

We are doing similar stuff, but with large number of small-ish files. 
What we do is write a function to parse a complete file, similar to 
your parse file. But we use yield, instead of return and flatmap on 
top of it. Can you give it a try and let us know if it works?


On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers > wrote:


using wholeFiles to process formats that can not be split per line
is not "old"

and there are plenty of problems for which RDD is still better
suited than Dataset or DataFrame currently (this might change in
near future when Dataset gets some crucial optimizations fixed).

On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta
> wrote:

Hi Henry,

Those guys in Databricks training are nuts and still use Spark
1.x for their exams. Learning SPARK is a VERY VERY VERY old
way of solving problems using SPARK.

The core engine of SPARK, which even I understand, has gone
through several fundamental changes.

Just try reading the file using dataframes and try using SPARK
2.1.

In other words it may be of tremendous benefit if you were
learning to solve problems which exists rather than problems
which does not exist any more.

Please let me know in case I can be of any further help.

Regards,
Gourav

On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay
> wrote:

The file is so small that a stand alone python script,
independent of spark, can process the file in under a second.

Also, the following fails:

1. Read the whole file in with wholeFiles

2. use flatMap to get 50,000 rows that looks like:
Row(id="path", line="line")

3. Save the results as CVS to HDFS

4. Read the files (there are 20) from HDFS into a df using
sqlContext.read.csv()

5. Convert the df to an rdd.

6 Create key value pairs with the key being the file path
and the value being the line.

7 Iterate through values

What happens is Spark either runs out of memory, or, in my
last try with a slight variation, just hangs for 12 hours.

Henry


On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:

Hi, Tremblay.
Your file is .gz format, which is not splittable for
hadoop. Perhaps the file is loaded by only one executor.
How many executors do you start?
Perhaps repartition method could solve it, I guess.


On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay
> wrote:

I am reading in a single small file from hadoop with
wholeText. If I process each line and create a row
with two cells, the first cell equal to the name of
the file, the second cell equal to the line. That
code runs fine.

But if I just add two line of code and change the
first cell based on parsing a line, spark runs out of
memory. Any idea why such a simple process that would
succeed quickly in a non spark application fails?

Thanks!

Henry

CODE:

[hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
3816096

/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz


In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
In [2]: rdd1.count()
Out[2]: 1


In [4]: def process_file(s):
   ...: text = s[1]
   ...: the_id = s[0]
   ...: d = {}
   ...: l = text.split("\n")
   ...: final = []
   ...: for line in l:
   ...: d[the_id] = line
   ...:  final.append(Row(**d))
   ...: return final
   ...:

In [5]: rdd2 = rdd1.map(process_file)

In [6]: rdd2.count()
Out[6]: 1

In [7]: rdd3 = rdd2.flatMap(lambda x: x)

In [8]: rdd3.count()
Out[8]: 508310

In [9]: rdd3.take(1)
Out[9]: [Row(hdfs://ip-172-31-35-67.us


java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext

2017-02-26 Thread lk_spark
Hi all,
   I want to extract some info from Kafka using Spark Streaming. My code looks like:
   
val keyword = ""
val system = "dmp"
val datetime_idx = 0
val datetime_length = 23
val logLevelBeginIdx = datetime_length + 2 - 1
val logLevelMaxLenght = 5
   
val lines = messages.filter(record => 
record.value().matches("\\d{4}.*")).map(record => {
  val assembly = record.topic()
  val value = record.value
  val datatime = value.substring(datetime_idx, datetime_length - 1)
  val level = value.substring(logLevelBeginIdx, logLevelBeginIdx + 
logLevelMaxLenght - 1)
  (assembly,value,datatime,level)
})

I get this error:
Caused by: java.io.NotSerializableException: 
org.apache.spark.streaming.StreamingContext
Serialization stack:
 - object not serializable (class: org.apache.spark.streaming.StreamingContext, 
value: org.apache.spark.streaming.StreamingContext@5a457aa1)
 - field (class: $iw, name: streamingContext, type: class 
org.apache.spark.streaming.StreamingContext)
 - object (class $iw, $iw@38eb2140)
 - field (class: $iw, name: $iw, type: class $iw)
 - object (class $iw, $iw@2a3ced3d)
 - field (class: $iw, name: $iw, type: class $iw)
 - object (class $iw, $iw@7c5dbca5)

==
  If I change the parameters to constants, I do not get the error:
  
  val lines = messages.filter(record => 
record.value().matches("\\d{4}.*")).map(record => {
  val assembly = record.topic()
  val value = record.value
  val datatime = value.substring(0, 22)
  val level = value.substring(24, 27)
  (assembly,value,datatime,level)
  
})

How can I pass parameters to the map function?

2017-02-27


lk_spark 

Re: Spark SQL table authority control?

2017-02-26 Thread yuyong . zhai
https://issues.apache.org/jira/browse/SPARK-8321

翟玉勇 | Data Architecture | ELEME Inc.
Email: yuyong.z...@ele.me  | Mobile: 15221559674
http://ele.me 饿了么

Original message
From: 李斌松
To: user
Sent: Sunday, February 26, 2017, 11:50
Subject: Spark SQL table authority control?


When executing Hive SQL through a JDBC connection to the Spark Thrift Server,
how do we check read or write permissions on a table? In Hive you can control
permissions by extending a hook; what is the corresponding extension point for
Spark on Hive?


Re: Spark - YARN Cluster Mode

2017-02-26 Thread ayan guha
Also, I wanted to add that if I specify the conf on the command line, it seems
to work.

For example, if I use

spark-submit --master yarn --deploy-mode cluster --conf
spark.yarn.queue=root.Application ayan_test.py 10

Then it is going to the correct queue.
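
For reference, the same settings could also be kept out of the code entirely
with a properties file passed at submit time; the file name and values below
are only an illustration:

# my-app.conf (illustrative values)
spark.yarn.queue                       root.Applications
spark.executor.instances               50
spark.executor.memory                  22g
spark.yarn.executor.memoryOverhead     4096
spark.executor.cores                   4

spark-submit --master yarn --deploy-mode cluster \
  --properties-file my-app.conf ayan_test.py 10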

Any help would be great

Best
Ayan

On Mon, Feb 27, 2017 at 11:52 AM, ayan guha  wrote:

> Hi
>
> I am facing an issue with Cluster Mode, with pyspark
>
> Here is my code:
>
> conf = SparkConf()
> conf.setAppName("Spark Ingestion")
> conf.set("spark.yarn.queue","root.Applications")
> conf.set("spark.executor.instances","50")
> conf.set("spark.executor.memory","22g")
> conf.set("spark.yarn.executor.memoryOverhead","4096")
> conf.set("spark.executor.cores","4")
> conf.set("spark.sql.hive.convertMetastoreParquet", "false")
> sc = SparkContext(conf = conf)
> sqlContext = HiveContext(sc)
>
> r = sc.parallelize(xrange(1,1))
> print r.count()
>
> sc.stop()
>
> The problem is none of my Config settings are passed on to Yarn.
>
> spark-submit --master yarn --deploy-mode cluster ayan_test.py
>
> I tried the same code with deploy-mode=client and all config are passing
> fine.
>
> Am I missing something? Will introducing --property-file be of any help?
> Can anybody share some working example?
>
> Best
> Ayan
>
> --
> Best Regards,
> Ayan Guha
>



-- 
Best Regards,
Ayan Guha


Spark - YARN Cluster Mode

2017-02-26 Thread ayan guha
Hi

I am facing an issue with Cluster Mode, with pyspark

Here is my code:

conf = SparkConf()
conf.setAppName("Spark Ingestion")
conf.set("spark.yarn.queue","root.Applications")
conf.set("spark.executor.instances","50")
conf.set("spark.executor.memory","22g")
conf.set("spark.yarn.executor.memoryOverhead","4096")
conf.set("spark.executor.cores","4")
conf.set("spark.sql.hive.convertMetastoreParquet", "false")
sc = SparkContext(conf = conf)
sqlContext = HiveContext(sc)

r = sc.parallelize(xrange(1,1))
print r.count()

sc.stop()

The problem is that none of my config settings are passed on to YARN.

spark-submit --master yarn --deploy-mode cluster ayan_test.py

I tried the same code with deploy-mode=client and all config are passing
fine.

Am I missing something? Will introducing --properties-file be of any help?
Can anybody share some working example?

Best
Ayan

-- 
Best Regards,
Ayan Guha


Re: Spark runs out of memory with small file

2017-02-26 Thread ayan guha
Hi

We are doing similar stuff, but with a large number of small-ish files. What
we do is write a function to parse a complete file, similar to your parse
function. But we use yield instead of return, and flatMap on top of it. Can you
give it a try and let us know if it works?

On Mon, Feb 27, 2017 at 9:02 AM, Koert Kuipers  wrote:

> using wholeFiles to process formats that can not be split per line is not
> "old"
>
> and there are plenty of problems for which RDD is still better suited than
> Dataset or DataFrame currently (this might change in near future when
> Dataset gets some crucial optimizations fixed).
>
> On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi Henry,
>>
>> Those guys in Databricks training are nuts and still use Spark 1.x for
>> their exams. Learning SPARK is a VERY VERY VERY old way of solving problems
>> using SPARK.
>>
>> The core engine of SPARK, which even I understand, has gone through
>> several fundamental changes.
>>
>> Just try reading the file using dataframes and try using SPARK 2.1.
>>
>> In other words it may be of tremendous benefit if you were learning to
>> solve problems which exists rather than problems which does not exist any
>> more.
>>
>> Please let me know in case I can be of any further help.
>>
>> Regards,
>> Gourav
>>
>> On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay 
>> wrote:
>>
>>> The file is so small that a stand alone python script, independent of
>>> spark, can process the file in under a second.
>>>
>>> Also, the following fails:
>>>
>>> 1. Read the whole file in with wholeFiles
>>>
>>> 2. use flatMap to get 50,000 rows that looks like: Row(id="path",
>>> line="line")
>>>
>>> 3. Save the results as CVS to HDFS
>>>
>>> 4. Read the files (there are 20) from HDFS into a df using
>>> sqlContext.read.csv()
>>>
>>> 5. Convert the df to an rdd.
>>>
>>> 6 Create key value pairs with the key being the file path and the value
>>> being the line.
>>>
>>> 7 Iterate through values
>>>
>>> What happens is Spark either runs out of memory, or, in my last try with
>>> a slight variation, just hangs for 12 hours.
>>>
>>> Henry
>>>
>>> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>>>
>>> Hi, Tremblay.
>>> Your file is .gz format, which is not splittable for hadoop. Perhaps the
>>> file is loaded by only one executor.
>>> How many executors do you start?
>>> Perhaps repartition method could solve it, I guess.
>>>
>>>
>>> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay >> > wrote:
>>>
 I am reading in a single small file from hadoop with wholeText. If I
 process each line and create a row with two cells, the first cell equal to
 the name of the file, the second cell equal to the line. That code runs
 fine.

 But if I just add two line of code and change the first cell based on
 parsing a line, spark runs out of memory. Any idea why such a simple
 process that would succeed quickly in a non spark application fails?

 Thanks!

 Henry

 CODE:

 [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
 3816096 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.i
 nternal.warc.gz


 In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
 In [2]: rdd1.count()
 Out[2]: 1


 In [4]: def process_file(s):
...: text = s[1]
...: the_id = s[0]
...: d = {}
...: l =  text.split("\n")
...: final = []
...: for line in l:
...: d[the_id] = line
...: final.append(Row(**d))
...: return final
...:

 In [5]: rdd2 = rdd1.map(process_file)

 In [6]: rdd2.count()
 Out[6]: 1

 In [7]: rdd3 = rdd2.flatMap(lambda x: x)

 In [8]: rdd3.count()
 Out[8]: 508310

 In [9]: rdd3.take(1)
 Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/
 mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.in
 ternal.warc.gz='WARC/1.0\r')]

 In [10]: def process_file(s):
 ...: text = s[1]
 ...: d = {}
 ...: l =  text.split("\n")
 ...: final = []
 ...: the_id = "init"
 ...: for line in l:
 ...: if line[0:15] == 'WARC-Record-ID:':
 ...: the_id = line[15:]
 ...: d[the_id] = line
 ...: final.append(Row(**d))
 ...: return final

 In [12]: rdd2 = rdd1.map(process_file)

 In [13]: rdd2.count()
 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
 ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN
 for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used.
 Consider boosting spark.yarn.executor.memoryOverhead.
 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
 Container 

Re: Spark runs out of memory with small file

2017-02-26 Thread Koert Kuipers
using wholeFiles to process formats that can not be split per line is not
"old"

and there are plenty of problems for which RDD is still better suited than
Dataset or DataFrame currently (this might change in near future when
Dataset gets some crucial optimizations fixed).

On Sun, Feb 26, 2017 at 3:14 PM, Gourav Sengupta 
wrote:

> Hi Henry,
>
> Those guys in Databricks training are nuts and still use Spark 1.x for
> their exams. Learning SPARK is a VERY VERY VERY old way of solving problems
> using SPARK.
>
> The core engine of SPARK, which even I understand, has gone through
> several fundamental changes.
>
> Just try reading the file using dataframes and try using SPARK 2.1.
>
> In other words it may be of tremendous benefit if you were learning to
> solve problems which exists rather than problems which does not exist any
> more.
>
> Please let me know in case I can be of any further help.
>
> Regards,
> Gourav
>
> On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay 
> wrote:
>
>> The file is so small that a stand alone python script, independent of
>> spark, can process the file in under a second.
>>
>> Also, the following fails:
>>
>> 1. Read the whole file in with wholeFiles
>>
>> 2. use flatMap to get 50,000 rows that looks like: Row(id="path",
>> line="line")
>>
>> 3. Save the results as CVS to HDFS
>>
>> 4. Read the files (there are 20) from HDFS into a df using
>> sqlContext.read.csv()
>>
>> 5. Convert the df to an rdd.
>>
>> 6 Create key value pairs with the key being the file path and the value
>> being the line.
>>
>> 7 Iterate through values
>>
>> What happens is Spark either runs out of memory, or, in my last try with
>> a slight variation, just hangs for 12 hours.
>>
>> Henry
>>
>> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>>
>> Hi, Tremblay.
>> Your file is .gz format, which is not splittable for hadoop. Perhaps the
>> file is loaded by only one executor.
>> How many executors do you start?
>> Perhaps repartition method could solve it, I guess.
>>
>>
>> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay 
>> wrote:
>>
>>> I am reading in a single small file from hadoop with wholeText. If I
>>> process each line and create a row with two cells, the first cell equal to
>>> the name of the file, the second cell equal to the line. That code runs
>>> fine.
>>>
>>> But if I just add two line of code and change the first cell based on
>>> parsing a line, spark runs out of memory. Any idea why such a simple
>>> process that would succeed quickly in a non spark application fails?
>>>
>>> Thanks!
>>>
>>> Henry
>>>
>>> CODE:
>>>
>>> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
>>> 3816096 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.i
>>> nternal.warc.gz
>>>
>>>
>>> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
>>> In [2]: rdd1.count()
>>> Out[2]: 1
>>>
>>>
>>> In [4]: def process_file(s):
>>>...: text = s[1]
>>>...: the_id = s[0]
>>>...: d = {}
>>>...: l =  text.split("\n")
>>>...: final = []
>>>...: for line in l:
>>>...: d[the_id] = line
>>>...: final.append(Row(**d))
>>>...: return final
>>>...:
>>>
>>> In [5]: rdd2 = rdd1.map(process_file)
>>>
>>> In [6]: rdd2.count()
>>> Out[6]: 1
>>>
>>> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>>>
>>> In [8]: rdd3.count()
>>> Out[8]: 508310
>>>
>>> In [9]: rdd3.take(1)
>>> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/
>>> mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.in
>>> ternal.warc.gz='WARC/1.0\r')]
>>>
>>> In [10]: def process_file(s):
>>> ...: text = s[1]
>>> ...: d = {}
>>> ...: l =  text.split("\n")
>>> ...: final = []
>>> ...: the_id = "init"
>>> ...: for line in l:
>>> ...: if line[0:15] == 'WARC-Record-ID:':
>>> ...: the_id = line[15:]
>>> ...: d[the_id] = line
>>> ...: final.append(Row(**d))
>>> ...: return final
>>>
>>> In [12]: rdd2 = rdd1.map(process_file)
>>>
>>> In [13]: rdd2.count()
>>> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
>>> ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN
>>> for exceeding memory limits. 10.3 GB of 10.3 GB physical memory used.
>>> Consider boosting spark.yarn.executor.memoryOverhead.
>>> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
>>> Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
>>> physical memory used. Consider boosting spark.yarn.executor.memoryOver
>>> head.
>>> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID
>>> 5, ip-172-31-41-89.us-west-2.compute.internal, executor 5):
>>> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
>>> Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of
>>> 10.3 GB physical memory used. Consider boosting
>>> 

Re: Spark runs out of memory with small file

2017-02-26 Thread Henry Tremblay
I am actually using Spark 2.1 and trying to solve a real-life problem.
Unfortunately, some of the discussion of my problem went offline, and
then I started a new thread.


Here is my problem. I am parsing crawl data which exists in a flat file 
format. It looks like this:


u'WARC/1.0',
 u'WARC-Type: warcinfo',
 u'WARC-Date: 2016-12-08T13:00:23Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 344',
 u'Content-Type: application/warc-fields',
 u'WARC-Filename: 
CC-MAIN-20161202170900-0-ip-10-31-129-80.ec2.internal.warc.gz',

 u'',
 u'robots: classic',
 u'hostname: ip-10-31-129-80.ec2.internal',
 u'software: Nutch 1.6 (CC)/CC WarcExport 1.0',
 u'isPartOf: CC-MAIN-2016-50',
 u'operator: CommonCrawl Admin',
 u'description: Wide crawl of the web for November 2016',
 u'publisher: CommonCrawl',
 u'format: WARC File Format 1.0',
 u'conformsTo: 
http://bibnum.bnf.fr/WARC/WARC_ISO_28500_version1_latestdraft.pdf 
',

 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: request',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 220',
 u'Content-Type: application/http; msgtype=request',
 u'WARC-Warcinfo-ID: ',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/ 
',

 u'',
 u'GET /blog/ HTTP/1.0',
 u'Host: 1018201.vkrugudruzei.ru ',
 u'Accept-Encoding: x-gzip, gzip, deflate',
 u'User-Agent: CCBot/2.0 (http://commoncrawl.org/faq/) 
',
 u'Accept: 
text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',

 u'',
 u'',
 u'',
 u'WARC/1.0',
 u'WARC-Type: response',
 u'WARC-Date: 2016-12-02T17:54:09Z',
 u'WARC-Record-ID: ',
 u'Content-Length: 577',
 u'Content-Type: application/http; msgtype=response',
 u'WARC-Warcinfo-ID: ',
 u'WARC-Concurrent-To: ',
 u'WARC-IP-Address: 217.197.115.133',
 u'WARC-Target-URI: http://1018201.vkrugudruzei.ru/blog/ 
',

 u'WARC-Payload-Digest: sha1:Y4TZFLB6UTXHU4HUVONBXC5NZQW2LYMM',
 u'WARC-Block-Digest: sha1:3J7HHBMWTSC7W53DDB7BHTUVPM26QS4B',
 u'']

I want to turn it into something like this:

Row(warc-type='request', warc-date='2016-12-02', warc-record-id='...')
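
A minimal sketch of one way to get there, using the yield/flatMap approach
suggested earlier in the thread; the key normalization and the split on ': '
are assumptions for illustration, not code from the thread:

from pyspark.sql import Row

def parse_warc(pair):
    # pair is (file path, whole file text) from wholeTextFiles
    _, text = pair
    record = {}
    for line in text.split("\n"):
        line = line.strip()
        if line.startswith("WARC/1.0"):
            # a new record begins; emit the previous one, if any
            if record:
                yield Row(**record)
            record = {}
        elif ": " in line:
            key, value = line.split(": ", 1)
            record[key.lower().replace("-", "_")] = value
    if record:
        yield Row(**record)

rows = sc.wholeTextFiles("/mnt/temp").flatMap(parse_warc)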


The file is so small that a stand alone python script, independent
of spark, can process the file in under a second.

Also, the following fails:

1. Read the whole file in with wholeFiles

2. use flatMap to get 50,000 rows that looks like: Row(id="path",
line="line")

3. Save the results as CVS to HDFS

4. Read the files (there are 20) from HDFS into a df using
sqlContext.read.csv()

5. Convert the df to an rdd.

6 Create key value pairs with the key being the file path and the
value being the line.

7 Iterate through values

What happens is Spark either runs out of memory, or, in my last
try with a slight variation, just hangs for 12 hours.

Henry


On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:

Hi, Tremblay.
Your file is .gz format, which is not splittable for hadoop.
Perhaps the file is loaded by only one executor.
How many executors do you start?
Perhaps repartition method could solve it, I guess.


On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay
> wrote:

I am reading in a single small file from hadoop with
wholeText. If I process each line and create a row with two
cells, the first cell equal to the name of the file, the
second cell equal to the line. That code runs fine.

But if I just add two line of code and change the first cell
based on parsing a line, spark runs out of memory. Any idea
why such a simple process that would succeed quickly in a non
spark application fails?

Thanks!

Henry

CODE:

[hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
3816096


Re: In Spark streaming, will saved kafka offsets become invalid if I change the number of partitions in a kafka topic?

2017-02-26 Thread shyla deshpande
Please help!

On Sat, Feb 25, 2017 at 11:10 PM, shyla deshpande 
wrote:

> I am committing offsets to Kafka after my output has been stored, using the
> commitAsync API.
>
> My question is: if I increase/decrease the number of Kafka partitions, will
> the saved offsets become invalid?
>
> Thanks
>


Re: Spark runs out of memory with small file

2017-02-26 Thread Gourav Sengupta
Hi Henry,

Those guys in Databricks training are nuts and still use Spark 1.x for
their exams. Learning SPARK is a VERY VERY VERY old way of solving problems
using SPARK.

The core engine of SPARK, which even I understand, has gone through several
fundamental changes.

Just try reading the file using dataframes and try using SPARK 2.1.
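
As a rough sketch only, the DataFrame route in Spark 2.x could start with
something like this (one row per line, in a column named "value"):

df = spark.read.text("/mnt/temp")
df.count()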

In other words, it may be of tremendous benefit if you were learning to
solve problems which exist rather than problems which do not exist any
more.

Please let me know in case I can be of any further help.

Regards,
Gourav

On Sun, Feb 26, 2017 at 7:09 PM, Henry Tremblay 
wrote:

> The file is so small that a stand alone python script, independent of
> spark, can process the file in under a second.
>
> Also, the following fails:
>
> 1. Read the whole file in with wholeFiles
>
> 2. use flatMap to get 50,000 rows that looks like: Row(id="path",
> line="line")
>
> 3. Save the results as CVS to HDFS
>
> 4. Read the files (there are 20) from HDFS into a df using
> sqlContext.read.csv()
>
> 5. Convert the df to an rdd.
>
> 6 Create key value pairs with the key being the file path and the value
> being the line.
>
> 7 Iterate through values
>
> What happens is Spark either runs out of memory, or, in my last try with a
> slight variation, just hangs for 12 hours.
>
> Henry
>
> On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:
>
> Hi, Tremblay.
> Your file is .gz format, which is not splittable for hadoop. Perhaps the
> file is loaded by only one executor.
> How many executors do you start?
> Perhaps repartition method could solve it, I guess.
>
>
> On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay 
> wrote:
>
>> I am reading in a single small file from hadoop with wholeText. If I
>> process each line and create a row with two cells, the first cell equal to
>> the name of the file, the second cell equal to the line. That code runs
>> fine.
>>
>> But if I just add two line of code and change the first cell based on
>> parsing a line, spark runs out of memory. Any idea why such a simple
>> process that would succeed quickly in a non spark application fails?
>>
>> Thanks!
>>
>> Henry
>>
>> CODE:
>>
>> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
>> 3816096 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.i
>> nternal.warc.gz
>>
>>
>> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
>> In [2]: rdd1.count()
>> Out[2]: 1
>>
>>
>> In [4]: def process_file(s):
>>...: text = s[1]
>>...: the_id = s[0]
>>...: d = {}
>>...: l =  text.split("\n")
>>...: final = []
>>...: for line in l:
>>...: d[the_id] = line
>>...: final.append(Row(**d))
>>...: return final
>>...:
>>
>> In [5]: rdd2 = rdd1.map(process_file)
>>
>> In [6]: rdd2.count()
>> Out[6]: 1
>>
>> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>>
>> In [8]: rdd3.count()
>> Out[8]: 508310
>>
>> In [9]: rdd3.take(1)
>> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/
>> mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.in
>> ternal.warc.gz='WARC/1.0\r')]
>>
>> In [10]: def process_file(s):
>> ...: text = s[1]
>> ...: d = {}
>> ...: l =  text.split("\n")
>> ...: final = []
>> ...: the_id = "init"
>> ...: for line in l:
>> ...: if line[0:15] == 'WARC-Record-ID:':
>> ...: the_id = line[15:]
>> ...: d[the_id] = line
>> ...: final.append(Row(**d))
>> ...: return final
>>
>> In [12]: rdd2 = rdd1.map(process_file)
>>
>> In [13]: rdd2.count()
>> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
>> ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN for
>> exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider
>> boosting spark.yarn.executor.memoryOverhead.
>> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
>> Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
>> physical memory used. Consider boosting spark.yarn.executor.memoryOver
>> head.
>> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5,
>> ip-172-31-41-89.us-west-2.compute.internal, executor 5):
>> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
>> Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of
>> 10.3 GB physical memory used. Consider boosting
>> spark.yarn.executor.memoryOverhead.
>>
>>
>> --
>> Henry Tremblay
>> Robert Half Technology
>>
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
> --
> Henry Tremblay
> Robert Half Technology
>
>


Re: Custom log4j.properties on AWS EMR

2017-02-26 Thread Steve Loughran
Try giving a resource of a file in the JAR, e.g. add a file
"log4j-debugging.properties" into the jar, and give a config option of
-Dlog4j.configuration=/log4j-debugging.properties   (maybe also try without the
"/").


On 26 Feb 2017, at 16:31, Prithish 
> wrote:

Hoping someone can answer this.

I am unable to override and use a Custom log4j.properties on Amazon EMR. I am 
running Spark on EMR (Yarn) and have tried all the below combinations in the 
Spark-Submit to try and use the custom log4j.

In Client mode
--driver-java-options 
"-Dlog4j.configuration=hdfs://host:port/user/hadoop/log4j.properties"

In Cluster mode
--conf 
"spark.driver.extraJavaOptions=-Dlog4j.configuration=hdfs://host:port/user/hadoop/log4j.properties"

I have also tried picking from local filesystem using file: instead of 
hdfs. None of this seem to work. However, I can get this working when running 
on my local Yarn setup.

Any ideas?

I have also posted on Stackoverflow (link below)
http://stackoverflow.com/questions/42452622/custom-log4j-properties-on-aws-emr



Are we still dependent on Guava jar in Spark 2.1.0 as well?

2017-02-26 Thread kant kodali
Are we still dependent on the Guava jar in Spark 2.1.0 as well (given the
Guava jar incompatibility issues)?


Re: Spark runs out of memory with small file

2017-02-26 Thread Henry Tremblay
The file is so small that a stand alone python script, independent of 
spark, can process the file in under a second.


Also, the following fails:

1. Read the whole file in with wholeFiles

2. use flatMap to get 50,000 rows that look like: Row(id="path", 
line="line")


3. Save the results as CSV to HDFS

4. Read the files (there are 20) from HDFS into a df using 
sqlContext.read.csv()


5. Convert the df to an rdd.

6 Create key value pairs with the key being the file path and the value 
being the line.


7 Iterate through values

What happens is Spark either runs out of memory, or, in my last try with 
a slight variation, just hangs for 12 hours.


Henry


On 02/26/2017 03:31 AM, 颜发才(Yan Facai) wrote:

Hi, Tremblay.
Your file is .gz format, which is not splittable for hadoop. Perhaps 
the file is loaded by only one executor.

How many executors do you start?
Perhaps repartition method could solve it, I guess.


On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay 
> wrote:


I am reading in a single small file from hadoop with wholeText. If
I process each line and create a row with two cells, the first
cell equal to the name of the file, the second cell equal to the
line. That code runs fine.

But if I just add two line of code and change the first cell based
on parsing a line, spark runs out of memory. Any idea why such a
simple process that would succeed quickly in a non spark
application fails?

Thanks!

Henry

CODE:

[hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
3816096
/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz


In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
In [2]: rdd1.count()
Out[2]: 1


In [4]: def process_file(s):
   ...: text = s[1]
   ...: the_id = s[0]
   ...: d = {}
   ...: l =  text.split("\n")
   ...: final = []
   ...: for line in l:
   ...: d[the_id] = line
   ...: final.append(Row(**d))
   ...: return final
   ...:

In [5]: rdd2 = rdd1.map(process_file)

In [6]: rdd2.count()
Out[6]: 1

In [7]: rdd3 = rdd2.flatMap(lambda x: x)

In [8]: rdd3.count()
Out[8]: 508310

In [9]: rdd3.take(1)
Out[9]: [Row(hdfs://ip-172-31-35-67.us

-west-2.compute.internal:8020/mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.internal.warc.gz='WARC/1.0\r')]

In [10]: def process_file(s):
...: text = s[1]
...: d = {}
...: l =  text.split("\n")
...: final = []
...: the_id = "init"
...: for line in l:
...: if line[0:15] == 'WARC-Record-ID:':
...: the_id = line[15:]
...: d[the_id] = line
...: final.append(Row(**d))
...: return final

In [12]: rdd2 = rdd1.map(process_file)

In [13]: rdd2.count()
17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
ip-172-31-41-89.us-west-2.compute.internal: Container killed by
YARN for exceeding memory limits. 10.3 GB of 10.3 GB physical
memory used. Consider boosting spark.yarn.executor.memoryOverhead.
17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
Container killed by YARN for exceeding memory limits. 10.3 GB of
10.3 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.
17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0
(TID 5, ip-172-31-41-89.us-west-2.compute.internal, executor 5):
ExecutorLostFailure (executor 5 exited caused by one of the
running tasks) Reason: Container killed by YARN for exceeding
memory limits. 10.3 GB of 10.3 GB physical memory used. Consider
boosting spark.yarn.executor.memoryOverhead.


-- 
Henry Tremblay

Robert Half Technology


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





--
Henry Tremblay
Robert Half Technology



Custom log4j.properties on AWS EMR

2017-02-26 Thread Prithish
Hoping someone can answer this.

I am unable to override and use a Custom log4j.properties on Amazon EMR. I
am running Spark on EMR (Yarn) and have tried all the below combinations in
the Spark-Submit to try and use the custom log4j.

In Client mode
--driver-java-options
"-Dlog4j.configuration=hdfs://host:port/user/hadoop/log4j.properties"

In Cluster mode
--conf
"spark.driver.extraJavaOptions=-Dlog4j.configuration=hdfs://host:port/user/hadoop/log4j.properties"

I have also tried picking from local filesystem using file: instead of
hdfs. None of this seem to work. However, I can get this working when
running on my local Yarn setup.

Any ideas?

I have also posted on Stackoverflow (link below)
http://stackoverflow.com/questions/42452622/custom-log4j-properties-on-aws-emr


Re: RDD blocks on Spark Driver

2017-02-26 Thread Prithish
Thanks for the responses. I am running this on Amazon EMR, which uses the
YARN cluster manager.

On Sat, Feb 25, 2017 at 4:45 PM, liangyhg...@gmail.com <
liangyhg...@gmail.com> wrote:

> Hi,
>  I think you are using the local mode of Spark. There
> are mainly four modes: local, standalone, YARN,
> and Mesos. Also, "blocks" relates to HDFS, and "partitions"
> relates to Spark.
>
> liangyihuai
>
> ---Original---
> *From:* "Jacek Laskowski "
> *Date:* 2017/2/25 02:45:20
> *To:* "prithish";
> *Cc:* "user";
> *Subject:* Re: RDD blocks on Spark Driver
>
> Hi,
>
> Guess you're using local mode, which has only one executor, called the driver. Is
> my guess correct?
>
> Jacek
>
> On 23 Feb 2017 2:03 a.m.,  wrote:
>
>> Hello,
>>
>> Had a question. When I look at the executors tab in Spark UI, I notice
>> that some RDD blocks are assigned to the driver as well. Can someone please
>> tell me why?
>>
>> Thanks for the help.
>>
>


Re: Disable Spark SQL Optimizations for unit tests

2017-02-26 Thread Stefan Ackermann
I found some ways to get faster unit tests. In the meantime they had gone up
to about an hour.

Apparently defining columns in a for loop makes catalyst very slow, as it
blows up the logical plan with many projections:

  final def castInts(dfIn: DataFrame, castToInts: String*): DataFrame = {
var df = dfIn
for (toBeCasted <- castToInts) {
  df = df.withColumn(toBeCasted, df(toBeCasted).cast(IntegerType))
}
df
  }

This is much faster:

  final def castInts(dfIn: DataFrame, castToInts: String*): DataFrame = {
val columns = dfIn.columns.map { c =>
  if (castToInts.contains(c)) {
dfIn(c).cast(IntegerType)
  } else {
dfIn(c)
  }
}
dfIn.select(columns: _*)
  }

As I then applied this to other similar functions, the unit tests
went down from 60 to 18 minutes.

Another way to break the SQL optimizations was to just save an intermediate
dataframe to HDFS and read it from there again. This is quite counterintuitive,
but the unit tests then went down further, from 18 minutes to 5.

Is there any other way to add a barrier for catalyst optimizations? As in A
-> B -> C, only optimize A -> B, and B -> C but not the complete A -> C?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Disable-Spark-SQL-Optimizations-for-unit-tests-tp28380p28426.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Kafka Streaming and partitioning

2017-02-26 Thread tonyye
Hi Dave,
I had the same question and was wondering if you had found a way to do the
join without causing a shuffle?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-Streaming-and-partitioning-tp25955p28425.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Saving Structured Streaming DF to Hive Partitioned table

2017-02-26 Thread nimrodo
Hi,

I want to load a stream of CSV files to a partitioned Hive table called
myTable.

I tried using Spark 2 Structured Streaming to do that:
val spark = SparkSession
  .builder
  .appName("TrueCallLoade")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition.mode", "non-strict")
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.max.dynamic.partition", "2048")
  .config("hive.exec.max.dynamic.partition.pernode", "256")
  .getOrCreate()
val df = spark.readStream.option("sep", ",").option("header",
"true").schema(customSchema).csv(fileDirectory)

The dataframe has 2 columns called "dt" and "h" by which the Hive table is
partitioned.

writeStream can't directly stream to a Hive table, so I decided to use
val query =
df.writeStream.queryName("LoadedCSVData").outputMode("Append").format("memory").start()

and then
spark.sql("INSERT INTO myTable SELECT * FROM LoadedCSVData")

This insert doesn't seem to work. Any idea how I can achieve that?

Nimrod



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Saving-Structured-Streaming-DF-to-Hive-Partitioned-table-tp28424.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: attempting to map Dataset[Row]

2017-02-26 Thread Stephen Fletcher
Sorry, here's the whole code:

val source =
spark.read.format("parquet").load("/emrdata/sources/very_large_ds")

implicit val mapEncoder =
org.apache.spark.sql.Encoders.kryo[(Any,ArrayBuffer[Row])]

source.map{ row => {
  val key = row(0)
  val buff = new ArrayBuffer[Row]()
  buff += row
  (key,buff)
   }
}

...

On Sun, Feb 26, 2017 at 7:31 AM, Stephen Fletcher <
stephen.fletc...@gmail.com> wrote:

> I'm attempting to perform a map on a Dataset[Row] but getting an error on
> decode when attempting to pass a custom encoder.
>  My code looks similar to the following:
>
>
> val source = spark.read.format("parquet").load("/emrdata/sources/very_
> large_ds")
>
>
>
> source.map{ row => {
>   val key = row(0)
>
>}
> }
>


attempting to map Dataset[Row]

2017-02-26 Thread Stephen Fletcher
I'm attempting to perform a map on a Dataset[Row] but getting an error on
decode when attempting to pass a custom encoder.
 My code looks similar to the following:


val source =
spark.read.format("parquet").load("/emrdata/sources/very_large_ds")



source.map{ row => {
  val key = row(0)

   }
}


Re: Spark runs out of memory with small file

2017-02-26 Thread Yan Facai
Hi, Tremblay.
Your file is in .gz format, which is not splittable for Hadoop. Perhaps the
file is loaded by only one executor.
How many executors do you start?
Perhaps the repartition method could solve it, I guess.


On Sun, Feb 26, 2017 at 3:33 AM, Henry Tremblay 
wrote:

> I am reading in a single small file from hadoop with wholeText. If I
> process each line and create a row with two cells, the first cell equal to
> the name of the file, the second cell equal to the line. That code runs
> fine.
>
> But if I just add two line of code and change the first cell based on
> parsing a line, spark runs out of memory. Any idea why such a simple
> process that would succeed quickly in a non spark application fails?
>
> Thanks!
>
> Henry
>
> CODE:
>
> [hadoop@ip-172-31-35-67 ~]$ hadoop fs -du /mnt/temp
> 3816096 /mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.
> internal.warc.gz
>
>
> In [1]: rdd1 = sc.wholeTextFiles("/mnt/temp")
> In [2]: rdd1.count()
> Out[2]: 1
>
>
> In [4]: def process_file(s):
>...: text = s[1]
>...: the_id = s[0]
>...: d = {}
>...: l =  text.split("\n")
>...: final = []
>...: for line in l:
>...: d[the_id] = line
>...: final.append(Row(**d))
>...: return final
>...:
>
> In [5]: rdd2 = rdd1.map(process_file)
>
> In [6]: rdd2.count()
> Out[6]: 1
>
> In [7]: rdd3 = rdd2.flatMap(lambda x: x)
>
> In [8]: rdd3.count()
> Out[8]: 508310
>
> In [9]: rdd3.take(1)
> Out[9]: [Row(hdfs://ip-172-31-35-67.us-west-2.compute.internal:8020/
> mnt/temp/CC-MAIN-20170116095123-00570-ip-10-171-10-70.ec2.
> internal.warc.gz='WARC/1.0\r')]
>
> In [10]: def process_file(s):
> ...: text = s[1]
> ...: d = {}
> ...: l =  text.split("\n")
> ...: final = []
> ...: the_id = "init"
> ...: for line in l:
> ...: if line[0:15] == 'WARC-Record-ID:':
> ...: the_id = line[15:]
> ...: d[the_id] = line
> ...: final.append(Row(**d))
> ...: return final
>
> In [12]: rdd2 = rdd1.map(process_file)
>
> In [13]: rdd2.count()
> 17/02/25 19:03:03 ERROR YarnScheduler: Lost executor 5 on
> ip-172-31-41-89.us-west-2.compute.internal: Container killed by YARN for
> exceeding memory limits. 10.3 GB of 10.3 GB physical memory used. Consider
> boosting spark.yarn.executor.memoryOverhead.
> 17/02/25 19:03:03 WARN YarnSchedulerBackend$YarnSchedulerEndpoint:
> Container killed by YARN for exceeding memory limits. 10.3 GB of 10.3 GB
> physical memory used. Consider boosting spark.yarn.executor.memoryOver
> head.
> 17/02/25 19:03:03 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5,
> ip-172-31-41-89.us-west-2.compute.internal, executor 5):
> ExecutorLostFailure (executor 5 exited caused by one of the running tasks)
> Reason: Container killed by YARN for exceeding memory limits. 10.3 GB of
> 10.3 GB physical memory used. Consider boosting
> spark.yarn.executor.memoryOverhead.
>
>
> --
> Henry Tremblay
> Robert Half Technology
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: No main class set in JAR; please specify one with --class and java.lang.ClassNotFoundException

2017-02-26 Thread Marco Mistroni
Hi Raymond
Run this command and it should work, provided you have Kafka set up as well
on localhost at port 2181:

spark-submit --packages
org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1  kafka_wordcount.py
localhost:2181 test

But I suggest, if you are a beginner, using the Spark examples' word count
instead, as I believe it reads from a local directory rather than needing you
to set up Kafka, which is additional overhead you don't really need.
If you want to go ahead with Kafka, the two links below can give you a start:

https://dzone.com/articles/running-apache-kafka-on-windows-os   (I believe a
similar setup can be used on Linux)
https://spark.apache.org/docs/latest/streaming-kafka-integration.html

kr




On Sat, Feb 25, 2017 at 11:12 PM, Marco Mistroni 
wrote:

> Hi, I'll have a look at the GitHub project tomorrow and let you know. You have a
> Python script to run and dependencies to specify; please check the Spark docs in
> the meantime. I do all my coding in Scala and specify dependencies using
> --packages.
> Kr
>
> On 25 Feb 2017 11:06 pm, "Raymond Xie"  wrote:
>
>> Thank you very much Marco,
>>
>> I am a beginner in this area, is it possible for you to show me what you
>> think the right script should be to get it executed in terminal?
>>
>>
>> **
>> *Sincerely yours,*
>>
>>
>> *Raymond*
>>
>> On Sat, Feb 25, 2017 at 6:00 PM, Marco Mistroni 
>> wrote:
>>
>>> Try to use --packages to include the jars. From error it seems it's
>>> looking for main class in jars but u r running a python script...
>>>
>>> On 25 Feb 2017 10:36 pm, "Raymond Xie"  wrote:
>>>
>>> That's right Anahita, however, the class name is not indicated in the
>>> original github project so I don't know what class should be used here. The
>>> github only says:
>>> and then run the example
>>> `$ bin/spark-submit --jars \
>>> external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar
>>> \
>>> examples/src/main/python/streaming/kafka_wordcount.py \
>>> localhost:2181 test`
>>> """ Can anyone give any thought on how to find out? Thank you very much
>>> in advance.
>>>
>>>
>>> **
>>> *Sincerely yours,*
>>>
>>>
>>> *Raymond*
>>>
>>> On Sat, Feb 25, 2017 at 5:27 PM, Anahita Talebi <
>>> anahita.t.am...@gmail.com> wrote:
>>>
 You're welcome.
 You need to specify the class. I meant like that:

 spark-submit  /usr/hdp/2.5.0.0-1245/spark/l
 ib/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar
 --class "give the name of the class"



 On Saturday, February 25, 2017, Raymond Xie 
 wrote:

> Thank you, it is still not working:
>
> [image: Inline image 1]
>
> By the way, here is the original source:
>
> https://github.com/apache/spark/blob/master/examples/src/mai
> n/python/streaming/kafka_wordcount.py
>
>
> **
> *Sincerely yours,*
>
>
> *Raymond*
>
> On Sat, Feb 25, 2017 at 4:48 PM, Anahita Talebi <
> anahita.t.am...@gmail.com> wrote:
>
>> Hi,
>>
>> I think if you remove --jars, it will work. Like:
>>
>> spark-submit  /usr/hdp/2.5.0.0-1245/spark/l
>> ib/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar
>>
>>  I had the same problem before and solved it by removing --jars.
>>
>> Cheers,
>> Anahita
>>
>> On Saturday, February 25, 2017, Raymond Xie 
>> wrote:
>>
>>> I am doing a spark streaming on a hortonworks sandbox and am stuck
>>> here now, can anyone tell me what's wrong with the following code and 
>>> the
>>> exception it causes and how do I fix it? Thank you very much in advance.
>>>
>>> spark-submit --jars /usr/hdp/2.5.0.0-1245/spark/li
>>> b/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar
>>>  /usr/hdp/2.5.0.0-1245/kafka/libs/kafka-streams-0.10.0.2.5.0.0-1245.jar
>>> /root/hdp/kafka_wordcount.py 192.168.128.119:2181 test
>>>
>>> Error:
>>> No main class set in JAR; please specify one with --class
>>>
>>>
>>> spark-submit --class /usr/hdp/2.5.0.0-1245/spark/li
>>> b/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar
>>>  /usr/hdp/2.5.0.0-1245/kafka/libs/kafka-streams-0.10.0.2.5.0.0-1245.jar
>>> /root/hdp/kafka_wordcount.py 192.168.128.119:2181 test
>>>
>>> Error:
>>> java.lang.ClassNotFoundException: /usr/hdp/2.5.0.0-1245/spark/li
>>> b/spark-assembly-1.6.2.2.5.0.0-1245-hadoop2.7.3.2.5.0.0-1245.jar
>>>
>>> spark-submit --class  /usr/hdp/2.5.0.0-1245/kafka/l
>>> ibs/kafka-streams-0.10.0.2.5.0.0-1245.jar
>>> /usr/hdp/2.5.0.0-1245/spark/lib/spark-assembly-1.6.2.2.5.0.0
>>> -1245-hadoop2.7.3.2.5.0.0-1245.jar  /root/hdp/kafka_wordcount.py