Re: Writing custom Structured Streaming receiver

2018-06-05 Thread alz2
I'm implementing a simple Structured Streaming Source with the V2 API in
Java. I've taken the Offset logic (startOffset, endOffset, lastOffsetCommitted,
etc.) from the socket source and from your receiver as well.

However, upon startup Spark reports that the initial offset, -1, is immediately
available. Because -1 is available but not yet committed, my streaming query
gets triggered with an empty data buffer. After that query runs, -1 is added to
the StreamExecution's committedOffsets. The issue gets worse from here: as new
data is pushed into the internal data buffer, my currentOffset gets committed
immediately (it shows up in the StreamExecution's committedOffsets). So whenever
currentOffset changes because new data arrived, it appears in both
availableOffsets and committedOffsets, and no new batches ever run.

The strange thing is that my commit function never seems to run -- nothing I
print from inside it ever shows up, and even swapping in an empty commit
function doesn't change the behavior.

Any ideas where or why my Offsets are getting committed?

Any help would be appreciated!

Here are the relevant code snippets:
// Instance variable declarations
private Offset startOffset = null;
private Offset endOffset = null;
private static volatile SocketOffset currentOffset = new SocketOffset(-1);
private SocketOffset lastOffsetCommitted = new SocketOffset(-1);

// getStartOffset is the same except that it checks and returns this.startOffset
@Override
public Offset getEndOffset() {
  if (this.endOffset == null) {
    throw new IllegalStateException("end offset not set");
  }
  return this.endOffset;
}

@Override
public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
  this.startOffset = start.orElse(new SocketOffset(-1));
  this.endOffset = end.orElse(currentOffset);
}
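
For completeness, my commit is modeled on the socket source's bookkeeping. A
trimmed-down sketch of it is below; SocketOffset#offset() and the `batches`
buffer are members of my own classes, shown here only for illustration:

// Sketch of my commit(), following the socket source: drop everything in the
// internal buffer that is at or below the offset Spark has just committed.
@Override
public void commit(Offset end) {
  SocketOffset newCommitted = (SocketOffset) end;
  long numCommitted = newCommitted.offset() - lastOffsetCommitted.offset();
  if (numCommitted < 0) {
    throw new IllegalStateException(
        "Offsets committed out of order: " + lastOffsetCommitted + " -> " + newCommitted);
  }
  synchronized (this) {
    // `batches` holds the rows that have been read but not yet committed
    batches.subList(0, (int) numCommitted).clear();
  }
  lastOffsetCommitted = newCommitted;
}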








Re: Writing custom Structured Streaming receiver

2018-03-04 Thread Hien Luu
Finally got a toy version of a Structured Streaming DataSource V2 source
working with Apache Spark 2.3.  Tested locally and on Databricks Community
Edition.

Source code is here - https://github.com/hienluu/wikiedit-streaming








Re: Writing custom Structured Streaming receiver

2018-03-03 Thread Hien Luu
I finally got around to implement a custom structured streaming receiver
(source) to read Wikipedia edit events from the IRC server.

It works fine locally as well as in spark-shell on my laptop.  However, it
failed with the following exception when running in Databricks Community
Edition.

It seems like there is no way to create a DataFrame with isStreaming set to
true if a class resides outside of the spark.sql package.

I was looking at the RateSourceProvider class that ships with Apache Spark, and
it uses an internal function to create a DataFrame with isStreaming set to
true: sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)

Does anyone have any suggestion? (other than waiting for DataSourceV2)

assertion failed: DataFrame returned by getBatch from  did not have isStreaming=true

org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$11.apply(StreamExecution.scala:674)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$11.apply(StreamExecution.scala:669)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:669)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:669)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:62)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:668)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:328)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:316)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:316)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:62)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:316)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:312)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:226)






Re: Writing custom Structured Streaming receiver

2017-11-28 Thread Hien Luu
Cool.  Thanks nezhazheng.  I will give it a shot.






Re: Writing custom Structured Streaming receiver

2017-11-20 Thread nezhazheng
Hi Hien,

You can write your own Source or Sink and register it through SPI
(https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html).
Below is an example that implements a Kafka 0.8 source:

https://github.com/jerryshao/spark-kafka-0-8-sql 
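
Concretely, Spark discovers custom sources through java.util.ServiceLoader on
DataSourceRegister. A rough sketch of just the registration piece is below; the
package, class name and short name are only placeholders, and the real class
must also implement the source interfaces (e.g. StreamSourceProvider):

package com.example.streaming;

import org.apache.spark.sql.sources.DataSourceRegister;

// Hypothetical provider class registered via the ServiceLoader (SPI) mechanism.
public class MyStreamSourceProvider implements DataSourceRegister {
  @Override
  public String shortName() {
    // Lets callers write spark.readStream().format("my-stream")
    return "my-stream";
  }
}

// Plus a resource file so ServiceLoader can find it:
// META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
//   com.example.streaming.MyStreamSourceProvider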

—
郑舒力

> On Nov 21, 2017, at 9:40 AM, Hien Luu wrote:
> 
> Hi TD,
> 
> I looked at the DataStreamReader class and it looks like we can specify an
> FQCN as a source (provided that it implements the Source trait). The
> DataSource.lookupDataSource function will try to load this FQCN during the
> creation of a DataSource object instance inside DataStreamReader.load().
> Will this work?
> 
> I am curious if anyone has tried this yet?
> 
> I am going to give it a shot to see if this works.
> 
> Thanks,
> 
> Hien
> 
> 
> 



Re: Writing custom Structured Streaming receiver

2017-11-20 Thread Hien Luu
Hi TD,

I looked at the DataStreamReader class and it looks like we can specify an
FQCN as a source (provided that it implements the Source trait). The
DataSource.lookupDataSource function will try to load this FQCN during the
creation of a DataSource object instance inside DataStreamReader.load().
Will this work?

I am curious if anyone has tried this yet?

I am going to give it a shot to see if this works.
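
Concretely, this is roughly what I'm planning to try (the provider class name
below is just a placeholder):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Pass the fully qualified class name of the custom source provider straight to
// format(); DataSource.lookupDataSource should then resolve and instantiate it.
SparkSession spark = SparkSession.builder().appName("fqcn-source-test").getOrCreate();
Dataset<Row> edits = spark
    .readStream()
    .format("com.example.streaming.WikiEditSourceProvider")  // placeholder FQCN
    .load();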

Thanks,

Hien






Re: Writing custom Structured Streaming receiver

2017-11-01 Thread Tathagata Das
Structured Streaming source APIs are not yet public, so there isn't a guide.
However, if you are adventurous enough, you can take a look at the source
code in Spark.
Source API:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
Text socket source implementation:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala

Note that these APIs are still internal APIs and are very likely to change
in future versions of Spark.


On Wed, Nov 1, 2017 at 5:45 PM, Daniel Haviv  wrote:

> Hi,
> Is there a guide to writing a custom Structured Streaming receiver?
>
> Thank you.
> Daniel
>