It's not clear to me what you are trying to accomplish. It looks like you
are reimplementing AmorphousShape (i.e. an arbitrary number of open inlets
and outlets) on your own, and then a specific variant of it in which all
inlets share one type and all outlets share another. The "Fan" fragment in
the names you chose is a bit misleading: in Akka Streams' own usage, names
like FanIn / FanOut shape mean that the graph has many inlets and a single
outlet, or a single inlet and many outlets, respectively. The analogy is to
a Chinese-style hand-held fan rather than a ceiling fan with many blades :)
I am wondering what use case you have in mind for your AmorphousShape
variant, because any graph that can be materialized and executed must
ultimately have a ClosedShape. You could use such multi-outlet graphs to
reuse pieces of functionality, but anything more complex than a BidiShape
seems rather unwieldy to me.
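To illustrate the point about fan-in shapes and ClosedShape, here is a minimal sketch modelled on the partial-graph example in the Akka Streams documentation. It assumes Akka 2.4.x with ActorMaterializer on the classpath; the object name PartialGraphDemo and the run() helper are made up for illustration. The reusable piece has three inlets and one outlet (a built-in UniformFanInShape), and it can only be executed after every port has been wired up inside a graph that returns ClosedShape.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape}
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source, ZipWith}
import scala.concurrent.Await
import scala.concurrent.duration._

object PartialGraphDemo {
  // Reusable partial graph: three Int inlets, one Int outlet ("fan-in").
  // It cannot be run on its own because its ports are still open.
  val pickMaxOfThree = GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._
    val zip1 = b.add(ZipWith[Int, Int, Int](math.max _))
    val zip2 = b.add(ZipWith[Int, Int, Int](math.max _))
    zip1.out ~> zip2.in0
    // Expose one outlet and the three remaining inlets.
    UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
  }

  // Hypothetical helper: closes all ports, runs the graph, returns the result.
  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("partial-graph-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val g = RunnableGraph.fromGraph(
        GraphDSL.create(Sink.head[Int]) { implicit b => sink =>
          import GraphDSL.Implicits._
          val pm3 = b.add(pickMaxOfThree)
          Source.single(1) ~> pm3.in(0)
          Source.single(2) ~> pm3.in(1)
          Source.single(3) ~> pm3.in(2)
          pm3.out ~> sink.in
          ClosedShape // every inlet and outlet is connected at this point
        })
      Await.result(g.run(), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

If one of the three sources is left out, building the closed graph should fail with an error about unconnected ports, which is the "no dangling contact points" check mentioned in this reply.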
My understanding is that a graph's shape should not interfere with message
flow, because it is just a canvas with contact points on the perimeter. What
matters are the components that you plug into it. Akka just makes sure that
you don't leave any of the contact points dangling. This makes me think
that the problem with messages getting "stuck" is caused somewhere other
than the place where the graph shape is constructed.
Have you tried inserting probes along the lines of
Flow[T].alsoTo(Sink.foreach(_ => println("beep!"))) (shooting from the hip
here, apologies if it does not compile straight away) into your graph? That
could help you locate where the messages get stuck or discarded.
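A slightly fuller sketch of such a probe, assuming Akka 2.4.x with ActorMaterializer. The probe helper, the labels, and the ProbeDemo object are hypothetical names introduced only for this example. Each probe passes elements through unchanged and prints them as a side effect, so the last label that prints shows how far the messages got.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ProbeDemo {
  // Hypothetical helper: a pass-through stage that also prints each element.
  def probe[T](label: String): Flow[T, T, NotUsed] =
    Flow[T].alsoTo(Sink.foreach[T](x => println(s"[$label] $x")))

  // Runs a tiny stream with probes on both sides of a map stage.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("probe-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val result = Source(1 to 3)
        .via(probe("before map"))
        .map(_ * 2)
        .via(probe("after map"))
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Inside a GraphDSL block the same probe can be spliced between two ports, e.g. `someOutlet ~> probe[Int]("here") ~> someInlet`, where someOutlet and someInlet stand for whatever ports you are connecting.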
Cheers,
Rafał
On Monday, 17 October 2016 20:22:43 UTC+2, Sergey Sopin wrote:
>
> Hi,
>
> I am trying to create my own Akka Streams shape with several inlets and
> outlets. I have written the following code:
>
> package kernel.modeller.workers.streamFinder.generic
>
> import akka.stream.{Shape, Outlet, Inlet}
> import scala.annotation.unchecked.uncheckedVariance
> import scala.collection.immutable
>
> object FanShape {
> sealed trait Init[_] {
> def inlets: immutable.Seq[Inlet[_]]
> def outlets: immutable.Seq[Outlet[_]]
> def name: String
> }
> final case class Name[_](override val name: String) extends Init[Any] {
> override def inlets: immutable.Seq[Inlet[_]] = Nil
> override def outlets: immutable.Seq[Outlet[_]] = Nil
> }
> final case class Ports[_](override val inlets: immutable.Seq[Inlet[_]],
> override val outlets: immutable.Seq[Outlet[_]]) extends Init[Any] {
> override def name: String = "FanShape"
> }
> }
>
> abstract class FanShape[_] private (_in: Iterator[Inlet[_]], _out:
> Iterator[Outlet[_]], _name: String) extends Shape {
>
> import FanShape._
>
> def this(init: FanShape.Init[_]) = this(init.inlets.iterator,
> init.outlets.iterator, init.name)
>
> final override def outlets: immutable.Seq[Outlet[_]] = _outlets
> final override def inlets: immutable.Seq[Inlet[_]] = _inlets
>
> private var _outlets: Vector[Outlet[_]] = Vector.empty
> private var _inlets: Vector[Inlet[_]] = Vector.empty
>
> protected def newOutlet[T](name: String): Outlet[T] = {
> val p = if (_out.hasNext) _out.next().asInstanceOf[Outlet[T]] else
> Outlet[T](s"${_name}.$name")
> _outlets :+= p
> p
> }
>
> protected def newInlet[T](name: String): Inlet[T] = {
> val p = if (_in.hasNext) _in.next().asInstanceOf[Inlet[T]] else
> Inlet[T](s"${_name}.$name")
> _inlets :+= p
> p
> }
>
> protected def construct(init: Init[_]): FanShape[_]
>
> def deepCopy(): FanShape[_] = construct(Ports(inlets.map(_.carbonCopy()),
> outlets.map(_.carbonCopy())))
> final def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets:
> immutable.Seq[Outlet[_]]): FanShape[_] = {
> require(outlets.size == _outlets.size,
> s"proposed outlets [${outlets.mkString(", ")}] do not fit FanShape")
> require(inlets.size == _inlets.size,
> s"proposed inlets [${inlets.mkString(", ")}] do not fit FanShape")
> construct(Ports(inlets, outlets))
> }
> }
>
> object UniformFanShape {
> def apply[I, O](inlets: Array[Inlet[I]], outlets: Outlet[O]*):
> UniformFanShape[I, O] =
> new UniformFanShape(inlets.size, outlets.size,
> FanShape.Ports(inlets.toList, outlets.toList))
> }
>
> class UniformFanShape[-I, +O](n: Int, m: Int, _init: FanShape.Init[_])
> extends FanShape(_init) {
> def this(n: Int, m: Int) = this (n, m, FanShape.Name("UniformFan"))
> def this(n: Int, m: Int, name: String) = this(n, m, FanShape.Name(name))
> def this(inlets: Array[Inlet[I]], outlets: Array[Outlet[O]]) =
> this(inlets.size, outlets.size, FanShape.Ports(inlets.toList, outlets.toList))
> override protected def construct(init: FanShape.Init[_]): FanShape[_] = new
> UniformFanShape(n, m, init)
> override def deepCopy(): UniformFanShape[I, O] =
> super.deepCopy().asInstanceOf[UniformFanShape[I, O]]
>
> val inArray: Array[Inlet[I @uncheckedVariance]] = Array.tabulate(n)(i ⇒
> newInlet[I](s"in$i"))
> def in(n: Int): Inlet[I @uncheckedVariance] = inArray(n)
>
> val outArray: Array[Outlet[O @uncheckedVariance]] = Array.tabulate(m)(j ⇒
> newOutlet[O](s"out$j"))
> def out(m: Int): Outlet[O @uncheckedVariance] = outArray(m)
> }
>
>
> This code lets me create a graph; however, the graph does not process
> messages. The handlers are never called, and the messages get stuck
> somewhere.
> Could you please help me to fix it?
>
> PS: I am not an expert in Scala.
>
> Thank you in advance!
>
> Regards,
> Sergey
>
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.