Repository: incubator-gearpump Updated Branches: refs/heads/master 55d6d2f56 -> 175b08e64
[GEARPUMP-349] Optimize Graph topologicalOrderIterator performance Author: huafengw <[email protected]> Closes #223 from huafengw/graph. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/175b08e6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/175b08e6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/175b08e6 Branch: refs/heads/master Commit: 175b08e64c3363a2da9c61e05dac5f01d1f8db2d Parents: 55d6d2f Author: huafengw <[email protected]> Authored: Mon Sep 18 21:36:08 2017 +0800 Committer: manuzhang <[email protected]> Committed: Mon Sep 18 21:36:13 2017 +0800 ---------------------------------------------------------------------- .../scala/org/apache/gearpump/util/Graph.scala | 190 +++++++++++-------- .../org/apache/gearpump/util/GraphSpec.scala | 6 +- .../experiments/pagerank/PageRankWorker.scala | 4 +- .../akkastream/graph/GraphPartitioner.scala | 8 +- .../materializer/LocalMaterializerImpl.scala | 8 +- .../materializer/RemoteMaterializerImpl.scala | 4 +- .../storm/util/GraphBuilderSpec.scala | 4 +- .../gearpump/services/util/UpickleSpec.scala | 4 +- .../org/apache/gearpump/streaming/DAG.scala | 2 +- .../gearpump/streaming/StreamApplication.scala | 6 +- .../streaming/appmaster/ClockService.scala | 2 +- .../gearpump/streaming/dsl/plan/Planner.scala | 2 +- .../org/apache/gearpump/streaming/DAGSpec.scala | 2 +- .../streaming/dsl/plan/PlannerSpec.scala | 2 +- .../streaming/dsl/scalaapi/StreamAppSpec.scala | 4 +- .../streaming/dsl/scalaapi/StreamSpec.scala | 6 +- 16 files changed, 139 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/core/src/main/scala/org/apache/gearpump/util/Graph.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/Graph.scala b/core/src/main/scala/org/apache/gearpump/util/Graph.scala index f110f5f..5b48050 100644 --- a/core/src/main/scala/org/apache/gearpump/util/Graph.scala +++ b/core/src/main/scala/org/apache/gearpump/util/Graph.scala @@ -17,31 +17,34 @@ */ package org.apache.gearpump.util -import scala.annotation.tailrec + import scala.collection.mutable import scala.language.implicitConversions +import scala.util.{Failure, Success, Try} /** * Generic mutable Graph libraries. */ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serializable { - - private val _vertices = mutable.Set.empty[N] - private val _edges = mutable.Set.empty[(N, E, N)] + private val LOG = LogUtil.getLogger(getClass) + private val vertices = mutable.Set.empty[N] + private val edges = mutable.Set.empty[(N, E, N)] + private val outEdges = mutable.Map.empty[N, mutable.Set[(N, E, N)]] + private val inEdges = mutable.Map.empty[N, mutable.Set[(N, E, N)]] // This is used to ensure the output of this Graph is always stable - // Like method vertices(), or edges() - private var _indexs = Map.empty[Any, Int] - private var _nextIndex = 0 + // Like method getVertices(), or getEdges() + private var indexs = Map.empty[Any, Int] + private var nextIndex = 0 private def nextId: Int = { - val result = _nextIndex - _nextIndex += 1 + val result = nextIndex + nextIndex += 1 result } private def init(): Unit = { - Option(vertexList).getOrElse(List.empty[N]).foreach(addVertex(_)) - Option(edgeList).getOrElse(List.empty[(N, E, N)]).foreach(addEdge(_)) + Option(vertexList).getOrElse(List.empty[N]).foreach(addVertex) + Option(edgeList).getOrElse(List.empty[(N, E, N)]).foreach(addEdge) } init() @@ -51,20 +54,22 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * Current Graph is changed. */ def addVertex(vertex: N): Unit = { - val result = _vertices.add(vertex) + val result = vertices.add(vertex) if (result) { - _indexs += vertex -> nextId + indexs += vertex -> nextId } } /** - * Add a edge + * Add an edge * Current Graph is changed. */ def addEdge(edge: (N, E, N)): Unit = { - val result = _edges.add(edge) + val result = edges.add(edge) if (result) { - _indexs += edge -> nextId + indexs += edge -> nextId + outEdges += edge._1 -> (outgoingEdgesOf(edge._1) + edge) + inEdges += edge._3 -> (incomingEdgesOf(edge._3) + edge) } } @@ -72,37 +77,44 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * return all vertices. * The result is stable */ - def vertices: List[N] = { + def getVertices: List[N] = { // Sorts the vertex so that we can keep the order for mapVertex - _vertices.toList.sortBy(_indexs(_)) + vertices.toList.sortBy(indexs(_)) } /** * out degree */ def outDegreeOf(node: N): Int = { - edges.count(_._1 == node) + outgoingEdgesOf(node).size } /** * in degree */ def inDegreeOf(node: N): Int = { - edges.count(_._3 == node) + incomingEdgesOf(node).size } /** * out going edges. */ - def outgoingEdgesOf(node: N): List[(N, E, N)] = { - edges.filter(_._1 == node) + def outgoingEdgesOf(node: N): mutable.Set[(N, E, N)] = { + outEdges.getOrElse(node, mutable.Set.empty) } /** * incoming edges. */ - def incomingEdgesOf(node: N): List[(N, E, N)] = { - edges.filter(_._3 == node) + def incomingEdgesOf(node: N): mutable.Set[(N, E, N)] = { + inEdges.getOrElse(node, mutable.Set.empty) + } + + /** + * adjacent vertices. + */ + private def adjacentVertices(node: N): List[N] = { + outgoingEdgesOf(node).map(_._3).toList } /** @@ -110,10 +122,12 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * Current Graph is changed. */ def removeVertex(node: N): Unit = { - _vertices.remove(node) - _indexs -= node + vertices.remove(node) + indexs -= node val toBeRemoved = incomingEdgesOf(node) ++ outgoingEdgesOf(node) - toBeRemoved.foreach(removeEdge(_)) + toBeRemoved.foreach(removeEdge) + outEdges -= node + inEdges -= node } /** @@ -121,8 +135,10 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * Current Graph is changed. */ private def removeEdge(edge: (N, E, N)): Unit = { - _indexs -= edge - _edges.remove(edge) + indexs -= edge + edges.remove(edge) + inEdges.update(edge._3, inEdges(edge._3) - edge) + outEdges.update(edge._1, outEdges(edge._1) - edge) } /** @@ -140,14 +156,14 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * Current Graph is not changed. */ def mapVertex[NewNode](fun: N => NewNode): Graph[NewNode, E] = { - val vertexes = vertices.map(node => (node, fun(node))) + val newVertices = getVertices.map(node => (node, fun(node))) - val vertexMap: Map[N, NewNode] = vertexes.toMap + val vertexMap: Map[N, NewNode] = newVertices.toMap - val newEdges = edges.map { edge => + val newEdges = getEdges.map { edge => (vertexMap(edge._1), edge._2, vertexMap(edge._3)) } - new Graph(vertexes.map(_._2), newEdges) + new Graph(newVertices.map(_._2), newEdges) } /** @@ -155,24 +171,25 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * Current graph is not changed. */ def mapEdge[NewEdge](fun: (N, E, N) => NewEdge): Graph[N, NewEdge] = { - val newEdges = edges.map { edge => + val newEdges = getEdges.map { edge => (edge._1, fun(edge._1, edge._2, edge._3), edge._3) } - new Graph(vertices, newEdges) + new Graph(getVertices, newEdges) } /** * edges connected to node */ def edgesOf(node: N): List[(N, E, N)] = { - (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toSet[(N, E, N)].toList.sortBy(_indexs(_)) + (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toList } /** * all edges */ - def edges: List[(N, E, N)] = { - _edges.toList.sortBy(_indexs(_)) + def getEdges: List[(N, E, N)] = { + // Sorts the edges so that we can keep the order for mapEdges + edges.toList.sortBy(indexs(_)) } /** @@ -180,8 +197,8 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * Current graph is changed. */ def addGraph(other: Graph[N, E]): Graph[N, E] = { - (vertices ++ other.vertices).foreach(addVertex(_)) - (edges ++ other.edges).foreach(edge => addEdge(edge._1, edge._2, edge._3)) + (getVertices ++ other.getVertices).foreach(addVertex) + (getEdges ++ other.getEdges).foreach(edge => addEdge(edge._1, edge._2, edge._3)) this } @@ -189,15 +206,15 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * clone the graph */ def copy: Graph[N, E] = { - new Graph(vertices, edges) + new Graph(getVertices, getEdges) } /** * check empty */ def isEmpty: Boolean = { - val vertexCount = vertices.size - val edgeCount = edges.length + val vertexCount = getVertices.size + val edgeCount = getEdges.length if (vertexCount + edgeCount == 0) { true } else { @@ -233,8 +250,8 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial } private def removeZeroInDegree: List[N] = { - val toBeRemoved = vertices.filter(inDegreeOf(_) == 0).sortBy(_indexs(_)) - toBeRemoved.foreach(removeVertex(_)) + val toBeRemoved = getVertices.filter(inDegreeOf(_) == 0) + toBeRemoved.foreach(removeVertex) toBeRemoved } @@ -243,13 +260,38 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * The node returned by Iterator is stable sorted. */ def topologicalOrderIterator: Iterator[N] = { - val newGraph = copy - var output = List.empty[N] + tryTopologicalOrderIterator match { + case Success(iterator) => iterator + case Failure(_) => + LOG.warn("Please note this graph is cyclic.") + topologicalOrderWithCirclesIterator + } + } - while (!newGraph.isEmpty) { - output ++= newGraph.removeZeroInDegree + private def tryTopologicalOrderIterator: Try[Iterator[N]] = { + Try { + var indegreeMap = getVertices.map(v => v -> inDegreeOf(v)).toMap + + val verticesWithZeroIndegree = mutable.Queue(indegreeMap.filter(_._2 == 0).keys + .toList.sortBy(indexs(_)): _*) + var output = List.empty[N] + var count = 0 + while (verticesWithZeroIndegree.nonEmpty) { + val vertice = verticesWithZeroIndegree.dequeue() + adjacentVertices(vertice).foreach { adjacentV => + indegreeMap += adjacentV -> (indegreeMap(adjacentV) - 1) + if (indegreeMap(adjacentV) == 0) { + verticesWithZeroIndegree.enqueue(adjacentV) + } + } + output :+= vertice + count += 1 + } + if (count != getVertices.size) { + throw new Exception("There exists a cycle in the graph") + } + output.iterator } - output.iterator } /** @@ -278,18 +320,18 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial edge => { if (!indexMap.contains(edge._3)) { tarjan(edge._3) - if (lowLink.get(edge._3).get < lowLink.get(node).get) { + if (lowLink(edge._3) < lowLink(node)) { lowLink(node) = lowLink(edge._3) } } else { - if (inStack.get(edge._3).get && (indexMap.get(edge._3).get < lowLink.get(node).get)) { + if (inStack(edge._3) && (indexMap(edge._3) < lowLink(node))) { lowLink(node) = indexMap(edge._3) } } } } - if (indexMap.get(node).get == lowLink.get(node).get) { + if (indexMap(node) == lowLink(node)) { val circle = mutable.MutableList.empty[N] var n = node do { @@ -301,7 +343,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial } } - vertices.foreach { + getVertices.foreach { node => { if (!indexMap.contains(node)) tarjan(node) } @@ -318,12 +360,8 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * http://www.drdobbs.com/database/topological-sorting/184410262 */ def topologicalOrderWithCirclesIterator: Iterator[N] = { - if (hasCycle()) { - val topo = getAcyclicCopy().topologicalOrderIterator - topo.flatMap(_.sortBy(_indexs(_)).iterator) - } else { - topologicalOrderIterator - } + val topo = getAcyclicCopy().topologicalOrderIterator + topo.flatMap(_.sortBy(indexs(_)).iterator) } private def getAcyclicCopy(): Graph[mutable.MutableList[N], E] = { @@ -337,13 +375,13 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial for (circle1 <- circles; circle2 <- circles; if circle1 != circle2) yield { for (node1 <- circle1; node2 <- circle2) yield { - var edges = outgoingEdgesOf(node1) - for (edge <- edges; if edge._3 == node2) yield { + var outgoingEdges = outgoingEdgesOf(node1) + for (edge <- outgoingEdges; if edge._3 == node2) yield { newGraph.addEdge(circle1, edge._2, circle2) } - edges = outgoingEdgesOf(node2) - for (edge <- edges; if edge._3 == node1) yield { + outgoingEdges = outgoingEdgesOf(node2) + for (edge <- outgoingEdges; if edge._3 == node1) yield { newGraph.addEdge(circle2, edge._2, circle1) } } @@ -355,26 +393,14 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial * check whether there is a loop */ def hasCycle(): Boolean = { - @tailrec - def detectCycle(graph: Graph[N, E]): Boolean = { - if (graph.edges.isEmpty) { - false - } else if (graph.vertices.nonEmpty && !graph.vertices.exists(graph.inDegreeOf(_) == 0)) { - true - } else { - graph.removeZeroInDegree - detectCycle(graph) - } - } - - detectCycle(copy) + tryTopologicalOrderIterator.isFailure } /** * Check whether there are two edges connecting two nodes. */ def hasDuplicatedEdge(): Boolean = { - edges.groupBy(edge => (edge._1, edge._3)).values.exists(_.size > 1) + getEdges.groupBy(edge => (edge._1, edge._3)).values.exists(_.size > 1) } /** @@ -391,7 +417,7 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial val toBeRemovedLists = newGraph.removeZeroInDegree val maxLength = toBeRemovedLists.map(_.length).max for (subGraph <- toBeRemovedLists) { - val sorted = subGraph.sortBy(_indexs) + val sorted = subGraph.sortBy(indexs) for (i <- sorted.indices) { output += sorted(i) -> (level + i) } @@ -402,8 +428,8 @@ class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serial } override def toString: String = { - Map("vertices" -> vertices.mkString(","), - "edges" -> edges.mkString(",")).toString() + Map("vertices" -> getVertices.mkString(","), + "edges" -> getEdges.mkString(",")).toString() } } @@ -436,7 +462,7 @@ object Graph { } def unapply[N, E](graph: Graph[N, E]): Option[(List[N], List[(N, E, N)])] = { - Some((graph.vertices, graph.edges)) + Some((graph.getVertices, graph.getEdges)) } def empty[N, E]: Graph[N, E] = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala b/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala index 6663513..256ac9a 100644 --- a/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala +++ b/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala @@ -34,7 +34,7 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers { property("Graph with no edges should be built correctly") { val vertexSet = Set("A", "B", "C") val graph = Graph(vertexSet.toSeq.map(Node): _*) - graph.vertices.toSet shouldBe vertexSet + graph.getVertices.toSet shouldBe vertexSet } property("Graph with vertices and edges should be built correctly") { @@ -67,7 +67,7 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers { } val graph: Graph[Vertex, Edge] = Graph(graphElements: _*) - graph.vertices should contain theSameElementsAs vertices + graph.getVertices should contain theSameElementsAs vertices 0.until(vertices.size).foreach { i => val v = vertices(i) @@ -129,7 +129,7 @@ class GraphSpec extends PropSpec with PropertyChecks with Matchers { val newGraph = graph.copy newGraph.addVertex("C") - assert(!graph.vertices.toSet.contains("C"), "Graph should be immutable") + assert(!graph.getVertices.toSet.contains("C"), "Graph should be immutable") } property("subGraph should return a sub-graph for certain vertex") { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala ---------------------------------------------------------------------- diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala index e033bf1..2f0bbf2 100644 --- a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala +++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala @@ -37,13 +37,13 @@ class PageRankWorker(taskContext: TaskContext, conf: UserConfig) extends Task(ta private val graph = conf.getValue[Graph[NodeWithTaskId[_], AnyRef]](PageRankApplication.DAG).get - private val node = graph.vertices.find { node => + private val node = graph.getVertices.find { node => node.taskId == taskContext.taskId.index }.get private val downstream = graph.outgoingEdgesOf(node).map(_._3) .map(id => taskId.copy(index = id.taskId)).toSeq - private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).length + private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).size LOG.info(s"downstream nodes: $downstream") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala index d764331..ed5a10d 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala @@ -69,7 +69,7 @@ class GraphPartitioner(strategy: Strategy) { val local = Graph.empty[Module, Edge] val remote = Graph.empty[Module, Edge] - graph.vertices.foreach{ module => + graph.getVertices.foreach{ module => if (tags(module) == Local) { local.addVertex(module) } else { @@ -77,7 +77,7 @@ class GraphPartitioner(strategy: Strategy) { } } - graph.edges.foreach{ nodeEdgeNode => + graph.getEdges.foreach{ nodeEdgeNode => val (node1, edge, node2) = nodeEdgeNode (tags(node1), tags(node2)) match { case (Local, Local) => @@ -115,14 +115,14 @@ class GraphPartitioner(strategy: Strategy) { } private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] = { - graph.vertices.map{vertex => + graph.getVertices.map{ vertex => vertex -> strategy.apply(vertex) }.toMap } private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = { val graph = inputGraph.copy - val dummies = graph.vertices.filter {module => + val dummies = graph.getVertices.filter { module => module match { case dummy: DummyModule => true http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala index 477f4d3..7d07ca8 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala @@ -210,10 +210,10 @@ case class LocalMaterializerImpl ( def buildToplevelModule(graph: GGraph[Module, Edge]): Module = { var moduleInProgress: Module = EmptyModule - graph.vertices.foreach(module => { + graph.getVertices.foreach(module => { moduleInProgress = moduleInProgress.compose(module) }) - graph.edges.foreach(value => { + graph.getEdges.foreach(value => { val (node1, edge, node2) = value moduleInProgress = moduleInProgress.wire(edge.from, edge.to) }) @@ -232,7 +232,7 @@ case class LocalMaterializerImpl ( session.materializeAtomic(module.asInstanceOf[AtomicModule], module.attributes, matV) matV.get(module) } - materializedGraph.edges.foreach { nodeEdgeNode => + materializedGraph.getEdges.foreach { nodeEdgeNode => val (node1, edge, node2) = nodeEdgeNode val from = edge.from val to = edge.to @@ -248,7 +248,7 @@ case class LocalMaterializerImpl ( case _ => } } - val matValSources = graph.vertices.flatMap(module => { + val matValSources = graph.getVertices.flatMap(module => { val rt: Option[MaterializedValueSource[_]] = module match { case graphStage: GraphStageModule => graphStage.stage match { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala index a62b8e3..a2a5185 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala @@ -86,7 +86,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { private def junctionConfig(processorIds: Map[Module, ProcessorId]): Map[ProcessorId, UserConfig] = { - val updatedConfigs = graph.vertices.flatMap { vertex => + val updatedConfigs = graph.getVertices.flatMap { vertex => buildShape(vertex, processorIds) }.toMap updatedConfigs @@ -119,7 +119,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) { Map[Module, ProcessorId] = { ids.flatMap { kv => val (module, id) = kv - val processorId = app.dag.vertices.find { processor => + val processorId = app.dag.getVertices.find { processor => processor.taskConf.getString(id).isDefined }.map(_.id) processorId.map((module, _)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala index 9cf5009..fb89268 100644 --- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala +++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala @@ -45,8 +45,8 @@ class GraphBuilderSpec extends WordSpec with Matchers with MockitoSugar { val graph = GraphBuilder.build(topology) - graph.edges.size shouldBe 1 - val (from, edge, to) = graph.edges.head + graph.getEdges.size shouldBe 1 + val (from, edge, to) = graph.getEdges.head from shouldBe sourceProcessor edge shouldBe a[StormPartitioner] to shouldBe targetProcessor http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala index 2bf6b6b..c5b3990 100644 --- a/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala @@ -43,8 +43,8 @@ class UpickleSpec extends FlatSpec with Matchers with BeforeAndAfterEach { val deserialized = read[Graph[Int, String]](serialized) - graph.vertices.toSet shouldBe deserialized.vertices.toSet - graph.edges.toSet shouldBe deserialized.edges.toSet + graph.getVertices.toSet shouldBe deserialized.getVertices.toSet + graph.getEdges.toSet shouldBe deserialized.getEdges.toSet } "MetricType" should "be able to serialize/deserialize correctly" in { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala index 8ad74f8..c43af2f 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala @@ -49,7 +49,7 @@ case class DAG(version: Int, processors : Map[ProcessorId, ProcessorDescription] object DAG { def apply(graph: Graph[ProcessorDescription, PartitionerDescription], version: Int = 0): DAG = { - val processors = graph.vertices.map { processorDescription => + val processors = graph.getVertices.map { processorDescription => (processorDescription.id, processorDescription) }.toMap val dag = graph.mapVertex { processor => http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala index f15e1b3..60a3897 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala @@ -150,11 +150,7 @@ object StreamApplication { name: String, dag: Graph[T, P], userConfig: UserConfig): StreamApplication = { import org.apache.gearpump.streaming.Processor._ - if (dag.hasCycle()) { - LOG.warn(s"Detected cycles in DAG of application $name!") - } - - val indices = dag.topologicalOrderWithCirclesIterator.toList.zipWithIndex.toMap + val indices = dag.topologicalOrderIterator.toList.zipWithIndex.toMap val graph = dag.mapVertex { processor => val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor) updatedProcessor http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala index 90141d4..57602c5 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala @@ -397,7 +397,7 @@ object ClockService { } if (isClockStalling) { - val processorId = dag.graph.topologicalOrderWithCirclesIterator.toList.find { processorId => + val processorId = dag.graph.topologicalOrderIterator.toList.find { processorId => val clock = processorClocks.get(processorId) if (clock.isDefined) { clock.get.min == minClock.appClock http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala index 04b5337..77083f0 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala @@ -67,7 +67,7 @@ class Planner { private def optimize(dag: Graph[Op, OpEdge]) (implicit system: ActorSystem): Graph[Op, OpEdge] = { val graph = dag.copy - val nodes = graph.topologicalOrderWithCirclesIterator.toList.reverse + val nodes = graph.topologicalOrderIterator.toList.reverse for (node <- nodes) { val outGoingEdges = graph.outgoingEdgesOf(node) for (edge <- outGoingEdges) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala index ccda8f0..f9e2efd 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala @@ -38,7 +38,7 @@ class DAGSpec extends PropSpec with PropertyChecks with Matchers { dag.processors.size shouldBe 1 assert(dag.taskCount == parallelism) dag.tasks.sortBy(_.index) shouldBe (0 until parallelism).map(index => TaskId(0, index)) - dag.graph.edges shouldBe empty + dag.graph.getEdges shouldBe empty } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala index 70d21b5..be4cc63 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala @@ -86,7 +86,7 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc val plan = planner.plan(graph) .mapVertex(_.description) - plan.vertices.toSet should contain theSameElementsAs + plan.getVertices.toSet should contain theSameElementsAs Set("source.globalWindows", "groupBy.globalWindows.flatMap.reduce", "processor", "sink") plan.outgoingEdgesOf("source.globalWindows").iterator.next()._2 shouldBe a[GroupByPartitioner[_, _]] http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala index c8c8b9f..d43bca0 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala @@ -58,8 +58,8 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M application.name shouldBe "dsl" val dag = application.userConfig .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get - dag.vertices.size shouldBe 2 - dag.vertices.foreach { processor => + dag.getVertices.size shouldBe 2 + dag.getVertices.foreach { processor => processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName if (processor.description == "A.globalWindows") { processor.parallelism shouldBe 2 http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/175b08e6/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala index ef8f932..0b8abcd 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala @@ -85,8 +85,10 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock } val expectedDagTopology = getExpectedDagTopology - dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet - dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet + dagTopology.getVertices.toSet should + contain theSameElementsAs expectedDagTopology.getVertices.toSet + dagTopology.getEdges.toSet should + contain theSameElementsAs expectedDagTopology.getEdges.toSet } private def getExpectedDagTopology: Graph[String, String] = {
