Repository: flume
Updated Branches:
  refs/heads/trunk 84b6263cb -> 7419f05ee


FLUME-2989 added 2 KafkaChannel metrics

KafkaChannel was missing some metrics:
  eventTakeAttemptCount, eventPutAttemptCount

This PR is based on the patch included in the issue that was the work
of Umesh Chaudhary.
I reworked the test a bit to use Mockito, and made some other minor
modifications to the test.

This closes #244

Reviewers: Peter Turcsanyi, Ferenc Szabo

(Endre Major via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/7419f05e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/7419f05e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/7419f05e

Branch: refs/heads/trunk
Commit: 7419f05ee19cee63b9a3faf876a139bd56894634
Parents: 84b6263
Author: Endre Major <[email protected]>
Authored: Fri Nov 23 14:25:33 2018 +0100
Committer: Ferenc Szabo <[email protected]>
Committed: Fri Nov 23 14:25:33 2018 +0100

----------------------------------------------------------------------
 flume-ng-channels/flume-kafka-channel/pom.xml   |  5 +++
 .../flume/channel/kafka/KafkaChannel.java       |  2 ++
 .../channel/kafka/TestBasicFunctionality.java   | 35 ++++++++++++++++++++
 3 files changed, 42 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/7419f05e/flume-ng-channels/flume-kafka-channel/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/pom.xml 
b/flume-ng-channels/flume-kafka-channel/pom.xml
index f9211b2..926237d 100644
--- a/flume-ng-channels/flume-kafka-channel/pom.xml
+++ b/flume-ng-channels/flume-kafka-channel/pom.xml
@@ -66,6 +66,11 @@ limitations under the License.
       <classifier>tests</classifier>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
 
   </dependencies>
 

http://git-wip-us.apache.org/repos/asf/flume/blob/7419f05e/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 694cf3f..40494d4 100644
--- 
a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ 
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -454,6 +454,7 @@ public class KafkaChannel extends BasicChannelSemantics {
               new ProducerRecord<String, byte[]>(topic.get(), key,
                                                  serializeValue(event, 
parseAsFlumeEvent)));
         }
+        counter.incrementEventPutAttemptCount();
       } catch (NumberFormatException e) {
         throw new ChannelException("Non integer partition id specified", e);
       } catch (Exception e) {
@@ -518,6 +519,7 @@ public class KafkaChannel extends BasicChannelSemantics {
           } else {
             return null;
           }
+          counter.incrementEventTakeAttemptCount();
         } catch (Exception ex) {
           logger.warn("Error while getting events from Kafka. This is usually 
caused by " +
                       "trying to read a non-flume event. Ensure the setting 
for " +

http://git-wip-us.apache.org/repos/asf/flume/blob/7419f05e/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java
 
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java
index 4ff0ee6..d119b42 100644
--- 
a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java
+++ 
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java
@@ -20,13 +20,16 @@ package org.apache.flume.channel.kafka;
 
 import org.apache.flume.Context;
 import org.apache.flume.Event;
+import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurables;
+import org.apache.flume.instrumentation.kafka.KafkaChannelCounter;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -210,4 +213,36 @@ public class TestBasicFunctionality extends 
TestKafkaChannelBase {
     underlying.shutdownNow();
     verify(eventsPulled);
   }
+
+  @Test
+  public void testMetricsCount() throws Exception {
+    final KafkaChannel channel = startChannel(true);
+    ExecutorService underlying = Executors.newCachedThreadPool();
+    ExecutorCompletionService<Void> submitterSvc = new 
ExecutorCompletionService<Void>(underlying);
+    final List<List<Event>> events = createBaseList();
+    putEvents(channel, events, submitterSvc);
+    takeEventsWithCommittingTxn(channel,50);
+
+    KafkaChannelCounter counter =
+            (KafkaChannelCounter) Whitebox.getInternalState(channel, 
"counter");
+    Assert.assertEquals(50, counter.getEventPutAttemptCount());
+    Assert.assertEquals(50, counter.getEventPutSuccessCount());
+    Assert.assertEquals(50, counter.getEventTakeAttemptCount());
+    Assert.assertEquals(50, counter.getEventTakeSuccessCount());
+    channel.stop();
+  }
+
+  private void takeEventsWithCommittingTxn(KafkaChannel channel, long 
eventsCount) {
+    List<Event> takeEventsList = new ArrayList<>();
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    while (takeEventsList.size() < eventsCount) {
+      Event event = channel.take();
+      if (event != null) {
+        takeEventsList.add(event);
+      }
+    }
+    txn.commit();
+    txn.close();
+  }
 }

Reply via email to