[
https://issues.apache.org/jira/browse/FLINK-17322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17113083#comment-17113083
]
Arvid Heise edited comment on FLINK-17322 at 5/22/20, 8:45 AM:
---------------------------------------------------------------
_Removed old message._
The bug seems to be related to the random emit within BroadcastRecordWriter. I
have found a simple fix, but it affects latency measurement at the source level
(it will underestimate lag). I will dig deeper to find a proper fix.
was (Author: aheise):
There is a fundamental issue with latency markers and legacy sources that also
applies to the current master.
Latency markers are emitted from the task thread, while all other records come
from the legacy source thread. Since the buffer builder is not thread-safe, we
should not interact with it from the task thread at all.
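To illustrate why a second writer on a non-thread-safe buffer is a problem, here is a minimal sketch. It uses an invented tag-prefixed layout, not Flink's actual wire format, and every name in it (`InterleavedWriteSketch`, `record`, `marker`, `interleave`) is hypothetical. The interleaving is simulated sequentially so the result is deterministic, and the payload is chosen so that the stray tag is 84, mirroring the "Corrupt stream, found tag" message quoted in the issue description. It is an illustration of the failure mode, not a diagnosis of this bug.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Toy tag-prefixed stream; the layout is invented and is not Flink's wire format. */
final class InterleavedWriteSketch {
    static final int TAG_RECORD = 0;
    static final int TAG_MARKER = 3;

    /** Record layout: 1 tag byte, 2-byte length, ASCII payload. */
    static byte[] record(String value) {
        byte[] payload = value.getBytes(StandardCharsets.US_ASCII);
        return ByteBuffer.allocate(3 + payload.length)
                .put((byte) TAG_RECORD).putShort((short) payload.length).put(payload).array();
    }

    /** Marker layout: 1 tag byte, 8-byte timestamp. */
    static byte[] marker(long timestamp) {
        return ByteBuffer.allocate(9).put((byte) TAG_MARKER).putLong(timestamp).array();
    }

    /** Simulates a second writer whose marker lands after the first {@code cut} bytes of the record. */
    static byte[] interleave(byte[] rec, byte[] mark, int cut) {
        return ByteBuffer.allocate(rec.length + mark.length)
                .put(rec, 0, cut).put(mark).put(rec, cut, rec.length - cut).array();
    }

    /** Reads elements until the stream ends or an unknown tag is found. */
    static List<String> readAll(byte[] stream) throws IOException {
        ByteBuffer in = ByteBuffer.wrap(stream);
        List<String> elements = new ArrayList<>();
        while (in.hasRemaining()) {
            int tag = in.get();
            if (tag == TAG_RECORD) {
                byte[] payload = new byte[in.getShort()];
                in.get(payload);
                elements.add("record:" + new String(payload, StandardCharsets.US_ASCII));
            } else if (tag == TAG_MARKER) {
                elements.add("marker:" + in.getLong());
            } else {
                throw new IOException("Corrupt stream, found tag: " + tag);
            }
        }
        return elements;
    }

    /** Returns the decoded elements, or the error message if decoding fails. */
    static String describe(byte[] stream) {
        try {
            return readAll(stream).toString();
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        byte[] rec = record("TELEMETRY");
        byte[] mark = marker(1L);
        // Marker written after the complete record: the stream decodes cleanly.
        System.out.println(describe(interleave(rec, mark, rec.length)));
        // Marker written right after the record header: the reader consumes the
        // marker bytes as payload and then treats 'T' (84) as a tag.
        System.out.println(describe(interleave(rec, mark, 3)));
    }
}
```

In this toy layout the reader has no way to resynchronize: once the marker bytes are consumed as payload, the next "tag" is simply whatever payload byte comes next.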
However, I also don't see a way to reliably emit them from the legacy thread.
The only way would be to fall back to handing records over from the legacy
thread to the task thread and then doing everything in the task thread, but that
would degrade performance tremendously.
So until we get FLIP-27, latency markers may not work with legacy sources.
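The hand-over fallback mentioned above could look roughly like the following sketch. It is a toy model built only on `java.util.concurrent`; `HandOverSketch` and its members are hypothetical names, not Flink classes. As a simplifying assumption, a marker is emitted every `markerEvery` records rather than on a timer, so the output is deterministic. The point it shows is that only one thread ever writes to the buffer, at the price of a queue hop per record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Toy model of record hand-over; none of these names are Flink classes. */
final class HandOverSketch {
    /** Sentinel that tells the task thread the source is finished. */
    private static final String END = "<end>";

    /** Stand-in for a non-thread-safe buffer builder: only the task thread touches it. */
    private final List<String> buffer = new ArrayList<>();
    private final BlockingQueue<String> handOver = new ArrayBlockingQueue<>(16);

    List<String> run(int records, int markerEvery) {
        // "Legacy" source thread: produces records but never writes to the buffer itself.
        Thread legacy = new Thread(() -> {
            try {
                for (int i = 0; i < records; i++) {
                    handOver.put("record-" + i);
                }
                handOver.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "legacy-source");
        legacy.start();

        // Task thread (the caller): the single writer for both records and markers.
        try {
            int seen = 0;
            for (String element = handOver.take(); !END.equals(element); element = handOver.take()) {
                buffer.add(element);
                if (++seen % markerEvery == 0) {
                    buffer.add("latency-marker");
                }
            }
            legacy.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during hand-over", e);
        }
        return buffer;
    }

    public static void main(String[] args) {
        System.out.println(new HandOverSketch().run(4, 2));
    }
}
```

Every record crosses a blocking queue before it is written, which is the kind of per-record overhead the comment expects to hurt throughput.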
> Enable latency tracker would corrupt the broadcast state
> --------------------------------------------------------
>
> Key: FLINK-17322
> URL: https://issues.apache.org/jira/browse/FLINK-17322
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.9.3, 1.10.1
> Reporter: Yun Tang
> Assignee: Arvid Heise
> Priority: Major
> Fix For: 1.11.0
>
> Attachments:
> Telematics2-feature-flink-1.10-latency-tracking-broken.zip
>
>
> This bug was reported on the user mailing list:
>
> [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Latency-tracking-together-with-broadcast-state-can-cause-job-failure-td34013.html]
> Executing {{BroadcastStateIT#broadcastStateWorksWithLatencyTracking}} easily
> reproduces this problem.
> Based on the current information, the broadcast element becomes corrupt once
> we enable {{env.getConfig().setLatencyTrackingInterval(2000)}}.
> The exception stack trace (based on the current master branch) is:
> {code:java}
> Caused by: java.io.IOException: Corrupt stream, found tag: 84
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:217) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46) ~[classes/:?]
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) ~[classes/:?]
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:157) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:123) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:181) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:332) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:206) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:196) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:505) ~[classes/:?]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:485) ~[classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:720) ~[classes/:?]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:544) ~[classes/:?]
>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_144]
> {code}