pyspark on windows (Upgrade from 1.6 to 2.0.2): sqlContext.read.format fails

2017-08-25 Thread Shiv Onkar Kumar
Hi,

The following line works fine in 1.6 but fails in 2.0.2. Any idea what the
issue could be?


 
df_train = sqlContext.read.format("libsvm").load("../data/mllib/sample_linear_regression_data.txt")


 
The error is:

  File "", line 1, in     df_train = 
sqlContext.read.format("libsvm").load("../data/mllib/sample_linear_regression_data.txt")
  File 
"D:\ProgramFiles\spark-2.0.2-bin-hadoop2.3\python\lib\pyspark.zip\pyspark\sql\readwriter.py",
 line 147, in load    return self._df(self._jreader.load(path))
  File 
"D:\ProgramFiles\spark-2.0.2-bin-hadoop2.3\python\lib\py4j-0.10.3-src.zip\py4j\java_gateway.py",
 line 1133, in __call__    answer, self.gateway_client, self.target_id, 
self.name)
  File 
"D:\ProgramFiles\spark-2.0.2-bin-hadoop2.3\python\lib\pyspark.zip\pyspark\sql\utils.py",
 line 79, in deco    raise IllegalArgumentException(s.split(': ', 1)[1], 
stackTrace)
IllegalArgumentException: 'Can not create a Path from an empty string'
Thank you,
Shiv

Why do checkpoints work the way they do?

2017-08-25 Thread Hugo Reinwald
Hello,

I am implementing a spark streaming solution with Kafka and read that
checkpoints cannot be used across application code changes - here


I tested changes in the application code and got the error message below:

17/08/25 15:10:47 WARN CheckpointReader: Error reading checkpoint from file
file:/tmp/checkpoint/checkpoint-150364116.bk
java.io.InvalidClassException: scala.collection.mutable.ArrayBuffer; local
class incompatible: stream classdesc serialVersionUID =
-2927962711774871866, local class serialVersionUID = 1529165946227428979

While I understand that this is by design, can I ask why checkpointing works
the way it does, verifying the class signatures? Would it not be easier to let
the developer decide whether to reuse the old checkpoints, depending on what
changed in the application logic, e.g. changes in code unrelated to Spark/Kafka
such as logging or configuration changes?
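
For what it's worth, one common way to avoid depending on checkpoints across
application upgrades at all, at least for Kafka-sourced jobs, is to track the
offsets yourself and commit them back to Kafka after the output succeeds. The
sketch below assumes the spark-streaming-kafka-0-10 direct stream; the broker
address, topic, and group id are placeholders, not part of the original question.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

object ManualOffsetSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("manual-offsets"), Seconds(60))

    // Placeholder broker/topic/group names, for illustration only
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "my-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("events"), kafkaParams))

    stream.foreachRDD { rdd =>
      // Capture the Kafka offsets for this batch before any transformation
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // ... transform rdd and write the results somewhere idempotent ...

      // Commit offsets back to Kafka only after the output has succeeded, so a
      // restarted (and possibly recompiled) job resumes from Kafka rather than
      // from a serialized checkpoint
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}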

This is my first post in the group. Apologies if I am asking the question
again; I did a Nabble search and it didn't turn up the answer.

Thanks for the help.
Hugo


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-25 Thread swetha kasireddy
Because I saw some posts saying that having the consumer cache enabled causes a
ConcurrentModificationException with reduceByKeyAndWindow. I see those errors
as well after running for some time with the cache enabled, so I had to disable
it. Please see the tickets below. We have 96 partitions. If I enable the cache,
would the following settings help improve performance?

"spark.streaming.kafka.consumer.cache.initialCapacity" -> Integer.*valueOf*
(12),
"spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*(15),

"spark.streaming.kafka.consumer.poll.ms" -> Integer.*valueOf*(1024),

http://markmail.org/message/n4cdxwurlhf44q5x

https://issues.apache.org/jira/browse/SPARK-19185


Also, I have a batch interval of 60 seconds. What do you suggest the following
be set to?

 session.timeout.ms, heartbeat.interval.ms
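
Not an authoritative answer, but for reference, here is a minimal sketch of how
those knobs could be wired up, with placeholder values rather than tuned
recommendations. Note that the spark.streaming.kafka.consumer.cache.* settings
appear to be Spark properties (read from the SparkConf), so they likely belong
on the SparkConf / --conf rather than inside the kafkaParams map.

import org.apache.spark.SparkConf

// Consumer-cache sizing: with 96 partitions, sizing the cache to at least the
// number of partitions each executor ends up owning avoids evicting and
// recreating cached consumers between batches. Values here are illustrative.
val conf = new SparkConf()
  .set("spark.streaming.kafka.consumer.cache.enabled", "true")
  .set("spark.streaming.kafka.consumer.cache.initialCapacity", "96")
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "96")
  .set("spark.streaming.kafka.consumer.poll.ms", "10000")

// Kafka consumer timeouts: the usual guidance is heartbeat.interval.ms at
// roughly one third of session.timeout.ms, and request.timeout.ms larger than
// session.timeout.ms. Again, placeholder numbers, not measured recommendations.
val timeoutParams = Map[String, Object](
  "session.timeout.ms" -> Integer.valueOf(30000),
  "heartbeat.interval.ms" -> Integer.valueOf(10000),
  "request.timeout.ms" -> Integer.valueOf(40000)
)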

On Fri, Aug 25, 2017 at 5:04 PM, swetha kasireddy  wrote:

> Because I saw some posts that say that consumer cache  enabled will have
> concurrentModification exception with reduceByKeyAndWIndow. I see those
> errors as well after running for sometime with cache being enabled. So, I
> had to disable it. Please see the tickets below.  We have 96 partitions. So
> if I enable cache, would teh following settings help to improve
> performance?
>
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*
> (96),
> "spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*
> (96),
>
> "spark.streaming.kafka.consumer.poll.ms" -> Integer.*valueOf*(1024),
>
> http://markmail.org/message/n4cdxwurlhf44q5x
>
> https://issues.apache.org/jira/browse/SPARK-19185
>
> On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger 
> wrote:
>
>> Why are you setting consumer.cache.enabled to false?
>>
>> On Fri, Aug 25, 2017 at 2:19 PM, SRK  wrote:
>> > Hi,
>> >
>> > What would be the appropriate settings to run Spark with Kafka 10? My
>> job
>> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its
>> very
>> > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka
>> 10 . I
>> > see the following error sometimes . Please see the kafka parameters and
>> the
>> > consumer strategy for creating the stream below. Any suggestions on how
>> to
>> > run this with better performance would be of great help.
>> >
>> > java.lang.AssertionError: assertion failed: Failed to get records for
>> test
>> > stream1 72 324027964 after polling for 12
>> >
>> > val kafkaParams = Map[String, Object](
>> >   "bootstrap.servers" -> kafkaBrokers,
>> >   "key.deserializer" -> classOf[StringDeserializer],
>> >   "value.deserializer" -> classOf[StringDeserializer],
>> >   "auto.offset.reset" -> "latest",
>> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
>> >   "session.timeout.ms" -> Integer.valueOf(6),
>> >   "request.timeout.ms" -> Integer.valueOf(9),
>> >   "enable.auto.commit" -> (false: java.lang.Boolean),
>> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>> >   "group.id" -> "test1"
>> > )
>> >
>> >   val hubbleStream = KafkaUtils.createDirectStream[String, String](
>> > ssc,
>> > LocationStrategies.PreferConsistent,
>> > ConsumerStrategies.Subscribe[String, String](topicsSet,
>> kafkaParams)
>> >   )
>> >
>> >
>> >
>> >
>> >
>> > --
>> > View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Slower-performance-while-running-Spark
>> -Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >
>> > -
>> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>> >
>>
>
>


Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-25 Thread swetha kasireddy
Because I saw some posts saying that having the consumer cache enabled causes a
ConcurrentModificationException with reduceByKeyAndWindow. I see those errors
as well after running for some time with the cache enabled, so I had to disable
it. Please see the tickets below. We have 96 partitions. If I enable the cache,
would the following settings help improve performance?

"spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*(96),
"spark.streaming.kafka.consumer.cache.maxCapacity" -> Integer.*valueOf*(96),

"spark.streaming.kafka.consumer.poll.ms" -> Integer.*valueOf*(1024),

http://markmail.org/message/n4cdxwurlhf44q5x

https://issues.apache.org/jira/browse/SPARK-19185

On Fri, Aug 25, 2017 at 12:28 PM, Cody Koeninger  wrote:

> Why are you setting consumer.cache.enabled to false?
>
> On Fri, Aug 25, 2017 at 2:19 PM, SRK  wrote:
> > Hi,
> >
> > What would be the appropriate settings to run Spark with Kafka 10? My job
> > works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its very
> > slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka 10
> . I
> > see the following error sometimes . Please see the kafka parameters and
> the
> > consumer strategy for creating the stream below. Any suggestions on how
> to
> > run this with better performance would be of great help.
> >
> > java.lang.AssertionError: assertion failed: Failed to get records for
> test
> > stream1 72 324027964 after polling for 12
> >
> > val kafkaParams = Map[String, Object](
> >   "bootstrap.servers" -> kafkaBrokers,
> >   "key.deserializer" -> classOf[StringDeserializer],
> >   "value.deserializer" -> classOf[StringDeserializer],
> >   "auto.offset.reset" -> "latest",
> >   "heartbeat.interval.ms" -> Integer.valueOf(2),
> >   "session.timeout.ms" -> Integer.valueOf(6),
> >   "request.timeout.ms" -> Integer.valueOf(9),
> >   "enable.auto.commit" -> (false: java.lang.Boolean),
> >   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
> >   "group.id" -> "test1"
> > )
> >
> >   val hubbleStream = KafkaUtils.createDirectStream[String, String](
> > ssc,
> > LocationStrategies.PreferConsistent,
> > ConsumerStrategies.Subscribe[String, String](topicsSet,
> kafkaParams)
> >   )
> >
> >
> >
> >
> >
> > --
> > View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Slower-performance-while-running-
> Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> > -
> > To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> >
>


Re: [Streaming][Structured Streaming] Understanding dynamic allocation in streaming jobs

2017-08-25 Thread Karthik Palaniappan
I definitely agree that dynamic allocation is useful, that's why I asked the 
question :p


More specifically, does spark plan to solve the problems with DRA for 
structured streaming mentioned in that Cloudera article?


If folks can give me pointers on where to start, I'd be happy to implement 
something similar to what spark streaming did.


From: cbowden 
Sent: Thursday, August 24, 2017 7:01 PM
To: user@spark.apache.org
Subject: Re: [Streaming][Structured Streaming] Understanding dynamic allocation 
in streaming jobs

You can leverage dynamic resource allocation with structured streaming.
Certainly there's an argument that trivial jobs won't benefit, and an argument
that important jobs should have fixed resources for stable end-to-end latency.

A few scenarios come to mind where it helps:
- I want my application to automatically leverage more resources if my
environment changes, e.g. Kafka topic partitions were increased at runtime
- I am not building a toy application and my driver is managing many
streaming queries with fair scheduling enabled, where not every streaming
query has strict latency requirements
- My source's underlying RDD representing the DataFrame provided by getBatch
is volatile, e.g. #partitions varies batch to batch
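
For readers who want to try this, a minimal sketch of turning on core dynamic
allocation (not the streaming-specific variant) for a structured streaming job.
The min/max executor counts are placeholders, and the external shuffle service
must be available on the cluster for dynamic allocation to work.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("structured-streaming-with-dra")
  // Core dynamic resource allocation
  .config("spark.dynamicAllocation.enabled", "true")
  // External shuffle service so executors can be removed without losing shuffle data
  .config("spark.shuffle.service.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "2")
  .config("spark.dynamicAllocation.maxExecutors", "20")
  .getOrCreate()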






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Structured-Streaming-Understanding-dynamic-allocation-in-streaming-jobs-tp29091p29104.html



Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-25 Thread Cody Koeninger
Why are you setting consumer.cache.enabled to false?

On Fri, Aug 25, 2017 at 2:19 PM, SRK  wrote:
> Hi,
>
> What would be the appropriate settings to run Spark with Kafka 10? My job
> works fine with Spark with Kafka 8 and with Kafka 8 cluster. But its very
> slow with Kafka 10 by using Kafka Direct' experimental APIs for Kafka 10 . I
> see the following error sometimes . Please see the kafka parameters and the
> consumer strategy for creating the stream below. Any suggestions on how to
> run this with better performance would be of great help.
>
> java.lang.AssertionError: assertion failed: Failed to get records for test
> stream1 72 324027964 after polling for 12
>
> val kafkaParams = Map[String, Object](
>   "bootstrap.servers" -> kafkaBrokers,
>   "key.deserializer" -> classOf[StringDeserializer],
>   "value.deserializer" -> classOf[StringDeserializer],
>   "auto.offset.reset" -> "latest",
>   "heartbeat.interval.ms" -> Integer.valueOf(2),
>   "session.timeout.ms" -> Integer.valueOf(6),
>   "request.timeout.ms" -> Integer.valueOf(9),
>   "enable.auto.commit" -> (false: java.lang.Boolean),
>   "spark.streaming.kafka.consumer.cache.enabled" -> "false",
>   "group.id" -> "test1"
> )
>
>   val hubbleStream = KafkaUtils.createDirectStream[String, String](
> ssc,
> LocationStrategies.PreferConsistent,
> ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
>   )
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Slower performance while running Spark Kafka Direct Streaming with Kafka 10 cluster

2017-08-25 Thread SRK
Hi,

What would be the appropriate settings to run Spark with Kafka 10? My job
works fine with Spark with Kafka 8 and a Kafka 8 cluster, but it's very
slow with Kafka 10 when using the Kafka Direct experimental APIs for Kafka 10. I
see the following error sometimes. Please see the Kafka parameters and the
consumer strategy for creating the stream below. Any suggestions on how to
run this with better performance would be of great help.

java.lang.AssertionError: assertion failed: Failed to get records for test
stream1 72 324027964 after polling for 12

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> kafkaBrokers,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "auto.offset.reset" -> "latest",
  "heartbeat.interval.ms" -> Integer.valueOf(2),
  "session.timeout.ms" -> Integer.valueOf(6),
  "request.timeout.ms" -> Integer.valueOf(9),
  "enable.auto.commit" -> (false: java.lang.Boolean),
  "spark.streaming.kafka.consumer.cache.enabled" -> "false",
  "group.id" -> "test1"
)

  val hubbleStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
  )





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Slower-performance-while-running-Spark-Kafka-Direct-Streaming-with-Kafka-10-cluster-tp29108.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



RE: [Spark Streaming] Streaming Dynamic Allocation is broken (at least on YARN)

2017-08-25 Thread Karthik Palaniappan
You have to set spark.executor.instances=0 in a streaming application with 
dynamic allocation: 
https://github.com/tdas/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala#L207.
 I originally had it set to a positive value, and explicitly set it to 0 after 
hitting that error.

Setting executor cores > 1 seems like reasonable advice in general, but that 
shouldn’t be my issue here, right?

From: Akhil Das
Sent: Thursday, August 24, 2017 2:34 AM
To: Karthik Palaniappan
Cc: user@spark.apache.org; 
t...@databricks.com
Subject: Re: [Spark Streaming] Streaming Dynamic Allocation is broken (at least 
on YARN)

Have you tried setting spark.executor.instances from 0 to a positive non-zero value?
Also, since it's a streaming application, set executor cores > 1.

On Wed, Aug 23, 2017 at 3:38 AM, Karthik Palaniappan 
> wrote:

I ran the HdfsWordCount example using this command:

spark-submit run-example \
  --conf spark.streaming.dynamicAllocation.enabled=true \
  --conf spark.executor.instances=0 \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.master=yarn \
  --conf spark.submit.deployMode=client \
  org.apache.spark.examples.streaming.HdfsWordCount /foo

I tried it on both Spark 2.1.1 (through HDP 2.6) and Spark 2.2.0 (through 
Google Dataproc 1.2), and I get the same message repeatedly that Spark cannot 
allocate any executors.

17/08/22 19:34:57 INFO org.spark_project.jetty.util.log: Logging initialized 
@1694ms
17/08/22 19:34:57 INFO org.spark_project.jetty.server.Server: 
jetty-9.3.z-SNAPSHOT
17/08/22 19:34:57 INFO org.spark_project.jetty.server.Server: Started @1756ms
17/08/22 19:34:57 INFO org.spark_project.jetty.server.AbstractConnector: 
Started 
ServerConnector@578782d6{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
17/08/22 19:34:58 INFO 
com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 
1.6.1-hadoop2
17/08/22 19:34:58 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to 
ResourceManager at hadoop-m/10.240.1.92:8032
17/08/22 19:35:00 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: 
Submitted application application_1503036971561_0022
17/08/22 19:35:04 WARN org.apache.spark.streaming.StreamingContext: Dynamic 
Allocation is enabled for this application. Enabling Dynamic allocation for 
Spark Streaming applications can cause data loss if Write Ahead Log is not 
enabled for non-replayable sources like Flume. See the programming guide for 
details on how to enable the Write Ahead Log.
17/08/22 19:35:21 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources
17/08/22 19:35:36 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources
17/08/22 19:35:51 WARN org.apache.spark.scheduler.cluster.YarnScheduler: 
Initial job has not accepted any resources; check your cluster UI to ensure 
that workers are registered and have sufficient resources

I confirmed that the YARN cluster has enough memory for dozens of executors, 
and verified that the application allocates executors when using Core's 
spark.dynamicAllocation.enabled=true, and leaving 
spark.streaming.dynamicAllocation.enabled=false.

Is streaming dynamic allocation actually supported? Sean Owen suggested it 
might have been experimental: https://issues.apache.org/jira/browse/SPARK-21792.



--
Cheers!




Re: Structured Streaming: multiple sinks

2017-08-25 Thread Tathagata Das
My apologies Chris. Somehow I have not received the first email by OP, and
hence thought our answers to OP as cryptic questions. :/
I found the full thread on nabble. I agree with your analysis of OP's
question 1.


On Fri, Aug 25, 2017 at 12:48 AM, Chris Bowden  wrote:

> Tathagata, thanks for filling in context for other readers on 2a and 2b, I
> summarized too much in hindsight.
>
> Regarding the OP's first question, I was hinting it is quite natural to
> chain processes via kafka. If you are already interested in writing
> processed data to kafka, why add complexity to a job by having it commit
> processed data to kafka and s3 vs. simply moving the processed data from
> kafka out to s3 as needed. Perhaps the OP's thread got lost in context
> based on how I responded.
>
> 1) We are consuming from  kafka using  structured streaming and  writing
> the processed data set to s3.
> We also want to write the processed data to kafka moving forward, is it
> possible to do it from the same streaming query ? (spark  version 2.1.1)
>
> Streaming queries are currently bound to a single sink, so multiplexing
> the write with existing sinks via the  streaming query isn't possible
> AFAIK. Arguably you can reuse the "processed data" DAG by starting multiple
> sinks against it, though you will effectively process the data twice on
> different "schedules" since each sink will effectively have its own
> instance of StreamExecution, TriggerExecutor, etc. If you *really* wanted
> to do one pass of the data and process the same exact block of data per
> micro batch you could implement it via foreach or a custom sink which
> writes to kafka and s3, but I wouldn't recommend it. As stated above, it is
> quite natural to chain processes via kafka.
>
> On Thu, Aug 24, 2017 at 11:03 PM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Responses inline.
>>
>> On Thu, Aug 24, 2017 at 7:16 PM, cbowden  wrote:
>>
>>> 1. would it not be more natural to write processed to kafka and sink
>>> processed from kafka to s3?
>>>
>>
>> I am sorry i dont fully understand this question. Could you please
>> elaborate further, as in, what is more natural than what?
>>
>>
>>> 2a. addBatch is the time Sink#addBatch took as measured by
>>> StreamExecution.
>>>
>>
>> Yes. This essentially includes the time taken to compute the output and
>> finish writing the output to the sink.
>> (**to give some context for other readers, this person is referring to
>> the different time durations reported through StreamingQuery.lastProgress)
>>
>>
>>> 2b. getBatch is the time Source#getBatch took as measured by
>>> StreamExecution.
>>>
>> Yes, it is the time taken by the source prepare the DataFrame the has the
>> new data to be processed in the trigger.
>> Usually this is low, but its not guaranteed to be as some sources may
>> require complicated tracking and bookkeeping to prepare the DataFrame.
>>
>>
>>> 3. triggerExecution is effectively end-to-end processing time for the
>>> micro-batch, note all other durations sum closely to triggerExecution,
>>> there
>>> is a little slippage based on book-keeping activities in StreamExecution.
>>>
>>
>> Yes. Precisely.
>>
>>
>>>
>>>
>>>
>>> --
>>> View this message in context: http://apache-spark-user-list.
>>> 1001560.n3.nabble.com/Structured-Streaming-multiple-sinks-tp
>>> 29056p29105.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>


Re: Structured Streaming: multiple sinks

2017-08-25 Thread Chris Bowden
Tathagata, thanks for filling in context for other readers on 2a and 2b, I
summarized too much in hindsight.

Regarding the OP's first question, I was hinting it is quite natural to
chain processes via kafka. If you are already interested in writing
processed data to kafka, why add complexity to a job by having it commit
processed data to kafka and s3 vs. simply moving the processed data from
kafka out to s3 as needed. Perhaps the OP's thread got lost in context
based on how I responded.

1) We are consuming from  kafka using  structured streaming and  writing
the processed data set to s3.
We also want to write the processed data to kafka moving forward, is it
possible to do it from the same streaming query ? (spark  version 2.1.1)

Streaming queries are currently bound to a single sink, so multiplexing the
write with existing sinks via the  streaming query isn't possible
AFAIK. Arguably you can reuse the "processed data" DAG by starting multiple
sinks against it, though you will effectively process the data twice on
different "schedules" since each sink will effectively have its own
instance of StreamExecution, TriggerExecutor, etc. If you *really* wanted
to do one pass of the data and process the same exact block of data per
micro batch you could implement it via foreach or a custom sink which
writes to kafka and s3, but I wouldn't recommend it. As stated above, it is
quite natural to chain processes via kafka.
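
For concreteness, here is a rough sketch (Spark 2.1.x, Scala) of what that
foreach route could look like: one shared "processed data" DataFrame feeding a
parquet-on-S3 sink plus a ForeachWriter that produces back to Kafka. The broker
address, topic names, S3 paths, and column handling are placeholders, and as
noted above each start() runs its own StreamExecution, so the source is
effectively read twice.

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

val spark = SparkSession.builder().appName("kafka-and-s3-sinks").getOrCreate()

// Shared "processed data" DataFrame (placeholder transformation)
val processed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  .load()
  .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

// Sink 1: append the processed data to S3 as parquet
val toS3 = processed.writeStream
  .format("parquet")
  .option("path", "s3a://my-bucket/processed/")
  .option("checkpointLocation", "s3a://my-bucket/checkpoints/s3-sink/")
  .start()

// Sink 2: push the same processed data back to Kafka via foreach
val toKafka = processed.writeStream
  .foreach(new ForeachWriter[Row] {
    var producer: KafkaProducer[String, String] = _

    override def open(partitionId: Long, version: Long): Boolean = {
      val props = new Properties()
      props.put("bootstrap.servers", "broker1:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      producer = new KafkaProducer[String, String](props)
      true
    }

    override def process(row: Row): Unit = {
      producer.send(new ProducerRecord("processed-events",
        row.getAs[String]("key"), row.getAs[String]("value")))
    }

    override def close(errorOrNull: Throwable): Unit = {
      if (producer != null) producer.close()
    }
  })
  .option("checkpointLocation", "s3a://my-bucket/checkpoints/kafka-sink/")
  .start()

spark.streams.awaitAnyTermination()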

On Thu, Aug 24, 2017 at 11:03 PM, Tathagata Das  wrote:

> Responses inline.
>
> On Thu, Aug 24, 2017 at 7:16 PM, cbowden  wrote:
>
>> 1. would it not be more natural to write processed to kafka and sink
>> processed from kafka to s3?
>>
>
> I am sorry i dont fully understand this question. Could you please
> elaborate further, as in, what is more natural than what?
>
>
>> 2a. addBatch is the time Sink#addBatch took as measured by
>> StreamExecution.
>>
>
> Yes. This essentially includes the time taken to compute the output and
> finish writing the output to the sink.
> (**to give some context for other readers, this person is referring to the
> different time durations reported through StreamingQuery.lastProgress)
>
>
>> 2b. getBatch is the time Source#getBatch took as measured by
>> StreamExecution.
>>
> Yes, it is the time taken by the source prepare the DataFrame the has the
> new data to be processed in the trigger.
> Usually this is low, but its not guaranteed to be as some sources may
> require complicated tracking and bookkeeping to prepare the DataFrame.
>
>
>> 3. triggerExecution is effectively end-to-end processing time for the
>> micro-batch, note all other durations sum closely to triggerExecution,
>> there
>> is a little slippage based on book-keeping activities in StreamExecution.
>>
>
> Yes. Precisely.
>
>
>>
>>
>>
>> --
>> View this message in context: http://apache-spark-user-list.
>> 1001560.n3.nabble.com/Structured-Streaming-multiple-sinks-
>> tp29056p29105.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Structured Streaming: multiple sinks

2017-08-25 Thread Tathagata Das
Responses inline.

On Thu, Aug 24, 2017 at 7:16 PM, cbowden  wrote:

> 1. would it not be more natural to write processed to kafka and sink
> processed from kafka to s3?
>

I am sorry, I don't fully understand this question. Could you please
elaborate further, as in, what is more natural than what?


> 2a. addBatch is the time Sink#addBatch took as measured by StreamExecution.
>

Yes. This essentially includes the time taken to compute the output and
finish writing the output to the sink.
(**to give some context for other readers, this person is referring to the
different time durations reported through StreamingQuery.lastProgress)


> 2b. getBatch is the time Source#getBatch took as measured by
> StreamExecution.
>
Yes, it is the time taken by the source to prepare the DataFrame that has the
new data to be processed in the trigger.
Usually this is low, but it's not guaranteed to be, as some sources may
require complicated tracking and bookkeeping to prepare the DataFrame.


> 3. triggerExecution is effectively end-to-end processing time for the
> micro-batch, note all other durations sum closely to triggerExecution,
> there
> is a little slippage based on book-keeping activities in StreamExecution.
>

Yes. Precisely.
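
For readers following along, a small sketch of where these durations surface
programmatically; it assumes one or more already-started streaming queries and
simply prints what lastProgress reports.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("progress-inspection").getOrCreate()

// Print the per-trigger durations discussed above (getBatch, addBatch,
// triggerExecution, ...) for every active streaming query.
spark.streams.active.foreach { query =>
  val progress = query.lastProgress   // null until the first trigger completes
  if (progress != null) {
    println(s"${query.id}: ${progress.durationMs}")
  }
}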


>
>
>
> --
> View this message in context: http://apache-spark-user-list.
> 1001560.n3.nabble.com/Structured-Streaming-multiple-
> sinks-tp29056p29105.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>