Hi Dan,

No problem ;)

*"Would you be willing to review it when it's ready?"* - Sure.

*"I think the notion of making this a file-at-a-time operation is wrong -- we still want to preserve the ability to make a batch request for a network file system like GCS or S3."* - Agreed. What I meant to say (but put badly) was that the copy and the remove should be combined into a single operation, and those combined operations executed concurrently. The current approach copies all the files concurrently and only then removes them. The latter (the existing solution) rules out taking advantage of a native move/rename operation.

Thank you for the review. I find your comments really useful.

*"Would you like to try implementing a FileOperationsFactory <https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L554> for another endpoint such as AWS S3?"* - I have created a new JIRA ticket for this: BEAM-284 <https://issues.apache.org/jira/browse/BEAM-284>. Could you please take a look at it and assign it to me if it is okay?

Thanks,
Roland

2016-05-13 17:36 GMT+01:00 Dan Halperin:

> Hi Roland,
>
> I'm a big jerk, but apparently I forgot to assign this issue to myself when
> I created it to track my own work. Sorry for the inconvenience, but I am
> already testing a fairly complex solution here. Would you be willing to
> review it when it's ready?
>
> Comments on the design inline.
>
> On Fri, May 13, 2016 at 8:48 AM, Tyler Akidau wrote:
>
> > +Daniel Halperin
> >
> >
> > On Thu, May 12, 2016 at 10:20 AM Roland Harangozo wrote:
> >
> >> Hi All,
> >>
> >> I would like to fix this issue:
> >> https://issues.apache.org/jira/browse/BEAM-206
> >>
> >> Could you please review my design proposal?
> >>
> >> I would copy and optionally remove the temporary files one by one as an
> >> atomic operation, rather than copying all of the temporary files and then
> >> removing them (if we need to remove them). This has the following benefits:
> >>
>
> I think the notion of making this a file-at-a-time operation is wrong -- we
> still want to preserve the ability to make a batch request for a network
> file system like GCS or S3.
>
> >> * If the move operation is supported by the file system and the file
> >> retention is "remove", we can use the native file move (or rename)
> >> operation. This could be significantly faster than copy and remove.
> >>
>
> You're right that we should change the interface from copy & remove to
> rename (which can be internally implemented as copy & remove if a file
> storage requires it). This will admit faster implementations for systems
> that have an atomic rename operation (like file systems ;).
>
> >> * By moving the remove operation close to the copy operation, the
> >> probability of copying a file again after a failure is lower (if one
> >> of two files is moved but the other one fails, a replay moves only the
> >> one that failed rather than starting from scratch).
> >>
>
> I'm not sure this part follows inside the Beam model. There is no easy way
> to force each file to be in its own bundle, so we can't really do retries
> (at the model level) independently for each file.
>
> You can of course follow this model inside a bulk-rename step, but you'll
> have to carefully consider the semantics if some rename fails and the
> entire operation is retried. I'm confident that this could be made into a
> good design!
>
> I think it's not trivial to detect correctness here. If the move "source"
> does not exist, can you tell a successful move from an error? (Note that
> it's common for users to rerun a job without deleting the old output, so
> the "destination" may already exist. *sigh ;)*.)
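The rename-instead-of-copy-and-remove idea above can be sketched for a local file system with `java.nio.file`. This is a minimal illustration under stated assumptions, not Beam's FileOperations API: the class `RenameSketch` and both methods are hypothetical. `rename` tries a native atomic move and falls back to copy & remove when the store cannot do that. `renameForRetry` shows one naive way to tolerate a retried rename; as Dan points out, a missing source plus an existing destination cannot be told apart from stale output left by an earlier run, so that check is an assumption, not a correctness guarantee.

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical helper: rename a file, preferring the native move when available. */
final class RenameSketch {

  private RenameSketch() {}

  /**
   * Renames src to dst. Tries an atomic native move first; if the file system
   * cannot do that, falls back to copy followed by delete.
   */
  static void rename(Path src, Path dst) throws IOException {
    try {
      // Native rename: fast, and atomic where the file system supports it.
      Files.move(src, dst, StandardCopyOption.ATOMIC_MOVE);
    } catch (AtomicMoveNotSupportedException e) {
      // Fallback for stores without atomic rename: copy, then remove the source.
      Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
      Files.delete(src);
    }
  }

  /** Naive retry-tolerant variant of rename. */
  static void renameForRetry(Path src, Path dst) throws IOException {
    if (!Files.exists(src) && Files.exists(dst)) {
      // ASSUMPTION: an earlier attempt already moved this file. This cannot be
      // distinguished from stale output left behind by a previous run.
      return;
    }
    rename(src, dst);
  }
}
```

Keeping the fallback inside `rename` means callers see one operation, and each file system decides internally whether it can do better than copy & remove.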
>
> >> Regarding the concurrency, I would use an ExecutorService to run the
> >> aforementioned operation simultaneously.
>
> Seems right to me!
>
> >> The first exception would stop (interrupt) all operations.
>
> Per the above comments -- we need to design this step to be idempotent (or
> as close to it as we can). Stopping at the first exception may be a good
> thing to do, as long as retrying or resuming will result in the correct
> output.
>
> >> The level of concurrency (number of threads) would be file system
> >> specific and configurable. I can imagine 10+ threads giving good
> >> performance on GCS but bad performance on a local file system.
>
> This is true -- you will want to tune the implementation for each file
> storage.
>
> I have done many experiments on GCS in particular in the past week -- the
> conclusion was to use batches of size 100 and about 32 concurrent threads
> for the best performance and also robustness to failures.
>
> >> Best regards,
> >> Roland Harangozo
>
> Thanks so much for this email and design -- it's great. Let's keep
> discussing. Would you be willing to review a pull request from me for the
> GCS part of this change?
>
> Would you like to try implementing a FileOperationsFactory
> <https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L554>
> for another endpoint such as AWS S3?
>
> Thanks,
> Dan
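The concurrency scheme discussed in this thread (an ExecutorService, batched requests, stop at the first exception, per-file-system tuning) can be sketched as follows. Everything here is hypothetical: `BatchRenamer` stands in for a file-system-specific batch rename request and is not an existing Beam interface. Batch size and thread count are plain parameters so each file system can tune them; Dan's GCS experiments suggest about 100 and 32. Batches that finished before a failure stay renamed, so a retry of the whole step still has to tolerate partially completed work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Sketch: rename files concurrently in batches; the first failure cancels the rest. */
final class ConcurrentBulkRename {

  /** HYPOTHETICAL per-file-system operation: one request renames a whole batch. */
  interface BatchRenamer {
    void renameBatch(List<String> srcs, List<String> dsts) throws Exception;
  }

  private ConcurrentBulkRename() {}

  static void renameAll(
      List<String> srcs, List<String> dsts, BatchRenamer renamer, int batchSize, int numThreads)
      throws Exception {
    if (srcs.size() != dsts.size() || batchSize <= 0 || numThreads <= 0) {
      throw new IllegalArgumentException("mismatched lists or non-positive tuning parameter");
    }
    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    ExecutorCompletionService<Void> completion = new ExecutorCompletionService<>(pool);
    List<Future<Void>> futures = new ArrayList<>();
    try {
      // Split the work into batches so a network store can serve each batch in one request.
      for (int i = 0; i < srcs.size(); i += batchSize) {
        int end = Math.min(i + batchSize, srcs.size());
        List<String> s = srcs.subList(i, end);
        List<String> d = dsts.subList(i, end);
        futures.add(completion.submit(() -> {
          renamer.renameBatch(s, d);
          return null;
        }));
      }
      // Wait for batches in completion order; get() rethrows a batch's failure.
      for (int n = 0; n < futures.size(); n++) {
        try {
          completion.take().get();
        } catch (ExecutionException e) {
          // Stop at the first failure: interrupt running batches, skip queued ones.
          for (Future<Void> f : futures) {
            f.cancel(true);
          }
          throw e;
        }
      }
    } finally {
      pool.shutdownNow();
    }
  }
}
```

For example, a GCS implementation might call `renameAll(srcs, dsts, gcsRenamer, 100, 32)` (where `gcsRenamer` is hypothetical), while a local file system would likely use a much smaller thread count.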
