Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 78033594b -> 1bfd2d15a
MLHR-1910 #resolve #comment fixed style violations and warnings in Block Reader and Splitter classes Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/17d05763 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/17d05763 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/17d05763 Branch: refs/heads/devel-3 Commit: 17d05763ddd48ccd9c3b58d44fff153952fdb537 Parents: 7803359 Author: Chandni Singh <[email protected]> Authored: Mon Nov 16 23:15:27 2015 -0800 Committer: Chandni Singh <[email protected]> Committed: Mon Nov 16 23:45:01 2015 -0800 ---------------------------------------------------------------------- .../lib/io/block/AbstractBlockReader.java | 90 +++++++++++--------- .../lib/io/block/AbstractFSBlockReader.java | 22 ++--- .../datatorrent/lib/io/block/BlockMetadata.java | 7 +- .../datatorrent/lib/io/block/FSSliceReader.java | 5 +- .../datatorrent/lib/io/block/ReaderContext.java | 40 ++++----- .../datatorrent/lib/io/block/package-info.java | 2 +- .../lib/io/block/AbstractBlockReaderTest.java | 76 +++++++++-------- .../lib/io/block/FSLineReaderTest.java | 49 ++++++----- .../lib/io/block/FSSliceReaderTest.java | 30 +++---- .../lib/io/block/ReadAheadLineReaderTest.java | 2 +- .../lib/io/fs/FileSplitterBaseTest.java | 26 ++++-- .../datatorrent/lib/io/fs/FileSplitterTest.java | 54 ++++++------ 12 files changed, 215 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java index 1839e68..aada298 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java @@ -21,27 +21,37 @@ package com.datatorrent.lib.io.block; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.util.*; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; import javax.validation.constraints.NotNull; -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.fs.PositionedReadable; -import com.datatorrent.lib.counters.BasicCounters; - -import com.datatorrent.api.*; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.Stats; +import com.datatorrent.api.StatsListener; import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.counters.BasicCounters; /** * AbstractBlockReader processes a block of data from a stream.<br/> @@ -54,7 +64,8 @@ import com.datatorrent.common.util.BaseOperator; * * <p/> * Properties that can be set on AbstractBlockReader:<br/> - * {@link #collectStats}: the operator is dynamically partition-able which is influenced by the backlog and the port queue size. This property disables + * {@link #collectStats}: the operator is dynamically partition-able which is influenced by the backlog and the port + * queue size. This property disables * collecting stats and thus partitioning.<br/> * {@link #maxReaders}: Maximum number of readers when dynamic partitioning is on.<br/> * {@link #minReaders}: Minimum number of readers when dynamic partitioning is on.<br/> @@ -71,8 +82,9 @@ import com.datatorrent.common.util.BaseOperator; */ @StatsListener.DataQueueSize -public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM extends InputStream & PositionedReadable> extends BaseOperator implements - Partitioner<AbstractBlockReader<R, B, STREAM>>, StatsListener, Operator.IdleTimeHandler +public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM extends InputStream & PositionedReadable> + extends BaseOperator + implements Partitioner<AbstractBlockReader<R, B, STREAM>>, StatsListener, Operator.IdleTimeHandler { protected int operatorId; protected transient long windowId; @@ -110,17 +122,17 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext */ protected long intervalMillis; - protected transient final StatsListener.Response response; + protected final transient StatsListener.Response response; protected transient int partitionCount; - protected transient final Map<Integer, Integer> backlogPerOperator; + protected final transient Map<Integer, Integer> backlogPerOperator; private transient long nextMillis; protected transient B lastProcessedBlock; protected transient long lastBlockOpenTime; protected transient boolean consecutiveBlock; - public final transient DefaultOutputPort<B> blocksMetadataOutput = new DefaultOutputPort<B>(); - public final transient DefaultOutputPort<ReaderRecord<R>> messages = new DefaultOutputPort<ReaderRecord<R>>(); + public final transient DefaultOutputPort<B> blocksMetadataOutput = new DefaultOutputPort<>(); + public final transient DefaultOutputPort<ReaderRecord<R>> messages = new DefaultOutputPort<>(); public final transient DefaultInputPort<B> blocksMetadataInput = new DefaultInputPort<B>() { @@ -139,7 +151,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext response = new StatsListener.Response(); backlogPerOperator = Maps.newHashMap(); partitionCount = 1; - counters = new BasicCounters<MutableLong>(MutableLong.class); + counters = new BasicCounters<>(MutableLong.class); collectStats = true; lastBlockOpenTime = -1; } @@ -172,17 +184,14 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext try { teardownStream(lastProcessedBlock); lastProcessedBlock = null; - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); } - } - else { + } else { /* nothing to do here, so sleep for a while to avoid busy loop */ try { Thread.sleep(sleepTimeMillis); - } - catch (InterruptedException ie) { + } catch (InterruptedException ie) { throw new RuntimeException(ie); } } @@ -199,13 +208,13 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext { try { long blockStartTime = System.currentTimeMillis(); - if (block.getPreviousBlockId() == -1 || lastProcessedBlock == null || block.getPreviousBlockId() != lastProcessedBlock.getBlockId()) { + if (block.getPreviousBlockId() == -1 || lastProcessedBlock == null + || block.getPreviousBlockId() != lastProcessedBlock.getBlockId()) { teardownStream(lastProcessedBlock); consecutiveBlock = false; lastBlockOpenTime = System.currentTimeMillis(); stream = setupStream(block); - } - else { + } else { consecutiveBlock = true; } readBlock(block); @@ -216,15 +225,13 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext blocksMetadataOutput.emit(block); } blocksPerWindow++; - } - catch (IOException ie) { + } catch (IOException ie) { try { if (lastProcessedBlock != null) { teardownStream(lastProcessedBlock); lastProcessedBlock = null; } - } - catch (IOException ioe) { + } catch (IOException ioe) { throw new RuntimeException("closing last", ie); } throw new RuntimeException(ie); @@ -250,7 +257,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext //If the record is partial then ignore the record. if (record != null) { counters.getCounter(ReaderCounterKeys.RECORDS).increment(); - messages.emit(new ReaderRecord<R>(blockMetadata.getBlockId(), record)); + messages.emit(new ReaderRecord<>(blockMetadata.getBlockId(), record)); } } } @@ -261,7 +268,8 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext */ @SuppressWarnings("unchecked") @Override - public Collection<Partition<AbstractBlockReader<R, B, STREAM>>> definePartitions(Collection<Partition<AbstractBlockReader<R, B, STREAM>>> partitions, PartitioningContext context) + public Collection<Partition<AbstractBlockReader<R, B, STREAM>>> definePartitions( + Collection<Partition<AbstractBlockReader<R, B, STREAM>>> partitions, PartitioningContext context) { if (partitions.iterator().next().getStats() == null) { //First time when define partitions is called @@ -271,7 +279,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext //Create new partitions for (Partition<AbstractBlockReader<R, B, STREAM>> partition : partitions) { - newPartitions.add(new DefaultPartition<AbstractBlockReader<R, B, STREAM>>(partition.getPartitionedInstance())); + newPartitions.add(new DefaultPartition<>(partition.getPartitionedInstance())); } partitions.clear(); int morePartitionsToCreate = partitionCount - newPartitions.size(); @@ -287,8 +295,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext LOG.debug("partition removed {}", toRemove.getPartitionedInstance().operatorId); partitionIterator.remove(); } - } - else { + } else { //Add more partitions Kryo kryo = new Kryo(); while (morePartitionsToCreate-- > 0) { @@ -301,7 +308,8 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext @SuppressWarnings("unchecked") AbstractBlockReader<R, B, STREAM> blockReader = kryo.readObject(lInput, this.getClass()); - DefaultPartition<AbstractBlockReader<R, B, STREAM>> partition = new DefaultPartition<AbstractBlockReader<R, B, STREAM>>(blockReader); + DefaultPartition<AbstractBlockReader<R, B, STREAM>> partition = new DefaultPartition<>( + blockReader); newPartitions.add(partition); } } @@ -389,12 +397,10 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext if (totalBacklog > maxReaders) { LOG.debug("large backlog {}", totalBacklog); newPartitionCount = maxReaders; - } - else if (totalBacklog < minReaders) { + } else if (totalBacklog < minReaders) { LOG.debug("small backlog {}", totalBacklog); newPartitionCount = minReaders; - } - else { + } else { newPartitionCount = getAdjustedCount(totalBacklog); LOG.debug("moderate backlog {}", totalBacklog); } @@ -574,7 +580,7 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext } - public static enum ReaderCounterKeys + public enum ReaderCounterKeys { RECORDS, BLOCKS, BYTES, TIME } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java index 006df45..d74c9c9 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/AbstractFSBlockReader.java @@ -36,7 +36,8 @@ import com.datatorrent.api.StatsListener; * @since 2.1.0 */ @StatsListener.DataQueueSize -public abstract class AbstractFSBlockReader<R> extends AbstractBlockReader<R, BlockMetadata.FileBlockMetadata, FSDataInputStream> +public abstract class AbstractFSBlockReader<R> + extends AbstractBlockReader<R, BlockMetadata.FileBlockMetadata, FSDataInputStream> { protected transient FileSystem fs; protected transient Configuration configuration; @@ -48,8 +49,7 @@ public abstract class AbstractFSBlockReader<R> extends AbstractBlockReader<R, Bl configuration = new Configuration(); try { fs = getFSInstance(); - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException("creating fs", e); } } @@ -60,8 +60,7 @@ public abstract class AbstractFSBlockReader<R> extends AbstractBlockReader<R, Bl super.teardown(); try { fs.close(); - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); } } @@ -88,27 +87,28 @@ public abstract class AbstractFSBlockReader<R> extends AbstractBlockReader<R, Bl * * @param <R> type of records */ - public static abstract class AbstractFSLineReader<R> extends AbstractFSBlockReader<R> + public abstract static class AbstractFSLineReader<R> extends AbstractFSBlockReader<R> { public AbstractFSLineReader() { super(); - this.readerContext = new ReaderContext.LineReaderContext<FSDataInputStream>(); + this.readerContext = new ReaderContext.LineReaderContext<>(); } } /** - * An {@link AbstractFSBlockReader} which reads lines from the block using {@link ReaderContext.ReadAheadLineReaderContext} + * An {@link AbstractFSBlockReader} which reads lines from the block using + * {@link ReaderContext.ReadAheadLineReaderContext} * - * @param <R> + * @param <R> type of record. */ - public static abstract class AbstractFSReadAheadLineReader<R> extends AbstractFSBlockReader<R> + public abstract static class AbstractFSReadAheadLineReader<R> extends AbstractFSBlockReader<R> { public AbstractFSReadAheadLineReader() { super(); - this.readerContext = new ReaderContext.ReadAheadLineReaderContext<FSDataInputStream>(); + this.readerContext = new ReaderContext.ReadAheadLineReaderContext<>(); } } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java index 24e9623..534024d 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockMetadata.java @@ -54,7 +54,7 @@ public interface BlockMetadata */ long getPreviousBlockId(); - public abstract class AbstractBlockMetadata implements BlockMetadata + abstract class AbstractBlockMetadata implements BlockMetadata { private long offset; private long length; @@ -198,7 +198,7 @@ public interface BlockMetadata /** * A block of file which contains file path adn other block properties. */ - public static class FileBlockMetadata extends AbstractBlockMetadata + class FileBlockMetadata extends AbstractBlockMetadata { private final String filePath; @@ -208,7 +208,8 @@ public interface BlockMetadata filePath = null; } - public FileBlockMetadata(String filePath, long blockId, long offset, long length, boolean isLastBlock, long previousBlockId) + public FileBlockMetadata(String filePath, long blockId, long offset, long length, boolean isLastBlock, + long previousBlockId) { super(blockId, offset, length, isLastBlock, previousBlockId); this.filePath = filePath; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java index 948fc3e..ad55358 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java @@ -18,10 +18,7 @@ */ package com.datatorrent.lib.io.block; -import org.apache.hadoop.fs.FSDataInputStream; - import com.datatorrent.api.StatsListener; - import com.datatorrent.netlet.util.Slice; /** @@ -38,7 +35,7 @@ public class FSSliceReader extends AbstractFSBlockReader<Slice> public FSSliceReader() { super(); - this.readerContext = new ReaderContext.FixedBytesReaderContext<FSDataInputStream>(); + this.readerContext = new ReaderContext.FixedBytesReaderContext<>(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java index b69b1ee..1ee6c53 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/ReaderContext.java @@ -22,16 +22,16 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.PositionedReadable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.PositionedReadable; + /** * This controls how an {@link AbstractBlockReader} reads a {@link BlockMetadata}. * * @param <STREAM> type of stream - * * @since 2.1.0 */ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> @@ -59,7 +59,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> * Represents the total bytes used to construct the record.<br/> * Used bytes can be different from the bytes in the record. */ - public static class Entity + class Entity { private byte[] record; private long usedBytes; @@ -97,9 +97,8 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> * * @param <STREAM> type of stream. */ - public static abstract class AbstractReaderContext<STREAM extends InputStream & PositionedReadable> implements ReaderContext<STREAM> + abstract class AbstractReaderContext<STREAM extends InputStream & PositionedReadable> implements ReaderContext<STREAM> { - protected transient long offset; protected transient STREAM stream; protected transient BlockMetadata blockMetadata; @@ -143,8 +142,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> * * @param <STREAM> type of stream. */ - - public static class LineReaderContext<STREAM extends InputStream & PositionedReadable> extends AbstractReaderContext<STREAM> + class LineReaderContext<STREAM extends InputStream & PositionedReadable> extends AbstractReaderContext<STREAM> { protected int bufferSize; @@ -178,8 +176,8 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> @Override protected Entity readEntity() throws IOException { - //Implemented a buffered reader instead of using java's BufferedReader because it was reading much ahead of block boundary - //and faced issues with duplicate records. Controlling the buffer size didn't help either. + //Implemented a buffered reader instead of using java's BufferedReader because it was reading much ahead of block + // boundary and faced issues with duplicate records. Controlling the buffer size didn't help either. boolean foundEOL = false; int bytesRead = 0; @@ -200,8 +198,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> if (c != '\r' && c != '\n') { tmpBuilder.write(c); posInStr++; - } - else { + } else { foundEOL = true; break; } @@ -216,14 +213,12 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> if (c == '\r' || c == '\n') { emptyBuilder.write(c); posInStr++; - } - else { + } else { break; } } usedBytes += emptyBuilder.toByteArray().length; - } - else { + } else { //read more bytes from the input stream posInStr = 0; } @@ -266,14 +261,14 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> /** * Another reader context that splits the block into records on '\n' or '\r'.<br/> * This implementation doesn't need a way to validate the start of a record.<br/> - * + * <p/> * This starts parsing the block (except the first block of the file) from the first eol character. * It is a less optimized version of an {@link LineReaderContext} which always reads beyond the block * boundary. * * @param <STREAM> */ - public static class ReadAheadLineReaderContext<STREAM extends InputStream & PositionedReadable> extends LineReaderContext<STREAM> + class ReadAheadLineReaderContext<STREAM extends InputStream & PositionedReadable> extends LineReaderContext<STREAM> { @Override public void initialize(STREAM stream, BlockMetadata blockMetadata, boolean consecutiveBlock) @@ -285,8 +280,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> try { Entity entity = readEntity(); offset += entity.usedBytes; - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException("when reading first entity", e); } } @@ -310,7 +304,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> * * @param <STREAM> type of stream. */ - public static class FixedBytesReaderContext<STREAM extends InputStream & PositionedReadable> extends AbstractReaderContext<STREAM> + class FixedBytesReaderContext<STREAM extends InputStream & PositionedReadable> extends AbstractReaderContext<STREAM> { //When this field is null, it is initialized to default fs block size in setup. protected Integer length; @@ -319,7 +313,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> public void initialize(STREAM stream, BlockMetadata blockMetadata, boolean consecutiveBlock) { if (length == null) { - length = (int) new Configuration().getLong("fs.local.block.size", 32 * 1024 * 1024); + length = (int)new Configuration().getLong("fs.local.block.size", 32 * 1024 * 1024); LOG.debug("length init {}", length); } super.initialize(stream, blockMetadata, consecutiveBlock); @@ -331,7 +325,7 @@ public interface ReaderContext<STREAM extends InputStream & PositionedReadable> entity.clear(); int bytesToRead = length; if (offset + length >= blockMetadata.getLength()) { - bytesToRead = (int) (blockMetadata.getLength() - offset); + bytesToRead = (int)(blockMetadata.getLength() - offset); } byte[] record = new byte[bytesToRead]; stream.readFully(offset, record, 0, bytesToRead); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/main/java/com/datatorrent/lib/io/block/package-info.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/package-info.java b/library/src/main/java/com/datatorrent/lib/io/block/package-info.java index 14cfd47..bc89b6e 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/package-info.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/package-info.java @@ -16,4 +16,4 @@ * specific language governing permissions and limitations * under the License. */ -package com.datatorrent.lib.io.block; \ No newline at end of file +package com.datatorrent.lib.io.block; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java index 3610191..b2b99e6 100644 --- a/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/block/AbstractBlockReaderTest.java @@ -21,21 +21,21 @@ package com.datatorrent.lib.io.block; import java.util.Collection; import java.util.List; -import org.apache.commons.lang.mutable.MutableLong; -import org.apache.hadoop.fs.FSDataInputStream; import org.junit.Assert; import org.junit.Test; +import org.apache.commons.lang.mutable.MutableLong; +import org.apache.hadoop.fs.FSDataInputStream; + import com.google.common.collect.Lists; import com.datatorrent.api.DefaultPartition; import com.datatorrent.api.Partitioner; import com.datatorrent.api.Stats; import com.datatorrent.api.StatsListener; - -import com.datatorrent.netlet.util.Slice; import com.datatorrent.lib.counters.BasicCounters; import com.datatorrent.lib.util.TestUtils; +import com.datatorrent.netlet.util.Slice; /** * Stats and partitioning tests for {@link AbstractBlockReader} @@ -105,19 +105,23 @@ public class AbstractBlockReaderTest Assert.assertTrue("partition needed", response.repartitionRequired); Assert.assertEquals("partition count changed", 8, sliceReader.getPartitionCount()); - List<Partitioner.Partition<AbstractBlockReader<Slice, - BlockMetadata.FileBlockMetadata, FSDataInputStream>>> partitions = Lists.newArrayList(); + List<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>> + partitions = Lists + .newArrayList(); - DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition = - new DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(sliceReader); + DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition = new + DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>( + sliceReader); - TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> pseudoParttion = - new TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(apartition, readerStats); + TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> + pseudoParttion = new TestUtils.MockPartition<>( + apartition, readerStats); partitions.add(pseudoParttion); - Collection<Partitioner.Partition<AbstractBlockReader<Slice, - BlockMetadata.FileBlockMetadata, FSDataInputStream>>> newPartitions = sliceReader.definePartitions(partitions, null); + Collection<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>> + newPartitions = sliceReader + .definePartitions(partitions, null); Assert.assertEquals(8, newPartitions.size()); } @@ -131,39 +135,43 @@ public class AbstractBlockReaderTest TestReader sliceReader = new TestReader(); sliceReader.processStats(readerStats); - List<Partitioner.Partition<AbstractBlockReader<Slice, - BlockMetadata.FileBlockMetadata, FSDataInputStream>>> partitions = Lists.newArrayList(); + List<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>> + partitions = Lists + .newArrayList(); - DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition = - new DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(sliceReader); + DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> apartition = new + DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>( + sliceReader); - TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> pseudoParttion = - new TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>(apartition, readerStats); + TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> + pseudoParttion = new TestUtils.MockPartition<>(apartition, readerStats); partitions.add(pseudoParttion); - Collection<Partitioner.Partition<AbstractBlockReader<Slice, - BlockMetadata.FileBlockMetadata, FSDataInputStream>>> newPartitions = sliceReader.definePartitions(partitions, null); + Collection<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>> + newPartitions = sliceReader.definePartitions(partitions, null); + + List<Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>> + newMocks = Lists.newArrayList(); - List<Partitioner.Partition<AbstractBlockReader<Slice, - BlockMetadata.FileBlockMetadata, FSDataInputStream>>> newMocks = Lists.newArrayList(); + for (Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> partition + : newPartitions) { - for (Partitioner.Partition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>> partition : - newPartitions) { - partition.getPartitionedInstance().counters.setCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS, new MutableLong(1)); + partition.getPartitionedInstance().counters + .setCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS, new MutableLong(1)); - newMocks.add( - new TestUtils.MockPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>( - (DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>) partition, - readerStats) - ); + newMocks.add(new TestUtils.MockPartition<>( + (DefaultPartition<AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream>>)partition, + readerStats)); } sliceReader.partitionCount = 1; newPartitions = sliceReader.definePartitions(newMocks, null); Assert.assertEquals(1, newPartitions.size()); - AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream> last = newPartitions.iterator().next().getPartitionedInstance(); - Assert.assertEquals("num blocks", 8, last.counters.getCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS).longValue()); + AbstractBlockReader<Slice, BlockMetadata.FileBlockMetadata, FSDataInputStream> last = newPartitions.iterator() + .next().getPartitionedInstance(); + Assert.assertEquals("num blocks", 8, + last.counters.getCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS).longValue()); } static class ReaderStats extends Stats.OperatorStats @@ -171,7 +179,7 @@ public class AbstractBlockReaderTest ReaderStats(int backlog, long readBlocks, long bytes, long time) { - BasicCounters<MutableLong> bc = new BasicCounters<MutableLong>(MutableLong.class); + BasicCounters<MutableLong> bc = new BasicCounters<>(MutableLong.class); bc.setCounter(AbstractBlockReader.ReaderCounterKeys.BLOCKS, new MutableLong(readBlocks)); bc.setCounter(AbstractBlockReader.ReaderCounterKeys.BYTES, new MutableLong(bytes)); bc.setCounter(AbstractBlockReader.ReaderCounterKeys.TIME, new MutableLong(time)); @@ -191,4 +199,4 @@ public class AbstractBlockReaderTest return partitionCount; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java index 32ac536..5ddc8a9 100644 --- a/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/block/FSLineReaderTest.java @@ -18,7 +18,11 @@ */ package com.datatorrent.lib.io.block; -import java.io.*; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStreamReader; import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; @@ -36,7 +40,6 @@ import com.google.common.collect.Lists; import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; import com.datatorrent.api.DAG; - import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.testbench.CollectorTestSink; @@ -74,10 +77,10 @@ public class FSLineReaderTest blockReader.setup(readerContext); - messageSink = new CollectorTestSink<Object>(); + messageSink = new CollectorTestSink<>(); blockReader.messages.setSink(messageSink); - blockMetadataSink = new CollectorTestSink<Object>(); + blockMetadataSink = new CollectorTestSink<>(); blockReader.blocksMetadataOutput.setSink(blockMetadataSink); BufferedReader reader; @@ -88,11 +91,7 @@ public class FSLineReaderTest messages.add(line.split(",")); } reader.close(); - } - catch (FileNotFoundException e) { - throw new RuntimeException(e); - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); } } @@ -111,8 +110,9 @@ public class FSLineReaderTest public void test() { - BlockMetadata.FileBlockMetadata block = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), 0L, 0L, testMeta.dataFile.length(), - true, -1); + BlockMetadata.FileBlockMetadata block = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), 0L, + 0L, testMeta.dataFile.length(), + true, -1); testMeta.blockReader.beginWindow(1); testMeta.blockReader.blocksMetadataInput.process(block); @@ -123,7 +123,7 @@ public class FSLineReaderTest for (int i = 0; i < messages.size(); i++) { @SuppressWarnings("unchecked") - AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>) messages.get(i); + AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>)messages.get(i); Assert.assertTrue("line " + i, Arrays.equals(msg.getRecord().split(","), testMeta.messages.get(i))); } } @@ -132,13 +132,16 @@ public class FSLineReaderTest public void testMultipleBlocks() { long blockSize = 1000; - int noOfBlocks = (int) ((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1)); + int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ? + 0 : + 1)); testMeta.blockReader.beginWindow(1); for (int i = 0; i < noOfBlocks; i++) { - BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), i, i * blockSize, - i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1, i - 1); + BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata( + testMeta.dataFile.getAbsolutePath(), i, i * blockSize, + i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1, i - 1); testMeta.blockReader.blocksMetadataInput.process(blockMetadata); } @@ -148,7 +151,7 @@ public class FSLineReaderTest Assert.assertEquals("No of records", testMeta.messages.size(), messages.size()); for (int i = 0; i < messages.size(); i++) { @SuppressWarnings("unchecked") - AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>) messages.get(i); + AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>)messages.get(i); Assert.assertTrue("line " + i, Arrays.equals(msg.getRecord().split(","), testMeta.messages.get(i))); } } @@ -157,13 +160,17 @@ public class FSLineReaderTest public void testNonConsecutiveBlocks() { long blockSize = 1000; - int noOfBlocks = (int) ((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1)); + int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ? + 0 : + 1)); testMeta.blockReader.beginWindow(1); for (int i = 0; i < noOfBlocks; i++) { - BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), i, - i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1, -1); + BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata( + testMeta.dataFile.getAbsolutePath(), i, + i * blockSize, i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, i == noOfBlocks - 1, + -1); testMeta.blockReader.blocksMetadataInput.process(blockMetadata); } @@ -173,7 +180,7 @@ public class FSLineReaderTest Assert.assertEquals("No of records", testMeta.messages.size(), messages.size()); for (int i = 0; i < messages.size(); i++) { @SuppressWarnings("unchecked") - AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>) messages.get(i); + AbstractBlockReader.ReaderRecord<String> msg = (AbstractBlockReader.ReaderRecord<String>)messages.get(i); Assert.assertTrue("line " + i, Arrays.equals(msg.getRecord().split(","), testMeta.messages.get(i))); } } @@ -193,4 +200,4 @@ public class FSLineReaderTest @SuppressWarnings("unused") private static final Logger LOG = LoggerFactory.getLogger(FSLineReaderTest.class); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java index 23aa1af..37222a7 100644 --- a/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/block/FSSliceReaderTest.java @@ -23,20 +23,20 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.List; -import org.apache.commons.io.FileUtils; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; +import org.apache.commons.io.FileUtils; + import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; import com.datatorrent.api.DAG; - -import com.datatorrent.netlet.util.Slice; import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.netlet.util.Slice; /** * Tests for {@link FSSliceReader}. @@ -63,8 +63,7 @@ public class FSSliceReaderTest output = "target/" + description.getClassName() + "/" + description.getMethodName(); try { FileUtils.forceMkdir(new File(output)); - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); } dataFile = new File("src/test/resources/reader_test_data.csv"); @@ -77,10 +76,10 @@ public class FSSliceReaderTest blockReader.setup(readerContext); - messageSink = new CollectorTestSink<Object>(); + messageSink = new CollectorTestSink<>(); blockReader.messages.setSink(messageSink); - blockMetadataSink = new CollectorTestSink<Object>(); + blockMetadataSink = new CollectorTestSink<>(); blockReader.blocksMetadataOutput.setSink(blockMetadataSink); } @@ -90,8 +89,7 @@ public class FSSliceReaderTest blockReader.teardown(); try { FileUtils.forceDelete(new File("target/" + description.getClassName())); - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); } } @@ -104,14 +102,16 @@ public class FSSliceReaderTest public void testBytesReceived() throws IOException { long blockSize = 1500; - int noOfBlocks = (int) ((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ? 0 : 1)); + int noOfBlocks = (int)((testMeta.dataFile.length() / blockSize) + (((testMeta.dataFile.length() % blockSize) == 0) ? + 0 : 1)); testMeta.blockReader.beginWindow(1); for (int i = 0; i < noOfBlocks; i++) { - BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata(testMeta.dataFile.getAbsolutePath(), i, i * blockSize, - i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, - i == noOfBlocks - 1, i - 1); + BlockMetadata.FileBlockMetadata blockMetadata = new BlockMetadata.FileBlockMetadata( + testMeta.dataFile.getAbsolutePath(), i, i * blockSize, + i == noOfBlocks - 1 ? testMeta.dataFile.length() : (i + 1) * blockSize, + i == noOfBlocks - 1, i - 1); testMeta.blockReader.blocksMetadataInput.process(blockMetadata); } @@ -125,7 +125,7 @@ public class FSSliceReaderTest for (Object message : messages) { @SuppressWarnings("unchecked") - AbstractBlockReader.ReaderRecord<Slice> msg = (AbstractBlockReader.ReaderRecord<Slice>) message; + AbstractBlockReader.ReaderRecord<Slice> msg = (AbstractBlockReader.ReaderRecord<Slice>)message; totatBytesReceived += msg.getRecord().length; outputStream.write(msg.getRecord().buffer); } @@ -135,4 +135,4 @@ public class FSSliceReaderTest FileUtils.contentEquals(testMeta.dataFile, outputFile); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java b/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java index 056e7b2..5e0aa22 100644 --- a/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/block/ReadAheadLineReaderTest.java @@ -34,4 +34,4 @@ public class ReadAheadLineReaderTest extends FSLineReaderTest return new String(bytes); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java index b47315b..9e110be 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java @@ -23,18 +23,29 @@ import java.io.IOException; import java.util.Set; import java.util.concurrent.CountDownLatch; -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; - -import org.junit.*; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; import org.junit.rules.TestWatcher; import org.junit.runner.Description; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.api.*; -import com.datatorrent.api.annotation.ApplicationAnnotation; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.Attribute; +import com.datatorrent.api.AutoMetric; +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.LocalMode; +import com.datatorrent.api.Stats; +import com.datatorrent.api.StatsListener; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.io.block.BlockMetadata; @@ -218,7 +229,8 @@ public class FileSplitterBaseTest int count; transient CountDownLatch latch = new CountDownLatch(1); - public final transient DefaultInputPort<FileSplitterInput.FileMetadata> fileMetadata = new DefaultInputPort<FileSplitterInput.FileMetadata>() + public final transient DefaultInputPort<FileSplitterInput.FileMetadata> fileMetadata = new + DefaultInputPort<FileSplitterInput.FileMetadata>() { @Override public void process(FileSplitterInput.FileMetadata fileMetadata) http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/17d05763/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java index 09e3b9e..e62f643 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java @@ -25,10 +25,6 @@ import java.util.Set; import java.util.concurrent.Exchanger; import java.util.concurrent.TimeoutException; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.Path; import org.junit.Assert; import org.junit.ClassRule; import org.junit.Ignore; @@ -39,11 +35,15 @@ import org.junit.runner.Description; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; + import com.google.common.collect.Sets; import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; - import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.io.block.BlockMetadata; @@ -59,9 +59,9 @@ public class FileSplitterTest protected void finished(Description description) { try { - FileContext.getLocalFSFileContext().delete(new Path(new File("target/" + description.getClassName()).getAbsolutePath()), true); - } - catch (IOException e) { + FileContext.getLocalFSFileContext() + .delete(new Path(new File("target/" + description.getClassName()).getAbsolutePath()), true); + } catch (IOException e) { throw new RuntimeException(e); } } @@ -77,7 +77,7 @@ public class FileSplitterTest Set<String> filePaths = Sets.newHashSet(); Context.OperatorContext context; - Exchanger<Integer> exchanger = new Exchanger<Integer>(); + Exchanger<Integer> exchanger = new Exchanger<>(); @Override protected void starting(org.junit.runner.Description description) @@ -100,8 +100,7 @@ public class FileSplitterTest filePaths.add(new Path(this.dataDirectory, created.getName()).toUri().toString()); FileUtils.write(created, StringUtils.join(lines, '\n')); } - } - catch (IOException e) { + } catch (IOException e) { throw new RuntimeException(e); } @@ -118,10 +117,10 @@ public class FileSplitterTest context = new OperatorContextTestHelper.TestIdOperatorContext(0, attributes); fileSplitter.setup(context); - fileMetadataSink = new CollectorTestSink<FileSplitter.FileMetadata>(); + fileMetadataSink = new CollectorTestSink<>(); TestUtils.setSink(fileSplitter.filesMetadataOutput, fileMetadataSink); - blockMetadataSink = new CollectorTestSink<BlockMetadata.FileBlockMetadata>(); + blockMetadataSink = new CollectorTestSink<>(); TestUtils.setSink(fileSplitter.blocksMetadataOutput, blockMetadataSink); } @@ -148,7 +147,7 @@ public class FileSplitterTest testMeta.fileSplitter.endWindow(); Assert.assertEquals("File metadata", 12, testMeta.fileMetadataSink.collectedTuples.size()); for (Object fileMetadata : testMeta.fileMetadataSink.collectedTuples) { - FileSplitter.FileMetadata metadata = (FileSplitter.FileMetadata) fileMetadata; + FileSplitter.FileMetadata metadata = (FileSplitter.FileMetadata)fileMetadata; Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath())); Assert.assertNotNull("name: ", metadata.getFileName()); } @@ -165,7 +164,7 @@ public class FileSplitterTest testMeta.fileSplitter.emitTuples(); Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size()); for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) { - BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata) blockMetadata; + BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata; Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath())); } } @@ -184,7 +183,7 @@ public class FileSplitterTest for (int i = 0; i < 12; i++) { FileSplitter.FileMetadata fm = testMeta.fileMetadataSink.collectedTuples.get(i); File testFile = new File(testMeta.dataDirectory, fm.getFileName()); - noOfBlocks += (int) Math.ceil(testFile.length() / (2 * 1.0)); + noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0)); } Assert.assertEquals("Blocks", noOfBlocks, testMeta.blockMetadataSink.collectedTuples.size()); } @@ -193,7 +192,7 @@ public class FileSplitterTest public void testIdempotency() throws InterruptedException { IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = - new IdempotentStorageManager.FSIdempotentStorageManager(); + new IdempotentStorageManager.FSIdempotentStorageManager(); testMeta.fileSplitter.setIdempotentStorageManager(fsIdempotentStorageManager); testMeta.fileSplitter.setup(testMeta.context); @@ -206,7 +205,7 @@ public class FileSplitterTest testMeta.fileSplitter.beginWindow(1); Assert.assertEquals("Blocks", 12, testMeta.blockMetadataSink.collectedTuples.size()); for (Object blockMetadata : testMeta.blockMetadataSink.collectedTuples) { - BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata) blockMetadata; + BlockMetadata.FileBlockMetadata metadata = (BlockMetadata.FileBlockMetadata)blockMetadata; Assert.assertTrue("path: " + metadata.getFilePath(), testMeta.filePaths.contains(metadata.getFilePath())); } } @@ -271,7 +270,7 @@ public class FileSplitterTest int noOfBlocks = 0; for (int i = 0; i < 12; i++) { File testFile = new File(testMeta.dataDirectory, "file" + i + ".txt"); - noOfBlocks += (int) Math.ceil(testFile.length() / (2 * 1.0)); + noOfBlocks += (int)Math.ceil(testFile.length() / (2 * 1.0)); } testMeta.fileSplitter.setBlockSize(2L); @@ -297,7 +296,8 @@ public class FileSplitterTest @Test public void testIdempotencyWithBlocksThreshold() throws InterruptedException { - IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager(); + IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager + .FSIdempotentStorageManager(); testMeta.fileSplitter.setIdempotentStorageManager(fsIdempotentStorageManager); testMeta.fileSplitter.setBlocksThreshold(10); testMeta.fileSplitter.scanner.setScanIntervalMillis(500); @@ -346,7 +346,8 @@ public class FileSplitterTest @Ignore public void testRecoveryOfPartialFile() throws InterruptedException { - IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager(); + IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager + .FSIdempotentStorageManager(); fsIdempotentStorageManager.setRecoveryPath(testMeta.dataDirectory + '/' + "recovery"); testMeta.fileSplitter.setIdempotentStorageManager(fsIdempotentStorageManager); testMeta.fileSplitter.setBlockSize(2L); @@ -395,8 +396,10 @@ public class FileSplitterTest String file2 = testMeta.fileMetadataSink.collectedTuples.get(0).getFileName(); - Assert.assertTrue("Block file name 0", testMeta.blockMetadataSink.collectedTuples.get(0).getFilePath().endsWith(file1)); - Assert.assertTrue("Block file name 1", testMeta.blockMetadataSink.collectedTuples.get(1).getFilePath().endsWith(file2)); + Assert.assertTrue("Block file name 0", + testMeta.blockMetadataSink.collectedTuples.get(0).getFilePath().endsWith(file1)); + Assert.assertTrue("Block file name 1", + testMeta.blockMetadataSink.collectedTuples.get(1).getFilePath().endsWith(file2)); } @Test @@ -442,7 +445,7 @@ public class FileSplitterTest testMeta.fileSplitter.endWindow(); Assert.assertEquals("File metadata count", 1, testMeta.fileMetadataSink.collectedTuples.size()); Assert.assertEquals("File metadata", new File(testMeta.dataDirectory + "/file1.txt").getAbsolutePath(), - testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath()); + testMeta.fileMetadataSink.collectedTuples.get(0).getFilePath()); } private static class MockScanner extends FileSplitter.TimeBasedDirectoryScanner @@ -463,8 +466,7 @@ public class FileSplitterTest LOG.debug("discovered {}", discoveredFiles.size()); testMeta.exchanger.exchange(discoveredFiles.size()); } - } - catch (InterruptedException e) { + } catch (InterruptedException e) { throw new RuntimeException(e); } }
