Re: Chaining Spark Streaming Jobs

2017-11-02 Thread Sunita Arvind
Sorry Michael, I ended up using Kafka and missed your message.
Yes, I did specify the schema with read.schema, and that's when I got:

at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
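
As far as I can tell, that check fires when the streaming plan contains an
operation the chosen output mode cannot support - for example, a streaming
aggregation written with outputMode("append") but no event-time watermark
(and the file sink only supports append). A minimal sketch of an aggregation
that does pass the check, assuming an event-time column exists - the column
names and paths below are hypothetical:

import org.apache.spark.sql.functions._

// df is the streaming dataframe read above; eventTime is a hypothetical timestamp column
val windowed = df
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"), col("id"))
  .agg(sum("col1"), sum("col2"), first("name"))

windowed.writeStream
  .format("parquet")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoints/agg")
  .start("/tmp/out/agg")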

regards

Sunita


On Mon, Sep 18, 2017 at 10:15 AM, Michael Armbrust 
wrote:

> You specify the schema when loading a dataframe by calling
> spark.read.schema(...)...
>
> On Tue, Sep 12, 2017 at 4:50 PM, Sunita Arvind 
> wrote:
>
>> Hi Michael,
>>
>> I am wondering what I am doing wrong. I get error like:
>>
>> Exception in thread "main" java.lang.IllegalArgumentException: Schema
>> must be specified when creating a streaming source DataFrame. If some files
>> already exist in the directory, then depending on the file format you may
>> be able to create a static DataFrame on that directory with
>> 'spark.read.load(directory)' and infer schema from it.
>> at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:223)
>> at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
>> at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
>> at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
>> at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:125)
>> at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:134)
>> at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregates.scala:23)
>> at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
>> at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
>> 17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook
>>
>>
>> I tried specifying the schema as well.
>> Here is my code:
>>
>> object Aggregates {
>>
>>   val aggregation=
>> """select sum(col1), sum(col2), id, first(name)
>>   from enrichedtb
>>   group by id
>> """.stripMargin
>>
>>   def aggregator(conf: Config) = {
>>     implicit val spark = SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
>>     implicit val sqlctx = spark.sqlContext
>>     printf("Source path is " + conf.getString("source.path"))
>>     // Added this as it was complaining about schema.
>>     val schemadf = sqlctx.read.parquet(conf.getString("source.path"))
>>     val df = spark.readStream.format("parquet").option("inferSchema", true)
>>       .schema(schemadf.schema).load(conf.getString("source.path"))
>>     df.createOrReplaceTempView("enrichedtb")
>>     val res = spark.sql(aggregation)
>>     res.writeStream.format("parquet").outputMode("append")
>>       .option("checkpointLocation", conf.getString("checkpointdir"))
>>       .start(conf.getString("sink.outputpath"))
>>   }
>>
>>   def main(args: Array[String]): Unit = {
>> val mainconf = ConfigFactory.load()
>> val conf = mainconf.getConfig(mainconf.getString("pipeline"))
>> print(conf.toString)
>> aggregator(conf)
>>   }
>>
>> }
>>
>>
>> I tried to extract the schema from a static read of the input path and provided
>> it to the readStream API. With that, I get this error:
>>
>> at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
>> at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
>> at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
>> at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
>> at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
>> at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)
>>
>> While running on the EMR cluster, all paths point to S3; on my laptop, they
>> all point to the local filesystem.
>>
>> I am using Spark 2.2.0.
>>
>> Appreciate your help.
>>
>> regards
>>
>> Sunita
>>
>>
>> On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust > > wrote:
>>
>>> If you use structured streaming and the file sink, you can have a
>>> subsequent stream read using the file source.  This will maintain exactly
>>> once processing even if there are hiccups or failures.

Re: Chaining Spark Streaming Jobs

2017-09-18 Thread Michael Armbrust
You specify the schema when loading a dataframe by calling
spark.read.schema(...)...
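
For example, something along these lines - the schema fields and paths below
are just placeholders for illustration:

import org.apache.spark.sql.types._

val schema = new StructType()
  .add("id", StringType)
  .add("name", StringType)
  .add("col1", LongType)
  .add("col2", LongType)

// batch read with an explicit schema
val staticDf = spark.read.schema(schema).parquet("/path/to/input")
// streaming read over the same directory; streaming file sources require the schema
val streamingDf = spark.readStream.schema(schema).parquet("/path/to/input")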

On Tue, Sep 12, 2017 at 4:50 PM, Sunita Arvind 
wrote:

> Hi Michael,
>
> I am wondering what I am doing wrong. I get error like:
>
> Exception in thread "main" java.lang.IllegalArgumentException: Schema
> must be specified when creating a streaming source DataFrame. If some files
> already exist in the directory, then depending on the file format you may
> be able to create a static DataFrame on that directory with
> 'spark.read.load(directory)' and infer schema from it.
> at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:223)
> at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
> at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
> at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
> at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:125)
> at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:134)
> at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregates.scala:23)
> at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
> at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> 17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook
>
>
> I tried specifying the schema as well.
> Here is my code:
>
> object Aggregates {
>
>   val aggregation=
> """select sum(col1), sum(col2), id, first(name)
>   from enrichedtb
>   group by id
> """.stripMargin
>
>   def aggregator(conf: Config) = {
>     implicit val spark = SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
>     implicit val sqlctx = spark.sqlContext
>     printf("Source path is " + conf.getString("source.path"))
>     // Added this as it was complaining about schema.
>     val schemadf = sqlctx.read.parquet(conf.getString("source.path"))
>     val df = spark.readStream.format("parquet").option("inferSchema", true)
>       .schema(schemadf.schema).load(conf.getString("source.path"))
>     df.createOrReplaceTempView("enrichedtb")
>     val res = spark.sql(aggregation)
>     res.writeStream.format("parquet").outputMode("append")
>       .option("checkpointLocation", conf.getString("checkpointdir"))
>       .start(conf.getString("sink.outputpath"))
>   }
>
>   def main(args: Array[String]): Unit = {
> val mainconf = ConfigFactory.load()
> val conf = mainconf.getConfig(mainconf.getString("pipeline"))
> print(conf.toString)
> aggregator(conf)
>   }
>
> }
>
>
> I tried to extract the schema from a static read of the input path and provided
> it to the readStream API. With that, I get this error:
>
> at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
> at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
> at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
> at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
> at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
> at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)
>
> While running on the EMR cluster, all paths point to S3; on my laptop, they
> all point to the local filesystem.
>
> I am using Spark 2.2.0.
>
> Appreciate your help.
>
> regards
>
> Sunita
>
>
> On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust 
> wrote:
>
>> If you use structured streaming and the file sink, you can have a
>> subsequent stream read using the file source.  This will maintain exactly
>> once processing even if there are hiccups or failures.
>>
>> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
>> wrote:
>>
>>> Hello Spark Experts,
>>>
>>> I have a design question w.r.t Spark Streaming. I have a streaming job
>>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>>> on premise. My spark application runs on EMR (aws) and persists data onto
>>> s3. Before I persist, I need to strip header and convert protobuffer to
>>> parquet (I use sparksql-scalapb to convert from Protobuff to
>>> Spark.sql.Row). I need to persist Raw logs as is.

Re: Chaining Spark Streaming Jobs

2017-09-13 Thread Sunita Arvind
Thanks for your suggestion, Vincent. I do not have much experience with Akka
as such. I will explore this option.

On Tue, Sep 12, 2017 at 11:01 PM, vincent gromakowski <
vincent.gromakow...@gmail.com> wrote:

> What about chaining with Akka or Akka Streams and the fair scheduler?
>
> On 13 Sep 2017 at 01:51, "Sunita Arvind" wrote:
>
> Hi Michael,
>
> I am wondering what I am doing wrong. I get error like:
>
> Exception in thread "main" java.lang.IllegalArgumentException: Schema
> must be specified when creating a streaming source DataFrame. If some files
> already exist in the directory, then depending on the file format you may
> be able to create a static DataFrame on that directory with
> 'spark.read.load(directory)' and infer schema from it.
> at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:223)
> at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
> at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
> at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
> at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:125)
> at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:134)
> at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregates.scala:23)
> at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
> at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> 17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook
>
>
> I tried specifying the schema as well.
> Here is my code:
>
> object Aggregates {
>
>   val aggregation=
> """select sum(col1), sum(col2), id, first(name)
>   from enrichedtb
>   group by id
> """.stripMargin
>
>   def aggregator(conf: Config) = {
>     implicit val spark = SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
>     implicit val sqlctx = spark.sqlContext
>     printf("Source path is " + conf.getString("source.path"))
>     // Added this as it was complaining about schema.
>     val schemadf = sqlctx.read.parquet(conf.getString("source.path"))
>     val df = spark.readStream.format("parquet").option("inferSchema", true)
>       .schema(schemadf.schema).load(conf.getString("source.path"))
>     df.createOrReplaceTempView("enrichedtb")
>     val res = spark.sql(aggregation)
>     res.writeStream.format("parquet").outputMode("append")
>       .option("checkpointLocation", conf.getString("checkpointdir"))
>       .start(conf.getString("sink.outputpath"))
>   }
>
>   def main(args: Array[String]): Unit = {
> val mainconf = ConfigFactory.load()
> val conf = mainconf.getConfig(mainconf.getString("pipeline"))
> print(conf.toString)
> aggregator(conf)
>   }
>
> }
>
>
> I tried to extract the schema from a static read of the input path and provided
> it to the readStream API. With that, I get this error:
>
> at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
> at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
> at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
> at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
> at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
> at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)
>
> While running on the EMR cluster, all paths point to S3; on my laptop, they
> all point to the local filesystem.
>
> I am using Spark 2.2.0.
>
> Appreciate your help.
>
> regards
>
> Sunita
>
>
> On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust 
> wrote:
>
>> If you use structured streaming and the file sink, you can have a
>> subsequent stream read using the file source.  This will maintain exactly
>> once processing even if there are hiccups or failures.
>>
>> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
>> wrote:
>>
>>> Hello Spark Experts,
>>>
>>> I have a design question w.r.t Spark Streaming. I have a streaming job
>>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>>> on premise. My spark application runs on EMR (aws) and persists data onto s3.

Re: Chaining Spark Streaming Jobs

2017-09-13 Thread vincent gromakowski
What about chaining with Akka or Akka Streams and the fair scheduler?

On 13 Sep 2017 at 01:51, "Sunita Arvind" wrote:

Hi Michael,

I am wondering what I am doing wrong. I get error like:

Exception in thread "main" java.lang.IllegalArgumentException: Schema must
be specified when creating a streaming source DataFrame. If some files
already exist in the directory, then depending on the file format you may
be able to create a static DataFrame on that directory with
'spark.read.load(directory)' and infer schema from it.
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:223)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:125)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:134)
at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregates.scala:23)
at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook


I tried specifying the schema as well.
Here is my code:

object Aggregates {

  val aggregation=
"""select sum(col1), sum(col2), id, first(name)
  from enrichedtb
  group by id
""".stripMargin

  def aggregator(conf: Config) = {
    implicit val spark = SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
    implicit val sqlctx = spark.sqlContext
    printf("Source path is " + conf.getString("source.path"))
    // Added this as it was complaining about schema.
    val schemadf = sqlctx.read.parquet(conf.getString("source.path"))
    val df = spark.readStream.format("parquet").option("inferSchema", true)
      .schema(schemadf.schema).load(conf.getString("source.path"))
    df.createOrReplaceTempView("enrichedtb")
    val res = spark.sql(aggregation)
    res.writeStream.format("parquet").outputMode("append")
      .option("checkpointLocation", conf.getString("checkpointdir"))
      .start(conf.getString("sink.outputpath"))
  }

  def main(args: Array[String]): Unit = {
val mainconf = ConfigFactory.load()
val conf = mainconf.getConfig(mainconf.getString("pipeline"))
print(conf.toString)
aggregator(conf)
  }

}


I tried to extract the schema from a static read of the input path and
provided it to the readStream API. With that, I get this error:

at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)

While running on the EMR cluster, all paths point to S3; on my laptop,
they all point to the local filesystem.

I am using Spark 2.2.0.

Appreciate your help.

regards

Sunita


On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust 
wrote:

> If you use structured streaming and the file sink, you can have a
> subsequent stream read using the file source.  This will maintain exactly
> once processing even if there are hiccups or failures.
>
> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
> wrote:
>
>> Hello Spark Experts,
>>
>> I have a design question w.r.t Spark Streaming. I have a streaming job
>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>> on premise. My spark application runs on EMR (aws) and persists data onto
>> s3. Before I persist, I need to strip header and convert protobuffer to
>> parquet (I use sparksql-scalapb to convert from Protobuff to
>> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
>> enrichment on the same dataframe after persisting the raw data, however, in
>> order to modularize I am planning to have a separate job which picks up the
>> raw data and performs enrichment on it.

Re: Chaining Spark Streaming Jobs

2017-09-12 Thread Sunita Arvind
Hi Michael,

I am wondering what I am doing wrong. I get error like:

Exception in thread "main" java.lang.IllegalArgumentException: Schema must
be specified when creating a streaming source DataFrame. If some files
already exist in the directory, then depending on the file format you may
be able to create a static DataFrame on that directory with
'spark.read.load(directory)' and infer schema from it.
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:223)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:125)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:134)
at com.aol.persist.UplynkAggregates$.aggregator(UplynkAggregates.scala:23)
at com.aol.persist.UplynkAggregates$.main(UplynkAggregates.scala:41)
at com.aol.persist.UplynkAggregates.main(UplynkAggregates.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
17/09/12 14:46:22 INFO SparkContext: Invoking stop() from shutdown hook


I tried specifying the schema as well.
Here is my code:

// Assumes Typesafe Config and Spark SQL are on the classpath.
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.SparkSession

object Aggregates {

  val aggregation=
"""select sum(col1), sum(col2), id, first(name)
  from enrichedtb
  group by id
""".stripMargin

  def aggregator(conf: Config) = {
    implicit val spark = SparkSession.builder().appName(conf.getString("AppName")).getOrCreate()
    implicit val sqlctx = spark.sqlContext
    printf("Source path is " + conf.getString("source.path"))
    // Static read of the source path, added only to obtain a schema,
    // as the streaming file source was complaining about a missing schema.
    val schemadf = sqlctx.read.parquet(conf.getString("source.path"))
    // Streaming read over the same directory, reusing the schema above
    // (the inferSchema option is redundant once an explicit schema is given).
    val df = spark.readStream.format("parquet").option("inferSchema", true)
      .schema(schemadf.schema).load(conf.getString("source.path"))
    df.createOrReplaceTempView("enrichedtb")
    val res = spark.sql(aggregation)
    res.writeStream.format("parquet").outputMode("append")
      .option("checkpointLocation", conf.getString("checkpointdir"))
      .start(conf.getString("sink.outputpath"))
  }

  def main(args: Array[String]): Unit = {
val mainconf = ConfigFactory.load()
val conf = mainconf.getConfig(mainconf.getString("pipeline"))
print(conf.toString)
aggregator(conf)
  }

}


I tried to extract the schema from a static read of the input path and
provided it to the readStream API. With that, I get this error:

at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:109)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:222)

While running on the EMR cluster, all paths point to S3; on my laptop,
they all point to the local filesystem.

I am using Spark 2.2.0.

Appreciate your help.

regards

Sunita


On Wed, Aug 23, 2017 at 2:30 PM, Michael Armbrust 
wrote:

> If you use structured streaming and the file sink, you can have a
> subsequent stream read using the file source.  This will maintain exactly
> once processing even if there are hiccups or failures.
>
> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
> wrote:
>
>> Hello Spark Experts,
>>
>> I have a design question w.r.t Spark Streaming. I have a streaming job
>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>> on premise. My spark application runs on EMR (aws) and persists data onto
>> s3. Before I persist, I need to strip header and convert protobuffer to
>> parquet (I use sparksql-scalapb to convert from Protobuff to
>> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
>> enrichment on the same dataframe after persisting the raw data, however, in
>> order to modularize I am planning to have a separate job which picks up the
>> raw data and performs enrichment on it. Also,  I am trying to avoid all in
>> 1 job as the enrichments could get project specific while raw data
>> persistence stays customer/project agnostic.

Re: Chaining Spark Streaming Jobs

2017-09-08 Thread Sunita Arvind
Thanks for your response, Praneeth. We did consider Kafka; however, cost was
the only holdback factor, as we might need a larger cluster, and the existing
cluster is on premise while my app is in the cloud, so the same cluster
cannot be used.
But I agree it does sound like a good alternative.

Regards
Sunita

On Thu, Sep 7, 2017 at 11:24 PM Praneeth Gayam 
wrote:

> With file stream you will have to deal with the following:
>
>    1. The file(s) must not be changed once created. So if the files are
>    being continuously appended, the new data will not be read (refer to
>    the file stream source documentation).
>
>    2. The files must be created in the dataDirectory by atomically
>    *moving* or *renaming* them into the data directory.
>
> Since the latency requirements for the second job in the chain are only a
> few minutes, you may end up having to create a new file every few minutes.
>
> You may want to consider Kafka as your intermediary store for building a
> chain/DAG of streaming jobs.
>
> On Fri, Sep 8, 2017 at 9:45 AM, Sunita Arvind 
> wrote:
>
>> Thanks for your response Michael
>> Will try it out.
>>
>> Regards
>> Sunita
>>
>> On Wed, Aug 23, 2017 at 2:30 PM Michael Armbrust 
>> wrote:
>>
>>> If you use structured streaming and the file sink, you can have a
>>> subsequent stream read using the file source.  This will maintain exactly
>>> once processing even if there are hiccups or failures.
>>>
>>> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
>>> wrote:
>>>
 Hello Spark Experts,

 I have a design question w.r.t Spark Streaming. I have a streaming job
 that consumes protocol buffer encoded real time logs from a Kafka cluster
 on premise. My spark application runs on EMR (aws) and persists data onto
 s3. Before I persist, I need to strip header and convert protobuffer to
 parquet (I use sparksql-scalapb to convert from Protobuff to
 Spark.sql.Row). I need to persist Raw logs as is. I can continue the
 enrichment on the same dataframe after persisting the raw data, however, in
 order to modularize I am planning to have a separate job which picks up the
 raw data and performs enrichment on it. Also,  I am trying to avoid all in
 1 job as the enrichments could get project specific while raw data
 persistence stays customer/project agnostic. The enriched data is allowed to
 have some latency (few minutes)

 My challenge is, after persisting the raw data, how do I chain the next
 streaming job. The only way I can think of is -  job 1 (raw data)
 partitions on current date (MMDD) and within current date, the job 2
 (enrichment job) filters for records within 60s of current time and
 performs enrichment on it in 60s batches.
 Is this a good option? It seems to be error prone. When either of the
 jobs get delayed due to bursts or any error/exception this could lead to
 huge data losses and non-deterministic behavior. What are other
 alternatives to this?

 Appreciate any guidance in this regard.

 regards
 Sunita Koppar

>>>
>>>
>


Re: Chaining Spark Streaming Jobs

2017-09-08 Thread Praneeth Gayam
With file stream you will have to deal with the following:

   1. The file(s) must not be changed once created. So if the files are
   being continuously appended, the new data will not be read (refer to the
   file stream source documentation).

   2. The files must be created in the dataDirectory by atomically *moving*
   or *renaming* them into the data directory.

Since the latency requirements for the second job in the chain are only a
few minutes, you may end up having to create a new file every few minutes.

You may want to consider Kafka as your intermediary store for building a
chain/DAG of streaming jobs.
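
A rough sketch of that hand-off, assuming the spark-sql-kafka-0-10 connector
is on the classpath; the broker address, topic name, checkpoint path, and the
rawDf/spark names are placeholders:

// Job 1: publish the prepared records to an intermediate Kafka topic
rawDf.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("topic", "raw-logs")
  .option("checkpointLocation", "/checkpoints/raw-to-kafka")
  .start()

// Job 2: subscribe to the same topic and continue with enrichment
val enrichInput = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "raw-logs")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")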

On Fri, Sep 8, 2017 at 9:45 AM, Sunita Arvind  wrote:

> Thanks for your response Michael
> Will try it out.
>
> Regards
> Sunita
>
> On Wed, Aug 23, 2017 at 2:30 PM Michael Armbrust 
> wrote:
>
>> If you use structured streaming and the file sink, you can have a
>> subsequent stream read using the file source.  This will maintain exactly
>> once processing even if there are hiccups or failures.
>>
>> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
>> wrote:
>>
>>> Hello Spark Experts,
>>>
>>> I have a design question w.r.t Spark Streaming. I have a streaming job
>>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>>> on premise. My spark application runs on EMR (aws) and persists data onto
>>> s3. Before I persist, I need to strip header and convert protobuffer to
>>> parquet (I use sparksql-scalapb to convert from Protobuff to
>>> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
>>> enrichment on the same dataframe after persisting the raw data, however, in
>>> order to modularize I am planning to have a separate job which picks up the
>>> raw data and performs enrichment on it. Also,  I am trying to avoid all in
>>> 1 job as the enrichments could get project specific while raw data
>>> persistence stays customer/project agnostic. The enriched data is allowed to
>>> have some latency (few minutes)
>>>
>>> My challenge is, after persisting the raw data, how do I chain the next
>>> streaming job. The only way I can think of is -  job 1 (raw data)
>>> partitions on current date (MMDD) and within current date, the job 2
>>> (enrichment job) filters for records within 60s of current time and
>>> performs enrichment on it in 60s batches.
>>> Is this a good option? It seems to be error prone. When either of the
>>> jobs get delayed due to bursts or any error/exception this could lead to
>>> huge data losses and non-deterministic behavior. What are other
>>> alternatives to this?
>>>
>>> Appreciate any guidance in this regard.
>>>
>>> regards
>>> Sunita Koppar
>>>
>>
>>


Re: Chaining Spark Streaming Jobs

2017-09-07 Thread Sunita Arvind
Thanks for your response, Michael.
Will try it out.

Regards
Sunita

On Wed, Aug 23, 2017 at 2:30 PM Michael Armbrust 
wrote:

> If you use structured streaming and the file sink, you can have a
> subsequent stream read using the file source.  This will maintain exactly
> once processing even if there are hiccups or failures.
>
> On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
> wrote:
>
>> Hello Spark Experts,
>>
>> I have a design question w.r.t Spark Streaming. I have a streaming job
>> that consumes protocol buffer encoded real time logs from a Kafka cluster
>> on premise. My spark application runs on EMR (aws) and persists data onto
>> s3. Before I persist, I need to strip header and convert protobuffer to
>> parquet (I use sparksql-scalapb to convert from Protobuff to
>> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
>> enrichment on the same dataframe after persisting the raw data, however, in
>> order to modularize I am planning to have a separate job which picks up the
>> raw data and performs enrichment on it. Also,  I am trying to avoid all in
>> 1 job as the enrichments could get project specific while raw data
>> persistence stays customer/project agnostic. The enriched data is allowed to
>> have some latency (few minutes)
>>
>> My challenge is, after persisting the raw data, how do I chain the next
>> streaming job. The only way I can think of is -  job 1 (raw data)
>> partitions on current date (MMDD) and within current date, the job 2
>> (enrichment job) filters for records within 60s of current time and
>> performs enrichment on it in 60s batches.
>> Is this a good option? It seems to be error prone. When either of the
>> jobs get delayed due to bursts or any error/exception this could lead to
>> huge data losses and non-deterministic behavior. What are other
>> alternatives to this?
>>
>> Appreciate any guidance in this regard.
>>
>> regards
>> Sunita Koppar
>>
>
>


Re: Chaining Spark Streaming Jobs

2017-08-23 Thread Michael Armbrust
If you use structured streaming and the file sink, you can have a
subsequent stream read using the file source.  This will maintain exactly
once processing even if there are hiccups or failures.
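
A rough sketch of what that chaining could look like - the paths, checkpoint
locations, and the rawDf/rawSchema/spark names below are placeholders:

// Job 1: persist the raw data with the file sink (append is the only mode it supports)
rawDf.writeStream
  .format("parquet")
  .outputMode("append")
  .option("path", "s3://bucket/raw")
  .option("checkpointLocation", "s3://bucket/checkpoints/raw")
  .start()

// Job 2: treat job 1's output directory as a streaming file source;
// streaming file sources need the schema supplied explicitly
val enrichInput = spark.readStream
  .schema(rawSchema)
  .parquet("s3://bucket/raw")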

On Mon, Aug 21, 2017 at 2:02 PM, Sunita Arvind 
wrote:

> Hello Spark Experts,
>
> I have a design question w.r.t Spark Streaming. I have a streaming job
> that consumes protocol buffer encoded real time logs from a Kafka cluster
> on premise. My spark application runs on EMR (aws) and persists data onto
> s3. Before I persist, I need to strip header and convert protobuffer to
> parquet (I use sparksql-scalapb to convert from Protobuff to
> Spark.sql.Row). I need to persist Raw logs as is. I can continue the
> enrichment on the same dataframe after persisting the raw data, however, in
> order to modularize I am planning to have a separate job which picks up the
> raw data and performs enrichment on it. Also,  I am trying to avoid all in
> 1 job as the enrichments could get project specific while raw data
> persistence stays customer/project agnostic. The enriched data is allowed to
> have some latency (few minutes)
>
> My challenge is, after persisting the raw data, how do I chain the next
> streaming job. The only way I can think of is -  job 1 (raw data)
> partitions on current date (MMDD) and within current date, the job 2
> (enrichment job) filters for records within 60s of current time and
> performs enrichment on it in 60s batches.
> Is this a good option? It seems to be error prone. When either of the jobs
> get delayed due to bursts or any error/exception this could lead to huge
> data losses and non-deterministic behavior. What are other alternatives to
> this?
>
> Appreciate any guidance in this regard.
>
> regards
> Sunita Koppar
>


Chaining Spark Streaming Jobs

2017-08-21 Thread Sunita Arvind
Hello Spark Experts,

I have a design question w.r.t Spark Streaming. I have a streaming job that
consumes protocol buffer encoded real-time logs from a Kafka cluster on
premise. My Spark application runs on EMR (AWS) and persists data onto S3.
Before I persist, I need to strip the header and convert protobuf to parquet
(I use sparksql-scalapb to convert from Protobuf to Spark.sql.Row). I need
to persist the raw logs as-is. I can continue the enrichment on the same
dataframe after persisting the raw data; however, in order to modularize, I
am planning to have a separate job which picks up the raw data and performs
enrichment on it. Also, I am trying to avoid an all-in-1 job, as the
enrichments could get project specific while raw data persistence stays
customer/project agnostic. The enriched data is allowed to have some latency
(a few minutes).

My challenge is: after persisting the raw data, how do I chain the next
streaming job? The only way I can think of is - job 1 (raw data) partitions
on the current date (MMDD) and, within the current date, job 2 (the
enrichment job) filters for records within 60s of the current time and
performs enrichment on it in 60s batches.
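
In rough code, the job-1 side of that idea might look like the following - the
output path, checkpoint location, column name, and the rawDf name are
placeholders:

import org.apache.spark.sql.functions._

// Job 1: tag each record with its ingest date and partition the parquet output by it
rawDf.withColumn("ingest_date", date_format(current_timestamp(), "yyyyMMdd"))
  .writeStream
  .format("parquet")
  .partitionBy("ingest_date")
  .option("path", "s3://bucket/raw")
  .option("checkpointLocation", "s3://bucket/checkpoints/raw")
  .start()
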
Is this a good option? It seems error prone. When either of the jobs gets
delayed due to bursts or any error/exception, this could lead to huge data
losses and non-deterministic behavior. What are other alternatives to this?

Appreciate any guidance in this regard.

regards
Sunita Koppar