Haoze Wu created FLINK-30032:
--------------------------------
Summary: IOException during MAX_WATERMARK emission causes message loss
Key: FLINK-30032
URL: https://issues.apache.org/jira/browse/FLINK-30032
Project: Flink
Issue Type: Bug
Affects Versions: 1.14.0
Reporter: Haoze Wu
We are testing Flink (version 1.14.0). We launch 1 StandaloneSessionClusterEntrypoint and 2 TaskManagerRunners, then run a Flink client that submits a WordCount workload. The code is similar to
[https://github.com/apache/flink/blob/release-1.14.0/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java],
except that we also add a Kafka topic as output:
{code:java}
private static DataStreamSink<String> addKafkaSink(
        final DataStream<String> events, final String brokers, final String topic) {
    return events.sinkTo(
            KafkaSink.<String>builder()
                    .setBootstrapServers(brokers)
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.<String>builder()
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .setTopic(topic)
                                    .build())
                    .build());
}

public static void run(final String[] args) throws Exception {
    final String brokers = args[0];
    final String textFilePath = args[1];
    final String kafkaTopic = args[2];
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    final DataStream<String> text = env.readTextFile(textFilePath);
    final DataStream<Tuple2<String, Integer>> counts =
            text.flatMap(new Tokenizer()).keyBy(value -> value.f0).sum(1);
    addKafkaSink(counts.map(String::valueOf), brokers, kafkaTopic);
    final long nano = System.nanoTime();
    env.execute("WordCount");
    FlinkGrayClientMain.reply("success", nano);
}
{code}
We found that the Kafka topic sometimes fails to receive a few messages. We reproduced the symptom multiple times: the topic always receives 160~169 messages while 170 are expected, and the missing messages are always the last few of the 170 expected messages (counted, for example, with a plain Kafka consumer as sketched below).
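A minimal consumer along these lines can be used to count the messages the topic received; the broker address, group id, and topic name below are placeholders:
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CountTopicMessages {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-check");          // placeholder group id
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        int count = 0;
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("wordcount-output")); // placeholder topic
            // Poll for a while; with the WordCount input above we expect 170 records in total.
            for (int i = 0; i < 10; i++) {
                count += consumer.poll(Duration.ofSeconds(1)).count();
            }
        }
        System.out.println("received " + count + " messages");
    }
}
{code}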
We then inspected the logs and code.
First, one of the TaskManagerRunners reports an IOException:
{code:java}
2021-11-02T17:43:41,070 WARN source.ContinuousFileReaderOperator (ContinuousFileReaderOperator.java:finish(461)) - unable to emit watermark while closing
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:114) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:428) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:544) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.emitWatermark(StreamSourceContexts.java:113) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:459) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:211) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:185) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:162) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:130) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:117) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:549) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:508) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) [flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.11-1.14.0.jar:1.14.0]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: java.lang.RuntimeException: McGray injected exception
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:605) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:112) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
    ... 24 more
Caused by: java.io.IOException
{code}
The IOException is thrown at line 104 in `RecordWriter#emit`:
{code:java}
protected void emit(T record, int targetSubpartition) throws IOException {
    checkErroneous();

    targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition); // line 104

    if (flushAlways) {
        targetPartition.flush(targetSubpartition);
    }
}
{code}
Here, `targetPartition.emitRecord` eventually performs file I/O or memory-mapped I/O, which is where the IOException is triggered.
The exception is caught and wrapped in `RecordWriterOutput#emitWatermark`:
{code:java}
@Override
public void emitWatermark(Watermark mark) {
    if (announcedStatus.isIdle()) {
        return;
    }

    watermarkGauge.setCurrentWatermark(mark.getTimestamp());
    serializationDelegate.setInstance(mark);

    try {
        recordWriter.broadcastEmit(serializationDelegate);
    } catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    }
}
{code}
It is then caught and rethrown as an `ExceptionInChainedOperatorException` in `ChainingOutput#emitWatermark`:
{code:java}
@Override
public void emitWatermark(Watermark mark) {
    if (announcedStatus.isIdle()) {
        return;
    }

    try {
        watermarkGauge.setCurrentWatermark(mark.getTimestamp());
        input.processWatermark(mark);
    } catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}
{code}
It is finally caught, and only logged, in `ContinuousFileReaderOperator#finish`:
{code:java}
@Override
public void finish() throws Exception {
    LOG.debug("finishing");
    super.finish();

    switch (state) {
        case IDLE:
            switchState(ReaderState.FINISHED);
            break;
        case FINISHED:
            LOG.warn("operator is already closed, doing nothing");
            return;
        default:
            switchState(ReaderState.FINISHING);
            while (!state.isTerminal()) {
                executor.yield();
            }
    }

    try {
        sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
    } catch (Exception e) {
        LOG.warn("unable to emit watermark while closing", e);
    }
}
{code}
Here `Watermark.MAX_WATERMARK` is emitted to properly finish the computation, but any exception is only logged as a warning.
In Flink 1.14.0, the full call stack of the workflow described above is:
{code:java}
org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit:104
org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter#broadcastEmit:67
org.apache.flink.streaming.runtime.io.RecordWriterOutput#emitWatermark:119
org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
org.apache.flink.streaming.api.operators.AbstractStreamOperator#processWatermark:605
org.apache.flink.streaming.runtime.tasks.ChainingOutput#emitWatermark:112
org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext#processAndEmitWatermark:428
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext#emitWatermark:544
org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose#emitWatermark:113
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator#finish:459
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finishOperator:211
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#lambda$deferFinishOperatorToMailbox$3:185
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
org.apache.flink.streaming.runtime.tasks.mailbox.Mail#run:90
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl#tryYield:97
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator:162
org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finish:130
org.apache.flink.streaming.runtime.tasks.RegularOperatorChain#finishOperators:117
org.apache.flink.streaming.runtime.tasks.StreamTask#endData:549
org.apache.flink.streaming.runtime.tasks.StreamTask#processInput:508
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop:203
org.apache.flink.streaming.runtime.tasks.StreamTask#runMailboxLoop:809
org.apache.flink.streaming.runtime.tasks.StreamTask#invoke:761
org.apache.flink.runtime.taskmanager.Task#runWithSystemExitMonitoring:958
org.apache.flink.runtime.taskmanager.Task#restoreAndInvoke:937
org.apache.flink.runtime.taskmanager.Task#doRun:766
org.apache.flink.runtime.taskmanager.Task#run:575
java.lang.Thread#run:748 {code}
We think the reason the last few messages are missing from the Kafka topic lies in `ChannelSelectorRecordWriter#broadcastEmit` (also shown in the call stack above):
{code:java}
@Override
public void broadcastEmit(T record) throws IOException {
    checkErroneous();

    // Emitting to all channels in a for loop can be better than calling
    // ResultPartitionWriter#broadcastRecord because the broadcastRecord
    // method incurs extra overhead.
    ByteBuffer serializedRecord = serializeRecord(serializer, record);
    for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
        serializedRecord.rewind();
        emit(record, channelIndex); // line 67
    }

    if (flushAlways) {
        flushAll();
    }
}
{code}
Line 67 emits `Watermark.MAX_WATERMARK` (coming from `ContinuousFileReaderOperator#finish`) to each channel in turn. When the IOException is thrown there, the loop aborts before reaching the remaining channels, and `ContinuousFileReaderOperator#finish` swallows the exception. Across our reproductions, the number of missing messages always equals the number of channels that never received `Watermark.MAX_WATERMARK`.
Therefore, we suspect that a potential IOException at line 67 is not handled properly: the resulting symptom and logging make it hard for users to notice or debug the issue. A user may suddenly find the last few messages missing, and the only clue is a warning that some IOException occurred while emitting `Watermark.MAX_WATERMARK`; it remains unclear why and how those messages were lost.
We would like to propose a fix for this issue. A simple solution is to catch the IOException at line 67, log it, and possibly retry the emit, so that the remaining channels still receive the record; a rough sketch is shown below. We implemented this solution and the symptom disappears. However, `broadcastEmit` is called from many places, so this change would also affect the other callers, and we are not sure whether the behavior is appropriate for them.
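For illustration only, a rough sketch of this change in `ChannelSelectorRecordWriter#broadcastEmit`; the `LOG` field, the retry bound `MAX_EMIT_RETRIES`, and the decision to continue with the next channel once retries are exhausted are our own assumptions, not existing Flink behavior:
{code:java}
@Override
public void broadcastEmit(T record) throws IOException {
    checkErroneous();

    ByteBuffer serializedRecord = serializeRecord(serializer, record);
    for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
        serializedRecord.rewind();
        // Sketch: retry a failed emit a few times instead of letting one failing
        // channel abort the whole loop, so the remaining channels still receive
        // the record (e.g. Watermark.MAX_WATERMARK).
        int attempts = 0;
        while (true) {
            try {
                emit(record, channelIndex); // line 67 in the original code
                break;
            } catch (IOException e) {
                if (++attempts >= MAX_EMIT_RETRIES) { // hypothetical retry bound
                    // Hypothetical logging; an alternative is to rethrow and fail the task.
                    LOG.warn("Could not emit record to channel {} after {} attempts",
                            channelIndex, attempts, e);
                    break;
                }
            }
        }
    }

    if (flushAlways) {
        flushAll();
    }
}
{code}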
We are looking for suggestions and feedback. Thanks!