Hi Roland,

Sorry for the delay -- I did not know how to use JIRA.
I have made you a BEAM contributor and have assigned BEAM-284 to you.

Thanks!
Dan

On Sat, May 14, 2016 at 3:58 AM, Roland Harangozo <[email protected]> wrote:

> Hi Dan,
>
> No problem ;)
>
> *"Would you be willing to review it when it's ready?"* - Sure.
>
> *"I think the notion of making this a file-at-a-time operation is wrong --
> we still want to preserve the ability to make a batch request for a
> network file system like GCS or S3."* - Agreed. What I meant to say (but
> said badly) was to combine the copy & remove into a single operation and
> execute those operations concurrently, rather than copying all the files
> concurrently and then removing them. The latter (the existing solution)
> rules out taking advantage of the native move/rename operation.
>
> Thank you for the review, I find your comments really useful.
>
> *"Would you like to try implementing a FileOperationsFactory
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L554>
> for another endpoint such as AWS S3?"* - A new JIRA ticket has been
> created for this: BEAM-284 <https://issues.apache.org/jira/browse/BEAM-284>.
> Could you please take a look at it and assign it to me if it is okay?
>
> Thanks,
> Roland
>
> 2016-05-13 17:36 GMT+01:00 Dan Halperin <[email protected]>:
>
>> Hi Roland,
>>
>> I'm a big jerk, but apparently I forgot to assign this issue to myself
>> when I created it to track my own work. Sorry for the inconvenience, but
>> I am already testing a fairly complex solution here. Would you be willing
>> to review it when it's ready?
>>
>> Comments on the design inline.
>>
>> On Fri, May 13, 2016 at 8:48 AM, Tyler Akidau <[email protected]> wrote:
>>
>> > +Daniel Halperin <[email protected]>
>> >
>> > On Thu, May 12, 2016 at 10:20 AM Roland Harangozo <[email protected]>
>> > wrote:
>> >
>> >> Hi All,
>> >>
>> >> I would like to fix this issue:
>> >> https://issues.apache.org/jira/browse/BEAM-206
>> >>
>> >> Could you please review my design proposal?
>> >>
>> >> I would copy and optionally remove the temporary files one by one as
>> >> an atomic operation rather than copying all of the temporary files and
>> >> then removing them (if we need to remove). It has the following
>> >> benefits:
>>
>> I think the notion of making this a file-at-a-time operation is wrong --
>> we still want to preserve the ability to make a batch request for a
>> network file system like GCS or S3.
>>
>> >> * If the move operation is supported by the file system and the file
>> >> retention is "remove", we can use the native file move (or rename)
>> >> operation, which could be significantly faster than copy and remove.
>>
>> You're right that we should change the interface from copy & remove to
>> rename (which can be internally implemented as copy & remove if a file
>> storage requires it). This will admit faster implementations for systems
>> that have an atomic rename operation (like file systems ;).
>>
>> >> * By moving the remove operation close to the copy operation, the
>> >> probability of copying a file again after a failure is lower (if one
>> >> file of two is moved but the other one failed, when we replay, it
>> >> moves only the one that failed rather than starting from scratch).
>>
>> I'm not sure this part follows inside the Beam model.
>> There is no easy way to force each file to be in its own bundle, so we
>> can't really do retries (at the model level) independently for each file.
>>
>> You can of course follow this model inside a bulk-rename step, but you'll
>> have to carefully consider the semantics if some rename fails and the
>> entire operation is retried. I'm confident that this could be made into a
>> good design!
>>
>> I think it's not trivial to detect correctness here. If the move "source"
>> does not exist, can you tell a successful move from an error? (Note that
>> it's common for users to rerun a job without deleting the old output, so
>> the "destination" may already exist. *sigh ;)*.)
>>
>> >> Regarding the concurrency, I would use an ExecutorService to run the
>> >> aforementioned operations simultaneously.
>>
>> Seems right to me!
>>
>> >> The first exception would stop (interrupt) all operations.
>>
>> Per the above comments -- we need to design this step to be idempotent
>> (or as close as we can). Stopping at the first exception may be a good
>> thing to do, as long as retrying or resuming will result in the correct
>> output.
>>
>> >> The level of concurrency (number of threads) would be file system
>> >> specific and configurable. I can imagine 10+ threads giving good
>> >> performance on GCS but bad performance on a local file system.
>>
>> This is true -- you will want to tune the implementation for each file
>> storage.
>>
>> I have done many experiments in the past week on GCS in particular -- the
>> conclusion was to use batches of size 100 and about 32 concurrent threads
>> for best performance and also robustness to failures.
>>
>> >> Best regards,
>> >> Roland Harangozo
>>
>> Thanks so much for this email and design -- it's great. Let's keep
>> discussing, and, would you be willing to review a pull request from me
>> for the GCS part of this change?
>>
>> Would you like to try implementing a FileOperationsFactory
>> <https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L554>
>> for another endpoint such as AWS S3?
>>
>> Thanks,
>> Dan
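
For concreteness, here is a rough sketch of the rename-with-fallback and
idempotency points discussed above. It uses plain java.nio.file purely for
illustration -- it is not Beam's FileBasedSink or IOChannelFactory API -- and
the policy for a missing source or a pre-existing destination (treat "source
gone, destination present" as an already-completed move, overwrite an existing
destination) is just one assumed choice, not a settled design:

import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch, not part of Beam: one retry-tolerant rename step.
class IdempotentRenameSketch {

  static void renameOrCopy(Path source, Path destination) throws IOException {
    if (!Files.exists(source)) {
      if (Files.exists(destination)) {
        // Most likely a retry after the move already succeeded; nothing to do.
        return;
      }
      throw new IOException("Neither source nor destination exists: " + source);
    }
    try {
      // Prefer the native atomic rename when the file system supports it.
      Files.move(source, destination, StandardCopyOption.ATOMIC_MOVE);
    } catch (AtomicMoveNotSupportedException e) {
      // Fall back to copy + delete for storage without an atomic rename;
      // REPLACE_EXISTING covers reruns over old output.
      Files.copy(source, destination, StandardCopyOption.REPLACE_EXISTING);
      Files.delete(source);
    }
  }
}

The concern raised in the thread still applies: once the source is gone, a
successful retry and a silent failure look the same, so the bulk step as a
whole still has to be safe to rerun.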

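Similarly, a rough sketch of driving batched renames from an ExecutorService,
along the lines discussed above. The batch size of 100 and the 32 threads
mirror the GCS numbers mentioned in the thread; BatchRenamer and its
rename(List) method are hypothetical placeholders, not an existing Beam or GCS
API:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch, not part of Beam: batched renames on a fixed thread pool.
class ConcurrentBatchRenameSketch {

  interface BatchRenamer {
    // Renames every {source, destination} pair in the batch; assumed retryable.
    void rename(List<String[]> batch) throws Exception;
  }

  static void renameAll(List<String[]> pairs, BatchRenamer renamer)
      throws Exception {
    final int batchSize = 100;  // pairs per batch request (GCS number from the thread)
    final int numThreads = 32;  // concurrent batch requests (GCS number from the thread)
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    try {
      List<Future<?>> results = new ArrayList<>();
      for (int i = 0; i < pairs.size(); i += batchSize) {
        final List<String[]> batch =
            pairs.subList(i, Math.min(i + batchSize, pairs.size()));
        results.add(pool.submit(() -> {
          renamer.rename(batch);
          return null;
        }));
      }
      // Surface the first failure; the step as a whole is expected to be retried.
      for (Future<?> result : results) {
        result.get();
      }
    } finally {
      pool.shutdownNow();
    }
  }
}

Failing the whole step on the first error and relying on a retry of the entire
operation is consistent with the idempotency discussion above.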