Repository: incubator-reef Updated Branches: refs/heads/master 4cca54bb8 -> 84ff5021c
http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/implementation/protobuf/ProtocolBufferClassHierarchy.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/implementation/protobuf/ProtocolBufferClassHierarchy.java b/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/implementation/protobuf/ProtocolBufferClassHierarchy.java index e1bd96c..4b0328f 100644 --- a/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/implementation/protobuf/ProtocolBufferClassHierarchy.java +++ b/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/implementation/protobuf/ProtocolBufferClassHierarchy.java @@ -74,7 +74,8 @@ public class ProtocolBufferClassHierarchy implements ClassHierarchy { boolean isExternalConstructor, boolean isUnit, List<ClassHierarchyProto.ConstructorDef> injectableConstructors, List<ClassHierarchyProto.ConstructorDef> otherConstructors, - List<String> implFullNames, Iterable<ClassHierarchyProto.Node> children) { + List<String> implFullNames, + Iterable<ClassHierarchyProto.Node> children) { return ClassHierarchyProto.Node .newBuilder() .setName(name) @@ -91,7 +92,9 @@ public class ProtocolBufferClassHierarchy implements ClassHierarchy { } private static ClassHierarchyProto.Node newNamedParameterNode(String name, - String fullName, String simpleArgClassName, String fullArgClassName, + String fullName, + String simpleArgClassName, + String fullArgClassName, boolean isSet, boolean isList, String documentation, // can be null @@ -121,7 +124,8 @@ public class ProtocolBufferClassHierarchy implements ClassHierarchy { } private static ClassHierarchyProto.Node newPackageNode(String name, - String fullName, Iterable<ClassHierarchyProto.Node> children) { + String fullName, + Iterable<ClassHierarchyProto.Node> children) { return ClassHierarchyProto.Node.newBuilder() .setPackageNode(ClassHierarchyProto.PackageNode.newBuilder().build()) .setName(name).setFullName(fullName).addAllChildren(children).build(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/implementation/types/NamedParameterNodeImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/implementation/types/NamedParameterNodeImpl.java b/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/implementation/types/NamedParameterNodeImpl.java index 248f003..f25781e 100644 --- a/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/implementation/types/NamedParameterNodeImpl.java +++ b/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/implementation/types/NamedParameterNodeImpl.java @@ -32,7 +32,8 @@ public class NamedParameterNodeImpl<T> extends AbstractNode implements private final boolean isList; public NamedParameterNodeImpl(Node parent, String simpleName, - String fullName, String fullArgName, String simpleArgName, boolean isSet, boolean isList, + String fullName, String fullArgName, String simpleArgName, + boolean isSet, boolean isList, String documentation, String shortName, String[] defaultInstanceAsStrings) { super(parent, simpleName, fullName); this.fullArgName = fullArgName; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/ReflectionUtilities.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/ReflectionUtilities.java b/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/ReflectionUtilities.java index ecfb11c..4b701dc 100644 --- a/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/ReflectionUtilities.java +++ b/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/ReflectionUtilities.java @@ -290,7 +290,8 @@ public final class ReflectionUtilities { /** * @param clazz * @return T if clazz implements Name<T>, null otherwise - * @throws org.apache.reef.tang.exceptions.BindException If clazz's definition incorrectly uses Name or @NamedParameter + * @throws org.apache.reef.tang.exceptions.BindException + * If clazz's definition incorrectly uses Name or @NamedParameter */ public static Type getNamedParameterTargetOrNull(Class<?> clazz) throws ClassHierarchyException { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/Tint.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/Tint.java b/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/Tint.java index e95ef60..9760adb 100644 --- a/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/Tint.java +++ b/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/Tint.java @@ -99,11 +99,13 @@ public class Tint { // for(Constructor<?> c : injectConstructors) { // classes.add(c.getDeclaringClass()); // } - Set<String> injectConstructors = r.getStore().getConstructorsAnnotatedWith(ReflectionUtilities.getFullName(Inject.class)); + Set<String> injectConstructors = + r.getStore().getConstructorsAnnotatedWith(ReflectionUtilities.getFullName(Inject.class)); for (String s : injectConstructors) { strings.add(s.replaceAll("\\.<.+$", "")); } - Set<String> parameterConstructors = r.getStore().get(MethodParameterScanner.class, ReflectionUtilities.getFullName(Parameter.class)); + Set<String> parameterConstructors = + r.getStore().get(MethodParameterScanner.class, ReflectionUtilities.getFullName(Parameter.class)); for (String s : parameterConstructors) { strings.add(s.replaceAll("\\.<.+$", "")); } @@ -111,9 +113,11 @@ public class Tint { // for(Constructor<?> c : parameterConstructors) { // classes.add(c.getDeclaringClass()); // } - Set<String> defaultStrings = r.getStore().get(TypeAnnotationsScanner.class, ReflectionUtilities.getFullName(DefaultImplementation.class)); + Set<String> defaultStrings = + r.getStore().get(TypeAnnotationsScanner.class, ReflectionUtilities.getFullName(DefaultImplementation.class)); strings.addAll(defaultStrings); - strings.addAll(r.getStore().get(TypeAnnotationsScanner.class, ReflectionUtilities.getFullName(NamedParameter.class))); + strings.addAll(r.getStore().get(TypeAnnotationsScanner.class, + ReflectionUtilities.getFullName(NamedParameter.class))); strings.addAll(r.getStore().get(TypeAnnotationsScanner.class, ReflectionUtilities.getFullName(Unit.class))); // classes.addAll(r.getTypesAnnotatedWith(DefaultImplementation.class)); // classes.addAll(r.getTypesAnnotatedWith(NamedParameter.class)); @@ -121,10 +125,12 @@ public class Tint { strings.addAll(r.getStore().get(SubTypesScanner.class, ReflectionUtilities.getFullName(Name.class))); - moduleBuilders.addAll(r.getStore().get(SubTypesScanner.class, ReflectionUtilities.getFullName(ConfigurationModuleBuilder.class))); + moduleBuilders.addAll(r.getStore().get(SubTypesScanner.class, + ReflectionUtilities.getFullName(ConfigurationModuleBuilder.class))); // classes.addAll(r.getSubTypesOf(Name.class)); - ch = Tang.Factory.getTang().getDefaultClassHierarchy(jars, (Class<? extends ExternalConstructor<?>>[]) new Class[0]); + ch = Tang.Factory.getTang().getDefaultClassHierarchy(jars, + (Class<? extends ExternalConstructor<?>>[]) new Class[0]); // for(String s : defaultStrings) { // if(classFilter(checkTang, s)) { // try { @@ -312,7 +318,8 @@ public class Tint { out.println("<html><head><title>TangDoc</title>"); out.println("<style>"); - out.println("body { font-family: 'Segoe UI', 'Comic Sans MS'; font-size:12pt; font-weight: 200; margin: 1em; column-count: 2; }"); + out.println("body { font-family: 'Segoe UI', 'Comic Sans MS'; font-size:12pt; font-weight: 200; " + + "margin: 1em; column-count: 2; }"); out.println(".package { font-size:18pt; font-weight: 500; column-span: all; }"); // out.println(".class { break-after: never; }"); // out.println(".doc { break-before: never; }"); @@ -328,10 +335,8 @@ public class Tint { out.println("</style>"); out.println("</head><body>"); -// out.println("<table border='1'><tr><th>Type</th><th>Name</th><th>Default value</th><th>Documentation</th><th>Used by</th><th>Set by</th></tr>"); String currentPackage = ""; -// int numcols = 0; for (final Node n : t.getNamesUsedAndSet()) { String fullName = n.getFullName(); String[] tok = fullName.split("\\."); @@ -348,13 +353,7 @@ public class Tint { currentPackage = pack; out.println(t.endPackage()); out.println(t.startPackage(currentPackage)); -// numcols = 0; -// out.println("<div class='row'>"); } -// numcols++; -// if(numcols == NUMCOLS) { -// out.println("</div><div class='row'>"); -// } if (n instanceof NamedParameterNode<?>) { out.println(t.toHtmlString((NamedParameterNode<?>) n, currentPackage)); } else if (n instanceof ClassNode<?>) { @@ -365,12 +364,12 @@ public class Tint { } out.println("</div>"); out.println(t.endPackage()); -// out.println("</table>"); out.println("<div class='package'>Module definitions</div>"); for (final Field f : t.modules.keySet()) { String moduleName = ReflectionUtilities.getFullName(f); // String declaringClassName = ReflectionUtilities.getFullName(f.getDeclaringClass()); - out.println("<div class='module-margin' id='" + moduleName + "'><div class='decl'><span class='fullName'>" + moduleName + "</span>"); + out.println("<div class='module-margin' id='" + moduleName + "'><div class='decl'><span class='fullName'>" + + moduleName + "</span>"); out.println("<pre>"); String conf = t.modules.get(f).toPrettyString(); String[] tok = conf.split("\n"); @@ -398,14 +397,18 @@ public class Tint { e.printStackTrace(); } String typ = clz.isInterface() ? "interface" : "class"; - out.println("<div class='module-margin' id='" + c.getFullName() + "'><div class='decl'><span class='fullName'>" + typ + " " + c.getFullName() + "</span>"); + out.println("<div class='module-margin' id='" + c.getFullName() + "'><div class='decl'>" + + "<span class='fullName'>" + typ + " " + c.getFullName() + "</span>"); for (ConstructorDef<?> d : c.getInjectableConstructors()) { out.println("<div class='uses'>" + c.getFullName() + "("); for (ConstructorArg a : d.getArgs()) { if (a.getNamedParameterName() != null) { - out.print("<div class='constructorArg'><a href='#" + a.getType() + "'>" + stripPrefix(a.getType(), "xxx") + "</a> <a href='#" + a.getNamedParameterName() + "'>" + a.getNamedParameterName() + "</a></div>"); + out.print("<div class='constructorArg'><a href='#" + a.getType() + "'>" + + stripPrefix(a.getType(), "xxx") + "</a> <a href='#" + a.getNamedParameterName() + "'>" + + a.getNamedParameterName() + "</a></div>"); } else { - out.print("<div class='constructorArg'><a href='#" + a.getType() + "'>" + stripPrefix(a.getType(), "xxx") + "</a></div>"); + out.print("<div class='constructorArg'><a href='#" + a.getType() + "'>" + + stripPrefix(a.getType(), "xxx") + "</a></div>"); } } out.println(")</div>"); @@ -427,7 +430,7 @@ public class Tint { } private boolean classFilter(boolean checkTang, String s) { - return (checkTang || /*s.startsWith("org.apache.reef.tang.examples.timer") ||*/ !s.startsWith("org.apache.reef.tang")); + return (checkTang || !s.startsWith("org.apache.reef.tang")); } private void processDefaultAnnotation(Class<?> cmb) { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/ValidateConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/ValidateConfiguration.java b/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/ValidateConfiguration.java index 31f4b96..9ba6450 100644 --- a/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/ValidateConfiguration.java +++ b/lang/java/reef-tang/tang/src/main/java/org/apache/reef/tang/util/ValidateConfiguration.java @@ -71,7 +71,8 @@ public class ValidateConfiguration { public static void main(String[] argv) throws IOException, BindException, InjectionException { @SuppressWarnings("unchecked") - JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder((Class<? extends ExternalConstructor<?>>[]) new Class[]{FileParser.class}); + JavaConfigurationBuilder cb = Tang.Factory.getTang().newConfigurationBuilder( + (Class<? extends ExternalConstructor<?>>[]) new Class[]{FileParser.class}); CommandLine cl = new CommandLine(cb); cl.processCommandLine(argv, Target.class, http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/Pull2Push.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/Pull2Push.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/Pull2Push.java index 1f1fd34..40e56a5 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/Pull2Push.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/examples/p2p/Pull2Push.java @@ -79,7 +79,8 @@ public final class Pull2Push<T> implements Runnable, AutoCloseable { this.output.onNext(message); } else { // The message source has returned null as the next message. We drop the message source in that case. - Logger.getLogger(Pull2Push.class.getName()).log(Level.INFO, "Droping message source {0} from the queue", nextSource.toString()); + Logger.getLogger(Pull2Push.class.getName()).log(Level.INFO, "Droping message source {0} from the queue", + nextSource.toString()); } } } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java index 4b6a3f7..05cc7ef 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/DefaultIdentifierFactory.java @@ -82,7 +82,8 @@ public class DefaultIdentifierFactory implements IdentifierFactory { Object[] args = new Object[1]; args[0] = str.substring(index + 3); return constructor.newInstance(args); - } catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + } catch (NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | + IllegalArgumentException | InvocationTargetException e) { e.printStackTrace(); throw new RemoteRuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/IndependentIterationsThreadPoolStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/IndependentIterationsThreadPoolStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/IndependentIterationsThreadPoolStage.java index 46d6191..11048fc 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/IndependentIterationsThreadPoolStage.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/IndependentIterationsThreadPoolStage.java @@ -33,7 +33,8 @@ import java.util.logging.Logger; * can be fired in any order. * * @param numThreads fixed number of threads available in the pool - * @param granularity maximum number of events executed serially. The right choice will balance task spawn overhead with parallelism. + * @param granularity maximum number of events executed serially. + * The right choice will balance task spawn overhead with parallelism. */ public class IndependentIterationsThreadPoolStage<T> extends AbstractEStage<List<T>> { http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/WakeSharedPool.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/WakeSharedPool.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/WakeSharedPool.java index d46a71f..546747e 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/WakeSharedPool.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/impl/WakeSharedPool.java @@ -56,12 +56,13 @@ public class WakeSharedPool implements Stage { @Inject public WakeSharedPool(@Parameter(Parallelism.class) int parallelism) { - this.pool = new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, new Thread.UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - // TODO: need to pass this upwards to REEF can grab it - } - }, + this.pool = new ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, + new Thread.UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + // TODO: need to pass this upwards to REEF can grab it + } + }, // async mode turned on so a task that invokes other tasks does not have to join on them. // this is appropriate for event-based tasks, where once you submit an event to a stage it // is always fire-and-forget. @@ -83,7 +84,9 @@ public class WakeSharedPool implements Stage { public void submit(ForkJoinTask<?> t) { if (ForkJoinTask.inForkJoinPool()) { - ForkJoinTask.invokeAll(t); // alternatively just pool().pool.execute(t), which simply forces it to be this pool (right now we expect only one anyway) + ForkJoinTask.invokeAll(t); + // alternatively just pool().pool.execute(t), which simply forces it to be this pool + // (right now we expect only one anyway) } else { pool.submit(t); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/profiler/WakeProfiler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/profiler/WakeProfiler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/profiler/WakeProfiler.java index a46a03b..927815a 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/profiler/WakeProfiler.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/profiler/WakeProfiler.java @@ -97,7 +97,8 @@ public class WakeProfiler implements Aspect { @SuppressWarnings("unchecked") @Override - public <T> T inject(ConstructorDef<T> constructorDef, Constructor<T> constructor, Object[] args) throws InvocationTargetException, IllegalAccessException, IllegalArgumentException, InstantiationException { + public <T> T inject(ConstructorDef<T> constructorDef, Constructor<T> constructor, Object[] args) + throws InvocationTargetException, IllegalAccessException, IllegalArgumentException, InstantiationException { // LOG.info("inject" + constructor + "->" + args.length); Vertex<?>[] vArgs = new Vertex[args.length]; for (int i = 0; i < args.length; i++) { @@ -243,7 +244,8 @@ public class WakeProfiler implements Aspect { if (stat != null) { long cnt = stat.messageCount.get(); long lat = stat.sumLatency.get(); - tooltip = ",\"count\":" + cnt + ",\"latency\":\"" + (((double) lat) / (((double) cnt) * 1000000.0) + "\""); // quote the latency, since it might be nan + tooltip = ",\"count\":" + cnt + ",\"latency\":\"" + (((double) lat) / (((double) cnt) * 1000000.0) + "\""); + // quote the latency, since it might be nan } else { tooltip = null; } @@ -272,14 +274,16 @@ public class WakeProfiler implements Aspect { Integer off = offVertex.get(futureTarget); LOG.fine("future target " + futureTarget + " off = " + off); if (off != null) { - links.add("{\"target\":" + offVertex.get(v) + ",\"source\":" + off + ",\"value\":" + 1.0 + ",\"back\":true}"); + links.add("{\"target\":" + offVertex.get(v) + ",\"source\":" + off + ",\"value\":" + 1.0 + + ",\"back\":true}"); } } else { Integer off = offVertex.get(w); if (off != null) { Stats s = stats.get(w.getObject()); if (s != null) { - links.add("{\"source\":" + offVertex.get(v) + ",\"target\":" + off + ",\"value\":" + (s.messageCount.get() + 3.0) + "}"); + links.add("{\"source\":" + offVertex.get(v) + ",\"target\":" + off + ",\"value\":" + + (s.messageCount.get() + 3.0) + "}"); } else { links.add("{\"source\":" + offVertex.get(v) + ",\"target\":" + off + ",\"value\":" + 1.0 + "}"); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java index 0ec3427..81223f8 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteConfiguration.java @@ -36,7 +36,8 @@ public final class RemoteConfiguration { // Intentionally empty } - @NamedParameter(short_name = "rm_host", doc = "The host address to be used for messages.", default_value = "##UNKNOWN##") + @NamedParameter(short_name = "rm_host", doc = "The host address to be used for messages.", + default_value = "##UNKNOWN##") public static final class HostAddress implements Name<String> { // Intentionally empty } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java index e954ec3..74a6660 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManager.java @@ -56,7 +56,9 @@ public interface RemoteManager extends Stage { * @param theHandler the event handler * @return the subscription that can be used to unsubscribe later */ - <T, U extends T> AutoCloseable registerHandler(final RemoteIdentifier sourceIdentifier, final Class<U> messageType, final EventHandler<T> theHandler); + <T, U extends T> AutoCloseable registerHandler(final RemoteIdentifier sourceIdentifier, + final Class<U> messageType, + final EventHandler<T> theHandler); /** * Registers the given EventHandler to be called for the given message type @@ -69,7 +71,8 @@ public interface RemoteManager extends Stage { * @param theHandler the event handler * @return the subscription that can be used to unsubscribe later */ - <T, U extends T> AutoCloseable registerHandler(final Class<U> messageType, final EventHandler<RemoteMessage<T>> theHandler); + <T, U extends T> AutoCloseable registerHandler(final Class<U> messageType, + final EventHandler<RemoteMessage<T>> theHandler); /** * Register an EventHandler that gets called by Wake in the presence of http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java index 4c741de..354a160 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/RemoteManagerFactory.java @@ -72,7 +72,8 @@ public interface RemoteManagerFactory { * @param errorHandler the error handler invoked for exceptions by the returned RemoteManager. * @param orderingGuarantee whether or not the returned RemoteManager should guarantee message orders. * @param numberOfTries the number of retries before the returned RemoteManager declares sending a failure. - * @param retryTimeout the time (in ms) after which the returned RemoteManager considers a sending attempt failed. + * @param retryTimeout the time (in ms) after which the returned RemoteManager considers a sending attempt + * failed. * @param <T> the message type sent / received by the returned RemoteManager. * @return a new instance of RemoteManager with all parameters but the given one injected via Tang. */ @@ -95,9 +96,12 @@ public interface RemoteManagerFactory { * @param errorHandler the error handler invoked for exceptions by the returned RemoteManager. * @param orderingGuarantee whether or not the returned RemoteManager should guarantee message orders. * @param numberOfTries the number of retries before the returned RemoteManager declares sending a failure. - * @param retryTimeout the time (in ms) after which the returned RemoteManager considers a sending attempt failed. - * @param localAddressProvider the LocalAddressProvider used by the returned RemoteManager to determine the address to bind to. - * @param tcpPortProvider the TcpPortProvider used by the returned RemoteManager to determine the port to listen to. + * @param retryTimeout the time (in ms) after which the returned RemoteManager considers a sending attempt + * failed. + * @param localAddressProvider the LocalAddressProvider used by the returned RemoteManager to determine the address + * to bind to. + * @param tcpPortProvider the TcpPortProvider used by the returned RemoteManager to determine the port + * to listen to. * @param <T> the message type sent / received by the returned RemoteManager. * @return a new instance of RemoteManager with all parameters but the given one injected via Tang. */ http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java index 5fd05d4..902722d 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/LegacyLocalAddressProvider.java @@ -75,7 +75,8 @@ public final class LegacyLocalAddressProvider implements LocalAddressProvider { } } if (sortedAddrs.isEmpty()) { - throw new WakeRuntimeException("This machine apparently doesn't have any IP addresses (not even 127.0.0.1) on interfaces that are up."); + throw new WakeRuntimeException("This machine apparently doesn't have any IP addresses (not even 127.0.0.1) " + + "on interfaces that are up."); } cached.set(sortedAddrs.pollFirst().getHostAddress()); LOG.log(Level.FINE, "Local address is {0}", cached.get()); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java index 83247b6..5dd2851 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java @@ -135,7 +135,8 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { new RemoteReceiverStage(this.handlerContainer, errorHandler, 10); this.transport = tpFactory.newInstance( - hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout, tcpPortProvider); + hostAddress, listeningPort, this.reRecvStage, this.reRecvStage, numberOfTries, retryTimeout, + tcpPortProvider); this.handlerContainer.setTransport(this.transport); @@ -145,7 +146,8 @@ public class DefaultRemoteManagerImplementation implements RemoteManager { this.reSendStage = new RemoteSenderStage(codec, this.transport, 10); StageManager.instance().register(this); - LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2} listening on {3}:{4}. Binding address provided by {5}", + LOG.log(Level.FINEST, "RemoteManager {0} instantiated id {1} counter {2} listening on {3}:{4}. " + + "Binding address provided by {5}", new Object[]{this.name, this.myIdentifier, COUNTER.incrementAndGet(), this.transport.getLocalAddress().toString(), this.transport.getListeningPort(), localAddressProvider} http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java index b53ace0..d05bb55 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/HandlerContainer.java @@ -57,7 +57,8 @@ final class HandlerContainer<T> implements EventHandler<RemoteEvent<byte[]>> { } public AutoCloseable registerHandler(final RemoteIdentifier sourceIdentifier, - final Class<? extends T> messageType, final EventHandler<? extends T> theHandler) { + final Class<? extends T> messageType, + final EventHandler<? extends T> theHandler) { final Tuple2<RemoteIdentifier, Class<? extends T>> tuple = new Tuple2<RemoteIdentifier, Class<? extends T>>(sourceIdentifier, messageType); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java index e9ae2d7..1fad3cd 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/MultiCodec.java @@ -42,8 +42,10 @@ public class MultiCodec<T> implements Codec<T> { * @param clazzToDecoderMap */ public MultiCodec(Map<Class<? extends T>, Codec<? extends T>> clazzToCodecMap) { - Map<Class<? extends T>, Encoder<? extends T>> clazzToEncoderMap = new HashMap<Class<? extends T>, Encoder<? extends T>>(); - Map<Class<? extends T>, Decoder<? extends T>> clazzToDecoderMap = new HashMap<Class<? extends T>, Decoder<? extends T>>(); + Map<Class<? extends T>, Encoder<? extends T>> clazzToEncoderMap = + new HashMap<Class<? extends T>, Encoder<? extends T>>(); + Map<Class<? extends T>, Decoder<? extends T>> clazzToDecoderMap = + new HashMap<Class<? extends T>, Decoder<? extends T>>(); for (Class<? extends T> clazz : clazzToCodecMap.keySet()) { clazzToEncoderMap.put(clazz, clazzToCodecMap.get(clazz)); clazzToDecoderMap.put(clazz, clazzToCodecMap.get(clazz)); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java index 2e76dac..d5302f7 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/OrderedRemoteReceiverStage.java @@ -55,11 +55,15 @@ public class OrderedRemoteReceiverStage implements EStage<TransportEvent> { public OrderedRemoteReceiverStage(EventHandler<RemoteEvent<byte[]>> handler, EventHandler<Throwable> errorHandler) { this.streamMap = new ConcurrentHashMap<SocketAddress, OrderedEventStream>(); - this.pushExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory(OrderedRemoteReceiverStage.class.getName() + "_Push")); - this.pullExecutor = Executors.newCachedThreadPool(new DefaultThreadFactory(OrderedRemoteReceiverStage.class.getName() + "_Pull")); - - this.pullStage = new ThreadPoolStage<OrderedEventStream>(new OrderedPullEventHandler(handler), this.pullExecutor, errorHandler); - this.pushStage = new ThreadPoolStage<TransportEvent>(new OrderedPushEventHandler(streamMap, pullStage), this.pushExecutor, errorHandler); // for decoupling + this.pushExecutor = Executors.newCachedThreadPool( + new DefaultThreadFactory(OrderedRemoteReceiverStage.class.getName() + "_Push")); + this.pullExecutor = Executors.newCachedThreadPool( + new DefaultThreadFactory(OrderedRemoteReceiverStage.class.getName() + "_Pull")); + + this.pullStage = new ThreadPoolStage<OrderedEventStream>( + new OrderedPullEventHandler(handler), this.pullExecutor, errorHandler); + this.pushStage = new ThreadPoolStage<TransportEvent>( + new OrderedPushEventHandler(streamMap, pullStage), this.pushExecutor, errorHandler); // for decoupling } @Override @@ -192,7 +196,8 @@ class OrderedEventStream { ++nextSeq; return event; } else { - LOG.log(Level.FINER, "Event sequence is {0} does not match expected {1}", new Object[]{event.getSeq(), nextSeq}); + LOG.log(Level.FINER, "Event sequence is {0} does not match expected {1}", + new Object[]{event.getSeq(), nextSeq}); } } else { LOG.log(Level.FINER, "Event is null"); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java index 6a9fa06..8f763ae 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/ProxyEventHandler.java @@ -48,8 +48,10 @@ public class ProxyEventHandler<T> implements EventHandler<T> { * @param reStage the sender stage * @throws RemoteRuntimeException */ - public ProxyEventHandler(RemoteIdentifier myId, RemoteIdentifier remoteId, String remoteSinkName, EventHandler<RemoteEvent<T>> handler, RemoteSeqNumGenerator seqGen) { - LOG.log(Level.FINE, "ProxyEventHandler myId: {0} remoteId: {1} remoteSink: {2} handler: {3}", new Object[]{myId, remoteId, remoteSinkName, handler}); + public ProxyEventHandler(RemoteIdentifier myId, RemoteIdentifier remoteId, String remoteSinkName, + EventHandler<RemoteEvent<T>> handler, RemoteSeqNumGenerator seqGen) { + LOG.log(Level.FINE, "ProxyEventHandler myId: {0} remoteId: {1} remoteSink: {2} handler: {3}", + new Object[]{myId, remoteId, remoteSinkName, handler}); if (!(myId instanceof SocketRemoteIdentifier && remoteId instanceof SocketRemoteIdentifier)) { throw new RemoteRuntimeException("Unsupported remote identifier type"); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java index efec46f..7abf64e 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteEventDecoder.java @@ -53,7 +53,8 @@ public class RemoteEventDecoder<T> implements Decoder<RemoteEvent<T>> { WakeMessagePBuf pbuf; try { pbuf = WakeMessagePBuf.parseFrom(data); - return new RemoteEvent<T>(null, null, pbuf.getSource(), pbuf.getSink(), pbuf.getSeq(), decoder.decode(pbuf.getData().toByteArray())); + return new RemoteEvent<T>(null, null, pbuf.getSource(), pbuf.getSink(), pbuf.getSeq(), + decoder.decode(pbuf.getData().toByteArray())); } catch (InvalidProtocolBufferException e) { throw new RemoteRuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java index 308afc9..65b9461 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/RemoteSenderEventHandler.java @@ -108,7 +108,8 @@ class RemoteSenderEventHandler<T> implements EventHandler<RemoteEvent<T>> { // consumeQueue(); if (LOG.isLoggable(Level.FINEST)) { - LOG.log(Level.FINEST, "Send an event from " + linkRef.get().getLocalAddress() + " to " + linkRef.get().getRemoteAddress() + " value " + value); + LOG.log(Level.FINEST, "Send an event from " + linkRef.get().getLocalAddress() + " to " + + linkRef.get().getRemoteAddress() + " value " + value); } linkRef.get().write(encoder.encode(value)); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Transport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Transport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Transport.java index 9979588..51ef7b3 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Transport.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/Transport.java @@ -44,7 +44,8 @@ public interface Transport extends Stage { * @return a link associated with the address * @throws IOException */ - <T> Link<T> open(SocketAddress remoteAddr, Encoder<? super T> encoder, LinkListener<? super T> listener) throws IOException; + <T> Link<T> open(SocketAddress remoteAddr, Encoder<? super T> encoder, LinkListener<? super T> listener) + throws IOException; /** * Returns a link for the remote address if already cached; otherwise, returns null. http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java index bc0d35c..df15f87 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/ChunkedReadWriteHandler.java @@ -60,7 +60,8 @@ public class ChunkedReadWriteHandler extends ChunkedWriteHandler { private byte[] retArr; /** - * @see org.jboss.netty.handler.stream.ChunkedWriteHandler#handleUpstream(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelEvent) + * @see org.jboss.netty.handler.stream.ChunkedWriteHandler#handleUpstream( + * org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelEvent) */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { @@ -72,21 +73,26 @@ public class ChunkedReadWriteHandler extends ChunkedWriteHandler { if (start) { //LOG.log(Level.FINEST, "{0} Starting dechunking of a chunked write", curThrName); expectedSize = getSize(data); - // LOG.log(Level.FINEST, "Expected Size = {0}. Wrapping byte[{1}] into a ChannelBuffer", new Object[]{expectedSize,expectedSize}); + // LOG.log(Level.FINEST, "Expected Size = {0}. Wrapping byte[{1}] into a ChannelBuffer", + // new Object[]{expectedSize,expectedSize}); retArr = new byte[expectedSize]; readBuffer = Unpooled.wrappedBuffer(retArr); readBuffer.clear(); - //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, curThrName + "read buffer: cur sz = " + readBuffer.writerIndex() + " + " + (data.length - INT_SIZE) + " bytes will added by current chunk"); + //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, curThrName + "read buffer: cur sz = " + + // readBuffer.writerIndex() + " + " + (data.length - INT_SIZE) + " bytes will added by current chunk"); readBuffer.writeBytes(data, INT_SIZE, data.length - INT_SIZE); - //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, curThrName + "read buffer: new sz = " + readBuffer.writerIndex()); + //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, curThrName + "read buffer: new sz = " + + // readBuffer.writerIndex()); start = false; } else { readBuffer.writeBytes(data); } if (readBuffer.writerIndex() == expectedSize) { - //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, "{0} Dechunking complete. Creating upstream msg event with the dechunked byte[{1}]", new Object[]{curThrName, expectedSize}); - //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, "Resetting state to begin another dechunking", curThrName); + //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, "{0} Dechunking complete." + + // "Creating upstream msg event with the dechunked byte[{1}]", new Object[]{curThrName, expectedSize}); + //if (LOG.isLoggable(Level.FINEST)) LOG.log(Level.FINEST, "Resetting state to begin another dechunking", + // curThrName); byte[] temp = retArr; start = true; expectedSize = 0; http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java index 92f59c1..cfb386e 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java @@ -157,9 +157,12 @@ public class NettyMessagingTransport implements Transport { this.clientEventListener = new NettyClientEventListener(this.addrToLinkRefMap, clientStage); this.serverEventListener = new NettyServerEventListener(this.addrToLinkRefMap, serverStage); - this.serverBossGroup = new NioEventLoopGroup(SERVER_BOSS_NUM_THREADS, new DefaultThreadFactory(CLASS_NAME + "ServerBoss")); - this.serverWorkerGroup = new NioEventLoopGroup(SERVER_WORKER_NUM_THREADS, new DefaultThreadFactory(CLASS_NAME + "ServerWorker")); - this.clientWorkerGroup = new NioEventLoopGroup(CLIENT_WORKER_NUM_THREADS, new DefaultThreadFactory(CLASS_NAME + "ClientWorker")); + this.serverBossGroup = new NioEventLoopGroup(SERVER_BOSS_NUM_THREADS, + new DefaultThreadFactory(CLASS_NAME + "ServerBoss")); + this.serverWorkerGroup = new NioEventLoopGroup(SERVER_WORKER_NUM_THREADS, + new DefaultThreadFactory(CLASS_NAME + "ServerWorker")); + this.clientWorkerGroup = new NioEventLoopGroup(CLIENT_WORKER_NUM_THREADS, + new DefaultThreadFactory(CLASS_NAME + "ClientWorker")); this.clientBootstrap = new Bootstrap(); this.clientBootstrap.group(this.clientWorkerGroup) http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java index 16166e6..e08f220 100644 --- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java +++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/time/runtime/RuntimeClock.java @@ -57,8 +57,10 @@ public final class RuntimeClock implements Clock { RuntimeClock(final Timer timer, @Parameter(Clock.StartHandler.class) final InjectionFuture<Set<EventHandler<StartTime>>> startHandler, @Parameter(StopHandler.class) final InjectionFuture<Set<EventHandler<StopTime>>> stopHandler, - @Parameter(Clock.RuntimeStartHandler.class) final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler, - @Parameter(Clock.RuntimeStopHandler.class) final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler, + @Parameter(Clock.RuntimeStartHandler.class) + final InjectionFuture<Set<EventHandler<RuntimeStart>>> runtimeStartHandler, + @Parameter(Clock.RuntimeStopHandler.class) + final InjectionFuture<Set<EventHandler<RuntimeStop>>> runtimeStopHandler, @Parameter(IdleHandler.class) final InjectionFuture<Set<EventHandler<IdleClock>>> idleHandler) { this.timer = timer; this.schedule = new TreeSet<>(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/EvaluatorListSerializer.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/EvaluatorListSerializer.java b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/EvaluatorListSerializer.java index 3acfd92..b92b39e 100644 --- a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/EvaluatorListSerializer.java +++ b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/EvaluatorListSerializer.java @@ -36,7 +36,9 @@ public interface EvaluatorListSerializer { * @param startTime * @return */ - AvroEvaluatorList toAvro(final Map<String, EvaluatorDescriptor> evaluatorMap, final int totalEvaluators, final String startTime); + AvroEvaluatorList toAvro(final Map<String, EvaluatorDescriptor> evaluatorMap, + final int totalEvaluators, + final String startTime); /** * Convert AvroEvaluatorList to JSon string. http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpHandler.java b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpHandler.java index 128e187..4d7406f 100644 --- a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpHandler.java +++ b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpHandler.java @@ -46,5 +46,6 @@ public interface HttpHandler { * @param parsedHttpRequest * @param response */ - void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response) throws IOException, ServletException; + void onHttpRequest(final ParsedHttpRequest parsedHttpRequest, final HttpServletResponse response) + throws IOException, ServletException; } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerImpl.java b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerImpl.java index d17c9dc..6cef8e0 100644 --- a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerImpl.java +++ b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerImpl.java @@ -113,7 +113,8 @@ public final class HttpServerImpl implements HttpServer { } /** - * It will be called from RuntimeStartHandler. As the Jetty server has been started at initialization phase, no need to start here. + * It will be called from RuntimeStartHandler. + * As the Jetty server has been started at initialization phase, no need to start here. * * @throws Exception */ http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerReefEventHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerReefEventHandler.java b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerReefEventHandler.java index f53744b..8d092c1 100644 --- a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerReefEventHandler.java +++ b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerReefEventHandler.java @@ -150,12 +150,16 @@ public final class HttpServerReefEventHandler implements HttpHandler { response.getWriter().println("Killing"); break; case "duration": - final ArrayList<String> lines = LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.DURATION, LoggingScopeImpl.TOKEN, null); + final ArrayList<String> lines = + LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.DURATION, LoggingScopeImpl.TOKEN, null); writeLines(response, lines, "Performance..."); break; case "stages": - final ArrayList<String> starts = LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.START_PREFIX, logLevelPrefix, null); - final ArrayList<String> exits = LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.EXIT_PREFIX, logLevelPrefix, LoggingScopeImpl.DURATION); + final ArrayList<String> starts = + LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.START_PREFIX, logLevelPrefix, null); + final ArrayList<String> exits = + LogParser.getFilteredLinesFromFile(driverStderrFile, LoggingScopeImpl.EXIT_PREFIX, logLevelPrefix, + LoggingScopeImpl.DURATION); final ArrayList<String> startsStages = LogParser.findStages(starts, LogParser.startIndicators); final ArrayList<String> endStages = LogParser.findStages(exits, LogParser.endIndicators); final ArrayList<String> result = LogParser.mergeStages(startsStages, endStages); @@ -324,7 +328,8 @@ public final class HttpServerReefEventHandler implements HttpHandler { final DriverInfoSerializer serializer = Tang.Factory.getTang().newInjector().getInstance(DriverInfoSerializer.class); final AvroDriverInfo driverInfo = serializer.toAvro( - this.reefStateManager.getDriverEndpointIdentifier(), this.reefStateManager.getStartTime(), this.reefStateManager.getServicesInfo()); + this.reefStateManager.getDriverEndpointIdentifier(), this.reefStateManager.getStartTime(), + this.reefStateManager.getServicesInfo()); writeResponse(response, serializer.toString(driverInfo)); } catch (final InjectionException e) { LOG.log(Level.SEVERE, "Error in injecting DriverInfoSerializer.", e); @@ -358,7 +363,8 @@ public final class HttpServerReefEventHandler implements HttpHandler { writer.println(String.format("Services registered on Driver:")); writer.write("<br/><br/>"); for (final AvroReefServiceInfo service : this.reefStateManager.getServicesInfo()) { - writer.println(String.format("Service: [%s] , Information: [%s]", service.getServiceName(), service.getServiceInfo())); + writer.println(String.format("Service: [%s] , Information: [%s]", service.getServiceName(), + service.getServiceInfo())); writer.write("<br/><br/>"); } @@ -372,7 +378,8 @@ public final class HttpServerReefEventHandler implements HttpHandler { * @param header * @throws IOException */ - private void writeLines(final HttpServletResponse response, final ArrayList<String> lines, final String header) throws IOException { + private void writeLines(final HttpServletResponse response, final ArrayList<String> lines, final String header) + throws IOException { LOG.log(Level.INFO, "HttpServerReefEventHandler writeLines is called"); final PrintWriter writer = response.getWriter(); http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/JettyHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/JettyHandler.java b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/JettyHandler.java index 6886e03..205ce48 100644 --- a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/JettyHandler.java +++ b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/JettyHandler.java @@ -87,7 +87,8 @@ class JettyHandler extends AbstractHandler { final ParsedHttpRequest parsedHttpRequest = new ParsedHttpRequest(request); final HttpHandler handler = validate(request, response, parsedHttpRequest); if (handler != null) { - LOG.log(Level.INFO, "calling HttpHandler.onHttpRequest from JettyHandler.handle() for {0}.", handler.getUriSpecification()); + LOG.log(Level.INFO, "calling HttpHandler.onHttpRequest from JettyHandler.handle() for {0}.", + handler.getUriSpecification()); handler.onHttpRequest(parsedHttpRequest, response); response.setStatus(HttpServletResponse.SC_OK); } @@ -118,7 +119,8 @@ class JettyHandler extends AbstractHandler { final HttpHandler handler = eventHandlers.get(specification.toLowerCase()); if (handler == null) { - writeMessage(response, String.format("No event handler registered for: [%s].", specification), HttpServletResponse.SC_NOT_FOUND); + writeMessage(response, String.format("No event handler registered for: [%s].", specification), + HttpServletResponse.SC_NOT_FOUND); return null; } @@ -137,7 +139,8 @@ class JettyHandler extends AbstractHandler { * @param status * @throws IOException */ - private void writeMessage(final HttpServletResponse response, final String message, final int status) throws IOException { + private void writeMessage(final HttpServletResponse response, final String message, final int status) + throws IOException { response.getWriter().println(message); response.setStatus(status); } http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/84ff5021/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java index fc8b859..6e2848a 100644 --- a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java +++ b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/ReefEventStateManager.java @@ -151,7 +151,8 @@ public final class ReefEventStateManager { public void registerServiceInfo(AvroReefServiceInfo serviceInfo) { synchronized (this.serviceInfoList) { serviceInfoList.add(serviceInfo); - LOG.log(Level.INFO, "Registered Service [{0}] with Info [{1}]", new Object[]{serviceInfo.getServiceName(), serviceInfo.getServiceInfo()}); + LOG.log(Level.INFO, "Registered Service [{0}] with Info [{1}]", + new Object[]{serviceInfo.getServiceName(), serviceInfo.getServiceInfo()}); } }
