Currently, Python file-based sink is batch only.

Regarding Raghu's question, stage/pipeline failure should not be considered
as a data loss but I prefer overriding existing output and completing a
possibly expensive pipeline over failing the whole pipeline due to one or
more existing files.

- Cham

On Fri, Feb 2, 2018 at 10:21 AM Reuven Lax <re...@google.com> wrote:

> However this code might run in streaming as well, right?
>
> On Fri, Feb 2, 2018 at 9:54 AM, Raghu Angadi <rang...@google.com> wrote:
>
>> In a batch pipeline, is it considered a data loss if the the stage fails
>> (assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not, it
>> might be better to favor correctness and fail in current implementation.
>>
>>
>> On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> You could add a step to delete all of dest before a barrier and the
>>> step that does the rename as outlined. In that case, any dest file
>>> that exists must be good.
>>>
>>> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>> > I think this is still unsafe in case exists(dst) (e.g. this is a
>>> re-run of a
>>> > pipeline) but src is missing due to some bad reason. However it's
>>> probably
>>> > better than what we have (e.g. we currently certainly don't perform
>>> checksum
>>> > checks).
>>> >
>>> > On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri <eh...@google.com> wrote:
>>> >>
>>> >> For GCS, I would do what I believe we already do.
>>> >> rename(src, dst):
>>> >> - if !exists(src) and exists(dst) return 0
>>> >> - if !exists(src) and !exists(dst) return error
>>> >> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
>>> >> return 0 else delete(dst) }
>>> >> - Start a GCS copy from src to dst.
>>> >> - Wait for GCS copy to complete.
>>> >> - delete(src)
>>> >>
>>> >> For filesystems that don't have checksum() metadata, size() can be
>>> used
>>> >> instead.
>>> >>
>>> >> I've opened a bug to track this:
>>> >> https://issues.apache.org/jira/browse/BEAM-3600
>>> >>
>>> >> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov <kirpic...@google.com
>>> >
>>> >> wrote:
>>> >>>
>>> >>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore
>>> files
>>> >>> that are missing for more ominous reasons than just this being a
>>> non-first
>>> >>> attempt at renaming src to dst. E.g. if there was a bug in
>>> constructing the
>>> >>> filename to be renamed, or if we somehow messed up the order of
>>> rename vs
>>> >>> cleanup, etc. - these situations with IGNORE_MISSING_FILES would
>>> lead to
>>> >>> silent data loss (likely caught by unit tests though - so this is
>>> not a
>>> >>> super serious issue).
>>> >>>
>>> >>> Basically I just can't think of a case when I was copying files and
>>> >>> thinking "oh man, I wish it didn't give an error if the stuff I'm
>>> copying
>>> >>> doesn't exist" - the option exists only because we couldn't come up
>>> with
>>> >>> another way to implement idempotent rename on GCS.
>>> >>>
>>> >>> What's your idea of how a safe retryable GCS rename() could work?
>>> >>>
>>> >>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri <eh...@google.com> wrote:
>>> >>>>
>>> >>>> Eugene, if I get this right, you're saying that
>>> IGNORE_MISSING_FILES is
>>> >>>> unsafe because it will skip (src, dst) pairs where neither exist?
>>> (it only
>>> >>>> looks if src exists)
>>> >>>>
>>> >>>> For GCS, we can construct a safe retryable rename() operation,
>>> assuming
>>> >>>> that copy() and delete() are atomic for a single file or pair of
>>> files.
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi <rang...@google.com>
>>> wrote:
>>> >>>>>
>>> >>>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov
>>> >>>>> <kirpic...@google.com> wrote:
>>> >>>>>>
>>> >>>>>> As far as I know, the current implementation of file sinks is the
>>> only
>>> >>>>>> reason why the flag IGNORE_MISSING for copying even exists -
>>> there's no
>>> >>>>>> other compelling reason to justify it. We implement "rename" as
>>> "copy, then
>>> >>>>>> delete" (in a single DoFn), so for idempodency of this operation
>>> we need to
>>> >>>>>> ignore the copying of a non-existent file.
>>> >>>>>>
>>> >>>>>> I think the right way to go would be to change the implementation
>>> of
>>> >>>>>> renaming to have a @RequiresStableInput (or reshuffle) in the
>>> middle, so
>>> >>>>>> it's made of 2 individually idempotent operations:
>>> >>>>>> 1) copy, which would fail if input is missing, and would overwrite
>>> >>>>>> output if it exists
>>> >>>>>> -- reshuffle --
>>> >>>>>> 2) delete, which would not fail if input is missing.
>>> >>>>>
>>> >>>>>
>>> >>>>> Something like this is needed only in streaming, right?
>>> >>>>>
>>> >>>>> Raghu.
>>> >>>>>
>>> >>>>>>
>>> >>>>>> That way first everything is copied (possibly via multiple
>>> attempts),
>>> >>>>>> and then old files are deleted (possibly via multiple attempts).
>>> >>>>>>
>>> >>>>>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <eh...@google.com>
>>> wrote:
>>> >>>>>>>
>>> >>>>>>> I agree that overwriting is more in line with user expectations.
>>> >>>>>>> I believe that the sink should not ignore errors from the
>>> filesystem
>>> >>>>>>> layer. Instead, the FileSystem API should be more well defined.
>>> >>>>>>> Examples: rename() and copy() should overwrite existing files at
>>> the
>>> >>>>>>> destination, copy() should have an ignore_missing flag.
>>> >>>>>>>
>>> >>>>>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <rang...@google.com
>>> >
>>> >>>>>>> wrote:
>>> >>>>>>>>
>>> >>>>>>>> Original mail mentions that output from second run of
>>> word_count is
>>> >>>>>>>> ignored. That does not seem as safe as ignoring error from a
>>> second attempt
>>> >>>>>>>> of a step. How do we know second run didn't run on different
>>> output?
>>> >>>>>>>> Overwriting seems more accurate than ignoring. Does handling
>>> this error at
>>> >>>>>>>> sink level distinguish between the two (another run vs second
>>> attempt)?
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <eh...@google.com>
>>> >>>>>>>> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>> Yeah, another round of refactoring is due to move the rename
>>> via
>>> >>>>>>>>> copy+delete logic up to the file-based sink level.
>>> >>>>>>>>>
>>> >>>>>>>>>
>>> >>>>>>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath
>>> >>>>>>>>> <chamik...@google.com> wrote:
>>> >>>>>>>>>>
>>> >>>>>>>>>> Good point. There's always the chance of step that performs
>>> final
>>> >>>>>>>>>> rename being retried. So we'll have to ignore this error at
>>> the sink level.
>>> >>>>>>>>>> We don't necessarily have to do this at the FileSystem level
>>> though. I think
>>> >>>>>>>>>> the proper behavior might be to raise an error for the rename
>>> at the
>>> >>>>>>>>>> FileSystem level if the destination already exists (or source
>>> doesn't exist)
>>> >>>>>>>>>> while ignoring that error (and possibly logging a warning) at
>>> the sink
>>> >>>>>>>>>> level.
>>> >>>>>>>>>>
>>> >>>>>>>>>> - Cham
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <re...@google.com>
>>> >>>>>>>>>> wrote:
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> I think the idea was to ignore "already exists" errors. The
>>> >>>>>>>>>>> reason being that any step in Beam can be executed multiple
>>> times, including
>>> >>>>>>>>>>> the rename step. If the rename step gets run twice, the
>>> second run should
>>> >>>>>>>>>>> succeed vacuously.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <eh...@google.com
>>> >
>>> >>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Hi,
>>> >>>>>>>>>>>> I've been working on HDFS code for the Python SDK and I've
>>> >>>>>>>>>>>> noticed some behaviors which are surprising. I wanted to
>>> know if these
>>> >>>>>>>>>>>> behaviors are known and intended.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> 1. When renaming files during finalize_write, rename errors
>>> are
>>> >>>>>>>>>>>> ignored. For example, if I run wordcount twice using HDFS
>>> code I get a
>>> >>>>>>>>>>>> warning the second time because the file already exists:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> WARNING:root:Rename not successful:
>>> >>>>>>>>>>>>
>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>> >>>>>>>>>>>> -> hdfs://counts2-00000-of-00001, libhdfs error in renaming
>>> >>>>>>>>>>>>
>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>> >>>>>>>>>>>> to hdfs://counts2-00000-of-00001 with exceptions Unable to
>>> rename
>>> >>>>>>>>>>>>
>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
>>> >>>>>>>>>>>> to '/counts2-00000-of-00001'.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> For GCS and local files there are no rename errors (in this
>>> >>>>>>>>>>>> case), since the rename operation silently overwrites
>>> existing destination
>>> >>>>>>>>>>>> files. However, blindly ignoring these errors might make
>>> the pipeline to
>>> >>>>>>>>>>>> report success even though output files are missing.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> 2. Output files (--ouput) overwrite existing files.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java
>>> SDK
>>> >>>>>>>>>>>> doesn't use Filesystem.Rename().
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Thanks,
>>> >>>>>>>>>>>> - Udi
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>
>>> >
>>>
>>
>>
>

Reply via email to