[https://issues.apache.org/jira/browse/BEAM-11146?focusedWorklogId=508030&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-508030]
ASF GitHub Bot logged work on BEAM-11146:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Nov/20 08:21
Start Date: 05/Nov/20 08:21
Worklog Time Spent: 10m
Work Description: dmvk commented on pull request #13240:
URL: https://github.com/apache/beam/pull/13240#issuecomment-722221070
@rHermes good job ;)
@je-ik -1 for reusing the flag
Anyway, I think it's safe to change the default value of `objectReuse` to `true`
and maybe deprecate it?
Assumption: Beam has full control of all `TypeSerializer` instances in use.
The only one a user can "tweak" is `CoderTypeSerializer`, where `copy(T, T)` is
basically a no-op.
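Roughly, the flag decides whether a value handed from one chained operator to the next is copied through the serializer's `copy` or passed by reference. The following is a simplified, hypothetical model of that handoff, not Flink or Beam code: `ChainHandoff.handOff` is an illustrative name, and the `copier` parameter stands in for `TypeSerializer#copy(T)`. It shows why skipping the copy is only safe when no downstream code mutates its input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical, simplified model of an operator-to-operator handoff.
// It mirrors the idea behind the objectReuse flag; it is not Flink code.
final class ChainHandoff {

  private ChainHandoff() {}

  /**
   * Passes {@code value} on to the next operator.
   *
   * @param objectReuse when true, the same reference is forwarded (no copy)
   * @param copier      stands in for TypeSerializer#copy(T)
   */
  static <T> T handOff(T value, boolean objectReuse, UnaryOperator<T> copier) {
    return objectReuse ? value : copier.apply(value);
  }

  public static void main(String[] args) {
    UnaryOperator<List<String>> copier = ArrayList::new; // a list copy is enough for this demo
    List<String> upstream = new ArrayList<>(List.of("a", "b"));

    // Reuse disabled: a misbehaving downstream mutation does not leak upstream.
    List<String> copied = handOff(upstream, false, copier);
    copied.add("mutated");
    System.out.println(upstream); // [a, b]

    // Reuse enabled: the same object is shared, so the mutation is visible upstream.
    List<String> shared = handOff(upstream, true, copier);
    shared.add("mutated");
    System.out.println(upstream); // [a, b, mutated]
  }
}
```

Under the Beam model, user code is not supposed to mutate elements, which is why the argument above treats the copy as avoidable.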
Quick verification of the serializers used on two of our production pipelines:
First:
```bash
$ kubectl exec pipelines-runner-identity-8123e61d-tm-d9f85785-nhv78 -- jmap -histo 1 | grep flink | grep Serializer | grep -v '\$'
  36:  563802  13531248  org.apache.beam.runners.flink.translation.types.CoderTypeSerializer
 633:      68      4896  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 945:      74      1184  org.apache.flink.runtime.state.ArrayListSerializer
 977:      44      1056  org.apache.flink.runtime.types.PriorityQueueSerializer
1029:      36       864  [Lorg.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
1053:      26       832  org.apache.flink.streaming.api.operators.TimerSerializer
1057:      34       816  org.apache.flink.api.common.typeutils.base.MapSerializer
1184:      44       704  org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer
1187:      44       704  org.apache.flink.runtime.types.JavaIterableWrapperSerializer
1218:      20       640  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData
1272:      36       576  org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate
1412:      26       416  org.apache.flink.runtime.state.JavaSerializer
1443:      16       384  org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot
1538:      20       320  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot
1594:      12       288  org.apache.flink.streaming.api.operators.TimerSerializerSnapshot
1653:      16       256  org.apache.flink.api.common.typeutils.base.ListSerializer
1820:       8       192  org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot
1822:       6       192  org.apache.flink.core.memory.DataOutputSerializer
2872:       4        64  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer
3155:       3        48  org.apache.flink.api.common.typeutils.base.StringSerializer
3162:       2        48  org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer
5768:       1        16  org.apache.flink.runtime.state.VoidNamespaceSerializer
```
Second:
```bash
$ kubectl exec pipelines-runner-default-39c08fc2-tm-6474fb97c5-64s8q -- jmap -histo 1 | grep flink | grep Serializer | grep -v '\$'
 244:   18950    454800  org.apache.beam.runners.flink.translation.types.CoderTypeSerializer
 688:    2031     32496  org.apache.flink.runtime.state.ArrayListSerializer
 791:    1307     20912  org.apache.flink.api.common.typeutils.base.ListSerializer
 884:     682     16368  [Lorg.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 976:     682     10912  org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate
1014:     320     10240  org.apache.flink.core.memory.DataOutputSerializer
1072:     324      7776  org.apache.flink.api.common.typeutils.base.MapSerializer
1120:     212      6784  org.apache.flink.streaming.api.operators.TimerSerializer
1139:     253      6072  org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot
1156:     241      5784  org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot
1159:     360      5760  org.apache.flink.runtime.state.JavaSerializer
1236:     188      4512  org.apache.flink.streaming.api.operators.TimerSerializerSnapshot
1267:     168      4032  [Lorg.apache.flink.api.common.typeutils.TypeSerializer;
1473:      80      1920  org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer
1480:     120      1920  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer
2025:      29       464  org.apache.flink.api.common.typeutils.base.StringSerializer
5926:       1        16  org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer
5992:       1        16  org.apache.flink.runtime.state.VoidNamespaceSerializer
```
WDYT?
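Each row of the `jmap -histo` histograms above has the columns rank, instance count, total shallow bytes, and class name; the `grep` chain keeps rows mentioning both `flink` and `Serializer` and drops names containing `$` (nested and anonymous classes). The minimal Java sketch below reproduces that filter. `HistoFilter`, `keep`, `parse`, and `bytesPerInstance` are hypothetical helper names, and the third sample line is a made-up nested-class row included only to show the `$` exclusion. Dividing bytes by instances shows, for example, that each `CoderTypeSerializer` instance accounts for 24 bytes in both histograms (13531248 / 563802 and 454800 / 18950).

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper that mirrors: grep flink | grep Serializer | grep -v '\$'
final class HistoFilter {

  private HistoFilter() {}

  /** One `jmap -histo` row: "rank: #instances #bytes class-name". */
  static final class Row {
    final int rank;
    final long instances;
    final long bytes;
    final String className;

    Row(int rank, long instances, long bytes, String className) {
      this.rank = rank;
      this.instances = instances;
      this.bytes = bytes;
      this.className = className;
    }
  }

  /** Same predicate as the grep chain, applied to one raw line. */
  static boolean keep(String line) {
    return line.contains("flink") && line.contains("Serializer") && !line.contains("$");
  }

  /** Splits a row on whitespace; assumes the class name is on the same line. */
  static Row parse(String line) {
    String[] f = line.trim().split("\\s+");
    return new Row(
        Integer.parseInt(f[0].replace(":", "")),
        Long.parseLong(f[1]),
        Long.parseLong(f[2]),
        f[3]);
  }

  /** Average shallow size of one instance of the class. */
  static long bytesPerInstance(Row r) {
    return r.instances == 0 ? 0 : r.bytes / r.instances;
  }

  public static void main(String[] args) {
    List<String> lines = List.of(
        "  36:  563802  13531248  org.apache.beam.runners.flink.translation.types.CoderTypeSerializer",
        " 633:      68      4896  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer",
        // Hypothetical nested-class row, dropped by the '$' filter:
        "  99:      10       160  org.apache.flink.SomeSerializer$1");
    List<Row> rows = lines.stream()
        .filter(HistoFilter::keep)
        .map(HistoFilter::parse)
        .collect(Collectors.toList());
    for (Row r : rows) {
      System.out.println(r.className + ": " + bytesPerInstance(r) + " bytes per instance");
    }
  }
}
```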
Issue Time Tracking
-------------------
Worklog Id: (was: 508030)
Time Spent: 2h (was: 1h 50m)
> Add option to disable copying between Flink runner
> ---------------------------------------------------
>
> Key: BEAM-11146
> URL: https://issues.apache.org/jira/browse/BEAM-11146
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Teodor Spæren
> Assignee: Teodor Spæren
> Priority: P2
> Labels: performance
> Time Spent: 2h
> Remaining Estimate: 0h
>
> To provide a Flink
> [TypeSerializer|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java],
> the runner implements
> [CoderTypeSerializer|https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84].
> The {{copy}} function is implemented by first serializing and then
> deserializing each element. Such a deep copy has to be performed between
> operators, and this can become a bottleneck.
> The {{copy}} functions have to be implemented because Flink guarantees
> that elements are deep copied between operators. In Beam, this is the
> user's responsibility, so the copy is not strictly necessary.
> The aim of this improvement is to introduce an option on the Flink Runner
> that eliminates this overhead by simply returning the value.
> [Here is the mailing list
> discussion|https://lists.apache.org/thread.html/r24129dba98782e1cf4d18ec738ab9714dceb05ac23f13adfac5baad1%40%3Cdev.beam.apache.org%3E]
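The description contrasts two ways of implementing {{copy}}: a serialize/deserialize round trip, and the proposed option that simply returns the value. The minimal sketch below shows both paths, assuming plain Java serialization (`ObjectOutputStream`/`ObjectInputStream`) as a stand-in for a Beam `Coder`. `RoundTripCopier` and its `skipCopy` switch are hypothetical names for illustration, not the runner's actual classes or options.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: Java serialization stands in for a Beam Coder.
final class RoundTripCopier<T extends Serializable> {

  private final boolean skipCopy; // hypothetical switch, not a real runner option

  RoundTripCopier(boolean skipCopy) {
    this.skipCopy = skipCopy;
  }

  /** Deep copy by encoding to bytes and decoding again, unless copying is skipped. */
  @SuppressWarnings("unchecked")
  T copy(T value) {
    if (skipCopy) {
      return value; // proposed fast path: hand over the same reference
    }
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(value); // "serialize"; closing flushes into bytes
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject(); // "deserialize" into a fresh object graph
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    ArrayList<String> element = new ArrayList<>(List.of("x"));

    ArrayList<String> deep = new RoundTripCopier<ArrayList<String>>(false).copy(element);
    ArrayList<String> same = new RoundTripCopier<ArrayList<String>>(true).copy(element);

    System.out.println(deep == element); // false: a new object was decoded
    System.out.println(same == element); // true: no copy at all
  }
}
```

The round trip allocates and re-encodes every element, which is the per-operator cost the issue describes; the fast path avoids it but relies on no downstream code mutating the element.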