[ 
https://issues.apache.org/jira/browse/FLINK-20290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237325#comment-17237325
 ] 

Caizhi Weng commented on FLINK-20290:
-------------------------------------

[~azagrebin] [~sewen] It seems to me that the checkpoint state for sink and 
source is inconsistent.

{{CollectSinkFunction}} will write what it has received into its state and read 
it out when it is initialized (that is to say, when the task containing the 
sink restarts). See {{CollectSinkFunction#initializeState}} and 
{{CollectSinkFunction#snapshotState}} for details. I've printed out what is 
stored into the state and what is read from the state and the result is 
presented as follows. The "invoke" log below indicates that the sink receives 
the record:

 !collect-debug.png! 

I've prepared 10 different strings (from "aaaaaaa" to "jjjjjjj") for this test. 
As is shown in the graph, "bbbbbbb" is already in the checkpointed state (It 
appears in the buffer when initializing). However this record is received again 
(invoke: bbbbbbb). This indicates that this record is sent to the sink even if 
it should have been checkpointed and should not be sent.

So I suspect the bug lies either in the checkpointing of the collect sink or in 
that of the file source. Could you check both of the code?

> Duplicated output in FileSource continuous ITCase with TM failover
> ------------------------------------------------------------------
>
>                 Key: FLINK-20290
>                 URL: https://issues.apache.org/jira/browse/FLINK-20290
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.12.0
>            Reporter: Andrey Zagrebin
>            Priority: Blocker
>             Fix For: 1.12.0
>
>         Attachments: collect-debug.png, log
>
>
> If FileSourceTextLinesITCase::testContinuousTextFileSource includes TM 
> restarts (after failing TM with TestingMiniCluster::terminateTaskExecutor, 
> see testContinuousTextFileSourceWithTaskManagerFailover in 
> [branch|https://github.com/azagrebin/flink/tree/FLINK-20118-it]) then 
> sometimes I observe duplicated lines in the output after running the whole 
> test suite FileSourceTextLinesITCase 5-10 times in IDE:
> {code:java}
> Test 
> testContinuousTextFileSourceWithTaskManagerFailover(org.apache.flink.connector.file.src.FileSourceTextLinesITCase)
>  failed with:
> java.lang.AssertionError: 
> Expected: ["And by opposing end them?--To die,--to sleep,--", "And 
> enterprises of great pith and moment,", "And lose the name of action.--Soft 
> you now!", "And makes us rather bear those ills we have", "And thus the 
> native hue of resolution", "Be all my sins remember'd.", "But that the dread 
> of something after death,--", "Devoutly to be wish'd. To die,--to sleep;--", 
> "For in that sleep of death what dreams may come,", "For who would bear the 
> whips and scorns of time,", "Is sicklied o'er with the pale cast of 
> thought;", "Must give us pause: there's the respect", "No more; and by a 
> sleep to say we end", "No traveller returns,--puzzles the will,", "Or to take 
> arms against a sea of troubles,", "Than fly to others that we know not of?", 
> "That flesh is heir to,--'tis a consummation", "That makes calamity of so 
> long life;", "That patient merit of the unworthy takes,", "The fair 
> Ophelia!--Nymph, in thy orisons", "The heartache, and the thousand natural 
> shocks", "The insolence of office, and the spurns", "The oppressor's wrong, 
> the proud man's contumely,", "The pangs of despis'd love, the law's delay,", 
> "The slings and arrows of outrageous fortune", "The undiscover'd country, 
> from whose bourn", "Thus conscience does make cowards of us all;", "To be, or 
> not to be,--that is the question:--", "To grunt and sweat under a weary 
> life,", "To sleep! perchance to dream:--ay, there's the rub;", "When he 
> himself might his quietus make", "When we have shuffled off this mortal 
> coil,", "Whether 'tis nobler in the mind to suffer", "With a bare bodkin? who 
> would these fardels bear,", "With this regard, their currents turn awry,"]
>      but: was ["And by opposing end them?--To die,--to sleep,--", "And 
> enterprises of great pith and moment,", "And lose the name of action.--Soft 
> you now!", "And makes us rather bear those ills we have", "And thus the 
> native hue of resolution", "Be all my sins remember'd.", "But that the dread 
> of something after death,--", "Devoutly to be wish'd. To die,--to sleep;--", 
> "Devoutly to be wish'd. To die,--to sleep;--", "For in that sleep of death 
> what dreams may come,", "For who would bear the whips and scorns of time,", 
> "Is sicklied o'er with the pale cast of thought;", "Must give us pause: 
> there's the respect", "No more; and by a sleep to say we end", "No more; and 
> by a sleep to say we end", "No traveller returns,--puzzles the will,", "Or to 
> take arms against a sea of troubles,", "Than fly to others that we know not 
> of?", "That flesh is heir to,--'tis a consummation", "That flesh is heir 
> to,--'tis a consummation", "That makes calamity of so long life;", "The fair 
> Ophelia!--Nymph, in thy orisons", "The heartache, and the thousand natural 
> shocks", "The heartache, and the thousand natural shocks", "The slings and 
> arrows of outrageous fortune", "The undiscover'd country, from whose bourn", 
> "Thus conscience does make cowards of us all;", "To be, or not to be,--that 
> is the question:--", "To grunt and sweat under a weary life,", "To sleep! 
> perchance to dream:--ay, there's the rub;", "To sleep! perchance to 
> dream:--ay, there's the rub;", "When we have shuffled off this mortal coil,", 
> "Whether 'tis nobler in the mind to suffer", "With a bare bodkin? who would 
> these fardels bear,", "With this regard, their currents turn awry,"]
>       at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>       at org.junit.Assert.assertThat(Assert.java:956)
>       at org.junit.Assert.assertThat(Assert.java:923)
>       at 
> org.apache.flink.connector.file.src.FileSourceTextLinesITCase.verifyResult(FileSourceTextLinesITCase.java:198)
>       at 
> org.apache.flink.connector.file.src.FileSourceTextLinesITCase.testContinuousTextFileSource(FileSourceTextLinesITCase.java:151)
>       at 
> org.apache.flink.connector.file.src.FileSourceTextLinesITCase.testContinuousTextFileSourceWithTaskManagerFailover(FileSourceTextLinesITCase.java:109)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>       at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>       at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>       at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>       at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>       at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>       at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>       at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>       at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>       at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>       at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>       at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>       at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>       at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>       at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>       at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>       at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
>       at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
>       at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to