Currently, the Python file-based sink is batch only. Regarding Raghu's question: a stage/pipeline failure should not be considered data loss, but I prefer overwriting existing output and completing a possibly expensive pipeline over failing the whole pipeline due to one or more existing files.
- Cham

On Fri, Feb 2, 2018 at 10:21 AM Reuven Lax <re...@google.com> wrote:
> However this code might run in streaming as well, right?
>
> On Fri, Feb 2, 2018 at 9:54 AM, Raghu Angadi <rang...@google.com> wrote:
>> In a batch pipeline, is it considered a data loss if the stage fails
>> (assuming it does not set IGNORE_MISSING_FILES and fails hard)? If not,
>> it might be better to favor correctness and fail in the current
>> implementation.
>>
>> On Thu, Feb 1, 2018 at 4:07 PM, Robert Bradshaw <rober...@google.com> wrote:
>>> You could add a step to delete all of dest before a barrier and the
>>> step that does the rename as outlined. In that case, any dest file
>>> that exists must be good.
>>>
>>> On Thu, Feb 1, 2018 at 2:52 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>> I think this is still unsafe in case exists(dst) (e.g. this is a
>>>> re-run of a pipeline) but src is missing due to some bad reason.
>>>> However it's probably better than what we have (e.g. we currently
>>>> certainly don't perform checksum checks).
>>>>
>>>> On Thu, Feb 1, 2018 at 2:45 PM Udi Meiri <eh...@google.com> wrote:
>>>>> For GCS, I would do what I believe we already do.
>>>>> rename(src, dst):
>>>>> - if !exists(src) and exists(dst) return 0
>>>>> - if !exists(src) and !exists(dst) return error
>>>>> - if exists(src) and exists(dst) { if checksum(src) == checksum(dst)
>>>>>   return 0 else delete(dst) }
>>>>> - Start a GCS copy from src to dst.
>>>>> - Wait for GCS copy to complete.
>>>>> - delete(src)
>>>>>
>>>>> For filesystems that don't have checksum() metadata, size() can be
>>>>> used instead.
>>>>> I've opened a bug to track this:
>>>>> https://issues.apache.org/jira/browse/BEAM-3600
>>>>>
>>>>> On Thu, Feb 1, 2018 at 2:25 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>> Yes, IGNORE_MISSING_FILES is unsafe because it will - well - ignore
>>>>>> files that are missing for more ominous reasons than just this being
>>>>>> a non-first attempt at renaming src to dst. E.g. if there was a bug
>>>>>> in constructing the filename to be renamed, or if we somehow messed
>>>>>> up the order of rename vs cleanup, etc. - these situations with
>>>>>> IGNORE_MISSING_FILES would lead to silent data loss (likely caught
>>>>>> by unit tests though - so this is not a super serious issue).
>>>>>>
>>>>>> Basically I just can't think of a case when I was copying files and
>>>>>> thinking "oh man, I wish it didn't give an error if the stuff I'm
>>>>>> copying doesn't exist" - the option exists only because we couldn't
>>>>>> come up with another way to implement idempotent rename on GCS.
>>>>>>
>>>>>> What's your idea of how a safe retryable GCS rename() could work?
>>>>>>
>>>>>> On Wed, Jan 31, 2018 at 6:45 PM Udi Meiri <eh...@google.com> wrote:
>>>>>>> Eugene, if I get this right, you're saying that
>>>>>>> IGNORE_MISSING_FILES is unsafe because it will skip (src, dst)
>>>>>>> pairs where neither exists? (it only looks at whether src exists)
>>>>>>>
>>>>>>> For GCS, we can construct a safe retryable rename() operation,
>>>>>>> assuming that copy() and delete() are atomic for a single file or
>>>>>>> pair of files.
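The rename(src, dst) steps Udi lists upthread can be sketched in Python against the local filesystem as a stand-in for GCS. This is only an illustration of the algorithm, not the Beam implementation: checksum() here is an MD5 over file contents rather than the real GCS object checksum metadata, and shutil.copy stands in for a GCS copy-and-wait.

```python
import hashlib
import os
import shutil


def checksum(path):
    # Stand-in for GCS checksum metadata: MD5 over the file contents.
    with open(path, "rb") as f:
        return hashlib.md5(f.read()).hexdigest()


def rename(src, dst):
    """Retryable rename following the steps outlined in the thread.

    Returns 0 on success; raises IOError if neither src nor dst exists.
    """
    if not os.path.exists(src):
        if os.path.exists(dst):
            return 0  # A previous attempt already completed the rename.
        raise IOError("neither %s nor %s exists" % (src, dst))
    if os.path.exists(dst):
        if checksum(src) == checksum(dst):
            # Copy already done; as in the quoted steps, src is left as-is.
            return 0
        os.remove(dst)  # Stale dst from an earlier run; redo the copy.
    shutil.copy(src, dst)  # "Start a GCS copy" + "wait for it to complete".
    os.remove(src)         # delete(src)
    return 0
```

Because each branch is keyed off what already exists, calling rename() a second time after a partial or complete earlier attempt is a no-op rather than an error.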
>>>>>>> On Wed, Jan 31, 2018 at 4:00 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>>>> On Wed, Jan 31, 2018 at 2:43 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>> As far as I know, the current implementation of file sinks is the
>>>>>>>>> only reason why the flag IGNORE_MISSING for copying even exists -
>>>>>>>>> there's no other compelling reason to justify it. We implement
>>>>>>>>> "rename" as "copy, then delete" (in a single DoFn), so for
>>>>>>>>> idempotency of this operation we need to ignore the copying of a
>>>>>>>>> non-existent file.
>>>>>>>>>
>>>>>>>>> I think the right way to go would be to change the implementation
>>>>>>>>> of renaming to have a @RequiresStableInput (or reshuffle) in the
>>>>>>>>> middle, so it's made of 2 individually idempotent operations:
>>>>>>>>> 1) copy, which would fail if input is missing, and would
>>>>>>>>>    overwrite output if it exists
>>>>>>>>> -- reshuffle --
>>>>>>>>> 2) delete, which would not fail if input is missing.
>>>>>>>>
>>>>>>>> Something like this is needed only in streaming, right?
>>>>>>>>
>>>>>>>> Raghu.
>>>>>>>>
>>>>>>>>> That way first everything is copied (possibly via multiple
>>>>>>>>> attempts), and then old files are deleted (possibly via multiple
>>>>>>>>> attempts).
>>>>>>>>>
>>>>>>>>> On Wed, Jan 31, 2018 at 2:26 PM Udi Meiri <eh...@google.com> wrote:
>>>>>>>>>> I agree that overwriting is more in line with user expectations.
>>>>>>>>>> I believe that the sink should not ignore errors from the
>>>>>>>>>> filesystem layer. Instead, the FileSystem API should be more
>>>>>>>>>> well defined. Examples: rename() and copy() should overwrite
>>>>>>>>>> existing files at the destination, and copy() should have an
>>>>>>>>>> ignore_missing flag.
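Eugene's two-phase scheme upthread can be sketched outside of Beam as two individually idempotent functions separated by a barrier. This is a hypothetical local-filesystem illustration: copy_phase, delete_phase, and rename_with_barrier are made-up names, and the comment marks where the reshuffle / @RequiresStableInput barrier would sit in a real pipeline.

```python
import os
import shutil


def copy_phase(src, dst):
    # Phase 1: fail if src is missing, overwrite dst if it exists.
    # Safe to retry: re-copying produces the same dst.
    if not os.path.exists(src):
        raise IOError("source %s is missing" % src)
    shutil.copy(src, dst)


def delete_phase(src):
    # Phase 2: a missing src just means a prior attempt already deleted it.
    # Safe to retry: deleting an already-deleted file is a no-op.
    if os.path.exists(src):
        os.remove(src)


def rename_with_barrier(src, dst):
    copy_phase(src, dst)
    # -- reshuffle / @RequiresStableInput barrier would sit here --
    delete_phase(src)
```

The point of the barrier is that phase 2 never starts until every copy has succeeded, so a retry of either phase alone can never observe "src gone but dst missing".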
>>>>>>>>>> On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <rang...@google.com> wrote:
>>>>>>>>>>> Original mail mentions that output from the second run of
>>>>>>>>>>> word_count is ignored. That does not seem as safe as ignoring an
>>>>>>>>>>> error from a second attempt of a step. How do we know the second
>>>>>>>>>>> run didn't produce different output? Overwriting seems more
>>>>>>>>>>> accurate than ignoring. Does handling this error at the sink
>>>>>>>>>>> level distinguish between the two (another run vs a second
>>>>>>>>>>> attempt)?
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <eh...@google.com> wrote:
>>>>>>>>>>>> Yeah, another round of refactoring is due to move the
>>>>>>>>>>>> rename-via-copy+delete logic up to the file-based sink level.
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>>>>>>>> Good point. There's always the chance of the step that
>>>>>>>>>>>>> performs the final rename being retried, so we'll have to
>>>>>>>>>>>>> ignore this error at the sink level. We don't necessarily
>>>>>>>>>>>>> have to do this at the FileSystem level though. I think the
>>>>>>>>>>>>> proper behavior might be to raise an error for the rename at
>>>>>>>>>>>>> the FileSystem level if the destination already exists (or
>>>>>>>>>>>>> the source doesn't exist) while ignoring that error (and
>>>>>>>>>>>>> possibly logging a warning) at the sink level.
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Cham
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>> I think the idea was to ignore "already exists" errors. The
>>>>>>>>>>>>>> reason being that any step in Beam can be executed multiple
>>>>>>>>>>>>>> times, including the rename step.
>>>>>>>>>>>>>> If the rename step gets run twice, the second run should
>>>>>>>>>>>>>> succeed vacuously.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <eh...@google.com> wrote:
>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>> I've been working on HDFS code for the Python SDK and I've
>>>>>>>>>>>>>>> noticed some behaviors which are surprising. I wanted to
>>>>>>>>>>>>>>> know if these behaviors are known and intended.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. When renaming files during finalize_write, rename errors
>>>>>>>>>>>>>>> are ignored. For example, if I run wordcount twice using
>>>>>>>>>>>>>>> the HDFS code I get a warning the second time because the
>>>>>>>>>>>>>>> file already exists:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> WARNING:root:Rename not successful:
>>>>>>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>>>>>>>>>>>> -> hdfs://counts2-00000-of-00001, libhdfs error in renaming
>>>>>>>>>>>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>>>>>>>>>>>> to hdfs://counts2-00000-of-00001 with exceptions Unable to
>>>>>>>>>>>>>>> rename
>>>>>>>>>>>>>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
>>>>>>>>>>>>>>> to '/counts2-00000-of-00001'.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For GCS and local files there are no rename errors (in this
>>>>>>>>>>>>>>> case), since the rename operation silently overwrites
>>>>>>>>>>>>>>> existing destination files. However, blindly ignoring these
>>>>>>>>>>>>>>> errors might make the pipeline report success even though
>>>>>>>>>>>>>>> output files are missing.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2. Output files (--output) overwrite existing files.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 3. The Python SDK doesn't use Filesystems.copy(). The Java
>>>>>>>>>>>>>>> SDK doesn't use Filesystem.Rename().
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> - Udi
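Cham's proposal upthread (raise the error at the FileSystem level when the destination already exists or the source is missing, and ignore it with a warning at the sink level) could be split across the two layers roughly as follows. This is a hypothetical sketch against the local filesystem: FileSystemError, fs_rename, and sink_finalize are invented names for illustration, not the actual Beam APIs.

```python
import logging
import os
import shutil


class FileSystemError(IOError):
    """Hypothetical stand-in for a filesystem-layer rename failure."""


def fs_rename(src, dst):
    # FileSystem level: strict. Raise if dst already exists or src is
    # missing; never silently overwrite or skip.
    if os.path.exists(dst):
        raise FileSystemError("destination %s already exists" % dst)
    if not os.path.exists(src):
        raise FileSystemError("source %s does not exist" % src)
    shutil.move(src, dst)


def sink_finalize(renames):
    # Sink level: tolerate retried rename steps, but log a warning
    # instead of silently swallowing the error.
    for src, dst in renames:
        try:
            fs_rename(src, dst)
        except FileSystemError as e:
            logging.warning("Rename not successful (ignoring): %s", e)
```

The layering keeps the strictness where correctness is decided (the filesystem) while letting the sink, which knows that its rename step may legitimately run twice, decide that the error is benign.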