Ok, I just ran all the tests with my change and all of them pass with a simple "return v;" instead of the serialize and deserialize. This is encouraging!

A question about the implementation. The idea is to set this via a pipeline option, but the problem is that most places that use CoderTypeSerializer don't pass down the pipeline options. I can go through each one and try to pass them down, but is there an easier way? Some global variable, or an earlier point where we could do this? Or should we simply remove the constructor without pipeline options?
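
To make the question concrete, a flag-gated copy could look roughly like the sketch below. It is plain Java with stand-in types, not the real Beam or Flink classes: `SketchCoder`, `IntArrayCoder`, `SketchSerializer` and the `skipCopy` flag are all hypothetical names, and the flag is assumed to come from some pipeline option that does not exist yet.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Stand-in for a coder; NOT the real org.apache.beam.sdk.coders.Coder API.
interface SketchCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;
    T decode(DataInputStream in) throws IOException;
}

// Small example coder so the sketch can be exercised on int arrays.
final class IntArrayCoder implements SketchCoder<int[]> {
    @Override
    public void encode(int[] value, DataOutputStream out) throws IOException {
        out.writeInt(value.length);
        for (int v : value) {
            out.writeInt(v);
        }
    }

    @Override
    public int[] decode(DataInputStream in) throws IOException {
        int[] result = new int[in.readInt()];
        for (int i = 0; i < result.length; i++) {
            result[i] = in.readInt();
        }
        return result;
    }
}

// Hypothetical serializer that models only the copy path.
final class SketchSerializer<T> {
    private final SketchCoder<T> coder;
    private final boolean skipCopy; // assumed to be read from a pipeline option

    SketchSerializer(SketchCoder<T> coder, boolean skipCopy) {
        this.coder = coder;
        this.skipCopy = skipCopy;
    }

    // Constructor without the flag keeps today's behaviour (deep copy).
    SketchSerializer(SketchCoder<T> coder) {
        this(coder, false);
    }

    T copy(T value) {
        if (skipCopy) {
            return value; // the "return v;" variant: same instance, no copy
        }
        try {
            // Deep copy via an encode/decode round trip.
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            coder.encode(value, new DataOutputStream(buffer));
            return coder.decode(
                new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the constructor without the flag stays and defaults to the deep copy, so call sites that cannot reach the pipeline options would keep the current behaviour. Whether that is preferable to removing the constructor is exactly the open question.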

Best Regards,
Teodor Spæren

On Tue, Oct 27, 2020 at 02:35:10PM +0100, Teodor Spæren wrote:
@David, I don't know how the direct runner does the validation, so I'm not sure if we could replicate that in the Flink runner without a perf penalty. Your point about writing tests I actually think is an argument for removing this as soon as possible, so the prototype doesn't blow up in production :P But I think a flag would be best.

@Jan, This will actually be part of my master thesis and so it would be my pleasure to perform more benchmarks and share the results. I have not had the time yet to test it out, but I agree with you about the performance impact being less for more realistic jobs. The benchmark I have used here is a worst case scenario. Would continuing with the synthetic sources, but changing the processing steps, be useful? So far all my tests have been with parallelism set to 1 and this is something I'm also going to explore more in the thesis.

But am I correct in thinking that it's worth creating a Jira issue over this and assigning it to myself? As I said in the original email, I think the change is simple enough so that I can implement it and I would like to try contributing to Beam.

(Not going to lie, it would also look good on my master thesis ;) )

Best regards,
Teodor Spæren


On Tue, Oct 27, 2020 at 01:53:11PM +0100, Jan Lukavský wrote:
Hi,

I tend to be +1 for the flag, but before that, we might want to have a deeper analysis of the performance impact. I believe the penalty will be (in percentage) much lower in cases of more practical jobs (e.g. having at least one shuffle).

@Teodor, would you be willing to provide us with some measurements of jobs doing something more practical than simple stateless mappings? E.g. a few jobs doing 1, 2 and 3 shuffle phases to see what the impact of these more complex scenarios is on the performance penalty?

Cheers,

 Jan

On 10/27/20 1:24 PM, David Morávek wrote:
you made a really good argument ;) I'm inclined to an experimental opt-in flag that would enable this. It would be great if we could automatically check for violations - kind of a safety net, for mistakes in user code.

Just to note, direct runner enforcement may not cover all cases, as it only checks binary representation after serialization. Also there are programmers that don't write tests, especially during prototyping (not an argument for perf. penalty, but something to keep in mind).
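
As an illustration of the kind of check mentioned above, comparing an element's encoded bytes before and after the user function runs could look roughly like this. It is a minimal sketch in plain Java and not the direct runner's actual code; `MutationCheckSketch` and `mutatedDuring` are hypothetical names, and the encoder is passed in as an ordinary function rather than a Beam coder.

```java
import java.util.Arrays;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper; not part of Beam.
final class MutationCheckSketch {
    private MutationCheckSketch() {}

    // Returns true if the encoded form of the element differs after userFn ran.
    static <T> boolean mutatedDuring(
            T element, Function<T, byte[]> encoder, Consumer<T> userFn) {
        byte[] before = encoder.apply(element); // snapshot of the binary representation
        userFn.accept(element);                 // user code may or may not mutate the element
        byte[] after = encoder.apply(element);  // encode the same instance again
        return !Arrays.equals(before, after);
    }
}
```

A check of this shape only sees what the encoder writes out, so a mutation of state that is not part of the encoded form goes unnoticed. It also encodes every element twice, which is a cost of its own.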

Max, WDYT?





On Tue, Oct 27, 2020 at 12:44 PM Teodor Spæren <teodor_spae...@riseup.net> wrote:

  Some more thoughts:

  As it says on the DirectRunner [1] page, the DirectRunner is meant to
  check that users don't rely on semantics that are not guaranteed by
  the Beam model.

  Programs that rely on the Flink runner deep cloning the inputs between
  each operator in the pipeline are relying on a semantic that is not
  guaranteed by the Beam model, and those pipelines would fail if run on
  the DirectRunner.

  As I stated in the previous email, I have some example programs that
  return different outputs on the Flink runner and on the DirectRunner.
  I have not tested these programs on other runners, so I don't know
  what they would return. If they return different answers than the
  DirectRunner, I'm inclined to say that either the DirectRunner should
  be changed, or the other runners should be.

  From my very limited point of view, the Flink runner seems to be
  spending a lot of extra time implementing a semantic guarantee that
  the Beam model explicitly doesn't support.


  Best regards,
  Teodor Spæren

  [1]: https://beam.apache.org/documentation/runners/direct/

  On Tue, Oct 27, 2020 at 12:08:51PM +0100, Teodor Spæren wrote:
  >Hey David,
  >
  >I think I might have worded this poorly, because what I meant is that
  >from what I can see in [1], the Beam model explicitly states that
  >PCollections should be treated as immutable. The direct runner also
  >tests for this. Do the other runners also protect the user from
  >misusing the system in this way? If not, we have a situation where
  >running the same pipeline on two different runners will yield
  >different answers. I can show some examples that return different
  >results for the Flink and the Direct Runner.
  >
  >I agree that breaking existing pipelines is a no-no, but I do think
  >that we could simply gate this behind an option on the Flink runner.
  >
  >I also tried to search for this before, but did not find any mention
  >of it, can you link me to some discussions about this in the past?
  >
  >Thanks for the reply :D
  >
  >Best regards,
  >Teodor Spæren
  >
  >[1]: https://beam.apache.org/documentation/programming-guide/#immutability
  >
  >
  >On Tue, Oct 27, 2020 at 11:49:45AM +0100, David Morávek wrote:
  >>Hi Teodor,
  >>
  >>Thanks for bringing this up. This is a known, long-standing "issue".
  >>Unfortunately there are a few things we need to consider:
  >>
  >>- As you correctly noted, the *Beam model doesn't enforce immutability*
  >>of input / output elements, so this is the price.
  >>- We *cannot break* existing pipelines.
  >>- Flink Runner needs to provide the *same guarantees as the Beam model*.
  >>
  >>There are definitely some things we can do here to make things faster:
  >>
  >>- We can try a similar approach to HadoopIO
  >>(HadoopInputFormatReader#isKnownImmutable), to check for known immutable
  >>types (KV, primitives, protobuf, other known internal immutable structures).
  >>- *If the type is immutable, we can safely reuse it.* This should cover
  >>most of the performance costs without breaking the guarantees the Beam
  >>model provides.
  >>- We can enable registration of custom "immutable" types via pipeline
  >>options? (this may be an unnecessary knob, so this needs further
  >>discussion)
  >>
  >>WDYT?
  >>
  >>D.
  >>
  >>
  >>On Mon, Oct 26, 2020 at 6:37 PM Teodor Spæren <teodor_spae...@riseup.net> wrote:
  >>
  >>>Hey!
  >>>
  >>>I'm a student at the University of Oslo, and I'm writing a master thesis
  >>>about the possibility of using Beam to benchmark stream processing
  >>>systems. An important factor in this is the overhead associated with
  >>>using Beam over writing code for the runner directly. [1] found that
  >>>there was a large overhead associated with using Beam, but did not
  >>>investigate where this overhead came from. I've done benchmarks and
  >>>confirmed the findings there, where for simple chains of identity
  >>>operators, Beam is 43 times slower than the Flink equivalent.
  >>>
  >>>These are very simple pipelines, with custom sources that just output a
  >>>series of integers. By profiling I've found that most of the overhead
  >>>comes from serializing and deserializing. Specifically, it comes from
  >>>the way TypeSerializer [2] is implemented in [3], where each object is
  >>>serialized and then deserialized between every operator. Looking into
  >>>the semantics of Beam, no operator should change the input, so we don't
  >>>need to do a copy here. The function in [3] could potentially be changed
  >>>to a single `return` statement.
  >>>
  >>>Doing this removes 80% of the overhead in my tests. This is a very
  >>>synthetic example, but it's low-hanging fruit and might give a speed
  >>>boost to many pipelines when run on the Flink runner. I would like to
  >>>make this my first contribution to Beam, but as the guide [4] says, I
  >>>thought I'd ask here first to see if there is a reason not to do this.
  >>>
  >>>The only objection I can see is that it might break existing pipelines
  >>>which rely on the Flink runner saving them from not following the
  >>>immutability guarantee. I see this as a small loss as they are relying
  >>>on an implementation detail of the Flink runner.
  >>>
  >>>I hope I have explained this adequately and eagerly await any feedback :)
  >>>
  >>>Best regards,
  >>>Teodor Spæren
  >>>
  >>>[1]: https://arxiv.org/abs/1907.08302
  >>>[2]: https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
  >>>[3]: https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84
  >>>[4]: https://beam.apache.org/contribute/
  >>>
