In a batch pipeline, is it considered a data loss if the the stage fails
(assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it
might be better to favor correctness and fail in current implementation.

On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw <rober...@google.com> wrote:

> You could add a step to delete all of dest before a barrier and the
> step that does the rename as outlined. In that case, any dest file
> that exists must be good.
>
> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov <kirpic...@google.com>
> wrote:
> > I think this is still unsafe in case exists(dst) (e.g. this is a re-run
> of a
> > pipeline) but src is missing due to some bad reason. However it's
> probably
> > better than what we have (e.g. we currently certainly don't perform
> checksum
> > checks).
> >
> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri <eh...@google.com> wrote:
> >>
> >> For GCS, I would do what I believe we already do.
> >> rename(src, dst):
> >> - if !exists(src) and exists(dst) return 0
> >> - if !exists(src) and !exists(dst) return error
> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
> >> return 0 else delete(dst) }
> >> - Start a GCS copy from src to dst.
> >> - Wait for GCS copy to complete.
> >> - delete(src)
> >>
> >> For filesystems that don't have checksum() metadata, size() can be used
> >> instead.
> >>
> >> I've opened a bug to track this:
> >> https://issues.apache.org/jira/browse/BEAM-3600
> >>
> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov <kirpic...@google.com>
> >> wrote:
> >>>
> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore
> files
> >>> that are missing for more ominous reasons than just this being a
> non-first
> >>> attempt at renaming src to dst. E.g. if there was a bug in
> constructing the
> >>> filename to be renamed, or if we somehow messed up the order of rename
> vs
> >>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would lead
> to
> >>> silent data loss (likely caught by unit tests though - so this is not a
> >>> super serious issue).
> >>>
> >>> Basically I just can't think of a case when I was copying files and
> >>> thinking "oh man, I wish it didn't give an error if the stuff I'm
> copying
> >>> doesn't exist" - the option exists only because we couldn't come up
> with
> >>> another way to implement idempotent rename on GCS.
> >>>
> >>> What's your idea of how a safe retryable GCS rename() could work?
> >>>
> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri <eh...@google.com> wrote:
> >>>>
> >>>> Eugene, if I get this right, you're saying that IGNORE_MISSING_FILES
> is
> >>>> unsafe because it will skip (src, dst) pairs where neither exist? (it
> only
> >>>> looks if src exists)
> >>>>
> >>>> For GCS, we can construct a safe retryable rename() operation,
> assuming
> >>>> that copy() and delete() are atomic for a single file or pair of
> files.
> >>>>
> >>>>
> >>>>
> >>>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi <rang...@google.com>
> wrote:
> >>>>>
> >>>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
> >>>>> <kirpic...@google.com> wrote:
> >>>>>>
> >>>>>> As far as I know, the current implementation of file sinks is the
> only
> >>>>>> reason why the flag IGNORE_MISSING for copying even exists -
> there's no
> >>>>>> other compelling reason to justify it. We implement "rename" as
> "copy, then
> >>>>>> delete" (in a single DoFn), so for idempodency of this operation we
> need to
> >>>>>> ignore the copying of a non-existent file.
> >>>>>>
> >>>>>> I think the right way to go would be to change the implementation of
> >>>>>> renaming to have a @RequiresStableInput (or reshuffle) in the
> middle, so
> >>>>>> it's made of 2 individually idempotent operations:
> >>>>>> 1) copy, which would fail if input is missing, and would overwrite
> >>>>>> output if it exists
> >>>>>> -- reshuffle --
> >>>>>> 2) delete, which would not fail if input is missing.
> >>>>>
> >>>>>
> >>>>> Something like this is needed only in streaming, right?
> >>>>>
> >>>>> Raghu.
> >>>>>
> >>>>>>
> >>>>>> That way first everything is copied (possibly via multiple
> attempts),
> >>>>>> and then old files are deleted (possibly via multiple attempts).
> >>>>>>
> >>>>>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <eh...@google.com> wrote:
> >>>>>>>
> >>>>>>> I agree that overwriting is more in line with user expectations.
> >>>>>>> I believe that the sink should not ignore errors from the
> filesystem
> >>>>>>> layer. Instead, the FileSystem API should be more well defined.
> >>>>>>> Examples: rename() and copy() should overwrite existing files at
> the
> >>>>>>> destination, copy() should have an ignore_missing flag.
> >>>>>>>
> >>>>>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <rang...@google.com>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>> Original mail mentions that output from second run of word_count
> is
> >>>>>>>> ignored. That does not seem as safe as ignoring error from a
> second attempt
> >>>>>>>> of a step. How do we know second run didn't run on different
> output?
> >>>>>>>> Overwriting seems more accurate than ignoring. Does handling this
> error at
> >>>>>>>> sink level distinguish between the two (another run vs second
> attempt)?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <eh...@google.com>
> >>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Yeah, another round of refactoring is due to move the rename via
> >>>>>>>>> copy+delete logic up to the file-based sink level.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath
> >>>>>>>>> <chamik...@google.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Good point. There's always the chance of step that performs
> final
> >>>>>>>>>> rename being retried. So we'll have to ignore this error at the
> sink level.
> >>>>>>>>>> We don't necessarily have to do this at the FileSystem level
> though. I think
> >>>>>>>>>> the proper behavior might be to raise an error for the rename
> at the
> >>>>>>>>>> FileSystem level if the destination already exists (or source
> doesn't exist)
> >>>>>>>>>> while ignoring that error (and possibly logging a warning) at
> the sink
> >>>>>>>>>> level.
> >>>>>>>>>>
> >>>>>>>>>> - Cham
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <re...@google.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> I think the idea was to ignore "already exists" errors. The
> >>>>>>>>>>> reason being that any step in Beam can be executed multiple
> times, including
> >>>>>>>>>>> the rename step. If the rename step gets run twice, the second
> run should
> >>>>>>>>>>> succeed vacuously.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <eh...@google.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> Hi,
> >>>>>>>>>>>> I've been working on HDFS code for the Python SDK and I've
> >>>>>>>>>>>> noticed some behaviors which are surprising. I wanted to know
> if these
> >>>>>>>>>>>> behaviors are known and intended.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. When renaming files during finalize_write, rename errors
> are
> >>>>>>>>>>>> ignored. For example, if I run wordcount twice using HDFS
> code I get a
> >>>>>>>>>>>> warning the second time because the file already exists:
> >>>>>>>>>>>>
> >>>>>>>>>>>> WARNING:root:Rename not successful:
> >>>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da2
> 45/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
> >>>>>>>>>>>> -> hdfs://counts2-00000-of-00001, libhdfs error in renaming
> >>>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da2
> 45/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
> >>>>>>>>>>>> to hdfs://counts2-00000-of-00001 with exceptions Unable to
> rename
> >>>>>>>>>>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da2
> 45/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
> >>>>>>>>>>>> to '/counts2-00000-of-00001'.
> >>>>>>>>>>>>
> >>>>>>>>>>>> For GCS and local files there are no rename errors (in this
> >>>>>>>>>>>> case), since the rename operation silently overwrites
> existing destination
> >>>>>>>>>>>> files. However, blindly ignoring these errors might make the
> pipeline to
> >>>>>>>>>>>> report success even though output files are missing.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2. Output files (--ouput) overwrite existing files.
> >>>>>>>>>>>>
> >>>>>>>>>>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java SDK
> >>>>>>>>>>>> doesn't use Filesystem.Rename().
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> - Udi
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>
> >
>

Reply via email to