Repository: flume Updated Branches: refs/heads/trunk 719afe908 -> a76e2e9f2
FLUME-3237: Handling RuntimeExceptions coming from the JMS provider in JMSSource Handling RuntimeExceptions in the same way as JMSExceptions in order to trigger the reconnecting mechanism in JMSSource. This closes #210 Reviewers: Endre Major, Ferenc Szabo (Peter Turcsanyi via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/a76e2e9f Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/a76e2e9f Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/a76e2e9f Branch: refs/heads/trunk Commit: a76e2e9f2cde18f3b7548b991535a3151425de12 Parents: 719afe9 Author: Ferenc Szabo <[email protected]> Authored: Wed Jul 11 13:56:02 2018 +0200 Committer: Ferenc Szabo <[email protected]> Committed: Wed Jul 11 13:56:02 2018 +0200 ---------------------------------------------------------------------- .../flume/source/jms/JMSMessageConsumer.java | 30 ++++++++- .../source/jms/TestJMSMessageConsumer.java | 64 ++++++++++++++++++++ 2 files changed, 92 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/a76e2e9f/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java index 3b4da81..b0b1c08 100644 --- a/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java +++ b/flume-ng-sources/flume-jms-source/src/main/java/org/apache/flume/source/jms/JMSMessageConsumer.java @@ -129,12 +129,12 @@ class JMSMessageConsumer { List<Event> take() throws JMSException { List<Event> result = new ArrayList<Event>(batchSize); Message message; - message = messageConsumer.receive(pollTimeout); + message = receive(); if (message != null) { result.addAll(messageConverter.convert(message)); int max = batchSize - 1; for (int i = 0; i < max; i++) { - message = messageConsumer.receiveNoWait(); + message = receiveNoWait(); if (message == null) { break; } @@ -147,11 +147,35 @@ class JMSMessageConsumer { return result; } + private Message receive() throws JMSException { + try { + return messageConsumer.receive(pollTimeout); + } catch (RuntimeException runtimeException) { + JMSException jmsException = new JMSException("JMS provider has thrown runtime exception: " + + runtimeException.getMessage()); + jmsException.setLinkedException(runtimeException); + throw jmsException; + } + } + + private Message receiveNoWait() throws JMSException { + try { + return messageConsumer.receiveNoWait(); + } catch (RuntimeException runtimeException) { + JMSException jmsException = new JMSException("JMS provider has thrown runtime exception: " + + runtimeException.getMessage()); + jmsException.setLinkedException(runtimeException); + throw jmsException; + } + } + void commit() { try { session.commit(); } catch (JMSException jmsException) { logger.warn("JMS Exception processing commit", jmsException); + } catch (RuntimeException runtimeException) { + logger.warn("Runtime Exception processing commit", runtimeException); } } @@ -160,6 +184,8 @@ class JMSMessageConsumer { session.rollback(); } catch (JMSException jmsException) { logger.warn("JMS Exception processing rollback", jmsException); + } catch (RuntimeException runtimeException) { + logger.warn("Runtime Exception processing rollback", runtimeException); } } http://git-wip-us.apache.org/repos/asf/flume/blob/a76e2e9f/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java index 711525e..41262af 100644 --- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java +++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java @@ -175,4 +175,68 @@ public class TestJMSMessageConsumer extends JMSMessageConsumerTestBase { verify(session, times(1)).createDurableSubscriber(topic, name, messageSelector, true); } + @Test(expected = JMSException.class) + public void testTakeFailsDueToJMSExceptionFromReceive() throws JMSException { + when(messageConsumer.receive(anyLong())).thenThrow(new JMSException("")); + consumer = create(); + + consumer.take(); + } + + @Test(expected = JMSException.class) + public void testTakeFailsDueToRuntimeExceptionFromReceive() throws JMSException { + when(messageConsumer.receive(anyLong())).thenThrow(new RuntimeException()); + consumer = create(); + + consumer.take(); + } + + @Test(expected = JMSException.class) + public void testTakeFailsDueToJMSExceptionFromReceiveNoWait() throws JMSException { + when(messageConsumer.receiveNoWait()).thenThrow(new JMSException("")); + consumer = create(); + + consumer.take(); + } + + @Test(expected = JMSException.class) + public void testTakeFailsDueToRuntimeExceptionFromReceiveNoWait() throws JMSException { + when(messageConsumer.receiveNoWait()).thenThrow(new RuntimeException()); + consumer = create(); + + consumer.take(); + } + + @Test + public void testCommitFailsDueToJMSException() throws JMSException { + doThrow(new JMSException("")).when(session).commit(); + consumer = create(); + + consumer.commit(); + } + + @Test + public void testCommitFailsDueToRuntimeException() throws JMSException { + doThrow(new RuntimeException()).when(session).commit(); + consumer = create(); + + consumer.commit(); + } + + @Test + public void testRollbackFailsDueToJMSException() throws JMSException { + doThrow(new JMSException("")).when(session).rollback(); + consumer = create(); + + consumer.rollback(); + } + + @Test + public void testRollbackFailsDueToRuntimeException() throws JMSException { + doThrow(new RuntimeException()).when(session).rollback(); + consumer = create(); + + consumer.rollback(); + } + }
