Umesh Chaudhary created FLUME-3203:
--------------------------------------

             Summary: Spooling dir source leaks records from a file when a corresponding .COMPLETED file is already present
                 Key: FLUME-3203
                 URL: https://issues.apache.org/jira/browse/FLUME-3203
             Project: Flume
          Issue Type: Bug
          Components: Sinks+Sources
    Affects Versions: 1.6.0
            Reporter: Umesh Chaudhary


Here are the steps to reproduce:

1) Use the following config in the Flume agent:


{code:java}
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1

tier1.channels.channel1.type   = memory
tier1.channels.channel1.capacity = 1000
tier1.channels.channel1.transactionCapacity = 1000

tier1.sinks.sink1.channel      = channel1
tier1.sources.source1.channels = channel1

tier1.sources.source1.type     = spooldir
tier1.sources.source1.spoolDir = /home/systest/spoolDir
tier1.sources.source1.fileHeader = true

tier1.sinks.sink1.type         = hdfs
tier1.sinks.sink1.hdfs.path =/tmp/spoolEvnts
tier1.sinks.sink1.hdfs.filePrefix = events-
{code}

2) Once the agent has started with the above config, use the following command to move a sample text file into the spooling directory:


{code:java}
mv Sample-text-file-50kb.txt /home/systest/spoolDir
{code}

The agent starts processing the events, and the output can be seen in the HDFS directory:

{code:java}
$ hdfs dfs -ls /tmp/spoolEvnts | uniq | wc -l
37
{code}

3) Move the same file into the spooling directory again using the following command:

{code:java}
mv /tmp/Sample-text-file-50kb.txt /home/systest/spoolDir
{code}

This time Flume raises the exception below, but it still processes the file again:

{noformat}
2017-12-21 00:00:27,581 INFO org.apache.flume.client.avro.ReliableSpoolingFileEventReader: Preparing to move file /home/systest/spoolDir/Sample-text-file-50kb.txt to /home/systest/spoolDir/Sample-text-file-50kb.txt.COMPLETED
2017-12-21 00:00:27,582 ERROR org.apache.flume.source.SpoolDirectorySource: FATAL: Spool Directory source source1: { spoolDir: /home/systest/spoolDir }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /home/systest/spoolDir/Sample-text-file-50kb.txt.COMPLETED
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:463)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.retireCurrentFile(ReliableSpoolingFileEventReader.java:414)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:326)
        at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:250)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2017-12-21 00:00:31,265 INFO org.apache.flume.sink.hdfs.BucketWriter: Closing /tmp/spoolEvnts/events-.1513843202836.tmp
2017-12-21 00:00:31,275 INFO org.apache.flume.sink.hdfs.BucketWriter: Renaming /tmp/spoolEvnts/events-.1513843202836.tmp to /tmp/spoolEvnts/events-.1513843202836
2017-12-21 00:00:31,293 INFO org.apache.flume.sink.hdfs.BucketWriter: Creating /tmp/spoolEvnts/events-.1513843202837.tmp
2017-12-21 00:00:31,321 INFO org.apache.flume.sink.hdfs.BucketWriter: Closing /tmp/spoolEvnts/events-.1513843202837.tmp
{noformat}
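The stack trace suggests why the records still reach HDFS: the name-collision check fires in rollCurrentFile, which is reached from retireCurrentFile only after readEvents has consumed the file. The sketch below is a simplified, hypothetical model of that ordering, not Flume's actual code; the class SpoolOrderModel and the methods processFile and runScenario are illustrative names. It drops a three-line file into a temporary spool directory twice and counts the events that were "delivered" before the exception was thrown.

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model (not Flume code): events are delivered first, and the
// ".COMPLETED" collision check only runs when the file is retired afterwards.
public class SpoolOrderModel {
    static final String COMPLETED_SUFFIX = ".COMPLETED";

    // Stands in for the channel/sink: every delivered event ends up here.
    final List<String> delivered = new ArrayList<>();

    void processFile(Path file) {
        try {
            // 1) Read the whole file and deliver each line as an event.
            delivered.addAll(Files.readAllLines(file, StandardCharsets.UTF_8));
            // 2) Only now try to retire the file, as rollCurrentFile does.
            Path dest = file.resolveSibling(file.getFileName() + COMPLETED_SUFFIX);
            if (Files.exists(dest)) {
                throw new IllegalStateException("File name has been re-used with different files. "
                        + "Spooling assumptions violated for " + dest);
            }
            Files.move(file, dest);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Places the same file name in the spool directory twice and returns the
    // number of events that reached the "sink".
    static int runScenario() {
        try {
            Path dir = Files.createTempDirectory("spool");
            Path file = dir.resolve("Sample-text-file.txt");
            List<String> lines = Arrays.asList("line 1", "line 2", "line 3");
            SpoolOrderModel model = new SpoolOrderModel();

            Files.write(file, lines, StandardCharsets.UTF_8);
            model.processFile(file); // first pass: renamed to .COMPLETED

            Files.write(file, lines, StandardCharsets.UTF_8); // same name again
            try {
                model.processFile(file); // events are delivered, then it throws
            } catch (IllegalStateException e) {
                System.out.println("Exception after delivery: " + e.getMessage());
            }
            return model.delivered.size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Prints 6: the three events were delivered twice.
        System.out.println("Events delivered: " + runScenario());
    }
}
{code}

In this model the exception arrives only after the second copy's events are already in the sink, which matches the roughly doubled file count reported below.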


Checking HDFS again shows the following file count:

{code:java}
$ hdfs dfs -ls /tmp/spoolEvnts | uniq | wc -l
72
{code}

According to [the doc|https://flume.apache.org/FlumeUserGuide.html#spooling-directory-source], the source should not process a file whose name matches an already completed file (the same name with a .COMPLETED suffix). Processing it again causes duplicate records at the sink.
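One possible way to get the documented behavior is to reject a re-used name before any of its events are read. The sketch below is a hypothetical guard, not Flume's API and not a confirmed upstream fix: when listing candidate files, it skips any file whose "<name>.COMPLETED" twin already exists. The class CompletedGuard and its methods are illustrative names.

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical pre-read guard (not Flume's API): skip a candidate whose
// "<name>.COMPLETED" twin exists, so no events from it are ever emitted.
public class CompletedGuard {
    static final String COMPLETED_SUFFIX = ".COMPLETED";

    static boolean alreadyCompleted(Path candidate) {
        Path twin = candidate.resolveSibling(candidate.getFileName() + COMPLETED_SUFFIX);
        return Files.exists(twin);
    }

    // Returns regular files that are neither ".COMPLETED" markers themselves
    // nor shadowed by an existing ".COMPLETED" twin.
    static List<Path> eligibleFiles(Path spoolDir) {
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(spoolDir)) {
            for (Path p : stream) {
                String name = p.getFileName().toString();
                if (!Files.isRegularFile(p) || name.endsWith(COMPLETED_SUFFIX)) {
                    continue;
                }
                if (alreadyCompleted(p)) {
                    System.err.println("Skipping re-used file name: " + p);
                    continue;
                }
                result.add(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }

    // Builds a demo spool directory where a.txt was already ingested once.
    static Path buildDemoDir() {
        try {
            Path dir = Files.createTempDirectory("spool");
            Files.createFile(dir.resolve("a.txt"));
            Files.createFile(dir.resolve("a.txt" + COMPLETED_SUFFIX));
            Files.createFile(dir.resolve("b.txt"));
            return dir;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Only b.txt is eligible; a.txt is skipped because a.txt.COMPLETED exists.
        System.out.println(eligibleFiles(buildDemoDir()));
    }
}
{code}

Skipping leaves the re-used file in the spooling directory for an operator to deal with; failing fast at selection time (before reading) would be an equally reasonable choice, and either one avoids emitting the duplicate events.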



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)