Another option would be to just use /path/to/temp-foo-$uid to avoid
matching /path/to/foo-* (hoping, of course, that the temp- or whatever
prefix we choose doesn't itself match anything).
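
As a quick illustration of the collision (a hedged sketch using java.nio's
PathMatcher as a stand-in for IOChannelFactory's match(); GCS glob semantics
may differ slightly):

import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobCollisionDemo {
  public static void main(String[] args) {
    // A glob a downstream pipeline might use to read the final output.
    PathMatcher outputGlob =
        FileSystems.getDefault().getPathMatcher("glob:/path/to/foo-*");

    // The current temp naming collides with that glob...
    System.out.println(outputGlob.matches(
        Paths.get("/path/to/foo-temp-abc123")));         // true

    // ...while a temp- prefix or a sibling subdirectory does not.
    System.out.println(outputGlob.matches(
        Paths.get("/path/to/temp-foo-abc123")));         // false
    System.out.println(outputGlob.matches(
        Paths.get("/path/to/temp-beam-foo-abc123/0")));  // false
  }
}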

I see #2 causing all sorts of issues, and #3 would be a significant
reduction in usability. I would lean towards
/path/to/temp-beam-foo-$uid/$another_uid when possible, and
/path/to/temp-beam-foo-$uid-$another_uid otherwise (note the dash
instead of the slash). The logic for determining "when possible" seems
like it belongs in IOChannelFactory, not FileBasedSink.
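
To make that concrete, here's a rough sketch (hypothetical - IOChannelFactory
has no such method today) of what the decision could look like; the
siblingDirPossible flag stands in for whatever per-filesystem "when possible"
check a concrete IOChannelFactory would implement:

/**
 * Hypothetical sketch only: picks a temp path for a final output path,
 * preferring a sibling subdirectory (option #1) and falling back to a flat
 * dash-separated name when a sibling directory can't be created (e.g. for a
 * path at the root of a GCS bucket, gs://some-bucket/foo).
 */
public class TempPathSketch {

  static String tempPathFor(
      String finalPath, String uid, String anotherUid,
      boolean siblingDirPossible) {
    int slash = finalPath.lastIndexOf('/');
    String parent = finalPath.substring(0, slash);
    String base = finalPath.substring(slash + 1);
    String prefix = parent + "/temp-beam-" + base + "-" + uid;
    return siblingDirPossible
        ? prefix + "/" + anotherUid   // /path/to/temp-beam-foo-$uid/$another_uid
        : prefix + "-" + anotherUid;  // /path/to/temp-beam-foo-$uid-$another_uid
  }

  public static void main(String[] args) {
    System.out.println(tempPathFor("/path/to/foo", "uid", "uid2", true));
    System.out.println(tempPathFor("/path/to/foo", "uid", "uid2", false));
  }
}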


On Thu, Oct 20, 2016 at 9:21 AM, Lukasz Cwik <[email protected]> wrote:
> The issue manifests when a completely different pipeline uses the output of
> the previous pipeline as its input, and the leftover temporary files are
> matched by that pipeline's glob expression.
>
> This happens because FileBasedSink is responsible for creating the
> temporary paths, which happens while a bundle is being processed. If that
> bundle fails, our current execution model gives the runner no way to even
> know those paths existed.
>
> I think there are other potential solutions that require support from the
> runner which aren't being considered, since they would all fall under the
> general cleanup API Eugene referred to. The question for now is whether
> this solution is good enough.
>
> I'm in favor of #1 as well.
>
> I'm against #4 since FileBasedSink could do a pretty good job for all
> filesystems, and once there is support for cleanup, FileBasedSink could
> migrate to it without any changes to the various IOChannelFactory
> implementations. This keeps us from ending up where the Hadoop filesystem
> implementation is, with many, many methods.
>
>
> On Thu, Oct 20, 2016 at 1:57 AM, Chamikara Jayalath <[email protected]>
> wrote:
>
>> Can this be prevented by moving the temporary files (copy + delete them
>> individually) at finalization, instead of copying all of them and then
>> performing a bulk delete? You can support task failures by ignoring a
>> rename when the destination already exists. The Python SDK currently does
>> this (and puts temp files in a sub-directory).
>>
>> Thanks,
>> Cham
>>
>> On Wed, Oct 19, 2016 at 6:25 PM Eugene Kirpichov
>> <[email protected]> wrote:
>>
>> Hello,
>>
>> This is a continuation of the discussion on PR
>> https://github.com/apache/incubator-beam/pull/1050, which turned out to be
>> more complex than expected.
>>
>> Short summary:
>> Currently FileBasedSink, when writing to /path/to/foo (in practice,
>> /path/to/foo-xxxxx-of-yyyyy, where yyyyy is the total number of output
>> files), puts temporary files into /path/to/foo-temp-$uid. When the sink is
>> finalized, it cleans up by matching the pattern /path/to/foo-temp-* and
>> deleting everything that matches.
>>
>> There are a couple of issues with this:
>> - FileBasedSink uses IOChannelFactory, which currently supports local
>> filesystems and Google Cloud Storage (GCS). GCS's match() operation is
>> currently eventually consistent, so it may fail to return some of the
>> temporary files, and those files won't be removed.
>> - If the Beam job is cancelled or fails midway, then the temp files won't
>> be deleted at all (that's subject to a separate discussion on a cleanup
>> API - AFAIK there's no JIRA for it yet; I believe peihe@ was thinking
>> about this and was going to file one).
>> - If a follow-up data processing job reads /path/to/foo, then, given the
>> way temp files are named, they will likely match the same glob pattern
>> (e.g. "/path/to/foo*") that is intended to match only the final output in
>> /path/to/foo. So if some temp files are left over, the follow-up job will
>> effectively read duplicate records (some from /path/to/foo, some from
>> /path/to/foo-temp-$blah).
>>
>> I think, in the absence of a way to guarantee that all temp files will be
>> deleted (it would be very difficult or impossible to provide a hard
>> guarantee of this, considering various possible failure conditions such as
>> zombie workers), the cleanest way to solve this is to put temp files in a
>> location that is unlikely to match the same glob pattern as the one that
>> matches the final output.
>>
>> Some options for what that could be:
>> 1. A subdirectory that is a sibling of the final path, sufficiently unique,
>> and unlikely to match the same glob -
>> /path/to/temp-beam-foo-$uid/$another_uid (that's the approach the PR
>> currently takes)
>> 2. A subdirectory under PipelineOptions.tempLocation - this might be flawed
>> because PipelineOptions.tempLocation might be on a different filesystem, or
>> have different ACLs, than the output of the FileBasedSink.
>> 3. A subdirectory that the user *must* explicitly provide on their
>> FileBasedSink. This is a reduction in usability, but there may be cases
>> when this is necessary - e.g. if the final location of the FileBasedSink is
>> such that we can't create siblings to it (e.g. the root path in a GCS
>> bucket - gs://some-bucket/)
>> 4. A subdirectory generated by a new IOChannelFactory call ("give me a temp
>> directory for the given final path") which would do one of the above -
>> reasonable, and simplifies FileBasedSink, but we still need to choose which
>> of #1-#3 this call should do.
>>
>> There might be other things I missed. There might be radical restructurings
>> of FileBasedSink that work around this problem entirely, though I couldn't
>> think of any.
>>
>> In general, the requirements on the solution are:
>> - It should be very unlikely that somebody reads the temp files in the same
>> glob pattern as the final output by mistake.
>> - It should continue to make sense as IOChannelFactory is extended with
>> support for other filesystems.
>> - It should ideally use the same filesystem as the final output, or perhaps
>> even a location logically "close" to the final output, so that it could
>> potentially take advantage of that filesystem's efficient bulk-copy or
>> bulk-rename operations if available.
>> - It should be easy to manually clean up the temp files if something went
>> wrong and they weren't cleaned up by the Beam job.
>>
>> I'm personally in favor of #1 with fallback to #2 or #3, because I think a
>> sibling directory achieves all of these requirements unless a sibling
>> directory can't be created.
>>
>> Thoughts?
>>
