Repository: reef Updated Branches: refs/heads/master dd724ffdc -> 24e73feb9
[REEF-1186] Remove wrong inject annotation on OperatorTopologyImpl This removes the wrong inject annotation and fixes some typos. JIRA: [REEF-1186](https://issues.apache.org/jira/browse/REEF-1186) Pull Request: This closes #821 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/24e73feb Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/24e73feb Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/24e73feb Branch: refs/heads/master Commit: 24e73feb98c575669217360fbb507a325e0147f5 Parents: dd724ff Author: Dongjoon Hyun <[email protected]> Authored: Thu Feb 4 00:00:25 2016 -0800 Committer: Markus Weimer <[email protected]> Committed: Thu Feb 4 15:34:25 2016 -0800 ---------------------------------------------------------------------- .../group/impl/task/OperatorTopologyImpl.java | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/24e73feb/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java index 86e6f55..a5ac964 100644 --- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java +++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/group/impl/task/OperatorTopologyImpl.java @@ -34,7 +34,6 @@ import org.apache.reef.wake.EStage; import org.apache.reef.wake.EventHandler; import org.apache.reef.wake.impl.SingleThreadStage; -import javax.inject.Inject; import java.util.Arrays; import java.util.HashSet; import java.util.Map; @@ -62,7 +61,7 @@ public class OperatorTopologyImpl implements OperatorTopology { private OperatorTopologyStruct baseTopology; private OperatorTopologyStruct effectiveTopology; - private final ResettingCountDownLatch topologyLockAquired = new ResettingCountDownLatch(1); + private final ResettingCountDownLatch topologyLockAcquired = new ResettingCountDownLatch(1); private final AtomicBoolean updatingTopo = new AtomicBoolean(false); private final EventHandler<GroupCommunicationMessage> baseTopologyUpdateHandler = new BaseTopologyUpdateHandler(); @@ -79,7 +78,6 @@ public class OperatorTopologyImpl implements OperatorTopology { dataHandlingStageHandler, 10000); - @Inject public OperatorTopologyImpl(final Class<? extends Name<String>> groupName, final Class<? extends Name<String>> operName, final String selfId, final String driverId, final Sender sender, final int version) { @@ -127,8 +125,8 @@ public class OperatorTopologyImpl implements OperatorTopology { case UpdateTopology: updatingTopo.set(true); baseTopologyUpdateStage.onNext(msg); - topologyLockAquired.awaitAndReset(1); - LOG.finest(getQualifiedName() + "topoLockAquired CDL released. Resetting it to new CDL"); + topologyLockAcquired.awaitAndReset(1); + LOG.finest(getQualifiedName() + "topoLockAcquired CDL released. Resetting it to new CDL"); sendAckToDriver(msg); break; @@ -329,7 +327,7 @@ public class OperatorTopologyImpl implements OperatorTopology { throw new ParentDeadException(getQualifiedName() + "Parent dead. Current behavior is for the child to die too."); } else { - LOG.finest(getQualifiedName() + "Updating basetopology struct"); + LOG.finest(getQualifiedName() + "Updating baseTopology struct"); baseTopology.update(msg); sendAckToDriver(msg); } @@ -428,7 +426,7 @@ public class OperatorTopologyImpl implements OperatorTopology { * Unlike Dead msgs this needs to be synchronized because data msgs are not * routed through the base topo changes So we need to make sure to wait for * updateTopo to complete and for the new effective topo to take effect. Hence - * updatinTopo is set to false in refreshEffTopo. Also, since this is called + * updatingTopo is set to false in refreshEffTopo. Also, since this is called * from a netty IO thread, we need to create a stage to move the msgs from * netty space to application space and release the netty threads. Otherwise * weird deadlocks can happen Ex: Sent model to k nodes using broadcast. Send @@ -447,12 +445,12 @@ public class OperatorTopologyImpl implements OperatorTopology { dataMsg}); LOG.finest(getQualifiedName() + "Waiting to acquire topoLock"); synchronized (topologyLock) { - LOG.finest(getQualifiedName() + "Aqcuired topoLock"); + LOG.finest(getQualifiedName() + "Acquired topoLock"); while (updatingTopo.get()) { try { LOG.finest(getQualifiedName() + "Topology is being updated. Released topoLock, Waiting on topoLock"); topologyLock.wait(); - LOG.finest(getQualifiedName() + "Aqcuired topoLock"); + LOG.finest(getQualifiedName() + "Acquired topoLock"); } catch (final InterruptedException e) { throw new RuntimeException("InterruptedException while data handling" + "stage was waiting for updatingTopo to become false", e); @@ -480,8 +478,8 @@ public class OperatorTopologyImpl implements OperatorTopology { LOG.finest(getQualifiedName() + "Waiting to acquire topoLock"); synchronized (topologyLock) { LOG.finest(getQualifiedName() + "Acquired topoLock"); - LOG.finest(getQualifiedName() + "Releasing topoLoackAcquired CDL"); - topologyLockAquired.countDown(); + LOG.finest(getQualifiedName() + "Releasing topoLockAcquired CDL"); + topologyLockAcquired.countDown(); try { updateBaseTopology(); LOG.finest(getQualifiedName() + "Completed updating base & effective topologies");
