[
https://issues.apache.org/jira/browse/FLINK-20290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17237418#comment-17237418
]
Andrey Zagrebin edited comment on FLINK-20290 at 11/23/20, 2:59 PM:
--------------------------------------------------------------------
I think the attached log file probably confirms [~TsReaper]'s observation
about duplicated output from the source. The log shows that the duplicates
come from the 'nested1/text.1' file (ID=0000000089): this split is assigned
and read to completion twice, once before the TM failure:
{code:java}
4063 [SourceCoordinator-Source: file-source] INFO org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote split to requesting host 'localhost': Optional[FileSourceSplit: file:/var/folders/67/v4yp_42d21j6_n8k1h556h0c0000gn/T/junit7737023041992368052/junit2962518782319371379/nested1/text.1 [0, 225) (no host info) ID=0000000089 position=null]
4065 [Source: file-source (2/4)#0] INFO org.apache.flink.connector.base.source.reader.SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: file:/var/folders/67/v4yp_42d21j6_n8k1h556h0c0000gn/T/junit7737023041992368052/junit2962518782319371379/nested1/text.1 [0, 225) (no host info) ID=0000000089 position=null]
4065 [jobmanager-future-thread-5] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 25 for job c2a549b2a4e01f6ab22149b688200de6 (1832 bytes in 13 ms).
4065 [Source Data Fetcher for Source: file-source (2/4)#0] INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Starting split fetcher 0
4065 [SourceCoordinator-Source: file-source] INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator - Marking checkpoint 25 as completed for source Source: file-source.
4066 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 26 (type=CHECKPOINT) @ 1606134907136 for job c2a549b2a4e01f6ab22149b688200de6.
4068 [Source Data Fetcher for Source: file-source (2/4)#0] INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished reading from splits [0000000089]
4068 [Source: file-source (2/4)#0] INFO org.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [0000000089]
{code}
and again after the TM failure:
{code:java}
5149 [SourceCoordinator-Source: file-source] INFO org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote split to requesting host 'localhost': Optional[FileSourceSplit: file:/var/folders/67/v4yp_42d21j6_n8k1h556h0c0000gn/T/junit7737023041992368052/junit2962518782319371379/nested1/text.1 [0, 225) (no host info) ID=0000000089 position=null]
5149 [Source Data Fetcher for Source: file-source (4/4)#1] INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Split fetcher 0 exited.
5150 [SourceCoordinator-Source: file-source] INFO org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner - Assigning remote split to requesting host 'localhost': Optional[FileSourceSplit: file:/var/folders/67/v4yp_42d21j6_n8k1h556h0c0000gn/T/junit7737023041992368052/junit2962518782319371379/text.1 [0, 219) (no host info) ID=0000000095 position=null]
5151 [Source: file-source (2/4)#1] INFO org.apache.flink.connector.base.source.reader.SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: file:/var/folders/67/v4yp_42d21j6_n8k1h556h0c0000gn/T/junit7737023041992368052/junit2962518782319371379/nested1/text.1 [0, 225) (no host info) ID=0000000089 position=null]
5152 [Source Data Fetcher for Source: file-source (2/4)#1] INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Starting split fetcher 1
5152 [Source Data Fetcher for Source: file-source (2/4)#1] INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher - Finished reading from splits [0000000089]
5153 [Source: file-source (2/4)#1] INFO org.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [0000000089]
{code}
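To check the whole log for other splits that were read more than once, a small
scan along these lines might help. This is only a sketch: SplitReadCounter and
countFinishedReads are hypothetical names, not part of Flink, and it assumes
that every log entry sits on a single line and that SourceReaderBase keeps
logging the "Finished reading split(s) [...]" message seen in the excerpts.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Flink): counts how often each split ID
// appears in a "Finished reading split(s) [...]" log entry.
class SplitReadCounter {

    // Assumption: one log entry per line, message format as in the excerpts.
    // The SplitFetcher message "Finished reading from splits" does not match.
    private static final Pattern FINISHED =
            Pattern.compile("Finished reading split\\(s\\) \\[([^\\]]*)\\]");

    static Map<String, Integer> countFinishedReads(List<String> logLines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = FINISHED.matcher(line);
            if (!m.find()) {
                continue;
            }
            // The bracket may hold several comma-separated split IDs.
            for (String id : m.group(1).split(",")) {
                String trimmed = id.trim();
                if (!trimmed.isEmpty()) {
                    counts.merge(trimmed, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "4068 [Source: file-source (2/4)#0] INFO org.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [0000000089]",
                "5153 [Source: file-source (2/4)#1] INFO org.apache.flink.connector.base.source.reader.SourceReaderBase - Finished reading split(s) [0000000089]");
        // Report only the splits that finished more than once.
        countFinishedReads(sample).forEach((id, n) -> {
            if (n > 1) {
                System.out.println("split " + id + " finished " + n + " times");
            }
        });
    }
}
```

Any split ID with a count above one would be a candidate source of duplicated
lines, as is the case for 0000000089 here.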
> Duplicated output in FileSource continuous ITCase with TM failover
> ------------------------------------------------------------------
>
> Key: FLINK-20290
> URL: https://issues.apache.org/jira/browse/FLINK-20290
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.12.0
> Reporter: Andrey Zagrebin
> Priority: Blocker
> Fix For: 1.12.0
>
> Attachments: collect-debug-jm.png, collect-debug.png, log
>
>
> If FileSourceTextLinesITCase::testContinuousTextFileSource includes TM
> restarts (triggered by failing the TM with
> TestingMiniCluster::terminateTaskExecutor, see
> testContinuousTextFileSourceWithTaskManagerFailover in
> [branch|https://github.com/azagrebin/flink/tree/FLINK-20118-it]), then I
> sometimes observe duplicated lines in the output after running the whole
> FileSourceTextLinesITCase test suite 5-10 times in the IDE:
> {code:java}
> Test testContinuousTextFileSourceWithTaskManagerFailover(org.apache.flink.connector.file.src.FileSourceTextLinesITCase) failed with:
> java.lang.AssertionError:
> Expected: ["And by opposing end them?--To die,--to sleep,--", "And
> enterprises of great pith and moment,", "And lose the name of action.--Soft
> you now!", "And makes us rather bear those ills we have", "And thus the
> native hue of resolution", "Be all my sins remember'd.", "But that the dread
> of something after death,--", "Devoutly to be wish'd. To die,--to sleep;--",
> "For in that sleep of death what dreams may come,", "For who would bear the
> whips and scorns of time,", "Is sicklied o'er with the pale cast of
> thought;", "Must give us pause: there's the respect", "No more; and by a
> sleep to say we end", "No traveller returns,--puzzles the will,", "Or to take
> arms against a sea of troubles,", "Than fly to others that we know not of?",
> "That flesh is heir to,--'tis a consummation", "That makes calamity of so
> long life;", "That patient merit of the unworthy takes,", "The fair
> Ophelia!--Nymph, in thy orisons", "The heartache, and the thousand natural
> shocks", "The insolence of office, and the spurns", "The oppressor's wrong,
> the proud man's contumely,", "The pangs of despis'd love, the law's delay,",
> "The slings and arrows of outrageous fortune", "The undiscover'd country,
> from whose bourn", "Thus conscience does make cowards of us all;", "To be, or
> not to be,--that is the question:--", "To grunt and sweat under a weary
> life,", "To sleep! perchance to dream:--ay, there's the rub;", "When he
> himself might his quietus make", "When we have shuffled off this mortal
> coil,", "Whether 'tis nobler in the mind to suffer", "With a bare bodkin? who
> would these fardels bear,", "With this regard, their currents turn awry,"]
> but: was ["And by opposing end them?--To die,--to sleep,--", "And
> enterprises of great pith and moment,", "And lose the name of action.--Soft
> you now!", "And makes us rather bear those ills we have", "And thus the
> native hue of resolution", "Be all my sins remember'd.", "But that the dread
> of something after death,--", "Devoutly to be wish'd. To die,--to sleep;--",
> "Devoutly to be wish'd. To die,--to sleep;--", "For in that sleep of death
> what dreams may come,", "For who would bear the whips and scorns of time,",
> "Is sicklied o'er with the pale cast of thought;", "Must give us pause:
> there's the respect", "No more; and by a sleep to say we end", "No more; and
> by a sleep to say we end", "No traveller returns,--puzzles the will,", "Or to
> take arms against a sea of troubles,", "Than fly to others that we know not
> of?", "That flesh is heir to,--'tis a consummation", "That flesh is heir
> to,--'tis a consummation", "That makes calamity of so long life;", "The fair
> Ophelia!--Nymph, in thy orisons", "The heartache, and the thousand natural
> shocks", "The heartache, and the thousand natural shocks", "The slings and
> arrows of outrageous fortune", "The undiscover'd country, from whose bourn",
> "Thus conscience does make cowards of us all;", "To be, or not to be,--that
> is the question:--", "To grunt and sweat under a weary life,", "To sleep!
> perchance to dream:--ay, there's the rub;", "To sleep! perchance to
> dream:--ay, there's the rub;", "When we have shuffled off this mortal coil,",
> "Whether 'tis nobler in the mind to suffer", "With a bare bodkin? who would
> these fardels bear,", "With this regard, their currents turn awry,"]
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
> at org.junit.Assert.assertThat(Assert.java:956)
> at org.junit.Assert.assertThat(Assert.java:923)
> at org.apache.flink.connector.file.src.FileSourceTextLinesITCase.verifyResult(FileSourceTextLinesITCase.java:198)
> at org.apache.flink.connector.file.src.FileSourceTextLinesITCase.testContinuousTextFileSource(FileSourceTextLinesITCase.java:151)
> at org.apache.flink.connector.file.src.FileSourceTextLinesITCase.testContinuousTextFileSourceWithTaskManagerFailover(FileSourceTextLinesITCase.java:109)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
> at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
> at org.junit.rules.RunRules.evaluate(RunRules.java:20)
> at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
> at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
> at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
> at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
> at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
> at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {code}