[
https://issues.apache.org/jira/browse/FLINK-3688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15223353#comment-15223353
]
Konstantin Knauf commented on FLINK-3688:
-----------------------------------------
Regarding dropping Watermarks in {{StreamRecordSerializer.serialize()}}: did
you mean to change {{StreamRecordSerializer<T> extends
TypeSerializer<StreamRecord<T>>}} to {{StreamRecordSerializer<T> extends
TypeSerializer<StreamElement>}}? If so, I have two questions about that:
# This induces some changes to {{WindowedStream}} and other classes, like the
{{KeyedCEPPatternOperator}}, which affects type safety there. So I am not too
sure about the change.
# How should non-StreamRecord StreamElements be handled in the copy methods? I
throw RuntimeExceptions for now.
You can have a look at this change here:
https://github.com/knaufk/flink/tree/FLINK-3688.
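To make the two questions above concrete, here is a minimal sketch of a serializer typed on the common supertype. It is not the code on the branch: {{SketchElement}}, {{SketchRecord}}, {{SketchWatermark}} and {{ElementSerializerSketch}} are hypothetical stand-ins for the Flink classes, and the output is a plain list instead of a {{DataOutputView}}.

```java
import java.util.List;

// Hypothetical stand-ins for StreamElement, StreamRecord and Watermark; not the Flink classes.
abstract class SketchElement {}

final class SketchRecord<T> extends SketchElement {
    final T value;
    SketchRecord(T value) { this.value = value; }
}

final class SketchWatermark extends SketchElement {
    final long timestamp;
    SketchWatermark(long timestamp) { this.timestamp = timestamp; }
}

// Sketch of a serializer that accepts any element instead of only records.
final class ElementSerializerSketch<T> {

    // Writes the value of a record to the output; any other element is dropped.
    void serialize(SketchElement element, List<Object> out) {
        if (element instanceof SketchRecord) {
            @SuppressWarnings("unchecked")
            SketchRecord<T> record = (SketchRecord<T>) element;
            out.add(record.value);
        }
        // Watermarks fall through here and are silently dropped.
    }

    // A copy cannot be "dropped", so non-record elements are rejected (assumption
    // mirroring the RuntimeException mentioned above).
    SketchElement copy(SketchElement from) {
        if (from instanceof SketchRecord) {
            @SuppressWarnings("unchecked")
            SketchRecord<T> record = (SketchRecord<T>) from;
            return new SketchRecord<>(record.value);
        }
        throw new RuntimeException(
                "Cannot copy non-record element: " + from.getClass().getSimpleName());
    }
}
```

The type-safety concern shows up in the signatures: callers now get a {{SketchElement}} back from {{copy()}} and have to cast it themselves.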
If not, we have to drop Watermarks somewhere higher up the chain, I think, or
am I missing something?
We could also only merge 4. as this fixes the issue, and think about how to
change StreamRecordSerializer in a separate issue.
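For reference, the failure in the quoted description below can be reproduced in miniature. This is only a sketch under assumptions: every {{Mini*}} name is hypothetical, and the real code path goes through {{RecordWriterOutput}} and {{SerializationDelegate}} rather than a direct call. It shows why the exception surfaces inside {{serialize()}}: generics are erased, so the wrong element type is only detected by the cast in the compiler-generated bridge method of the record-only serializer.

```java
// Hypothetical miniature of the failure; none of these are Flink classes.
interface MiniSerializer<T> {
    String serialize(T value);
}

final class MiniRecord {
    final String value;
    MiniRecord(String value) { this.value = value; }
}

final class MiniWatermark {
    final long timestamp;
    MiniWatermark(long timestamp) { this.timestamp = timestamp; }
}

// Only knows how to serialize records, like a non-multiplexing serializer.
final class MiniRecordSerializer implements MiniSerializer<MiniRecord> {
    @Override
    public String serialize(MiniRecord record) {
        return record.value;
    }
}

final class MiniOutput {
    private final MiniSerializer<Object> serializer;

    @SuppressWarnings("unchecked")
    MiniOutput(MiniSerializer<?> serializer) {
        // Unchecked cast: the output assumes the serializer can handle any element.
        this.serializer = (MiniSerializer<Object>) serializer;
    }

    String emitRecord(MiniRecord record) {
        return serializer.serialize(record);
    }

    // With a MiniRecordSerializer this throws ClassCastException at call time.
    String emitWatermark(MiniWatermark mark) {
        return serializer.serialize(mark);
    }
}
```

Calling {{new MiniOutput(new MiniRecordSerializer()).emitWatermark(new MiniWatermark(1L))}} fails with a ClassCastException, while {{emitRecord()}} on the same output works.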
> ClassCastException in StreamRecordSerializer when WindowOperator.trigger() is
> called and TimeCharacteristic = ProcessingTime
> ----------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-3688
> URL: https://issues.apache.org/jira/browse/FLINK-3688
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.0.0
> Reporter: Konstantin Knauf
> Assignee: Konstantin Knauf
> Priority: Critical
>
> Hi,
> when using {{TimeCharacteristic.ProcessingTime}}, a ClassCastException is
> thrown in {{StreamRecordSerializer}} when
> {{WindowOperator.processWatermark()}} is called from
> {{WindowOperator.trigger()}}, i.e. whenever a ProcessingTimeTimer is
> triggered.
> The problem seems to be that {{processWatermark()}} is also called in
> {{trigger()}}, when time characteristic is ProcessingTime, but in
> {{RecordWriterOutput}} {{enableWatermarkMultiplexing}} is {{false}} and the
> {{TypeSerializer}} is a {{StreamRecordSerializer}}, which ultimately leads to
> the ClassCastException. Do you agree?
> If this is indeed a bug, there are several possible solutions.
> # Only calling {{processWatermark()}} in {{trigger()}}, when
> TimeCharacteristic is EventTime
> # Not calling {{processWatermark()}} in {{trigger()}} at all, instead waiting
> for the next watermark to trigger the EventTimeTimers with a timestamp behind
> the current watermark. This is, of course, a trade-off.
> # Using {{MultiplexingStreamRecordSerializer}} all the time, but I have no
> idea what the side effects of this change would be. I assume there is a reason
> for the existence of the StreamRecordSerializer ;)
> StackTrace:
> {quote}
> TimerException\{java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord\}
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:716)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:370)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:293)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:323)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:710)
>     ... 7 more
> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>     at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
>     at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
>     ... 11 more
> {quote}