I have a problem forwarding data received over TCP into a stream. It
works with the hard-coded version of the graph, but not with the dynamically
built balance graph. If I replace rgraphtcp with rgraphtcpdynamic, data does
not flow into the stream, as if its run method had never been called. Data
arrives at the publisher sink's materialized value (ret), but does not
continue further into the stream. The dynamic graph does work if I read the
data from a file.

val clsParallelPartialGraph = GraphDSL.create() { implicit builder =>
  val balance: UniformFanOutShape[Array[Int], Array[Int]] =
    builder.add(Balance[Array[Int]](ParallelCount))
  (0 until clsParallelCount).foreach { x =>
    balance ~> flowClusterings(x) ~> Sink.ignore
  }
  SinkShape(balance.in)
}

val connectionRet = connections.to(Sink.foreach { connection =>
  printOrNot(s"New connection from: ${connection.remoteAddress}")
  val sink1 = Sink.asPublisher[ByteString](fanout = true)
  val sourceTemp = Source.maybe[ByteString]
  val flowsinksource = Flow.fromSinkAndSourceMat(sink1, sourceTemp)(Keep.left)
  val ret: Publisher[ByteString] = connection.handleWith(flowsinksource)
  val sourcetcp = Source.fromPublisher(ret)
    .via(Framing.delimiter(
      ByteString("\n"),
      maximumFrameLength = 256,
      allowTruncation = true))
    .via(Flow[ByteString].map { x =>
      val decoded = x.decodeString("UTF-8")
      println("decoded :" + decoded)
      decoded
    })

  // Hard-coded version: this one works.
  val rgraphtcp = RunnableGraph.fromGraph(GraphDSL.create(/*sinkToJsTuple*/) { implicit builder =>
    val balance: UniformFanOutShape[Array[Int], Array[Int]] =
      builder.add(Balance[Array[Int]](ParallelCount))
    sourcetcp ~> flow1tcp ~> flow2 ~> flowMain ~>
      balance ~> flowClustering0 ~> Sink.ignore
      balance ~> flowClustering1 ~> Sink.ignore
      balance ~> flowClustering2 ~> Sink.ignore
    ClosedShape
  })
  val mattcp = rgraphtcp.run()

  // Dynamic version: with this one, data does not flow.
  /*
  val rgraphtcpdynamic = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    val balance: UniformFanOutShape[Array[Int], Array[Int]] =
      builder.add(Balance[Array[Int]](ParallelCount))
    sourcetcp ~> flow1tcp ~> flow2 ~> flowMain ~>
      clsParallelPartialGraph
    ClosedShape
  })
  val mattcp = rgraphtcpdynamic.run()
  */
}).run
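
For comparison, here is a minimal sketch of the same kind of dynamic fan-out
in isolation, fed from an in-memory source instead of TCP. It is not the code
above: parallelCount, total, workers and the counting Promise are placeholders
made up for the sketch, and it assumes an Akka version in which
ActorMaterializer is available.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SinkShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Sink, Source}

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object DynamicBalanceSketch {
  val parallelCount = 3 // hypothetical number of parallel branches
  val total = 10        // hypothetical number of test elements

  /** Runs the sketch and returns how many elements the workers saw. */
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val processed = new AtomicInteger(0)
    val allSeen = Promise[Done]()

    // Stand-ins for the real clustering flows: log, count, pass through.
    val workers = Vector.tabulate(parallelCount) { i =>
      Flow[Array[Int]].map { xs =>
        println(s"worker $i got ${xs.mkString(",")}")
        if (processed.incrementAndGet() == total) allSeen.trySuccess(Done)
        xs
      }
    }

    // Partial graph with one open inlet; the branches are wired in a loop.
    val parallelSink = Sink.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._
      val balance = builder.add(Balance[Array[Int]](parallelCount))
      workers.foreach { w => balance ~> w ~> Sink.ignore }
      SinkShape(balance.in)
    })

    try {
      Source(1 to total).map(i => Array(i)).runWith(parallelSink)
      // Wait until every element has passed through some worker.
      Await.result(allSeen.future, 5.seconds)
      processed.get()
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit =
    println(s"processed ${run()} elements")
}
```

Which worker handles which element depends on demand and is not
deterministic; only the total count is.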