Writing the contents of a Spark DataFrame to Kafka with Spark 2.2

Hi all,
I am unable to write the contents of a Spark DataFrame to Kafka.
I am using Spark 2.2.

This is my code:

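import spark.implicits._ // needed for toDF outside spark-shell
val df = Seq(("1","One"),("2","two")).toDF("key","value")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write.format("kafka") // batch write through the Kafka sink
  .option("kafka.bootstrap.servers", "") // broker list elided in the original post
  .option("topic", "testtopic")
  .save()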
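For reference, here is a self-contained sketch of the same batch write as a standalone application. It assumes that the spark-sql-kafka-0-10 connector matching your Spark and Scala versions is on the classpath (for example via --packages). The broker address localhost:9092 is a placeholder, and the object and method names are hypothetical. The Kafka sink takes each record from a string or binary `value` column, with `key` optional. The `.write ... .save()` chain is what actually launches the batch job.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaBatchWriteSketch {
  // Shapes the rows into the string key/value columns the Kafka sink reads.
  def toKafkaRecords(df: DataFrame): DataFrame =
    df.selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-batch-write-sketch")
      .master("local[*]") // local master for the sketch; omit when using spark-submit
      .getOrCreate()
    import spark.implicits._

    val df = Seq(("1", "One"), ("2", "two")).toDF("key", "value")

    toKafkaRecords(df).write
      .format("kafka")                                     // provided by spark-sql-kafka-0-10
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker address
      .option("topic", "testtopic")
      .save()                                              // runs the batch write

    spark.stop()
  }
}
```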

and I am getting the following error message:
[Stage 0:>  (0 + 2) / 2]
Exception in thread "main" org.apache.spark.SparkException: Job aborted
due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent
failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver):
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.lang.Thread.run(Thread.java:745)

I have added this dependency


Appreciate any help. Thanks.

Spark - Hadoop custom filesystem service loading

Hi everyone,

On Spark 2.2.0, if you wanted to create a custom file system
implementation, you just extended org.apache.hadoop.fs.FileSystem
and put the canonical name of the custom class in the file

Once you added that jar as a dependency of your spark-submit application, the
custom scheme was loaded automatically, and you could start using it right away,
e.g. ds.load("customfs://path").

But on Spark 2.4.0 this does not seem to work the same way. If you do exactly
the same thing, you get an error like "No FileSystem for customfs".

The only way I got this working on 2.4.0 was by specifying the Spark property

Do you consider this a bug, or is it an intentional change that
should be documented somewhere?

Btw, digging a little into this, it seems the cause is that the FileSystem
is now initialized before the actual dependencies are downloaded from the
Maven repo (see here). Since that initialization loads the available
filesystems only once, at that point, the filesystems in the downloaded
jars are not taken into account.
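Besides ServiceLoader discovery, Hadoop can resolve a scheme through an explicit `fs.<scheme>.impl` key, and Spark copies any `spark.hadoop.*` setting into the Hadoop configuration. This sketch shows that route. It is not necessarily the property the author used, which is not shown here. `com.example.CustomFileSystem` is a hypothetical class name. `FileSystem.getFileSystemClass` can also help diagnose which class Hadoop would pick for a scheme.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.SparkSession

object CustomFsSketch {
  // Hypothetical implementation class; replace with your FileSystem subclass.
  val ImplClass = "com.example.CustomFileSystem"

  def session(): SparkSession =
    SparkSession.builder()
      .appName("customfs-sketch")
      .master("local[*]") // local master for the sketch; omit when using spark-submit
      // spark.hadoop.* entries are copied into the Hadoop Configuration,
      // so this shows up there as fs.customfs.impl.
      .config("spark.hadoop.fs.customfs.impl", ImplClass)
      .getOrCreate()

  // Asks Hadoop which class it would use for a scheme: fs.<scheme>.impl is
  // consulted first, then the ServiceLoader-registered file systems; an
  // IOException is thrown if neither matches.
  def resolve(scheme: String, conf: Configuration): Class[_ <: FileSystem] =
    FileSystem.getFileSystemClass(scheme, conf)

  def main(args: Array[String]): Unit = {
    val spark = session()
    println(resolve("customfs", spark.sparkContext.hadoopConfiguration).getName)
    spark.stop()
  }
}
```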


Expecting 'type' to be present

Hi everyone, 

does anyone know what the error "Expecting 'type' to be present" means
when using Spark on Mesos?

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master
mesos://host:5050/api --deploy-mode cluster --conf
spark.master.rest.enabled=true --total-executor-cores 4 --jars

2019-03-18 20:39:31 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-03-18 20:39:31 INFO  RestSubmissionClient:54 - Submitting a request to launch an application in mesos://host:5050/api.
2019-03-18 20:39:32 ERROR RestSubmissionClient:70 - Server responded with error:
Some(Failed to validate master::Call: Expecting 'type' to be present)
2019-03-18 20:39:32 ERROR RestSubmissionClient:70 - Error: Server responded with message of unexpected type ErrorResponse.
2019-03-18 20:39:32 INFO  ShutdownHookManager:54 - Shutdown hook called
2019-03-18 20:39:32 INFO  ShutdownHookManager:54 - Deleting directory


Re: Structured Streaming & Query Planning

I don't think it's feasible with the current logic. Typically the query
planning time should be a tiny fraction of each trigger unless you are
processing tiny micro-batches more frequently. You might want to consider
increasing the trigger interval to process more data per micro-batch and see
if it helps. Tiny micro-batch use cases should ideally be solved using
continuous mode (once it matures), which would not have this overhead.
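Here is a minimal sketch of that tuning. The built-in `rate` source and `console` sink stand in for a real pipeline. The sketch lengthens the trigger with `Trigger.ProcessingTime`. It then reads the `queryPlanning` and `triggerExecution` entries of the last progress report to see what share of a micro-batch goes to planning. The 10-second interval, the row rate and the object name are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerIntervalSketch {
  // Fraction of a trigger spent in query planning, from StreamingQueryProgress.durationMs.
  def planningShare(durationMs: java.util.Map[String, java.lang.Long]): Option[Double] =
    for {
      planning <- Option(durationMs.get("queryPlanning")).map(_.longValue)
      total <- Option(durationMs.get("triggerExecution")).map(_.longValue)
      if total > 0
    } yield planning.toDouble / total.toDouble

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("trigger-interval-sketch")
      .master("local[2]") // local master for the sketch
      .getOrCreate()

    val query = spark.readStream
      .format("rate")                  // test source emitting (timestamp, value) rows
      .option("rowsPerSecond", "1000")
      .load()
      .writeStream
      .format("console")
      // A longer interval puts more rows in each micro-batch, so the
      // per-batch planning cost is spread over more data.
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    query.awaitTermination(35000) // let a few micro-batches run
    Option(query.lastProgress).flatMap(p => planningShare(p.durationMs)).foreach { share =>
      println(f"queryPlanning share of last trigger: ${share * 100}%.1f%%")
    }
    query.stop()
    spark.stop()
  }
}
```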


> Almost everything is coupled with the logical plan right now, including the
> updated range for the source in each new batch, the updated watermark for
> stateful operations, and the random seed in each batch. Please refer to the code below:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala
> We might try replacing these things in the physical plan so that the logical
> plan doesn't need to be evaluated, but I'm not sure it's feasible.
>> I can understand that if you involve columns with variable distribution
>> in join operations, it may change your execution plan, but most of the time
>> this is not going to happen: in streaming, the most used operations are map,
>> filter, grouping and stateful operations, and in all these cases I can't see
>> how dynamic query planning could help.
>> It could be useful to have a parameter to force a streaming query to
>> calculate the query plan just once.
>> Paolo
>> Hello Paolo,
>> generally speaking, query planning is mostly based on statistics and
>> distributions of data values for the involved columns, which might
>> significantly change over time in a streaming context, so for me it makes a
>> lot of sense that it is run at every schedule, even though I understand
>> your concern.
>> For the second question I don't know how to (or if you even can) cache
>> the computed query plan.
>> If possible, would you mind sharing your findings afterwards? (query
>> planning on streaming is a very interesting and not yet sufficiently explored
>> topic IMO)
>>> Hi All,
>>> I would like to understand why, in a streaming query (which should not be
>>> able to change its behaviour across iterations), there is a
>>> queryPlanning-Duration effort (in my case 33% of the trigger interval) at
>>> every schedule. I don’t understand why this is needed, and whether it is
>>> possible to disable or cache it.
>>> Thanks

java.lang.NoSuchFieldError: HIVE_STATS_JDBC_TIMEOUT on EMR

I know the JIRA issue for this error
(https://issues.apache.org/jira/browse/SPARK-18112), and I have read all the
comments and even the PR for it.

But I am facing this issue on AWS EMR, and only in an Oozie Spark action. I am
looking for someone who can give me a hint or direction, so I can see whether I
can overcome this issue on EMR.

I am testing a simple Spark application on EMR-5.12.2, which comes with Hadoop
2.8.3 + HCatalog 2.3.2 + Spark 2.2.1, and I am using the AWS Glue Data Catalog
for both Hive and Spark table metadata.

First of all, both Hive and Spark work fine with AWS Glue as the metadata
catalog, and my Spark application works with spark-submit.

[hadoop@ip-172-31-65-232 oozieJobs]$ spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_171)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.sql("show databases").show
|   databaseName|
|   sampledb|

I can access and query the database I created in Glue without any issue in
spark-shell or spark-sql.
Relevant to the problem below: when it works, "spark.sql.hive.metastore.version"
is not set in spark-shell, so it has the default value shown below:

scala> spark.conf.get("spark.sql.hive.metastore.version")
res2: String = 1.2.1

Even though it shows the version as "1.2.1", I know that with Glue the Hive
metastore version is "2.3.2"; I can see "hive-metastore-2.3.2-amzn-1.jar"
in the Hive library path.

Now here comes the issue: when I test the Spark code in the Oozie Spark action
with "enableHiveSupport" on the Spark session, it works with spark-submit on the
command line, but fails with the following error in the Oozie runtime:

Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain],
main() threw exception, HIVE_STATS_JDBC_TIMEOUT
java.lang.NoSuchFieldError: HIVE_STATS_JDBC_TIMEOUT

I know this is most likely caused by the Oozie runtime classpath, but I have
spent days trying and still cannot find a solution. We use Spark as the core of
our ETL engine, and the ability to manage and query the Hive catalog is critical for us.
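Because the error points at the runtime classpath, a small reflective probe can show which jar supplies Hive's configuration enum inside each runtime. This is a diagnostic sketch, not a fix. It assumes, as the SPARK-18112 discussion suggests, that the error comes from Spark's Hive client referencing `HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT`, a constant that newer Hive releases no longer define. The object name is hypothetical. Running it both under spark-submit and as the Oozie action's main class would show whether the two runtimes load that class from different jars.

```scala
object HiveConfProbe {
  // Returns the jar a class was loaded from and whether it declares a public
  // field with the given name (enum constants are public static fields),
  // or None if the class cannot be found by the given loader.
  def probe(className: String, fieldName: String, loader: ClassLoader): Option[(String, Boolean)] =
    try {
      val clazz = Class.forName(className, false, loader) // load without initializing
      val location = Option(clazz.getProtectionDomain.getCodeSource)
        .map(_.getLocation.toString)
        .getOrElse("<bootstrap or unknown>")
      Some((location, clazz.getFields.exists(_.getName == fieldName)))
    } catch {
      case _: ClassNotFoundException => None
    }

  def main(args: Array[String]): Unit = {
    val loader = Thread.currentThread().getContextClassLoader
    // ConfVars is a nested enum, hence the '$' in its binary name.
    probe("org.apache.hadoop.hive.conf.HiveConf$ConfVars", "HIVE_STATS_JDBC_TIMEOUT", loader) match {
      case Some((jar, present)) =>
        println(s"HiveConf.ConfVars loaded from $jar; HIVE_STATS_JDBC_TIMEOUT present: $present")
      case None =>
        println("HiveConf.ConfVars is not on this classpath")
    }
  }
}
```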

Here is what puzzles me:

  *   I know this issue was supposed to be fixed in Spark 2.2.0, and on this EMR
we are using Spark 2.2.1.
  *   There is a 1.2.1 version of the hive-metastore jar under the Spark jars on
EMR. Does this mean that in the successful spark-shell runtime, Spark is indeed
using version 1.2.1 of hive-metastore?

[hadoop@ip-172-31-65-232 oozieJobs]$ ls /usr/lib/spark/jars/*hive-meta*

  *   There is a 2.3.2 version of the hive-metastore jar under the Hive component
on this EMR, which I believe is the one pointing to Glue, right?

[hadoop@ip-172-31-65-232 oozieJobs]$ ls /usr/lib/hive/lib/*hive-meta*

  *   I specified "oozie.action.sharelib.for.spark=spark,hive" in Oozie, and I
can see the Oozie runtime load the jars from both the spark and hive sharelibs.
There is NO hive-metastore-1.2.1-spark2-amzn-0.jar in the Oozie SPARK sharelib,
and there is indeed a hive-metastore-2.3.2-amzn-1.jar in the Oozie HIVE sharelib.
  *   Based on my understanding of
(https://issues.apache.org/jira/browse/SPARK-18112), here is what I have tried
so far to fix this in the Oozie runtime, but none of it works:
 *   I added hive-metastore-1.2.1-spark2-amzn-0.jar to the Oozie spark sharelib
in HDFS and ran "oozie admin -sharelibupdate". After that, I confirmed that this
library is loaded in the Oozie runtime log of my Spark action, but I got the
same error message.
 *   I added "--conf spark.sql.hive.metastore.version=2.3.2" in the
 of my Oozie Spark action and confirmed this configuration in the Spark
session, but I still got the same error message.
 *   I added "--conf spark.sql.hive.metastore.version=2.3.2 --conf
spark.sql.hive.metastore.jars=maven", but still got the same error message.
 *   I added 

Re: Spark does not load all classes in fat jar

Hi Jorn, thank you for your response.

Sorry I didn't mention it in the previous mail: the class name is
'sherlogic_mon_interface', a class of our project, which is meant to be
instantiated dynamically (Class.forName(" sherlogic_mon_interface")) within
an executor. So it's not part of the Spark core library.

To give a bit more context, we've got 3 modules, let's call them: core, ext
and util. The missing classes are *some* (and that's really weird, not all
of them) of the classes defined in the two modules ext and util.

The weird thing is that we're specifically building the uberjar with all
those dependencies, just like you said, because we thought it was related
to an incorrect resolution of the additional jars, so we added those
modules as dependencies explicitly (checked again with jar tf: the classes
are present in the jar). And still we get the same ClassNotFoundException.

Feels like I'm missing something.
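It may help to check which class loader performs the lookup. The single-argument `Class.forName(name)` resolves through the defining loader of the calling class. That loader is not necessarily the one Spark uses for the application jar and `--jars` entries. The sketch below asks the driver and several executor tasks whether the class is visible to each thread's context class loader, and from which jar it was loaded. The object name and the default class name are assumptions; pass the fully qualified name of a missing class as the first argument.

```scala
import org.apache.spark.sql.SparkSession

object ClassVisibilitySketch {
  // Reports whether `className` is visible to `loader` and, if so, which jar it came from.
  def describe(className: String, loader: ClassLoader): String =
    try {
      // initialize = false: locate and load the class without running static initializers.
      val clazz = Class.forName(className, false, loader)
      val source = Option(clazz.getProtectionDomain.getCodeSource)
        .map(_.getLocation.toString)
        .getOrElse("<bootstrap or unknown>")
      s"$className loaded from $source"
    } catch {
      case _: ClassNotFoundException => s"$className NOT visible to $loader"
      case e: LinkageError => s"$className found but failed to link: $e"
    }

  def main(args: Array[String]): Unit = {
    // Meant to be run with spark-submit on the cluster, so the executors are real.
    val spark = SparkSession.builder().appName("class-visibility-sketch").getOrCreate()
    val name = args.headOption.getOrElse("sherlogic_mon_interface") // hypothetical default

    println(s"driver:   ${describe(name, Thread.currentThread().getContextClassLoader)}")

    // Executor task threads run with a context class loader that includes
    // the application jar and --jars entries.
    spark.sparkContext
      .parallelize(1 to 8, 8)
      .map(_ => describe(name, Thread.currentThread().getContextClassLoader))
      .distinct()
      .collect()
      .foreach(r => println(s"executor: $r"))

    spark.stop()
  }
}
```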


> Fat jar with shading, as the application, not as an additional jar package
> On 18.03.2019 at 14:08, Jörn Franke wrote:
> Maybe that class is already loaded as part of a core library of Spark?
> Do you have concrete class names?
> If in doubt, create a fat jar and shade the dependencies in question
> On 18.03.2019 at 12:34, Federico D'Ambrosio wrote:
> Hello everyone,
> We're having a serious issue where we get a ClassNotFoundException because,
> apparently, the class is not found within the classpath of Spark, in both
> the driver and the executors.
> First, I checked with jar tf whether the class was actually within the jar,
> and it actually is. Then I activated the following options to see
> which classes are actually loaded:
> --conf 'spark.driver.extraJavaOptions=-verbose:class' --conf
> 'spark.executor.extraJavaOptions=-verbose:class'
> and I can see from the YARN stdout logs that some classes, just like the
> one throwing the exception, are not actually being loaded while other
> classes are.
> I tried, then, using --jars to pass the jar containing the missing
> classes, and also using addJar() from the spark context, to no avail.
> This looks like an issue with Spark class loader.
> Any idea what's happening here? I'm using Spark
> (HDP 3.0).
> Thank you for your help,
> Federico

Spark Metrics : Job Remains In "Running" State

Hi Team,

We are running a spark-submit job with metrics enabled (Spark 2.4 on
Kubernetes) on a user-defined port (say 45010). We have observed that the job
does not go into the "Completed" state even after it's
The pods for this spark-submit job remain in the "Running" state. I am able to
collect the metrics for both the driver and the executor(s) on the defined port by using
Below is the content of metrics.properties:


# Enable JVM metrics source for all instances by class name

Spark Submit Job:
export HADOOP_CONF_DIR=;sudo -E ./spark-submit --verbose 
--deploy-mode cluster --master  --conf spark.app.name= 
--conf spark.executor.instances=2 

Please let me know whether this is the expected behavior.

Re: Spark does not load all classes in fat jar

Fat jar with shading, as the application, not as an additional jar package

> Maybe that class is already loaded as part of a core library of Spark?
> Do you have concrete class names?
> If in doubt, create a fat jar and shade the dependencies in question
>> On 18.03.2019 at 12:34, Federico D'Ambrosio wrote:
>> Hello everyone,
>> We're having a serious issue where we get a ClassNotFoundException because,
>> apparently, the class is not found within the classpath of Spark, in both the
>> driver and the executors.
>> First, I checked with jar tf whether the class was actually within the jar,
>> and it actually is. Then I activated the following options to see which
>> classes are actually loaded:
>> --conf 'spark.driver.extraJavaOptions=-verbose:class' --conf 
>> 'spark.executor.extraJavaOptions=-verbose:class' 
>> and I can see from the YARN stdout logs that some classes, just like the one 
>> throwing the exception, are not actually being loaded while other classes 
>> are.
>> I tried, then, using --jars to pass the jar containing the missing classes, 
>> and also using addJar() from the spark context, to no avail.
>> This looks like an issue with Spark class loader.
>> Any idea what's happening here? I'm using Spark (HDP
>> 3.0).
>> Thank you for your help,
>> Federico

Re: Spark does not load all classes in fat jar

Maybe that class is already loaded as part of a core library of Spark?

Do you have concrete class names?

If in doubt, create a fat jar and shade the dependencies in question.
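Here is a minimal sketch of the shading suggestion, assuming an sbt 1.x build with the sbt-assembly plugin. A Maven build would use the shade plugin's relocations instead. The Spark version is a placeholder, and the Guava package pattern is only an example of a dependency that might clash with the copy on Spark's own classpath. Spark itself is marked as provided so that it stays out of the fat jar.

```scala
// build.sbt (sketch); needs the sbt-assembly plugin in project/plugins.sbt.
// Placeholder: set this to the Spark version installed on the cluster.
val sparkVersion = "x.y.z"

// Spark is supplied by the cluster at runtime, so keep it out of the fat jar.
libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion % Provided

// Relocate a dependency that may clash with the copy on Spark's classpath.
// "com.google.common.**" (Guava) is only an example pattern.
assembly / assemblyShadeRules := Seq(
  ShadeRule.rename("com.google.common.**" -> "shaded.com.google.common.@1").inAll
)
```

The resulting jar would then be passed to spark-submit as the application jar rather than through --jars.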

> On 18.03.2019 at 12:34, Federico D'Ambrosio wrote:
> Hello everyone,
> We're having a serious issue where we get a ClassNotFoundException because,
> apparently, the class is not found within the classpath of Spark, in both the
> driver and the executors.
> First, I checked with jar tf whether the class was actually within the jar,
> and it actually is. Then I activated the following options to see which
> classes are actually loaded:
> --conf 'spark.driver.extraJavaOptions=-verbose:class' --conf 
> 'spark.executor.extraJavaOptions=-verbose:class' 
> and I can see from the YARN stdout logs that some classes, just like the one 
> throwing the exception, are not actually being loaded while other classes are.
> I tried, then, using --jars to pass the jar containing the missing classes, 
> and also using addJar() from the spark context, to no avail.
> This looks like an issue with Spark class loader.
> Any idea what's happening here? I'm using Spark (HDP
> 3.0).
> Thank you for your help,
> Federico

Re: what is the difference between udf execution and map(someLambda)?

Map and flatMap are RDD operations; a UDF is a DataFrame operation. The
big difference from a performance perspective is in the query optimizer. A
UDF declares the set of input fields it needs and the set of output fields
it will produce, whereas map operates on an entire row at a time. This means
the optimizer can move operations around, and potentially drop columns earlier,
to make the overall processing more efficient. A map operation requires the
entire row as input, so the optimizer cannot do anything with it, and it
does not know what the output is going to look like unless you explicitly
tell it. In reality, though, UDFs are compiled down to map operations on an
RDD with some glue code to get the columns in the correct place, so there
should be little performance difference if you can manually build a query
similar to what the Catalyst optimizer would have produced.
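To see what the optimizer has to work with in each case, here is a small sketch comparing a UDF on a single column with a typed `Dataset.map` over whole rows. The `Record` case class and the data are made up for illustration. Calling `explain(true)` on each result should show the map version deserializing the full object (a `DeserializeToObject` step), while the UDF version references only the `name` column. With a columnar source such as Parquet, the latter also lets the scan prune the other columns.

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical record type used only for this comparison.
case class Record(id: Long, name: String, payload: String)

object UdfVsMapSketch {
  // UDF path: Catalyst sees exactly one input column (name) and one output column.
  def viaUdf(ds: Dataset[Record]): Dataset[String] = {
    val upper = udf((s: String) => s.toUpperCase)
    ds.select(upper(col("name")).as("name_upper")).as[String](Encoders.STRING)
  }

  // map path: each Record is deserialized in full before the lambda runs,
  // and the optimizer cannot see which fields the lambda actually reads.
  def viaMap(ds: Dataset[Record]): Dataset[String] =
    ds.map(r => r.name.toUpperCase)(Encoders.STRING)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("udf-vs-map-sketch")
      .master("local[*]") // local master for the sketch
      .getOrCreate()
    import spark.implicits._

    val ds = Seq(Record(1L, "a", "x"), Record(2L, "b", "y")).toDS()
    viaUdf(ds).explain(true)
    viaMap(ds).explain(true)
    spark.stop()
  }
}
```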

> Hi All,
> I am wondering what the difference is between UDF execution and
> map(someLambda). You can assume someLambda ~= UDF. Is there any performance
> difference?
> Thanks!

Spark does not load all classes in fat jar

Hello everyone,

We're having a serious issue where we get a ClassNotFoundException because,
apparently, the class is not found within the classpath of Spark, in both
the driver and the executors.

First, I checked with jar tf whether the class was actually within the jar,
and it actually is. Then I activated the following options to see which
classes are actually loaded:

--conf 'spark.driver.extraJavaOptions=-verbose:class' --conf
'spark.executor.extraJavaOptions=-verbose:class'

and I can see from the YARN stdout logs that some classes, just like the
one throwing the exception, are not actually being loaded while other
classes are.
I tried, then, using --jars to pass the jar containing the missing classes,
and also using addJar() from the spark context, to no avail.

This looks like an issue with Spark class loader.

Any idea what's happening here? I'm using Spark
(HDP 3.0).

Thank you for your help,

Reusing a broadcast DataFrame in multiple queries

Hi there,
I have a question about reusing a broadcast DataFrame in multiple queries.
I'm not quite sure whether it is possible, and I hope someone can shed some
light here. Pseudocode below:

val df_schedule = spark.sql("select …. from A")
val df_schedule_broadcast = broadcast(df_schedule)
// query1
val res_df1 = spark.sql("select …. from B, tbl_schedule S where B.col1 = 

// query2
val res_df2 = spark.sql("select …. from C, tbl_schedule S where C.col2 = 

In the Spark UI, I can see 2 jobs (ThreadPoolExecutor) that have the same DAG
and take similar time to complete, so it seems the table df_schedule_broadcast
is broadcast twice. Is there any way I can avoid broadcasting it again and just
reuse the same data in both queries?

Many thanks,
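One possible workaround, named plainly because it swaps the broadcast join for a different technique, is to use a broadcast variable. The schedule is collected once to the driver, shipped once to each executor, and reused by any number of later queries through a lookup UDF. This assumes the schedule table fits comfortably in driver memory and that the joins are equality lookups on a single key. It only covers inner- or left-style lookups, not general joins. Within a single query Spark can reuse an identical broadcast exchange, but separate queries generally plan their own. The table and column names below (`id`, `day`, `col1`, `col2`) are made up for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object BroadcastLookupSketch {
  def run(spark: SparkSession): (DataFrame, DataFrame) = {
    import spark.implicits._

    // Stand-in for df_schedule: a small (id -> day) table.
    val schedule = Seq((1, "mon"), (2, "tue")).toDF("id", "day")

    // Collect once on the driver, then ship once per executor as a broadcast variable.
    val scheduleMap: Map[Int, String] = schedule.as[(Int, String)].collect().toMap
    val scheduleBc = spark.sparkContext.broadcast(scheduleMap)

    // Lookup UDF shared by both queries; returns null when the key is absent.
    val lookupDay = udf((id: Int) => scheduleBc.value.getOrElse(id, null))

    // Stand-ins for tables B and C.
    val b = Seq((1, "b1"), (3, "b3")).toDF("col1", "payload")
    val c = Seq((2, "c2"), (4, "c4")).toDF("col2", "payload")

    // Dropping null lookups gives inner-join semantics; omit the filter for left-join style.
    val res1 = b.withColumn("day", lookupDay(col("col1"))).filter(col("day").isNotNull)
    val res2 = c.withColumn("day", lookupDay(col("col2"))).filter(col("day").isNotNull)
    (res1, res2)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("broadcast-lookup-sketch")
      .master("local[*]") // local master for the sketch
      .getOrCreate()
    val (res1, res2) = run(spark)
    res1.show()
    res2.show()
    spark.stop()
  }
}
```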


Re: Structured Streaming & Query Planning

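Almost everything is coupled with the logical plan right now, including the
updated range for the source in each new batch, the updated watermark for
stateful operations, and the random seed in each batch. Please refer to the
code below:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala

We might try replacing these things in the physical plan so that the logical
plan doesn't need to be evaluated, but I'm not sure it's feasible.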

> I can understand that if you involve columns with variable distribution in
> join operations, it may change your execution plan, but most of the time
> this is not going to happen: in streaming, the most used operations are map,
> filter, grouping and stateful operations, and in all these cases I can't see
> how dynamic query planning could help.
> It could be useful to have a parameter to force a streaming query to
> calculate the query plan just once.
> Paolo
> Hello Paolo,
> generally speaking, query planning is mostly based on statistics and
> distributions of data values for the involved columns, which might
> significantly change over time in a streaming context, so for me it makes a
> lot of sense that it is run at every schedule, even though I understand
> your concern.
> For the second question I don't know how to (or if you even can) cache the
> computed query plan.
> If possible, would you mind sharing your findings afterwards? (query
> planning on streaming is a very interesting and not yet sufficiently explored
> topic IMO)
> Best regards,
> Alessandro
> On Thu, 14 Mar 2019 at 16:51, Paolo Platter wrote:
>> Hi All,
>> I would like to understand why, in a streaming query (which should not be
>> able to change its behaviour across iterations), there is a
>> queryPlanning-Duration effort (in my case 33% of the trigger interval) at
>> every schedule. I don’t understand why this is needed, and whether it is
>> possible to disable or cache it.
>> Thanks
>> *Paolo Platter*
>> *CTO*
>> E-mail:paolo.plat...@agilelab.it
>> Web Site:   www.agilelab.it

Re: Structured Streaming & Query Planning

I can understand that if you involve columns with variable distribution in join
operations, it may change your execution plan, but most of the time this is not
going to happen: in streaming, the most used operations are map, filter,
grouping and stateful operations, and in all these cases I can't see how
dynamic query planning could help.

It could be useful to have a parameter to force a streaming query to calculate 
the query plan just once.


Hello Paolo,
generally speaking, query planning is mostly based on statistics and
distributions of data values for the involved columns, which might
significantly change over time in a streaming context, so for me it makes a lot
of sense that it is run at every schedule, even though I understand your
concern.

For the second question I don't know how to (or if you even can) cache the 
computed query plan.

If possible, would you mind sharing your findings afterwards? (query planning
on streaming is a very interesting and not yet sufficiently explored topic IMO)

Best regards,
Alessandro

On Thu, 14 Mar 2019 at 16:51, Paolo Platter <paolo.plat...@agilelab.it> wrote:
Hi All,

I would like to understand why, in a streaming query (which should not be able
to change its behaviour across iterations), there is a queryPlanning-Duration
effort (in my case 33% of the trigger interval) at every schedule. I don’t
understand why this is needed, and whether it is possible to disable or cache it.



Spark on Mesos broken on 2.4?

Hello Everyone, 

I’m just trying out spark-shell on Mesos and I don’t get any executors. To
debug it, I started the Vagrant box from Aurora and tried it there, and I get
the same issue as on my cluster.
On Mesos, the only active framework is the spark-shell; Mesos is running 1.6.1
and has 4 cores. Does anyone else have the same issue?

vagrant@aurora:~/spark-2.4.0-bin-hadoop2.7$ ./bin/spark-shell --master mesos:// --total-executor-cores 3
2019-03-18 06:43:30 WARN  Utils:66 - Your hostname, aurora resolves to a loopback address:; using instead (on interface eth0)
2019-03-18 06:43:30 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2019-03-18 06:43:31 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
I0318 06:43:40.738236  5192 sched.cpp:232] Version: 1.6.1
I0318 06:43:40.743121  5188 sched.cpp:336] New master detected at
I0318 06:43:40.744252  5188 sched.cpp:351] No credentials provided. Attempting to register without authentication
I0318 06:43:40.748190  5188 sched.cpp:749] Framework registered with
Spark context Web UI available at
Spark context available as 'sc' (master = mesos://, app id =
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

scala> textFile.count()
[Stage 0:>  (0 + 0) / 1]
2019-03-18 06:44:48 WARN  TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2019-03-18 06:45:03 WARN  TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2019-03-18 06:45:18 WARN  TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
2019-03-18 06:45:33 WARN  TaskSchedulerImpl:66 - Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
[Stage 0:>  (0 + 0) / 1]
2019-03-18 06:45:46 WARN  Signaling:66 - Cancelling all active jobs, this can take a while. Press Ctrl+C again to exit now.

