In a batch pipeline, is it considered data loss if the stage fails (assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it might be better to favor correctness and fail in the current implementation.
On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw <rober...@google.com> wrote:
> You could add a step to delete all of dest before a barrier and the step that does the rename as outlined. In that case, any dest file that exists must be good.
>
> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
> > I think this is still unsafe in case exists(dst) (e.g. this is a re-run of a pipeline) but src is missing due to some bad reason. However it's probably better than what we have (e.g. we currently certainly don't perform checksum checks).
> >
> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri <eh...@google.com> wrote:
> >>
> >> For GCS, I would do what I believe we already do.
> >> rename(src, dst):
> >> - if !exists(src) and exists(dst) return 0
> >> - if !exists(src) and !exists(dst) return error
> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst) return 0 else delete(dst) }
> >> - Start a GCS copy from src to dst.
> >> - Wait for GCS copy to complete.
> >> - delete(src)
> >>
> >> For filesystems that don't have checksum() metadata, size() can be used instead.
> >>
> >> I've opened a bug to track this:
> >> https://issues.apache.org/jira/browse/BEAM-3600
> >>
> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov <kirpic...@google.com> wrote:
> >>>
> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore files that are missing for more ominous reasons than just this being a non-first attempt at renaming src to dst. E.g. if there was a bug in constructing the filename to be renamed, or if we somehow messed up the order of rename vs cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead to silent data loss (likely caught by unit tests though - so this is not a super serious issue).
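Udi's rename(src, dst) outline above can be sketched in Python as follows. This is a hypothetical illustration, not Beam's implementation: the local filesystem stands in for GCS, an MD5 of the file contents stands in for object checksum metadata, a synchronous `shutil.copyfile` stands in for starting and waiting on a GCS copy, and `retryable_rename`, `_same_content`, and `_checksum` are made-up names.

```python
import hashlib
import os
import shutil


def _checksum(path):
    # Stand-in for per-object checksum metadata (assumption: we just hash
    # the local file contents with MD5).
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 16), b""):
            digest.update(chunk)
    return digest.hexdigest()


def _same_content(src, dst, use_checksum):
    # Compare by checksum when available, otherwise fall back to size().
    if use_checksum:
        return _checksum(src) == _checksum(dst)
    return os.path.getsize(src) == os.path.getsize(dst)


def retryable_rename(src, dst, use_checksum=True):
    """Hypothetical retry-safe rename following the rename(src, dst) outline."""
    src_exists = os.path.exists(src)
    dst_exists = os.path.exists(dst)
    if not src_exists and dst_exists:
        return  # Assumed to be a retry of an attempt that already finished.
    if not src_exists and not dst_exists:
        raise FileNotFoundError(f"neither {src} nor {dst} exists")
    if dst_exists:
        if _same_content(src, dst, use_checksum):
            return  # As in the outline: dst already matches; src is left in place.
        os.remove(dst)  # Stale or partial destination from an earlier attempt.
    # Stand-in for "start a GCS copy from src to dst and wait for it".
    shutil.copyfile(src, dst)
    os.remove(src)
```

The first branch is exactly the case Eugene still considers unsafe: if src went missing for some bad reason while dst survives from an earlier run, the function reports success. The size-based fallback is weaker still, since two different files of equal size compare as equal.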
> >>>
> >>> Basically I just can't think of a case when I was copying files and thinking "oh man, I wish it didn't give an error if the stuff I'm copying doesn't exist" - the option exists only because we couldn't come up with another way to implement idempotent rename on GCS.
> >>>
> >>> What's your idea of how a safe retryable GCS rename() could work?
> >>>
> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri <eh...@google.com> wrote:
> >>>>
> >>>> Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES is unsafe because it will skip (src, dst) pairs where neither exists? (It only checks whether src exists.)
> >>>>
> >>>> For GCS, we can construct a safe retryable rename() operation, assuming that copy() and delete() are atomic for a single file or pair of files.
> >>>>
> >>>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi <rang...@google.com> wrote:
> >>>>>
> >>>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
> >>>>>>
> >>>>>> As far as I know, the current implementation of file sinks is the only reason why the flag IGNORE_MISSING for copying even exists - there's no other compelling reason to justify it. We implement "rename" as "copy, then delete" (in a single DoFn), so for idempotency of this operation we need to ignore the copying of a non-existent file.
> >>>>>>
> >>>>>> I think the right way to go would be to change the implementation of renaming to have a @RequiresStableInput (or reshuffle) in the middle, so it's made of 2 individually idempotent operations:
> >>>>>> 1) copy, which would fail if input is missing, and would overwrite output if it exists
> >>>>>> -- reshuffle --
> >>>>>> 2) delete, which would not fail if input is missing.
> >>>>>
> >>>>> Something like this is needed only in streaming, right?
> >>>>>
> >>>>> Raghu.
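Eugene's copy / reshuffle / delete proposal can be sketched as two separately retryable phases. This is a hypothetical plain-Python illustration over local files; `copy_phase`, `delete_phase`, and `two_phase_rename` are made-up names, and the barrier that a reshuffle or @RequiresStableInput would provide in a real pipeline is represented only by a comment.

```python
import os
import shutil


def copy_phase(pairs):
    # Phase 1: copy every src to dst. Fails loudly if a src is missing and
    # overwrites an existing dst, so a retry converges to the same state.
    for src, dst in pairs:
        if not os.path.exists(src):
            raise FileNotFoundError(src)
        shutil.copyfile(src, dst)


def delete_phase(pairs):
    # Phase 2: delete every src, tolerating files an earlier attempt removed.
    for src, _ in pairs:
        try:
            os.remove(src)
        except FileNotFoundError:
            pass


def two_phase_rename(pairs):
    copy_phase(pairs)
    # Barrier: in a Beam pipeline, this is where the reshuffle or
    # @RequiresStableInput boundary would go, so phase 2 only starts after
    # every copy has succeeded. Here it is just sequential execution.
    delete_phase(pairs)
```

Each phase can be retried on its own: re-running `copy_phase` overwrites the same destinations, and re-running `delete_phase` skips sources that are already gone. If the copy phase were ever re-run after the deletes had started, it would fail loudly rather than silently skip a file, which is the behavior the thread prefers over IGNORE_MISSING_FILES.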
> >>>>>
> >>>>>>
> >>>>>> That way first everything is copied (possibly via multiple attempts), and then old files are deleted (possibly via multiple attempts).
> >>>>>>
> >>>>>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <eh...@google.com> wrote:
> >>>>>>>
> >>>>>>> I agree that overwriting is more in line with user expectations.
> >>>>>>> I believe that the sink should not ignore errors from the filesystem layer. Instead, the FileSystem API should be better defined.
> >>>>>>> Examples: rename() and copy() should overwrite existing files at the destination, copy() should have an ignore_missing flag.
> >>>>>>>
> >>>>>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <rang...@google.com> wrote:
> >>>>>>>>
> >>>>>>>> The original mail mentions that output from the second run of word_count is ignored. That does not seem as safe as ignoring the error from a second attempt of a step. How do we know the second run didn't run on different output? Overwriting seems more accurate than ignoring. Does handling this error at the sink level distinguish between the two (another run vs. second attempt)?
> >>>>>>>>
> >>>>>>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <eh...@google.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Yeah, another round of refactoring is due to move the rename via copy+delete logic up to the file-based sink level.
> >>>>>>>>>
> >>>>>>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath <chamik...@google.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Good point. There's always the chance of the step that performs the final rename being retried. So we'll have to ignore this error at the sink level. We don't necessarily have to do this at the FileSystem level though.
> >>>>>>>>>> I think the proper behavior might be to raise an error for the rename at the FileSystem level if the destination already exists (or the source doesn't exist), while ignoring that error (and possibly logging a warning) at the sink level.
> >>>>>>>>>>
> >>>>>>>>>> - Cham
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <re...@google.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> I think the idea was to ignore "already exists" errors. The reason is that any step in Beam can be executed multiple times, including the rename step. If the rename step gets run twice, the second run should succeed vacuously.
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <eh...@google.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi,
> >>>>>>>>>>>> I've been working on HDFS code for the Python SDK and I've noticed some behaviors which are surprising. I wanted to know if these behaviors are known and intended.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. When renaming files during finalize_write, rename errors are ignored.
> >>>>>>>>>>>> For example, if I run wordcount twice using HDFS code, I get a warning the second time because the file already exists:
> >>>>>>>>>>>>
> >>>>>>>>>>>> WARNING:root:Rename not successful: hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2 -> hdfs://counts2-00000-of-00001, libhdfs error in renaming hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2 to hdfs://counts2-00000-of-00001 with exceptions Unable to rename '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2' to '/counts2-00000-of-00001'.
> >>>>>>>>>>>>
> >>>>>>>>>>>> For GCS and local files there are no rename errors (in this case), since the rename operation silently overwrites existing destination files. However, blindly ignoring these errors might make the pipeline report success even though output files are missing.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2. Output files (--output) overwrite existing files.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java SDK doesn't use Filesystem.Rename().
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> - Udi
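Cham's suggested layering (a strict rename at the FileSystem level, with its errors ignored and logged at the sink level), combined with Udi's concern that ignored errors can hide missing output, might look like the sketch below. `strict_rename`, `finalize_renames`, and `RenameError` are hypothetical names, the local filesystem stands in for HDFS or GCS, and the separate existence checks are not atomic, so this only illustrates the control flow.

```python
import logging
import os


class RenameError(Exception):
    """Hypothetical error raised by a strict filesystem-level rename."""


def strict_rename(src, dst):
    # FileSystem level: refuse to rename when the source is missing or the
    # destination already exists, instead of silently overwriting.
    # (The existence checks and the rename are not atomic together.)
    if not os.path.exists(src):
        raise RenameError(f"source missing: {src}")
    if os.path.exists(dst):
        raise RenameError(f"destination exists: {dst}")
    os.rename(src, dst)


def finalize_renames(pairs):
    # Sink level: tolerate (but log) rename errors that a retried step can
    # produce, then fail if any destination is missing, so the pipeline
    # cannot report success without its output files.
    for src, dst in pairs:
        try:
            strict_rename(src, dst)
        except RenameError as e:
            logging.warning("Rename not successful: %s", e)
    missing = [dst for _, dst in pairs if not os.path.exists(dst)]
    if missing:
        raise OSError(f"output files missing after rename: {missing}")
```

A retried rename (source gone, destination present) only logs a warning, as Reuven describes. A second pipeline run (source and destination both present) also only logs a warning and keeps the old output, which is Raghu's objection and the reason Udi and Raghu lean toward overwriting. Only a destination that is still missing after all renames makes the step fail.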