Unsubscribe

2017-08-07 Thread sowmya ramesh
Unsubscribe


Re: KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread shyla deshpande
Thanks TD.

On Mon, Aug 7, 2017 at 8:59 PM, Tathagata Das 
wrote:

> I dont think there is any easier way.
>
> On Mon, Aug 7, 2017 at 7:32 PM, shyla deshpande 
> wrote:
>
>> Thanks TD for the response. I forgot to mention that I am not using
>> structured streaming.
>>
>> I was looking into KafkaUtils.createRDD, and looks like I need to get
>> the earliest and the latest offset for each partition to build the
>> Array(offsetRange). I wanted to know if there was a easier way.
>>
>> 1 reason why we are hesitating to use structured streaming is because I
>> need to persist the data in Cassandra database which I believe is not
>> production ready.
>>
>>
>> On Mon, Aug 7, 2017 at 6:11 PM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Its best to use DataFrames. You can read from as streaming or as batch.
>>> More details here.
>>>
>>> https://spark.apache.org/docs/latest/structured-streaming-ka
>>> fka-integration.html#creating-a-kafka-source-for-batch-queries
>>> https://databricks.com/blog/2017/04/26/processing-data-in-ap
>>> ache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>>>
>>> On Mon, Aug 7, 2017 at 6:03 PM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
 Hi all,

 What is the easiest way to read all the data from kafka in a batch
 program for a given topic?
 I have 10 kafka partitions, but the data is not much. I would like to
 read  from the earliest from all the partitions for a topic.

 I appreciate any help. Thanks

>>>
>>>
>>
>


Re: KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread Tathagata Das
I don't think there is an easier way.

On Mon, Aug 7, 2017 at 7:32 PM, shyla deshpande 
wrote:

> Thanks TD for the response. I forgot to mention that I am not using
> structured streaming.
>
> I was looking into KafkaUtils.createRDD, and looks like I need to get the
> earliest and the latest offset for each partition to build the
> Array(offsetRange). I wanted to know if there was a easier way.
>
> 1 reason why we are hesitating to use structured streaming is because I
> need to persist the data in Cassandra database which I believe is not
> production ready.
>
>
> On Mon, Aug 7, 2017 at 6:11 PM, Tathagata Das  > wrote:
>
>> Its best to use DataFrames. You can read from as streaming or as batch.
>> More details here.
>>
>> https://spark.apache.org/docs/latest/structured-streaming-ka
>> fka-integration.html#creating-a-kafka-source-for-batch-queries
>> https://databricks.com/blog/2017/04/26/processing-data-in-ap
>> ache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>>
>> On Mon, Aug 7, 2017 at 6:03 PM, shyla deshpande > > wrote:
>>
>>> Hi all,
>>>
>>> What is the easiest way to read all the data from kafka in a batch
>>> program for a given topic?
>>> I have 10 kafka partitions, but the data is not much. I would like to
>>> read  from the earliest from all the partitions for a topic.
>>>
>>> I appreciate any help. Thanks
>>>
>>
>>
>


Re: KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread shyla deshpande
Thanks TD for the response. I forgot to mention that I am not using
structured streaming.

I was looking into KafkaUtils.createRDD, and it looks like I need to get the
earliest and latest offsets for each partition to build the
Array[OffsetRange]. I wanted to know if there was an easier way.

One reason we are hesitating to use Structured Streaming is that I need to
persist the data in a Cassandra database, which I believe is not yet
production ready.
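
A minimal sketch of the createRDD approach described above, assuming
spark-streaming-kafka-0-10 and a spark-shell where sc is the SparkContext;
the broker list, topic, and group id below are placeholders, not values from
this thread:

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, OffsetRange}

val topic = "topic1"                                // placeholder topic name
val kafkaParams = Map[String, Object](              // same params you would pass to createDirectStream
  "bootstrap.servers" -> "localhost:9092",          // placeholder broker list
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "batch-reader",                     // placeholder group id
  "enable.auto.commit" -> (false: java.lang.Boolean))

// Use a plain Kafka consumer only to discover the earliest and latest offset of every partition.
val consumer = new KafkaConsumer[String, String](kafkaParams.asJava)
val partitions = consumer.partitionsFor(topic).asScala
  .map(p => new TopicPartition(p.topic, p.partition)).asJava
consumer.assign(partitions)
consumer.seekToBeginning(partitions)
val earliest = partitions.asScala.map(tp => tp -> consumer.position(tp)).toMap
consumer.seekToEnd(partitions)
val latest = partitions.asScala.map(tp => tp -> consumer.position(tp)).toMap
consumer.close()

// One OffsetRange per partition, covering everything currently in the topic.
val offsetRanges = earliest.map { case (tp, from) =>
  OffsetRange(tp.topic, tp.partition, from, latest(tp))
}.toArray

val rdd = KafkaUtils.createRDD[String, String](
  sc, kafkaParams.asJava, offsetRanges, LocationStrategies.PreferConsistent)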


On Mon, Aug 7, 2017 at 6:11 PM, Tathagata Das 
wrote:

> Its best to use DataFrames. You can read from as streaming or as batch.
> More details here.
>
> https://spark.apache.org/docs/latest/structured-streaming-
> kafka-integration.html#creating-a-kafka-source-for-batch-queries
> https://databricks.com/blog/2017/04/26/processing-data-in-
> apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
>
> On Mon, Aug 7, 2017 at 6:03 PM, shyla deshpande 
> wrote:
>
>> Hi all,
>>
>> What is the easiest way to read all the data from kafka in a batch
>> program for a given topic?
>> I have 10 kafka partitions, but the data is not much. I would like to
>> read  from the earliest from all the partitions for a topic.
>>
>> I appreciate any help. Thanks
>>
>
>


Re: KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread Tathagata Das
It's best to use DataFrames. You can read from Kafka either as a stream or
as a batch. More details here:

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
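
A minimal sketch of the batch-style read those pages describe, assuming the
spark-sql-kafka-0-10 package is on the classpath; the broker address and
topic name are placeholders:

val df = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")   // every partition from the beginning
  .option("endingOffsets", "latest")
  .load()

// key and value arrive as binary; cast them before further processing
val records = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")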

On Mon, Aug 7, 2017 at 6:03 PM, shyla deshpande 
wrote:

> Hi all,
>
> What is the easiest way to read all the data from kafka in a batch program
> for a given topic?
> I have 10 kafka partitions, but the data is not much. I would like to read
>  from the earliest from all the partitions for a topic.
>
> I appreciate any help. Thanks
>


KafkaUtils.createRDD , How do I read all the data from kafka in a batch program for a given topic?

2017-08-07 Thread shyla deshpande
Hi all,

What is the easiest way to read all the data from Kafka in a batch program
for a given topic?
I have 10 Kafka partitions, but the data volume is small. I would like to
read from the earliest offset across all the partitions of the topic.

I appreciate any help. Thanks


Re: [SS] Console sink not supporting recovering from checkpoint location? Why?

2017-08-07 Thread Michael Armbrust
I think there is really no good reason for this limitation.

On Mon, Aug 7, 2017 at 2:58 AM, Jacek Laskowski  wrote:

> Hi,
>
> While exploring checkpointing with kafka source and console sink I've
> got the exception:
>
> // today's build from the master
> scala> spark.version
> res8: String = 2.3.0-SNAPSHOT
>
> scala> val q = records.
>  |   writeStream.
>  |   format("console").
>  |   option("truncate", false).
>  |   option("checkpointLocation", "/tmp/checkpoint"). // <--
> checkpoint directory
>  |   trigger(Trigger.ProcessingTime(10.seconds)).
>  |   outputMode(OutputMode.Update).
>  |   start
> org.apache.spark.sql.AnalysisException: This query does not support
> recovering from checkpoint location. Delete /tmp/checkpoint/offsets to
> start over.;
>   at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(
> StreamingQueryManager.scala:222)
>   at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(
> StreamingQueryManager.scala:278)
>   at org.apache.spark.sql.streaming.DataStreamWriter.
> start(DataStreamWriter.scala:284)
>   ... 61 elided
>
> The "trigger" is the change
> https://issues.apache.org/jira/browse/SPARK-16116 and this line in
> particular https://github.com/apache/spark/pull/13817/files#diff-
> d35e8fce09686073f81de598ed657de7R277.
>
> Why is this needed? I can't think of a use case where console sink
> could not recover from checkpoint location (since all the information
> is available). I'm lost on it and would appreciate some help (to
> recover :))
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


[spark-core] Choosing the correct number of partitions while joining two RDDs with partitioner set on one

2017-08-07 Thread Piyush Narang
hi folks,

I was debugging a Spark job that ended up with too few partitions during the
join step, and thought I'd reach out to understand whether this is the
expected behavior and what the typical workarounds are.

I have two RDDs that I'm joining: one with a lot of partitions (5K+) and one
with far fewer (< 50). I perform a reduceByKey on the smaller RDD and then
join the two together. I notice that the join operation ends up with
numPartitions = smallerRDD.numPartitions. This seems to stem from the code in
Partitioner.defaultPartitioner. That code checks whether either of the RDDs
has a partitioner set and, if so, picks that RDD's partitioner and
numPartitions. In my case, because I'm calling reduceByKey on the smaller
RDD, that RDD ends up with a partitioner set, so the join inherits it along
with its much smaller number of partitions.

I'm currently just specifying the number of partitions I want, but I was
wondering if others have run into this and whether there are other suggested
workarounds, for example partitioning my larger RDD as well. Would it make
sense for the defaultPartitioner function to account for the case where one
RDD has a much larger number of partitions?

Here's a simple snippet that illustrates things:

val largeRDD = sc.parallelize(
  List((1, 10), (1, 11), (2, 20), (2, 21), (3, 30), (3, 31)), 100)
val smallRDD = sc.parallelize(
  List((1, "one"), (2, "two"), (3, "three")), 2).reduceByKey((l, _) => l)

// we end up with a join with 2 partitions
largeRDD.join(smallRDD).collect().foreach(println)
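
A sketch of the workaround mentioned above, passing an explicit partition
count (or partitioner) so the join does not inherit the small RDD's
2-partition partitioner:

// join(other, numPartitions) builds a HashPartitioner of that size for the result
largeRDD.join(smallRDD, largeRDD.getNumPartitions).collect().foreach(println)

// equivalently, impose a partitioner sized for the large side:
// import org.apache.spark.HashPartitioner
// largeRDD.join(smallRDD, new HashPartitioner(largeRDD.getNumPartitions))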


Thanks,

-- 
- Piyush


Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-07 Thread shyla deshpande
I am doing that already for all known messy data. Thanks Cody for all your
time and input

On Mon, Aug 7, 2017 at 11:58 AM, Cody Koeninger  wrote:

> Yes
>
> On Mon, Aug 7, 2017 at 12:32 PM, shyla deshpande
>  wrote:
> > Thanks Cody again.
> >
> > No. I am doing mapping of the Kafka ConsumerRecord to be able to save it
> in
> > the Cassandra table and saveToCassandra  is an action and my data do get
> > saved into Cassandra. It is working as expected 99% of the time except
> that
> > when there is an exception, I did not want the offsets to be committed.
> >
> > By Filtering for unsuccessful attempts, do you mean filtering the bad
> > records...
> >
> >
> >
> >
> >
> >
> > On Mon, Aug 7, 2017 at 9:59 AM, Cody Koeninger 
> wrote:
> >>
> >> If literally all you are doing is rdd.map I wouldn't expect
> >> saveToCassandra to happen at all, since map is not an action.
> >>
> >> Filtering for unsuccessful attempts and collecting those back to the
> >> driver would be one way for the driver to know whether it was safe to
> >> commit.
> >>
> >> On Mon, Aug 7, 2017 at 12:31 AM, shyla deshpande
> >>  wrote:
> >> > rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")  -->
> is
> >> > running on executor
> >> >
> >> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) -->
> is
> >> > running on driver.
> >> >
> >> > Is this the reason why kafka offsets are committed even when an
> >> > exception is
> >> > raised? If so is there a way to commit the offsets only when there are
> >> > no
> >> > exceptions?
> >> >
> >> >
> >> >
> >> > On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande
> >> > 
> >> > wrote:
> >> >>
> >> >> Thanks again Cody,
> >> >>
> >> >> My understanding is all the code inside foreachRDD is running on the
> >> >> driver except for
> >> >> rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").
> >> >>
> >> >> When the exception is raised, I was thinking I won't be committing
> the
> >> >> offsets, but the offsets are committed all the time independent of
> >> >> whether
> >> >> an exception was raised or not.
> >> >>
> >> >> It will be helpful if you can explain this behavior.
> >> >>
> >> >>
> >> >> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger 
> >> >> wrote:
> >> >>>
> >> >>> I mean that the kafka consumers running on the executors should not
> be
> >> >>> automatically committing, because the fact that a message was read
> by
> >> >>> the consumer has no bearing on whether it was actually successfully
> >> >>> processed after reading.
> >> >>>
> >> >>> It sounds to me like you're confused about where code is running.
> >> >>> foreachRDD runs on the driver, not the executor.
> >> >>>
> >> >>>
> >> >>>
> >> >>> http://spark.apache.org/docs/latest/streaming-programming-
> guide.html#design-patterns-for-using-foreachrdd
> >> >>>
> >> >>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
> >> >>>  wrote:
> >> >>> > Thanks Cody for your response.
> >> >>> >
> >> >>> > All I want to do is, commit the offsets only if I am successfully
> >> >>> > able
> >> >>> > to
> >> >>> > write to cassandra database.
> >> >>> >
> >> >>> > The line //save the rdd to Cassandra database is
> >> >>> > rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")
> >> >>> >
> >> >>> > What do you mean by Executors shouldn't be auto-committing, that's
> >> >>> > why
> >> >>> > it's
> >> >>> > being overridden. It is the executors that do the mapping and
> saving
> >> >>> > to
> >> >>> > cassandra. The status of success or failure of this operation is
> >> >>> > known
> >> >>> > only
> >> >>> > on the executor and thats where I want to commit the kafka
> offsets.
> >> >>> > If
> >> >>> > this
> >> >>> > is not what I sould be doing, then  what is the right way?
> >> >>> >
> >> >>> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger <
> c...@koeninger.org>
> >> >>> > wrote:
> >> >>> >>
> >> >>> >> If your complaint is about offsets being committed that you
> didn't
> >> >>> >> expect... auto commit being false on executors shouldn't have
> >> >>> >> anything
> >> >>> >> to do with that.  Executors shouldn't be auto-committing, that's
> >> >>> >> why
> >> >>> >> it's being overridden.
> >> >>> >>
> >> >>> >> What you've said and the code you posted isn't really enough to
> >> >>> >> explain what your issue is, e.g.
> >> >>> >>
> >> >>> >> is this line
> >> >>> >> // save the rdd to Cassandra database
> >> >>> >> a blocking call
> >> >>> >>
> >> >>> >> are you sure that the rdd foreach isn't being retried and
> >> >>> >> succeeding
> >> >>> >> the second time around, etc
> >> >>> >>
> >> >>> >> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
> >> >>> >>  wrote:
> >> >>> >> > Hello All,
> >> >>> >> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
> >> >>> >> >
> >> >>> >> > I am setting 

Spark sample submitted with cluster deploy-mode does not work in Standalone

2017-08-07 Thread ctang
A Spark sample submitted with cluster deploy-mode (./bin/run-example --master
spark://localhost:6066 --deploy-mode=cluster SparkPi 10) throws the following
error. Does anyone know what the problem is?
==
17/08/07 16:27:39 ERROR RestSubmissionClient: Exception from the cluster:
java.nio.file.NoSuchFileException: spark-internal
sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:526)
sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
java.nio.file.Files.copy(Files.java:1274)

org.apache.spark.util.Utils$.org$apache$spark$util$Utils$$copyRecursive(Utils.scala:604)
org.apache.spark.util.Utils$.copyFile(Utils.scala:575)
org.apache.spark.util.Utils$.doFetchFile(Utils.scala:660)
org.apache.spark.util.Utils$.fetchFile(Utils.scala:459)

org.apache.spark.deploy.worker.DriverRunner.org$apache$spark$deploy$worker$DriverRunner$$downloadUserJar(DriverRunner.scala:154)

org.apache.spark.deploy.worker.DriverRunner$$anon$1.run(DriverRunner.scala:83)






Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-07 Thread Cody Koeninger
Yes

On Mon, Aug 7, 2017 at 12:32 PM, shyla deshpande
 wrote:
> Thanks Cody again.
>
> No. I am doing mapping of the Kafka ConsumerRecord to be able to save it in
> the Cassandra table and saveToCassandra  is an action and my data do get
> saved into Cassandra. It is working as expected 99% of the time except that
> when there is an exception, I did not want the offsets to be committed.
>
> By Filtering for unsuccessful attempts, do you mean filtering the bad
> records...
>
>
>
>
>
>
> On Mon, Aug 7, 2017 at 9:59 AM, Cody Koeninger  wrote:
>>
>> If literally all you are doing is rdd.map I wouldn't expect
>> saveToCassandra to happen at all, since map is not an action.
>>
>> Filtering for unsuccessful attempts and collecting those back to the
>> driver would be one way for the driver to know whether it was safe to
>> commit.
>>
>> On Mon, Aug 7, 2017 at 12:31 AM, shyla deshpande
>>  wrote:
>> > rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")  --> is
>> > running on executor
>> >
>> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) --> is
>> > running on driver.
>> >
>> > Is this the reason why kafka offsets are committed even when an
>> > exception is
>> > raised? If so is there a way to commit the offsets only when there are
>> > no
>> > exceptions?
>> >
>> >
>> >
>> > On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande
>> > 
>> > wrote:
>> >>
>> >> Thanks again Cody,
>> >>
>> >> My understanding is all the code inside foreachRDD is running on the
>> >> driver except for
>> >> rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").
>> >>
>> >> When the exception is raised, I was thinking I won't be committing the
>> >> offsets, but the offsets are committed all the time independent of
>> >> whether
>> >> an exception was raised or not.
>> >>
>> >> It will be helpful if you can explain this behavior.
>> >>
>> >>
>> >> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger 
>> >> wrote:
>> >>>
>> >>> I mean that the kafka consumers running on the executors should not be
>> >>> automatically committing, because the fact that a message was read by
>> >>> the consumer has no bearing on whether it was actually successfully
>> >>> processed after reading.
>> >>>
>> >>> It sounds to me like you're confused about where code is running.
>> >>> foreachRDD runs on the driver, not the executor.
>> >>>
>> >>>
>> >>>
>> >>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>> >>>
>> >>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
>> >>>  wrote:
>> >>> > Thanks Cody for your response.
>> >>> >
>> >>> > All I want to do is, commit the offsets only if I am successfully
>> >>> > able
>> >>> > to
>> >>> > write to cassandra database.
>> >>> >
>> >>> > The line //save the rdd to Cassandra database is
>> >>> > rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")
>> >>> >
>> >>> > What do you mean by Executors shouldn't be auto-committing, that's
>> >>> > why
>> >>> > it's
>> >>> > being overridden. It is the executors that do the mapping and saving
>> >>> > to
>> >>> > cassandra. The status of success or failure of this operation is
>> >>> > known
>> >>> > only
>> >>> > on the executor and thats where I want to commit the kafka offsets.
>> >>> > If
>> >>> > this
>> >>> > is not what I sould be doing, then  what is the right way?
>> >>> >
>> >>> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger 
>> >>> > wrote:
>> >>> >>
>> >>> >> If your complaint is about offsets being committed that you didn't
>> >>> >> expect... auto commit being false on executors shouldn't have
>> >>> >> anything
>> >>> >> to do with that.  Executors shouldn't be auto-committing, that's
>> >>> >> why
>> >>> >> it's being overridden.
>> >>> >>
>> >>> >> What you've said and the code you posted isn't really enough to
>> >>> >> explain what your issue is, e.g.
>> >>> >>
>> >>> >> is this line
>> >>> >> // save the rdd to Cassandra database
>> >>> >> a blocking call
>> >>> >>
>> >>> >> are you sure that the rdd foreach isn't being retried and
>> >>> >> succeeding
>> >>> >> the second time around, etc
>> >>> >>
>> >>> >> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
>> >>> >>  wrote:
>> >>> >> > Hello All,
>> >>> >> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
>> >>> >> >
>> >>> >> > I am setting enable.auto.commit to false, and manually want to
>> >>> >> > commit
>> >>> >> > the
>> >>> >> > offsets after my output operation is successful. So when a
>> >>> >> > exception
>> >>> >> > is
>> >>> >> > raised during during the processing I do not want the offsets to
>> >>> >> > be
>> >>> >> > committed. But looks like the offsets are automatically committed
>> >>> >> > even
>> >>> >> > when
>> >>> >> > the exception is raised and thereby I am losing data.
>> >>> 

Re: tuning - Spark data serialization for cache() ?

2017-08-07 Thread Ofir Manor
Thanks a lot for the quick pointer!
So, is the advice I linked to in the official Spark 2.2 documentation
misleading? Are you saying that Spark 2.2 does not use Java serialization
here? And is the tip to switch to Kryo also outdated?

Ofir Manor

Co-Founder & CTO | Equalum

Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io

On Mon, Aug 7, 2017 at 8:47 PM, Kazuaki Ishizaki 
wrote:

> For Dataframe (and Dataset), cache() already uses fast
> serialization/deserialization with data compression schemes.
>
> We already identified some performance issues regarding cache(). We are
> working for alleviating these issues in https://issues.apache.org/
> jira/browse/SPARK-14098.
> We expect that these PRs will be integrated into Spark 2.3.
>
> Kazuaki Ishizaki
>
>
>
> From:Ofir Manor 
> To:user 
> Date:2017/08/08 02:04
> Subject:tuning - Spark data serialization for cache() ?
> --
>
>
>
> Hi,
> I'm using Spark 2.2, and have a big batch job, using dataframes (with
> built-in, basic types). It references the same intermediate dataframe
> multiple times, so I wanted to try to cache() that and see if it helps,
> both in memory footprint and performance.
>
> Now, the Spark 2.2 tuning page (
> *http://spark.apache.org/docs/latest/tuning.html*
> ) clearly says:
> 1. The default Spark serialization is Java serialization.
> 2. It is recommended to switch to Kyro serialization.
> 3. "Since Spark 2.0.0, we internally use Kryo serializer when shuffling
> RDDs with simple types, arrays of simple types, or string type".
>
> Now, I remember that in 2.0 launch, there were discussion of a third
> serialization format that is much more performant and compact. (Encoder?),
> but it is not referenced in the tuning guide and its Scala doc is not very
> clear to me. Specifically, Databricks shared some graphs etc of how much it
> is better than Kyro and Java serialization - see Encoders here:
>
> *https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html*
> 
>
> So, is that relevant to cache()? If so, how can I enable it - and is it
> for MEMORY_AND_DISK_ONLY or MEMORY_AND_DISK_SER?
>
> I tried to play with some other variations, like enabling Kyro by the
> tuning guide instructions, but didn't see any impact on the cached
> dataframe size (same tens of GBs in the UI). So any tips around that?
>
> Thanks.
>
> Ofir Manor
>
> Co-Founder & CTO | Equalum
>
> Mobile: *+972-54-7801286* <%2B972-54-7801286> | Email:
> *ofir.ma...@equalum.io* 
>
>
>


Re: tuning - Spark data serialization for cache() ?

2017-08-07 Thread Kazuaki Ishizaki
For DataFrames (and Datasets), cache() already uses fast
serialization/deserialization with data compression schemes.

We have already identified some performance issues regarding cache() and are
working on alleviating them in
https://issues.apache.org/jira/browse/SPARK-14098.
We expect those PRs to be integrated into Spark 2.3.

Kazuaki Ishizaki



From:   Ofir Manor 
To: user 
Date:   2017/08/08 02:04
Subject:tuning - Spark data serialization for cache() ?



Hi,
I'm using Spark 2.2, and have a big batch job, using dataframes (with 
built-in, basic types). It references the same intermediate dataframe 
multiple times, so I wanted to try to cache() that and see if it helps, 
both in memory footprint and performance.

Now, the Spark 2.2 tuning page (
http://spark.apache.org/docs/latest/tuning.html) clearly says:
1. The default Spark serialization is Java serialization.
2. It is recommended to switch to Kyro serialization.
3. "Since Spark 2.0.0, we internally use Kryo serializer when shuffling 
RDDs with simple types, arrays of simple types, or string type".

Now, I remember that in 2.0 launch, there were discussion of a third 
serialization format that is much more performant and compact. (Encoder?), 
but it is not referenced in the tuning guide and its Scala doc is not very 
clear to me. Specifically, Databricks shared some graphs etc of how much 
it is better than Kyro and Java serialization - see Encoders here:
https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html

So, is that relevant to cache()? If so, how can I enable it - and is it 
for MEMORY_AND_DISK_ONLY or MEMORY_AND_DISK_SER?

I tried to play with some other variations, like enabling Kyro by the 
tuning guide instructions, but didn't see any impact on the cached 
dataframe size (same tens of GBs in the UI). So any tips around that?

Thanks.
Ofir Manor
Co-Founder & CTO | Equalum
Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io




Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-07 Thread shyla deshpande
Thanks Cody again.

No. I am mapping the Kafka ConsumerRecord so it can be saved in the Cassandra
table; saveToCassandra is an action, and my data does get saved into
Cassandra. It works as expected 99% of the time, except that when there is an
exception I do not want the offsets to be committed.

By filtering for unsuccessful attempts, do you mean filtering out the bad
records?






On Mon, Aug 7, 2017 at 9:59 AM, Cody Koeninger  wrote:

> If literally all you are doing is rdd.map I wouldn't expect
> saveToCassandra to happen at all, since map is not an action.
>
> Filtering for unsuccessful attempts and collecting those back to the
> driver would be one way for the driver to know whether it was safe to
> commit.
>
> On Mon, Aug 7, 2017 at 12:31 AM, shyla deshpande
>  wrote:
> > rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")  --> is
> > running on executor
> >
> > stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) --> is
> > running on driver.
> >
> > Is this the reason why kafka offsets are committed even when an
> exception is
> > raised? If so is there a way to commit the offsets only when there are no
> > exceptions?
> >
> >
> >
> > On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande <
> deshpandesh...@gmail.com>
> > wrote:
> >>
> >> Thanks again Cody,
> >>
> >> My understanding is all the code inside foreachRDD is running on the
> >> driver except for
> >> rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").
> >>
> >> When the exception is raised, I was thinking I won't be committing the
> >> offsets, but the offsets are committed all the time independent of
> whether
> >> an exception was raised or not.
> >>
> >> It will be helpful if you can explain this behavior.
> >>
> >>
> >> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger 
> wrote:
> >>>
> >>> I mean that the kafka consumers running on the executors should not be
> >>> automatically committing, because the fact that a message was read by
> >>> the consumer has no bearing on whether it was actually successfully
> >>> processed after reading.
> >>>
> >>> It sounds to me like you're confused about where code is running.
> >>> foreachRDD runs on the driver, not the executor.
> >>>
> >>>
> >>> http://spark.apache.org/docs/latest/streaming-programming-
> guide.html#design-patterns-for-using-foreachrdd
> >>>
> >>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
> >>>  wrote:
> >>> > Thanks Cody for your response.
> >>> >
> >>> > All I want to do is, commit the offsets only if I am successfully
> able
> >>> > to
> >>> > write to cassandra database.
> >>> >
> >>> > The line //save the rdd to Cassandra database is
> >>> > rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")
> >>> >
> >>> > What do you mean by Executors shouldn't be auto-committing, that's
> why
> >>> > it's
> >>> > being overridden. It is the executors that do the mapping and saving
> to
> >>> > cassandra. The status of success or failure of this operation is
> known
> >>> > only
> >>> > on the executor and thats where I want to commit the kafka offsets.
> If
> >>> > this
> >>> > is not what I sould be doing, then  what is the right way?
> >>> >
> >>> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger 
> >>> > wrote:
> >>> >>
> >>> >> If your complaint is about offsets being committed that you didn't
> >>> >> expect... auto commit being false on executors shouldn't have
> anything
> >>> >> to do with that.  Executors shouldn't be auto-committing, that's why
> >>> >> it's being overridden.
> >>> >>
> >>> >> What you've said and the code you posted isn't really enough to
> >>> >> explain what your issue is, e.g.
> >>> >>
> >>> >> is this line
> >>> >> // save the rdd to Cassandra database
> >>> >> a blocking call
> >>> >>
> >>> >> are you sure that the rdd foreach isn't being retried and succeeding
> >>> >> the second time around, etc
> >>> >>
> >>> >> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
> >>> >>  wrote:
> >>> >> > Hello All,
> >>> >> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
> >>> >> >
> >>> >> > I am setting enable.auto.commit to false, and manually want to
> >>> >> > commit
> >>> >> > the
> >>> >> > offsets after my output operation is successful. So when a
> exception
> >>> >> > is
> >>> >> > raised during during the processing I do not want the offsets to
> be
> >>> >> > committed. But looks like the offsets are automatically committed
> >>> >> > even
> >>> >> > when
> >>> >> > the exception is raised and thereby I am losing data.
> >>> >> > In my logs I see,  WARN  overriding enable.auto.commit to false
> for
> >>> >> > executor.  But I don't want it to override. Please help.
> >>> >> >
> >>> >> > My code looks like..
> >>> >> >
> >>> >> > val kafkaParams = Map[String, Object](
> >>> >> >   "bootstrap.servers" -> brokers,
> >>> >> >   

tuning - Spark data serialization for cache() ?

2017-08-07 Thread Ofir Manor
Hi,
I'm using Spark 2.2 and have a big batch job that uses DataFrames (with
built-in, basic types). It references the same intermediate DataFrame
multiple times, so I wanted to try cache() on it and see if it helps, both in
memory footprint and in performance.

Now, the Spark 2.2 tuning page
(http://spark.apache.org/docs/latest/tuning.html) clearly says:
1. The default Spark serialization is Java serialization.
2. It is recommended to switch to Kryo serialization.
3. "Since Spark 2.0.0, we internally use Kryo serializer when shuffling
RDDs with simple types, arrays of simple types, or string type".

Now, I remember that at the 2.0 launch there was discussion of a third
serialization format that is much more performant and compact (Encoders?),
but it is not referenced in the tuning guide and its Scala doc is not very
clear to me. Specifically, Databricks shared some graphs of how much better
it is than Kryo and Java serialization - see Encoders here:
https://databricks.com/blog/2016/01/04/introducing-apache-spark-datasets.html

So, is that relevant to cache()? If so, how can I enable it - and is
it for MEMORY_AND_DISK_ONLY
or MEMORY_AND_DISK_SER?

I tried to play with some other variations, like enabling Kryo per the
tuning guide instructions, but didn't see any impact on the cached
DataFrame size (same tens of GBs in the UI). So any tips around that?
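
For what it's worth, a minimal sketch of the two knobs in question: the Kryo
setting from the tuning guide plus an explicitly serialized storage level for
the cached DataFrame. The input path and column name are placeholders:

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder()
  .appName("cache-serialization-test")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

val intermediate = spark.read.parquet("/path/to/input")   // placeholder input
  .groupBy("some_key")
  .count()

// Dataset.cache() is persist(MEMORY_AND_DISK); a serialized level can be requested explicitly.
intermediate.persist(StorageLevel.MEMORY_AND_DISK_SER)
intermediate.count()   // materialize the cache so its size shows up in the Storage tab

Note that the DataFrame cache uses its own columnar encoding (as the reply
from Kazuaki Ishizaki in this thread explains), which is consistent with the
Kryo setting having no visible effect on the cached size.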

Thanks.

Ofir Manor

Co-Founder & CTO | Equalum

Mobile: +972-54-7801286 | Email: ofir.ma...@equalum.io


Re: kafka settting, enable.auto.commit to false is being overridden and I lose data. please help!

2017-08-07 Thread Cody Koeninger
If literally all you are doing is rdd.map I wouldn't expect
saveToCassandra to happen at all, since map is not an action.

Filtering for unsuccessful attempts and collecting those back to the
driver would be one way for the driver to know whether it was safe to
commit.
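
One way to arrange that driver-side decision, sketched under the assumption
that stream1 and kafkaParams are set up as in the code quoted further down in
this thread; the mapping and table names are placeholders, not a definitive
implementation:

import com.datastax.spark.connector._
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream1.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  try {
    rdd.map(record => (record.key, record.value))     // placeholder mapping
      .saveToCassandra("keyspace1", "table1")         // action: the write runs on the executors
    // Reached only if the write job finished without throwing on the driver.
    stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
  } catch {
    case e: Exception =>
      // Skip the commit; the same offsets are reprocessed after the failure is handled,
      // so the write must be idempotent (Cassandra upserts by primary key help here).
      println(s"Not committing offsets for this batch: $e")
  }
}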

On Mon, Aug 7, 2017 at 12:31 AM, shyla deshpande
 wrote:
> rdd.map { record => ()}.saveToCassandra("keyspace1", "table1")  --> is
> running on executor
>
> stream1.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) --> is
> running on driver.
>
> Is this the reason why kafka offsets are committed even when an exception is
> raised? If so is there a way to commit the offsets only when there are no
> exceptions?
>
>
>
> On Sun, Aug 6, 2017 at 10:23 PM, shyla deshpande 
> wrote:
>>
>> Thanks again Cody,
>>
>> My understanding is all the code inside foreachRDD is running on the
>> driver except for
>> rdd.map { record => ()}.saveToCassandra("keyspace1", "table1").
>>
>> When the exception is raised, I was thinking I won't be committing the
>> offsets, but the offsets are committed all the time independent of whether
>> an exception was raised or not.
>>
>> It will be helpful if you can explain this behavior.
>>
>>
>> On Sun, Aug 6, 2017 at 5:19 PM, Cody Koeninger  wrote:
>>>
>>> I mean that the kafka consumers running on the executors should not be
>>> automatically committing, because the fact that a message was read by
>>> the consumer has no bearing on whether it was actually successfully
>>> processed after reading.
>>>
>>> It sounds to me like you're confused about where code is running.
>>> foreachRDD runs on the driver, not the executor.
>>>
>>>
>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>>>
>>> On Sun, Aug 6, 2017 at 12:55 PM, shyla deshpande
>>>  wrote:
>>> > Thanks Cody for your response.
>>> >
>>> > All I want to do is, commit the offsets only if I am successfully able
>>> > to
>>> > write to cassandra database.
>>> >
>>> > The line //save the rdd to Cassandra database is
>>> > rdd.map { record => ()}.saveToCassandra("kayspace1", "table1")
>>> >
>>> > What do you mean by Executors shouldn't be auto-committing, that's why
>>> > it's
>>> > being overridden. It is the executors that do the mapping and saving to
>>> > cassandra. The status of success or failure of this operation is known
>>> > only
>>> > on the executor and thats where I want to commit the kafka offsets. If
>>> > this
>>> > is not what I sould be doing, then  what is the right way?
>>> >
>>> > On Sun, Aug 6, 2017 at 9:21 AM, Cody Koeninger 
>>> > wrote:
>>> >>
>>> >> If your complaint is about offsets being committed that you didn't
>>> >> expect... auto commit being false on executors shouldn't have anything
>>> >> to do with that.  Executors shouldn't be auto-committing, that's why
>>> >> it's being overridden.
>>> >>
>>> >> What you've said and the code you posted isn't really enough to
>>> >> explain what your issue is, e.g.
>>> >>
>>> >> is this line
>>> >> // save the rdd to Cassandra database
>>> >> a blocking call
>>> >>
>>> >> are you sure that the rdd foreach isn't being retried and succeeding
>>> >> the second time around, etc
>>> >>
>>> >> On Sat, Aug 5, 2017 at 5:10 PM, shyla deshpande
>>> >>  wrote:
>>> >> > Hello All,
>>> >> > I am using spark 2.0.2 and spark-streaming-kafka-0-10_2.11 .
>>> >> >
>>> >> > I am setting enable.auto.commit to false, and manually want to
>>> >> > commit
>>> >> > the
>>> >> > offsets after my output operation is successful. So when a exception
>>> >> > is
>>> >> > raised during during the processing I do not want the offsets to be
>>> >> > committed. But looks like the offsets are automatically committed
>>> >> > even
>>> >> > when
>>> >> > the exception is raised and thereby I am losing data.
>>> >> > In my logs I see,  WARN  overriding enable.auto.commit to false for
>>> >> > executor.  But I don't want it to override. Please help.
>>> >> >
>>> >> > My code looks like..
>>> >> >
>>> >> > val kafkaParams = Map[String, Object](
>>> >> >   "bootstrap.servers" -> brokers,
>>> >> >   "key.deserializer" -> classOf[StringDeserializer],
>>> >> >   "value.deserializer" -> classOf[StringDeserializer],
>>> >> >   "group.id" -> "Group1",
>>> >> >   "auto.offset.reset" -> offsetresetparameter,
>>> >> >   "enable.auto.commit" -> (false: java.lang.Boolean)
>>> >> > )
>>> >> >
>>> >> > val myTopics = Array("topic1")
>>> >> > val stream1 = KafkaUtils.createDirectStream[String, String](
>>> >> >   ssc,
>>> >> >   PreferConsistent,
>>> >> >   Subscribe[String, String](myTopics, kafkaParams)
>>> >> > )
>>> >> >
>>> >> > stream1.foreachRDD { (rdd, time) =>
>>> >> > val offsetRanges =
>>> >> > rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>> >> > try {
>>> 

Spark 2.1 table loaded from Hive Metastore has null values

2017-08-07 Thread Shmuel Blitz
(Also asked on SO at https://stackoverflow.com/q/45543140/416300)
I am trying to migrate table definitions from one Hive metastore to another.

The source cluster has:

   - Spark 1.6.0
   - Hive 1.1.0 (cdh)
   - HDFS


The destination cluster is an EMR cluster with:

   - Spark 2.1.1
   - Hive 2.1.1
   - S3


To migrate the tables I did the following:
  1. Copy data from HDFS to S3
  2. Run SHOW CREATE TABLE my_table; in the source cluster
  3. Modify the returned create query - change LOCATION from the HDFS path
to the S3 path
  4. Run the modified query on the destination cluster's Hive
  5. Run SELECT * FROM my_table;. This returns 0 rows (expected)
  6. Run MSCK REPAIR TABLE my_table;. This passes as expected and registers
the partitions in the metastore.
  7. Run SELECT * FROM my_table LIMIT 10; - 10 lines are returned with
correct values
  8. On the destination cluster, from Spark that is configured to work with
the Hive Metastore, run the following code: spark.sql("SELECT * FROM
my_table limit 10").show() - This returns null values!

The result returned from the Spark SQL query has all the correct columns,
and the correct number of lines, but all the values are null.

To get Spark to correctly load the values, I can add the following
properties to the TBLPROPERTIES part of the create query:

'spark.sql.partitionProvider'='catalog',
'spark.sql.sources.provider'='org.apache.spark.sql.parquet',
'spark.sql.sources.schema.numPartCols'='',
'spark.sql.sources.schema.numParts'='1',
'spark.sql.sources.schema.part.0'=''
'spark.sql.sources.schema.partCol.0'='',
'spark.sql.sources.schema.partCol.1'='',
...



The other side of this problem is that in the source cluster, Spark reads
the table values without any problem and without the extra TBLPROPERTIES.

Why is this happening? How can it be fixed?
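
One hedged debugging step (not from the thread): read the migrated files
directly, bypassing the metastore. If this shows the correct values while the
metastore-backed query returns nulls, the problem is confined to the table
metadata (the missing Spark data source properties above) rather than the
copied data. The S3 path is a placeholder for the LOCATION used in the CREATE
TABLE statement:

val direct = spark.read.parquet("s3://my-bucket/path/to/my_table")   // placeholder path
direct.printSchema()
direct.show(10)
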
-- 
Shmuel Blitz
Big Data Developer
www.similarweb.com



[SS] Console sink not supporting recovering from checkpoint location? Why?

2017-08-07 Thread Jacek Laskowski
Hi,

While exploring checkpointing with the kafka source and the console sink,
I got the following exception:

// today's build from the master
scala> spark.version
res8: String = 2.3.0-SNAPSHOT

scala> val q = records.
 |   writeStream.
 |   format("console").
 |   option("truncate", false).
 |   option("checkpointLocation", "/tmp/checkpoint"). // <--
checkpoint directory
 |   trigger(Trigger.ProcessingTime(10.seconds)).
 |   outputMode(OutputMode.Update).
 |   start
org.apache.spark.sql.AnalysisException: This query does not support
recovering from checkpoint location. Delete /tmp/checkpoint/offsets to
start over.;
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:222)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:284)
  ... 61 elided

The "trigger" is the change
https://issues.apache.org/jira/browse/SPARK-16116 and this line in
particular 
https://github.com/apache/spark/pull/13817/files#diff-d35e8fce09686073f81de598ed657de7R277.

Why is this needed? I can't think of a use case where console sink
could not recover from checkpoint location (since all the information
is available). I'm lost on it and would appreciate some help (to
recover :))
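
(For completeness, a minimal sketch of the workaround the error message
itself suggests; it discards the recorded progress, so the query simply
starts over:)

import org.apache.hadoop.fs.{FileSystem, Path}

val offsetsDir = new Path("/tmp/checkpoint/offsets")   // the directory named in the error
val fs = offsetsDir.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.delete(offsetsDir, true)   // recursive delete, then re-run the writeStream snippet above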

Pozdrawiam,
Jacek Laskowski

https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski




[no subject]

2017-08-07 Thread Sumit Saraswat
Unsubscribe