On 8/25/20 9:27 PM, Maximilian Michels wrote:

I agree that this probably solves the described issue in the most straightforward way, but special handling for the global window feels weird, as there is really nothing special about the global window wrt state cleanup.

Why is special handling for the global window weird? After all, it is a special case because the global window normally will only be cleaned up when the application terminates.

The inefficiency described happens if and only if the following two conditions are met:

 a) there are many timers per single window (as otherwise they will be negligible)

 b) there are many keys which actually contain no state (as otherwise the timer would be negligible wrt the state size)

It only happens to be the case that the global window is by far (perhaps 98% of cases) the most common case that satisfies these two conditions, but there are other cases as well (e.g. a long-lasting fixed window). The discussed options 2) and 3) are systematic in the sense that option 2) removes condition a) and option 3) removes condition b). Making use of the correlation between the global window and these two conditions to solve the issue is of course possible, but a little unsystematic, and that's what feels 'weird'. :)


It doesn't change anything wrt migration. The timers that were already set remain and keep on contributing to the state size.

That's ok, regular timers for non-global windows need to remain set and should be persisted. They will be redistributed when scaling up and down.

I'm not sure that's a "problem", rather an inefficiency. But we could address it by deleting the timers where they are currently set, as mentioned previously.

I had imagined that we don't even set these timers for the global window. Thus, there is no need to clean them up.
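(To make that concrete: a minimal sketch of what skipping the registration could look like in a runner's cleanup-timer logic. This is only loosely modeled on StatefulDoFnRunner's cleanup timer; the class and method names below are illustrative, not the actual code.)

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;

/** Sketch only: illustrative names, not the actual StatefulDoFnRunner code. */
abstract class CleanupTimerSketch {

  private final Duration allowedLateness;

  CleanupTimerSketch(Duration allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  void setForWindow(BoundedWindow window) {
    if (window instanceof GlobalWindow) {
      // Do not register a GC timer for the global window: it would only fire
      // once the watermark reaches +inf, i.e. at pipeline termination, when
      // the runner discards all state anyway (at least on Flink).
      return;
    }
    // Every other window keeps the current behaviour: schedule cleanup at the
    // window's garbage-collection time (end of window + allowed lateness).
    Instant gcTime = window.maxTimestamp().plus(allowedLateness).plus(Duration.millis(1));
    setEventTimeTimer(window, gcTime);
  }

  /** Hypothetical hook into the runner's timer service. */
  abstract void setEventTimeTimer(BoundedWindow window, Instant timestamp);
}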

-Max

On 25.08.20 09:43, Jan Lukavský wrote:
I agree that this probably solves the described issue in the most straightforward way, but special handling for the global window feels weird, as there is really nothing special about the global window wrt state cleanup. A solution that handles all windows equally would be semantically 'cleaner'. If I try to sum up:

  - option 3) seems best, provided that isEmpty() lookup is cheap for every state backend (e.g. that we do not hit disk multiple times), this option is the best for state size wrt timers in all windows

  - option 2) works well for key-aligned windows, also reduces state size in all windows

  - option "watermark timer" - solves issue, easily implemented, but doesn't improve situation for non-global windows

My conclusion would be: use the watermark timer as a hotfix; if we can prove that isEmpty() would be cheap, then use option 3) as the final solution, otherwise use 2).
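(For what option 3) could amount to, a rough sketch: it assumes the runner can intercept user-state writes and clears for the current key; all names below are illustrative, not an existing Beam or Flink runner API.)

import java.util.Collection;
import org.apache.beam.sdk.state.BagState;

/**
 * Sketch of option 3): keep the GC timer registered only while the key actually
 * holds state. Illustrative names, not an existing Beam or Flink runner API.
 */
abstract class EmptyAwareCleanup<T> {

  /** Called after any append to a user state cell of the current key and window. */
  void onStateWrite() {
    // A first write (or a write after a full clear) re-arms the cleanup timer.
    registerCleanupTimer();
  }

  /** Called after any clear() of a user state cell of the current key and window. */
  void onStateClear(Collection<BagState<T>> allStatesOfKey) {
    // If every state cell of this key is now empty, the timer is pure overhead
    // and can be dropped. This is exactly where a cheap isEmpty() in the state
    // backend matters (ideally cached, without an extra disk hit).
    boolean allEmpty = allStatesOfKey.stream().allMatch(s -> s.isEmpty().read());
    if (allEmpty) {
      deleteCleanupTimer();
    }
  }

  abstract void registerCleanupTimer();
  abstract void deleteCleanupTimer();
}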

WDYT?

On 8/25/20 5:48 AM, Thomas Weise wrote:


On Mon, Aug 24, 2020 at 1:50 PM Maximilian Michels <m...@apache.org> wrote:

    I'd suggest a modified option (2) which does not use a timer to perform
    the cleanup (as mentioned, this will cause problems with migrating state).


That's a great idea. It's essentially a mix of 1) and 2) for the global window only.

It doesn't change anything wrt migration. The timers that were already set remain and keep on contributing to the state size.

I'm not sure that's a "problem", rather an inefficiency. But we could address it by deleting the timers where they are currently set, as mentioned previously.


    Instead, whenever we receive a watermark which closes the global window,
    we enumerate all keys and cleanup the associated state.

    This is the cleanest and simplest option.

    -Max
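(For reference, a rough sketch of what enumerating all keys on the final watermark could look like inside a Flink operator. KeyedStateBackend#getKeys is the real Flink API referenced further down in this thread; the state name and the surrounding wiring are illustrative only.)

import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;

/** Sketch: clear the state of every key once the watermark closes the global window. */
class GlobalWindowCleanupSketch<K> {

  private final KeyedStateBackend<K> keyedStateBackend;

  GlobalWindowCleanupSketch(KeyedStateBackend<K> keyedStateBackend) {
    this.keyedStateBackend = keyedStateBackend;
  }

  void onWatermark(long watermarkMillis) {
    if (watermarkMillis < Long.MAX_VALUE) {
      return; // the global window only "closes" with the final watermark
    }
    // Enumerate every key that has state registered under a given state name
    // and namespace ("user-state" is an illustrative name). Collect first so
    // we do not mutate state while the key stream is still being iterated.
    List<K> keys =
        keyedStateBackend
            .getKeys("user-state", VoidNamespace.INSTANCE)
            .collect(Collectors.toList());
    for (K key : keys) {
      keyedStateBackend.setCurrentKey(key);
      clearAllUserStateForCurrentKey();
    }
  }

  /** Hypothetical helper: would call clear() on each user state of the current key. */
  void clearAllUserStateForCurrentKey() {}
}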

    On 24.08.20 20:47, Thomas Weise wrote:
    >
    > On Mon, Aug 24, 2020 at 11:35 AM Jan Lukavský <je...@seznam.cz> wrote:
    >
    >      > The most general solution would be 3), given it can be agnostic to window types and does not assume extra runner capabilities.
    >
    >     Agree, 2) is optimization to that. It might be questionable if this is premature optimization, but generally querying multiple states for each clear operation to any state might be prohibitive, mostly when the state would be stored in external database (in case of Flink that would be RocksDB).
    >
    > For the use case I'm looking at, we are using the heap state backend. I have not checked the RocksDB, but would assume that incremental cost of isEmpty() for other states under the same key is negligible?
    >
    >      > 3) wouldn't require any state migration.
    >
    >     Actually, it would, as we would (ideally) like to migrate users' pipelines that already contain timers for the end of global window, which might not expire ever.
    >
    > Good catch. This could potentially be addressed by upgrading the timer in the per record path.
    >
    >     On 8/24/20 7:44 PM, Thomas Weise wrote:
    >>
    >>     On Fri, Aug 21, 2020 at 12:32 AM Jan Lukavský <je...@seznam.cz> wrote:
    >>
    >>         If there are runners that are unable to efficiently enumerate keys in state, then there probably isn't a runner agnostic solution to this. If we focus on Flink, we can provide specific implementation of CleanupTimer, which might then do anything from the mentioned options. I'd be +1 for option 2) for key-aligned windows (all currently supported) and option 3) for unaligned windows in the future.
    >>
    >>     The most general solution would be 3), given it can be agnostic to window types and does not assume extra runner capabilities. It would require to introspect all user states for a given key on state.clear. That assumes an efficient implementation of isEmpty(). If all states are empty (have been cleared), then we can remove the cleanup timer. And add it back on state.add. I'm planning to give that a shot (for Flink/portable/streaming) to see how it performs.
    >>
    >>         We should also consider how we migrate users from the current state to any future implementation. In case of option 2) it should be possible to do this when the state is loaded from savepoint, but I'm not 100% sure about that.
    >>
    >>     3) wouldn't require any state migration.
    >>
    >>         Jan
    >>
    >>         On 8/21/20 6:25 AM, Thomas Weise wrote:
    >>>         Thanks for the clarification.
    >>>
    >>>         Here are a few potential options to address the issue, based on the discussion so far:
    >>>
    >>>         1) Optionally skip cleanup timer for global window (user-controlled via pipeline option)
    >>>
    >>>         2) Instead of setting a cleanup timer for every key, handle all keys for a given window with a single timer. This would be runner specific and depend on if/how a given runner supports key enumeration. Flink's keyed state backend supports enumerating keys for a namespace (Beam window) and state tag. [1]
    >>>
    >>>         3) Set the cleanup timer only when there is actually state associated with a key. This could be accomplished by intercepting append and clear in BagUserStateHandler [2] and adding/removing the timer appropriately.
    >>>
    >>>         4) See if TTL support in the runner is applicable, for Flink see [3]
    >>>
    >>>         [1] https://github.com/apache/flink/blob/release-1.10/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java#L76
    >>>
    >>>         [2] https://github.com/apache/beam/blob/release-2.23.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java#L315
    >>>
    >>>         [3] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
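(For option 4), Flink's state TTL from [3] is configured on the state descriptor roughly as below; the state name and the 30-minute TTL are illustrative, and whether the Beam Flink runner could expose such a knob for user state is the open question here.)

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

class StateTtlSketch {
  static ValueStateDescriptor<String> descriptorWithTtl() {
    // Flink state TTL: entries expire and are cleaned up by the backend itself
    // (lazily on access and/or in the background), with no per-key cleanup
    // timer in the Beam layer.
    StateTtlConfig ttlConfig =
        StateTtlConfig.newBuilder(Time.minutes(30))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
    ValueStateDescriptor<String> descriptor =
        new ValueStateDescriptor<>("dedup-seen", String.class);
    descriptor.enableTimeToLive(ttlConfig);
    return descriptor;
  }
}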
    >>>
    >>>
    >>>         On Thu, Aug 20, 2020 at 8:08 AM Reuven Lax <re...@google.com> wrote:
    >>>
    >>>             Also +1 to what Jan said. Streaming pipelines can process bounded PCollections on some paths, so the global window will terminate for those paths. This is also true for the direct runner tests where PCollections pretend to be unbounded, but we then advance the watermark to +inf to terminate the pipeline.
    >>>
    >>>             On Thu, Aug 20, 2020 at 8:06 AM Reuven Lax <re...@google.com> wrote:
    >>>
    >>>                 It is not Dataflow specific, but I think Dataflow is the only runner that currently implements Drain: https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit
    >>>
    >>>                 When a pipeline is drained, all windows (including global windows) end, and the windows are processed (i.e. as if they were fixed windows that terminated). Currently the easiest way to ensure that is to rely on the end-of-window timers for the global window (alternatives are possible, like issuing a full-state scan when a pipeline is drained, but that would be quite a bit more complicated). This is not specifically the GC timer, but rather the end-of-window timer that is needed.
    >>>
    >>>                 I believe that right now we don't have a way of deleting timers if there are no elements buffered for a key (e.g. a key that received a few elements that were processed in a trigger and then never received any more elements). This might be part of the problem - large numbers of empty keys with noop timers set. It would be nice if there were a way to detect this and at least remove the timers for those empty keys.
    >>>
    >>>                 Reuven
    >>>
    >>>                 On Wed, Aug 19, 2020 at 9:20 PM Thomas Weise <t...@apache.org> wrote:
    >>>
    >>>                     On Wed, Aug 19, 2020 at 9:49 AM Reuven Lax <re...@google.com> wrote:
    >>>
    >>>                         Skipping the cleanup timer for the global window will break any sort of drain functionality, which relies on having those timers there. It's also necessary for bounded inputs, for the same reason.
    >>>
    >>>                     Can you say a bit more about why this will break drain functionality and bounded inputs? Is this Dataflow specific? Is it because the state would be reused by a subsequent instance of the pipeline?
    >>>
    >>>                     For Flink, the GC timers would be triggered by the final watermark and that will be the end of the streaming job. Launching the same pipeline again will either be a cold start with no previous state or a start from savepoint/checkpoint.
    >>>
    >>>                     It sounds like for Dataflow there may be a need for the user to influence the behavior while for Flink the GC timers in a global window are not required.
    >>>
    >>>
    >>>
    >>>
    >>>
    >>>                     On Wed, Aug 19, 2020 at 10:31 AM Reuven Lax <re...@google.com> wrote:
    >>>
    >>>                         On Wed, Aug 19, 2020 at 9:53 AM Steve Niemitz <sniem...@apache.org> wrote:
    >>>
    >>>                             for what it's worth, dataflow has the same problem here as well. We've also worked around it by (optionally) disabling the cleanup timer in global windows. But I agree, having drain then be an unsafe operation is not great.
    >>>
    >>>                         Dataflow does not require the timers to be in memory though, so unless the numbers get very large (to the point where you run out of disk storage storing the timers), it will not cause your pipelines to fail.
    >>>
    >>>                             I think for batch it's less of an issue since basically everything is in the global window anyways, and batch pipelines run for a fixed amount of time on a fixed input source. For streaming pipelines, it's much easier to run into this.
    >>>
    >>>                             On Wed, Aug 19, 2020 at 12:50 PM Reuven Lax <re...@google.com> wrote:
    >>>
    >>>                                 @OnWindowExpiration is a per-key callback.
    >>>
    >>>                                 On Wed, Aug 19, 2020 at 9:48 AM Luke Cwik <lc...@google.com> wrote:
    >>>
    >>>                                     With the addition of @OnWindowExpiration, a single timer across keys optimization would still make sense.
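(For context: @OnWindowExpiration lets a stateful DoFn flush its per-key state when a window expires, which is why some per-key work remains even if a single timer drives the cleanup. A minimal sketch of a DoFn using it; the exact parameters supported can vary by Beam version.)

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/** Sketch: buffering stateful DoFn that flushes its buffer on window expiration. */
class BufferingFn extends DoFn<KV<String, String>, String> {

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @ProcessElement
  public void process(
      @Element KV<String, String> element, @StateId("buffer") BagState<String> buffer) {
    buffer.add(element.getValue());
  }

  // Invoked once per key before the runner garbage-collects the window's state,
  // which is why some per-key bookkeeping remains necessary even if a single
  // timer per window is used to drive the cleanup.
  @OnWindowExpiration
  public void onExpiry(@StateId("buffer") BagState<String> buffer, OutputReceiver<String> out) {
    for (String value : buffer.read()) {
      out.output(value);
    }
  }
}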
    >>>
    >>>                                     On Wed, Aug 19, 2020 at 8:51 AM Thomas Weise <t...@apache.org> wrote:
    >>>
    >>>                                         https://issues.apache.org/jira/browse/BEAM-10760
    >>>
    >>>                                         I confirmed that skipping the cleanup timers resolves the state leak that we observe in the pipeline that uses a global window.
    >>>
    >>>                                         @Luke the GC is key partitioned and relies on StateInternals. That makes it impractical to have a single timer that performs cleanup for multiple keys, at least in a runner agnostic way.
    >>>
    >>>                                         I would like to take a look if there is a need to have the GC timer for a global window to start with. Since the pipeline terminates, the runner discards all state anyways - at least in the case of Flink.
    >>>
    >>>                                         Thomas
    >>>
    >>>                                         On Mon, Aug 17, 2020 at 9:46 AM Luke Cwik <lc...@google.com> wrote:
    >>>
    >>>                                             For the cleanup timer.
    >>>
    >>>                                             On Mon, Aug 17, 2020 at 9:45 AM Luke Cwik <lc...@google.com> wrote:
    >>>
    >>>                                                 Replacing a timer for each key with just one timer for all keys would make sense for the global window.
    >>>
    >>>                                                 On Sun, Aug 16, 2020 at 5:54 PM Thomas Weise <t...@apache.org> wrote:
    >>>
    >>>      Thanks Jan. We observe a similar issue with state size growth in global window (with the portable runner). We don't see this issue with non-global windows, there does not appear to be any residual. I will take a look at skipping the cleanup timers for global window and see if that resolves the issue. These timers lead to potentially unbounded state growth and don't really serve a purpose.
    >>>
    >>>      Thomas
    >>>
    >>>      On Sun, Aug 16, 2020 at 1:16 AM Jan Lukavský <je...@seznam.cz> wrote:
    >>>
    >>>          Hi Catlyn,
    >>>
    >>>          if you use global window to perform the deduplication, then it should be expected to have as many timers as there are unique keys + one timer for each key that arrived during the last 30 minutes (because there is a timer set to clear the state in the deduplication function). The reason for that is that Beam creates a timer for the window garbage collection time to clear state (see [1]). If it is global window, then each key will have an associated timer forever (it might be an open question if that makes sense in this case, or if Beam can do any better).
    >>>
    >>>          As I wrote before, it would probably help to use two deduplications in two successive fixed windows of length 30 minutes, shifted by 15 minutes (FixedWindows.of(30 minutes).withOffset(15 minutes)), so that the two windows overlap and catch duplicates that would appear near the boundary of the first window.
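(A rough sketch of that suggestion using the Beam Java windowing API; the deduplicating stateful DoFn is only stubbed out and all names are illustrative.)

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class OverlappingDedupSketch {

  static PCollection<KV<String, String>> dedup(PCollection<KV<String, String>> input) {
    return input
        // First pass: 30-minute fixed windows aligned at the epoch.
        .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardMinutes(30))))
        .apply("Dedup1", ParDo.of(new DedupFn()))
        // Second pass: same window length shifted by 15 minutes, so duplicates
        // that straddle a boundary of the first windowing still share a window here.
        .apply(
            Window.<KV<String, String>>into(
                FixedWindows.of(Duration.standardMinutes(30))
                    .withOffset(Duration.standardMinutes(15))))
        .apply("Dedup2", ParDo.of(new DedupFn()));
  }

  /** Stub for a stateful per-key deduplication DoFn; the "seen" state check is elided. */
  static class DedupFn extends DoFn<KV<String, String>, KV<String, String>> {
    @ProcessElement
    public void process(ProcessContext c) {
      c.output(c.element());
    }
  }
}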
    >>>
    >>>          @Max, do you think it would be possible to schedule the cleanup timer only when there is actually data in state for a given key? The timer would be cleared on a call to `clear()`, but would have to be set on every write. Or would it make sense not to schedule the cleanup timer for global window at all?
    >>>
    >>>          Jan
    >>>
    >>>          [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L334
    >>>
    >>>          On 8/15/20 5:47 PM, Catlyn Kong wrote:
    >>>>              Hi!
    >>>>
    >>>>              Thanks for the explanation! The screenshot actually shows all the new instances between marking the heap and taking a heap dump, so sorry if that's a little confusing. Here's what the full heap looks like:
    >>>>              Screen Shot 2020-08-15 at 8.31.42 AM.png
    >>>>              Our input stream has roughly 50 messages per second and the pipeline has been running for about 24 hours. Even assuming all the messages are unique, 5.5 million timers is still very surprising.
    >>>>
    >>>>              We're allocating 11G for taskmanager JVM heap, but it eventually gets filled up (after couple days) and the cluster ends up in a bad state. Here's a screenshot of the heap size over the past 24h:
    >>>>              Screen Shot 2020-08-15 at 8.41.48 AM.png
    >>>>
    >>>>              Could it be that the timers never got cleared out or maybe the pipeline is creating more timer instances than expected?
    >>>>
    >>>>              On Sat, Aug 15, 2020 at 4:07 AM Maximilian Michels <m...@apache.org> wrote:
    >>>>
    >>>>                  Awesome! Thanks a lot for the memory profile. Couple remarks:
    >>>>
    >>>>                  a) I can see that there are about 378k keys and each of them sets a timer.
    >>>>                  b) Based on the settings for DeduplicatePerKey you posted, you will keep track of all keys of the last 30 minutes.
    >>>>
    >>>>                  Unless you have much fewer keys, the behavior is to be expected. The memory sizes for the timer maps do not look particularly high (~12Mb).
    >>>>
    >>>>                  How much memory did you reserve for the task managers?*
    >>>>
    >>>>                  -Max
    >>>>
    >>>>                  *The image links give me a "504 error".
    >>>>
    >>>>                  On 14.08.20 23:29, Catlyn Kong wrote:
    >>>>                  > Hi!
    >>>>                  >
    >>>>                  > We're indeed using the rocksdb state backend, so that might be part of the reason. Due to some security concerns, we might not be able to provide the full heap dump since we have some custom code path. But here's a screenshot from JProfiler:
    >>>>                  > Screen Shot 2020-08-14 at 9.10.07 AM.png
    >>>>                  > Looks like TimerHeapInternalTimer (initiated in InternalTimerServiceImpl <https://github.com/apache/flink/blob/5125b1123dfcfff73b5070401dfccb162959080c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java#L46>) isn't getting garbage collected? As David has mentioned the pipeline uses DeduplicatePerKey <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey> in Beam 2.22, ProcessConnectionEventFn is a simple stateless DoFn that just does some logging and emits the events. Is there any possibility that the timer logic or the way it's used in the dedupe Pardo can cause this leak?
    >>>>                  >
    >>>>                  > Thanks,
    >>>>                  > Catlyn
    >>>>                  >
    >>>>                  > On Tue, Aug 11, 2020 at 7:58 AM Maximilian Michels <m...@apache.org> wrote:
    >>>>                  >
    >>>>                  >     Hi!
    >>>>                  >
    >>>>                  >     Looks like a potential leak, caused by your code or by Beam itself. Would you be able to supply a heap dump from one of the task managers? That would greatly help debugging this issue.
    >>>>                  >
    >>>>                  >     -Max
    >>>>                  >
    >>>>                  >     On 07.08.20 00:19, David Gogokhiya wrote:
    >>>>                  >      > Hi,
    >>>>                  >      >
    >>>>                  >      > We recently started using Apache Beam version 2.20.0 running on Flink version 1.9 deployed on kubernetes to process unbounded streams of data. However, we noticed that the memory consumed by stateful Beam is steadily increasing over time with no drops no matter what the current bandwidth is. We were wondering if this is expected and if not what would be the best way to resolve it.
    >>>>                  >      >
    >>>>                  >      > More Context
    >>>>                  >      >
    >>>>                  >      > We have the following pipeline that consumes messages from the unbounded stream of data. Later we deduplicate the messages based on unique message id using the deduplicate function <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey>. Since we are using Beam version 2.20.0, we copied the source code of the deduplicate function <https://beam.apache.org/releases/pydoc/2.22.0/_modules/apache_beam/transforms/deduplicate.html#DeduplicatePerKey> from version 2.22.0. After that we unmap the tuple, retrieve the necessary data from message payload and dump the corresponding data into the log.
    >>>>                  >      >
    >>>>                  >      > Pipeline:
    >>>>                  >      >
    >>>>                  >      > Flink configuration:
    >>>>                  >      >
    >>>>                  >      > As we mentioned before, we noticed that the memory usage of the jobmanager and taskmanager pods is steadily increasing with no drops no matter what the current bandwidth is. We tried allocating more memory but it seems like no matter how much memory we allocate it eventually reaches its limit and then it tries to restart itself.
    >>>>                  >      >
    >>>>                  >      > Sincerely,
    >>>>                  >      > David
    >>>>
