Re: Custom metrics sink

2018-03-16 Thread Silvio Fiorito
Just put your custom sink class in the org.apache.spark.metrics.sink namespace and
configure it in metrics.properties. Use ConsoleSink as an example. Obviously, since
it's private, the API may change, but in the meantime that should work…

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/sink/ConsoleSink.scala#L28
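
For reference, the metrics.properties wiring for such a sink looks roughly like the
sketch below. The sink name "chime" and the class name ChimeSink are hypothetical
placeholders; the sink class itself has to be JVM code compiled into the
org.apache.spark.metrics.sink package, with the same constructor shape as the
ConsoleSink linked above. Spark reads $SPARK_HOME/conf/metrics.properties by default,
or a custom file can be supplied with --conf spark.metrics.conf=/path/to/metrics.properties.

# metrics.properties (sketch with hypothetical names; whether period/unit are
# honoured is up to the sink implementation, as in ConsoleSink)
*.sink.chime.class=org.apache.spark.metrics.sink.ChimeSink
*.sink.chime.period=1
*.sink.chime.unit=seconds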


From: Christopher Piggott 
Date: Friday, March 16, 2018 at 4:09 PM
To: "user@spark.apache.org" 
Subject: Custom metrics sink

Just for fun, I want to make a stupid program that makes different frequency
chimes as each worker becomes active. That way you can 'hear' what the cluster
is doing and how it's distributing work.

I thought that to do this I would make a custom Sink, but the Sink and everything
else in org.apache.spark.metrics.sink is private to Spark. What I was hoping
to do was to just pick up the number of active workers in semi-real time (once a
second?) and have them send a UDP message somewhere... then each worker would
be assigned a different frequency chime. It's just a toy, for fun.

How do you add a custom Sink when these classes don't seem to be exposed?

--C



Re: Custom metrics sink

2018-03-16 Thread Felix Cheung
There is a proposal to expose them. See SPARK-14151


From: Christopher Piggott 
Sent: Friday, March 16, 2018 1:09:38 PM
To: user@spark.apache.org
Subject: Custom metrics sink

Just for fun, I want to make a stupid program that makes different frequency
chimes as each worker becomes active. That way you can 'hear' what the cluster
is doing and how it's distributing work.

I thought that to do this I would make a custom Sink, but the Sink and everything
else in org.apache.spark.metrics.sink is private to Spark. What I was hoping
to do was to just pick up the number of active workers in semi-real time (once a
second?) and have them send a UDP message somewhere... then each worker would
be assigned a different frequency chime. It's just a toy, for fun.

How do you add a custom Sink when these classes don't seem to be exposed?

--C



change spark default for a setting without overriding user

2018-03-16 Thread Koert Kuipers
I would like to change some defaults in Spark without overriding the user
if she/he wishes to change them.

For example, currently spark.blacklist.enabled is false by default, which
makes sense for backwards compatibility.

I would like it to be true by default, but if the user provided --conf
spark.blacklist.enabled=false it should be set to false again.

I am aware that I can do:

val spark = SparkSession
  .builder()
  .appName("SomeExample")
  .config("spark.blacklist.enabled", "true")
  .getOrCreate()

However, if I understand the order in which settings are applied correctly,
this will override any setting the user provided with --conf, which is
undesired.

Note that I do not have permission to change spark-defaults.conf, because
the Spark install is also shared with some other applications for which I
do not want to change the defaults.

Any suggestions? Thanks
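
A minimal sketch of one way to do this (shown in PySpark for illustration; the same
SparkConf calls exist in the Scala API), under the assumption that the --conf values
the user passes to spark-submit are already visible as system properties when the
SparkConf is constructed, which they normally are: build the conf first and only
apply your preferred default when the key is missing. It is also worth noting that
--conf takes precedence over a properties file, so shipping your own defaults via
spark-submit --properties-file is another option that still lets users override them.

# Sketch: apply our default for spark.blacklist.enabled only if the user has
# not already set it (via --conf or a properties file).
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()  # picks up spark.* properties already handed to this application
conf.setIfMissing("spark.blacklist.enabled", "true")

spark = SparkSession.builder \
    .appName("SomeExample") \
    .config(conf=conf) \
    .getOrCreate()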


Custom metrics sink

2018-03-16 Thread Christopher Piggott
Just for fun, I want to make a stupid program that makes different
frequency chimes as each worker becomes active. That way you can 'hear'
what the cluster is doing and how it's distributing work.

I thought that to do this I would make a custom Sink, but the Sink and
everything else in org.apache.spark.metrics.sink is private to Spark. What
I was hoping to do was to just pick up the number of active workers in semi-real
time (once a second?) and have them send a UDP message somewhere... then
each worker would be assigned a different frequency chime. It's just a
toy, for fun.

How do you add a custom Sink when these classes don't seem to be exposed?

--C


Re: is it possible to use Spark 2.3.0 along with Kafka 0.9.0.1?

2018-03-16 Thread Cody Koeninger
You should be able to use the 0.8 Kafka DStreams with a Kafka 0.9 broker.
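
For concreteness, a minimal PySpark sketch of that approach (the broker host and
topic name are placeholders; submit with
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.0):

# Sketch: the 0.8 direct DStream API speaks the older protocol, which a 0.9
# broker still accepts.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="Kafka09DStream")
ssc = StreamingContext(sc, 5)  # 5-second batches

stream = KafkaUtils.createDirectStream(
    ssc,
    ["my-topic"],                                  # placeholder topic
    {"metadata.broker.list": "broker-host:9092"})  # placeholder broker list

stream.map(lambda kv: kv[1]).pprint()  # records arrive as (key, value) pairs

ssc.start()
ssc.awaitTermination()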

On Fri, Mar 16, 2018 at 7:52 AM, kant kodali  wrote:
> Hi All,
>
> is it possible to use Spark 2.3.0 along with Kafka 0.9.0.1?
>
> Thanks,
> kant




GOTO Chicago Talk / Discount

2018-03-16 Thread Trevor Grant
Hey all,

We (the ASF) are putting on a booth at GOTO Chicago, April 24-27. There is
a Spark talk by Kelly Robinson[1]. If anyone was already planning on going
and can help with the booth, there is a signup on the comdev wiki[2] (you
might need to join d...@community.apache.org and ask for write
permissions). Finally, if you are now thinking of going, see the note
below; there is a code for $50 off using `asf10` at checkout.

[1] https://gotochgo.com/2018/sessions/457
[2]
https://cwiki.apache.org/confluence/display/COMDEV/GOTO+Con+Chicago%2CIL+2018

Below is the copy from GOTO Chicago:
-

GOTO Chicago 2018 on April 24-27 is an international IT conference designed
for developers and architects: a place where you will learn about new
trends, connect with industry experts, share your passion, and grow your
skills.

The program is created “for developers by developers” with our program
committee of leading industry experts curating two full days of talks
covering topics like: DevOps, Machine Learning, Microservices, Serverless,
Cloud Native, Agile, Programming Languages, and Software Security.

We also offer two days of hands-on, full-day workshops at GOTO Chicago led
by our world-renowned speakers - a higher quality training session is hard
to find!

GOTO Chicago is truly the meeting place for software innovators and thought
leaders from all over the world. More information including the full
conference program is available here:  gotochgo.com

Interested in attending? Save an extra $50 off each conference and workshop
day - sign up now with the promo code asf10 at gotochgo.com!


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-16 Thread Aakash Basu
Hi all,

Of the queries at the bottom of the last mail, query 1's doubt has been
resolved. As I was already guessing, I had resent the same columns from the
Kafka producer multiple times, hence the join gave duplicates.

I retested with a fresh Kafka feed and the problem was solved.

But the other queries still persist; would anyone like to reply? :)

Thanks,
Aakash.

On 16-Mar-2018 3:57 PM, "Aakash Basu"  wrote:

> Hi all,
>
> The code was perfectly alright, just the package I was submitting had to
> be the updated one (marked green below). The join happened but the output
> has many duplicates (even though the *how *parameter is by default *inner*)
> -
>
> Spark Submit:
>
> /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages 
> org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 
> /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py
>
>
>
> Code:
>
> from pyspark.sql import SparkSession
> import time
> from pyspark.sql.functions import split, col
>
> class test:
>
>
> spark = SparkSession.builder \
> .appName("DirectKafka_Spark_Stream_Stream_Join") \
> .getOrCreate()
>
> table1_stream = 
> (spark.readStream.format("kafka").option("startingOffsets", 
> "earliest").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "test1").load())
>
> table2_stream = 
> (spark.readStream.format("kafka").option("startingOffsets", 
> "earliest").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "test2").load())
>
>
> query1 = table1_stream.select('value')\
> .withColumn('value', table1_stream.value.cast("string")) \
> .withColumn("ID", split(col("value"), ",").getItem(0)) \
> .withColumn("First_Name", split(col("value"), ",").getItem(1)) \
> .withColumn("Last_Name", split(col("value"), ",").getItem(2)) \
> .drop('value')
>
> query2 = table2_stream.select('value') \
> .withColumn('value', table2_stream.value.cast("string")) \
> .withColumn("ID", split(col("value"), ",").getItem(0)) \
> .withColumn("Department", split(col("value"), ",").getItem(1)) \
> .withColumn("Date_joined", split(col("value"), ",").getItem(2)) \
> .drop('value')
>
> joined_Stream = query1.join(query2, "Id")
>
> a = query1.writeStream.format("console").start()
> b = query2.writeStream.format("console").start()
> c = joined_Stream.writeStream.format("console").start()
>
> time.sleep(10)
>
> a.awaitTermination()
> b.awaitTermination()
> c.awaitTermination()
>
>
> Output -
>
> +---+--+-+---+---+
> | ID|First_Name|Last_Name| Department|Date_joined|
> +---+--+-+---+---+
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  3| Tobit|Robardley| Accounting|   8/3/2006|
> |  5| Reggy|Comizzoli|Human Resources|  8/15/2012|
> |  5| Reggy|Comizzoli|Human Resources|  8/15/2012|
> +---+--+-+---+---+
> only showing top 20 rows
>
>
>
>
> *Queries:*
>
> *1) Why even after inner join, the join is doing a outer type?*
>
> *2) Do we need to put awaitTermination on all the streams? Or putting only
> on the input streams would suffice?*
> *3) This code is not optimized, how to generically optimize streaming
> code?*
>
> Thanks,
> Aakash.
>
> On Fri, Mar 16, 2018 at 3:23 PM, Aakash Basu 
> wrote:
>
>> Hi,
>>
>> *Thanks to Chris and TD* for perpetually supporting my endeavor. I ran
>> the code with a little bit of tweak here and there, *it worked well in
>> Spark 2.2.1* giving me the Deserialized values (I used withColumn in the
>> writeStream section to run all SQL functions of split and cast).
>>
>> But, when I submit the same code in 2.3.0, I get an error which I
>> couldn't find any solution of, on the internet.
>>
>>
>>
>>
>>
>> *Error: pyspark.sql.utils.StreamingQueryException: u'null\n=== Streaming
>> Query ===\nIdentifier: [id = d956096e-42d2-493c-8b6c-125e3137c291, 

Spark 2.x Core: .setMaster(local[*]) output is different from spark-submit

2018-03-16 Thread klrmowse
When I run a job with .setMaster(local[*]), the output is as expected...

but when I run it using YARN (single node, pseudo-distributed HDFS) via
spark-submit, the output is fudged - instead of key-value pairs, it only
shows one value preceded by a comma, and the rest are blank.

What am I missing?


Thanks in advance



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




NPE in Subexpression Elimination optimization

2018-03-16 Thread Jacek Laskowski
Hi,

I'm working on a minimal test to reproduce the NPE that is thrown in the latest
2.3.0 and the earlier 2.2.1 by the subexpression elimination optimization,
and am sending it to the mailing list hoping someone notices something
familiar and can shed more light on what the root cause might be and how to
write a test.

I know technically why Spark throws the NPE, since there's this @transient
relation: HadoopFsRelation [1] that is not re-created at deserialization
on executors, but I don't know why this @transient is required in the first
place and, more importantly, how to write a test.

Any hints appreciated.

FYI: disabling subexpression elimination with the
spark.sql.subexpressionElimination.enabled configuration property helps.

[1]
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala?utf8=%E2%9C%93#L159
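
For anyone hitting the same NPE, a sketch of applying the workaround mentioned
above (the property can equally be passed to spark-submit as
--conf spark.sql.subexpressionElimination.enabled=false):

# Disable subexpression elimination for the current session while the root
# cause is being investigated.
spark.conf.set("spark.sql.subexpressionElimination.enabled", "false")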

Caused by: java.lang.NullPointerException
  at
org.apache.spark.sql.execution.FileSourceScanExec.<init>(DataSourceScanExec.scala:167)
  at
org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:502)
  at
org.apache.spark.sql.execution.FileSourceScanExec.doCanonicalize(DataSourceScanExec.scala:158)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:224)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:224)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:210)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:209)
  at
org.apache.spark.sql.catalyst.plans.QueryPlan.sameResult(QueryPlan.scala:257)
  at
org.apache.spark.sql.execution.ScalarSubquery.semanticEquals(subquery.scala:58)
  at
org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$Expr.equals(EquivalentExpressions.scala:36)
  at
scala.collection.mutable.HashTable$class.elemEquals(HashTable.scala:358)
  at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:40)
  at
scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$findEntry0(HashTable.scala:136)
  at scala.collection.mutable.HashTable$class.findEntry(HashTable.scala:132)
  at scala.collection.mutable.HashMap.findEntry(HashMap.scala:40)
  at scala.collection.mutable.HashMap.get(HashMap.scala:70)
  at
org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExpr(EquivalentExpressions.scala:54)
  at
org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.addExprTree(EquivalentExpressions.scala:95)
  at
org.apache.spark.sql.catalyst.expressions.EquivalentExpressions$$anonfun$addExprTree$1.apply(EquivalentExpressions.scala:96)
  at

Time delay in multiple predicate Filter

2018-03-16 Thread Nikodimos Nikolaidis

Hello,

Here’s a behavior that I find strange. A filter with a single zero-selectivity
predicate is much quicker than a filter with multiple predicates that has the
same zero-selectivity predicate in first place.


For example, in Google’s English One Million 1-grams dataset (Spark 2.2,
wholeStageCodegen enabled, local mode):


val df = spark.read.parquet("../../1-grams.parquet")

def metric[T](f: => T): T = {
  val t0 = System.nanoTime()
  val res = f
  println("Executed in " + (System.nanoTime() - t0) / 1000000 + "ms")
  res
}

df.count()
// res11: Long = 261823186

metric { df.filter('gram === "does not exist").count() }
// Executed in 1794ms
// res13: Long = 0

metric { df.filter('gram === "does not exist" && 'year > 0 && 'times > 0 && 'books > 0).count() }
// Executed in 4233ms
// res15: Long = 0


In the generated code, the behavior is exactly the same; the first predicate will
continue the loop at the same point. Why does this time difference exist?
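
One way to narrow this down (a sketch via the Python API, assuming the same df as
above; this is a guess at the cause, not a confirmed answer): compare the physical
plans of the two queries.

# Compare ReadSchema / PushedFilters for both variants: the multi-predicate
# query also has to decode the year, times and books Parquet columns for every
# row batch before the row-level filter can short-circuit, which is one
# plausible source of the extra time.
df.filter(df.gram == "does not exist").explain()
df.filter((df.gram == "does not exist") &
          (df.year > 0) & (df.times > 0) & (df.books > 0)).explain()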


Thanks



is it possible to use Spark 2.3.0 along with Kafka 0.9.0.1?

2018-03-16 Thread kant kodali
Hi All,

is it possible to use Spark 2.3.0 along with Kafka 0.9.0.1?

Thanks,
kant


Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-16 Thread Aakash Basu
Hi all,

The code was perfectly alright; the package I was submitting just had to be
the updated one (marked green below). The join happened, but the output has
many duplicates (even though the *how* parameter is *inner* by default) -

Spark Submit:

/home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0
/home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py



Code:

from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import split, col

class test:

    spark = SparkSession.builder \
        .appName("DirectKafka_Spark_Stream_Stream_Join") \
        .getOrCreate()

    table1_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test1").load())

    table2_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test2").load())

    query1 = table1_stream.select('value') \
        .withColumn('value', table1_stream.value.cast("string")) \
        .withColumn("ID", split(col("value"), ",").getItem(0)) \
        .withColumn("First_Name", split(col("value"), ",").getItem(1)) \
        .withColumn("Last_Name", split(col("value"), ",").getItem(2)) \
        .drop('value')

    query2 = table2_stream.select('value') \
        .withColumn('value', table2_stream.value.cast("string")) \
        .withColumn("ID", split(col("value"), ",").getItem(0)) \
        .withColumn("Department", split(col("value"), ",").getItem(1)) \
        .withColumn("Date_joined", split(col("value"), ",").getItem(2)) \
        .drop('value')

    joined_Stream = query1.join(query2, "Id")

    a = query1.writeStream.format("console").start()
    b = query2.writeStream.format("console").start()
    c = joined_Stream.writeStream.format("console").start()

    time.sleep(10)

    a.awaitTermination()
    b.awaitTermination()
    c.awaitTermination()


Output -

+---+--+-+---+---+
| ID|First_Name|Last_Name| Department|Date_joined|
+---+--+-+---+---+
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  3| Tobit|Robardley| Accounting|   8/3/2006|
|  5| Reggy|Comizzoli|Human Resources|  8/15/2012|
|  5| Reggy|Comizzoli|Human Resources|  8/15/2012|
+---+--+-+---+---+
only showing top 20 rows




*Queries:*

*1) Why, even after an inner join, does the join behave like an outer type?*

*2) Do we need to put awaitTermination on all the streams, or would putting it
only on the input streams suffice?*
*3) This code is not optimized; how do we generically optimize streaming code?*

Thanks,
Aakash.
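
On query 2, one common pattern (a sketch, assuming the goal is simply to keep the
driver alive while all three sinks run, and reusing the names from the code above):
start every query and then block once on the session's StreamingQueryManager,
rather than chaining awaitTermination() calls, since the first such call never
returns for an unbounded stream and the later ones are never reached.

# Sketch: start all sinks, then wait on them collectively.
a = query1.writeStream.format("console").start()
b = query2.writeStream.format("console").start()
c = joined_Stream.writeStream.format("console").start()

spark.streams.awaitAnyTermination()  # returns when any one query stops or fails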

On Fri, Mar 16, 2018 at 3:23 PM, Aakash Basu 
wrote:

> Hi,
>
> *Thanks to Chris and TD* for perpetually supporting my endeavor. I ran
> the code with a little bit of tweak here and there, *it worked well in
> Spark 2.2.1* giving me the Deserialized values (I used withColumn in the
> writeStream section to run all SQL functions of split and cast).
>
> But, when I submit the same code in 2.3.0, I get an error which I couldn't
> find any solution of, on the internet.
>
>
>
>
>
> *Error: pyspark.sql.utils.StreamingQueryException: u'null\n=== Streaming
> Query ===\nIdentifier: [id = d956096e-42d2-493c-8b6c-125e3137c291, runId =
> cd25ec61-c6bb-436c-a93e-80814e1436ec]\nCurrent Committed Offsets:
> {}\nCurrent Available Offsets: {}\n\nCurrent State: INITIALIZING\nThread
> State: RUNNABLE'*
>
>
> *Final code (for clearer understanding of where it may go wrong in 2.3.0)
> -from pyspark.sql import SparkSessionimport timefrom pyspark.sql.functions
> import split, colclass test: spark = SparkSession.builder \
> .appName("DirectKafka_Spark_Stream_Stream_Join") \ .getOrCreate()
> table1_stream = (spark.readStream.format("kafka").option("startingOffsets",
> "earliest").option("kafka.bootstrap.servers",

Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-16 Thread Aakash Basu
Hi,

*Thanks to Chris and TD* for perpetually supporting my endeavor. I ran the
code with a little bit of tweaking here and there, and *it worked well in Spark
2.2.1*, giving me the deserialized values (I used withColumn in the
writeStream section to run all the SQL functions of split and cast).

But when I submit the same code in 2.3.0, I get an error for which I couldn't
find any solution on the internet.





*Error: pyspark.sql.utils.StreamingQueryException: u'null\n=== Streaming
Query ===\nIdentifier: [id = d956096e-42d2-493c-8b6c-125e3137c291, runId =
cd25ec61-c6bb-436c-a93e-80814e1436ec]\nCurrent Committed Offsets:
{}\nCurrent Available Offsets: {}\n\nCurrent State: INITIALIZING\nThread
State: RUNNABLE'*



*Final code (for clearer understanding of where it may go wrong in 2.3.0) -*

from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import split, col

class test:

    spark = SparkSession.builder \
        .appName("DirectKafka_Spark_Stream_Stream_Join") \
        .getOrCreate()

    table1_stream = (spark.readStream.format("kafka")
                     .option("startingOffsets", "earliest")
                     .option("kafka.bootstrap.servers", "localhost:9092")
                     .option("subscribe", "test1").load())

    query = table1_stream.select('value') \
        .withColumn('value', table1_stream.value.cast("string")) \
        .withColumn("ID", split(col("value"), ",").getItem(0)) \
        .withColumn("First_Name", split(col("value"), ",").getItem(1)) \
        .withColumn("Last_Name", split(col("value"), ",").getItem(2)) \
        .drop('value').writeStream.format("console").start()

    time.sleep(10)

    query.awaitTermination()

# Code working in Spark 2.2.1:
# /home/kafka/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --packages
# org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
# /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py

# Code not working in Spark 2.3.0:
# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --packages
# org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
# /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Stream_Join.py
2) I'm getting the below output, as expected, from the above code in 2.2.1.
My query is: is there a way to get the header of the file being read and
ensure header=True? Or is it that, for Structured Streaming, the user has to
provide headers explicitly all the time, since the data shall always come in
this structure [for Kafka]: topic, partition, offset, key, value, timestamp,
timestampType? If so, then how do I remove the column headers explicitly from
the data, as in the below table? I know it is a stream and the data is fed in
as messages, but I still wanted experts to shed some more light on it.

+---+--+-+
| ID|First_Name|Last_Name|
+---+--+-+
| Hi|  null| null|
| id|first_name|last_name|
|  1|  Kellyann|Moyne|
|  2| Morty|  Blacker|
|  3| Tobit|Robardley|
|  4|Wilona|Kells|
|  5| Reggy|Comizzoli|
| id|first_name|last_name|
|  1|  Kellyann|Moyne|
|  2| Morty|  Blacker|
|  3| Tobit|Robardley|
|  4|Wilona|Kells|
|  5| Reggy|Comizzoli|
| id|first_name|last_name|
|  1|  Kellyann|Moyne|
|  2| Morty|  Blacker|
|  3| Tobit|Robardley|
|  4|Wilona|Kells|
|  5| Reggy|Comizzoli|
| id|first_name|last_name|
+---+--+-+
only showing top 20 rows
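
On the header question: a minimal sketch of one way to drop those rows (an
assumption on my part, not a confirmed recipe; it supposes the transformed
DataFrame is kept in a variable, here called parsed, before .writeStream is
applied). Kafka itself has no notion of a CSV header, so every header line the
producer sends arrives as an ordinary message and has to be filtered out
explicitly.

from pyspark.sql.functions import col

# Drop the repeated header rows, identifiable here by the literal value "id"
# in the ID column of the split-out data.
cleaned = parsed.filter(col("ID") != "id")
cleaned.writeStream.format("console").start()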


Any help?

Thanks,
Aakash.

On Fri, Mar 16, 2018 at 12:54 PM, sagar grover 
wrote:

>
> With regards,
> Sagar Grover
> Phone - 7022175584
>
> On Fri, Mar 16, 2018 at 12:15 AM, Aakash Basu 
> wrote:
>
>> Awesome, thanks for detailing!
>>
>> Was thinking the same, we've to split by comma for csv while casting
>> inside.
>>
>> Cool! Shall try it and revert back tomm.
>>
>> Thanks a ton!
>>
>> On 15-Mar-2018 11:50 PM, "Bowden, Chris" 
>> wrote:
>>
>>> To remain generic, the KafkaSource can only offer the lowest common
>>> denominator for a schema (topic, partition, offset, key, value, timestamp,
>>> timestampType). As such, you can't just feed it a StructType. When you are
>>> using a producer or consumer directly with Kafka, serialization and
>>> deserialization is often an orthogonal and implicit transform. However, in
>>> Spark, serialization and deserialization is an explicit transform (e.g.,
>>> you define it in your query plan).
>>>
>>>
>>> To make this more granular, if we imagine your source is registered as a
>>> temp view named "foo":
>>>
>>> SELECT
>>>
>>>   split(cast(value as string), ',')[0] as id,
>>>
>>>   split(cast(value as string), ',')[1] as name
>>>
>>> FROM foo;
>>>
>>>
>>> Assuming you were providing the following messages to Kafka:
>>>
>>> 1,aakash
>>>
>>> 2,tathagata
>>>
>>> 3,chris
>>>
>>>
>>> You could make the query plan less repetitive. I don't believe Spark
>>> offers from_csv out of the box as an expression (although CSV is well
>>> supported as a data source). You could implement an expression by reusing a
>>> lot of the supporting CSV classes which may result 

Re: Sparklyr and idle executors

2018-03-16 Thread Florian Dewes
I set this from within R:

config <- spark_config()
config$spark.shuffle.service.enabled = "true"
config$spark.dynamicAllocation.enabled = "true"
config$spark.dynamicAllocation.executorIdleTimeout = 120
config$spark.dynamicAllocation.maxExecutors = 80
sc <- spark_connect(master = "yarn_client", config = config)

Thanks!
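
One thing worth checking (an assumption on my side, not something confirmed in this
thread): if job A has cached that several-hundred-GB file, executors holding cached
blocks are exempt from the normal idle timeout, and the cached-executor timeout
defaults to infinity. The relevant property, shown here in spark-defaults style
(it can be set from spark_config() just like the options above):

# Allow executors holding cached data to be reclaimed after two minutes idle.
spark.dynamicAllocation.cachedExecutorIdleTimeout=120s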

> On Mar 16, 2018, at 8:09 AM, Femi Anthony  wrote:
> 
> I assume you're setting these values in spark-defaults.conf. What happens if 
> you specify them directly to spark-submit  as in --conf 
> spark.dynamicAllocation.enabled=true 
> ?
> 
> On Thu, Mar 15, 2018 at 1:47 PM, Florian Dewes  > wrote:
> Hi all,
> 
> I am currently trying to enable dynamic resource allocation for a little yarn 
> managed spark cluster.
> We are using sparklyr to access spark from R and have multiple jobs which 
> should run in parallel, because some of them take several days to complete or 
> are in development.
> 
> Everything works out so far, the only problem we have is that executors are 
> not removed from idle jobs.
> 
> Lets say job A is the only running job that loads a file that is several 
> hundred GB in size and then goes idle without disconnecting from spark. It 
> gets 80% of the cluster because I set a maximum value via 
> spark.dynamicAllocation.maxExecutors.
> 
> When we start another job (B) with the remaining 20% of the cluster 
> resources, no idle executors of the other job are freed and the idle job will 
> keep 80% of the cluster's resources, although 
> spark.dynamicAllocation.executorIdleTimeout is set.
> 
> Only if we disconnect job A, B will allocate the freed executors.
> 
> Configuration settings used:
> 
> spark.shuffle.service.enabled = "true"
> spark.dynamicAllocation.enabled = “true"
> spark.dynamicAllocation.executorIdleTimeout = 120
> spark.dynamicAllocation.maxExecutors = 100
> 
> with
> 
> Spark 2.1.0
> R 3.4.3
> sparklyr 0.6.3
> 
> 
> Any ideas?
> 
> 
> Thanks,
> 
> Florian
> 
> 
> 
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 
> 
> 
> 
> -- 
> http://www.femibyte.com/twiki5/bin/view/Tech/ 
> 
> http://www.nextmatrix.com "Great spirits have 
> always encountered violent opposition from mediocre minds." - Albert Einstein.



Re: Multiple Kafka Spark Streaming Dataframe Join query

2018-03-16 Thread sagar grover
With regards,
Sagar Grover
Phone - 7022175584

On Fri, Mar 16, 2018 at 12:15 AM, Aakash Basu 
wrote:

> Awesome, thanks for detailing!
>
> Was thinking the same, we've to split by comma for csv while casting
> inside.
>
> Cool! Shall try it and revert back tomm.
>
> Thanks a ton!
>
> On 15-Mar-2018 11:50 PM, "Bowden, Chris" 
> wrote:
>
>> To remain generic, the KafkaSource can only offer the lowest common
>> denominator for a schema (topic, partition, offset, key, value, timestamp,
>> timestampType). As such, you can't just feed it a StructType. When you are
>> using a producer or consumer directly with Kafka, serialization and
>> deserialization is often an orthogonal and implicit transform. However, in
>> Spark, serialization and deserialization is an explicit transform (e.g.,
>> you define it in your query plan).
>>
>>
>> To make this more granular, if we imagine your source is registered as a
>> temp view named "foo":
>>
>> SELECT
>>
>>   split(cast(value as string), ',')[0] as id,
>>
>>   split(cast(value as string), ',')[1] as name
>>
>> FROM foo;
>>
>>
>> Assuming you were providing the following messages to Kafka:
>>
>> 1,aakash
>>
>> 2,tathagata
>>
>> 3,chris
>>
>>
>> You could make the query plan less repetitive. I don't believe Spark
>> offers from_csv out of the box as an expression (although CSV is well
>> supported as a data source). You could implement an expression by reusing a
>> lot of the supporting CSV classes which may result in a better user
>> experience vs. explicitly using split and array indices, etc. In this
>> simple example, casting the binary to a string just works because there is
>> a common understanding of string's encoded as bytes between Spark and Kafka
>> by default.
>>
>>
>> -Chris
>> --
>> *From:* Aakash Basu 
>> *Sent:* Thursday, March 15, 2018 10:48:45 AM
>> *To:* Bowden, Chris
>> *Cc:* Tathagata Das; Dylan Guedes; Georg Heiler; user
>>
>> *Subject:* Re: Multiple Kafka Spark Streaming Dataframe Join query
>>
>> Hey Chris,
>>
>> You got it right. I'm reading a *csv *file from local as mentioned
>> above, with a console producer on Kafka side.
>>
>> So, as it is a csv data with headers, shall I then use from_csv on the
>> spark side and provide a StructType to shape it up with a schema and then
>> cast it to string as TD suggested?
>>
>> I'm getting all of your points at a very high level. A little more
>> granularity would help.
>>
>> *In the slide TD just shared*, PFA, I'm confused at the point where he
>> is casting the value as string. Logically, the value shall consist of all
>> the entire data set, so, suppose, I've a table with many columns, *how
>> can I provide a single alias as he did in the groupBy. I missed it there
>> itself. Another question is, do I have to cast in groupBy itself? Can't I
>> do it directly in a select query? The last one, if the steps are followed,
>> can I then run a SQL query on top of the columns separately?*
>>
>> Thanks,
>> Aakash.
>>
>>
>> On 15-Mar-2018 9:07 PM, "Bowden, Chris" 
>> wrote:
>>
>> You need to tell Spark about the structure of the data, it doesn't know
>> ahead of time if you put avro, json, protobuf, etc. in kafka for the
>> message format. If the messages are in json, Spark provides from_json out
>> of the box. For a very simple POC you can happily cast the value to a
>> string, etc. if you are prototyping and pushing messages by hand with a
>> console producer on the kafka side.
>>
>> 
>> From: Aakash Basu 
>> Sent: Thursday, March 15, 2018 7:52:28 AM
>> To: Tathagata Das
>> Cc: Dylan Guedes; Georg Heiler; user
>> Subject: Re: Multiple Kafka Spark Streaming Dataframe Join query
>>
>> Hi,
>>
>> And if I run this below piece of code -
>>
>>
>> from pyspark.sql import SparkSession
>> import time
>>
>> class test:
>>
>>
>> spark = SparkSession.builder \
>> .appName("DirectKafka_Spark_Stream_Stream_Join") \
>> .getOrCreate()
>> # ssc = StreamingContext(spark, 20)
>>
>> table1_stream = 
>> (spark.readStream.format("kafka").option("startingOffsets",
>> "earliest").option("kafka.bootstrap.servers",
>> "localhost:9092").option("subscribe", "test1").load())
>>
>> table2_stream = (
>> spark.readStream.format("kafka").option("startingOffsets",
>> "earliest").option("kafka.bootstrap.servers",
>>
>> "localhost:9092").option("subscribe",
>>
>>  "test2").load())
>>
>> joined_Stream = table1_stream.join(table2_stream, "Id")
>> #
>> # joined_Stream.show()
>>
>> # query =
>> table1_stream.writeStream.format("console").start().awaitTermination()
>> # .queryName("table_A").format("memory")
>> # spark.sql("select * from table_A").show()
>> time.sleep(10)  # sleep 20 seconds
>> # query.stop()
>> # query
>>
>>
>> # 

Re: Sparklyr and idle executors

2018-03-16 Thread Femi Anthony
I assume you're setting these values in spark-defaults.conf. What happens
if you specify them directly to spark-submit, as in
--conf spark.dynamicAllocation.enabled=true?

On Thu, Mar 15, 2018 at 1:47 PM, Florian Dewes  wrote:

> Hi all,
>
> I am currently trying to enable dynamic resource allocation for a little
> yarn managed spark cluster.
> We are using sparklyr to access spark from R and have multiple jobs which
> should run in parallel, because some of them take several days to complete
> or are in development.
>
> Everything works out so far, the only problem we have is that executors
> are not removed from idle jobs.
>
> Lets say job A is the only running job that loads a file that is several
> hundred GB in size and then goes idle without disconnecting from spark. It
> gets 80% of the cluster because I set a maximum value via
> spark.dynamicAllocation.maxExecutors.
>
> When we start another job (B) with the remaining 20% of the cluster
> resources, no idle executors of the other job are freed and the idle job
> will keep 80% of the cluster's resources, although 
> spark.dynamicAllocation.executorIdleTimeout
> is set.
>
> Only if we disconnect job A, B will allocate the freed executors.
>
> Configuration settings used:
>
> spark.shuffle.service.enabled = "true"
> spark.dynamicAllocation.enabled = “true"
> spark.dynamicAllocation.executorIdleTimeout = 120
> spark.dynamicAllocation.maxExecutors = 100
>
> with
>
> Spark 2.1.0
> R 3.4.3
> sparklyr 0.6.3
>
>
> Any ideas?
>
>
> Thanks,
>
> Florian
>
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


-- 
http://www.femibyte.com/twiki5/bin/view/Tech/
http://www.nextmatrix.com
"Great spirits have always encountered violent opposition from mediocre
minds." - Albert Einstein.