APEX-273 - Fix existing checkstyle violations in bufferserver module
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/799df6c0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/799df6c0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/799df6c0 Branch: refs/heads/devel-3 Commit: 799df6c0aba94c20bef0eb2d975d3f5649b02f54 Parents: 892355c Author: MalharJenkins <[email protected]> Authored: Thu Nov 19 15:44:13 2015 -0800 Committer: Vlad Rozov <[email protected]> Committed: Fri Nov 20 18:31:15 2015 -0800 ---------------------------------------------------------------------- bufferserver/pom.xml | 3 +- .../bufferserver/auth/AuthManager.java | 2 +- .../bufferserver/client/AuthClient.java | 6 +- .../bufferserver/client/Controller.java | 1 - .../bufferserver/client/Subscriber.java | 15 +--- .../bufferserver/internal/DataList.java | 90 ++++++++++++-------- .../bufferserver/internal/DataListener.java | 10 +-- .../bufferserver/internal/FastDataList.java | 12 +-- .../bufferserver/internal/LogicalNode.java | 29 ++++--- .../bufferserver/internal/PhysicalNode.java | 3 +- .../bufferserver/packet/DataTuple.java | 8 +- .../bufferserver/packet/EmptyTuple.java | 10 +-- .../packet/GenericRequestTuple.java | 20 ++--- .../bufferserver/packet/MessageType.java | 17 +++- .../bufferserver/packet/PayloadTuple.java | 6 +- .../packet/PublishRequestTuple.java | 5 +- .../bufferserver/packet/RequestTuple.java | 6 +- .../bufferserver/packet/ResetRequestTuple.java | 5 +- .../bufferserver/packet/ResetWindowTuple.java | 6 +- .../packet/SubscribeRequestTuple.java | 59 +++++-------- .../datatorrent/bufferserver/packet/Tuple.java | 15 ++-- .../bufferserver/packet/WindowIdTuple.java | 8 +- .../bufferserver/policy/AbstractPolicy.java | 12 +-- .../bufferserver/policy/GiveAll.java | 2 +- .../bufferserver/policy/LeastBusy.java | 14 +-- .../datatorrent/bufferserver/policy/Policy.java | 10 +-- .../bufferserver/policy/RandomOne.java | 12 +-- .../bufferserver/policy/RoundRobin.java | 18 ++-- .../datatorrent/bufferserver/server/Server.java | 44 ++++++---- .../bufferserver/storage/DiskStorage.java | 73 ++++++---------- .../bufferserver/util/SerializedData.java | 4 +- .../datatorrent/bufferserver/util/System.java | 12 ++- .../datatorrent/bufferserver/util/VarInt.java | 9 +- .../bufferserver/client/SubscriberTest.java | 39 ++++----- .../bufferserver/packet/NoMessageTupleTest.java | 23 +---- .../packet/PublishRequestTupleTest.java | 10 +-- .../packet/ResetWindowTupleTest.java | 10 +-- .../packet/SubscribeRequestTupleTest.java | 20 +++-- .../bufferserver/server/ServerTest.java | 67 ++++++++------- .../bufferserver/storage/DiskStorageTest.java | 11 +-- .../bufferserver/support/Subscriber.java | 9 +- .../bufferserver/util/CodecTest.java | 4 +- 42 files changed, 346 insertions(+), 393 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/pom.xml ---------------------------------------------------------------------- diff --git a/bufferserver/pom.xml b/bufferserver/pom.xml index f6fc8b3..efc5b5e 100644 --- a/bufferserver/pom.xml +++ b/bufferserver/pom.xml @@ -48,10 +48,9 @@ </plugin> --> <plugin> - <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> <configuration> - <maxAllowedViolations>228</maxAllowedViolations> + <consoleOutput>true</consoleOutput> </configuration> </plugin> </plugins> http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java index 453befa..942a896 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/auth/AuthManager.java @@ -27,7 +27,7 @@ import java.security.SecureRandom; */ public class AuthManager { - private final static int BUFFER_SERVER_TOKEN_LENGTH = 20; + private static final int BUFFER_SERVER_TOKEN_LENGTH = 20; private static SecureRandom generator = new SecureRandom(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java index fc105b2..4465143 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/AuthClient.java @@ -45,7 +45,8 @@ public abstract class AuthClient extends AbstractLengthPrependerClient super(readbuffer, position, sendBufferSize); } - protected void sendAuthenticate() { + protected void sendAuthenticate() + { if (token != null) { write(token); } @@ -65,7 +66,8 @@ public abstract class AuthClient extends AbstractLengthPrependerClient } } if (!authenticated) { - throw new AccessControlException("Buffer server security is enabled. Access is restricted without proper credentials."); + throw new AccessControlException("Buffer server security is enabled." + + " Access is restricted without proper credentials."); } } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java index d2faf69..0c3bac9 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Controller.java @@ -26,7 +26,6 @@ import com.datatorrent.bufferserver.packet.PurgeRequestTuple; import com.datatorrent.bufferserver.packet.ResetRequestTuple; import com.datatorrent.bufferserver.packet.Tuple; import com.datatorrent.bufferserver.util.Codec; -import com.datatorrent.netlet.AbstractLengthPrependerClient; import com.datatorrent.netlet.util.Slice; /** http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java index df91f0b..2b18d04 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/client/Subscriber.java @@ -23,7 +23,7 @@ import java.util.Collection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.bufferserver.packet.SubscribeRequestTuple; +import static com.datatorrent.bufferserver.packet.SubscribeRequestTuple.getSerializedRequest; /** * @@ -46,18 +46,11 @@ public abstract class Subscriber extends AuthClient this.id = id; } - public void activate(String version, String type, String sourceId, int mask, Collection<Integer> partitions, long windowId, int bufferSize) + public void activate(final String version, final String type, final String sourceId, final int mask, + final Collection<Integer> partitions, final long windowId, final int bufferSize) { sendAuthenticate(); - write(SubscribeRequestTuple.getSerializedRequest( - version, - id, - type, - sourceId, - mask, - partitions, - windowId, - bufferSize)); + write(getSerializedRequest(version, id, type, sourceId, mask, partitions, windowId, bufferSize)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 1f6c273..fa20aa2 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -90,8 +90,8 @@ public class DataList public DataList(String identifier) { /* - * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block at a time to the filesystem. - * we will use default value of 8 block sizes to be cached in memory + * We use 64MB (the default HDFS block getSize) as the getSize of the memory pool so we can flush the data 1 block + * at a time to the filesystem. We will use default value of 8 block sizes to be cached in memory */ this(identifier, 64 * 1024 * 1024, 8); } @@ -104,7 +104,8 @@ public class DataList public void rewind(final int baseSeconds, final int windowId) throws IOException { final long longWindowId = (long)baseSeconds << 32 | windowId; - logger.debug("Rewinding {} from window ID {} to window ID {}", this, Codec.getStringWindowId(last.ending_window), Codec.getStringWindowId(longWindowId)); + logger.debug("Rewinding {} from window ID {} to window ID {}", this, Codec.getStringWindowId(last.ending_window), + Codec.getStringWindowId(longWindowId)); int numberOfInMemBlockRewound = 0; synchronized (this) { @@ -139,13 +140,16 @@ public class DataList } /* - TODO: properly rewind Data List iterators, especially handle case when iterators point to blocks past the last block. - */ + * TODO: properly rewind Data List iterators, especially handle case when iterators point to blocks past the last + * block. + */ final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockRewound); - assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; + assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; resumeSuspendedClients(numberOfInMemBlockPermits); - logger.debug("Discarded {} in memory blocks during rewind. Number of in memory blocks permits {} after rewinding {}. ", numberOfInMemBlockRewound, numberOfInMemBlockPermits, this); + logger.debug("Discarded {} in memory blocks during rewind. Number of in memory blocks permits {} after" + + " rewinding {}.", numberOfInMemBlockRewound, numberOfInMemBlockPermits, this); } @@ -177,11 +181,13 @@ public class DataList public void purge(final int baseSeconds, final int windowId) { final long longWindowId = (long)baseSeconds << 32 | windowId; - logger.debug("Purging {} from window ID {} to window ID {}", this, Codec.getStringWindowId(first.starting_window), Codec.getStringWindowId(longWindowId)); + logger.debug("Purging {} from window ID {} to window ID {}", this, Codec.getStringWindowId(first.starting_window), + Codec.getStringWindowId(longWindowId)); int numberOfInMemBlockPurged = 0; synchronized (this) { - for (Block prev = null, temp = first; temp != null && temp.starting_window <= longWindowId; prev = temp, temp = temp.next) { + for (Block prev = null, temp = first; temp != null && temp.starting_window <= longWindowId; + prev = temp, temp = temp.next) { if (temp.ending_window > longWindowId || temp == last) { if (prev != null) { first = temp; @@ -204,9 +210,11 @@ public class DataList } final int numberOfInMemBlockPermits = this.numberOfInMemBlockPermits.addAndGet(numberOfInMemBlockPurged); - assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; + assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; resumeSuspendedClients(numberOfInMemBlockPermits); - logger.debug("Discarded {} in memory blocks during purge. Number of in memory blocks permits {} after purging {}. ", numberOfInMemBlockPurged, numberOfInMemBlockPermits, this); + logger.debug("Discarded {} in memory blocks during purge. Number of in memory blocks permits {} after purging {}. ", + numberOfInMemBlockPurged, numberOfInMemBlockPermits, this); } @@ -220,7 +228,8 @@ public class DataList public void flush(final int writeOffset) { - //logger.debug("size = {}, processingOffset = {}, nextOffset = {}, writeOffset = {}", size, processingOffset, nextOffset.integer, writeOffset); + //logger.debug("size = {}, processingOffset = {}, nextOffset = {}, writeOffset = {}", size, processingOffset, + // nextOffset.integer, writeOffset); flush: do { while (size == 0) { @@ -427,7 +436,8 @@ public class DataList suspendedClients.clear(); } } else { - logger.debug("Keeping clients: {} suspended, numberOfInMemBlockPermits={}, Listeners: {}", suspendedClients, numberOfInMemBlockPermits, all_listeners); + logger.debug("Keeping clients: {} suspended, numberOfInMemBlockPermits={}, Listeners: {}", suspendedClients, + numberOfInMemBlockPermits, all_listeners); } return resumedSuspendedClients; } @@ -608,10 +618,11 @@ public class DataList try (DataListIterator dli = getIterator(this)) { done: while (dli.hasNext()) { - SerializedData sd = dli.next(); + final SerializedData sd = dli.next(); + final int length = sd.length - sd.dataOffset + sd.offset; switch (sd.buffer[sd.dataOffset]) { case MessageType.RESET_WINDOW_VALUE: - ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + final ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length); bs = (long)rwt.getBaseSeconds() << 32; if (bs > windowId) { writingOffset = sd.offset; @@ -620,7 +631,7 @@ public class DataList break; case MessageType.BEGIN_WINDOW_VALUE: - BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length); if ((bs | bwt.getWindowId()) >= windowId) { writingOffset = sd.offset; break done; @@ -649,8 +660,9 @@ public class DataList public void purge(long longWindowId) { -// logger.debug("starting_window = {}, longWindowId = {}, ending_window = {}", -// new Object[] {VarInt.getStringWindowId(starting_window), VarInt.getStringWindowId(longWindowId), VarInt.getStringWindowId(ending_window)}); + //logger.debug("starting_window = {}, longWindowId = {}, ending_window = {}", + // VarInt.getStringWindowId(starting_window), VarInt.getStringWindowId(longWindowId), + // VarInt.getStringWindowId(ending_window)); boolean found = false; long bs = starting_window & 0xffffffff00000000L; SerializedData lastReset = null; @@ -659,20 +671,22 @@ public class DataList done: while (dli.hasNext()) { SerializedData sd = dli.next(); + final int length = sd.length - sd.dataOffset + sd.offset; switch (sd.buffer[sd.dataOffset]) { case MessageType.RESET_WINDOW_VALUE: - ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + ResetWindowTuple rwt = (ResetWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length); bs = (long)rwt.getBaseSeconds() << 32; lastReset = sd; break; case MessageType.BEGIN_WINDOW_VALUE: - BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, sd.length - sd.dataOffset + sd.offset); + BeginWindowTuple bwt = (BeginWindowTuple)Tuple.getTuple(sd.buffer, sd.dataOffset, length); if ((bs | bwt.getWindowId()) > longWindowId) { found = true; if (lastReset != null) { /* - * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of the reset tuple. + * Restore the last Reset tuple if there was any and adjust the writingOffset to the beginning of + * the reset tuple. */ if (sd.offset >= lastReset.length) { sd.offset -= lastReset.length; @@ -702,7 +716,8 @@ public class DataList * It helps with better utilization of the RAM. */ if (!found) { - //logger.debug("we could not find a tuple which is in a window later than the window to be purged, so this has to be the last window published so far"); + //logger.debug("we could not find a tuple which is in a window later than the window to be purged, " + + // "so this has to be the last window published so far"); if (lastReset != null && lastReset.offset != 0) { this.readingOffset = this.writingOffset - lastReset.length; System.arraycopy(lastReset.buffer, lastReset.offset, this.data, this.readingOffset, lastReset.length); @@ -797,7 +812,8 @@ public class DataList } } - private Runnable getStorer(final byte[] data, final int readingOffset, final int writingOffset, final Storage storage) + private Runnable getStorer(final byte[] data, final int readingOffset, final int writingOffset, + final Storage storage) { return new Runnable() { @@ -819,7 +835,8 @@ public class DataList logger.debug("Keeping Block {} unchanged", Block.this); } } - assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; + assert numberOfInMemBlockPermits < MAX_COUNT_OF_INMEM_BLOCKS : "Number of in memory block permits " + + numberOfInMemBlockPermits + " exceeded configured maximum " + MAX_COUNT_OF_INMEM_BLOCKS + '.'; resumeSuspendedClients(numberOfInMemBlockPermits); } } @@ -884,11 +901,14 @@ public class DataList @Override public String toString() { - return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{identifier=" + identifier + ", data=" + (data == null ? "null" : data.length) - + ", readingOffset=" + readingOffset + ", writingOffset=" + writingOffset - + ", starting_window=" + Codec.getStringWindowId(starting_window) + ", ending_window=" + Codec.getStringWindowId(ending_window) - + ", refCount=" + refCount.get() + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier) - + ", future=" + (future == null ? "null" : future.isDone() ? "Done" : future.isCancelled() ? "Cancelled" : future) + '}'; + final String future = this.future == null ? "null" : this.future.isDone() ? "Done" : + this.future.isCancelled() ? "Cancelled" : this.future.toString(); + return getClass().getName() + '@' + Integer.toHexString(hashCode()) + "{identifier=" + identifier + + ", data=" + (data == null ? "null" : data.length) + ", readingOffset=" + readingOffset + + ", writingOffset=" + writingOffset + ", starting_window=" + Codec.getStringWindowId(starting_window) + + ", ending_window=" + Codec.getStringWindowId(ending_window) + ", refCount=" + refCount.get() + + ", uniqueIdentifier=" + uniqueIdentifier + ", next=" + (next == null ? "null" : next.identifier) + + ", future=" + future + '}'; } } @@ -968,8 +988,10 @@ public class DataList if (nextOffset.integer + size <= da.writingOffset) { current = new SerializedData(buffer, readOffset, size + nextOffset.integer - readOffset); current.dataOffset = nextOffset.integer; - //if (buffer[current.dataOffset] == MessageType.BEGIN_WINDOW_VALUE || buffer[current.dataOffset] == MessageType.END_WINDOW_VALUE) { - // Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, current.length - current.dataOffset + current.offset); + //final byte messageType = buffer[current.dataOffset]; + //if (messageType == MessageType.BEGIN_WINDOW_VALUE || messageType == MessageType.END_WINDOW_VALUE) { + // final int length = current.length - current.dataOffset + current.offset; + // Tuple t = Tuple.getTuple(current.buffer, current.dataOffset, length); // logger.debug("next t = {}", t); //} return true; @@ -993,9 +1015,9 @@ public class DataList } /** - * Removes from the underlying collection the last element returned by the iterator (optional operation). This method can be called only once per call to - * next. The behavior of an iterator is unspecified if the underlying collection is modified while the iteration is in progress in any way other than by - * calling this method. + * Removes from the underlying collection the last element returned by the iterator (optional operation). This + * method can be called only once per call to next. The behavior of an iterator is unspecified if the underlying + * collection is modified while the iteration is in progress in any way other than by calling this method. */ @Override public void remove() http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java index 4add008..a6a1fab 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataListener.java @@ -18,10 +18,10 @@ */ package com.datatorrent.bufferserver.internal; -import com.datatorrent.bufferserver.util.BitVector; - import java.util.Collection; +import com.datatorrent.bufferserver.util.BitVector; + /** * * Waits for data to be added to the buffer server and then acts on it<p> @@ -32,17 +32,17 @@ import java.util.Collection; */ public interface DataListener { - public static final BitVector NULL_PARTITION = new BitVector(0, 0); + BitVector NULL_PARTITION = new BitVector(0, 0); /** */ - public boolean addedData(); + boolean addedData(); /** * * @param partitions * @return int */ - public int getPartitions(Collection<BitVector> partitions); + int getPartitions(Collection<BitVector> partitions); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java index 6ba7b64..af47f23 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/FastDataList.java @@ -54,8 +54,7 @@ public class FastDataList extends DataList size = last.data[processingOffset]; size |= (last.data[processingOffset + 1] << 8); // logger.debug("read item = {} of size = {} at offset = {}", item++, size, processingOffset); - } - else { + } else { if (writeOffset == last.data.length) { processingOffset = 0; size = 0; @@ -73,8 +72,7 @@ public class FastDataList extends DataList if (last.starting_window == -1) { last.starting_window = baseSeconds | btw.getWindowId(); last.ending_window = last.starting_window; - } - else { + } else { last.ending_window = baseSeconds | btw.getWindowId(); } break; @@ -83,11 +81,13 @@ public class FastDataList extends DataList Tuple rwt = Tuple.getTuple(last.data, processingOffset, size); baseSeconds = (long)rwt.getBaseSeconds() << 32; break; + + default: + break; } processingOffset += size; size = 0; - } - else { + } else { if (writeOffset == last.data.length) { processingOffset = 0; size = 0; http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java index f867d69..9856829 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/LogicalNode.java @@ -157,7 +157,8 @@ public class LogicalNode implements DataListener public void catchUp() { long lBaseSeconds = (long)iterator.getBaseSeconds() << 32; - logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), Codec.getStringWindowId(lBaseSeconds)); + logger.debug("BaseSeconds = {} and lBaseSeconds = {}", Codec.getStringWindowId(baseSeconds), + Codec.getStringWindowId(lBaseSeconds)); if (lBaseSeconds > baseSeconds) { baseSeconds = lBaseSeconds; } @@ -193,13 +194,8 @@ public class LogicalNode implements DataListener case MessageType.BEGIN_WINDOW_VALUE: tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset); - logger.debug("{}->{} condition {} =? {}", - new Object[] { - upstream, - group, - Codec.getStringWindowId(baseSeconds | tuple.getWindowId()), - Codec.getStringWindowId(skipWindowId) - }); + logger.debug("{}->{} condition {} =? {}", upstream, group, + Codec.getStringWindowId(baseSeconds | tuple.getWindowId()), Codec.getStringWindowId(skipWindowId)); if ((baseSeconds | tuple.getWindowId()) > skipWindowId) { logger.debug("caught up {}->{} skipping {} payload tuples", upstream, group, skippedPayloadTuples); ready = GiveAll.getInstance().distribute(physicalNodes, data); @@ -212,10 +208,12 @@ public class LogicalNode implements DataListener case MessageType.CODEC_STATE_VALUE: case MessageType.END_STREAM_VALUE: ready = GiveAll.getInstance().distribute(physicalNodes, data); - logger.debug("Message {} was distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]), physicalNodes); + logger.debug("Message {} was distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]), + physicalNodes); break; default: - logger.debug("Message {} was not distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]), physicalNodes); + logger.debug("Message {} was not distributed to {}", MessageType.valueOf(data.buffer[data.dataOffset]), + physicalNodes); } } } catch (InterruptedException ie) { @@ -252,7 +250,8 @@ public class LogicalNode implements DataListener break; case MessageType.RESET_WINDOW_VALUE: - Tuple resetWindow = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset); + final int length = data.length - data.dataOffset + data.offset; + Tuple resetWindow = Tuple.getTuple(data.buffer, data.dataOffset, length); baseSeconds = (long)resetWindow.getBaseSeconds() << 32; ready = GiveAll.getInstance().distribute(physicalNodes, data); break; @@ -266,9 +265,10 @@ public class LogicalNode implements DataListener } else { while (ready && iterator.hasNext()) { SerializedData data = iterator.next(); + final int length = data.length - data.dataOffset + data.offset; switch (data.buffer[data.dataOffset]) { case MessageType.PAYLOAD_VALUE: - Tuple tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset); + Tuple tuple = Tuple.getTuple(data.buffer, data.dataOffset, length); int value = tuple.getPartition(); for (BitVector bv : partitions) { if (bv.matches(value)) { @@ -283,7 +283,7 @@ public class LogicalNode implements DataListener break; case MessageType.RESET_WINDOW_VALUE: - tuple = Tuple.getTuple(data.buffer, data.dataOffset, data.length - data.dataOffset + data.offset); + tuple = Tuple.getTuple(data.buffer, data.dataOffset, length); baseSeconds = (long)tuple.getBaseSeconds() << 32; ready = GiveAll.getInstance().distribute(physicalNodes, data); break; @@ -344,7 +344,8 @@ public class LogicalNode implements DataListener @Override public String toString() { - return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions + ", iterator=" + iterator + '}'; + return "LogicalNode{" + "upstream=" + upstream + ", group=" + group + ", partitions=" + partitions + + ", iterator=" + iterator + '}'; } private static final Logger logger = LoggerFactory.getLogger(LogicalNode.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java index 880d444..424a51a 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/PhysicalNode.java @@ -79,8 +79,7 @@ public class PhysicalNode if (client.write(d.buffer, d.offset, d.length)) { return true; } - } - else { + } else { if (client.send(d.buffer, d.offset, d.length)) { return true; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java index 3e7f23f..cb1ad5f 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/DataTuple.java @@ -35,13 +35,13 @@ public class DataTuple extends Tuple @Override public int getWindowId() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getPartition() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override @@ -53,13 +53,13 @@ public class DataTuple extends Tuple @Override public int getBaseSeconds() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getWindowWidth() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } public static byte[] getSerializedTuple(byte type, Slice f) http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java index f034f04..3c3f184 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/EmptyTuple.java @@ -41,31 +41,31 @@ public class EmptyTuple extends Tuple @Override public int getWindowId() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getPartition() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public Slice getData() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getBaseSeconds() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getWindowWidth() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } public static byte[] getSerializedTuple(byte value) http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java index a815334..ea49077 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/GenericRequestTuple.java @@ -25,8 +25,6 @@ import org.slf4j.LoggerFactory; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.netlet.util.VarInt; -import static com.datatorrent.bufferserver.packet.Tuple.CLASSIC_VERSION; -import static com.datatorrent.bufferserver.packet.Tuple.writeString; /** * <p>GenericRequestTuple class.</p> @@ -70,12 +68,10 @@ public class GenericRequestTuple extends RequestTuple } version = new String(buffer, dataOffset, idlen); dataOffset += idlen; - } - else if (idlen == 0) { + } else if (idlen == 0) { version = EMPTY_STRING; dataOffset++; - } - else { + } else { return; } /* @@ -87,12 +83,10 @@ public class GenericRequestTuple extends RequestTuple } identifier = new String(buffer, dataOffset, idlen); dataOffset += idlen; - } - else if (idlen == 0) { + } else if (idlen == 0) { identifier = EMPTY_STRING; dataOffset++; - } - else { + } else { return; } @@ -105,8 +99,7 @@ public class GenericRequestTuple extends RequestTuple } valid = true; - } - catch (NumberFormatException nfe) { + } catch (NumberFormatException nfe) { logger.warn("Unparseable Tuple", nfe); } } @@ -166,7 +159,8 @@ public class GenericRequestTuple extends RequestTuple @Override public String toString() { - return getClass().getSimpleName() + "{" + "version=" + version + ", identifier=" + identifier + ", windowId=" + Codec.getStringWindowId((long)baseSeconds | windowId) + '}'; + return getClass().getSimpleName() + "{" + "version=" + version + ", identifier=" + identifier + ", windowId=" + + Codec.getStringWindowId((long)baseSeconds | windowId) + '}'; } private static final Logger logger = LoggerFactory.getLogger(GenericRequestTuple.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java index 02102da..3c0ec2c 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/MessageType.java @@ -25,7 +25,20 @@ package com.datatorrent.bufferserver.packet; */ public enum MessageType { - NO_MESSAGE(0), PAYLOAD(1), RESET_WINDOW(2), BEGIN_WINDOW(3), END_WINDOW(4), END_STREAM(5), PUBLISHER_REQUEST(6), SUBSCRIBER_REQUEST(7), PURGE_REQUEST(8), RESET_REQUEST(9), CHECKPOINT(10), CODEC_STATE(11), NO_MESSAGE_ODD(127); + NO_MESSAGE(0), + PAYLOAD(1), + RESET_WINDOW(2), + BEGIN_WINDOW(3), + END_WINDOW(4), + END_STREAM(5), + PUBLISHER_REQUEST(6), + SUBSCRIBER_REQUEST(7), + PURGE_REQUEST(8), + RESET_REQUEST(9), + CHECKPOINT(10), + CODEC_STATE(11), + NO_MESSAGE_ODD(127); + public static final byte NO_MESSAGE_VALUE = 0; public static final byte PAYLOAD_VALUE = 1; public static final byte RESET_WINDOW_VALUE = 2; @@ -81,7 +94,7 @@ public enum MessageType private final int value; - private MessageType(int value) + MessageType(int value) { this.value = value; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java index e757097..256fc05 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PayloadTuple.java @@ -51,7 +51,7 @@ public class PayloadTuple extends Tuple @Override public int getWindowId() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override @@ -69,13 +69,13 @@ public class PayloadTuple extends Tuple @Override public int getBaseSeconds() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getWindowWidth() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } public static byte[] getSerializedTuple(int partition, int size) http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java index bead7f3..6a9ba39 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/PublishRequestTuple.java @@ -30,9 +30,10 @@ public class PublishRequestTuple extends GenericRequestTuple super(array, offset, len); } - public static byte[] getSerializedRequest(String version, String identifier, long startingWindowId) + public static byte[] getSerializedRequest(final String version, final String identifier, final long startingWindowId) { - return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId, MessageType.PUBLISHER_REQUEST_VALUE); + return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId, + MessageType.PUBLISHER_REQUEST_VALUE); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java index 9fe7859..53505b4 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/RequestTuple.java @@ -43,19 +43,19 @@ public abstract class RequestTuple extends Tuple @Override public int getPartition() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public Slice getData() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getWindowWidth() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } public abstract void parse(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java index 66fbfcd..17ca585 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetRequestTuple.java @@ -30,9 +30,10 @@ public class ResetRequestTuple extends GenericRequestTuple super(array, offset, length); } - public static byte[] getSerializedRequest(String version, String identifier, long startingWindowId) + public static byte[] getSerializedRequest(final String version, final String identifier, final long startingWindowId) { - return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId, MessageType.RESET_REQUEST_VALUE); + return GenericRequestTuple.getSerializedRequest(version, identifier, startingWindowId, + MessageType.RESET_REQUEST_VALUE); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java index abeceb3..6045416 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/ResetWindowTuple.java @@ -43,19 +43,19 @@ public class ResetWindowTuple extends Tuple @Override public int getWindowId() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getPartition() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public Slice getData() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java index 416cee9..b63487b 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/SubscribeRequestTuple.java @@ -61,12 +61,10 @@ public class SubscribeRequestTuple extends RequestTuple } version = new String(buffer, dataOffset, idlen); dataOffset += idlen; - } - else if (idlen == 0) { + } else if (idlen == 0) { version = EMPTY_STRING; dataOffset++; - } - else { + } else { return; } /* @@ -78,12 +76,10 @@ public class SubscribeRequestTuple extends RequestTuple } identifier = new String(buffer, dataOffset, idlen); dataOffset += idlen; - } - else if (idlen == 0) { + } else if (idlen == 0) { identifier = EMPTY_STRING; dataOffset++; - } - else { + } else { return; } @@ -103,12 +99,10 @@ public class SubscribeRequestTuple extends RequestTuple } streamType = new String(buffer, dataOffset, idlen); dataOffset += idlen; - } - else if (idlen == 0) { + } else if (idlen == 0) { streamType = EMPTY_STRING; dataOffset++; - } - else { + } else { return; } /* @@ -120,12 +114,10 @@ public class SubscribeRequestTuple extends RequestTuple } upstreamIdentifier = new String(buffer, dataOffset, idlen); dataOffset += idlen; - } - else if (idlen == 0) { + } else if (idlen == 0) { upstreamIdentifier = EMPTY_STRING; dataOffset++; - } - else { + } else { return; } /* @@ -139,8 +131,7 @@ public class SubscribeRequestTuple extends RequestTuple if (mask > 0) { while (buffer[dataOffset++] < 0) { } - } - else { + } else { /* mask cannot be zero */ return; } @@ -149,8 +140,7 @@ public class SubscribeRequestTuple extends RequestTuple partitions[i] = readVarInt(dataOffset, limit); if (partitions[i] == -1) { return; - } - else { + } else { while (buffer[dataOffset++] < 0) { } } @@ -165,8 +155,7 @@ public class SubscribeRequestTuple extends RequestTuple } valid = true; - } - catch (NumberFormatException nfe) { + } catch (NumberFormatException nfe) { logger.warn("Unparseable Tuple", nfe); } } @@ -246,15 +235,9 @@ public class SubscribeRequestTuple extends RequestTuple return bufferSize; } - public static byte[] getSerializedRequest( - String version, - String id, - String down_type, - String upstream_id, - int mask, - Collection<Integer> partitions, - long startingWindowId, - int bufferSize) + public static byte[] getSerializedRequest(final String version, final String id, final String down_type, + final String upstream_id, final int mask, final Collection<Integer> partitions, final long startingWindowId, + final int bufferSize) { byte[] array = new byte[4096]; int offset = 0; @@ -263,10 +246,7 @@ public class SubscribeRequestTuple extends RequestTuple array[offset++] = MessageType.SUBSCRIBER_REQUEST_VALUE; /* write the version */ - if (version == null) { - version = CLASSIC_VERSION; - } - offset = Tuple.writeString(version, array, offset); + offset = Tuple.writeString(version == null ? CLASSIC_VERSION : version, array, offset); /* write the identifier */ offset = Tuple.writeString(id, array, offset); @@ -288,8 +268,7 @@ public class SubscribeRequestTuple extends RequestTuple /* write the partitions */ if (partitions == null || partitions.isEmpty()) { offset = VarInt.write(0, array, offset); - } - else { + } else { offset = VarInt.write(partitions.size(), array, offset); offset = VarInt.write(mask, array, offset); for (int i : partitions) { @@ -306,7 +285,11 @@ public class SubscribeRequestTuple extends RequestTuple @Override public String toString() { - return "SubscribeRequestTuple{" + "version=" + version + ", identifier=" + identifier + ", windowId=" + Codec.getStringWindowId((long)baseSeconds | windowId) + ", type=" + streamType + ", upstreamIdentifier=" + upstreamIdentifier + ", mask=" + mask + ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + ", bufferSize=" + bufferSize + '}'; + return "SubscribeRequestTuple{" + "version=" + version + ", identifier=" + identifier + + ", windowId=" + Codec.getStringWindowId((long)baseSeconds | windowId) + ", type=" + streamType + + ", upstreamIdentifier=" + upstreamIdentifier + ", mask=" + mask + + ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + + ", bufferSize=" + bufferSize + '}'; } private static final Logger logger = LoggerFactory.getLogger(SubscribeRequestTuple.class); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java index 1c45cb2..408e8a2 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/Tuple.java @@ -126,33 +126,28 @@ public abstract class Tuple byte tmp = buffer[offset++]; if (tmp >= 0) { return tmp; - } - else if (offset < limit) { + } else if (offset < limit) { int integer = tmp & 0x7f; tmp = buffer[offset++]; if (tmp >= 0) { return integer | tmp << 7; - } - else if (offset < limit) { + } else if (offset < limit) { integer |= (tmp & 0x7f) << 7; tmp = buffer[offset++]; if (tmp >= 0) { return integer | tmp << 14; - } - else if (offset < limit) { + } else if (offset < limit) { integer |= (tmp & 0x7f) << 14; tmp = buffer[offset++]; if (tmp >= 0) { return integer | tmp << 21; - } - else if (offset < limit) { + } else if (offset < limit) { integer |= (tmp & 0x7f) << 21; tmp = buffer[offset++]; if (tmp >= 0) { return integer | tmp << 28; - } - else { + } else { throw new NumberFormatException("Invalid varint at location " + offset + " => " + Arrays.toString(Arrays.copyOfRange(buffer, offset, limit))); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java index 07c2f85..014827c 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/packet/WindowIdTuple.java @@ -49,25 +49,25 @@ public class WindowIdTuple extends Tuple @Override public int getPartition() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public Slice getData() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getBaseSeconds() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override public int getWindowWidth() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + throw new UnsupportedOperationException("Not supported yet."); } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java index 25d3742..77aa56f 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/AbstractPolicy.java @@ -25,21 +25,13 @@ import com.datatorrent.bufferserver.util.SerializedData; /** * - * The base class for specifying partition policies, implements interface {@link com.datatorrent.bufferserver.policy.Policy}<p> - * <br> + * The base class for specifying partition policies * * @since 0.3.2 */ public class AbstractPolicy implements Policy { - - /** - * - * - * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s - * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send - */ - + @Override public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException { throw new UnsupportedOperationException("Not supported yet."); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java index 88825ce..0c2819d 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/GiveAll.java @@ -33,7 +33,7 @@ import com.datatorrent.bufferserver.util.SerializedData; */ public class GiveAll extends AbstractPolicy { - final static GiveAll instance = new GiveAll(); + private static final GiveAll instance = new GiveAll(); /** * http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java index 88145fc..c4143be 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/LeastBusy.java @@ -25,9 +25,8 @@ import com.datatorrent.bufferserver.util.SerializedData; /** * - * Implements load balancing by sending the tuple to the least busy partition<p> - * <br> - * Basic load balancing policy. Extends the base class {@link com.datatorrent.bufferserver.policy.AbstractPolicy}<br> + * Implements load balancing by sending the tuple to the least busy partition. + * Basic load balancing policy. Extends the base class {@link AbstractPolicy}<br> * * @since 0.3.2 */ @@ -51,20 +50,13 @@ public class LeastBusy extends AbstractPolicy { } - /** - * - * - * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s - * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send - */ @Override public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException { PhysicalNode theOne = null; for (PhysicalNode node: nodes) { - if (theOne == null - || node.getProcessedMessageCount() < theOne.getProcessedMessageCount()) { + if (theOne == null || node.getProcessedMessageCount() < theOne.getProcessedMessageCount()) { theOne = node; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java index 1393667..0080ce0 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/Policy.java @@ -33,13 +33,13 @@ import com.datatorrent.bufferserver.util.SerializedData; public interface Policy { /** + * Distributes {@code data} to the set of {@code nodes} * - * - * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s - * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send + * @param nodes Set of downstream {@link PhysicalNode} + * @param data Opaque {@link SerializedData} to be send * @throws InterruptedException + * @return {@code true} if successful, otherwise {@code false} */ - - public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException; + boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java index aebe450..4f700a6 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RandomOne.java @@ -25,9 +25,8 @@ import com.datatorrent.bufferserver.util.SerializedData; /** * - * Randomly distributes tuples to downstream nodes. A random load balancing policy<p> - * <br> - * A generic random load balancing policy. Extends the base class {@link com.datatorrent.bufferserver.policy.AbstractPolicy}<br> + * Randomly distributes tuples to downstream nodes. A random load balancing policy. + * A generic random load balancing policy. Extends the base class {@link AbstractPolicy} * * @since 0.3.2 */ @@ -51,13 +50,6 @@ public class RandomOne extends AbstractPolicy { } - /** - * - * - * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s - * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send - */ - @Override public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java index 1185815..7291b7d 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/policy/RoundRobin.java @@ -25,9 +25,10 @@ import com.datatorrent.bufferserver.util.SerializedData; /** * - * Distributes to downstream nodes in a roundrobin fashion. A round robin load balancing policy<p> + * Distributes to downstream nodes in a round robin fashion. A round robin load balancing policy * <br> - * A round robin load balaning policy. Does not take into account busy/load of a downstream physical node. Extends the base class {@link com.datatorrent.bufferserver.policy.AbstractPolicy}<br> + * A round robin load balaning policy. Does not take into account busy/load of a downstream physical node. Extends + * the base class {@link AbstractPolicy}<br> * <br> * * @since 0.3.2 @@ -44,18 +45,15 @@ public class RoundRobin extends AbstractPolicy index = 0; } - /** - * - * - * @param nodes Set of downstream {@link com.datatorrent.bufferserver.PhysicalNode}s - * @param data Opaque {@link com.datatorrent.bufferserver.util.SerializedData} to be send - */ - @Override public boolean distribute(Set<PhysicalNode> nodes, SerializedData data) throws InterruptedException { int size = nodes.size(); - if (size > 0) { // why do i need to do this check? synchronization issues? because if there is no one interested, the logical group should not exist! + /* + * why do i need to do this check? synchronization issues? because if there is no one interested, + * the logical group should not exist! + */ + if (size > 0) { index %= size; int count = index++; /* http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index c4cdf5b..03d96ee 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -91,7 +91,10 @@ public class Server implements ServerListener this.blockSize = blocksize; this.numberOfCacheBlocks = numberOfCacheBlocks; serverHelperExecutor = Executors.newSingleThreadExecutor(new NameableThreadFactory("ServerHelper")); - storageHelperExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(numberOfCacheBlocks), new NameableThreadFactory("StorageHelper"), new ThreadPoolExecutor.CallerRunsPolicy()); + final ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(numberOfCacheBlocks); + final NameableThreadFactory threadFactory = new NameableThreadFactory("StorageHelper"); + storageHelperExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, workQueue, threadFactory, + new ThreadPoolExecutor.CallerRunsPolicy()); } public void setSpoolStorage(Storage storage) @@ -168,8 +171,8 @@ public class Server implements ServerListener private final HashMap<String, DataList> publisherBuffers = new HashMap<String, DataList>(); private final ConcurrentHashMap<String, LogicalNode> subscriberGroups = new ConcurrentHashMap<String, LogicalNode>(); - private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<String, AbstractLengthPrependerClient>(); - private final ConcurrentHashMap<String, AbstractLengthPrependerClient> subscriberChannels = new ConcurrentHashMap<String, AbstractLengthPrependerClient>(); + private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<String, AbstractLengthPrependerClient> subscriberChannels = new ConcurrentHashMap<>(); private final int blockSize; private final int numberOfCacheBlocks; @@ -251,7 +254,9 @@ public class Server implements ServerListener dl = publisherBuffers.get(upstream_identifier); //logger.debug("old list = {}", dl); } else { - dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : new DataList(upstream_identifier, blockSize, numberOfCacheBlocks); + dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? + new FastDataList(upstream_identifier, blockSize, numberOfCacheBlocks) : + new DataList(upstream_identifier, blockSize, numberOfCacheBlocks); publisherBuffers.put(upstream_identifier, dl); //logger.debug("new list = {}", dl); } @@ -305,7 +310,9 @@ public class Server implements ServerListener throw new RuntimeException(ie); } } else { - dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? new FastDataList(identifier, blockSize, numberOfCacheBlocks) : new DataList(identifier, blockSize, numberOfCacheBlocks); + dl = Tuple.FAST_VERSION.equals(request.getVersion()) ? + new FastDataList(identifier, blockSize, numberOfCacheBlocks) : + new DataList(identifier, blockSize, numberOfCacheBlocks); publisherBuffers.put(identifier, dl); } dl.setSecondaryStorage(storage, storageHelperExecutor); @@ -439,9 +446,11 @@ public class Server implements ServerListener // bufferSize = 16 * 1024; // } if (subscriberRequest.getVersion().equals(Tuple.FAST_VERSION)) { - subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), subscriberRequest.getPartitions(), bufferSize); + subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), + subscriberRequest.getPartitions(), bufferSize); } else { - subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), subscriberRequest.getPartitions(), bufferSize) + subscriber = new Subscriber(subscriberRequest.getStreamType(), subscriberRequest.getMask(), + subscriberRequest.getPartitions(), bufferSize) { @Override public int readSize() @@ -515,7 +524,8 @@ public class Server implements ServerListener @Override public void onMessage(byte[] buffer, int offset, int size) { - logger.warn("Received data when no data is expected: {}", Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size))); + logger.warn("Received data when no data is expected: {}", + Arrays.toString(Arrays.copyOfRange(buffer, offset, offset + size))); } @Override @@ -535,7 +545,8 @@ public class Server implements ServerListener @Override public String toString() { - return "Server.Subscriber{" + "type=" + type + ", mask=" + mask + ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + '}'; + return "Server.Subscriber{" + "type=" + type + ", mask=" + mask + + ", partitions=" + (partitions == null ? "null" : Arrays.toString(partitions)) + '}'; } private volatile boolean torndown; @@ -600,10 +611,12 @@ public class Server implements ServerListener } /** - * Schedules a task to conditionally resume I/O channel read operations. No-op if {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} - * is already set in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. Otherwise, calls {@linkplain #read(int) read(0)} - * to process data left in the Publisher read buffer and registers {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} in the key - * {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. + * Schedules a task to conditionally resume I/O channel read operations. + * No-op if {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} + * is already set in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. + * Otherwise, calls {@linkplain #read(int) read(0)} to process data + * left in the Publisher read buffer and registers {@linkplain java.nio.channels.SelectionKey#OP_READ OP_READ} + * in the key {@linkplain java.nio.channels.SelectionKey#interestOps() interestOps}. * @return true */ @Override @@ -770,8 +783,9 @@ public class Server implements ServerListener */ /** - * since the publisher server died, the queue which it was using would stop pumping the data unless a new publisher comes up with the same name. We leave - * it to the stream to decide when to bring up a new node with the same identifier as the one which just died. + * since the publisher server died, the queue which it was using would stop pumping the data unless + * a new publisher comes up with the same name. We leave it to the stream to decide when to bring up a new node + * with the same identifier as the one which just died. */ if (publisherChannels.containsValue(this)) { final Iterator<Entry<String, AbstractLengthPrependerClient>> i = publisherChannels.entrySet().iterator(); http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java index 3ddc77f..02ba340 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/storage/DiskStorage.java @@ -66,8 +66,7 @@ public class DiskStorage implements Storage Character c = name.charAt(i); if (Character.isLetterOrDigit(c)) { sb.append(c); - } - else { + } else { sb.append('-'); } } @@ -90,30 +89,25 @@ public class DiskStorage implements Storage synchronized (this) { lUniqueIdentifier = ++this.uniqueIdentifier; } + } else { + throw new IllegalStateException("Collision in identifier name, please ensure that the slug for " + + "the identifiers is different"); } - else { - throw new IllegalStateException("Collission in identifier name, please ensure that the slug for the identifiers is differents"); - } - } - catch (IOException ex) { + } catch (IOException ex) { throw new RuntimeException(ex); } - } - else { + } else { throw new IllegalStateException("Identity file is hijacked!"); } - } - else { + } else { if (directory.mkdir()) { File identity = new File(directory, "identity"); try { Files.write(identifier.getBytes(), identity); - } - catch (IOException ex) { + } catch (IOException ex) { throw new RuntimeException(ex); } - } - else { + } else { throw new RuntimeException("directory " + directory.getAbsolutePath() + " could not be created!"); } @@ -122,8 +116,7 @@ public class DiskStorage implements Storage try { return writeFile(bytes, startingOffset, endingOffset, directory, lUniqueIdentifier); - } - catch (IOException ex) { + } catch (IOException ex) { throw new RuntimeException(ex); } } @@ -144,24 +137,20 @@ public class DiskStorage implements Storage if (!deletionFile.delete()) { throw new RuntimeException("File " + deletionFile.getPath() + " could not be deleted!"); } - } - else { + } else { throw new RuntimeException("File " + deletionFile.getPath() + " either is non existent or not a file!"); } + } else { + throw new RuntimeException("Collision in the identifier name, please ensure that the slugs for " + + "the identifiers are different"); } - else { - throw new RuntimeException("Collission in identifier name, please ensure that the slug for the identifiers is differents"); - } - } - catch (IOException ex) { + } catch (IOException ex) { throw new RuntimeException(ex); } - } - else { + } else { throw new RuntimeException(identityFile + " is not a file!"); } - } - else { + } else { throw new RuntimeException("directory " + directory.getPath() + " does not exist!"); } } @@ -180,37 +169,31 @@ public class DiskStorage implements Storage File filename = new File(directory, String.valueOf(uniqueIdentifier)); if (filename.exists() && filename.isFile()) { return Files.toByteArray(filename); - } - else { + } else { throw new RuntimeException("File " + filename.getPath() + " either is non existent or not a file!"); } + } else { + throw new RuntimeException("Collision in the identifier name," + + " please ensure that the slugs for the identifiers [" + identifier + "], and [" + new String(stored) + + "] are different."); } - else { - throw new RuntimeException("Collision in identifier name, please ensure that the slugs for the identifiers [" + identifier + "], and [" + new String(stored) + "] are different."); - } - } - catch (IOException ex) { + } catch (IOException ex) { throw new RuntimeException(ex); } - } - else { + } else { throw new RuntimeException(identityFile + " is not a file!"); } - } - else { + } else { throw new RuntimeException("directory " + directory.getPath() + " does not exist!"); } } - protected int writeFile(byte[] bytes, int startingOffset, int endingOffset, File directory, final int number) throws IOException + protected int writeFile(final byte[] bytes, final int startingOffset, final int endingOffset, final File directory, + final int number) throws IOException { - FileOutputStream stream = new FileOutputStream(new File(directory, String.valueOf(number))); - try { + try (FileOutputStream stream = new FileOutputStream(new File(directory, String.valueOf(number)))) { stream.write(bytes, startingOffset, endingOffset - startingOffset); } - finally { - stream.close(); - } return number; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java index 16f443f..0bc065e 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/SerializedData.java @@ -21,8 +21,8 @@ package com.datatorrent.bufferserver.util; import com.datatorrent.netlet.util.Slice; /** - * Wrapper for a {@code byte[]}, which provides read-only access and can "reveal" a partial slice of the underlying array.<p> - * + * Wrapper for a {@code byte[]}, which provides read-only access and can "reveal" a partial slice of the underlying + * array.<p> * * <b>Note:</b> Multibyte accessors all use big-endian order. * http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java index 0b2b67a..124cc5f 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/System.java @@ -18,12 +18,12 @@ */ package com.datatorrent.bufferserver.util; -import com.datatorrent.netlet.DefaultEventLoop; -import com.datatorrent.netlet.EventLoop; - import java.io.IOException; import java.util.HashMap; +import com.datatorrent.netlet.DefaultEventLoop; +import com.datatorrent.netlet.EventLoop; + /** * <p>System class.</p> * @@ -40,8 +40,7 @@ public class System if (el == null) { try { eventloops.put(identifier, el = DefaultEventLoop.createEventLoop(identifier)); - } - catch (IOException io) { + } catch (IOException io) { throw new RuntimeException(io); } } @@ -55,8 +54,7 @@ public class System DefaultEventLoop el = eventloops.get(identifier); if (el == null) { throw new RuntimeException("System with " + identifier + " not setup!"); - } - else { + } else { el.stop(); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java index 6f12cc4..d8583fb 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/util/VarInt.java @@ -38,18 +38,15 @@ public class VarInt extends com.datatorrent.netlet.util.VarInt int result = tmp & 0x7f; if ((tmp = data[offset++]) >= 0) { result |= tmp << 7; - } - else { + } else { result |= (tmp & 0x7f) << 7; if ((tmp = data[offset++]) >= 0) { result |= tmp << 14; - } - else { + } else { result |= (tmp & 0x7f) << 14; if ((tmp = data[offset++]) >= 0) { result |= tmp << 21; - } - else { + } else { result |= (tmp & 0x7f) << 21; result |= (tmp = data[offset++]) << 28; if (tmp < 0) { http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java index ee56e4d..234fb12 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/client/SubscriberTest.java @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.testng.Assert; + import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -40,6 +40,9 @@ import com.datatorrent.bufferserver.support.Subscriber; import com.datatorrent.bufferserver.util.Codec; import com.datatorrent.netlet.DefaultEventLoop; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + /** * */ @@ -57,8 +60,7 @@ public class SubscriberTest try { eventloopServer = DefaultEventLoop.createEventLoop("server"); eventloopClient = DefaultEventLoop.createEventLoop("client"); - } - catch (IOException ioe) { + } catch (IOException ioe) { throw new RuntimeException(ioe); } eventloopServer.start(); @@ -66,7 +68,8 @@ public class SubscriberTest instance = new Server(0, 64, 2); address = instance.run(eventloopServer); - assert (address instanceof InetSocketAddress); + assertTrue(address instanceof InetSocketAddress); + assertFalse(address.isUnresolved()); } @AfterClass @@ -82,7 +85,7 @@ public class SubscriberTest public void test() throws InterruptedException { final Publisher bsp1 = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp1); + eventloopClient.connect(address, bsp1); final Subscriber bss1 = new Subscriber("MySubscriber") { @@ -104,7 +107,7 @@ public class SubscriberTest } }; - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss1); + eventloopClient.connect(address, bss1); final int baseWindow = 0x7afebabe; bsp1.activate(null, baseWindow, 0); @@ -131,12 +134,9 @@ public class SubscriberTest windowId++; Thread.sleep(5); } - } - catch (InterruptedException ex) { - } - catch (CancelledKeyException cke) { - } - finally { + } catch (InterruptedException | CancelledKeyException e) { + logger.debug("{}", e); + } finally { logger.debug("publisher the middle of window = {}", Codec.getStringWindowId(windowId)); } } @@ -158,7 +158,7 @@ public class SubscriberTest * subscribe from 8 onwards. What we should see is that subscriber gets the new data from 8 onwards. */ final Publisher bsp2 = new Publisher("MyPublisher"); - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bsp2); + eventloopClient.connect(address, bsp2); bsp2.activate(null, 0x7afebabe, 5); final Subscriber bss2 = new Subscriber("MyPublisher") @@ -175,7 +175,7 @@ public class SubscriberTest } }; - eventloopClient.connect(address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address, bss2); + eventloopClient.connect(address, bss2); bss2.activate(null, "BufferServerOutput/BufferServerSubscriber", "MyPublisher", 0, null, 0x7afebabe00000008L, 0); @@ -200,12 +200,9 @@ public class SubscriberTest windowId++; Thread.sleep(5); } - } - catch (InterruptedException ex) { - } - catch (CancelledKeyException cke) { - } - finally { + } catch (InterruptedException | CancelledKeyException e) { + logger.debug("", e); + } finally { logger.debug("publisher in the middle of window = {}", Codec.getStringWindowId(windowId)); } } @@ -221,7 +218,7 @@ public class SubscriberTest eventloopClient.disconnect(bsp2); eventloopClient.disconnect(bss2); - Assert.assertTrue((bss2.lastPayload.getWindowId() - 8) * 3 <= bss2.tupleCount.get()); + assertTrue((bss2.lastPayload.getWindowId() - 8) * 3 <= bss2.tupleCount.get()); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/799df6c0/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java index 5dc581c..04767ef 100644 --- a/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java +++ b/bufferserver/src/test/java/com/datatorrent/bufferserver/packet/NoMessageTupleTest.java @@ -18,34 +18,17 @@ */ package com.datatorrent.bufferserver.packet; -import junit.framework.TestCase; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.testng.annotations.Test; /** * */ -public class NoMessageTupleTest extends TestCase +public class NoMessageTupleTest { - public NoMessageTupleTest(String testName) - { - super(testName); - } - - @Override - protected void setUp() throws Exception - { - super.setUp(); - } - - @Override - protected void tearDown() throws Exception - { - super.tearDown(); - } - @Test public void testSerDe() { @@ -54,7 +37,7 @@ public class NoMessageTupleTest extends TestCase byte[] serialized = NoMessageTuple.getSerializedTuple(); Tuple t = Tuple.getTuple(serialized, 0, serialized.length); - assert(t.getType() == MessageType.NO_MESSAGE); + assert t.getType() == MessageType.NO_MESSAGE; } private static final Logger logger = LoggerFactory.getLogger(NoMessageTupleTest.class);
