http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java index 902722d..00a38e6 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java @@ -53,21 +53,21 @@ public final class LegacyLocalAddressProvider implements LocalAddressProvider { public String getLocalAddress() { // This method is surprisingly slow: It was causing unit test timeouts, so we memoize the result. if (cached.get() == null) { - Enumeration<NetworkInterface> ifaces; + final Enumeration<NetworkInterface> ifaces; try { ifaces = NetworkInterface.getNetworkInterfaces(); - TreeSet<Inet4Address> sortedAddrs = new TreeSet<>(new AddressComparator()); + final TreeSet<Inet4Address> sortedAddrs = new TreeSet<>(new AddressComparator()); // There is an idea of virtual / subinterfaces exposed by java here. // We're not walking around looking for those because the javadoc says: // "NOTE: can use getNetworkInterfaces()+getInetAddresses() to obtain all IP addresses for this node" while (ifaces.hasMoreElements()) { - NetworkInterface iface = ifaces.nextElement(); + final NetworkInterface iface = ifaces.nextElement(); // if(iface.isUp()) { // leads to slowness and non-deterministic return values, so don't call isUp(). - Enumeration<InetAddress> addrs = iface.getInetAddresses(); + final Enumeration<InetAddress> addrs = iface.getInetAddresses(); while (addrs.hasMoreElements()) { - InetAddress a = addrs.nextElement(); + final InetAddress a = addrs.nextElement(); if (a instanceof Inet4Address) { sortedAddrs.add((Inet4Address) a); } @@ -80,7 +80,7 @@ public final class LegacyLocalAddressProvider implements LocalAddressProvider { } cached.set(sortedAddrs.pollFirst().getHostAddress()); LOG.log(Level.FINE, "Local address is {0}", cached.get()); - } catch (SocketException e) { + } catch (final SocketException e) { throw new WakeRuntimeException("Unable to get local host address", e.getCause()); } @@ -98,14 +98,14 @@ public final class LegacyLocalAddressProvider implements LocalAddressProvider { private static class AddressComparator implements Comparator<Inet4Address> { // get unsigned byte. - private static int u(byte b) { + private static int u(final byte b) { return ((int) b); // & 0xff; } @Override - public int compare(Inet4Address aa, Inet4Address ba) { - byte[] a = aa.getAddress(); - byte[] b = ba.getAddress(); + public int compare(final Inet4Address aa, final Inet4Address ba) { + final byte[] a = aa.getAddress(); + final byte[] b = ba.getAddress(); // local subnet comes after all else. if (a[0] == 127 && b[0] != 127) { return 1;
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProviderFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProviderFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProviderFactory.java index e6f4a3c..b6d558a 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProviderFactory.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LocalAddressProviderFactory.java @@ -61,7 +61,7 @@ public final class LocalAddressProviderFactory { LOGGER.log(Level.FINER, "Instantiating default LocalAddressProvider for legacy users."); instance = Tang.Factory.getTang().newInjector().getInstance(LocalAddressProvider.class); LOGGER.log(Level.FINER, "Instantiated default LocalAddressProvider for legacy users."); - } catch (InjectionException e) { + } catch (final InjectionException e) { throw new RuntimeException("Unable to instantiate default LocalAddressProvider for legacy users.", e); } assert (null != instance); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/exception/RemoteRuntimeException.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/exception/RemoteRuntimeException.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/exception/RemoteRuntimeException.java index e23ee3e..1516d8e 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/exception/RemoteRuntimeException.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/exception/RemoteRuntimeException.java @@ -30,7 +30,7 @@ public class RemoteRuntimeException extends RuntimeException { * @param s the detailed message * @param e the cause */ - public RemoteRuntimeException(String s, Throwable e) { + public RemoteRuntimeException(final String s, final Throwable e) { super(s, e); } @@ -39,7 +39,7 @@ public class RemoteRuntimeException extends RuntimeException { * * @param s the detailed message */ - public RemoteRuntimeException(String s) { + public RemoteRuntimeException(final String s) { super(s); } @@ -48,7 +48,7 @@ public class RemoteRuntimeException extends RuntimeException { * * @param e the cause */ - public RemoteRuntimeException(Throwable e) { + public RemoteRuntimeException(final Throwable e) { super(e); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ByteCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ByteCodec.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ByteCodec.java index 5c24dd3..ec107ad 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ByteCodec.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ByteCodec.java @@ -32,7 +32,7 @@ public class ByteCodec implements Codec<byte[]> { * @return the same bytes */ @Override - public byte[] encode(byte[] obj) { + public byte[] encode(final byte[] obj) { return obj; } @@ -43,7 +43,7 @@ public class ByteCodec implements Codec<byte[]> { * @return the same bytes */ @Override - public byte[] decode(byte[] buf) { + public byte[] decode(final byte[] buf) { return buf; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ConnectFutureTask.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ConnectFutureTask.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ConnectFutureTask.java index fc6d73c..5e664af 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ConnectFutureTask.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ConnectFutureTask.java @@ -27,7 +27,7 @@ public class ConnectFutureTask<T> extends FutureTask<T> { private final EventHandler<ConnectFutureTask<T>> handler; - public ConnectFutureTask(Callable<T> callable, EventHandler<ConnectFutureTask<T>> handler) { + public ConnectFutureTask(final Callable<T> callable, final EventHandler<ConnectFutureTask<T>> handler) { super(callable); this.handler = handler; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteIdentifierFactoryImplementation.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteIdentifierFactoryImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteIdentifierFactoryImplementation.java index b403891..ba8f638 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteIdentifierFactoryImplementation.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteIdentifierFactoryImplementation.java @@ -35,12 +35,13 @@ public class DefaultRemoteIdentifierFactoryImplementation extends DefaultIdentif } @SuppressWarnings({"unchecked", "rawtypes"}) - public DefaultRemoteIdentifierFactoryImplementation(Map<String, Class<? extends RemoteIdentifier>> typeToClazzMap) { + public DefaultRemoteIdentifierFactoryImplementation( + final Map<String, Class<? extends RemoteIdentifier>> typeToClazzMap) { super((Map<String, Class<? extends Identifier>>) (Map) typeToClazzMap); } @Override - public RemoteIdentifier getNewInstance(String str) { + public RemoteIdentifier getNewInstance(final String str) { return (RemoteIdentifier) super.getNewInstance(str); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java index 2d63b34..0aa78c4 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerFactory.java @@ -125,7 +125,8 @@ public final class DefaultRemoteManagerFactory implements RemoteManagerFactory { @Override @SuppressWarnings("checkstyle:hiddenfield") - public <T> RemoteManager getInstance(String name, Codec<T> codec, EventHandler<Throwable> errorHandler) { + public <T> RemoteManager getInstance( + final String name, final Codec<T> codec, final EventHandler<Throwable> errorHandler) { return new DefaultRemoteManagerImplementation(name, DefaultRemoteManagerImplementation.UNKNOWN_HOST_NAME, // Indicate to use the localAddressProvider 0, // Indicate to use the tcpPortProvider, http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteMessage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteMessage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteMessage.java index 3929415..de2f23b 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteMessage.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteMessage.java @@ -37,7 +37,7 @@ class DefaultRemoteMessage<T> implements RemoteMessage<T> { * @param id the remote identifier * @param message the message */ - DefaultRemoteMessage(RemoteIdentifier id, T message) { + DefaultRemoteMessage(final RemoteIdentifier id, final T message) { this.id = id; this.message = message; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java index 7f33847..49b0bac 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultTransportEStage.java @@ -29,7 +29,7 @@ public class DefaultTransportEStage implements EStage<TransportEvent> { } @Override - public void onNext(TransportEvent value) { + public void onNext(final TransportEvent value) { } @Override http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java index 1fad3cd..440b241 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java @@ -41,12 +41,12 @@ public class MultiCodec<T> implements Codec<T> { * * @param clazzToDecoderMap */ - public MultiCodec(Map<Class<? extends T>, Codec<? extends T>> clazzToCodecMap) { - Map<Class<? extends T>, Encoder<? extends T>> clazzToEncoderMap = + public MultiCodec(final Map<Class<? extends T>, Codec<? extends T>> clazzToCodecMap) { + final Map<Class<? extends T>, Encoder<? extends T>> clazzToEncoderMap = new HashMap<Class<? extends T>, Encoder<? extends T>>(); - Map<Class<? extends T>, Decoder<? extends T>> clazzToDecoderMap = + final Map<Class<? extends T>, Decoder<? extends T>> clazzToDecoderMap = new HashMap<Class<? extends T>, Decoder<? extends T>>(); - for (Class<? extends T> clazz : clazzToCodecMap.keySet()) { + for (final Class<? extends T> clazz : clazzToCodecMap.keySet()) { clazzToEncoderMap.put(clazz, clazzToCodecMap.get(clazz)); clazzToDecoderMap.put(clazz, clazzToCodecMap.get(clazz)); } @@ -60,7 +60,7 @@ public class MultiCodec<T> implements Codec<T> { * @param obj */ @Override - public byte[] encode(T obj) { + public byte[] encode(final T obj) { return encoder.encode(obj); } @@ -70,7 +70,7 @@ public class MultiCodec<T> implements Codec<T> { * @param data class name and byte payload */ @Override - public T decode(byte[] data) { + public T decode(final byte[] data) { return decoder.decode(data); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiDecoder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiDecoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiDecoder.java index d757d6a..1df9517 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiDecoder.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiDecoder.java @@ -39,7 +39,7 @@ public class MultiDecoder<T> implements Decoder<T> { * * @param clazzToDecoderMap */ - public MultiDecoder(Map<Class<? extends T>, Decoder<? extends T>> clazzToDecoderMap) { + public MultiDecoder(final Map<Class<? extends T>, Decoder<? extends T>> clazzToDecoderMap) { this.clazzToDecoderMap = clazzToDecoderMap; } @@ -49,21 +49,21 @@ public class MultiDecoder<T> implements Decoder<T> { * @param data class name and byte payload */ @Override - public T decode(byte[] data) { - WakeTuplePBuf tuple; + public T decode(final byte[] data) { + final WakeTuplePBuf tuple; try { tuple = WakeTuplePBuf.parseFrom(data); - } catch (InvalidProtocolBufferException e) { + } catch (final InvalidProtocolBufferException e) { e.printStackTrace(); throw new RemoteRuntimeException(e); } - String className = tuple.getClassName(); - byte[] message = tuple.getData().toByteArray(); - Class<?> clazz; + final String className = tuple.getClassName(); + final byte[] message = tuple.getData().toByteArray(); + final Class<?> clazz; try { clazz = Class.forName(className); - } catch (ClassNotFoundException e) { + } catch (final ClassNotFoundException e) { e.printStackTrace(); throw new RemoteRuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiEncoder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiEncoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiEncoder.java index 4e3489e..e3e3c46 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiEncoder.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiEncoder.java @@ -40,7 +40,7 @@ public class MultiEncoder<T> implements Encoder<T> { * * @param clazzToEncoderMap */ - public MultiEncoder(Map<Class<? extends T>, Encoder<? extends T>> clazzToEncoderMap) { + public MultiEncoder(final Map<Class<? extends T>, Encoder<? extends T>> clazzToEncoderMap) { this.clazzToEncoderMap = clazzToEncoderMap; } @@ -50,13 +50,13 @@ public class MultiEncoder<T> implements Encoder<T> { * @param obj */ @Override - public byte[] encode(T obj) { - Encoder<T> encoder = (Encoder<T>) clazzToEncoderMap.get(obj.getClass()); + public byte[] encode(final T obj) { + final Encoder<T> encoder = (Encoder<T>) clazzToEncoderMap.get(obj.getClass()); if (encoder == null) { throw new RemoteRuntimeException("Encoder for " + obj.getClass() + " not known."); } - WakeTuplePBuf.Builder tupleBuilder = WakeTuplePBuf.newBuilder(); + final WakeTuplePBuf.Builder tupleBuilder = WakeTuplePBuf.newBuilder(); tupleBuilder.setClassName(obj.getClass().getName()); tupleBuilder.setData(ByteString.copyFrom(encoder.encode(obj))); return tupleBuilder.build().toByteArray(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java index d5302f7..d695867 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java @@ -52,7 +52,8 @@ public class OrderedRemoteReceiverStage implements EStage<TransportEvent> { * @param handler the handler of remote events * @param errorHandler the exception handler */ - public OrderedRemoteReceiverStage(EventHandler<RemoteEvent<byte[]>> handler, EventHandler<Throwable> errorHandler) { + public OrderedRemoteReceiverStage( + final EventHandler<RemoteEvent<byte[]>> handler, final EventHandler<Throwable> errorHandler) { this.streamMap = new ConcurrentHashMap<SocketAddress, OrderedEventStream>(); this.pushExecutor = Executors.newCachedThreadPool( @@ -67,7 +68,7 @@ public class OrderedRemoteReceiverStage implements EStage<TransportEvent> { } @Override - public void onNext(TransportEvent value) { + public void onNext(final TransportEvent value) { LOG.log(Level.FINEST, "{0}", value); pushStage.onNext(value); } @@ -82,10 +83,10 @@ public class OrderedRemoteReceiverStage implements EStage<TransportEvent> { // wait for threads to finish for timeout if (!pushExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); - List<Runnable> droppedRunnables = pushExecutor.shutdownNow(); + final List<Runnable> droppedRunnables = pushExecutor.shutdownNow(); LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks."); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.log(Level.WARNING, "Close interrupted"); throw new RemoteRuntimeException(e); } @@ -97,10 +98,10 @@ public class OrderedRemoteReceiverStage implements EStage<TransportEvent> { // wait for threads to finish for timeout if (!pullExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); - List<Runnable> droppedRunnables = pullExecutor.shutdownNow(); + final List<Runnable> droppedRunnables = pullExecutor.shutdownNow(); LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks."); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { LOG.log(Level.WARNING, "Close interrupted"); throw new RemoteRuntimeException(e); } @@ -116,16 +117,16 @@ class OrderedPushEventHandler implements EventHandler<TransportEvent> { private final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap; // per remote address private final ThreadPoolStage<OrderedEventStream> pullStage; - OrderedPushEventHandler(ConcurrentMap<SocketAddress, OrderedEventStream> streamMap, - ThreadPoolStage<OrderedEventStream> pullStage) { + OrderedPushEventHandler(final ConcurrentMap<SocketAddress, OrderedEventStream> streamMap, + final ThreadPoolStage<OrderedEventStream> pullStage) { this.codec = new RemoteEventCodec<byte[]>(new ByteCodec()); this.streamMap = streamMap; this.pullStage = pullStage; } @Override - public void onNext(TransportEvent value) { - RemoteEvent<byte[]> re = codec.decode(value.getData()); + public void onNext(final TransportEvent value) { + final RemoteEvent<byte[]> re = codec.decode(value.getData()); re.setLocalAddress(value.getLocalAddress()); re.setRemoteAddress(value.getRemoteAddress()); @@ -135,7 +136,7 @@ class OrderedPushEventHandler implements EventHandler<TransportEvent> { LOG.log(Level.FINER, "Value length is {0}", value.getData().length); - SocketAddress addr = re.remoteAddress(); + final SocketAddress addr = re.remoteAddress(); OrderedEventStream stream = streamMap.get(re.remoteAddress()); if (stream == null) { stream = new OrderedEventStream(); @@ -154,12 +155,12 @@ class OrderedPullEventHandler implements EventHandler<OrderedEventStream> { private final EventHandler<RemoteEvent<byte[]>> handler; - OrderedPullEventHandler(EventHandler<RemoteEvent<byte[]>> handler) { + OrderedPullEventHandler(final EventHandler<RemoteEvent<byte[]>> handler) { this.handler = handler; } @Override - public void onNext(OrderedEventStream stream) { + public void onNext(final OrderedEventStream stream) { if (LOG.isLoggable(Level.FINER)) { LOG.log(Level.FINER, "{0}", stream); } @@ -183,7 +184,7 @@ class OrderedEventStream { nextSeq = 0; } - synchronized void add(RemoteEvent<byte[]> event) { + synchronized void add(final RemoteEvent<byte[]> event) { queue.add(event); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java index 8f763ae..72ba6d7 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java @@ -48,8 +48,8 @@ public class ProxyEventHandler<T> implements EventHandler<T> { * @param reStage the sender stage * @throws RemoteRuntimeException */ - public ProxyEventHandler(RemoteIdentifier myId, RemoteIdentifier remoteId, String remoteSinkName, - EventHandler<RemoteEvent<T>> handler, RemoteSeqNumGenerator seqGen) { + public ProxyEventHandler(final RemoteIdentifier myId, final RemoteIdentifier remoteId, final String remoteSinkName, + final EventHandler<RemoteEvent<T>> handler, final RemoteSeqNumGenerator seqGen) { LOG.log(Level.FINE, "ProxyEventHandler myId: {0} remoteId: {1} remoteSink: {2} handler: {3}", new Object[]{myId, remoteId, remoteSinkName, handler}); if (!(myId instanceof SocketRemoteIdentifier && remoteId instanceof SocketRemoteIdentifier)) { @@ -69,7 +69,7 @@ public class ProxyEventHandler<T> implements EventHandler<T> { * @param event the event */ @Override - public void onNext(T event) { + public void onNext(final T event) { if (LOG.isLoggable(Level.FINE)) { LOG.log(Level.FINE, "remoteid: {0}\n{1}", new Object[]{remoteId.getSocketAddress(), event.toString()}); } @@ -83,7 +83,7 @@ public class ProxyEventHandler<T> implements EventHandler<T> { * @return a string representation of the object */ public String toString() { - StringBuilder builder = new StringBuilder(); + final StringBuilder builder = new StringBuilder(); builder.append(this.getClass().getName()); builder.append(" remote_id="); builder.append(remoteId.toString()); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java index 51b35cd..b7e6dc3 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEvent.java @@ -45,7 +45,8 @@ public class RemoteEvent<T> { * @param seq the sequence number * @param event the event */ - public RemoteEvent(SocketAddress localAddr, SocketAddress remoteAddr, String src, String sink, long seq, T event) { + public RemoteEvent(final SocketAddress localAddr, final SocketAddress remoteAddr, final String src, + final String sink, final long seq, final T event) { this.localAddr = localAddr; this.remoteAddr = remoteAddr; this.src = src; @@ -86,7 +87,7 @@ public class RemoteEvent<T> { * * @param name the source name */ - public void setSource(String name) { + public void setSource(final String name) { src = name; } @@ -104,7 +105,7 @@ public class RemoteEvent<T> { * * @param name the sink name */ - public void setSink(String name) { + public void setSink(final String name) { sink = name; } @@ -131,7 +132,7 @@ public class RemoteEvent<T> { * * @param addr the local socket address */ - public void setLocalAddress(SocketAddress addr) { + public void setLocalAddress(final SocketAddress addr) { localAddr = addr; } @@ -140,7 +141,7 @@ public class RemoteEvent<T> { * * @param addr the remote socket address */ - public void setRemoteAddress(SocketAddress addr) { + public void setRemoteAddress(final SocketAddress addr) { remoteAddr = addr; } @@ -150,7 +151,7 @@ public class RemoteEvent<T> { * @return a string representation of this object */ public String toString() { - StringBuilder builder = new StringBuilder(); + final StringBuilder builder = new StringBuilder(); builder.append("RemoteEvent"); builder.append(" localAddr="); builder.append(localAddr); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventCodec.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventCodec.java index b2ca25f..b3e2023 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventCodec.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventCodec.java @@ -35,7 +35,7 @@ public class RemoteEventCodec<T> implements Codec<RemoteEvent<T>> { * * @param codec the codec for the event */ - public RemoteEventCodec(Codec<T> codec) { + public RemoteEventCodec(final Codec<T> codec) { encoder = new RemoteEventEncoder<T>(codec); decoder = new RemoteEventDecoder<T>(codec); } @@ -47,7 +47,7 @@ public class RemoteEventCodec<T> implements Codec<RemoteEvent<T>> { * @returns bytes */ @Override - public byte[] encode(RemoteEvent<T> obj) { + public byte[] encode(final RemoteEvent<T> obj) { return encoder.encode(obj); } @@ -58,7 +58,7 @@ public class RemoteEventCodec<T> implements Codec<RemoteEvent<T>> { * @return a remote event object */ @Override - public RemoteEvent<T> decode(byte[] data) { + public RemoteEvent<T> decode(final byte[] data) { return decoder.decode(data); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventComparator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventComparator.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventComparator.java index 83eb6ce..049b4d6 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventComparator.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventComparator.java @@ -26,7 +26,7 @@ import java.util.Comparator; public class RemoteEventComparator<T> implements Comparator<RemoteEvent<T>> { @Override - public int compare(RemoteEvent<T> o1, RemoteEvent<T> o2) { + public int compare(final RemoteEvent<T> o1, final RemoteEvent<T> o2) { if (o1.getSeq() < o2.getSeq()) { return -1; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java index 7abf64e..3ef0a27 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java @@ -37,7 +37,7 @@ public class RemoteEventDecoder<T> implements Decoder<RemoteEvent<T>> { * * @param decoder the decoder of the event */ - public RemoteEventDecoder(Decoder<T> decoder) { + public RemoteEventDecoder(final Decoder<T> decoder) { this.decoder = decoder; } @@ -49,13 +49,13 @@ public class RemoteEventDecoder<T> implements Decoder<RemoteEvent<T>> { * @throws RemoteRuntimeException */ @Override - public RemoteEvent<T> decode(byte[] data) { - WakeMessagePBuf pbuf; + public RemoteEvent<T> decode(final byte[] data) { + final WakeMessagePBuf pbuf; try { pbuf = WakeMessagePBuf.parseFrom(data); return new RemoteEvent<T>(null, null, pbuf.getSource(), pbuf.getSink(), pbuf.getSeq(), decoder.decode(pbuf.getData().toByteArray())); - } catch (InvalidProtocolBufferException e) { + } catch (final InvalidProtocolBufferException e) { throw new RemoteRuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java index d5a777b..db0c78a 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventEncoder.java @@ -37,7 +37,7 @@ public class RemoteEventEncoder<T> implements Encoder<RemoteEvent<T>> { * * @param encoder the encoder of the event */ - public RemoteEventEncoder(Encoder<T> encoder) { + public RemoteEventEncoder(final Encoder<T> encoder) { this.encoder = encoder; } @@ -49,7 +49,7 @@ public class RemoteEventEncoder<T> implements Encoder<RemoteEvent<T>> { * @throws RemoteRuntimeException */ @Override - public byte[] encode(RemoteEvent<T> obj) { + public byte[] encode(final RemoteEvent<T> obj) { if (obj.getSink() == null) { throw new RemoteRuntimeException("Sink stage is null"); } @@ -57,8 +57,8 @@ public class RemoteEventEncoder<T> implements Encoder<RemoteEvent<T>> { throw new RemoteRuntimeException("Event is null"); } - WakeMessagePBuf.Builder builder = WakeMessagePBuf.newBuilder(); - String source = obj.getSource() == null ? "" : obj.getSource(); + final WakeMessagePBuf.Builder builder = WakeMessagePBuf.newBuilder(); + final String source = obj.getSource() == null ? "" : obj.getSource(); builder.setSource(source); builder.setSink(obj.getSink()); builder.setSeq(obj.getSeq()); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverEventHandler.java index c75b30f..bbb1217 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverEventHandler.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverEventHandler.java @@ -38,7 +38,7 @@ class RemoteReceiverEventHandler implements EventHandler<TransportEvent> { * * @param handler the upstream handler */ - RemoteReceiverEventHandler(EventHandler<RemoteEvent<byte[]>> handler) { + RemoteReceiverEventHandler(final EventHandler<RemoteEvent<byte[]>> handler) { this.codec = new RemoteEventCodec<byte[]>(new ByteCodec()); this.handler = handler; } @@ -49,8 +49,8 @@ class RemoteReceiverEventHandler implements EventHandler<TransportEvent> { * @param e the event */ @Override - public void onNext(TransportEvent e) { - RemoteEvent<byte[]> re = codec.decode(e.getData()); + public void onNext(final TransportEvent e) { + final RemoteEvent<byte[]> re = codec.decode(e.getData()); re.setLocalAddress(e.getLocalAddress()); re.setRemoteAddress(e.getRemoteAddress()); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverStage.java index 1f19f3d..4ea47ce 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverStage.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteReceiverStage.java @@ -69,7 +69,7 @@ public class RemoteReceiverStage implements EStage<TransportEvent> { * @param value the event */ @Override - public void onNext(TransportEvent value) { + public void onNext(final TransportEvent value) { LOG.log(Level.FINEST, "{0}", value); stage.onNext(value); } @@ -87,7 +87,7 @@ public class RemoteReceiverStage implements EStage<TransportEvent> { // wait for threads to finish for timeout if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { LOG.log(Level.WARNING, "Executor did not terminate in {0} ms.", shutdownTimeout); - List<Runnable> droppedRunnables = executor.shutdownNow(); + final List<Runnable> droppedRunnables = executor.shutdownNow(); LOG.log(Level.WARNING, "Executor dropped {0} tasks.", droppedRunnables.size()); } } catch (final InterruptedException e) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java index 65b9461..057caac 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java @@ -53,7 +53,7 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> { * @param transport the transport to send events * @param executor the executor service used for creating channels */ - RemoteSenderEventHandler(Encoder<T> encoder, Transport transport, ExecutorService executor) { + RemoteSenderEventHandler(final Encoder<T> encoder, final Transport transport, final ExecutorService executor) { this.encoder = new RemoteEventEncoder<T>(encoder); this.transport = transport; this.executor = executor; @@ -61,7 +61,7 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> { this.queue = new LinkedBlockingQueue<RemoteEvent<T>>(); } - void setLink(Link<byte[]> link) { + void setLink(final Link<byte[]> link) { LOG.log(Level.FINEST, "thread {0} link {1}", new Object[]{Thread.currentThread(), link}); linkRef.compareAndSet(null, link); consumeQueue(); @@ -74,7 +74,7 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> { LOG.log(Level.FINEST, "{0}", event); linkRef.get().write(encoder.encode(event)); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { e.printStackTrace(); throw new RemoteRuntimeException(e); } @@ -87,19 +87,19 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> { * @throws RemoteRuntimeException */ @Override - public void onNext(RemoteEvent<T> value) { + public void onNext(final RemoteEvent<T> value) { try { if (linkRef.get() == null) { queue.add(value); - Link<byte[]> link = transport.get(value.remoteAddress()); + final Link<byte[]> link = transport.get(value.remoteAddress()); if (link != null) { LOG.log(Level.FINEST, "transport get link: {0}", link); setLink(link); return; } - ConnectFutureTask<Link<byte[]>> cf = new ConnectFutureTask<Link<byte[]>>( + final ConnectFutureTask<Link<byte[]>> cf = new ConnectFutureTask<Link<byte[]>>( new ConnectCallable(transport, value.localAddress(), value.remoteAddress()), new ConnectEventHandler<T>(this)); executor.submit(cf); @@ -113,7 +113,7 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> { } linkRef.get().write(encoder.encode(value)); } - } catch (RemoteRuntimeException ex2) { + } catch (final RemoteRuntimeException ex2) { ex2.printStackTrace(); throw ex2; } @@ -128,7 +128,7 @@ class ConnectCallable implements Callable<Link<byte[]>> { private final SocketAddress localAddress; private final SocketAddress remoteAddress; - ConnectCallable(Transport transport, SocketAddress localAddress, SocketAddress remoteAddress) { + ConnectCallable(final Transport transport, final SocketAddress localAddress, final SocketAddress remoteAddress) { this.transport = transport; this.localAddress = localAddress; this.remoteAddress = remoteAddress; @@ -146,12 +146,12 @@ class ConnectEventHandler<T> implements EventHandler<ConnectFutureTask<Link<byte private final RemoteSenderEventHandler<T> handler; - ConnectEventHandler(RemoteSenderEventHandler<T> handler) { + ConnectEventHandler(final RemoteSenderEventHandler<T> handler) { this.handler = handler; } @Override - public void onNext(ConnectFutureTask<Link<byte[]>> value) { + public void onNext(final ConnectFutureTask<Link<byte[]>> value) { try { handler.setLink(value.get()); } catch (InterruptedException | ExecutionException e) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSeqNumGenerator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSeqNumGenerator.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSeqNumGenerator.java index 5c38471..d30c8ee 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSeqNumGenerator.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSeqNumGenerator.java @@ -35,7 +35,7 @@ public class RemoteSeqNumGenerator { seqMap = new ConcurrentHashMap<SocketAddress, AtomicLong>(); } - public long getNextSeq(SocketAddress addr) { + public long getNextSeq(final SocketAddress addr) { AtomicLong seq = seqMap.get(addr); if (seq == null) { seq = new AtomicLong(0); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SocketRemoteIdentifier.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SocketRemoteIdentifier.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SocketRemoteIdentifier.java index 1ae7478..0914813 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SocketRemoteIdentifier.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/SocketRemoteIdentifier.java @@ -35,7 +35,7 @@ public class SocketRemoteIdentifier implements RemoteIdentifier { * * @param addr the socket address */ - public SocketRemoteIdentifier(InetSocketAddress addr) { + public SocketRemoteIdentifier(final InetSocketAddress addr) { this.addr = addr; } @@ -45,20 +45,20 @@ public class SocketRemoteIdentifier implements RemoteIdentifier { * @param str the string representation * @throws RemoteRuntimeException */ - public SocketRemoteIdentifier(String str) { + public SocketRemoteIdentifier(final String str) { int index = str.indexOf("0:0:0:0:0:0:0:0:"); if (index >= 0) { - String host = str.substring(0, 15); - int port = Integer.parseInt(str.substring(index + 16)); + final String host = str.substring(0, 15); + final int port = Integer.parseInt(str.substring(index + 16)); this.addr = new InetSocketAddress(host, port); } else { index = str.indexOf(":"); if (index <= 0) { throw new RemoteRuntimeException("Invalid name " + str); } - String host = str.substring(0, index); - int port = Integer.parseInt(str.substring(index + 1)); + final String host = str.substring(0, index); + final int port = Integer.parseInt(str.substring(index + 1)); this.addr = new InetSocketAddress(host, port); } } @@ -89,7 +89,7 @@ public class SocketRemoteIdentifier implements RemoteIdentifier { * @return true if the object is the same as the object argument; false, otherwise */ @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { return addr.equals(((SocketRemoteIdentifier) o).getSocketAddress()); } @@ -100,7 +100,7 @@ public class SocketRemoteIdentifier implements RemoteIdentifier { */ @Override public String toString() { - StringBuilder builder = new StringBuilder(); + final StringBuilder builder = new StringBuilder(); builder.append("socket://"); builder.append(addr.getHostString()); builder.append(":"); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/StringCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/StringCodec.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/StringCodec.java index 70f8403..d972a33 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/StringCodec.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/StringCodec.java @@ -32,7 +32,7 @@ public class StringCodec implements Codec<String> { * @return a byte array representation of the string */ @Override - public byte[] encode(String obj) { + public byte[] encode(final String obj) { return obj.getBytes(); } @@ -43,7 +43,7 @@ public class StringCodec implements Codec<String> { * @return a string */ @Override - public String decode(byte[] buf) { + public String decode(final byte[] buf) { return new String(buf); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java index b3e06e6..5ae498a 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Subscription.java @@ -34,7 +34,7 @@ public class Subscription<T> implements AutoCloseable { * @param token the token for finding the subscription * @param handlerContainer the container managing handlers */ - public Subscription(T token, HandlerContainer<T> handlerContainer) { + public Subscription(final T token, final HandlerContainer<T> handlerContainer) { this.token = token; this.container = handlerContainer; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java index 3736017..5f6b15e 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/TransportEvent.java @@ -39,7 +39,7 @@ public class TransportEvent { * @param localAddr the local socket address * @param remoteAddr the remote socket address */ - public TransportEvent(byte[] data, SocketAddress localAddr, SocketAddress remoteAddr) { + public TransportEvent(final byte[] data, final SocketAddress localAddr, final SocketAddress remoteAddr) { this.data = data; this.localAddr = localAddr; this.remoteAddr = remoteAddr; @@ -53,7 +53,7 @@ public class TransportEvent { * @param data * @param link */ - public TransportEvent(byte[] data, Link<byte[]> link) { + public TransportEvent(final byte[] data, final Link<byte[]> link) { this.data = data; this.link = link; if (this.link != null) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Tuple2.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Tuple2.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Tuple2.java index 334f319..13f1d71 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Tuple2.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/Tuple2.java @@ -29,7 +29,7 @@ public class Tuple2<T1, T2> { private final T1 t1; private final T2 t2; - public Tuple2(T1 t1, T2 t2) { + public Tuple2(final T1 t1, final T2 t2) { this.t1 = t1; this.t2 = t2; } @@ -48,8 +48,8 @@ public class Tuple2<T1, T2> { } @Override - public boolean equals(Object o) { - Tuple2<T1, T2> tuple = (Tuple2<T1, T2>) o; + public boolean equals(final Object o) { + final Tuple2<T1, T2> tuple = (Tuple2<T1, T2>) o; return t1.equals((Object) tuple.getT1()) && t2.equals((Object) tuple.getT2()); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java index 2cf4a7a..635b8c1 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/RandomRangeIterator.java @@ -35,7 +35,7 @@ final class RandomRangeIterator implements Iterator<Integer> { private int currentRetryCount; private final Random random = new Random(System.currentTimeMillis()); - RandomRangeIterator(final int tcpPortRangeBegin, final int tcpPortRangeCount, int tryCount) { + RandomRangeIterator(final int tcpPortRangeBegin, final int tcpPortRangeCount, final int tryCount) { this.tcpPortRangeBegin = tcpPortRangeBegin; this.tcpPortRangeCount = tcpPortRangeCount; this.tryCount = tryCount; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/exception/TransportRuntimeException.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/exception/TransportRuntimeException.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/exception/TransportRuntimeException.java index bddc167..d8c329b 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/exception/TransportRuntimeException.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/exception/TransportRuntimeException.java @@ -30,7 +30,7 @@ public class TransportRuntimeException extends RuntimeException { * @param s the detailed message * @param e the cause */ - public TransportRuntimeException(String s, Throwable e) { + public TransportRuntimeException(final String s, final Throwable e) { super(s, e); } @@ -39,7 +39,7 @@ public class TransportRuntimeException extends RuntimeException { * * @param s the detailed message */ - public TransportRuntimeException(String s) { + public TransportRuntimeException(final String s) { super(s); } @@ -48,7 +48,7 @@ public class TransportRuntimeException extends RuntimeException { * * @param e the cause */ - public TransportRuntimeException(Throwable e) { + public TransportRuntimeException(final Throwable e) { super(e); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/AbstractNettyEventListener.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/AbstractNettyEventListener.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/AbstractNettyEventListener.java index 7d77b76..8dea218 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/AbstractNettyEventListener.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/AbstractNettyEventListener.java @@ -54,7 +54,7 @@ abstract class AbstractNettyEventListener implements NettyEventListener { } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { + public void channelRead(final ChannelHandlerContext ctx, final Object msg) { final Channel channel = ctx.channel(); final byte[] message = (byte[]) msg; @@ -70,7 +70,7 @@ abstract class AbstractNettyEventListener implements NettyEventListener { } @Override - public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause) { + public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { final Channel channel = ctx.channel(); LOG.log(Level.WARNING, "ExceptionEvent: local: {0} remote: {1} :: {2}", new Object[]{ channel.localAddress(), channel.remoteAddress(), cause}); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ByteEncoder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ByteEncoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ByteEncoder.java index 2536ea5..0cca5e1 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ByteEncoder.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ByteEncoder.java @@ -29,7 +29,7 @@ public class ByteEncoder implements Encoder<byte[]> { * @see org.apache.reef.wake.remote.Encoder#encode(java.lang.Object) */ @Override - public byte[] encode(byte[] obj) { + public byte[] encode(final byte[] obj) { return obj; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java index df15f87..f55bbeb 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java @@ -64,11 +64,11 @@ public class ChunkedReadWriteHandler extends ChunkedWriteHandler { * org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelEvent) */ @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { if (msg instanceof byte[]) { - byte[] data = (byte[]) msg; + final byte[] data = (byte[]) msg; if (start) { //LOG.log(Level.FINEST, "{0} Starting dechunking of a chunked write", curThrName); @@ -93,7 +93,7 @@ public class ChunkedReadWriteHandler extends ChunkedWriteHandler { // "Creating upstream msg event with the dechunked byte[{1}]", new Object[]{curThrName, expectedSize}); //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, "Resetting state to begin another dechunking", // curThrName); - byte[] temp = retArr; + final byte[] temp = retArr; start = true; expectedSize = 0; readBuffer.release(); @@ -117,7 +117,7 @@ public class ChunkedReadWriteHandler extends ChunkedWriteHandler { * the second begins. */ @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception { if (msg instanceof ByteBuf) { @@ -183,7 +183,7 @@ public class ChunkedReadWriteHandler extends ChunkedWriteHandler { private class ByteBufCloseableStream extends ByteBufInputStream { private final ByteBuf buffer; - public ByteBufCloseableStream(ByteBuf buffer) { + public ByteBufCloseableStream(final ByteBuf buffer) { super(buffer); this.buffer = buffer; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java index a9634a6..4c6626c 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/LoggingLinkListener.java @@ -37,7 +37,7 @@ public class LoggingLinkListener<T> implements LinkListener<T> { * Called when the sent message is transferred successfully. */ @Override - public void onSuccess(T message) { + public void onSuccess(final T message) { if (LOG.isLoggable(Level.FINEST)) { LOG.log(Level.FINEST, "The message is successfully sent : {0}", new Object[]{message}); } @@ -47,7 +47,7 @@ public class LoggingLinkListener<T> implements LinkListener<T> { * Called when the sent message to remoteAddress is failed to be transferred. */ @Override - public void onException(Throwable cause, SocketAddress remoteAddress, T message) { + public void onException(final Throwable cause, final SocketAddress remoteAddress, final T message) { if (LOG.isLoggable(Level.FINEST)) { LOG.log(Level.FINEST, "The message to {0} is failed to be sent. message : {1}, cause : {2}" , new Object[]{remoteAddress, message, cause}); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java index 8921486..5b8191c 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/MessagingTransportFactory.java @@ -73,7 +73,7 @@ public class MessagingTransportFactory implements TransportFactory { final EventHandler<TransportEvent> serverHandler, final EventHandler<Exception> exHandler) { - Injector injector = Tang.Factory.getTang().newInjector(); + final Injector injector = Tang.Factory.getTang().newInjector(); injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, this.localAddress); injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, new SyncStage<>(clientHandler)); @@ -84,13 +84,13 @@ public class MessagingTransportFactory implements TransportFactory { transport = injector.getInstance(NettyMessagingTransport.class); transport.registerErrorHandler(exHandler); return transport; - } catch (InjectionException e) { + } catch (final InjectionException e) { throw new RuntimeException(e); } } @Override - public Transport newInstance(final String hostAddress, int port, + public Transport newInstance(final String hostAddress, final int port, final EStage<TransportEvent> clientStage, final EStage<TransportEvent> serverStage, final int numberOfTries, @@ -100,14 +100,14 @@ public class MessagingTransportFactory implements TransportFactory { } @Override - public Transport newInstance(final String hostAddress, int port, + public Transport newInstance(final String hostAddress, final int port, final EStage<TransportEvent> clientStage, final EStage<TransportEvent> serverStage, final int numberOfTries, final int retryTimeout, final TcpPortProvider tcpPortProvider) { - Injector injector = Tang.Factory.getTang().newInjector(); + final Injector injector = Tang.Factory.getTang().newInjector(); injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, hostAddress); injector.bindVolatileParameter(RemoteConfiguration.Port.class, port); injector.bindVolatileParameter(RemoteConfiguration.RemoteClientStage.class, clientStage); @@ -117,7 +117,7 @@ public class MessagingTransportFactory implements TransportFactory { injector.bindVolatileInstance(TcpPortProvider.class, tcpPortProvider); try { return injector.getInstance(NettyMessagingTransport.class); - } catch (InjectionException e) { + } catch (final InjectionException e) { throw new RuntimeException(e); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelInitializer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelInitializer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelInitializer.java index 0f00ea7..011b9c2 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelInitializer.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyChannelInitializer.java @@ -40,7 +40,7 @@ class NettyChannelInitializer extends ChannelInitializer<SocketChannel> { } @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(final SocketChannel ch) throws Exception { ch.pipeline() .addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MAXFRAMELENGTH, 0, 4, 0, 4)) .addLast("bytesDecoder", new ByteArrayDecoder()) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java index cb80c7f..833ad3c 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyLink.java @@ -77,7 +77,7 @@ public class NettyLink<T> implements Link<T> { @Override public void write(final T message) { LOG.log(Level.FINEST, "write {0} {1}", new Object[]{channel, message}); - byte[] allData = encoder.encode(message); + final byte[] allData = encoder.encode(message); // byte[] -> ByteBuf if (listener != null) { channel.writeAndFlush(Unpooled.wrappedBuffer(allData)) @@ -124,7 +124,7 @@ class NettyChannelFutureListener<T> implements ChannelFutureListener { } @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { + public void operationComplete(final ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { listener.onSuccess(message); } else { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java index 33d7137..828a10b 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java @@ -114,7 +114,7 @@ public class NettyMessagingTransport implements Transport { @Deprecated public NettyMessagingTransport( final String hostAddress, - int port, + final int port, final EStage<TransportEvent> clientStage, final EStage<TransportEvent> serverStage, final int numberOfTries, @@ -138,7 +138,7 @@ public class NettyMessagingTransport implements Transport { @Inject NettyMessagingTransport( @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress, - @Parameter(RemoteConfiguration.Port.class) int port, + @Parameter(RemoteConfiguration.Port.class) final int port, @Parameter(RemoteConfiguration.RemoteClientStage.class) final EStage<TransportEvent> clientStage, @Parameter(RemoteConfiguration.RemoteServerStage.class) final EStage<TransportEvent> serverStage, @Parameter(RemoteConfiguration.NumberOfTries.class) final int numberOfTries, @@ -146,8 +146,9 @@ public class NettyMessagingTransport implements Transport { final TcpPortProvider tcpPortProvider, final LocalAddressProvider localAddressProvider) { - if (port < 0) { - throw new RemoteRuntimeException("Invalid server port: " + port); + int p = port; + if (p < 0) { + throw new RemoteRuntimeException("Invalid server port: " + p); } final String host = UNKNOWN_HOST_NAME.equals(hostAddress) ? localAddressProvider.getLocalAddress() : hostAddress; @@ -181,25 +182,25 @@ public class NettyMessagingTransport implements Transport { .option(ChannelOption.SO_REUSEADDR, true) .childOption(ChannelOption.SO_KEEPALIVE, true); - LOG.log(Level.FINE, "Binding to {0}", port); + LOG.log(Level.FINE, "Binding to {0}", p); Channel acceptorFound = null; try { - if (port > 0) { - acceptorFound = this.serverBootstrap.bind(new InetSocketAddress(host, port)).sync().channel(); + if (p > 0) { + acceptorFound = this.serverBootstrap.bind(new InetSocketAddress(host, p)).sync().channel(); } else { - Iterator<Integer> ports = tcpPortProvider.iterator(); + final Iterator<Integer> ports = tcpPortProvider.iterator(); while (acceptorFound == null) { if (!ports.hasNext()) { break; } - port = ports.next(); - LOG.log(Level.FINEST, "Try port {0}", port); + p = ports.next(); + LOG.log(Level.FINEST, "Try port {0}", p); try { - acceptorFound = this.serverBootstrap.bind(new InetSocketAddress(host, port)).sync().channel(); + acceptorFound = this.serverBootstrap.bind(new InetSocketAddress(host, p)).sync().channel(); } catch (final Exception ex) { if (ex instanceof BindException) { - LOG.log(Level.FINEST, "The port {0} is already bound. Try again", port); + LOG.log(Level.FINEST, "The port {0} is already bound. Try again", p); } else { throw ex; } @@ -208,8 +209,8 @@ public class NettyMessagingTransport implements Transport { } } catch (final Exception ex) { final RuntimeException transportException = - new TransportRuntimeException("Cannot bind to port " + port); - LOG.log(Level.SEVERE, "Cannot bind to port " + port, ex); + new TransportRuntimeException("Cannot bind to port " + p); + LOG.log(Level.SEVERE, "Cannot bind to port " + p, ex); this.clientWorkerGroup.shutdownGracefully(); this.serverBossGroup.shutdownGracefully(); @@ -218,7 +219,7 @@ public class NettyMessagingTransport implements Transport { } this.acceptor = acceptorFound; - this.serverPort = port; + this.serverPort = p; this.localAddress = new InetSocketAddress(host, this.serverPort); LOG.log(Level.FINE, "Starting netty transport socket address: {0}", this.localAddress); @@ -236,7 +237,7 @@ public class NettyMessagingTransport implements Transport { * @deprecated use the constructor that takes a TcpProvider and LocalAddressProvider instead. */ @Deprecated - public NettyMessagingTransport(final String hostAddress, int port, + public NettyMessagingTransport(final String hostAddress, final int port, final EStage<TransportEvent> clientStage, final EStage<TransportEvent> serverStage, final int numberOfTries, http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/AbstractRxStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/AbstractRxStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/AbstractRxStage.java index a03a9a4..e076552 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/AbstractRxStage.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/AbstractRxStage.java @@ -39,7 +39,7 @@ public abstract class AbstractRxStage<T> implements RxStage<T> { * * @param stageName the stage name */ - public AbstractRxStage(String stageName) { + public AbstractRxStage(final String stageName) { this.closed = new AtomicBoolean(false); this.name = stageName; this.inMeter = new Meter(stageName + "_in"); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxSyncStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxSyncStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxSyncStage.java index e287e27..d3ecd8a 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxSyncStage.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxSyncStage.java @@ -53,8 +53,8 @@ public final class RxSyncStage<T> extends AbstractRxStage<T> { * @param observer the observer */ @Inject - public RxSyncStage(@Parameter(StageName.class) String name, - @Parameter(StageObserver.class) Observer<T> observer) { + public RxSyncStage(@Parameter(StageName.class) final String name, + @Parameter(StageObserver.class) final Observer<T> observer) { super(name); this.observer = observer; StageManager.instance().register(this); @@ -66,7 +66,7 @@ public final class RxSyncStage<T> extends AbstractRxStage<T> { * @param value the new value */ @Override - public void onNext(T value) { + public void onNext(final T value) { beforeOnNext(); observer.onNext(value); afterOnNext(); @@ -79,7 +79,7 @@ public final class RxSyncStage<T> extends AbstractRxStage<T> { * @param error the error */ @Override - public void onError(Exception error) { + public void onError(final Exception error) { observer.onError(error); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxThreadPoolStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxThreadPoolStage.java index 82d1208..01142b3 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxThreadPoolStage.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/RxThreadPoolStage.java @@ -156,7 +156,7 @@ public final class RxThreadPoolStage<T> extends AbstractRxStage<T> { LOG.log(Level.SEVERE, "Executor terminated due to unrequired timeout"); observer.onError(new TimeoutException()); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { e.printStackTrace(); observer.onError(e); } @@ -177,12 +177,12 @@ public final class RxThreadPoolStage<T> extends AbstractRxStage<T> { completionExecutor.shutdown(); if (!executor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); - List<Runnable> droppedRunnables = executor.shutdownNow(); + final List<Runnable> droppedRunnables = executor.shutdownNow(); LOG.log(Level.WARNING, "Executor dropped " + droppedRunnables.size() + " tasks."); } if (!completionExecutor.awaitTermination(shutdownTimeout, TimeUnit.MILLISECONDS)) { LOG.log(Level.WARNING, "Executor did not terminate in " + shutdownTimeout + "ms."); - List<Runnable> droppedRunnables = completionExecutor.shutdownNow(); + final List<Runnable> droppedRunnables = completionExecutor.shutdownNow(); LOG.log(Level.WARNING, "Completion executor dropped " + droppedRunnables.size() + " tasks."); } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/SimpleSubject.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/SimpleSubject.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/SimpleSubject.java index 62e2259..c83850c 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/SimpleSubject.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/SimpleSubject.java @@ -38,7 +38,7 @@ public class SimpleSubject<T> implements Subject<T, T> { * @param observer the observer */ @Inject - public SimpleSubject(Observer<T> observer) { + public SimpleSubject(final Observer<T> observer) { this.observer = observer; } @@ -48,7 +48,7 @@ public class SimpleSubject<T> implements Subject<T, T> { * @param value the new value */ @Override - public void onNext(T value) { + public void onNext(final T value) { this.observer.onNext(value); } @@ -58,7 +58,7 @@ public class SimpleSubject<T> implements Subject<T, T> { * @param error the error */ @Override - public void onError(Exception error) { + public void onError(final Exception error) { this.observer.onError(error); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java index 5d2d3c9..e43b681 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/rx/impl/TimeoutSubject.java @@ -35,12 +35,12 @@ public class TimeoutSubject<T> implements Subject<T, T> { this.timeBomb = new Thread(new Runnable() { @Override public void run() { - boolean finishedCopy; + final boolean finishedCopy; synchronized (outer) { if (!finished) { try { outer.wait(timeout); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { return; } } @@ -56,8 +56,8 @@ public class TimeoutSubject<T> implements Subject<T, T> { } @Override - public void onNext(T value) { - boolean wasFinished; + public void onNext(final T value) { + final boolean wasFinished; synchronized (this) { wasFinished = finished; if (!finished) { @@ -73,7 +73,7 @@ public class TimeoutSubject<T> implements Subject<T, T> { } @Override - public void onError(Exception error) { + public void onError(final Exception error) { this.timeBomb.interrupt(); destination.onError(error); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileHandlePool.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileHandlePool.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileHandlePool.java index c28c22a..a0779ae 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileHandlePool.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileHandlePool.java @@ -21,11 +21,11 @@ package org.apache.reef.wake.storage; import java.io.FileInputStream; public class FileHandlePool { - public FileInputStream get(StorageIdentifier f) { + public FileInputStream get(final StorageIdentifier f) { return null; } - public void release(StorageIdentifier f, FileInputStream is) { + public void release(final StorageIdentifier f, final FileInputStream is) { } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileIdentifier.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileIdentifier.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileIdentifier.java index 5071196..ac980c3 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileIdentifier.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/FileIdentifier.java @@ -25,7 +25,7 @@ import java.net.URISyntaxException; public class FileIdentifier implements StorageIdentifier { private final File f; - public FileIdentifier(String s) throws URISyntaxException { + public FileIdentifier(final String s) throws URISyntaxException { f = new File(new URI(s)); } @@ -35,7 +35,7 @@ public class FileIdentifier implements StorageIdentifier { } @Override - public boolean equals(Object o) { + public boolean equals(final Object o) { if (!(o instanceof FileIdentifier)) { return false; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java index e84e483..2221f74 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/ReadRequest.java @@ -27,7 +27,7 @@ public class ReadRequest implements Identifiable { final byte[] buf; final Identifier id; - public ReadRequest(StorageIdentifier f, long offset, byte[] buf, Identifier id) { + public ReadRequest(final StorageIdentifier f, final long offset, final byte[] buf, final Identifier id) { this.f = f; this.offset = offset; this.buf = buf; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java index e52f562..a864f53 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/storage/SequentialFileReader.java @@ -29,22 +29,22 @@ public class SequentialFileReader implements EStage<ReadRequest> { final FileHandlePool fdPool = new FileHandlePool(); @Override - public void onNext(ReadRequest value) { - FileInputStream fin = fdPool.get(value.f); + public void onNext(final ReadRequest value) { + final FileInputStream fin = fdPool.get(value.f); int readSoFar = 0; try { synchronized (fin) { fin.reset(); fin.skip(value.offset); while (readSoFar != value.buf.length) { - int ret = fin.read(value.buf, readSoFar, value.buf.length); + final int ret = fin.read(value.buf, readSoFar, value.buf.length); if (ret == -1) { break; } readSoFar += ret; } } - } catch (IOException e) { + } catch (final IOException e) { fdPool.release(value.f, fin); // err.onNext(null); //new ReadError(e)); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java index a174461..44e90b5 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/Time.java @@ -39,7 +39,7 @@ public abstract class Time implements Comparable<Time> { } @Override - public final int compareTo(Time o) { + public final int compareTo(final Time o) { if (this.timestamp < o.timestamp) { return -1; } @@ -56,7 +56,7 @@ public abstract class Time implements Comparable<Time> { } @Override - public final boolean equals(Object o) { + public final boolean equals(final Object o) { if (o instanceof Time) { return compareTo((Time) o) == 0; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java index cc6bee9..6ffbce8 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/LogicalTimer.java @@ -34,13 +34,13 @@ public final class LogicalTimer implements Timer { } @Override - public long getDuration(long time) { + public long getDuration(final long time) { isReady(time); return 0; } @Override - public boolean isReady(long time) { + public boolean isReady(final long time) { if (this.current < time) { this.current = time; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/141c7fa0/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java index 6b162bc..65c75ac 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java @@ -136,7 +136,7 @@ public final class RuntimeClock implements Clock { */ private long findAcceptableStopTime() { long time = timer.getCurrent(); - for (Time t : this.schedule) { + for (final Time t : this.schedule) { if (t instanceof ClientAlarm) { assert (time <= t.getTimeStamp()); time = t.getTimeStamp(); @@ -149,7 +149,7 @@ public final class RuntimeClock implements Clock { @Override public boolean isIdle() { synchronized (this.schedule) { - for (Time t : this.schedule) { + for (final Time t : this.schedule) { if (t instanceof ClientAlarm) { return false; } @@ -237,12 +237,12 @@ public final class RuntimeClock implements Clock { break; // we're done. } } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { // waiting interrupted - return to loop } } this.handlers.onNext(new RuntimeStop(this.timer.getCurrent())); - } catch (Exception e) { + } catch (final Exception e) { e.printStackTrace(); this.handlers.onNext(new RuntimeStop(this.timer.getCurrent(), e)); } finally {
