Spark Streaming S3 Error

2016-05-20 Thread Benjamin Kim
I am trying to stream files from an S3 bucket using CDH 5.7.0’s version of 
Spark 1.6.0. It seems not to work. I keep getting this error.

Exception in thread "JobGenerator" java.lang.VerifyError: Bad type on operand 
stack
Exception Details:
  Location:

org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.copy(Ljava/lang/String;Ljava/lang/String;)V
 @155: invokevirtual
  Reason:
Type 'org/jets3t/service/model/S3Object' (current frame, stack[4]) is not 
assignable to 'org/jets3t/service/model/StorageObject'
  Current Frame:
bci: @155
flags: { }
locals: { 'org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore', 
'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object' }
stack: { 'org/jets3t/service/S3Service', 'java/lang/String', 
'java/lang/String', 'java/lang/String', 'org/jets3t/service/model/S3Object', 
integer }
  Bytecode:
0x000: b200 fcb9 0190 0100 9900 39b2 00fc bb01
0x010: 5659 b701 5713 0192 b601 5b2b b601 5b13
0x020: 0194 b601 5b2c b601 5b13 0196 b601 5b2a
0x030: b400 7db6 00e7 b601 5bb6 015e b901 9802
0x040: 002a b400 5799 0030 2ab4 0047 2ab4 007d
0x050: 2b01 0101 01b6 019b 4e2a b400 6b09 949e
0x060: 0016 2db6 019c 2ab4 006b 949e 000a 2a2d
0x070: 2cb6 01a0 b1bb 00a0 592c b700 a14e 2d2a
0x080: b400 73b6 00b0 2ab4 0047 2ab4 007d b600
0x090: e72b 2ab4 007d b600 e72d 03b6 01a4 57a7
0x0a0: 000a 4e2a 2d2b b700 c7b1   
  Exception Handler Table:
bci [0, 116] => handler: 162
bci [117, 159] => handler: 162
  Stackmap Table:
same_frame_extended(@65)
same_frame(@117)
same_locals_1_stack_item_frame(@162,Object[#139])
same_frame(@169)

at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.createDefaultStore(NativeS3FileSystem.java:338)
at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:328)
at 
org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2696)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at 
org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2733)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2715)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:382)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at 
org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$fs(FileInputDStream.scala:297)
at 
org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:198)
at 
org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:149)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at 
org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
at 
org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
at 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
at scala.Option.orElse(Option.scala:257)
at 
org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
at 
org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
at 
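
A VerifyError of this shape, where S3Object is reported as not assignable to StorageObject, often points to two incompatible jets3t versions on the classpath. That is an assumption based on the message text and is not confirmed anywhere in this thread. The sketch below only reports which jar each class named in the error would be loaded from. The object name ClasspathCheck and the method locationOf are made up for illustration.

```scala
object ClasspathCheck {
  /** Returns the jar or directory a class would be loaded from, if it can be determined. */
  def locationOf(className: String): Option[String] = {
    // initialize = false: only locate the class, do not link or run static initializers,
    // so the check itself does not trigger the VerifyError being investigated.
    val loaded: Option[Class[_]] =
      try Some(Class.forName(className, false, getClass.getClassLoader))
      catch { case _: ClassNotFoundException | _: LinkageError => None }

    for {
      cls    <- loaded
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource) // null for JDK bootstrap classes
      url    <- Option(source.getLocation)
    } yield url.toString
  }

  def main(args: Array[String]): Unit = {
    // The two jets3t classes named in the error, plus the Hadoop class that failed verification.
    val classes = Seq(
      "org.jets3t.service.model.S3Object",
      "org.jets3t.service.model.StorageObject",
      "org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore")
    classes.foreach { name =>
      println(s"$name -> ${locationOf(name).getOrElse("not found on this classpath")}")
    }
  }
}
```

Running the same lookups from the driver (for example in spark-shell) would show whether the jets3t classes and the Hadoop S3 classes come from jars that were built to work together.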

What factors decide the number of executors when doing a Spark SQL insert in Mesos?

2016-05-20 Thread SRK
Hi,

What factors decide the number of executors when doing a Spark SQL insert?
Right now when I submit my job in Mesos I see only 2 executors getting
allocated all the time.

Thanks!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/What-factors-decide-the-number-of-executors-when-doing-a-Spark-SQL-insert-in-Mesos-tp26990.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



set spark 1.6 with Hive 0.14 ?

2016-05-20 Thread kali.tumm...@gmail.com
Hi All,

Is there a way to make Spark and spark-sql use Hive 0.14 instead of the
built-in Hive 1.2.1?

I am testing spark-sql locally with Spark 1.6 downloaded from the internet. I
want to run my Hive queries in Spark SQL against Hive 0.14. Can I go back to
the previous version just for a simple test?

Please share the steps involved.


Thanks
Sri



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/set-spark-1-6-with-Hive-0-14-tp26989.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Memory issues when trying to insert data in the form of ORC using Spark SQL

2016-05-20 Thread swetha kasireddy
Also, the Spark SQL insert seems to take only two tasks per stage. That
might be the reason why it does not have sufficient memory. Is there a way
to increase the number of tasks when doing the sql insert?

Stage Id:                 12
Description:              save at SaveUsersToHdfs.scala:255
Submitted:                2016/05/20 16:32:47
Duration:                 5.0 min
Tasks (Succeeded/Total):  0/2
Input:                    21.4 MB

On Fri, May 20, 2016 at 3:43 PM, SRK  wrote:

>
> Hi,
>
> I see some memory issues when trying to insert the data in the form of ORC
> using Spark SQL. Please find the query and exception below. Any idea as to
> why this is happening?
>
> sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS records (id STRING,
> record STRING) PARTITIONED BY (datePartition STRING, idPartition STRING)
> stored as ORC LOCATION '/user/users' ")
>   sqlContext.sql("  orc.compress= SNAPPY")
>   sqlContext.sql(
> """ from recordsTemp ps   insert overwrite table users
> partition(datePartition , idPartition )  select ps.id, ps.record ,
> ps.datePartition, ps.idPartition  """.stripMargin)
>
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0
> in
> stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage
> 13.0org.apache.hadoop.hive.ql.metadata.HiveException:
> parquet.hadoop.MemoryManager$1: New Memory allocation 1048575 bytes is
> smaller than the minimum allocation size of 1048576 bytes.
> at
>
> org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:249)
> at
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.org
> $apache$spark$sql$hive$SparkHiveDynamicPartitionWriterContainer$$newWriter$1(hiveWriterContainers.scala:240)
> at
>
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:249)
> at
>
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:249)
> at
> scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
> at
> scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
> at
>
> org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.getLocalFileWriter(hiveWriterContainers.scala:249)
> at
>
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:112)
> at
>
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:104)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org
> $apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
> at
>
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84)
> at
>
> org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: parquet.hadoop.MemoryManager$1: New Memory allocation 1048575
> bytes is smaller than the minimum allocation size of 1048576 bytes.
> at
> parquet.hadoop.MemoryManager.updateAllocation(MemoryManager.java:125)
> at parquet.hadoop.MemoryManager.addWriter(MemoryManager.java:82)
> at
> parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:104)
> at
>
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303)
> at
>
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:267)
> at
>
> org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:65)
> at
>
> org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:125)
> at
>
> org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:114)
> at
>
> 

Memory issues when trying to insert data in the form of ORC using Spark SQL

2016-05-20 Thread SRK

Hi,

I see some memory issues when trying to insert the data in the form of ORC
using Spark SQL. Please find the query and exception below. Any idea as to
why this is happening?

sqlContext.sql("  CREATE EXTERNAL TABLE IF NOT EXISTS records (id STRING,
record STRING) PARTITIONED BY (datePartition STRING, idPartition STRING) 
stored as ORC LOCATION '/user/users' ")
  sqlContext.sql("  orc.compress= SNAPPY")
  sqlContext.sql(
""" from recordsTemp ps   insert overwrite table users 
partition(datePartition , idPartition )  select ps.id, ps.record , 
ps.datePartition, ps.idPartition  """.stripMargin)


org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in
stage 13.0 failed 4 times, most recent failure: Lost task 0.3 in stage
13.0org.apache.hadoop.hive.ql.metadata.HiveException:
parquet.hadoop.MemoryManager$1: New Memory allocation 1048575 bytes is
smaller than the minimum allocation size of 1048576 bytes.
at
org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:249)
at
org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.org$apache$spark$sql$hive$SparkHiveDynamicPartitionWriterContainer$$newWriter$1(hiveWriterContainers.scala:240)
at
org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:249)
at
org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer$$anonfun$getLocalFileWriter$1.apply(hiveWriterContainers.scala:249)
at
scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:189)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:91)
at
org.apache.spark.sql.hive.SparkHiveDynamicPartitionWriterContainer.getLocalFileWriter(hiveWriterContainers.scala:249)
at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:112)
at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:104)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:104)
at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84)
at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:84)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: parquet.hadoop.MemoryManager$1: New Memory allocation 1048575
bytes is smaller than the minimum allocation size of 1048576 bytes.
at parquet.hadoop.MemoryManager.updateAllocation(MemoryManager.java:125)
at parquet.hadoop.MemoryManager.addWriter(MemoryManager.java:82)
at 
parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:104)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303)
at
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:267)
at
org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:65)
at
org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:125)
at
org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:114)
at
org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:261)
at
org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:246)
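
Two things stand out in the trace above. First, it goes through MapredParquetOutputFormat although the CREATE TABLE statement declares ORC, and the statement creates `records` while the INSERT targets `users`, so `users` may be a pre-existing Parquet table (an assumption, not confirmed in the thread). Second, the message "1048575 bytes is smaller than the minimum allocation size of 1048576 bytes" is consistent with a shared memory pool being divided among many open writers, as can happen when one task writes to many dynamic partitions at once. The sketch below is a simplified model of that arithmetic, not the Parquet implementation; the pool size and the per-writer request are hypothetical figures chosen for illustration.

```scala
object ParquetMemoryModel {
  /** 1048576 bytes: the minimum allocation quoted in the exception message. */
  val MinAllocation: Long = 1024L * 1024L

  /**
   * Bytes each writer ends up with in this simplified model.
   * All writers are assumed to request the same amount, so once the pool is
   * oversubscribed each writer's scaled share reduces to poolBytes / openWriters.
   */
  def scaledAllocation(poolBytes: Long, requestedPerWriter: Long, openWriters: Int): Long = {
    val totalRequested = requestedPerWriter * openWriters
    if (totalRequested <= poolBytes) requestedPerWriter // pool is large enough, no scaling
    else poolBytes / openWriters                        // integer division rounds down
  }

  /** True when the scaled share falls below the minimum, the condition reported in the trace. */
  def wouldFail(poolBytes: Long, requestedPerWriter: Long, openWriters: Int): Boolean =
    scaledAllocation(poolBytes, requestedPerWriter, openWriters) < MinAllocation

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    val pool = 950 * mb      // hypothetical pool available to writers in one executor
    val perWriter = 128 * mb // hypothetical per-writer request
    Seq(7, 950, 951).foreach { n =>
      println(s"$n writers: ${scaledAllocation(pool, perWriter, n)} bytes each, " +
        s"fails=${wouldFail(pool, perWriter, n)}")
    }
  }
}
```

Under this model, either fewer simultaneously open partition writers per task or a larger executor heap would keep the per-writer share above the minimum.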



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Memory-issues-when-trying-to-insert-data-in-the-form-of-ORC-using-Spark-SQL-tp26988.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Can not set spark dynamic resource allocation

2016-05-20 Thread David Newberger
Hi All,

The error you are seeing looks very similar to SPARK-13514 to me, though I
could be wrong.

https://issues.apache.org/jira/browse/SPARK-13514

Can you check whether yarn.nodemanager.local-dirs in your YARN configuration
contains a "file://" prefix?


Cheers!
David Newberger
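
The check suggested above can be sketched as a small script. This is only an illustration: it reads a yarn-site.xml document passed in as a string and lists any yarn.nodemanager.local-dirs entries written as "file:" URIs. The object name LocalDirsCheck, its methods, and the sample paths are hypothetical.

```scala
import java.io.ByteArrayInputStream
import javax.xml.parsers.DocumentBuilderFactory
import org.w3c.dom.Element

object LocalDirsCheck {
  /** Text of the first child element with the given tag, if any. */
  private def childText(e: Element, tag: String): Option[String] = {
    val nodes = e.getElementsByTagName(tag)
    if (nodes.getLength == 0) None else Some(nodes.item(0).getTextContent.trim)
  }

  /** Value of a Hadoop-style <property> entry looked up by its <name>. */
  def propertyValue(xml: String, name: String): Option[String] = {
    val doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
      .parse(new ByteArrayInputStream(xml.getBytes("UTF-8")))
    val props = doc.getElementsByTagName("property")
    (0 until props.getLength).map(i => props.item(i)).collectFirst {
      case e: Element if childText(e, "name").contains(name) => childText(e, "value")
    }.flatten
  }

  /** Entries of yarn.nodemanager.local-dirs that are written with a file: scheme. */
  def uriStyleDirs(xml: String): Seq[String] =
    propertyValue(xml, "yarn.nodemanager.local-dirs").toSeq
      .flatMap(_.split(",").toSeq)
      .map(_.trim)
      .filter(_.startsWith("file:"))

  def main(args: Array[String]): Unit = {
    // Hypothetical sample; in practice the text would come from the real yarn-site.xml.
    val sample =
      """<configuration>
        |  <property>
        |    <name>yarn.nodemanager.local-dirs</name>
        |    <value>file:///data/yarn/local,/mnt/yarn/local</value>
        |  </property>
        |</configuration>""".stripMargin
    uriStyleDirs(sample).foreach(d => println(s"URI-style local dir: $d"))
  }
}
```

An empty result means no entry uses the "file:" scheme, in which case the similarity to SPARK-13514 would need to be checked some other way.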

-Original Message-
From: Cui, Weifeng [mailto:weife...@a9.com] 
Sent: Friday, May 20, 2016 4:26 PM
To: Marcelo Vanzin
Cc: Ted Yu; Rodrick Brown; user; Zhao, Jun; Aulakh, Sahib; Song, Yiwei
Subject: Re: Can not set spark dynamic resource allocation

Sorry, here is the node-manager log. application_1463692924309_0002 is my test. 
Hope this will help.
http://pastebin.com/0BPEcgcW



On 5/20/16, 2:09 PM, "Marcelo Vanzin"  wrote:

>Hi Weifeng,
>
>That's the Spark event log, not the YARN application log. You get the 
>latter using the "yarn logs" command.
>
>On Fri, May 20, 2016 at 1:14 PM, Cui, Weifeng  wrote:
>> Here is the application log for this spark job.
>>
>> http://pastebin.com/2UJS9L4e
>>
>>
>>
>> Thanks,
>> Weifeng
>>
>>
>>
>>
>>
>> From: "Aulakh, Sahib" 
>> Date: Friday, May 20, 2016 at 12:43 PM
>> To: Ted Yu 
>> Cc: Rodrick Brown , Cui Weifeng 
>> , user , "Zhao, Jun"
>> 
>> Subject: Re: Can not set spark dynamic resource allocation
>>
>>
>>
>> Yes it is yarn. We have configured spark shuffle service w yarn node 
>> manager but something must be off.
>>
>>
>>
>> We will send u app log on paste bin.
>>
>>
>>
>> On May 20, 2016, at 12:35 PM, Ted Yu  wrote:
>>
>> Since yarn-site.xml was cited, I assume the cluster runs YARN.
>>
>>
>>
>> On Fri, May 20, 2016 at 12:30 PM, Rodrick Brown 
>>  wrote:
>>
>> Is this Yarn or Mesos? For the latter you need to start an external
>> shuffle service.
>>
>>
>>
>>
>>
>>
>> On Fri, May 20, 2016 at 11:48 AM -0700, "Cui, Weifeng" 
>> 
>> wrote:
>>
>> Hi guys,
>>
>>
>>
>> Our team has a hadoop 2.6.0 cluster with Spark 1.6.1. We want to set 
>> dynamic resource allocation for spark and we followed the following 
>> link. After the changes, all spark jobs failed.
>>
>> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>>
>> This test was on a test cluster which has 1 master machine (running 
>> namenode, resourcemanager and hive server), 1 worker machine (running 
>> datanode and nodemanager) and 1 machine as client( running spark shell).
>>
>>
>>
>> What I updated in config :
>>
>>
>>
>> 1. Update in spark-defaults.conf
>>
>> spark.dynamicAllocation.enabled true
>>
>> spark.shuffle.service.enabled true
>>
>>
>>
>> 2. Update yarn-site.xml
>>
>> <property>
>>   <name>yarn.nodemanager.aux-services</name>
>>   <value>mapreduce_shuffle,spark_shuffle</value>
>> </property>
>>
>> <property>
>>   <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
>>   <value>org.apache.spark.network.yarn.YarnShuffleService</value>
>> </property>
>>
>> <property>
>>   <name>spark.shuffle.service.enabled</name>
>>   <value>true</value>
>> </property>
>>
>> 3. Copy  spark-1.6.1-yarn-shuffle.jar to yarn.application.classpath
>> ($HADOOP_HOME/share/hadoop/yarn/*) in python code
>>
>> 4. Restart namenode, datanode, resourcemanager, nodemanager... restart
>> everything
>>
>> 5. The config will update in all machines, resourcemanager and nodemanager.
>> We update the config in one place and copy to all machines.
>>
>>
>>
>> What I tested:
>>
>>
>>
>> 1. I started a scala spark shell and check its environment variables, 
>> spark.dynamicAllocation.enabled is true.
>>
>> 2. I used the following code:
>>
>> scala > val line =
>> sc.textFile("/spark-events/application_1463681113470_0006")
>>
>> line: org.apache.spark.rdd.RDD[String] =
>> /spark-events/application_1463681113470_0006 MapPartitionsRDD[1] at 
>> textFile at <console>:27
>>
>> scala > line.count # This command just stuck here
>>
>>
>>
>> 3. In the beginning, there is only 1 executor(this is for driver) and 
>> after line.count, I could see 3 executors, then dropped to 1.
>>
>> 4. Several jobs were launched and all of them failed.   Tasks (for all
>> stages): Succeeded/Total : 0/2 (4 failed)
>>
>>
>>
>> Error messages:
>>
>>
>>
>> I found the following messages in spark web UI. I found this in 
>> spark.log on nodemanager machine as well.
>>
>>
>>
>> ExecutorLostFailure (executor 1 exited caused by one of the running 
>> tasks)
>> Reason: Container marked as failed: 
>> container_1463692924309_0002_01_02
>> on host: xxx.com. Exit status: 1. Diagnostics: Exception 
>> from container-launch.
>> Container id: container_1463692924309_0002_01_02
>> Exit code: 1
>> Stack trace: ExitCodeException exitCode=1:
>> at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
>> at org.apache.hadoop.util.Shell.run(Shell.java:455)

Re: Can not set spark dynamic resource allocation

2016-05-20 Thread Cui, Weifeng
Sorry, here is the node-manager log. application_1463692924309_0002 is my test. 
Hope this will help.
http://pastebin.com/0BPEcgcW



On 5/20/16, 2:09 PM, "Marcelo Vanzin"  wrote:

>Hi Weifeng,
>
>That's the Spark event log, not the YARN application log. You get the
>latter using the "yarn logs" command.
>
>On Fri, May 20, 2016 at 1:14 PM, Cui, Weifeng  wrote:
>> Here is the application log for this spark job.
>>
>> http://pastebin.com/2UJS9L4e
>>
>>
>>
>> Thanks,
>> Weifeng
>>
>>
>>
>>
>>
>> From: "Aulakh, Sahib" 
>> Date: Friday, May 20, 2016 at 12:43 PM
>> To: Ted Yu 
>> Cc: Rodrick Brown , Cui Weifeng
>> , user , "Zhao, Jun"
>> 
>> Subject: Re: Can not set spark dynamic resource allocation
>>
>>
>>
>> Yes it is yarn. We have configured spark shuffle service w yarn node manager
>> but something must be off.
>>
>>
>>
>> We will send u app log on paste bin.
>>
>>
>>
>> On May 20, 2016, at 12:35 PM, Ted Yu  wrote:
>>
>> Since yarn-site.xml was cited, I assume the cluster runs YARN.
>>
>>
>>
>> On Fri, May 20, 2016 at 12:30 PM, Rodrick Brown
>>  wrote:
>>
>> Is this Yarn or Mesos? For the latter you need to start an external shuffle
>> service.
>>
>>
>>
>>
>>
>>
>> On Fri, May 20, 2016 at 11:48 AM -0700, "Cui, Weifeng" 
>> wrote:
>>
>> Hi guys,
>>
>>
>>
>> Our team has a hadoop 2.6.0 cluster with Spark 1.6.1. We want to set dynamic
>> resource allocation for spark and we followed the following link. After the
>> changes, all spark jobs failed.
>>
>> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>>
>> This test was on a test cluster which has 1 master machine (running
>> namenode, resourcemanager and hive server), 1 worker machine (running
>> datanode and nodemanager) and 1 machine as client( running spark shell).
>>
>>
>>
>> What I updated in config :
>>
>>
>>
>> 1. Update in spark-defaults.conf
>>
>> spark.dynamicAllocation.enabled true
>>
>> spark.shuffle.service.enabled true
>>
>>
>>
>> 2. Update yarn-site.xml
>>
>> <property>
>>   <name>yarn.nodemanager.aux-services</name>
>>   <value>mapreduce_shuffle,spark_shuffle</value>
>> </property>
>>
>> <property>
>>   <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
>>   <value>org.apache.spark.network.yarn.YarnShuffleService</value>
>> </property>
>>
>> <property>
>>   <name>spark.shuffle.service.enabled</name>
>>   <value>true</value>
>> </property>
>>
>> 3. Copy  spark-1.6.1-yarn-shuffle.jar to yarn.application.classpath
>> ($HADOOP_HOME/share/hadoop/yarn/*) in python code
>>
>> 4. Restart namenode, datanode, resourcemanager, nodemanager... restart
>> everything
>>
>> 5. The config will update in all machines, resourcemanager and nodemanager.
>> We update the config in one place and copy to all machines.
>>
>>
>>
>> What I tested:
>>
>>
>>
>> 1. I started a scala spark shell and check its environment variables,
>> spark.dynamicAllocation.enabled is true.
>>
>> 2. I used the following code:
>>
>> scala > val line =
>> sc.textFile("/spark-events/application_1463681113470_0006")
>>
>> line: org.apache.spark.rdd.RDD[String] =
>> /spark-events/application_1463681113470_0006 MapPartitionsRDD[1] at textFile
>> at <console>:27
>>
>> scala > line.count # This command just stuck here
>>
>>
>>
>> 3. In the beginning, there is only 1 executor(this is for driver) and after
>> line.count, I could see 3 executors, then dropped to 1.
>>
>> 4. Several jobs were launched and all of them failed.   Tasks (for all
>> stages): Succeeded/Total : 0/2 (4 failed)
>>
>>
>>
>> Error messages:
>>
>>
>>
>> I found the following messages in spark web UI. I found this in spark.log on
>> nodemanager machine as well.
>>
>>
>>
>> ExecutorLostFailure (executor 1 exited caused by one of the running tasks)
>> Reason: Container marked as failed: container_1463692924309_0002_01_02
>> on host: xxx.com. Exit status: 1. Diagnostics: Exception from
>> container-launch.
>> Container id: container_1463692924309_0002_01_02
>> Exit code: 1
>> Stack trace: ExitCodeException exitCode=1:
>> at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
>> at org.apache.hadoop.util.Shell.run(Shell.java:455)
>> at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
>> at
>> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
>> at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
>> at
>> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at
>> 

Re: Can not set spark dynamic resource allocation

2016-05-20 Thread Marcelo Vanzin
Hi Weifeng,

That's the Spark event log, not the YARN application log. You get the
latter using the "yarn logs" command.

On Fri, May 20, 2016 at 1:14 PM, Cui, Weifeng  wrote:
> Here is the application log for this spark job.
>
> http://pastebin.com/2UJS9L4e
>
>
>
> Thanks,
> Weifeng
>
>
>
>
>
> From: "Aulakh, Sahib" 
> Date: Friday, May 20, 2016 at 12:43 PM
> To: Ted Yu 
> Cc: Rodrick Brown , Cui Weifeng
> , user , "Zhao, Jun"
> 
> Subject: Re: Can not set spark dynamic resource allocation
>
>
>
> Yes it is yarn. We have configured spark shuffle service w yarn node manager
> but something must be off.
>
>
>
> We will send u app log on paste bin.
>
>
>
> On May 20, 2016, at 12:35 PM, Ted Yu  wrote:
>
> Since yarn-site.xml was cited, I assume the cluster runs YARN.
>
>
>
> On Fri, May 20, 2016 at 12:30 PM, Rodrick Brown
>  wrote:
>
> Is this Yarn or Mesos? For the latter you need to start an external shuffle
> service.
>
>
>
>
>
>
> On Fri, May 20, 2016 at 11:48 AM -0700, "Cui, Weifeng" 
> wrote:
>
> Hi guys,
>
>
>
> Our team has a hadoop 2.6.0 cluster with Spark 1.6.1. We want to set dynamic
> resource allocation for spark and we followed the following link. After the
> changes, all spark jobs failed.
>
> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
> This test was on a test cluster which has 1 master machine (running
> namenode, resourcemanager and hive server), 1 worker machine (running
> datanode and nodemanager) and 1 machine as client( running spark shell).
>
>
>
> What I updated in config :
>
>
>
> 1. Update in spark-defaults.conf
>
> spark.dynamicAllocation.enabled true
>
> spark.shuffle.service.enabled true
>
>
>
> 2. Update yarn-site.xml
>
> <property>
>   <name>yarn.nodemanager.aux-services</name>
>   <value>mapreduce_shuffle,spark_shuffle</value>
> </property>
>
> <property>
>   <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
>   <value>org.apache.spark.network.yarn.YarnShuffleService</value>
> </property>
>
> <property>
>   <name>spark.shuffle.service.enabled</name>
>   <value>true</value>
> </property>
>
> 3. Copy  spark-1.6.1-yarn-shuffle.jar to yarn.application.classpath
> ($HADOOP_HOME/share/hadoop/yarn/*) in python code
>
> 4. Restart namenode, datanode, resourcemanager, nodemanager... restart
> everything
>
> 5. The config will update in all machines, resourcemanager and nodemanager.
> We update the config in one place and copy to all machines.
>
>
>
> What I tested:
>
>
>
> 1. I started a scala spark shell and check its environment variables,
> spark.dynamicAllocation.enabled is true.
>
> 2. I used the following code:
>
> scala > val line =
> sc.textFile("/spark-events/application_1463681113470_0006")
>
> line: org.apache.spark.rdd.RDD[String] =
> /spark-events/application_1463681113470_0006 MapPartitionsRDD[1] at textFile
> at <console>:27
>
> scala > line.count # This command just stuck here
>
>
>
> 3. In the beginning, there is only 1 executor(this is for driver) and after
> line.count, I could see 3 executors, then dropped to 1.
>
> 4. Several jobs were launched and all of them failed.   Tasks (for all
> stages): Succeeded/Total : 0/2 (4 failed)
>
>
>
> Error messages:
>
>
>
> I found the following messages in spark web UI. I found this in spark.log on
> nodemanager machine as well.
>
>
>
> ExecutorLostFailure (executor 1 exited caused by one of the running tasks)
> Reason: Container marked as failed: container_1463692924309_0002_01_02
> on host: xxx.com. Exit status: 1. Diagnostics: Exception from
> container-launch.
> Container id: container_1463692924309_0002_01_02
> Exit code: 1
> Stack trace: ExitCodeException exitCode=1:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
> at org.apache.hadoop.util.Shell.run(Shell.java:455)
> at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
> at
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> Container exited with a non-zero exit code 1
>
>
>
> Thanks a lot for help. We can provide more information if needed.
>
>
>
> Thanks,
> Weifeng
>

Logstash to collect Spark logs

2016-05-20 Thread Ashish Kumar Singh
We are trying to collect Spark logs with Logstash so that we can parse the
application logs and extract useful information.

We can read the NodeManager logs, but we are unable to read the Spark
application logs with Logstash.

Current setup for Spark logs and Logstash:
1- Spark runs on YARN.
2- log4j SocketAppenders write the logs to a TCP port.
3- The lines below were added to log4j.properties in both the YARN and Spark conf:

main.logger=RFA,SA
 log4j.appender.SA=org.apache.log4j.net.SocketAppender
log4j.appender.SA.Port=4560
log4j.appender.SA.RemoteHost=${hostname}
log4j.appender.SA.ReconnectionDelay=1
log4j.appender.SA.Application=NM-${user.dir}

4-Logstash input
input {
  log4j {
    mode => "server"
    host => "0.0.0.0"
    port => 4560
    type => "log4j"
  }
}


Any help with reading Spark logs via Logstash would be appreciated.
Also, is there a better way to collect Spark logs via Logstash?


Re: Dataset API and avro type

2016-05-20 Thread Michael Armbrust
What is the error?  I would definitely expect it to work with kryo at least.

On Fri, May 20, 2016 at 2:37 AM, Han JU  wrote:

> Hello,
>
> I'm looking at the Dataset API in 1.6.1 and also in the upcoming 2.0. However,
> it does not seem to work with Avro data types:
>
>
> object Datasets extends App {
>   val conf = new SparkConf()
>   conf.setAppName("Dataset")
>   conf.setMaster("local[2]")
>   conf.setIfMissing("spark.serializer", classOf[KryoSerializer].getName)
>   conf.setIfMissing("spark.kryo.registrator",
> classOf[DatasetKryoRegistrator].getName)
>
>   val sc = new SparkContext(conf)
>   val sql = new SQLContext(sc)
>   import sql.implicits._
>
>   implicit val encoder = Encoders.kryo[MyAvroType]
>   val data = sql.read.parquet("path/to/data").as[MyAvroType]
>
>   var c = 0
>   // BUG here
>   val sizes = data.mapPartitions { iter =>
>     List(iter.size).iterator
>   }.collect().toList
>
>   println(c)
> }
>
>
> class DatasetKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(
>       classOf[MyAvroType],
>       AvroSerializer.SpecificRecordBinarySerializer[MyAvroType])
>   }
> }
>
>
> I'm using chill-avro's Kryo serializer for Avro types and I've tried
> `Encoders.kryo` as well as `bean` or `javaSerialization`, but none of them
> works. The error seems to be that the generated code does not compile with
> janino.
>
> Tested in 1.6.1 and the 2.0.0-preview. Any idea?
>
> --
> *JU Han*
>
> Software Engineer @ Teads.tv
>
> +33 061960
>


Re: Wide Datasets (v1.6.1)

2016-05-20 Thread Michael Armbrust
>
> I can provide an example/open a Jira if there is a chance this will be
> fixed.
>

Please do!  Ping me on it.

Michael


Re: Can not set spark dynamic resource allocation

2016-05-20 Thread Cui, Weifeng
Here is the application log for this spark job.
http://pastebin.com/2UJS9L4e

Thanks,
Weifeng


NOTICE TO RECIPIENTS: This communication is confidential and intended for the 
use of the addressee only. If you are not an intended recipient of this 
communication, please delete it immediately and notify the sender by return 
email. Unauthorized reading, dissemination, distribution or copying of this 
communication is prohibited. This communication does 

Re: Can not set spark dynamic resource allocation

2016-05-20 Thread Aulakh, Sahib
Yes, it is YARN. We have configured the Spark shuffle service with the YARN NodeManager, 
but something must be off.

We will send you the app log on Pastebin.


Re: Can not set spark dynamic resource allocation

2016-05-20 Thread Ted Yu
Since yarn-site.xml was cited, I assume the cluster runs YARN.


Re: Can not set spark dynamic resource allocation

2016-05-20 Thread Rodrick Brown
Is this YARN or Mesos? For the latter you need to start an external shuffle 
service.


Re: Can not set spark dynamic resource allocation

2016-05-20 Thread Ted Yu
Can you retrieve the log for application_1463681113470_0006 and post it on
Pastebin?

Thanks
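
For anyone following along: on a cluster with YARN log aggregation enabled, the container logs of a finished application can usually be pulled with the `yarn logs -applicationId <id>` command. Below is a minimal Python sketch that wraps that command. The helper names are made up for illustration, and it assumes the `yarn` CLI is on the PATH of the machine where it runs.

```python
import re
import subprocess

# YARN application ids look like application_<cluster timestamp>_<sequence>.
APP_ID_PATTERN = re.compile(r"^application_\d+_\d+$")


def yarn_logs_command(app_id):
    """Build the argv for `yarn logs -applicationId <id>` (hypothetical helper)."""
    if not APP_ID_PATTERN.match(app_id):
        raise ValueError("not a YARN application id: %r" % app_id)
    return ["yarn", "logs", "-applicationId", app_id]


def fetch_logs(app_id):
    """Run the command and return its stdout as text.

    Needs the `yarn` CLI on PATH and log aggregation enabled on the cluster.
    """
    output = subprocess.check_output(yarn_logs_command(app_id))
    return output.decode("utf-8", "replace")


if __name__ == "__main__":
    # Only prints the command; call fetch_logs() on a cluster node to run it.
    print(" ".join(yarn_logs_command("application_1463681113470_0006")))
```

The id is validated before it is passed to the CLI so that a mistyped id fails early instead of producing an empty log.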



Can not set spark dynamic resource allocation

2016-05-20 Thread Cui, Weifeng
Hi guys,



Our team has a Hadoop 2.6.0 cluster with Spark 1.6.1. We want to enable dynamic 
resource allocation for Spark, and we followed the instructions at the link below. 
After the changes, all Spark jobs failed.
https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation

This test was on a test cluster with 1 master machine (running namenode, 
resourcemanager and hive server), 1 worker machine (running datanode and 
nodemanager) and 1 client machine (running the Spark shell).



What I updated in the config:

1. Update spark-defaults.conf

spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true

2. Update yarn-site.xml

<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>

<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>

<property>
  <name>spark.shuffle.service.enabled</name>
  <value>true</value>
</property>

3. Copy spark-1.6.1-yarn-shuffle.jar to yarn.application.classpath 
($HADOOP_HOME/share/hadoop/yarn/*) in python code

4. Restart namenode, datanode, resourcemanager, nodemanager... restart everything

5. The config is updated on all machines, including the resourcemanager and 
nodemanager. We update the config in one place and copy it to all machines.
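
One way to double-check step 2 on each NodeManager host is to parse yarn-site.xml and confirm the two shuffle-service properties. The sketch below assumes the standard Hadoop `<property>`/`<name>`/`<value>` layout. The function names and the embedded sample are illustrative only and not part of Spark or Hadoop.

```python
import xml.etree.ElementTree as ET

REQUIRED_SERVICE = "spark_shuffle"
REQUIRED_CLASS = "org.apache.spark.network.yarn.YarnShuffleService"


def read_properties(xml_text):
    """Return a {name: value} dict from Hadoop-style <property> elements."""
    root = ET.fromstring(xml_text.strip())
    props = {}
    for prop in root.iter("property"):
        name = (prop.findtext("name") or "").strip()
        value = (prop.findtext("value") or "").strip()
        if name:
            props[name] = value
    return props


def check_shuffle_service(props):
    """Return a list of problems; an empty list means the settings look consistent."""
    problems = []
    raw = props.get("yarn.nodemanager.aux-services", "")
    services = [s.strip() for s in raw.split(",")]
    if REQUIRED_SERVICE not in services:
        problems.append("spark_shuffle missing from yarn.nodemanager.aux-services")
    cls = props.get("yarn.nodemanager.aux-services.spark_shuffle.class")
    if cls != REQUIRED_CLASS:
        problems.append("unexpected spark_shuffle class: %r" % cls)
    return problems


# Illustrative sample; in practice read the file from the Hadoop conf directory.
SAMPLE = """
<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle,spark_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>
</configuration>
"""

if __name__ == "__main__":
    print(check_shuffle_service(read_properties(SAMPLE)))
```

This only validates the file contents. It does not prove that the NodeManager was restarted with them or that the shuffle jar is on its classpath.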



What I tested:

1. I started a Scala Spark shell and checked its environment variables; 
spark.dynamicAllocation.enabled is true.

2. I used the following code:

scala> val line = sc.textFile("/spark-events/application_1463681113470_0006")

line: org.apache.spark.rdd.RDD[String] = /spark-events/application_1463681113470_0006 MapPartitionsRDD[1] at textFile at <console>:27

scala> line.count // This command just got stuck here

3. In the beginning, there was only 1 executor (this is for the driver), and after 
line.count I could see 3 executors, which then dropped to 1.

4. Several jobs were launched and all of them failed. Tasks (for all stages): 
Succeeded/Total: 0/2 (4 failed)



Error messages:



I found the following messages in the Spark web UI, and also in spark.log on the 
nodemanager machine.


ExecutorLostFailure (executor 1 exited caused by one of the running tasks) 
Reason: Container marked as failed: container_1463692924309_0002_01_02 on 
host: xxx.com. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1463692924309_0002_01_02
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 1



Thanks a lot for your help. We can provide more information if needed.

Thanks,
Weifeng


Re: rpc.RpcTimeoutException: Futures timed out after [120 seconds]

2016-05-20 Thread Mail.com
Yes.


> On May 20, 2016, at 10:11 AM, Sahil Sareen  wrote:
> 
> I'm not sure if this happens on small files or big ones as I have a mix of 
> them always.
> Did you see this only for big files?
> 
>> On Fri, May 20, 2016 at 7:36 PM, Mail.com  wrote:
>> Hi Sahil,
>> 
>> I have seen this with high GC time. Do you ever get this error with 
>> small-volume files?
>> 
>> Pradeep
>> 
>>> On May 20, 2016, at 9:32 AM, Sahil Sareen  wrote:
>>> 
>>> Hey all
>>> 
>>> I'm using Spark 1.6.1 and occasionally see executors being lost because of 
>>> these errors, which hurts my application's performance.
>>> Can someone please list the possible problems that could cause this?
>>> 
>>> 
>>> Full log:
>>> 
>>> 16/05/19 02:17:54 ERROR ContextCleaner: Error cleaning broadcast 266685
>>> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
>>> at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
>>> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229)
>>> at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225)
>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:242)
>>> at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136)
>>> at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
>>> at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
>>> at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67)
>>> at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:214)
>>> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:170)
>>> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:161)
>>> at scala.Option.foreach(Option.scala:257)
>>> at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:161)
>>> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
>>> at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
>>> at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
>>> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
>>> at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
>>> at scala.concurrent.Await$.result(package.scala:190)
>>> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:241)
>>> ... 12 more
>>> 16/05/19 02:18:26 ERROR TaskSchedulerImpl: Lost executor 20160421-192532-1677787146-5050-40596-S23 on ip-10-0-1-70.ec2.internal: Executor heartbeat timed out after 161447 ms
>>> 16/05/19 02:18:53 ERROR TaskSchedulerImpl: Lost executor 20160421-192532-1677787146-5050-40596-S23 on ip-10-0-1-70.ec2.internal: remote Rpc client disassociated
>>> 
>>> Thanks
>>> Sahil
> 
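
Since the log names spark.rpc.askTimeout and the thread points at long GC pauses, one experiment is to raise the RPC and network timeouts and turn on executor GC logging to confirm or rule out GC as the cause. The Python sketch below only assembles the `--conf` arguments for spark-submit. The helper names and the timeout values are assumptions chosen for illustration, not recommendations, and the GC flags are the Java 7/8 style ones.

```python
def timeout_tuning_conf(ask_timeout="300s", network_timeout="300s", heartbeat="30s"):
    """Return Spark settings to experiment with (values are placeholders)."""
    return {
        # Timeout named in the RpcTimeoutException message.
        "spark.rpc.askTimeout": ask_timeout,
        # Default for several network timeouts, including executor heartbeats.
        "spark.network.timeout": network_timeout,
        # Should stay well below spark.network.timeout.
        "spark.executor.heartbeatInterval": heartbeat,
        # Print GC activity in executor logs to see whether pauses line up with the errors.
        "spark.executor.extraJavaOptions":
            "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps",
    }


def to_submit_args(conf):
    """Flatten a settings dict into spark-submit `--conf key=value` arguments."""
    args = []
    for key in sorted(conf):
        args += ["--conf", "%s=%s" % (key, conf[key])]
    return args


if __name__ == "__main__":
    print(to_submit_args(timeout_tuning_conf()))
```

Raising timeouts only hides the symptom. If the GC log shows long full collections, executor memory and caching behaviour are the more likely things to tune.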


Re: KafkaUtils.createDirectStream Not Fetching Messages with Confluent Serializers as Value Decoder.

2016-05-20 Thread Ramaswamy, Muthuraman
No, I haven’t enabled Kerberos. I only use the calls specified in the Stack 
Overflow thread on how to use the Schema Registry based serializer.

~Muthu
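
One thing that may be worth checking: values written by Confluent's Avro serializer are not bare Avro. Per Confluent's documented wire format, each value starts with a magic byte (0) followed by a 4-byte big-endian schema ID, and only then the Avro-encoded body. A decoder that hands the whole value to an Avro reader would trip over those 5 bytes. The sketch below, with a made-up function name, splits the header from the body; wiring it into the `valueDecoder` and resolving the schema ID against the registry is left out and would be an assumption about the setup.

```python
import struct

MAGIC_BYTE = 0
# 1-byte magic followed by a 4-byte big-endian unsigned schema id (5 bytes total).
HEADER = struct.Struct(">bI")


def split_confluent_message(raw):
    """Return (schema_id, avro_payload) for one Kafka message value."""
    if raw is None or len(raw) < HEADER.size:
        raise ValueError("message too short for the Confluent header")
    magic, schema_id = HEADER.unpack_from(raw)
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: %d" % magic)
    return schema_id, raw[HEADER.size:]


if __name__ == "__main__":
    # Hand-built example value: schema id 42 followed by placeholder body bytes.
    example = b"\x00" + struct.pack(">I", 42) + b"avro-bytes"
    print(split_confluent_message(example))
```

The returned schema ID can be used to look up the writer schema, and the remaining bytes are what an Avro binary decoder expects.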




On 5/19/16, 5:25 PM, "Mail.com"  wrote:

>Hi Muthu,
>
>Do you have Kerberos enabled?
>
>Thanks,
>Pradeep
>
>> On May 19, 2016, at 12:17 AM, Ramaswamy, Muthuraman 
>>  wrote:
>> 
>> I am using Spark 1.6.1 and Kafka 0.9+. It works for both receiver and 
>> receiver-less modes.
>> 
>> One thing I noticed: when you specify an invalid topic name, KafkaUtils doesn't 
>> fetch any messages. So, check that you have specified the topic name correctly.
>> 
>> ~Muthu
>> 
>> From: Mail.com [pradeep.mi...@mail.com]
>> Sent: Monday, May 16, 2016 9:33 PM
>> To: Ramaswamy, Muthuraman
>> Cc: Cody Koeninger; spark users
>> Subject: Re: KafkaUtils.createDirectStream Not Fetching Messages with 
>> Confluent Serializers as Value Decoder.
>> 
>> Hi Muthu,
>> 
>> Are you on Spark 1.4.1 and Kafka 0.8.2? I have a similar issue even for 
>> simple string messages.
>> 
>> The console producer and consumer work fine, but Spark always returns an empty 
>> RDD. I am using the receiver-based approach.
>> 
>> Thanks,
>> Pradeep
>> 
>>> On May 16, 2016, at 8:19 PM, Ramaswamy, Muthuraman 
>>>  wrote:
>>> 
>>> Yes, I can see the messages. Also, I wrote a quick custom decoder for avro 
>>> and it works fine for the following:
>>> 
>>> kvs = KafkaUtils.createDirectStream(ssc, [topic], 
>>> {"metadata.broker.list": brokers}, valueDecoder=decoder)
>>> 
>>> But when I use the Confluent serializers to leverage the Schema Registry 
>>> (based on the link shown below), it doesn’t work for me. I am not sure 
>>> whether I need to configure anything more to consume from the Schema 
>>> Registry. I can fetch the schema from the Schema Registry based on its IDs. 
>>> The decoder method is not returning any values for me.
>>> 
>>> ~Muthu
>>> 
>>> 
>>> 
>>>> On 5/16/16, 10:49 AM, "Cody Koeninger"  wrote:
>>>> 
>>>> Have you checked to make sure you can receive messages just using a
>>>> byte array for value?
>>>> 
>>>> On Mon, May 16, 2016 at 12:33 PM, Ramaswamy, Muthuraman
>>>>  wrote:
>>>>> I am trying to consume Avro-formatted messages through
>>>>> KafkaUtils.createDirectStream. I followed the example linked below,
>>>>> but the messages are not being fetched by the stream.
>>>>> 
>>>>> https://urldefense.proofpoint.com/v2/url?u=http-3A__stackoverflow.com_questions_30339636_spark-2Dpython-2Davro-2Dkafka-2Ddeserialiser=CwIBaQ=jcv3orpCsv7C4ly8-ubDob57ycZ4jvhoYZNDBA06fPk=NQ-dw5X8CJcqaXIvIdMUUdkL0fHDonD07FZzTY3CgiU=Nc-rPMFydyCrwOZuNWs2GmSL4NkN8eGoR-mkJUlkCx0=hwqxCKl3P4_9pKWeo1OGR134QegMRe3Xh22_WMy-5q8=
>>>>> 
>>>>> Is there any code missing that I must add to make the above sample work?
>>>>> Say, I am not sure how the Confluent serializers would know the Avro
>>>>> schema info, as they know only the Schema Registry URL.
>>>>> 
>>>>> Appreciate your help.
>>>>> 
>>>>> ~Muthu
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>> 
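On the question quoted above of how a Confluent deserializer can know the Avro schema when it is only given the Schema Registry URL: Confluent's serializers frame every message as one magic byte (0), a 4-byte big-endian schema ID, and then the Avro binary payload. The consumer reads the ID from that header and looks the schema up in the registry. Below is a minimal sketch of the header-splitting step only, using the Python standard library. The function name is hypothetical, and fetching the schema and decoding the Avro payload are left out.

```python
import struct

MAGIC_BYTE = 0
# 1-byte magic followed by a 4-byte big-endian unsigned schema ID (5 bytes total).
HEADER = struct.Struct(">bI")


def split_confluent_message(raw):
    """Return (schema_id, avro_payload) for a Confluent-framed message."""
    if raw is None or len(raw) < HEADER.size:
        raise ValueError("message too short for Confluent framing")
    magic, schema_id = HEADER.unpack(raw[:HEADER.size])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: %d" % magic)
    # Everything after the 5-byte header is the Avro-encoded record.
    return schema_id, raw[HEADER.size:]
```

A custom valueDecoder could call a function like this first and then decode the returned payload with the schema fetched for that ID. A decoder that passes the whole message, header included, to an Avro reader would be expected to fail or return nothing useful.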


Wide Datasets (v1.6.1)

2016-05-20 Thread Don Drake
I have been working to create a Dataframe that contains a nested
structure.  The first attempt is to create an array of structures.   I've
written previously on this list how it doesn't work in Dataframes in 1.6.1,
but it does in 2.0.

I've continued my experimenting and have it working in Datasets in 1.6.1,
using ds.groupBy($"col").mapGroups().  This works great when the number
of columns is less than the maximum for a case class (22 in Scala 2.10, 254
in Scala 2.11).  However, while using a custom-written case class of 200+
fields, I ran into a Catalyst/Janino stack overflow exception (at
runtime, while it was attempting to compile my large class), so that doesn't work.
I can provide an example/open a Jira if there is a chance this will be
fixed.

My question is the following: Datasets rely on case classes, if I have a
dataset with more than 254 fields (and I have a lot of them), how am I
supposed to use Datasets with these wide tables?  Am I forced to use
Dataframes?

Thanks.

-Don

-- 
Donald Drake
Drake Consulting
http://www.drakeconsulting.com/
https://twitter.com/dondrake 
800-733-2143


Problems finding the original objects after HashingTF()

2016-05-20 Thread Pasquinell Urbani
Hi all,

I'm following a TF-IDF example but I’m having some issues that I’m not
sure how to fix.

The input is the following

val test = sc.textFile("s3n://.../test_tfidf_products.txt")
test.collect.mkString("\n")

which prints

test: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[370] at textFile at <console>:121
res241: String =
a a b c d e
b c d d

After that, I compute the ratings by doing

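import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Split each line into terms, hash the terms into term-frequency vectors,
// then rescale by inverse document frequency.
val test2 = test.map(_.split(" ").toSeq)
val hashingTF2 = new HashingTF()
val tf2: RDD[Vector] = hashingTF2.transform(test2)
tf2.cache()
val idf2 = new IDF().fit(tf2)
val tfidf2: RDD[Vector] = idf2.transform(tf2)
// Leftover from the original example; idfModel and tf are not defined here:
// val expandedText = idfModel.transform(tf)
tfidf2.collect.mkString("\n")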

which prints

(1048576,[97,98,99,100,101],[0.8109302162163288,0.0,0.0,0.0,0.4054651081081644])
(1048576,[98,99,100],[0.0,0.0,0.0])

The numbers [97,98,99,100,101] are indexes of the vector tfidf2.

I need to access the rating for a given item, for example “a”, but the only way I
have been able to do this is by using the indexOf() method of the hashingTF
object.

hashingTF2.indexOf("a")

res236: Int = 97


Is there a better way to perform this?


Thank you all.
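One possible approach, since the hashing step is one-way: call indexOf once for every distinct term and keep the result as a lookup table in both directions. The sketch below shows the idea in plain Python. Here `index_of` stands in for `hashingTF2.indexOf`, and `toy_index_of` is an assumption made up for this illustration (it only handles single-character terms and happens to reproduce the indices 97 to 101 shown above). It is not Spark's hash function.

```python
from collections import defaultdict


def build_lookups(documents, index_of):
    """Map each distinct term to its hashed index, and each index back to
    the set of terms that landed on it (more than one term means a collision)."""
    term_to_index = {}
    index_to_terms = defaultdict(set)
    for doc in documents:
        for term in doc:
            if term not in term_to_index:
                idx = index_of(term)
                term_to_index[term] = idx
                index_to_terms[idx].add(term)
    return term_to_index, dict(index_to_terms)


def toy_index_of(term, num_features=1 << 20):
    # Assumption: toy hash for single-character terms only. With Spark,
    # pass the real indexOf of the HashingTF instance instead.
    return ord(term) % num_features


docs = [["a", "a", "b", "c", "d", "e"], ["b", "c", "d", "d"]]
term_to_index, index_to_terms = build_lookups(docs, toy_index_of)
```

With such a table, the rating of a term is the vector entry at `term_to_index[term]`, and `index_to_terms` makes hash collisions visible, since two terms sharing an index cannot be told apart in the TF-IDF vector. The same idea should carry over to the RDD itself by taking the distinct terms and pairing each with its indexOf value.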


Re: Spark.default.parallelism can not set reduce number

2016-05-20 Thread Ovidiu-Cristian MARCU
You can check org.apache.spark.sql.internal.SQLConf for other default settings 
as well.
  val SHUFFLE_PARTITIONS = SQLConfigBuilder("spark.sql.shuffle.partitions")
.doc("The default number of partitions to use when shuffling data for joins 
or aggregations.")
.intConf
.createWithDefault(200)


> On 20 May 2016, at 13:17, 喜之郎 <251922...@qq.com> wrote:
> 
>  Hi all.
> I set Spark.default.parallelism equals 20 in spark-default.conf. And send 
> this file to all nodes.
> But I found reduce number is still default value,200.
> Does anyone else encouter this problem? can anyone give some advice?
> 
> 
> [Stage 9:>(0 + 0) / 200]
> [Stage 9:>(0 + 2) / 200]
> [Stage 9:>(1 + 2) / 200]
> [Stage 9:>(2 + 2) / 200]
> ###
> 
> And this results in many empty files.Because my data is little, only some of 
> the 200 files have data.
> ###
>  2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-0
>  2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-1
>  2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-2
>  2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-3
>  2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-4
>  2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-5
> 
> 
> 
> 



StackOverflowError in Spark SQL

2016-05-20 Thread Jeff Jones
I’m running Spark 1.6.0 in a standalone cluster. Periodically I’ve seen 
StackOverflowErrors when running queries. An example below.
In the past I’ve been able to avoid such situations by ensuring we don’t have 
too many arguments in ‘in’ clauses or too many unioned queries both of which 
seem to trigger issues like this.

Why doesn’t Spark protect itself from such issues? I’d much rather get a Spark 
exception that is thrown and can be handled than a StackOverflowError, 
which causes the JVM to exit.

I can provide the full stack upon request.

Thanks,
Jeff
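For the unioned-queries case mentioned above, one workaround that is sometimes used is to combine the inputs pairwise, level by level, instead of folding them left to right. A left fold of n inputs nests n - 1 levels deep, while a balanced combination nests about log2(n) levels, which may keep recursive plan traversals shallower. This is only a sketch of that idea in plain Python. `union` is a stand-in that records the tree shape, and whether the approach avoids the error in a given Spark version is an assumption that would need testing.

```python
def balanced_reduce(items, combine):
    """Combine items pairwise, level by level, so the nesting depth is
    about log2(n) rather than n - 1 as with a left fold."""
    level = list(items)
    if not level:
        raise ValueError("need at least one item")
    while len(level) > 1:
        nxt = [combine(level[i], level[i + 1])
               for i in range(0, len(level) - 1, 2)]
        if len(level) % 2 == 1:
            nxt.append(level[-1])  # the odd one out moves up unchanged
        level = nxt
    return level[0]


def union(a, b):
    # Stand-in for a real union of two queries; it only records the shape.
    return ("union", a, b)


def depth(node):
    """Nesting depth of the ('union', left, right) tuples built by union()."""
    if isinstance(node, tuple):
        return 1 + max(depth(node[1]), depth(node[2]))
    return 0
```

With DataFrames, the `combine` argument could be something like `lambda a, b: a.unionAll(b)`, so that the list of per-query DataFrames is merged as a balanced tree.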


2016-05-20 05:32:23,044 - [ERROR] - from akka.actor.ActorSystemImpl in 
play-akka.actor.default-dispatcher-8

Uncaught error from thread [play-akka.actor.default-dispatcher-84] shutting 
down JVM since 'akka.jvm-exit-on-fatal-error' is enabled

java.lang.StackOverflowError: null

at 
org.apache.spark.sql.catalyst.plans.logical.SetOperation.output(basicOperators.scala:96)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.plans.logical.SetOperation.output(basicOperators.scala:96)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.plans.logical.SetOperation.output(basicOperators.scala:96)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.plans.logical.SetOperation.output(basicOperators.scala:96)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.plans.logical.SetOperation.output(basicOperators.scala:96)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.plans.logical.SetOperation.output(basicOperators.scala:96)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.plans.logical.SetOperation.output(basicOperators.scala:96)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.plans.logical.SetOperation.output(basicOperators.scala:96)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]




at 
org.apache.spark.sql.catalyst.plans.logical.SetOperation.output(basicOperators.scala:96)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.plans.logical.SetOperation.output(basicOperators.scala:96)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.optimizer.SetOperationPushDown$.org$apache$spark$sql$catalyst$optimizer$SetOperationPushDown$$buildRewrites(Optimizer.scala:110)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.optimizer.SetOperationPushDown$$anonfun$apply$2.applyOrElse(Optimizer.scala:149)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.optimizer.SetOperationPushDown$$anonfun$apply$2.applyOrElse(Optimizer.scala:145)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:243)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:242) 
~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:248)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
 ~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at scala.collection.Iterator$$anon$11.next(Iterator.scala:370) 
~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at scala.collection.Iterator$class.foreach(Iterator.scala:742) 
~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) 
~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) 
~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104) 
~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48) 
~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:308) 
~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at scala.collection.AbstractIterator.to(Iterator.scala:1194) 
~[spark-assembly-1.6.0-hadoop2.2.0.jar:1.6.0]

at 

Re: rpc.RpcTimeoutException: Futures timed out after [120 seconds]

2016-05-20 Thread Mail.com
Hi Sahil,

I have seen this with high GC time. Do you ever get this error with small 
volume files?

Pradeep
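To check the GC theory, a configuration along the following lines might help. It is a sketch, not a recommendation: the first property turns on GC logging in the executors so that long pauses show up in their logs, and the second raises spark.network.timeout, which is used as the default for spark.rpc.askTimeout when that is not set explicitly. The value 300s is an arbitrary illustration, and a longer timeout only hides long pauses rather than fixing them.

```
# conf/spark-defaults.conf
spark.executor.extraJavaOptions  -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
spark.network.timeout            300s
```

If the executor logs show full GCs lasting tens of seconds around the time of the timeout, memory pressure on the executors is the more likely cause than the network.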

> On May 20, 2016, at 9:32 AM, Sahil Sareen  wrote:
> 
> Hey all
> 
> I'm using Spark-1.6.1 and occasionally seeing executors lost and hurting my 
> application performance due to these errors.
> Can someone please let out all the possible problems that could cause this?
> 
> 
> Full log:
> 
> 16/05/19 02:17:54 ERROR ContextCleaner: Error cleaning broadcast 266685
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 
> seconds]. This timeout is controlled by spark.rpc.askTimeout
> at 
> org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229)
> at 
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225)
> at 
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:242)
> at 
> org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136)
> at 
> org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
> at 
> org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
> at 
> org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67)
> at 
> org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:214)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:170)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:161)
> at scala.Option.foreach(Option.scala:257)
> at 
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:161)
> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
> at 
> org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
> at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after 
> [120 seconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> at 
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:190)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:241)
> ... 12 more
> 16/05/19 02:18:26 ERROR TaskSchedulerImpl: Lost executor 
> 20160421-192532-1677787146-5050-40596-S23 on ip-10-0-1-70.ec2.internal: 
> Executor heartbeat timed out after 161447 ms
> 16/05/19 02:18:53 ERROR TaskSchedulerImpl: Lost executor 
> 20160421-192532-1677787146-5050-40596-S23 on ip-10-0-1-70.ec2.internal: 
> remote Rpc client disassociated
> 
> Thanks
> Sahil


Re: rpc.RpcTimeoutException: Futures timed out after [120 seconds]

2016-05-20 Thread Sahil Sareen
I'm not sure if this happens on small files or big ones as I have a mix of
them always.
Did you see this only for big files?

On Fri, May 20, 2016 at 7:36 PM, Mail.com  wrote:

> Hi Sahil,
>
> I have seen this with high GC time. Do you ever get this error with small
> volume files
>
> Pradeep
>
> On May 20, 2016, at 9:32 AM, Sahil Sareen  wrote:
>
> Hey all
>
> I'm using Spark-1.6.1 and occasionally seeing executors lost and hurting
> my application performance due to these errors.
> Can someone please let out all the possible problems that could cause this?
>
>
> Full log:
>
> 16/05/19 02:17:54 ERROR ContextCleaner: Error cleaning broadcast 266685
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
> seconds]. This timeout is controlled by spark.rpc.askTimeout
> at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229)
> at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225)
> at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:242)
> at
> org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136)
> at
> org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
> at
> org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
> at
> org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67)
> at
> org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:214)
> at
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:170)
> at
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:161)
> at scala.Option.foreach(Option.scala:257)
> at
> org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:161)
> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
> at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
> at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [120 seconds]
> at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
> at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
> at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
> at
> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
> at scala.concurrent.Await$.result(package.scala:190)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:241)
> ... 12 more
> 16/05/19 02:18:26 ERROR TaskSchedulerImpl: Lost executor
> 20160421-192532-1677787146-5050-40596-S23 on ip-10-0-1-70.ec2.internal:
> Executor heartbeat timed out after 161447 ms
> 16/05/19 02:18:53 ERROR TaskSchedulerImpl: Lost executor
> 20160421-192532-1677787146-5050-40596-S23 on ip-10-0-1-70.ec2.internal:
> remote Rpc client disassociated
>
>
>
> Thanks
> Sahil
>
>


Re: Splitting RDD by partition

2016-05-20 Thread Sun Rui
I think the latter approach is better, as it avoids unnecessary 
computation by filtering out unneeded partitions.
It is also better to cache the previous RDD so that it won’t be computed twice.
> On May 20, 2016, at 16:59, shlomi  wrote:
> 
> Another approach I found:
> 
> First, I make a PartitionsRDD class which only takes a certain range of
> partitions
> - 
> case class PartitionsRDDPartition(val index:Int, val origSplit:Partition)
> extends Partition {}
> 
> class PartitionsRDD[U: ClassTag](var prev: RDD[U], drop:Int,take:Int)
> extends RDD[U](prev) {
>  override def getPartitions: Array[Partition] =
> prev.partitions.drop(drop).take(take).zipWithIndex.map{case (split,
> idx)=>{new PartitionsRDDPartition(idx,
> split)}}.asInstanceOf[Array[Partition]]
>  override def compute(split: Partition, context: TaskContext): Iterator[U]
> =
> prev.iterator(partitions(split.index).asInstanceOf[PartitionsRDDPartition].origSplit,
> context)
> }
> - 
> 
> And then I can create my two RDD's using the following:
> - 
> def splitByPartition[T:ClassTag](rdd: RDD[T], hotPartitions:Int): (RDD[T],
> RDD[T]) = {
>   val left  = new PartitionsRDD[T](rdd, 0, hotPartitions);
>   val right = new PartitionsRDD[T](rdd, hotPartitions,
> rdd.numPartitions-hotPartitions);
>   (left, right)
> }
> - 
> 
> This approach saves a few minutes when compared to the one in the previous
> post (at least on a local test.. I still need to test this on a real
> cluster).
> 
> Any thought about this? Is this the right thing to do or am I missing
> something important?
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-by-partition-tp26983p26985.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



rpc.RpcTimeoutException: Futures timed out after [120 seconds]

2016-05-20 Thread Sahil Sareen
Hey all

I'm using Spark 1.6.1 and occasionally see executors being lost, which hurts my
application's performance, due to these errors.
Can someone please list the possible problems that could cause this?


Full log:

16/05/19 02:17:54 ERROR ContextCleaner: Error cleaning broadcast 266685
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
seconds]. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229)
at
org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:242)
at
org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136)
at
org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
at
org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
at
org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67)
at
org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:214)
at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:170)
at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:161)
at scala.Option.foreach(Option.scala:257)
at
org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:161)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1136)
at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:154)
at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:67)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after
[120 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:241)
... 12 more
16/05/19 02:18:26 ERROR TaskSchedulerImpl: Lost executor
20160421-192532-1677787146-5050-40596-S23 on ip-10-0-1-70.ec2.internal:
Executor heartbeat timed out after 161447 ms
16/05/19 02:18:53 ERROR TaskSchedulerImpl: Lost executor
20160421-192532-1677787146-5050-40596-S23 on ip-10-0-1-70.ec2.internal:
remote Rpc client disassociated



Thanks
Sahil


Re: Starting executor without a master

2016-05-20 Thread Mathieu Longtin
Correct, what I do to start workers is the equivalent of start-slaves.sh.
It ends up running the same command on the worker servers as start-slaves
does.

It definitely uses all workers, and workers starting later pick up work
as well. If you have a long-running job, you can add workers dynamically
and they will pick up work as long as there are enough partitions to go
around.

I set spark.locality.wait to 0 so that workers never wait to pick up tasks.



On Fri, May 20, 2016 at 2:57 AM Mich Talebzadeh 
wrote:

> OK, this is basically from my notes for Spark standalone. The worker process is
> the slave process.
>
> You start worker as you showed
>
> $SPARK_HOME/sbin/start-slaves.sh
> Now that picks up the worker host node names from $SPARK_HOME/conf/slaves
> files. So you still have to tell Spark where to run workers.
>
> However, if I am correct regardless of what you have specified in slaves,
> in this standalone mode there will not be any spark process spawned by the
> driver on the slaves. In all probability you will be running one
> spark-submit process on the driver node. You can see this through the
> output of
>
> jps|grep SparkSubmit
>
> and you will see the details by running jmonitor for that SparkSubmit job
>
> However, I still doubt whether Scheduling Across applications is feasible
> in standalone mode.
>
> The doc says
>
> *Standalone mode:* By default, applications submitted to the standalone
> mode cluster will run in FIFO (first-in-first-out) order, and each
> application will try to use *all available nodes*. You can limit the
> number of nodes an application uses by setting the spark.cores.max
> configuration property in it, or change the default for applications that
> don’t set this setting through spark.deploy.defaultCores. Finally, in
> addition to controlling cores, each application’s spark.executor.memory
> setting controls its memory use.
>
> It uses the word all available nodes but I am not convinced if it will use
> those nodes? Someone can possibly clarify this
>
> HTH
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 20 May 2016 at 02:03, Mathieu Longtin  wrote:
>
>> Okay:
>> *host=my.local.server*
>> *port=someport*
>>
>> This is the spark-submit command, which runs on my local server:
>> *$SPARK_HOME/bin/spark-submit --master spark://$host:$port
>> --executor-memory 4g python-script.py with args*
>>
>> If I want 200 worker cores, I tell the cluster scheduler to run this
>> command on 200 cores:
>> *$SPARK_HOME/sbin/start-slave.sh --cores=1 --memory=4g
>> spark://$host:$port *
>>
>> That's it. When the task starts, it uses all available workers. If for
>> some reason, not enough cores are available immediately, it still starts
>> processing with whatever it gets and the load will be spread further as
>> workers come online.
>>
>>
>> On Thu, May 19, 2016 at 8:24 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> In a normal operation we tell spark which node the worker processes can
>>> run by adding the nodenames to conf/slaves.
>>>
>>> Not very clear on this in your case all the jobs run locally with say
>>> 100 executor cores like below:
>>>
>>>
>>> ${SPARK_HOME}/bin/spark-submit \
>>>
>>> --master local[*] \
>>>
>>> --driver-memory xg \  --default would be 512M
>>>
>>> --num-executors=1 \   -- This is the constraint in
>>> stand-alone Spark cluster, whether specified or not
>>>
>>> --executor-memory=xG \ --
>>>
>>> --executor-cores=n \
>>>
>>> --master local[*] means all cores and --executor-cores in your case need
>>> not be specified? or you can cap it like above --executor-cores=n. If
>>> it is not specified then the Spark app will go and grab every core.
>>> Although in practice that does not happen it is just an upper ceiling. It
>>> is FIFO.
>>>
>>> What typical executor memory is specified in your case?
>>>
>>> Do you have a  sample snapshot of spark-submit job by any chance Mathieu?
>>>
>>> Cheers
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 20 May 2016 at 00:27, Mathieu Longtin  wrote:
>>>
 Mostly, the resource management is not up to the Spark master.

 We routinely start 100 executor-cores for 5 minute job, and they just
 quit when they are done. Then those processor cores can do something else
 entirely, they are not reserved for Spark at all.

 On Thu, May 19, 2016 at 

Re: Spark.default.parallelism can not set reduce number

2016-05-20 Thread Takeshi Yamamuro
You need to use `spark.sql.shuffle.partitions`.

// maropu
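As a sketch of what that could look like in the defaults file (the value 20 is taken from the question quoted below): spark.sql.shuffle.partitions controls the number of partitions used when DataFrame/SQL joins and aggregations shuffle data, while spark.default.parallelism applies to RDD operations. Note also that the key in the question is written with a capital S and the file as spark-default.conf. Spark property names are lowercase and case-sensitive, and the file Spark reads by default is conf/spark-defaults.conf.

```
# conf/spark-defaults.conf
spark.sql.shuffle.partitions   20
spark.default.parallelism      20
```

The SQL setting can also be changed at runtime on an existing SQLContext with setConf("spark.sql.shuffle.partitions", "20").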

On Fri, May 20, 2016 at 8:17 PM, 喜之郎 <251922...@qq.com> wrote:

>  Hi all.
> I set Spark.default.parallelism equals 20 in spark-default.conf. And send
> this file to all nodes.
> But I found reduce number is still default value,200.
> Does anyone else encouter this problem? can anyone give some advice?
>
> 
> [Stage 9:>(0 + 0) / 200]
> [Stage 9:>(0 + 2) / 200]
> [Stage 9:>(1 + 2) / 200]
> [Stage 9:>(2 + 2) / 200]
> ###
>
> And this results in many empty files.Because my data is little, only some
> of the 200 files have data.
> ###
>  2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-0
>  2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-1
>  2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-2
>  2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-3
>  2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-4
>  2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-5
> 
>
>
>
>


-- 
---
Takeshi Yamamuro


Spark.default.parallelism can not set reduce number

2016-05-20 Thread 喜之郎
Hi all.
I set Spark.default.parallelism to 20 in spark-default.conf and sent this 
file to all nodes.
But I found that the number of reduce tasks is still the default value, 200.
Has anyone else encountered this problem? Can anyone give some advice?



[Stage 9:>(0 + 0) / 200]
[Stage 9:>(0 + 2) / 200]
[Stage 9:>(1 + 2) / 200]
[Stage 9:>(2 + 2) / 200]
###


This results in many empty files. Because my data set is small, only some of 
the 200 files have data.
###
 2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-0
 2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-1
 2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-2
 2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-3
 2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-4
 2016-05-20 17:01 /warehouse/dmpv3.db/datafile/tmp/output/userprofile/20160519/part-5


Re: Is there a way to merge parquet small files?

2016-05-20 Thread Takeshi Yamamuro
Many small files can cause technical issues in both HDFS and Spark,
though they do not generate many stages and tasks in recent versions
of Spark.

// maropu

On Fri, May 20, 2016 at 2:41 PM, Gavin Yue  wrote:

> For logs file I would suggest save as gziped text file first.  After
> aggregation, convert them into parquet by merging a few files.
>
>
>
> On May 19, 2016, at 22:32, Deng Ching-Mallete  wrote:
>
> IMO, it might be better to merge or compact the parquet files instead of
> keeping lots of small files in the HDFS. Please refer to [1] for more info.
>
> We also encountered the same issue with the slow query, and it was indeed
> caused by the many small parquet files. In our case, we were processing
> large data sets with batch jobs instead of a streaming job. To solve our
> issue, we just did a coalesce to reduce the number of partitions before
> saving as parquet format.
>
> HTH,
> Deng
>
> [1] http://blog.cloudera.com/blog/2009/02/the-small-files-problem/
>
> On Fri, May 20, 2016 at 1:50 PM, 王晓龙/0515 
> wrote:
>
>> I’m using a spark streaming program to store log message into parquet
>> file every 10 mins.
>> Now, when I query the parquet, it usually takes hundreds of thousands of
>> stages to compute a single count.
>> I looked into the parquet file’s path and find a great amount of small
>> files.
>>
>> Do the small files caused the problem? Can I merge them, or is there a
>> better way to solve it?
>>
>> Lots of thanks.
>>
>> 
>>
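>> The contents of this email represent only the sender's personal views and opinions and are unrelated to the views and opinions of China Merchants Bank Co., Ltd. and its subsidiaries and branches, which accept no responsibility for the contents of this email. This email is intended for the recipient only; if you have received it in error, please delete it immediately.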
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


-- 
---
Takeshi Yamamuro


Dataset API and avro type

2016-05-20 Thread Han JU
Hello,

I'm looking at the Dataset API in 1.6.1 and also in the upcoming 2.0. However,
it does not seem to work with Avro data types:


import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import org.apache.spark.sql.{Encoders, SQLContext}

// MyAvroType is an Avro-generated record class (not shown).
object Datasets extends App {
  val conf = new SparkConf()
  conf.setAppName("Dataset")
  conf.setMaster("local[2]")
  conf.setIfMissing("spark.serializer", classOf[KryoSerializer].getName)
  conf.setIfMissing("spark.kryo.registrator",
    classOf[DatasetKryoRegistrator].getName)

  val sc = new SparkContext(conf)
  val sql = new SQLContext(sc)
  import sql.implicits._

  implicit val encoder = Encoders.kryo[MyAvroType]
  val data = sql.read.parquet("path/to/data").as[MyAvroType]

  var c = 0
  // BUG here
  val sizes = data.mapPartitions { iter =>
    List(iter.size).iterator
  }.collect().toList

  println(c)
}


// Registers the chill-avro serializer for MyAvroType with Kryo.
class DatasetKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(
      classOf[MyAvroType],
      AvroSerializer.SpecificRecordBinarySerializer[MyAvroType])
  }
}


I'm using chill-avro's Kryo serializer for Avro types and I've tried
`Encoders.kryo` as well as `bean` and `javaSerialization`, but none of them
works. The error seems to be that the generated code does not compile with
Janino.

Tested in 1.6.1 and the 2.0.0-preview. Any idea?

-- 
*JU Han*

Software Engineer @ Teads.tv

+33 061960


Fwd: Spark CacheManager Thread-safety

2016-05-20 Thread Pietro Gentile
Hi all,

I have a series of doubts about CacheManager used by SQLContext to cache
DataFrame.

My use case requires different threads persisting/reading DataFrames
concurrently. Using Spark, I found that persistence does not really work
in parallel.

While I am persisting one DataFrame, I would like another user to be able
to persist a different DataFrame without waiting for the first to finish.

Is there any way to implement this scenario?

Thanks in advance,
Pietro.


Re: Splitting RDD by partition

2016-05-20 Thread shlomi
Another approach I found:

First, I make a PartitionsRDD class which only takes a certain range of
partitions
- 
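import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Wraps one partition of the parent RDD under a new, zero-based index.
case class PartitionsRDDPartition(index: Int, origSplit: Partition) extends Partition

// Exposes only `take` partitions of the parent RDD, after skipping `drop`.
class PartitionsRDD[U: ClassTag](prev: RDD[U], drop: Int, take: Int) extends RDD[U](prev) {
  override def getPartitions: Array[Partition] =
    prev.partitions.drop(drop).take(take).zipWithIndex.map {
      case (split, idx) => PartitionsRDDPartition(idx, split): Partition
    }

  // Delegate to the parent RDD, using the original (un-renumbered) partition.
  override def compute(split: Partition, context: TaskContext): Iterator[U] =
    prev.iterator(split.asInstanceOf[PartitionsRDDPartition].origSplit, context)
}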
- 

And then I can create my two RDDs using the following:
- 
def splitByPartition[T: ClassTag](rdd: RDD[T], hotPartitions: Int): (RDD[T], RDD[T]) = {
  // The first `hotPartitions` partitions go left, the rest go right.
  val left  = new PartitionsRDD[T](rdd, 0, hotPartitions)
  val right = new PartitionsRDD[T](rdd, hotPartitions, rdd.partitions.length - hotPartitions)
  (left, right)
}
- 
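The partition selection and renumbering done in getPartitions above can be tried without a cluster. The following is only a plain-Scala sketch of that logic: `Part`, `Wrapped` and `PartitionWindow` are hypothetical stand-ins invented for illustration (for Spark's Partition and the PartitionsRDDPartition wrapper), and no Spark classes are involved.

```scala
// Hypothetical stand-ins: Part models a parent partition, Wrapped models
// the wrapper that carries a new zero-based index plus the original split.
final case class Part(index: Int)
final case class Wrapped(index: Int, origSplit: Part)

object PartitionWindow {
  // Skip `drop` partitions, keep the next `take`, and renumber them from 0.
  def select(parts: Array[Part], drop: Int, take: Int): Array[Wrapped] =
    parts.drop(drop).take(take).zipWithIndex.map {
      case (split, idx) => Wrapped(idx, split)
    }

  // Split into a "hot" prefix of `hot` partitions and the remainder.
  def split(parts: Array[Part], hot: Int): (Array[Wrapped], Array[Wrapped]) =
    (select(parts, 0, hot), select(parts, hot, parts.length - hot))
}
```

With five parent partitions and `hot = 2`, the left side wraps original partitions 0 and 1, and the right side wraps original partitions 2, 3 and 4 under the new indices 0, 1 and 2. That renumbering is why compute has to go through origSplit rather than the wrapper's own index.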

This approach saves a few minutes compared to the one in the previous
post (at least in a local test; I still need to test it on a real
cluster).

Any thoughts about this? Is this the right thing to do, or am I missing
something important?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Splitting-RDD-by-partition-tp26983p26985.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Starting executor without a master

2016-05-20 Thread Mich Talebzadeh
OK, this is basically from my notes for Spark standalone. The worker process
is the slave process.




You start the workers as you showed:

$SPARK_HOME/sbin/start-slaves.sh

That picks up the worker host names from the $SPARK_HOME/conf/slaves
file, so you still have to tell Spark where to run workers.

However, if I am correct, regardless of what you have specified in slaves,
in this standalone mode the driver will not spawn any Spark process on the
slaves. In all probability you will be running one spark-submit process on
the driver node. You can see this through the output of

jps|grep SparkSubmit

and you will see the details by running jmonitor for that SparkSubmit job.

However, I still doubt whether scheduling across applications is feasible
in standalone mode.

The doc says

*Standalone mode:* By default, applications submitted to the standalone
mode cluster will run in FIFO (first-in-first-out) order, and each
application will try to use *all available nodes*. You can limit the number
of nodes an application uses by setting the spark.cores.max configuration
property in it, or change the default for applications that don’t set this
setting through spark.deploy.defaultCores. Finally, in addition to
controlling cores, each application’s spark.executor.memory setting
controls its memory use.

It uses the words "all available nodes", but I am not convinced that it will
actually use those nodes. Perhaps someone can clarify this.
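For reference, the settings named in the quoted documentation could be applied per application roughly as follows. This is only a sketch, assuming spark-core is on the classpath; the application name, master URL and values are hypothetical placeholders. Only the property names spark.cores.max and spark.executor.memory come from the quoted text.

```scala
import org.apache.spark.SparkConf

// Hypothetical application name, master URL and limits, for illustration only.
val conf = new SparkConf()
  .setAppName("capped-app")
  .setMaster("spark://my.local.server:7077")
  // Upper bound on the total cores this application takes across the cluster.
  .set("spark.cores.max", "8")
  // Memory requested for each executor of this application.
  .set("spark.executor.memory", "4g")
```

Per the quoted documentation, spark.deploy.defaultCores changes the default for applications that do not set spark.cores.max themselves.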

HTH


Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 20 May 2016 at 02:03, Mathieu Longtin  wrote:

> Okay:
> *host=my.local.server*
> *port=someport*
>
> This is the spark-submit command, which runs on my local server:
> *$SPARK_HOME/bin/spark-submit --master spark://$host:$port
> --executor-memory 4g python-script.py with args*
>
> If I want 200 worker cores, I tell the cluster scheduler to run this
> command on 200 cores:
> *$SPARK_HOME/sbin/start-slave.sh --cores=1 --memory=4g
> spark://$host:$port *
>
> That's it. When the task starts, it uses all available workers. If for
> some reason, not enough cores are available immediately, it still starts
> processing with whatever it gets and the load will be spread further as
> workers come online.
>
>
> On Thu, May 19, 2016 at 8:24 PM Mich Talebzadeh 
> wrote:
>
>> In normal operation we tell Spark which nodes the worker processes can
>> run on by adding the node names to conf/slaves.
>>
>> I am not very clear on this: in your case, do all the jobs run locally
>> with, say, 100 executor cores, like below?
>>
>>
>> ${SPARK_HOME}/bin/spark-submit \
>>
>> --master local[*] \
>>
>> --driver-memory xg \  --default would be 512M
>>
>> --num-executors=1 \   -- This is the constraint in
>> stand-alone Spark cluster, whether specified or not
>>
>> --executor-memory=xG \ --
>>
>> --executor-cores=n \
>>
>> --master local[*] means all cores, so does --executor-cores need to be
>> specified in your case? You can cap it as above with --executor-cores=n.
>> If it is not specified, then the Spark app will go and grab every core,
>> although in practice that does not happen; it is just an upper ceiling.
>> It is FIFO.
>>
>> What typical executor memory is specified in your case?
>>
>> Do you have a sample snapshot of a spark-submit job by any chance, Mathieu?
>>
>> Cheers
>>
>>
>>
>> On 20 May 2016 at 00:27, Mathieu Longtin  wrote:
>>
>>> Mostly, the resource management is not up to the Spark master.
>>>
>>> We routinely start 100 executor cores for a 5-minute job, and they just
>>> quit when they are done. Then those processor cores can do something else
>>> entirely; they are not reserved for Spark at all.
>>>
>>> On Thu, May 19, 2016 at 4:55 PM Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
>>>> Then in theory every user can fire multiple spark-submit jobs. Do you
>>>> cap it with settings in $SPARK_HOME/conf/spark-defaults.conf? I guess
>>>> in reality every user submits one job only.
>>>>
>>>> This is an interesting model for two reasons:
>>>>
>>>> - It uses parallel processing across all or most of the nodes to
>>>>   minimise the processing time
>>>> - It requires less intervention



Re: Query about how to estimate cpu usage for spark

2016-05-20 Thread Mich Talebzadeh
Please note that CPU usage varies with time; it is not a fixed value.

First, have a look at the Spark UI, which runs on port 4040, under the Jobs tab.

Then use jps to identify the Spark process:

jps|grep SparkSubmit

Using the process name, start jmonitor on the OS and specify the SparkSubmit
process. It will show CPU, memory and heap usage plotted against time.


HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 20 May 2016 at 04:35, Wang Jiaye  wrote:

> For MR jobs, there is a job counter that provides CPU ms information, but I
> cannot find a similar metric in Spark, which would be quite useful. Does
> anyone know about this?
>