@Eugene, we can make breaking changes. But if we really don't want to, we
can add it under a new name easily. That particular inheritance hierarchy
is not precious IMO.

On Thu, Oct 20, 2016, 14:03 Eugene Kirpichov <[email protected]>
wrote:

> @Cham - this addresses temporary files that were written by successful
> bundles, but not by failed bundles (and not the case when the entire
> pipeline fails), so it's not sufficient.
>
> @Dan - yes, there are situations when it's impossible to create a sibling.
> In that case, we'd need a fallback - either something the user needs to
> explicitly specify ("your path is such that we don't know where to place
> temporary files, please specify withTempLocation or something"), or I like
> Robert's option of using sibling but differently-named files in this case.
>
> @Kenn - yeah, a directory-based format would be great
> (/path/to/foo/xxxxx-of-yyyyy), but this would be a breaking change to the
> expected behavior.
>
> I actually really like the option of sibling-but-differently-named files
> (/path/to/temp-beam-foo-$uid) which would be a very non-invasive change to
> the current (/path/to/foo-temp-$uid) and indeed would not involve creating
> new directories or needing new IOChannelFactory APIs. It will still match a
> glob like /path/to/* though (which a user could conceivably specify in a
> situation like gs://my-logs-bucket/*), but it might be good enough.
>
>
> On Thu, Oct 20, 2016 at 10:14 AM Robert Bradshaw
> <[email protected]> wrote:
>
> > On Thu, Oct 20, 2016 at 9:58 AM, Kenneth Knowles <[email protected]
> >
> > wrote:
> > > I like the spirit of proposal #1 for addressing the critical
> duplication
> > > problem, though as Dan points out the logic to choose a related but
> > > collision-free name might be slightly more complex.
> > >
> > > It is a nice bonus that it addresses the less critical issues and
> > improves
> > > usability for manual inspections and interventions.
> > >
> > > The term "sibling" is being slightly misused here. I'd say #1 as
> proposed
> > > is a "sibling of the parent" while today's behavior is "sibling". I'd
> > say a
> > > root cause of multiple problems is that our sharded file format is "a
> > bunch
> > > of files next to each other" and the sibling is "other files in the
> same
> > > directory" so it takes some care, and explicit file name tracking
> instead
> > > of globbing, to work with it correctly.
> > >
> > >  AFAIK (corrections welcome) there is nothing special about
> > > Write.to("s3://bucket/file") meaning write to
> > > "s3://bucket/file-$shardnum-of-$totalshards". An alternative that seems
> > > superior is to write to "s3://bucket/file/$shardnum-of-$totalshards"
> with
> > > the convention that this prefix is fully owned by this file. Now the
> > prefix
> > > "s3://bucket/file/" _is_ the sharded file. It is conceptually simpler
> and
> > > more glob and UI friendly. (any non "-" character would work for GCS
> and
> > > S3, but the "/" convention is better, considering the broader world)
> > >
> > > And bringing it back to this thread, the "sibling" is no longer "more
> > files
> > > in the same directory" now "s3://bucket/file-temp-$uid" which is on the
> > > same filesystem with the same ACLs. It is also more UI friendly, easier
> > to
> > > clean up, and does more to explicitly indicate that this is really one
> > > sharded file. Perhaps there's a pitfall I am overlooking?
> >
> > Using directories rather than prefixes is a big change, and introduces
> > complications like dealing with hidden dot files (some placed
> > implicitly by the system or applications, and worrying about
> > executable bits rather than just the rw ones and possibly more
> > complicated permission inheritance).
> >
> > > Also since you mentioned local file support, FWIW the cleanup glob
> > "file-*"
> > > today breaks on Windows due to Java library vagaries, while "file/*"
> > would
> > > succeed.
> > > On Thu, Oct 20, 2016, 09:14 Dan Halperin <[email protected]>
> > > wrote:
> > >
> > > This thread is conflating many issues.
> > >
> > > * Putting temp files where they will not match the glob for the desired
> > > output files
> > > * Dealing with eventually-consistent filesystems (s3, GCS, ...)
> > > * Properly cleaning up all temp files
> > >
> > > They all need to get solved, but for now I think we only need to solve
> > the
> > > first one.
> > >
> > > Siblings fundamentally will not work. Consider the following
> > > perfectly-valid output path: s3://bucket/file-SSS-NNN.txt . A sibling
> > would
> > > be a new bucket, so not guaranteed to exist.
> > >
> > > On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath <
> > [email protected]>
> > > wrote:
> > >
> > >> Can this be prevented by moving temporary files (copy + delete
> > >> individually) at finalization instead of copying all of them and
> > > performing
> > >> a bulk delete ? You can support task failures by ignoring renames when
> > the
> > >> destination exists. Python SDK currently does this (and puts temp
> files
> > in
> > >> a sub-directory).
> > >>
> > >> Thanks,
> > >> Cham
> > >>
> > >> On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov
> > >> <[email protected]> wrote:
> > >>
> > >> Hello,
> > >>
> > >> This is a continuation of the discussion on PR
> > >> https://github.com/apache/incubator-beam/pull/1050 which turned out
> > more
> > >> complex than expected.
> > >>
> > >> Short summary:
> > >> Currently FileBasedSink, when writing to /path/to/foo (in practice,
> > >> /path/to/foo-xxxxx-of-yyyyy where yyyyy is the total number of output
> > >> files), puts temporary files into /path/to/foo-temp-$uid, and when
> > >> finalizing the sink, it removes the temporary files by matching the
> > > pattern
> > >> /path/to/foo-temp-* and removing everything that matches.
> > >>
> > >> There are a couple of issues with this:
> > >> - FileBasedSink uses IOChannelFactory, which currently supports local
> > >> filesystems and Google Cloud Storage (GCS). GCS's match() operation is
> > >> currently eventually consistent. So, it may fail to return some of the
> > >> files, so we won't remove them.
> > >> - If the Beam job is cancelled or fails midway, then the temp files
> > won't
> > >> be deleted at all (that's subject to a separate discussion on cleanup
> > API
> > > -
> > >> AFAIK there's no JIRA for it yet, I believe peihe@ was thinking about
> > this
> > >> and was going to file one).
> > >> - If a follow-up data processing job is reading /path/to/foo, then the
> > way
> > >> temp files are named, they will likely match the same glob pattern
> (e.g.
> > >> "/path/to/foo*") as the one intending to match the final output in
> > >> /path/to/foo, so if some temp files are leftover, the follow-up job
> will
> > >> effectively read duplicate records (some from /path/to/foo, some from
> > >> /path/to/foo-temp-$blah).
> > >>
> > >> I think, in the absence of a way to guarantee that all temp files will
> > be
> > >> deleted (I think it'd be very difficult or impossible to provide a
> hard
> > >> guarantee of this, considering various possible failure conditions
> such
> > as
> > >> zombie workers), the cleanest way to solve this is put temp files in a
> > >> location that's unlikely to match the same glob pattern as one that
> > > matches
> > >> the final output.
> > >>
> > >> Some options for what that could be:
> > >> 1. A subdirectory that is a sibling of the final path, sufficiently
> > > unique,
> > >> and unlikely to match the same glob -
> > >> /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
> > >> currently takes)
> > >> 2. A subdirectory under PipelineOptions.tempLocation - this might be
> > > flawed
> > >> because PipelineOptions.tempLocation might be on a different
> filesystem,
> > > or
> > >> have different ACLs, than the output of the FileBasedSink.
> > >> 3. A subdirectory that the user *must* explicitly provide on their
> > >> FileBasedSink. This is a reduction in usability, but there may be
> cases
> > >> when this is necessary - e.g. if the final location of the
> FileBasedSink
> > > is
> > >> such that we can't create siblings to it (e.g. the root path in a GCS
> > >> bucket - gs://some-bucket/)
> > >> 4. A subdirectory generated by a new IOChannelFactory call ("give me a
> > > temp
> > >> directory for the given final path") which would do one of the above -
> > >> reasonable, and simplifies FileBasedSink, but we still need to choose
> > > which
> > >> of #1-#3 this call should do.
> > >>
> > >> There might be other things I missed. There might be radical
> > > restructurings
> > >> of FileBasedSink that work around this problem entirely, though I
> > couldn't
> > >> think of any.
> > >>
> > >> In general, the requirements on the solution are:
> > >> - It should be very unlikely that somebody reads the temp files in the
> > > same
> > >> glob pattern as the final output by mistake.
> > >> - It should continue to make sense as IOChannelFactory is extended
> with
> > >> support for other filesystems.
> > >> - It should ideally use the same filesystem as the final output, or
> > > perhaps
> > >> even a location logically "close" to the final output, so that it
> could
> > >> potentially take advantage of that filesystem's efficient bulk-copy or
> > >> bulk-rename operations if available.
> > >> - It should be easy to manually clean up the temp files if something
> > went
> > >> wrong and they weren't cleaned up by the Beam job.
> > >>
> > >> I'm personally in favor of #1 with fallback to #2 or #3, because I
> > think a
> > >> sibling directory achieves all of these requirements unless a sibling
> > >> directory can't be created.
> > >>
> > >> Thoughts?
> > >>
> >
>

Reply via email to