Stephan,

I fully support this FLIP. We were unpleasantly surprised to find that the
object copy behavior caused significant performance degradation in our
massively parallel use case.

In our case we assume that all records are immutable throughout the
processing pipeline, and I think that is the most appropriate assumption for
all streaming use cases. As a result, I don't see a need to distinguish
different object reuse behaviors for different (chained) operators. Taken to
the extreme, I don't even see a need to support COPY_PER_OPERATOR, other than
for backward compatibility. I am also a fan of failing fast if a user asserts
incorrect assumptions.

One piece of feedback on FLIP-21 itself: I am not clear on the difference
between the DEFAULT and FULL_REUSE enumeration values. Aren't they exactly the
same thing in the new proposal? The model figures, however, seem to indicate
that they are slightly different. Can you elaborate a bit more?

Z. 


On 2017-06-27 11:14 (-0700), Greg Hogan <c...@greghogan.com> wrote: 
> Hi Stephan,
> 
> Would this be an appropriate time to discuss allowing reuse to be a 
> per-operator configuration? Object reuse for chained operators has led to
> considerable surprise for some users of the DataSet API. This came up during 
> the rework of the object reuse documentation for the DataSet API. With 
> annotations a Function could mark whether input/iterator or output/collected 
> objects should be copied or reused.
> 
> My distant observation is that it is safer to locally assert reuse at the
> operator level than to assume or guarantee the safety of object reuse across 
> an entire program. It could also be handy to mix operators receiving copyable 
> objects with operators not requiring copyable objects.
> 
> Greg
> 
> 
> > On Jun 27, 2017, at 1:21 PM, Stephan Ewen <se...@apache.org> wrote:
> > 
> > Hi all!
> > 
> > I would like to propose the following FLIP:
> > 
> > FLIP-21 - Improve object Copying/Reuse Mode for Streaming Runtime:
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982
> > 
> > The FLIP is motivated by the fact that many users run into an unnecessary
> > kind of performance problem caused by an old design artifact.
> > 
> > The required change should be reasonably small, and would help many users
> > and Flink's general standing.
> > 
> > Happy to hear thoughts!
> > 
> > Stephan
> > 
> > ======================================
> > 
> > FLIP text is below. Pictures with illustrations are only in the Wiki, not
> > supported on the mailing list.
> > -------------------------------------------------------------------------------------------------
> > 
> > Motivation
> > 
> > The default behavior of the streaming runtime is to copy every element
> > between chained operators.
> > 
> > That operation was introduced for “safety” reasons, to avoid the number of
> > cases where users can create incorrect programs by reusing mutable objects
> > (a discouraged pattern, but possible). For example when using state
> > backends that keep the state as objects on heap, reusing mutable objects
> > can theoretically create cases where the same object is used in multiple
> > state mappings.
> > 
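To make the heap-state concern above concrete, here is a minimal sketch in plain Java with no Flink dependency. The names {{Counter}} and {{HeapStateAliasing}} are hypothetical and only illustrate the aliasing problem; a {{HashMap}} stands in for a heap state backend, which is an assumption and not how Flink's state backends are implemented.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical mutable record type; not a Flink class. */
final class Counter {
    String key;
    long count;

    Counter(String key, long count) {
        this.key = key;
        this.count = count;
    }

    Counter copy() {
        return new Counter(key, count);
    }
}

final class HeapStateAliasing {

    /** Stores one reused, mutated instance under every key. */
    static Map<String, Counter> storeWithReuse(String[] keys) {
        Map<String, Counter> state = new HashMap<>();
        Counter reused = new Counter(null, 0);
        long n = 0;
        for (String k : keys) {
            reused.key = k;
            reused.count = ++n;
            state.put(k, reused); // every mapping points at the same object
        }
        return state;
    }

    /** Stores a defensive copy under every key, so the mappings stay independent. */
    static Map<String, Counter> storeWithCopy(String[] keys) {
        Map<String, Counter> state = new HashMap<>();
        Counter reused = new Counter(null, 0);
        long n = 0;
        for (String k : keys) {
            reused.key = k;
            reused.count = ++n;
            state.put(k, reused.copy());
        }
        return state;
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b"};
        Map<String, Counter> aliased = storeWithReuse(keys);
        // Both keys map to one object, so "a" now reports the count written for "b".
        System.out.println(aliased.get("a") == aliased.get("b")); // true
        System.out.println(aliased.get("a").count);               // 2

        Map<String, Counter> copied = storeWithCopy(keys);
        System.out.println(copied.get("a").count);                // 1
    }
}
```

The copy in {{storeWithCopy}} is the cost the current default pays on every hand-off, whether or not the program ever mutates a record.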
> > The effect is that many people who try Flink get much lower performance
> > than they could possibly get. From empirical evidence, almost all users
> > that I (Stephan) have been in touch with eventually run into this issue.
> > 
> > There are multiple observations about that design:
> > 
> > 
> >   - Object copies are extremely costly. While some simple types copy
> >     virtually for free (types reliably detected as immutable are not copied
> >     at all), many real pipelines use types like Avro, Thrift, JSON, etc.,
> >     which are very expensive to copy.
> > 
> >   - Keyed operations currently only occur after shuffles. The operations are
> >     hence the first in a pipeline and will never have a reused object anyway.
> >     That means for the most critical operation, this precaution is
> >     unnecessary.
> > 
> >   - The mode is inconsistent with the contract of the DataSet API, which
> >     does not copy at each step.
> > 
> >   - To prevent these copies, users can select {{enableObjectReuse()}}, which
> >     is misleading, since it does not really reuse mutable objects, but avoids
> >     additional copies.
> > 
> > 
> > Proposal
> > 
> > Summary
> > 
> > I propose to change the default behavior of the DataStream runtime to be
> > the same as the DataSet runtime. That means that new objects are created on
> > every deserialization, and no copies are made as the objects are passed on
> > along the pipelines.
> > 
> > Details
> > 
> > I propose to drop the execution config flag {{objectReuse}} and instead
> > introduce an {{ObjectReuseMode}} enumeration with better control of what
> > should happen. There will be three different types:
> > 
> > 
> >   - DEFAULT
> >      - This is the default in the DataSet API.
> >      - This will become the default in the DataStream API.
> >      - This happens in the DataStream API when {{enableObjectReuse()}} is
> >        activated.
> > 
> >   - COPY_PER_OPERATOR
> >      - The current default in the DataStream API.
> > 
> >   - FULL_REUSE
> >      - This happens in the DataSet API when {{enableObjectReuse()}} is
> >        chosen.
> > 
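One way to read the three modes is sketched below. It assumes that DEFAULT and FULL_REUSE behave like the DataSet API with object reuse disabled and enabled respectively: DEFAULT allocates a fresh object for each deserialized record, while FULL_REUSE lets the deserializer overwrite a single instance. The {{ObjectReuseMode}} name and its values come from the FLIP; {{Event}}, {{ChainSimulator}}, and the allocation counting are hypothetical simplifications, not Flink's runtime.

```java
/** Mode names as proposed in the FLIP; the enum itself is only a stand-in. */
enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

/** Hypothetical record type; not a Flink class. */
final class Event {
    int value;

    Event(int value) {
        this.value = value;
    }

    Event copy() {
        return new Event(value);
    }
}

final class ChainSimulator {

    /**
     * Counts how many Event instances are allocated when `records` records are
     * deserialized and each is handed across `handOffs` chained-operator boundaries.
     */
    static int allocations(ObjectReuseMode mode, int records, int handOffs) {
        int allocated = 0;
        Event reusable = null;
        for (int i = 0; i < records; i++) {
            Event current;
            if (mode == ObjectReuseMode.FULL_REUSE) {
                // Assumed behavior: the deserializer overwrites one shared instance.
                if (reusable == null) {
                    reusable = new Event(0);
                    allocated++;
                }
                reusable.value = i;
                current = reusable;
            } else {
                // DEFAULT and COPY_PER_OPERATOR: a new object per deserialized record.
                current = new Event(i);
                allocated++;
            }
            for (int h = 0; h < handOffs; h++) {
                if (mode == ObjectReuseMode.COPY_PER_OPERATOR) {
                    // Defensive copy on every hand-off between chained operators.
                    current = current.copy();
                    allocated++;
                }
                // In the other two modes the same reference is passed along.
            }
        }
        return allocated;
    }

    public static void main(String[] args) {
        for (ObjectReuseMode mode : ObjectReuseMode.values()) {
            System.out.println(mode + ": " + allocations(mode, 3, 2));
        }
    }
}
```

Under these assumptions, 3 records with 2 hand-offs cost 3 allocations in DEFAULT, 9 in COPY_PER_OPERATOR, and 1 in FULL_REUSE. If that reading is right, the difference between DEFAULT and FULL_REUSE lies in the deserialization step, not in the hand-offs.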
> > 
> > An illustration of the modes is as follows:
> > 
> > DEFAULT
> > 
> > 
> > See here:
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2F1UOpVB2wSMhx8067IE9t2_mJG549IoOkDiAfIN_uXQZVUvAXCp-hQLY-mgoSWunwF-xciZuJ4pZpj1FX0ZPQrd-Fm1jWzgX3Hv7-SELUdPUvEN6XUPbLrwfA9YRl605bFKMYlf1r
> > 
> > COPY_PER_OPERATOR
> > 
> > 
> > See here:
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh3.googleusercontent.com%2Fs5sBOktzaKrRw3v1-IQMgImYZfchQMVz2HiG3i050xCWNTKuQV6mmlv3QtR0TZ0SGPRSCyjI-sUAqfbJw4fGOxKqBuRX2f-iZGh0e7hBke7DzuApUNy1vaF2SgtQVH3XEXkRx8Ks
> > 
> > 
> > FULL_REUSE
> > 
> > 
> > See here:
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=71012982&preview=/https%3A%2F%2Flh5.googleusercontent.com%2FFdOzuuaioooEIOh7bo0gZ5dHZrlgEKiwtNjGE9DjR-fT20B0q7FGDAvAk5oh1h58WtNQktuFGinrV1q1Yq8H8ayCyyqFUq-gmAYYW91x4XZQNrjLc6eJ0cptzvN_r8cU_GVV7LNE
> > 
> > New or Changed Public Interfaces
> > 
> > Interfaces changed
> > 
> > The interface of the {{ExecutionConfig}} adds the method
> > {{setObjectReuseMode(ObjectReuseMode)}}, and deprecates the methods
> > {{enableObjectReuse()}} and {{disableObjectReuse()}}.
> > 
> > 
> > Behavior changed
> > 
> > The default object passing behavior changes, meaning that it can affect the
> > correctness of prior DataStream programs that assume the original
> > “COPY_PER_OPERATOR” behavior.
> > 
> > Migration Plan and Compatibility
> > 
> > Interfaces
> > 
> > No interface migration path is needed, because the interfaces are not
> > broken, merely some methods get deprecated.
> > 
> > Behavior change
> > 
> > Variant 1:
> > 
> >   - Change the behavior, and make it explicit in the release notes that we
> >     did that and which cases are affected.
> >   - This may actually be feasible, because the cases that are affected are
> >     quite pathological corner cases that only very bad implementations should
> >     encounter (see below).
> > 
> > 
> > Variant 2:
> > 
> >   - When users set the mode, that mode is always used.
> >   - When the mode is not explicitly set, we follow this strategy:
> >      - Change the CLI such that we know when users upgrade existing jobs
> >        (the savepoint to start from has a version prior to 1.4).
> >      - Use DEFAULT as the default for jobs that do not start from a
> >        savepoint, or that start from a savepoint >= 1.4.
> >      - Use COPY_PER_OPERATOR as the default for upgraded jobs.
> 
> 
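The Variant 2 rules can be sketched as a small resolution function. This is only an illustration under stated assumptions: {{ReuseModeResolver}} and {{SavepointVersion}} are hypothetical names, the savepoint version is modeled as a major/minor pair, and nothing here reflects how the CLI would actually detect an upgraded job.

```java
import java.util.Optional;

/** Mode names as proposed in the FLIP; the enum itself is only a stand-in. */
enum ObjectReuseMode { DEFAULT, COPY_PER_OPERATOR, FULL_REUSE }

/** Hypothetical major.minor version of the savepoint a job starts from. */
final class SavepointVersion {
    final int major;
    final int minor;

    SavepointVersion(int major, int minor) {
        this.major = major;
        this.minor = minor;
    }

    boolean isBefore(int otherMajor, int otherMinor) {
        return major < otherMajor || (major == otherMajor && minor < otherMinor);
    }
}

final class ReuseModeResolver {

    /**
     * Resolves the effective mode following the Variant 2 rules:
     * an explicit setting always wins; otherwise jobs restored from a
     * savepoint older than 1.4 keep COPY_PER_OPERATOR; everything else
     * gets DEFAULT.
     */
    static ObjectReuseMode resolve(
            Optional<ObjectReuseMode> explicitMode,
            Optional<SavepointVersion> savepoint) {
        if (explicitMode.isPresent()) {
            return explicitMode.get();
        }
        if (savepoint.isPresent() && savepoint.get().isBefore(1, 4)) {
            return ObjectReuseMode.COPY_PER_OPERATOR;
        }
        return ObjectReuseMode.DEFAULT;
    }

    public static void main(String[] args) {
        // Upgraded job with no explicit mode keeps the old copying behavior.
        System.out.println(resolve(Optional.empty(), Optional.of(new SavepointVersion(1, 3))));
        // Fresh job with no explicit mode gets the new default.
        System.out.println(resolve(Optional.empty(), Optional.empty()));
    }
}
```

Keeping the decision in one function like this would also give a single place to fail fast if a user asserts a mode that the job cannot honor.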
