I agree that overwriting is more in line with user expectations. I believe the sink should not ignore errors from the filesystem layer. Instead, the FileSystem API should be better defined. For example: rename() and copy() should overwrite existing files at the destination, and copy() should have an ignore_missing flag.
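A minimal sketch of the semantics I'm proposing, using plain local-filesystem calls (the function names here are illustrative, not Beam's actual FileSystem API):

```python
import os
import shutil


def rename(src, dst):
    """Rename src to dst, overwriting dst if it already exists.

    os.replace() overwrites the destination atomically on POSIX, which
    matches the proposed "rename overwrites" behavior.
    """
    os.replace(src, dst)


def copy(src, dst, ignore_missing=False):
    """Copy src to dst, overwriting any existing dst.

    With ignore_missing=True, a nonexistent src is silently skipped
    instead of raising, so a retried copy step succeeds vacuously.
    """
    try:
        shutil.copyfile(src, dst)
    except FileNotFoundError:
        if not ignore_missing:
            raise
```

With these semantics a retried finalize step that renames or copies the same files a second time would not fail the pipeline.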
On Wed, Jan 31, 2018 at 1:49 PM Raghu Angadi <rang...@google.com> wrote:

> The original mail mentions that output from a second run of word_count is
> ignored. That does not seem as safe as ignoring an error from a second
> attempt of a step. How do we know the second run didn't produce different
> output? Overwriting seems more accurate than ignoring. Does handling this
> error at the sink level distinguish between the two cases (another run vs.
> a second attempt)?
>
> On Wed, Jan 31, 2018 at 12:32 PM, Udi Meiri <eh...@google.com> wrote:
>
>> Yeah, another round of refactoring is due, to move the rename-via-copy+delete
>> logic up to the file-based sink level.
>>
>> On Wed, Jan 31, 2018, 10:42 Chamikara Jayalath <chamik...@google.com> wrote:
>>
>>> Good point. There's always the chance that the step performing the final
>>> rename is retried, so we'll have to ignore this error at the sink level.
>>> We don't necessarily have to do this at the FileSystem level, though. I
>>> think the proper behavior might be to raise an error for the rename at
>>> the FileSystem level if the destination already exists (or the source
>>> doesn't exist), while ignoring that error (and possibly logging a
>>> warning) at the sink level.
>>>
>>> - Cham
>>>
>>> On Tue, Jan 30, 2018 at 6:47 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I think the idea was to ignore "already exists" errors. The reason is
>>>> that any step in Beam can be executed multiple times, including the
>>>> rename step. If the rename step gets run twice, the second run should
>>>> succeed vacuously.
>>>>
>>>> On Tue, Jan 30, 2018 at 6:19 PM, Udi Meiri <eh...@google.com> wrote:
>>>>
>>>>> Hi,
>>>>> I've been working on HDFS code for the Python SDK and I've noticed some
>>>>> behaviors which are surprising. I wanted to know if these behaviors are
>>>>> known and intended.
>>>>>
>>>>> 1. When renaming files during finalize_write, rename errors are ignored
>>>>> <https://github.com/apache/beam/blob/3aa2bef87c93d2844dd7c8dbaf45db75ec607792/sdks/python/apache_beam/io/filebasedsink.py#L232>.
>>>>> For example, if I run wordcount twice using the HDFS code, I get a
>>>>> warning the second time because the file already exists:
>>>>>
>>>>> WARNING:root:Rename not successful:
>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>> -> hdfs://counts2-00000-of-00001, libhdfs error in renaming
>>>>> hdfs://beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2
>>>>> to hdfs://counts2-00000-of-00001 with exceptions Unable to rename
>>>>> '/beam-temp-counts2-7cb0a78005f211e8b6a08851fb5da245/1059f870-d64f-4f63-b1de-e4bd20fcd70a.counts2'
>>>>> to '/counts2-00000-of-00001'.
>>>>>
>>>>> For GCS and local files there are no rename errors (in this case),
>>>>> since the rename operation silently overwrites existing destination
>>>>> files. However, blindly ignoring these errors might make the pipeline
>>>>> report success even though output files are missing.
>>>>>
>>>>> 2. Output files (--output) overwrite existing files.
>>>>>
>>>>> 3. The Python SDK doesn't use FileSystems.copy(). The Java SDK doesn't
>>>>> use Filesystem.Rename().
>>>>>
>>>>> Thanks,
>>>>> - Udi
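For reference, the layering Cham describes above (strict errors at the FileSystem layer, tolerated at the sink layer) could look roughly like this; the names `DestinationExistsError`, `fs_rename`, and `sink_finalize_rename` are hypothetical, not Beam's actual classes:

```python
import logging
import os


class DestinationExistsError(Exception):
    """Raised by the (hypothetical) FileSystem layer when a rename
    destination already exists."""


def fs_rename(src, dst):
    # FileSystem layer: strict semantics -- fail loudly if dst exists,
    # rather than silently overwriting or silently ignoring.
    if os.path.exists(dst):
        raise DestinationExistsError(dst)
    os.replace(src, dst)


def sink_finalize_rename(src, dst):
    # Sink layer: a retried finalize step may find dst already written,
    # so "destination exists" is downgraded to a logged warning; any
    # other filesystem error still propagates and fails the pipeline.
    try:
        fs_rename(src, dst)
    except DestinationExistsError:
        logging.warning("Rename skipped, destination already exists: %s", dst)
```

This keeps the FileSystem API well defined while letting the sink treat a rerun of the rename step as vacuously successful.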