Hi,

I've been looking over the Source API in
org.apache.spark.sql.execution.streaming, and I'm at a loss for how the
current API can be implemented in a practical way. The API defines a single
getBatch() method for fetching records from the source, with the following
Scaladoc comments defining the semantics:


/**
 * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`
 * then the batch should begin with the first available record. This method must always
 * return the same data for a particular `start` and `end` pair.
 */
def getBatch(start: Option[Offset], end: Offset): DataFrame

If I read the semantics described here correctly, a Source is required to
retain all past history for the stream that it backs. Further, a Source is
also required to retain this data across restarts of the process where the
Source is instantiated, even when the Source is restarted on a different
machine.

The current implementation of FileStreamSource follows my reading of the
requirements above. FileStreamSource never deletes a file.

I feel like this requirement for unbounded state retention must be a
mistake or misunderstanding of some kind. The scheduler is internally
maintaining a high water mark (StreamExecution.committedOffsets in
StreamExecution.scala) of data that has been successfully processed. There
must have been an intent to communicate that high water mark back to the
Source so that the Source can clean up its state. Indeed, the Databricks
blog post from last week (
https://databricks.com/blog/2016/07/28/structured-streaming-in-apache-spark.html)
says that "Only a few minutes’ worth of data needs to be retained;
Structured Streaming will maintain its own internal state after that."

But the code checked into git and shipped with Spark 2.0 does not have an
API call for the scheduler to tell a Source where the boundary of "only a
few minutes' worth of data" lies.
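
To make the gap concrete, here is a minimal sketch of the kind of callback
I have in mind. Everything in it is an assumption for illustration: the
commit() method, the SketchSource trait and the InMemorySource class are
hypothetical names, Offset is a toy stand-in for the real class, and
records are plain strings rather than a DataFrame. It is not the Spark API.

```scala
import scala.collection.immutable.TreeMap

// Toy stand-in for the real Offset class (assumption, not the Spark type).
final case class Offset(value: Long)

// Hypothetical trait: getBatch mirrors the Scaladoc quoted above;
// commit() is the assumed addition that Spark 2.0 does not have.
trait SketchSource {
  def getBatch(start: Option[Offset], end: Offset): Seq[String]
  def commit(end: Offset): Unit
}

final class InMemorySource extends SketchSource {
  // Records keyed by offset, retained until the scheduler commits past them.
  private var buffer = TreeMap.empty[Long, String]
  private var nextOffset = 0L

  // Stores a record and returns the offset assigned to it.
  def append(record: String): Offset = {
    nextOffset += 1
    buffer = buffer.updated(nextOffset, record)
    Offset(nextOffset)
  }

  // Returns the records in (start, end]; None means "from the first
  // record that is still available".
  override def getBatch(start: Option[Offset], end: Offset): Seq[String] = {
    val lower = start.map(_.value).getOrElse(Long.MinValue)
    buffer.iterator.collect {
      case (k, v) if k > lower && k <= end.value => v
    }.toVector
  }

  // Drops every record at or below the committed offset.
  override def commit(end: Offset): Unit = {
    buffer = buffer.filter { case (k, _) => k > end.value }
  }

  // Number of records currently held.
  def retained: Int = buffer.size
}
```

With something like this, the scheduler would call commit() each time it
advances committedOffsets, and the Source would only have to hold data
newer than that point. Without it, the buffer above can never shrink.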

Is there a JIRA that I'm not aware of to change the Source API? If not,
should we maybe open one?

Fred
