Pipeline stages must be retry-tolerant. E.g. the VM it's running on
might get shut down. We should not be failing jobs in this case.

It seems the current implementation could only produce bad results if
(1) unrelated output files already existed and (2) the temporary files
were either not written or deleted out-of-band. (2) seems really
unlikely, but can be eliminated if we ensure that (1) cannot happen
(e.g. deleting destination files before starting the rename).

On Fri, Feb 2, 2018 at 12:13 PM, Reuven Lax <re...@google.com> wrote:
>
>
> On Fri, Feb 2, 2018 at 11:17 AM, Chamikara Jayalath <chamik...@google.com>
> wrote:
>>
>> Currently, Python file-based sink is batch only.
>
>
> Sure, but that won't be true forever.
>
>>
>>
>> Regarding Raghu's question, stage/pipeline failure should not be
>> considered as a data loss but I prefer overriding existing output and
>> completing a possibly expensive pipeline over failing the whole pipeline due
>> to one or more existing files.
>>
>> - Cham
>>
>>
>> On Fri, Feb 2, 2018 at 10:21 AM Reuven Lax <re...@google.com> wrote:
>>>
>>> However this code might run in streaming as well, right?
>>>
>>> On Fri, Feb 2, 2018 at 9:54 AM, Raghu Angadi <rang...@google.com> wrote:
>>>>
>>>> In a batch pipeline, is it considered a data loss if the the stage fails
>>>> (assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it
>>>> might be better to favor correctness and fail in current implementation.
>>>>
>>>>
>>>> On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>>
>>>>> You could add a step to delete all of dest before a barrier and the
>>>>> step that does the rename as outlined. In that case, any dest file
>>>>> that exists must be good.
>>>>>
>>>>> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov <kirpic...@google.com>
>>>>> wrote:
>>>>> > I think this is still unsafe in case exists(dst) (e.g. this is a
>>>>> > re-run of a
>>>>> > pipeline) but src is missing due to some bad reason. However it's
>>>>> > probably
>>>>> > better than what we have (e.g. we currently certainly don't perform
>>>>> > checksum
>>>>> > checks).
>>>>> >
>>>>> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri <eh...@google.com> wrote:
>>>>> >>
>>>>> >> For GCS, I would do what I believe we already do.
>>>>> >> rename(src, dst):
>>>>> >> - if !exists(src) and exists(dst) return 0
>>>>> >> - if !exists(src) and !exists(dst) return error
>>>>> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
>>>>> >> return 0 else delete(dst) }
>>>>> >> - Start a GCS copy from src to dst.
>>>>> >> - Wait for GCS copy to complete.
>>>>> >> - delete(src)
>>>>> >>
>>>>> >> For filesystems that don't have checksum() metadata, size() can be
>>>>> >> used
>>>>> >> instead.
>>>>> >>
>>>>> >> I've opened a bug to track this:
>>>>> >> https://issues.apache.org/jira/browse/BEAM-3600
>>>>> >>
>>>>> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov
>>>>> >> <kirpic...@google.com>
>>>>> >> wrote:
>>>>> >>>
>>>>> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore
>>>>> >>> files
>>>>> >>> that are missing for more ominous reasons than just this being a
>>>>> >>> non-first
>>>>> >>> attempt at renaming src to dst. E.g. if there was a bug in
>>>>> >>> constructing the
>>>>> >>> filename to be renamed, or if we somehow messed up the order of
>>>>> >>> rename vs
>>>>> >>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would
>>>>> >>> lead to
>>>>> >>> silent data loss (likely caught by unit tests though - so this is
>>>>> >>> not a
>>>>> >>> super serious issue).
>>>>> >>>
>>>>> >>> Basically I just can't think of a case when I was copying files and
>>>>> >>> thinking "oh man, I wish it didn't give an error if the stuff I'm
>>>>> >>> copying
>>>>> >>> doesn't exist" - the option exists only because we couldn't come up
>>>>> >>> with
>>>>> >>> another way to implement idempotent rename on GCS.
>>>>> >>>
>>>>> >>> What's your idea of how a safe retryable GCS rename() could work?
>>>>> >>>
>>>>> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri <eh...@google.com> wrote:
>>>>> >>>>
>>>>> >>>> Eugene, if I get this right, you're saying that
>>>>> >>>> IGNORE_MISSING_FILES is
>>>>> >>>> unsafe because it will skip (src, dst) pairs where neither exist?
>>>>> >>>> (it only
>>>>> >>>> looks if src exists)
>>>>> >>>>
>>>>> >>>> For GCS, we can construct a safe retryable rename() operation,
>>>>> >>>> assuming
>>>>> >>>> that copy() and delete() are atomic for a single file or pair of
>>>>> >>>> files.
>>>>> >>>>
>>>>> >>>>
>>>>> >>>>
>>>>> >>>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi <rang...@google.com>
>>>>> >>>> wrote:
>>>>> >>>>>
>>>>> >>>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
>>>>> >>>>> <kirpic...@google.com> wrote:
>>>>> >>>>>>
>>>>> >>>>>> As far as I know, the current implementation of file sinks is
>>>>> >>>>>> the only
>>>>> >>>>>> reason why the flag IGNORE_MISSING for copying even exists -
>>>>> >>>>>> there's no
>>>>> >>>>>> other compelling reason to justify it. We implement "rename" as
>>>>> >>>>>> "copy, then
>>>>> >>>>>> delete" (in a single DoFn), so for idempodency of this operation
>>>>> >>>>>> we need to
>>>>> >>>>>> ignore the copying of a non-existent file.
>>>>> >>>>>>
>>>>> >>>>>> I think the right way to go would be to change the
>>>>> >>>>>> implementation of
>>>>> >>>>>> renaming to have a @RequiresStableInput (or reshuffle) in the
>>>>> >>>>>> middle, so
>>>>> >>>>>> it's made of 2 individually idempotent operations:
>>>>> >>>>>> 1) copy, which would fail if input is missing, and would
>>>>> >>>>>> overwrite
>>>>> >>>>>> output if it exists
>>>>> >>>>>> -- reshuffle --
>>>>> >>>>>> 2) delete, which would not fail if input is missing.
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> Something like this is needed only in streaming, right?
>>>>> >>>>>
>>>>> >>>>> Raghu.
>>>>> >>>>>
>>>>> >>>>>>
>>>>> >>>>>> That way first everything is copied (possibly via multiple
>>>>> >>>>>> attempts),
>>>>> >>>>>> and then old files are deleted (possibly via multiple attempts).
>>>>> >>>>>>
>>>>> >>>>>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <eh...@google.com>
>>>>> >>>>>> wrote:
>>>>> >>>>>>>
>>>>> >>>>>>> I agree that overwriting is more in line with user
>>>>> >>>>>>> expectations.
>>>>> >>>>>>> I believe that the sink should not ignore errors from the
>>>>> >>>>>>> filesystem
>>>>> >>>>>>> layer. Instead, the FileSystem API should be more well defined.
>>>>> >>>>>>> Examples: rename() and copy() should overwrite existing files
>>>>> >>>>>>> at the
>>>>> >>>>>>> destination, copy() should have an ignore_missing flag.
>>>>> >>>>>>>
>>>>> >>>>>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi
>>>>> >>>>>>> <rang...@google.com>
>>>>> >>>>>>> wrote:
>>>>> >>>>>>>>
>>>>> >>>>>>>> Original mail mentions that output from second run of
>>>>> >>>>>>>> word_count is
>>>>> >>>>>>>> ignored. That does not seem as safe as ignoring error from a
>>>>> >>>>>>>> second attempt
>>>>> >>>>>>>> of a step. How do we know second run didn't run on different
>>>>> >>>>>>>> output?
>>>>> >>>>>>>> Overwriting seems more accurate than ignoring. Does handling
>>>>> >>>>>>>> this error at
>>>>> >>>>>>>> sink level distinguish between the two (another run vs second
>>>>> >>>>>>>> attempt)?
>>>>> >>>>>>>>
>>>>> >>>>>>>>
>>>>> >>>>>>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <eh...@google.com>
>>>>> >>>>>>>> wrote:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> Yeah, another round of refactoring is due to move the rename
>>>>> >>>>>>>>> via
>>>>> >>>>>>>>> copy+delete logic up to the file-based sink level.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath
>>>>> >>>>>>>>> <chamik...@google.com> wrote:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Good point. There's always the chance of step that performs
>>>>> >>>>>>>>>> final
>>>>> >>>>>>>>>> rename being retried. So we'll have to ignore this error at
>>>>> >>>>>>>>>> the sink level.
>>>>> >>>>>>>>>> We don't necessarily have to do this at the FileSystem level
>>>>> >>>>>>>>>> though. I think
>>>>> >>>>>>>>>> the proper behavior might be to raise an error for the
>>>>> >>>>>>>>>> rename at the
>>>>> >>>>>>>>>> FileSystem level if the destination already exists (or
>>>>> >>>>>>>>>> source doesn't exist)
>>>>> >>>>>>>>>> while ignoring that error (and possibly logging a warning)
>>>>> >>>>>>>>>> at the sink
>>>>> >>>>>>>>>> level.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> - Cham
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax
>>>>> >>>>>>>>>> <re...@google.com>
>>>>> >>>>>>>>>> wrote:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> I think the idea was to ignore "already exists" errors. The
>>>>> >>>>>>>>>>> reason being that any step in Beam can be executed multiple
>>>>> >>>>>>>>>>> times, including
>>>>> >>>>>>>>>>> the rename step. If the rename step gets run twice, the
>>>>> >>>>>>>>>>> second run should
>>>>> >>>>>>>>>>> succeed vacuously.
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri
>>>>> >>>>>>>>>>> <eh...@google.com>
>>>>> >>>>>>>>>>> wrote:
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Hi,
>>>>> >>>>>>>>>>>> I've been working on HDFS code for the Python SDK and I've
>>>>> >>>>>>>>>>>> noticed some behaviors which are surprising. I wanted to
>>>>> >>>>>>>>>>>> know if these
>>>>> >>>>>>>>>>>> behaviors are known and intended.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> 1. When renaming files during finalize_write, rename
>>>>> >>>>>>>>>>>> errors are
>>>>> >>>>>>>>>>>> ignored. For example, if I run wordcount twice using HDFS
>>>>> >>>>>>>>>>>> code I get a
>>>>> >>>>>>>>>>>> warning the second time because the file already exists:
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> WARNING:root:Rename not successful:
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>> >>>>>>>>>>>> -> hdfs://counts2-00000-of-00001, libhdfs error in
>>>>> >>>>>>>>>>>> renaming
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>> >>>>>>>>>>>> to hdfs://counts2-00000-of-00001 with exceptions Unable to
>>>>> >>>>>>>>>>>> rename
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
>>>>> >>>>>>>>>>>> to '/counts2-00000-of-00001'.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> For GCS and local files there are no rename errors (in
>>>>> >>>>>>>>>>>> this
>>>>> >>>>>>>>>>>> case), since the rename operation silently overwrites
>>>>> >>>>>>>>>>>> existing destination
>>>>> >>>>>>>>>>>> files. However, blindly ignoring these errors might make
>>>>> >>>>>>>>>>>> the pipeline to
>>>>> >>>>>>>>>>>> report success even though output files are missing.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> 2. Output files (--ouput) overwrite existing files.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java
>>>>> >>>>>>>>>>>> SDK
>>>>> >>>>>>>>>>>> doesn't use Filesystem.Rename().
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Thanks,
>>>>> >>>>>>>>>>>> - Udi
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>
>>>>> >
>>>>
>>>>
>>>
>

Reply via email to