[ https://issues.apache.org/jira/browse/BEAM-7204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Maximilian Michels updated BEAM-7204:
-------------------------------------
Status: Open (was: Triage Needed)
> ReduceFnRunner overhead
> -----------------------
>
> Key: BEAM-7204
> URL: https://issues.apache.org/jira/browse/BEAM-7204
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink, sdk-java-core
> Reporter: Jozef Vilcek
> Priority: Major
> Labels: performance
>
> More context can be found in the discussion here:
> [http://mail-archives.apache.org/mod_mbox/beam-dev/201904.mbox/%3CCAOUjMkyKV8npYJfS_PF3Gzo=vwomb2frzute81zsrxnm13t...@mail.gmail.com%3E]
> I have found that, on a FlinkRunner streaming pipeline, there is overhead
> associated with processing each element in:
> # ReduceFnRunner.scheduleGarbageCollectionTimer() for the window
> # PaneInfo tracking
> This causes a considerable amount of garbage for the JVM GC. At least the
> second item also involves interaction with the state backend.
>
> Relevant stacks for illustration:
> ScheduleGarbageCollectionTimer
> {noformat}
> ...
> [ 3] org.apache.flink.streaming.api.operators.HeapInternalTimerService.deleteEventTimeTimer
> [ 4] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.deleteTimer
> [ 5] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.cancelPendingTimerById
> [ 6] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator$FlinkTimerInternals.setTimer
> [ 7] org.apache.beam.runners.core.ReduceFnContextFactory$TimersImpl.setTimer
> [ 8] org.apache.beam.runners.core.ReduceFnRunner.scheduleGarbageCollectionTimer
> [ 9] org.apache.beam.runners.core.ReduceFnRunner.processElement
> [10] org.apache.beam.runners.core.ReduceFnRunner.processElements
> [11] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement
> [12] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement
> [13] org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement
> [14] org.apache.beam.runners.core.SimpleDoFnRunner.processElement
> [15] org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement
> [16] org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement
> [17] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement
> [18] org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput
> [19] org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run
> [20] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke
> [21] org.apache.flink.runtime.taskmanager.Task.run
> [22] java.lang.Thread.run
> {noformat}
> PaneInfoTracker: Read
> {noformat}
> [ 0] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get
> [ 1] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get
> [ 2] org.apache.flink.runtime.state.heap.HeapValueState.value
> [ 3] org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkValueState.read
> [ 4] org.apache.beam.runners.core.PaneInfoTracker$1.read
> [ 5] org.apache.beam.runners.core.PaneInfoTracker$1.read
> [ 6] org.apache.beam.runners.core.ReduceFnRunner.onTrigger
> [ 7] org.apache.beam.runners.core.ReduceFnRunner.emit
> [ 8] org.apache.beam.runners.core.ReduceFnRunner.processElements
> [ 9] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement
> [10] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement
> [11] org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement
> [12] org.apache.beam.runners.core.SimpleDoFnRunner.processElement
> [13] org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement
> [14] org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement
> [15] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement
> [16] org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput
> [17] org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run
> [18] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke
> [19] org.apache.flink.runtime.taskmanager.Task.run
> [20] java.lang.Thread.run
> {noformat}
> PaneInfoTracker: Write
> {noformat}
> [ 0] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.putEntry
> [ 1] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.put
> [ 2] org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.put
> [ 3] org.apache.flink.runtime.state.heap.HeapValueState.update
> [ 4] org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkValueState.write
> [ 5] org.apache.beam.runners.core.PaneInfoTracker.storeCurrentPaneInfo
> [ 6] org.apache.beam.runners.core.ReduceFnRunner.lambda$onTrigger$1
> [ 7] org.apache.beam.runners.core.ReduceFnRunner$$Lambda$101.211931975.output
> [ 8] org.apache.beam.runners.core.ReduceFnContextFactory$OnTriggerContextImpl.output
> [ 9] org.apache.beam.runners.core.SystemReduceFn.onTrigger
> [10] org.apache.beam.runners.core.ReduceFnRunner.onTrigger
> [11] org.apache.beam.runners.core.ReduceFnRunner.emit
> [12] org.apache.beam.runners.core.ReduceFnRunner.processElements
> [13] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement
> [14] org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement
> [15] org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement
> [16] org.apache.beam.runners.core.SimpleDoFnRunner.processElement
> [17] org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement
> [18] org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement
> [19] org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement
> [20] org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput
> [21] org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run
> [22] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke
> [23] org.apache.flink.runtime.taskmanager.Task.run
> [24] java.lang.Thread.run
> {noformat}
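
For illustration only: the ScheduleGarbageCollectionTimer stack above shows setTimer going through cancelPendingTimerById and deleteTimer, so each element appears to pay for a timer deletion plus a re-registration even when the window's garbage-collection time has not changed. The toy model below counts those operations. Every name in it (GcTimerSketch, CountingTimers, perElement, deduplicated) is hypothetical and is not Beam or Flink API, and the deduplicated variant is only an assumed mitigation for comparison, not something the issue proposes.

```java
import java.util.HashMap;
import java.util.Map;

public class GcTimerSketch {

    /** Toy timer backend that counts operations; stands in for a runner's timer service. */
    static class CountingTimers {
        final Map<String, Long> timers = new HashMap<>();
        int deletes = 0;
        int registrations = 0;

        /** Models the cancel-then-register pattern visible in the stack trace. */
        void setTimer(String timerId, long timestamp) {
            if (timers.remove(timerId) != null) {
                deletes++;
            }
            timers.put(timerId, timestamp);
            registrations++;
        }
    }

    /** Re-schedules the window's GC timer once per element. */
    static void perElement(CountingTimers timers, String window, long gcTime, int elements) {
        for (int i = 0; i < elements; i++) {
            timers.setTimer("gc-" + window, gcTime);
        }
    }

    /** Assumed mitigation: skip the call when the GC time for the window is unchanged. */
    static void deduplicated(CountingTimers timers, String window, long gcTime, int elements) {
        Map<String, Long> lastSet = new HashMap<>();
        for (int i = 0; i < elements; i++) {
            Long previous = lastSet.get(window);
            if (previous == null || previous != gcTime) {
                timers.setTimer("gc-" + window, gcTime);
                lastSet.put(window, gcTime);
            }
        }
    }

    public static void main(String[] args) {
        CountingTimers naive = new CountingTimers();
        perElement(naive, "w1", 100L, 1000);
        System.out.println("per element:  deletes=" + naive.deletes
                + " registrations=" + naive.registrations);

        CountingTimers dedup = new CountingTimers();
        deduplicated(dedup, "w1", 100L, 1000);
        System.out.println("deduplicated: deletes=" + dedup.deletes
                + " registrations=" + dedup.registrations);
    }
}
```

In this model, 1000 elements in one window lead to 999 deletions and 1000 registrations in the per-element variant, against a single registration when unchanged timers are skipped. Real runner behaviour depends on the timer service and state backend in use.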