[ https://issues.apache.org/jira/browse/FLUME-2318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lior Zeno updated FLUME-2318:
-----------------------------
    Fix Version/s:     (was: v1.4.0)
                   v1.7.0

> SpoolingDirectory is unable to handle empty files
> -------------------------------------------------
>
>                 Key: FLUME-2318
>                 URL: https://issues.apache.org/jira/browse/FLUME-2318
>             Project: Flume
>          Issue Type: Bug
>          Components: Sinks+Sources
>    Affects Versions: v1.4.0
>            Reporter: Muhammad Ehsan ul Haque
>            Priority: Minor
>              Labels: easytest, patch
>             Fix For: v1.7.0
>
>         Attachments: FLUME-2318-0.patch, FLUME-2318-1.patch, FLUME-2318-2.patch
>
>
> Empty files should be returned as an empty event instead of no event.
> h4. Scenario
> From the start consume files in this order
> # f1: File with data or empty file
> # f2: Empty File
> # No file in spooling directory
> h4. Expected Outcome
> # channel.take() should return event with f1 data.
> # channel.take() should return event with f2 data (empty data).
> # channel.take() should return null.
> h4. What happens
> # channel.take() returns event with f1 data.
> # channel.take() returns null.
> # Exception is raised when the SpoolDirectorySource thread tries to read events from the ReliableSpoolingFileEventReader. Snippet of trace is
> 2014-02-09 15:46:35,832 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)] Preparing to move file /tmp/1391957195572-0/file1 to /tmp/1391957195572-0/file1.COMPLETED
> 2014-02-09 15:46:36,334 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:228)] Last read was never committed - resetting mark position.
> 2014-02-09 15:46:36,335 (pool-1-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile(ReliableSpoolingFileEventReader.java:346)] Preparing to move file /tmp/1391957195572-0/file2 to /tmp/1391957195572-0/file2.COMPLETED
> 2014-02-09 15:46:36,839 (pool-1-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:252)] FATAL: Spool Directory source null: { spoolDir: /tmp/1391957195572-0 }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
> java.lang.IllegalStateException: File should not roll when commit is outstanding.
>     at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:225)
>     at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:224)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>     at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
>     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:722)
> h4. Unit Test
> In TestSpoolDirectorySource
> {code}
>   @Test
>   public void testWithEmptyFile2()
>     throws InterruptedException, IOException {
>     Context context = new Context();
>     File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
>     Files.write("some data".getBytes(), f1);
>     File f2 = new File(tmpDir.getAbsolutePath() + "/file2");
>     Files.write(new byte[0], f2);
>     context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
>         tmpDir.getAbsolutePath());
>     Configurables.configure(source, context);
>     source.start();
>     Thread.sleep(10);
>     for (int i=0; i<2; i++) {
>       Transaction txn = channel.getTransaction();
>       txn.begin();
>       Event e = channel.take();
>       txn.commit();
>       txn.close();
>     }
>     Transaction txn = channel.getTransaction();
>     txn.begin();
>     Assert.assertNull(channel.take());
>     txn.commit();
>     txn.close();
>   }
> {code}
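
The expected outcome described in the issue (an empty file yields one empty event rather than no event) can be illustrated with a small standalone sketch. This is a toy model only: it is not Flume code and not the content of the attached patches. The class EmptyFileSketch and the method eventsFor are hypothetical names, and the model assumes one event per line of text.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the expected behaviour; not Flume code, not the attached patch.
final class EmptyFileSketch {

    // Hypothetical helper: turns the text of one spooled file into event bodies,
    // one event per line (an assumption made for this illustration).
    static List<String> eventsFor(String fileContent) {
        List<String> events = new ArrayList<>();
        // String.lines() (Java 11+) yields no elements for an empty string.
        fileContent.lines().forEach(events::add);
        if (events.isEmpty()) {
            // The behaviour the issue asks for: an empty file still produces
            // exactly one event, with an empty body, instead of no event.
            events.add("");
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(eventsFor("some data").size()); // prints 1
        System.out.println(eventsFor("").size());          // prints 1
    }
}
```

Without the isEmpty() branch the model would emit nothing for f2, which matches the "What happens" case where the second channel.take() returns null.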