Spark Streaming from Kafka, deal with initial heavy load.

2017-03-17 Thread sagarcasual .
Hi, we have Spark 1.6.1 streaming from a Kafka (0.10.1) topic using the direct
approach. The streaming part works fine, but when we initially start the
job we have to deal with a really huge Kafka message backlog, millions of
messages. That first batch runs for over 40 hours, and after 12 hours
or so it becomes very slow: it keeps crunching messages, but at a very
low speed. Once the job is all caught up, subsequent batches are quick
since the load to process is really tiny. Any idea how to overcome or
avoid this problem?


Spark master IP on Kubernetes

2017-03-17 Thread ffarozan
We are deploying Spark on a k8s cluster. We are facing an issue with the
Spark master IP from a worker's perspective. The Spark master is exposed as
a service at 10.3.0.175:7077.

The Spark worker registers with the master, but saves the pod IP instead of
the service IP.

Following are related logs.
17/03/18 03:33:15 SPARK_WORKER INFO Utils: Successfully started service
'WorkerUI' on port 8081.
17/03/18 03:33:15 SPARK_WORKER INFO WorkerWebUI: Bound WorkerWebUI to
0.0.0.0, and started at http://10.2.58.40:8081
17/03/18 03:33:15 SPARK_WORKER INFO Worker: Connecting to master
10.3.0.175:7077...
17/03/18 03:33:15 SPARK_WORKER INFO ContextHandler: Started
o.s.j.s.ServletContextHandler@1302ede4{/metrics/json,null,AVAILABLE}
17/03/18 03:33:15 SPARK_WORKER INFO TransportClientFactory: Successfully
created connection to /10.3.0.175:7077 after 89 ms (0 ms spent in
bootstraps)
17/03/18 03:33:15 SPARK_WORKER INFO Worker: Successfully registered with
master spark://10.2.58.68:7077


As can be seen, what gets registered is the pod IP -
spark://10.2.58.68:7077.

The problem with this behavior is that when the Spark master pod dies or
restarts, the Spark worker does not act on the disconnected message, because
it checks whether the disconnected IP (here, the service IP, 10.3.0.175)
matches the locally stored IP (in this case, 10.2.58.68).

Any suggestions on how I can override this behavior without changing Spark
code?

If this can be achieved only by changing Spark's behavior, please share ideas
on how to go about it.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-master-IP-on-Kubernetes-tp28507.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: spark streaming exectors memory increasing and executor killed by yarn

2017-03-17 Thread darin
I added this code in the foreachRDD block:
```
rdd.persist(StorageLevel.MEMORY_AND_DISK)
```


This exception no longer occurs, but many dead executors are showing in the
Spark Streaming UI.
```
User class threw exception: org.apache.spark.SparkException: Job aborted due
to stage failure: Task 21 in stage 1194.0 failed 4 times, most recent
failure: Lost task 21.3 in stage 1194.0 (TID 2475, 2.dev3, executor 66):
ExecutorLostFailure (executor 66 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 3.5 GB of 3.5
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.
```
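
For context on the limit in that message: on YARN the container size is the executor memory plus the overhead setting. Below is a minimal Python sketch of that arithmetic, assuming the default documented for Spark 1.x/2.x (overhead is 10% of the executor memory, at least 384 MB) when spark.yarn.executor.memoryOverhead is unset. The function name and the sample values are hypothetical and not taken from this thread.

```python
def yarn_container_request_mb(executor_memory_mb, overhead_mb=None):
    """Memory requested from YARN for one executor, in MB.

    Assumes the default documented for Spark 1.x/2.x on YARN when
    spark.yarn.executor.memoryOverhead is unset: 10% of the executor
    memory, but at least 384 MB. YARN may round the request up further.
    """
    if overhead_mb is None:
        overhead_mb = max(384, int(executor_memory_mb * 0.10))
    return executor_memory_mb + overhead_mb


# Hypothetical settings, not taken from the thread.
print(yarn_container_request_mb(3072))        # 3456 (384 MB minimum applies)
print(yarn_container_request_mb(3072, 1024))  # 4096 (explicit 1 GB overhead)
print(yarn_container_request_mb(8192))        # 9011 (10% rule applies)
```

Raising the overhead only enlarges the container; it does not explain why off-heap usage keeps growing, so it is a mitigation rather than a diagnosis.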




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-exectors-memory-increasing-and-executor-killed-by-yarn-tp28500p28506.html



Re: Spark 2.0.2 - hiveContext.emptyDataFrame.except(hiveContext.emptyDataFrame).count()

2017-03-17 Thread Ravindra
Thanks a lot, Yong, for the explanation. But it sounds like an API behaviour
change. For now I check that the counts != 0 on both DataFrames before these
operations. That is not good from a performance point of view, hence I have
created a JIRA (SPARK-20008) to track it.

Thanks,
Ravindra.

On Fri, Mar 17, 2017 at 8:51 PM Yong Zhang  wrote:

> Starting from Spark 2, this kind of operation is implemented as a left
> anti join, instead of using RDD operations directly.
>
>
> Same issue also on sqlContext.
>
>
> scala> spark.version
> res25: String = 2.0.2
>
>
> spark.sqlContext.emptyDataFrame.except(spark.sqlContext.emptyDataFrame).explain(true)
>
> == Physical Plan ==
> *HashAggregate(keys=[], functions=[], output=[])
> +- Exchange SinglePartition
>    +- *HashAggregate(keys=[], functions=[], output=[])
>       +- BroadcastNestedLoopJoin BuildRight, *LeftAnti*, false
>          :- Scan ExistingRDD[]
>          +- BroadcastExchange IdentityBroadcastMode
>             +- Scan ExistingRDD[]
>
> This arguably is a bug. But my guess is that it is like the logic of
> comparing NULL = NULL: should it return true or false? That causes this
> kind of confusion.
>
> Yong
>
> --
> *From:* Ravindra 
> *Sent:* Friday, March 17, 2017 4:30 AM
> *To:* user@spark.apache.org
> *Subject:* Spark 2.0.2 -
> hiveContext.emptyDataFrame.except(hiveContext.emptyDataFrame).count()
>
> Can someone please explain why
>
> println(" Empty count " +
>   hiveContext.emptyDataFrame.except(hiveContext.emptyDataFrame).count())
>
> *prints* -  Empty count 1
>
> This was not the case in Spark 1.5.2... I am upgrading to Spark 2.0.2 and
> found this. This causes my tests to fail. Is there another way to check
> full equality of 2 DataFrames?
>
> Thanks,
> Ravindra.
>


How to redistribute dataset without full shuffle

2017-03-17 Thread Artur R
Hi!

I use Spark heavily for various workloads and keep running into the situation
where there is some skewed dataset (without any partitioner assigned) and I
just want to "redistribute" its data more evenly.

For example, say there is an RDD of X partitions with Y rows in each, except
one large partition with Y * 10 rows. I don't want to change the number of
partitions, only redistribute the data. Obviously, such an operation should
not send more than ~Y * 9 rows across the network.
But the only option available is repartition, which requires a full shuffle
that moves ALL (X * Y) rows.

The question: why is there no operation like "redistribute"?
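
To put numbers on the bound above, here is a small Python sketch (plain arithmetic, no Spark). The function name `min_rows_to_move` and the sample sizes are made up for illustration, and it assumes any row may move to any partition. It computes the fewest rows that must leave over-full partitions to even out the sizes.

```python
def min_rows_to_move(partition_sizes):
    """Fewest rows that must change partition to balance the sizes.

    Assumes any row may move to any partition and that the partition
    count stays the same. Balanced means every partition holds either
    floor(total / n) or ceil(total / n) rows.
    """
    n = len(partition_sizes)
    base, extra = divmod(sum(partition_sizes), n)
    # Give the `extra` leftover rows to the largest partitions so that
    # as little as possible has to leave them.
    order = sorted(range(n), key=lambda i: partition_sizes[i], reverse=True)
    target = [base] * n
    for i in order[:extra]:
        target[i] += 1
    return sum(max(0, size - t) for size, t in zip(partition_sizes, target))


# Hypothetical sizes: X = 10 partitions, Y = 1000 rows, one partition with 10 * Y.
sizes = [1000] * 9 + [10000]
print(min_rows_to_move(sizes))  # 8100, below the ~9 * Y = 9000 bound
print(sum(sizes))               # 19000 rows in total, which a full shuffle would repartition
```

The sketch only counts rows; it says nothing about how Spark would pick which rows to move or where to send them.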


HyperLogLogMonoid for unique visitor count in Spark Streaming

2017-03-17 Thread SRK
Hi,

We have a requirement to calculate unique visitors in Spark Streaming. Can
HyperLogLogMonoid be applied to a sliding window in Spark Streaming to
calculate unique visitors? Any example on how to do that would be of great
help.

Thanks,
Swetha



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HyperLogLogMonoid-for-unique-visitor-count-in-Spark-Streaming-tp28505.html



Re: [Spark Streaming+Kafka][How-to]

2017-03-17 Thread Michael Armbrust
Another option that would avoid a shuffle would be to use assign and
coalesce, running two separate streams.

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("assign", """{"t0":[0],"t1":[0]}""")
  .load()
  .coalesce(1)
  .writeStream
  .foreach(... code to write to cassandra ...)

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("assign", """{"t0":[1],"t1":[1]}""")
  .load()
  .coalesce(1)
  .writeStream
  .foreach(... code to write to cassandra ...)

On Fri, Mar 17, 2017 at 7:35 AM, OUASSAIDI, Sami 
wrote:

> @Cody : Duly noted.
> @Michael Armbrust : A repartition is out of the question for our project as
> it would be a fairly expensive operation. We tried looking into targeting a
> specific executor so as to avoid this extra cost and directly have well
> partitioned data after consuming the Kafka topics. Also, we are using Spark
> Streaming to save to the Cassandra DB and try to keep shuffle operations to
> a strict minimum (at best none). As of now we are not entirely pleased with
> our current performance, which is why I'm doing a Kafka topic sharding POC,
> and getting the executor to handle the specified partitions is central.
>
> 2017-03-17 9:14 GMT+01:00 Michael Armbrust :
>
>> Sorry, typo.  Should be a repartition not a groupBy.
>>
>>
>>> spark.readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "...")
>>>   .option("subscribe", "t0,t1")
>>>   .load()
>>>   .repartition($"partition")
>>>   .writeStream
>>>   .foreach(... code to write to cassandra ...)
>>>
>>
>
>
> --
> *Mind7 Consulting*
>
> Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
> __
>
> 64 Rue Taitbout, 75009 Paris
>


Getting 2.0.2 for the link http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz

2017-03-17 Thread George Obama
Hello,

I downloaded spark-2.1.0-bin-hadoop2.7.tgz from
http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
and got Spark 2.0.2; I verified this for Scala, Python and R.

The link is from the download page: http://spark.apache.org/downloads.html


Is it only me, or is this something we need to fix?

Regards,
Junfeng

Re: Streaming 2.1.0 - window vs. batch duration

2017-03-17 Thread Cody Koeninger
Probably easier if you show some more code, but if you just call
dstream.window(Seconds(60))
you didn't specify a slide duration, so it's going to default to your
batch duration of 1 second.
So yeah, if you're just using e.g. foreachRDD to output every message
in the window, every second it's going to output the last 60 seconds
of messages... which does mean each message will be output a total of
60 times.

Using a smaller window of 5 seconds for an example, 1 message per
second, notice that message 1489765610 will be output a total of 5
times

Window:
1489765605
1489765606
1489765607
1489765608
1489765609
Window:
1489765606
1489765607
1489765608
1489765609
1489765610
Window:
1489765607
1489765608
1489765609
1489765610
1489765611
Window:
1489765608
1489765609
1489765610
1489765611
1489765612
Window:
1489765609
1489765610
1489765611
1489765612
1489765613
Window:
1489765610
1489765611
1489765612
1489765613
1489765614
Window:
1489765611
1489765612
1489765613
1489765614
1489765615
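
The overlap shown above can be reproduced without Spark. The following is a minimal Python sketch, not Spark code: it assumes one message per second, identified by its timestamp, and the helper name `sliding_windows` is made up for illustration. It counts how often each message is emitted for a 5-second window with a 1-second slide, and again when the slide equals the window length.

```python
from collections import Counter


def sliding_windows(timestamps, window, slide):
    """Yield the messages visible in each full window.

    Plain-Python model of a sliding window over one message per time
    unit; it does not use or depend on Spark.
    """
    for end in range(window, len(timestamps) + 1, slide):
        yield timestamps[end - window:end]


# Eleven one-second messages, matching the listing above.
messages = list(range(1489765605, 1489765616))

emitted = Counter()
for win in sliding_windows(messages, window=5, slide=1):
    emitted.update(win)  # foreachRDD-style output: every message in the window

print(emitted[1489765610])  # 5: once per window that contains it

# With slide == window the windows tumble and nothing repeats.
tumbling = Counter()
for win in sliding_windows(messages, window=5, slide=5):
    tumbling.update(win)
print(max(tumbling.values()))  # 1
```

In Spark Streaming the corresponding change is to pass a slide duration equal to the window length, e.g. dstream.window(Seconds(60), Seconds(60)), so each message is output once per minute instead of once per second for a minute.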

On Thu, Mar 16, 2017 at 2:34 PM, Dominik Safaric
 wrote:
> Hi all,
>
> As I’ve implemented a streaming application pulling data from Kafka every 1
> second (batch interval), I am observing some quite strange behaviour (I
> haven’t used Spark extensively in the past, but continuous-operator-based
> engines instead).
>
> Namely, the dstream.window(Seconds(60)) windowed stream, when written back to
> Kafka, contains more messages than were consumed (for debugging purposes I am
> using a small dataset of a million Kafka byte-array-deserialized messages).
> In particular, in total I’ve streamed exactly 1 million messages, whereas
> upon window expiry 60 million messages are written back to Kafka.
>
> I’ve read in the official docs that both the window and window slide
> duration must be multiples of the batch interval. Does this mean that, when
> consuming messages between two windows, the RDD of a given batch interval t
> is ingested 59 more times into the windowed stream?
>
> If I would like to achieve this behaviour (batch interval equal to one
> second, window duration 60 seconds), how might one achieve this?
>
> I would appreciate it if anyone could correct me if I got the internals of
> Spark’s windowed operations wrong and elaborate a bit.
>
> Thanks,
> Dominik




Re: [Spark Streaming+Kafka][How-to]

2017-03-17 Thread OUASSAIDI, Sami
@Cody : Duly noted.
@Michael Armbrust : A repartition is out of the question for our project as
it would be a fairly expensive operation. We tried looking into targeting a
specific executor so as to avoid this extra cost and directly have well
partitioned data after consuming the Kafka topics. Also, we are using Spark
Streaming to save to the Cassandra DB and try to keep shuffle operations to
a strict minimum (at best none). As of now we are not entirely pleased with
our current performance, which is why I'm doing a Kafka topic sharding POC,
and getting the executor to handle the specified partitions is central.

2017-03-17 9:14 GMT+01:00 Michael Armbrust :

> Sorry, typo.  Should be a repartition not a groupBy.
>
>
>> spark.readStream
>>   .format("kafka")
>>   .option("kafka.bootstrap.servers", "...")
>>   .option("subscribe", "t0,t1")
>>   .load()
>>   .repartition($"partition")
>>   .writeStream
>>   .foreach(... code to write to cassandra ...)
>>
>


-- 
*Mind7 Consulting*

Sami Ouassaid | Consultant Big Data | sami.ouassa...@mind7.com
__

64 Rue Taitbout, 75009 Paris


Re: How does preprocessing fit into Spark MLlib pipeline

2017-03-17 Thread Yanbo Liang
Hi Adrian,

Did you try SQLTransformer? Your preprocessing steps are SQL operations and
can be handled by SQLTransformer in MLlib pipeline scope.

Thanks
Yanbo

On Thu, Mar 9, 2017 at 11:02 AM, aATv  wrote:

> I want to start using PySpark Mllib pipelines, but I don't understand
> how/where preprocessing fits into the pipeline.
>
> My preprocessing steps are generally in the following form:
>1) Load log files(from s3) and parse into a spark Dataframe with columns
> user_id, event_type, timestamp, etc
>2) Group by a column, then pivot and count another column
>   - e.g. df.groupby("user_id").pivot("event_type").count()
>   - We can think of the columns that this creates besides user_id as
> features, where the number of each event type is a different feature
>3) Join the data from step 1 with other metadata, usually stored in
> Cassandra. Then perform a transformation similar to one from step 2), where
> the column that is pivoted and counted is a column that came from the data
> stored in Cassandra.
>
> After this preprocessing, I would use transformers to create other features
> and feed it into a model, lets say Logistic Regression for example.
>
> I would like to make at least step 2 a custom transformer and add that to a
> pipeline, but it doesn't fit the transformer abstraction. This is because it
> takes a single input column and outputs multiple columns. It also has a
> different number of input rows than output rows due to the group by
> operation.
>
> Given that, how do I fit this into an MLlib pipeline, and if it doesn't fit
> as part of a pipeline, what is the best way to include it in my code so that
> it can easily be reused both for training and testing, as well as in
> production?
>
> I'm using pyspark 2.1 and here is an example of 2)
>
>
>
>
> Note: My question is in some way related to this question, but I don't think
> it is answered here:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Why-can-t-a-Transformer-have-multiple-output-columns-td18689.html
>
> Thanks
> Adrian
>
>
>
>
> --
> View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-does-preprocessing-fit-into-Spark-MLlib-pipeline-tp28473.html


Re: Spark 2.0.2 - hiveContext.emptyDataFrame.except(hiveContext.emptyDataFrame).count()

2017-03-17 Thread Yong Zhang
Starting from Spark 2, this kind of operation is implemented as a left anti
join, instead of using RDD operations directly.


Same issue also on sqlContext.


scala> spark.version
res25: String = 2.0.2


spark.sqlContext.emptyDataFrame.except(spark.sqlContext.emptyDataFrame).explain(true)

== Physical Plan ==
*HashAggregate(keys=[], functions=[], output=[])
+- Exchange SinglePartition
   +- *HashAggregate(keys=[], functions=[], output=[])
      +- BroadcastNestedLoopJoin BuildRight, LeftAnti, false
         :- Scan ExistingRDD[]
         +- BroadcastExchange IdentityBroadcastMode
            +- Scan ExistingRDD[]


This arguably is a bug. But my guess is that it is like the logic of comparing
NULL = NULL: should it return true or false? That causes this kind of confusion.

Yong


From: Ravindra 
Sent: Friday, March 17, 2017 4:30 AM
To: user@spark.apache.org
Subject: Spark 2.0.2 - 
hiveContext.emptyDataFrame.except(hiveContext.emptyDataFrame).count()

Can someone please explain why

println(" Empty count " +
  hiveContext.emptyDataFrame.except(hiveContext.emptyDataFrame).count())

prints -  Empty count 1

This was not the case in Spark 1.5.2... I am upgrading to Spark 2.0.2 and found
this. This causes my tests to fail. Is there another way to check full equality
of 2 DataFrames?

Thanks,
Ravindra.


Re: Dataset : Issue with Save

2017-03-17 Thread Yong Zhang
It looks like the current fix reduces the accumulator data being sent to the
driver, but a lot of other statistics data is still being sent to the driver.
How much data is reasonable for 3.7k tasks is arguable.


You can attach your heap dump file in that JIRA and follow it.


Yong


From: Bahubali Jain 
Sent: Thursday, March 16, 2017 11:41 PM
To: Yong Zhang
Cc: user@spark.apache.org
Subject: Re: Dataset : Issue with Save

I am using Spark 2.0. There are comments in the ticket since Oct-2016 which
clearly mention that the issue still persists even in 2.0.
I agree 1G is very small in today's world, and I have already resolved it by
increasing spark.driver.maxResultSize.
I was more intrigued as to why the data is being sent to the driver during
save (similar to a collect() action). Are there any plans to fix this
behavior/issue?

Thanks,
Baahu

On Fri, Mar 17, 2017 at 8:17 AM, Yong Zhang wrote:

Did you read the JIRA ticket? Are you confirming that it is fixed in Spark 2.0,
or are you complaining that it still exists in Spark 2.0?


First, you didn't tell us which version of Spark you are using. The JIRA
clearly says that it is a bug in Spark 1.x and should be fixed in Spark 2.0.
So help yourself and the community by confirming whether this is the case.


If you are looking for a workaround, the JIRA ticket clearly shows you how to
increase your driver heap. 1G in today's world really is kind of small.


Yong



From: Bahubali Jain
Sent: Thursday, March 16, 2017 10:34 PM
To: Yong Zhang
Cc: user@spark.apache.org
Subject: Re: Dataset : Issue with Save

Hi,
Was this not yet resolved?
It's a very common requirement to save a DataFrame. Is there a better way to
save a DataFrame that avoids data being sent to the driver?

"Total size of serialized results of 3722 tasks (1024.0 MB) is bigger than 
spark.driver.maxResultSize (1024.0 MB) "

Thanks,
Baahu

On Fri, Mar 17, 2017 at 1:19 AM, Yong Zhang wrote:

You can take a look at https://issues.apache.org/jira/browse/SPARK-12837


Yong






From: Bahubali Jain
Sent: Thursday, March 16, 2017 1:39 PM
To: user@spark.apache.org
Subject: Dataset : Issue with Save

Hi,
While saving a dataset using mydataset.write().csv("outputlocation")
I am running into an exception:

"Total size of serialized results of 3722 tasks (1024.0 MB) is bigger than 
spark.driver.maxResultSize (1024.0 MB)"

Does it mean that, to save a dataset, the whole of the dataset's contents is
being sent to the driver, similar to a collect() action?

Thanks,
Baahu



--
Twitter:http://twitter.com/Baahu







Re: RDD can not convert to df, thanks

2017-03-17 Thread Yong Zhang
You also need to import the sqlContext implicits:


import sqlContext.implicits._

Yong


From: 萝卜丝炒饭 <1427357...@qq.com>
Sent: Friday, March 17, 2017 1:52 AM
To: user-return-68576-1427357147=qq.com; user
Subject: Re: RDD can not convert to df, thanks

More info: I have imported the implicits of SparkSession.

---Original---
From: 

UI Metrics data memory consumption

2017-03-17 Thread xjrk
Hi,

Is there a good way to get rid of UIData completely? I have switched off the
UI and decreased retainedXXX to a minimum, but there still seem to be a lot of
instances of this class (org.apache.spark.ui.jobs.UIData$TaskMetricsUIData)
held in memory. Any ideas?

Thanks,
J. S.

spark {
  master = "local[2]"
  master = ${?SPARK_MASTER}
  info = ""
  info = ${?SPARK_INFO_URI}
  jobs = ""
  jobs = ${?SPARK_JOBS}
  jars.packages = "org.elasticsearch:elasticsearch-spark-20_2.11:5.0.1"
  submit.deployMode = "cluster"
  sql.crossJoin.enabled = true
  executor.memory = "4g"
  executor.memory = ${?SPARK_EXECUTOR_MEMORY}
  executor.cores = 2
  shuffle.service.enabled = true
  dynamicAllocation.enabled = true
  ui.enabled = false
  ui.retainedJobs = 100
  ui.retainedStages = 100
  ui.retainedTasks = 3000
  sql.retainedExecutions = 100
}

 num     #instances         #bytes  class name
----------------------------------------------
   1:          4011     1563354608  [J
   2:        133214      185564000  [B
   3:        449373      140445216  [C
   4:       5481059      131545416  scala.Tuple2
   5:       5429700      130312800  java.lang.Long
   6:        238037       36071536  [Ljava.lang.Object;
   7:        148048       16581376  org.apache.spark.ui.jobs.UIData$TaskMetricsUIData
   8:        545859       13100616  scala.collection.immutable.$colon$colon
   9:        148048       11843840  org.apache.spark.ui.jobs.UIData$ShuffleReadMetricsUIData



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/UI-Metrics-data-memory-consumption-tp28502.html



org.apache.spark.ui.jobs.UIData$TaskMetricsUIData

2017-03-17 Thread Jiří Syrový
Hi,

Is there a good way to get rid of UIData completely? I have switched off
the UI and decreased retainedXXX to a minimum, but there still seem to be a lot
of instances of this class ($SUBJ) held in memory. Any ideas?

Thanks,
J. S.

spark {
  master = "local[2]"
  master = ${?SPARK_MASTER}
  info = ""
  info = ${?SPARK_INFO_URI}
  jobs = ""
  jobs = ${?SPARK_JOBS}
  jars.packages = "org.elasticsearch:elasticsearch-spark-20_2.11:5.0.1"
  submit.deployMode = "cluster"
  sql.crossJoin.enabled = true
  executor.memory = "4g"
  executor.memory = ${?SPARK_EXECUTOR_MEMORY}
  executor.cores = 2
  shuffle.service.enabled = true
  dynamicAllocation.enabled = true
  ui.enabled = false
  ui.retainedJobs = 100
  ui.retainedStages = 100
  ui.retainedTasks = 3000
  sql.retainedExecutions = 100
}


 num     #instances         #bytes  class name
----------------------------------------------
   1:          4011     1563354608  [J
   2:        133214      185564000  [B
   3:        449373      140445216  [C
   4:       5481059      131545416  scala.Tuple2
   5:       5429700      130312800  java.lang.Long
   6:        238037       36071536  [Ljava.lang.Object;
   7:        148048       16581376  org.apache.spark.ui.jobs.UIData$TaskMetricsUIData
   8:        545859       13100616  scala.collection.immutable.$colon$colon
   9:        148048       11843840  org.apache.spark.ui.jobs.UIData$ShuffleReadMetricsUIData


Spark 2.0.2 - hiveContext.emptyDataFrame.except(hiveContext.emptyDataFrame).count()

2017-03-17 Thread Ravindra
Can someone please explain why

println(" Empty count " +
  hiveContext.emptyDataFrame.except(hiveContext.emptyDataFrame).count())

*prints* -  Empty count 1

This was not the case in Spark 1.5.2... I am upgrading to Spark 2.0.2 and
found this. This causes my tests to fail. Is there another way to check
full equality of 2 DataFrames?

Thanks,
Ravindra.


Re: [Spark Streaming+Kafka][How-to]

2017-03-17 Thread Michael Armbrust
Sorry, typo.  Should be a repartition not a groupBy.


> spark.readStream
>   .format("kafka")
>   .option("kafka.bootstrap.servers", "...")
>   .option("subscribe", "t0,t1")
>   .load()
>   .repartition($"partition")
>   .writeStream
>   .foreach(... code to write to cassandra ...)
>