Spark 1.5.2 + Hive 1.0.0 in Amazon EMR 4.2.0

2015-11-30 Thread Daniel Lopes
Hi,

I get this error when trying to write a Spark DataFrame to a Hive table
stored as TextFile:


sqlContext.sql('INSERT OVERWRITE TABLE analytics.client_view_stock SELECT * FROM client_view_stock')

(analytics.client_view_stock is the Hive table; client_view_stock is the Spark temp table)

Error:

15/11/30 21:40:14 INFO latency: StatusCode=[404],
Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
(Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
ID: 5ADBECA2D82A7C17), S3 Extended Request ID:
RcPfjgWaeXG62xyVRrAr91sVQNxktqbXUPJgK2cvZlf6SKEAOnWCtV9X9K1Vp9dAyDhGALQRBcU=],
ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
AWSRequestID=[5ADBECA2D82A7C17], ServiceEndpoint=[
https://my-bucket.s3.amazonaws.com], Exception=1,
HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
HttpClientPoolAvailableCount=1, ClientExecuteTime=[214.69],
HttpRequestTime=[214.245], HttpClientReceiveResponseTime=[212.513],
RequestSigningTime=[0.16], HttpClientSendRequestTime=[0.112],
15/11/30 21:40:21 INFO Hive: Replacing
src:s3://my-bucket/output/2015/11/29/client_view_stock/.hive-staging_hive_2015-11-30_21-19-48_942_238078420083598647-1/-ext-1/part-00199,
dest: s3://my-bucket/output/2015/11/29/client_view_stock/part-00199,
Status:true
-chgrp: '' does not match expected pattern for group
Usage: hadoop fs [generic options] -chgrp [-R] GROUP PATH...
15/11/30 21:40:21 INFO latency: StatusCode=[200], ServiceName=[Amazon S3],
AWSRequestID=[2509AE55A8D71A61], ServiceEndpoint=[https://my-bucket.
s3.amazonaws.com], HttpClientPoolLeasedCount=0, RequestCount=1,
HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1,
ClientExecuteTime=[137.387], HttpRequestTime=[136.721],
HttpClientReceiveResponseTime=[134.805], RequestSigningTime=[0.235],
ResponseProcessingTime=[0.169], HttpClientSendRequestTime=[0.145],
15/11/30 21:40:21 WARN RetryingMetaStoreClient: MetaStoreClient lost
connection. Attempting to reconnect.
org.apache.thrift.TApplicationException: Invalid method name:
'alter_table_with_cascade'

Thanks!

-- 
Daniel Lopes, B.Eng
Data Scientist - BankFacil
CREA/SP 5069410560

Mob +55 (18) 99764-2733 
Ph +55 (11) 3522-8009
http://about.me/dannyeuu

Av. Nova Independência, 956, São Paulo, SP
Bairro Brooklin Paulista
CEP 04570-001
https://www.bankfacil.com.br


Re: Help with type check

2015-11-30 Thread Jakob Odersky
Hi Eyal,

what you're seeing is not a Spark issue, it is related to boxed types.

I assume 'b' in your code is some kind of java buffer, where b.getDouble()
returns an instance of java.lang.Double and not a scala.Double. Hence
muCouch is an Array[java.lang.Double], an array containing boxed doubles.

To fix your problem, change 'yield b.getDouble(i)' to 'yield
b.getDouble(i).doubleValue'

You might want to have a look at these too:
-
http://stackoverflow.com/questions/23821576/efficient-conversion-of-java-util-listjava-lang-double-to-scala-listdouble
- https://docs.oracle.com/javase/7/docs/api/java/lang/Double.html
- http://www.scala-lang.org/api/current/index.html#scala.Double

On 30 November 2015 at 10:13, Eyal Sharon  wrote:

> Hi ,
>
> I have a problem inferring where the type bug is here.
>
> I have this code fragment; it parses JSON to an Array[Double]:
>
>
>
>
>
>
> val muCouch = {
>   val e = input.filter(_.id == "mu")(0).content()
>   val b = e.getArray("feature_mean")
>   for (i <- 0 to e.getInt("features")) yield b.getDouble(i)
> }.toArray
>
> Now the problem is when I want to create a dense vector  :
>
> new DenseVector(muCouch)
>
>
> I get the following error :
>
>
> Error:(111, 21) type mismatch;
>  found   : Array[java.lang.Double]
>  required: Array[scala.Double]
>
>
> Now, I can probably find a workaround for that, but I want a deeper
> understanding of why it occurs.
>
> p.s - I do use collection.JavaConversions._
>
> Thanks !
>
>
>
>
> This email and any files transmitted with it are confidential and
> intended solely for the use of the individual or entity to whom they are
> addressed. Please note that any disclosure, copying or distribution of the
> content of this information is strictly forbidden. If you have received
> this email message in error, please destroy it immediately and notify its
> sender.
>


RE: Cant start master on windows 7

2015-11-30 Thread Tim Barthram
Hi Jacek,

To run a spark master on my windows box, I've created a .bat file with contents 
something like:

.\bin\spark-class.cmd org.apache.spark.deploy.master.Master --host <master-host>


For the worker:

.\bin\spark-class.cmd org.apache.spark.deploy.worker.Worker spark://<master-host>:7077


To wrap these in services, I've used yasw or nssm.

Thanks,
Tim




-Original Message-
From: Jacek Laskowski [mailto:ja...@japila.pl] 
Sent: Tuesday, 1 December 2015 4:18 AM
To: Shuo Wang
Cc: user
Subject: Re: Cant start master on windows 7

On Fri, Nov 27, 2015 at 4:27 PM, Shuo Wang  wrote:

> I am trying to use the start-master.sh script on windows 7.

From http://spark.apache.org/docs/latest/spark-standalone.html:

"Note: The launch scripts do not currently support Windows. To run a
Spark cluster on Windows, start the master and workers by hand."

Can you start the command by hand? Just copy and paste the command
from the logs. Mind the spaces!

Jacek

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


_

The information transmitted in this message and its attachments (if any) is
intended only for the person or entity to which it is addressed. The message
may contain confidential and/or privileged material. Any review,
retransmission, dissemination or other use of, or taking of any action in
reliance upon this information, by persons or entities other than the
intended recipient is prohibited.

If you have received this in error, please contact the sender and delete
this e-mail and associated material from any computer.

The intended recipient of this e-mail may only use, reproduce, disclose or
distribute the information contained in this e-mail and any attached files,
with the permission of the sender.

This message has been scanned for viruses.
_

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



PySpark failing on a mid-sized broadcast

2015-11-30 Thread ameyc
So I'm running PySpark 1.3.1 on Amazon EMR on a fairly beefy cluster (20
nodes, each with 32 cores and 64 GB of memory), and my parallelism,
executor.instances, executor.cores, and executor memory settings are also
fairly reasonable (600, 20, 30, and 48 GB respectively).

However, my job invariably fails when trying to use a 200 MB broadcast in a
closure, as YARN starts killing containers for running beyond physical memory
limits. Looking at the node manager logs on the slaves, it seems that PySpark
is spawning too many pyspark daemons, which use up more than the off-heap
memory would allow, and playing with the yarn.executor.memoryOverhead property
doesn't seem to make much of a difference.
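For context, YARN kills a container when the executor heap plus the off-heap overhead allotment is exceeded, and every pyspark daemon's memory counts against that same limit. A rough sketch of the arithmetic in plain Python; the 10% fraction and 384 MB floor are assumptions roughly matching commonly cited defaults of that era, not figures from this thread, so check spark.yarn.executor.memoryOverhead for your version:

```python
# Sketch of the YARN container size that triggers "running beyond physical
# memory limits" kills: executor heap + off-heap overhead.
# overhead_fraction / overhead_floor_mb are assumed defaults, not verified.
def yarn_container_mb(executor_mem_mb, overhead_fraction=0.10, overhead_floor_mb=384):
    overhead = max(overhead_floor_mb, int(executor_mem_mb * overhead_fraction))
    return executor_mem_mb + overhead

# 48 GB executors as in the report above:
print(yarn_container_mb(48 * 1024))  # 49152 + 4915 = 54067 MB total budget
```

Anything the JVM and its pyspark.daemon children use beyond that total gets the container killed, which is why many Python workers per executor can breach the limit even when the JVM heap itself is fine.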

Has anyone else come across this before?

- Amey



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-failing-on-a-mid-sized-broadcast-tp25520.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



dfs.blocksize is not applicable to some cases

2015-11-30 Thread Jung
Hello,
I use Spark 1.4.1 and Hadoop 2.2.0.
It may be a stupid question, but I cannot understand why the "dfs.blocksize"
Hadoop option sometimes doesn't affect the number of blocks.
When I run the script below,

  val BLOCK_SIZE = 1024 * 1024 * 512 // set to 512MB, hadoop default is 128MB
  sc.hadoopConfiguration.setInt("parquet.block.size", BLOCK_SIZE)
  sc.hadoopConfiguration.setInt("dfs.blocksize",BLOCK_SIZE)
  sc.parallelize(1 to 5, 24).repartition(3).toDF.saveAsTable("partition_test")

it creates 3 files like this.

  221.1 M  /user/hive/warehouse/partition_test/part-r-1.gz.parquet
  221.1 M  /user/hive/warehouse/partition_test/part-r-2.gz.parquet
  221.1 M  /user/hive/warehouse/partition_test/part-r-3.gz.parquet

To check how many blocks are in a file, I enter the command "hdfs fsck
/user/hive/warehouse/partition_test/part-r-1.gz.parquet -files -blocks".

  Total blocks (validated):  1 (avg. block size 231864402 B)

This is the normal case, because the maximum block size changed from 128MB
to 512MB.
In the real world, I have a bunch of files:

  14.4 M  /user/hive/warehouse/data_1g/part-r-1.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-2.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-3.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-4.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-5.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-6.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-7.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-8.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-9.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-00010.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-00011.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-00012.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-00013.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-00014.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-00015.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-00016.gz.parquet

Each file consists of one block (avg. block size 15141395 B), and I run
almost the same code as before.

  val BLOCK_SIZE = 1024 * 1024 * 512 // set to 512MB, hadoop default is 128MB
  sc.hadoopConfiguration.setInt("parquet.block.size", BLOCK_SIZE)
  sc.hadoopConfiguration.setInt("dfs.blocksize",BLOCK_SIZE)
  sqlContext.table("data_1g").repartition(1).saveAsTable("partition_test2")

It creates one file.

 231.0 M  /user/hive/warehouse/partition_test2/part-r-1.gz.parquet

But it consists of 2 blocks. It seems dfs.blocksize is not applied.

  /user/hive/warehouse/partition_test2/part-r-1.gz.parquet 242202143 bytes, 
2 block(s):  OK
  0. BP-2098986396-192.168.100.1-1389779750403:blk_1080124727_6385839 
len=134217728 repl=2
  1. BP-2098986396-192.168.100.1-1389779750403:blk_1080124728_6385840 
len=107984415 repl=2
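The 2-block split that fsck reports can be reproduced with simple arithmetic: HDFS cuts a file greedily at the dfs.blocksize that was in effect for the writer, and here the cut falls at exactly 128 MB, suggesting the 512 MB setting never reached the writing client. A plain-Python sketch of that split:

```python
# Split a file into HDFS blocks of at most block_size bytes,
# mirroring what "hdfs fsck ... -files -blocks" reports.
def hdfs_blocks(file_size, block_size=128 * 1024 * 1024):
    blocks = []
    remaining = file_size
    while remaining > 0:
        blocks.append(min(block_size, remaining))
        remaining -= blocks[-1]
    return blocks

# The 231.0 MB file from the fsck output above:
print(hdfs_blocks(242202143))                     # [134217728, 107984415] -- the two blocks fsck shows
print(hdfs_blocks(242202143, 512 * 1024 * 1024))  # [242202143] -- one block, as intended
```

Since each block becomes one input split, the 128 MB cut is also why Spark reads the file back as 2 partitions.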

Because of this, Spark reads it as 2 partitions even though I repartitioned
the data into 1 partition. If the file size after repartitioning is a little
more than 128MB and I save it again, it writes 2 files of roughly 128MB and
1MB.
This is very important for me because I use the repartition method many
times. Please help me figure it out.

  Jung

Re: Spark Streaming on mesos

2015-11-30 Thread Iulian Dragoș
Hi,

Latency isn't as big an issue as it sounds. Did you try it out and fail
some performance metrics?

In short, the *Mesos* executor on a given slave is going to be long-running
(consuming memory, but no CPUs). Each Spark task will be scheduled using
Mesos CPU resources, but they don't suffer much latency.

iulian


On Mon, Nov 30, 2015 at 4:17 AM, Renjie Liu  wrote:

> Hi, Tim:
> Fine-grained mode is not suitable for streaming applications since it needs
> to start up an executor each time. When will the revamp be released? In the
> coming 1.6.0?
>
> On Sun, Nov 29, 2015 at 6:16 PM Timothy Chen  wrote:
>
>> Hi Renjie,
>>
>> You can set number of cores per executor with spark executor cores in
>> fine grain mode.
>>
>> If you want coarse-grained mode to support that, it will
>> be supported in the near term, as the coarse-grained scheduler is getting
>> revamped now.
>>
>> Tim
>>
>> On Nov 28, 2015, at 7:31 PM, Renjie Liu  wrote:
>>
>> Hi, Nagaraj:
>>  Thanks for the response, but this does not solve my problem.
>> I think executor memory should be proportional to the number of cores, or
>> the number of cores in each executor should be the same.
>> On Sat, Nov 28, 2015 at 1:48 AM Nagaraj Chandrashekar <
>> nchandrashe...@innominds.com> wrote:
>>
>>> Hi Renjie,
>>>
>>> I have not setup Spark Streaming on Mesos but there is something called
>>> reservations in Mesos.  It supports both Static and Dynamic reservations.
>>> Both types of reservations must have role defined. You may want to explore
>>> these options.   Excerpts from the Apache Mesos documentation.
>>>
>>> Cheers
>>> Nagaraj C
>>> Reservation
>>>
>>> Mesos provides mechanisms to reserve resources in specific slaves. The
>>> concept was first introduced with static reservation in 0.14.0 which
>>> enabled operators to specify the reserved resources on slave startup. This
>>> was extended with dynamic reservation in 0.23.0 which enabled operators
>>> and authorized frameworks to dynamically reserve resources in the
>>> cluster.
>>>
>>> No breaking changes were introduced with dynamic reservation, which
>>> means the existing static reservation mechanism continues to be fully
>>> supported.
>>>
>>> In both types of reservations, resources are reserved for a role.
>>> Static Reservation (since 0.14.0)
>>>
>>> An operator can configure a slave with resources reserved for a role.
>>> The reserved resources are specified via the --resources flag. For
>>> example, suppose we have 12 CPUs and 6144 MB of RAM available on a slave
>>> and that we want to reserve 8 CPUs and 4096 MB of RAM for the ads role.
>>> We start the slave like so:
>>>
>>> $ mesos-slave \
>>>   --master=<master-ip>:<port> \
>>>   --resources="cpus:4;mem:2048;cpus(ads):8;mem(ads):4096"
>>>
>>> We now have 8 CPUs and 4096 MB of RAM reserved for ads on this slave.
>>>
>>>
>>> From: Renjie Liu 
>>> Date: Friday, November 27, 2015 at 9:57 PM
>>> To: "user@spark.apache.org" 
>>> Subject: Spark Streaming on mesos
>>>
>>> Hi, all:
>>> I'm trying to run Spark Streaming on Mesos and it seems that none of the
>>> schedulers is suitable for that. The fine-grained scheduler will start an
>>> executor for each task, so it will significantly increase the latency,
>>> while coarse-grained mode can only set the max number of cores and the
>>> executor memory; there's no way to set the number of cores for each
>>> executor. Has anyone deployed Spark Streaming on Mesos? And what are
>>> your settings?
>>> --
>>> Liu, Renjie
>>> Software Engineer, MVAD
>>>
>> --
>> Liu, Renjie
>> Software Engineer, MVAD
>>
>> --
> Liu, Renjie
> Software Engineer, MVAD
>



-- 

--
Iulian Dragos

--
Reactive Apps on the JVM
www.typesafe.com


Re: Cant start master on windows 7

2015-11-30 Thread Jacek Laskowski
On Fri, Nov 27, 2015 at 4:27 PM, Shuo Wang  wrote:

> I am trying to use the start-master.sh script on windows 7.

From http://spark.apache.org/docs/latest/spark-standalone.html:

"Note: The launch scripts do not currently support Windows. To run a
Spark cluster on Windows, start the master and workers by hand."

Can you start the command by hand? Just copy and paste the command
from the logs. Mind the spaces!

Jacek

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Logs of Custom Receiver

2015-11-30 Thread Matthias Niehoff
Hi,

I've built a custom receiver and deployed an application using this
receiver to a cluster.
When I run the application locally I see the log output of my logger in
stdout/stderr, but when I run it on the cluster I don't see the log output
in stdout/stderr.
I just see the logs from the constructor, but no following statements from
the start() method or other methods called from there (all logged at the
same level).
Where do I find these log statements?

Thanks!


-- 
Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
172.1702676
www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
www.more4fi.de

Registered office: Solingen | HRB 25917 | Amtsgericht Wuppertal
Management board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
Supervisory board: Patric Fedlmeier (chairman) . Klaus Jäger . Jürgen Schütz

This e-mail, including any attached files, contains confidential and/or
legally protected information. If you are not the intended recipient or have
received this e-mail in error, please inform the sender immediately and
delete this e-mail and any attached files. Unauthorized copying, use or
opening of attached files, as well as unauthorized forwarding of this
e-mail, is not permitted.


Re: Debug Spark

2015-11-30 Thread Jacek Laskowski
Hi,

Yes, that's possible -- I'm doing it every day in local and standalone modes.

Just use SPARK_PRINT_LAUNCH_COMMAND=1 before any Spark command, e.g.
spark-submit or spark-shell, to see the command that starts it:

$ SPARK_PRINT_LAUNCH_COMMAND=1 ./bin/spark-shell

The SPARK_PRINT_LAUNCH_COMMAND environment variable controls whether the
Spark launch command is printed to the standard error output (System.err)
or not.

Once you've got the command, add the following command-line option to
enable the JDWP agent and have it suspended (suspend=y) until a remote
debugging client connects (on port 5005):

-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005

In IntelliJ IDEA, define a new debug configuration for Remote and
press Debug. You're done.

https://www.jetbrains.com/idea/help/debugging-2.html might help.

Pozdrawiam,
Jacek

--
Jacek Laskowski | https://medium.com/@jaceklaskowski/ |
http://blog.jaceklaskowski.pl
Mastering Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
Follow me at https://twitter.com/jaceklaskowski
Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski


On Sun, Nov 29, 2015 at 5:18 PM, Masf  wrote:
> Hi
>
> Is it possible to debug spark locally with IntelliJ or another IDE?
>
> Thanks
>
> --
> Regards.
> Miguel Ángel

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to get a single available message from kafka (case where OffsetRange.fromOffset == OffsetRange.untilOffset)

2015-11-30 Thread Cody Koeninger
If you had exactly 1 message in the 0th topicpartition, to read it you
would use

OffsetRange("topicname", 0, 0, 1)

Kafka's simple shell consumer in that case would print

next offset = 1


So instead trying to consume

OffsetRange("topicname", 0, 1, 2)
shouldn't be expected to work
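In other words, an OffsetRange is a half-open interval [fromOffset, untilOffset): it covers untilOffset - fromOffset messages, and untilOffset equals Kafka's "next offset" after the last message. A plain-Python sketch of those semantics (a hypothetical tuple stands in for the Scala class; this is not the actual API):

```python
# Half-open OffsetRange semantics, modeled as a plain tuple:
# (topic, partition, from_offset, until_offset).
# until_offset is exclusive and equals Kafka's "next offset".
def message_count(offset_range):
    _topic, _partition, from_off, until_off = offset_range
    return until_off - from_off

# Exactly one message at offset 0: the next offset is 1, so the range is (0, 1).
assert message_count(("topicname", 0, 0, 1)) == 1
# from == until is a legal but empty range -- hence the empty RDD in the question.
assert message_count(("topicname", 0, 5, 5)) == 0
```

This is also why a range starting at the next offset, such as (1, 2) above, fails validation: it refers to messages that don't exist yet.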



On Sat, Nov 28, 2015 at 8:35 AM, Nikos Viorres  wrote:

> Hi,
>
> I am using KafkaUtils.createRDD to retrieve data from Kafka for batch
> processing and
> when Invoking KafkaUtils.createRDD with an OffsetRange where
> OffsetRange.fromOffset == OffsetRange.untilOffset for a particular
> partition, I get an empty RDD.
> The documentation is clear that until is exclusive and from inclusive, but
> if I use OffsetRange.untilOffset + 1 I get an invalid OffsetRange exception
> during the check.
> Since this should also apply in general (if untilOffset is exclusive you
> cannot fetch it), does it mean that untilOffset is also non-existent in
> Kafka (and thus always exclusive), or am I missing something?
>
> regards
>
> p.s. by manually using the Kafka protocol to query the offsets, I see
> that kafka.api.OffsetRequest.EarliestTime()
> == kafka.api.OffsetRequest.LatestTime(), and both are set to a positive value
>


Re: Spark directStream with Kafka and process the lost messages.

2015-11-30 Thread Guillermo Ortiz
Then something is wrong in my code ;). Thanks.

2015-11-30 16:46 GMT+01:00 Cody Koeninger :

> Starting from the checkpoint using getOrCreate should be sufficient if all
> you need is at-least-once semantics
>
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
>
> On Mon, Nov 30, 2015 at 9:38 AM, Guillermo Ortiz 
> wrote:
>
>> Hello,
>>
>> I have Spark and Kafka with directStream. I'm trying to ensure that if
>> Spark dies, it can process all the lost messages when it restarts. The
>> offsets are stored in checkpoints, but I don't know how to tell Spark to
>> start from that point.
>> I saw that there's another createDirectStream method with a fromOffsets
>> parameter, but how can I access the offsets?
>>
>> val ssc = new StreamingContext(sparkConf, Seconds(5))
>> ssc.checkpoint(checkpoint)
>> val directKafkaStream = KafkaUtils.createDirectStream[String, String, 
>> StringDecoder, StringDecoder](ssc, kafkaBrokers, topic)
>>
>>
>


Spark directStream with Kafka and process the lost messages.

2015-11-30 Thread Guillermo Ortiz
Hello,

I have Spark and Kafka with directStream. I'm trying to ensure that if Spark
dies, it can process all the lost messages when it restarts. The offsets are
stored in checkpoints, but I don't know how to tell Spark to start from that
point.
I saw that there's another createDirectStream method with a fromOffsets
parameter, but how can I access the offsets?

val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint(checkpoint)
val directKafkaStream = KafkaUtils.createDirectStream[String, String,
StringDecoder, StringDecoder](ssc, kafkaBrokers, topic)


Re: Spark Streaming on mesos

2015-11-30 Thread Renjie Liu
Hi, Iulian:
Are you sure that it'll be a long-running process in fine-grained mode? I
think you have a misunderstanding about it. An executor will be launched
for some tasks, but it is not a long-running process. When a group of tasks
finishes, it will get shut down.

On Mon, Nov 30, 2015 at 6:25 PM Iulian Dragoș 
wrote:

> Hi,
>
> Latency isn't such a big issue as it sounds. Did you try it out and failed
> some performance metrics?
>
> In short, the *Mesos* executor on a given slave is going to be
> long-running (consuming memory, but no CPUs). Each Spark task will be
> scheduled using Mesos CPU resources, but they don't suffer much latency.
>
> iulian
>
>
> On Mon, Nov 30, 2015 at 4:17 AM, Renjie Liu 
> wrote:
>
>> Hi, Tim:
>> Fine-grained mode is not suitable for streaming applications since it needs
>> to start up an executor each time. When will the revamp be released? In the
>> coming 1.6.0?
>>
>> On Sun, Nov 29, 2015 at 6:16 PM Timothy Chen  wrote:
>>
>>> Hi Renjie,
>>>
>>> You can set number of cores per executor with spark executor cores in
>>> fine grain mode.
>>>
>>> If you want coarse-grained mode to support that, it will
>>> be supported in the near term, as the coarse-grained scheduler is getting
>>> revamped now.
>>>
>>> Tim
>>>
>>> On Nov 28, 2015, at 7:31 PM, Renjie Liu  wrote:
>>>
>>> Hi, Nagaraj:
>>>  Thanks for the response, but this does not solve my problem.
>>> I think executor memory should be proportional to the number of cores, or
>>> the number of cores in each executor should be the same.
>>> On Sat, Nov 28, 2015 at 1:48 AM Nagaraj Chandrashekar <
>>> nchandrashe...@innominds.com> wrote:
>>>
 Hi Renjie,

 I have not setup Spark Streaming on Mesos but there is something called
 reservations in Mesos.  It supports both Static and Dynamic reservations.
 Both types of reservations must have role defined. You may want to explore
 these options.   Excerpts from the Apache Mesos documentation.

 Cheers
 Nagaraj C
 Reservation

 Mesos provides mechanisms to reserve resources in specific slaves. The
 concept was first introduced with static reservation in 0.14.0 which
 enabled operators to specify the reserved resources on slave startup. This
 was extended with dynamic reservation in 0.23.0 which enabled
 operators and authorized frameworks to dynamically reserve resources
 in the cluster.

 No breaking changes were introduced with dynamic reservation, which
 means the existing static reservation mechanism continues to be fully
 supported.

 In both types of reservations, resources are reserved for a role.
 Static Reservation (since 0.14.0)

 An operator can configure a slave with resources reserved for a role.
 The reserved resources are specified via the --resources flag. For
 example, suppose we have 12 CPUs and 6144 MB of RAM available on a slave
 and that we want to reserve 8 CPUs and 4096 MB of RAM for the ads role.
 We start the slave like so:

 $ mesos-slave \
  --master=<master-ip>:<port> \
   --resources="cpus:4;mem:2048;cpus(ads):8;mem(ads):4096"

 We now have 8 CPUs and 4096 MB of RAM reserved for ads on this slave.


 From: Renjie Liu 
 Date: Friday, November 27, 2015 at 9:57 PM
 To: "user@spark.apache.org" 
 Subject: Spark Streaming on mesos

 Hi, all:
 I'm trying to run spark streaming on mesos and it seems that none of
 the scheduler is suitable for that. Fine grain scheduler will start an
 executor for each task so it will significantly increase the latency. While
 coarse grained mode can only set the max core numbers and executor memory
 but there's no way to set the number of cores for each executor. Has anyone
 deployed spark streaming on mesos? And what's your settings?
 --
 Liu, Renjie
 Software Engineer, MVAD

>>> --
>>> Liu, Renjie
>>> Software Engineer, MVAD
>>>
>>> --
>> Liu, Renjie
>> Software Engineer, MVAD
>>
>
>
>
> --
>
> --
> Iulian Dragos
>
> --
> Reactive Apps on the JVM
> www.typesafe.com
>
> --
Liu, Renjie
Software Engineer, MVAD


Re: Permanent RDD growing with Kafka DirectStream

2015-11-30 Thread Cody Koeninger
Can you post the relevant code?

On Fri, Nov 27, 2015 at 4:25 AM, u...@moosheimer.com 
wrote:

> Hi,
>
> we have some strange behavior with the KafkaUtils DirectStream and the size
> of the MapPartitionsRDDs.
>
> We use a permanent direct stream where we consume about 8,500 JSON
> messages/sec.
> The JSON messages are read, some information is extracted, and the result
> for each message is a string, which we collect/group with reduceByKeyAndWindow.
>
> The windowLength and slideInterval are both 60 sec.
> The result of the window is sent back to another Kafka topic.
> The batch duration is 20 seconds.
>
> The RDDs are growing all the time, and the data transferred to the
> executors is growing and growing.
> Since the RDDs are growing, the time to send them over the network and the
> time for processing are growing, too.
>
> We start with a processing time of 4 seconds. After 20 hours we reach 10
> seconds.
> The processing time keeps growing, and at some point it is permanently over
> 20 seconds (the batch duration), and Spark runs OOM or gets other problems.
>
> Has anybody any idea how to fix this?
>
> We are using Spark 1.5.2 and Kafka 0.8.2.2. We read two Kafka topics - one
> with 200 Kafka partitions and one with 20 Kafka partitions.
> Spark runs in standalone cluster mode with three instances on AWS.
> We use 3 worker nodes with at all 23 executors.
>
> We start the app with these parameters:
> --conf "spark.akka.frameSize=160"
> --conf "spark.cleaner.referenceTracking.blocking=true"
> --conf "spark.cleaner.referenceTracking.blocking.shuffle=true"
> --conf "spark.cleaner.referenceTracking.cleanCheckpoints=true"
> --conf "spark.cleaner.ttl=600"
> --conf "spark.default.parallelism=69"
> --conf "spark.executor.cores=1"
> --conf "spark.executor.memory=7g"
> --conf "spark.kryoserializer.buffer.max=256m"
> --conf "spark.rrd.compress=true"
> --conf "spark.storage.memoryFraction=0.2"
> --conf "spark.streaming.backpressure.enabled=true"
> --conf "spark.streaming.kafka.maxRatePerPartition=75"
> --conf "spark.streaming.receiver.maxRate=15000"
> --conf "spark.streaming.stopGracefullyOnShutdown=true"
> --deploy-mode cluster
> --supervise
>
> Thanks,
> Uwe
>


Re: Spark directStream with Kafka and process the lost messages.

2015-11-30 Thread Cody Koeninger
Starting from the checkpoint using getOrCreate should be sufficient if all
you need is at-least-once semantics

http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing

On Mon, Nov 30, 2015 at 9:38 AM, Guillermo Ortiz 
wrote:

> Hello,
>
> I have Spark and Kafka with directStream. I'm trying to ensure that if Spark
> dies, it can process all the lost messages when it restarts. The offsets are
> stored in checkpoints, but I don't know how to tell Spark to start from that
> point.
> I saw that there's another createDirectStream method with a fromOffsets
> parameter, but how can I access the offsets?
>
> val ssc = new StreamingContext(sparkConf, Seconds(5))
> ssc.checkpoint(checkpoint)
> val directKafkaStream = KafkaUtils.createDirectStream[String, String, 
> StringDecoder, StringDecoder](ssc, kafkaBrokers, topic)
>
>


Re: Logs of Custom Receiver

2015-11-30 Thread Ted Yu
Have you seen this thread ?
http://search-hadoop.com/m/q3RTtEor1vYWbsW=RE+Configuring+Log4J+Spark+1+5+on+EMR+4+1+

which mentioned SPARK-11105

FYI

2015-11-30 9:00 GMT-08:00 Matthias Niehoff 
:

> Hi,
>
> I've built a custom receiver and deployed an application using this
> receiver to a cluster.
> When I run the application locally I see the log output of my logger in
> stdout/stderr, but when I run it on the cluster I don't see the log output
> in stdout/stderr.
> I just see the logs from the constructor, but no following statements from
> the start() method or other methods called from there (all logged at the
> same level).
> Where do I find these log statements?
>
> Thanks!
>
>
>


Re: SparkException: Failed to get broadcast_10_piece0

2015-11-30 Thread Spark Newbie
Pinging again ...

On Wed, Nov 25, 2015 at 4:19 PM, Ted Yu  wrote:

> Which Spark release are you using ?
>
> Please take a look at:
> https://issues.apache.org/jira/browse/SPARK-5594
>
> Cheers
>
> On Wed, Nov 25, 2015 at 3:59 PM, Spark Newbie 
> wrote:
>
>> Hi Spark users,
>>
>> I'm seeing the below exceptions once in a while, which cause tasks to
>> fail (even after retries, so I think it is a non-recoverable exception);
>> hence the stage fails and then the job gets aborted.
>>
>> Exception ---
>> java.io.IOException: org.apache.spark.SparkException: Failed to get
>> broadcast_10_piece0 of broadcast_10
>>
>> Any idea why this exception occurs and how to avoid/handle these
>> exceptions? Please let me know if you have seen this exception and know a
>> fix for it.
>>
>> Thanks,
>> Bharath
>>
>
>


Obtaining Job Id for query submitted via Spark Thrift Server

2015-11-30 Thread Jagrut Sharma
Is there a way to get the Job Id for a query submitted via the Spark Thrift
Server? This would allow checking the status of that specific job via the
History Server.

Currently, I'm getting status of all jobs, and then filtering the results.
Looking for a more efficient approach.

Test environment is: Spark 1.4.1, Hive 0.13.1, running on YARN

Thanks!
-- 
Jagrut


Re: how to using local repository in spark[dev]

2015-11-30 Thread Jacek Laskowski
Hi,

Maven uses its own repo as does sbt. To cross the repo boundaries use
the following:

resolvers += Resolver.mavenLocal

in your build.sbt or any other build definition as described in
http://www.scala-sbt.org/0.13/tutorial/Library-Dependencies.html#Resolvers.

You did it so let's give the other options a try.

Can you show the exact location of the jar you want your Spark app to
depend on (using `ls`) and how you defined the dependency in
build.sbt?

Pozdrawiam,
Jacek

--
Jacek Laskowski | https://medium.com/@jaceklaskowski/ |
http://blog.jaceklaskowski.pl
Mastering Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
Follow me at https://twitter.com/jaceklaskowski
Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski


On Fri, Nov 27, 2015 at 9:03 AM, lihu  wrote:
> Hi, All:
>
>  I modified the Spark code and am trying to use some extra jars in Spark;
> the extra jars are published to my local Maven repository using mvn install.
>  However, sbt cannot find these jar files, even though I can find them
> under /home/myname/.m2/repository.
> I can guarantee that the local .m2 repository is added to the resolvers,
> because I see it listed in the output of the show resolvers command.
>
>
> List(central: https://repo1.maven.org/maven2, apache-repo:
> https://repository.apache.org/content/repositories/releases, jboss-repo:
> https://repository.jboss.org/nexus/content/repositories/releases, mqtt-repo:
> https://repo.eclipse.org/content/repositories/paho-releases, cloudera-repo:
> https://repository.cloudera.com/artifactory/cloudera-repos,
> spark-hive-staging:
> https://oss.sonatype.org/content/repositories/orgspark-project-1113,
> mapr-repo: http://repository.mapr.com/maven/, spring-releases:
> https://repo.spring.io/libs-release, twttr-repo: http://maven.twttr.com,
> apache.snapshots: http://repository.apache.org/snapshots, cache:Maven2
> Local: /home/myname/.m2/repository)
>
>
> Does anyone know how to deal with this? In fact, this worked some days
> ago, but after I updated my custom jar file and installed it again
> recently, it no longer works.
>
> Environment: spark1.5  sbt 0.13.7/0.13.9

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark on yarn vs spark standalone

2015-11-30 Thread Jacek Laskowski
Hi,

My understanding of Spark on YARN and even Spark in general is very
limited so keep that in mind.

I'm not sure why you'd compare yarn-cluster and Spark Standalone. In
yarn-cluster mode the driver runs on a node in the YARN cluster, while
Spark Standalone keeps the driver on the machine from which you launched
the application. Also, YARN cluster mode supports retrying applications
while standalone doesn't. There's also support for rack locality preference
(but I don't know if or where that's used in Spark).

My limited understanding suggests using Spark on YARN if you're
considering Hadoop/HDFS and submitting jobs via YARN. Standalone is the
entry-level option; requiring YARN could hinder introducing Spark to
organizations without Hadoop YARN.

Just my two cents.

Pozdrawiam,
Jacek

--
Jacek Laskowski | https://medium.com/@jaceklaskowski/ |
http://blog.jaceklaskowski.pl
Mastering Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
Follow me at https://twitter.com/jaceklaskowski
Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski


On Fri, Nov 27, 2015 at 8:36 AM, cs user  wrote:
> Hi All,
>
> Apologies if this question has been asked before. I'd like to know if there
> are any downsides to running spark over yarn with the --master yarn-cluster
> option vs having a separate spark standalone cluster to execute jobs?
>
> We're looking at installing a hdfs/hadoop cluster with Ambari and submitting
> jobs to the cluster using yarn, or having an Ambari cluster and a separate
> standalone spark cluster, which will run the spark jobs on data within hdfs.
>
> With yarn, will we still get all the benefits of spark?
>
> Will it be possible to process streaming data?
>
> Many thanks in advance for any responses.
>
> Cheers!




Re: Spark, Windows 7 python shell non-reachable ip address

2015-11-30 Thread Jacek Laskowski
Hi,

I'd call it a known issue on Windows. I have no solution, but setting
SPARK_LOCAL_HOSTNAME or SPARK_LOCAL_IP before starting pyspark will
*work around it*.
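For example, in a Windows command prompt before launching pyspark (the addresses below are assumptions — use whatever is actually reachable on your machine):

```bat
REM bind Spark to a known-reachable address for this session
set SPARK_LOCAL_IP=127.0.0.1

REM or pin the hostname instead
set SPARK_LOCAL_HOSTNAME=localhost
```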

I wish I had access to Win7 to work on it longer and find a decent
solution (not a workaround).

If you have a Scala REPL, execute java.net.InetAddress.getLocalHost(),
which is what Spark runs under the covers before hitting the
network-related issue.

Pozdrawiam,
Jacek

--
Jacek Laskowski | https://medium.com/@jaceklaskowski/ |
http://blog.jaceklaskowski.pl
Mastering Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
Follow me at https://twitter.com/jaceklaskowski
Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski


On Thu, Nov 26, 2015 at 8:33 AM, Shuo Wang  wrote:
> I am not sure if my message is getting through the mailing list.
>
> After running these two lines in the Quick Start example in spark's python
> shell on windows 7.
>
 textFile = sc.textFile("README.md")
>
 textFile.count()
>
> I am getting the following error:
>
 textFile.count()
>
> 15/11/25 19:57:01 WARN : Your hostname, oh_t-PC resolves to a
> loopback/non-reachable address: fe80:0:0:0:84b:213f:3f57:fef6%net5, but we
> couldn't find  any external IP address!
>   Traceback (most recent call last):
>   File "", line 1, in 
>   File "C:\spark-1.5.2-bin-hadoop2.6\python\pyspark\rdd.py", line 1006,
> in count
>  
>
> Any idea what is going wrong here?
> --
> 王硕
> 邮箱:shuo.x.w...@gmail.com
> Whatever your journey, keep walking.




Re: dfs.blocksize is not applicable to some cases

2015-11-30 Thread Ted Yu
I am not an expert in Parquet.

Looking at PARQUET-166, it seems that parquet.block.size should be lower
than dfs.blocksize.

Have you tried Spark 1.5.2 to see if the problem persists?

Cheers

On Mon, Nov 30, 2015 at 1:55 AM, Jung  wrote:

> Hello,
> I use Spark 1.4.1 and Hadoop 2.2.0.
> It may be a stupid question but I cannot understand why "dfs.blocksize" in
> hadoop option doesn't affect the number of blocks sometimes.
> When I run the script below,
>
>   val BLOCK_SIZE = 1024 * 1024 * 512 // set to 512MB, hadoop default is
> 128MB
>   sc.hadoopConfiguration.setInt("parquet.block.size", BLOCK_SIZE)
>   sc.hadoopConfiguration.setInt("dfs.blocksize",BLOCK_SIZE)
>   sc.parallelize(1 to 5,
> 24).repartition(3).toDF.saveAsTable("partition_test")
>
> it creates 3 files like this.
>
>   221.1 M  /user/hive/warehouse/partition_test/part-r-1.gz.parquet
>   221.1 M  /user/hive/warehouse/partition_test/part-r-2.gz.parquet
>   221.1 M  /user/hive/warehouse/partition_test/part-r-3.gz.parquet
>
> To check how many blocks in a file, I enter the command "hdfs fsck
> /user/hive/warehouse/partition_test/part-r-1.gz.parquet -files -blocks".
>
>   Total blocks (validated):  1 (avg. block size 231864402 B)
>
> This is the normal case, because the maximum block size changed from 128MB to 512MB.
> In the real world, I have a bunch of files.
>
>   14.4 M  /user/hive/warehouse/data_1g/part-r-1.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-2.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-3.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-4.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-5.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-6.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-7.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-8.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-9.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-00010.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-00011.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-00012.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-00013.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-00014.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-00015.gz.parquet
>   14.4 M  /user/hive/warehouse/data_1g/part-r-00016.gz.parquet
>
> Each file consists of 1block (avg. block size 15141395 B) and I run the
> almost same code as first.
>
>   val BLOCK_SIZE = 1024 * 1024 * 512 // set to 512MB, hadoop default is
> 128MB
>   sc.hadoopConfiguration.setInt("parquet.block.size", BLOCK_SIZE)
>   sc.hadoopConfiguration.setInt("dfs.blocksize",BLOCK_SIZE)
>   sqlContext.table("data_1g").repartition(1).saveAsTable("partition_test2")
>
> It creates one file.
>
>  231.0 M  /user/hive/warehouse/partition_test2/part-r-1.gz.parquet
>
> But it consists of 2 blocks. It seems dfs.blocksize is not applicable.
>
>   /user/hive/warehouse/partition_test2/part-r-1.gz.parquet 242202143
> bytes, 2 block(s):  OK
>   0. BP-2098986396-192.168.100.1-1389779750403:blk_1080124727_6385839
> len=134217728 repl=2
>   1. BP-2098986396-192.168.100.1-1389779750403:blk_1080124728_6385840
> len=107984415 repl=2
>
> Because of this, Spark reads it as 2 partitions even though I repartitioned
> the data into 1 partition. If the file size after repartitioning is a little
> over 128MB and I save it again, it writes 2 files of roughly 128MB and 1MB.
> This is very important to me because I use the repartition method many times.
> Please help me figure this out.
>
>   Jung


Help with type check

2015-11-30 Thread Eyal Sharon
Hi,

I have a problem inferring where the type bug is here.

I have this code fragment; it parses JSON into an Array[Double]:






val muCouch = {
  val e = input.filter(_.id == "mu")(0).content()
  val b = e.getArray("feature_mean")
  for (i <- 0 to e.getInt("features")) yield b.getDouble(i)
}.toArray

Now the problem is when I want to create a dense vector:

new DenseVector(muCouch)


I get the following error:

Error:(111, 21) type mismatch;
 found   : Array[java.lang.Double]
 required: Array[scala.Double]


Now, I can probably find a workaround for that, but I want to get a
deeper understanding of why it occurs.

P.S. I do use collection.JavaConversions._
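For what it's worth, the mismatch is boxing: getDouble returns a boxed java.lang.Double, so the yield produces Array[java.lang.Double], while DenseVector wants the primitive Array[scala.Double]. One hedged sketch of a fix is to unbox explicitly before building the vector (note also that `0 to n` is inclusive; `0 until n` may be what's intended):

```scala
// stand-in for the Array[java.lang.Double] built by the snippet above
val muCouch: Array[java.lang.Double] = Array(1.0, 2.0, 3.0).map(Double.box)

// unbox element-by-element to get the primitive array DenseVector expects
val unboxed: Array[Double] = muCouch.map(_.doubleValue)
// new DenseVector(unboxed)  // org.apache.spark.mllib.linalg.DenseVector
```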

Thanks !

-- 


This email and any files transmitted with it are confidential and intended
solely for the use of the individual or entity to whom they are
addressed. Please note that any disclosure, copying or distribution of the
content of this information is strictly forbidden. If you have received
this email message in error, please destroy it immediately and notify its
sender.


Re: Spark on yarn vs spark standalone

2015-11-30 Thread Jacek Laskowski
Hi Mark,

I said I've only managed to develop a limited understanding of how
Spark works in the different deploy modes ;-)

But somehow I thought that cluster deploy mode in Spark Standalone was not
supported. I think I've recently seen a JIRA where that, or something
similar, was said. Can't find it now :(

Pozdrawiam,
Jacek

--
Jacek Laskowski | https://medium.com/@jaceklaskowski/ |
http://blog.jaceklaskowski.pl
Mastering Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
Follow me at https://twitter.com/jaceklaskowski
Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski


On Mon, Nov 30, 2015 at 6:58 PM, Mark Hamstra  wrote:
> Standalone mode also supports running the driver on a cluster node.  See
> "cluster" mode in
> http://spark.apache.org/docs/latest/spark-standalone.html#launching-spark-applications
> .  Also,
> http://spark.apache.org/docs/latest/spark-standalone.html#high-availability
>
> On Mon, Nov 30, 2015 at 9:47 AM, Jacek Laskowski  wrote:
>>
>> Hi,
>>
>> My understanding of Spark on YARN and even Spark in general is very
>> limited so keep that in mind.
>>
>> I'm not sure why you compare yarn-cluster and spark standalone? In
>> yarn-cluster a driver runs on a node in the YARN cluster while spark
>> standalone keeps the driver on the machine you launched a Spark
>> application. Also, YARN cluster supports retrying applications while
>> standalone doesn't. There's also support for rack locality preference
>> (but dunno if that's used and where in Spark).
>>
>> My limited understanding suggests me to use Spark on YARN if you're
>> considering to use Hadoop/HDFS and submitting jobs using YARN.
>> Standalone's an entry option where throwing in YARN could kill
>> introducing Spark to organizations without Hadoop YARN.
>>
>> Just my two cents.
>>
>> Pozdrawiam,
>> Jacek
>>
>> --
>> Jacek Laskowski | https://medium.com/@jaceklaskowski/ |
>> http://blog.jaceklaskowski.pl
>> Mastering Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
>> Follow me at https://twitter.com/jaceklaskowski
>> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
>>
>>
>> On Fri, Nov 27, 2015 at 8:36 AM, cs user  wrote:
>> > Hi All,
>> >
>> > Apologies if this question has been asked before. I'd like to know if
>> > there
>> > are any downsides to running spark over yarn with the --master
>> > yarn-cluster
>> > option vs having a separate spark standalone cluster to execute jobs?
>> >
>> > We're looking at installing a hdfs/hadoop cluster with Ambari and
>> > submitting
>> > jobs to the cluster using yarn, or having an Ambari cluster and a
>> > separate
>> > standalone spark cluster, which will run the spark jobs on data within
>> > hdfs.
>> >
>> > With yarn, will we still get all the benefits of spark?
>> >
>> > Will it be possible to process streaming data?
>> >
>> > Many thanks in advance for any responses.
>> >
>> > Cheers!
>>
>>
>




Re: Spark on yarn vs spark standalone

2015-11-30 Thread Mark Hamstra
Standalone mode also supports running the driver on a cluster node.  See
"cluster" mode in
http://spark.apache.org/docs/latest/spark-standalone.html#launching-spark-applications
.  Also,
http://spark.apache.org/docs/latest/spark-standalone.html#high-availability

On Mon, Nov 30, 2015 at 9:47 AM, Jacek Laskowski  wrote:

> Hi,
>
> My understanding of Spark on YARN and even Spark in general is very
> limited so keep that in mind.
>
> I'm not sure why you compare yarn-cluster and spark standalone? In
> yarn-cluster a driver runs on a node in the YARN cluster while spark
> standalone keeps the driver on the machine you launched a Spark
> application. Also, YARN cluster supports retrying applications while
> standalone doesn't. There's also support for rack locality preference
> (but dunno if that's used and where in Spark).
>
> My limited understanding suggests me to use Spark on YARN if you're
> considering to use Hadoop/HDFS and submitting jobs using YARN.
> Standalone's an entry option where throwing in YARN could kill
> introducing Spark to organizations without Hadoop YARN.
>
> Just my two cents.
>
> Pozdrawiam,
> Jacek
>
> --
> Jacek Laskowski | https://medium.com/@jaceklaskowski/ |
> http://blog.jaceklaskowski.pl
> Mastering Spark https://jaceklaskowski.gitbooks.io/mastering-apache-spark/
> Follow me at https://twitter.com/jaceklaskowski
> Upvote at http://stackoverflow.com/users/1305344/jacek-laskowski
>
>
> On Fri, Nov 27, 2015 at 8:36 AM, cs user  wrote:
> > Hi All,
> >
> > Apologies if this question has been asked before. I'd like to know if
> there
> > are any downsides to running spark over yarn with the --master
> yarn-cluster
> > option vs having a separate spark standalone cluster to execute jobs?
> >
> > We're looking at installing a hdfs/hadoop cluster with Ambari and
> submitting
> > jobs to the cluster using yarn, or having an Ambari cluster and a
> separate
> > standalone spark cluster, which will run the spark jobs on data within
> hdfs.
> >
> > With yarn, will we still get all the benefits of spark?
> >
> > Will it be possible to process streaming data?
> >
> > Many thanks in advance for any responses.
> >
> > Cheers!
>
>
>


Re: In yarn-client mode, is it the driver or application master that issue commands to executors?

2015-11-30 Thread Jacek Laskowski
On Fri, Nov 27, 2015 at 12:12 PM, Nisrina Luthfiyati <
nisrina.luthfiy...@gmail.com> wrote:

> Hi all,
> I'm trying to understand how yarn-client mode works and found these two
> diagrams:
>
> [two diagrams, not preserved in the plain-text archive]
>
> In the first diagram, it looks like the driver running in client directly
> communicates with executors to issue application commands, while in the
> second diagram it looks like application commands is sent to application
> master first and then forwarded to executors.
>

My limited understanding tells me that regardless of deploy mode (local,
standalone, YARN or Mesos), the driver (using TaskSchedulerImpl) sends
TaskSets to executors once they're launched. YARN and Mesos are only
involved until they offer resources (CPU and memory); once executors start,
these cluster managers are no longer engaged in the communication (driver
and executors communicate using RPC over Netty since 1.6-SNAPSHOT, or Akka
before that).

I'd love being corrected if mistaken. Thanks.

Jacek


Re: spark.cleaner.ttl for 1.4.1

2015-11-30 Thread Josh Rosen
AFAIK the ContextCleaner should perform all of the cleaning *as long as
garbage collection is performed frequently enough on the driver*. See
https://issues.apache.org/jira/browse/SPARK-7689 and
https://github.com/apache/spark/pull/6220#issuecomment-102950055 for
discussion of this technicality.

On Mon, Nov 30, 2015 at 8:46 AM Michal Čizmazia  wrote:

> Does *spark.cleaner.ttl *still need to be used for Spark *1.4.1 *long-running
> streaming jobs? Or does *ContextCleaner* alone do all the cleaning?
>


Re: question about combining small parquet files

2015-11-30 Thread Nezih Yigitbasi
This looks interesting, thanks Ruslan. But compaction with Hive is as
simple as an INSERT OVERWRITE statement, since Hive supports
CombineFileInputFormat. Is it possible to do the same with Spark?

On Thu, Nov 26, 2015 at 9:47 AM, Ruslan Dautkhanov 
wrote:

> An interesting compaction approach for small files was discussed recently:
>
> http://blog.cloudera.com/blog/2015/11/how-to-ingest-and-query-fast-data-with-impala-without-kudu/
>
>
> AFAIK Spark supports views too.
>
>
> --
> Ruslan Dautkhanov
>
> On Thu, Nov 26, 2015 at 10:43 AM, Nezih Yigitbasi <
> nyigitb...@netflix.com.invalid> wrote:
>
>> Hi Spark people,
>> I have a Hive table that has a lot of small parquet files and I am
>> creating a data frame out of it to do some processing, but since I have a
>> large number of splits/files my job creates a lot of tasks, which I don't
>> want. Basically what I want is the same functionality that Hive provides,
>> that is, to combine these small input splits into larger ones by specifying
>> a max split size setting. Is this currently possible with Spark?
>>
>> I look at coalesce() but with coalesce I can only control the number
>> of output files not their sizes. And since the total input dataset size
>> can vary significantly in my case, I cannot just use a fixed partition
>> count as the size of each output file can get very large. I then looked for
>> getting the total input size from an rdd to come up with some heuristic to
>> set the partition count, but I couldn't find any ways to do it (without
>> modifying the spark source).
>>
>> Any help is appreciated.
>>
>> Thanks,
>> Nezih
>>
>> PS: this email is the same as my previous email as I learned that my
>> previous email ended up as spam for many people since I sent it through
>> nabble, sorry for the double post.
>>
>
>
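Regarding the heuristic mentioned in the thread above (deriving a partition count from the total input size): the arithmetic is simple, and the open question is obtaining the total size cheaply. A sketch — the function name and the 512 MB target are assumptions, not a Spark API:

```python
import math

def partition_count(total_input_bytes, target_split_bytes=512 * 1024 * 1024):
    """How many partitions are needed so each output file is at most
    roughly target_split_bytes."""
    return max(1, math.ceil(total_input_bytes / target_split_bytes))

# 5 GB of input with a 512 MB target
print(partition_count(5 * 1024 ** 3))  # -> 10
```

One could then call df.repartition(partition_count(total_size)) before writing; total_size would still have to come from somewhere, e.g. a file-system listing of the table's directory.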


Re: Kafka Direct does not recover automatically when the Kafka Stream gets messed up?

2015-11-30 Thread swetha kasireddy
Hi Cody,

What if the offsets that are tracked are no longer present in Kafka? How do
I skip those offsets and go to the next available offset? Also, would
specifying rebalance.backoff.ms be of any help?

Thanks,
Swetha

On Thu, Nov 12, 2015 at 9:07 AM, Cody Koeninger  wrote:

> To be blunt, if you care about being able to recover from weird
> situations, you should be tracking offsets yourself and specifying offsets
> on job start, not relying on checkpoints.
>
> On Tue, Nov 10, 2015 at 3:54 AM, Adrian Tanase  wrote:
>
>> I’ve seen this before during an extreme outage on the cluster, where the
>> kafka offsets checkpointed by the directstreamRdd were bigger than what
>> kafka reported. The checkpoint was therefore corrupted.
>> I don’t know the root cause but since I was stressing the cluster during
>> a reliability test I can only assume that one of the Kafka partitions was
>> restored from an out-of-sync replica and did not contain all the data.
>> Seems extreme but I don’t have another idea.
>>
>> @Cody – do you know of a way to recover from a situation like this? Can
>> someone manually delete folders from the checkpoint folder to help the job
>> recover? E.g. Go 2 steps back, hoping that kafka has those offsets.
>>
>> -adrian
>>
>> From: swetha kasireddy
>> Date: Monday, November 9, 2015 at 10:40 PM
>> To: Cody Koeninger
>> Cc: "user@spark.apache.org"
>> Subject: Re: Kafka Direct does not recover automatically when the Kafka
>> Stream gets messed up?
>>
>> OK. But, one thing that I observed is that when there is a problem with
>> Kafka Stream, unless I delete the checkpoint directory the Streaming job
>> does not restart. I guess it tries to retry the failed tasks and if it's
>> not able to recover, it fails again. Sometimes, it fails with StackOverFlow
>> Error.
>>
>> Why does the Streaming job not restart from checkpoint directory when the
>> job failed earlier with Kafka Brokers getting messed up? We have the
>> checkpoint directory in our hdfs.
>>
>> On Mon, Nov 9, 2015 at 12:34 PM, Cody Koeninger 
>> wrote:
>>
>>> I don't think deleting the checkpoint directory is a good way to restart
>>> the streaming job, you should stop the spark context or at the very least
>>> kill the driver process, then restart.
>>>
>>> On Mon, Nov 9, 2015 at 2:03 PM, swetha kasireddy <
>>> swethakasire...@gmail.com> wrote:
>>>
 Hi Cody,

 Our job is our failsafe as we don't have Control over Kafka Stream as
 of now. Can setting rebalance max retries help? We do not have any monitors
 setup as of now. We need to setup the monitors.

 My idea is to have some kind of Cron job that queries the Streaming
 API for monitoring like every 5 minutes and then send an email alert and
 automatically restart the Streaming job by deleting the Checkpoint
 directory. Would that help?



 Thanks!

 On Mon, Nov 9, 2015 at 11:09 AM, Cody Koeninger 
 wrote:

> The direct stream will fail the task if there is a problem with the
> kafka broker.  Spark will retry failed tasks automatically, which should
> handle broker rebalances that happen in a timely fashion.
> spark.task.maxFailures controls the maximum number of retries before 
> failing
> the job.  Direct stream isn't any different from any other spark task in
> that regard.
>
> The question of what kind of monitoring you need is more a question
> for your particular infrastructure and what you're already using for
> monitoring.  We put all metrics (application level or system level) into
> graphite and alert from there.
>
> I will say that if you've regularly got problems with kafka falling
> over for half an hour, I'd look at fixing that before worrying about spark
> monitoring...
>
>
> On Mon, Nov 9, 2015 at 12:26 PM, swetha 
> wrote:
>
>> Hi,
>>
>> How to recover Kafka Direct automatically when there is a problem
>> with
>> Kafka brokers? Sometimes our Kafka Brokers gets messed up and the
>> entire
>> Streaming job blows up unlike some other consumers which do recover
>> automatically. How can I make sure that Kafka Direct recovers
>> automatically
>> when the broker fails for sometime say 30 minutes? What kind of
>> monitors
>> should be in place to recover the job?
>>
>> Thanks,
>> Swetha
>>
>>
>>

Re: Kafka Direct does not recover automatically when the Kafka Stream gets messed up?

2015-11-30 Thread Cody Koeninger
You'd need to get the earliest or latest available offsets from kafka,
whichever is most appropriate for your situation.

The KafkaRDD will use the value of refresh.leader.backoff.ms, so you can
try adjusting that to get a longer sleep before retrying the task.
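The "fall forward to what Kafka still has" logic can be sketched as a plain helper (hypothetical — this is not a Spark or Kafka API, and the earliest/latest bounds would come from querying the brokers):

```python
def restart_offset(tracked, earliest, latest):
    """Clamp a tracked offset into the range Kafka still retains.

    If retention already deleted the tracked offset, fall forward to
    the earliest available one; never start beyond the latest offset.
    """
    return min(max(tracked, earliest), latest)

# tracked offset 100 was aged out; Kafka now retains offsets [250, 900]
print(restart_offset(100, 250, 900))  # -> 250
```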

On Mon, Nov 30, 2015 at 1:50 PM, swetha kasireddy  wrote:

> Hi Cody,
>
> What if the Offsets that are tracked are not present in Kafka. How do I
> skip those offsets and go to the next Offset? Also would specifying
> rebalance.backoff.ms be of any help?
>
> Thanks,
> Swteha
>
> On Thu, Nov 12, 2015 at 9:07 AM, Cody Koeninger 
> wrote:
>
>> To be blunt, if you care about being able to recover from weird
>> situations, you should be tracking offsets yourself and specifying offsets
>> on job start, not relying on checkpoints.
>>
>> On Tue, Nov 10, 2015 at 3:54 AM, Adrian Tanase  wrote:
>>
>>> I’ve seen this before during an extreme outage on the cluster, where the
>>> kafka offsets checkpointed by the directstreamRdd were bigger than what
>>> kafka reported. The checkpoint was therefore corrupted.
>>> I don’t know the root cause but since I was stressing the cluster during
>>> a reliability test I can only assume that one of the Kafka partitions was
>>> restored from an out-of-sync replica and did not contain all the data.
>>> Seems extreme but I don’t have another idea.
>>>
>>> @Cody – do you know of a way to recover from a situation like this? Can
>>> someone manually delete folders from the checkpoint folder to help the job
>>> recover? E.g. Go 2 steps back, hoping that kafka has those offsets.
>>>
>>> -adrian
>>>
>>> From: swetha kasireddy
>>> Date: Monday, November 9, 2015 at 10:40 PM
>>> To: Cody Koeninger
>>> Cc: "user@spark.apache.org"
>>> Subject: Re: Kafka Direct does not recover automatically when the Kafka
>>> Stream gets messed up?
>>>
>>> OK. But, one thing that I observed is that when there is a problem with
>>> Kafka Stream, unless I delete the checkpoint directory the Streaming job
>>> does not restart. I guess it tries to retry the failed tasks and if it's
>>> not able to recover, it fails again. Sometimes, it fails with StackOverFlow
>>> Error.
>>>
>>> Why does the Streaming job not restart from checkpoint directory when
>>> the job failed earlier with Kafka Brokers getting messed up? We have the
>>> checkpoint directory in our hdfs.
>>>
>>> On Mon, Nov 9, 2015 at 12:34 PM, Cody Koeninger 
>>> wrote:
>>>
 I don't think deleting the checkpoint directory is a good way to
 restart the streaming job, you should stop the spark context or at the very
 least kill the driver process, then restart.

 On Mon, Nov 9, 2015 at 2:03 PM, swetha kasireddy <
 swethakasire...@gmail.com> wrote:

> Hi Cody,
>
> Our job is our failsafe as we don't have Control over Kafka Stream as
> of now. Can setting rebalance max retries help? We do not have any 
> monitors
> setup as of now. We need to setup the monitors.
>
> My idea is to have some kind of Cron job that queries the Streaming
> API for monitoring like every 5 minutes and then send an email alert and
> automatically restart the Streaming job by deleting the Checkpoint
> directory. Would that help?
>
>
>
> Thanks!
>
> On Mon, Nov 9, 2015 at 11:09 AM, Cody Koeninger 
> wrote:
>
>> The direct stream will fail the task if there is a problem with the
>> kafka broker.  Spark will retry failed tasks automatically, which should
>> handle broker rebalances that happen in a timely fashion.
>> spark.task.maxFailures controls the maximum number of retries before 
>> failing
>> the job.  Direct stream isn't any different from any other spark task in
>> that regard.
>>
>> The question of what kind of monitoring you need is more a question
>> for your particular infrastructure and what you're already using for
>> monitoring.  We put all metrics (application level or system level) into
>> graphite and alert from there.
>>
>> I will say that if you've regularly got problems with kafka falling
>> over for half an hour, I'd look at fixing that before worrying about 
>> spark
>> monitoring...
>>
>>
>> On Mon, Nov 9, 2015 at 12:26 PM, swetha 
>> wrote:
>>
>>> Hi,
>>>
>>> How to recover Kafka Direct automatically when there is a
>>> problem with
>>> Kafka brokers? Sometimes our Kafka Brokers gets messed up and the
>>> entire
>>> Streaming job blows up unlike some other consumers which do recover
>>> automatically. How can I make sure that Kafka Direct recovers
>>> automatically
>>> when the broker fails for sometime say 30 minutes? What kind of
>>> monitors
>>> should be in place to recover the job?
>>>
>>> Thanks,
>>> Swetha
>>>

spark rdd grouping

2015-11-30 Thread Rajat Kumar
Hi

I have a JavaPairRDD rdd1. I want to group rdd1 by key but preserve the
partitions of the original RDD, to avoid a shuffle, since I know all
records with the same key are already in the same partition.

The PairRDD is basically constructed using a Kafka streaming low-level
consumer, which puts all records with the same key in the same partition.
Can I group them together while avoiding a shuffle?

Thanks
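If every key really is confined to a single partition, one hedged option is to group inside mapPartitions with preservesPartitioning=True, so no shuffle is triggered. Shown here as a plain per-partition function (names are illustrative; in the Java API the equivalent entry point is mapPartitionsToPair):

```python
from collections import defaultdict

def group_within_partition(records):
    """Group the (key, value) pairs of one partition; no shuffle involved."""
    groups = defaultdict(list)
    for key, value in records:
        groups[key].append(value)
    return iter(groups.items())

# With PySpark this would be applied as:
#   grouped = rdd.mapPartitions(group_within_partition, preservesPartitioning=True)
result = dict(group_within_partition([("a", 1), ("b", 2), ("a", 3)]))
print(result)  # -> {'a': [1, 3], 'b': [2]}
```

Note this buffers a whole partition in memory, so it only suits partitions that fit comfortably on one executor.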





Re: Spark 1.5.2 + Hive 1.0.0 in Amazon EMR 4.2.0

2015-11-30 Thread Ted Yu
Please see the following, which went into Hive 1.1:
HIVE-8839: Support "alter table .. add/replace columns cascade"

FYI

On Mon, Nov 30, 2015 at 2:14 PM, Daniel Lopes 
wrote:

> Hi,
>
> I get this error when trying to write Spark DataFrame to Hive Table Stored
> as TextFile
>
>
> sqlContext.sql('INSERT OVERWRITE TABLE analytics.client_view_stock SELECT
> * FROM client_view_stock')
> (analytics.client_view_stock is the Hive table; client_view_stock is the
> Spark temp table)
>
> Erro:
>
> 15/11/30 21:40:14 INFO latency: StatusCode=[404],
> Exception=[com.amazonaws.services.s3.model.AmazonS3Exception: Not Found
> (Service: Amazon S3; Status Code: 404; Error Code: 404 Not Found; Request
> ID: 5ADBECA2D82A7C17), S3 Extended Request ID:
> RcPfjgWaeXG62xyVRrAr91sVQNxktqbXUPJgK2cvZlf6SKEAOnWCtV9X9K1Vp9dAyDhGALQRBcU=],
> ServiceName=[Amazon S3], AWSErrorCode=[404 Not Found],
> AWSRequestID=[5ADBECA2D82A7C17], ServiceEndpoint=[
> https://my-bucket.s3.amazonaws.com], Exception=1,
> HttpClientPoolLeasedCount=0, RequestCount=1, HttpClientPoolPendingCount=0,
> HttpClientPoolAvailableCount=1, ClientExecuteTime=[214.69],
> HttpRequestTime=[214.245], HttpClientReceiveResponseTime=[212.513],
> RequestSigningTime=[0.16], HttpClientSendRequestTime=[0.112],
> 15/11/30 21:40:21 INFO Hive: Replacing 
> src:s3://my-bucket/output/2015/11/29/client_view_stock/.hive-staging_hive_2015-11-30_21-19-48_942_238078420083598647-1/-ext-1/part-00199,
> dest: s3://my-bucket/output/2015/11/29/client_view_stock/part-00199,
> Status:true
> -chgrp: '' does not match expected pattern for group
> Usage: hadoop fs [generic options] -chgrp [-R] GROUP PATH...
> 15/11/30 21:40:21 INFO latency: StatusCode=[200], ServiceName=[Amazon S3],
> AWSRequestID=[2509AE55A8D71A61], ServiceEndpoint=[https://my-bucket.
> s3.amazonaws.com], HttpClientPoolLeasedCount=0, RequestCount=1,
> HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=1,
> ClientExecuteTime=[137.387], HttpRequestTime=[136.721],
> HttpClientReceiveResponseTime=[134.805], RequestSigningTime=[0.235],
> ResponseProcessingTime=[0.169], HttpClientSendRequestTime=[0.145],
> 15/11/30 21:40:21 WARN RetryingMetaStoreClient: MetaStoreClient lost
> connection. Attempting to reconnect.
> org.apache.thrift.TApplicationException: Invalid method name:
> 'alter_table_with_cascade'
>
> Thanks!
>
> --
> *Daniel Lopes, B.Eng*
> Data Scientist - BankFacil
> CREA/SP 5069410560
> 
> Mob +55 (18) 99764-2733 
> Ph +55 (11) 3522-8009
> http://about.me/dannyeuu
>
> Av. Nova Independência, 956, São Paulo, SP
> Bairro Brooklin Paulista
> CEP 04570-001
> https://www.bankfacil.com.br
>
>


Re: Grid search with Random Forest

2015-11-30 Thread Joseph Bradley
It should work with 1.5+.

On Thu, Nov 26, 2015 at 12:53 PM, Ndjido Ardo Bar  wrote:

>
> Hi folks,
>
> Does anyone know whether the Grid Search capability has been enabled since
> issue SPARK-9011 in version 1.4.0? I'm getting a "rawPredictionCol
> column doesn't exist" error when trying to perform a grid search with Spark 1.4.0.
>
> Cheers,
> Ardo
>
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: capture video with spark streaming

2015-11-30 Thread Young, Matthew T
Unless it’s a network camera that lets you request specific frame numbers for 
read, the answer is that you will just read from the camera as you normally 
would without Spark, inside a foreachRDD(), and parallelize the result out for 
processing once you have it in a collection in the driver’s memory.

If the camera expects you to read continuously you will need to implement a 
custom receiver to constantly read from the camera and buffer the data until 
the next batch comes around.
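The read-and-buffer loop of such a custom receiver can be sketched in plain Python, with no Spark required; `batch_frames` and the frame source below are made-up names, just to illustrate buffering a continuous read until the next batch comes around:

```python
def batch_frames(frame_source, batch_size):
    """Buffer frames from a continuous source and hand them out in
    fixed-size batches, the way a custom receiver would feed each
    streaming batch interval."""
    buffer = []
    for frame in frame_source:
        buffer.append(frame)
        if len(buffer) == batch_size:
            yield list(buffer)  # emit a full batch
            buffer.clear()

# stand-in for a continuous camera read
frames = iter(range(10))
batches = list(batch_frames(frames, 4))
print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7]] (the partial trailing buffer is withheld)
```

In a real receiver the equivalent of `yield` is calling `store()` on the buffered data, letting Spark slice it into batch intervals.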


From: Lavallen Pablo [mailto:intoe...@yahoo.com.ar]
Sent: Monday, November 30, 2015 5:07 PM
To: User 
Subject: capture video with spark streaming


Hello! Can anyone guide me please,  on how to capture video from a camera with 
spark streaming ? any article or book to read to recommend me ?

thank you





Spark Streaming Specify Kafka Partition

2015-11-30 Thread Alan Braithwaite
Is there any mechanism in the kafka streaming source to specify the exact
partition id that we want a streaming job to consume from?

If not, is there a workaround besides writing our own custom receiver?

Thanks,
- Alan


ORC predicate pushdown in HQL

2015-11-30 Thread Tejas Patil
Hi,
I am trying to use predicate pushdown in ORC and I was expecting that it
would be used when one tries to query an ORC table in Hive. But based on
the query plan generated, I don't see that happening. Am I missing some
configuration, or is this not supported at all?

PS: I have tried the following over 1.5.1 and even 1.6 release branch.

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.setConf("spark.sql.orc.filterPushdown", "true")
// orc_table is a hive table
val query = hiveContext.sql("SELECT COUNT(*) FROM orc_table WHERE key > 10")

scala> query.explain
== Physical Plan ==
TungstenAggregate(key=[],
functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#41L])
 TungstenExchange SinglePartition
  TungstenAggregate(key=[],
functions=[(count(1),mode=Partial,isDistinct=false)],
output=[currentCount#44L])
   Project
Filter (key#39 > 10)
 HiveTableScan [key#39], (MetastoreRelation default, orc_table, None)


Thanks,
Tejas


Re: ORC predicate pushdown in HQL

2015-11-30 Thread Ted Yu
This is related:
SPARK-11087

FYI

On Mon, Nov 30, 2015 at 5:56 PM, Tejas Patil 
wrote:

> Hi,
> I am trying to use predicate pushdown in ORC and I was expecting that it
> would be used when one tries to query an ORC table in Hive. But based on
> the query plan generated, I don't see that happening. Am I missing some
> configuration, or is this not supported at all?
>
> PS: I have tried the following over 1.5.1 and even 1.6 release branch.
>
> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
> hiveContext.setConf("spark.sql.orc.filterPushdown", "true")
> // orc_table is a hive table
> val query = hiveContext.sql("SELECT COUNT(*) FROM orc_table WHERE key >
> 10")
>
> scala> query.explain
> == Physical Plan ==
> TungstenAggregate(key=[],
> functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#41L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[],
> functions=[(count(1),mode=Partial,isDistinct=false)],
> output=[currentCount#44L])
>Project
> Filter (key#39 > 10)
>  HiveTableScan [key#39], (MetastoreRelation default, orc_table, None)
>
>
> Thanks,
> Tejas
>
>


Re: Grid search with Random Forest

2015-11-30 Thread Ndjido Ardo BAR
Hi Joseph,

Yes, Random Forest supports Grid Search on Spark 1.5+. But I'm getting a
"rawPredictionCol field does not exist" exception on Spark 1.5.2 for the
Gradient Boosted Trees classifier.


Ardo
On Tue, 1 Dec 2015 at 01:34, Joseph Bradley  wrote:

> It should work with 1.5+.
>
> On Thu, Nov 26, 2015 at 12:53 PM, Ndjido Ardo Bar 
> wrote:
>
>>
>> Hi folks,
>>
>> Does anyone know whether the Grid Search capability is enabled since the
>> issue spark-9011 of version 1.4.0 ? I'm getting the "rawPredictionCol
>> column doesn't exist" when trying to perform a grid search with Spark 1.4.0.
>>
>> Cheers,
>> Ardo
>>
>>
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: dfs.blocksize is not applicable to some cases

2015-11-30 Thread Jung
Yes, I can reproduce it in Spark 1.5.2.
These are the results.

1. first case(1block)
  221.1 M  
/user/hive/warehouse/partition_test/part-r-0-b0e5ecd3-75a3-4c92-94ec-59353d08067a.gz.parquet
  221.1 M  
/user/hive/warehouse/partition_test/part-r-1-b0e5ecd3-75a3-4c92-94ec-59353d08067a.gz.parquet
  221.1 M  
/user/hive/warehouse/partition_test/part-r-2-b0e5ecd3-75a3-4c92-94ec-59353d08067a.gz.parquet

  
/user/hive/warehouse/partition_test/part-r-0-b0e5ecd3-75a3-4c92-94ec-59353d08067a.gz.parquet
 231863863 bytes, 1 block(s):  OK

2. second case(2blocks)
   231.0 M  
/user/hive/warehouse/partition_test2/part-r-0-b7486a52-cfb9-4db0-8d94-377c039026ef.gz.parquet
  
  
/user/hive/warehouse/partition_test2/part-r-0-b7486a52-cfb9-4db0-8d94-377c039026ef.gz.parquet
 242201812 bytes, 2 block(s):  OK

In terms of PARQUET-166, I think it only discusses row group performance. 
Should I set dfs.blocksize to a little bit more than parquet.block.size? 
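As a sanity check on the numbers above, the block counts reported by fsck follow from simple ceiling division of the file size by dfs.blocksize; a quick sketch (not Hadoop code):

```python
def hdfs_block_count(file_size_bytes, dfs_blocksize_bytes):
    """Number of HDFS blocks a file occupies: ceiling division."""
    return -(-file_size_bytes // dfs_blocksize_bytes)

size = 242201812  # the 231.0 M file from the second case
print(hdfs_block_count(size, 128 * 1024 * 1024))  # 2 -> consistent with the default 128 MB blocksize
print(hdfs_block_count(size, 512 * 1024 * 1024))  # 1 -> what a 512 MB dfs.blocksize should give
```

So the 2-block result means the write used the 128 MB default, i.e. the dfs.blocksize setting was not picked up for that write path.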

Thanks

-Original Message-
From: "Ted Yu" 
To: "Jung"; 
Cc: "user"; 
Sent: 2015-12-01 (화) 03:09:58
Subject: Re: dfs.blocksize is not applicable to some cases
 
I am no expert in Parquet. Looking at PARQUET-166, it seems that
parquet.block.size should be lower than dfs.blocksize. Have you tried Spark
1.5.2 to see if the problem persists? Cheers
On Mon, Nov 30, 2015 at 1:55 AM, Jung  wrote:
Hello,
I use Spark 1.4.1 and Hadoop 2.2.0.
It may be a stupid question, but I cannot understand why the "dfs.blocksize" 
Hadoop option sometimes doesn't affect the number of blocks.
When I run the script below,

  val BLOCK_SIZE = 1024 * 1024 * 512 // set to 512MB, hadoop default is 128MB
  sc.hadoopConfiguration.setInt("parquet.block.size", BLOCK_SIZE)
  sc.hadoopConfiguration.setInt("dfs.blocksize",BLOCK_SIZE)
  sc.parallelize(1 to 5, 
24).repartition(3).toDF.saveAsTable("partition_test")

it creates 3 files like this.

  221.1 M  /user/hive/warehouse/partition_test/part-r-1.gz.parquet
  221.1 M  /user/hive/warehouse/partition_test/part-r-2.gz.parquet
  221.1 M  /user/hive/warehouse/partition_test/part-r-3.gz.parquet

To check how many blocks in a file, I enter the command "hdfs fsck 
/user/hive/warehouse/partition_test/part-r-1.gz.parquet -files -blocks".

  Total blocks (validated):  1 (avg. block size 231864402 B)

It is normal case because maximum blocksize change from 128MB to 512MB.
In the real world, I have a bunch of files.

  14.4 M  /user/hive/warehouse/data_1g/part-r-1.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-2.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-3.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-4.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-5.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-6.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-7.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-8.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-9.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-00010.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-00011.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-00012.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-00013.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-00014.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-00015.gz.parquet
  14.4 M  /user/hive/warehouse/data_1g/part-r-00016.gz.parquet

Each file consists of 1block (avg. block size 15141395 B) and I run the almost 
same code as first.

  val BLOCK_SIZE = 1024 * 1024 * 512 // set to 512MB, hadoop default is 128MB
  sc.hadoopConfiguration.setInt("parquet.block.size", BLOCK_SIZE)
  sc.hadoopConfiguration.setInt("dfs.blocksize",BLOCK_SIZE)
  sqlContext.table("data_1g").repartition(1).saveAsTable("partition_test2")

It creates one file.

 231.0 M  /user/hive/warehouse/partition_test2/part-r-1.gz.parquet

But it consists of 2 blocks. It seems dfs.blocksize is not applicable.

  /user/hive/warehouse/partition_test2/part-r-1.gz.parquet 242202143 bytes, 
2 block(s):  OK
  0. BP-2098986396-192.168.100.1-1389779750403:blk_1080124727_6385839 
len=134217728 repl=2
  1. BP-2098986396-192.168.100.1-1389779750403:blk_1080124728_6385840 
len=107984415 repl=2

Because of this, Spark reads it as 2 partitions even though I repartitioned the 
data into 1 partition. If the file size after repartitioning is a little more than 
128 MB and I save it again, it is written as 2 blocks, like 128 MB and 1 MB.
This is very important for me because I use the repartition method many times. 
Please help me figure this out.

  Jung 

Recovery for Spark Streaming Kafka Direct in case of issues with Kafka

2015-11-30 Thread SRK
Hi,

So, our Streaming Job fails with the following errors. If you see the errors
below, they are all related to Kafka losing offsets and
OffsetOutOfRangeException.

What are the options we have other than fixing Kafka? We would like to do
something like the following. How can we achieve 1 and 2 with Spark Kafka
Direct?

1. Need a way to skip some offsets if they are not available after the
max retries are reached; in that case there might be data loss.

2. Catch that exception and somehow force things to "reset" for that
partition. And how would it handle the offsets already calculated in the
backlog (if there is one)?

3. Track the offsets separately and restart the job by providing the offsets.

4. Or, a straightforward approach would be to monitor the log for this error,
and if it occurs more than X times, kill the job, remove the checkpoint
directory, and restart.
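For the "skip unavailable offsets" idea, the core logic amounts to clamping a tracked offset into Kafka's currently available range before restarting; a minimal sketch (function and variable names are made up, and this accepts data loss by design):

```python
def clamp_offset(tracked, earliest, latest):
    """Resume from the nearest available Kafka offset when the tracked one
    has fallen outside the retention window, instead of failing with
    OffsetOutOfRange."""
    if tracked < earliest:
        return earliest  # data before this was deleted by retention -> accept the loss
    if tracked > latest:
        return latest    # tracked offset is ahead of the log, e.g. after an unclean leader election
    return tracked

print(clamp_offset(100, 150, 900))  # 150
print(clamp_offset(500, 150, 900))  # 500
print(clamp_offset(950, 150, 900))  # 900
```

The clamped offsets would then be fed back into the direct stream's starting-offset map on restart.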

ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([test_stream,5]))



java.lang.ClassNotFoundException:
kafka.common.NotLeaderForPartitionException

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)



java.util.concurrent.RejectedExecutionException: Task
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
rejected from java.util.concurrent.ThreadPoolExecutor@543258e0[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
12112]



org.apache.spark.SparkException: Job aborted due to stage failure: Task 10
in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in stage
52.0 (TID 255, 172.16.97.97): UnknownReason

Exception in thread "streaming-job-executor-0" java.lang.Error:
java.lang.InterruptedException

Caused by: java.lang.InterruptedException

java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)



org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in
stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage 33.0
(TID 283, 172.16.97.103): UnknownReason

java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)

java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka Direct does not recover automatically when the Kafka Stream gets messed up?

2015-11-30 Thread swetha kasireddy
So, our Streaming Job fails with the following errors. If you see the
errors below, they are all related to Kafka losing
offsets and OffsetOutOfRangeException.

What are the options we have other than fixing Kafka? We would like to do
something like the following. How can we achieve 1 and 2 with Spark Kafka
Direct?

1. Need a way to skip some offsets if they are not available after
the max retries are reached; in that case there might be data loss.

2. Catch that exception and somehow force things to "reset" for that
partition. And how would it handle the offsets already calculated in the
backlog (if there is one)?

3. Track the offsets separately and restart the job by providing the offsets.

4. Or, a straightforward approach would be to monitor the log for this error,
and if it occurs more than X times, kill the job, remove the checkpoint
directory, and restart.

ERROR DirectKafkaInputDStream: ArrayBuffer(kafka.common.UnknownException,
org.apache.spark.SparkException: Couldn't find leader offsets for
Set([test_stream,5]))


java.lang.ClassNotFoundException:
kafka.common.NotLeaderForPartitionException

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)


java.util.concurrent.RejectedExecutionException: Task
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
rejected from java.util.concurrent.ThreadPoolExecutor@543258e0[Terminated,
pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
12112]


org.apache.spark.SparkException: Job aborted due to stage failure: Task 10
in stage 52.0 failed 4 times, most recent failure: Lost task 10.3 in stage
52.0 (TID 255, 172.16.97.97): UnknownReason

Exception in thread "streaming-job-executor-0" java.lang.Error:
java.lang.InterruptedException

Caused by: java.lang.InterruptedException

java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)


org.apache.spark.SparkException: Job aborted due to stage failure: Task 7
in stage 33.0 failed 4 times, most recent failure: Lost task 7.3 in stage
33.0 (TID 283, 172.16.97.103): UnknownReason

java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)

java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException

at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)








On Mon, Nov 30, 2015 at 12:23 PM, Cody Koeninger  wrote:

> You'd need to get the earliest or latest available offsets from kafka,
> whichever is most appropriate for your situation.
>
> The KafkaRDD will use the value of refresh.leader.backoff.ms, so you can
> try adjusting that to get a longer sleep before retrying the task.
>
> On Mon, Nov 30, 2015 at 1:50 PM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Hi Cody,
>>
>> What if the Offsets that are tracked are not present in Kafka. How do I
>> skip those offsets and go to the next Offset? Also would specifying
>> rebalance.backoff.ms be of any help?
>>
>> Thanks,
>> Swteha
>>
>> On Thu, Nov 12, 2015 at 9:07 AM, Cody Koeninger 
>> wrote:
>>
>>> To be blunt, if you care about being able to recover from weird
>>> situations, you should be tracking offsets yourself and specifying offsets
>>> on job start, not relying on checkpoints.
>>>
>>> On Tue, Nov 10, 2015 at 3:54 AM, Adrian Tanase 
>>> wrote:
>>>
 I’ve seen this before during an extreme outage on the cluster, where
 the kafka offsets checkpointed by the directstreamRdd were bigger than what
 kafka reported. The checkpoint was therefore corrupted.
 I don’t know the root cause but since I was stressing the cluster
 during a reliability test I can only assume that one of the Kafka
 partitions was restored from an out-of-sync replica and did not contain all
 the data. Seems extreme but I don’t have another idea.

 @Cody – do you know of a way to recover from a situation like this? Can
 someone manually delete folders from the checkpoint folder to help the job
 recover? E.g. Go 2 steps back, hoping that kafka has those offsets.

 -adrian

 From: swetha kasireddy
 Date: Monday, November 9, 2015 at 10:40 PM
 To: Cody Koeninger
 Cc: "user@spark.apache.org"
 Subject: Re: Kafka Direct does not recover automatically when the
 Kafka Stream gets messed up?

 OK. But, one thing that I observed is that when there is a problem with
 Kafka Stream, unless I delete the checkpoint directory the Streaming job
 does not restart. I guess it tries to retry the failed tasks and if it's
 not able to recover, it fails again. Sometimes, it fails with StackOverFlow
 Error.

 Why does the Streaming job not restart from checkpoint directory when
 the job failed earlier with Kafka Brokers getting messed up? We have the

capture video with spark streaming

2015-11-30 Thread Lavallen Pablo
Hello! Can anyone guide me please,  on how to capture video from a camera with 
spark streaming ? any article or book to read to recommend me ?
thank you



Spark streaming job hangs

2015-11-30 Thread Cassa L
Hi,
 I am reading data from Kafka into Spark. It runs fine for some time but
then hangs forever with the following output. I don't see any errors in the logs.
How do I debug this?

2015-12-01 06:04:30,697 [dag-scheduler-event-loop] INFO  (Logging.scala:59)
- Adding task set 19.0 with 4 tasks
2015-12-01 06:04:30,872 [pool-13-thread-1] INFO  (Logging.scala:59) -
Disconnected from Cassandra cluster: APG DEV Cluster
2015-12-01 06:04:35,060 [JobGenerator] INFO  (Logging.scala:59) - Added
jobs for time 1448949875000 ms
2015-12-01 06:04:40,054 [JobGenerator] INFO  (Logging.scala:59) - Added
jobs for time 144894988 ms
2015-12-01 06:04:45,034 [JobGenerator] INFO  (Logging.scala:59) - Added
jobs for time 1448949885000 ms
2015-12-01 06:04:50,100 [JobGenerator] INFO  (Logging.scala:59) - Added
jobs for time 144894989 ms
2015-12-01 06:04:55,064 [JobGenerator] INFO  (Logging.scala:59) - Added
jobs for time 1448949895000 ms
2015-12-01 06:05:00,125 [JobGenerator] INFO  (Logging.scala:59) - Added
jobs for time 144894990 ms


Thanks
LCassa


Re: question about combining small parquet files

2015-11-30 Thread Sabarish Sasidharan
You could use the number of input files to determine the number of output
partitions. This assumes your input file sizes are deterministic.

Else, you could also persist the RDD and then determine its size using the
APIs.
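Either way, the heuristic boils down to dividing the total input size by a target output file size to pick a repartition count; a sketch (the 256 MB target is an arbitrary choice, not a Spark default):

```python
def target_partitions(total_input_bytes, max_split_bytes=256 * 1024 * 1024):
    """Pick a coalesce()/repartition() count so each output file stays
    close to max_split_bytes, with at least one partition."""
    return max(1, -(-total_input_bytes // max_split_bytes))  # ceiling division

print(target_partitions(230 * 1024 * 1024))  # 1  -> many small files merged into one
print(target_partitions(3 * 1024 ** 3))      # 12 -> 3 GB split into ~256 MB files
```

The computed count is then passed to coalesce() so file sizes stay bounded even as the input dataset size varies.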

Regards
Sab
On 26-Nov-2015 11:13 pm, "Nezih Yigitbasi" 
wrote:

> Hi Spark people,
> I have a Hive table that has a lot of small parquet files and I am
> creating a data frame out of it to do some processing, but since I have a
> large number of splits/files my job creates a lot of tasks, which I don't
> want. Basically what I want is the same functionality that Hive provides,
> that is, to combine these small input splits into larger ones by specifying
> a max split size setting. Is this currently possible with Spark?
>
> I look at coalesce() but with coalesce I can only control the number
> of output files not their sizes. And since the total input dataset size
> can vary significantly in my case, I cannot just use a fixed partition
> count as the size of each output file can get very large. I then looked for
> getting the total input size from an rdd to come up with some heuristic to
> set the partition count, but I couldn't find any ways to do it (without
> modifying the spark source).
>
> Any help is appreciated.
>
> Thanks,
> Nezih
>
> PS: this email is the same as my previous email as I learned that my
> previous email ended up as spam for many people since I sent it through
> nabble, sorry for the double post.
>


SparkPi running slower with more cores on each worker

2015-11-30 Thread yiskylee
Hi, I was running the SparkPi example code and studying the performance
difference when using different numbers of cores per worker. I change the
number of cores by using start-slave.sh -c CORES on the worker machine for
distributed computation. I also use spark-submit --master local[CORES] for
the same effect on a local machine. The following table shows the preliminary
timings (in seconds) of SparkPi running locally and on multiple nodes
(one worker per node).

(timings table not preserved in the plain-text archive)

The result is really interesting: it shows that performance is better with
more workers, but worse with more cores per worker. This confuses
me, and the only explanation I can think of right now is a lack of
multi-threading support in the reduce() function used in the code.
Any input is appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkPi-running-slower-with-more-cores-on-each-worker-tp25526.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Grid search with Random Forest

2015-11-30 Thread Benjamin Fradet
Hi Ndjido,

This is because GBTClassifier doesn't yet have a rawPredictionCol like
RandomForestClassifier has.
Cf:
http://spark.apache.org/docs/latest/ml-ensembles.html#output-columns-predictions-1
On 1 Dec 2015 3:57 a.m., "Ndjido Ardo BAR"  wrote:

> Hi Joseph,
>
> Yes Random Forest support Grid Search on Spark 1.5.+ . But I'm getting a
> "rawPredictionCol field does not exist exception" on Spark 1.5.2 for
> Gradient Boosting Trees classifier.
>
>
> Ardo
> On Tue, 1 Dec 2015 at 01:34, Joseph Bradley  wrote:
>
>> It should work with 1.5+.
>>
>> On Thu, Nov 26, 2015 at 12:53 PM, Ndjido Ardo Bar 
>> wrote:
>>
>>>
>>> Hi folks,
>>>
>>> Does anyone know whether the Grid Search capability is enabled since the
>>> issue spark-9011 of version 1.4.0 ? I'm getting the "rawPredictionCol
>>> column doesn't exist" when trying to perform a grid search with Spark 1.4.0.
>>>
>>> Cheers,
>>> Ardo
>>>
>>>
>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>


Re: Grid search with Random Forest

2015-11-30 Thread Ndjido Ardo BAR
Hi Benjamin,

Thanks, the documentation you sent is clear.
Is there any other way to perform a Grid Search with GBT?


Ndjido
On Tue, 1 Dec 2015 at 08:32, Benjamin Fradet 
wrote:

> Hi Ndjido,
>
> This is because GBTClassifier doesn't yet have a rawPredictionCol like
> RandomForestClassifier has.
> Cf:
> http://spark.apache.org/docs/latest/ml-ensembles.html#output-columns-predictions-1
> On 1 Dec 2015 3:57 a.m., "Ndjido Ardo BAR"  wrote:
>
>> Hi Joseph,
>>
>> Yes Random Forest support Grid Search on Spark 1.5.+ . But I'm getting a
>> "rawPredictionCol field does not exist exception" on Spark 1.5.2 for
>> Gradient Boosting Trees classifier.
>>
>>
>> Ardo
>> On Tue, 1 Dec 2015 at 01:34, Joseph Bradley 
>> wrote:
>>
>>> It should work with 1.5+.
>>>
>>> On Thu, Nov 26, 2015 at 12:53 PM, Ndjido Ardo Bar 
>>> wrote:
>>>

 Hi folks,

 Does anyone know whether the Grid Search capability is enabled since
 the issue spark-9011 of version 1.4.0 ? I'm getting the "rawPredictionCol
 column doesn't exist" when trying to perform a grid search with Spark 
 1.4.0.

 Cheers,
 Ardo




 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>


Re: Unable to use "Batch Start Time" on worker nodes.

2015-11-30 Thread Abhishek Anand
Thanks TD !!

I think this should solve my purpose.




On Sun, Nov 29, 2015 at 6:17 PM, Tathagata Das  wrote:

> You can get the batch start time (the expected time, not the exact time when
> the jobs are submitted) from the DStream operation "transform". There is a
> version of transform that lets you specify a function with two params - the
> parent RDD and the batch time at which the RDD was generated.
>
> TD
>
> On Thu, Nov 26, 2015 at 1:33 PM, Abhishek Anand 
> wrote:
>
>> Hi ,
>>
>> I need to use batch start time in my spark streaming job.
>>
>> I need the value of batch start time inside one of the functions that is
>> called within a flatmap function in java.
>>
>> Please suggest me how this can be done.
>>
>> I tried to use the StreamingListener class and set the value of a
>> variable inside the onBatchSubmitted function something like this :
>>
>> public void onBatchSubmitted(StreamingListenerBatchSubmitted
>> batchSubmitted) { batchstarttime =
>> batchSubmitted.batchInfo().batchTime().milliseconds();
>>   CommandLineArguments.BATCH_START_TIME = batchstarttime;
>>  }
>>
>>
>> But, the issue is that the BATCH_START_TIME set only when the batch
>> starts. I see in the worker logs that BATCH_START_TIME takes the default
>> value and is not set.
>>
>>
>> Please suggest how this can be achieved.
>>
>>
>>
>> BR,
>> Abhi
>>
>
>


Re: How to work with a joined rdd in pyspark?

2015-11-30 Thread arnalone
ahhh I get it thx!!
I did not know that we could use a "double index".
I used x[0] to point on shows, x[1][0] to point on channels x[1][1] to point
on views.
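For anyone else hitting this, the shape of a PySpark join result can be reproduced with plain tuples; the show/channel/views names below just mirror this thread's example:

```python
# rdd1 holds (show, channel) pairs and rdd2 holds (show, views) pairs;
# rdd1.join(rdd2) yields (show, (channel, views)) pairs.
joined = [("news", ("BBC", 120)), ("sports", ("ESPN", 300))]
for x in joined:
    show = x[0]        # the join key
    channel = x[1][0]  # value from the first RDD
    views = x[1][1]    # value from the second RDD
print(joined[0][1][1])  # 120
```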

I feel terribly noob.
Thank you all :)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-work-with-a-joined-rdd-in-pyspark-tp25510p25519.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Persisting closed sessions to external store inside updateStateByKey

2015-11-30 Thread Anthony Brew
Hi,
 I'm working on persisting a session to an external store once it receives its
close event, using Spark Streaming's updateStateByKey.

private static Function2<List<SessionUpdate>, Optional<SessionUpdate>, Optional<SessionUpdate>>
    COLLECTED_SESSION = (newItems, current) -> {

    SessionUpdate returnValue = current.orNull();

    for (SessionUpdate i : newItems) {
        // seed the state from the first event, otherwise fold the new event in
        returnValue = (returnValue == null) ? i : returnValue.combine(i);
    }

    if (returnValue.isClosed()) {
        // the question is: should long-term persistence happen here?
        // i.e. someDataStore.store(returnValue);
        return Optional.absent();  // drop the key from state once the session is closed
    } else {
        return Optional.of(returnValue);
    }
};

For some reason this feels wrong, but it could just be I haven't
encountered this way of thinking before. Any pointers/confirmation of what
the best approach is to this would be great.

Thanks a Million,
Anthony
Phone: 087 - 9179799
Quidquid latine dictum sit, altum sonatur


Spark DStream Data stored out of order in Cassandra

2015-11-30 Thread Prateek .
Hi,

I have a time-critical Spark application that takes sensor data from a Kafka 
stream, stores it in a case class, applies transformations, and then stores it in 
a Cassandra schema. The data needs to be stored in the schema in FIFO order.

The order is maintained in the Kafka queue, but I am observing out-of-order data in 
the Cassandra schema. Does Spark Streaming provide any functionality to retain 
order, or do we need to implement some sorting based on the timestamp of arrival?
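If sorting on a timestamp does turn out to be necessary, the per-batch fix is just an ordinary sort on the event's timestamp field before writing; sketched here with plain tuples (the sensor_id/timestamp/value layout is an assumption, not the poster's actual case class):

```python
# events from parallel tasks can arrive interleaved out of order
events = [("s1", 1003, "v3"), ("s1", 1001, "v1"), ("s1", 1002, "v2")]

# restore FIFO order within the batch by sorting on the timestamp field
ordered = sorted(events, key=lambda e: e[1])
print([e[2] for e in ordered])  # ['v1', 'v2', 'v3']
```

Note this only restores order within one batch; cross-batch ordering is better enforced by the Cassandra schema itself, as discussed in the reply below.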


Regards,
Prateek
"DISCLAIMER: This message is proprietary to Aricent and is intended solely for 
the use of the individual to whom it is addressed. It may contain privileged or 
confidential information and should not be circulated or used for any purpose 
other than for what it is intended. If you have received this message in error, 
please notify the originator immediately. If you are not the intended 
recipient, you are notified that you are strictly prohibited from using, 
copying, altering, or disclosing the contents of this message. Aricent accepts 
no responsibility for loss or damage arising from the use of the information 
transmitted by this email including damage from virus."


No documentation for how to write custom Transformer in ml pipeline ?

2015-11-30 Thread Jeff Zhang
Although writing a custom UnaryTransformer is not difficult, writing a
non-UnaryTransformer is a little tricky (you have to check the source code).
I can't find any documentation about how to write a custom Transformer in an ml
pipeline, yet writing a custom Transformer is a very basic requirement. Is
this because the interface is still unstable?


-- 
Best Regards

Jeff Zhang


Re: PySpark failing on a mid-sized broadcast

2015-11-30 Thread ameyc
BTW, my spark.python.worker.reuse setting is set to "true".



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/PySpark-failing-on-a-mid-sized-broadcast-tp25520p25521.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark DStream Data stored out of order in Cassandra

2015-11-30 Thread Gerard Maas
Spark Streaming will consume and process data in parallel, so the order of
the output will depend not only on the order of the input but also on the
time it takes each task to process. Different operations, like
repartitions, sorts, and shuffles at the Spark level, will also affect ordering,
so the best approach would be to rely on the schema in Cassandra to ensure the
ordering expected by the application.

What is the schema you're using at the Cassandra side?  And how is the data
going to be queried?   That last question should drive the required
ordering.

-kr, Gerard.

On Mon, Nov 30, 2015 at 12:37 PM, Prateek .  wrote:

> Hi,
>
>
>
> I have an time critical spark application, which is taking sensor data
> from kafka stream, storing in case class, applying transformations and then
> storing in cassandra schema. The data needs to be stored in schema, in FIFO
> order.
>
>
>
> The order is maintained at kafka queue but I am observing, out of order
> data in Cassandra schema. Does Spark Streaming provide any functionality to
> retain order. Or do we need do implement some sorting based on timestamp of
> arrival.
>
>
>
>
>
> Regards,
>
> Prateek
>