[
https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17230183#comment-17230183
]
Roman Khachatryan edited comment on FLINK-20097 at 11/11/20, 8:05 PM:
----------------------------------------------------------------------
OK, the tests do fail with the checkpointing frequency increased and some other
settings updated.
I don't know whether the root cause is the one I described above.
Changes:
{code:java}
long minCheckpoints = 100;
env.enableCheckpointing(10);
env.getCheckpointConfig().setAlignmentTimeout(0);
env.getCheckpointConfig().setCheckpointTimeout(10); // minimum allowed
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(0);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);

// can be lower because of the high TolerableCheckpointFailureNumber:
// collector.checkThat(result.<Integer>getAccumulatorResult(NUM_FAILURES), equalTo(EXPECTED_FAILURES));

// in the sink:
if (random.nextInt(100) == 42) {
    Thread.sleep(7);
}
{code}
Failures:
shouldPerformUnalignedCheckpointOnParallelRemoteChannel: NUM_DUPLICATES > 0 (2 out of 70 runs).
With checkpoint timeout 5, interval 1 and sleep 1, I also get two out-of-order failures and one corrupted state on recovery:
{code}
Caused by: java.io.EOFException
    at org.apache.flink.core.memory.DataInputDeserializer.readByte(DataInputDeserializer.java:134)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:199)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:92)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:145)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:574)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:538)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
    at java.lang.Thread.run(Thread.java:748)
{code}
The corrupted state is very similar to what I observed in FLINK-19681.
> Race conditions in InputChannel.ChannelStatePersister (confirm)
> ---------------------------------------------------------------
>
> Key: FLINK-20097
> URL: https://issues.apache.org/jira/browse/FLINK-20097
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing, Runtime / Network
> Affects Versions: 1.12.0
> Reporter: Roman Khachatryan
> Assignee: Roman Khachatryan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
>
> In InputChannel.ChannelStatePersister, stopPersisting() and checkForBarrier()
> always update pendingCheckpointBarrierId, potentially overwriting a newer id
> (or the BARRIER_RECEIVED value) with an older one.
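>
> A minimal, hypothetical sketch of that pattern (simplified; not the actual Flink
> implementation, and the names and sentinel values are assumptions):
> {code:java}
> // Simplified model of a per-channel persister whose methods always overwrite
> // pendingCheckpointBarrierId, regardless of what is currently stored in it.
> class ChannelStatePersisterSketch {
>     static final long BARRIER_RECEIVED = Long.MIN_VALUE;         // assumed sentinel
>     static final long CHECKPOINT_COMPLETED = Long.MIN_VALUE + 1; // assumed sentinel
>
>     long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
>
>     void startPersisting(long checkpointId) {
>         pendingCheckpointBarrierId = checkpointId;         // unconditional overwrite
>     }
>
>     void stopPersisting() {
>         pendingCheckpointBarrierId = CHECKPOINT_COMPLETED; // may erase BARRIER_RECEIVED
>     }
>
>     void checkForBarrier(long barrierCheckpointId) {
>         // the id of the observed barrier is ignored, so even a stale barrier
>         // flips the channel to BARRIER_RECEIVED
>         pendingCheckpointBarrierId = BARRIER_RECEIVED;
>     }
> }
> {code}
>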
> For stopPersisting(), consider a case:
> # Two consecutive UC barriers arrive at the same channel (the 1st becoming stale
> at some point)
> # In RemoteInputChannel.onBuffer, the netty thread updates
> pendingCheckpointBarrierId to BARRIER_RECEIVED
> # The task thread processes the 1st barrier and triggers a checkpoint
> # The task thread processes the 2nd barrier and aborts the 1st checkpoint, calling
> stopPersisting() from the UC controller and setting pendingCheckpointBarrierId to
> CHECKPOINT_COMPLETED
> # The task thread starts the 2nd checkpoint and calls startPersisting(), setting
> pendingCheckpointBarrierId to 2
> # Now new buffers have a chance to be included in the 2nd checkpoint, though
> they belong to the next one (see the sequence sketch after this list)
>
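> Driving the hypothetical ChannelStatePersisterSketch from above through these steps
> (illustration only, single-threaded for brevity):
> {code:java}
> ChannelStatePersisterSketch channel = new ChannelStatePersisterSketch();
>
> // Steps 1-3: both barriers arrive on the channel, the netty thread marks it
> // BARRIER_RECEIVED, and the task thread triggers the 1st checkpoint
> channel.checkForBarrier(1);
> channel.checkForBarrier(2);
>
> // Step 4: the task thread aborts the 1st checkpoint; stopPersisting() overwrites
> // the BARRIER_RECEIVED marker with CHECKPOINT_COMPLETED
> channel.stopPersisting();
>
> // Step 5: the task thread starts the 2nd checkpoint; the channel looks pending again
> channel.startPersisting(2);
>
> // Step 6: buffers arriving from now on can be persisted for the 2nd checkpoint,
> // even though barrier 2 already passed on this channel and they belong to the next one
> {code}
>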
> For checkForBarrier(), consider an input gate with two channels A
> and B and two barriers 1 and 2:
> # Channel A receives both barriers, channel B receives nothing yet
> # The task thread processes both barriers on A, eventually triggering the 2nd
> checkpoint
> # Channel A's state is now BARRIER_RECEIVED, channel B is still pending (with id=2)
> # Channel B receives the 1st barrier and becomes BARRIER_RECEIVED
> # No buffers in B between barriers 1 and 2 will be included in the
> checkpoint (although they should be)
> # Channel B receives the 2nd barrier, which will eventually conclude the
> checkpoint
>
> I see a solution in performing an action only if the passed checkpointId >=
> pendingCheckpointId. For that, a separate field will be needed to hold the
> status (RECEIVED/COMPLETED/PENDING). The class isn't thread-safe, so this
> shouldn't be a problem.
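>
> A minimal sketch of that guard (illustrative only; the names, status enum and method
> signatures are assumptions, not the actual fix):
> {code:java}
> // Only act when the passed checkpoint id is not older than the pending one,
> // and track the channel status in a separate field instead of overloading the id.
> class GuardedChannelStatePersisterSketch {
>     enum Status { PENDING, BARRIER_RECEIVED, COMPLETED }
>
>     long pendingCheckpointId = -1;
>     Status status = Status.COMPLETED;
>
>     void startPersisting(long checkpointId) {
>         if (checkpointId >= pendingCheckpointId) {
>             pendingCheckpointId = checkpointId;
>             status = Status.PENDING;
>         }
>     }
>
>     void stopPersisting(long checkpointId) {
>         if (checkpointId >= pendingCheckpointId) {
>             pendingCheckpointId = checkpointId;
>             status = Status.COMPLETED;
>         }
>     }
>
>     void checkForBarrier(long barrierCheckpointId) {
>         if (barrierCheckpointId >= pendingCheckpointId) {
>             pendingCheckpointId = barrierCheckpointId;
>             status = Status.BARRIER_RECEIVED;
>         }
>     }
> }
> {code}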
>