[https://issues.apache.org/jira/browse/FLINK-22105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17316160#comment-17316160]
Roman Khachatryan edited comment on FLINK-22105 at 4/7/21, 9:45 AM:
--------------------------------------------------------------------
The immediate cause is that the test writes channel state but doesn't register
a checkpoint with the writer.
The reason it is unstable (rather than failing every time) is that the writer is
asynchronous and isn't closed in the test (closing it would fail).
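A simplified, self-contained model of this failure mode may help. It is a sketch, not Flink code: the class AsyncWriterModel and all of its methods are hypothetical, and only the exception messages are borrowed from the stack trace in the issue description. A write for a checkpoint that was never registered via start fails on the worker thread, not on the caller's. The caller only notices on a later request, and only if the worker has already exited by then, which is the race that makes the test flaky. Closing the writer (draining the queue, then rethrowing the worker's error) makes the failure deterministic.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical, simplified model of an asynchronous channel-state writer (not Flink code). */
class AsyncWriterModel {
    private static final Runnable STOP = () -> { };
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Set<Long> registered = new HashSet<>(); // only touched by the worker thread
    private volatile RuntimeException thrown;             // first failure seen by the worker
    private final Thread worker = new Thread(this::loop, "channel-state-writer");

    AsyncWriterModel() {
        worker.start();
    }

    /** Worker thread: runs requests in order, exits on STOP or on the first failure. */
    private void loop() {
        try {
            for (Runnable r = queue.take(); r != STOP; r = queue.take()) {
                r.run(); // may throw "writer not found"
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            thrown = e; // remember the failure; the worker thread then exits
        }
    }

    /** Registers a checkpoint with the writer: the step the flaky test skipped. */
    void start(long checkpointId) {
        enqueue(() -> registered.add(checkpointId));
    }

    void writeOutput(long checkpointId) {
        enqueue(() -> requireRegistered("writeOutput", checkpointId));
    }

    void finishOutput(long checkpointId) {
        enqueue(() -> requireRegistered("finishOutput", checkpointId));
    }

    private void requireRegistered(String request, long checkpointId) {
        if (!registered.contains(checkpointId)) {
            throw new IllegalArgumentException(
                    "writer not found while processing request: " + request + " " + checkpointId);
        }
    }

    /**
     * The liveness check runs after enqueueing, so an earlier failure is only
     * reported if the worker has already exited at this point: a race.
     */
    private void enqueue(Runnable request) {
        queue.add(request);
        if (!worker.isAlive()) {
            IllegalStateException notRunning = new IllegalStateException("not running");
            RuntimeException cause = thrown;
            if (cause == null) {
                cause = notRunning;
            } else {
                cause.addSuppressed(notRunning); // same shape as "Suppressed: ... not running"
            }
            throw new RuntimeException("unable to send request to worker", cause);
        }
    }

    /** Waits for the worker to exit; returns true if it did within the timeout (0 = forever). */
    boolean awaitWorkerExit(long millis) {
        try {
            worker.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    /** Drains pending requests, then rethrows any failure the worker hit. */
    void close() {
        queue.add(STOP);
        awaitWorkerExit(0);
        if (thrown != null) {
            throw new IllegalStateException("writer failed", thrown);
        }
    }
}
```

In this model, calling start(id) before any write keeps the worker alive, while skipping it produces the same Caused by / Suppressed chain as in the trace, but only on a later request and only if the worker has already died.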
edit: the production code path handles this case correctly (by calling
channelStateWriter.start from
SubtaskCheckpointCoordinatorImpl.initInputsCheckpoint).
-IIUC, this can also happen in production, for example given the following DAG:-
{code}
src ---pointwise--> op1 ---hash--> op2 ---> sink
{code}
# src will convert the barrier before sending it to op1, to force an aligned checkpoint
# op1 will NOT start writing channel state (because alignmentType != UNALIGNED)
# op1 will convert the barrier back to unaligned before broadcasting it
# op1 will call finishOutput because the barrier is now UNALIGNED
-The fix is pretty simple though: in step (4), check the alignment type of the original barrier (as it was received).-
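The four steps above, together with the struck-through fix of deciding step (4) on the barrier as it was received, can be replayed with a toy model. Everything in it is hypothetical (the class, the two-value Alignment enum, the returned list of call names); it only encodes the reasoning as written. Per the edit, the production path avoids the problem anyway because channelStateWriter.start is called from SubtaskCheckpointCoordinatorImpl.initInputsCheckpoint.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy replay of the four steps from the comment; all names are hypothetical. */
class BarrierFlowModel {
    enum Alignment { ALIGNED, UNALIGNED }

    /**
     * Returns the channel-state writer calls op1 makes for one checkpoint.
     *
     * @param decideOnReceived false = logic as described; true = the proposed fix,
     *                         where step 4 looks at the barrier as it was received
     */
    static List<String> op1WriterCalls(boolean decideOnReceived) {
        List<String> calls = new ArrayList<>();

        // Step 1: src converts the barrier to ALIGNED before the pointwise edge to op1.
        Alignment received = Alignment.ALIGNED;

        // Step 2: op1 starts writing channel state only for UNALIGNED barriers.
        if (received == Alignment.UNALIGNED) {
            calls.add("start");
        }

        // Step 3: op1 converts the barrier back to UNALIGNED before broadcasting it.
        Alignment outgoing = Alignment.UNALIGNED;

        // Step 4: finishOutput is called if the barrier used for the decision is UNALIGNED.
        Alignment decisive = decideOnReceived ? received : outgoing;
        if (decisive == Alignment.UNALIGNED) {
            calls.add("finishOutput");
        }
        return calls;
    }
}
```

With the logic as described, op1 issues finishOutput without a preceding start, the same "writer not found" situation the test runs into; deciding on the received barrier issues neither call.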
cc: [~AHeise]
was (Author: roman_khachatryan):
The immediate cause is that the test writes channel state but doesn't register
a checkpoint with the writer.
The reason it is unstable (rather than failing every time) is that the writer is
asynchronous and isn't closed in the test (closing it would fail).
IIUC, this can also happen in production, for example given the following DAG:
{code}src ---pointwise--> op1 ---hash--> op2 ---> sink{code}
# src will convert the barrier before sending it to op1, to force an aligned checkpoint
# op1 will NOT start writing channel state (because alignmentType != UNALIGNED)
# op1 will convert the barrier back to unaligned before broadcasting it
# op1 will call finishOutput because the barrier is now UNALIGNED
The fix is pretty simple though: in step (4), check the alignment type of the
original barrier (as it was received).
cc: [~AHeise]
> SubtaskCheckpointCoordinatorTest.testForceAlignedCheckpointResultingInPriorityEvents unstable
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-22105
> URL: https://issues.apache.org/jira/browse/FLINK-22105
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.13.0
> Reporter: Robert Metzger
> Assignee: Roman Khachatryan
> Priority: Critical
> Labels: test-stability
> Fix For: 1.13.0
>
>
> https://dev.azure.com/rmetzger/Flink/_build/results?buildId=9021&view=logs&j=9dc1b5dc-bcfa-5f83-eaa7-0cb181ddc267&t=ab910030-93db-52a7-74a3-34a0addb481b
> {code}
> 2021-04-01T19:29:55.2392858Z [ERROR] Tests run: 10, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.921 s <<< FAILURE! - in org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorTest
> 2021-04-01T19:29:55.2396751Z [ERROR] testForceAlignedCheckpointResultingInPriorityEvents(org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorTest) Time elapsed: 0.02 s <<< ERROR!
> 2021-04-01T19:29:55.2397415Z java.lang.RuntimeException: unable to send request to worker
> 2021-04-01T19:29:55.2397956Z at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:228)
> 2021-04-01T19:29:55.2398603Z at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.finishOutput(ChannelStateWriterImpl.java:183)
> 2021-04-01T19:29:55.2399310Z at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
> 2021-04-01T19:29:55.2400104Z at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorTest.testForceAlignedCheckpointResultingInPriorityEvents(SubtaskCheckpointCoordinatorTest.java:215)
> 2021-04-01T19:29:55.2400746Z at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-04-01T19:29:55.2401202Z at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-04-01T19:29:55.2401746Z at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-04-01T19:29:55.2402237Z at java.lang.reflect.Method.invoke(Method.java:498)
> 2021-04-01T19:29:55.2402722Z at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-04-01T19:29:55.2403270Z at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-04-01T19:29:55.2403818Z at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-04-01T19:29:55.2404354Z at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-04-01T19:29:55.2404854Z at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2021-04-01T19:29:55.2405359Z at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2021-04-01T19:29:55.2405896Z at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2021-04-01T19:29:55.2406393Z at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2021-04-01T19:29:55.2406855Z at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2021-04-01T19:29:55.2407331Z at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2021-04-01T19:29:55.2407806Z at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2021-04-01T19:29:55.2408279Z at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2021-04-01T19:29:55.2408907Z at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2021-04-01T19:29:55.2409403Z at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> 2021-04-01T19:29:55.2409954Z at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> 2021-04-01T19:29:55.2410524Z at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> 2021-04-01T19:29:55.2411318Z at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> 2021-04-01T19:29:55.2411880Z at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> 2021-04-01T19:29:55.2412467Z at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> 2021-04-01T19:29:55.2413006Z at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> 2021-04-01T19:29:55.2413519Z at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> 2021-04-01T19:29:55.2414082Z Caused by: java.lang.IllegalArgumentException: writer not found while processing request: writeOutput 0
> 2021-04-01T19:29:55.2414726Z at org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest.onWriterMissing(ChannelStateWriteRequest.java:223)
> 2021-04-01T19:29:55.2415463Z at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:80)
> 2021-04-01T19:29:55.2416331Z at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatch(ChannelStateWriteRequestDispatcherImpl.java:59)
> 2021-04-01T19:29:55.2417066Z at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:96)
> 2021-04-01T19:29:55.2417782Z at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.run(ChannelStateWriteRequestExecutorImpl.java:75)
> 2021-04-01T19:29:55.2418329Z at java.lang.Thread.run(Thread.java:748)
> 2021-04-01T19:29:55.2418741Z Suppressed: java.lang.IllegalStateException: not running
> 2021-04-01T19:29:55.2419343Z at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.ensureRunning(ChannelStateWriteRequestExecutorImpl.java:152)
> 2021-04-01T19:29:55.2420249Z at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submitInternal(ChannelStateWriteRequestExecutorImpl.java:144)
> 2021-04-01T19:29:55.2421010Z at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.submit(ChannelStateWriteRequestExecutorImpl.java:128)
> 2021-04-01T19:29:55.2421694Z at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.enqueue(ChannelStateWriterImpl.java:225)
> 2021-04-01T19:29:55.2422333Z at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl.finishOutput(ChannelStateWriterImpl.java:183)
> 2021-04-01T19:29:55.2423029Z at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
> 2021-04-01T19:29:55.2423820Z at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorTest.testForceAlignedCheckpointResultingInPriorityEvents(SubtaskCheckpointCoordinatorTest.java:215)
> 2021-04-01T19:29:55.2425923Z at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2021-04-01T19:29:55.2426529Z at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2021-04-01T19:29:55.2427162Z at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2021-04-01T19:29:55.2427727Z at java.lang.reflect.Method.invoke(Method.java:498)
> 2021-04-01T19:29:55.2428283Z at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2021-04-01T19:29:55.2429008Z at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2021-04-01T19:29:55.2429611Z at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2021-04-01T19:29:55.2430357Z at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2021-04-01T19:29:55.2430921Z at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> 2021-04-01T19:29:55.2431809Z at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> 2021-04-01T19:29:55.2432459Z at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> 2021-04-01T19:29:55.2433013Z at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> 2021-04-01T19:29:55.2433513Z at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> 2021-04-01T19:29:55.2434043Z at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> 2021-04-01T19:29:55.2434568Z at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> 2021-04-01T19:29:55.2435093Z at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> 2021-04-01T19:29:55.2435609Z at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> 2021-04-01T19:29:55.2436161Z at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
> 2021-04-01T19:29:55.2436772Z at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
> 2021-04-01T19:29:55.2437399Z at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
> 2021-04-01T19:29:55.2438127Z at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
> 2021-04-01T19:29:55.2438881Z at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
> 2021-04-01T19:29:55.2439509Z at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
> 2021-04-01T19:29:55.2440088Z at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
> 2021-04-01T19:29:55.2440642Z at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
> 2021-04-01T19:29:55.2440899Z
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)