I have processing pipeline whereby a Duct terminates in a Future[T] type.
I’d like to know the recommended approach for asynchronously merging the
results from a number of Future[T]s, allowing the results to be passed
downstream as they are available. To demonstrate what I want and how I’ve
been attempting to solve this problem, here’s some test code:
import akka.actor.ActorSystemimport akka.stream.scaladsl._import
akka.stream.{FlowMaterializer, MaterializerSettings}import akka.testkit._import
org.scalatest.FunSpecLike
import scala.concurrent.Futureimport scala.concurrent.duration._class
FutureMergeTest extends TestKit(ActorSystem()) with FunSpecLike with
ImplicitSender {
val materializer = FlowMaterializer(MaterializerSettings())
// Function that artificially inserts a delay in attempt to reorder results.
val maybeDelay = (i: Int) ⇒ { if (i % 2 == 0) Thread.sleep(2000); i }
implicit val ec = system.dispatcher
describe("Flow producing futures") {
it("should be able to feed futures to Flow after materialization") {
val futureProducingDuct = Duct[Int]
.map { i ⇒ Future {maybeDelay(i); i}}
val futureResolvingDuct = Duct[Future[Int]]
.mapFuture(identity)
.map { i ⇒ println("Post future result: " + i); i}
val summationDuct = Duct[Int]
.fold(Seq.empty[Int])(_ :+ _)
val sendSelfResultDuct = Duct[Seq[Int]]
.foreach(total ⇒ testActor ! total)
val numRange = 1 to 10
val mainFlow = Flow(numRange.toIterator)
.append(futureProducingDuct)
.append(futureResolvingDuct)
.append(summationDuct)
.append(sendSelfResultDuct)
note("materializing flow")
mainFlow.consume(materializer)
note("awaiting answer")
val results = expectMsgType[Seq[Int]](30.seconds)
assert(results.size == numRange.size)
assert(results.sum == numRange.sum)
// Asserts that the ordering is not the same....
// Assuming delay inserted in some arbitrary futures should allow
others to come through
// as they are available
assert(results != numRange)
note("ensuring single result")
expectNoMsg(30.seconds)
}
}
}
The assumption here is that by inserting a delay for every input item that
on the output side the ordering should differ from the input. If you run
this, that is not the case:
Post future result: 1
[[output pauses here....]]
Post future result: 2
Post future result: 3
Post future result: 4
Post future result: 5
Post future result: 6
Post future result: 7
Post future result: 8
Post future result: 9
13:55:34 INFO [default-akka.actor.default-dispatcher-5] a.a.RepointableActorRef
- Message [akka.stream.impl.RequestMore] from Actor[akka://default/deadLetters]
to Actor[akka://default/user/flow-1-1-map#477915073] was not delivered. [1]
dead letters encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
Post future result: 10
13:55:36 INFO [default-akka.actor.default-dispatcher-2] a.a.RepointableActorRef
- Message [akka.stream.impl.RequestMore] from Actor[akka://default/deadLetters]
to Actor[akka://default/user/flow-1-2-mapFuture#-1579107088] was not delivered.
[2] dead letters encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
13:55:36 INFO [default-akka.actor.default-dispatcher-2] a.a.RepointableActorRef
- Message [akka.stream.impl.RequestMore] from Actor[akka://default/deadLetters]
to Actor[akka://default/user/flow-1-3-map#-1301380585] was not delivered. [3]
dead letters encountered. This logging can be turned off or adjusted with
configuration settings 'akka.log-dead-letters' and
'akka.log-dead-letters-during-shutdown'.
List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) equaled Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
org.scalatest.exceptions.TestFailedException: List(1, 2, 3, 4, 5, 6, 7, 8, 9,
10) equaled Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
at
org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:498)
...
Any recommendations on the proper way of doing this? Am I improperly using
the mapFuture here? Or should I be instead writing custom ActorConsumer and
sending it messages outside of the streams API? Or should I be using
Future.onSuccess callbacks in some way?
Thanks,
Simeon
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.