[
https://issues.apache.org/jira/browse/FLINK-30032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17634514#comment-17634514
]
Martijn Visser commented on FLINK-30032:
----------------------------------------
[~functioner] Given that Flink 1.14 isn't supported by the community anymore,
can you verify this with a newer version, preferably Flink 1.16?
> IOException during MAX_WATERMARK emission causes message missing
> ----------------------------------------------------------------
>
> Key: FLINK-30032
> URL: https://issues.apache.org/jira/browse/FLINK-30032
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.14.0
> Reporter: Haoze Wu
> Priority: Major
>
> We are testing Flink (version 1.14.0). We launch 1
> StandaloneSessionClusterEntrypoint and 2 TaskManagerRunner processes. Then we
> run a Flink client which submits a WordCount workload. The code is similar to
> [https://github.com/apache/flink/blob/release-1.14.0/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java],
> and we only add a Kafka topic output:
> {code:java}
> private static DataStreamSink<String> addKafkaSink(
> final DataStream<String> events, final String brokers, final
> String topic) {
> return events.sinkTo(KafkaSink.<String>builder()
> .setBootstrapServers(brokers)
> .setRecordSerializer(
> KafkaRecordSerializationSchema.<String>builder()
> .setValueSerializationSchema(new
> SimpleStringSchema())
> .setTopic(topic)
> .build())
> .build());
> }
> public static void run(final String[] args) throws Exception {
> final String brokers = args[0];
> final String textFilePath = args[1];
> final String kafkaTopic = args[2];
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> final DataStream<String> text = env.readTextFile(textFilePath);
> final DataStream<Tuple2<String, Integer>> counts =
> text.flatMap(new Tokenizer()).keyBy(value -> value.f0).sum(1);
> addKafkaSink(counts.map(String::valueOf), brokers, kafkaTopic);
> final long nano = System.nanoTime();
> env.execute("WordCount");
> FlinkGrayClientMain.reply("success", nano);
> }
> {code}
> We found that the Kafka topic sometimes fails to receive a few messages. We
> reproduced the symptom multiple times. In the failing runs, the Kafka topic
> gets 160~169 messages while the expected number of messages is 170. The
> missing messages are always the last few of the 170 expected messages.
> We then inspected the logs and code.
> First, there is an IOException in one of the TaskManagerRunner processes:
> {code:java}
> 2021-11-02T17:43:41,070 WARN source.ContinuousFileReaderOperator
> (ContinuousFileReaderOperator.java:finish(461)) - unable to emit watermark
> while closing
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to next operator
> at
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:114)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:428)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:544)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.emitWatermark(StreamSourceContexts.java:113)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:459)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:211)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:185)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:162)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:130)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:117)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:549)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:508)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> [flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> [flink-dist_2.11-1.14.0.jar:1.14.0]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> [flink-dist_2.11-1.14.0.jar:1.14.0]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> [flink-dist_2.11-1.14.0.jar:1.14.0]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
> Caused by: java.lang.RuntimeException: McGray injected exception
> at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:605)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> at
> org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:112)
> ~[flink-dist_2.11-1.14.0.jar:1.14.0]
> ... 24 more
> Caused by: java.io.IOException {code}
> The IOException is from line 104 in RecordWriter#emit:
> {code:java}
> protected void emit(T record, int targetSubpartition) throws IOException {
> checkErroneous();
> targetPartition.emitRecord(serializeRecord(serializer, record),
> targetSubpartition); // line 104
> if (flushAlways) {
> targetPartition.flush(targetSubpartition);
> }
> } {code}
> Here, `targetPartition.emitRecord` eventually performs file I/O or
> memory-mapped I/O, which throws the IOException for some reason.
> This exception is caught and rethrown as a RuntimeException in
> `RecordWriterOutput#emitWatermark`:
> {code:java}
> @Override
> public void emitWatermark(Watermark mark) {
> if (announcedStatus.isIdle()) {
> return;
> }
> watermarkGauge.setCurrentWatermark(mark.getTimestamp());
> serializationDelegate.setInstance(mark);
> try {
> recordWriter.broadcastEmit(serializationDelegate);
> } catch (Exception e) {
> throw new RuntimeException(e.getMessage(), e);
> }
> } {code}
> It is then caught and wrapped again in `ChainingOutput#emitWatermark`:
> {code:java}
> @Override
> public void emitWatermark(Watermark mark) {
> if (announcedStatus.isIdle()) {
> return;
> }
> try {
> watermarkGauge.setCurrentWatermark(mark.getTimestamp());
> input.processWatermark(mark);
> } catch (Exception e) {
> throw new ExceptionInChainedOperatorException(e);
> }
> } {code}
> Finally, it is caught and only logged in `ContinuousFileReaderOperator#finish`:
> {code:java}
> @Override
> public void finish() throws Exception {
> LOG.debug("finishing");
> super.finish();
> switch (state) {
> case IDLE:
> switchState(ReaderState.FINISHED);
> break;
> case FINISHED:
> LOG.warn("operator is already closed, doing nothing");
> return;
> default:
> switchState(ReaderState.FINISHING);
> while (!state.isTerminal()) {
> executor.yield();
> }
> }
> try {
> sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
> } catch (Exception e) {
> LOG.warn("unable to emit watermark while closing", e);
> }
> } {code}
> Here `Watermark.MAX_WATERMARK` is emitted to properly finish the computation.
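The three catch sites quoted above can be modelled in isolation. The following is a minimal sketch, not Flink code: every class and method name in it is hypothetical, and `IllegalStateException` merely stands in for `ExceptionInChainedOperatorException`. It illustrates how an `IOException` that is wrapped twice and then logged at the outermost level leaves the caller of `finish()` with a normal return.

```java
import java.io.IOException;

// Hypothetical model of the catch chain described in the report; not Flink code.
final class SwallowedFailureSketch {

    // Innermost layer: the I/O failure (stands in for RecordWriter#emit).
    static void emit() throws IOException {
        throw new IOException("simulated I/O failure");
    }

    // First wrapper: converts the checked exception into a RuntimeException
    // (modelled on the quoted RecordWriterOutput#emitWatermark).
    static void emitWatermark() {
        try {
            emit();
        } catch (Exception e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    // Second wrapper (modelled on the quoted ChainingOutput#emitWatermark).
    static void chainedEmitWatermark() {
        try {
            emitWatermark();
        } catch (Exception e) {
            throw new IllegalStateException("Could not forward element to next operator", e);
        }
    }

    // Outermost layer: logs and drops the exception (modelled on the quoted finish()).
    // Returns the root cause only so the sketch can be inspected; the quoted method is void.
    static Throwable finish() {
        try {
            chainedEmitWatermark();
            return null;
        } catch (Exception e) {
            Throwable root = e;
            while (root.getCause() != null) {
                root = root.getCause();
            }
            System.out.println("WARN unable to emit watermark while closing: " + root);
            return root;
        }
    }

    public static void main(String[] args) {
        Throwable root = finish();
        System.out.println("finish() returned normally; root cause type: "
                + root.getClass().getName());
    }
}
```

In this model the only trace of the failure is the WARN line, which matches the symptom described in the report.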
> In Flink (version 1.14.0), the full call stack of the workflow described
> above is:
> {code:java}
> org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit:104
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter#broadcastEmit:67
> org.apache.flink.streaming.runtime.io.RecordWriterOutput#emitWatermark:119
> org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
> org.apache.flink.streaming.api.operators.AbstractStreamOperator#processWatermark:605
> org.apache.flink.streaming.runtime.tasks.ChainingOutput#emitWatermark:112
> org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext#processAndEmitWatermark:428
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext#emitWatermark:544
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose#emitWatermark:113
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator#finish:459
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finishOperator:211
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#lambda$deferFinishOperatorToMailbox$3:185
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail#run:90
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl#tryYield:97
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator:162
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finish:130
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain#finishOperators:117
> org.apache.flink.streaming.runtime.tasks.StreamTask#endData:549
> org.apache.flink.streaming.runtime.tasks.StreamTask#processInput:508
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop:203
> org.apache.flink.streaming.runtime.tasks.StreamTask#runMailboxLoop:809
> org.apache.flink.streaming.runtime.tasks.StreamTask#invoke:761
> org.apache.flink.runtime.taskmanager.Task#runWithSystemExitMonitoring:958
> org.apache.flink.runtime.taskmanager.Task#restoreAndInvoke:937
> org.apache.flink.runtime.taskmanager.Task#doRun:766
> org.apache.flink.runtime.taskmanager.Task#run:575
> java.lang.Thread#run:748 {code}
> We think the reason the last few messages are missing from the Kafka topic
> lies in `ChannelSelectorRecordWriter#broadcastEmit` (also in the shown call
> stack):
> {code:java}
> @Override
> public void broadcastEmit(T record) throws IOException {
> checkErroneous();
> // Emitting to all channels in a for loop can be better than calling
> // ResultPartitionWriter#broadcastRecord because the broadcastRecord
> // method incurs extra overhead.
> ByteBuffer serializedRecord = serializeRecord(serializer, record);
> for (int channelIndex = 0; channelIndex < numberOfChannels;
> channelIndex++) {
> serializedRecord.rewind();
> emit(record, channelIndex); // line 67
> }
> if (flushAlways) {
> flushAll();
> }
> } {code}
> Line 67 tries to emit `Watermark.MAX_WATERMARK` (from
> `ContinuousFileReaderOperator#finish`) to all channels. When the IOException
> is thrown here, the loop aborts without running line 67 for the remaining
> channels, and `ContinuousFileReaderOperator#finish` swallows the exception.
> We reproduced the symptom multiple times and found that the number of
> missing messages is exactly equal to the number of affected channels.
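The loop behaviour described above can be sketched without any Flink dependency. The class below is a hypothetical model (the names `BroadcastAbortSketch`, `failingChannel` and `delivered` are made up for illustration): a broadcast loop throws on one channel, the caller logs and drops the exception, and every channel after the failing one never receives the record.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a broadcast loop that aborts on the first failure; not Flink code.
final class BroadcastAbortSketch {

    // Emits to channels 0..numberOfChannels-1 in order and records each channel that
    // received the record. Throws as soon as it reaches failingChannel, which skips
    // all later channels. A failingChannel outside the range means no failure.
    static void broadcastEmit(int numberOfChannels, int failingChannel,
                              List<Integer> delivered) throws IOException {
        for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
            if (channelIndex == failingChannel) {
                throw new IOException("simulated failure on channel " + channelIndex);
            }
            delivered.add(channelIndex);
        }
    }

    // Caller that logs and drops the exception, as finish() does in the report.
    static List<Integer> finish(int numberOfChannels, int failingChannel) {
        List<Integer> delivered = new ArrayList<>();
        try {
            broadcastEmit(numberOfChannels, failingChannel, delivered);
        } catch (IOException e) {
            System.out.println("WARN unable to emit watermark while closing: " + e.getMessage());
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Four channels, failure on channel 1: channels 1, 2 and 3 receive nothing.
        System.out.println("channels that received the record: " + finish(4, 1));
    }
}
```

With four channels and a failure on channel 1, only channel 0 is reached, so three channels are affected by a single exception.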
> That being said, we suspect the potential IOException at line 67 is not
> properly handled, because the current symptom and logging make it hard for
> the user to notice or debug the issue. The user may suddenly find the last
> few messages missing, and can then only discover that there was some
> IOException when emitting `Watermark.MAX_WATERMARK` somewhere. The user
> still doesn’t know why and how the last few messages went missing.
> We would like to propose a fix for this issue. A simple solution is to catch
> the IOException at line 67, log it, and possibly retry the emit.
> We implemented this solution and found that the symptom disappears. However,
> we also found that this `broadcastEmit` method is called in many places, so
> this fix will also affect the other callers, and we are not sure whether this
> behavior is also appropriate for those callers.
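One possible shape of the proposed handling is sketched below. This is not the patch from the report and not Flink code: the `ChannelEmitter` interface, the `maxAttempts` parameter and the returned list of failed channels are assumptions made for illustration. The sketch also assumes that retrying an emit on the same channel is safe, which would have to be checked against the real writer.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of per-channel catch, log and retry; not Flink code.
final class BroadcastRetrySketch {

    // Hypothetical stand-in for a per-channel emit that may fail.
    interface ChannelEmitter {
        void emit(int channelIndex) throws IOException;
    }

    // Tries every channel, retrying each one up to maxAttempts times.
    // A failure on one channel no longer prevents later channels from being tried.
    // Returns the channels that still failed so the caller can decide what to do.
    static List<Integer> broadcastEmit(int numberOfChannels, int maxAttempts,
                                       ChannelEmitter emitter) {
        List<Integer> failedChannels = new ArrayList<>();
        for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
            boolean sent = false;
            for (int attempt = 1; attempt <= maxAttempts && !sent; attempt++) {
                try {
                    emitter.emit(channelIndex);
                    sent = true;
                } catch (IOException e) {
                    System.out.println("WARN emit to channel " + channelIndex
                            + " failed (attempt " + attempt + "): " + e.getMessage());
                }
            }
            if (!sent) {
                failedChannels.add(channelIndex);
            }
        }
        return failedChannels;
    }

    public static void main(String[] args) {
        final int[] failuresLeft = {1}; // channel 1 fails once, then succeeds
        final List<Integer> delivered = new ArrayList<>();
        List<Integer> failed = broadcastEmit(4, 2, channelIndex -> {
            if (channelIndex == 1 && failuresLeft[0] > 0) {
                failuresLeft[0]--;
                throw new IOException("simulated transient failure");
            }
            delivered.add(channelIndex);
        });
        System.out.println("delivered=" + delivered + " failed=" + failed);
    }
}
```

Returning the failed channels instead of swallowing them leaves the decision to each caller, which may matter given the open question above about the other callers of `broadcastEmit`.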
> We are looking for suggestions and feedback. Thanks!