Hey folex. I was trying to make use of sink-to-source as well, you might 
need to do the following:

val in = Flow[Int].to(Sink(ActorSubscriber[Int](pubsubRef)))
val out = Source(ActorPublisher[Int](pubsubRef))

Flow.wrap(in, out)(Keep.none)


Enter code here...

Check out this useful 
peace: 
https://github.com/jrudolph/akka-http-scala-js-websocket-chat/blob/master/backend/src/main/scala/example/akkawschat/Chat.scala

I don't know your use case, but since you're doing a single PubSub, I think 
you need to create a demand. Otherwise you probably might end up trying to 
stream from an empty sequence. I.e: something like:

case Request(cnt) =>

  if (events.nonEmpty)
    events.take(cnt.toInt).foreach(onNext)
  else
    request(cnt)



On Thursday, February 19, 2015 at 2:02:53 PM UTC+6, folex wrote:
>
> I should add that original (and more complex) example doesn't throw an 
> "Unconnected" exception though it's isomorphic to the one in OP. It just 
> doesn't work without any complains: just nothing ever sent to 
> subscriber+publisher actor.
> I'll try to provide standalone example though I'm not sure if I succeed to 
> reproduce this. 
> Or maybe such a behaviour is also known and you don't need an example of 
> it?
>
> On Thursday, February 19, 2015 at 10:07:18 AM UTC+3, drewhk wrote:
>>
>> Hi, 
>>
>>
>>
>> On Wed, Feb 18, 2015 at 6:27 PM, folex <[email protected]> wrote:
>>
>>> class ActorPubSub extends ActorSubscriber with ActorPublisher[Int] {
>>>   var events = Seq.empty[Int]
>>>
>>>   override protected def requestStrategy: RequestStrategy = 
>>> OneByOneRequestStrategy
>>>   override def receive: Actor.Receive = {
>>>     case OnNext(e: Int) => events = e +: events
>>>     case Request(cnt) => events.take(cnt.toInt).foreach(onNext)
>>>   }
>>> }
>>>
>>> val pubsubRef = system.actorOf(Props(new ActorPubSub))
>>> val pub = ActorPublisher[Int](pubsubRef)
>>> val sub = ActorSubscriber[Int](pubsubRef)
>>> val pubsubFlow = Flow(Sink(sub), Source(pub))
>>>
>>> FlowGraph { implicit b =>
>>>   import akka.stream.scaladsl.FlowGraphImplicits._
>>>
>>>   Source((1 to 10).toList) ~> pubsubFlow ~> Sink.foreach[Int](e =>
>>>     println("Got a number " + e)
>>>   )
>>> }.run()
>>>
>>>
>>>
>>> According to Flow.apply(Sink, Source) doc: 
>>>
>>> Create a Flow from a seemingly disconnected Source and Sink pair.
>>>
>>>
>>> If that's true, why graph remains unconnected? 
>>>
>>
>> This is a known issue and will be fully fixed in M4. The underlying 
>> problem is that the underlying graph representation does not treat it as a 
>> flow but as a Sink/Source, since it flattens everything that contains a 
>> graph inside. 
>>
>> -Endre
>>
>>  
>>
>>> -- 
>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>> >>>>>>>>>> Check the FAQ: 
>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>> >>>>>>>>>> Search the archives: 
>>> https://groups.google.com/group/akka-user
>>> --- 
>>> You received this message because you are subscribed to the Google 
>>> Groups "Akka User List" group.
>>> To unsubscribe from this group and stop receiving emails from it, send 
>>> an email to [email protected].
>>> To post to this group, send email to [email protected].
>>> Visit this group at http://groups.google.com/group/akka-user.
>>> For more options, visit https://groups.google.com/d/optout.
>>>
>>
>>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to