[
https://issues.apache.org/jira/browse/BEAM-11146?focusedWorklogId=508041&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-508041
]
ASF GitHub Bot logged work on BEAM-11146:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 05/Nov/20 09:01
Start Date: 05/Nov/20 09:01
Worklog Time Spent: 10m
Work Description: rHermes commented on pull request #13240:
URL: https://github.com/apache/beam/pull/13240#issuecomment-722241371
> @rHermes good job ;)
Thanks! It's my first change to a big open source project!
> Anyway, I think it's safe to change default value of `objectReuse` to `true` and maybe deprecate it?
As it stands now, I don't think Beam is in a position to use it for its
intended purpose. I have no opinion on deprecating it, because I don't know
whether Beam's type system is likely to change in the future.
> Quick verification of used serializers on two of our production pipelines...
>
> First:
>
> ```shell
> $ kubectl exec pipelines-runner-identity-8123e61d-tm-d9f85785-nhv78 -- jmap -histo 1 | grep flink | grep Serializer | grep -v '\$'
>   36:  563802  13531248  org.apache.beam.runners.flink.translation.types.CoderTypeSerializer
>  633:      68      4896  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
>  945:      74      1184  org.apache.flink.runtime.state.ArrayListSerializer
>  977:      44      1056  org.apache.flink.runtime.types.PriorityQueueSerializer
> 1029:      36       864  [Lorg.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
> 1053:      26       832  org.apache.flink.streaming.api.operators.TimerSerializer
> 1057:      34       816  org.apache.flink.api.common.typeutils.base.MapSerializer
> 1184:      44       704  org.apache.flink.api.java.typeutils.runtime.kryo.JavaSerializer
> 1187:      44       704  org.apache.flink.runtime.types.JavaIterableWrapperSerializer
> 1218:      20       640  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshotData
> 1272:      36       576  org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate
> 1412:      26       416  org.apache.flink.runtime.state.JavaSerializer
> 1443:      16       384  org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot
> 1538:      20       320  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot
> 1594:      12       288  org.apache.flink.streaming.api.operators.TimerSerializerSnapshot
> 1653:      16       256  org.apache.flink.api.common.typeutils.base.ListSerializer
> 1820:       8       192  org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot
> 1822:       6       192  org.apache.flink.core.memory.DataOutputSerializer
> 2872:       4        64  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer
> 3155:       3        48  org.apache.flink.api.common.typeutils.base.StringSerializer
> 3162:       2        48  org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer
> 5768:       1        16  org.apache.flink.runtime.state.VoidNamespaceSerializer
> ```
>
> Second:
>
> ```shell
> $ kubectl exec pipelines-runner-default-39c08fc2-tm-6474fb97c5-64s8q -- jmap -histo 1 | grep flink | grep Serializer | grep -v '\$'
>  244:   18950    454800  org.apache.beam.runners.flink.translation.types.CoderTypeSerializer
>  688:    2031     32496  org.apache.flink.runtime.state.ArrayListSerializer
>  791:    1307     20912  org.apache.flink.api.common.typeutils.base.ListSerializer
>  884:     682     16368  [Lorg.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
>  976:     682     10912  org.apache.flink.api.common.typeutils.NestedSerializersSnapshotDelegate
> 1014:     320     10240  org.apache.flink.core.memory.DataOutputSerializer
> 1072:     324      7776  org.apache.flink.api.common.typeutils.base.MapSerializer
> 1120:     212      6784  org.apache.flink.streaming.api.operators.TimerSerializer
> 1139:     253      6072  org.apache.flink.api.common.typeutils.base.ListSerializerSnapshot
> 1156:     241      5784  org.apache.flink.api.common.typeutils.base.MapSerializerSnapshot
> 1159:     360      5760  org.apache.flink.runtime.state.JavaSerializer
> 1236:     188      4512  org.apache.flink.streaming.api.operators.TimerSerializerSnapshot
> 1267:     168      4032  [Lorg.apache.flink.api.common.typeutils.TypeSerializer;
> 1473:      80      1920  org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer
> 1480:     120      1920  org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer
> 2025:      29       464  org.apache.flink.api.common.typeutils.base.StringSerializer
> 5926:       1        16  org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer
> 5992:       1        16  org.apache.flink.runtime.state.VoidNamespaceSerializer
> ```
>
> WDYT?
Are these two runs with and without my change enabled? If so, would you mind
emailing me the setup and the two pipelines? It would be very useful data
for my thesis!
If not, I'm a bit confused :sweat_smile:
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
Issue Time Tracking
-------------------
Worklog Id: (was: 508041)
Time Spent: 2h 10m (was: 2h)
> Add option to disable copying between Flink runner
> ---------------------------------------------------
>
> Key: BEAM-11146
> URL: https://issues.apache.org/jira/browse/BEAM-11146
> Project: Beam
> Issue Type: Improvement
> Components: runner-flink
> Reporter: Teodor Spæren
> Assignee: Teodor Spæren
> Priority: P2
> Labels: performance
> Time Spent: 2h 10m
> Remaining Estimate: 0h
>
> In order to implement Flink
> [TypeSerializer|https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java]
> the runner implements
> [CoderTypeSerializer|https://github.com/apache/beam/blob/master/runners/flink/1.8/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java#L84].
> The {{copy}} function is implemented by first serializing and then
> deserializing each element. Such a deep copy is therefore made between
> every pair of operators, and this can become a bottleneck.
> The {{copy}} function has to be implemented because Flink guarantees that
> elements are deep copied between operators. In Beam this is the user's
> responsibility, so the copy is not strictly necessary.
> The aim of this improvement is to introduce an option on the Flink runner
> that eliminates this overhead by having {{copy}} simply return the value.
> [Here is the mailing list
> discussion|https://lists.apache.org/thread.html/r24129dba98782e1cf4d18ec738ab9714dceb05ac23f13adfac5baad1%40%3Cdev.beam.apache.org%3E]
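The two copy behaviours described in the issue can be sketched in plain Java. This is a minimal, hypothetical illustration, not Beam's actual `CoderTypeSerializer`. The names `SimpleCoder`, `StringListCoder`, `SketchSerializer` and the `skipCopy` flag are made up. They stand in for a Beam `Coder`, the runner's serializer, and the proposed runner option.

When the flag is off, `copy` encodes the value to bytes and decodes it again, which produces a deep copy. When the flag is on, `copy` returns the same reference.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of serialize/deserialize copying versus returning the value as-is. */
public class CopySketch {

  /** Hypothetical stand-in for a Beam Coder: encodes to and decodes from byte streams. */
  interface SimpleCoder<T> {
    void encode(T value, OutputStream out) throws IOException;
    T decode(InputStream in) throws IOException;
  }

  /** Hypothetical coder for List<String>: a size prefix followed by each string. */
  static final class StringListCoder implements SimpleCoder<List<String>> {
    @Override
    public void encode(List<String> value, OutputStream out) throws IOException {
      DataOutputStream data = new DataOutputStream(out);
      data.writeInt(value.size());
      for (String s : value) {
        data.writeUTF(s);
      }
      data.flush();
    }

    @Override
    public List<String> decode(InputStream in) throws IOException {
      DataInputStream data = new DataInputStream(in);
      int size = data.readInt();
      List<String> result = new ArrayList<>(size);
      for (int i = 0; i < size; i++) {
        result.add(data.readUTF());
      }
      return result;
    }
  }

  /** Hypothetical serializer whose copy() behaviour is controlled by skipCopy. */
  static final class SketchSerializer<T> {
    private final SimpleCoder<T> coder;
    private final boolean skipCopy; // hypothetical flag, not the real option name

    SketchSerializer(SimpleCoder<T> coder, boolean skipCopy) {
      this.coder = coder;
      this.skipCopy = skipCopy;
    }

    T copy(T value) {
      if (skipCopy) {
        // No copy: safe only if nobody mutates the element afterwards.
        return value;
      }
      try {
        // Deep copy by round-tripping through the coder.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        coder.encode(value, bytes);
        return coder.decode(new ByteArrayInputStream(bytes.toByteArray()));
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }

  public static void main(String[] args) {
    List<String> original = new ArrayList<>(List.of("a", "b"));
    SketchSerializer<List<String>> deep = new SketchSerializer<>(new StringListCoder(), false);
    SketchSerializer<List<String>> noCopy = new SketchSerializer<>(new StringListCoder(), true);

    List<String> deepCopy = deep.copy(original);
    List<String> sameRef = noCopy.copy(original);

    original.add("c"); // mutation after the element was "passed on"
    System.out.println(deepCopy); // [a, b]
    System.out.println(sameRef);  // [a, b, c]
  }
}
```

In `main`, `original` is mutated after both copies are taken. The deep copy is unaffected, but the change is visible through the uncopied reference. Skipping the copy is therefore only safe when user code does not modify elements after handing them downstream.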
--
This message was sent by Atlassian Jira
(v8.3.4#803005)