[ https://issues.apache.org/jira/browse/BEAM-7204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jozef Vilcek updated BEAM-7204:
-------------------------------
    Description: 
More context can be found in discussion here:

[http://mail-archives.apache.org/mod_mbox/beam-dev/201904.mbox/%3CCAOUjMkyKV8npYJfS_PF3Gzo=vwomb2frzute81zsrxnm13t...@mail.gmail.com%3E]

I have found that on a FlinkRunner streaming pipeline there is per-element processing overhead in:
 # ReduceFnRunner.scheduleGarbageCollectionTimer() for the window
 # tracking PaneInfo

This creates a fair amount of garbage for the JVM GC. At least the second item also involves interaction with the state backend.
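
For illustration only, the timer churn can be sketched in plain Java (this is a hypothetical simplification, not Beam's or Flink's actual implementation; the map stands in for Flink's timer service, and the method names merely echo the frames in the stacks below):

```java
import java.util.HashMap;
import java.util.Map;

class TimerChurnSketch {
    // Stand-in for Flink's internal timer service keyed by timer id.
    private final Map<String, Long> pendingTimers = new HashMap<>();
    long deletes = 0;

    // Mirrors the cancel-then-set pattern visible in the stack:
    // setTimer -> cancelPendingTimerById -> deleteEventTimeTimer.
    void setTimer(String timerId, long timestamp) {
        Long previous = pendingTimers.remove(timerId); // cancel any pending timer
        if (previous != null) {
            deletes++; // each redundant re-registration costs a delete
        }
        pendingTimers.put(timerId, timestamp);
    }

    public static void main(String[] args) {
        TimerChurnSketch timers = new TimerChurnSketch();
        // Processing N elements of one window re-sets the same GC timer N times.
        int elements = 1000;
        for (int i = 0; i < elements; i++) {
            timers.setTimer("window-gc", 9_999L);
        }
        // One useful registration, N-1 redundant delete/insert cycles.
        System.out.println("deletes=" + timers.deletes);
    }
}
```

The point of the sketch: the delete/insert cycle runs once per element even though the GC timer for a given window only needs to be registered once.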

 

Relevant stacks for illustration:

ScheduleGarbageCollectionTimer
{noformat}
...
[ 3] org.apache.flink.streaming.api.operators.HeapInternalTimerService.deleteEventTimeTimer
[ 4] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.deleteTimer
[ 5] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.cancelPendingTimerById
[ 6] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.setTimer
[ 7] org.apache.beam.runners.core.ReduceFnContextFactory$TimersImpl.setTimer
[ 8] org.apache.beam.runners.core.ReduceFnRunner.scheduleGarbageCollectionTimer
[ 9] org.apache.beam.runners.core.ReduceFnRunner.processElement
[10] org.apache.beam.runners.core.ReduceFnRunner.processElements
[11] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement
[12] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement
[13] org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement
[14] org.apache.beam.runners.core.SimpleDoFnRunner.processElement
[15] org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement
[16] org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement
[17] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement
[18] org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput
[19] org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run
[20] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke
[21] org.apache.flink.runtime.taskmanager.Task.run
[22] java.lang.Thread.run
{noformat}
PaneInfoTracker: Read
{noformat}
[ 0] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get
[ 1] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get
[ 2] org.apache.flink.runtime.state.heap.HeapValueState.value
[ 3] org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkValueState.read
[ 4] org.apache.beam.runners.core.PaneInfoTracker$1.read
[ 5] org.apache.beam.runners.core.PaneInfoTracker$1.read
[ 6] org.apache.beam.runners.core.ReduceFnRunner.onTrigger
[ 7] org.apache.beam.runners.core.ReduceFnRunner.emit
[ 8] org.apache.beam.runners.core.ReduceFnRunner.processElements
[ 9] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement
[10] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement
[11] org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement
[12] org.apache.beam.runners.core.SimpleDoFnRunner.processElement
[13] org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement
[14] org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement
[15] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement
[16] org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput
[17] org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run
[18] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke
[19] org.apache.flink.runtime.taskmanager.Task.run
[20] java.lang.Thread.run
{noformat}
PaneInfoTracker: Write
{noformat}
[ 0] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.putEntry
[ 1] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.put
[ 2] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.put
[ 3] org.apache.flink.runtime.state.heap.HeapValueState.update
[ 4] org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkValueState.write
[ 5] org.apache.beam.runners.core.PaneInfoTracker.storeCurrentPaneInfo
[ 6] org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1
[ 7] org.apache.beam.runners.core.ReduceFnRunner$$Lambda$101.211931975.output
[ 8] org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output
[ 9] org.apache.beam.runners.core.SystemReduceFn.onTrigger
[10] org.apache.beam.runners.core.ReduceFnRunner.onTrigger
[11] org.apache.beam.runners.core.ReduceFnRunner.emit
[12] org.apache.beam.runners.core.ReduceFnRunner.processElements
[13] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement
[14] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement
[15] org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement
[16] org.apache.beam.runners.core.SimpleDoFnRunner.processElement
[17] org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement
[18] org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement
[19] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement
[20] org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput
[21] org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run
[22] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke
[23] org.apache.flink.runtime.taskmanager.Task.run
[24] java.lang.Thread.run
{noformat}
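
The PaneInfoTracker stacks above imply one state-backend read and one write per emitted pane. A plain-Java sketch of that traffic (hypothetical simplification; a HashMap stands in for the state backend, and only a pane index is tracked rather than Beam's full PaneInfo):

```java
import java.util.HashMap;
import java.util.Map;

class PaneStateSketch {
    // Stand-in for the per-key state backend.
    private final Map<String, Integer> backend = new HashMap<>();
    int reads = 0;
    int writes = 0;

    // Each emitted pane reads the previous pane info and writes the next one,
    // mirroring PaneInfoTracker read + storeCurrentPaneInfo in the stacks.
    int emitPane(String windowKey) {
        Integer previousIndex = backend.get(windowKey); // state read
        reads++;
        int nextIndex = previousIndex == null ? 0 : previousIndex + 1;
        backend.put(windowKey, nextIndex);              // state write
        writes++;
        return nextIndex;
    }

    public static void main(String[] args) {
        PaneStateSketch tracker = new PaneStateSketch();
        for (int i = 0; i < 3; i++) {
            tracker.emitPane("w1");
        }
        System.out.println("reads=" + tracker.reads + " writes=" + tracker.writes);
    }
}
```

Even in this stripped-down form, the cost scales with the number of emitted panes, which on a per-element-triggering pipeline means per-element state traffic.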


> ReduceFnRunner overhead
> -----------------------
>
>                 Key: BEAM-7204
>                 URL: https://issues.apache.org/jira/browse/BEAM-7204
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-flink, sdk-java-core
>            Reporter: Jozef Vilcek
>            Priority: Major
>              Labels: performance
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
