http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java b/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java index 70d2c1b..724f093 100644 --- a/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java +++ b/flume-ng-sdk/src/test/java/org/apache/flume/api/ThriftTestingSource.java @@ -67,8 +67,7 @@ public class ThriftTestingSource { @Override public Status append(ThriftFlumeEvent event) throws TException { - flumeEvents.add(EventBuilder.withBody(event.getBody(), - event.getHeaders())); + flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders())); individualCount++; return Status.OK; } @@ -80,8 +79,7 @@ public class ThriftTestingSource { incompleteBatches++; } for (ThriftFlumeEvent event : events) { - flumeEvents.add(EventBuilder.withBody(event.getBody(), - event.getHeaders())); + flumeEvents.add(EventBuilder.withBody(event.getBody(), event.getHeaders())); } return Status.OK; } @@ -175,8 +173,7 @@ public class ThriftTestingSource { } @Override - public Status appendBatch(List<ThriftFlumeEvent> events) throws - TException { + public Status appendBatch(List<ThriftFlumeEvent> events) throws TException { try { if (delay != null) { TimeUnit.MILLISECONDS.sleep(delay.get()); @@ -207,8 +204,8 @@ public class ThriftTestingSource { } public ThriftTestingSource(String handlerName, int port, String protocol) throws Exception { - TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(new - InetSocketAddress("0.0.0.0", port)); + TNonblockingServerTransport serverTransport = + new TNonblockingServerSocket(new InetSocketAddress("0.0.0.0", port)); ThriftSourceProtocol.Iface handler = getHandler(handlerName); TProtocolFactory transportProtocolFactory = null; @@ -217,10 +214,9 @@ public class ThriftTestingSource { } else { transportProtocolFactory = new TCompactProtocol.Factory(); } - server = new THsHaServer(new THsHaServer.Args - (serverTransport).processor( - new ThriftSourceProtocol.Processor(handler)).protocolFactory( - transportProtocolFactory)); + server = new THsHaServer(new THsHaServer.Args(serverTransport).processor( + new ThriftSourceProtocol.Processor(handler)).protocolFactory( + transportProtocolFactory)); Executors.newSingleThreadExecutor().submit(new Runnable() { @Override public void run() { @@ -260,10 +256,8 @@ public class ThriftTestingSource { args.protocolFactory(transportProtocolFactory); args.inputTransportFactory(new TFastFramedTransport.Factory()); args.outputTransportFactory(new TFastFramedTransport.Factory()); - args.processor(new ThriftSourceProtocol - .Processor<ThriftSourceProtocol.Iface>(handler)); - server = (TServer) serverClass.getConstructor(argsClass).newInstance - (args); + args.processor(new ThriftSourceProtocol.Processor<ThriftSourceProtocol.Iface>(handler)); + server = (TServer) serverClass.getConstructor(argsClass).newInstance(args); Executors.newSingleThreadExecutor().submit(new Runnable() { @Override public void run() { @@ -285,5 +279,4 @@ public class ThriftTestingSource { server.stop(); } - }
http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java index 621920d..3709577 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java +++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java @@ -18,27 +18,12 @@ package org.apache.flume.sink.kite; -import org.apache.flume.sink.kite.parser.EntityParser; -import org.apache.flume.sink.kite.policy.FailurePolicy; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import javax.annotation.Nullable; import org.apache.avro.Schema; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; @@ -57,6 +42,8 @@ import org.apache.flume.Transaction; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.event.SimpleEvent; +import org.apache.flume.sink.kite.parser.EntityParser; +import org.apache.flume.sink.kite.policy.FailurePolicy; import org.apache.flume.source.avro.AvroFlumeEvent; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -74,7 +61,28 @@ import org.kitesdk.data.DatasetWriter; import org.kitesdk.data.Datasets; import org.kitesdk.data.PartitionStrategy; import org.kitesdk.data.View; -import static org.mockito.Mockito.*; + +import javax.annotation.Nullable; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class TestDatasetSink { @@ -206,7 +214,8 @@ public class TestDatasetSink { } @Test - public void testFileStore() throws EventDeliveryException, NonRecoverableEventException, NonRecoverableEventException { + public void testFileStore() + throws EventDeliveryException, NonRecoverableEventException, NonRecoverableEventException { DatasetSink sink = sink(in, config); // run the sink @@ -307,31 +316,28 @@ public class TestDatasetSink { sink.process(); sink.stop(); - Assert.assertEquals( - Sets.newHashSet(expected), - read(Datasets.load(FILE_DATASET_URI))); + Assert.assertEquals(Sets.newHashSet(expected), read(Datasets.load(FILE_DATASET_URI))); Assert.assertEquals("Should have committed", 0, remaining(in)); } @Test public void testDatasetUpdate() throws EventDeliveryException { // add an updated record that is missing the msg field - GenericRecordBuilder updatedBuilder = new GenericRecordBuilder( - UPDATED_SCHEMA); + GenericRecordBuilder updatedBuilder = new GenericRecordBuilder(UPDATED_SCHEMA); GenericData.Record updatedRecord = updatedBuilder - .set("id", "0") - .set("priority", 1) - .set("msg", "Priority 1 message!") - .build(); + .set("id", "0") + .set("priority", 1) + .set("msg", "Priority 1 message!") + .build(); // make a set of the expected records with the new schema Set<GenericRecord> expectedAsUpdated = Sets.newHashSet(); for (GenericRecord record : expected) { expectedAsUpdated.add(updatedBuilder - .clear("priority") - .set("id", record.get("id")) - .set("msg", record.get("msg")) - .build()); + .clear("priority") + .set("id", record.get("id")) + .set("msg", record.get("msg")) + .build()); } expectedAsUpdated.add(updatedRecord); @@ -343,9 +349,9 @@ public class TestDatasetSink { // update the dataset's schema DatasetDescriptor updated = new DatasetDescriptor - .Builder(Datasets.load(FILE_DATASET_URI).getDataset().getDescriptor()) - .schema(UPDATED_SCHEMA) - .build(); + .Builder(Datasets.load(FILE_DATASET_URI).getDataset().getDescriptor()) + .schema(UPDATED_SCHEMA) + .build(); Datasets.update(FILE_DATASET_URI, updated); // trigger a roll on the next process call to refresh the writer @@ -358,15 +364,12 @@ public class TestDatasetSink { sink.process(); sink.stop(); - Assert.assertEquals( - expectedAsUpdated, - read(Datasets.load(FILE_DATASET_URI))); + Assert.assertEquals(expectedAsUpdated, read(Datasets.load(FILE_DATASET_URI))); Assert.assertEquals("Should have committed", 0, remaining(in)); } @Test - public void testMiniClusterStore() - throws EventDeliveryException, IOException { + public void testMiniClusterStore() throws EventDeliveryException, IOException { // setup a minicluster MiniDFSCluster cluster = new MiniDFSCluster .Builder(new Configuration()) http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java index 9c1cd09..f1dadf1 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/HDFSTestSeqWriter.java @@ -19,25 +19,27 @@ package org.apache.flume.sink.hdfs; -import java.io.IOException; - import org.apache.flume.Event; import org.apache.hadoop.io.SequenceFile.CompressionType; import org.apache.hadoop.io.compress.CompressionCodec; +import java.io.IOException; + public class HDFSTestSeqWriter extends HDFSSequenceFile { - protected volatile boolean closed, opened; + protected volatile boolean closed; + protected volatile boolean opened; private int openCount = 0; + HDFSTestSeqWriter(int openCount) { this.openCount = openCount; } @Override - public void open(String filePath, CompressionCodec codeC, - CompressionType compType) throws IOException { + public void open(String filePath, CompressionCodec codeC, CompressionType compType) + throws IOException { super.open(filePath, codeC, compType); - if(closed) { + if (closed) { opened = true; } } @@ -52,7 +54,7 @@ public class HDFSTestSeqWriter extends HDFSSequenceFile { throw new IOException("Injected fault"); } else if (e.getHeaders().containsKey("fault-until-reopen")) { // opening first time. - if(openCount == 1) { + if (openCount == 1) { throw new IOException("Injected fault-until-reopen"); } } else if (e.getHeaders().containsKey("slow")) { http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java index f0c6e7e..a85a99f 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockDataStream.java @@ -30,9 +30,9 @@ class MockDataStream extends HDFSDataStream { MockDataStream(FileSystem fs) { this.fs = fs; } + @Override - protected FileSystem getDfs(Configuration conf, - Path dstPath) throws IOException{ + protected FileSystem getDfs(Configuration conf, Path dstPath) throws IOException { return fs; } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java index 4443335..a079b83 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFileSystem.java @@ -42,8 +42,7 @@ public class MockFileSystem extends FileSystem { int currentRenameAttempts; boolean closeSucceed = true; - public MockFileSystem(FileSystem fs, - int numberOfRetriesRequired) { + public MockFileSystem(FileSystem fs, int numberOfRetriesRequired) { this.fs = fs; this.numberOfRetriesRequired = numberOfRetriesRequired; } @@ -67,17 +66,14 @@ public class MockFileSystem extends FileSystem { @Override public FSDataOutputStream create(Path arg0) throws IOException { - //throw new IOException ("HI there2"); - latestOutputStream = new MockFsDataOutputStream( - fs.create(arg0), closeSucceed); - + latestOutputStream = new MockFsDataOutputStream(fs.create(arg0), closeSucceed); return latestOutputStream; } @Override - public FSDataOutputStream create(Path arg0, FsPermission arg1, - boolean arg2, int arg3, short arg4, long arg5, Progressable arg6) - throws IOException { + public FSDataOutputStream create(Path arg0, FsPermission arg1, boolean arg2, int arg3, + short arg4, long arg5, Progressable arg6) + throws IOException { throw new IOException("Not a real file system"); } @@ -126,11 +122,9 @@ public class MockFileSystem extends FileSystem { @Override public boolean rename(Path arg0, Path arg1) throws IOException { currentRenameAttempts++; - logger.info( - "Attempting to Rename: '" + currentRenameAttempts + "' of '" + - numberOfRetriesRequired + "'"); - if (currentRenameAttempts >= numberOfRetriesRequired || - numberOfRetriesRequired == 0) { + logger.info("Attempting to Rename: '" + currentRenameAttempts + "' of '" + + numberOfRetriesRequired + "'"); + if (currentRenameAttempts >= numberOfRetriesRequired || numberOfRetriesRequired == 0) { logger.info("Renaming file"); return fs.rename(arg0, arg1); } else { @@ -141,6 +135,5 @@ public class MockFileSystem extends FileSystem { @Override public void setWorkingDirectory(Path arg0) { fs.setWorkingDirectory(arg0); - } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java index 35b034e..f5d579c 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockFsDataOutputStream.java @@ -17,21 +17,20 @@ + */ package org.apache.flume.sink.hdfs; -import java.io.IOException; - import org.apache.hadoop.fs.FSDataOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class MockFsDataOutputStream extends FSDataOutputStream{ +import java.io.IOException; + +public class MockFsDataOutputStream extends FSDataOutputStream { private static final Logger logger = LoggerFactory.getLogger(MockFsDataOutputStream.class); boolean closeSucceed; - public MockFsDataOutputStream(FSDataOutputStream wrapMe, - boolean closeSucceed) + public MockFsDataOutputStream(FSDataOutputStream wrapMe, boolean closeSucceed) throws IOException { super(wrapMe.getWrappedStream(), null); this.closeSucceed = closeSucceed; @@ -39,8 +38,7 @@ public class MockFsDataOutputStream extends FSDataOutputStream{ @Override public void close() throws IOException { - logger.info( - "Close Succeeded - " + closeSucceed); + logger.info("Close Succeeded - " + closeSucceed); if (closeSucceed) { logger.info("closing file"); super.close(); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java index ec49b97..05c4316 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java @@ -68,7 +68,8 @@ public class MockHDFSWriter implements HDFSWriter { filesOpened++; } - public void open(String filePath, CompressionCodec codec, CompressionType cType) throws IOException { + public void open(String filePath, CompressionCodec codec, CompressionType cType) + throws IOException { this.filePath = filePath; filesOpened++; } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java index 2581f73..742deb0 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java @@ -18,14 +18,7 @@ */ package org.apache.flume.sink.hdfs; -import java.io.File; -import java.io.IOException; -import java.util.Calendar; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - +import com.google.common.base.Charsets; import org.apache.flume.Clock; import org.apache.flume.Context; import org.apache.flume.Event; @@ -46,12 +39,17 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Charsets; +import java.io.File; +import java.io.IOException; +import java.util.Calendar; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; public class TestBucketWriter { - private static Logger logger = - LoggerFactory.getLogger(TestBucketWriter.class); + private static Logger logger = LoggerFactory.getLogger(TestBucketWriter.class); private Context ctx = new Context(); private static ScheduledExecutorService timedRollerPool; @@ -74,11 +72,11 @@ public class TestBucketWriter { public void testEventCountingRoller() throws IOException, InterruptedException { int maxEvents = 100; MockHDFSWriter hdfsWriter = new MockHDFSWriter(); - BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx, - "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE, - hdfsWriter, timedRollerPool, proxy, - new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, - null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); + BucketWriter bucketWriter = new BucketWriter( + 0, 0, maxEvents, 0, ctx, "/tmp", "file", "", ".tmp", null, null, + SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, + Executors.newSingleThreadExecutor(), 0, 0); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); for (int i = 0; i < 1000; i++) { @@ -98,12 +96,11 @@ public class TestBucketWriter { public void testSizeRoller() throws IOException, InterruptedException { int maxBytes = 300; MockHDFSWriter hdfsWriter = new MockHDFSWriter(); - BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, - ctx, "/tmp", "file", "", ".tmp", null, null, - SequenceFile.CompressionType.NONE, hdfsWriter,timedRollerPool, - proxy, new SinkCounter("test-bucket-writer-" + - System.currentTimeMillis()),0, null, null, 30000, - Executors.newSingleThreadExecutor(), 0, 0); + BucketWriter bucketWriter = new BucketWriter( + 0, maxBytes, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null, + SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, + Executors.newSingleThreadExecutor(), 0, 0); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); for (int i = 0; i < 1000; i++) { @@ -126,16 +123,16 @@ public class TestBucketWriter { final AtomicBoolean calledBack = new AtomicBoolean(false); MockHDFSWriter hdfsWriter = new MockHDFSWriter(); - BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, - "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE, - hdfsWriter, timedRollerPool, proxy, - new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), - 0, new HDFSEventSink.WriterCallback() { - @Override - public void run(String filePath) { - calledBack.set(true); - } - }, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); + BucketWriter bucketWriter = new BucketWriter( + ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null, + SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, + new HDFSEventSink.WriterCallback() { + @Override + public void run(String filePath) { + calledBack.set(true); + } + }, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); long startNanos = System.nanoTime(); @@ -149,12 +146,11 @@ public class TestBucketWriter { Assert.assertTrue(bucketWriter.closed); Assert.assertTrue(calledBack.get()); - bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, - "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE, - hdfsWriter, timedRollerPool, proxy, - new SinkCounter("test-bucket-writer-" - + System.currentTimeMillis()), 0, null, null, 30000, - Executors.newSingleThreadExecutor(), 0, 0); + bucketWriter = new BucketWriter( + ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", null, null, + SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, + Executors.newSingleThreadExecutor(), 0, 0); // write one more event (to reopen a new file so we will roll again later) bucketWriter.append(e); @@ -193,17 +189,16 @@ public class TestBucketWriter { private volatile boolean open = false; public void configure(Context context) { - } public void sync() throws IOException { - if(!open) { + if (!open) { throw new IOException("closed"); } } - public void open(String filePath, CompressionCodec codec, - CompressionType cType) throws IOException { + public void open(String filePath, CompressionCodec codec, CompressionType cType) + throws IOException { open = true; } @@ -225,19 +220,18 @@ public class TestBucketWriter { open = true; } }; + HDFSTextSerializer serializer = new HDFSTextSerializer(); File tmpFile = File.createTempFile("flume", "test"); tmpFile.deleteOnExit(); String path = tmpFile.getParent(); String name = tmpFile.getName(); - BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, - 0, ctx, path, name, "", ".tmp", null, null, - SequenceFile.CompressionType.NONE, hdfsWriter, - timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" - + System.currentTimeMillis()), - 0, null, null, 30000, Executors.newSingleThreadExecutor(), - 0, 0); + BucketWriter bucketWriter = new BucketWriter( + ROLL_INTERVAL, 0, 0, 0, ctx, path, name, "", ".tmp", null, null, + SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, + Executors.newSingleThreadExecutor(), 0, 0); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); for (int i = 0; i < NUM_EVENTS - 1; i++) { @@ -252,62 +246,61 @@ public class TestBucketWriter { @Test public void testFileSuffixNotGiven() throws IOException, InterruptedException { - final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test - final String suffix = null; - - MockHDFSWriter hdfsWriter = new MockHDFSWriter(); - BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, - 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null, - SequenceFile.CompressionType.NONE, hdfsWriter, - timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" - + System.currentTimeMillis()), 0, null, null, 30000, + final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test + final String suffix = null; + + MockHDFSWriter hdfsWriter = new MockHDFSWriter(); + BucketWriter bucketWriter = new BucketWriter( + ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null, + SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); - // Need to override system time use for test so we know what to expect - final long testTime = System.currentTimeMillis(); - Clock testClock = new Clock() { - public long currentTimeMillis() { - return testTime; - } - }; - bucketWriter.setClock(testClock); + // Need to override system time use for test so we know what to expect + final long testTime = System.currentTimeMillis(); + Clock testClock = new Clock() { + public long currentTimeMillis() { + return testTime; + } + }; + bucketWriter.setClock(testClock); - Event e = EventBuilder.withBody("foo", Charsets.UTF_8); - bucketWriter.append(e); + Event e = EventBuilder.withBody("foo", Charsets.UTF_8); + bucketWriter.append(e); - Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + ".tmp")); + Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith( + Long.toString(testTime + 1) + ".tmp")); } - @Test - public void testFileSuffixGiven() throws IOException, InterruptedException { - final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test - final String suffix = ".avro"; + @Test + public void testFileSuffixGiven() throws IOException, InterruptedException { + final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test + final String suffix = ".avro"; - MockHDFSWriter hdfsWriter = new MockHDFSWriter(); - BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, - 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null, - SequenceFile.CompressionType.NONE, hdfsWriter, - timedRollerPool, proxy, new SinkCounter( - "test-bucket-writer-" + System.currentTimeMillis()), 0, - null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); + MockHDFSWriter hdfsWriter = new MockHDFSWriter(); + BucketWriter bucketWriter = new BucketWriter( + ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null, + SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, + Executors.newSingleThreadExecutor(), 0, 0); - // Need to override system time use for test so we know what to expect + // Need to override system time use for test so we know what to expect - final long testTime = System.currentTimeMillis(); + final long testTime = System.currentTimeMillis(); - Clock testClock = new Clock() { - public long currentTimeMillis() { - return testTime; - } - }; - bucketWriter.setClock(testClock); + Clock testClock = new Clock() { + public long currentTimeMillis() { + return testTime; + } + }; + bucketWriter.setClock(testClock); - Event e = EventBuilder.withBody("foo", Charsets.UTF_8); - bucketWriter.append(e); + Event e = EventBuilder.withBody("foo", Charsets.UTF_8); + bucketWriter.append(e); - Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith( - Long.toString(testTime + 1) + suffix + ".tmp")); - } + Assert.assertTrue("Incorrect suffix",hdfsWriter.getOpenedFilePath().endsWith( + Long.toString(testTime + 1) + suffix + ".tmp")); + } @Test public void testFileSuffixCompressed() @@ -316,13 +309,11 @@ public class TestBucketWriter { final String suffix = ".foo"; MockHDFSWriter hdfsWriter = new MockHDFSWriter(); - BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, - 0, ctx, "/tmp", "file", "", ".tmp", suffix, - HDFSEventSink.getCodec("gzip"), - SequenceFile.CompressionType.BLOCK, hdfsWriter, - timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" - + System.currentTimeMillis()), 0, null, null, 30000, - Executors.newSingleThreadExecutor(), 0, 0 + BucketWriter bucketWriter = new BucketWriter( + ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, + HDFSEventSink.getCodec("gzip"), SequenceFile.CompressionType.BLOCK, hdfsWriter, + timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), + 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0 ); // Need to override system time use for test so we know what to expect @@ -338,8 +329,8 @@ public class TestBucketWriter { Event e = EventBuilder.withBody("foo", Charsets.UTF_8); bucketWriter.append(e); - Assert.assertTrue("Incorrect suffix",hdfsWriter.getOpenedFilePath() - .endsWith(Long.toString(testTime+1) + suffix + ".tmp")); + Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith( + Long.toString(testTime + 1) + suffix + ".tmp")); } @Test @@ -349,12 +340,11 @@ public class TestBucketWriter { MockHDFSWriter hdfsWriter = new MockHDFSWriter(); HDFSTextSerializer formatter = new HDFSTextSerializer(); - BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, - 0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null, - SequenceFile.CompressionType.NONE, hdfsWriter, - timedRollerPool, proxy, new SinkCounter( - "test-bucket-writer-" + System.currentTimeMillis()), 0, - null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); + BucketWriter bucketWriter = new BucketWriter( + ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", PREFIX, ".tmp", null, null, + SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, + Executors.newSingleThreadExecutor(), 0, 0); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); bucketWriter.append(e); @@ -369,12 +359,11 @@ public class TestBucketWriter { MockHDFSWriter hdfsWriter = new MockHDFSWriter(); HDFSTextSerializer serializer = new HDFSTextSerializer(); - BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, - 0, ctx, "/tmp", "file", "", SUFFIX, null, null, - SequenceFile.CompressionType.NONE, hdfsWriter, - timedRollerPool, proxy, new SinkCounter( - "test-bucket-writer-" + System.currentTimeMillis()), 0, - null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0); + BucketWriter bucketWriter = new BucketWriter( + ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null, + SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, + Executors.newSingleThreadExecutor(), 0, 0); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); bucketWriter.append(e); @@ -389,18 +378,16 @@ public class TestBucketWriter { final AtomicBoolean callbackCalled = new AtomicBoolean(false); MockHDFSWriter hdfsWriter = new MockHDFSWriter(); - BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, - 0, ctx, "/tmp", "file", "", SUFFIX, null, null, - SequenceFile.CompressionType.NONE, - hdfsWriter, timedRollerPool, proxy, - new SinkCounter( - "test-bucket-writer-" + System.currentTimeMillis()), 0, - new HDFSEventSink.WriterCallback() { - @Override - public void run(String filePath) { - callbackCalled.set(true); - } - }, "blah", 30000, Executors.newSingleThreadExecutor(), 0, 0); + BucketWriter bucketWriter = new BucketWriter( + ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", SUFFIX, null, null, + SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, + new HDFSEventSink.WriterCallback() { + @Override + public void run(String filePath) { + callbackCalled.set(true); + } + }, "blah", 30000, Executors.newSingleThreadExecutor(), 0, 0); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); bucketWriter.append(e); @@ -420,13 +407,13 @@ public class TestBucketWriter { SequenceFileRenameRetryCoreTest(1, false); SequenceFileRenameRetryCoreTest(5, false); SequenceFileRenameRetryCoreTest(2, false); - } - public void SequenceFileRenameRetryCoreTest(int numberOfRetriesRequired, boolean closeSucceed) throws Exception { - String hdfsPath = "file:///tmp/flume-test." - + Calendar.getInstance().getTimeInMillis() + "." - + Thread.currentThread().getId(); + public void SequenceFileRenameRetryCoreTest(int numberOfRetriesRequired, boolean closeSucceed) + throws Exception { + String hdfsPath = "file:///tmp/flume-test." + + Calendar.getInstance().getTimeInMillis() + + "." + Thread.currentThread().getId(); Context context = new Context(); Configuration conf = new Configuration(); @@ -435,22 +422,16 @@ public class TestBucketWriter { fs.delete(dirPath, true); fs.mkdirs(dirPath); context.put("hdfs.path", hdfsPath); - context.put("hdfs.closeTries", - String.valueOf(numberOfRetriesRequired)); + context.put("hdfs.closeTries", String.valueOf(numberOfRetriesRequired)); context.put("hdfs.rollCount", "1"); context.put("hdfs.retryInterval", "1"); context.put("hdfs.callTimeout", Long.toString(1000)); - MockFileSystem mockFs = new - MockFileSystem(fs, - numberOfRetriesRequired, closeSucceed); - BucketWriter bucketWriter = new BucketWriter(0, 0, 1, 1, ctx, - hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null, - null, new MockDataStream(mockFs), - timedRollerPool, proxy, - new SinkCounter( - "test-bucket-writer-" + System.currentTimeMillis()), - 0, null, null, 30000, Executors.newSingleThreadExecutor(), 1, - numberOfRetriesRequired); + MockFileSystem mockFs = new MockFileSystem(fs, numberOfRetriesRequired, closeSucceed); + BucketWriter bucketWriter = new BucketWriter( + 0, 0, 1, 1, ctx, hdfsPath, hdfsPath, "singleBucket", ".tmp", null, null, + null, new MockDataStream(mockFs), timedRollerPool, proxy, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, + Executors.newSingleThreadExecutor(), 1, numberOfRetriesRequired); bucketWriter.setFileSystem(mockFs); // At this point, we checked if isFileClosed is available in @@ -463,8 +444,7 @@ public class TestBucketWriter { TimeUnit.SECONDS.sleep(numberOfRetriesRequired + 2); Assert.assertTrue("Expected " + numberOfRetriesRequired + " " + - "but got " + bucketWriter.renameTries.get(), - bucketWriter.renameTries.get() == - numberOfRetriesRequired); + "but got " + bucketWriter.renameTries.get(), + bucketWriter.renameTries.get() == numberOfRetriesRequired); } } http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java index 23862eb..73f016b 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java @@ -71,7 +71,6 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Charsets; import com.google.common.collect.Lists; - public class TestHDFSEventSink { private HDFSEventSink sink; @@ -118,8 +117,7 @@ public class TestHDFSEventSink { @After public void tearDown() { - if (System.getenv("hdfs_keepFiles") == null) - dirCleanup(); + if (System.getenv("hdfs_keepFiles") == null) dirCleanup(); } @Test @@ -176,7 +174,7 @@ public class TestHDFSEventSink { List<String> bodies = Lists.newArrayList(); // push the event batches into channel to roll twice - for (i = 1; i <= (rollCount*10)/batchSize; i++) { + for (i = 1; i <= (rollCount * 10) / batchSize; i++) { Transaction txn = channel.getTransaction(); txn.begin(); for (j = 1; j <= batchSize; j++) { @@ -200,7 +198,7 @@ public class TestHDFSEventSink { // loop through all the files generated and check their contains FileStatus[] dirStat = fs.listStatus(dirPath); - Path fList[] = FileUtil.stat2Paths(dirStat); + Path[] fList = FileUtil.stat2Paths(dirStat); // check that the roll happened correctly for the given data long expectedFiles = totalEvents / rollCount; @@ -353,7 +351,7 @@ public class TestHDFSEventSink { // loop through all the files generated and check their contains FileStatus[] dirStat = fs.listStatus(dirPath); - Path fList[] = FileUtil.stat2Paths(dirStat); + Path[] fList = FileUtil.stat2Paths(dirStat); // check that the roll happened correctly for the given data long expectedFiles = totalEvents / rollCount; @@ -432,7 +430,7 @@ public class TestHDFSEventSink { // loop through all the files generated and check their contains FileStatus[] dirStat = fs.listStatus(dirPath); - Path fList[] = FileUtil.stat2Paths(dirStat); + Path[] fList = FileUtil.stat2Paths(dirStat); // check that the roll happened correctly for the given data long expectedFiles = totalEvents / rollCount; @@ -508,7 +506,7 @@ public class TestHDFSEventSink { // loop through all the files generated and check their contains FileStatus[] dirStat = fs.listStatus(dirPath); - Path fList[] = FileUtil.stat2Paths(dirStat); + Path[] fList = FileUtil.stat2Paths(dirStat); // check that the roll happened correctly for the given data long expectedFiles = totalEvents / rollCount; @@ -519,8 +517,8 @@ public class TestHDFSEventSink { } @Test - public void testSimpleAppendLocalTime() throws InterruptedException, - LifecycleException, EventDeliveryException, IOException { + public void testSimpleAppendLocalTime() + throws InterruptedException, LifecycleException, EventDeliveryException, IOException { final long currentTime = System.currentTimeMillis(); Clock clk = new Clock() { @Override @@ -536,7 +534,7 @@ public class TestHDFSEventSink { final int numBatches = 4; String newPath = testPath + "/singleBucket/%s" ; String expectedPath = testPath + "/singleBucket/" + - String.valueOf(currentTime/1000); + String.valueOf(currentTime / 1000); int totalEvents = 0; int i = 1, j = 1; @@ -576,7 +574,7 @@ public class TestHDFSEventSink { eventDate.clear(); eventDate.set(2011, i, i, i, 0); // yy mm dd event.getHeaders().put("timestamp", - String.valueOf(eventDate.getTimeInMillis())); + String.valueOf(eventDate.getTimeInMillis())); event.getHeaders().put("hostname", "Host" + i); String body = "Test." + i + "." + j; event.setBody(body.getBytes()); @@ -595,13 +593,13 @@ public class TestHDFSEventSink { // loop through all the files generated and check their contains FileStatus[] dirStat = fs.listStatus(dirPath); - Path fList[] = FileUtil.stat2Paths(dirStat); + Path[] fList = FileUtil.stat2Paths(dirStat); // check that the roll happened correctly for the given data long expectedFiles = totalEvents / rollCount; if (totalEvents % rollCount > 0) expectedFiles++; Assert.assertEquals("num files wrong, found: " + - Lists.newArrayList(fList), expectedFiles, fList.length); + Lists.newArrayList(fList), expectedFiles, fList.length); verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies); // The clock in bucketpath is static, so restore the real clock sink.setBucketClock(new SystemClock()); @@ -750,10 +748,10 @@ public class TestHDFSEventSink { private List<String> getAllFiles(String input) { List<String> output = Lists.newArrayList(); File dir = new File(input); - if(dir.isFile()) { + if (dir.isFile()) { output.add(dir.getAbsolutePath()); - } else if(dir.isDirectory()) { - for(String file : dir.list()) { + } else if (dir.isDirectory()) { + for (String file : dir.list()) { File subDir = new File(dir, file); output.addAll(getAllFiles(subDir.getAbsolutePath())); } @@ -761,16 +759,17 @@ public class TestHDFSEventSink { return output; } - private void verifyOutputSequenceFiles(FileSystem fs, Configuration conf, String dir, String prefix, List<String> bodies) throws IOException { + private void verifyOutputSequenceFiles(FileSystem fs, Configuration conf, String dir, + String prefix, List<String> bodies) throws IOException { int found = 0; int expected = bodies.size(); - for(String outputFile : getAllFiles(dir)) { + for (String outputFile : getAllFiles(dir)) { String name = (new File(outputFile)).getName(); - if(name.startsWith(prefix)) { + if (name.startsWith(prefix)) { SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(outputFile), conf); LongWritable key = new LongWritable(); BytesWritable value = new BytesWritable(); - while(reader.next(key, value)) { + while (reader.next(key, value)) { String body = new String(value.getBytes(), 0, value.getLength()); if (bodies.contains(body)) { LOG.debug("Found event body: {}", body); @@ -792,16 +791,17 @@ public class TestHDFSEventSink { } - private void verifyOutputTextFiles(FileSystem fs, Configuration conf, String dir, String prefix, List<String> bodies) throws IOException { + private void verifyOutputTextFiles(FileSystem fs, Configuration conf, String dir, String prefix, + List<String> bodies) throws IOException { int found = 0; int expected = bodies.size(); - for(String outputFile : getAllFiles(dir)) { + for (String outputFile : getAllFiles(dir)) { String name = (new File(outputFile)).getName(); - if(name.startsWith(prefix)) { + if (name.startsWith(prefix)) { FSDataInputStream input = fs.open(new Path(outputFile)); BufferedReader reader = new BufferedReader(new InputStreamReader(input)); String body = null; - while((body = reader.readLine()) != null) { + while ((body = reader.readLine()) != null) { bodies.remove(body); found++; } @@ -814,12 +814,13 @@ public class TestHDFSEventSink { } - private void verifyOutputAvroFiles(FileSystem fs, Configuration conf, String dir, String prefix, List<String> bodies) throws IOException { + private void verifyOutputAvroFiles(FileSystem fs, Configuration conf, String dir, String prefix, + List<String> bodies) throws IOException { int found = 0; int expected = bodies.size(); - for(String outputFile : getAllFiles(dir)) { + for (String outputFile : getAllFiles(dir)) { String name = (new File(outputFile)).getName(); - if(name.startsWith(prefix)) { + if (name.startsWith(prefix)) { FSDataInputStream input = fs.open(new Path(outputFile)); DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(); DataFileStream<GenericRecord> avroStream = @@ -840,7 +841,7 @@ public class TestHDFSEventSink { } Assert.assertTrue("Found = " + found + ", Expected = " + expected + ", Left = " + bodies.size() + " " + bodies, - bodies.size() == 0); + bodies.size() == 0); } /** @@ -849,9 +850,9 @@ public class TestHDFSEventSink { * This relies on Transactional rollback semantics for durability and * the behavior of the BucketWriter class of close()ing upon IOException. */ - @Test - public void testCloseReopen() throws InterruptedException, - LifecycleException, EventDeliveryException, IOException { + @Test + public void testCloseReopen() + throws InterruptedException, LifecycleException, EventDeliveryException, IOException { LOG.debug("Starting..."); final int numBatches = 4; @@ -924,8 +925,8 @@ public class TestHDFSEventSink { * a new one is used for the next set of events. */ @Test - public void testCloseReopenOnRollTime() throws InterruptedException, - LifecycleException, EventDeliveryException, IOException { + public void testCloseReopenOnRollTime() + throws InterruptedException, LifecycleException, EventDeliveryException, IOException { LOG.debug("Starting..."); final int numBatches = 4; @@ -973,7 +974,7 @@ public class TestHDFSEventSink { eventDate.clear(); eventDate.set(2011, i, i, i, 0); // yy mm dd event.getHeaders().put("timestamp", - String.valueOf(eventDate.getTimeInMillis())); + String.valueOf(eventDate.getTimeInMillis())); event.getHeaders().put("hostname", "Host" + i); String body = "Test." + i + "." + j; event.setBody(body.getBytes()); @@ -997,9 +998,9 @@ public class TestHDFSEventSink { Assert.assertTrue(badWriterFactory.openCount.get() >= 2); LOG.info("Total number of bucket writers opened: {}", - badWriterFactory.openCount.get()); + badWriterFactory.openCount.get()); verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, - bodies); + bodies); } /** @@ -1007,8 +1008,8 @@ public class TestHDFSEventSink { * sfWriters map. */ @Test - public void testCloseRemovesFromSFWriters() throws InterruptedException, - LifecycleException, EventDeliveryException, IOException { + public void testCloseRemovesFromSFWriters() + throws InterruptedException, LifecycleException, EventDeliveryException, IOException { LOG.debug("Starting..."); final String fileName = "FlumeData"; @@ -1055,7 +1056,7 @@ public class TestHDFSEventSink { eventDate.clear(); eventDate.set(2011, i, i, i, 0); // yy mm dd event.getHeaders().put("timestamp", - String.valueOf(eventDate.getTimeInMillis())); + String.valueOf(eventDate.getTimeInMillis())); event.getHeaders().put("hostname", "Host" + i); String body = "Test." + i + "." + j; event.setBody(body.getBytes()); @@ -1080,9 +1081,9 @@ public class TestHDFSEventSink { sink.stop(); LOG.info("Total number of bucket writers opened: {}", - badWriterFactory.openCount.get()); + badWriterFactory.openCount.get()); verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, - bodies); + bodies); } @@ -1163,8 +1164,9 @@ public class TestHDFSEventSink { * append using slow sink writer with specified append timeout * verify that the data is written correctly to files */ - private void slowAppendTestHelper (long appendTimeout) throws InterruptedException, IOException, - LifecycleException, EventDeliveryException, IOException { + private void slowAppendTestHelper(long appendTimeout) + throws InterruptedException, IOException, LifecycleException, EventDeliveryException, + IOException { final String fileName = "FlumeData"; final long rollCount = 5; final long batchSize = 2; @@ -1230,7 +1232,7 @@ public class TestHDFSEventSink { // loop through all the files generated and check their contains FileStatus[] dirStat = fs.listStatus(dirPath); - Path fList[] = FileUtil.stat2Paths(dirStat); + Path[] fList = FileUtil.stat2Paths(dirStat); // check that the roll happened correctly for the given data // Note that we'll end up with two files with only a head @@ -1292,7 +1294,7 @@ public class TestHDFSEventSink { Transaction txn = channel.getTransaction(); txn.begin(); - for(int i=0; i < 10; i++) { + for (int i = 0; i < 10; i++) { Event event = new SimpleEvent(); event.setBody(("test event " + i).getBytes()); channel.put(event); @@ -1317,9 +1319,9 @@ public class TestHDFSEventSink { FileStatus[] dirStat = fs.listStatus(dirPath); Path[] fList = FileUtil.stat2Paths(dirStat); Assert.assertEquals("Incorrect content of the directory " + StringUtils.join(fList, ","), - 2, fList.length); + 2, fList.length); Assert.assertTrue(!fList[0].getName().endsWith(".tmp") && - !fList[1].getName().endsWith(".tmp")); + !fList[1].getName().endsWith(".tmp")); fs.close(); } @@ -1338,8 +1340,7 @@ public class TestHDFSEventSink { } @Test - public void testBadConfigurationForRetryIntervalZero() throws - Exception { + public void testBadConfigurationForRetryIntervalZero() throws Exception { Context context = getContextForRetryTests(); context.put("hdfs.retryInterval", "0"); @@ -1348,43 +1349,41 @@ public class TestHDFSEventSink { } @Test - public void testBadConfigurationForRetryIntervalNegative() throws - Exception { + public void testBadConfigurationForRetryIntervalNegative() throws Exception { Context context = getContextForRetryTests(); context.put("hdfs.retryInterval", "-1"); Configurables.configure(sink, context); Assert.assertEquals(1, sink.getTryCount()); } + @Test - public void testBadConfigurationForRetryCountZero() throws - Exception { + public void testBadConfigurationForRetryCountZero() throws Exception { Context context = getContextForRetryTests(); context.put("hdfs.closeTries" ,"0"); Configurables.configure(sink, context); Assert.assertEquals(Integer.MAX_VALUE, sink.getTryCount()); } + @Test - public void testBadConfigurationForRetryCountNegative() throws - Exception { + public void testBadConfigurationForRetryCountNegative() throws Exception { Context context = getContextForRetryTests(); context.put("hdfs.closeTries" ,"-4"); Configurables.configure(sink, context); Assert.assertEquals(Integer.MAX_VALUE, sink.getTryCount()); } + @Test - public void testRetryRename() throws InterruptedException, - LifecycleException, - EventDeliveryException, IOException { + public void testRetryRename() + throws InterruptedException, LifecycleException, EventDeliveryException, IOException { testRetryRename(true); testRetryRename(false); } - private void testRetryRename(boolean closeSucceed) throws InterruptedException, - LifecycleException, - EventDeliveryException, IOException { + private void testRetryRename(boolean closeSucceed) + throws InterruptedException, LifecycleException, EventDeliveryException, IOException { LOG.debug("Starting..."); String newPath = testPath + "/retryBucket"; @@ -1441,8 +1440,8 @@ public class TestHDFSEventSink { Collection<BucketWriter> writers = sink.getSfWriters().values(); int totalRenameAttempts = 0; - for(BucketWriter writer: writers) { - LOG.info("Rename tries = "+ writer.renameTries.get()); + for (BucketWriter writer : writers) { + LOG.info("Rename tries = " + writer.renameTries.get()); totalRenameAttempts += writer.renameTries.get(); } // stop clears the sfWriters map, so we need to compute the http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java index 6381edc..974e857 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestSequenceFileSerializerFactory.java @@ -47,8 +47,7 @@ public class TestSequenceFileSerializerFactory { @Test public void getCustomFormatter() { - SequenceFileSerializer formatter = SequenceFileSerializerFactory - .getSerializer( + SequenceFileSerializer formatter = SequenceFileSerializerFactory.getSerializer( "org.apache.flume.sink.hdfs.MyCustomSerializer$Builder", new Context()); assertTrue(formatter != null); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java index 46724f2..c417404 100644 --- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java +++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveSink.java @@ -55,8 +55,8 @@ import java.util.UUID; public class TestHiveSink { // 1) partitioned table - final static String dbName = "testing"; - final static String tblName = "alerts"; + static final String dbName = "testing"; + static final String tblName = "alerts"; public static final String PART1_NAME = "continent"; public static final String PART2_NAME = "country"; @@ -72,8 +72,8 @@ public class TestHiveSink { private final ArrayList<String> partitionVals; // 2) un-partitioned table - final static String dbName2 = "testing2"; - final static String tblName2 = "alerts2"; + static final String dbName2 = "testing2"; + static final String tblName2 = "alerts2"; final String[] colNames2 = {COL1,COL2}; private String[] colTypes2 = { "int", "string" }; @@ -88,7 +88,6 @@ public class TestHiveSink { @Rule public TemporaryFolder dbFolder = new TemporaryFolder(); - private static final Logger LOG = LoggerFactory.getLogger(HiveSink.class); public TestHiveSink() throws Exception { @@ -182,8 +181,8 @@ public class TestHiveSink { TestUtil.dropDB(conf, dbName2); String dbLocation = dbFolder.newFolder(dbName2).getCanonicalPath() + ".db"; dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths - TestUtil.createDbAndTable(driver, dbName2, tblName2, null, colNames2, colTypes2 - , null, dbLocation); + TestUtil.createDbAndTable(driver, dbName2, tblName2, null, colNames2, colTypes2, + null, dbLocation); try { int totalRecords = 4; @@ -282,7 +281,7 @@ public class TestHiveSink { String body = j + ",blah,This is a log message,other stuff"; event.setBody(body.getBytes()); eventDate.clear(); - eventDate.set(2014, 03, 03, j%batchCount, 1); // yy mm dd hh mm + eventDate.set(2014, 03, 03, j % batchCount, 1); // yy mm dd hh mm event.getHeaders().put( "timestamp", String.valueOf(eventDate.getTimeInMillis()) ); event.getHeaders().put( PART1_NAME, "Asia" ); @@ -317,7 +316,7 @@ public class TestHiveSink { throws EventDeliveryException, IOException, CommandNeedRetryException { int batchSize = 2; int batchCount = 3; - int totalRecords = batchCount*batchSize; + int totalRecords = batchCount * batchSize; Context context = new Context(); context.put("hive.metastore", metaStoreURI); context.put("hive.database", dbName); @@ -340,7 +339,7 @@ public class TestHiveSink { txn.begin(); for (int j = 1; j <= batchSize; j++) { Event event = new SimpleEvent(); - String body = i*j + ",blah,This is a log message,other stuff"; + String body = i * j + ",blah,This is a log message,other stuff"; event.setBody(body.getBytes()); bodies.add(body); channel.put(event); @@ -361,7 +360,7 @@ public class TestHiveSink { public void testJsonSerializer() throws Exception { int batchSize = 2; int batchCount = 2; - int totalRecords = batchCount*batchSize; + int totalRecords = batchCount * batchSize; Context context = new Context(); context.put("hive.metastore",metaStoreURI); context.put("hive.database",dbName); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java index 41bf0f6..4d7c9bb 100644 --- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java +++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestHiveWriter.java @@ -42,8 +42,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class TestHiveWriter { - final static String dbName = "testing"; - final static String tblName = "alerts"; + static final String dbName = "testing"; + static final String tblName = "alerts"; public static final String PART1_NAME = "continent"; public static final String PART2_NAME = "country"; @@ -106,8 +106,8 @@ public class TestHiveWriter { TestUtil.dropDB(conf, dbName); String dbLocation = dbFolder.newFolder(dbName).getCanonicalPath() + ".db"; dbLocation = dbLocation.replaceAll("\\\\","/"); // for windows paths - TestUtil.createDbAndTable(driver, dbName, tblName, partVals, colNames, colTypes - , partNames, dbLocation); + TestUtil.createDbAndTable(driver, dbName, tblName, partVals, colNames, colTypes, partNames, + dbLocation); // 2) Setup serializer Context ctx = new Context(); @@ -120,8 +120,8 @@ public class TestHiveWriter { public void testInstantiate() throws Exception { HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals); SinkCounter sinkCounter = new SinkCounter(this.getClass().getName()); - HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout - , callTimeoutPool, "flumetest", serializer, sinkCounter); + HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout, callTimeoutPool, "flumetest", + serializer, sinkCounter); writer.close(); } @@ -130,8 +130,8 @@ public class TestHiveWriter { public void testWriteBasic() throws Exception { HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals); SinkCounter sinkCounter = new SinkCounter(this.getClass().getName()); - HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout - , callTimeoutPool, "flumetest", serializer, sinkCounter); + HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout, callTimeoutPool, "flumetest", + serializer, sinkCounter); writeEvents(writer,3); writer.flush(false); @@ -144,8 +144,8 @@ public class TestHiveWriter { HiveEndPoint endPoint = new HiveEndPoint(metaStoreURI, dbName, tblName, partVals); SinkCounter sinkCounter = new SinkCounter(this.getClass().getName()); - HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout - , callTimeoutPool, "flumetest", serializer, sinkCounter); + HiveWriter writer = new HiveWriter(endPoint, 10, true, timeout, callTimeoutPool, "flumetest", + serializer, sinkCounter); checkRecordCountInTable(0); SimpleEvent event = new SimpleEvent(); @@ -184,8 +184,8 @@ public class TestHiveWriter { int txnPerBatch = 3; - HiveWriter writer = new HiveWriter(endPoint, txnPerBatch, true, timeout - , callTimeoutPool, "flumetest", serializer, sinkCounter); + HiveWriter writer = new HiveWriter(endPoint, txnPerBatch, true, timeout, callTimeoutPool, + "flumetest", serializer, sinkCounter); Assert.assertEquals(writer.getRemainingTxns(),2); writer.flush(true); @@ -275,14 +275,13 @@ public class TestHiveWriter { ctx.put("serializer.serdeSeparator", "ab"); try { serializer3.configure(ctx); - Assert.assertTrue("Bad serdeSeparator character was accepted" ,false); - } catch (Exception e){ + Assert.assertTrue("Bad serdeSeparator character was accepted", false); + } catch (Exception e) { // expect an exception } } - @Test public void testSecondWriterBeforeFirstCommits() throws Exception { // here we open a new writer while the first is still writing (not committed) @@ -295,13 +294,13 @@ public class TestHiveWriter { SinkCounter sinkCounter1 = new SinkCounter(this.getClass().getName()); SinkCounter sinkCounter2 = new SinkCounter(this.getClass().getName()); - HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout - , callTimeoutPool, "flumetest", serializer, sinkCounter1); + HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout, callTimeoutPool, "flumetest", + serializer, sinkCounter1); writeEvents(writer1, 3); - HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout - , callTimeoutPool, "flumetest", serializer, sinkCounter2); + HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout, callTimeoutPool, "flumetest", + serializer, sinkCounter2); writeEvents(writer2, 3); writer2.flush(false); // commit @@ -311,7 +310,6 @@ public class TestHiveWriter { writer2.close(); } - @Test public void testSecondWriterAfterFirstCommits() throws Exception { // here we open a new writer after the first writer has committed one txn @@ -324,16 +322,16 @@ public class TestHiveWriter { SinkCounter sinkCounter1 = new SinkCounter(this.getClass().getName()); SinkCounter sinkCounter2 = new SinkCounter(this.getClass().getName()); - HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout - , callTimeoutPool, "flumetest", serializer, sinkCounter1); + HiveWriter writer1 = new HiveWriter(endPoint1, 10, true, timeout, callTimeoutPool, "flumetest", + serializer, sinkCounter1); writeEvents(writer1, 3); writer1.flush(false); // commit - HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout - , callTimeoutPool, "flumetest", serializer, sinkCounter2); + HiveWriter writer2 = new HiveWriter(endPoint2, 10, true, timeout, callTimeoutPool, "flumetest", + serializer, sinkCounter2); writeEvents(writer2, 3); writer2.flush(false); // commit @@ -342,8 +340,8 @@ public class TestHiveWriter { writer2.close(); } - - private void writeEvents(HiveWriter writer, int count) throws InterruptedException, HiveWriter.WriteException { + private void writeEvents(HiveWriter writer, int count) + throws InterruptedException, HiveWriter.WriteException { SimpleEvent event = new SimpleEvent(); for (int i = 1; i <= count; i++) { event.setBody((i + ",xyz,Hello world,abc").getBytes()); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java index 107789f..1fcb4eb 100644 --- a/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java +++ b/flume-ng-sinks/flume-hive-sink/src/test/java/org/apache/flume/sink/hive/TestUtil.java @@ -46,7 +46,7 @@ import java.util.List; public class TestUtil { - private final static String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; + private static final String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"; /** * Set up the configuration so it will use the DbTxnManager, concurrency will be set to true, @@ -81,7 +81,7 @@ public class TestUtil { runDDL(driver, crtTbl); System.out.println("crtTbl = " + crtTbl); - if (partNames!=null && partNames.length!=0) { + if (partNames != null && partNames.length != 0) { String addPart = "alter table " + tableName + " add partition ( " + getTablePartsStr2(partNames, partVals) + " )"; runDDL(driver, addPart); @@ -96,7 +96,8 @@ public class TestUtil { } // delete db and all tables in it - public static void dropDB(HiveConf conf, String databaseName) throws HiveException, MetaException { + public static void dropDB(HiveConf conf, String databaseName) + throws HiveException, MetaException { IMetaStoreClient client = new HiveMetaStoreClient(conf); try { for (String table : client.listTableNamesByFilter(databaseName, "", (short)-1)) { @@ -110,9 +111,9 @@ public class TestUtil { private static String getTableColumnsStr(String[] colNames, String[] colTypes) { StringBuffer sb = new StringBuffer(); - for (int i=0; i < colNames.length; ++i) { + for (int i = 0; i < colNames.length; ++i) { sb.append(colNames[i] + " " + colTypes[i]); - if (i<colNames.length-1) { + if (i < colNames.length - 1) { sb.append(","); } } @@ -121,13 +122,13 @@ public class TestUtil { // converts partNames into "partName1 string, partName2 string" private static String getTablePartsStr(String[] partNames) { - if (partNames==null || partNames.length==0) { + if (partNames == null || partNames.length == 0) { return ""; } StringBuffer sb = new StringBuffer(); - for (int i=0; i < partNames.length; ++i) { + for (int i = 0; i < partNames.length; ++i) { sb.append(partNames[i] + " string"); - if (i < partNames.length-1) { + if (i < partNames.length - 1) { sb.append(","); } } @@ -137,9 +138,9 @@ public class TestUtil { // converts partNames,partVals into "partName1=val1, partName2=val2" private static String getTablePartsStr2(String[] partNames, List<String> partVals) { StringBuffer sb = new StringBuffer(); - for (int i=0; i < partVals.size(); ++i) { + for (int i = 0; i < partVals.size(); ++i) { sb.append(partNames[i] + " = '" + partVals.get(i) + "'"); - if (i < partVals.size()-1) { + if (i < partVals.size() - 1) { sb.append(","); } } @@ -147,7 +148,7 @@ public class TestUtil { } public static ArrayList<String> listRecordsInTable(Driver driver, String dbName, String tblName) - throws CommandNeedRetryException, IOException { + throws CommandNeedRetryException, IOException { driver.run("select * from " + dbName + "." + tblName); ArrayList<String> res = new ArrayList<String>(); driver.getResults(res); @@ -155,8 +156,9 @@ public class TestUtil { } public static ArrayList<String> listRecordsInPartition(Driver driver, String dbName, - String tblName, String continent, String country) - throws CommandNeedRetryException, IOException { + String tblName, String continent, + String country) + throws CommandNeedRetryException, IOException { driver.run("select * from " + dbName + "." + tblName + " where continent='" + continent + "' and country='" + country + "'"); ArrayList<String> res = new ArrayList<String>(); @@ -164,9 +166,9 @@ public class TestUtil { return res; } - public static class RawFileSystem extends RawLocalFileSystem { private static final URI NAME; + static { try { NAME = new URI("raw:///"); @@ -211,9 +213,10 @@ public class TestUtil { FsPermission.createImmutable(mod), "owen", "users", path); } } + private static boolean runDDL(Driver driver, String sql) throws QueryFailedException { int retryCount = 1; // # of times to retry if first attempt fails - for (int attempt=0; attempt <= retryCount; ++attempt) { + for (int attempt = 0; attempt <= retryCount; ++attempt) { try { driver.run(sql); return true; http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java b/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java index e6c065e..32517d1 100644 --- a/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java +++ b/flume-ng-sinks/flume-irc-sink/src/test/java/org/apache/flume/sink/irc/TestIRCSink.java @@ -19,7 +19,12 @@ package org.apache.flume.sink.irc; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; -import org.apache.flume.*; +import org.apache.flume.Channel; +import org.apache.flume.Context; +import org.apache.flume.Event; +import org.apache.flume.EventDeliveryException; +import org.apache.flume.Sink; +import org.apache.flume.Transaction; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; import org.apache.flume.event.EventBuilder; @@ -116,7 +121,9 @@ public class TestIRCSink { try { Socket socket = ss.accept(); process(socket); - } catch (Exception ex) {/* noop */ } + } catch (Exception ex) { + /* noop */ + } } } catch (IOException e) { // noop http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java index f9272fa..9fbd747 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchSinkTest.java @@ -18,17 +18,6 @@ */ package org.apache.flume.sink.elasticsearch; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE; -import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL; -import static org.junit.Assert.assertEquals; - -import java.util.Arrays; -import java.util.Comparator; -import java.util.Map; - import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; @@ -51,6 +40,17 @@ import org.joda.time.DateTimeUtils; import org.junit.After; import org.junit.Before; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Map; + +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL; +import static org.junit.Assert.assertEquals; + public abstract class AbstractElasticSearchSinkTest { static final String DEFAULT_INDEX_NAME = "flume"; @@ -136,7 +136,8 @@ public abstract class AbstractElasticSearchSinkTest { .setTypes(DEFAULT_INDEX_TYPE).setQuery(query).execute().actionGet(); } - void assertSearch(int expectedHits, SearchResponse response, Map<String, Object> expectedBody, Event... events) { + void assertSearch(int expectedHits, SearchResponse response, Map<String, Object> expectedBody, + Event... events) { SearchHits hitResponse = response.getHits(); assertEquals(expectedHits, hitResponse.getTotalHits()); http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java index 8022111..b62254e 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchIndexRequestBuilderFactory.java @@ -18,15 +18,7 @@ */ package org.apache.flume.sink.elasticsearch; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.Map; - +import com.google.common.collect.Maps; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.conf.ComponentConfiguration; @@ -39,7 +31,14 @@ import org.elasticsearch.common.io.FastByteArrayOutputStream; import org.junit.Before; import org.junit.Test; -import com.google.common.collect.Maps; +import java.io.IOException; +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestElasticSearchIndexRequestBuilderFactory extends AbstractElasticSearchSinkTest { @@ -70,7 +69,7 @@ public class TestElasticSearchIndexRequestBuilderFactory @Test public void indexNameShouldBePrefixDashFormattedTimestamp() { long millis = 987654321L; - assertEquals("prefix-"+factory.fastDateFormat.format(millis), + assertEquals("prefix-" + factory.fastDateFormat.format(millis), factory.getIndexName("prefix", millis)); } @@ -135,7 +134,7 @@ public class TestElasticSearchIndexRequestBuilderFactory assertEquals(indexPrefix + '-' + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS), - indexRequestBuilder.request().index()); + indexRequestBuilder.request().index()); assertEquals(indexType, indexRequestBuilder.request().type()); assertArrayEquals(FakeEventSerializer.FAKE_BYTES, indexRequestBuilder.request().source().array()); @@ -154,7 +153,7 @@ public class TestElasticSearchIndexRequestBuilderFactory assertEquals(indexPrefix + '-' + ElasticSearchIndexRequestBuilderFactory.df.format(1213141516L), - indexRequestBuilder.request().index()); + indexRequestBuilder.request().index()); } @Test @@ -174,7 +173,7 @@ public class TestElasticSearchIndexRequestBuilderFactory assertEquals(indexValue + '-' + ElasticSearchIndexRequestBuilderFactory.df.format(FIXED_TIME_MILLIS), - indexRequestBuilder.request().index()); + indexRequestBuilder.request().index()); assertEquals(typeValue, indexRequestBuilder.request().type()); } @@ -192,7 +191,8 @@ public class TestElasticSearchIndexRequestBuilderFactory static class FakeEventSerializer implements ElasticSearchEventSerializer { static final byte[] FAKE_BYTES = new byte[]{9, 8, 7, 6}; - boolean configuredWithContext, configuredWithComponentConfiguration; + boolean configuredWithContext; + boolean configuredWithComponentConfiguration; @Override public BytesStream getContentBuilder(Event event) throws IOException { http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java index ab9587d..65b4dab 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchLogStashEventSerializer.java @@ -18,6 +18,7 @@ */ package org.apache.flume.sink.elasticsearch; +import com.google.gson.JsonParser; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.event.EventBuilder; @@ -28,9 +29,6 @@ import org.junit.Test; import java.util.Date; import java.util.Map; -import com.google.gson.JsonParser; -import com.google.gson.JsonElement; - import static org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer.charset; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.junit.Assert.assertEquals; @@ -56,26 +54,25 @@ public class TestElasticSearchLogStashEventSerializer { Event event = EventBuilder.withBody(message.getBytes(charset)); event.setHeaders(headers); - XContentBuilder expected = jsonBuilder() - .startObject(); - expected.field("@message", new String(message.getBytes(), charset)); - expected.field("@timestamp", new Date(timestamp)); - expected.field("@source", "flume_tail_src"); - expected.field("@type", "sometype"); - expected.field("@source_host", "test@localhost"); - expected.field("@source_path", "/tmp/test"); - - expected.startObject("@fields"); - expected.field("timestamp", String.valueOf(timestamp)); - expected.field("src_path", "/tmp/test"); - expected.field("host", "test@localhost"); - expected.field("headerNameTwo", "headerValueTwo"); - expected.field("source", "flume_tail_src"); - expected.field("headerNameOne", "headerValueOne"); - expected.field("type", "sometype"); - expected.endObject(); - - expected.endObject(); + XContentBuilder expected = jsonBuilder().startObject(); + expected.field("@message", new String(message.getBytes(), charset)); + expected.field("@timestamp", new Date(timestamp)); + expected.field("@source", "flume_tail_src"); + expected.field("@type", "sometype"); + expected.field("@source_host", "test@localhost"); + expected.field("@source_path", "/tmp/test"); + + expected.startObject("@fields"); + expected.field("timestamp", String.valueOf(timestamp)); + expected.field("src_path", "/tmp/test"); + expected.field("host", "test@localhost"); + expected.field("headerNameTwo", "headerValueTwo"); + expected.field("source", "flume_tail_src"); + expected.field("headerNameOne", "headerValueOne"); + expected.field("type", "sometype"); + expected.endObject(); + + expected.endObject(); XContentBuilder actual = fixture.getContentBuilder(event); @@ -102,26 +99,25 @@ public class TestElasticSearchLogStashEventSerializer { Event event = EventBuilder.withBody(message.getBytes(charset)); event.setHeaders(headers); - XContentBuilder expected = jsonBuilder(). - startObject(); - expected.field("@message", new String(message.getBytes(), charset)); - expected.field("@timestamp", new Date(timestamp)); - expected.field("@source", "flume_tail_src"); - expected.field("@type", "sometype"); - expected.field("@source_host", "test@localhost"); - expected.field("@source_path", "/tmp/test"); - - expected.startObject("@fields"); - expected.field("timestamp", String.valueOf(timestamp)); - expected.field("src_path", "/tmp/test"); - expected.field("host", "test@localhost"); - expected.field("headerNameTwo", "headerValueTwo"); - expected.field("source", "flume_tail_src"); - expected.field("headerNameOne", "headerValueOne"); - expected.field("type", "sometype"); - expected.endObject(); - - expected.endObject(); + XContentBuilder expected = jsonBuilder().startObject(); + expected.field("@message", new String(message.getBytes(), charset)); + expected.field("@timestamp", new Date(timestamp)); + expected.field("@source", "flume_tail_src"); + expected.field("@type", "sometype"); + expected.field("@source_host", "test@localhost"); + expected.field("@source_path", "/tmp/test"); + + expected.startObject("@fields"); + expected.field("timestamp", String.valueOf(timestamp)); + expected.field("src_path", "/tmp/test"); + expected.field("host", "test@localhost"); + expected.field("headerNameTwo", "headerValueTwo"); + expected.field("source", "flume_tail_src"); + expected.field("headerNameOne", "headerValueOne"); + expected.field("type", "sometype"); + expected.endObject(); + + expected.endObject(); XContentBuilder actual = fixture.getContentBuilder(event);
