http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java index a58f344..69acc06 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java @@ -18,25 +18,7 @@ */ package org.apache.flume.sink.elasticsearch; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertNull; - -import java.io.IOException; -import java.util.Map; -import java.util.HashMap; -import java.util.TimeZone; -import java.util.concurrent.TimeUnit; import org.apache.commons.lang.time.FastDateFormat; - import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; @@ -56,6 +38,24 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; +import java.util.concurrent.TimeUnit; + +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + public class TestElasticSearchSink extends AbstractElasticSearchSinkTest { private ElasticSearchSink fixture; @@ -373,23 +373,22 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest { public static final class CustomElasticSearchIndexRequestBuilderFactory extends AbstractElasticSearchIndexRequestBuilderFactory { - static String actualIndexName, actualIndexType; + static String actualIndexName; + static String actualIndexType; static byte[] actualEventBody; static boolean hasContext; public CustomElasticSearchIndexRequestBuilderFactory() { - super(FastDateFormat.getInstance("HH_mm_ss_SSS", - TimeZone.getTimeZone("EST5EDT"))); + super(FastDateFormat.getInstance("HH_mm_ss_SSS", TimeZone.getTimeZone("EST5EDT"))); } @Override - protected void prepareIndexRequest(IndexRequestBuilder indexRequest, - String indexName, String indexType, Event event) throws IOException { + protected void prepareIndexRequest(IndexRequestBuilder indexRequest, String indexName, + String indexType, Event event) throws IOException { actualIndexName = indexName; actualIndexType = indexType; actualEventBody = event.getBody(); - indexRequest.setIndex(indexName).setType(indexType) - .setSource(event.getBody()); + indexRequest.setIndex(indexName).setType(indexType).setSource(event.getBody()); } @Override @@ -458,7 +457,8 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest { class FakeEventSerializer implements ElasticSearchEventSerializer { static final byte[] FAKE_BYTES = new byte[] { 9, 8, 7, 6 }; - boolean configuredWithContext, configuredWithComponentConfiguration; + boolean configuredWithContext; + boolean configuredWithComponentConfiguration; @Override public BytesStream getContentBuilder(Event event) throws IOException {
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java index b5a4d2f..2a36439 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java @@ -28,7 +28,7 @@ import org.junit.Test; public class TestElasticSearchSinkCreation { -private SinkFactory sinkFactory; + private SinkFactory sinkFactory; @Before public void setUp() { http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java index 38e7399..0d1d092 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java @@ -17,10 +17,11 @@ package org.apache.flume.sink.elasticsearch.client; import java.util.Arrays; -import static org.junit.Assert.assertEquals; import org.junit.Before; import org.junit.Test; +import static org.junit.Assert.assertEquals; + public class RoundRobinListTest { private RoundRobinList<String> fixture; http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java index 4b70b65..c3f07b0 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java @@ -21,10 +21,10 @@ package org.apache.flume.sink.elasticsearch.client; import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer; import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertThat; -import org.mockito.Mock; import static org.mockito.MockitoAnnotations.initMocks; public class TestElasticSearchClientFactory { @@ -44,7 +44,7 @@ public class TestElasticSearchClientFactory { public void shouldReturnTransportClient() throws Exception { String[] hostNames = { "127.0.0.1" }; Object o = factory.getClient(ElasticSearchClientFactory.TransportClient, - hostNames, "test", serializer, null); + hostNames, "test", serializer, null); assertThat(o, instanceOf(ElasticSearchTransportClient.class)); } @@ -52,13 +52,13 @@ public class TestElasticSearchClientFactory { public void shouldReturnRestClient() throws NoSuchClientTypeException { String[] hostNames = { "127.0.0.1" }; Object o = factory.getClient(ElasticSearchClientFactory.RestClient, - hostNames, "test", serializer, null); + hostNames, "test", serializer, null); assertThat(o, instanceOf(ElasticSearchRestClient.class)); } - @Test(expected=NoSuchClientTypeException.class) + @Test(expected = NoSuchClientTypeException.class) public void shouldThrowNoSuchClientTypeException() throws NoSuchClientTypeException { - String[] hostNames = {"127.0.0.1"}; + String[] hostNames = { "127.0.0.1" }; factory.getClient("not_existing_client", hostNames, "test", null, null); } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java index 1fe983a..9551c81 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java @@ -25,6 +25,15 @@ import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer; import org.apache.flume.sink.elasticsearch.IndexNameBuilder; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.StatusLine; +import org.apache.http.client.HttpClient; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.util.EntityUtils; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.BytesStream; import org.junit.Before; @@ -38,16 +47,12 @@ import java.util.List; import static junit.framework.Assert.assertEquals; import static junit.framework.Assert.assertTrue; -import org.apache.http.HttpEntity; -import org.apache.http.HttpResponse; -import org.apache.http.HttpStatus; -import org.apache.http.StatusLine; -import org.apache.http.client.HttpClient; -import org.apache.http.client.methods.HttpPost; -import org.apache.http.client.methods.HttpUriRequest; -import org.apache.http.util.EntityUtils; -import org.elasticsearch.common.bytes.BytesArray; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; public class TestElasticSearchRestClient { @@ -126,15 +131,16 @@ public class TestElasticSearchRestClient { verify(httpClient).execute(argument.capture()); assertEquals("http://host1/_bulk", argument.getValue().getURI().toString()); - assertTrue(verifyJsonEvents("{\"index\":{\"_type\":\"bar_type\",\"_index\":\"foo_index\",\"_ttl\":\"123\"}}\n", - MESSAGE_CONTENT, EntityUtils.toString(argument.getValue().getEntity()))); + assertTrue(verifyJsonEvents( + "{\"index\":{\"_type\":\"bar_type\",\"_index\":\"foo_index\",\"_ttl\":\"123\"}}\n", + MESSAGE_CONTENT, EntityUtils.toString(argument.getValue().getEntity()))); } private boolean verifyJsonEvents(String expectedIndex, String expectedBody, String actual) { Iterator<String> it = Splitter.on("\n").split(actual).iterator(); JsonParser parser = new JsonParser(); JsonObject[] arr = new JsonObject[2]; - for(int i = 0; i < 2; i++) { + for (int i = 0; i < 2; i++) { arr[i] = (JsonObject) parser.parse(it.next()); } return arr[0].equals(parser.parse(expectedIndex)) && arr[1].equals(parser.parse(expectedBody)); @@ -156,7 +162,8 @@ public class TestElasticSearchRestClient { public void shouldRetryBulkOperation() throws Exception { ArgumentCaptor<HttpPost> argument = ArgumentCaptor.forClass(HttpPost.class); - when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_OK); + when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR, + HttpStatus.SC_OK); when(httpResponse.getStatusLine()).thenReturn(httpStatus); when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java index b8aefe8..9a2be5a 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java @@ -37,6 +37,7 @@ public class IncrementAsyncHBaseSerializer implements AsyncHbaseEventSerializer private byte[] cf; private byte[] column; private Event currentEvent; + @Override public void initialize(byte[] table, byte[] cf) { this.table = table; @@ -55,10 +56,9 @@ public class IncrementAsyncHBaseSerializer implements AsyncHbaseEventSerializer @Override public List<AtomicIncrementRequest> getIncrements() { - List<AtomicIncrementRequest> incrs - = new ArrayList<AtomicIncrementRequest>(); + List<AtomicIncrementRequest> incrs = new ArrayList<AtomicIncrementRequest>(); AtomicIncrementRequest incr = new AtomicIncrementRequest(table, - currentEvent.getBody(), cf, column, 1); + currentEvent.getBody(), cf, column, 1); incrs.add(incr); return incrs; } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java index b4bbd6b..f8faa1e 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java @@ -176,7 +176,7 @@ public class TestAsyncHBaseSink { sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); - for(int i = 0; i < 3; i++){ + for (int i = 0; i < 3; i++) { Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); channel.put(e); } @@ -189,9 +189,9 @@ public class TestAsyncHBaseSink { byte[][] results = getResults(table, 3); byte[] out; int found = 0; - for(int i = 0; i < 3; i++){ - for(int j = 0; j < 3; j++){ - if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){ + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { found++; break; } @@ -209,8 +209,7 @@ public class TestAsyncHBaseSink { public void testTimeOut() throws Exception { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); deleteTable = true; - AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(), - true, false); + AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(), true, false); Configurables.configure(sink, ctx); Channel channel = new MemoryChannel(); Configurables.configure(channel, ctx); @@ -219,7 +218,7 @@ public class TestAsyncHBaseSink { sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); - for(int i = 0; i < 3; i++){ + for (int i = 0; i < 3; i++) { Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); channel.put(e); } @@ -245,7 +244,7 @@ public class TestAsyncHBaseSink { sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); - for(int i = 0; i < 3; i++){ + for (int i = 0; i < 3; i++) { Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); channel.put(e); } @@ -253,7 +252,7 @@ public class TestAsyncHBaseSink { tx.close(); int count = 0; Status status = Status.READY; - while(status != Status.BACKOFF){ + while (status != Status.BACKOFF) { count++; status = sink.process(); } @@ -264,9 +263,9 @@ public class TestAsyncHBaseSink { byte[][] results = getResults(table, 3); byte[] out; int found = 0; - for(int i = 0; i < 3; i++){ - for(int j = 0; j < 3; j++){ - if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){ + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { found++; break; } @@ -278,26 +277,21 @@ public class TestAsyncHBaseSink { } @Test - public void testMultipleBatchesBatchIncrementsWithCoalescing() - throws Exception { + public void testMultipleBatchesBatchIncrementsWithCoalescing() throws Exception { doTestMultipleBatchesBatchIncrements(true); } @Test - public void testMultipleBatchesBatchIncrementsNoCoalescing() - throws Exception { + public void testMultipleBatchesBatchIncrementsNoCoalescing() throws Exception { doTestMultipleBatchesBatchIncrements(false); } - public void doTestMultipleBatchesBatchIncrements(boolean coalesce) throws - Exception { + public void doTestMultipleBatchesBatchIncrements(boolean coalesce) throws Exception { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); deleteTable = true; - AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(), - false, true); + AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(), false, true); if (coalesce) { - ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, - "true"); + ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, "true"); } ctx.put("batchSize", "2"); ctx.put("serializer", IncrementAsyncHBaseSerializer.class.getName()); @@ -309,7 +303,7 @@ public class TestAsyncHBaseSink { ctx.put("serializer", SimpleAsyncHbaseEventSerializer.class.getName()); //Restore the no coalescing behavior ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, - "false"); + "false"); Channel channel = new MemoryChannel(); Configurables.configure(channel, ctx); sink.setChannel(channel); @@ -335,7 +329,7 @@ public class TestAsyncHBaseSink { Assert.assertEquals(7, count); HTable table = new HTable(testUtility.getConfiguration(), tableName); Scan scan = new Scan(); - scan.addColumn(columnFamily.getBytes(),"test".getBytes()); + scan.addColumn(columnFamily.getBytes(), "test".getBytes()); scan.setStartRow(Bytes.toBytes(valBase)); ResultScanner rs = table.getScanner(scan); int i = 0; @@ -358,19 +352,19 @@ public class TestAsyncHBaseSink { } @Test - public void testWithoutConfigurationObject() throws Exception{ + public void testWithoutConfigurationObject() throws Exception { testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); deleteTable = true; ctx.put("batchSize", "2"); ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, - ZKConfig.getZKQuorumServersString(testUtility.getConfiguration()) ); + ZKConfig.getZKQuorumServersString(testUtility.getConfiguration())); ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, - testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT)); + testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT)); AsyncHBaseSink sink = new AsyncHBaseSink(); Configurables.configure(sink, ctx); // Reset context to values usable by other tests. ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, null); - ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,null); + ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, null); ctx.put("batchSize", "100"); Channel channel = new MemoryChannel(); Configurables.configure(channel, ctx); @@ -378,7 +372,7 @@ public class TestAsyncHBaseSink { sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); - for(int i = 0; i < 3; i++){ + for (int i = 0; i < 3; i++) { Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); channel.put(e); } @@ -386,7 +380,7 @@ public class TestAsyncHBaseSink { tx.close(); int count = 0; Status status = Status.READY; - while(status != Status.BACKOFF){ + while (status != Status.BACKOFF) { count++; status = sink.process(); } @@ -401,9 +395,9 @@ public class TestAsyncHBaseSink { byte[][] results = getResults(table, 3); byte[] out; int found = 0; - for(int i = 0; i < 3; i++){ - for(int j = 0; j < 3; j++){ - if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){ + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { found++; break; } @@ -428,7 +422,7 @@ public class TestAsyncHBaseSink { sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); - for(int i = 0; i < 3; i++){ + for (int i = 0; i < 3; i++) { Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); channel.put(e); } @@ -440,9 +434,9 @@ public class TestAsyncHBaseSink { byte[][] results = getResults(table, 2); byte[] out; int found = 0; - for(int i = 0; i < 2; i++){ - for(int j = 0; j < 2; j++){ - if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){ + for (int i = 0; i < 2; i++) { + for (int j = 0; j < 2; j++) { + if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { found++; break; } @@ -457,7 +451,7 @@ public class TestAsyncHBaseSink { // We only have support for getting File Descriptor count for Unix from the JDK private long getOpenFileDescriptorCount() { - if(os instanceof UnixOperatingSystemMXBean){ + if (os instanceof UnixOperatingSystemMXBean) { return ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount(); } else { return -1; @@ -476,13 +470,13 @@ public class TestAsyncHBaseSink { */ @Test public void testFDLeakOnShutdown() throws Exception { - if(getOpenFileDescriptorCount() < 0) { + if (getOpenFileDescriptorCount() < 0) { return; } testUtility.createTable(tableName.getBytes(), columnFamily.getBytes()); deleteTable = true; AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(), - true, false); + true, false); ctx.put("maxConsecutiveFails", "1"); Configurables.configure(sink, ctx); Channel channel = new MemoryChannel(); @@ -492,7 +486,7 @@ public class TestAsyncHBaseSink { sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); - for(int i = 0; i < 3; i++){ + for (int i = 0; i < 3; i++) { Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); channel.put(e); } @@ -503,7 +497,7 @@ public class TestAsyncHBaseSink { // Since the isTimeOutTest is set to true, transaction will fail // with EventDeliveryException - for(int i = 0; i < 10; i ++) { + for (int i = 0; i < 10; i++) { try { sink.process(); } catch (EventDeliveryException ex) { @@ -511,18 +505,20 @@ public class TestAsyncHBaseSink { } long increaseInFD = getOpenFileDescriptorCount() - initialFDCount; Assert.assertTrue("File Descriptor leak detected. FDs have increased by " + - increaseInFD + " from an initial FD count of " + initialFDCount, increaseInFD < 50); + increaseInFD + " from an initial FD count of " + initialFDCount, + increaseInFD < 50); } /** * This test must run last - it shuts down the minicluster :D + * * @throws Exception */ @Ignore("For dev builds only:" + - "This test takes too long, and this has to be run after all other" + - "tests, since it shuts down the minicluster. " + - "Comment out all other tests" + - "and uncomment this annotation to run this test.") + "This test takes too long, and this has to be run after all other" + + "tests, since it shuts down the minicluster. " + + "Comment out all other tests" + + "and uncomment this annotation to run this test.") @Test(expected = EventDeliveryException.class) public void testHBaseFailure() throws Exception { ctx.put("batchSize", "2"); @@ -538,7 +534,7 @@ public class TestAsyncHBaseSink { sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); - for(int i = 0; i < 3; i++){ + for (int i = 0; i < 3; i++) { Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); channel.put(e); } @@ -550,9 +546,9 @@ public class TestAsyncHBaseSink { byte[][] results = getResults(table, 2); byte[] out; int found = 0; - for(int i = 0; i < 2; i++){ - for(int j = 0; j < 2; j++){ - if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){ + for (int i = 0; i < 2; i++) { + for (int j = 0; j < 2; j++) { + if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { found++; break; } @@ -565,21 +561,23 @@ public class TestAsyncHBaseSink { sink.process(); sink.stop(); } + /** * Makes Hbase scans to get rows in the payload column and increment column * in the table given. Expensive, so tread lightly. * Calling this function multiple times for the same result set is a bad * idea. Cache the result set once it is returned by this function. + * * @param table * @param numEvents Number of events inserted into the table * @return * @throws IOException */ - private byte[][] getResults(HTable table, int numEvents) throws IOException{ - byte[][] results = new byte[numEvents+1][]; + private byte[][] getResults(HTable table, int numEvents) throws IOException { + byte[][] results = new byte[numEvents + 1][]; Scan scan = new Scan(); - scan.addColumn(columnFamily.getBytes(),plCol.getBytes()); - scan.setStartRow( Bytes.toBytes("default")); + scan.addColumn(columnFamily.getBytes(), plCol.getBytes()); + scan.setStartRow(Bytes.toBytes("default")); ResultScanner rs = table.getScanner(scan); byte[] out = null; int i = 0; @@ -587,10 +585,10 @@ public class TestAsyncHBaseSink { for (Result r = rs.next(); r != null; r = rs.next()) { out = r.getValue(columnFamily.getBytes(), plCol.getBytes()); - if(i >= results.length - 1){ + if (i >= results.length - 1) { rs.close(); throw new FlumeException("More results than expected in the table." + - "Expected = " + numEvents +". Found = " + i); + "Expected = " + numEvents + ". Found = " + i); } results[i++] = out; System.out.println(out); @@ -601,7 +599,7 @@ public class TestAsyncHBaseSink { Assert.assertEquals(i, results.length - 1); scan = new Scan(); - scan.addColumn(columnFamily.getBytes(),inColumn.getBytes()); + scan.addColumn(columnFamily.getBytes(), inColumn.getBytes()); scan.setStartRow(Bytes.toBytes("incRow")); rs = table.getScanner(scan); out = null; http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java index ab65a38..217913b 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java @@ -18,14 +18,6 @@ */ package org.apache.flume.sink.hbase; -import static org.mockito.Mockito.*; - -import java.io.IOException; -import java.lang.reflect.Method; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; import com.google.common.base.Charsets; import com.google.common.base.Throwables; import com.google.common.collect.Lists; @@ -54,14 +46,25 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; -import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + public class TestHBaseSink { private static final Logger logger = LoggerFactory.getLogger(TestHBaseSink.class); @@ -140,8 +143,7 @@ public class TestHBaseSink { sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); - Event e = EventBuilder.withBody( - Bytes.toBytes(valBase)); + Event e = EventBuilder.withBody(Bytes.toBytes(valBase)); channel.put(e); tx.commit(); tx.close(); @@ -195,7 +197,7 @@ public class TestHBaseSink { sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); - for(int i = 0; i < 3; i++){ + for (int i = 0; i < 3; i++) { Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); channel.put(e); } @@ -207,9 +209,9 @@ public class TestHBaseSink { byte[][] results = getResults(table, 3); byte[] out; int found = 0; - for(int i = 0; i < 3; i++){ - for(int j = 0; j < 3; j++){ - if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){ + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { found++; break; } @@ -234,14 +236,14 @@ public class TestHBaseSink { sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); - for(int i = 0; i < 3; i++){ + for (int i = 0; i < 3; i++) { Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); channel.put(e); } tx.commit(); tx.close(); int count = 0; - while(sink.process() != Status.BACKOFF){ + while (sink.process() != Status.BACKOFF) { count++; } sink.stop(); @@ -250,9 +252,9 @@ public class TestHBaseSink { byte[][] results = getResults(table, 3); byte[] out; int found = 0; - for(int i = 0; i < 3; i++){ - for(int j = 0; j < 3; j++){ - if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){ + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { found++; break; } @@ -284,7 +286,7 @@ public class TestHBaseSink { logger.info("Writing data into channel"); Transaction tx = channel.getTransaction(); tx.begin(); - for(int i = 0; i < 3; i++){ + for (int i = 0; i < 3; i++) { Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); channel.put(e); } @@ -313,9 +315,9 @@ public class TestHBaseSink { byte[][] results = getResults(table, 2); byte[] out; int found = 0; - for(int i = 0; i < 2; i++){ - for(int j = 0; j < 2; j++){ - if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){ + for (int i = 0; i < 2; i++) { + for (int j = 0; j < 2; j++) { + if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { found++; break; } @@ -328,15 +330,17 @@ public class TestHBaseSink { } // TODO: Move this test to a different class and run it stand-alone. + /** * This test must run last - it shuts down the minicluster :D + * * @throws Exception */ @Ignore("For dev builds only:" + - "This test takes too long, and this has to be run after all other" + - "tests, since it shuts down the minicluster. " + - "Comment out all other tests" + - "and uncomment this annotation to run this test.") + "This test takes too long, and this has to be run after all other" + + "tests, since it shuts down the minicluster. " + + "Comment out all other tests" + + "and uncomment this annotation to run this test.") @Test(expected = EventDeliveryException.class) public void testHBaseFailure() throws Exception { initContextForSimpleHbaseEventSerializer(); @@ -351,7 +355,7 @@ public class TestHBaseSink { sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); - for(int i = 0; i < 3; i++){ + for (int i = 0; i < 3; i++) { Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); channel.put(e); } @@ -362,9 +366,9 @@ public class TestHBaseSink { byte[][] results = getResults(table, 2); byte[] out; int found = 0; - for(int i = 0; i < 2; i++){ - for(int j = 0; j < 2; j++){ - if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){ + for (int i = 0; i < 2; i++) { + for (int j = 0; j < 2; j++) { + if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { found++; break; } @@ -378,22 +382,22 @@ public class TestHBaseSink { sink.stop(); } - /** * Makes Hbase scans to get rows in the payload column and increment column * in the table given. Expensive, so tread lightly. * Calling this function multiple times for the same result set is a bad * idea. Cache the result set once it is returned by this function. + * * @param table * @param numEvents Number of events inserted into the table * @return * @throws IOException */ - private byte[][] getResults(HTable table, int numEvents) throws IOException{ - byte[][] results = new byte[numEvents+1][]; + private byte[][] getResults(HTable table, int numEvents) throws IOException { + byte[][] results = new byte[numEvents + 1][]; Scan scan = new Scan(); - scan.addColumn(columnFamily.getBytes(),plCol.getBytes()); - scan.setStartRow( Bytes.toBytes("default")); + scan.addColumn(columnFamily.getBytes(), plCol.getBytes()); + scan.setStartRow(Bytes.toBytes("default")); ResultScanner rs = table.getScanner(scan); byte[] out = null; int i = 0; @@ -401,10 +405,10 @@ public class TestHBaseSink { for (Result r = rs.next(); r != null; r = rs.next()) { out = r.getValue(columnFamily.getBytes(), plCol.getBytes()); - if(i >= results.length - 1){ + if (i >= results.length - 1) { rs.close(); throw new FlumeException("More results than expected in the table." + - "Expected = " + numEvents +". Found = " + i); + "Expected = " + numEvents + ". Found = " + i); } results[i++] = out; System.out.println(out); @@ -415,7 +419,7 @@ public class TestHBaseSink { Assert.assertEquals(i, results.length - 1); scan = new Scan(); - scan.addColumn(columnFamily.getBytes(),inColumn.getBytes()); + scan.addColumn(columnFamily.getBytes(), inColumn.getBytes()); scan.setStartRow(Bytes.toBytes("incRow")); rs = table.getScanner(scan); out = null; @@ -472,7 +476,7 @@ public class TestHBaseSink { initContextForSimpleHbaseEventSerializer(); ctx.put("batchSize", "1"); ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER, - "org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer"); + "org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer"); HBaseSink sink = new HBaseSink(conf); Configurables.configure(sink, ctx); @@ -507,16 +511,16 @@ public class TestHBaseSink { } @Test - public void testWithoutConfigurationObject() throws Exception{ + public void testWithoutConfigurationObject() throws Exception { initContextForSimpleHbaseEventSerializer(); Context tmpContext = new Context(ctx.getParameters()); tmpContext.put("batchSize", "2"); tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM, - ZKConfig.getZKQuorumServersString(conf) ); + ZKConfig.getZKQuorumServersString(conf)); System.out.print(ctx.getString(HBaseSinkConfigurationConstants.ZK_QUORUM)); tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, - conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, - HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT)); + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT)); HBaseSink sink = new HBaseSink(); Configurables.configure(sink, tmpContext); @@ -526,14 +530,14 @@ public class TestHBaseSink { sink.start(); Transaction tx = channel.getTransaction(); tx.begin(); - for(int i = 0; i < 3; i++){ + for (int i = 0; i < 3; i++) { Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i)); channel.put(e); } tx.commit(); tx.close(); Status status = Status.READY; - while(status != Status.BACKOFF){ + while (status != Status.BACKOFF) { status = sink.process(); } sink.stop(); @@ -541,9 +545,9 @@ public class TestHBaseSink { byte[][] results = getResults(table, 3); byte[] out; int found = 0; - for(int i = 0; i < 3; i++){ - for(int j = 0; j < 3; j++){ - if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){ + for (int i = 0; i < 3; i++) { + for (int j = 0; j < 3; j++) { + if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) { found++; break; } @@ -555,37 +559,36 @@ public class TestHBaseSink { } @Test - public void testZKQuorum() throws Exception{ + public void testZKQuorum() throws Exception { initContextForSimpleHbaseEventSerializer(); Context tmpContext = new Context(ctx.getParameters()); String zkQuorum = "zk1.flume.apache.org:3342, zk2.flume.apache.org:3342, " + - "zk3.flume.apache.org:3342"; + "zk3.flume.apache.org:3342"; tmpContext.put("batchSize", "2"); tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum); tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, - conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, - HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT)); + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT)); HBaseSink sink = new HBaseSink(); Configurables.configure(sink, tmpContext); Assert.assertEquals("zk1.flume.apache.org,zk2.flume.apache.org," + - "zk3.flume.apache.org", sink.getConfig().get(HConstants - .ZOOKEEPER_QUORUM)); - Assert.assertEquals(String.valueOf(3342), sink.getConfig().get(HConstants - .ZOOKEEPER_CLIENT_PORT)); + "zk3.flume.apache.org", sink.getConfig().get(HConstants.ZOOKEEPER_QUORUM)); + Assert.assertEquals(String.valueOf(3342), + sink.getConfig().get(HConstants.ZOOKEEPER_CLIENT_PORT)); } - @Test (expected = FlumeException.class) - public void testZKQuorumIncorrectPorts() throws Exception{ + @Test(expected = FlumeException.class) + public void testZKQuorumIncorrectPorts() throws Exception { initContextForSimpleHbaseEventSerializer(); Context tmpContext = new Context(ctx.getParameters()); String zkQuorum = "zk1.flume.apache.org:3345, zk2.flume.apache.org:3342, " + - "zk3.flume.apache.org:3342"; + "zk3.flume.apache.org:3342"; tmpContext.put("batchSize", "2"); tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum); tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, - conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, - HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT)); + conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT, + HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT)); HBaseSink sink = new HBaseSink(); Configurables.configure(sink, tmpContext); Assert.fail(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java index b102b49..24bcf37 100644 --- a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java +++ b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java @@ -18,17 +18,7 @@ */ package org.apache.flume.sink.hbase; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.nio.charset.Charset; -import java.util.Calendar; -import java.util.List; -import java.util.Map; - +import com.google.common.collect.Maps; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.event.EventBuilder; @@ -39,7 +29,16 @@ import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.util.Bytes; import org.junit.Test; -import com.google.common.collect.Maps; +import java.nio.charset.Charset; +import java.util.Calendar; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestRegexHbaseEventSerializer { @@ -76,8 +75,7 @@ public class TestRegexHbaseEventSerializer { public void testRowIndexKey() throws Exception { RegexHbaseEventSerializer s = new RegexHbaseEventSerializer(); Context context = new Context(); - context.put(RegexHbaseEventSerializer.REGEX_CONFIG,"^([^\t]+)\t([^\t]+)\t" + - "([^\t]+)$"); + context.put(RegexHbaseEventSerializer.REGEX_CONFIG,"^([^\t]+)\t([^\t]+)\t" + "([^\t]+)$"); context.put(RegexHbaseEventSerializer.COL_NAME_CONFIG, "col1,col2,ROW_KEY"); context.put("rowKeyIndex", "2"); s.configure(context); @@ -115,9 +113,9 @@ public class TestRegexHbaseEventSerializer { "referer,agent"); s.configure(context); String logMsg = "33.22.11.00 - - [20/May/2011:07:01:19 +0000] " + - "\"GET /wp-admin/css/install.css HTTP/1.0\" 200 813 " + - "\"http://www.cloudera.com/wp-admin/install.php\" \"Mozilla/5.0 (comp" + - "atible; Yahoo! Slurp; http://help.yahoo.com/help/us/ysearch/slurp)\""; + "\"GET /wp-admin/css/install.css HTTP/1.0\" 200 813 " + + "\"http://www.cloudera.com/wp-admin/install.php\" \"Mozilla/5.0 (comp" + + "atible; Yahoo! Slurp; http://help.yahoo.com/help/us/ysearch/slurp)\""; Event e = EventBuilder.withBody(Bytes.toBytes(logMsg)); s.initialize(e, "CF".getBytes()); @@ -189,7 +187,7 @@ public class TestRegexHbaseEventSerializer { } - @Test + @Test /** Test depositing of the header information. */ public void testDepositHeaders() throws Exception { Charset charset = Charset.forName("KOI8-R"); @@ -222,7 +220,8 @@ public class TestRegexHbaseEventSerializer { resultMap.put(new String(kv.getQualifier(), charset), kv.getValue()); } - assertEquals(body, new String(resultMap.get(RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT), charset)); + assertEquals(body, + new String(resultMap.get(RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT), charset)); assertEquals("value1", new String(resultMap.get("header1"), charset)); assertArrayEquals("знаÑение2".getBytes(charset), resultMap.get("заголовок2")); assertEquals("знаÑение2".length(), resultMap.get("заголовок2").length); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java index f577e98..76eca37 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java @@ -18,8 +18,8 @@ package org.apache.flume.sink.kafka; +import com.google.common.base.Charsets; import kafka.message.MessageAndMetadata; - import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.specific.SpecificDatumReader; @@ -41,8 +41,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import com.google.common.base.Charsets; - import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.UnsupportedEncodingException; @@ -52,12 +50,21 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.AVRO_EVENT; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.BROKER_LIST_FLUME_KEY; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PREFIX; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY; +import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; -import static org.apache.flume.sink.kafka.KafkaSinkConstants.*; - /** * Unit tests for Kafka Sink */ @@ -86,40 +93,45 @@ public class TestKafkaSink { KafkaSink kafkaSink = new KafkaSink(); Context context = new Context(); context.put(KAFKA_PREFIX + TOPIC_CONFIG, ""); - context.put(KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "override.default.serializer"); + context.put(KAFKA_PRODUCER_PREFIX + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "override.default.serializer"); context.put("kafka.producer.fake.property", "kafka.property.value"); context.put("kafka.bootstrap.servers", "localhost:9092,localhost:9092"); - context.put("brokerList","real-broker-list"); - Configurables.configure(kafkaSink,context); + context.put("brokerList", "real-broker-list"); + Configurables.configure(kafkaSink, context); Properties kafkaProps = kafkaSink.getKafkaProps(); //check that we have defaults set - assertEquals( - kafkaProps.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), DEFAULT_KEY_SERIALIZER); + assertEquals(kafkaProps.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), + DEFAULT_KEY_SERIALIZER); //check that kafka properties override the default and get correct name - assertEquals(kafkaProps.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), "override.default.serializer"); + assertEquals(kafkaProps.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), + "override.default.serializer"); //check that any kafka-producer property gets in - assertEquals(kafkaProps.getProperty("fake.property"), "kafka.property.value"); + assertEquals(kafkaProps.getProperty("fake.property"), + "kafka.property.value"); //check that documented property overrides defaults - assertEquals(kafkaProps.getProperty("bootstrap.servers") ,"localhost:9092,localhost:9092"); + assertEquals(kafkaProps.getProperty("bootstrap.servers"), + "localhost:9092,localhost:9092"); } @Test public void testOldProperties() { KafkaSink kafkaSink = new KafkaSink(); Context context = new Context(); - context.put("topic","test-topic"); + context.put("topic", "test-topic"); context.put(OLD_BATCH_SIZE, "300"); - context.put(BROKER_LIST_FLUME_KEY,"localhost:9092,localhost:9092"); + context.put(BROKER_LIST_FLUME_KEY, "localhost:9092,localhost:9092"); context.put(REQUIRED_ACKS_FLUME_KEY, "all"); - Configurables.configure(kafkaSink,context); + Configurables.configure(kafkaSink, context); Properties kafkaProps = kafkaSink.getKafkaProps(); assertEquals(kafkaSink.getTopic(), "test-topic"); - assertEquals(kafkaSink.getBatchSize(),300); - assertEquals(kafkaProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),"localhost:9092,localhost:9092"); + assertEquals(kafkaSink.getBatchSize(), 300); + assertEquals(kafkaProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), + "localhost:9092,localhost:9092"); assertEquals(kafkaProps.getProperty(ProducerConfig.ACKS_CONFIG), "all"); } @@ -151,9 +163,8 @@ public class TestKafkaSink { // ignore } - String fetchedMsg = new String((byte[]) - testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC) - .message()); + String fetchedMsg = new String((byte[]) testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC) + .message()); assertEquals(msg, fetchedMsg); } @@ -173,14 +184,13 @@ public class TestKafkaSink { // ignore } - String fetchedMsg = new String((byte[]) testUtil.getNextMessageFromConsumer(TestConstants.STATIC_TOPIC).message()); + String fetchedMsg = new String((byte[]) testUtil.getNextMessageFromConsumer( + TestConstants.STATIC_TOPIC).message()); assertEquals(msg, fetchedMsg); } @Test public void testTopicAndKeyFromHeader() throws UnsupportedEncodingException { - - Sink kafkaSink = new KafkaSink(); Context context = prepareDefaultContext(); Configurables.configure(kafkaSink, context); @@ -210,19 +220,16 @@ public class TestKafkaSink { } MessageAndMetadata fetchedMsg = - testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC); + testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC); assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8")); assertEquals(TestConstants.CUSTOM_KEY, - new String((byte[]) fetchedMsg.key(), "UTF-8")); - + new String((byte[]) fetchedMsg.key(), "UTF-8")); } @SuppressWarnings("rawtypes") @Test public void testAvroEvent() throws IOException { - - Sink kafkaSink = new KafkaSink(); Context context = prepareDefaultContext(); context.put(AVRO_EVENT, "true"); @@ -254,13 +261,12 @@ public class TestKafkaSink { // ignore } - MessageAndMetadata fetchedMsg = - testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC); + MessageAndMetadata fetchedMsg = testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC); - ByteArrayInputStream in = - new ByteArrayInputStream((byte[])fetchedMsg.message()); + ByteArrayInputStream in = new ByteArrayInputStream((byte[]) fetchedMsg.message()); BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null); - SpecificDatumReader<AvroFlumeEvent> reader = new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class); + SpecificDatumReader<AvroFlumeEvent> reader = + new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class); AvroFlumeEvent avroevent = reader.read(null, decoder); @@ -268,17 +274,15 @@ public class TestKafkaSink { Map<CharSequence, CharSequence> eventHeaders = avroevent.getHeaders(); assertEquals(msg, eventBody); - assertEquals(TestConstants.CUSTOM_KEY, - new String((byte[]) fetchedMsg.key(), "UTF-8")); + assertEquals(TestConstants.CUSTOM_KEY, new String((byte[]) fetchedMsg.key(), "UTF-8")); - assertEquals(TestConstants.HEADER_1_VALUE, eventHeaders.get(new Utf8(TestConstants.HEADER_1_KEY)).toString()); + assertEquals(TestConstants.HEADER_1_VALUE, + eventHeaders.get(new Utf8(TestConstants.HEADER_1_KEY)).toString()); assertEquals(TestConstants.CUSTOM_KEY, eventHeaders.get(new Utf8("key")).toString()); - } @Test - public void testEmptyChannel() throws UnsupportedEncodingException, - EventDeliveryException { + public void testEmptyChannel() throws UnsupportedEncodingException, EventDeliveryException { Sink kafkaSink = new KafkaSink(); Context context = prepareDefaultContext(); Configurables.configure(kafkaSink, context); @@ -291,8 +295,7 @@ public class TestKafkaSink { if (status != Sink.Status.BACKOFF) { fail("Error Occurred"); } - assertNull( - testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC)); + assertNull(testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC)); } private Context prepareDefaultContext() { @@ -304,7 +307,7 @@ public class TestKafkaSink { } private Sink.Status prepareAndSend(Context context, String msg) - throws EventDeliveryException { + throws EventDeliveryException { Sink kafkaSink = new KafkaSink(); Configurables.configure(kafkaSink, context); Channel memoryChannel = new MemoryChannel(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java index d8a45ef..6d89bd3 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java @@ -1,19 +1,19 @@ /** - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * limitations under the License. */ package org.apache.flume.sink.kafka.util; @@ -30,23 +30,22 @@ import java.util.Properties; */ public class KafkaLocal { - public KafkaServerStartable kafka; - public ZooKeeperLocal zookeeper; + public KafkaServerStartable kafka; + public ZooKeeperLocal zookeeper; - public KafkaLocal(Properties kafkaProperties) throws IOException, - InterruptedException{ - KafkaConfig kafkaConfig = KafkaConfig.fromProps(kafkaProperties); + public KafkaLocal(Properties kafkaProperties) throws IOException, InterruptedException { + KafkaConfig kafkaConfig = KafkaConfig.fromProps(kafkaProperties); - //start local kafka broker - kafka = new KafkaServerStartable(kafkaConfig); - } + // start local kafka broker + kafka = new KafkaServerStartable(kafkaConfig); + } - public void start() throws Exception{ - kafka.startup(); - } + public void start() throws Exception { + kafka.startup(); + } - public void stop(){ - kafka.shutdown(); - } + public void stop() { + kafka.shutdown(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java index 1a5728f..35c1e47 100644 --- a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java +++ b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java @@ -1,19 +1,19 @@ /** - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * limitations under the License. */ package org.apache.flume.sink.kafka.util; @@ -33,30 +33,29 @@ import java.util.Properties; */ public class ZooKeeperLocal { - private static final Logger logger = - LoggerFactory.getLogger(ZooKeeperLocal.class); - private ZooKeeperServerMain zooKeeperServer; + private static final Logger logger = LoggerFactory.getLogger(ZooKeeperLocal.class); + private ZooKeeperServerMain zooKeeperServer; - public ZooKeeperLocal(Properties zkProperties) throws IOException{ - QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig(); + public ZooKeeperLocal(Properties zkProperties) throws IOException { + QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig(); + try { + quorumConfiguration.parseProperties(zkProperties); + } catch (Exception e) { + throw new RuntimeException(e); + } + + zooKeeperServer = new ZooKeeperServerMain(); + final ServerConfig configuration = new ServerConfig(); + configuration.readFrom(quorumConfiguration); + + new Thread() { + public void run() { try { - quorumConfiguration.parseProperties(zkProperties); - } catch(Exception e) { - throw new RuntimeException(e); + zooKeeperServer.runFromConfig(configuration); + } catch (IOException e) { + logger.error("Zookeeper startup failed.", e); } - - zooKeeperServer = new ZooKeeperServerMain(); - final ServerConfig configuration = new ServerConfig(); - configuration.readFrom(quorumConfiguration); - - new Thread() { - public void run() { - try { - zooKeeperServer.runFromConfig(configuration); - } catch (IOException e) { - logger.error("Zookeeper startup failed.", e); - } - } - }.start(); - } + } + }.start(); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java index 6172c68..be377ba 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java @@ -16,9 +16,7 @@ */ package org.apache.flume.sink.solr.morphline; -import java.io.IOException; -import java.util.List; - +import com.google.common.base.Charsets; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.serialization.EventDeserializer; @@ -28,7 +26,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.google.common.base.Charsets; +import java.io.IOException; +import java.util.List; public class TestBlobDeserializer extends Assert { @@ -61,7 +60,8 @@ public class TestBlobDeserializer extends Assert { public void testSimpleViaFactory() throws IOException { ResettableInputStream in = new ResettableTestStringInputStream(mini); EventDeserializer des; - des = EventDeserializerFactory.getInstance(BlobDeserializer.Builder.class.getName(), new Context(), in); + des = EventDeserializerFactory.getInstance(BlobDeserializer.Builder.class.getName(), + new Context(), in); validateMiniParse(des); } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineInterceptor.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineInterceptor.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineInterceptor.java index 22cfe96..8d62d38 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineInterceptor.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineInterceptor.java @@ -16,22 +16,21 @@ */ package org.apache.flume.sink.solr.morphline; -import java.io.File; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - +import com.google.common.base.Charsets; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Files; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.event.EventBuilder; import org.junit.Assert; import org.junit.Test; - import org.kitesdk.morphline.base.Fields; -import com.google.common.base.Charsets; -import com.google.common.collect.ImmutableMap; -import com.google.common.io.Files; + +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class TestMorphlineInterceptor extends Assert { @@ -40,13 +39,15 @@ public class TestMorphlineInterceptor extends Assert { @Test public void testNoOperation() throws Exception { Context context = new Context(); - context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, RESOURCES_DIR + "/test-morphlines/noOperation.conf"); + context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, + RESOURCES_DIR + "/test-morphlines/noOperation.conf"); Event input = EventBuilder.withBody("foo", Charsets.UTF_8); input.getHeaders().put("name", "nadja"); MorphlineInterceptor interceptor = build(context); Event actual = interceptor.intercept(input); interceptor.close(); - Event expected = EventBuilder.withBody("foo".getBytes("UTF-8"), ImmutableMap.of("name", "nadja")); + Event expected = EventBuilder.withBody("foo".getBytes("UTF-8"), + ImmutableMap.of("name", "nadja")); assertEqualsEvent(expected, actual); List<Event> actualList = build(context).intercept(Collections.singletonList(input)); @@ -57,11 +58,13 @@ public class TestMorphlineInterceptor extends Assert { @Test public void testReadClob() throws Exception { Context context = new Context(); - context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, RESOURCES_DIR + "/test-morphlines/readClob.conf"); + context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, + RESOURCES_DIR + "/test-morphlines/readClob.conf"); Event input = EventBuilder.withBody("foo", Charsets.UTF_8); input.getHeaders().put("name", "nadja"); Event actual = build(context).intercept(input); - Event expected = EventBuilder.withBody(null, ImmutableMap.of("name", "nadja", Fields.MESSAGE, "foo")); + Event expected = EventBuilder.withBody(null, + ImmutableMap.of("name", "nadja", Fields.MESSAGE, "foo")); assertEqualsEvent(expected, actual); List<Event> actualList = build(context).intercept(Collections.singletonList(input)); @@ -72,7 +75,8 @@ public class TestMorphlineInterceptor extends Assert { @Test public void testGrokIfNotMatchDropEventRetain() throws Exception { Context context = new Context(); - context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, RESOURCES_DIR + "/test-morphlines/grokIfNotMatchDropRecord.conf"); + context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, + RESOURCES_DIR + "/test-morphlines/grokIfNotMatchDropRecord.conf"); String msg = "<164>Feb 4 10:46:14 syslog sshd[607]: Server listening on 0.0.0.0 port 22."; Event input = EventBuilder.withBody(null, ImmutableMap.of(Fields.MESSAGE, msg)); @@ -94,8 +98,10 @@ public class TestMorphlineInterceptor extends Assert { /* leading XXXXX does not match regex, thus we expect the event to be dropped */ public void testGrokIfNotMatchDropEventDrop() throws Exception { Context context = new Context(); - context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, RESOURCES_DIR + "/test-morphlines/grokIfNotMatchDropRecord.conf"); - String msg = "<XXXXXXXXXXXXX164>Feb 4 10:46:14 syslog sshd[607]: Server listening on 0.0.0.0 port 22."; + context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, + RESOURCES_DIR + "/test-morphlines/grokIfNotMatchDropRecord.conf"); + String msg = "<XXXXXXXXXXXXX164>Feb 4 10:46:14 syslog sshd[607]: Server listening on 0.0.0.0" + + " port 22."; Event input = EventBuilder.withBody(null, ImmutableMap.of(Fields.MESSAGE, msg)); Event actual = build(context).intercept(input); assertNull(actual); @@ -105,10 +111,12 @@ public class TestMorphlineInterceptor extends Assert { /** morphline says route to southpole if it's an avro file, otherwise route to northpole */ public void testIfDetectMimeTypeRouteToSouthPole() throws Exception { Context context = new Context(); - context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, RESOURCES_DIR + "/test-morphlines/ifDetectMimeType.conf"); + context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, + RESOURCES_DIR + "/test-morphlines/ifDetectMimeType.conf"); context.put(MorphlineHandlerImpl.MORPHLINE_VARIABLE_PARAM + ".MY.MIME_TYPE", "avro/binary"); - Event input = EventBuilder.withBody(Files.toByteArray(new File(RESOURCES_DIR + "/test-documents/sample-statuses-20120906-141433.avro"))); + Event input = EventBuilder.withBody(Files.toByteArray( + new File(RESOURCES_DIR + "/test-documents/sample-statuses-20120906-141433.avro"))); Event actual = build(context).intercept(input); Map<String, String> expected = new HashMap(); @@ -122,10 +130,12 @@ public class TestMorphlineInterceptor extends Assert { /** morphline says route to southpole if it's an avro file, otherwise route to northpole */ public void testIfDetectMimeTypeRouteToNorthPole() throws Exception { Context context = new Context(); - context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, RESOURCES_DIR + "/test-morphlines/ifDetectMimeType.conf"); + context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, + RESOURCES_DIR + "/test-morphlines/ifDetectMimeType.conf"); context.put(MorphlineHandlerImpl.MORPHLINE_VARIABLE_PARAM + ".MY.MIME_TYPE", "avro/binary"); - Event input = EventBuilder.withBody(Files.toByteArray(new File(RESOURCES_DIR + "/test-documents/testPDF.pdf"))); + Event input = EventBuilder.withBody( + Files.toByteArray(new File(RESOURCES_DIR + "/test-documents/testPDF.pdf"))); Event actual = build(context).intercept(input); Map<String, String> expected = new HashMap(); @@ -140,8 +150,9 @@ public class TestMorphlineInterceptor extends Assert { builder.configure(context); return builder.build(); } - - private void assertEqualsEvent(Event x, Event y) { // b/c SimpleEvent doesn't implement equals() method :-( + + // b/c SimpleEvent doesn't implement equals() method :-( + private void assertEqualsEvent(Event x, Event y) { assertEquals(x.getHeaders(), y.getHeaders()); assertArrayEquals(x.getBody(), y.getBody()); } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java index 232c092..1bfae95 100644 --- a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java +++ b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java @@ -87,8 +87,7 @@ public class TestMorphlineSolrSink extends SolrTestCaseJ4 { initCore( RESOURCES_DIR + "/solr/collection1/conf/solrconfig.xml", RESOURCES_DIR + "/solr/collection1/conf/schema.xml", - RESOURCES_DIR + "/solr" - ); + RESOURCES_DIR + "/solr"); } @Before @@ -139,9 +138,9 @@ public class TestMorphlineSolrSink extends SolrTestCaseJ4 { int batchSize = SEQ_NUM2.incrementAndGet() % 2 == 0 ? 100 : 1; DocumentLoader testServer = new SolrServerDocumentLoader(solrServer, batchSize); MorphlineContext solrMorphlineContext = new SolrMorphlineContext.Builder() - .setDocumentLoader(testServer) - .setExceptionHandler(new FaultTolerance(false, false, SolrServerException.class.getName())) - .setMetricRegistry(new MetricRegistry()).build(); + .setDocumentLoader(testServer) + .setExceptionHandler(new FaultTolerance(false, false, SolrServerException.class.getName())) + .setMetricRegistry(new MetricRegistry()).build(); MorphlineHandlerImpl impl = new MorphlineHandlerImpl(); impl.setMorphlineContext(solrMorphlineContext); @@ -302,9 +301,11 @@ public class TestMorphlineSolrSink extends SolrTestCaseJ4 { QueryResponse rsp = query("*:*"); Iterator<SolrDocument> iter = rsp.getResults().iterator(); ListMultimap<String, String> expectedFieldValues; - expectedFieldValues = ImmutableListMultimap.of("id", "1234567890", "text", "sample tweet one", "user_screen_name", "fake_user1"); + expectedFieldValues = ImmutableListMultimap.of("id", "1234567890", "text", "sample tweet one", + "user_screen_name", "fake_user1"); assertEquals(expectedFieldValues, next(iter)); - expectedFieldValues = ImmutableListMultimap.of("id", "2345678901", "text", "sample tweet two", "user_screen_name", "fake_user2"); + expectedFieldValues = ImmutableListMultimap.of("id", "2345678901", "text", "sample tweet two", + "user_screen_name", "fake_user2"); assertEquals(expectedFieldValues, next(iter)); assertFalse(iter.hasNext()); } @@ -398,8 +399,8 @@ public class TestMorphlineSolrSink extends SolrTestCaseJ4 { float secs = (System.currentTimeMillis() - startTime) / 1000.0f; long numDocs = queryResultSetSize("*:*"); - LOGGER.info("Took secs: " + secs + ", iters/sec: " + (iters/secs)); - LOGGER.info("Took secs: " + secs + ", docs/sec: " + (numDocs/secs)); + LOGGER.info("Took secs: " + secs + ", iters/sec: " + (iters / secs)); + LOGGER.info("Took secs: " + secs + ", docs/sec: " + (numDocs / secs)); LOGGER.info("Iterations: " + iters + ", numDocs: " + numDocs); LOGGER.info("sink: ", sink); } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java index 6881967..b8466f7 100644 --- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java +++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java @@ -92,13 +92,10 @@ public abstract class JMSMessageConsumerTestBase { } }); when(message.getText()).thenReturn(TEXT); - when(connectionFactory.createConnection(USERNAME, PASSWORD)). - thenReturn(connection); - when(connection.createSession(true, Session.SESSION_TRANSACTED)). - thenReturn(session); + when(connectionFactory.createConnection(USERNAME, PASSWORD)).thenReturn(connection); + when(connection.createSession(true, Session.SESSION_TRANSACTED)).thenReturn(session); when(session.createQueue(destinationName)).thenReturn(queue); - when(session.createConsumer(any(Destination.class), anyString())) - .thenReturn(messageConsumer); + when(session.createConsumer(any(Destination.class), anyString())).thenReturn(messageConsumer); when(messageConsumer.receiveNoWait()).thenReturn(message); when(messageConsumer.receive(anyLong())).thenReturn(message); destinationName = DESTINATION_NAME; @@ -127,7 +124,7 @@ public abstract class JMSMessageConsumerTestBase { } void assertBodyIsExpected(List<Event> events) { - for(Event event : events) { + for (Event event : events) { assertEquals(TEXT, new String(event.getBody(), Charsets.UTF_8)); } } @@ -140,7 +137,7 @@ public abstract class JMSMessageConsumerTestBase { @After public void tearDown() throws Exception { beforeTearDown(); - if(consumer != null) { + if (consumer != null) { consumer.close(); } afterTearDown(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestDefaultJMSMessageConverter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestDefaultJMSMessageConverter.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestDefaultJMSMessageConverter.java index 8d413f7..0b2193c 100644 --- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestDefaultJMSMessageConverter.java +++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestDefaultJMSMessageConverter.java @@ -73,7 +73,7 @@ public class TestDefaultJMSMessageConverter { @Override public Integer answer(InvocationOnMock invocation) throws Throwable { byte[] buffer = (byte[])invocation.getArguments()[0]; - if(buffer != null) { + if (buffer != null) { assertEquals(buffer.length, BYTES.length); System.arraycopy(BYTES, 0, buffer, 0, BYTES.length); } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java index e28e02a..53cc09a 100644 --- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java +++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java @@ -56,9 +56,10 @@ import com.google.common.io.Files; public class TestIntegrationActiveMQ { - private final static String INITIAL_CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory"; + private static final String INITIAL_CONTEXT_FACTORY = + "org.apache.activemq.jndi.ActiveMQInitialContextFactory"; public static final String BROKER_BIND_URL = "tcp://localhost:61516"; - private final static String DESTINATION_NAME = "test"; + private static final String DESTINATION_NAME = "test"; private static final String USERNAME = "user"; private static final String PASSWORD = "pass"; // specific for dynamic queues on ActiveMq @@ -115,15 +116,14 @@ public class TestIntegrationActiveMQ { } }).when(channelProcessor).processEventBatch(any(List.class)); source.setChannelProcessor(channelProcessor); - - } + @After public void tearDown() throws Exception { - if(source != null) { + if (source != null) { source.stop(); } - if(broker != null) { + if (broker != null) { broker.stop(); } FileUtils.deleteDirectory(baseDir); @@ -140,8 +140,7 @@ public class TestIntegrationActiveMQ { Destination destination = session.createQueue(DESTINATION_NAME); MessageProducer producer = session.createProducer(destination); - - for(String event : events) { + for (String event : events) { TextMessage message = session.createTextMessage(); message.setText(event); producer.send(message); @@ -162,8 +161,7 @@ public class TestIntegrationActiveMQ { Destination destination = session.createTopic(DESTINATION_NAME); MessageProducer producer = session.createProducer(destination); - - for(String event : events) { + for (String event : events) { TextMessage message = session.createTextMessage(); message.setText(event); producer.send(message); @@ -202,13 +200,14 @@ public class TestIntegrationActiveMQ { Assert.assertEquals(Status.BACKOFF, source.process()); Assert.assertEquals(expected.size(), events.size()); List<String> actual = Lists.newArrayList(); - for(Event event : events) { + for (Event event : events) { actual.add(new String(event.getBody(), Charsets.UTF_8)); } Collections.sort(expected); Collections.sort(actual); Assert.assertEquals(expected, actual); } + @Test public void testTopic() throws Exception { context.put(JMSSourceConfiguration.DESTINATION_TYPE, @@ -229,7 +228,7 @@ public class TestIntegrationActiveMQ { Assert.assertEquals(Status.BACKOFF, source.process()); Assert.assertEquals(expected.size(), events.size()); List<String> actual = Lists.newArrayList(); - for(Event event : events) { + for (Event event : events) { actual.add(new String(event.getBody(), Charsets.UTF_8)); } Collections.sort(expected); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java index 9bace82..dcb47d9 100644 --- a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java +++ b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java @@ -105,8 +105,7 @@ public class TestJMSMessageConsumer extends JMSMessageConsumerTestBase { @Test public void testNoUserPass() throws Exception { userName = Optional.absent(); - when(connectionFactory.createConnection(USERNAME, PASSWORD)). - thenThrow(new AssertionError()); + when(connectionFactory.createConnection(USERNAME, PASSWORD)).thenThrow(new AssertionError()); when(connectionFactory.createConnection()).thenReturn(connection); consumer = create(); List<Event> events = consumer.take();
