Hi Dan,

No problem ;)

*"Would you be willing to review it when it's ready?" - Sure.*
*"I think the notion of making this a file-at-a-time operation is wrong --
we*
*still want to preserve the ability to make a batch request for a network
file system like GCS or S3."* - Agree, I wanted to say (but did badly) to
combine the copy & remove into a single operation and execute it
concurrently rather than copy all the files concurrently and then remove
them. Later one (existing solution) rules out to take advantage of the
native move/rename operation.

Thank you for the review, I find your comments really useful.

*"Would you like to try implementing a FileOperationsFactory*
*<https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L554
<https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L554>>
for another
endpoint such as AWS S3?" - *New JIRA ticket has been created on this:
BEAM-284 <https://issues.apache.org/jira/browse/BEAM-284> Could you please
take a look at it and assign it to me if it is okay?

Thanks,
Roland

2016-05-13 17:36 GMT+01:00 Dan Halperin <[email protected]>:

> Hi Roland,
>
> I'm a big jerk, but apparently I forgot to assign this issue to myself when
> I created it to track my own work. Sorry for the inconvenience, but I am
> already testing a fairly complex solution here. Would you be willing to
> review it when it's ready?
>
> Comments on the design inline.
>
> On Fri, May 13, 2016 at 8:48 AM, Tyler Akidau <[email protected]> wrote:
>
> > +Daniel Halperin <[email protected]>
> >
> >
> > On Thu, May 12, 2016 at 10:20 AM Roland Harangozo <[email protected]>
> > wrote:
> >
> >> Hi All,
> >>
> >> I would like to fix this issue:
> >> https://issues.apache.org/jira/browse/BEAM-206
> >>
> >> Could you please revise my design proposal?
> >>
> >> I would copy and optionally remove the temporary files one by one as an
> >> atomic operation rather then copying all of the temporary files and then
> >> removing them (if we need to remove). It has the following benefits:
> >>
> >
> I think the notion of making this a file-at-a-time operation is wrong -- we
> still want
> to preserve the ability to make a batch request for a network file system
> like GCS
> or S3.
>
> * If the move operation supported by the file system and the file retention
> >> is remove, we can use the native file move operation (or rename). Could
> be
> >> significantly faster than the copy and remove.
> >>
> >
> You're right that we should change the interface from copy & remove to
> rename (which can be internally implemented as copy & remove if a file
> storage
> requires it). This will admit faster implementations for systems that have
> an atomic rename operation (like file systems ;).
>
>
> > * By moving the remove operation close to the copy operation, the
> >> probability is lower to copy the file again because of any failure (if
> one
> >> file of two is moved but the other one failed, when we replay, it moves
> >> only the one that failed rather than starting from scratch)
> >>
> >
> I'm not sure this part follows inside the Beam model. There is no easy way
> to force
> each file to be in its own bundle, so we can't really do retries (at the
> model level)
> independently for each file.
>
> You can of course follow this model inside a bulk-rename step, but you'll
> have to carefully consider the semantics if some rename fails and the
> entire operation is retried. I'm confident that this could be made into a
> good design!
>
> I think it's not trivial to detect correctness here. If the move "source"
> does not exist,
> can you tell a successful move from an error? (Note that it's common for
> users
> to rerun a job without deleting the old output, so the "destination" may
> already exist. *sigh ;)*.)
>
> Regarding the concurrency, I would use an ExecutorService to run the
> >> aforementioned operation simultaneously.
> >
> >
> Seems right to me!
>
>
> > The first exception would stop
> >> (interrupt) all operation.
> >>
> >
> Per the above comments -- we need to design this step to idempotent (or as
> close as we can).
> Stopping at the first exception may be a good thing to do, as long as
> retrying or resuming
> will result in the correct output.
>
>
> >
> >> The level of the concurrency (number of threads) would be file system
> >> specific and configurable. I can imagine 10+ threads gives a good
> >> performance on GCS but gives bad performance on local file system.
> >>
> >
> This is true -- you will want to tune the implementation for each file
> storage.
>
> I have done many experiments in the past week about GCS in particular --
> the
> conclusion here was to use batches of size 100 and about 32 concurrent
> threads
> for best performance and also robustness to failures.
>
>
> >
> >> Best regards,
> >> Roland Harangozo
> >>
> >
> Thanks so much for this email and design -- it's great. Let's keep
> discussing, and, would you be willing to review a pull request from me
> for the GCS part of this change?
>
> Would you like to try implementing a FileOperationsFactory
> <
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java#L554
> >
> for
> another endpoint such as AWS S3?
>
> Thanks,
> Dan
>

Reply via email to