[ https://issues.apache.org/jira/browse/FLUME-3203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Umesh Chaudhary updated FLUME-3203:
-----------------------------------
Description:
Here are the steps to reproduce:

1) Use the config below in the Flume agent:
{code:java}
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 1000
tier1.channels.channel1.transactionCapacity = 1000
tier1.sinks.sink1.channel = channel1
tier1.sources.source1.channels = channel1
tier1.sources.source1.type = spooldir
tier1.sources.source1.spoolDir = /home/systest/spoolDir
tier1.sources.source1.fileHeader = true
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.path = /tmp/spoolEvnts
tier1.sinks.sink1.hdfs.filePrefix = events-
{code}

2) Start the agent with the above config, then move a sample text file into the spooling directory:
{code:java}
mv Sample-text-file-50kb.txt /home/systest/spoolDir
{code}
The agent processes the events, and the output appears in the HDFS directory:
{code:java}
$ hdfs dfs -ls /tmp/spoolEvnts | uniq | wc -l
37
{code}

3) Move another copy of the same file into the spooling directory:
{code:java}
mv /tmp/Sample-text-file-50kb.txt /home/systest/spoolDir
{code}
This time Flume raises the exception below, yet the file's events are still processed a second time:
{noformat}
2017-12-21 00:00:27,581 INFO org.apache.flume.client.avro.ReliableSpoolingFileEventReader: Preparing to move file /home/systest/spoolDir/Sample-text-file-50kb.txt to /home/systest/spoolDir/Sample-text-file-50kb.txt.COMPLETED
2017-12-21 00:00:27,582 ERROR org.apache.flume.source.SpoolDirectorySource: FATAL: Spool Directory source source1: { spoolDir: /home/systest/spoolDir }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /home/systest/spoolDir/Sample-text-file-50kb.txt.COMPLETED
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:463)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:414)
    at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:326)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:250)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2017-12-21 00:00:31,265 INFO org.apache.flume.sink.hdfs.BucketWriter: Closing /tmp/spoolEvnts/events-.1513843202836.tmp
2017-12-21 00:00:31,275 INFO org.apache.flume.sink.hdfs.BucketWriter: Renaming /tmp/spoolEvnts/events-.1513843202836.tmp to /tmp/spoolEvnts/events-.1513843202836
2017-12-21 00:00:31,293 INFO org.apache.flume.sink.hdfs.BucketWriter: Creating /tmp/spoolEvnts/events-.1513843202837.tmp
2017-12-21 00:00:31,321 INFO org.apache.flume.sink.hdfs.BucketWriter: Closing /tmp/spoolEvnts/events-.1513843202837.tmp
{noformat}
Checking HDFS again now shows this file count:
{code:java}
$ hdfs dfs -ls /tmp/spoolEvnts | uniq | wc -l
72
{code}
According to [the doc|https://flume.apache.org/FlumeUserGuide.html#spooling-directory-source], the source should not process a file whose name matches an existing file with the .COMPLETED suffix.
It causes duplicate records on the sink.

> Spooling dir source leaks records from a file when a corresponding .COMPLETED file already present
> --------------------------------------------------------------------------------------------------
>
>                 Key: FLUME-3203
>                 URL: https://issues.apache.org/jira/browse/FLUME-3203
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: 1.6.0
>            Reporter: Umesh Chaudhary
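For illustration only, here is a minimal Java sketch of the behaviour the report expects. It assumes a hypothetical pre-check: before any events are read, a candidate file is skipped when a sibling named <name>.COMPLETED already exists. The class and method names (SpoolPreCheck, alreadyCompleted, eligibleFiles) are invented and are not Flume's API, and ".COMPLETED" is assumed to be the configured suffix (the spooldir default). The stack trace in the report suggests that 1.6.0 detects the clash only in rollCurrentFile, after the file has already been read, which would explain why its events reach the sink twice.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical sketch, NOT Flume's implementation: choose files to ingest from a
// spool directory, skipping any file whose "<name>.COMPLETED" sibling already exists.
final class SpoolPreCheck {
    // Assumed suffix; matches the spooldir source's default fileSuffix.
    static final String COMPLETED_SUFFIX = ".COMPLETED";

    // True if a file with the same name was already ingested and renamed.
    static boolean alreadyCompleted(Path candidate) {
        Path completed = candidate.resolveSibling(candidate.getFileName() + COMPLETED_SUFFIX);
        return Files.exists(completed);
    }

    // Returns regular, not-yet-completed files in name order, checked BEFORE reading.
    static List<Path> eligibleFiles(Path spoolDir) throws IOException {
        List<Path> eligible = new ArrayList<>();
        try (Stream<Path> entries = Files.list(spoolDir)) {
            entries.filter(Files::isRegularFile)
                   .filter(p -> !p.getFileName().toString().endsWith(COMPLETED_SUFFIX))
                   .sorted()
                   .forEach(p -> {
                       if (alreadyCompleted(p)) {
                           // Re-used name: skip it instead of re-reading its events.
                           System.err.println("Skipping re-used file name: " + p);
                       } else {
                           eligible.add(p);
                       }
                   });
        }
        return eligible;
    }

    public static void main(String[] args) throws IOException {
        // Recreate the reported situation in a temporary directory.
        Path dir = Files.createTempDirectory("spoolDir");
        Files.writeString(dir.resolve("Sample-text-file-50kb.txt.COMPLETED"), "first run");
        Files.writeString(dir.resolve("Sample-text-file-50kb.txt"), "second copy");
        Files.writeString(dir.resolve("other.txt"), "new data");
        for (Path p : eligibleFiles(dir)) {
            System.out.println("Would ingest: " + p.getFileName());
        }
    }
}
```

With the sample directory built in main, only other.txt should be listed as eligible; the re-used Sample-text-file-50kb.txt is skipped before any of its lines are read. Skipping with a log message is just one choice; failing fast before reading, which is what the current IllegalStateException appears to intend, would also avoid the duplicate records.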