Ralph,

There is no real bug. The issue is that even after the event is committed to the channel, the source has to send back the status to the previous hop. It is possible that the code reaches the stop() call before the success status is sent back, which is why the sleep would help. Since primarySource.stop() might get called before the source sends back success, the previous hop ends up assuming that the primarySource never got the event, in which case it re-sends it.

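To make the sequence concrete, here is a minimal sketch in plain Java. It is not Flume code: `Hop`, `send`, and `simulate` are hypothetical names, and the `stopBeforeAck` flag is an assumption that stands in for stop() winning the race against the success response. It only illustrates why the alternate channel can see the last primary event first.

```java
import java.util.ArrayList;
import java.util.List;

class AckRaceSketch {

    /** Hypothetical stand-in for an Avro source that feeds a channel. */
    static final class Hop {
        final List<String> channel = new ArrayList<>();
        boolean stopped = false;

        /** Commits the event, then reports whether the success status got back. */
        boolean receive(String event, boolean stopBeforeAck) {
            channel.add(event);      // the event is committed to the channel
            if (stopBeforeAck) {
                stopped = true;      // stop() runs before the status is sent back
                return false;        // the previous hop never sees success
            }
            return true;
        }
    }

    /** Sends to the primary; without a success status, re-sends to the alternate. */
    static void send(String event, Hop primary, Hop alternate, boolean stopBeforeAck) {
        boolean acked = !primary.stopped && primary.receive(event, stopBeforeAck);
        if (!acked) {
            alternate.receive(event, false);
        }
    }

    /** Replays the shape of the test and returns what the alternate channel holds. */
    static List<String> simulate(boolean stopBeforeAck) {
        Hop primary = new Hop();
        Hop alternate = new Hop();
        for (int i = 0; i < 10; i++) {
            boolean last = (i == 9);
            send("Test Primary " + i, primary, alternate, last && stopBeforeAck);
        }
        primary.stopped = true;      // plays the role of primarySource.stop()
        send("Test Alternate 0", primary, alternate, false);
        return alternate.channel;
    }

    public static void main(String[] args) {
        System.out.println(simulate(true));   // stop() wins the race
        System.out.println(simulate(false));  // success status arrives first
    }
}
```

When stop() wins the race, the alternate holds "Test Primary 9" ahead of "Test Alternate 0", which is the shape of the Gump failure. When the status arrives first, it holds only "Test Alternate 0".
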
Thanks,
Hari

--
Hari Shreedharan

On Sunday, September 9, 2012 at 7:06 PM, Ralph Goers wrote:

> If the code looks good wouldn't that imply a bug of some kind in the
> FileChannel? Although I can add the sleep, the purpose of the test is to
> simulate a failure and the failover to the secondary agent. Obviously in that
> case there would be no sleep.
>
> Ralph
>
> On Sep 8, 2012, at 10:36 AM, Hari Shreedharan wrote:
>
> > Hi Ralph,
> >
> > The code looks good. I think this might be due to some timing issues,
> > though I am not sure what. I'd suggest you add a Thread.sleep(2000) before
> > this line: primarySource.stop();, so once your whole transaction is done,
> > wait for the Avro sink/Rpc Client to get the Success message, so it will
> > not send duplicates.
> >
> > Please let me know if that causes the failures to disappear.
> >
> > Thanks
> > Hari
> >
> > --
> > Hari Shreedharan
> >
> > On Saturday, September 8, 2012 at 10:01 AM, Ralph Goers wrote:
> >
> > > Did you ever get a chance to look at this? I am still getting these
> > > failures almost every time it runs.
> > >
> > > Ralph
> > >
> > > On Aug 31, 2012, at 10:35 AM, Hari Shreedharan wrote:
> > >
> > > > Thanks Ralph. Let me take a look at the code.
> > > >
> > > > --
> > > > Hari Shreedharan
> > > >
> > > > On Friday, August 31, 2012 at 9:38 AM, Ralph Goers wrote:
> > > >
> > > > > Which file? The files from the FileChannel, the source or …? If you
> > > > > want the FileChannel stuff, unfortunately it only is failing on the
> > > > > machine where Gump runs and I don't have a clue how to get access to
> > > > > it or if it is even left around after the run. As I said, I've never
> > > > > had this fail on the Mac(s), my Linux system or in Jenkins. I have no
> > > > > idea what is peculiar about that system but I do know the tests take
> > > > > about twice as long as they do on my Mac.
> > > > >
> > > > > If you want to look at the actual unit test source, it is at
> > > > > https://svn.apache.org/repos/asf/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
> > > > >
> > > > > Ralph
> > > > >
> > > > > On Aug 31, 2012, at 12:15 AM, Hari Shreedharan wrote:
> > > > >
> > > > > > It looks like the channel has already got the events before the
> > > > > > source stops. Can you send me a link to the actual file, so I can
> > > > > > take a look?
> > > > > >
> > > > > > Hari
> > > > > >
> > > > > > --
> > > > > > Hari Shreedharan
> > > > > >
> > > > > > On Thursday, August 30, 2012 at 9:44 AM, Ralph Goers wrote:
> > > > > >
> > > > > > > Thanks Hari,
> > > > > > >
> > > > > > > First, remember that the Flume agent is embedded in the Appender.
> > > > > > > So the Log4j EventSource is passing the event to the FileChannel.
> > > > > > > The Avro Sink then reads from the channel and sends it on. The
> > > > > > > unit test has two Avro Sources listening with each associated
> > > > > > > with its own MemoryChannel. The test logs 10 events then reads
> > > > > > > the 10 events from the primary MemoryChannel, each within its own
> > > > > > > transaction. The test then stops the primary source. Then it logs
> > > > > > > 10 more events and tries to read them from the alternate
> > > > > > > MemoryChannel.
> > > > > > >
> > > > > > > The code to read from the channel looks like:
> > > > > > >
> > > > > > > for (int i = 0; i < 10; ++i) {
> > > > > > >     StructuredDataMessage msg = new StructuredDataMessage("Test",
> > > > > > >         "Test Primary " + i, "Test");
> > > > > > >     EventLogger.logEvent(msg);
> > > > > > > }
> > > > > > > for (int i = 0; i < 10; ++i) {
> > > > > > >     Transaction transaction = primaryChannel.getTransaction();
> > > > > > >     transaction.begin();
> > > > > > >
> > > > > > >     Event event = primaryChannel.take();
> > > > > > >     Assert.assertNotNull(event);
> > > > > > >     String body = getBody(event);
> > > > > > >     String expected = "Test Primary " + i;
> > > > > > >     Assert.assertTrue("Channel contained event, but not expected message. Received: " + body,
> > > > > > >         body.endsWith(expected));
> > > > > > >     transaction.commit();
> > > > > > >     transaction.close();
> > > > > > > }
> > > > > > >
> > > > > > > primarySource.stop();
> > > > > > >
> > > > > > > for (int i = 0; i < 10; ++i) {
> > > > > > >     StructuredDataMessage msg = new StructuredDataMessage("Test",
> > > > > > >         "Test Alternate " + i, "Test");
> > > > > > >     EventLogger.logEvent(msg);
> > > > > > > }
> > > > > > > for (int i = 0; i < 10; ++i) {
> > > > > > >     Transaction transaction = alternateChannel.getTransaction();
> > > > > > >     transaction.begin();
> > > > > > >
> > > > > > >     Event event = alternateChannel.take();
> > > > > > >     Assert.assertNotNull(event);
> > > > > > >     String body = getBody(event);
> > > > > > >     String expected = "Test Alternate " + i;
> > > > > > >     /* When running in Gump Flume consistently returns the last event
> > > > > > >        from the primary channel after the failover, which fails this test */
> > > > > > >     Assert.assertTrue("Channel contained event, but not expected message. Expected: " + expected +
> > > > > > >         " Received: " + body, body.endsWith(expected));
> > > > > > >     transaction.commit();
> > > > > > >     transaction.close();
> > > > > > > }
> > > > > > >
> > > > > > > When I run this on my Mac it never fails. But Gump fails almost
> > > > > > > every time returning "Channel contained event, but not expected
> > > > > > > message. Expected: Test Alternate 0 Received: <128>1
> > > > > > > 2012-08-30T05:50:04.143Z vmgump MyApp - Test
> > > > > > > [Test@18060][mdc@18060] Test Primary 9"
> > > > > > >
> > > > > > > Do we have any tests that are similar to this? I didn't see
> > > > > > > anything that tests failover in this way but I might have missed
> > > > > > > it.
> > > > > > >
> > > > > > > Ralph
> > > > > > >
> > > > > > > On Aug 30, 2012, at 9:22 AM, Hari Shreedharan wrote:
> > > > > > >
> > > > > > > > Hi Ralph,
> > > > > > > >
> > > > > > > > Sorry, I missed this message earlier. How are you simulating
> > > > > > > > failover in your test? I did not look at your code. If the
> > > > > > > > message was written by the Avro Source on the client and the
> > > > > > > > Avro Sink on the other side simply did not get a success, that
> > > > > > > > would cause the failover sink processor to retry the same
> > > > > > > > message, since it would be rolled back by the sink, and hence
> > > > > > > > the channel will end up making it available for another sink.
> > > > > > > > Generally, if a message is not ack-ed as being successfully
> > > > > > > > written to the channel by the Avro Source, the sink will
> > > > > > > > roll back the transaction - and throw an EventDeliveryException
> > > > > > > > - and in case of Failover SinkProcessor, it will cause the next
> > > > > > > > sink to pick it up.
> > > > > > > >
> > > > > > > > Also, note that Flume guarantees at-least-once semantics and
> > > > > > > > weak ordering. If a failure happens, it is possible that there
> > > > > > > > will be duplicates.
> > > > > > > >
> > > > > > > > And no, this is not related to any of the FileChannel issues we
> > > > > > > > have been fixing.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Hari
> > > > > > > >
> > > > > > > > --
> > > > > > > > Hari Shreedharan
> > > > > > > >
> > > > > > > > On Thursday, August 30, 2012 at 7:50 AM, Ralph Goers wrote:
> > > > > > > >
> > > > > > > > > I'm going to try again. Does this problem sound familiar to
> > > > > > > > > anyone?
> > > > > > > > >
> > > > > > > > > Ralph
> > > > > > > > >
> > > > > > > > > On Aug 27, 2012, at 3:36 PM, Ralph Goers wrote:
> > > > > > > > >
> > > > > > > > > > Does anyone have any thoughts on this? Is it possibly
> > > > > > > > > > related to any of the issues already being fixed on the
> > > > > > > > > > FileChannel?
> > > > > > > > > >
> > > > > > > > > > Ralph
> > > > > > > > > >
> > > > > > > > > > On Aug 26, 2012, at 4:05 PM, Ralph Goers wrote:
> > > > > > > > > >
> > > > > > > > > > > I have successfully embedded Flume into the Log4j 2
> > > > > > > > > > > Appender. However, I have a unit test that has Flume fail
> > > > > > > > > > > over from one AvroSink to another. When this happens
> > > > > > > > > > > under some circumstances I am getting the last message
> > > > > > > > > > > successfully delivered to the first source as the first
> > > > > > > > > > > message to the second source, which doesn't seem correct.
> > > > > > > > > > > The unit test is at
> > > > > > > > > > > https://svn.apache.org/repos/asf/logging/log4j/log4j2/trunk/flume-ng/src/test/java/org/apache/logging/log4j/flume/appender/FlumeEmbeddedAppenderTest.java
> > > > > > > > > > > The odd thing is that I cannot get this to fail on my
> > > > > > > > > > > local machine - it only fails when Gump runs it, but it
> > > > > > > > > > > fails fairly consistently.
> > > > > > > > > > >
> > > > > > > > > > > The unit test has the AppenderSource connect to a
> > > > > > > > > > > FileChannel. Two AvroSinks are connected to the
> > > > > > > > > > > FileChannel via the Failover processor.
> > > > > > > > > > >
> > > > > > > > > > > Is this a known behavior?
> > > > > > > > > > >
> > > > > > > > > > > Ralph
