Repository: flume
Updated Branches:
  refs/heads/flume-1.6 89e0e53ee -> bd7acb49e


FLUME-1501. Flume Scribe Source needs unit tests.

(Ashish Paliwal via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/bd7acb49
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/bd7acb49
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/bd7acb49

Branch: refs/heads/flume-1.6
Commit: bd7acb49e13de3a0c8b41e3717f3252f149cb68d
Parents: 89e0e53
Author: Hari Shreedharan <[email protected]>
Authored: Tue Sep 30 12:21:55 2014 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Tue Sep 30 12:22:30 2014 -0700

----------------------------------------------------------------------
 .../flume/source/scribe/TestScribeSource.java   | 140 +++++++++++++++++++
 1 file changed, 140 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/bd7acb49/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java
 
b/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java
new file mode 100644
index 0000000..9059eba
--- /dev/null
+++ 
b/flume-ng-sources/flume-scribe-source/src/test/java/org/apache/flume/source/scribe/TestScribeSource.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.source.scribe;
+
+import junit.framework.Assert;
+import org.apache.flume.*;
+import org.apache.flume.channel.ChannelProcessor;
+import org.apache.flume.channel.MemoryChannel;
+import org.apache.flume.channel.ReplicatingChannelSelector;
+import org.apache.flume.conf.Configurables;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ */
+public class TestScribeSource {
+
+  private static int port;
+  private static Channel memoryChannel;
+  private static ScribeSource scribeSource;
+
+  private static int findFreePort() throws IOException {
+    ServerSocket socket = new ServerSocket(0);
+    int port = socket.getLocalPort();
+    socket.close();
+    return port;
+  }
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    port = findFreePort();
+    Context context = new Context();
+    context.put("port", String.valueOf(port));
+
+    scribeSource = new ScribeSource();
+    scribeSource.setName("Scribe Source");
+
+    Configurables.configure(scribeSource, context);
+
+    memoryChannel = new MemoryChannel();
+    Configurables.configure(memoryChannel, context);
+
+    List<Channel> channels = new ArrayList<Channel>(1);
+    channels.add(memoryChannel);
+
+    ChannelSelector rcs = new ReplicatingChannelSelector();
+    rcs.setChannels(channels);
+
+    memoryChannel.start();
+
+    scribeSource.setChannelProcessor(new ChannelProcessor(rcs));
+    scribeSource.start();
+  }
+
+  @Test
+  public void testScribeMessage() throws Exception {
+    TTransport transport = new TFramedTransport(new TSocket("localhost", 
port));
+
+    TProtocol protocol = new TBinaryProtocol(transport);
+    Scribe.Client client = new Scribe.Client(protocol);
+    transport.open();
+    LogEntry logEntry = new LogEntry("INFO", "Sending info msg to scribe 
source");
+    List<LogEntry> logEntries = new ArrayList<LogEntry>(1);
+    logEntries.add(logEntry);
+    client.Log(logEntries);
+
+    // try to get it from Channels
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+    Event e = memoryChannel.take();
+    Assert.assertNotNull(e);
+    Assert.assertEquals("Sending info msg to scribe source", new 
String(e.getBody()));
+    tx.commit();
+    tx.close();
+  }
+
+  @Test
+  public void testScribeMultipleMessages() throws Exception {
+    TTransport transport = new TFramedTransport(new TSocket("localhost", 
port));
+
+    TProtocol protocol = new TBinaryProtocol(transport);
+    Scribe.Client client = new Scribe.Client(protocol);
+    transport.open();
+
+    List<LogEntry> logEntries = new ArrayList<LogEntry>(10);
+    for (int i = 0; i < 10; i++) {
+      LogEntry logEntry = new LogEntry("INFO", String.format("Sending info 
msg# %d to scribe source", i));
+      logEntries.add(logEntry);
+    }
+
+    client.Log(logEntries);
+
+    // try to get it from Channels
+    Transaction tx = memoryChannel.getTransaction();
+    tx.begin();
+
+    for (int i = 0; i < 10; i++) {
+      Event e = memoryChannel.take();
+      Assert.assertNotNull(e);
+      Assert.assertEquals(String.format("Sending info msg# %d to scribe 
source", i), new String(e.getBody()));
+    }
+    tx.commit();
+    tx.close();
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    memoryChannel.stop();
+    scribeSource.stop();
+  }
+
+}
\ No newline at end of file

Reply via email to