I am modeling a data processing pipeline with several decision points at
which the system may refer to a human being to make the decision.
Earlier decisions change the state of the system and affect later
decisions, so one goal is to process a narrow sliding view of the data and
keep the pipeline from running much faster than the humans. A second goal:
since these decisions potentially take time, and the system may have to
work through several items that the machine decides it can handle before it
finds one that needs a human, I want the system to buffer human-decision
requests so that work is ready for people when they ask for it, and then,
naturally, to back-pressure when the buffer is full.
In summary, my goal is to be able to merge streams in an unordered fashion.
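To make the buffering requirement concrete, here is a minimal sketch in plain Scala (no Akka involved) of a bounded hand-off buffer: the machine side is refused once the buffer is full, which is the simplest form of back-pressure. `DecisionBuffer`, `submit`, and `nextRequest` are hypothetical names invented for this illustration, and waiting on a blocking queue is an assumption made for brevity, not how a stream stage would do it.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

// Hypothetical bounded buffer of pending human work requests.
final class DecisionBuffer[A](capacity: Int) {
  private val queue = new ArrayBlockingQueue[A](capacity)

  // Producer side: waits up to timeoutMs for free space. A result of false
  // means the buffer stayed full, i.e. the producer is being back-pressured.
  def submit(request: A, timeoutMs: Long): Boolean =
    queue.offer(request, timeoutMs, TimeUnit.MILLISECONDS)

  // Consumer side: a human asks for work; None if nothing is buffered.
  def nextRequest(): Option[A] = Option(queue.poll())

  def size: Int = queue.size
}
```

With a capacity of 20 this mirrors the "up to 20 unacked human requests" idea: the pipeline can run ahead of the humans by at most the buffer size.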
*Lengthy pseudo-code* follows for clarification:
val source = Source(docsToHandle).
  mapAsync(withCurrentHeadVersion).
  groupByAsync(attachDecisionDecider.decide).
  map[Source[Either[Terminal, DocForProcessing]]] {
    case (AttachDecisionDecider.AlreadyAttached, flow) =>
      flow map { doc => Right(doc) }
    case (AttachDecisionDecider.ObviousDuplicate, flow) =>
      flow.mapAsync { doc =>
        (entityState ? EntityState.AttachDoc(doc.headEntityVersion,
          doc.path)).mapTo[Option[Int]] map {
          case None =>
            // conflict! Note: we may want to consider a max-retry signal
            // that dumps to another table when abandoning; this can be
            // handled by our requeue logic down the stream
            Left(ProcessConflict(doc.path))
          case Some(version) =>
            // Note: it may actually be best to requery this later, to
            // tighten the gap between decision and acting on the decision.
            Right(doc.copy(headEntityVersion = Some(version)))
        }
      }
    case (AttachDecisionDecider.HumanDecisionNeeded, flow) =>
      flow.
        map(referToHuman("attach", _)).
        // enqueue up to 20 unacked human requests
        buffer(20, OverflowStrategy.backpressure).
        // at least one human has begun the work
        // (on abandon, future will be abandoned)
        mapAsyncUnordered(identity).
        // human has completed the work
        mapAsync(identity)
  }.
  mergeUnordered.
  mapConcat(handleResult).
  groupByAsync(pickDecisionDecider.decide).
  map[Source[Either[Terminal, DocForProcessing]]] {
    case (PickDecisionDecider.AlreadyPicked, flow) =>
      flow map { doc => Right(doc) }
    case (PickDecisionDecider.AutoMerge, flow) =>
      flow mapAsync { doc =>
        (entityState ? EntityState.PickDoc(doc.headEntityVersion,
          doc.path)).mapTo[Option[Int]] map {
          case None =>
            Left(ProcessConflict(doc.path))
          case Some(version) =>
            Right(doc.copy(headEntityVersion = Some(version)))
        }
      }
    case (PickDecisionDecider.HumanDecisionNeeded, flow) =>
      flow.
        map(referToHuman("pick", _)).
        // enqueue up to 20 unacked human requests
        buffer(20, OverflowStrategy.backpressure).
        // at least one human has begun the work
        // (on abandon, future will be abandoned)
        mapAsyncUnordered(identity).
        // human has completed the work
        mapAsync(identity)
  }.
  mergeUnordered.
  mapConcat(handleResult).
  foreach { doc =>
    // everything that makes it this far is done; the foreach consumer
    // will produce a constant pull on the stream.
    ack(doc.path)
  }
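The behaviour expected from the hypothetical `mergeUnordered` step in the pseudo-code can be illustrated without any stream library. The sketch below uses only standard-library futures: results become available in the order the futures complete, not the order in which they were supplied. `UnorderedMerge` and `completionQueue` are names made up for this example; it is not an Akka API, and it only models the ordering, not demand or back-pressure.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

object UnorderedMerge {
  // Registers a callback on every future and returns a queue that receives
  // each outcome (success or failure) as soon as that future completes.
  // Futures that are already complete when this is called are enqueued in
  // whatever order their callbacks happen to run.
  def completionQueue[A](futures: Seq[Future[A]])
                        (implicit ec: ExecutionContext): BlockingQueue[Try[A]] = {
    val done = new LinkedBlockingQueue[Try[A]]()
    futures.foreach { f => f.onComplete(outcome => done.put(outcome)) }
    done
  }
}
```

A consumer would call `take()` on the returned queue once per input future. A quick machine decision then overtakes a slow human decision that entered the pipeline earlier, which is the property the pseudo-code relies on.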
It looks like an unordered Merge has been implemented for FlowGraphs
<http://doc.akka.io/api/akka-stream-and-http-experimental/1.0-M2/?_ga=1.205107984.1839818538.1375706610#akka.stream.scaladsl.Merge>,
but I don't see a GroupBy junction defined, just Broadcast. So, I'm not
sure whether I can use this feature for the grouped sub-streams above.
It looks like I might be able to define an ActorPublisher and an
ActorSubscriber to achieve my goal; IteratorPublisher.scala
<https://github.com/akka/akka/blob/releasing-akka-stream-and-http-experimental-1.0-M2/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala>
is good enough documentation for that. So, this is my working plan, but I
wanted to check first whether I am missing some undocumented feature or
overlooking something that has already been done in this area.
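For the publisher side of that plan, the essential bookkeeping is: emit only while the downstream has signalled demand, buffer otherwise, and refuse new elements once the buffer is full. Below is a minimal, single-threaded model of that logic in plain Scala. `DemandGate`, `offer`, and `request` are hypothetical names chosen for this sketch; it deliberately does not use the Akka actor or stream APIs, so it shows the idea rather than an actual ActorPublisher implementation.

```scala
import scala.collection.mutable

// Hypothetical model of demand-driven emission with a bounded buffer.
// Not thread-safe; inside an actor this state would be touched by one
// message at a time.
final class DemandGate[A](capacity: Int, emit: A => Unit) {
  require(capacity > 0, "capacity must be positive")

  private val buffer = mutable.Queue.empty[A]
  private var demand = 0L

  // Upstream offers an element; false means "buffer full, slow down".
  def offer(elem: A): Boolean =
    if (buffer.size >= capacity) false
    else {
      buffer.enqueue(elem)
      drain()
      true
    }

  // Downstream signals that it can accept n more elements.
  def request(n: Long): Unit = {
    demand += n
    drain()
  }

  // Emits buffered elements, oldest first, while demand is outstanding.
  private def drain(): Unit =
    while (demand > 0 && buffer.nonEmpty) {
      demand -= 1
      emit(buffer.dequeue())
    }
}
```

In an actor-based version, `request` would correspond to handling the downstream's demand signal and a `false` from `offer` would be turned into a rejection or a delayed acknowledgement to the sender.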