Nice, thanks again, Michael, for helping out.

Dmitry
2017-09-14 21:37 GMT+03:00 Michael Armbrust <mich...@databricks.com>:

> Yep, that is correct. You can also use the query ID, which is a GUID that is stored in the checkpoint and preserved across restarts, if you want to distinguish the batches from different streams.
>
> sqlContext.sparkContext.getLocalProperty(StreamExecution.QUERY_ID_KEY)
>
> This was added recently <https://github.com/apache/spark/commit/2d968a07d211688a9c588deb859667dd8b653b27>, though.
>
> On Thu, Sep 14, 2017 at 3:40 AM, Dmitry Naumenko <dm.naume...@gmail.com> wrote:
>
>> Ok. So since I can get repeated batch ids, I guess I can just store the last committed batch id in my storage (in the same transaction with the data) and initialize the custom sink with the right batch id when the application restarts. After that, just ignore a batch if the current batchId <= latestBatchId.
>>
>> Dmitry
>>
>> 2017-09-13 22:12 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>
>>> I think the right way to look at this is that the batchId is just a proxy for offsets that is agnostic to what type of source you are reading from (or how many sources there are). We might call into a custom sink with the same batchId more than once, but it will always contain the same data (there is no race condition, since this is stored in a write-ahead log). As long as you check/commit the batch id in the same transaction as the data, you will get exactly once.
>>>
>>> On Wed, Sep 13, 2017 at 1:25 AM, Dmitry Naumenko <dm.naume...@gmail.com> wrote:
>>>
>>>> Thanks, I see.
>>>>
>>>> However, I guess reading from the checkpoint directory might be less efficient than just preserving the offsets in the Dataset.
>>>>
>>>> I have one more question about operation idempotence (I hope it helps others to get a clear picture).
>>>>
>>>> If I read offsets from an RDBMS on restart and manually specify the starting offsets on the Kafka source, is it still possible that, in case of a failure, a duplicate batch id goes to a custom Sink?
>>>>
>>>> Previously, with DStreams, you would just read the offsets from storage on start and write them into the DB in one transaction with the data, and that was enough for "exactly-once". Please correct me if I made a mistake here. Will the same strategy work with Structured Streaming?
>>>>
>>>> I guess that with Structured Streaming, Spark commits the batch offsets to a checkpoint directory, and there can be a race condition where you commit your data with the offsets into the DB, but Spark fails to commit the batch id and some kind of automatic retry happens. If this is true, is it possible to disable this automatic retry, so I can still use the unified batch/streaming API with my own retry logic (which is basically: ignore the intermediate data, re-read from Kafka, and retry the processing and load)?
>>>>
>>>> Dmitry
>>>>
>>>> 2017-09-12 22:43 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>>>
>>>>> In the checkpoint directory there is a file /offsets/$batchId that holds the offsets serialized as JSON. I would not consider this a public, stable API, though.
>>>>>
>>>>> Really, the only important thing for exactly once is that you must ensure whatever operation you are doing downstream is idempotent with respect to the batchId. For example, if you are writing to an RDBMS, you could have a table that records the batch ID and update it in the same transaction as you append the results of the batch. Before trying to append, you should check that batch ID and make sure you have not already committed.
>>>>>
>>>>> On Tue, Sep 12, 2017 at 11:48 AM, Dmitry Naumenko <dm.naume...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the response, Michael.
>>>>>>
>>>>>> > You should still be able to get exactly once processing by using the batchId that is passed to the Sink.
>>>>>>
>>>>>> Could you explain this in more detail, please? Is there some kind of offset-manager API that works as a get-offset-by-batch-id lookup table?
>>>>>>
>>>>>> Dmitry
>>>>>>
>>>>>> 2017-09-12 20:29 GMT+03:00 Michael Armbrust <mich...@databricks.com>:
>>>>>>
>>>>>>> I think that we are going to have to change the Sink API as part of SPARK-20928 <https://issues-test.apache.org/jira/browse/SPARK-20928>, which is why I linked these tickets together. I'm still targeting an initial version for Spark 2.3, which should happen sometime towards the end of the year.
>>>>>>>
>>>>>>> There are some misconceptions in that Stack Overflow answer that I can correct. Until we improve the Source API, you should still be able to get exactly-once processing by using the batchId that is passed to the Sink. We guarantee that the offsets present at any given batch ID will be the same across retries by recording this information in the checkpoint's WAL. The checkpoint does not use Java serialization (like DStreams does) and can be used even after upgrading Spark.
>>>>>>>
>>>>>>> On Tue, Sep 12, 2017 at 12:45 AM, Dmitry Naumenko <dm.naume...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks, Cody.
>>>>>>>>
>>>>>>>> Unfortunately, it seems there is no active development right now. Maybe I can step in and help with it somehow?
>>>>>>>>
>>>>>>>> Dmitry
>>>>>>>>
>>>>>>>> 2017-09-11 21:01 GMT+03:00 Cody Koeninger <c...@koeninger.org>:
>>>>>>>>
>>>>>>>>> https://issues-test.apache.org/jira/browse/SPARK-18258
>>>>>>>>>
>>>>>>>>> On Mon, Sep 11, 2017 at 7:15 AM, Dmitry Naumenko <dm.naume...@gmail.com> wrote:
>>>>>>>>> > Hi all,
>>>>>>>>> >
>>>>>>>>> > It started as a discussion in https://stackoverflow.com/questions/46153105/how-to-get-kafka-offsets-with-spark-structured-streaming-api.
>>>>>>>>> >
>>>>>>>>> > So the problem is that there is no support in the public API for obtaining the Kafka (or Kinesis) offsets. For example, if you want to save offsets in external storage in a custom Sink, you have to:
>>>>>>>>> > 1) preserve topic, partition, and offset across all transform operations of the Dataset (based on the hard-coded Kafka schema), and
>>>>>>>>> > 2) do a manual group-by per partition with an aggregate max offset.
>>>>>>>>> >
>>>>>>>>> > The Structured Streaming doc says "Every streaming source is assumed to have offsets", so why isn't this part of the public API? What do you think about supporting it?
>>>>>>>>> >
>>>>>>>>> > Dmitry
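To make the pattern discussed above concrete, here is a minimal Scala sketch of a sink that checks and commits the batch id in the same transaction as the data. It assumes the internal Sink trait from Spark 2.x and a plain JDBC connection; the sink_commits and results tables, the single-column payload, and the driver-side collect() are illustrative simplifications rather than a recommended implementation.

    import java.sql.DriverManager

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.streaming.Sink

    // Sketch only: table names and the JDBC URL are made up for illustration,
    // and error handling / connection pooling are omitted.
    class IdempotentJdbcSink(jdbcUrl: String) extends Sink {

      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        // Collect on the driver purely to keep the example short; a real sink
        // would write from the executors.
        val rows = data.collect()

        val conn = DriverManager.getConnection(jdbcUrl)
        try {
          conn.setAutoCommit(false)

          // Look up the last committed batch id. addBatch can be re-invoked
          // with the same batchId after a restart, carrying the same data.
          val rs = conn.prepareStatement(
            "SELECT max(batch_id) FROM sink_commits").executeQuery()
          rs.next()
          val maxSeen = rs.getLong(1)
          val lastCommitted = if (rs.wasNull()) -1L else maxSeen

          if (batchId > lastCommitted) {
            // Append the batch results.
            val insert = conn.prepareStatement(
              "INSERT INTO results (value) VALUES (?)")
            rows.foreach { row =>
              insert.setString(1, row.mkString(","))
              insert.addBatch()
            }
            insert.executeBatch()

            // Record the batch id in the same transaction as the data, so the
            // write and the commit marker succeed or fail together.
            val mark = conn.prepareStatement(
              "INSERT INTO sink_commits (batch_id) VALUES (?)")
            mark.setLong(1, batchId)
            mark.executeUpdate()

            conn.commit()
          }
          // Otherwise the batch was already committed and is silently skipped.
        } finally {
          conn.close()
        }
      }
    }

Plugging this in still requires registering it through a StreamSinkProvider (or, on newer Spark versions, putting the same check inside a foreachBatch callback), and if you also want to restart from offsets kept in your own store you would persist the per-topic/partition max offsets in the same transaction. The only property the sketch is meant to show is that the data write and the batch-id check/commit share one transaction, which is what gives exactly-once behaviour across retries.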