Hi again,

I have a very specific case. My flow looks like this one:


The idea behind the multi-input/output shape was to route each message to 
the right output based on the message data.
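
For reference, here is a minimal sketch of the kind of routing I mean, written 
with Akka's built-in Partition stage instead of my custom shape. The object 
name PartitionSketch, the even/odd rule in portFor and the Int element type 
are made up for illustration, and the sketch assumes an Akka version where 
ActorMaterializer is still available (from 2.6 on, an implicit ActorSystem 
alone should be enough):

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}

object PartitionSketch {
  // Hypothetical routing rule: even numbers go to port 0, odd numbers to port 1.
  def portFor(n: Int): Int = if (n % 2 == 0) 0 else 1

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("partition-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      // Partition sends each element to exactly one outlet, chosen by portFor.
      val router = builder.add(Partition[Int](2, n => portFor(n)))

      Source(1 to 10) ~> router.in
      router.out(0) ~> Sink.foreach[Int](n => println(s"even: $n"))
      router.out(1) ~> Sink.foreach[Int](n => println(s"odd: $n"))

      // Every inlet and outlet is connected, so the graph is closed and runnable.
      ClosedShape
    })

    graph.run()
    // The actor system keeps running; call system.terminate() when finished.
  }
}
```

Each element goes to exactly one outlet, and no custom Shape is involved, but 
I am not sure this covers the case with several inputs as well.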

I am just learning streams, so maybe you can suggest a better solution?




> It's not clear to me what you are trying to accomplish. It looks like you 
> are trying to implement AmorphousShape (i.e. an arbitrary number of open 
> inlets and outlets) on your own, and then a specific variant of it that has 
> all inlets sharing one type and all outlets sharing another type. The 
> "Fan" fragment in the names you used is a bit misleading, since in Akka 
> Streams' own usage, names like FanIn / FanOut shape mean that such a 
> graph has many inlets and a single outlet / a single inlet and many 
> outlets. The analogy is to a Chinese-style hand-held fan rather than a 
> ceiling fan with many blades :) I am wondering what use case you have in 
> mind for your AmorphousShape, because the graphs that can be materialized 
> and executed must ultimately have a ClosedShape. You could use such 
> multi-outlet graphs for reusing pieces of functionality, but anything more 
> complex than a BidiShape seems rather unwieldy to me.
> My understanding is that a graph's shape should not interfere with message 
> flow, because it's just a canvas with contact points on the perimeter. What 
> matters are the components that you plug into it. Akka just makes sure that 
> you don't leave any of the contact points dangling. This makes me think 
> that the problem with messages getting "stuck" is caused somewhere other 
> than the graph shape construction site.
> Have you tried inserting probes along the lines of 
> Flow[T].alsoTo(Sink.foreach(_ => println("beep!"))) (shooting from the hip 
> here, apologies if it does not compile straight away) into your graph? That 
> could help you locate where the messages are stuck / discarded.
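>
> A compilable version of such a probe might look like the sketch below. The 
> names ProbeSketch and probe and the labels are made up for illustration, 
> and the sketch assumes an Akka version where ActorMaterializer is still 
> available:
>
> ```scala
> import akka.NotUsed
> import akka.actor.ActorSystem
> import akka.stream.ActorMaterializer
> import akka.stream.scaladsl.{Flow, Sink, Source}
>
> object ProbeSketch {
>   // Pass-through flow: every element is also sent to a printing side sink.
>   def probe[T](label: String): Flow[T, T, NotUsed] =
>     Flow[T].alsoTo(Sink.foreach[T](x => println(s"[$label] $x")))
>
>   def main(args: Array[String]): Unit = {
>     implicit val system: ActorSystem = ActorSystem("probe-sketch")
>     implicit val materializer: ActorMaterializer = ActorMaterializer()
>     import system.dispatcher // execution context for onComplete
>
>     Source(1 to 3)
>       .via(probe("before map"))
>       .map(_ * 2)
>       .via(probe("after map"))
>       .runWith(Sink.ignore)
>       .onComplete(_ => system.terminate())
>   }
> }
> ```
>
> If one label prints and the next one never does, the elements are being 
> held up or dropped somewhere between those two probes.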
> Cheers,
> Rafał
>> Hi,
>> I am trying to create my own Akka Streams shape with several Inlets and 
>> Outlets. I have written the following code: 
>> package kernel.modeller.workers.streamFinder.generic
>> import akka.stream.{Shape, Outlet, Inlet}
>> import scala.annotation.unchecked.uncheckedVariance
>> import scala.collection.immutable
>> object FanShape {
>>   sealed trait Init[_] {
>>     def inlets: immutable.Seq[Inlet[_]]
>>     def outlets: immutable.Seq[Outlet[_]]
>>     def name: String
>>   }
>>   final case class Name[_](override val name: String) extends Init[Any] {
>>     override def inlets: immutable.Seq[Inlet[_]] = Nil
>>     override def outlets: immutable.Seq[Outlet[_]] = Nil
>>   }
>>   final case class Ports[_](
>>       override val inlets: immutable.Seq[Inlet[_]],
>>       override val outlets: immutable.Seq[Outlet[_]]) extends Init[Any] {
>>     override def name: String = "FanShape"
>>   }
>> }
>> abstract class FanShape[_] private (
>>     _in: Iterator[Inlet[_]], _out: Iterator[Outlet[_]], _name: String) extends Shape {
>>   import FanShape._
>>   def this(init: FanShape.Init[_]) =
>>     this(init.inlets.iterator, init.outlets.iterator, init.name)
>>   final override def outlets: immutable.Seq[Outlet[_]] = _outlets
>>   final override def inlets: immutable.Seq[Inlet[_]] = _inlets
>>   private var _outlets: Vector[Outlet[_]] = Vector.empty
>>   private var _inlets: Vector[Inlet[_]] = Vector.empty
>>   protected def newOutlet[T](name: String): Outlet[T] = {
>>     val p =
>>       if (_out.hasNext) _out.next().asInstanceOf[Outlet[T]]
>>       else Outlet[T](s"${_name}.$name")
>>     _outlets :+= p
>>     p
>>   }
>>   protected def newInlet[T](name: String): Inlet[T] = {
>>     val p =
>>       if (_in.hasNext) _in.next().asInstanceOf[Inlet[T]]
>>       else Inlet[T](s"${_name}.$name")
>>     _inlets :+= p
>>     p
>>   }
>>   protected def construct(init: Init[_]): FanShape[_]
>>   def deepCopy(): FanShape[_] =
>>     construct(Ports(inlets.map(_.carbonCopy()), outlets.map(_.carbonCopy())))
>>   final def copyFromPorts(
>>       inlets: immutable.Seq[Inlet[_]],
>>       outlets: immutable.Seq[Outlet[_]]): FanShape[_] = {
>>     require(outlets.size == _outlets.size,
>>       s"proposed outlets [${outlets.mkString(", ")}] do not fit FanShape")
>>     require(inlets.size == _inlets.size,
>>       s"proposed inlets [${inlets.mkString(", ")}] do not fit FanShape")
>>     construct(Ports(inlets, outlets))
>>   }
>> }
>> object UniformFanShape {
>>   def apply[I, O](inlets: Array[Inlet[I]], outlets: Outlet[O]*): UniformFanShape[I, O] =
>>     new UniformFanShape(inlets.size, outlets.size, FanShape.Ports(inlets.toList, outlets.toList))
>> }
>> class UniformFanShape[-I, +O](n: Int, m: Int, _init: FanShape.Init[_])
>>     extends FanShape(_init) {
>>   def this(n: Int, m: Int) = this(n, m, FanShape.Name("UniformFan"))
>>   def this(n: Int, m: Int, name: String) = this(n, m, FanShape.Name(name))
>>   def this(inlets: Array[Inlet[I]], outlets: Array[Outlet[O]]) =
>>     this(inlets.size, outlets.size, FanShape.Ports(inlets.toList, outlets.toList))
>>   override protected def construct(init: FanShape.Init[_]): FanShape[_] =
>>     new UniformFanShape(n, m, init)
>>   override def deepCopy(): UniformFanShape[I, O] =
>>     super.deepCopy().asInstanceOf[UniformFanShape[I, O]]
>>   val inArray: Array[Inlet[I @uncheckedVariance]] =
>>     Array.tabulate(n)(i ⇒ newInlet[I](s"in$i"))
>>   def in(n: Int): Inlet[I @uncheckedVariance] = inArray(n)
>>   val outArray: Array[Outlet[O @uncheckedVariance]] =
>>     Array.tabulate(m)(j ⇒ newOutlet[O](s"out$j"))
>>   def out(m: Int): Outlet[O @uncheckedVariance] = outArray(m)
>> }
>> This code lets me create a graph; however, it is not possible to process 
>> messages with it. The message handlers are never called, and the messages 
>> get stuck somewhere. 
>> Could you please help me to fix it? 
>> PS: I am not an expert in Scala.
>> Thank you in advance!
>> Regards,
>> Sergey
