Sorry, I think I was a little unclear. There are two things at play here.

 - Exactly-once semantics with file output: Spark writes out extra metadata on which files are valid to ensure that failures don't cause us to "double count" any of the input. Spark 2.0+ detects this info automatically when you use the DataFrame reader (spark.read...). There may be extra files, but they will be ignored. If you are consuming the output with another system you'll have to take this into account.
 - Retries: right now we always retry the last batch when restarting. This is safe/correct because of the above, but we could also optimize this away by tracking more information about batch progress.
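To make the two points above concrete, here is a toy model in plain Python. It is only a sketch of the idea (a commit log that names the valid files), not Spark's actual implementation: the `_toy_metadata` directory, the file naming, and the helpers `write_batch`, `commit_batch` and `read_committed` are all made up for illustration. A retried batch leaves an extra data file behind, but a reader that consults the log never sees it.

```python
import json
import os
import tempfile
import uuid

LOG_DIR = "_toy_metadata"  # hypothetical name, stands in for a commit log directory


def write_batch(out_dir, batch_id, rows):
    """Write one data file for a batch and return its name (not yet committed)."""
    name = f"part-{batch_id}-{uuid.uuid4().hex}.json"
    with open(os.path.join(out_dir, name), "w") as f:
        json.dump(rows, f)
    return name


def commit_batch(out_dir, batch_id, file_names):
    """Record the batch's files in the log; the first commit for a batch id wins."""
    log_dir = os.path.join(out_dir, LOG_DIR)
    os.makedirs(log_dir, exist_ok=True)
    try:
        # Mode "x" raises if the entry already exists, so a retry cannot replace it.
        with open(os.path.join(log_dir, str(batch_id)), "x") as f:
            json.dump(file_names, f)
        return True
    except FileExistsError:
        return False


def read_committed(out_dir):
    """Read only the files named in the log, ignoring anything else in out_dir."""
    log_dir = os.path.join(out_dir, LOG_DIR)
    rows = []
    for entry in sorted(os.listdir(log_dir), key=int):
        with open(os.path.join(log_dir, entry)) as f:
            for name in json.load(f):
                with open(os.path.join(out_dir, name)) as data:
                    rows.extend(json.load(data))
    return rows


def demo():
    with tempfile.TemporaryDirectory() as out_dir:
        commit_batch(out_dir, 0, [write_batch(out_dir, 0, ["update1"])])
        commit_batch(out_dir, 1, [write_batch(out_dir, 1, ["update2"])])
        # Simulated restart: the last batch is retried and writes an extra file.
        retry_file = write_batch(out_dir, 1, ["update2"])
        retry_committed = commit_batch(out_dir, 1, [retry_file])
        data_files = [n for n in os.listdir(out_dir) if n.startswith("part-")]
        return len(data_files), retry_committed, read_committed(out_dir)
```

In this toy, `demo()` ends up with three data files on disk, but the retry's commit is rejected and the reader returns each input once. A consumer that simply lists the directory would see all three files, which is the caveat about reading the output with another system.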
On Tue, Feb 7, 2017 at 12:25 PM, Sam Elamin <hussam.ela...@gmail.com> wrote:

> Hmm ok I understand that but the job is running for a good few mins before
> I kill it so there should not be any jobs left because I can see in the log
> that it's now polling for new changes, the latest offset is the right one
>
> After I kill it and relaunch it picks up that same file?
>
> Sorry if I misunderstood you
>
> On Tue, Feb 7, 2017 at 5:20 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> It is always possible that there will be extra jobs from failed batches.
>> However, for the file sink, only one set of files will make it into the
>> _spark_metadata directory log. This is how we get atomic commits even when
>> there are files in more than one directory. When reading the files with
>> Spark, we'll detect this directory and use it instead of listStatus to find
>> the list of valid files.
>>
>> On Tue, Feb 7, 2017 at 9:05 AM, Sam Elamin <hussam.ela...@gmail.com>
>> wrote:
>>
>>> On another note, when it comes to checkpointing on structured streaming
>>>
>>> I noticed if I have a stream running off S3 and I kill the process, the
>>> next time the process starts running it duplicates the last record
>>> inserted. Is that normal?
>>>
>>> So say I have streaming enabled on one folder "test" which only has two
>>> files "update1" and "update 2", then I kill the Spark job using Ctrl+C.
>>> When I rerun the stream it picks up "update 2" again
>>>
>>> Is this normal? Isn't Ctrl+C a failure?
>>>
>>> I would expect checkpointing to know that update 2 was already processed
>>>
>>> Regards
>>> Sam
>>>
>>> On Tue, Feb 7, 2017 at 4:58 PM, Sam Elamin <hussam.ela...@gmail.com>
>>> wrote:
>>>
>>>> Thanks Michael!
>>>>
>>>> On Tue, Feb 7, 2017 at 4:49 PM, Michael Armbrust <
>>>> mich...@databricks.com> wrote:
>>>>
>>>>> Here is a JIRA: https://issues.apache.org/jira/browse/SPARK-19497
>>>>>
>>>>> We should add this soon.
>>>>>
>>>>> On Tue, Feb 7, 2017 at 8:35 AM, Sam Elamin <hussam.ela...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi All
>>>>>>
>>>>>> When I read a stream off S3 and try to drop duplicates, I
>>>>>> get the following error:
>>>>>>
>>>>>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>>>>>> Append output mode not supported when there are streaming aggregations on
>>>>>> streaming DataFrames/DataSets;;
>>>>>>
>>>>>> What's strange is that if I use the batch "spark.read.json", it works
>>>>>>
>>>>>> Can I assume you can't drop duplicates in structured streaming?
>>>>>>
>>>>>> Regards
>>>>>> Sam