Another option would be to just use /path/to/temp-foo-$uid, which avoids matching /path/to/foo-* (hoping, of course, that the temp- prefix, or whatever prefix we pick, doesn't match anything).
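To make the glob argument concrete, here is a small Python sketch. It checks which downstream read patterns would pick up a temp path under the current foo-temp-$uid naming and under the temp-foo-$uid naming suggested above. It uses `fnmatch.fnmatchcase` only as an approximation of filesystem globbing. Its `*` also matches `/`, unlike shell globs. The uid value, the helper name `matching_globs`, and the list of globs are made up for illustration.

```python
import fnmatch

# Hypothetical globs a downstream job might use to read the final output
# written under the /path/to/foo prefix.
DOWNSTREAM_GLOBS = ["/path/to/foo*", "/path/to/foo-*", "/path/to/*foo*"]


def matching_globs(path):
    """Return the downstream globs that would (wrongly) pick up `path`.

    fnmatch is only an approximation of filesystem globbing: its "*" also
    matches "/", which shell globs do not.
    """
    return [g for g in DOWNSTREAM_GLOBS if fnmatch.fnmatchcase(path, g)]


if __name__ == "__main__":
    # "abc123" stands in for $uid; it is a made-up value.
    for path in ["/path/to/foo-temp-abc123", "/path/to/temp-foo-abc123"]:
        print(path, "->", matching_globs(path))
```

Under these assumptions, the current naming matches all three patterns. The temp-foo-$uid naming matches only the `*foo*` pattern, which is exactly the "hoping the prefix doesn't match anything" caveat.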
I see #2 causing all sorts of issues, and #3 would be a significant reduction in usability. I would lean towards using /path/to/temp-beam-foo-$uid/$another_uid when possible, and /path/to/temp-beam-foo-$uid-$another_uid otherwise (note the dash instead of the slash). The logic of determining "when possible" seems like it belongs in IOChannelFactory, not FileBasedSink.

On Thu, Oct 20, 2016 at 9:21 AM, Lukasz Cwik wrote:
> The issue manifests when a completely different pipeline uses the output of
> the last pipeline as input to the new pipeline and then these temporary
> files are matched in the glob expression.
>
> This happens because FileBasedSink is responsible for creating the
> temporary paths, which occurs while processing a bundle. If that bundle
> processing fails, there is no way to guarantee that the runner even knows
> they existed in our current execution model.
>
> I think there are other potential solutions which require support from the
> runner that aren't being considered, since they would all fall under the
> general cleanup API that Eugene referred to. The question for now is: is
> the solution good enough?
>
> I'm in favor of #1 as well.
>
> I'm against #4 since FileBasedSink could do a pretty good job for all
> filesystems, and once there is support for cleanup, FileBasedSink could
> migrate to use it without any changes to the various IOChannelFactory
> implementations. This prevents us from getting to the place where the
> Hadoop filesystem implementation has many, many methods.
>
>
> On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath wrote:
>
>> Can this be prevented by moving temporary files (copy + delete
>> individually) at finalization instead of copying all of them and performing
>> a bulk delete? You can support task failures by ignoring renames when the
>> destination exists. The Python SDK currently does this (and puts temp files
>> in a sub-directory).
>>
>> Thanks,
>> Cham
>>
>> On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov wrote:
>>
>> Hello,
>>
>> This is a continuation of the discussion on PR
>> https://github.com/apache/incubator-beam/pull/1050, which turned out to be
>> more complex than expected.
>>
>> Short summary:
>> Currently FileBasedSink, when writing to /path/to/foo (in practice,
>> /path/to/foo-xxxxx-of-yyyyy where yyyyy is the total number of output
>> files), puts temporary files into /path/to/foo-temp-$uid, and when
>> finalizing the sink, it removes the temporary files by matching the pattern
>> /path/to/foo-temp-* and removing everything that matches.
>>
>> There are a couple of issues with this:
>> - FileBasedSink uses IOChannelFactory, which currently supports local
>> filesystems and Google Cloud Storage (GCS). GCS's match() operation is
>> currently eventually consistent, so it may fail to return some of the
>> files, and we won't remove them.
>> - If the Beam job is cancelled or fails midway, then the temp files won't
>> be deleted at all (that's subject to a separate discussion on a cleanup
>> API; AFAIK there's no JIRA for it yet, but I believe peihe@ was thinking
>> about this and was going to file one).
>> - If a follow-up data processing job is reading /path/to/foo, then, given
>> the way temp files are named, they will likely match the same glob pattern
>> (e.g. "/path/to/foo*") as the one intended to match the final output in
>> /path/to/foo, so if some temp files are left over, the follow-up job will
>> effectively read duplicate records (some from /path/to/foo, some from
>> /path/to/foo-temp-$blah).
>>
>> I think, in the absence of a way to guarantee that all temp files will be
>> deleted (I think it'd be very difficult or impossible to provide a hard
>> guarantee of this, considering various possible failure conditions such as
>> zombie workers), the cleanest way to solve this is to put temp files in a
>> location that's unlikely to match the same glob pattern as the one that
>> matches the final output.
>>
>> Some options for what that could be:
>> 1. A subdirectory that is a sibling of the final path, sufficiently unique,
>> and unlikely to match the same glob:
>> /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
>> currently takes).
>> 2. A subdirectory under PipelineOptions.tempLocation. This might be flawed
>> because PipelineOptions.tempLocation might be on a different filesystem, or
>> have different ACLs, than the output of the FileBasedSink.
>> 3. A subdirectory that the user *must* explicitly provide on their
>> FileBasedSink. This is a reduction in usability, but there may be cases
>> when this is necessary, e.g. if the final location of the FileBasedSink is
>> such that we can't create siblings to it (e.g. the root path in a GCS
>> bucket, gs://some-bucket/).
>> 4. A subdirectory generated by a new IOChannelFactory call ("give me a temp
>> directory for the given final path") which would do one of the above. This
>> is reasonable and simplifies FileBasedSink, but we still need to choose
>> which of #1-#3 this call should do.
>>
>> There might be other things I missed. There might be radical restructurings
>> of FileBasedSink that work around this problem entirely, though I couldn't
>> think of any.
>>
>> In general, the requirements on the solution are:
>> - It should be very unlikely that somebody accidentally reads the temp
>> files with the same glob pattern as the final output.
>> - It should continue to make sense as IOChannelFactory is extended with
>> support for other filesystems.
>> - It should ideally use the same filesystem as the final output, or perhaps
>> even a location logically "close" to the final output, so that it could
>> potentially take advantage of that filesystem's efficient bulk-copy or
>> bulk-rename operations if available.
>> - It should be easy to manually clean up the temp files if something went
>> wrong and they weren't cleaned up by the Beam job.
>>
>> I'm personally in favor of #1 with fallback to #2 or #3, because I think a
>> sibling directory achieves all of these requirements unless a sibling
>> directory can't be created.
>>
>> Thoughts?
>>
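The sibling-directory proposal (option #1) can be combined with the dash-instead-of-slash fallback suggested in the first reply. A rough sketch follows. `temp_location_for` and its `can_create_sibling_dir` flag are hypothetical and are not part of Beam's IOChannelFactory. The thread only suggests that the "when possible" decision belongs at the filesystem layer, so here the caller simply passes it in. Fallbacks #2 and #3 are not modeled.

```python
import fnmatch
import posixpath
import uuid


def temp_location_for(final_prefix, can_create_sibling_dir,
                      uid=None, another_uid=None):
    """Hypothetical helper: pick a temp location for a sink writing `final_prefix`.

    For final_prefix="/path/to/foo" (outputs like /path/to/foo-xxxxx-of-yyyyy):
      - /path/to/temp-beam-foo-$uid/$another_uid if a sibling dir can be made,
      - /path/to/temp-beam-foo-$uid-$another_uid otherwise (dash, not slash).
    """
    uid = uid or uuid.uuid4().hex
    another_uid = another_uid or uuid.uuid4().hex
    parent, base = posixpath.split(final_prefix)
    # "temp-beam-" goes *before* the base name, so globs such as
    # "/path/to/foo*" that target the final output do not match it.
    sibling = posixpath.join(parent, f"temp-beam-{base}-{uid}")
    if can_create_sibling_dir:
        return f"{sibling}/{another_uid}"
    return f"{sibling}-{another_uid}"


if __name__ == "__main__":
    for possible in (True, False):
        temp = temp_location_for("/path/to/foo", possible, "u1", "u2")
        # Show the location and whether the final-output glob would match it.
        print(temp, fnmatch.fnmatchcase(temp, "/path/to/foo*"))
```

Both variants keep the temp location on the same filesystem and next to the output, which addresses the "close to the final output" requirement. Putting the unique prefix first is what keeps it out of `foo*`-style globs.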
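Chamikara's alternative is to move each temp file individually at finalization and skip any rename whose destination already exists. On a local filesystem it might look roughly like the sketch below. This is not the Python SDK's actual code. `finalize_outputs` is a hypothetical name, and the finalizer is assumed to already know the temp-to-final mapping. Deleting a temp file that still exists after its destination appeared is an assumed way of resolving a half-finished copy + delete.

```python
import os
import shutil


def finalize_outputs(temp_to_final):
    """Hypothetical finalizer: move each temp file to its final name, one by one.

    `temp_to_final` maps temp path -> final path. If a destination already
    exists, an earlier (possibly failed) attempt is assumed to have produced
    it, so the rename is skipped; this is what makes re-running safe.
    """
    moved, skipped = [], []
    for temp_path, final_path in sorted(temp_to_final.items()):
        if os.path.exists(final_path):
            skipped.append(final_path)
            # Assumption: a leftover temp copy means the earlier attempt copied
            # but did not delete, so finish the "delete" half of the move.
            if os.path.exists(temp_path):
                os.remove(temp_path)
            continue
        # shutil.move renames, or copies and deletes across filesystems.
        shutil.move(temp_path, final_path)
        moved.append(final_path)
    return moved, skipped
```

Every file whose destination already exists is skipped, so running the function again after a partial failure finishes the remaining moves without touching completed ones. That idempotence is what lets task retries work in this scheme.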
