http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java index c59fdd4..3ad8282 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/http/TestHTTPSource.java @@ -22,7 +22,11 @@ import com.google.common.collect.Maps; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import junit.framework.Assert; -import org.apache.flume.*; +import org.apache.flume.Channel; +import org.apache.flume.ChannelSelector; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.Transaction; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; @@ -41,11 +45,22 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import javax.net.ssl.*; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.TrustManager; +import javax.net.ssl.X509TrustManager; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.lang.reflect.Type; -import java.net.*; +import java.net.HttpURLConnection; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.URL; +import java.net.UnknownHostException; import java.security.SecureRandom; import java.security.cert.CertificateException; import java.util.ArrayList; @@ -114,9 +129,10 @@ public class TestHTTPSource { sslContext.put(HTTPSourceConfigurationConstants.SSL_ENABLED, "true"); sslPort = findFreePort(); sslContext.put(HTTPSourceConfigurationConstants.CONFIG_PORT, - String.valueOf(sslPort)); + String.valueOf(sslPort)); sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE_PASSWORD, "password"); - sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE, "src/test/resources/jettykeystore"); + sslContext.put(HTTPSourceConfigurationConstants.SSL_KEYSTORE, + "src/test/resources/jettykeystore"); Configurables.configure(source, context); Configurables.configure(httpsSource, sslContext); @@ -180,7 +196,7 @@ public class TestHTTPSource { private void doTestForbidden(HttpRequestBase request) throws Exception { HttpResponse response = httpClient.execute(request); Assert.assertEquals(HttpServletResponse.SC_FORBIDDEN, - response.getStatusLine().getStatusCode()); + response.getStatusLine().getStatusCode()); } @Test @@ -286,10 +302,8 @@ public class TestHTTPSource { } - private ResultWrapper putWithEncoding(String encoding, int n) - throws Exception{ - Type listType = new TypeToken<List<JSONEvent>>() { - }.getType(); + private ResultWrapper putWithEncoding(String encoding, int n) throws Exception { + Type listType = new TypeToken<List<JSONEvent>>() {}.getType(); List<JSONEvent> events = Lists.newArrayList(); Random rand = new Random(); for (int i = 0; i < n; i++) { @@ -341,25 +355,25 @@ public class TestHTTPSource { String json = gson.toJson(events, listType); HttpsURLConnection httpsURLConnection = null; try { - TrustManager[] trustAllCerts = {new X509TrustManager() { - @Override - public void checkClientTrusted( - java.security.cert.X509Certificate[] x509Certificates, String s) - throws CertificateException { - // noop + TrustManager[] trustAllCerts = { + new X509TrustManager() { + @Override + public void checkClientTrusted(java.security.cert.X509Certificate[] x509Certificates, + String s) throws CertificateException { + // noop + } + + @Override + public void checkServerTrusted(java.security.cert.X509Certificate[] x509Certificates, + String s) throws CertificateException { + // noop + } + + public java.security.cert.X509Certificate[] getAcceptedIssuers() { + return null; + } } - - @Override - public void checkServerTrusted( - java.security.cert.X509Certificate[] x509Certificates, String s) - throws CertificateException { - // noop - } - - public java.security.cert.X509Certificate[] getAcceptedIssuers() { - return null; - } - }}; + }; SSLContext sc = null; javax.net.ssl.SSLSocketFactory factory = null; @@ -376,14 +390,13 @@ public class TestHTTPSource { }; sc.init(null, trustAllCerts, new SecureRandom()); - if(protocol != null) { + if (protocol != null) { factory = new DisabledProtocolsSocketFactory(sc.getSocketFactory(), protocol); } else { factory = sc.getSocketFactory(); } HttpsURLConnection.setDefaultSSLSocketFactory(factory); - HttpsURLConnection.setDefaultHostnameVerifier( - SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); + HttpsURLConnection.setDefaultHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); URL sslUrl = new URL("https://0.0.0.0:" + sslPort); httpsURLConnection = (HttpsURLConnection) sslUrl.openConnection(); httpsURLConnection.setDoInput(true); @@ -396,14 +409,14 @@ public class TestHTTPSource { Transaction transaction = channel.getTransaction(); transaction.begin(); - for(int i = 0; i < 10; i++) { + for (int i = 0; i < 10; i++) { Event e = channel.take(); Assert.assertNotNull(e); Assert.assertEquals(String.valueOf(i), e.getHeaders().get("MsgNum")); } - transaction.commit(); - transaction.close(); + transaction.commit(); + transaction.close(); } finally { httpsURLConnection.disconnect(); } @@ -416,38 +429,37 @@ public class TestHTTPSource { List<JSONEvent> events = Lists.newArrayList(); Random rand = new Random(); for (int i = 0; i < 10; i++) { - Map<String, String> input = Maps.newHashMap(); - for (int j = 0; j < 10; j++) { - input.put(String.valueOf(i) + String.valueOf(j), String.valueOf(i)); - } - input.put("MsgNum", String.valueOf(i)); - JSONEvent e = new JSONEvent(); - e.setHeaders(input); - e.setBody(String.valueOf(rand.nextGaussian()).getBytes("UTF-8")); - events.add(e); + Map<String, String> input = Maps.newHashMap(); + for (int j = 0; j < 10; j++) { + input.put(String.valueOf(i) + String.valueOf(j), String.valueOf(i)); + } + input.put("MsgNum", String.valueOf(i)); + JSONEvent e = new JSONEvent(); + e.setHeaders(input); + e.setBody(String.valueOf(rand.nextGaussian()).getBytes("UTF-8")); + events.add(e); } Gson gson = new Gson(); String json = gson.toJson(events, listType); HttpURLConnection httpURLConnection = null; try { - URL url = new URL("http://0.0.0.0:" + sslPort); - httpURLConnection = (HttpURLConnection) url.openConnection(); - httpURLConnection.setDoInput(true); - httpURLConnection.setDoOutput(true); - httpURLConnection.setRequestMethod("POST"); - httpURLConnection.getOutputStream().write(json.getBytes()); - httpURLConnection.getResponseCode(); - - Assert.fail("HTTP Client cannot connect to HTTPS source"); + URL url = new URL("http://0.0.0.0:" + sslPort); + httpURLConnection = (HttpURLConnection) url.openConnection(); + httpURLConnection.setDoInput(true); + httpURLConnection.setDoOutput(true); + httpURLConnection.setRequestMethod("POST"); + httpURLConnection.getOutputStream().write(json.getBytes()); + httpURLConnection.getResponseCode(); + + Assert.fail("HTTP Client cannot connect to HTTPS source"); } catch (Exception exception) { - Assert.assertTrue("Exception expected", true); + Assert.assertTrue("Exception expected", true); } finally { - httpURLConnection.disconnect(); + httpURLConnection.disconnect(); } } - private void takeWithEncoding(String encoding, int n, List<JSONEvent> events) - throws Exception{ + private void takeWithEncoding(String encoding, int n, List<JSONEvent> events) throws Exception { Transaction tx = channel.getTransaction(); tx.begin(); Event e = null; @@ -459,7 +471,7 @@ public class TestHTTPSource { } Event current = events.get(i++); Assert.assertEquals(new String(current.getBody(), encoding), - new String(e.getBody(), encoding)); + new String(e.getBody(), encoding)); Assert.assertEquals(current.getHeaders(), e.getHeaders()); } Assert.assertEquals(n, events.size()); @@ -480,7 +492,8 @@ public class TestHTTPSource { private class ResultWrapper { public final HttpResponse response; public final List<JSONEvent> events; - public ResultWrapper(HttpResponse resp, List<JSONEvent> events){ + + public ResultWrapper(HttpResponse resp, List<JSONEvent> events) { this.response = resp; this.events = events; } @@ -508,43 +521,39 @@ public class TestHTTPSource { } @Override - public Socket createSocket(Socket socket, String s, int i, boolean b) - throws IOException { + public Socket createSocket(Socket socket, String s, int i, boolean b) throws IOException { SSLSocket sc = (SSLSocket) socketFactory.createSocket(socket, s, i, b); sc.setEnabledProtocols(protocols); return sc; } @Override - public Socket createSocket(String s, int i) - throws IOException, UnknownHostException { - SSLSocket sc = (SSLSocket)socketFactory.createSocket(s, i); + public Socket createSocket(String s, int i) throws IOException, UnknownHostException { + SSLSocket sc = (SSLSocket) socketFactory.createSocket(s, i); sc.setEnabledProtocols(protocols); return sc; } @Override public Socket createSocket(String s, int i, InetAddress inetAddress, int i2) - throws IOException, UnknownHostException { - SSLSocket sc = (SSLSocket)socketFactory.createSocket(s, i, inetAddress, - i2); + throws IOException, UnknownHostException { + SSLSocket sc = (SSLSocket) socketFactory.createSocket(s, i, inetAddress, i2); sc.setEnabledProtocols(protocols); return sc; } @Override - public Socket createSocket(InetAddress inetAddress, int i) - throws IOException { - SSLSocket sc = (SSLSocket)socketFactory.createSocket(inetAddress, i); + public Socket createSocket(InetAddress inetAddress, int i) throws IOException { + SSLSocket sc = (SSLSocket) socketFactory.createSocket(inetAddress, i); sc.setEnabledProtocols(protocols); return sc; } @Override public Socket createSocket(InetAddress inetAddress, int i, - InetAddress inetAddress2, int i2) throws IOException { - SSLSocket sc = (SSLSocket)socketFactory.createSocket(inetAddress, i, - inetAddress2, i2); + InetAddress inetAddress2, int i2) throws IOException { + SSLSocket sc = (SSLSocket) socketFactory.createSocket(inetAddress, i, + inetAddress2, i2); sc.setEnabledProtocols(protocols); return sc; }
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java b/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java index cc7eac0..1ac11ab 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java +++ b/flume-ng-core/src/test/java/org/apache/flume/tools/TestTimestampRoundDownUtil.java @@ -18,7 +18,6 @@ package org.apache.flume.tools; - import java.util.Calendar; import junit.framework.Assert; @@ -38,8 +37,7 @@ public class TestTimestampRoundDownUtil { cal2.set(2012, 5, 15, 15, 12, 0); cal2.set(Calendar.MILLISECOND, 0); long timeToVerify = cal2.getTimeInMillis(); - long ret = TimestampRoundDownUtil. - roundDownTimeStampSeconds(cal.getTimeInMillis(), 60); + long ret = TimestampRoundDownUtil.roundDownTimeStampSeconds(cal.getTimeInMillis(), 60); System.out.println("Cal 1: " + cal.toString()); System.out.println("Cal 2: " + cal2.toString()); Assert.assertEquals(timeToVerify, ret); @@ -56,8 +54,7 @@ public class TestTimestampRoundDownUtil { cal2.set(2012, 5, 15, 15, 10, 0); cal2.set(Calendar.MILLISECOND, 0); long timeToVerify = cal2.getTimeInMillis(); - long ret = TimestampRoundDownUtil. - roundDownTimeStampMinutes(cal.getTimeInMillis(), 5); + long ret = TimestampRoundDownUtil.roundDownTimeStampMinutes(cal.getTimeInMillis(), 5); System.out.println("Cal 1: " + cal.toString()); System.out.println("Cal 2: " + cal2.toString()); Assert.assertEquals(timeToVerify, ret); @@ -74,8 +71,7 @@ public class TestTimestampRoundDownUtil { cal2.set(2012, 5, 15, 14, 0, 0); cal2.set(Calendar.MILLISECOND, 0); long timeToVerify = cal2.getTimeInMillis(); - long ret = TimestampRoundDownUtil. - roundDownTimeStampHours(cal.getTimeInMillis(), 2); + long ret = TimestampRoundDownUtil.roundDownTimeStampHours(cal.getTimeInMillis(), 2); System.out.println("Cal 1: " + ret); System.out.println("Cal 2: " + cal2.toString()); Assert.assertEquals(timeToVerify, ret); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/tools/TestVersionInfo.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/tools/TestVersionInfo.java b/flume-ng-core/src/test/java/org/apache/flume/tools/TestVersionInfo.java index b463899..0bdc820 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/tools/TestVersionInfo.java +++ b/flume-ng-core/src/test/java/org/apache/flume/tools/TestVersionInfo.java @@ -51,8 +51,8 @@ public class TestVersionInfo { !VersionInfo.getSrcChecksum().equals("Unknown")); // check getBuildVersion() return format - assertTrue("getBuildVersion returned unexpected format",VersionInfo. - getBuildVersion().matches(".+from.+by.+on.+source checksum.+")); + assertTrue("getBuildVersion returned unexpected format", + VersionInfo.getBuildVersion().matches(".+from.+by.+on.+source checksum.+")); //"Unknown" when build without svn or git assertNotNull("getRevision returned null", VersionInfo.getRevision()); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java index 241e2b5..032a4f8 100644 --- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java +++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgent.java @@ -95,14 +95,14 @@ public class TestEmbeddedAgent { @After public void tearDown() throws Exception { - if(agent != null) { + if (agent != null) { try { agent.stop(); } catch (Exception e) { LOGGER.debug("Error shutting down agent", e); } } - if(nettyServer != null) { + if (nettyServer != null) { try { nettyServer.close(); } catch (Exception e) { @@ -118,7 +118,7 @@ public class TestEmbeddedAgent { agent.put(EventBuilder.withBody(body, headers)); Event event; - while((event = eventCollector.poll()) == null) { + while ((event = eventCollector.poll()) == null) { Thread.sleep(500L); } Assert.assertNotNull(event); @@ -135,7 +135,7 @@ public class TestEmbeddedAgent { agent.putAll(events); Event event; - while((event = eventCollector.poll()) == null) { + while ((event = eventCollector.poll()) == null) { Thread.sleep(500L); } Assert.assertNotNull(event); @@ -155,7 +155,7 @@ public class TestEmbeddedAgent { agent.put(EventBuilder.withBody(body, headers)); Event event; - while((event = eventCollector.poll()) == null) { + while ((event = eventCollector.poll()) == null) { Thread.sleep(500L); } Assert.assertNotNull(event); @@ -176,13 +176,13 @@ public class TestEmbeddedAgent { embedAgent.putAll(events); Event event; - while((event = eventCollector.poll()) == null) { + while ((event = eventCollector.poll()) == null) { Thread.sleep(500L); } Assert.assertNotNull(event); Assert.assertArrayEquals(body, event.getBody()); Assert.assertEquals(headers, event.getHeaders()); - if(embedAgent != null) { + if (embedAgent != null) { try { embedAgent.stop(); } catch (Exception e) { @@ -191,14 +191,13 @@ public class TestEmbeddedAgent { } } - static class EventCollector implements AvroSourceProtocol { private final Queue<AvroFlumeEvent> eventQueue = new LinkedBlockingQueue<AvroFlumeEvent>(); public Event poll() { AvroFlumeEvent avroEvent = eventQueue.poll(); - if(avroEvent != null) { + if (avroEvent != null) { return EventBuilder.withBody(avroEvent.getBody().array(), toStringMap(avroEvent.getHeaders())); } @@ -216,10 +215,9 @@ public class TestEmbeddedAgent { return Status.OK; } } - private static Map<String, String> toStringMap( - Map<CharSequence, CharSequence> charSeqMap) { - Map<String, String> stringMap = - new HashMap<String, String>(); + + private static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) { + Map<String, String> stringMap = new HashMap<String, String>(); for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) { stringMap.put(entry.getKey().toString(), entry.getValue().toString()); } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java index f4a9a58..ed26294 100644 --- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java +++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java @@ -34,8 +34,7 @@ public class TestEmbeddedAgentConfiguration { @Before public void setUp() throws Exception { properties = Maps.newHashMap(); - properties.put("source.type", EmbeddedAgentConfiguration. - SOURCE_TYPE_EMBEDDED); + properties.put("source.type", EmbeddedAgentConfiguration.SOURCE_TYPE_EMBEDDED); properties.put("channel.type", "memory"); properties.put("channel.capacity", "200"); properties.put("sinks", "sink1 sink2"); @@ -50,28 +49,23 @@ public class TestEmbeddedAgentConfiguration { properties.put("source.interceptors.i1.type", "timestamp"); } - @Test public void testFullSourceType() throws Exception { - doTestExcepted(EmbeddedAgentConfiguration. - configure("test1", properties)); + doTestExcepted(EmbeddedAgentConfiguration.configure("test1", properties)); } @Test public void testMissingSourceType() throws Exception { Assert.assertNotNull(properties.remove("source.type")); - doTestExcepted(EmbeddedAgentConfiguration. - configure("test1", properties)); + doTestExcepted(EmbeddedAgentConfiguration.configure("test1", properties)); } @Test public void testShortSourceType() throws Exception { properties.put("source.type", "EMBEDDED"); - doTestExcepted(EmbeddedAgentConfiguration. - configure("test1", properties)); + doTestExcepted(EmbeddedAgentConfiguration.configure("test1", properties)); } - public void doTestExcepted(Map<String, String> actual) throws Exception { Map<String, String> expected = Maps.newHashMap(); expected.put("test1.channels", "channel-test1"); @@ -91,8 +85,8 @@ public class TestEmbeddedAgentConfiguration { expected.put("test1.sinks.sink2.type", "avro"); expected.put("test1.sources", "source-test1"); expected.put("test1.sources.source-test1.channels", "channel-test1"); - expected.put("test1.sources.source-test1.type", EmbeddedAgentConfiguration. - SOURCE_TYPE_EMBEDDED); + expected.put("test1.sources.source-test1.type", + EmbeddedAgentConfiguration.SOURCE_TYPE_EMBEDDED); expected.put("test1.sources.source-test1.interceptors", "i1"); expected.put("test1.sources.source-test1.interceptors.i1.type", "timestamp"); Assert.assertEquals(expected, actual); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java index 9d85e6e..c122a12 100644 --- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java +++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java @@ -51,7 +51,6 @@ public class TestEmbeddedAgentEmbeddedSource { private Channel channel; private SinkRunner sinkRunner; - @Before public void setUp() throws Exception { @@ -81,26 +80,31 @@ public class TestEmbeddedAgentEmbeddedSource { result.put("source", sourceRunner); return ImmutableMap.copyOf(result); } + @Override public ImmutableMap<String, SinkRunner> getSinkRunners() { Map<String, SinkRunner> result = Maps.newHashMap(); result.put("sink", sinkRunner); return ImmutableMap.copyOf(result); } + @Override public ImmutableMap<String, Channel> getChannels() { Map<String, Channel> result = Maps.newHashMap(); result.put("channel", channel); return ImmutableMap.copyOf(result); } + @Override public void addSourceRunner(String name, SourceRunner sourceRunner) { throw new UnsupportedOperationException(); } + @Override public void addSinkRunner(String name, SinkRunner sinkRunner) { throw new UnsupportedOperationException(); } + @Override public void addChannel(String name, Channel channel) { throw new UnsupportedOperationException(); @@ -122,7 +126,6 @@ public class TestEmbeddedAgentEmbeddedSource { verify(sinkRunner, times(1)).start(); } - @Test public void testStop() { agent.configure(properties); @@ -138,16 +141,19 @@ public class TestEmbeddedAgentEmbeddedSource { doThrow(new LocalRuntimeException()).when(sourceRunner).start(); startExpectingLocalRuntimeException(); } + @Test public void testStartChannelThrowsException() { doThrow(new LocalRuntimeException()).when(channel).start(); startExpectingLocalRuntimeException(); } + @Test public void testStartSinkThrowsException() { doThrow(new LocalRuntimeException()).when(sinkRunner).start(); startExpectingLocalRuntimeException(); } + private void startExpectingLocalRuntimeException() { agent.configure(properties); try { @@ -160,9 +166,11 @@ public class TestEmbeddedAgentEmbeddedSource { verify(channel, times(1)).stop(); verify(sinkRunner, times(1)).stop(); } + private static class LocalRuntimeException extends RuntimeException { private static final long serialVersionUID = 116546244849853151L; } + @Test public void testPut() throws EventDeliveryException { Event event = new SimpleEvent(); @@ -171,6 +179,7 @@ public class TestEmbeddedAgentEmbeddedSource { agent.put(event); verify(source, times(1)).put(event); } + @Test public void testPutAll() throws EventDeliveryException { Event event = new SimpleEvent(); @@ -181,12 +190,14 @@ public class TestEmbeddedAgentEmbeddedSource { agent.putAll(events); verify(source, times(1)).putAll(events); } + @Test(expected = IllegalStateException.class) public void testPutNotStarted() throws EventDeliveryException { Event event = new SimpleEvent(); agent.configure(properties); agent.put(event); } + @Test(expected = IllegalStateException.class) public void testPutAllNotStarted() throws EventDeliveryException { Event event = new SimpleEvent(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentState.java ---------------------------------------------------------------------- diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentState.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentState.java index a14a87e..0f0ad23 100644 --- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentState.java +++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentState.java @@ -18,19 +18,19 @@ */ package org.apache.flume.agent.embedded; -import java.util.Map; - +import com.google.common.base.Throwables; +import com.google.common.collect.Maps; import org.apache.flume.FlumeException; import org.junit.Before; import org.junit.Test; -import com.google.common.base.Throwables; -import com.google.common.collect.Maps; +import java.util.Map; public class TestEmbeddedAgentState { private static final String HOSTNAME = "localhost"; private EmbeddedAgent agent; private Map<String, String> properties; + @Before public void setUp() throws Exception { agent = new EmbeddedAgent("dummy"); @@ -47,13 +47,13 @@ public class TestEmbeddedAgentState { properties.put("processor.type", "load_balance"); } - @Test(expected=FlumeException.class) + @Test(expected = FlumeException.class) public void testConfigureWithBadSourceType() { properties.put(EmbeddedAgentConfiguration.SOURCE_TYPE, "bad"); agent.configure(properties); } - @Test(expected=IllegalStateException.class) + @Test(expected = IllegalStateException.class) public void testConfigureWhileStarted() { try { agent.configure(properties); @@ -63,13 +63,14 @@ public class TestEmbeddedAgentState { } agent.configure(properties); } + @Test public void testConfigureMultipleTimes() { agent.configure(properties); agent.configure(properties); } - @Test(expected=IllegalStateException.class) + @Test(expected = IllegalStateException.class) public void testStartWhileStarted() { try { agent.configure(properties); @@ -79,15 +80,18 @@ public class TestEmbeddedAgentState { } agent.start(); } - @Test(expected=IllegalStateException.class) + + @Test(expected = IllegalStateException.class) public void testStartUnconfigured() { agent.start(); } - @Test(expected=IllegalStateException.class) + + @Test(expected = IllegalStateException.class) public void testStopBeforeConfigure() { agent.stop(); } - @Test(expected=IllegalStateException.class) + + @Test(expected = IllegalStateException.class) public void testStoppedWhileStopped() { try { agent.configure(properties); @@ -96,7 +100,8 @@ public class TestEmbeddedAgentState { } agent.stop(); } - @Test(expected=IllegalStateException.class) + + @Test(expected = IllegalStateException.class) public void testStopAfterStop() { try { agent.configure(properties); @@ -107,7 +112,8 @@ public class TestEmbeddedAgentState { } agent.stop(); } - @Test(expected=IllegalStateException.class) + + @Test(expected = IllegalStateException.class) public void testStopAfterConfigure() { try { agent.configure(properties); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java b/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java index 6e3eb53..610aa64 100644 --- a/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java +++ b/flume-ng-legacy-sources/flume-avro-source/src/test/java/org/apache/flume/source/avroLegacy/TestLegacyAvroSource.java @@ -19,13 +19,9 @@ package org.apache.flume.source.avroLegacy; -import java.io.IOException; -import java.net.URL; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - +import com.cloudera.flume.handlers.avro.AvroFlumeOGEvent; +import com.cloudera.flume.handlers.avro.FlumeOGEventAvroServer; +import com.cloudera.flume.handlers.avro.Priority; import org.apache.avro.ipc.HttpTransceiver; import org.apache.avro.ipc.Transceiver; import org.apache.avro.ipc.specific.SpecificRequestor; @@ -40,9 +36,6 @@ import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; import org.apache.flume.lifecycle.LifecycleController; import org.apache.flume.lifecycle.LifecycleState; -import com.cloudera.flume.handlers.avro.AvroFlumeOGEvent; -import com.cloudera.flume.handlers.avro.FlumeOGEventAvroServer; -import com.cloudera.flume.handlers.avro.Priority; import org.jboss.netty.channel.ChannelException; import org.junit.Assert; import org.junit.Before; @@ -50,6 +43,12 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.URL; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; public class TestLegacyAvroSource { @@ -143,10 +142,10 @@ public class TestLegacyAvroSource { FlumeOGEventAvroServer client = SpecificRequestor.getClient( FlumeOGEventAvroServer.class, http); - AvroFlumeOGEvent avroEvent = AvroFlumeOGEvent.newBuilder().setHost("foo"). - setPriority(Priority.INFO).setNanos(0).setTimestamp(1). - setFields(new HashMap<CharSequence, ByteBuffer> ()). - setBody(ByteBuffer.wrap("foo".getBytes())).build(); + AvroFlumeOGEvent avroEvent = AvroFlumeOGEvent.newBuilder().setHost("foo") + .setPriority(Priority.INFO).setNanos(0).setTimestamp(1) + .setFields(new HashMap<CharSequence, ByteBuffer>()) + .setBody(ByteBuffer.wrap("foo".getBytes())).build(); client.append(avroEvent); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java b/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java index d8a6872..f228dde 100644 --- a/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java +++ b/flume-ng-legacy-sources/flume-thrift-source/src/test/java/org/apache/flume/source/thriftLegacy/TestThriftLegacySource.java @@ -19,15 +19,10 @@ package org.apache.flume.source.thriftLegacy; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - +import com.cloudera.flume.handlers.thrift.Priority; +import com.cloudera.flume.handlers.thrift.ThriftFlumeEvent; +import com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer.Client; import org.apache.flume.Channel; -import org.apache.flume.ChannelException; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; import org.apache.flume.Event; @@ -39,25 +34,27 @@ import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; import org.apache.flume.lifecycle.LifecycleController; import org.apache.flume.lifecycle.LifecycleState; - -import com.cloudera.flume.handlers.thrift.Priority; -import com.cloudera.flume.handlers.thrift.ThriftFlumeEvent; -import com.cloudera.flume.handlers.thrift.ThriftFlumeEventServer.Client; -//EventStatus.java Priority.java ThriftFlumeEvent.java ThriftFlumeEventServer.java - import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; - import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +//EventStatus.java Priority.java ThriftFlumeEvent.java ThriftFlumeEventServer.java + public class TestThriftLegacySource { private static final Logger logger = LoggerFactory @@ -75,7 +72,8 @@ public class TestThriftLegacySource { this.host = host; this.port = port; } - public void append(ThriftFlumeEvent evt){ + + public void append(ThriftFlumeEvent evt) { TTransport transport; try { transport = new TSocket(host, port); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java index 15a478d..e27d8f7 100644 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractConfigurationProvider.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,10 +17,8 @@ */ package org.apache.flume.node; -import java.util.Map; - +import com.google.common.collect.Maps; import junit.framework.Assert; - import org.apache.flume.Channel; import org.apache.flume.ChannelException; import org.apache.flume.Context; @@ -36,7 +34,7 @@ import org.apache.flume.sink.AbstractSink; import org.apache.flume.source.AbstractSource; import org.junit.Test; -import com.google.common.collect.Maps; +import java.util.Map; public class TestAbstractConfigurationProvider { @@ -44,7 +42,7 @@ public class TestAbstractConfigurationProvider { public void testDispoableChannel() throws Exception { String agentName = "agent1"; Map<String, String> properties = getPropertiesForChannel(agentName, - DisposableChannel.class.getName()); + DisposableChannel.class.getName()); MemoryConfigurationProvider provider = new MemoryConfigurationProvider(agentName, properties); MaterializedConfiguration config1 = provider.getConfiguration(); @@ -60,7 +58,7 @@ public class TestAbstractConfigurationProvider { public void testReusableChannel() throws Exception { String agentName = "agent1"; Map<String, String> properties = getPropertiesForChannel(agentName, - RecyclableChannel.class.getName()); + RecyclableChannel.class.getName()); MemoryConfigurationProvider provider = new MemoryConfigurationProvider(agentName, properties); @@ -79,7 +77,7 @@ public class TestAbstractConfigurationProvider { public void testUnspecifiedChannel() throws Exception { String agentName = "agent1"; Map<String, String> properties = getPropertiesForChannel(agentName, - UnspecifiedChannel.class.getName()); + UnspecifiedChannel.class.getName()); MemoryConfigurationProvider provider = new MemoryConfigurationProvider(agentName, properties); @@ -98,9 +96,11 @@ public class TestAbstractConfigurationProvider { public void testReusableChannelNotReusedLater() throws Exception { String agentName = "agent1"; Map<String, String> propertiesReusable = getPropertiesForChannel(agentName, - RecyclableChannel.class.getName()); + RecyclableChannel.class + .getName()); Map<String, String> propertiesDispoable = getPropertiesForChannel(agentName, - DisposableChannel.class.getName()); + DisposableChannel.class + .getName()); MemoryConfigurationProvider provider = new MemoryConfigurationProvider(agentName, propertiesReusable); MaterializedConfiguration config1 = provider.getConfiguration(); @@ -127,7 +127,7 @@ public class TestAbstractConfigurationProvider { String channelType = "memory"; String sinkType = "null"; Map<String, String> properties = getProperties(agentName, sourceType, - channelType, sinkType); + channelType, sinkType); MemoryConfigurationProvider provider = new MemoryConfigurationProvider(agentName, properties); MaterializedConfiguration config = provider.getConfiguration(); @@ -135,6 +135,7 @@ public class TestAbstractConfigurationProvider { Assert.assertTrue(config.getChannels().size() == 1); Assert.assertTrue(config.getSinkRunners().size() == 1); } + @Test public void testChannelThrowsExceptionDuringConfiguration() throws Exception { String agentName = "agent1"; @@ -142,7 +143,7 @@ public class TestAbstractConfigurationProvider { String channelType = UnconfigurableChannel.class.getName(); String sinkType = "null"; Map<String, String> properties = getProperties(agentName, sourceType, - channelType, sinkType); + channelType, sinkType); MemoryConfigurationProvider provider = new MemoryConfigurationProvider(agentName, properties); MaterializedConfiguration config = provider.getConfiguration(); @@ -150,6 +151,7 @@ public class TestAbstractConfigurationProvider { Assert.assertTrue(config.getChannels().size() == 0); Assert.assertTrue(config.getSinkRunners().size() == 0); } + @Test public void testSinkThrowsExceptionDuringConfiguration() throws Exception { String agentName = "agent1"; @@ -157,7 +159,7 @@ public class TestAbstractConfigurationProvider { String channelType = "memory"; String sinkType = UnconfigurableSink.class.getName(); Map<String, String> properties = getProperties(agentName, sourceType, - channelType, sinkType); + channelType, sinkType); MemoryConfigurationProvider provider = new MemoryConfigurationProvider(agentName, properties); MaterializedConfiguration config = provider.getConfiguration(); @@ -165,6 +167,7 @@ public class TestAbstractConfigurationProvider { Assert.assertTrue(config.getChannels().size() == 1); Assert.assertTrue(config.getSinkRunners().size() == 0); } + @Test public void testSourceAndSinkThrowExceptionDuringConfiguration() throws Exception { @@ -173,7 +176,7 @@ public class TestAbstractConfigurationProvider { String channelType = "memory"; String sinkType = UnconfigurableSink.class.getName(); Map<String, String> properties = getProperties(agentName, sourceType, - channelType, sinkType); + channelType, sinkType); MemoryConfigurationProvider provider = new MemoryConfigurationProvider(agentName, properties); MaterializedConfiguration config = provider.getConfiguration(); @@ -181,8 +184,10 @@ public class TestAbstractConfigurationProvider { Assert.assertTrue(config.getChannels().size() == 0); Assert.assertTrue(config.getSinkRunners().size() == 0); } + private Map<String, String> getProperties(String agentName, - String sourceType, String channelType, String sinkType) { + String sourceType, String channelType, + String sinkType) { Map<String, String> properties = Maps.newHashMap(); properties.put(agentName + ".sources", "source1"); properties.put(agentName + ".channels", "channel1"); @@ -195,12 +200,14 @@ public class TestAbstractConfigurationProvider { properties.put(agentName + ".sinks.sink1.channel", "channel1"); return properties; } + private Map<String, String> getPropertiesForChannel(String agentName, String channelType) { return getProperties(agentName, "seq", channelType, "null"); } public static class MemoryConfigurationProvider extends AbstractConfigurationProvider { private Map<String, String> properties; + public MemoryConfigurationProvider(String agentName, Map<String, String> properties) { super(agentName); this.properties = properties; @@ -215,81 +222,95 @@ public class TestAbstractConfigurationProvider { return new FlumeConfiguration(properties); } } + @Disposable public static class DisposableChannel extends AbstractChannel { @Override public void put(Event event) throws ChannelException { throw new UnsupportedOperationException(); } + @Override public Event take() throws ChannelException { throw new UnsupportedOperationException(); - } + } + @Override public Transaction getTransaction() { throw new UnsupportedOperationException(); } } + @Recyclable public static class RecyclableChannel extends AbstractChannel { @Override public void put(Event event) throws ChannelException { throw new UnsupportedOperationException(); } + @Override public Event take() throws ChannelException { throw new UnsupportedOperationException(); - } + } + @Override public Transaction getTransaction() { throw new UnsupportedOperationException(); } } + public static class UnspecifiedChannel extends AbstractChannel { @Override public void put(Event event) throws ChannelException { throw new UnsupportedOperationException(); } + @Override public Event take() throws ChannelException { throw new UnsupportedOperationException(); - } + } + @Override public Transaction getTransaction() { throw new UnsupportedOperationException(); } } + public static class UnconfigurableChannel extends AbstractChannel { @Override public void configure(Context context) { throw new RuntimeException("expected"); } + @Override public void put(Event event) throws ChannelException { throw new UnsupportedOperationException(); } + @Override public Event take() throws ChannelException { throw new UnsupportedOperationException(); - } + } + @Override public Transaction getTransaction() { throw new UnsupportedOperationException(); } } - public static class UnconfigurableSource extends AbstractSource - implements Configurable { + + public static class UnconfigurableSource extends AbstractSource implements Configurable { @Override public void configure(Context context) { throw new RuntimeException("expected"); } } - public static class UnconfigurableSink extends AbstractSink - implements Configurable { + + public static class UnconfigurableSink extends AbstractSink implements Configurable { @Override public void configure(Context context) { throw new RuntimeException("expected"); } + @Override public Status process() throws EventDeliveryException { throw new UnsupportedOperationException(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java index 1ab4127..2e30634 100644 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java @@ -18,15 +18,10 @@ package org.apache.flume.node; -import java.io.InputStreamReader; -import java.io.Reader; -import java.util.Collections; -import java.util.List; -import java.util.Set; - import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import junit.framework.Assert; - import org.apache.commons.io.IOUtils; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; @@ -38,8 +33,11 @@ import org.apache.flume.conf.FlumeConfigurationError; import org.junit.After; import org.junit.Before; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.Collections; +import java.util.List; +import java.util.Set; public abstract class TestAbstractZooKeeperConfigurationProvider { @@ -48,8 +46,7 @@ public abstract class TestAbstractZooKeeperConfigurationProvider { protected static final String AGENT_NAME = "a1"; protected static final String AGENT_PATH = - AbstractZooKeeperConfigurationProvider.DEFAULT_ZK_BASE_PATH - + "/" + AGENT_NAME; + AbstractZooKeeperConfigurationProvider.DEFAULT_ZK_BASE_PATH + "/" + AGENT_NAME; protected TestingServer zkServer; protected CuratorFramework client; @@ -112,10 +109,8 @@ public abstract class TestAbstractZooKeeperConfigurationProvider { expected.add("host2 PROPERTY_VALUE_NULL"); expected.add("host2 AGENT_CONFIGURATION_INVALID"); List<String> actual = Lists.newArrayList(); - for (FlumeConfigurationError error : configuration - .getConfigurationErrors()) { - actual.add(error.getComponentName() + " " - + error.getErrorType().toString()); + for (FlumeConfigurationError error : configuration.getConfigurationErrors()) { + actual.add(error.getComponentName() + " " + error.getErrorType().toString()); } Collections.sort(expected); Collections.sort(actual); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java index 930f2a2..affbd8c 100644 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestApplication.java @@ -44,13 +44,13 @@ import com.google.common.io.Files; public class TestApplication { - private File baseDir; @Before public void setup() throws Exception { baseDir = Files.createTempDir(); } + @After public void tearDown() throws Exception { FileUtils.deleteDirectory(baseDir); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java index eed22ee..480f6a5 100644 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingPropertiesFileConfigurationProvider.java @@ -36,7 +36,6 @@ import com.google.common.io.Files; public class TestPollingPropertiesFileConfigurationProvider { - private static final File TESTFILE = new File( TestPollingPropertiesFileConfigurationProvider.class.getClassLoader() .getResource("flume-conf.properties").getFile()); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java index 84a8cfd..4875c56 100644 --- a/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java +++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPropertiesFileConfigurationProvider.java @@ -17,13 +17,9 @@ */ package org.apache.flume.node; -import java.io.File; -import java.util.Collections; -import java.util.List; -import java.util.Set; - +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import junit.framework.Assert; - import org.apache.flume.conf.FlumeConfiguration; import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration; import org.apache.flume.conf.FlumeConfigurationError; @@ -33,14 +29,15 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Set; public class TestPropertiesFileConfigurationProvider { - - private static final Logger LOGGER = LoggerFactory - .getLogger(TestPropertiesFileConfigurationProvider.class); + private static final Logger LOGGER = + LoggerFactory.getLogger(TestPropertiesFileConfigurationProvider.class); private static final File TESTFILE = new File( TestPropertiesFileConfigurationProvider.class.getClassLoader() @@ -83,23 +80,20 @@ public class TestPropertiesFileConfigurationProvider { expected.add("host2 PROPERTY_VALUE_NULL"); expected.add("host2 AGENT_CONFIGURATION_INVALID"); List<String> actual = Lists.newArrayList(); - for(FlumeConfigurationError error : configuration.getConfigurationErrors()) { + for (FlumeConfigurationError error : configuration.getConfigurationErrors()) { actual.add(error.getComponentName() + " " + error.getErrorType().toString()); } Collections.sort(expected); Collections.sort(actual); Assert.assertEquals(expected, actual); - AgentConfiguration agentConfiguration = configuration.getConfigurationFor("host1"); Assert.assertNotNull(agentConfiguration); - LOGGER.info(agentConfiguration.getPrevalidationConfig()); LOGGER.info(agentConfiguration.getPostvalidationConfig()); - Set<String> sources = Sets.newHashSet("source1"); Set<String> sinks = Sets.newHashSet("sink1"); Set<String> channels = Sets.newHashSet("channel1"); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java b/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java index 91fbf63..a597a31 100644 --- a/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java +++ b/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java @@ -19,19 +19,6 @@ package org.apache.flume.source; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.Writer; -import java.net.InetSocketAddress; -import java.nio.channels.Channels; -import java.nio.channels.SocketChannel; -import java.util.List; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.Collection; -import java.util.Arrays; - import com.google.common.collect.Lists; import org.apache.flume.Channel; import org.apache.flume.ChannelSelector; @@ -49,12 +36,25 @@ import org.apache.flume.lifecycle.LifecycleException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; -import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.Writer; +import java.net.InetSocketAddress; +import java.nio.channels.Channels; +import java.nio.channels.SocketChannel; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + @RunWith(value = Parameterized.class) public class TestNetcatSource { @@ -72,7 +72,7 @@ public class TestNetcatSource { @Parameters public static Collection data() { Object[][] data = new Object[][] { { true }, { false } }; - return Arrays.asList(data); + return Arrays.asList(data); } @Before @@ -99,7 +99,7 @@ public class TestNetcatSource { ExecutorService executor = Executors.newFixedThreadPool(3); boolean bound = false; - for(int i = 0; i < 100 && !bound; i++) { + for (int i = 0; i < 100 && !bound; i++) { try { Context context = new Context(); context.put("bind", "0.0.0.0"); @@ -131,10 +131,10 @@ public class TestNetcatSource { writer.flush(); if (ackEveryEvent) { - String response = reader.readLine(); - Assert.assertEquals("Server should return OK", "OK", response); + String response = reader.readLine(); + Assert.assertEquals("Server should return OK", "OK", response); } else { - Assert.assertFalse("Server should not return anything", reader.ready()); + Assert.assertFalse("Server should not return anything", reader.ready()); } clientChannel.close(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java index 8806860..d9355f7 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/RpcTestUtils.java @@ -18,13 +18,6 @@ */ package org.apache.flume.api; -import java.net.InetSocketAddress; -import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.Executors; - import junit.framework.Assert; import org.apache.avro.AvroRemoteException; import org.apache.avro.ipc.NettyServer; @@ -47,6 +40,13 @@ import org.jboss.netty.handler.codec.compression.ZlibEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.InetSocketAddress; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Executors; + /** * Helpers for Netty Avro RPC testing */ @@ -75,7 +75,9 @@ public class RpcTestUtils { * @throws FlumeException * @throws EventDeliveryException */ - public static void handlerSimpleAppendTest(AvroSourceProtocol handler, boolean enableServerCompression, boolean enableClientCompression, int compressionLevel) + public static void handlerSimpleAppendTest(AvroSourceProtocol handler, + boolean enableServerCompression, + boolean enableClientCompression, int compressionLevel) throws FlumeException, EventDeliveryException { NettyAvroRpcClient client = null; Server server = startServer(handler, 0, enableServerCompression); @@ -83,7 +85,8 @@ public class RpcTestUtils { Properties starterProp = new Properties(); if (enableClientCompression) { starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE, "deflate"); - starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL, "" + compressionLevel); + starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL, + "" + compressionLevel); } else { starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE, "none"); } @@ -108,7 +111,9 @@ public class RpcTestUtils { * @throws FlumeException * @throws EventDeliveryException */ - public static void handlerBatchAppendTest(AvroSourceProtocol handler, boolean enableServerCompression, boolean enableClientCompression, int compressionLevel) + public static void handlerBatchAppendTest(AvroSourceProtocol handler, + boolean enableServerCompression, + boolean enableClientCompression, int compressionLevel) throws FlumeException, EventDeliveryException { NettyAvroRpcClient client = null; Server server = startServer(handler, 0 , enableServerCompression); @@ -117,7 +122,8 @@ public class RpcTestUtils { Properties starterProp = new Properties(); if (enableClientCompression) { starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE, "deflate"); - starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL, "" + compressionLevel); + starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_LEVEL, + "" + compressionLevel); } else { starterProp.setProperty(RpcClientConfigurationConstants.CONFIG_COMPRESSION_TYPE, "none"); } @@ -161,28 +167,24 @@ public class RpcTestUtils { /** * Start a NettyServer, wait a moment for it to spin up, and return it. */ - public static Server startServer(AvroSourceProtocol handler, int port, boolean enableCompression) { - Responder responder = new SpecificResponder(AvroSourceProtocol.class, - handler); + public static Server startServer(AvroSourceProtocol handler, int port, + boolean enableCompression) { + Responder responder = new SpecificResponder(AvroSourceProtocol.class, handler); Server server; if (enableCompression) { - server = new NettyServer(responder, - new InetSocketAddress(localhost, port), - new NioServerSocketChannelFactory - (Executors .newCachedThreadPool(), Executors.newCachedThreadPool()), - new CompressionChannelPipelineFactory(), null); + server = new NettyServer(responder, new InetSocketAddress(localhost, port), + new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), + Executors.newCachedThreadPool()), + new CompressionChannelPipelineFactory(), null); } else { - server = new NettyServer(responder, - new InetSocketAddress(localhost, port)); + server = new NettyServer(responder, new InetSocketAddress(localhost, port)); } server.start(); logger.info("Server started on hostname: {}, port: {}", - new Object[] { localhost, Integer.toString(server.getPort()) }); + new Object[] { localhost, Integer.toString(server.getPort()) }); try { - Thread.sleep(300L); - } catch (InterruptedException ex) { logger.error("Thread interrupted. Exception follows.", ex); Thread.currentThread().interrupt(); @@ -298,15 +300,13 @@ public class RpcTestUtils { @Override public Status append(AvroFlumeEvent event) throws AvroRemoteException { logger.info("Failed: Received event from append(): {}", - new String(event.getBody().array(), Charset.forName("UTF8"))); + new String(event.getBody().array(), Charset.forName("UTF8"))); return Status.FAILED; } @Override - public Status appendBatch(List<AvroFlumeEvent> events) throws - AvroRemoteException { - logger.info("Failed: Received {} events from appendBatch()", - events.size()); + public Status appendBatch(List<AvroFlumeEvent> events) throws AvroRemoteException { + logger.info("Failed: Received {} events from appendBatch()", events.size()); return Status.FAILED; } @@ -320,15 +320,14 @@ public class RpcTestUtils { @Override public Status append(AvroFlumeEvent event) throws AvroRemoteException { logger.info("Unknown: Received event from append(): {}", - new String(event.getBody().array(), Charset.forName("UTF8"))); + new String(event.getBody().array(), Charset.forName("UTF8"))); return Status.UNKNOWN; } @Override - public Status appendBatch(List<AvroFlumeEvent> events) throws - AvroRemoteException { + public Status appendBatch(List<AvroFlumeEvent> events) throws AvroRemoteException { logger.info("Unknown: Received {} events from appendBatch()", - events.size()); + events.size()); return Status.UNKNOWN; } @@ -342,22 +341,18 @@ public class RpcTestUtils { @Override public Status append(AvroFlumeEvent event) throws AvroRemoteException { logger.info("Throwing: Received event from append(): {}", - new String(event.getBody().array(), Charset.forName("UTF8"))); + new String(event.getBody().array(), Charset.forName("UTF8"))); throw new AvroRemoteException("Handler smash!"); } @Override - public Status appendBatch(List<AvroFlumeEvent> events) throws - AvroRemoteException { - logger.info("Throwing: Received {} events from appendBatch()", - events.size()); + public Status appendBatch(List<AvroFlumeEvent> events) throws AvroRemoteException { + logger.info("Throwing: Received {} events from appendBatch()", events.size()); throw new AvroRemoteException("Handler smash!"); } - } - private static class CompressionChannelPipelineFactory implements - ChannelPipelineFactory { + private static class CompressionChannelPipelineFactory implements ChannelPipelineFactory { @Override public ChannelPipeline getPipeline() throws Exception { http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java index 64dc181..c3eb205 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestFailoverRpcClient.java @@ -113,9 +113,7 @@ public class TestFailoverRpcClient { server5.close(); Thread.sleep(1000L); // wait a second for the close to occur Server server6 = RpcTestUtils.startServer(new OKAvroHandler(), s1Port); - client - .append(EventBuilder.withBody("Had a whole watermelon?", - Charset.forName("UTF8"))); + client.append(EventBuilder.withBody("Had a whole watermelon?", Charset.forName("UTF8"))); Assert.assertEquals(new InetSocketAddress("localhost", s1Port), client.getLastConnectedServerAddress()); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java index 5d6828b..dc53d3f 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestLoadBalancingRpcClient.java @@ -18,14 +18,7 @@ */ package org.apache.flume.api; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Properties; -import java.util.Set; - import junit.framework.Assert; - import org.apache.avro.ipc.Server; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; @@ -37,12 +30,16 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class TestLoadBalancingRpcClient { - private static final Logger LOGGER = LoggerFactory - .getLogger(TestLoadBalancingRpcClient.class); +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +public class TestLoadBalancingRpcClient { + private static final Logger LOGGER = LoggerFactory.getLogger(TestLoadBalancingRpcClient.class); - @Test(expected=FlumeException.class) + @Test(expected = FlumeException.class) public void testCreatingLbClientSingleHost() { Server server1 = null; RpcClient c = null; @@ -61,9 +58,10 @@ public class TestLoadBalancingRpcClient { @Test public void testTwoHostFailover() throws Exception { - Server s1 = null, s2 = null; + Server s1 = null; + Server s2 = null; RpcClient c = null; - try{ + try { LoadBalancedAvroHandler h1 = new LoadBalancedAvroHandler(); LoadBalancedAvroHandler h2 = new LoadBalancedAvroHandler(); @@ -100,9 +98,10 @@ public class TestLoadBalancingRpcClient { // This will fail without FLUME-1823 @Test(expected = EventDeliveryException.class) public void testTwoHostFailoverThrowAfterClose() throws Exception { - Server s1 = null, s2 = null; + Server s1 = null; + Server s2 = null; RpcClient c = null; - try{ + try { LoadBalancedAvroHandler h1 = new LoadBalancedAvroHandler(); LoadBalancedAvroHandler h2 = new LoadBalancedAvroHandler(); @@ -140,13 +139,15 @@ public class TestLoadBalancingRpcClient { /** * Ensure that we can tolerate a host that is completely down. + * * @throws Exception */ @Test public void testTwoHostsOneDead() throws Exception { LOGGER.info("Running testTwoHostsOneDead..."); Server s1 = null; - RpcClient c1 = null, c2 = null; + RpcClient c1 = null; + RpcClient c2 = null; try { LoadBalancedAvroHandler h1 = new LoadBalancedAvroHandler(); s1 = RpcTestUtils.startServer(h1); @@ -186,9 +187,10 @@ public class TestLoadBalancingRpcClient { @Test public void testTwoHostFailoverBatch() throws Exception { - Server s1 = null, s2 = null; + Server s1 = null; + Server s2 = null; RpcClient c = null; - try{ + try { LoadBalancedAvroHandler h1 = new LoadBalancedAvroHandler(); LoadBalancedAvroHandler h2 = new LoadBalancedAvroHandler(); @@ -225,9 +227,10 @@ public class TestLoadBalancingRpcClient { @Test public void testLbDefaultClientTwoHosts() throws Exception { - Server s1 = null, s2 = null; + Server s1 = null; + Server s2 = null; RpcClient c = null; - try{ + try { LoadBalancedAvroHandler h1 = new LoadBalancedAvroHandler(); LoadBalancedAvroHandler h2 = new LoadBalancedAvroHandler(); @@ -258,9 +261,10 @@ public class TestLoadBalancingRpcClient { @Test public void testLbDefaultClientTwoHostsBatch() throws Exception { - Server s1 = null, s2 = null; + Server s1 = null; + Server s2 = null; RpcClient c = null; - try{ + try { LoadBalancedAvroHandler h1 = new LoadBalancedAvroHandler(); LoadBalancedAvroHandler h2 = new LoadBalancedAvroHandler(); @@ -296,10 +300,10 @@ public class TestLoadBalancingRpcClient { Server[] s = new Server[NUM_HOSTS]; LoadBalancedAvroHandler[] h = new LoadBalancedAvroHandler[NUM_HOSTS]; RpcClient c = null; - try{ + try { Properties p = new Properties(); StringBuilder hostList = new StringBuilder(""); - for (int i = 0; i<NUM_HOSTS; i++) { + for (int i = 0; i < NUM_HOSTS; i++) { h[i] = new LoadBalancedAvroHandler(); s[i] = RpcTestUtils.startServer(h[i]); String name = "h" + i; @@ -328,7 +332,7 @@ public class TestLoadBalancingRpcClient { Assert.assertTrue("Very unusual distribution", counts.size() > 2); Assert.assertTrue("Missing events", total == NUM_EVENTS); } finally { - for (int i = 0; i<NUM_HOSTS; i++) { + for (int i = 0; i < NUM_HOSTS; i++) { if (s[i] != null) s[i].close(); } } @@ -341,10 +345,10 @@ public class TestLoadBalancingRpcClient { Server[] s = new Server[NUM_HOSTS]; LoadBalancedAvroHandler[] h = new LoadBalancedAvroHandler[NUM_HOSTS]; RpcClient c = null; - try{ + try { Properties p = new Properties(); StringBuilder hostList = new StringBuilder(""); - for (int i = 0; i<NUM_HOSTS; i++) { + for (int i = 0; i < NUM_HOSTS; i++) { h[i] = new LoadBalancedAvroHandler(); s[i] = RpcTestUtils.startServer(h[i]); String name = "h" + i; @@ -373,7 +377,7 @@ public class TestLoadBalancingRpcClient { Assert.assertTrue("Very unusual distribution", counts.size() > 2); Assert.assertTrue("Missing events", total == NUM_EVENTS); } finally { - for (int i = 0; i<NUM_HOSTS; i++) { + for (int i = 0; i < NUM_HOSTS; i++) { if (s[i] != null) s[i].close(); } } @@ -386,10 +390,10 @@ public class TestLoadBalancingRpcClient { Server[] s = new Server[NUM_HOSTS]; LoadBalancedAvroHandler[] h = new LoadBalancedAvroHandler[NUM_HOSTS]; RpcClient c = null; - try{ + try { Properties p = new Properties(); StringBuilder hostList = new StringBuilder(""); - for (int i = 0; i<NUM_HOSTS; i++) { + for (int i = 0; i < NUM_HOSTS; i++) { h[i] = new LoadBalancedAvroHandler(); s[i] = RpcTestUtils.startServer(h[i]); String name = "h" + i; @@ -418,24 +422,23 @@ public class TestLoadBalancingRpcClient { Assert.assertTrue("Very unusual distribution", counts.size() == 1); Assert.assertTrue("Missing events", total == NUM_EVENTS); } finally { - for (int i = 0; i<NUM_HOSTS; i++) { + for (int i = 0; i < NUM_HOSTS; i++) { if (s[i] != null) s[i].close(); } } } @Test - public void testLbClientTenHostRoundRobinDistributionBatch() throws Exception - { + public void testLbClientTenHostRoundRobinDistributionBatch() throws Exception { final int NUM_HOSTS = 10; final int NUM_EVENTS = 1000; Server[] s = new Server[NUM_HOSTS]; LoadBalancedAvroHandler[] h = new LoadBalancedAvroHandler[NUM_HOSTS]; RpcClient c = null; - try{ + try { Properties p = new Properties(); StringBuilder hostList = new StringBuilder(""); - for (int i = 0; i<NUM_HOSTS; i++) { + for (int i = 0; i < NUM_HOSTS; i++) { h[i] = new LoadBalancedAvroHandler(); s[i] = RpcTestUtils.startServer(h[i]); String name = "h" + i; @@ -464,7 +467,7 @@ public class TestLoadBalancingRpcClient { Assert.assertTrue("Very unusual distribution", counts.size() == 1); Assert.assertTrue("Missing events", total == NUM_EVENTS); } finally { - for (int i = 0; i<NUM_HOSTS; i++) { + for (int i = 0; i < NUM_HOSTS; i++) { if (s[i] != null) s[i].close(); } } @@ -474,10 +477,10 @@ public class TestLoadBalancingRpcClient { public void testRandomBackoff() throws Exception { Properties p = new Properties(); List<LoadBalancedAvroHandler> hosts = - new ArrayList<LoadBalancedAvroHandler>(); + new ArrayList<LoadBalancedAvroHandler>(); List<Server> servers = new ArrayList<Server>(); StringBuilder hostList = new StringBuilder(""); - for(int i = 0; i < 3;i++){ + for (int i = 0; i < 3; i++) { LoadBalancedAvroHandler s = new LoadBalancedAvroHandler(); hosts.add(s); Server srv = RpcTestUtils.startServer(s); @@ -499,7 +502,7 @@ public class TestLoadBalancingRpcClient { // TODO: there is a remote possibility that s0 or s2 // never get hit by the random assignment // and thus not backoffed, causing the test to fail - for(int i=0; i < 50; i++) { + for (int i = 0; i < 50; i++) { // a well behaved runner would always check the return. c.append(EventBuilder.withBody(("test" + String.valueOf(i)).getBytes())); } @@ -525,11 +528,12 @@ public class TestLoadBalancingRpcClient { Assert.assertEquals(50, hosts.get(1).getAppendCount()); Assert.assertEquals(0, hosts.get(2).getAppendCount()); } + @Test public void testRoundRobinBackoffInitialFailure() throws EventDeliveryException { Properties p = new Properties(); List<LoadBalancedAvroHandler> hosts = - new ArrayList<LoadBalancedAvroHandler>(); + new ArrayList<LoadBalancedAvroHandler>(); List<Server> servers = new ArrayList<Server>(); StringBuilder hostList = new StringBuilder(""); for (int i = 0; i < 3; i++) { @@ -572,13 +576,13 @@ public class TestLoadBalancingRpcClient { public void testRoundRobinBackoffIncreasingBackoffs() throws Exception { Properties p = new Properties(); List<LoadBalancedAvroHandler> hosts = - new ArrayList<LoadBalancedAvroHandler>(); + new ArrayList<LoadBalancedAvroHandler>(); List<Server> servers = new ArrayList<Server>(); StringBuilder hostList = new StringBuilder(""); for (int i = 0; i < 3; i++) { LoadBalancedAvroHandler s = new LoadBalancedAvroHandler(); hosts.add(s); - if(i == 1) { + if (i == 1) { s.setFailed(); } Server srv = RpcTestUtils.startServer(s); @@ -620,16 +624,17 @@ public class TestLoadBalancingRpcClient { c.append(EventBuilder.withBody("testing".getBytes())); } - Assert.assertEquals( 2 + 2 + 1 + (numEvents/3), hosts.get(0).getAppendCount()); - Assert.assertEquals((numEvents/3), hosts.get(1).getAppendCount()); - Assert.assertEquals(1 + 1 + 2 + (numEvents/3), hosts.get(2).getAppendCount()); + Assert.assertEquals(2 + 2 + 1 + (numEvents / 3), hosts.get(0).getAppendCount()); + Assert.assertEquals((numEvents / 3), hosts.get(1).getAppendCount()); + Assert.assertEquals(1 + 1 + 2 + (numEvents / 3), hosts.get(2).getAppendCount()); } @Test - public void testRoundRobinBackoffFailureRecovery() throws EventDeliveryException, InterruptedException { + public void testRoundRobinBackoffFailureRecovery() + throws EventDeliveryException, InterruptedException { Properties p = new Properties(); List<LoadBalancedAvroHandler> hosts = - new ArrayList<LoadBalancedAvroHandler>(); + new ArrayList<LoadBalancedAvroHandler>(); List<Server> servers = new ArrayList<Server>(); StringBuilder hostList = new StringBuilder(""); for (int i = 0; i < 3; i++) { @@ -660,13 +665,13 @@ public class TestLoadBalancingRpcClient { Thread.sleep(3000); int numEvents = 60; - for(int i = 0; i < numEvents; i++){ + for (int i = 0; i < numEvents; i++) { c.append(EventBuilder.withBody("testing".getBytes())); } - Assert.assertEquals(2 + (numEvents/3) , hosts.get(0).getAppendCount()); - Assert.assertEquals(0 + (numEvents/3), hosts.get(1).getAppendCount()); - Assert.assertEquals(1 + (numEvents/3), hosts.get(2).getAppendCount()); + Assert.assertEquals(2 + (numEvents / 3), hosts.get(0).getAppendCount()); + Assert.assertEquals(0 + (numEvents / 3), hosts.get(1).getAppendCount()); + Assert.assertEquals(1 + (numEvents / 3), hosts.get(2).getAppendCount()); } private List<Event> getBatchedEvent(int index) { http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java index cf4f415..6cd1454 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestNettyAvroRpcClient.java @@ -87,7 +87,7 @@ public class TestNettyAvroRpcClient { * @throws FlumeException * @throws EventDeliveryException */ - @Test(expected=org.apache.flume.EventDeliveryException.class) + @Test(expected = org.apache.flume.EventDeliveryException.class) public void testOKServerSimpleCompressionClientOnly() throws FlumeException, EventDeliveryException { RpcTestUtils.handlerSimpleAppendTest(new OKAvroHandler(), false, true, 6); @@ -98,7 +98,7 @@ public class TestNettyAvroRpcClient { * @throws FlumeException * @throws EventDeliveryException */ - @Test(expected=org.apache.flume.EventDeliveryException.class) + @Test(expected = org.apache.flume.EventDeliveryException.class) public void testOKServerSimpleCompressionServerOnly() throws FlumeException, EventDeliveryException { RpcTestUtils.handlerSimpleAppendTest(new OKAvroHandler(), true, false, 6); @@ -142,7 +142,7 @@ public class TestNettyAvroRpcClient { * @throws FlumeException * @throws EventDeliveryException */ - @Test(expected=org.apache.flume.EventDeliveryException.class) + @Test(expected = org.apache.flume.EventDeliveryException.class) public void testOKServerBatchCompressionServerOnly() throws FlumeException, EventDeliveryException { RpcTestUtils.handlerBatchAppendTest(new OKAvroHandler(), true, false, 6); @@ -153,7 +153,7 @@ public class TestNettyAvroRpcClient { * @throws FlumeException * @throws EventDeliveryException */ - @Test(expected=org.apache.flume.EventDeliveryException.class) + @Test(expected = org.apache.flume.EventDeliveryException.class) public void testOKServerBatchCompressionClientOnly() throws FlumeException, EventDeliveryException { RpcTestUtils.handlerBatchAppendTest(new OKAvroHandler(), false, true, 6); @@ -164,7 +164,7 @@ public class TestNettyAvroRpcClient { * Note: this test tries to connect to port 1 on localhost. * @throws FlumeException */ - @Test(expected=FlumeException.class) + @Test(expected = FlumeException.class) public void testUnableToConnect() throws FlumeException { @SuppressWarnings("unused") NettyAvroRpcClient client = new NettyAvroRpcClient(); @@ -214,7 +214,7 @@ public class TestNettyAvroRpcClient { * @throws EventDeliveryException * @throws InterruptedException */ - @Test(expected=EventDeliveryException.class) + @Test(expected = EventDeliveryException.class) public void testServerDisconnect() throws FlumeException, EventDeliveryException, InterruptedException { NettyAvroRpcClient client = null; @@ -245,7 +245,7 @@ public class TestNettyAvroRpcClient { * @throws FlumeException * @throws EventDeliveryException */ - @Test(expected=EventDeliveryException.class) + @Test(expected = EventDeliveryException.class) public void testClientClosedRequest() throws FlumeException, EventDeliveryException { NettyAvroRpcClient client = null; @@ -265,7 +265,7 @@ public class TestNettyAvroRpcClient { /** * Send an event to an online server that returns FAILED. */ - @Test(expected=EventDeliveryException.class) + @Test(expected = EventDeliveryException.class) public void testFailedServerSimple() throws FlumeException, EventDeliveryException { @@ -276,7 +276,7 @@ public class TestNettyAvroRpcClient { /** * Send an event to an online server that returns UNKNOWN. */ - @Test(expected=EventDeliveryException.class) + @Test(expected = EventDeliveryException.class) public void testUnknownServerSimple() throws FlumeException, EventDeliveryException { @@ -287,7 +287,7 @@ public class TestNettyAvroRpcClient { /** * Send an event to an online server that throws an exception. */ - @Test(expected=EventDeliveryException.class) + @Test(expected = EventDeliveryException.class) public void testThrowingServerSimple() throws FlumeException, EventDeliveryException { @@ -298,7 +298,7 @@ public class TestNettyAvroRpcClient { /** * Send a batch of events to a server that returns FAILED. */ - @Test(expected=EventDeliveryException.class) + @Test(expected = EventDeliveryException.class) public void testFailedServerBatch() throws FlumeException, EventDeliveryException { @@ -309,7 +309,7 @@ public class TestNettyAvroRpcClient { /** * Send a batch of events to a server that returns UNKNOWN. */ - @Test(expected=EventDeliveryException.class) + @Test(expected = EventDeliveryException.class) public void testUnknownServerBatch() throws FlumeException, EventDeliveryException { @@ -320,7 +320,7 @@ public class TestNettyAvroRpcClient { /** * Send a batch of events to a server that always throws exceptions. */ - @Test(expected=EventDeliveryException.class) + @Test(expected = EventDeliveryException.class) public void testThrowingServerBatch() throws FlumeException, EventDeliveryException { http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java index a8baaa8..b03fc8d 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/TestThriftRpcClient.java @@ -50,12 +50,10 @@ public class TestThriftRpcClient { public void setUp() throws Exception { props.setProperty("hosts", "h1"); port = random.nextInt(40000) + 1024; - props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, - "thrift"); - props.setProperty("hosts.h1", "0.0.0.0:"+ String.valueOf(port)); + props.setProperty(RpcClientConfigurationConstants.CONFIG_CLIENT_TYPE, "thrift"); + props.setProperty("hosts.h1", "0.0.0.0:" + String.valueOf(port)); props.setProperty(RpcClientConfigurationConstants.CONFIG_BATCH_SIZE, "10"); - props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, - "2000"); + props.setProperty(RpcClientConfigurationConstants.CONFIG_REQUEST_TIMEOUT, "2000"); props.setProperty(ThriftRpcClient.CONFIG_PROTOCOL, ThriftRpcClient.COMPACT_PROTOCOL); } @@ -71,13 +69,11 @@ public class TestThriftRpcClient { * @param count * @throws Exception */ - public static void insertEvents(RpcClient client, - int count) throws Exception { + public static void insertEvents(RpcClient client, int count) throws Exception { for (int i = 0; i < count; i++) { Map<String, String> header = new HashMap<String, String>(); header.put(SEQ, String.valueOf(i)); - client.append(EventBuilder.withBody(String.valueOf(i).getBytes(), - header)); + client.append(EventBuilder.withBody(String.valueOf(i).getBytes(), header)); } } @@ -149,22 +145,20 @@ public class TestThriftRpcClient { @Test public void testError() throws Throwable { try { - src = new ThriftTestingSource(ThriftTestingSource.HandlerType.ERROR - .name(), port, ThriftRpcClient.COMPACT_PROTOCOL); - client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("0.0.0" + - ".0", port); + src = new ThriftTestingSource(ThriftTestingSource.HandlerType.ERROR.name(), port, + ThriftRpcClient.COMPACT_PROTOCOL); + client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("0.0.0" + ".0", port); insertEvents(client, 2); //2 events } catch (EventDeliveryException ex) { - Assert.assertEquals("Failed to send event. ", - ex.getMessage()); + Assert.assertEquals("Failed to send event. ", ex.getMessage()); } } @Test (expected = TimeoutException.class) public void testTimeout() throws Throwable { try { - src = new ThriftTestingSource(ThriftTestingSource.HandlerType.TIMEOUT - .name(), port, ThriftRpcClient.COMPACT_PROTOCOL); + src = new ThriftTestingSource(ThriftTestingSource.HandlerType.TIMEOUT.name(), port, + ThriftRpcClient.COMPACT_PROTOCOL); client = (ThriftRpcClient) RpcClientFactory.getThriftInstance(props); insertEvents(client, 2); //2 events } catch (EventDeliveryException ex) { @@ -174,10 +168,9 @@ public class TestThriftRpcClient { @Test public void testMultipleThreads() throws Throwable { - src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), - port, ThriftRpcClient.COMPACT_PROTOCOL); - client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("0.0.0" + - ".0", port, 10); + src = new ThriftTestingSource(ThriftTestingSource.HandlerType.OK.name(), port, + ThriftRpcClient.COMPACT_PROTOCOL); + client = (ThriftRpcClient) RpcClientFactory.getThriftInstance("0.0.0" + ".0", port, 10); int threadCount = 100; ExecutorService submissionSvc = Executors.newFixedThreadPool(threadCount); ArrayList<Future<?>> futures = new ArrayList<Future<?>>(threadCount); @@ -194,18 +187,18 @@ public class TestThriftRpcClient { } })); } - for(int i = 0; i < threadCount; i++) { + for (int i = 0; i < threadCount; i++) { futures.get(i).get(); } ArrayList<String> events = new ArrayList<String>(); - for(Event e: src.flumeEvents) { + for (Event e : src.flumeEvents) { events.add(new String(e.getBody())); } int count = 0; Collections.sort(events); for (int i = 0; i < events.size();) { - for(int j = 0; j < threadCount; j++) { + for (int j = 0; j < threadCount; j++) { Assert.assertEquals(String.valueOf(count), events.get(i++)); } count++;
