RE: Re: Spark assembly in Maven repo?

2015-12-14 Thread Xiaoyong Zhu
Thanks for the info!

Xiaoyong

From: Sean Owen [mailto:so...@cloudera.com]
Sent: Monday, December 14, 2015 12:20 AM
To: Xiaoyong Zhu 
Cc: user 
Subject: Re: Re: Spark assembly in Maven repo?


Yes, though I think the Maven Central repository is more canonical.

http://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.5.2/

On Mon, Dec 14, 2015, 06:35 Xiaoyong Zhu wrote:
Thanks! Do you mean something like this (for example, for 1.5.1 using Scala 2.10)?
https://repository.apache.org/content/repositories/releases/org/apache/spark/spark-core_2.10/1.5.1/

Xiaoyong

From: Sean Owen [mailto:so...@cloudera.com]
Sent: Saturday, December 12, 2015 12:45 AM
To: Xiaoyong Zhu
Cc: user

Subject: Re: Re: Spark assembly in Maven repo?

That's exactly what the various artifacts in the Maven repo are for. The API 
classes for core are in the core artifact and so on. You don't need an assembly.
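
For reference, a minimal build.sbt sketch of depending on those per-module artifacts instead of an assembly (the version matches this thread; the "provided" scope is an assumption about how the jars are used at runtime):

// Pull the Spark API classes from Maven Central rather than shipping an assembly.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-sql"       % "1.5.2" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.5.2" % "provided"
)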

On Sat, Dec 12, 2015 at 12:32 AM, Xiaoyong Zhu wrote:
Yes, so our scenario is to treat the Spark assembly as an “SDK” so users can 
develop Spark applications easily without downloading the full Spark 
distribution. In this case which way do you guys think might be good?

Xiaoyong

From: fightf...@163.com 
[mailto:fightf...@163.com]
Sent: Friday, December 11, 2015 12:08 AM
To: Mark Hamstra
Cc: Xiaoyong Zhu; Jeff Zhang; user; Zhaomin Xu; Joe Zhang (SDE)
Subject: Re: Re: Spark assembly in Maven repo?

Agreed that an assembly jar is not good to publish. However, what he really 
needs is to fetch an updatable Maven jar file.


fightf...@163.com

From: Mark Hamstra
Date: 2015-12-11 15:34
To: fightf...@163.com
CC: Xiaoyong Zhu; Jeff 
Zhang; user; Zhaomin 
Xu; Joe Zhang (SDE)
Subject: Re: RE: Spark assembly in Maven repo?
No, publishing a spark assembly jar is not fine.  See the doc attached to 
https://issues.apache.org/jira/browse/SPARK-11157
 and be aware that a likely goal of Spark 2.0 will be the elimination of 
assemblies.

On Thu, Dec 10, 2015 at 11:19 PM, fightf...@163.com wrote:
Using Maven to download the assembly jar is fine. I would recommend deploying 
this assembly jar to your local Maven repo, i.e. a Nexus repo, or more likely a 
snapshot repository.


fightf...@163.com

From: Xiaoyong Zhu
Date: 2015-12-11 15:10
To: Jeff Zhang
CC: user@spark.apache.org; Zhaomin 
Xu; Joe Zhang (SDE)
Subject: RE: Spark assembly in Maven repo?
Sorry – I didn’t make it clear. It’s actually not a “dependency” – it’s 
actually that we are building a certain plugin for IntelliJ where we want to 
distribute this jar. But since the jar is updated frequently we don't want to 
distribute it together with our plugin but we would like to download it via 
Maven.

In this case what’s the recommended way?

Xiaoyong

From: Jeff Zhang [mailto:zjf...@gmail.com]
Sent: Thursday, December 10, 2015 11:03 PM
To: Xiaoyong Zhu 

Re: [SparkR] Is rdd in SparkR deprecated ?

2015-12-14 Thread Jeff Zhang
Thanks Felix. I was just curious when reading the code.

On Tue, Dec 15, 2015 at 1:32 AM, Felix Cheung 
wrote:

> The RDD API in SparkR is not officially supported. You can still access it
> with the SparkR::: prefix, though.
>
> May I ask what uses you have for it? Would the DataFrame API be sufficient?
>
>
>
>
>
> On Mon, Dec 14, 2015 at 4:26 AM -0800, "Jeff Zhang" 
> wrote:
>
> From the source code of SparkR, it seems SparkR supports the RDD API, but
> there's no documentation on it (
> http://spark.apache.org/docs/latest/sparkr.html ), so I guess it is
> deprecated. Is that right?
>
> --
> Best Regards
>
> Jeff Zhang
>



-- 
Best Regards

Jeff Zhang


Re: Problems w/YARN Spark Streaming app reading from Kafka

2015-12-14 Thread Robert Towne
Cody, sorry I didn't get back sooner; I never saw the response go by.

I was looking at the Spark UI. I'll see if I can recreate the issue with version 
1.5.2. Thanks.


From: Cody Koeninger
Date: Friday, October 16, 2015 at 12:48
To: robert towne
Cc: "user@spark.apache.org"
Subject: Re: Problems w/YARN Spark Streaming app reading from Kafka

What do you mean by "the current documentation states it isn’t used"?  
http://spark.apache.org/docs/latest/configuration.html  still lists the value 
and its meaning.

As far as the issue you're seeing, are you measuring records by looking at 
logs, the spark ui, or actual downstream sinks of data?  I don't think the 
backpressure code does any logging, but KafkaRDD will log at info level the 
offsets for each topicpartition that is computed (message starts with 
"Computing topic")




On Fri, Oct 16, 2015 at 1:52 PM, Robert Towne wrote:
I have a Spark Streaming app that reads using a receiver-less connection 
(KafkaUtils.createDirectStream) with an interval of 1 minute.
For about 15 hours it was running fine, with input sizes ranging from 16,836 to 
3,861,758 events.

Then about 3 hours ago, every one-minute batch brought in the same number of 
records = 5,760 (2 topics, topic 1 = 64 partitions, topic 2 = 32 partitions).

I know there is more data being piped in than the 5,760 records, and 
eventually we'll fall so far behind that our Kafka offsets will no longer be 
available.
It seems odd that 5,760 / 96 (partitions) = 60, which is my interval in seconds.

I do have spark.streaming.backpressure.enabled = true, and even though the 
current documentation states it isn't used, I have a value set for 
spark.streaming.kafka.maxRatePerPartition.
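
For reference, a sketch of how these two settings might be applied on a SparkConf; the values shown are placeholders, not the actual configuration from this job:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kafka-direct-stream")
  // Let Spark Streaming adapt the ingestion rate to the processing rate.
  .set("spark.streaming.backpressure.enabled", "true")
  // Hard upper bound on records per second per Kafka partition for the direct stream.
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")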



Has anyone else seen this issue where the rate seems capped even though it 
should be pulling more data?

Thanks,
Robert



Re: Discover SparkUI port for spark streaming job running in cluster mode

2015-12-14 Thread Ted Yu
w.r.t. getting application Id, please take a look at the following
in SparkContext :

  /**
   * A unique identifier for the Spark application.
   * Its format depends on the scheduler implementation.
   * (i.e.
   *  in case of local spark app something like 'local-1433865536131'
   *  in case of YARN something like 'application_1433865536131_34483'
   * )
   */
  def applicationId: String = _applicationId
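
For reference, a small sketch of building the YARN proxy URL described below from the application id; the ResourceManager host and proxy port are assumptions and would normally come from the YARN configuration:

// Assumes an existing SparkContext `sc` running on YARN.
val proxyBase = "http://resourcemanager.example.com:20888"   // assumed RM web-proxy address
val sparkUiUrl = s"$proxyBase/proxy/${sc.applicationId}/"
println(s"Spark UI (via YARN proxy): $sparkUiUrl")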

On Mon, Dec 14, 2015 at 2:33 PM, Jonathan Kelly 
wrote:

> Are you running Spark on YARN? If so, you can get to the Spark UI via the
> YARN ResourceManager. Each running Spark application will have a link on
> the YARN ResourceManager labeled "ApplicationMaster". If you click that, it
> will take you to the Spark UI, even if it is running on a slave node in the
> case of yarn-cluster mode. It does this by proxying the Spark UI through
> the YARN Proxy Server on the master node.
>
> For completed applications, the link will be labeled "History" and will
> take you to the Spark History Server (provided you have
> set spark.yarn.historyServer.address in spark-defaults.conf).
>
> As for getting the URL programmatically, the URL using the YARN
> ProxyServer is easy to determine. It's just http://<ResourceManager address>:<proxy port>/proxy/<application ID>/. (e.g.,
> http://ip-10-150-65-11.ec2.internal:20888/proxy/application_1450128858020_0001/)
> Then again, I'm not sure how easy it is to get the YARN application ID for
> a Spark application without parsing the spark-submit logs. Or at least I
> think I remember some other thread where that was mentioned.
>
> ~ Jonathan
>
> On Mon, Dec 14, 2015 at 1:57 PM, Ashish Nigam 
> wrote:
>
>> Hi,
>> I run spark streaming job in cluster mode. This means that driver can run
>> in any data node. And Spark UI can run in any dynamic port.
>> At present, I know about the port by looking at container logs that look
>> something like this -
>>
>> server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:50571
>> INFO util.Utils: Successfully started service 'SparkUI' on port 50571.
>> INFO ui.SparkUI: Started SparkUI at http://xxx:50571
>>
>>
>> Is there any way to know about the UI port automatically using some API?
>>
>> Thanks
>> Ashish
>>
>
>


Database does not exist: (Spark-SQL ===> Hive)

2015-12-14 Thread Gokula Krishnan D
Hello All -


I tried to execute a Spark/Scala program in order to create a table in Hive,
and faced a couple of errors, so I just tried to execute "show tables" and
"show databases".

I have already created a database named "test_db", but I still
encountered the error "Database does not exist".

*Note: I do see a couple of posts related to this error, but nothing was
helpful for me.*


=
name := "ExploreSBT_V1"

version := "1.0"

scalaVersion := "2.11.5"

libraryDependencies
++=Seq("org.apache.spark"%%"spark-core"%"1.3.0","org.apache.spark"%%"spark-sql"%"1.3.0")
libraryDependencies += "org.apache.spark"%%"spark-hive"%"1.3.0"
=
[image: Inline image 1]

Error: Encountered the following exceptions
:org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution
Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Database
does not exist: test_db
15/12/14 18:49:57 ERROR HiveContext:
==
HIVE FAILURE OUTPUT
==




 OK
FAILED: Execution Error, return code 1 from
org.apache.hadoop.hive.ql.exec.DDLTask. Database does not exist: test_db

==
END HIVE FAILURE OUTPUT
==


Process finished with exit code 0
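
For reference, a minimal sketch of querying an existing Hive database from Spark 1.3. It assumes hive-site.xml is on the application classpath; without it, Spark silently falls back to a local Derby metastore, which is a common cause of "Database does not exist":

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("HiveCheck"))
// HiveContext (not a plain SQLContext) is needed to see the real Hive metastore.
val hiveContext = new HiveContext(sc)
hiveContext.sql("SHOW DATABASES").collect().foreach(println)
hiveContext.sql("USE test_db")
hiveContext.sql("SHOW TABLES").collect().foreach(println)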

Thanks & Regards,
Gokula Krishnan (Gokul)


Re: Spark Streaming having trouble writing checkpoint

2015-12-14 Thread Robert Towne
I forgot to include the data node logs for this time period:


2015-12-14 00:14:52,836 ERROR org.apache.hadoop.hdfs.server.datanode.DataNode: 
server51:50010:DataXceiver error processing unknown operation  src: 
/127.0.0.1:39442 dst: /127.0.0.1:50010
java.io.EOFException
at java.io.DataInputStream.readShort(DataInputStream.java:315)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.readOp(Receiver.java:58)
at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:212)
at java.lang.Thread.run(Thread.java:745)
2015-12-14 00:14:53,045 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: 
Receiving BP-480271778-xx.xx.xx.51-1400196196525:blk_2159922337_1086198834 src: 
/xx.xx.xx.137:55050 dest: /xx.xx.xx.199:50010
2015-12-14 00:14:53,052 INFO 
org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: 
/xx.xx.xx.137:55050, dest: /xx.xx.xx.199:50010, bytes: 9053, op: HDFS_WRITE, 
cliID: DFSClient_attempt_1449621009822_1
3413_r_000200_0_-1322271294_1, offset: 0, srvID: 
ac76f2fd-6e53-4254-bbf2-b591ff354784, blockid: 
BP-480271778-xx.xx.xx.51-1400196196525:blk_2159922337_1086198834, duration: 
3921702
2015-12-14 00:14:53,052 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: 
PacketResponder: 
BP-480271778-xx.xx.xx.51-1400196196525:blk_2159922337_1086198834, 
type=HAS_DOWNSTREAM_IN_PIPELINE terminating
2015-12-14 00:14:53,480 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: 
Receiving BP-480271778-xx.xx.xx.51-1400196196525:blk_2159922362_1086198859 src: 
/xx.xx.xx.199:47261 dest: /xx.xx.xx.199:50010
2015-12-14 00:14:53,497 INFO 
org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: 
/xx.xx.xx.199:47261, dest: /xx.xx.xx.199:50010, bytes: 1713474, op: HDFS_WRITE, 
cliID: DFSClient_attempt_144962100982
2_13411_r_000453_0_-1417541029_1, offset: 0, srvID: 
ac76f2fd-6e53-4254-bbf2-b591ff354784, blockid: 
BP-480271778-xx.xx.xx.51-1400196196525:blk_2159922362_1086198859, duration: 
12415453
2015-12-14 00:14:53,497 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: 
PacketResponder: 
BP-480271778-xx.xx.xx.51-1400196196525:blk_2159922362_1086198859, 
type=HAS_DOWNSTREAM_IN_PIPELINE terminating
2015-12-14 00:14:54,106 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: 
Receiving BP-480271778-xx.xx.xx.51-1400196196525:blk_2159922384_1086198881 src: 
/xx.xx.xx.199:47263 dest: /xx.xx.xx.199:50010
2015-12-14 00:14:54,113 INFO 
org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: 
/xx.xx.xx.199:47263, dest: /xx.xx.xx.199:50010, bytes: 5125, op: HDFS_WRITE, 
cliID: DFSClient_attempt_1449621009822_1
3413_r_000115_0_-1568725798_1, offset: 0, srvID: 
ac76f2fd-6e53-4254-bbf2-b591ff354784, blockid: 
BP-480271778-xx.xx.xx.51-1400196196525:blk_2159922384_1086198881, duration: 
2909067
2015-12-14 00:14:54,113 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: 
PacketResponder: 
BP-480271778-xx.xx.xx.51-1400196196525:blk_2159922384_1086198881, 
type=HAS_DOWNSTREAM_IN_PIPELINE terminating
2015-12-14 00:14:54,123 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: 
Receiving BP-480271778-xx.xx.xx.51-1400196196525:blk_2159922386_1086198883 src: 
/xx.xx.xx.199:47265 dest: /xx.xx.xx.199:50010
2015-12-14 00:14:54,128 INFO 
org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: 
/xx.xx.xx.199:47265, dest: /xx.xx.xx.199:50010, bytes: 12076, op: HDFS_WRITE, 
cliID: DFSClient_attempt_1449621009822_
13413_r_000117_0_1573884268_1, offset: 0, srvID: 
ac76f2fd-6e53-4254-bbf2-b591ff354784, blockid: 
BP-480271778-xx.xx.xx.51-1400196196525:blk_2159922386_1086198883, duration: 
2802964
--
2015-12-14 00:15:49,514 INFO 
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService:
 Deleted BP-480271778-xx.xx.xx.51-1400196196525 blk_2159923600_1086200097 file 
/mnt/hdfs/hdfs01/data9/h
dfs/current/BP-480271778-xx.xx.xx.51-1400196196525/current/finalized/subdir189/subdir209/blk_2159923600
2015-12-14 00:15:49,515 INFO 
org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetAsyncDiskService:
 Deleted BP-480271778-xx.xx.xx.51-1400196196525 blk_2159870401_1086146898 file 
/mnt/hdfs/hdfs01/data9/h
dfs/current/BP-480271778-xx.xx.xx.51-1400196196525/current/finalized/subdir189/subdir1/blk_2159870401
2015-12-14 00:15:51,355 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: 
Receiving BP-480271778-xx.xx.xx.51-1400196196525:blk_2159924711_1086201208 src: 
/xx.xx.xx.199:47478 dest: /xx.xx.xx.199:50010
2015-12-14 00:15:51,361 INFO 
org.apache.hadoop.hdfs.server.datanode.DataNode.clienttrace: src: 
/xx.xx.xx.199:47478, dest: /xx.xx.xx.199:50010, bytes: 10423, op: HDFS_WRITE, 
cliID: DFSClient_attempt_1449621009822_
13412_r_000627_0_-1481089097_1, offset: 0, srvID: 
ac76f2fd-6e53-4254-bbf2-b591ff354784, blockid: 
BP-480271778-xx.xx.xx.51-1400196196525:blk_2159924711_1086201208, duration: 
2415405
2015-12-14 00:15:51,361 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: 
PacketResponder: 
BP-480271778-xx.xx.xx.51-1400196196525:blk_2159924711_1086201208, 

Re: Discover SparkUI port for spark streaming job running in cluster mode

2015-12-14 Thread Jonathan Kelly
Oh, nice, I did not know about that property. Thanks!

On Mon, Dec 14, 2015 at 4:28 PM, Ted Yu  wrote:

> w.r.t. getting application Id, please take a look at the following
> in SparkContext :
>
>   /**
>* A unique identifier for the Spark application.
>* Its format depends on the scheduler implementation.
>* (i.e.
>*  in case of local spark app something like 'local-1433865536131'
>*  in case of YARN something like 'application_1433865536131_34483'
>* )
>*/
>   def applicationId: String = _applicationId
>
> On Mon, Dec 14, 2015 at 2:33 PM, Jonathan Kelly 
> wrote:
>
>> Are you running Spark on YARN? If so, you can get to the Spark UI via the
>> YARN ResourceManager. Each running Spark application will have a link on
>> the YARN ResourceManager labeled "ApplicationMaster". If you click that, it
>> will take you to the Spark UI, even if it is running on a slave node in the
>> case of yarn-cluster mode. It does this by proxying the Spark UI through
>> the YARN Proxy Server on the master node.
>>
>> For completed applications, the link will be labeled "History" and will
>> take you to the Spark History Server (provided you have
>> set spark.yarn.historyServer.address in spark-defaults.conf).
>>
>> As for getting the URL programmatically, the URL using the YARN
>> ProxyServer is easy to determine. It's just http://<ResourceManager address>:<proxy port>/proxy/<application ID>/. (e.g.,
>> http://ip-10-150-65-11.ec2.internal:20888/proxy/application_1450128858020_0001/)
>> Then again, I'm not sure how easy it is to get the YARN application ID for
>> a Spark application without parsing the spark-submit logs. Or at least I
>> think I remember some other thread where that was mentioned.
>>
>> ~ Jonathan
>>
>> On Mon, Dec 14, 2015 at 1:57 PM, Ashish Nigam 
>> wrote:
>>
>>> Hi,
>>> I run spark streaming job in cluster mode. This means that driver can
>>> run in any data node. And Spark UI can run in any dynamic port.
>>> At present, I know about the port by looking at container logs that look
>>> something like this -
>>>
>>> server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:50571
>>> INFO util.Utils: Successfully started service 'SparkUI' on port 50571.
>>> INFO ui.SparkUI: Started SparkUI at http://xxx:50571
>>>
>>>
>>> Is there any way to know about the UI port automatically using some API?
>>>
>>> Thanks
>>> Ashish
>>>
>>
>>
>


Spark Streaming having trouble writing checkpoint

2015-12-14 Thread Robert Towne
I have a Spark Streaming app (1.5.2, compiled for Hadoop 2.6) that occasionally 
has problems writing its checkpoint file.  This is a YARN (yarn-cluster) app 
running as user mapred.

What I see in my streaming app logs is:
App log:
15/12/15 00:14:08   server51: 
/mnt/hdfs/hdfs01/data4/yarn-log/application_1450104983109_0273/container_1450104983109_0273_01_04/stderr:15/12/15
 00:14:08 INFO hdfs.DFSClient: Could not complete 
/user/hadoop/yarn-app/session_svc/checkpoint/420fb269-490f-41fe-94a9-7c60be7a0931/rdd-665/.part-00022-attempt-0
 retrying...
15/12/15 00:14:20   server51: 
/mnt/hdfs/hdfs01/data4/yarn-log/application_1450104983109_0273/container_1450104983109_0273_01_04/stderr:15/12/15
 00:14:20 INFO hdfs.DFSClient: Could not complete 
/user/hadoop/yarn-app/session_svc/checkpoint/420fb269-490f-41fe-94a9-7c60be7a0931/rdd-665/.part-00025-attempt-0
 retrying...
15/12/15 00:14:24   server51: 
/mnt/hdfs/hdfs01/data4/yarn-log/application_1450104983109_0273/container_1450104983109_0273_01_04/stderr:15/12/15
 00:14:24 INFO hdfs.DFSClient: Could not complete 
/user/hadoop/yarn-app/session_svc/checkpoint/420fb269-490f-41fe-94a9-7c60be7a0931/rdd-665/.part-00025-attempt-0
 retrying...
15/12/15 00:14:50   server51: 
/mnt/hdfs/hdfs01/data4/yarn-log/application_1450104983109_0273/container_1450104983109_0273_01_04/stderr:15/12/15
 00:14:50 INFO hdfs.DFSClient: Could not complete 
/user/hadoop/yarn-app/session_svc/checkpoint/420fb269-490f-41fe-94a9-7c60be7a0931/rdd-665/.part-00046-attempt-0
 retrying...
15/12/15 00:14:55   server51: 
/mnt/hdfs/hdfs01/data4/yarn-log/application_1450104983109_0273/container_1450104983109_0273_01_04/stderr:15/12/15
 00:14:55 INFO hdfs.DFSClient: Could not complete 
/user/hadoop/yarn-app/session_svc/checkpoint/420fb269-490f-41fe-94a9-7c60be7a0931/rdd-665/.part-00046-attempt-0
 retrying…


server51 = xx.xx.xx.199

In the name node logs during that same time, I see a bunch of these messages:

[2015-12-15T00:14:08.433Z,INFO,org.apache.hadoop.hdfs.server.namenode.FSNamesystem:,BLOCK*
 checkFileProgress: blk_2161610531_1087887542{blockUCState=COMMITTED, 
truncateBlock=null, primaryNodeIndex=-1, 
replicas=[ReplicaUnderConstruction[[DISK]DS-45109dfc-c3e5-4fd7-a6de-9cec1db8e301:NORMAL:xx.xx.xx.138:50010|RBW],
 
ReplicaUnderConstruction[[DISK]DS-61c84f08-e038-442d-8f64-238a9bbd8803:NORMAL:xx.xx.xx.199:50010|RBW],
 
ReplicaUnderConstruction[[DISK]DS-bc8b67eb-2715-4300-bc44-7ba5b54e80c8:NORMAL:xx.xx.xx.119:50010|RBW]]}
 has not reached minimal replication 1]

If I filter those messages out, the remaining messages during that time frame 
from xx.xx.xx.199 are:

[2015-12-15T00:00:08.449Z,INFO,org.apache.hadoop.ipc.Server:,IPC Server handler 
17 on 8020: skipped org.apache.hadoop.hdfs.protocol.ClientProtocol.renewLease 
from xx.xx.xx.199:46602 Call#2746 Retry#0]
[2015-12-15T00:10:29.609Z,INFO,org.apache.hadoop.ipc.Server:,IPC Server handler 
47 on 8020, call org.apache.hadoop.hdfs.protocol.ClientProtocol.getListing from 
xx.xx.xx.199:59576 Call#38454 Retry#0: 
org.apache.hadoop.security.AccessControlException: Permission denied: 
user=yarn, access=EXECUTE, inode="/app-logs/mapred/logs":mapred:hdfs:drwxrwx---]
[2015-12-15T00:13:24.556Z,INFO,org.apache.hadoop.ipc.Server:,Socket Reader #1 
for port 8020: readAndProcess from client xx.xx.xx.199 threw exception 
[java.io.IOException: Connection reset by peer]]
[2015-12-15T00:13:24.673Z,INFO,org.apache.hadoop.ipc.Server:,IPC Server handler 
75 on 8020: skipped org.apache.hadoop.hdfs.protocol.ClientProtocol.getFileInfo 
from xx.xx.xx.199:59984 Call#4765 Retry#0]
[2015-12-15T00:13:24.683Z,INFO,org.apache.hadoop.ipc.Server:,IPC Server handler 
70 on 8020: skipped 
org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations from 
xx.xx.xx.199:59984 Call#4766 Retry#0]
[2015-12-15T00:13:24.699Z,INFO,org.apache.hadoop.ipc.Server:,IPC Server handler 
64 on 8020: skipped 
org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations from 
xx.xx.xx.199:59984 Call#4767 Retry#0]
[2015-12-15T00:13:24.721Z,INFO,org.apache.hadoop.ipc.Server:,IPC Server handler 
57 on 8020: skipped 
org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations from 
xx.xx.xx.199:59984 Call#4768 Retry#0]
[2015-12-15T00:13:42.641Z,INFO,org.apache.hadoop.ipc.Server:,Socket Reader #1 
for port 8020: readAndProcess from client xx.xx.xx.199 threw exception 
[java.io.IOException: Connection reset by peer]]

The user mapred can see the directory, but it is an empty directory (e.g., 
hdfs dfs -ls /app-logs/mapred/logs/application_1450104983109_0273/ returns 
nothing).
The message about execute permission being denied seems off; I am wondering if 
some configuration is dialed down too low and the permission error is just a 
red herring.


Has anyone else run into a problem where the checkpoint has IOExceptions 
running as a YARN app?

Thanks,
Robert


Mllib Word2Vec vector representations are very high in value

2015-12-14 Thread jxieeducation
Hi,

For Word2Vec in MLlib, when I use a large number of partitions (e.g. 256),
the values in my vectors turn out to be very large. I am looking for a
representation whose values lie between (-1, 1), like other Word2Vec
implementations (e.g. Gensim, Google's word2vec).

E.g. 

scala> var m = model.transform("SOMETHING")

m: org.apache.spark.mllib.linalg.Vector =
[1.61478590965271,-13.385428428649902,-19.518991470336914,12.05420970916748,-6.141440391540527...]
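
If only the scale is the concern, one workaround is to rescale each vector to unit L2 norm after transform; a minimal sketch, assuming model is the fitted Word2VecModel from above:

import org.apache.spark.mllib.linalg.{Vector, Vectors}

val v: Vector = model.transform("SOMETHING")
val norm = Vectors.norm(v, 2.0)
// Dividing by the L2 norm keeps every component within [-1, 1].
val unit: Vector = Vectors.dense(v.toArray.map(_ / norm))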



Thanks so much!



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Mllib-Word2Vec-vector-representations-are-very-high-in-value-tp25702.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark streaming driver java process RSS memory constantly increasing using cassandra driver

2015-12-14 Thread Conor Fennell
Just bumping the issue I am having, if anyone can provide direction? I
have been stuck on this for a while now.

Thanks,
Conor

On Fri, Dec 11, 2015 at 5:10 PM, Conor Fennell  wrote:
> Hi,
>
> I have a memory leak in the spark driver which is not in the heap or
> the non-heap.
> Even though neither of these are increasing, the java process RSS
> memory is and eventually takes up all the memory on the machine.
> I am using Spark 1.5.2 and the spark-cassandra-connector 1.5.0-M2.
>
> I have reduced the leak to the code below.
> If I remove cassandra from the code below the memory leak does not happen.
> Can someone please explain why this is happening or what I can do to
> further investigate it.
> I have also include pics from jconsole for a couple of hours and
> datadog showing the same timeframe the rss memory increase.
>
> Thanks,
> Conor
>
> val ssc = new StreamingContext(sparkConf,
> Seconds(SparkStreamingBatchInterval))
>
> ssc.checkpoint(HdfsNameNodeUriPath)
>
> val kafkaParams = Map[String, String](METADATA_BROKER_LIST ->
> MetadataBrokerList)
>
> var kafkaMessages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams,
> KafkaTopics.split(DELIMITER).toSet)
>
> var eventBuckets = kafkaMessages.map(keyMessage => {
> implicit val formats = DefaultFormats.lossless
> val eventBucket = parse(keyMessage._2)
> val minute = new Date((eventBucket \ MINUTE).extract[Long])
> val business = (eventBucket \ BUSINESS).extract[String]
> val account = (eventBucket \ ACCOUNT).extract[String]
> (minute, business, account)})
>
>   var eventsToBeProcessed = eventBuckets.transform(rdd =>
> rdd.joinWithCassandraTable("analytics_events" + '_' +
> settings.JobEnvironment, "events", SomeColumns(MINUTE, BUSINESS,
> ACCOUNT, EVENT, JSON), SomeColumns(MINUTE, BUSINESS,
> ACCOUNT)).filter(entry => {
> //remove any entries without a result
> entry._2.length > 0
>   }))
>
>   eventsToBeProcessed.foreachRDD(rdd => {
> println(rdd.take(1))
>   })
>
> sys.ShutdownHookThread {
>   System.err.println(s"Gracefully stopping $JobName Spark
> Streaming Application")
>   ssc.stop(stopSparkContext = true, stopGracefully = true)
>   System.err.println(s"$JobName streaming job stopped")
> }
>
> ssc.start()
> ssc.awaitTermination()

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RuntimeException: Failed to check null bit for primitive int type

2015-12-14 Thread zml张明磊
Hi,

My Spark version is spark-1.4.1-bin-hadoop2.6. When I submit a Spark job that 
reads data from a Hive table, I get the following error. Although it is only a 
WARN, it leads to the job failing. I thought the following JIRA might have 
solved this, so I am confused:
https://issues.apache.org/jira/browse/SPARK-3004
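
This error is typically raised by GenericRow.getInt when the column value is NULL for that row; a minimal null-safe sketch (the column index is an assumption):

import org.apache.spark.sql.Row

// Return None instead of failing when the Int column holds a NULL.
def safeInt(row: Row, index: Int): Option[Int] =
  if (row.isNullAt(index)) None else Some(row.getInt(index))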

15/12/14 19:21:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 40.0 
(TID 1255, minglei): java.lang.RuntimeException: Failed to check null bit for 
primitive int value.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getInt(rows.scala:82)
at com.ctrip.ml.toolimpl.MetadataImpl$$anonfun$1.apply(MetadataImpl.scala:22)
at com.ctrip.ml.toolimpl.MetadataImpl$$anonfun$1.apply(MetadataImpl.scala:22)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
at org.spark-project.guava.collect.Ordering.leastOf(Ordering.java:658)
at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
at 
org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1338)
at 
org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1335)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)





[SparkR] Is rdd in SparkR deprecated ?

2015-12-14 Thread Jeff Zhang
From the source code of SparkR, it seems SparkR supports the RDD API, but there's
no documentation on it ( http://spark.apache.org/docs/latest/sparkr.html
), so I guess it is deprecated. Is that right?

-- 
Best Regards

Jeff Zhang


Re: Autoscaling of Spark YARN cluster

2015-12-14 Thread Deepak Sharma
An approach I can think of is using the Ambari Metrics Service (AMS).
Using these metrics, you can decide whether the cluster is low on
resources.
If so, call the Ambari management API to add a node to the cluster.

Thanks
Deepak

On Mon, Dec 14, 2015 at 2:48 PM, cs user  wrote:

> Hi Mingyu,
>
> I'd be interested in hearing about anything else you find which might meet
> your needs for this.
>
> One way perhaps this could be done would be to use Ambari. Ambari comes
> with a nice api which you can use to add additional nodes into a cluster:
>
>
> https://github.com/apache/ambari/blob/trunk/ambari-server/docs/api/v1/index.md
>
> Once the node has been built, the ambari agent installed, you can then
> call back to the management node via the api, tell it what you want the new
> node to be, and it will connect, configure your new node and add it to the
> cluster.
>
> You could create a host group within the cluster blueprint with the
> minimal components you need to install to have it operate as a yarn node.
>
> As for the decision to scale, that is outside of the remit of Ambari. I
> guess you could look into using aws autoscaling or you could look into a
> product called scalr, which has an opensource version. We are using this to
> install an ambari cluster using chef to configure the nodes up until the
> point it hands over to Ambari.
>
> Scalr allows you to write custom scaling metrics which you could use to
> query the # of applications queued, # of resources available values and
> add nodes when required.
>
> Cheers!
>
> On Mon, Dec 14, 2015 at 8:57 AM, Mingyu Kim  wrote:
>
>> Hi all,
>>
>> Has anyone tried out autoscaling Spark YARN cluster on a public cloud
>> (e.g. EC2) based on workload? To be clear, I’m interested in scaling the
>> cluster itself up and down by adding and removing YARN nodes based on the
>> cluster resource utilization (e.g. # of applications queued, # of resources
>> available), as opposed to scaling resources assigned to Spark applications,
>> which is natively supported by Spark’s dynamic resource scheduling. I’ve
>> found that Cloudbreak
>>  has
>> a similar feature, but it’s in “technical preview”, and I didn’t find much
>> else from my search.
>>
>> This might be a general YARN question, but wanted to check if there’s a
>> solution popular in the Spark community. Any sharing of experience around
>> autoscaling will be helpful!
>>
>> Thanks,
>> Mingyu
>>
>
>


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


manipulate schema inside a repeated column

2015-12-14 Thread Samuel
Hi,

I am having issues trying to rename or move subcolumns when they are inside
a repeated structure.

Given a certain schema, I can create a different layout to provide an
alternative view. For example, I can move one column and put it inside a
subcolumn, and add an extra literal field, just for fun:

import org.apache.spark.sql.{DataFrame, Column}
import org.apache.spark.sql.functions
import sqlContext.implicits._

case class Level0ArrayStruct(
  level_0_array_a: String,
  level_0_array_b: Int)

case class Level1ArrayStruct(
  level_1_array_a: String,
  level_1_array_b: Int)

case class Level1Struct(
  level_1_a: String,
  level_1_b: Int)

case class Level0Struct(
  level_0_a: String,
  level_0_b: Int,
  level_0_array: Seq[Level0ArrayStruct],
  level_0_struct: Level1Struct)

val example = sc.parallelize(
  Seq(Level0Struct(
"level 0 a", 0,
Seq(
  Level0ArrayStruct("level 0 array a 1", 1),
  Level0ArrayStruct("level 0 array a 2", 2)),
    Level1Struct(
      "level 1 a", 3)))).toDF


scala> example.printSchema
root
 |-- level_0_a: string (nullable = true)
 |-- level_0_b: integer (nullable = false)
 |-- level_0_array: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- level_0_array_a: string (nullable = true)
 |||-- level_0_array_b: integer (nullable = false)
 |-- level_0_struct: struct (nullable = true)
 ||-- level_1_a: string (nullable = true)
 ||-- level_1_b: integer (nullable = false)


scala> example.withColumn("level_0_struct",
functions.struct($"level_0_struct.level_1_a", $"level_0_struct.level_1_b",
$"level_0_b",
functions.lit("foo").as("foo"))).drop("level_0_b").printSchema
root
 |-- level_0_a: string (nullable = true)
 |-- level_0_array: array (nullable = true)
 ||-- element: struct (containsNull = true)
 |||-- level_0_array_a: string (nullable = true)
 |||-- level_0_array_b: integer (nullable = false)
 |-- level_0_struct: struct (nullable = false)
 ||-- level_1_a: string (nullable = true)
 ||-- level_1_b: integer (nullable = true)

 ||-- level_0_b: integer (nullable = false)
 ||-- foo: string (nullable = false)

However, I can't find a way to reliably deal with the struct inside
level_0_array. If I try to move any of its fields anywhere (including into that
array column) they become an array column themselves, and I don't know how to
reassemble ("zip") them back together into a struct.

Say I want to add the same literal "foo", but this time inside level_0_array,
for all the rows there. The resulting schema would be:

scala> example.withColumn("level_0_array",
functions.struct($"level_0_array.level_0_array_a",
$"level_0_array.level_0_array_b",
functions.lit("foo").as("foo"))).printSchema
root
 |-- level_0_a: string (nullable = true)
 |-- level_0_b: integer (nullable = false)
 |-- level_0_array: struct (nullable = false)
 ||-- level_0_array_a: array (nullable = true)
 |||-- element: string (containsNull = true)
 ||-- level_0_array_b: array (nullable = true)
 |||-- element: integer (containsNull = true)
 ||-- foo: string (nullable = false)
 |-- level_0_struct: struct (nullable = true)
 ||-- level_1_a: string (nullable = true)
 ||-- level_1_b: integer (nullable = false)

The same problem applies if I try to rename the fields: they become array
columns.

Is there any way to recursively manipulate repeated columns without
completely breaking their structure into individually repeated fields?

Best
-- 
Samuel


Kryo serialization fails when using SparkSQL and HiveContext

2015-12-14 Thread Linh M. Tran
Hi everyone,
I'm using HiveContext and Spark SQL to query a Hive table and do a join
operation on it.
After changing the default serializer to Kryo with
spark.kryo.registrationRequired = true, the Spark application failed with
the following error:

java.lang.IllegalArgumentException: Class is not registered:
org.apache.spark.sql.catalyst.expressions.GenericRow
Note: To register this class use:
kryo.register(org.apache.spark.sql.catalyst.expressions.GenericRow.class);
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
at
com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
at
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
at
com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at
org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:124)
at
org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:204)
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I'm using Spark 1.3.1 (HDP 2.3.0) and submitting the Spark application to YARN
in cluster mode.
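
For reference, a sketch of what the error message asks for, registering the class by its fully-qualified name so the internal Catalyst type does not have to be referenced at compile time (more classes may need to be added as further "Class is not registered" errors appear):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .set("spark.kryo.classesToRegister",
       "org.apache.spark.sql.catalyst.expressions.GenericRow")
// Alternatively, leave registrationRequired at false and accept that unregistered
// classes are written with their full class name.
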
Any help is appreciated.
-- 
Linh M. Tran


Re: Unsubsribe

2015-12-14 Thread Akhil Das
Send an email to user-unsubscr...@spark.apache.org to unsubscribe from the
list. See more over http://spark.apache.org/community.html

Thanks
Best Regards

2015-12-09 22:18 GMT+05:30 Michael Nolting :

> cancel
>
> --
> 
> *Michael Nolting*
> Head of Sevenval FDX
> 
> *Sevenval Technologies GmbH *
>
> FRONT-END-EXPERTS SINCE 1999
>
> Köpenicker Straße 154 | 10997 Berlin
>
> office   +49 30 707 190 - 278
> mail michael.nolt...@sevenval.com 
>
> www.sevenval.com
>
> Sitz: Köln, HRB 79823
> Geschäftsführung: Jan Webering (CEO), Thorsten May, Joern-Carlos Kuntze
>
> We increase the return on investment of your mobile and web projects.
> Contact us: http://roi.sevenval.com/
>


Re: Autoscaling of Spark YARN cluster

2015-12-14 Thread cs user
Hi Mingyu,

I'd be interested in hearing about anything else you find which might meet
your needs for this.

One way perhaps this could be done would be to use Ambari. Ambari comes
with a nice api which you can use to add additional nodes into a cluster:

https://github.com/apache/ambari/blob/trunk/ambari-server/docs/api/v1/index.md

Once the node has been built, the ambari agent installed, you can then call
back to the management node via the api, tell it what you want the new node
to be, and it will connect, configure your new node and add it to the
cluster.

You could create a host group within the cluster blueprint with the minimal
components you need to install to have it operate as a yarn node.

As for the decision to scale, that is outside of the remit of Ambari. I
guess you could look into using aws autoscaling or you could look into a
product called scalr, which has an opensource version. We are using this to
install an ambari cluster using chef to configure the nodes up until the
point it hands over to Ambari.

Scalr allows you to write custom scaling metrics which you could use to
query the # of applications queued, # of resources available values and add
nodes when required.

Cheers!

On Mon, Dec 14, 2015 at 8:57 AM, Mingyu Kim  wrote:

> Hi all,
>
> Has anyone tried out autoscaling Spark YARN cluster on a public cloud
> (e.g. EC2) based on workload? To be clear, I’m interested in scaling the
> cluster itself up and down by adding and removing YARN nodes based on the
> cluster resource utilization (e.g. # of applications queued, # of resources
> available), as opposed to scaling resources assigned to Spark applications,
> which is natively supported by Spark’s dynamic resource scheduling. I’ve
> found that Cloudbreak
>  has
> a similar feature, but it’s in “technical preview”, and I didn’t find much
> else from my search.
>
> This might be a general YARN question, but wanted to check if there’s a
> solution popular in the Spark community. Any sharing of experience around
> autoscaling will be helpful!
>
> Thanks,
> Mingyu
>


Spark parallelism with mapToPair

2015-12-14 Thread Rabin Banerjee
Hi Team,

I am new to Spark Streaming. I am trying to write a Spark Streaming 
application where the calculation on the incoming data will be performed in "R" 
within each micro-batch.

I want to make wordCounts.mapToPair parallel, where wordCounts is the output 
of groupByKey. How can I ensure that wordCounts.mapToPair runs fully in 
parallel, so that RUtilMethods.sum(inputToR) is invoked in parallel?
Note: I cannot use reduceByKey or combineByKey, as calling R multiple times 
would be a significant overhead.
Thanks

Code Sample
public final class JavaNetworkWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {

    // Create the context with a 10 second batch size
    SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
    sparkConf.setMaster("local[4]");
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

    // Create a JavaReceiverInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    JavaReceiverInputDStream<String> lines = ssc.socketTextStream("localhost", 1,
        StorageLevels.MEMORY_AND_DISK_SER);

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });

    JavaPairDStream<String, Iterable<Integer>> wordCounts = words.mapToPair(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String s) {
            return new Tuple2<String, Integer>(s, 1);
          }
        }).groupByKeyAndWindow(Durations.seconds(60));

    JavaPairDStream<String, Integer> wordCounts1 = wordCounts.mapToPair(
        new PairFunction<Tuple2<String, Iterable<Integer>>, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(Tuple2<String, Iterable<Integer>> data)
              throws Exception {
            List<Integer> it = IteratorUtils.toList(data._2.iterator());
            int[] inputToR = ArrayUtils.toPrimitive(it.toArray(new Integer[0]));
            it = null;
            Runtime.getRuntime().gc();
            return new Tuple2<String, Integer>(data._1, RUtilMethods.sum(inputToR));
          }
        });

    wordCounts1.print();
    ssc.start();
    ssc.awaitTermination();
  }
}
///



Run ad-hoc queries at runtime against cached RDDs

2015-12-14 Thread Krishna Rao
Hi all,

What's the best way to run ad-hoc queries against cached RDDs?

For example, say I have an RDD that has been processed and persisted
memory-only. I want to be able to run a count (actually
"countApproxDistinct") after filtering by a value that is unknown at compile
time and is only specified by the query.

I've experimented with (ab)using Spark Streaming, by streaming the queries and
running them against the cached RDD. However, as I say, I don't think this is
an intended use case of Streaming.
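
A minimal sketch of that pattern, with an assumed (user, page) record layout and path; an existing SparkContext sc is assumed:

// Cache once up front, then answer each incoming query against the cached RDD.
val events = sc.textFile("hdfs:///data/events")            // assumed path
  .map { line =>
    val Array(user, page) = line.split(",", 2)             // assumed record layout
    (user, page)
  }
  .cache()

def distinctUsersFor(page: String): Long =
  events.filter(_._2 == page).map(_._1).countApproxDistinct(relativeSD = 0.05)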

Cheers,

Krishna


Re: Can't filter

2015-12-14 Thread Akhil Das
If you are not using Spark submit to run the job, then you need to add the
following line:

sc.addJar("target/scala_2.11/spark.jar")


After creating the SparkContext, where the spark.jar is your project jar.

Thanks
Best Regards

On Thu, Dec 10, 2015 at 5:29 PM, Бобров Виктор  wrote:

> Build.sbt
>
>
>
> name := "spark"
>
> version := "1.0"
>
> scalaVersion := "2.11.7"
>
> libraryDependencies ++= Seq(
>   "org.apache.spark" % "spark-core_2.11"      % "1.5.1",
>   "org.apache.spark" % "spark-streaming_2.11" % "1.5.1",
>   "org.apache.spark" % "spark-mllib_2.11"     % "1.5.1"
> )
>
>
>
>
> From: Бобров Виктор [mailto:ma...@bk.ru]
> Sent: Thursday, December 10, 2015 2:54 PM
> To: 'Harsh J'
> Cc: user@spark.apache.org
> Subject: RE: Can't filter
>
>
>
> Spark – 1.5.1, ty for help.
>
>
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
> import scala.io.Source
>
>
> object SimpleApp {
>   def main(args: Array[String]) {
>     var A = scala.collection.mutable.Map[Array[String], Int]()
>     val filename = "C:\\Users\\bobrov\\IdeaProjects\\spark\\file\\spark1.txt"
>     for ((line, i) <- Source.fromFile(filename).getLines().zipWithIndex) {
>       val lst = line.split(" ")
>       A += (lst -> i)
>     }
>
>     def filter1(tp: ((Array[String], Int), (Array[String], Int))): Boolean = {
>       tp._1._2 < tp._2._2
>     }
>
>     val conf = new SparkConf().setMaster("spark://web01:7077").setAppName("Simple Application")
>     val sc = new SparkContext(conf)
>     val mail_rdd = sc.parallelize(A.toSeq).cache()
>     val step1 = mail_rdd.cartesian(mail_rdd)
>     val step2 = step1.filter(filter1)
>     //step1.collect().foreach(println)
>   }
> }
>
>
>
> From: Harsh J [mailto:ha...@cloudera.com]
> Sent: Thursday, December 10, 2015 2:50 PM
> To: Бобров Виктор; Ndjido Ardo Bar
> Cc: user@spark.apache.org
> Subject: Re: Can't filter
>
>
>
> Are you sure you do not have any messages preceding the trace, such as one
> quoting which class is found to be missing? That'd be helpful to see and
> suggest what may (exactly) be going wrong. It appear similar to
> https://issues.apache.org/jira/browse/SPARK-8368, but I cannot tell for
> certain cause I don't know if your code uses the SparkSQL features.
>
>
>
> Also, what version is your Spark running?
>
>
>
> I am able to run your program without a problem in Spark 1.5.x (with a
> sample Seq).
>
>
>
> On Thu, Dec 10, 2015 at 5:01 PM Бобров Виктор  wrote:
>
> 0 = {StackTraceElement@7132}
> "com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.a(Unknown
> Source)"
>
> 1 = {StackTraceElement@7133}
> "com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader.(Unknown
> Source)"
>
> 2 = {StackTraceElement@7134}
> "org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:40)"
>
> 3 = {StackTraceElement@7135}
> "org.apache.spark.util.ClosureCleaner$.getInnerClosureClasses(ClosureCleaner.scala:81)"
>
> 4 = {StackTraceElement@7136}
> "org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:187)"
>
> 5 = {StackTraceElement@7137}
> "org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)"
>
> 6 = {StackTraceElement@7138}
> "org.apache.spark.SparkContext.clean(SparkContext.scala:2030)"
>
> 7 = {StackTraceElement@7139}
> "org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:331)"
>
> 8 = {StackTraceElement@7140}
> "org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:330)"
>
> 9 = {StackTraceElement@7141}
> "org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)"
>
> 10 = {StackTraceElement@7142}
> "org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)"
>
> 11 = {StackTraceElement@7143}
> "org.apache.spark.rdd.RDD.withScope(RDD.scala:306)"
>
> 12 = {StackTraceElement@7144}
> "org.apache.spark.rdd.RDD.filter(RDD.scala:330)"
>
> 13 = {StackTraceElement@7145}
> "SimpleApp$GeneratedEvaluatorClass$44$1.invoke(FileToCompile0.scala:30)"
>
> 14 = {StackTraceElement@7146} "SimpleApp$.main(test1.scala:26)"
>
> 15 = {StackTraceElement@7147} "SimpleApp.main(test1.scala)"
>
>
>
> From: Ndjido Ardo Bar [mailto:ndj...@gmail.com]
> Sent: Thursday, December 10, 2015 2:20 PM
> To: Бобров Виктор
> Cc: user@spark.apache.org
> Subject: Re: Can't filter
>
>
>
> Please send your call stack with the full description of the exception .
>
>
> On 10 Dec 2015, at 12:10, Бобров Виктор  wrote:
>
> Hi, I can’t filter my rdd.
>
>
>
> def filter1(tp: ((Array[String], Int), (Array[String], Int))): Boolean = {
>   tp._1._2 > tp._2._2
> }
> val mail_rdd = sc.parallelize(A.toSeq).cache()
> val

Re: How to change StreamingContext batch duration after loading from checkpoint

2015-12-14 Thread Akhil Das
Taking the values from a configuration file rather than hard-coding them in the
code might help; I haven't tried it, though.
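
A minimal sketch of that idea using the usual getOrCreate pattern, with the interval read from a system property (the path and property name are assumptions). Note that the batch interval already baked into an existing checkpoint wins on restore, so actually changing it still means starting from a fresh checkpoint directory:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///checkpoints/my-app"                         // assumed path
val batchSeconds = sys.props.getOrElse("app.batchSeconds", "10").toLong  // assumed key

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("MyStreamingApp"),
                                 Seconds(batchSeconds))
  ssc.checkpoint(checkpointDir)
  // ... build the DStream graph here ...
  ssc
}

// Only calls createContext() when no valid checkpoint exists in checkpointDir.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)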

Thanks
Best Regards

On Mon, Dec 7, 2015 at 9:53 PM, yam  wrote:

> Is there a way to change the streaming context batch interval after
> reloading
> from checkpoint?
>
> I would like to be able to change the batch interval after restarting the
> application without loosing the checkpoint of course.
>
> Thanks!
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-change-StreamingContext-batch-duration-after-loading-from-checkpoint-tp25624.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2015-12-14 Thread alberskib
Hey all,

When my streaming application restarts from a failure (from the checkpoint) I
am receiving a strange error:

java.lang.ClassCastException:
org.apache.spark.util.SerializableConfiguration cannot be cast to
com.example.sender.MyClassReporter.

An instance of this class is created on the driver side (with the proper config
passed as a constructor arg) and broadcast to the executors in order to ensure
that on each worker there will be only a single instance. Everything goes well
up to the point where I get the value of the broadcast field and execute a
function on it, i.e.
broadcastedValue.value.send(...)

Below you can find definition of MyClassReporter (with trait):

trait Reporter{
  def send(name: String, value: String, timestamp: Long) : Unit
  def flush() : Unit
}

class MyClassReporter(config: MyClassConfig, flow: String) extends Reporter
with Serializable {

  val prefix = s"${config.senderConfig.prefix}.$flow"

  var counter = 0

  @transient
  private lazy val sender : GraphiteSender = initialize()

  @transient
  private lazy val threadPool =
ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  private def initialize() = {
  val sender = new Sender(
new InetSocketAddress(config.senderConfig.hostname,
config.senderConfig.port)
  )
  sys.addShutdownHook{
sender.close()
  }
  sender
  }

  override def send(name: String, value: String, timestamp: Long) : Unit = {
threadPool.submit(new Runnable {
  override def run(): Unit = {
try {
  counter += 1
  if (!sender.isConnected)
sender.connect()
  sender.send(s"$prefix.$name", value, timestamp)
  if (counter % graphiteConfig.batchSize == 0)
sender.flush()
}catch {
  case NonFatal(e) => {
println(s"Problem with sending metric to graphite $prefix.$name:
$value at $timestamp: ${e.getMessage}", e)
Try{sender.close()}.recover{
  case NonFatal(e) => println(s"Error closing graphite
${e.getMessage}", e)
}
  }
}
  }
})
  }

Do you have any idea how I can solve this issue? Using a broadcast variable
helps me keep a single socket open to the service on each executor.
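
For what it is worth, a common workaround is to stop broadcasting the reporter and create it lazily once per executor JVM instead; a rough sketch using the classes above (the config-loading helper is hypothetical):

object ReporterHolder {
  // Initialized on first use inside a task, i.e. once per executor JVM, so
  // nothing reporter-related ends up in the checkpointed DStream graph.
  lazy val reporter: Reporter = new MyClassReporter(loadConfig(), "my-flow")

  private def loadConfig(): MyClassConfig =
    sys.error("load MyClassConfig here, e.g. from a properties file") // placeholder
}

// Inside a transformation, instead of broadcastedValue.value.send(...):
// ReporterHolder.reporter.send(name, value, timestamp)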



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark streaming driver java process RSS memory constantly increasing using cassandra driver

2015-12-14 Thread Singh, Abhijeet
OK, I get it now. You might be right that the Cassandra driver is leaking some 
memory; the driver might have some open sockets or similar.

However, if it is a native memory leak issue, I would suggest trying an 
alternative driver, or proving that this is indeed the problem, unless of course 
you want to spend some worthy hours searching for it yourself. In general a 
native leak is very difficult to find in the JVM. I wrote a blog post about it 
some time ago, though I am not much of a writer: 
http://apachegeek.blogspot.in/2015/04/fix-c-memory-leak.html?spref=bl It might 
give you some starting points if you are a newbie; you might already know this 
otherwise.

Thanks,
Abhijeet

-Original Message-
From: Conor Fennell [mailto:conorapa...@gmail.com] 
Sent: Monday, December 14, 2015 8:29 PM
To: Singh, Abhijeet
Cc: user@spark.apache.org
Subject: Re: Spark streaming driver java process RSS memory constantly 
increasing using cassandra driver

Hi Abhijeet,

Thanks for pointing out the pics are not showing.
I have put all the images in this public google document:
https://docs.google.com/document/d/1xEJ0eTtXBlSso6SshLCWZHcRw4aEQMJflzKBsr2FHaw/edit?usp=sharing

All the code is in the first email; nothing else starts threads except the 80 
or so threads Spark itself starts.
By the heap I mean CMS Old Gen, Par Eden Space and Par Survivor Space; by the 
non-heap I mean Code Cache and CMS Perm Gen. Together these cover the 
JVM-managed memory.

In the document you will see I am quite aggressive with the JVM options.

That does indicate native memory is leaking, but I am at a loss as to how to 
properly investigate it; it looks like the leak is in the Cassandra driver 
itself, or in a combination of how Spark runs it and the driver.

It is also happening when running this within a foreachRDD:

  val cluster =
Cluster.builder().addContactPoints(CassandraHostname.split(DELIMITER):
_*).build();
  val session = cluster.connect()
  ranges.foreach(range => {

session.execute(s"INSERT INTO
analytics_metadata_$JobEnvironment.kafka_offsets (topic, job_name, batch_time, 
partition, from_offset, until_offset) VALUES
('${range._1}','${range._2}',${range._3.getTime()},${range._4},${range._5},${range._6})")

  })

  session.close()
  cluster.close()
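
One thing that may be worth trying for the snippet above: let the connector's CassandraConnector manage and cache the session instead of building and closing a Cluster every batch; a rough sketch, untested against this setup:

import com.datastax.spark.connector.cql.CassandraConnector

// Created once on the driver; it is serializable and caches sessions per JVM.
val connector = CassandraConnector(ssc.sparkContext.getConf)

// Inside foreachRDD, replacing the manual Cluster.builder()/close() above:
connector.withSessionDo { session =>
  ranges.foreach { range =>
    session.execute("INSERT INTO ...")   // same INSERT statement as above
  }
}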


Thanks,
Conor



On Mon, Dec 14, 2015 at 2:12 PM, Singh, Abhijeet  
wrote:
> Hi Conor,
>
> What do you mean when you say leak is not in "Heap or non-Heap". If it is not 
> heap related than it has to be the native memory that is leaking. I can't say 
> for sure but you do have Threads working there and that could be using the 
> native memory. We didn't get any pics of JConsole.
>
> Thanks.
>
> -Original Message-
> From: Conor Fennell [mailto:conorapa...@gmail.com]
> Sent: Monday, December 14, 2015 4:15 PM
> To: user@spark.apache.org
> Subject: Re: Spark streaming driver java process RSS memory constantly 
> increasing using cassandra driver
>
> Just bumping the issue I am having, if anyone can provide direction? I have 
> been stuck on this for a while now.
>
> Thanks,
> Conor
>
> On Fri, Dec 11, 2015 at 5:10 PM, Conor Fennell  wrote:
>> Hi,
>>
>> I have a memory leak in the spark driver which is not in the heap or 
>> the non-heap.
>> Even though neither of these are increasing, the java process RSS 
>> memory is and eventually takes up all the memory on the machine.
>> I am using Spark 1.5.2 and the spark-cassandra-connector 1.5.0-M2.
>>
>> I have reduced the leak to the code below.
>> If I remove cassandra from the code below the memory leak does not happen.
>> Can someone please explain why this is happening or what I can do to 
>> further investigate it.
>> I have also include pics from jconsole for a couple of hours and 
>> datadog showing the same timeframe the rss memory increase.
>>
>> Thanks,
>> Conor
>>
>> val ssc = new StreamingContext(sparkConf,
>> Seconds(SparkStreamingBatchInterval))
>>
>> ssc.checkpoint(HdfsNameNodeUriPath)
>>
>> val kafkaParams = Map[String, String](METADATA_BROKER_LIST ->
>> MetadataBrokerList)
>>
>> var kafkaMessages = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder, StringDecoder](ssc, kafkaParams,
>> KafkaTopics.split(DELIMITER).toSet)
>>
>> var eventBuckets = kafkaMessages.map(keyMessage => {
>> implicit val formats = DefaultFormats.lossless
>> val eventBucket = parse(keyMessage._2)
>> val minute = new Date((eventBucket \ MINUTE).extract[Long])
>> val business = (eventBucket \ BUSINESS).extract[String]
>> val account = (eventBucket \ ACCOUNT).extract[String]
>> (minute, business, account)})
>>
>>   var eventsToBeProcessed = eventBuckets.transform(rdd =>
>> rdd.joinWithCassandraTable("analytics_events" + '_' + 
>> settings.JobEnvironment, "events", SomeColumns(MINUTE, BUSINESS, 
>> ACCOUNT, EVENT, JSON), SomeColumns(MINUTE, BUSINESS, 
>> 

Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2015-12-14 Thread Ted Yu
Can you show the complete stack trace for the ClassCastException ?

Please see the following thread:
http://search-hadoop.com/m/q3RTtgEUHVmJA1T1

Cheers

On Mon, Dec 14, 2015 at 7:33 AM, alberskib  wrote:

> Hey all,
>
> When my streaming application is restarting from failure (from checkpoint)
> I
> am receiving strange error:
>
> java.lang.ClassCastException:
> org.apache.spark.util.SerializableConfiguration cannot be cast to
> com.example.sender.MyClassReporter.
>
> Instance of B class is created on driver side (with proper config passed as
> constructor arg) and broadcasted to the executors in order to ensure that
> on
> each worker there will be only single instance. Everything is going well up
> to place where I am getting value of broadcasted field and executing
> function on it i.e.
> broadcastedValue.value.send(...)
>
> Below you can find definition of MyClassReporter (with trait):
>
> trait Reporter{
>   def send(name: String, value: String, timestamp: Long) : Unit
>   def flush() : Unit
> }
>
> class MyClassReporter(config: MyClassConfig, flow: String) extends Reporter
> with Serializable {
>
>   val prefix = s"${config.senderConfig.prefix}.$flow"
>
>   var counter = 0
>
>   @transient
>   private lazy val sender : GraphiteSender = initialize()
>
>   @transient
>   private lazy val threadPool =
> ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
>
>   private def initialize() = {
>   val sender = new Sender(
> new InetSocketAddress(config.senderConfig.hostname,
> config.senderConfig.port)
>   )
>   sys.addShutdownHook{
> sender.close()
>   }
>   sender
>   }
>
>   override def send(name: String, value: String, timestamp: Long) : Unit =
> {
> threadPool.submit(new Runnable {
>   override def run(): Unit = {
> try {
>   counter += 1
>   if (!sender.isConnected)
> sender.connect()
>   sender.send(s"$prefix.$name", value, timestamp)
>   if (counter % graphiteConfig.batchSize == 0)
> sender.flush()
> }catch {
>   case NonFatal(e) => {
> println(s"Problem with sending metric to graphite
> $prefix.$name:
> $value at $timestamp: ${e.getMessage}", e)
> Try{sender.close()}.recover{
>   case NonFatal(e) => println(s"Error closing graphite
> ${e.getMessage}", e)
> }
>   }
> }
>   }
> })
>   }
>
> Do you have any idea how I can solve this issue? Using broadcasted variable
> helps me keeping single socket open to the service on executor.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Problem using User Defined Predicate pushdown with core RDD and parquet - UDP class not found

2015-12-14 Thread chao chu
+spark user mailing list

Hi there,

I have exactly the same problem as mentioned below. My current workaround is
to add the jar containing my UDP to one of the system classpath locations
listed under "Classpath Entries" for the Spark executors (for example, put it
under the same path as
/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/parquet-hadoop-bundle-1.5.0-cdh5.4.2.jar).

Obviously, the downside is that you have to copy the jar locally to every
node of the cluster, and that is hard to maintain when the cluster setup gets
updated.

I'd like to hear if anyone has a better solution for this. Thanks a lot!


>
>
> -- Forwarded message --
> From: Vladimir Vladimirov 
> To: d...@spark.apache.org
> Cc:
> Date: Mon, 19 Oct 2015 19:38:07 -0400
> Subject: Problem using User Defined Predicate pushdown with core RDD and
> parquet - UDP class not found
> Hi all
>
> I feel like this question is more Spark dev related than Spark user
> related. Please correct me if I'm wrong.
>
> My project's data flow involves sampling records from data stored as a
> Parquet dataset.
> I've checked the DataFrames API and it doesn't support pushdown of user
> defined predicates - only simple filter expressions.
> I want to use Parquet's custom filter (user defined predicate) pushdown
> feature while loading data with newAPIHadoopFile.
> Simple filters constructed with the org.apache.parquet.filter2 API work fine,
> but a User Defined Predicate works only in `--master local` mode.
>
> When I try to run my test program, which passes a UDP class to parquet-mr,
> in yarn-client mode, I get a class not found exception.
>
> I suspect the issue could be related to the way the class loader is used
> from parquet, or to the fact that the Spark executor processes load my jar
> from an HTTP server and some security policy applies (the classpath shows
> that the jar URI is actually an HTTP URL and not a local file).
>
> I've tried to create an uber jar with all dependencies and ship it with the
> spark app - no success.
>
> PS I'm using spark 1.5.1.
>
> Here is my command line I'm using to submit the application:
>
> SPARK_CLASSPATH=./lib/my-jar-with-dependencies.jar spark-submit \
> --master yarn-client
> --num-executors 3 --driver-memory 3G --executor-memory 2G \
> --executor-cores 1 \
> --jars
> ./lib/my-jar-with-dependencies.jar,./lib/snappy-java-1.1.2.jar,./lib/parquet-hadoop-1.7.0.jar,./lib/parquet-avro-1.7.0.jar,./lib/parquet-column-1.7.0.jar,/opt/cloudera/parcels/CDH/jars/avro-1.7.6-cdh5.4.0.jar,/opt/cloudera/parcels/CDH/jars/avro-mapred-1.7.6-cdh5.4.0-hadoop2.jar,
> \
> --class my.app.parquet.filters.tools.TestSparkApp \
> ./lib/my-jar-with-dependencies.jar \
> yarn-client \
> "/user/vvlad/2015/*/*/*/EVENTS"
>
> Here is the code of my UDP class:
>
> package my.app.parquet.filters.udp
>
> import org.apache.parquet.filter2.predicate.Statistics
> import org.apache.parquet.filter2.predicate.UserDefinedPredicate
>
>
> import java.lang.{Integer => JInt}
>
> import scala.util.Random
>
> class SampleIntColumn(threshold: Double) extends
> UserDefinedPredicate[JInt] with Serializable {
>   lazy val random = { new Random() }
>   val myThreshold = threshold
>   override def keep(value: JInt): Boolean = {
> random.nextFloat() < myThreshold
>   }
>
>   override def canDrop(statistics: Statistics[JInt]): Boolean = false
>
>   override def inverseCanDrop(statistics: Statistics[JInt]): Boolean =
> false
>
>   override def toString: String = {
> "%s(%f)".format(getClass.getName, myThreshold)
>   }
> }
>
> Spark app:
>
> package my.app.parquet.filters.tools
>
> import my.app.parquet.filters.udp.SampleIntColumn
> import org.apache.avro.generic.GenericRecord
> import org.apache.hadoop.mapreduce.Job
> import org.apache.parquet.avro.AvroReadSupport
> import org.apache.parquet.filter2.dsl.Dsl.IntColumn
> import org.apache.parquet.hadoop.ParquetInputFormat
> import org.apache.spark.{SparkContext, SparkConf}
>
> import org.apache.parquet.filter2.dsl.Dsl._
> import org.apache.parquet.filter2.predicate.FilterPredicate
>
>
> object TestSparkApp {
>   def main (args: Array[String]) {
> val conf = new SparkConf()
>   //"local[2]" or yarn-client etc
>   .setMaster(args(0))
>   .setAppName("Spark Scala App")
>   .set("spark.executor.memory", "1g")
>   .set("spark.rdd.compress", "true")
>   .set("spark.storage.memoryFraction", "1")
>
> val sc = new SparkContext(conf)
>
> val job = new Job(sc.hadoopConfiguration)
> ParquetInputFormat.setReadSupportClass(job,
> classOf[AvroReadSupport[GenericRecord]])
>
> val sampler = new SampleIntColumn(0.05)
> val impField = IntColumn("impression")
>
> val pred: FilterPredicate = impField.filterBy(sampler)
>
> ParquetInputFormat.setFilterPredicate(job.getConfiguration, pred)
>
>
> println(job.getConfiguration.get("parquet.private.read.filter.predicate"))
>
> 

Re: how to make a dataframe of Array[Doubles] ?

2015-12-14 Thread Jeff Zhang
Please use a tuple instead of an array (the element type must implement the
Product trait if you want to convert an RDD to a DataFrame).

val testvec = Array( (1.0, 2.0, 3.0, 4.0), (5.0, 6.0, 7.0, 8.0))
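
An alternative that keeps the values in a single array column (which is what
the original post asks for) is to wrap each Array in a Tuple1 or a one-field
case class. A rough, untested sketch; the column name and output path are
illustrative:

import sqlContext.implicits._

// Tuple1 is a Product, so toDF can derive a schema with a single array column
val arrvec = Array(Tuple1(Array(1.0, 2.0, 3.0, 4.0)), Tuple1(Array(5.0, 6.0, 7.0, 8.0)))
val arrdf  = sc.parallelize(arrvec).toDF("values")

arrdf.printSchema()                      // one column "values" of type array<double>
arrdf.write.parquet("testvec.parquet")   // illustrative output path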

On Tue, Dec 15, 2015 at 1:12 PM, AlexG  wrote:

> My attempts to create a dataframe of Array[Doubles], I get an error about
> RDD[Array[Double]] not having a toDF function:
>
> import sqlContext.implicits._
> val testvec = Array( Array(1.0, 2.0, 3.0, 4.0), Array(5.0, 6.0, 7.0, 8.0))
> val testrdd = sc.parallelize(testvec)
> testrdd.toDF
>
> gives
>
> :29: error: value toDF is not a member of
> org.apache.spark.rdd.RDD[Array[Double]]
>   testrdd.toD
>
> on the other hand, if I make the dataframe more complicated, e.g.
> Tuple2[String, Array[Double]], the transformation goes through:
>
> val testvec = Array( ("row 1", Array(1.0, 2.0, 3.0, 4.0)), ("row 2",
> Array(5.0, 6.0, 7.0, 8.0)) )
> val testrdd = sc.parallelize(testvec)
> testrdd.toDF
>
> gives
> testrdd: org.apache.spark.rdd.RDD[(String, Array[Double])] =
> ParallelCollectionRDD[1] at parallelize at :29
> res3: org.apache.spark.sql.DataFrame = [_1: string, _2: array]
>
> What's the cause of this, and how can I get around it to create a dataframe
> of Array[Double]? My end goal is to store that dataframe in Parquet (yes, I
> do want to store all the values in a single column, not individual columns)
>
> I am using Spark 1.5.2
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/how-to-make-a-dataframe-of-Array-Doubles-tp25704.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best Regards

Jeff Zhang


how to make a dataframe of Array[Doubles] ?

2015-12-14 Thread AlexG
My attempts to create a dataframe of Array[Double] give an error about
RDD[Array[Double]] not having a toDF function:

import sqlContext.implicits._
val testvec = Array( Array(1.0, 2.0, 3.0, 4.0), Array(5.0, 6.0, 7.0, 8.0))
val testrdd = sc.parallelize(testvec)
testrdd.toDF

gives

:29: error: value toDF is not a member of
org.apache.spark.rdd.RDD[Array[Double]]
  testrdd.toD

on the other hand, if I make the dataframe more complicated, e.g.
Tuple2[String, Array[Double]], the transformation goes through:

val testvec = Array( ("row 1", Array(1.0, 2.0, 3.0, 4.0)), ("row 2",
Array(5.0, 6.0, 7.0, 8.0)) )
val testrdd = sc.parallelize(testvec)
testrdd.toDF

gives
testrdd: org.apache.spark.rdd.RDD[(String, Array[Double])] =
ParallelCollectionRDD[1] at parallelize at :29
res3: org.apache.spark.sql.DataFrame = [_1: string, _2: array]

What's the cause of this, and how can I get around it to create a dataframe
of Array[Double]? My end goal is to store that dataframe in Parquet (yes, I
do want to store all the values in a single column, not individual columns)

I am using Spark 1.5.2




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-make-a-dataframe-of-Array-Doubles-tp25704.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Linear Regression with OLS

2015-12-14 Thread Arunkumar Pillai
Hi

I need an example of linear regression using OLS (ordinary least squares).

val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()

// Building the model
val numIterations = 100
val model = LinearRegressionWithSGD.train(parsedData, numIterations)

Please help me with using the LinearRegression class.
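
For reference, a minimal sketch of the spark.ml LinearRegression API on the
same data. This is an untested illustration: it assumes a SQLContext is in
scope, and it uses regParam = 0.0 so that the objective is plain, unregularized
least squares:

import org.apache.spark.ml.regression.LinearRegression

// reuse parsedData (RDD[LabeledPoint]) from above; the case class gives the
// default "label" / "features" columns that spark.ml expects
val training = sqlContext.createDataFrame(parsedData)

val lr = new LinearRegression()
  .setMaxIter(100)
  .setRegParam(0.0)   // no regularization, i.e. an ordinary least-squares objective

val lrModel = lr.fit(training)
println(s"weights: ${lrModel.weights} intercept: ${lrModel.intercept}")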


-- 
Thanks and Regards
Arun


mapValues Transformation (JavaPairRDD)

2015-12-14 Thread Sushrut Ikhar
Hi,
I am finding it difficult to understand the following problem:
I count the number of records before and after applying the mapValues
transformation to a JavaPairRDD. As expected, the number of records is the
same before and after.

Now, I counted the number of distinct keys before and after applying the
mapValues transformation to the same JavaPairRDD. However, I get a lower
count after applying the transformation. I expected that mapValues would not
change the keys, so why am I getting fewer distinct keys? Note that the
total number of records is the same; only the distinct key count has dropped.

using spark-1.4.1.
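
For what it's worth, mapValues is documented to preserve the keys (and the
partitioning) of a pair RDD, so a toy check like the following Scala sketch
should always report the same count before and after. If the real job reports
fewer keys afterwards, the usual suspects are keys that are generated
non-deterministically upstream (so the RDD recomputes differently for each
action) or key objects that are being mutated:

val pairRdd = sc.parallelize(Seq((1, "a"), (2, "b"), (2, "c")))  // toy stand-in for the real JavaPairRDD
val before  = pairRdd.keys.distinct().count()
val after   = pairRdd.mapValues(_.toUpperCase).keys.distinct().count()
println(s"distinct keys before=$before after=$after")           // both should be 2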

Thanks in advance.

Regards,

Sushrut Ikhar
https://about.me/sushrutikhar



Re: Database does not exist: (Spark-SQL ===> Hive)

2015-12-14 Thread Jeff Zhang
Did you put hive-site.xml on the classpath?
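
A quick way to check which metastore the application is actually talking to;
without hive-site.xml on the classpath, a HiveContext typically falls back to
a local Derby metastore that only contains the "default" database:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// if this only lists "default", the hive-site.xml that points at your real
// metastore is probably not being picked up
hiveContext.sql("show databases").show()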

On Tue, Dec 15, 2015 at 11:14 AM, Gokula Krishnan D 
wrote:

> Hello All -
>
>
> I tried to execute a Spark-Scala program in order to create a table in
> HIVE and faced a couple of errors, so I just tried to execute "show tables"
> and "show databases".
>
> I have already created a database named "test_db", but I still encounter
> the error "Database does not exist".
>
> *Note: I do see a couple of posts related to this error but nothing was
> helpful for me.*
>
> 
> =
> name := "ExploreSBT_V1"
>
> version := "1.0"
>
> scalaVersion := "2.11.5"
>
> libraryDependencies
> ++=Seq("org.apache.spark"%%"spark-core"%"1.3.0","org.apache.spark"%%"spark-sql"%"1.3.0")
> libraryDependencies += "org.apache.spark"%%"spark-hive"%"1.3.0"
>
> =
> [image: Inline image 1]
>
> Error: Encountered the following exceptions
> :org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution
> Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. Database
> does not exist: test_db
> 15/12/14 18:49:57 ERROR HiveContext:
> ==
> HIVE FAILURE OUTPUT
> ==
>
>
>
>
>OK
> FAILED: Execution Error, return code 1 from
> org.apache.hadoop.hive.ql.exec.DDLTask. Database does not exist: test_db
>
> ==
> END HIVE FAILURE OUTPUT
> ==
>
>
> Process finished with exit code 0
>
> Thanks & Regards,
> Gokula Krishnan* (Gokul)*
>



-- 
Best Regards

Jeff Zhang


what are the cons/drawbacks of a Spark DataFrames

2015-12-14 Thread email2...@gmail.com
Hello All - I've started using Spark DataFrames and it looks like they
provide rich column-level operations and functions.

At the same time, I would like to understand whether there are any drawbacks
/ cons of using DataFrames. If so, please share your experience with that.

Thanks, 
Gokul



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/what-are-the-cons-drawbacks-of-a-Spark-DataFrames-tp25703.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: HDFS

2015-12-14 Thread Akhil Das
Try to set the spark.locality.wait to a higher number and see if things
change. You can read more about the configuration properties from here
http://spark.apache.org/docs/latest/configuration.html#scheduling
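
For example, in code (the value is illustrative; the default is 3s, and the
same property can be passed with --conf at submit time):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // wait longer for a NODE_LOCAL / RACK_LOCAL slot before falling back to ANY
  .set("spark.locality.wait", "10s")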

Thanks
Best Regards

On Sat, Dec 12, 2015 at 12:16 AM, shahid ashraf  wrote:

> Hi folks
>
> I am using a standalone cluster of 50 servers on AWS. I loaded data onto
> HDFS; why am I getting Locality Level ANY for data on HDFS? I have 900+
> partitions.
>
>
> --
> with Regards
> Shahid Ashraf
>


Unsubscribe

2015-12-14 Thread Roman Garcia
Cancel
Unsubscribe


How do I link JavaEsSpark.saveToEs() to a sparkConf?

2015-12-14 Thread Spark Enthusiast
Folks,

I have the following program:

SparkConf conf = new SparkConf()
    .setMaster("local")
    .setAppName("Indexer")
    .set("spark.driver.maxResultSize", "2g");
conf.set("es.index.auto.create", "true");
conf.set("es.nodes", "localhost");
conf.set("es.port", "9200");
conf.set("es.write.operation", "index");
JavaSparkContext sc = new JavaSparkContext(conf);
          .          .
JavaEsSpark.saveToEs(filteredFields, "foo");

I get an error saying it cannot find storage. It looks like the driver program
cannot reach the Elasticsearch server. Looking at the program, I have not
associated JavaEsSpark with the SparkConf.

Question: How do I associate JavaEsSpark with SparkConf?



RE: Spark streaming driver java process RSS memory constantly increasing using cassandra driver

2015-12-14 Thread Singh, Abhijeet
Hi Conor,

What do you mean when you say the leak is not in "Heap or non-Heap"? If it is not
heap related then it has to be the native memory that is leaking. I can't say
for sure, but you do have threads working there and they could be using the
native memory. We didn't get any pics of JConsole.

Thanks.

-Original Message-
From: Conor Fennell [mailto:conorapa...@gmail.com] 
Sent: Monday, December 14, 2015 4:15 PM
To: user@spark.apache.org
Subject: Re: Spark streaming driver java process RSS memory constantly 
increasing using cassandra driver

Just bumping the issue I am having, if anyone can provide direction? I have 
been stuck on this for a while now.

Thanks,
Conor

On Fri, Dec 11, 2015 at 5:10 PM, Conor Fennell  wrote:
> Hi,
>
> I have a memory leak in the spark driver which is not in the heap or 
> the non-heap.
> Even though neither of these are increasing, the java process RSS 
> memory is and eventually takes up all the memory on the machine.
> I am using Spark 1.5.2 and the spark-cassandra-connector 1.5.0-M2.
>
> I have reduced the leak to the code below.
> If I remove cassandra from the code below the memory leak does not happen.
> Can someone please explain why this is happening or what I can do to 
> further investigate it.
> I have also include pics from jconsole for a couple of hours and 
> datadog showing the same timeframe the rss memory increase.
>
> Thanks,
> Conor
>
> val ssc = new StreamingContext(sparkConf,
> Seconds(SparkStreamingBatchInterval))
>
> ssc.checkpoint(HdfsNameNodeUriPath)
>
> val kafkaParams = Map[String, String](METADATA_BROKER_LIST ->
> MetadataBrokerList)
>
> var kafkaMessages = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder](ssc, kafkaParams,
> KafkaTopics.split(DELIMITER).toSet)
>
> var eventBuckets = kafkaMessages.map(keyMessage => {
> implicit val formats = DefaultFormats.lossless
> val eventBucket = parse(keyMessage._2)
> val minute = new Date((eventBucket \ MINUTE).extract[Long])
> val business = (eventBucket \ BUSINESS).extract[String]
> val account = (eventBucket \ ACCOUNT).extract[String]
> (minute, business, account)})
>
>   var eventsToBeProcessed = eventBuckets.transform(rdd =>
> rdd.joinWithCassandraTable("analytics_events" + '_' + 
> settings.JobEnvironment, "events", SomeColumns(MINUTE, BUSINESS, 
> ACCOUNT, EVENT, JSON), SomeColumns(MINUTE, BUSINESS, 
> ACCOUNT)).filter(entry => {
> //remove any entries without a result
> entry._2.length > 0
>   }))
>
>   eventsToBeProcessed.foreachRDD(rdd => {
> println(rdd.take(1))
>   })
>
> sys.ShutdownHookThread {
>   System.err.println(s"Gracefully stopping $JobName Spark 
> Streaming Application")
>   ssc.stop(stopSparkContext = true, stopGracefully = true)
>   System.err.println(s"$JobName streaming job stopped")
> }
>
> ssc.start()
> ssc.awaitTermination()

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



why "cache table a as select * from b" will do shuffle,and create 2 stages.

2015-12-14 Thread ant2nebula
why "cache table a as select * from b" will do shuffle,and create 2 stages.

 

example:

table "ods_pay_consume" is from "KafkaUtils.createDirectStream"

 hiveContext.sql("cache table dwd_pay_consume as select * from
ods_pay_consume")

this code will make 2 statges of DAG

 

 hiveContext.sql("cache table dw_game_server_recharge as select *
from dwd_pay_consume")

this code also will make 2 stages of DAG,and it is similar caculate from the
beginning for ther DAG Visualization tool,"cache table dwd_pay_consume" is
not effect.

 



Re: Spark streaming driver java process RSS memory constantly increasing using cassandra driver

2015-12-14 Thread Conor Fennell
Hi Abhijeet,

Thanks for pointing out the pics are not showing.
I have put all the images in this public google document:
https://docs.google.com/document/d/1xEJ0eTtXBlSso6SshLCWZHcRw4aEQMJflzKBsr2FHaw/edit?usp=sharing

All the code is in the first email; there is nothing else starting up
threads except the 80 or so threads Spark itself starts.
By the heap I mean: CMS Old Gen, Par Eden Space and Par Survivor Space.
By the non-heap I mean: Code Cache and CMS Perm Gen.
Together those make up the JVM memory space.

In the document you will see I am quite aggressive with the JVM options.

It does indicate that native memory is leaking, but I am at a loss as to how
to properly investigate it, and it looks like the leak is in the Cassandra
driver itself, or in a combination of how Spark is running it and the
driver.

It is also happening when running this within a foreachRDD:

  val cluster = Cluster.builder()
    .addContactPoints(CassandraHostname.split(DELIMITER): _*)
    .build()
  val session = cluster.connect()

  ranges.foreach(range => {
    session.execute(s"INSERT INTO analytics_metadata_$JobEnvironment.kafka_offsets " +
      s"(topic, job_name, batch_time, partition, from_offset, until_offset) VALUES " +
      s"('${range._1}','${range._2}',${range._3.getTime()},${range._4},${range._5},${range._6})")
  })

  session.close()
  cluster.close()


Thanks,
Conor



On Mon, Dec 14, 2015 at 2:12 PM, Singh, Abhijeet
 wrote:
> Hi Conor,
>
> What do you mean when you say leak is not in "Heap or non-Heap". If it is not 
> heap related than it has to be the native memory that is leaking. I can't say 
> for sure but you do have Threads working there and that could be using the 
> native memory. We didn't get any pics of JConsole.
>
> Thanks.
>
> -Original Message-
> From: Conor Fennell [mailto:conorapa...@gmail.com]
> Sent: Monday, December 14, 2015 4:15 PM
> To: user@spark.apache.org
> Subject: Re: Spark streaming driver java process RSS memory constantly 
> increasing using cassandra driver
>
> Just bumping the issue I am having, if anyone can provide direction? I have 
> been stuck on this for a while now.
>
> Thanks,
> Conor
>
> On Fri, Dec 11, 2015 at 5:10 PM, Conor Fennell  wrote:
>> Hi,
>>
>> I have a memory leak in the spark driver which is not in the heap or
>> the non-heap.
>> Even though neither of these are increasing, the java process RSS
>> memory is and eventually takes up all the memory on the machine.
>> I am using Spark 1.5.2 and the spark-cassandra-connector 1.5.0-M2.
>>
>> I have reduced the leak to the code below.
>> If I remove cassandra from the code below the memory leak does not happen.
>> Can someone please explain why this is happening or what I can do to
>> further investigate it.
>> I have also include pics from jconsole for a couple of hours and
>> datadog showing the same timeframe the rss memory increase.
>>
>> Thanks,
>> Conor
>>
>> val ssc = new StreamingContext(sparkConf,
>> Seconds(SparkStreamingBatchInterval))
>>
>> ssc.checkpoint(HdfsNameNodeUriPath)
>>
>> val kafkaParams = Map[String, String](METADATA_BROKER_LIST ->
>> MetadataBrokerList)
>>
>> var kafkaMessages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams,
>> KafkaTopics.split(DELIMITER).toSet)
>>
>> var eventBuckets = kafkaMessages.map(keyMessage => {
>> implicit val formats = DefaultFormats.lossless
>> val eventBucket = parse(keyMessage._2)
>> val minute = new Date((eventBucket \ MINUTE).extract[Long])
>> val business = (eventBucket \ BUSINESS).extract[String]
>> val account = (eventBucket \ ACCOUNT).extract[String]
>> (minute, business, account)})
>>
>>   var eventsToBeProcessed = eventBuckets.transform(rdd =>
>> rdd.joinWithCassandraTable("analytics_events" + '_' +
>> settings.JobEnvironment, "events", SomeColumns(MINUTE, BUSINESS,
>> ACCOUNT, EVENT, JSON), SomeColumns(MINUTE, BUSINESS,
>> ACCOUNT)).filter(entry => {
>> //remove any entries without a result
>> entry._2.length > 0
>>   }))
>>
>>   eventsToBeProcessed.foreachRDD(rdd => {
>> println(rdd.take(1))
>>   })
>>
>> sys.ShutdownHookThread {
>>   System.err.println(s"Gracefully stopping $JobName Spark
>> Streaming Application")
>>   ssc.stop(stopSparkContext = true, stopGracefully = true)
>>   System.err.println(s"$JobName streaming job stopped")
>> }
>>
>> ssc.start()
>> ssc.awaitTermination()
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
> commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How do I link JavaEsSpark.saveToEs() to a sparkConf?

2015-12-14 Thread Ali Gouta
You don't need an explicit association between JavaEsSpark and the
SparkConf.
Actually, once you make your transformations/filtering/... on your "sc", you
can store the final RDD in your Elasticsearch cluster. Example:

val generateRDD = sc.makeRDD(Seq(SOME_STUFF))
JavaEsSpark.saveToEs(generateRDD, "foo");

That's it...
Lastly, be careful when defining the settings in your "conf". For instance,
you may end up replacing localhost with the real IP address of your
Elasticsearch node...

Ali Gouta.

On Mon, Dec 14, 2015 at 1:52 PM, Spark Enthusiast 
wrote:

> Folks,
>
> I have the following program :
>
> SparkConf conf = new SparkConf().setMaster("local")
> .setAppName("Indexer").set("spark.driver.maxResultSize", "2g");
> conf.set("es.index.auto.create", "true");
> conf.set("es.nodes", "localhost");
> conf.set("es.port", "9200");
> conf.set("es.write.operation", "index");
> JavaSparkContext sc = new JavaSparkContext(conf);
>
>   .
>   .
>
> JavaEsSpark.saveToEs(filteredFields, "foo");
>
> I get an error saying cannot find storage. Looks like the driver program
> cannot the Elastic Search Server. Seeing the program, I have not associated
> JavaEsSpark to the SparkConf.
>
> Question: How do I associate JavaEsSpark to SparkConf?
>
>
>


Re: memory leak when saving Parquet files in Spark

2015-12-14 Thread Matt K
Thanks Cheng!

I'm running 1.5. After setting the following, I'm no longer seeing this
issue:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

Thanks,
-Matt

On Fri, Dec 11, 2015 at 1:58 AM, Cheng Lian  wrote:

> This is probably caused by schema merging. Were you using Spark 1.4 or
> earlier versions? Could you please try the following snippet to see whether
> it helps:
>
> df.write
>   .format("parquet")
>   .option("mergeSchema", "false")
>   .partitionBy(partitionCols: _*)
>   .mode(saveMode)
>   .save(targetPath)
>
> In 1.5, we've disabled schema merging by default.
>
> Cheng
>
>
> On 12/11/15 5:33 AM, Matt K wrote:
>
> Hi all,
>
> I have a process that's continuously saving data as Parquet with Spark.
> The bulk of the saving logic simply looks like this:
>
>   df.write
> .format("parquet")
> .partitionBy(partitionCols: _*)
> .mode(saveMode).save(targetPath)
>
> After running for a day or so, my process ran out of memory. I took a
> memory-dump. I see that a single thread is holding 32,189
> org.apache.parquet.hadoop.Footer objects, which in turn hold
> ParquetMetadata. This is highly suspicious, since each thread processes
> under 1GB of data at a time, and there's usually no more than 10 files in a
> single batch (no small file problem). So there may be a memory leak
> somewhere in the saveAsParquet code-path.
>
> I've attached a screen-shot from Eclipse MemoryAnalyzer showing the above.
> Note 32,189 references.
>
> A shot in the dark, but is there a way to disable ParquetMetadata file
> generation?
>
> Thanks,
> -Matt
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


-- 
www.calcmachine.com - easy online calculator.


Re: memory leak when saving Parquet files in Spark

2015-12-14 Thread Cheng Lian

Thanks for the feedback, Matt!

Yes, we've also seen other feedback about slow Parquet summary file 
generation, especially when appending a small dataset to an existing 
large dataset. Disabling it is a reasonable workaround since the summary 
files are no longer important after parquet-mr 1.7.


We're planning to turn it off by default in future versions.

Cheng

On 12/15/15 12:27 AM, Matt K wrote:

Thanks Cheng!

I'm running 1.5. After setting the following, I'm no longer seeing 
this issue:


sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

Thanks,
-Matt

On Fri, Dec 11, 2015 at 1:58 AM, Cheng Lian > wrote:


This is probably caused by schema merging. Were you using Spark
1.4 or earlier versions? Could you please try the following
snippet to see whether it helps:

df.write
  .format("parquet")
  .option("mergeSchema", "false")
  .partitionBy(partitionCols: _*)
  .mode(saveMode)
  .save(targetPath)

In 1.5, we've disabled schema merging by default.

Cheng


On 12/11/15 5:33 AM, Matt K wrote:

Hi all,

I have a process that's continuously saving data as Parquet with
Spark. The bulk of the saving logic simply looks like this:

  df.write
.format("parquet")
.partitionBy(partitionCols: _*)
.mode(saveMode).save(targetPath)

After running for a day or so, my process ran out of memory. I
took a memory-dump. I see that a single thread is holding 32,189
org.apache.parquet.hadoop.Footer objects, which in turn hold
ParquetMetadata. This is highly suspicious, since each thread
processes under 1GB of data at a time, and there's usually no
more than 10 files in a single batch (no small file problem). So
there may be a memory leak somewhere in the saveAsParquet code-path.

I've attached a screen-shot from Eclipse MemoryAnalyzer showing
the above. Note 32,189 references.

A shot in the dark, but is there a way to disable ParquetMetadata
file generation?

Thanks,
-Matt


-
To unsubscribe, e-mail:user-unsubscr...@spark.apache.org

For additional commands, e-mail:user-h...@spark.apache.org 






--
www.calcmachine.com  - easy online calculator.




Re: Run ad-hoc queries at runtime against cached RDDs

2015-12-14 Thread Krishna Rao
Thanks for the response Jörn. So to elaborate, I have a large dataset with
userIds, each tagged with one or more properties, e.g.:

user_1    prop1=X
user_2    prop1=Y    prop2=A
user_3    prop2=B

I would like to be able to get the number of distinct users that have a
particular property (or combination of properties). The cardinality of each
property is in the 1000s and will only grow, as will the number of
properties. I'm happy with approximate values, trading accuracy for
performance.

Spark's performance when doing this via spark-shell is more than excellent,
using the "countApproxDistinct" method on a "javaRDD". However, I've no idea
what the best way is to run such a query programmatically, like I can do
manually via spark-shell.

Hope this clarifies things.
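
For concreteness, a sketch of the kind of query being run manually in
spark-shell (the types and the property encoding are illustrative):

import org.apache.spark.rdd.RDD

// illustrative layout: (userId, properties) pairs, persisted in memory
val users: RDD[(String, Set[String])] = sc.parallelize(Seq(
  ("user_1", Set("prop1=X")),
  ("user_2", Set("prop1=Y", "prop2=A")),
  ("user_3", Set("prop2=B"))
)).cache()

// approximate number of distinct users having a given property (~1% relative error)
def approxUsersWith(prop: String): Long =
  users.filter { case (_, props) => props.contains(prop) }
    .keys
    .countApproxDistinct(relativeSD = 0.01)

println(approxUsersWith("prop2=A"))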


On 14 December 2015 at 17:04, Jörn Franke  wrote:

> Can you elaborate a little bit more on the use case? It looks a little bit
> like an abuse of Spark in general . Interactive queries that are not
> suitable for in-memory batch processing might be better supported by ignite
> that has in-memory indexes, concept of hot, warm, cold data etc. or hive on
> tez+llap .
>
> > On 14 Dec 2015, at 17:19, Krishna Rao  wrote:
> >
> > Hi all,
> >
> > What's the best way to run ad-hoc queries against a cached RDDs?
> >
> > For example, say I have an RDD that has been processed, and persisted to
> memory-only. I want to be able to run a count (actually
> "countApproxDistinct") after filtering by an, at compile time, unknown
> (specified by query) value.
> >
> > I've experimented with using (abusing) Spark Streaming, by streaming
> queries and running these against the cached RDD. However, as I say I don't
> think that this is an intended use-case of Streaming.
> >
> > Cheers,
> >
> > Krishna
>


How to Make Spark Streaming DStream as SQL table?

2015-12-14 Thread MK
Hi,

The aim here is as follows:

   - read data from a socket using Spark Streaming every N seconds

   - register the received data as a SQL table

   - more data will be read from HDFS etc. as reference data and will also be
registered as SQL tables

   - the idea is to perform arbitrary SQL queries on the combined streaming
& reference data

Please see the code snippet below. Data is written out to disk correctly from
"inside" the foreachRDD loop, but the same registered SQL table's data is
empty when written "outside" of the foreachRDD loop.

Please give your opinions/suggestions on how to fix this. Any other mechanism
to achieve the above stated "aim" is also welcome.

case class Record(id:Int, status:String, source:String)

object SqlApp2 {
  def main(args: Array[String]) {
val sparkConf = new
SparkConf().setAppName("SqlApp2").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
// Create the streaming context with a 10 second batch size
val ssc = new StreamingContext(sc, Seconds(10))

val lines = ssc.socketTextStream("localhost", ,
StorageLevel.MEMORY_AND_DISK_SER)

var alldata:DataFrame=sqlContext.emptyDataFrame
alldata.registerTempTable("alldata")

lines.foreachRDD((rdd: RDD[String], time: Time) => {
  import sqlContext.implicits._

  // Convert RDD[String] to DataFrame
  val data = rdd.map(w => {
val words = w.split(" ")
Record(words(0).toInt, words(1), words(2))}).toDF()

  // Register as table
  data.registerTempTable("alldata")
  data.save("inside/file"+System.currentTimeMillis(), "json",
SaveMode.ErrorIfExists)  // this data is written properly
})

val dataOutside = sqlContext.sql("select * from alldata")
dataOutside.save("outside/file"+System.currentTimeMillis(), "json",
SaveMode.ErrorIfExists) // this data is empty, how to make the SQL table
registered inside the forEachRDD loop visible for rest of application

ssc.start()
ssc.awaitTermination()
  }
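
One pattern worth trying, following the DataFrame-and-SQL section of the
streaming programming guide (so treat this as a sketch rather than a tested
fix), is to run the combined query inside foreachRDD itself, so that it
executes once per batch against the freshly registered streaming table; the
reference tables read from HDFS can still be registered once up front. The
sketch below reuses the ssc/sqlContext/Record definitions above, and the
"refdata" table and join condition are illustrative:

    lines.foreachRDD((rdd: RDD[String], time: Time) => {
      import sqlContext.implicits._
      val data = rdd.map(w => {
        val words = w.split(" ")
        Record(words(0).toInt, words(1), words(2))}).toDF()
      data.registerTempTable("streamdata")

      // the streaming/reference join runs here, once per batch
      val combined = sqlContext.sql(
        "select * from streamdata s join refdata r on s.id = r.id")
      combined.save("outside/file" + time.milliseconds, "json",
        SaveMode.ErrorIfExists)
    })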

Thanks & Regards

MK



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-Make-Spark-Streaming-DStream-as-SQL-table-tp25699.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: How to save Multilayer Perceptron Classifier model.

2015-12-14 Thread Ulanov, Alexander
Hi Vadim,

As Yanbo pointed out, that feature is not yet merged into the main branch. 
However, there is a hacky workaround:
// save model
sc.parallelize(Seq(model), 1).saveAsObjectFile("path")
// load model
val sameModel = sc.objectFile[YourCLASS]("path").first()
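
For the multilayer perceptron case specifically, the same workaround would
look roughly like this (untested; the path is illustrative, and it relies on
the model being Java-serializable):

import org.apache.spark.ml.classification.MultilayerPerceptronClassificationModel

// save
sc.parallelize(Seq(model), 1).saveAsObjectFile("hdfs:///tmp/mlp-model")
// load
val sameModel = sc
  .objectFile[MultilayerPerceptronClassificationModel]("hdfs:///tmp/mlp-model")
  .first()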

Best regards, Alexander

From: Yanbo Liang [mailto:yblia...@gmail.com]
Sent: Sunday, December 13, 2015 8:44 PM
To: Vadim Gribanov
Cc: user@spark.apache.org
Subject: Re: How to save Multilayer Perceptron Classifier model.

Hi Vadim,

It does not support save/load for Multilayer Perceptron Model currently, you 
can track the issues at 
SPARK-11871.

Yanbo

2015-12-14 2:31 GMT+08:00 Vadim Gribanov 
>:
Hey everyone! I’m new with spark and scala. I looked at examples in user guide 
and didn’t find how to save Multilayer Perceptron Classifier model to HDFS.

Trivial:

model.save(sc, “NNModel”)

Didn’t work for me.

Help me please.
-
To unsubscribe, e-mail: 
user-unsubscr...@spark.apache.org
For additional commands, e-mail: 
user-h...@spark.apache.org



Re: Run ad-hoc queries at runtime against cached RDDs

2015-12-14 Thread Jörn Franke
Can you elaborate a little bit more on the use case? It looks a little bit like
an abuse of Spark in general. Interactive queries that are not suitable for
in-memory batch processing might be better supported by Ignite, which has
in-memory indexes, a concept of hot/warm/cold data, etc., or by Hive on Tez+LLAP.

> On 14 Dec 2015, at 17:19, Krishna Rao  wrote:
> 
> Hi all,
> 
> What's the best way to run ad-hoc queries against a cached RDDs?
> 
> For example, say I have an RDD that has been processed, and persisted to 
> memory-only. I want to be able to run a count (actually 
> "countApproxDistinct") after filtering by an, at compile time, unknown 
> (specified by query) value.
> 
> I've experimented with using (abusing) Spark Streaming, by streaming queries 
> and running these against the cached RDD. However, as I say I don't think 
> that this is an intended use-case of Streaming.
> 
> Cheers,
> 
> Krishna

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Classpath problem trying to use DataFrames

2015-12-14 Thread Christopher Brady
Thanks for the response. I lost access to my cluster over the weekend, 
so I had to wait until today to check.


All of the correct Hive jars are in classpath.txt. Also, this error 
seems to be happening in the driver rather than the executors. It's 
running in yarn-client mode, so it should use the classpath of my local 
JVM, which also contains the Hive jars. I also checked for that class 
specifically and it is there. Does Spark do anything funny on the driver 
side that would make the Hive classes on the classpath unavailable?



On 12/11/2015 11:08 PM, Harsh J wrote:
Do you have all your hive jars listed in the classpath.txt / 
SPARK_DIST_CLASSPATH env., specifically the hive-exec jar? Is the 
location of that jar also the same on all the distributed hosts?


Passing an explicit executor classpath string may also help overcome 
this (replace HIVE_BASE_DIR to the root of your hive installation):


--conf "spark.executor.extraClassPath=$HIVE_BASE_DIR/hive/lib/*"

On Sat, Dec 12, 2015 at 6:32 AM Christopher Brady 
> 
wrote:


I'm trying to run a basic "Hello world" type example using DataFrames
with Hive in yarn-client mode. My code is:

JavaSparkContext sc = new JavaSparkContext("yarn-client", "Test app");
HiveContext sqlContext = new HiveContext(sc.sc());
sqlContext.sql("SELECT * FROM my_table").count();

The exception I get on the driver is:
java.lang.ClassNotFoundException:
org.apache.hadoop.hive.ql.plan.TableDesc

There are no exceptions on the executors.

That class is definitely on the classpath of the driver, and it runs
without errors in local mode. I haven't been able to find any similar
errors on google. Does anyone know what I'm doing wrong?

The full stack trace is included below:

java.lang.NoClassDefFoundError:
Lorg/apache/hadoop/hive/ql/plan/TableDesc;
 at java.lang.Class.getDeclaredFields0(Native Method)
 at java.lang.Class.privateGetDeclaredFields(Class.java:2436)
 at java.lang.Class.getDeclaredField(Class.java:1946)
 at
java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
 at
java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
 at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
 at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.io.ObjectStreamClass.(ObjectStreamClass.java:468)
 at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
 at
java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
 at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
 at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
 at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
 at
scala.collection.immutable.$colon$colon.readObject(List.scala:362)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
 at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:606)
 at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
 at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
 at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
 at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
 at

Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

2015-12-14 Thread Bartłomiej Alberski
Below is the full stack trace (real names of my classes were changed) with a
short description of the entries from my code:

rdd.mapPartitions { partition =>      // this is the line the second stacktrace entry points to
  val sender = broadcastedValue.value // this is the main place the first stacktrace entry points to
}

java.lang.ClassCastException:
org.apache.spark.util.SerializableConfiguration cannot be cast to
com.example.sender.MyClassReporter
at com.example.flow.Calculator
$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(AnomalyDetection.scala:87)
at com.example.flow.Calculator
$$anonfun$saveAndLoadHll$1$$anonfun$apply$14.apply(Calculator.scala:82)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:706)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

2015-12-14 17:10 GMT+01:00 Ted Yu :

> Can you show the complete stack trace for the ClassCastException ?
>
> Please see the following thread:
> http://search-hadoop.com/m/q3RTtgEUHVmJA1T1
>
> Cheers
>
> On Mon, Dec 14, 2015 at 7:33 AM, alberskib  wrote:
>
>> Hey all,
>>
>> When my streaming application is restarting from failure (from
>> checkpoint) I
>> am receiving strange error:
>>
>> java.lang.ClassCastException:
>> org.apache.spark.util.SerializableConfiguration cannot be cast to
>> com.example.sender.MyClassReporter.
>>
>> Instance of B class is created on driver side (with proper config passed
>> as
>> constructor arg) and broadcasted to the executors in order to ensure that
>> on
>> each worker there will be only single instance. Everything is going well
>> up
>> to place where I am getting value of broadcasted field and executing
>> function on it i.e.
>> broadcastedValue.value.send(...)
>>
>> Below you can find definition of MyClassReporter (with trait):
>>
>> trait Reporter{
>>   def send(name: String, value: String, timestamp: Long) : Unit
>>   def flush() : Unit
>> }
>>
>> class MyClassReporter(config: MyClassConfig, flow: String) extends
>> Reporter
>> with Serializable {
>>
>>   val prefix = s"${config.senderConfig.prefix}.$flow"
>>
>>   var counter = 0
>>
>>   @transient
>>   private lazy val sender : GraphiteSender = initialize()
>>
>>   @transient
>>   private lazy val threadPool =
>> ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
>>
>>   private def initialize() = {
>>   val sender = new Sender(
>> new InetSocketAddress(config.senderConfig.hostname,
>> config.senderConfig.port)
>>   )
>>   sys.addShutdownHook{
>> sender.close()
>>   }
>>   sender
>>   }
>>
>>   override def send(name: String, value: String, timestamp: Long) : Unit
>> = {
>> threadPool.submit(new Runnable {
>>   override def run(): Unit = {
>> try {
>>   counter += 1
>>   if (!sender.isConnected)
>> sender.connect()
>>   sender.send(s"$prefix.$name", value, timestamp)
>>   if (counter % graphiteConfig.batchSize == 0)
>> sender.flush()
>> }catch {
>>   case NonFatal(e) => {
>> println(s"Problem with sending metric to graphite
>> $prefix.$name:
>> $value at $timestamp: ${e.getMessage}", e)
>> Try{sender.close()}.recover{
>>   case NonFatal(e) => println(s"Error closing graphite
>> ${e.getMessage}", e)
>> }
>>   }
>> }
>>   }
>> })
>>   }
>>
>> Do you have any idea how I can solve this issue? Using broadcasted
>> variable
>> helps me keeping single socket open to the service on executor.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-java-lang-ClassCastException-org-apache-spark-util-SerializableConfiguration-on-restt-tp25698.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


UNSUBSCRIBE

2015-12-14 Thread Tim Barthram
UNSUBSCRIBE Thanks


_

The information transmitted in this message and its attachments (if any) is 
intended 
only for the person or entity to which it is addressed.
The message may contain confidential and/or privileged material. Any review, 
retransmission, dissemination or other use of, or taking of any action in 
reliance 
upon this information, by persons or entities other than the intended recipient 
is 
prohibited.

If you have received this in error, please contact the sender and delete this 
e-mail 
and associated material from any computer.

The intended recipient of this e-mail may only use, reproduce, disclose or 
distribute 
the information contained in this e-mail and any attached files, with the 
permission 
of the sender.

This message has been scanned for viruses.
_


Discover SparkUI port for spark streaming job running in cluster mode

2015-12-14 Thread Ashish Nigam
Hi,
I run a Spark streaming job in cluster mode. This means that the driver can
run on any data node, and the Spark UI can end up on any dynamically chosen port.
At present, I find out the port by looking at container logs that look
something like this -

server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:50571
INFO util.Utils: Successfully started service 'SparkUI' on port 50571.
INFO ui.SparkUI: Started SparkUI at http://xxx:50571


Is there any way to know about the UI port automatically using some API?

Thanks
Ashish


Saving to JDBC

2015-12-14 Thread Bob Corsaro
Is there anyway to map pyspark.sql.Row columns to JDBC table columns, or do
I have to just put them in the right order before saving?

I'm using code like this:

```
rdd = rdd.map(lambda i: Row(name=i.name, value=i.value))
sqlCtx.createDataFrame(rdd).write.jdbc(dbconn_string, tablename,
mode='append')
```

Since the Row class orders them alphabetically, they are inserted into the
sql table in alphabetical order instead of matching Row columns to table
columns.


worker:java.lang.ClassNotFoundException: ttt.test$$anonfun$1

2015-12-14 Thread Bonsen
package ttt

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object test {
  def main(args: Array[String]) {
val conf = new SparkConf()
conf.setAppName("mytest")
  .setMaster("spark://Master:7077")
  .setSparkHome("/usr/local/spark")
 
.setJars(Array("/home/hadoop/spark-assembly-1.4.0-hadoop2.4.0.jar","/home/hadoop/datanucleus-core-3.2.10.jar","/home/hadoop/spark-1.4.0-yarn-shuffle.jar","spark-examples-1.4.0-hadoop2.4.0.jar"))
val sc = new SparkContext(conf)
val rawData = sc.textFile("/home/hadoop/123.csv")
val secondData = rawData.flatMap(_.split(",").toString)
println(secondData.first)   // line 32
sc.stop()
  }
}
__
the problem is:   //219.216.64.55 is my worker's ip
15/12/14 03:18:34 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
219.216.64.55): java.lang.ClassNotFoundException: ttt.test$$anonfun$1
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
...
__
I can run the examples that Spark provides, but if I try my own program, it
can't find the class.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/worker-java-lang-ClassNotFoundException-ttt-test-anonfun-1-tp25696.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Re: HELP! I get "java.lang.String cannot be cast to java.lang.Intege " for a long time.

2015-12-14 Thread Jakob Odersky
> Sorry,I'm late.I try again and again ,now I use spark 1.4.0 ,hadoop
2.4.1.but I also find something strange like this :

>
http://apache-spark-user-list.1001560.n3.nabble.com/worker-java-lang-ClassNotFoundException-ttt-test-anonfun-1-td25696.html
> (if i use "textFile",It can't run.)

In the link you sent, there is still an `addJar(spark-assembly-hadoop-xx)`,
can you try running your application with that?

On 11 December 2015 at 03:08, Bonsen  wrote:

> Thank you,and I find the problem is my package is test,but I write package
> org.apache.spark.examples ,and IDEA had imported the
> spark-examples-1.5.2-hadoop2.6.0.jar ,so I can run it,and it makes lots of
> problems
> __
> Now , I change the package like this:
>
> package test
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> object test {
>   def main(args: Array[String]) {
> val conf = new
> SparkConf().setAppName("mytest").setMaster("spark://Master:7077")
> val sc = new SparkContext(conf)
> sc.addJar("/home/hadoop/spark-assembly-1.5.2-hadoop2.6.0.jar")//It
> doesn't work.!?
> val rawData = sc.textFile("/home/hadoop/123.csv")
> val secondData = rawData.flatMap(_.split(",").toString)
> println(secondData.first)   /line 32
> sc.stop()
>   }
> }
> it causes that:
> 15/12/11 18:41:06 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> 219.216.65.129): java.lang.ClassNotFoundException: test.test$$anonfun$1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> 
> 
> //  219.216.65.129 is my worker computer.
> //  I can connect to my worker computer.
> // Spark can start successfully.
> //  addFile is also doesn't work,the tmp file will also dismiss.
>
>
>
>
>
>
> At 2015-12-10 22:32:21, "Himanshu Mehra [via Apache Spark User List]" <[hidden
> email] > wrote:
>
> You are trying to print an array, but anyway it will print the objectID
>  of the array if the input is same as you have shown here. Try flatMap()
> instead of map and check if the problem is same.
>
>--Himanshu
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/HELP-I-get-java-lang-String-cannot-be-cast-to-java-lang-Intege-for-a-long-time-tp25666p25667.html
> To unsubscribe from HELP! I get "java.lang.String cannot be cast to
> java.lang.Intege " for a long time., click here.
> NAML
> 
>
>
>
>
>
> --
> View this message in context: Re:Re: HELP! I get "java.lang.String cannot
> be cast to java.lang.Intege " for a long time.
> 
>
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: Re: HELP! I get "java.lang.String cannot be cast to java.lang.Intege " for a long time.

2015-12-14 Thread Jakob Odersky
sorry typo, I meant *without* the addJar

On 14 December 2015 at 11:13, Jakob Odersky  wrote:

> > Sorry,I'm late.I try again and again ,now I use spark 1.4.0 ,hadoop
> 2.4.1.but I also find something strange like this :
>
> >
> http://apache-spark-user-list.1001560.n3.nabble.com/worker-java-lang-ClassNotFoundException-ttt-test-anonfun-1-td25696.html
> > (if i use "textFile",It can't run.)
>
> In the link you sent, there is still an
> `addJar(spark-assembly-hadoop-xx)`, can you try running your application
> with that?
>
> On 11 December 2015 at 03:08, Bonsen  wrote:
>
>> Thank you,and I find the problem is my package is test,but I write
>> package org.apache.spark.examples ,and IDEA had imported the
>> spark-examples-1.5.2-hadoop2.6.0.jar ,so I can run it,and it makes lots of
>> problems
>> __
>> Now , I change the package like this:
>>
>> package test
>> import org.apache.spark.SparkConf
>> import org.apache.spark.SparkContext
>> object test {
>>   def main(args: Array[String]) {
>> val conf = new
>> SparkConf().setAppName("mytest").setMaster("spark://Master:7077")
>> val sc = new SparkContext(conf)
>> sc.addJar("/home/hadoop/spark-assembly-1.5.2-hadoop2.6.0.jar")//It
>> doesn't work.!?
>> val rawData = sc.textFile("/home/hadoop/123.csv")
>> val secondData = rawData.flatMap(_.split(",").toString)
>> println(secondData.first)   /line 32
>> sc.stop()
>>   }
>> }
>> it causes that:
>> 15/12/11 18:41:06 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
>> 219.216.65.129): java.lang.ClassNotFoundException: test.test$$anonfun$1
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>> 
>> 
>> //  219.216.65.129 is my worker computer.
>> //  I can connect to my worker computer.
>> // Spark can start successfully.
>> //  addFile is also doesn't work,the tmp file will also dismiss.
>>
>>
>>
>>
>>
>>
>> At 2015-12-10 22:32:21, "Himanshu Mehra [via Apache Spark User List]" 
>> <[hidden
>> email] > wrote:
>>
>> You are trying to print an array, but anyway it will print the objectID
>>  of the array if the input is same as you have shown here. Try flatMap()
>> instead of map and check if the problem is same.
>>
>>--Himanshu
>>
>> --
>> If you reply to this email, your message will be added to the discussion
>> below:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/HELP-I-get-java-lang-String-cannot-be-cast-to-java-lang-Intege-for-a-long-time-tp25666p25667.html
>> To unsubscribe from HELP! I get "java.lang.String cannot be cast to
>> java.lang.Intege " for a long time., click here.
>> NAML
>> 
>>
>>
>>
>>
>>
>> --
>> View this message in context: Re:Re: HELP! I get "java.lang.String
>> cannot be cast to java.lang.Intege " for a long time.
>> 
>>
>> Sent from the Apache Spark User List mailing list archive
>>  at Nabble.com.
>>
>
>


Adding a UI Servlet Filter

2015-12-14 Thread iamknome
Hello all,

I am trying to set up a servlet filter for the Web UI, adding my custom auth
filter to the worker and master processes. I have added the extraClassPath
option and pointed it at my custom JAR, but when the worker or master starts
it keeps complaining with a ClassNotFoundException.

What is the recommended approach for this? How do I protect my web UI so
that only authorized users have access to it? How do I get past the
classpath issue?

Any help is appreciated.
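
For the application's own UI, the usual wiring is the spark.ui.filters
property plus a spark.<filter class>.params entry, per the configuration docs;
a sketch with a hypothetical filter class is below. As far as I can tell, the
standalone master/worker web UIs need the same properties to be visible to
those daemon JVMs (e.g. via their spark-defaults.conf), and the jar containing
the filter has to be on the daemons' classpath, not just on the application's
executor classpath, which would explain the ClassNotFoundException at daemon
startup.

import org.apache.spark.SparkConf

// com.example.auth.BasicAuthFilter is hypothetical; it must implement
// javax.servlet.Filter and be present on the JVM's classpath
val conf = new SparkConf()
  .set("spark.ui.filters", "com.example.auth.BasicAuthFilter")
  .set("spark.com.example.auth.BasicAuthFilter.params", "user=admin,password=secret")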

thanks




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Adding-a-UI-Servlet-Filter-tp25700.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Strange Set of errors

2015-12-14 Thread Steve Lewis
I am running on a Spark 1.5.1 cluster managed by Mesos. I have an
application that handles a chemistry problem whose size can be increased by
increasing the number of atoms, which increases the number of Spark stages. I
do a repartition at each stage; Stage 9 is the last stage. At each stage
the size and complexity increase by a factor of 8 or so.
Problems with 8 stages run with no difficulty; ones with 9 stages never
work - they always crash in a manner similar to the stack dump below (sorry
for the length, but none of the steps are mine).
I do not see any slaves throwing an exception (and what they do show are
different errors anyway).
I am completely baffled and believe the error is in something Spark is
doing. I use 7000 or so tasks to try to divide the work, and I see the same
issue when I cut the parallelism to 256, only the tasks run longer. My mean
task takes about 5 minutes (and yes, I expect the job to take about 8 hours
on my 15-node cluster).
Any bright ideas?


[Stage 9:==>  (5827 + 60) / 7776]
Exception in thread "main" org.apache.spark.SparkException: Job 0
cancelled because Stage 9 was cancelled
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
at
org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1229)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply$mcVI$sp(DAGScheduler.scala:1217)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1216)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1216)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at
scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:156)
at
org.apache.spark.scheduler.DAGScheduler.handleStageCancellation(DAGScheduler.scala:1216)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1469)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
at org.apache.spark.rdd.RDD.count(RDD.scala:1121)
at
org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:445)
at
org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:47)
at
com.lordjoe.molgen.SparkAtomGenerator.run(SparkAtomGenerator.java:150)
at
com.lordjoe.molgen.SparkAtomGenerator.run(SparkAtomGenerator.java:110)
at com.lordjoe.molgen.VariantCounter.main(VariantCounter.java:80)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/12/14 09:53:20 WARN ServletHandler: /stages/stage/kill/
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at
org.apache.spark.ui.jobs.StagesTab.handleKillRequest(StagesTab.scala:49)
at org.apache.spark.ui.SparkUI$$anonfun$3.apply(SparkUI.scala:71)
at org.apache.spark.ui.SparkUI$$anonfun$3.apply(SparkUI.scala:71)
at
org.apache.spark.ui.JettyUtils$$anon$2.doRequest(JettyUtils.scala:141)
at
org.apache.spark.ui.JettyUtils$$anon$2.doGet(JettyUtils.scala:128)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:735)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:848)
at
org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
at
org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
at
org.spark-project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
at

SparkML algos limitations question.

2015-12-14 Thread Eugene Morozov
Hello!

I'm currently working on a POC and trying to use Random Forest (classification
and regression). I also have to check SVM and multiclass perceptron (other
algos are less important at the moment). So far I've discovered that Random
Forest has a maxDepth limitation for its trees, and just out of curiosity I
wonder why such a limitation has been introduced?

An actual question is that I'm going to use Spark ML in production next
year and would like to know if there are other limitations like maxDepth in
RF for other algorithms: Logistic Regression, Perceptron, SVM, etc.
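
For what it's worth, a minimal spark.ml sketch of where that knob sits (column names, tree count, and the training DataFrame are assumptions; the depth cap is enforced by the underlying tree implementation):

import org.apache.spark.ml.classification.RandomForestClassifier

val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(100)
  .setMaxDepth(30)               // values above the documented cap are rejected at fit time

val model = rf.fit(trainingDF)   // trainingDF: a DataFrame with "label" and "features" columns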

Thanks in advance for your time.
--
Be well!
Jean Morozov


Re: Autoscaling of Spark YARN cluster

2015-12-14 Thread Mingyu Kim
Cool. Using Ambari to monitor and scale up/down the cluster sounds
promising. Thanks for the pointer!

Mingyu

From:  Deepak Sharma 
Date:  Monday, December 14, 2015 at 1:53 AM
To:  cs user 
Cc:  Mingyu Kim , "user@spark.apache.org"

Subject:  Re: Autoscaling of Spark YARN cluster

An approach I can think of is using the Ambari Metrics Service (AMS).
Using these metrics, you can decide whether the cluster is low on
resources.
If so, call the Ambari management API to add a node to the cluster.

Thanks
Deepak

On Mon, Dec 14, 2015 at 2:48 PM, cs user  wrote:
> Hi Mingyu, 
> 
> I'd be interested in hearing about anything else you find which might meet
> your needs for this.
> 
> One way perhaps this could be done would be to use Ambari. Ambari comes with a
> nice api which you can use to add additional nodes into a cluster:
> 
> https://github.com/apache/ambari/blob/trunk/ambari-server/docs/api/v1/index.md
> 
> Once the node has been built, the ambari agent installed, you can then call
> back to the management node via the api, tell it what you want the new node to
> be, and it will connect, configure your new node and add it to the cluster.
> 
> You could create a host group within the cluster blueprint with the minimal
> components you need to install to have it operate as a yarn node.
> 
> As for the decision to scale, that is outside of the remit of Ambari. I guess
> you could look into using aws autoscaling or you could look into a product
> called scalr, which has an opensource version. We are using this to install an
> ambari cluster using chef to configure the nodes up until the point it hands
> over to Ambari. 
> 
> Scalr allows you to write custom scaling metrics, which you could use to query
> the # of applications queued and # of resources available, and to add nodes
> when required.
> 
> Cheers!
> 
> On Mon, Dec 14, 2015 at 8:57 AM, Mingyu Kim  wrote:
>> Hi all,
>> 
>> Has anyone tried out autoscaling Spark YARN cluster on a public cloud (e.g.
>> EC2) based on workload? To be clear, I'm interested in scaling the cluster
>> itself up and down by adding and removing YARN nodes based on the cluster
>> resource utilization (e.g. # of applications queued, # of resources
>> available), as opposed to scaling resources assigned to Spark applications,
>> which is natively supported by Spark's dynamic resource scheduling. I've
>> found that Cloudbreak has a similar feature, but it's in "technical
>> preview", and I didn't find much else from my search.
>> 
>> This might be a general YARN question, but wanted to check if there's a
>> solution popular in the Spark community. Any sharing of experience around
>> autoscaling will be helpful!
>> 
>> Thanks,
>> Mingyu
> 



-- 
Thanks
Deepak
www.bigdatabig.com 

www.keosha.net 





smime.p7s
Description: S/MIME cryptographic signature


Re: [SparkR] Any reason why saveDF's mode is append by default ?

2015-12-14 Thread Shivaram Venkataraman
I think its just a bug -- I think we originally followed the Python
API (in the original PR [1]) but the Python API seems to have been
changed to match Scala / Java in
https://issues.apache.org/jira/browse/SPARK-6366

Feel free to open a JIRA / PR for this.

Thanks
Shivaram

[1] https://github.com/amplab-extras/SparkR-pkg/pull/199/files
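
For comparison, a minimal Scala sketch of the two defaults in question (the output path, and sqlContext as provided by spark-shell, are assumed):

import org.apache.spark.sql.SaveMode

val df = sqlContext.range(10)

// Scala/Java default (SaveMode.ErrorIfExists): the write fails if /tmp/out already exists
df.write.mode(SaveMode.ErrorIfExists).parquet("/tmp/out")

// what SparkR's saveDF currently defaults to
df.write.mode(SaveMode.Append).parquet("/tmp/out")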

On Sun, Dec 13, 2015 at 11:58 PM, Jeff Zhang  wrote:
> It is inconsistent with scala api which is error by default. Any reason for
> that ? Thanks
>
>
>
> --
> Best Regards
>
> Jeff Zhang

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Re: Spark assembly in Maven repo?

2015-12-14 Thread Sean Owen
Yes, though I think the Maven Central repository is more canonical.

http://repo1.maven.org/maven2/org/apache/spark/spark-core_2.10/1.5.2/
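
As an illustration, the equivalent sbt line for pulling the per-module artifact instead of an assembly (the version, Scala cross-build, and "provided" scope here are assumptions):

// build.sbt -- depend on the published spark-core artifact rather than an assembly jar
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2" % "provided"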

On Mon, Dec 14, 2015, 06:35 Xiaoyong Zhu  wrote:

> Thanks! do you mean something here (for example for 1.5.1 using scala
> 2.10)?
>
>
> https://repository.apache.org/content/repositories/releases/org/apache/spark/spark-core_2.10/1.5.1/
>
>
>
> Xiaoyong
>
>
>
> *From:* Sean Owen [mailto:so...@cloudera.com]
> *Sent:* Saturday, December 12, 2015 12:45 AM
> *To:* Xiaoyong Zhu 
> *Cc:* user 
>
>
> *Subject:* Re: Re: Spark assembly in Maven repo?
>
>
>
> That's exactly what the various artifacts in the Maven repo are for. The
> API classes for core are in the core artifact and so on. You don't need an
> assembly.
>
>
>
> On Sat, Dec 12, 2015 at 12:32 AM, Xiaoyong Zhu 
> wrote:
>
> Yes, so our scenario is to treat the spark assembly as an “SDK” so users
> can develop Spark applications easily without downloading them. In this
> case which way do you guys think might be good?
>
>
>
> Xiaoyong
>
>
>
> *From:* fightf...@163.com [mailto:fightf...@163.com]
> *Sent:* Friday, December 11, 2015 12:08 AM
> *To:* Mark Hamstra 
> *Cc:* Xiaoyong Zhu ; Jeff Zhang ;
> user ; Zhaomin Xu ; Joe Zhang
> (SDE) 
> *Subject:* Re: Re: Spark assembly in Maven repo?
>
>
>
> Agree with you that assembly jar is not good to publish. However, what he
> really need is to fetch
>
> an updatable maven jar file.
>
>
> --
>
> fightf...@163.com
>
>
>
> *From:* Mark Hamstra 
>
> *Date:* 2015-12-11 15:34
>
> *To:* fightf...@163.com
>
> *CC:* Xiaoyong Zhu ; Jeff Zhang ;
> user ; Zhaomin Xu ; Joe Zhang
> (SDE) 
>
> *Subject:* Re: RE: Spark assembly in Maven repo?
>
> No, publishing a spark assembly jar is not fine.  See the doc attached to
> https://issues.apache.org/jira/browse/SPARK-11157
> 
> and be aware that a likely goal of Spark 2.0 will be the elimination of
> assemblies.
>
>
>
> On Thu, Dec 10, 2015 at 11:19 PM, fightf...@163.com 
> wrote:
>
> Using maven to download the assembly jar is fine. I would recommend to
> deploy this
>
> assembly jar to your local maven repo, i.e. nexus repo, Or more likey a
> snapshot repository
>
>
> --
>
> fightf...@163.com
>
>
>
> *From:* Xiaoyong Zhu 
>
> *Date:* 2015-12-11 15:10
>
> *To:* Jeff Zhang 
>
> *CC:* user@spark.apache.org; Zhaomin Xu ; Joe Zhang
> (SDE) 
>
> *Subject:* RE: Spark assembly in Maven repo?
>
> Sorry – I didn’t make it clear. It’s actually not a “dependency” – it’s
> actually that we are building a certain plugin for IntelliJ where we want
> to distribute this jar. But since the jar is updated frequently we don't
> want to distribute it together with our plugin but we would like to
> download it via Maven.
>
>
>
> In this case what’s the recommended way?
>
>
>
> Xiaoyong
>
>
>
> *From:* Jeff Zhang [mailto:zjf...@gmail.com]
> *Sent:* Thursday, December 10, 2015 11:03 PM
> *To:* Xiaoyong Zhu 
> *Cc:* user@spark.apache.org
> *Subject:* Re: Spark assembly in Maven repo?
>
>
>
> I don't think make the assembly jar as dependency a good practice. You may
> meet jar hell issue in that case.
>
>
>
> On Fri, Dec 11, 2015 at 2:46 PM, Xiaoyong Zhu 
> wrote:
>
> Hi Experts,
>
>
>
> We have a project which has a dependency for the following jar
>
>
>
> spark-assembly--hadoop.jar
>
> for example:
>
> spark-assembly-1.4.1.2.3.3.0-2983-hadoop2.7.1.2.3.3.0-2983.jar
>
>
>
> since this assembly might be updated in the future, I am not sure if there
> is a Maven repo that has the above spark assembly jar? Or should we create
> & upload it to Maven central?
>
>
>
> Thanks!
>
>
>
> Xiaoyong
>
>
>
>
>
>
>
> --
>
> Best Regards
>
> Jeff Zhang
>
>
>
>
>


Re: UNSUBSCRIBE

2015-12-14 Thread Mithila Joshi
unsubscribe

On Mon, Dec 14, 2015 at 4:49 PM, Tim Barthram 
wrote:

> UNSUBSCRIBE Thanks
>
>
>
> _
>
> The information transmitted in this message and its attachments (if any)
> is intended
> only for the person or entity to which it is addressed.
> The message may contain confidential and/or privileged material. Any
> review,
> retransmission, dissemination or other use of, or taking of any action in
> reliance
> upon this information, by persons or entities other than the intended
> recipient is
> prohibited.
>
> If you have received this in error, please contact the sender and delete
> this e-mail
> and associated material from any computer.
>
> The intended recipient of this e-mail may only use, reproduce, disclose or
> distribute
> the information contained in this e-mail and any attached files, with the
> permission
> of the sender.
>
> This message has been scanned for viruses.
> _
>


Re: Concatenate a string to a Column of type string in DataFrame

2015-12-14 Thread Michael Armbrust
In earlier versions you should be able to use callUdf or callUDF (depending
on which version) and call the hive function "concat".
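
A minimal 1.4.x sketch along those lines, where old_df and the column name come from the snippet quoted below, and the exact casing (callUdf vs callUDF) depends on the release as noted:

import org.apache.spark.sql.functions.{callUdf, col, lit}

// delegate to Hive's concat, since functions.concat only exists from 1.5.0 onwards
val new_df = old_df.select(callUdf("concat", col("String_Column"), lit("00:00:000")))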

On Sun, Dec 13, 2015 at 3:05 AM, Yanbo Liang  wrote:

> Sorry, it was added since 1.5.0.
>
> 2015-12-13 2:07 GMT+08:00 Satish :
>
>> Hi,
>> Will the below mentioned snippet work for Spark 1.4.0
>>
>> Thanks for your inputs
>>
>> Regards,
>> Satish
>> --
>> From: Yanbo Liang 
>> Sent: ‎12-‎12-‎2015 20:54
>> To: satish chandra j 
>> Cc: user 
>> Subject: Re: Concatenate a string to a Column of type string in DataFrame
>>
>> Hi Satish,
>>
>> You can refer the following code snippet:
>> df.select(concat(col("String_Column"), lit("00:00:000")))
>>
>> Yanbo
>>
>> 2015-12-12 16:01 GMT+08:00 satish chandra j :
>>
>>> HI,
>>> I am trying to update a column value in a DataFrame. When incrementing a
>>> column of integer data type, the below code works:
>>>
>>> val new_df=old_df.select(df("Int_Column")+10)
>>>
>>> If I implement a similar approach for appending a string to a column
>>> of string datatype as below, it does not error out but returns only
>>> "null" values:
>>>
>>> val new_df=old_df.select(df("String_Column")+"00:00:000")
>>>  OR
>>> val dt ="00:00:000"
>>> val new_df=old_df.select(df("String_Column")+toString(dt))
>>>
>>> Please suggest an approach to update a column value of datatype
>>> String.
>>> Ex: a column value of '20-10-2015' should become
>>> '20-10-201500:00:000' after updating.
>>>
>>> Note: the transformation is such that a new DataFrame has to be created
>>> from the old DataFrame.
>>>
>>> Regards,
>>> Satish Chandra
>>>
>>>
>>>
>>>
>>
>>
>


Re: RuntimeException: Failed to check null bit for primitive int type

2015-12-14 Thread Michael Armbrust
Your code (at 
com.ctrip.ml.toolimpl.MetadataImpl$$anonfun$1.apply(MetadataImpl.scala:22))
needs to check isNullAt before calling getInt.  This is because you cannot
return null for a primitive value (Int).
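
A minimal sketch of that guard (the row layout and the surrounding map are assumptions):

import org.apache.spark.sql.Row

// SQL NULL cannot be represented as a primitive Int, so check the null bit first
def safeInt(row: Row, i: Int): Option[Int] =
  if (row.isNullAt(i)) None else Some(row.getInt(i))

// e.g. inside the map over the Hive rows:
// rdd.map(row => safeInt(row, 0).getOrElse(0))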

On Mon, Dec 14, 2015 at 3:40 AM, zml张明磊  wrote:

> Hi,
>
>
>
>  My Spark version is spark-1.4.1-bin-hadoop2.6. When I submit a
> Spark job and read data from a Hive table, I get the following error.
> Although it's just a WARN, it leads to job failure.
>
> Maybe the following JIRA has already solved this, so I am confused:
> https://issues.apache.org/jira/browse/SPARK-3004
>
>
>
> *15/12/14 19:21:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage
> 40.0 (TID 1255, minglei): java.lang.RuntimeException: Failed to check null
> bit for primitive int value.*
> at scala.sys.package$.error(package.scala:27)
> at
> org.apache.spark.sql.catalyst.expressions.GenericRow.getInt(rows.scala:82)
> at
> com.ctrip.ml.toolimpl.MetadataImpl$$anonfun$1.apply(MetadataImpl.scala:22)
> at
> com.ctrip.ml.toolimpl.MetadataImpl$$anonfun$1.apply(MetadataImpl.scala:22)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at
> scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:30)
> at org.spark-project.guava.collect.Ordering.leastOf(Ordering.java:658)
> at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
> at
> org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1338)
> at
> org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1$$anonfun$29.apply(RDD.scala:1335)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
> at
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
>
>
>
>


troubleshooting "Missing an output location for shuffle"

2015-12-14 Thread Veljko Skarich
Hi,

I keep getting some variation of the following error:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
location for shuffle 2


Does anyone know what this might indicate? Is it a memory issue? Any
general guidance appreciated.


Re: Discover SparkUI port for spark streaming job running in cluster mode

2015-12-14 Thread Jonathan Kelly
Are you running Spark on YARN? If so, you can get to the Spark UI via the
YARN ResourceManager. Each running Spark application will have a link on
the YARN ResourceManager labeled "ApplicationMaster". If you click that, it
will take you to the Spark UI, even if it is running on a slave node in the
case of yarn-cluster mode. It does this by proxying the Spark UI through
the YARN Proxy Server on the master node.

For completed applications, the link will be labeled "History" and will
take you to the Spark History Server (provided you have
set spark.yarn.historyServer.address in spark-defaults.conf).

As for getting the URL programmatically, the URL using the YARN ProxyServer
is easy to determine. It's just http://<RM host>:<proxy port>/proxy/<application ID>/. (e.g.,
http://ip-10-150-65-11.ec2.internal:20888/proxy/application_1450128858020_0001/)
Then again, I'm not sure how easy it is to get the YARN application ID for
a Spark application without parsing the spark-submit logs. Or at least I
think I remember some other thread where that was mentioned.
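
If it helps, the application id is at least available in-process, so the proxied URL can be assembled without parsing logs; a small sketch (the ResourceManager host and the proxy port 20888 from the example above are assumptions):

// from inside the running Spark application
val appId = sc.applicationId                       // e.g. application_1450128858020_0001
val uiUrl = s"http://$rmHost:20888/proxy/$appId/"  // rmHost: the YARN RM host, supplied by you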

~ Jonathan

On Mon, Dec 14, 2015 at 1:57 PM, Ashish Nigam 
wrote:

> Hi,
> I run spark streaming job in cluster mode. This means that driver can run
> in any data node. And Spark UI can run in any dynamic port.
> At present, I know about the port by looking at container logs that look
> something like this -
>
> server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:50571
> INFO util.Utils: Successfully started service 'SparkUI' on port 50571.
> INFO ui.SparkUI: Started SparkUI at http://xxx:50571
>
>
> Is there any way to know about the UI port automatically using some API?
>
> Thanks
> Ashish
>


Re: Kryo serialization fails when using SparkSQL and HiveContext

2015-12-14 Thread Michael Armbrust
You'll need to either turn off required registration
(spark.kryo.registrationRequired=false) or register the class via a custom
registrator (spark.kryo.registrator).

http://spark.apache.org/docs/latest/configuration.html#compression-and-serialization
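
A minimal sketch of the registrator route (class and package names are assumptions; the registered class is taken from the error below, looked up by name since it is a Spark-internal class):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // register the catalyst row class reported in the stack trace
    kryo.register(Class.forName("org.apache.spark.sql.catalyst.expressions.GenericRow"))
  }
}

// then point spark.kryo.registrator at the fully qualified name of MyRegistrator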

On Mon, Dec 14, 2015 at 2:17 AM, Linh M. Tran 
wrote:

> Hi everyone,
> I'm using HiveContext and SparkSQL to query a Hive table and doing join
> operation on it.
> After changing the default serializer to Kryo with
> spark.kryo.registrationRequired = true, the Spark application failed with
> the following error:
>
> java.lang.IllegalArgumentException: Class is not registered:
> org.apache.spark.sql.catalyst.expressions.GenericRow
> Note: To register this class use:
> kryo.register(org.apache.spark.sql.catalyst.expressions.GenericRow.class);
> at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
> at
> com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
> at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
> at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
> at
> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:36)
> at
> com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
> at
> com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
> at
> org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:124)
> at
> org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:204)
> at
> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
> at
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> I'm using Spark 1.3.1 (HDP 2.3.0) and submitting Spark application to Yarn
> in cluster mode.
> Any help is appreciated.
> --
> Linh M. Tran
>


Re: Use of rdd.zipWithUniqueId() in DStream

2015-12-14 Thread Shixiong Zhu
It doesn't guarantee that. E.g.,

scala> sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0), 2).filter(_ >
2.0).zipWithUniqueId().collect().foreach(println)

(3.0,1)

(4.0,3)

It only guarantees "unique".
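
If consecutive ids starting at 0 are what's needed, zipWithIndex gives that guarantee instead (at the cost of triggering an extra job to compute partition sizes when there is more than one partition):

scala> sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0), 2).filter(_ > 2.0).zipWithIndex().collect().foreach(println)

(3.0,0)

(4.0,1)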

Best Regards,
Shixiong Zhu

2015-12-13 10:18 GMT-08:00 Sourav Mazumder :

> Hi All,
>
> I'm trying to use the zipWithUniqueId() function of RDD via the transform
> function of DStream. It does generate unique ids, always starting from 0 and
> in sequence.
>
> However, I'm not sure whether this is reliable behavior which is always
> guaranteed to generate sequence numbers starting from 0.
>
> Can anyone confirm ?
>
> Regards,
> Sourav
>


Re: troubleshooting "Missing an output location for shuffle"

2015-12-14 Thread Ross.Cramblit
Hey Veljko,
I ran into this error a few days ago and it turned out I was out of disk space 
on a couple nodes. I am not sure if this was the direct cause of the error, but 
it stopped throwing when I cleared out some unneeded large files.


On Dec 14, 2015, at 5:32 PM, Veljko Skarich wrote:

Hi,

I keep getting some variation of the following error:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output 
location for shuffle 2


Does anyone know what this might indicate? Is it a memory issue? Any general 
guidance appreciated.



Re: [SparkR] Any reason why saveDF's mode is append by default ?

2015-12-14 Thread Jeff Zhang
Thanks Shivaram, created https://issues.apache.org/jira/browse/SPARK-12318
I will work on it.

On Mon, Dec 14, 2015 at 4:13 PM, Shivaram Venkataraman <
shiva...@eecs.berkeley.edu> wrote:

> I think its just a bug -- I think we originally followed the Python
> API (in the original PR [1]) but the Python API seems to have been
> changed to match Scala / Java in
> https://issues.apache.org/jira/browse/SPARK-6366
>
> Feel free to open a JIRA / PR for this.
>
> Thanks
> Shivaram
>
> [1] https://github.com/amplab-extras/SparkR-pkg/pull/199/files
>
> On Sun, Dec 13, 2015 at 11:58 PM, Jeff Zhang  wrote:
> > It is inconsistent with scala api which is error by default. Any reason
> for
> > that ? Thanks
> >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
>



-- 
Best Regards

Jeff Zhang


Autoscaling of Spark YARN cluster

2015-12-14 Thread Mingyu Kim
Hi all,

Has anyone tried out autoscaling Spark YARN cluster on a public cloud (e.g.
EC2) based on workload? To be clear, I'm interested in scaling the cluster
itself up and down by adding and removing YARN nodes based on the cluster
resource utilization (e.g. # of applications queued, # of resources
available), as opposed to scaling resources assigned to Spark applications,
which is natively supported by Spark's dynamic resource scheduling. I've
found that Cloudbreak has a similar feature, but it's in "technical
preview", and I didn't find much else from my search.

This might be a general YARN question, but wanted to check if there's a
solution popular in the Spark community. Any sharing of experience around
autoscaling will be helpful!

Thanks,
Mingyu




smime.p7s
Description: S/MIME cryptographic signature


ALS mllib.recommendation vs ml.recommendation

2015-12-14 Thread Roberto Pagliari
Currently, there are two implementations of ALS available:
ml.recommendation.ALS and mllib.recommendation.ALS.


  1.  How do they differ in terms of performance?
  2.  Am I correct to assume ml.recommendation.ALS (unlike mllib) does not 
support key-value RDDs? If so, what is the reason?
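
For reference, a minimal sketch of the DataFrame-based API, as opposed to the RDD[Rating] input of mllib.recommendation.ALS (column names and the input ratings DataFrame are assumptions):

import org.apache.spark.ml.recommendation.ALS

val als = new ALS()
  .setUserCol("userId")
  .setItemCol("movieId")
  .setRatingCol("rating")
  .setRank(10)

val model = als.fit(ratingsDF)   // ratingsDF: a DataFrame with userId, movieId, rating columns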


Thank you,