change topologicalOrderIterator to topologicalOrderWithCirclesIterator
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/dcd2ca46
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/dcd2ca46
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/dcd2ca46

Branch: refs/heads/master
Commit: dcd2ca46335be5682cfd00e64df6a04f99ff8f82
Parents: 55135fb
Author: pangolulu <[email protected]>
Authored: Tue Feb 2 20:18:31 2016 +0800
Committer: pangolulu <[email protected]>
Committed: Tue Feb 2 20:18:31 2016 +0800

----------------------------------------------------------------------
 .../src/main/scala/io/gearpump/streaming/StreamApplication.scala | 2 +-
 .../main/scala/io/gearpump/streaming/appmaster/ClockService.scala | 2 +-
 .../src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/dcd2ca46/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
index cba19a4..b68054a 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/StreamApplication.scala
@@ -144,7 +144,7 @@ object StreamApplication {
       LOG.warn(s"Detected cycles in DAG of application $name!")
     }
 
-    val indices = dag.topologicalOrderIterator.toList.zipWithIndex.toMap
+    val indices = dag.topologicalOrderWithCirclesIterator.toList.zipWithIndex.toMap
     val graph = dag.mapVertex {processor =>
       val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor)
       updatedProcessor
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/dcd2ca46/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
index 038620a..578a15d 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/appmaster/ClockService.scala
@@ -380,7 +380,7 @@ object ClockService {
       }
 
       if (isClockStalling) {
-        val processorId = dag.graph.topologicalOrderIterator.toList.find { processorId =>
+        val processorId = dag.graph.topologicalOrderWithCirclesIterator.toList.find { processorId =>
           val clock = processorClocks.get(processorId)
           if (clock.isDefined) {
             clock.get.min == minClock.appClock

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/dcd2ca46/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
index b3714f2..ee0818d 100644
--- a/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/io/gearpump/streaming/dsl/plan/Planner.scala
@@ -55,7 +55,7 @@ class Planner {
 
   private def optimize(dag: Graph[Op, OpEdge]): Graph[OpChain, OpEdge] = {
     val newGraph = dag.mapVertex(op => OpChain(List(op)))
-    val nodes = newGraph.topologicalOrderIterator.toList.reverse
+    val nodes = newGraph.topologicalOrderWithCirclesIterator.toList.reverse
     for (node <- nodes) {
       val outGoingEdges = newGraph.outgoingEdgesOf(node)
       for (edge <- outGoingEdges) {
