New Spark User Group in Florida

2016-03-06 Thread रविशंकर नायर
Hi Organizer,

We have just started a new user group for Spark in Florida. Can you please
add this entry in Spark community ? Thanks

Florida Spark Meetup


Best regards,
R Nair.


Re: how to implement ALS with csv file? getting error while calling Rating class

2016-03-06 Thread Nick Pentreath
As you've pointed out, Rating requires user and item ids in Int form. So
you will need to map String user ids to integers.

See this thread for example:
https://mail-archives.apache.org/mod_mbox/spark-user/201501.mbox/%3CCAJgQjQ9GhGqpg1=hvxpfrs+59elfj9f7knhp8nyqnh1ut_6...@mail.gmail.com%3E
.

There is a DeveloperApi method
in org.apache.spark.ml.recommendation.ALS that takes Rating with generic
type (can be String) for user id and item id. However that is a little more
involved, and for larger scale data will be a lot less efficient.

Something like this for example:

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.recommendation.ALS.Rating

val conf = new SparkConf().setAppName("ALSWithStringID").setMaster("local[4]")
val sc = new SparkContext(conf)
// Name,Value1,Value2.
val rdd = sc.parallelize(Seq(
  Rating[String]("foo", "1", 4.0f),
  Rating[String]("foo", "2", 2.0f),
  Rating[String]("bar", "1", 5.0f),
  Rating[String]("bar", "3", 1.0f)
))
val (userFactors, itemFactors) = ALS.train(rdd)


As you can see, you just get the factor RDDs back, and if you want an
ALSModel you will have to construct it yourself.


On Sun, 6 Mar 2016 at 18:23 Shishir Anshuman 
wrote:

> I am new to apache Spark, and I want to implement the Alternating Least
> Squares algorithm. The data set is stored in a csv file in the format:
> *Name,Value1,Value2*.
>
> When I read the csv file, I get
> *java.lang.NumberFormatException.forInputString* error because the Rating
> class needs the parameters in the format: *(user: Int, product: Int,
> rating: Double)* and the first column of my file contains *Name*.
>
> Please suggest me a way to overcome this issue.
>


reading the parquet file in spark sql

2016-03-06 Thread Angel Angel
Hello Sir/Madam,

I am running one spark application having 3 slaves and one master.

I am wring the my information using the parquet format.

but when i am trying to read it got some error.
Please help me to resolve this problem.

code ;



val sqlContext = new org.apache.spark.sql.SQLContext(sc)


import sqlContext.implicits._


case class Table(Address: String, Couple_time: Int, WT_ID: Int, WT_Name:
String)


val df2 = sc.textFile("/root/Desktop/database.txt").map(_.split(",")).map(p
=> Table(p(0),p(1).trim.toInt, p(2).trim.toInt, p(3)))toDF


df2.write.parquet("Desktop/database2.parquet")




After that on master computer there is on folder name database2 have the
_success file

and on my slaves

 has the following tree

database2.parquet
└── _temporary
└── 0
├── task_201603071435__m_01
│   └── part-r-2.gz.parquet
├── task_201603071435__m_04
│   └── part-r-5.gz.parquet
└── _temporary



But when i am trying to read this file using following command i get the
error


val df1 = sqlContext.read.parquet("Desktop/database2.parquet")



error


ava.lang.AssertionError: assertion failed: No schema defined, and no
Parquet data file or summary file found under file:/root/database2.parquet.

at scala.Predef$.assert(Predef.scala:179)

at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.org$apache$spark$sql$parquet$ParquetRelation2$MetadataCache$$readSchema(newParquet.scala:443)

at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$15.apply(newParquet.scala:385)

at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$15.apply(newParquet.scala:385)

at scala.Option.orElse(Option.scala:257)

at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache.refresh(newParquet.scala:385)

at org.apache.spark.sql.parquet.ParquetRelation2.org
$apache$spark$sql$parquet$ParquetRelation2$$metadataCache$lzycompute(newParquet.scala:154)

at org.apache.spark.sql.parquet.ParquetRelation2.org
$apache$spark$sql$parquet$ParquetRelation2$$metadataCache(newParquet.scala:152)

at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$dataSchema$1.apply(newParquet.scala:193)

at
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$dataSchema$1.apply(newParquet.scala:193)

at scala.Option.getOrElse(Option.scala:120)

at
org.apache.spark.sql.parquet.ParquetRelation2.dataSchema(newParquet.scala:193)

at org.apache.spark.sql.sources.HadoopFsRel




Thanks.


Re: Spark reduce serialization question

2016-03-06 Thread Holden Karau
You might want to try treeAggregate

On Sunday, March 6, 2016, Takeshi Yamamuro  wrote:

> Hi,
>
> I'm not exactly sure what's your codes like though, ISTM this is a correct
> behaviour.
> If the size of data that a driver fetches exceeds the limit, the driver
> throws this exception.
> (See
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L68
> )
> So, in your case, your driver tries to fetch 1345.5 MB data of 4 models
> from executors, then it fails.
> Thanks,
>
> On Sat, Mar 5, 2016 at 6:11 AM, James Jia  > wrote:
>
>> I'm running a distributed KMeans algorithm with 4 executors.
>>
>> I have a RDD[Data]. I use mapPartition to run a learner on each data 
>> partition, and then call reduce with my custom model reduce function to 
>> reduce the result of the model to start a new iteration.
>>
>> The model size is around ~330 MB. I would expect that the required memory 
>> for the serialized result at the driver to be at most 2*300 MB in order to 
>> reduce two models, but it looks like Spark is serializing all of the models 
>> to the driver before reducing.
>>
>> The error message says that the total size of the serialized results is 
>> 1345.5MB, which is approximately 4 * 330 MB. I know I can set the driver's 
>> max result size, but I just want to confirm that this is expected behavior.
>>
>> Thanks!
>>
>> James
>>
>> Stage 0:==>(1 + 3) / 
>> 4]16/02/19 05:59:28 ERROR TaskSetManager: Total size of serialized results 
>> of 4 tasks (1345.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Total 
>> size of serialized results of 4 tasks (1345.5 MB) is bigger than 
>> spark.driver.maxResultSize (1024.0 MB)
>>
>>   at org.apache.spark.scheduler.DAGScheduler.org 
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>>
>>   at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>>
>>   at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>>
>>   at 
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>
>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>>
>>   at 
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
>>
>>   at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>>
>>   at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>>
>>   at scala.Option.foreach(Option.scala:257)
>>
>>   at 
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>>
>>   at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
>>
>>   at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>>
>>   at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>>
>>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>>
>>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
>>
>>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
>>
>>   at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1007)
>>
>>   at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>>
>>   at 
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>>
>>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
>>
>>   at org.apache.spark.rdd.RDD.reduce(RDD.scala:989)
>>
>>   at BIDMach.RunOnSpark$.runOnSpark(RunOnSpark.scala:50)
>>
>>   ... 50 elided
>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau


Re: Spark reduce serialization question

2016-03-06 Thread Takeshi Yamamuro
Hi,

I'm not exactly sure what's your codes like though, ISTM this is a correct
behaviour.
If the size of data that a driver fetches exceeds the limit, the driver
throws this exception.
(See
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L68
)
So, in your case, your driver tries to fetch 1345.5 MB data of 4 models
from executors, then it fails.
Thanks,

On Sat, Mar 5, 2016 at 6:11 AM, James Jia  wrote:

> I'm running a distributed KMeans algorithm with 4 executors.
>
> I have a RDD[Data]. I use mapPartition to run a learner on each data 
> partition, and then call reduce with my custom model reduce function to 
> reduce the result of the model to start a new iteration.
>
> The model size is around ~330 MB. I would expect that the required memory for 
> the serialized result at the driver to be at most 2*300 MB in order to reduce 
> two models, but it looks like Spark is serializing all of the models to the 
> driver before reducing.
>
> The error message says that the total size of the serialized results is 
> 1345.5MB, which is approximately 4 * 330 MB. I know I can set the driver's 
> max result size, but I just want to confirm that this is expected behavior.
>
> Thanks!
>
> James
>
> Stage 0:==>(1 + 3) / 
> 4]16/02/19 05:59:28 ERROR TaskSetManager: Total size of serialized results of 
> 4 tasks (1345.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>
> org.apache.spark.SparkException: Job aborted due to stage failure: Total size 
> of serialized results of 4 tasks (1345.5 MB) is bigger than 
> spark.driver.maxResultSize (1024.0 MB)
>
>   at org.apache.spark.scheduler.DAGScheduler.org 
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>
>   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>
>   at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
>
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>
>   at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>
>   at scala.Option.foreach(Option.scala:257)
>
>   at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
>
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>
>   at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
>
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
>
>   at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1007)
>
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
>
>   at org.apache.spark.rdd.RDD.reduce(RDD.scala:989)
>
>   at BIDMach.RunOnSpark$.runOnSpark(RunOnSpark.scala:50)
>
>   ... 50 elided
>
>


-- 
---
Takeshi Yamamuro


Understanding the Web_UI 4040

2016-03-06 Thread Angel Angel
Hello Sir/Madam,


I am running the spark-sql application on the cluster.
In my cluster there are 3 slaves and one Master.

When i saw the progress of my application in web UI hadoopm0:8080

I found that one of my slaves node is always in  *LOADDING *mode.

Can you tell me what is that mean?

Also i am unable to see the DAG graph (As click on the DAG graph it hangs
the scree for some time).

[image: Inline image 1]


YARN nodemanager always uses its own hostname

2016-03-06 Thread HIGUCHI Daisuke
Hello,

I build Spark on HDFS/YARN cluster on Docker Containers.

* Spark on YARN version
  - Spark 1.6.0
  - Hadoop 2.6.0 (CDH 5.6.0)
  - Oracle Java 1.8.0_74

There are one HDFS/YARN master and one HDFS/YARN worker on each containers.

spark-yarn-master container has below hostname and IP addr.

hostname: spark-yarn-master-1-sxegt (pod name)
IP addr.: 172.17.0.11

hostname: spark-yarn-master (alias DNS name)
IP addr.: 172.30.242.57 (alias IP addr.)

bash-4.2$ cat /etc/hosts
# Kubernetes-managed hosts file.
127.0.0.1   localhost
::1 localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
fe00::0 ip6-mcastprefix
fe00::1 ip6-allnodes
fe00::2 ip6-allrouters
172.17.0.11 spark-yarn-master-1-sxegt
bash-4.2$

bash-4.2$ ip -4 addr show dev eth0
50: eth0@if51:  mtu 1500 qdisc noqueue state 
UP  link-netnsid 0
inet 172.17.0.11/16 scope global eth0
   valid_lft forever preferred_lft forever
bash-4.2$

bash-4.2$ hostname -f
spark-yarn-master-1-sxegt
bash-4.2$ 

bash-4.2$ curl -v spark-yarn-master:8020
* About to connect() to spark-yarn-master port 8020 (#0)
*   Trying 172.30.242.57...
* Connected to spark-yarn-master (172.30.242.57) port 8020 (#0)
> GET / HTTP/1.1
> User-Agent: curl/7.29.0
> Host: spark-yarn-master:8020
> Accept: */*
> 
< HTTP/1.1 404 Not Found
< Content-type: text/plain
* no chunk, no close, no size. Assume close to signal end
< 
It looks like you are making an HTTP request to a Hadoop IPC port. This is not 
the correct port for the web interface on this daemon.
* Closing connection 0
bash-4.2$ 

spark-yarn-worker container has below hostname and IP addr.

hostname: spark-yarn-worker-1-pshqi (pod name)
IP addr.: 172.17.0.12

hostname: spark-yarn-worker (alias DNS name)
IP addr.: 172.30.1.53 (alias IP addr.)

bash-4.2$ cat /etc/hosts
# Kubernetes-managed hosts file.
127.0.0.1   localhost
::1 localhost ip6-localhost ip6-loopback
fe00::0 ip6-localnet
fe00::0 ip6-mcastprefix
fe00::1 ip6-allnodes
fe00::2 ip6-allrouters
172.17.0.12 spark-yarn-worker-1-pshqi
bash-4.2$

bash-4.2$ ip -4 addr show dev eth0
52: eth0@if53:  mtu 1500 qdisc noqueue state 
UP  link-netnsid 0
inet 172.17.0.12/16 scope global eth0
   valid_lft forever preferred_lft forever
bash-4.2$ 

bash-4.2$ hostname -f
spark-yarn-worker-1-pshqi
bash-4.2$ 

bash-4.2$ curl -v spark-yarn-worker:8040
* About to connect() to spark-yarn-worker port 8040 (#0)
*   Trying 172.30.1.53...
* Connected to spark-yarn-worker (172.30.1.53) port 8040 (#0)
> GET / HTTP/1.1
> User-Agent: curl/7.29.0
> Host: spark-yarn-worker:8040
> Accept: */*
> 
< HTTP/1.1 404 Not Found
< Content-type: text/plain
* no chunk, no close, no size. Assume close to signal end
< 
It looks like you are making an HTTP request to a Hadoop IPC port. This is not 
the correct port for the web interface on this daemon.
* Closing connection 0
bash-4.2$ 

Spark HDFS/YARN master/worker nodes can connect each other by alias DNS name.

On master, To worker (alias DNS name):

bash-4.2$ hostname -f ; curl spark-yarn-worker:8040
spark-yarn-master-1-sxegt
It looks like you are making an HTTP request to a Hadoop IPC port. This is not 
the correct port for the web interface on this daemon.
bash-4.2$ 

On worker, To master (alias DNS name):

bash-4.2$ hostname -f ; curl spark-yarn-master:8020
spark-yarn-worker-1-pshqi
It looks like you are making an HTTP request to a Hadoop IPC port. This is not 
the correct port for the web interface on this daemon.
bash-4.2$ 

They cannot connect each other by hostname.

On master, To worker (hostname):

bash-4.2$ hostname -f ; curl spark-yarn-master-1-sxegt:8020
spark-yarn-worker-1-pshqi
curl: (6) Could not resolve host: spark-yarn-master-1-sxegt; Name or service 
not known
bash-4.2$ 

On worker, To master (hostname):

bash-4.2$ hostname -f ; curl spark-yarn-worker-1-pshqi:8040
spark-yarn-master-1-sxegt
curl: (6) Could not resolve host: spark-yarn-worker-1-pshqi; Name or service 
not known
bash-4.2$ 

So, I want HDFS/YARN to use alias DNS name instead of hostname.
But YARN nodemanager always uses hostname
even configuredyarn.nodemanager.hostname to alias DNS name.


HDFS/YARN worker log:

16/03/07 10:04:38 INFO datanode.DataNode: Configured hostname is 
spark-yarn-worker
:
16/03/07 10:04:42 INFO security.NMContainerTokenSecretManager: Updating node 
address : spark-yarn-worker-1-pshqi:39352
:
16/03/07 10:04:42 INFO containermanager.ContainerManagerImpl: ContainerManager 
started at spark-yarn-worker-1-pshqi/172.17.0.12:39352
:
16/03/07 10:04:44 INFO nodemanager.NodeStatusUpdaterImpl: Registered with 
ResourceManager as spark-yarn-worker-1-pshqi:39352 with total resource of 

:
16/03/07 10:04:44 INFO common.Storage: Lock on 
/var/lib/hadoop-hdfs/cache/hdfs/dfs/data/in_use.lock acquired by nodename 
16@spark-yarn-worker-1-pshqi
:


HDFS/YARN master log:

16/03/07 10:04:44 INFO 

Re: Avro SerDe Issue w/ Manual Partitions?

2016-03-06 Thread Chris Miller
For anyone running into this same issue, it looks like Avro deserialization
is just broken when used with SparkSQL and partitioned schemas. I created
an bug report with details and a simplified example on how to reproduce:
https://issues.apache.org/jira/browse/SPARK-13709


--
Chris Miller

On Fri, Mar 4, 2016 at 12:11 AM, Chris Miller 
wrote:

> One more thing -- just to set aside any question about my specific schema
> or data, I used the sample schema and data record from Oracle's
> documentation on Avro support. It's a pretty simple schema:
> https://docs.oracle.com/cd/E26161_02/html/GettingStartedGuide/jsonbinding-overview.html
>
> When I create a table with this schema and then try to query the
> Avro-encoded record, I get the same type of error:
>
> 
>  org.apache.avro.AvroTypeException: Found avro.FullName, expecting union
> at
> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
> at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
> at
> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:155)
> at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
> at
> org.apache.hadoop.hive.serde2.avro.AvroDeserializer$SchemaReEncoder.reencode(AvroDeserializer.java:111)
> at
> org.apache.hadoop.hive.serde2.avro.AvroDeserializer.deserialize(AvroDeserializer.java:175)
> at
> org.apache.hadoop.hive.serde2.avro.AvroSerDe.deserialize(AvroSerDe.java:201)
> at
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:409)
> at
> org.apache.spark.sql.hive.HadoopTableReader$$anonfun$fillObject$2.apply(TableReader.scala:408)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$class.to
> (TraversableOnce.scala:273)
> at scala.collection.AbstractIterator.to(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
> at
> org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> at org.apache.spark.scheduler.Task.run(Task.scala:89)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 
>
> To me, this "feels" like a bug -- I just can't identify if it's a Spark
> issue or an Avro issue. Decoding the same files work fine with Hive, and I
> imagine the same deserializer code is used there too.
>
> Thoughts?
>
> --
> Chris Miller
>
> On Thu, Mar 3, 2016 at 9:38 PM, Igor Berman  wrote:
>
>> your field name is
>> *enum1_values*
>>
>> but you have data
>> { "foo1": "test123", *"enum1"*: "BLUE" }
>>
>> i.e. since you defined enum and not union(null, enum)
>> it tries to find value for enum1_values and doesn't find one...
>>
>> On 3 March 2016 at 11:30, Chris Miller  wrote:
>>
>>> I've been digging into this a little deeper. Here's what I've found:
>>>
>>> test1.avsc:
>>> 
>>> {
>>>   "namespace": "com.cmiller",
>>>   "name": "test1",
>>>   "type": "record",
>>>   "fields": [
>>> { "name":"foo1", "type":"string" }
>>>   ]
>>> }
>>> 
>>>
>>> test2.avsc:
>>> 
>>> {
>>>   "namespace": 

Re: Is Spark right for us?

2016-03-06 Thread Chris Miller
Gut instinct is no, Spark is overkill for your needs... you should be able
to accomplish all of that with a relational database or a column oriented
database (depending on the types of queries you most frequently run and the
performance requirements).

--
Chris Miller

On Mon, Mar 7, 2016 at 1:17 AM, Laumegui Deaulobi <
guillaume.bilod...@gmail.com> wrote:

> Our problem space is survey analytics.  Each survey comprises a set of
> questions, with each question having a set of possible answers.  Survey
> fill-out tasks are sent to users, who have until a certain date to complete
> it.  Based on these survey fill-outs, reports need to be generated.  Each
> report deals with a subset of the survey fill-outs, and comprises a set of
> data points (average rating for question 1, min/max for question 2, etc.)
>
> We are dealing with rather large data sets - although reading the internet
> we get the impression that everyone is analyzing petabytes of data...
>
> Users: up to 100,000
> Surveys: up to 100,000
> Questions per survey: up to 100
> Possible answers per question: up to 10
> Survey fill-outs / user: up to 10
> Reports: up to 100,000
> Data points per report: up to 100
>
> Data is currently stored in a relational database but a migration to a
> different kind of store is possible.
>
> The naive algorithm for report generation can be summed up as this:
>
> for each report to be generated {
>   for each report data point to be calculated {
> calculate data point
> add data point to report
>   }
>   publish report
> }
>
> In order to deal with the upper limits of these values, we will need to
> distribute this algorithm to a compute / data cluster as much as possible.
>
> I've read about frameworks such as Apache Spark but also Hadoop, GridGain,
> HazelCast and several others, and am still confused as to how each of these
> can help us and how they fit together.
>
> Is Spark the right framework for us?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-right-for-us-tp26412.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Is Spark right for us?

2016-03-06 Thread Peyman Mohajerian
if your relational database has enough computing power, you don't have to
change it. You can just run SQL queries on top of it or even run Spark
queries over it. There is no hard-fast rule about using big data tools.
Usually people or organizations don't jump into big data for one specific
use case, it is a journey that involves multiple use cases, and future
growth and a lot more. If you data is already fitting in relational store
and you can used the existing SQL analytics and BI tool why consider other
options, unless you want to learn something new or data will grow over time
and you want to future proof it.

On Sun, Mar 6, 2016 at 12:59 PM, Krishna Sankar  wrote:

> Good question. It comes to computational complexity, computational scale
> and data volume.
>
>1. If you can store the data in a single server or a small cluster of
>db server (say mysql) then hdfs/Spark might be an overkill
>2. If you can run the computation/process the data on a single machine
>(remember servers with 512 GB memory and quad core CPUs can do a lot of
>stuff) then Spark is an overkill
>3. Even if you can do computations #1 & #2 above, in a pipeline and
>tolerate the elapsed time, Spark might be an overkill
>4. But if you require data/computation parallelism or distributed
>processing of data due to computation complexities or data volume or time
>constraints incl real time analytics, Spark is the right stack.
>5. Taking a quick look at what you have described so far, probably
>Spark is not needed.
>
> Cheers & HTH
> 
>
> On Sun, Mar 6, 2016 at 9:17 AM, Laumegui Deaulobi <
> guillaume.bilod...@gmail.com> wrote:
>
>> Our problem space is survey analytics.  Each survey comprises a set of
>> questions, with each question having a set of possible answers.  Survey
>> fill-out tasks are sent to users, who have until a certain date to
>> complete
>> it.  Based on these survey fill-outs, reports need to be generated.  Each
>> report deals with a subset of the survey fill-outs, and comprises a set of
>> data points (average rating for question 1, min/max for question 2, etc.)
>>
>> We are dealing with rather large data sets - although reading the internet
>> we get the impression that everyone is analyzing petabytes of data...
>>
>> Users: up to 100,000
>> Surveys: up to 100,000
>> Questions per survey: up to 100
>> Possible answers per question: up to 10
>> Survey fill-outs / user: up to 10
>> Reports: up to 100,000
>> Data points per report: up to 100
>>
>> Data is currently stored in a relational database but a migration to a
>> different kind of store is possible.
>>
>> The naive algorithm for report generation can be summed up as this:
>>
>> for each report to be generated {
>>   for each report data point to be calculated {
>> calculate data point
>> add data point to report
>>   }
>>   publish report
>> }
>>
>> In order to deal with the upper limits of these values, we will need to
>> distribute this algorithm to a compute / data cluster as much as possible.
>>
>> I've read about frameworks such as Apache Spark but also Hadoop, GridGain,
>> HazelCast and several others, and am still confused as to how each of
>> these
>> can help us and how they fit together.
>>
>> Is Spark the right framework for us?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-right-for-us-tp26412.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Is Spark right for us?

2016-03-06 Thread Krishna Sankar
Good question. It comes to computational complexity, computational scale
and data volume.

   1. If you can store the data in a single server or a small cluster of db
   server (say mysql) then hdfs/Spark might be an overkill
   2. If you can run the computation/process the data on a single machine
   (remember servers with 512 GB memory and quad core CPUs can do a lot of
   stuff) then Spark is an overkill
   3. Even if you can do computations #1 & #2 above, in a pipeline and
   tolerate the elapsed time, Spark might be an overkill
   4. But if you require data/computation parallelism or distributed
   processing of data due to computation complexities or data volume or time
   constraints incl real time analytics, Spark is the right stack.
   5. Taking a quick look at what you have described so far, probably Spark
   is not needed.

Cheers & HTH


On Sun, Mar 6, 2016 at 9:17 AM, Laumegui Deaulobi <
guillaume.bilod...@gmail.com> wrote:

> Our problem space is survey analytics.  Each survey comprises a set of
> questions, with each question having a set of possible answers.  Survey
> fill-out tasks are sent to users, who have until a certain date to complete
> it.  Based on these survey fill-outs, reports need to be generated.  Each
> report deals with a subset of the survey fill-outs, and comprises a set of
> data points (average rating for question 1, min/max for question 2, etc.)
>
> We are dealing with rather large data sets - although reading the internet
> we get the impression that everyone is analyzing petabytes of data...
>
> Users: up to 100,000
> Surveys: up to 100,000
> Questions per survey: up to 100
> Possible answers per question: up to 10
> Survey fill-outs / user: up to 10
> Reports: up to 100,000
> Data points per report: up to 100
>
> Data is currently stored in a relational database but a migration to a
> different kind of store is possible.
>
> The naive algorithm for report generation can be summed up as this:
>
> for each report to be generated {
>   for each report data point to be calculated {
> calculate data point
> add data point to report
>   }
>   publish report
> }
>
> In order to deal with the upper limits of these values, we will need to
> distribute this algorithm to a compute / data cluster as much as possible.
>
> I've read about frameworks such as Apache Spark but also Hadoop, GridGain,
> HazelCast and several others, and am still confused as to how each of these
> can help us and how they fit together.
>
> Is Spark the right framework for us?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-right-for-us-tp26412.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark Custom Partitioner not picked

2016-03-06 Thread Prabhu Joseph
Hi All,

When i am submitting a spark job on YARN with Custom Partitioner, it is
not picked by Executors. Executors still using the default HashPartitioner.
I added logs into both HashPartitioner (org/apache/spark/Partitioner.scala)
and Custom Partitioner. The completed executor logs shows HashPartitioner.

Below is the Spark application code with Custom Partitioner and the log
line which is added into HashPartitioner class of Partition.scala

 
log.info("HashPartitioner="+key+"---"+numPartitions+""+Utils.nonNegativeMod(key.hashCode,
numPartitions))

The Executor logs has

16/03/06 15:20:27 INFO spark.HashPartitioner: HashPartitioner=INFO---42
16/03/06 15:20:27 INFO spark.HashPartitioner: HashPartitioner=INFO---42


How to make sure, the executors are picking the right partitioner.



*Code:*
package org.apache.spark

class ExactPartitioner(partitions: Int) extends Partitioner with Logging{

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {

*   log.info ("ExactPartitioner="+key)*

   key match{
   case "INFO" => 0
   case "DEBUG" => 1
   case "ERROR" => 2
   case "WARN" => 3
   case "FATAL" => 4
   }
  }
}

object GroupByCLDB {

def main(args: Array[String]) {

val logFile = "/DATA"

val sparkConf = new SparkConf().setAppName("GroupBy")
sparkConf.set("spark.executor.memory","4g");
sparkConf.set("spark.executor.cores","2");
sparkConf.set("spark.executor.instances","2");

val sc = new SparkContext(sparkConf)
val logData = sc.textFile(logFile)


case class LogClass(one:String,two:String)

def parse(line: String) = {
  val pieces = line.split(' ')
  val level = pieces(2).toString
  val one = pieces(0).toString
  val two = pieces(1).toString
  (level,LogClass(one,two))
  }

val output = logData.map(x => parse(x))

*val partitioned = output.partitionBy(new ExactPartitioner(5)).persist()val
groups = partitioned.groupByKey(new ExactPartitioner(5))*
groups.count()

output.partitions.size
partitioned.partitions.size

}
}



Thanks,
Prabhu Joseph


Re: Is Spark right for us?

2016-03-06 Thread Gourav Sengupta
Hi,

once again that is all about tooling.

Regards,
Gourav Sengupta

On Sun, Mar 6, 2016 at 7:52 PM, Mich Talebzadeh 
wrote:

> Hi,
>
>
>
> What is the current size of your relational database?
>
>
>
> Are we talking about a row based RDBMS (Oracle, Sybase) or a columnar one
> (Teradata/ Sybase IQ)?
>
>
>
> I assume that you will be using SQL wherever you migrate to. The
> SQL-on-Hadoop tools are divided between well thought out solutions like
> Hive which actually have the use case of being your Data Warehouse
> infrastructure, to others which are relational database replacements to
> just query engines. Many SQL query engines whether they are Impala, Drill,
> Spark SQL or Presto have varying capabilities to query data in Hive. So
> here Spark is effectively a query engine. However, you still have to
> migrate your data to it. You can easily use Sqoop to migrate data from your
> RDBMS to Hive pretty straight forward (it will do table creation and
> population in Hive via JDBC). You mentioned HazelCast but that is just Data
> Grid much like Oracle Coherence Cache. You can of course push your data
> from your RDBMS to JMS or something similar in XML format using triggers or
> replication server (GoldenGate/SAP Replication server)  and eventually you
> will want to store that data somewhere in Big Data once in Data Grid. I
> have explained the architecture here
> 
>
>
> So there are few questions to be asked:
>
>
>
>1. Choose a Data Warehouse in Big Data. The likelihood will be
>something like Hive that supports ACID properties and has the nearest to
>Ansi-SQL on Big data. Your users will be productive on it assuming they
>know SQL (which they ought)
>2. Once you have chosen your target Data Warehouse, you will need to
>consider various query tools like Spark that provides Spark-shell and
>Spark-sql tools among other things. It provides SQL interface plus
>functional programming through Scala etc. It is a pretty impressive query
>engine with in-memory calculation and DAG
>3. You can also use other visualisation tools like Tableau etc for
>user interface.
>
>
> HTH
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 6 March 2016 at 19:14, Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> SPARK is just tooling, and its even not tooling. You can consider SPARK a
>> distributed operating system like YARN. You should read books like HADOOP
>> Application Architecture, Big Data (Nathan Marz) and other disciplines
>> before starting to consider how the solution is built.
>>
>> Most of the big data projects (like any other BI projects) do not deliver
>> value or turn extremely expensive to maintain because the approach is that
>> tools solve the problem.
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Sun, Mar 6, 2016 at 5:25 PM, Guillaume Bilodeau <
>> guillaume.bilod...@gmail.com> wrote:
>>
>>> The data is currently stored in a relational database, but a migration
>>> to a document-oriented database such as MongoDb is something we are
>>> definitely considering.  How does this factor in?
>>>
>>> On Sun, Mar 6, 2016 at 12:23 PM, Gourav Sengupta <
>>> gourav.sengu...@gmail.com> wrote:
>>>
 Hi,

 That depends on a lot of things, but as a starting point I would ask
 whether you are planning to store your data in JSON format?


 Regards,
 Gourav Sengupta

 On Sun, Mar 6, 2016 at 5:17 PM, Laumegui Deaulobi <
 guillaume.bilod...@gmail.com> wrote:

> Our problem space is survey analytics.  Each survey comprises a set of
> questions, with each question having a set of possible answers.  Survey
> fill-out tasks are sent to users, who have until a certain date to
> complete
> it.  Based on these survey fill-outs, reports need to be generated.
> Each
> report deals with a subset of the survey fill-outs, and comprises a
> set of
> data points (average rating for question 1, min/max for question 2,
> etc.)
>
> We are dealing with rather large data sets - although reading the
> internet
> we get the impression that everyone is analyzing petabytes of data...
>
> Users: up to 100,000
> Surveys: up to 100,000
> Questions per survey: up to 100
> Possible answers per question: up to 10
> Survey fill-outs / user: up to 10
> Reports: up to 100,000
> Data points per report: up to 100
>
> Data is currently stored in a relational database but a migration to a
> different kind of store is possible.
>
> The naive algorithm for report generation can be summed up as this:

Re: Is Spark right for us?

2016-03-06 Thread Mich Talebzadeh
Hi,



What is the current size of your relational database?



Are we talking about a row based RDBMS (Oracle, Sybase) or a columnar one
(Teradata/ Sybase IQ)?



I assume that you will be using SQL wherever you migrate to. The
SQL-on-Hadoop tools are divided between well thought out solutions like
Hive which actually have the use case of being your Data Warehouse
infrastructure, to others which are relational database replacements to
just query engines. Many SQL query engines whether they are Impala, Drill,
Spark SQL or Presto have varying capabilities to query data in Hive. So
here Spark is effectively a query engine. However, you still have to
migrate your data to it. You can easily use Sqoop to migrate data from your
RDBMS to Hive pretty straight forward (it will do table creation and
population in Hive via JDBC). You mentioned HazelCast but that is just Data
Grid much like Oracle Coherence Cache. You can of course push your data
from your RDBMS to JMS or something similar in XML format using triggers or
replication server (GoldenGate/SAP Replication server)  and eventually you
will want to store that data somewhere in Big Data once in Data Grid. I
have explained the architecture here



So there are few questions to be asked:



   1. Choose a Data Warehouse in Big Data. The likelihood will be something
   like Hive that supports ACID properties and has the nearest to Ansi-SQL on
   Big data. Your users will be productive on it assuming they know SQL (which
   they ought)
   2. Once you have chosen your target Data Warehouse, you will need to
   consider various query tools like Spark that provides Spark-shell and
   Spark-sql tools among other things. It provides SQL interface plus
   functional programming through Scala etc. It is a pretty impressive query
   engine with in-memory calculation and DAG
   3. You can also use other visualisation tools like Tableau etc for user
   interface.


HTH



Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 6 March 2016 at 19:14, Gourav Sengupta  wrote:

> Hi,
>
> SPARK is just tooling, and its even not tooling. You can consider SPARK a
> distributed operating system like YARN. You should read books like HADOOP
> Application Architecture, Big Data (Nathan Marz) and other disciplines
> before starting to consider how the solution is built.
>
> Most of the big data projects (like any other BI projects) do not deliver
> value or turn extremely expensive to maintain because the approach is that
> tools solve the problem.
>
>
> Regards,
> Gourav Sengupta
>
> On Sun, Mar 6, 2016 at 5:25 PM, Guillaume Bilodeau <
> guillaume.bilod...@gmail.com> wrote:
>
>> The data is currently stored in a relational database, but a migration to
>> a document-oriented database such as MongoDb is something we are definitely
>> considering.  How does this factor in?
>>
>> On Sun, Mar 6, 2016 at 12:23 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> That depends on a lot of things, but as a starting point I would ask
>>> whether you are planning to store your data in JSON format?
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Sun, Mar 6, 2016 at 5:17 PM, Laumegui Deaulobi <
>>> guillaume.bilod...@gmail.com> wrote:
>>>
 Our problem space is survey analytics.  Each survey comprises a set of
 questions, with each question having a set of possible answers.  Survey
 fill-out tasks are sent to users, who have until a certain date to
 complete
 it.  Based on these survey fill-outs, reports need to be generated.
 Each
 report deals with a subset of the survey fill-outs, and comprises a set
 of
 data points (average rating for question 1, min/max for question 2,
 etc.)

 We are dealing with rather large data sets - although reading the
 internet
 we get the impression that everyone is analyzing petabytes of data...

 Users: up to 100,000
 Surveys: up to 100,000
 Questions per survey: up to 100
 Possible answers per question: up to 10
 Survey fill-outs / user: up to 10
 Reports: up to 100,000
 Data points per report: up to 100

 Data is currently stored in a relational database but a migration to a
 different kind of store is possible.

 The naive algorithm for report generation can be summed up as this:

 for each report to be generated {
   for each report data point to be calculated {
 calculate data point
 add data point to report
   }
   publish report
 }

 In order to deal with the upper limits of these values, we will need to
 distribute this algorithm to a compute / 

Re: MLLib + Streaming

2016-03-06 Thread Lan Jiang
Thanks, Guru. After reading the implementation of StreamingKMean, 
StreamingLinearRegressionWithSGD and StreamingLogisticRegressionWithSGD, I 
reached the same conclusion. But unfortunately, this distinction between true 
online learning and offline learning are implied in the documentation and I was 
not sure if my understanding was correct or not. Thanks for confirming this!

However, I have a different opinion on your last paragraph —  that we cannot 
hold test data during model training for online learning. Taking 
StreamingLinearRegressionWithSGD for example, you can certainly split the each 
micro-batch as 70% — 30% and do evaluation based on the RMSE. At the very 
beginning, the RMSE will be large. But as more and more micro-batch arrives, 
you should see RMSE becomes smaller as the weights approach optimal. IMHO, I 
don’t see much difference regarding holding test data between online and 
offline learning.  

Lan

> On Mar 6, 2016, at 2:43 AM, Chris Miller  wrote:
> 
> Guru:This is a really great response. Thanks for taking the time to explain 
> all of this. Helpful for me too.
> 
> 
> --
> Chris Miller
> 
> On Sun, Mar 6, 2016 at 1:54 PM, Guru Medasani  > wrote:
> Hi Lan,
> 
> Streaming Means, Linear Regression and Logistic Regression support online 
> machine learning as you mentioned. Online machine learning is where model is 
> being trained and updated on every batch of streaming data. These models have 
> trainOn() and predictOn() methods where you can simply pass in DStreams you 
> want to train the model on and DStreams you want the model to predict on. So 
> when the next batch of data arrives model is trained and updated again. In 
> this case model weights are continually updated and hopefully model performs 
> better in terms of convergence and accuracy over time. What we are really 
> trying to do in online learning case is that we are only showing few examples 
> of the data at a time ( stream of data) and updating the parameters in case 
> of Linear and Logistic Regression and updating the centers in case of 
> K-Means. In the case of Linear or Logistic Regression this is possible due to 
> the optimizer that is chosen for minimizing the cost function which is 
> Stochastic Gradient Descent. This optimizer helps us to move closer and 
> closer to the optimal weights after every batch and over the time we will 
> have a model that has learned how to represent our data and predict well.
> 
> In the scenario of using any MLlib algorithms and doing training with 
> DStream.transform() and DStream.foreachRDD() operations, when the first batch 
> of data arrives we build a model, let’s call this model1. Once you have the 
> model1 you can make predictions on the same DStream or a different DStream 
> source. But for the next batch if you follow the same procedure and create a 
> model, let’s call this model2. This model2 will be significantly different 
> than model1 based on how different the data is in the second DStream vs the 
> first DStream as it is not continually updating the model. It’s like weight 
> vectors are jumping from one place to the other for every batch and we never 
> know if the algorithm is converging to the optimal weights. So I believe it 
> is not possible to do true online learning with other MLLib models in Spark 
> Streaming.  I am not sure if this is because the models don’t generally 
> support this streaming scenarios or if the streaming versions simply haven’t 
> been implemented yet.
> 
> Though technically you can use any of the MLlib algorithms in Spark Streaming 
> with the procedure you mentioned and make predictions, it is important to 
> figure out if the model you are choosing can converge by showing only a 
> subset(batches  - DStreams) of the data over time. Based on the algorithm you 
> choose certain optimizers won’t necessarily be able to converge by showing 
> only individual data points and require to see majority of the data to be 
> able to learn optimal weights.  In these cases, you can still do offline 
> learning/training with Spark bach processing using any of the MLlib 
> algorithms and save those models on hdfs. You can then start a streaming job 
> and load these saved models into your streaming application and make 
> predictions. This is traditional offline learning.
> 
> In general, online learning is hard as it’s hard to evaluate since we are not 
> holding any test data during the model training. We are simply training the 
> model and predicting. So in the initial batches, results can vary quite a bit 
> and have significant errors in terms of the predictions. So choosing online 
> learning vs. offline learning depends on how much tolerance the application 
> can have towards wild predictions in the beginning. Offline training is 
> simple and cheap where as online training can be hard and needs to be 
> constantly monitored to see how it is performing.
> 
> 

Re: Spark ML and Streaming

2016-03-06 Thread Lan Jiang
Sorry, accidentally sent again. My apology.  

> On Mar 6, 2016, at 1:22 PM, Lan Jiang  wrote:
> 
> Hi, there
> 
> I hope someone can clarify this for me.  It seems that some of the MLlib 
> algorithms such as KMean, Linear Regression and Logistics Regression have a 
> Streaming version, which can do online machine learning. But does that mean 
> other MLLib algorithm cannot be used in Spark streaming applications, such as 
> random forest, SVM, collaborate filtering, etc??
> 
> DStreams are essentially a sequence of RDDs. We can use DStream.transform() 
> and DStream.foreachRDD() operations, which allows you access RDDs in a 
> DStream and apply MLLib functions on them. So it looks like all MLLib 
> algorithms should be able to run in the streaming application. Am I wrong? 
> 
> Thanks in advance.
> 
> Lan


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Spark ML and Streaming

2016-03-06 Thread Lan Jiang
Hi, there

I hope someone can clarify this for me.  It seems that some of the MLlib 
algorithms such as KMean, Linear Regression and Logistics Regression have a 
Streaming version, which can do online machine learning. But does that mean 
other MLLib algorithm cannot be used in Spark streaming applications, such as 
random forest, SVM, collaborate filtering, etc??

DStreams are essentially a sequence of RDDs. We can use DStream.transform() and 
DStream.foreachRDD() operations, which allows you access RDDs in a DStream and 
apply MLLib functions on them. So it looks like all MLLib algorithms should be 
able to run in the streaming application. Am I wrong? 

Thanks in advance.

Lan
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark + Kafka all messages being used in 1 batch

2016-03-06 Thread Shahbaz
   - Do you happen to see how busy are the nodes in terms of CPU and how
   much heap each executor is allocated with.
   - If there is enough capacity ,you may want to increase number of cores
   per executor to 2 and do the needed heap tweaking.
   - How much time did it take to process 4M+ events (In Spark UI,you can
   look at Duration) column.
   - I believe Reader is Quite fast ,however Processing could be slower ,if
   you click on the Job,it gives you break down of execution,Result
   Serialization etc ,you may want to look at that and drive from there.


Regards,
Shahbaz

On Sun, Mar 6, 2016 at 9:26 PM, Vinti Maheshwari 
wrote:

> I have 2 machines in my cluster with the below specifications:
> 128 GB RAM and 8 cores machine
>
> Regards,
> ~Vinti
>
> On Sun, Mar 6, 2016 at 7:54 AM, Vinti Maheshwari 
> wrote:
>
>> Thanks Supreeth and Shahbaz. I will try adding
>> spark.streaming.kafka.maxRatePerPartition.
>>
>> Hi Shahbaz,
>>
>> Please see comments, inline:
>>
>>
>>- Which version of Spark you are using. ==> *1.5.2*
>>- How big is the Kafka Cluster ==> *2 brokers*
>>- What is the Message Size and type.==>
>> *String, 9,550 bytes (around) *
>>- How big is the spark cluster (How many executors ,How many cores
>>Per Executor)==>* 2 Nodes, 16 executors, 1 core per executor*
>>- What does your Spark Job looks like ==>
>>
>>
>>val messages = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder, StringDecoder](
>>  ssc, kafkaParams, topicsSet)val inputStream = messages.map(_._2)
>>
>>
>>  val parsedStream = inputStream
>>.map(line => {
>>  val splitLines = line.split(",")
>>  (splitLines(1), splitLines.slice(2, 
>> splitLines.length).map((_.trim.toLong)))
>>})
>>
>>  val state: DStream[(String, Array[Long])] = 
>> parsedStream.updateStateByKey(
>>(current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
>>  prev.map(_ +: current).orElse(Some(current))
>>.flatMap(as => Try(as.map(BDV(_)).reduce(_ + 
>> _).toArray).toOption)
>>})
>>  state.checkpoint(Duration(25000))
>>  state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) //saving to Hbase
>>  ssc
>>}
>>
>>
>> spark.streaming.backpressure.enabled set it to true and try?
>>  ==>
>>
>>
>> *yes, i had enabled it.*
>> Regards,
>> ~Vinti
>>
>> On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz  wrote:
>>
>>> Hello,
>>>
>>>- Which version of Spark you are using.
>>>- How big is the Kafka Cluster
>>>- What is the Message Size and type.
>>>- How big is the spark cluster (How many executors ,How many cores
>>>Per Executor)
>>>- What does your Spark Job looks like .
>>>
>>> spark.streaming.backpressure.enabled set it to true and try?
>>>
>>>
>>> Regards,
>>> Shahbaz
>>> +91-9986850670
>>>
>>> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth 
>>> wrote:
>>>
 Try setting spark.streaming.kafka.maxRatePerPartition, this can help
 control the number of messages read from Kafka per partition on the spark
 streaming consumer.

 -S


 On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari 
 wrote:

 Hello,

 I am trying to figure out why my kafka+spark job is running slow. I
 found that spark is consuming all the messages out of kafka into a single
 batch itself and not sending any messages to the other batches.

 2016/03/05 21:57:05
 
 0 events - - queued 2016/03/05 21:57:00
 
 0 events - - queued 2016/03/05 21:56:55
 
 0 events - - queued 2016/03/05 21:56:50
 
 0 events - - queued 2016/03/05 21:56:45
 
 0 events - - queued 2016/03/05 21:56:40
 
 4039573 events 6 ms - processing

 Does anyone know how this behavior can be changed so that the number of
 messages are load balanced across all the batches?

 Thanks,
 Vinti


>>>
>>
>


Re: Is Spark right for us?

2016-03-06 Thread Gourav Sengupta
Hi,

SPARK is just tooling, and its even not tooling. You can consider SPARK a
distributed operating system like YARN. You should read books like HADOOP
Application Architecture, Big Data (Nathan Marz) and other disciplines
before starting to consider how the solution is built.

Most of the big data projects (like any other BI projects) do not deliver
value or turn extremely expensive to maintain because the approach is that
tools solve the problem.


Regards,
Gourav Sengupta

On Sun, Mar 6, 2016 at 5:25 PM, Guillaume Bilodeau <
guillaume.bilod...@gmail.com> wrote:

> The data is currently stored in a relational database, but a migration to
> a document-oriented database such as MongoDb is something we are definitely
> considering.  How does this factor in?
>
> On Sun, Mar 6, 2016 at 12:23 PM, Gourav Sengupta <
> gourav.sengu...@gmail.com> wrote:
>
>> Hi,
>>
>> That depends on a lot of things, but as a starting point I would ask
>> whether you are planning to store your data in JSON format?
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Sun, Mar 6, 2016 at 5:17 PM, Laumegui Deaulobi <
>> guillaume.bilod...@gmail.com> wrote:
>>
>>> Our problem space is survey analytics.  Each survey comprises a set of
>>> questions, with each question having a set of possible answers.  Survey
>>> fill-out tasks are sent to users, who have until a certain date to
>>> complete
>>> it.  Based on these survey fill-outs, reports need to be generated.  Each
>>> report deals with a subset of the survey fill-outs, and comprises a set
>>> of
>>> data points (average rating for question 1, min/max for question 2, etc.)
>>>
>>> We are dealing with rather large data sets - although reading the
>>> internet
>>> we get the impression that everyone is analyzing petabytes of data...
>>>
>>> Users: up to 100,000
>>> Surveys: up to 100,000
>>> Questions per survey: up to 100
>>> Possible answers per question: up to 10
>>> Survey fill-outs / user: up to 10
>>> Reports: up to 100,000
>>> Data points per report: up to 100
>>>
>>> Data is currently stored in a relational database but a migration to a
>>> different kind of store is possible.
>>>
>>> The naive algorithm for report generation can be summed up as this:
>>>
>>> for each report to be generated {
>>>   for each report data point to be calculated {
>>> calculate data point
>>> add data point to report
>>>   }
>>>   publish report
>>> }
>>>
>>> In order to deal with the upper limits of these values, we will need to
>>> distribute this algorithm to a compute / data cluster as much as
>>> possible.
>>>
>>> I've read about frameworks such as Apache Spark but also Hadoop,
>>> GridGain,
>>> HazelCast and several others, and am still confused as to how each of
>>> these
>>> can help us and how they fit together.
>>>
>>> Is Spark the right framework for us?
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-right-for-us-tp26412.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Streaming UI tab misleading for window operations

2016-03-06 Thread Jatin Kumar
Thanks Ted! I have created https://issues.apache.org/jira/browse/SPARK-13707
JIRA ticket.

As I commented, I would like to work on the fix if we can decide what
should the correct behavior be.

--
Thanks
Jatin Kumar | Rocket Scientist
+91-7696741743 m

On Sun, Mar 6, 2016 at 11:30 PM, Ted Yu  wrote:

> Suggest logging a new issue with details provided in this thread.
>
> Thanks
>
> On Sun, Mar 6, 2016 at 9:56 AM, Jatin Kumar 
> wrote:
>
>> Hello Ted,
>>
>> The JIRA you pointed is a different issue. Here are more details of the
>> issue I am talking about:
>>
>> Consider code block this:
>>
>> val streamingContext = new StreamingContext(sparkConfig, Seconds(2))
>>
>> val totalVideoImps = streamingContext.sparkContext.accumulator(0, 
>> "TotalVideoImpressions")
>> val totalImps = streamingContext.sparkContext.accumulator(0, 
>> "TotalImpressions")
>>
>> val stream = KafkaReader.KafkaDirectStream(streamingContext)
>> stream.map(KafkaAdLogParser.parseAdLogRecord)
>>   .filter(record => {
>> totalImps += 1
>> KafkaAdLogParser.isVideoRecord(record)
>>   })
>>   .map(record => {
>> totalVideoImps += 1
>> record.url
>>   })
>>   .window(Seconds(120))
>>   .count().foreachRDD((rdd, time) => {
>>   println("Timestamp: " + 
>> ImpressionAggregator.millsToDate(time.milliseconds))
>>   println("Count: " + rdd.collect()(0))
>>   println("Total Impressions: " + totalImps.value)
>>   totalImps.setValue(0)
>>   println("Total Video Impressions: " + totalVideoImps.value)
>>   totalVideoImps.setValue(0)
>> })
>> streamingContext.start()
>> streamingContext.awaitTermination()
>>
>>
>> Batch Size before window operation is 2 sec and then after window batches
>> are of 120 seconds each. Now the output of the above program for first 2
>> batches of 120 sec each is:
>>
>> Timestamp: 2016-03-06 12:02:56,000
>> Count: 362195
>> Total Impressions: 16882431
>> Total Video Impressions: 362195
>>
>> Timestamp: 2016-03-06 12:04:56,000
>> Count: 367168
>> Total Impressions: 19480293
>> Total Video Impressions: 367168
>>
>> Timestamp: 2016-03-06 12:06:56,000
>> Count: 177711
>> Total Impressions: 10196677
>> Total Video Impressions: 177711
>>
>> Whereas the spark UI shows different numbers as attached in the image.
>> Also if we check the start and end index of kafka partition offsets
>> reported by subsequent batch entries on UI, they do not result in all
>> overall continuous range. All numbers are fine if we remove the window
>> operation though.
>>
>> I think this is a bug and I couldn't find any existing bug regarding this.
>>
>> --
>> Thanks
>> Jatin
>>
>> On Sun, Mar 6, 2016 at 8:29 PM, Ted Yu  wrote:
>>
>>> Have you taken a look at SPARK-12739 ?
>>>
>>> FYI
>>>
>>> On Sun, Mar 6, 2016 at 4:06 AM, Jatin Kumar <
>>> jku...@rocketfuelinc.com.invalid> wrote:
>>>
 Hello all,

 Consider following two code blocks:

 val ssc = new StreamingContext(sparkConfig, Seconds(2))
 val stream = KafkaUtils.createDirectStream(...)

 a) stream.filter(filterFunc).count().foreachRDD(rdd =>
 println(rdd.collect()))
 b) stream.filter(filterFunc).window(Seconds(60),
 Seconds(60)).count().foreachRDD(rdd => println(rdd.collect()))

 I have observed that in case
 a) the UI behaves correctly and numbers reported for each batch are
 correct.
 b) UI reports numbers every 60 seconds but the batch-id/input-size etc
 are for the 2 sec batch after every 60 seconds i.e. 30th batch, 60th batch
 etc. These numbers become totally useless, infact confusing in this case
 though the delay/processing-time numbers are still helpful.

 Is someone working on a fix to show aggregated numbers which will be
 more useful?

 --
 Thanks
 Jatin

>>>
>>>
>>
>


Re: Is Spark right for us?

2016-03-06 Thread Gourav Sengupta
Hi,

That depends on a lot of things, but as a starting point I would ask
whether you are planning to store your data in JSON format?


Regards,
Gourav Sengupta

On Sun, Mar 6, 2016 at 5:17 PM, Laumegui Deaulobi <
guillaume.bilod...@gmail.com> wrote:

> Our problem space is survey analytics.  Each survey comprises a set of
> questions, with each question having a set of possible answers.  Survey
> fill-out tasks are sent to users, who have until a certain date to complete
> it.  Based on these survey fill-outs, reports need to be generated.  Each
> report deals with a subset of the survey fill-outs, and comprises a set of
> data points (average rating for question 1, min/max for question 2, etc.)
>
> We are dealing with rather large data sets - although reading the internet
> we get the impression that everyone is analyzing petabytes of data...
>
> Users: up to 100,000
> Surveys: up to 100,000
> Questions per survey: up to 100
> Possible answers per question: up to 10
> Survey fill-outs / user: up to 10
> Reports: up to 100,000
> Data points per report: up to 100
>
> Data is currently stored in a relational database but a migration to a
> different kind of store is possible.
>
> The naive algorithm for report generation can be summed up as this:
>
> for each report to be generated {
>   for each report data point to be calculated {
> calculate data point
> add data point to report
>   }
>   publish report
> }
>
> In order to deal with the upper limits of these values, we will need to
> distribute this algorithm to a compute / data cluster as much as possible.
>
> I've read about frameworks such as Apache Spark but also Hadoop, GridGain,
> HazelCast and several others, and am still confused as to how each of these
> can help us and how they fit together.
>
> Is Spark the right framework for us?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-right-for-us-tp26412.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: how to implements a distributed system ?

2016-03-06 Thread Ted Yu
w.r.t. akka, please see the following:

[SPARK-7997][CORE] Remove Akka from Spark Core and Streaming

There're various ways to design distributed system. Can you outline what
your program does ?

Cheers

On Sun, Mar 6, 2016 at 8:35 AM, Minglei Zhang  wrote:

> hello, experts
>
> Suppose I have a local machine with a program contains a function with
> main function, in java it is public static void main, in scala ,it's def
> object extends App. But now, I want make it to a distributed program, how
> can I do and make it possible? Maybe it is a direct and simple question I
> ask. But I want to know it's idea. I think may be it's the right way to
> implement it with some components called Master and  Work and Client just
> like in Spark Project. And these components's talk to each other with the
> akka mechanism. Am I right ? Experts ?
>
> thanks,
> Minglei.
>


Is Spark right for us?

2016-03-06 Thread Laumegui Deaulobi
Our problem space is survey analytics.  Each survey comprises a set of
questions, with each question having a set of possible answers.  Survey
fill-out tasks are sent to users, who have until a certain date to complete
it.  Based on these survey fill-outs, reports need to be generated.  Each
report deals with a subset of the survey fill-outs, and comprises a set of
data points (average rating for question 1, min/max for question 2, etc.)

We are dealing with rather large data sets - although reading the internet
we get the impression that everyone is analyzing petabytes of data...

Users: up to 100,000
Surveys: up to 100,000
Questions per survey: up to 100
Possible answers per question: up to 10
Survey fill-outs / user: up to 10
Reports: up to 100,000
Data points per report: up to 100

Data is currently stored in a relational database but a migration to a
different kind of store is possible.

The naive algorithm for report generation can be summed up as this:

for each report to be generated {
  for each report data point to be calculated {
calculate data point
add data point to report
  }
  publish report
}

In order to deal with the upper limits of these values, we will need to
distribute this algorithm to a compute / data cluster as much as possible.

I've read about frameworks such as Apache Spark but also Hadoop, GridGain,
HazelCast and several others, and am still confused as to how each of these
can help us and how they fit together.

Is Spark the right framework for us?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-Spark-right-for-us-tp26412.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



how to implements a distributed system ?

2016-03-06 Thread Minglei Zhang
hello, experts

Suppose I have a local machine with a program contains a function with main
function, in java it is public static void main, in scala ,it's def object
extends App. But now, I want make it to a distributed program, how can I do
and make it possible? Maybe it is a direct and simple question I ask. But I
want to know it's idea. I think may be it's the right way to implement it
with some components called Master and  Work and Client just like in Spark
Project. And these components's talk to each other with the akka mechanism.
Am I right ? Experts ?

thanks,
Minglei.


how to implement ALS with csv file? getting error while calling Rating class

2016-03-06 Thread Shishir Anshuman
I am new to apache Spark, and I want to implement the Alternating Least
Squares algorithm. The data set is stored in a csv file in the format:
*Name,Value1,Value2*.

When I read the csv file, I get
*java.lang.NumberFormatException.forInputString* error because the Rating
class needs the parameters in the format: *(user: Int, product: Int,
rating: Double)* and the first column of my file contains *Name*.

Please suggest me a way to overcome this issue.


Re: Spark + Kafka all messages being used in 1 batch

2016-03-06 Thread Vinti Maheshwari
I have 2 machines in my cluster with the below specifications:
128 GB RAM and 8 cores machine

Regards,
~Vinti

On Sun, Mar 6, 2016 at 7:54 AM, Vinti Maheshwari 
wrote:

> Thanks Supreeth and Shahbaz. I will try adding
> spark.streaming.kafka.maxRatePerPartition.
>
> Hi Shahbaz,
>
> Please see comments, inline:
>
>
>- Which version of Spark you are using. ==> *1.5.2*
>- How big is the Kafka Cluster ==> *2 brokers*
>- What is the Message Size and type.==>
> *String, 9,550 bytes (around) *
>- How big is the spark cluster (How many executors ,How many cores Per
>Executor)==>* 2 Nodes, 16 executors, 1 core per executor*
>- What does your Spark Job looks like ==>
>
>
>val messages = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder](
>  ssc, kafkaParams, topicsSet)val inputStream = messages.map(_._2)
>
>
>  val parsedStream = inputStream
>.map(line => {
>  val splitLines = line.split(",")
>  (splitLines(1), splitLines.slice(2, 
> splitLines.length).map((_.trim.toLong)))
>})
>
>  val state: DStream[(String, Array[Long])] = 
> parsedStream.updateStateByKey(
>(current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
>  prev.map(_ +: current).orElse(Some(current))
>.flatMap(as => Try(as.map(BDV(_)).reduce(_ + 
> _).toArray).toOption)
>})
>  state.checkpoint(Duration(25000))
>  state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) //saving to Hbase
>  ssc
>}
>
>
> spark.streaming.backpressure.enabled set it to true and try?
>  ==>
>
>
> *yes, i had enabled it.*
> Regards,
> ~Vinti
>
> On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz  wrote:
>
>> Hello,
>>
>>- Which version of Spark you are using.
>>- How big is the Kafka Cluster
>>- What is the Message Size and type.
>>- How big is the spark cluster (How many executors ,How many cores
>>Per Executor)
>>- What does your Spark Job looks like .
>>
>> spark.streaming.backpressure.enabled set it to true and try?
>>
>>
>> Regards,
>> Shahbaz
>> +91-9986850670
>>
>> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth  wrote:
>>
>>> Try setting spark.streaming.kafka.maxRatePerPartition, this can help
>>> control the number of messages read from Kafka per partition on the spark
>>> streaming consumer.
>>>
>>> -S
>>>
>>>
>>> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari 
>>> wrote:
>>>
>>> Hello,
>>>
>>> I am trying to figure out why my kafka+spark job is running slow. I
>>> found that spark is consuming all the messages out of kafka into a single
>>> batch itself and not sending any messages to the other batches.
>>>
>>> 2016/03/05 21:57:05
>>> 
>>> 0 events - - queued 2016/03/05 21:57:00
>>> 
>>> 0 events - - queued 2016/03/05 21:56:55
>>> 
>>> 0 events - - queued 2016/03/05 21:56:50
>>> 
>>> 0 events - - queued 2016/03/05 21:56:45
>>> 
>>> 0 events - - queued 2016/03/05 21:56:40
>>> 
>>> 4039573 events 6 ms - processing
>>>
>>> Does anyone know how this behavior can be changed so that the number of
>>> messages are load balanced across all the batches?
>>>
>>> Thanks,
>>> Vinti
>>>
>>>
>>
>


Re: Spark + Kafka all messages being used in 1 batch

2016-03-06 Thread Vinti Maheshwari
Thanks Supreeth and Shahbaz. I will try adding
spark.streaming.kafka.maxRatePerPartition.

Hi Shahbaz,

Please see comments, inline:


   - Which version of Spark you are using. ==> *1.5.2*
   - How big is the Kafka Cluster ==> *2 brokers*
   - What is the Message Size and type.==>
*String, 9,550 bytes (around) *
   - How big is the spark cluster (How many executors ,How many cores Per
   Executor)==>* 2 Nodes, 16 executors, 1 core per executor*
   - What does your Spark Job looks like ==>


   val messages = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](
 ssc, kafkaParams, topicsSet)val inputStream = messages.map(_._2)


 val parsedStream = inputStream
   .map(line => {
 val splitLines = line.split(",")
 (splitLines(1), splitLines.slice(2,
splitLines.length).map((_.trim.toLong)))
   })

 val state: DStream[(String, Array[Long])] =
parsedStream.updateStateByKey(
   (current: Seq[Array[Long]], prev: Option[Array[Long]]) =>  {
 prev.map(_ +: current).orElse(Some(current))
   .flatMap(as => Try(as.map(BDV(_)).reduce(_ +
_).toArray).toOption)
   })
 state.checkpoint(Duration(25000))
 state.foreachRDD(rdd => rdd.foreach(Blaher.blah)) //saving to Hbase
 ssc
   }


spark.streaming.backpressure.enabled set it to true and try?
 ==>


*yes, i had enabled it.*
Regards,
~Vinti

On Sat, Mar 5, 2016 at 11:16 PM, Shahbaz  wrote:

> Hello,
>
>- Which version of Spark you are using.
>- How big is the Kafka Cluster
>- What is the Message Size and type.
>- How big is the spark cluster (How many executors ,How many cores Per
>Executor)
>- What does your Spark Job looks like .
>
> spark.streaming.backpressure.enabled set it to true and try?
>
>
> Regards,
> Shahbaz
> +91-9986850670
>
> On Sun, Mar 6, 2016 at 12:19 PM, Supreeth  wrote:
>
>> Try setting spark.streaming.kafka.maxRatePerPartition, this can help
>> control the number of messages read from Kafka per partition on the spark
>> streaming consumer.
>>
>> -S
>>
>>
>> On Mar 5, 2016, at 10:02 PM, Vinti Maheshwari 
>> wrote:
>>
>> Hello,
>>
>> I am trying to figure out why my kafka+spark job is running slow. I found
>> that spark is consuming all the messages out of kafka into a single batch
>> itself and not sending any messages to the other batches.
>>
>> 2016/03/05 21:57:05
>> 
>> 0 events - - queued 2016/03/05 21:57:00
>> 
>> 0 events - - queued 2016/03/05 21:56:55
>> 
>> 0 events - - queued 2016/03/05 21:56:50
>> 
>> 0 events - - queued 2016/03/05 21:56:45
>> 
>> 0 events - - queued 2016/03/05 21:56:40
>> 
>> 4039573 events 6 ms - processing
>>
>> Does anyone know how this behavior can be changed so that the number of
>> messages are load balanced across all the batches?
>>
>> Thanks,
>> Vinti
>>
>>
>


Re: Streaming UI tab misleading for window operations

2016-03-06 Thread Ted Yu
Have you taken a look at SPARK-12739 ?

FYI

On Sun, Mar 6, 2016 at 4:06 AM, Jatin Kumar <
jku...@rocketfuelinc.com.invalid> wrote:

> Hello all,
>
> Consider following two code blocks:
>
> val ssc = new StreamingContext(sparkConfig, Seconds(2))
> val stream = KafkaUtils.createDirectStream(...)
>
> a) stream.filter(filterFunc).count().foreachRDD(rdd =>
> println(rdd.collect()))
> b) stream.filter(filterFunc).window(Seconds(60),
> Seconds(60)).count().foreachRDD(rdd => println(rdd.collect()))
>
> I have observed that in case
> a) the UI behaves correctly and numbers reported for each batch are
> correct.
> b) UI reports numbers every 60 seconds but the batch-id/input-size etc are
> for the 2 sec batch after every 60 seconds i.e. 30th batch, 60th batch etc.
> These numbers become totally useless, infact confusing in this case though
> the delay/processing-time numbers are still helpful.
>
> Is someone working on a fix to show aggregated numbers which will be more
> useful?
>
> --
> Thanks
> Jatin
>


Re: How can I pass a Data Frame from object to another class

2016-03-06 Thread Mich Talebzadeh
It would be interesting why these contexts are not available in the JVM
outside of the class they were instigated (created).

For example  we could initialize an application with two threads as follows
in the main method

  val conf = new SparkConf().
   setAppName("Harness4").
   setMaster("local[12]").
   set("spark.driver.allowMultipleContexts", "true")
  val sc = new SparkContext(conf)

So any following class should see "sc" correct. However I will have to pass
it as parameter to the method of that following class!

class FirstQuery {
   def firstquerym(*sc: org.apache.spark.SparkContext*, rs:
org.apache.spark.sql.DataFrame) {
   val sqlContext = SQLContext.getOrCreate(sc)
...
  }
}

Otherwise it throws an error that "sc" does not exit

Thanks

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 6 March 2016 at 11:48, Gourav Sengupta  wrote:

> Hi Ted/ Holden,
>
> I had read a section in the book Learning Spark which advised against
> passing entire objects to SPARK instead of just functions (ref: page 30
> passing functions to SPARK).
>
> Is the above way of solving problem not going against it? It will be
> exciting to see your kind explanation.
>
>
> Regards,
> Gourav Sengupta
>
> On Sun, Mar 6, 2016 at 10:57 AM, Mich Talebzadeh <
> mich.talebza...@gmail.com> wrote:
>
>> Thanks for this tip
>>
>> The way I do it is to pass SparckContext "sc" to method
>> firstquery.firstquerym by calling the following
>>
>> val firstquery =  new FirstQuery
>> firstquery.firstquerym(sc, rs)
>>
>>
>> And creating the method as follows:
>>
>> class FirstQuery {
>>def firstquerym(sc: org.apache.spark.SparkContext, rs:
>> org.apache.spark.sql.DataFrame) {
>>val sqlContext = SQLContext.getOrCreate(sc)
>>println ("\nfirst query at"); sqlContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>>   val rs1 =
>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>   }
>> }
>>
>> This works. However, I don't seem to invoke getOrCreate without passing
>> sc?
>>
>> Is this the way you are implying. Also why "sc" is not available within
>> the life of JVM please
>>
>> Thanks
>>
>>
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 6 March 2016 at 01:25, Ted Yu  wrote:
>>
>>> Looking at the methods you call on HiveContext, they seem to belong
>>> to SQLContext.
>>>
>>> For SQLContext, you can use the below method of SQLContext in FirstQuery
>>> to retrieve SQLContext:
>>>
>>>   def getOrCreate(sparkContext: SparkContext): SQLContext = {
>>>
>>> FYI
>>>
>>> On Sat, Mar 5, 2016 at 3:37 PM, Mich Talebzadeh <
>>> mich.talebza...@gmail.com> wrote:
>>>
 I managed to sort this one out.

 The class should be defined as below with its method accepting two
 input parameters for HiveContext and rs as below

 class FirstQuery {
def firstquerym(HiveContext: org.apache.spark.sql.hive.HiveContext,
 rs: org.apache.spark.sql.DataFrame) {
println ("\nfirst query at"); HiveContext.sql("SELECT
 FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
 ").collect.foreach(println)
   val rs1 =
 rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
   }
 }

 and called from the main method as follows:

 val firstquery =  new FirstQuery
 firstquery.firstquerym(HiveContext, rs)


 Thanks


 Dr Mich Talebzadeh



 LinkedIn * 
 https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
 *



 http://talebzadehmich.wordpress.com



 On 5 March 2016 at 20:56, Mich Talebzadeh 
 wrote:

> Hi,
>
> I can use sbt to compile and run the following code. It works without
> any problem.
>
> I want to divide this into the obj and another class. I would like to
> do the result set joining tables identified by Data Frame 'rs' and then
> calls the method "firstquerym" in the class FirstQuery to do the
> calculation identified as "rs1"
>
> Now it needs "rs" to be available in class FrstQuery. Two questions
> please
>
>
>1. How can I pass rs to class FirstQuery
>2. Is there a better way of modularising this work so I can use
>methods defined in another class to 

Re: Spark SQL drops the HIVE table in "overwrite" mode while writing into table

2016-03-06 Thread Dhaval Modi
Hi Gourav,

I am trying to overwrite existing managed/internal table.

I havent register dataframe,  so it's not a temporary table.  BTW,  I have
added code in JIRA as comment.

Thanks,
Dhaval
On Mar 6, 2016 17:07, "Gourav Sengupta"  wrote:

> hi,
>
> is the table that you are trying to overwrite an external table or
> temporary table created in hivecontext?
>
>
> Regards,
> Gourav Sengupta
>
> On Sat, Mar 5, 2016 at 3:01 PM, Dhaval Modi 
> wrote:
>
>> Hi Team,
>>
>> I am facing a issue while writing dataframe back to HIVE table.
>>
>> When using "SaveMode.Overwrite" option the table is getting dropped and
>> Spark is unable to recreate it thus throwing error.
>>
>> JIRA: https://issues.apache.org/jira/browse/SPARK-13699
>>
>>
>> E.g.
>> tgtFinal.write.mode(SaveMode.Overwrite).saveAsTable("tgt_table")
>>
>> Error:
>> ++
>> 16/03/05 13:57:26 INFO spark.SparkContext: Created broadcast 138 from run
>> at ThreadPoolExecutor.java:1145
>> 16/03/05 13:57:26 INFO log.PerfLogger: > from=org.apache.hadoop.hive.ql.io.orc.ReaderImpl>
>> *java.lang.RuntimeException: serious problem*
>> *at *
>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
>> at
>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
>> at
>> org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
>> at
>> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
>> at scala.Option.getOrElse(Option.scala:120)
>> 
>> Caused by: java.util.concurrent.ExecutionException:
>> java.io.FileNotFoundException: File does not exist: hdfs://
>> sandbox.hortonworks.com:8020/apps/hive/warehouse/tgt_table
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:188)
>> at
>> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:998)
>> ... 138 more
>> *Caused by: java.io.FileNotFoundException: File does not exist:
>> hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/tgt_table
>> *
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
>> at
>> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
>> ++
>>
>>
>> Regards,
>> Dhaval Modi
>> dhavalmod...@gmail.com
>>
>
>


Streaming UI tab misleading for window operations

2016-03-06 Thread Jatin Kumar
Hello all,

Consider following two code blocks:

val ssc = new StreamingContext(sparkConfig, Seconds(2))
val stream = KafkaUtils.createDirectStream(...)

a) stream.filter(filterFunc).count().foreachRDD(rdd =>
println(rdd.collect()))
b) stream.filter(filterFunc).window(Seconds(60),
Seconds(60)).count().foreachRDD(rdd => println(rdd.collect()))

I have observed that in case
a) the UI behaves correctly and numbers reported for each batch are correct.
b) UI reports numbers every 60 seconds but the batch-id/input-size etc are
for the 2 sec batch after every 60 seconds i.e. 30th batch, 60th batch etc.
These numbers become totally useless, infact confusing in this case though
the delay/processing-time numbers are still helpful.

Is someone working on a fix to show aggregated numbers which will be more
useful?

--
Thanks
Jatin


Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append

2016-03-06 Thread Ted Yu
Thanks for the clarification, Gourav. 

> On Mar 6, 2016, at 3:54 AM, Gourav Sengupta  wrote:
> 
> Hi Ted,
> 
> There was no idle time after I changed the path to start with s3a and then 
> ensured that the number of executors writing were large. The writes start and 
> complete in about 5 mins or less. 
> 
> Initially the write used to complete by around 30 mins and we could see that 
> there were failure messages all over the place for another 20 mins after 
> which we killed jupyter application. 
> 
> 
> Regards,
> Gourav Sengupta 
> 
>> On Sun, Mar 6, 2016 at 11:48 AM, Ted Yu  wrote:
>> Gourav:
>> For the 3rd paragraph, did you mean the job seemed to be idle for about 5 
>> minutes ?
>> 
>> Cheers
>> 
>>> On Mar 6, 2016, at 3:35 AM, Gourav Sengupta  
>>> wrote:
>>> 
>>> Hi,
>>> 
>>> This is a solved problem, try using s3a instead and everything will be fine.
>>> 
>>> Besides that you might want to use coalesce or  partitionby or repartition 
>>> in order to see how many executors are being used to write (that speeds 
>>> things up quite a bit).
>>> 
>>> We had a write issue taking close to 50 min which is not running for lower 
>>> than 5 minutes.
>>> 
>>> 
>>> Regards,
>>> Gourav Sengupta 
>>> 
 On Fri, Mar 4, 2016 at 8:59 PM, Jelez Raditchkov  wrote:
 Working on a streaming job with DirectParquetOutputCommitter to S3
 I need to use PartitionBy and hence SaveMode.Append
 
 Apparently when using SaveMode.Append spark automatically defaults to the 
 default parquet output committer and ignores DirectParquetOutputCommitter.
 
 My problems are:
 1. the copying to _temporary takes alot of time
 2. I get job failures with: java.io.FileNotFoundException: File 
 s3n://jelez/parquet-data/_temporary/0/task_201603040904_0544_m_07 does 
 not exist.
 
 I have set:
 sparkConfig.set("spark.speculation", "false")
 sc.hadoopConfiguration.set("mapreduce.map.speculative", "false") 
 sc.hadoopConfiguration.set("mapreduce.reduce.speculative", 
 "false") 
 
 Any ideas? Opinions? Best practices?
> 


Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append

2016-03-06 Thread Gourav Sengupta
Hi Ted,

There was no idle time after I changed the path to start with s3a and then
ensured that the number of executors writing were large. The writes start
and complete in about 5 mins or less.

Initially the write used to complete by around 30 mins and we could see
that there were failure messages all over the place for another 20 mins
after which we killed jupyter application.


Regards,
Gourav Sengupta

On Sun, Mar 6, 2016 at 11:48 AM, Ted Yu  wrote:

> Gourav:
> For the 3rd paragraph, did you mean the job seemed to be idle for about 5
> minutes ?
>
> Cheers
>
> On Mar 6, 2016, at 3:35 AM, Gourav Sengupta 
> wrote:
>
> Hi,
>
> This is a solved problem, try using s3a instead and everything will be
> fine.
>
> Besides that you might want to use coalesce or  partitionby or repartition
> in order to see how many executors are being used to write (that speeds
> things up quite a bit).
>
> We had a write issue taking close to 50 min which is not running for lower
> than 5 minutes.
>
>
> Regards,
> Gourav Sengupta
>
> On Fri, Mar 4, 2016 at 8:59 PM, Jelez Raditchkov 
> wrote:
>
>> Working on a streaming job with DirectParquetOutputCommitter to S3
>> I need to use PartitionBy and hence SaveMode.Append
>>
>> Apparently when using SaveMode.Append spark automatically defaults to the
>> default parquet output committer and ignores DirectParquetOutputCommitter.
>>
>> My problems are:
>> 1. the copying to _temporary takes alot of time
>> 2. I get job failures with: java.io.FileNotFoundException: File
>> s3n://jelez/parquet-data/_temporary/0/task_201603040904_0544_m_07 does
>> not exist.
>>
>> I have set:
>> sparkConfig.set("spark.speculation", "false")
>> sc.hadoopConfiguration.set("mapreduce.map.speculative", "false")
>> sc.hadoopConfiguration.set("mapreduce.reduce.speculative",
>> "false")
>>
>> Any ideas? Opinions? Best practices?
>>
>>
>


Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append

2016-03-06 Thread Ted Yu
Gourav:
For the 3rd paragraph, did you mean the job seemed to be idle for about 5 
minutes ?

Cheers

> On Mar 6, 2016, at 3:35 AM, Gourav Sengupta  wrote:
> 
> Hi,
> 
> This is a solved problem, try using s3a instead and everything will be fine.
> 
> Besides that you might want to use coalesce or  partitionby or repartition in 
> order to see how many executors are being used to write (that speeds things 
> up quite a bit).
> 
> We had a write issue taking close to 50 min which is not running for lower 
> than 5 minutes.
> 
> 
> Regards,
> Gourav Sengupta 
> 
>> On Fri, Mar 4, 2016 at 8:59 PM, Jelez Raditchkov  wrote:
>> Working on a streaming job with DirectParquetOutputCommitter to S3
>> I need to use PartitionBy and hence SaveMode.Append
>> 
>> Apparently when using SaveMode.Append spark automatically defaults to the 
>> default parquet output committer and ignores DirectParquetOutputCommitter.
>> 
>> My problems are:
>> 1. the copying to _temporary takes alot of time
>> 2. I get job failures with: java.io.FileNotFoundException: File 
>> s3n://jelez/parquet-data/_temporary/0/task_201603040904_0544_m_07 does 
>> not exist.
>> 
>> I have set:
>> sparkConfig.set("spark.speculation", "false")
>> sc.hadoopConfiguration.set("mapreduce.map.speculative", "false") 
>> sc.hadoopConfiguration.set("mapreduce.reduce.speculative", "false") 
>> 
>> Any ideas? Opinions? Best practices?
> 


Re: How can I pass a Data Frame from object to another class

2016-03-06 Thread Gourav Sengupta
Hi Ted/ Holden,

I had read a section in the book Learning Spark which advised against
passing entire objects to SPARK instead of just functions (ref: page 30
passing functions to SPARK).

Is the above way of solving problem not going against it? It will be
exciting to see your kind explanation.


Regards,
Gourav Sengupta

On Sun, Mar 6, 2016 at 10:57 AM, Mich Talebzadeh 
wrote:

> Thanks for this tip
>
> The way I do it is to pass SparckContext "sc" to method
> firstquery.firstquerym by calling the following
>
> val firstquery =  new FirstQuery
> firstquery.firstquerym(sc, rs)
>
>
> And creating the method as follows:
>
> class FirstQuery {
>def firstquerym(sc: org.apache.spark.SparkContext, rs:
> org.apache.spark.sql.DataFrame) {
>val sqlContext = SQLContext.getOrCreate(sc)
>println ("\nfirst query at"); sqlContext.sql("SELECT
> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
> ").collect.foreach(println)
>   val rs1 =
> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>   }
> }
>
> This works. However, I don't seem to invoke getOrCreate without passing sc?
>
> Is this the way you are implying. Also why "sc" is not available within
> the life of JVM please
>
> Thanks
>
>
>
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> *
>
>
>
> http://talebzadehmich.wordpress.com
>
>
>
> On 6 March 2016 at 01:25, Ted Yu  wrote:
>
>> Looking at the methods you call on HiveContext, they seem to belong
>> to SQLContext.
>>
>> For SQLContext, you can use the below method of SQLContext in FirstQuery
>> to retrieve SQLContext:
>>
>>   def getOrCreate(sparkContext: SparkContext): SQLContext = {
>>
>> FYI
>>
>> On Sat, Mar 5, 2016 at 3:37 PM, Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> I managed to sort this one out.
>>>
>>> The class should be defined as below with its method accepting two input
>>> parameters for HiveContext and rs as below
>>>
>>> class FirstQuery {
>>>def firstquerym(HiveContext: org.apache.spark.sql.hive.HiveContext,
>>> rs: org.apache.spark.sql.DataFrame) {
>>>println ("\nfirst query at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>>   val rs1 =
>>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>>   }
>>> }
>>>
>>> and called from the main method as follows:
>>>
>>> val firstquery =  new FirstQuery
>>> firstquery.firstquerym(HiveContext, rs)
>>>
>>>
>>> Thanks
>>>
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> *
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>>
>>> On 5 March 2016 at 20:56, Mich Talebzadeh 
>>> wrote:
>>>
 Hi,

 I can use sbt to compile and run the following code. It works without
 any problem.

 I want to divide this into the obj and another class. I would like to
 do the result set joining tables identified by Data Frame 'rs' and then
 calls the method "firstquerym" in the class FirstQuery to do the
 calculation identified as "rs1"

 Now it needs "rs" to be available in class FrstQuery. Two questions
 please


1. How can I pass rs to class FirstQuery
2. Is there a better way of modularising this work so I can use
methods defined in another class to be called in main method

 Thanks

 import org.apache.spark.SparkContext
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.functions._
 //
 object Harness4 {
   def main(args: Array[String]) {
   val conf = new
 SparkConf().setAppName("Harness4").setMaster("local[*]").set("spark.driver.allowMultipleContexts",
 "true")
   val sc = new SparkContext(conf)
   // Note that this should be done only after an instance of
 org.apache.spark.sql.SQLContext is created. It should be written as:
   val sqlContext= new org.apache.spark.sql.SQLContext(sc)
   import sqlContext.implicits._
   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 println ("\nStarted at"); HiveContext.sql("SELECT
 FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
 ").collect.foreach(println)
 HiveContext.sql("use oraclehadoop")
 var s =
 HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
 val c =
 HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")

Re: Spark SQL drops the HIVE table in "overwrite" mode while writing into table

2016-03-06 Thread Gourav Sengupta
hi,

is the table that you are trying to overwrite an external table or
temporary table created in hivecontext?


Regards,
Gourav Sengupta

On Sat, Mar 5, 2016 at 3:01 PM, Dhaval Modi  wrote:

> Hi Team,
>
> I am facing a issue while writing dataframe back to HIVE table.
>
> When using "SaveMode.Overwrite" option the table is getting dropped and
> Spark is unable to recreate it thus throwing error.
>
> JIRA: https://issues.apache.org/jira/browse/SPARK-13699
>
>
> E.g.
> tgtFinal.write.mode(SaveMode.Overwrite).saveAsTable("tgt_table")
>
> Error:
> ++
> 16/03/05 13:57:26 INFO spark.SparkContext: Created broadcast 138 from run
> at ThreadPoolExecutor.java:1145
> 16/03/05 13:57:26 INFO log.PerfLogger:  from=org.apache.hadoop.hive.ql.io.orc.ReaderImpl>
> *java.lang.RuntimeException: serious problem*
> *at *
> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:1021)
> at
> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.getSplits(OrcInputFormat.java:1048)
> at
> org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
> at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
> at scala.Option.getOrElse(Option.scala:120)
> 
> Caused by: java.util.concurrent.ExecutionException:
> java.io.FileNotFoundException: File does not exist: hdfs://
> sandbox.hortonworks.com:8020/apps/hive/warehouse/tgt_table
> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> at java.util.concurrent.FutureTask.get(FutureTask.java:188)
> at
> org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.generateSplitsInfo(OrcInputFormat.java:998)
> ... 138 more
> *Caused by: java.io.FileNotFoundException: File does not exist:
> hdfs://sandbox.hortonworks.com:8020/apps/hive/warehouse/tgt_table
> *
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1122)
> at
> org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
> ++
>
>
> Regards,
> Dhaval Modi
> dhavalmod...@gmail.com
>


Re: S3 DirectParquetOutputCommitter + PartitionBy + SaveMode.Append

2016-03-06 Thread Gourav Sengupta
Hi,

This is a solved problem, try using s3a instead and everything will be fine.

Besides that you might want to use coalesce or  partitionby or repartition
in order to see how many executors are being used to write (that speeds
things up quite a bit).

We had a write issue taking close to 50 min which is not running for lower
than 5 minutes.


Regards,
Gourav Sengupta

On Fri, Mar 4, 2016 at 8:59 PM, Jelez Raditchkov  wrote:

> Working on a streaming job with DirectParquetOutputCommitter to S3
> I need to use PartitionBy and hence SaveMode.Append
>
> Apparently when using SaveMode.Append spark automatically defaults to the
> default parquet output committer and ignores DirectParquetOutputCommitter.
>
> My problems are:
> 1. the copying to _temporary takes alot of time
> 2. I get job failures with: java.io.FileNotFoundException: File
> s3n://jelez/parquet-data/_temporary/0/task_201603040904_0544_m_07 does
> not exist.
>
> I have set:
> sparkConfig.set("spark.speculation", "false")
> sc.hadoopConfiguration.set("mapreduce.map.speculative", "false")
> sc.hadoopConfiguration.set("mapreduce.reduce.speculative",
> "false")
>
> Any ideas? Opinions? Best practices?
>
>


how to flatten the dataframe

2016-03-06 Thread shubham@celebal
root 
 |-- adultbasefare: long (nullable = true) 
 |-- adultcommission: long (nullable = true) 
 |-- adultservicetax: long (nullable = true) 
 |-- adultsurcharge: long (nullable = true) 
 |-- airline: string (nullable = true) 
 |-- arrdate: string (nullable = true) 
 |-- arrtime: string (nullable = true) 
 |-- cafecommission: long (nullable = true) 
 |-- carrierid: string (nullable = true) 
 |-- class: string (nullable = true) 
 |-- depdate: string (nullable = true) 
 |-- deptime: string (nullable = true) 
 |-- destination: string (nullable = true) 
 |-- discount: long (nullable = true) 
 |-- duration: string (nullable = true) 
 |-- fare: struct (nullable = true) 
 ||-- A: long (nullable = true) 
 ||-- C: long (nullable = true) 
 ||-- I: long (nullable = true) 
 ||-- adultairlinetxncharge: long (nullable = true) 
 ||-- adultairporttax: long (nullable = true) 
 ||-- adultbasefare: long (nullable = true) 
 ||-- adultcommission: double (nullable = true) 
 ||-- adultsurcharge: long (nullable = true) 
 ||-- adulttotalfare: long (nullable = true) 
 ||-- childairlinetxncharge: long (nullable = true) 
 ||-- childairporttax: long (nullable = true) 
 ||-- childbasefare: long (nullable = true) 
 ||-- childcommission: double (nullable = true) 
 ||-- childsurcharge: long (nullable = true) 
 ||-- childtotalfare: long (nullable = true) 
 ||-- discount: long (nullable = true) 
 ||-- infantairlinetxncharge: long (nullable = true) 
 ||-- infantairporttax: long (nullable = true) 
 ||-- infantbasefare: long (nullable = true) 
 ||-- infantcommission: long (nullable = true) 
 ||-- infantsurcharge: long (nullable = true) 
 ||-- infanttotalfare: long (nullable = true) 
 ||-- servicetax: long (nullable = true) 
 ||-- totalbasefare: long (nullable = true) 
 ||-- totalcommission: double (nullable = true) 
 ||-- totalfare: long (nullable = true) 
 ||-- totalsurcharge: long (nullable = true) 
 ||-- transactionfee: long (nullable = true) 
 |-- farebasis: string (nullable = true) 
 |-- farerule: string (nullable = true) 
 |-- flightcode: string (nullable = true) 
 |-- flightno: string (nullable = true) 
 |-- k: string (nullable = true) 
 |-- onwardflights: array (nullable = true) 
 ||-- element: string (containsNull = true) 
 |-- origin: string (nullable = true) 
 |-- promocode: string (nullable = true) 
 |-- promodiscount: long (nullable = true) 
 |-- promotionText: string (nullable = true) 
 |-- stops: string (nullable = true) 
 |-- tickettype: string (nullable = true) 
 |-- totalbasefare: long (nullable = true) 
 |-- totalcommission: long (nullable = true) 
 |-- totalfare: long (nullable = true) 
 |-- totalpriceamount: long (nullable = true) 
 |-- totalsurcharge: long (nullable = true) 
 |-- transactionfee: long (nullable = true) 
 |-- viacharges: long (nullable = true) 
 |-- warnings: string (nullable = true) 



Now i want to flatten it so that the fare field will be removed and
everything will be flatten 

For this i used explode. But i am getting an error: 

org.apache.spark.sql.AnalysisException: cannot resolve 'explode(fare)' due
to data type mismatch: input to function explode should be array or map
type, not StructType(StructField(A,LongType,true),
StructField(C,LongType,true), StructField(I,LongType,true),
StructField(adultairlinetxncharge,LongType,true),
StructField(adultairporttax,LongType,true),
StructField(adultbasefare,LongType,true),
StructField(adultcommission,DoubleType,true),
StructField(adultsurcharge,LongType,true),
StructField(adulttotalfare,LongType,true),
StructField(childairlinetxncharge,LongType,true),
StructField(childairporttax,LongType,true),
StructField(childbasefare,LongType,true),
StructField(childcommission,DoubleType,true),
StructField(childsurcharge,LongType,true),
StructField(childtotalfare,LongType,true),
StructField(discount,LongType,true),
StructField(infantairlinetxncharge,LongType,true),
StructField(infantairporttax,LongType,true),
StructField(infantbasefare,LongType,true),
StructField(infantcommission,LongType,true),
StructField(infantsurcharge,LongType,true),
StructField(infanttotalfare,LongType,true),
StructField(servicetax,LongType,true),
StructField(totalbasefare,LongType,true),
StructField(totalcommission,DoubleType,true),
StructField(totalfare,LongType,true),
StructField(totalsurcharge,LongType,true),
StructField(transactionfee,LongType,true)); 

If not explode how can i flatten it.Your help will be appreciated. Thanks



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/how-to-flatten-the-dataframe-tp26411.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How can I pass a Data Frame from object to another class

2016-03-06 Thread Mich Talebzadeh
Thanks for this tip

The way I do it is to pass SparckContext "sc" to method
firstquery.firstquerym by calling the following

val firstquery =  new FirstQuery
firstquery.firstquerym(sc, rs)


And creating the method as follows:

class FirstQuery {
   def firstquerym(sc: org.apache.spark.SparkContext, rs:
org.apache.spark.sql.DataFrame) {
   val sqlContext = SQLContext.getOrCreate(sc)
   println ("\nfirst query at"); sqlContext.sql("SELECT
FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
").collect.foreach(println)
  val rs1 =
rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
  }
}

This works. However, I don't seem to invoke getOrCreate without passing sc?

Is this the way you are implying. Also why "sc" is not available within the
life of JVM please

Thanks




Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com



On 6 March 2016 at 01:25, Ted Yu  wrote:

> Looking at the methods you call on HiveContext, they seem to belong
> to SQLContext.
>
> For SQLContext, you can use the below method of SQLContext in FirstQuery
> to retrieve SQLContext:
>
>   def getOrCreate(sparkContext: SparkContext): SQLContext = {
>
> FYI
>
> On Sat, Mar 5, 2016 at 3:37 PM, Mich Talebzadeh  > wrote:
>
>> I managed to sort this one out.
>>
>> The class should be defined as below with its method accepting two input
>> parameters for HiveContext and rs as below
>>
>> class FirstQuery {
>>def firstquerym(HiveContext: org.apache.spark.sql.hive.HiveContext,
>> rs: org.apache.spark.sql.DataFrame) {
>>println ("\nfirst query at"); HiveContext.sql("SELECT
>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>> ").collect.foreach(println)
>>   val rs1 =
>> rs.orderBy("calendar_month_desc","channel_desc").take(5).foreach(println)
>>   }
>> }
>>
>> and called from the main method as follows:
>>
>> val firstquery =  new FirstQuery
>> firstquery.firstquerym(HiveContext, rs)
>>
>>
>> Thanks
>>
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn * 
>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>> *
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>>
>> On 5 March 2016 at 20:56, Mich Talebzadeh 
>> wrote:
>>
>>> Hi,
>>>
>>> I can use sbt to compile and run the following code. It works without
>>> any problem.
>>>
>>> I want to divide this into the obj and another class. I would like to do
>>> the result set joining tables identified by Data Frame 'rs' and then calls
>>> the method "firstquerym" in the class FirstQuery to do the calculation
>>> identified as "rs1"
>>>
>>> Now it needs "rs" to be available in class FrstQuery. Two questions
>>> please
>>>
>>>
>>>1. How can I pass rs to class FirstQuery
>>>2. Is there a better way of modularising this work so I can use
>>>methods defined in another class to be called in main method
>>>
>>> Thanks
>>>
>>> import org.apache.spark.SparkContext
>>> import org.apache.spark.SparkConf
>>> import org.apache.spark.sql.Row
>>> import org.apache.spark.sql.hive.HiveContext
>>> import org.apache.spark.sql.types._
>>> import org.apache.spark.sql.SQLContext
>>> import org.apache.spark.sql.functions._
>>> //
>>> object Harness4 {
>>>   def main(args: Array[String]) {
>>>   val conf = new
>>> SparkConf().setAppName("Harness4").setMaster("local[*]").set("spark.driver.allowMultipleContexts",
>>> "true")
>>>   val sc = new SparkContext(conf)
>>>   // Note that this should be done only after an instance of
>>> org.apache.spark.sql.SQLContext is created. It should be written as:
>>>   val sqlContext= new org.apache.spark.sql.SQLContext(sc)
>>>   import sqlContext.implicits._
>>>   val HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
>>> println ("\nStarted at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> HiveContext.sql("use oraclehadoop")
>>> var s =
>>> HiveContext.table("sales").select("AMOUNT_SOLD","TIME_ID","CHANNEL_ID")
>>> val c = HiveContext.table("channels").select("CHANNEL_ID","CHANNEL_DESC")
>>> val t =
>>> HiveContext.table("times").select("TIME_ID","CALENDAR_MONTH_DESC")
>>> println ("\ncreating data set at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> val rs =
>>> s.join(t,"time_id").join(c,"channel_id").groupBy("calendar_month_desc","channel_desc").agg(sum("amount_sold").as("TotalSales"))
>>> //println ("\nfirst query at"); HiveContext.sql("SELECT
>>> FROM_unixtime(unix_timestamp(), 'dd/MM/ HH:mm:ss.ss')
>>> ").collect.foreach(println)
>>> //val rs1 =
>>> 

Re: Spark Streaming fileStream vs textFileStream

2016-03-06 Thread Yuval.Itzchakov
I dont think the documentation can be anymore descriptive:

  /**
   * Create a input stream that monitors a Hadoop-compatible filesystem
   * for new files and reads them using the given key-value types and input
format.
   * Files must be written to the monitored directory by "moving" them from
another
   * location within the same file system. File names starting with . are
ignored.
   * @param directory HDFS directory to monitor for new file
   * @tparam K Key type for reading HDFS file
   * @tparam V Value type for reading HDFS file
   * @tparam F Input format for reading HDFS file
   */



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-fileStream-vs-textFileStream-tp26407p26410.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Continuous deployment to Spark Streaming application with sessionization

2016-03-06 Thread Yuval.Itzchakov
I've been recently thinking about continuous deployment to our spark
streaming service.

We have a streaming application which does sessionization via
`mapWithState`, aggregating sessions in memory until they are ready to be
deployed.

Now, as I see things we have two use cases here:

1. Spark streaming DAG didn't change - In this particular case, there
shouldn't be a problem as the spark DAG is checkpointed every X seconds, so
we should have most of our state saved and loaded.

2. Streaming streaming DAG changed - As far as I understand, if the spark
DAG changes between releases, checkpointed data cannot be read and
initialized again. This means that we'd actually lose all the state that was
saved up until we terminated our job.

Has anyone thought about this scenario? How do you guys deal with this in
production?

Yuval.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Continuous-deployment-to-Spark-Streaming-application-with-sessionization-tp26409.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLLib + Streaming

2016-03-06 Thread Chris Miller
Guru:This is a really great response. Thanks for taking the time to explain
all of this. Helpful for me too.


--
Chris Miller

On Sun, Mar 6, 2016 at 1:54 PM, Guru Medasani  wrote:

> Hi Lan,
>
> Streaming Means, Linear Regression and Logistic Regression support online
> machine learning as you mentioned. Online machine learning is where model
> is being trained and updated on every batch of streaming data. These models
> have trainOn() and predictOn() methods where you can simply pass in
> DStreams you want to train the model on and DStreams you want the model to
> predict on. So when the next batch of data arrives model is trained and
> updated again. In this case model weights are continually updated and
> hopefully model performs better in terms of convergence and accuracy over
> time. What we are really trying to do in online learning case is that we
> are only showing few examples of the data at a time ( stream of data) and
> updating the parameters in case of Linear and Logistic Regression and
> updating the centers in case of K-Means. In the case of Linear or Logistic
> Regression this is possible due to the optimizer that is chosen for
> minimizing the cost function which is Stochastic Gradient Descent. This
> optimizer helps us to move closer and closer to the optimal weights after
> every batch and over the time we will have a model that has learned how to
> represent our data and predict well.
>
> In the scenario of using any MLlib algorithms and doing training with
> DStream.transform() and DStream.foreachRDD() operations, when the first
> batch of data arrives we build a model, let’s call this model1. Once you
> have the model1 you can make predictions on the same DStream or a different
> DStream source. But for the next batch if you follow the same procedure and
> create a model, let’s call this model2. This model2 will be significantly
> different than model1 based on how different the data is in the second
> DStream vs the first DStream as it is not continually updating the model.
> It’s like weight vectors are jumping from one place to the other for every
> batch and we never know if the algorithm is converging to the optimal
> weights. So I believe it is not possible to do true online learning with
> other MLLib models in Spark Streaming.  I am not sure if this is because
> the models don’t generally support this streaming scenarios or if the
> streaming versions simply haven’t been implemented yet.
>
> Though technically you can use any of the MLlib algorithms in Spark
> Streaming with the procedure you mentioned and make predictions, it is
> important to figure out if the model you are choosing can converge by
> showing only a subset(batches  - DStreams) of the data over time. Based on
> the algorithm you choose certain optimizers won’t necessarily be able to
> converge by showing only individual data points and require to see majority
> of the data to be able to learn optimal weights.  In these cases, you can
> still do offline learning/training with Spark bach processing using any of
> the MLlib algorithms and save those models on hdfs. You can then start a
> streaming job and load these saved models into your streaming application
> and make predictions. This is traditional offline learning.
>
> In general, online learning is hard as it’s hard to evaluate since we are
> not holding any test data during the model training. We are simply training
> the model and predicting. So in the initial batches, results can vary quite
> a bit and have significant errors in terms of the predictions. So choosing
> online learning vs. offline learning depends on how much tolerance the
> application can have towards wild predictions in the beginning. Offline
> training is simple and cheap where as online training can be hard and needs
> to be constantly monitored to see how it is performing.
>
> Hope this helps in understanding offline learning vs. online learning and
> which algorithms you can choose for online learning in MLlib.
>
> Guru Medasani
> gdm...@gmail.com
>
>
>
> > On Mar 5, 2016, at 7:37 PM, Lan Jiang  wrote:
> >
> > Hi, there
> >
> > I hope someone can clarify this for me.  It seems that some of the MLlib
> algorithms such as KMean, Linear Regression and Logistics Regression have a
> Streaming version, which can do online machine learning. But does that mean
> other MLLib algorithm cannot be used in Spark streaming applications, such
> as random forest, SVM, collaborate filtering, etc??
> >
> > DStreams are essentially a sequence of RDDs. We can use
> DStream.transform() and DStream.foreachRDD() operations, which allows you
> access RDDs in a DStream and apply MLLib functions on them. So it looks
> like all MLLib algorithms should be able to run in the streaming
> application. Am I wrong?
> >
> > Lan
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For 

Re: Add the sql record having same field.

2016-03-06 Thread Jacek Laskowski
What about sum?

Jacek
06.03.2016 7:28 AM "Angel Angel"  napisał(a):

> Hello,
> I have one table and 2 fields in it
> 1) item_id and
> 2) count
>
>
>
> i want to add the count field as per item (means group the item_ids)
>
> example
> Input
> itea_ID Count
> 500 2
> 200 6
> 500 4
> 100 3
> 200 6
>
>
> Required Output
>
> Result
> Itea_id Count
> 500 6
> 200 12
> 100 3
>
>
> I used the command the  Resut= Input.groupBy(Item_ID).count()
> output is
>
> 500   2
> 200   2
> 1001
>
> so how can i get the above output?
>
> Thnaks
>