Writing the contents of spark dataframe to Kafka with Spark 2.2

2019-03-18 Thread anna stax
Hi all,
I am unable to write the contents of spark dataframe to Kafka.
I am using Spark 2.2

This is my code

val df = Seq(("1","One"),("2","two")).toDF("key","value")
df.printSchema()
df.show(false)
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("topic", "testtopic")
  .save()

and I am getting the following error message
[Stage 0:>  (0 + 2) / 2]
Exception in thread "main" org.apache.spark.SparkException: Job aborted
due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent
failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver):
java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.expressions.Cast$.apply$default$3()Lscala/Option;
at org.apache.spark.sql.kafka010.KafkaWriteTask.createProjection(KafkaWriteTask.scala:112)
at org.apache.spark.sql.kafka010.KafkaWriteTask.<init>(KafkaWriteTask.scala:39)
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:90)
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1.apply(KafkaWriter.scala:89)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:926)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I have added this dependency


<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.2.2</version>
</dependency>


Appreciate any help. Thanks.
https://stackoverflow.com/questions/55229945/writing-the-contents-of-spark-dataframe-to-kafka-with-spark-2-2
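
One way to narrow this down is to check by reflection whether the method named in the error exists on the runtime classpath. This is only a sketch, and it assumes the NoSuchMethodError comes from mixed Spark versions (for example a Kafka connector jar built for 2.2.x running against older Catalyst classes), which the trace alone does not prove. The object and helper names (ClasspathCheck, hasMethod) are made up for this illustration.

```scala
object ClasspathCheck {
  // Returns true if the named class can be loaded and exposes a public
  // method with the given name; false if the class or the method is missing.
  def hasMethod(className: String, methodName: String): Boolean =
    try Class.forName(className).getMethods.exists(_.getName == methodName)
    catch { case _: ClassNotFoundException => false }

  def main(args: Array[String]): Unit = {
    // Class and method names copied from the stack trace above.
    val found = hasMethod(
      "org.apache.spark.sql.catalyst.expressions.Cast$",
      "apply$default$3")
    println(s"Method from the error message is present: $found")
  }
}
```

Run it with the same classpath as the failing job. A result of false would suggest that the Catalyst classes in use are older than the Kafka connector expects, in which case aligning all Spark artifacts to a single version is the usual remedy.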


Spark - Hadoop custom filesystem service loading

2019-03-18 Thread Jhon Anderson Cardenas Diaz
Hi everyone,

On spark 2.2.0, if you wanted to create a custom file system
implementation, you just created an extension of
org.apache.hadoop.fs.FileSystem and put the canonical name of the custom
class on the file
src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem.

Once you added that jar as a dependency of your spark-submit application, the
custom scheme was loaded automatically, and you could start using it, e.g.
ds.load("customfs://path").

But on Spark 2.4.0 this does not seem to work the same way. If you do exactly
the same, you get an error like "No FileSystem for customfs".

The only way I achieved this on 2.4.0, was specifying the spark property
spark.hadoop.fs.customfs.impl.
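
For reference, the workaround can be sketched as follows. The class name com.example.fs.CustomFileSystem and the path are placeholders, not names from the original setup; only the property key spark.hadoop.fs.customfs.impl comes from the message above.

```scala
import org.apache.spark.sql.SparkSession

object CustomFsWorkaround {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("customfs-workaround")
      // Hypothetical class name: replace it with your own subclass of
      // org.apache.hadoop.fs.FileSystem that handles the customfs scheme.
      .config("spark.hadoop.fs.customfs.impl", "com.example.fs.CustomFileSystem")
      .getOrCreate()

    // Placeholder path on the custom scheme.
    val lines = spark.read.textFile("customfs://path")
    println(lines.count())

    spark.stop()
  }
}
```

The same property can also be passed on the command line with --conf instead of being set in code.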

Do you consider this a bug, or is it an intentional change that should be
documented somewhere?

Btw, digging a little into this, the cause seems to be that the FileSystem
is now initialized before the actual dependencies are downloaded from the
Maven repo. Because that initialization loads the available filesystems at
that point, and only once, the filesystems in the downloaded jars are not
taken into account.

Thanks.


Expecting 'type' to be present

2019-03-18 Thread Jorge Machado
Hi everyone, 

Does anyone know what the error "Expecting 'type' to be present" means
when using Spark on Mesos?

./bin/spark-submit --class org.apache.spark.examples.SparkPi \
  --master mesos://host:5050/api --deploy-mode cluster \
  --conf spark.master.rest.enabled=true --total-executor-cores 4 \
  --jars /home/machjor/spark-2.4.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.0.jar \
  /home/machjor/spark-2.4.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.4.0.jar \
  10
2019-03-18 20:39:31 WARN  NativeCodeLoader:62 - Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
2019-03-18 20:39:31 INFO  RestSubmissionClient:54 - Submitting a request to 
launch an application in mesos://host:5050/api.
2019-03-18 20:39:32 ERROR RestSubmissionClient:70 - Server responded with error:
Some(Failed to validate master::Call: Expecting 'type' to be present)
2019-03-18 20:39:32 ERROR RestSubmissionClient:70 - Error: Server responded 
with message of unexpected type ErrorResponse.
2019-03-18 20:39:32 INFO  ShutdownHookManager:54 - Shutdown hook called
2019-03-18 20:39:32 INFO  ShutdownHookManager:54 - Deleting directory 
/tmp/spark-259c0e66-c2ab-43b4-90df-0a9d442e5c54


Thanks
Jorge 

Re: Structured Streaming & Query Planning

2019-03-18 Thread Arun Mahadevan
I don't think it's feasible with the current logic. Typically the query
planning time should be a tiny fraction of the total, unless you are
processing tiny micro-batches very frequently. You might want to consider
increasing the trigger interval to process more data per micro-batch and
see if it helps. The tiny micro-batch use cases should ideally be solved
using continuous mode (once it matures), which would not have this overhead.

Thanks,
Arun
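
A minimal sketch of the trigger-interval suggestion, assuming the job is started through spark-submit. The "rate" source and the 30-second interval are stand-ins chosen for illustration, and planningShare is a hypothetical helper that assumes planning time per micro-batch stays roughly constant.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerIntervalSketch {
  // Fraction of each trigger interval spent on query planning, assuming the
  // planning time per micro-batch is roughly constant.
  def planningShare(planningMs: Long, triggerMs: Long): Double =
    planningMs.toDouble / triggerMs

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("trigger-interval-sketch").getOrCreate()

    // The built-in "rate" test source stands in for the real input stream.
    val input = spark.readStream.format("rate").option("rowsPerSecond", "100").load()

    val query = input.writeStream
      .format("console")
      // Fewer, larger micro-batches: planning then runs once per 30 seconds.
      .trigger(Trigger.ProcessingTime("30 seconds"))
      .start()

    query.awaitTermination()
  }
}
```

As an illustration with made-up numbers: 330 ms of planning in a 1 s interval is 33% of the interval, while the same 330 ms in a 3 s interval would be 11%, provided planning time does not grow with batch size.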



java.lang.NoSuchFieldError: HIVE_STATS_JDBC_TIMEOUT on EMR

2019-03-18 Thread Daniel Zhang
Hi,

I know the JIRA for this error 
(https://issues.apache.org/jira/browse/SPARK-18112), and I have read all the 
comments and even the PR for it.

But I am facing this issue on AWS EMR, and only in the Oozie Spark action. I am 
looking for someone who can give me a hint or direction, so I can see whether I 
can overcome this issue on EMR.

I am testing a simple Spark application on EMR-5.12.2, which comes with Hadoop 
2.8.3 + HCatalog 2.3.2 + Spark 2.2.1, and uses the AWS Glue Data Catalog for 
both Hive and Spark table metadata.

First of all, both Hive and Spark work fine with AWS Glue as the metadata 
catalog, and my Spark application works with spark-submit.

[hadoop@ip-172-31-65-232 oozieJobs]$ spark-shell
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_171)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.sql("show databases").show
+---------------+
|   databaseName|
+---------------+
|        default|
|googleanalytics|
|       sampledb|
+---------------+


I can access and query the database I created in Glue without any issue in 
spark-shell or spark-sql.
Relevant to the problem described later: in this working case, 
"spark.sql.hive.metastore.version" is not set explicitly in spark-shell, so it 
has the default value shown below:

scala> spark.conf.get("spark.sql.hive.metastore.version")
res2: String = 1.2.1


Even though it shows the version as "1.2.1", I know that with Glue the Hive 
metastore version will be "2.3.2"; I can see "hive-metastore-2.3.2-amzn-1.jar" 
in the Hive library path.

Now here comes the issue: when I test the Spark code in the Oozie Spark action 
with "enableHiveSupport" on the Spark session, it works with spark-submit on the 
command line, but fails with the following error in the Oozie runtime:

Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SparkMain], 
main() threw exception, HIVE_STATS_JDBC_TIMEOUT
java.lang.NoSuchFieldError: HIVE_STATS_JDBC_TIMEOUT
at org.apache.spark.sql.hive.HiveUtils$.hiveClientConfigurations(HiveUtils.scala:200)
at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:265)
at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:66)
at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:65)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:195)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)
at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:195)


I know this is most likely caused by the Oozie runtime classpath, but I have 
spent days trying and still cannot find a solution. We use Spark as the core of 
our ETL engine, and the ability to manage and query the HiveCatalog is critical 
for us.
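
To see which jar actually wins on the Oozie runtime classpath, a small check like the following could be run inside the Spark action. This is a sketch, not a fix: jarOf is a hypothetical helper, and it assumes the field named in the error lives on org.apache.hadoop.hive.conf.HiveConf (to my knowledge that class ships in hive-common rather than hive-metastore, so the conflicting jar may not be the one being swapped).

```scala
object WhichJar {
  // Returns the location (usually a jar path) a class was loaded from, or
  // None if the class is missing or has no code source (JDK core classes).
  def jarOf(className: String): Option[String] =
    try {
      val cls = Class.forName(className)
      for {
        domain   <- Option(cls.getProtectionDomain)
        source   <- Option(domain.getCodeSource)
        location <- Option(source.getLocation)
      } yield location.toString
    } catch {
      case _: ClassNotFoundException => None
    }

  def main(args: Array[String]): Unit = {
    // HiveConf is assumed to be the class holding the ConfVars constants.
    println(jarOf("org.apache.hadoop.hive.conf.HiveConf"))
  }
}
```

Comparing the printed location between the working spark-submit run and the failing Oozie run should show whether the two runtimes pick up different Hive jars.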

Here is what puzzles me:

  *   I know this issue was supposedly fixed in Spark 2.2.0, and on this EMR we 
are using Spark 2.2.1.
  *   There is a 1.2.1 version of the hive-metastore jar under the Spark jars on 
EMR. Does this mean that in the successful spark-shell runtime, Spark is indeed 
using the 1.2.1 version of hive-metastore?

[hadoop@ip-172-31-65-232 oozieJobs]$ ls /usr/lib/spark/jars/*hive-meta*
/usr/lib/spark/jars/hive-metastore-1.2.1-spark2-amzn-0.jar

  *   There is a 2.3.2 version of the hive-metastore jar under the Hive component 
on this EMR, which I believe is the one pointing to Glue, right?

[hadoop@ip-172-31-65-232 oozieJobs]$ ls /usr/lib/hive/lib/*hive-meta*
/usr/lib/hive/lib/hive-metastore-2.3.2-amzn-1.jar  
/usr/lib/hive/lib/hive-metastore.jar

  *   I specified "oozie.action.sharelib.for.spark=spark,hive" in Oozie, and I 
can see that the Oozie runtime loads the jars from both the spark and hive 
share libs. There is NO hive-metastore-1.2.1-spark2-amzn-0.jar in the Oozie 
SPARK sharelib, and there is indeed a hive-metastore-2.3.2-amzn-1.jar in the 
Oozie HIVE sharelib.
  *   Based on my understanding of 
(https://issues.apache.org/jira/browse/SPARK-18112), here is what I have tried 
so far to fix this in the Oozie runtime, but none of it works:
 *   I added hive-metastore-1.2.1-spark2-amzn-0.jar to the HDFS directory of the 
Oozie spark share lib and ran "oozie admin -sharelibupdate". After that, I 
confirmed that this library is loaded in the Oozie runtime log of my Spark 
action, but I got the same error message.
 *   I added "--conf spark.sql.hive.metastore.version=2.3.2" to the Spark 
options of my Oozie Spark action and confirmed this configuration in the Spark 
session, but I still got the same error message as above.
 *   I added "--conf spark.sql.hive.metastore.version=2.3.2 --conf 
spark.sql.hive.metastore.jars=maven", but still got the same error message.
 *   I added 

Re: Spark does not load all classes in fat jar

2019-03-18 Thread Federico D'Ambrosio
Hi Jorn, thank you for your response.

I'm sorry I didn't mention it in the previous mail: the class name is
'sherlogic_mon_interface', a class of our project, which is intended to be
instantiated dynamically (Class.forName(" sherlogic_mon_interface")) within
an executor. So it's not part of the Spark core library.

To give a bit more context, we've got 3 modules; let's call them core, ext
and util. The missing classes are *some* (and that's really weird, not all)
of the classes defined in the two modules ext and util.

The weird thing is that we're specifically building the uberjar with all
those dependencies, just like you said. We thought the problem was related
to an incorrect resolution of the additional jars, so we added those
modules as dependencies explicitly (checked again with jar tf, the classes
are present in the jar). And still we get the same ClassNotFoundException.

Feels like I'm missing something.

Thanks,
Federico
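
One thing that could be tried, sketched here under the assumption that the failure depends on which class loader performs the lookup: resolve the class through the thread context class loader instead of the default one. As far as I understand, Spark exposes jars added with --jars or addJar() through a separate class loader that it sets as the context class loader of task threads. The helper name loadClass is made up for this example. Note also that a name with stray whitespace, as in the Class.forName call printed above, would by itself produce a ClassNotFoundException, so the sketch trims the name.

```scala
object DynamicLoad {
  // Loads a class by name through the current thread's context class loader,
  // falling back to the loader of this object when no context loader is set.
  def loadClass(name: String): Class[_] = {
    val loader = Option(Thread.currentThread().getContextClassLoader)
      .getOrElse(getClass.getClassLoader)
    // trim guards against stray whitespace around the class name
    Class.forName(name.trim, true, loader)
  }

  def main(args: Array[String]): Unit = {
    // Class name taken from the message above.
    val cls = loadClass("sherlogic_mon_interface")
    println(cls.getName)
  }
}
```

If the lookup succeeds this way inside an executor task but fails with a plain Class.forName(name), the problem is likely the loader rather than the contents of the jar.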




Spark Metrics : Job Remains In "Running" State

2019-03-18 Thread Jain, Abhishek 3. (Nokia - IN/Bangalore)
Hi Team,

We are running a spark-submit job with metrics enabled (Spark 2.4 on 
Kubernetes) on a user-defined port (say 45010). We have observed that the job 
does not go into the "Completed" state even after its 
completion (.stop()).
The pods for this spark-submit job remain in the "Running" state. I am able to 
collect the metrics for both the driver and the executor(s) on the defined port 
using curl.

Below is the content of metrics.properties:
executor.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
executor.sink.csv.period=1
executor.sink.csv.directory=/tmp/
executor.sink.csv.unit=seconds
driver.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
driver.sink.csv.directory=/tmp/

*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
driver.sink.jmx.period=1
driver.sink.jmx.unit=seconds

# Enable JVM metrics source for all instances by class name
driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource


Spark Submit Job:
export HADOOP_CONF_DIR=;sudo -E ./spark-submit --verbose 
--deploy-mode cluster --master  --conf spark.app.name= 
--conf spark.executor.instances=2 

Please let me know whether this is the expected behavior.

Regards,
Abhishek Jain


Re: Spark does not load all classes in fat jar

2019-03-18 Thread Jörn Franke
Fat jar with shading as the application not as an additional jar package 



Re: Spark does not load all classes in fat jar

2019-03-18 Thread Jörn Franke
Maybe that class is already loaded as part of a core library of Spark?

Do you have concrete class names?

In doubt create a fat jar and shade the dependencies in question



Re: what is the difference between udf execution and map(someLambda)?

2019-03-18 Thread Bobby Evans
Map and flatMap are RDD operations; a UDF is a DataFrame operation.  The
big difference from a performance perspective is in the query optimizer.  A
UDF defines the set of input fields it needs and the set of output fields
it will produce, whereas map operates on the entire row at a time.  This means the
optimizer can move operations around, and potentially drop columns earlier
to try and make the overall processing more efficient.  A map operation
requires the entire row as input so the optimizer cannot do anything to it,
and does not know what the output is going to look like unless you
explicitly tell it.  But in reality, udfs are compiled down to map
operations on an RDD with some glue code to get the columns in the correct
place, so there should be little performance difference if you can manually
build a query that is similar to what the catalyst optimizer would have
built.
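
To make the comparison concrete, here is a small sketch that applies the same function once as a UDF and once through map, then prints both physical plans. The column names, the sample data and the helper shout are invented for illustration, and local mode is assumed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object UdfVsMap {
  // Plain Scala function shared by both variants.
  def shout(s: String): String = s.toUpperCase

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("udf-vs-map").master("local[*]").getOrCreate()
    import spark.implicits._

    val df = Seq(("1", "one", "unused"), ("2", "two", "unused")).toDF("key", "value", "extra")

    // UDF variant: the expression states that only "value" is read.
    val shoutUdf = udf(shout _)
    val viaUdf = df.select(col("key"), shoutUdf(col("value")).as("value"))

    // map variant: the lambda receives the whole row, including "extra".
    val viaMap = df
      .map(row => (row.getString(0), shout(row.getString(1))))
      .toDF("key", "value")

    // Compare the two plans to see what the optimizer could prune.
    viaUdf.explain()
    viaMap.explain()

    spark.stop()
  }
}
```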

On Sun, Mar 17, 2019 at 1:42 PM kant kodali  wrote:

> Hi All,
>
> I am wondering what is the difference between UDF execution and
> map(someLambda)? you can assume someLambda ~= UDF. Any performance
> difference?
>
> Thanks!
>
>
>


Spark does not load all classes in fat jar

2019-03-18 Thread Federico D'Ambrosio
Hello everyone,

We're having a serious issue, where we get ClassNotFoundException because,
apparently the class is not found within the classpath of Spark, in both
the Driver and Executors.

First, I checked with jar tf whether the class was actually within the jar,
and it is. Then, I activated the following options to see which
classes are actually loaded:

--conf 'spark.driver.extraJavaOptions=-verbose:class' --conf
'spark.executor.extraJavaOptions=-verbose:class'

and I can see from the YARN stdout logs that some classes, just like the
one throwing the exception, are not actually being loaded while other
classes are.
I tried, then, using --jars to pass the jar containing the missing classes,
and also using addJar() from the spark context, to no avail.

This looks like an issue with Spark class loader.

Any idea about what's happening here? I'm using Spark 2.3.1.3.0.0.0-1634
(HDP 3.0).

Thank you for your help,
Federico


Reuse broadcasted data frame in multiple query

2019-03-18 Thread Lu Liu
Hi there,
I have a question about reusing a broadcast data frame in multiple queries. 
I am not sure whether it is possible, and I hope someone can shed some 
light on it. Pseudo code as below:

import org.apache.spark.sql.functions.broadcast

val df_schedule = spark.sql("select …. from A")
// mark the small table with a broadcast hint and expose it as a temp view
val df_schedule_broadcast = broadcast(df_schedule)
df_schedule_broadcast.createOrReplaceTempView("tbl_schedule")
// query1
val res_df1 = spark.sql("select …. from B, tbl_schedule S where B.col1 = S.col1")

…..
// query2
val res_df2 = spark.sql("select …. from C, tbl_schedule S where C.col2 = S.col2")

In the Spark UI, I can see 2 ThreadPoolExecutor jobs that have the same DAG and 
take a similar time to complete, so it seems that the table 
df_schedule_broadcast is broadcast twice. My question is: is there any way to 
avoid broadcasting it again and just reuse the same data in both queries?




Many thanks,

LL
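
A possible mitigation, sketched with stand-in data: cache the small data frame so that the second query at least does not recompute it from its source. As far as I know, Spark still plans a separate broadcast exchange for each query, so this is not guaranteed to remove the second broadcast job. The table contents and column names below are invented, and local mode is assumed.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastReuseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("broadcast-reuse").master("local[*]").getOrCreate()
    import spark.implicits._

    // Stand-ins for tables A, B and C from the question.
    val schedule = Seq((1, "x"), (2, "y")).toDF("col1", "label")
    val b = Seq((1, "b1"), (3, "b3")).toDF("col1", "payload")
    val c = Seq((2, "c2")).toDF("col1", "payload")

    // Materialize the small table once so later queries read it from the cache.
    val cached = schedule.cache()
    cached.count()

    // Both joins carry the broadcast hint and share the cached input.
    val res1 = b.join(broadcast(cached), Seq("col1"))
    val res2 = c.join(broadcast(cached), Seq("col1"))

    res1.show()
    res2.show()
    spark.stop()
  }
}
```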



Re: Structured Streaming & Query Planning

2019-03-18 Thread Jungtaek Lim
Almost everything is coupled with the logical plan right now, including the
updated range for the source in a new batch, the updated watermark for stateful
operations, and the random seed in each batch. Please refer to the code below:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala

We might try out replacing these things in physical plan so that logical
plan doesn't need to be evaluated, but not sure it's feasible.

Thanks,
Jungtaek Lim (HeartSaVioR)


Re: Structured Streaming & Query Planning

2019-03-18 Thread Paolo Platter
I can understand that if you involve columns with variable distribution in join 
operations, it may change your execution plan, but most of the time this is not 
going to happen. In streaming, the most used operations are map, filter, 
grouping and stateful operations, and in all these cases I can't see how dynamic 
query planning could help.

It could be useful to have a parameter to force a streaming query to calculate 
the query plan just once.

Paolo





From: Alessandro Solimando 
Sent: Thursday, March 14, 2019 6:59:50 PM
To: Paolo Platter
Cc: user@spark.apache.org
Subject: Re: Structured Streaming & Query Planning

Hello Paolo,
generally speaking, query planning is mostly based on statistics and 
distributions of data values for the involved columns, which might 
significantly change over time in a streaming context, so for me it makes a lot 
of sense that it is run at every schedule, even though I understand your 
concern.

For the second question I don't know how to (or if you even can) cache the 
computed query plan.

If possible, would you mind sharing your findings afterwards? (query planning 
on streaming it's a very interesting and not yet enough explored topic IMO)

Best regards,
Alessandro

On Thu, 14 Mar 2019 at 16:51, Paolo Platter wrote:
Hi All,

I would like to understand why, in a streaming query (which should not be able 
to change its behaviour across iterations), there is a queryPlanning-Duration 
effort (in my case 33% of the trigger interval) at every schedule. I don't 
understand why this is needed and whether it is possible to disable or cache it.

Thanks



Paolo Platter
CTO
E-mail:paolo.plat...@agilelab.it
Web Site:   www.agilelab.it





Spark on Mesos broken on 2.4 ?

2019-03-18 Thread Jorge Machado
Hello Everyone, 

I'm just trying out the spark-shell on Mesos and I don't get any executors. To 
debug it, I started the Vagrant box from Aurora and tried it out there, and I 
see the same issue as on my cluster. 
On Mesos the only active framework is the spark-shell; Mesos is running 1.6.1 
and has 4 cores. Does anyone else have the same issue?



vagrant@aurora:~/spark-2.4.0-bin-hadoop2.7$ ./bin/spark-shell --master 
mesos://192.168.33.7:5050 --total-executor-cores 3
2019-03-18 06:43:30 WARN  Utils:66 - Your hostname, aurora resolves to a 
loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0)
2019-03-18 06:43:30 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to 
another address
2019-03-18 06:43:31 WARN  NativeCodeLoader:62 - Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
I0318 06:43:40.738236  5192 sched.cpp:232] Version: 1.6.1
I0318 06:43:40.743121  5188 sched.cpp:336] New master detected at 
master@192.168.33.7:5050
I0318 06:43:40.744252  5188 sched.cpp:351] No credentials provided. Attempting 
to register without authentication
I0318 06:43:40.748190  5188 sched.cpp:749] Framework registered with 
2c00bfa7-df7b-430b-8c92-c6452c447249-0004
Spark context Web UI available at http://10.0.2.15:4040
Spark context available as 'sc' (master = mesos://192.168.33.7:5050, app id = 
2c00bfa7-df7b-430b-8c92-c6452c447249-0004).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_181)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

scala> textFile.count()
[Stage 0:>  (0 + 0) / 1]
2019-03-18 06:44:48 WARN  TaskSchedulerImpl:66 - Initial job has not accepted 
any resources; check your cluster UI to ensure that workers are registered and 
have sufficient resources
2019-03-18 06:45:03 WARN  TaskSchedulerImpl:66 - Initial job has not accepted 
any resources; check your cluster UI to ensure that workers are registered and 
have sufficient resources
2019-03-18 06:45:18 WARN  TaskSchedulerImpl:66 - Initial job has not accepted 
any resources; check your cluster UI to ensure that workers are registered and 
have sufficient resources
2019-03-18 06:45:33 WARN  TaskSchedulerImpl:66 - Initial job has not accepted 
any resources; check your cluster UI to ensure that workers are registered and 
have sufficient resources
[Stage 0:>  (0 + 0) / 1]
2019-03-18 06:45:46 WARN  Signaling:66 - Cancelling all active jobs, this can 
take a while. Press Ctrl+C again to exit now.


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org