Jozef Vilcek created BEAM-7204:
----------------------------------
Summary: ReduceFnRunner overhead
Key: BEAM-7204
URL: https://issues.apache.org/jira/browse/BEAM-7204
Project: Beam
Issue Type: Improvement
Components: runner-flink, sdk-java-core
Reporter: Jozef Vilcek
More context can be found in the discussion here:
[http://mail-archives.apache.org/mod_mbox/beam-dev/201904.mbox/%3CCAOUjMkyKV8npYJfS_PF3Gzo=vwomb2frzute81zsrxnm13t...@mail.gmail.com%3E]
I have found that in a FlinkRunner streaming pipeline there is overhead
associated with processing each element at:
# ReduceFnRunner.scheduleGarbageCollectionTimer() for window
# tracking PaneInfo
This causes a considerable amount of garbage for the JVM GC. At least the second item also
involves interaction with the state backend.
Relevant stacks for illustration:
{noformat}
...
[ 3] org.apache.flink.streaming.api.operators.HeapInternalTimerService.deleteEventTimeTimer
[ 4] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.deleteTimer
[ 5] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.cancelPendingTimerById
[ 6] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.setTimer
[ 7] org.apache.beam.runners.core.ReduceFnContextFactory$TimersImpl.setTimer
[ 8] org.apache.beam.runners.core.ReduceFnRunner.scheduleGarbageCollectionTimer
[ 9] org.apache.beam.runners.core.ReduceFnRunner.processElement
[10] org.apache.beam.runners.core.ReduceFnRunner.processElements
[11] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement
[12] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement
[13] org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement
[14] org.apache.beam.runners.core.SimpleDoFnRunner.processElement
[15] org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement
[16] org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement
[17] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement
[18] org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput
[19] org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run
[20] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke
[21] org.apache.flink.runtime.taskmanager.Task.run
[22] java.lang.Thread.run
{noformat}
{noformat}
[ 0] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get
[ 1] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get
[ 2] org.apache.flink.runtime.state.heap.HeapValueState.value
[ 3] org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkValueState.read
[ 4] org.apache.beam.runners.core.PaneInfoTracker$1.read
[ 5] org.apache.beam.runners.core.PaneInfoTracker$1.read
[ 6] org.apache.beam.runners.core.ReduceFnRunner.onTrigger
[ 7] org.apache.beam.runners.core.ReduceFnRunner.emit
[ 8] org.apache.beam.runners.core.ReduceFnRunner.processElements
[ 9] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement
[10] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement
[11] org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement
[12] org.apache.beam.runners.core.SimpleDoFnRunner.processElement
[13] org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement
[14] org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement
[15] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement
[16] org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput
[17] org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run
[18] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke
[19] org.apache.flink.runtime.taskmanager.Task.run
[20] java.lang.Thread.run
{noformat}
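To make the first stack concrete: setTimer goes through cancelPendingTimerById and deleteTimer before the timer is registered again, so re-arming the same garbage collection timer for every element costs a delete plus a registration each time. The following is only a rough sketch of that pattern and of one possible way to avoid it. All names here (GcTimerSketch, CountingTimers, processNaive, processDeduplicated) are hypothetical and are not Beam or Flink classes. The in-memory map that remembers what was already scheduled is an assumption; whether something similar is safe inside ReduceFnRunner would need to be checked against the real timer semantics.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; none of these types are Beam or Flink classes. */
public class GcTimerSketch {

    /** Stand-in for a runner timer service that counts its own operations. */
    public static class CountingTimers {
        private final Map<String, Long> pending = new HashMap<>();
        public int deletes = 0;
        public int registrations = 0;

        /** As in the stack: setting a timer first cancels a pending timer with the same id. */
        public void setTimer(String timerId, long timestamp) {
            if (pending.remove(timerId) != null) {
                deletes++;
            }
            pending.put(timerId, timestamp);
            registrations++;
        }
    }

    /** Naive path: re-arm the window's GC timer for every element. */
    public static void processNaive(
            CountingTimers timers, String window, long gcTime, int elements) {
        for (int i = 0; i < elements; i++) {
            timers.setTimer("gc-" + window, gcTime);
        }
    }

    /** Assumed alternative: remember the scheduled time and skip identical requests. */
    public static void processDeduplicated(
            CountingTimers timers, String window, long gcTime, int elements) {
        Map<String, Long> scheduled = new HashMap<>();
        for (int i = 0; i < elements; i++) {
            Long previous = scheduled.get(window);
            if (previous == null || previous != gcTime) {
                timers.setTimer("gc-" + window, gcTime);
                scheduled.put(window, gcTime);
            }
        }
    }

    public static void main(String[] args) {
        CountingTimers naive = new CountingTimers();
        processNaive(naive, "w1", 100L, 1000);
        System.out.println("naive: registrations=" + naive.registrations
                + " deletes=" + naive.deletes);

        CountingTimers dedup = new CountingTimers();
        processDeduplicated(dedup, "w1", 100L, 1000);
        System.out.println("dedup: registrations=" + dedup.registrations
                + " deletes=" + dedup.deletes);
    }
}
```

In this toy model, 1000 elements in one window produce 1000 registrations and 999 deletes on the naive path, versus a single registration and no deletes when identical requests are skipped. The counts say nothing about real Flink costs; they only illustrate why the per-element call shows up in profiles.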