Repository: flume Updated Branches: refs/heads/trunk 84b6263cb -> 7419f05ee
FLUME-2989 added 2 KafkaChannel metrics KafkaChannel was missing some metrics: eventTakeAttemptCount, eventPutAttemptCount This PR is based on the patch included in the issue that was the work of Umesh Chaudhary. I reworked the test a bit to use Mockito, and made some other minor modifications to the test. This closes #244 Reviewers: Peter Turcsanyi, Ferenc Szabo (Endre Major via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7419f05e Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7419f05e Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7419f05e Branch: refs/heads/trunk Commit: 7419f05ee19cee63b9a3faf876a139bd56894634 Parents: 84b6263 Author: Endre Major <[email protected]> Authored: Fri Nov 23 14:25:33 2018 +0100 Committer: Ferenc Szabo <[email protected]> Committed: Fri Nov 23 14:25:33 2018 +0100 ---------------------------------------------------------------------- flume-ng-channels/flume-kafka-channel/pom.xml | 5 +++ .../flume/channel/kafka/KafkaChannel.java | 2 ++ .../channel/kafka/TestBasicFunctionality.java | 35 ++++++++++++++++++++ 3 files changed, 42 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/7419f05e/flume-ng-channels/flume-kafka-channel/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml b/flume-ng-channels/flume-kafka-channel/pom.xml index f9211b2..926237d 100644 --- a/flume-ng-channels/flume-kafka-channel/pom.xml +++ b/flume-ng-channels/flume-kafka-channel/pom.xml @@ -66,6 +66,11 @@ limitations under the License. <classifier>tests</classifier> <scope>test</scope> </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/flume/blob/7419f05e/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java index 694cf3f..40494d4 100644 --- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java +++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java @@ -454,6 +454,7 @@ public class KafkaChannel extends BasicChannelSemantics { new ProducerRecord<String, byte[]>(topic.get(), key, serializeValue(event, parseAsFlumeEvent))); } + counter.incrementEventPutAttemptCount(); } catch (NumberFormatException e) { throw new ChannelException("Non integer partition id specified", e); } catch (Exception e) { @@ -518,6 +519,7 @@ public class KafkaChannel extends BasicChannelSemantics { } else { return null; } + counter.incrementEventTakeAttemptCount(); } catch (Exception ex) { logger.warn("Error while getting events from Kafka. This is usually caused by " + "trying to read a non-flume event. Ensure the setting for " + http://git-wip-us.apache.org/repos/asf/flume/blob/7419f05e/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java index 4ff0ee6..d119b42 100644 --- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java +++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java @@ -20,13 +20,16 @@ package org.apache.flume.channel.kafka; import org.apache.flume.Context; import org.apache.flume.Event; +import org.apache.flume.Transaction; import org.apache.flume.conf.Configurables; +import org.apache.flume.instrumentation.kafka.KafkaChannelCounter; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.Assert; import org.junit.Test; +import org.mockito.internal.util.reflection.Whitebox; import java.util.ArrayList; import java.util.List; @@ -210,4 +213,36 @@ public class TestBasicFunctionality extends TestKafkaChannelBase { underlying.shutdownNow(); verify(eventsPulled); } + + @Test + public void testMetricsCount() throws Exception { + final KafkaChannel channel = startChannel(true); + ExecutorService underlying = Executors.newCachedThreadPool(); + ExecutorCompletionService<Void> submitterSvc = new ExecutorCompletionService<Void>(underlying); + final List<List<Event>> events = createBaseList(); + putEvents(channel, events, submitterSvc); + takeEventsWithCommittingTxn(channel,50); + + KafkaChannelCounter counter = + (KafkaChannelCounter) Whitebox.getInternalState(channel, "counter"); + Assert.assertEquals(50, counter.getEventPutAttemptCount()); + Assert.assertEquals(50, counter.getEventPutSuccessCount()); + Assert.assertEquals(50, counter.getEventTakeAttemptCount()); + Assert.assertEquals(50, counter.getEventTakeSuccessCount()); + channel.stop(); + } + + private void takeEventsWithCommittingTxn(KafkaChannel channel, long eventsCount) { + List<Event> takeEventsList = new ArrayList<>(); + Transaction txn = channel.getTransaction(); + txn.begin(); + while (takeEventsList.size() < eventsCount) { + Event event = channel.take(); + if (event != null) { + takeEventsList.add(event); + } + } + txn.commit(); + txn.close(); + } }
