Hello All,
We are using Akka Streams to process 400,000 XML documents: each document
goes through a series of transformations and is then saved to a database.
The transformations are basic, and here is how our stream code looks:
    Source(Set(allDocumentUris))
      .map(uri => getDocumentFromNetwork(uri))
      .map(doc => transformation1(doc))
      .map(doc => saveToDatabase(doc))
      .runWith(Sink(transformationStatusActorSubscriber))
We want to keep track of all successful and failed documents and print a
summary at the end of the transformation. We decided to use an
ActorSubscriber at the end of the stream because it lets us keep state and
update it without worrying about thread safety. Here is how our actor
looks:
    class SomeActor extends ActorSubscriber {
      private var successful = Set.empty[String]
      private var failed = Set.empty[TransformationFailure]
      private var numberOfDocumentsToProcess = 0
      private var complete = false

      context.system.scheduler.schedule(1.minute, 1.minute, self, EchoProgress)

      override protected def requestStrategy: RequestStrategy =
        WatermarkRequestStrategy(highWatermark = 10)

      override def receive: Receive = {
        case EchoProgress =>
          echoProgress()
        case Count(size) =>
          numberOfDocumentsToProcess = size
        case OnNext(element: (String, Future[Unit])) =>
          element._2 onComplete {
            case Success(_) =>
              successful = successful + element._1
              processComplete()
            case Failure(error) =>
              failed = failed + TransformationFailure(element._1, error)
              processComplete()
          }
        case OnError(error) =>
          context.stop(self)
        case OnComplete =>
          complete = true
      }

      private def isStreamComplete = {
        val totalDocumentsProcessedSoFar = successful.size + failed.size
        complete && (numberOfDocumentsToProcess == totalDocumentsProcessedSoFar)
      }

      private def processComplete(): Unit = {
        if (isStreamComplete) {
          echoSummary()
          context.stop(self)
        }
      }

      private def echoProgress() ...
      private def echoSummary() ...
    }
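To make the intended completion rule explicit, here is a minimal sketch of
the same bookkeeping as a plain immutable value with no Akka dependency. It
is an illustration under assumptions, not the code we run: `Progress` and its
methods are hypothetical names. The rule it encodes is the one in
isStreamComplete above: the summary is due only once upstream has completed
and every expected document has reported a result, so OnComplete arriving
while futures are still pending is handled.

```scala
// Sketch only: `Progress` and its methods are hypothetical, not Akka API.
final case class Progress(
    expected: Int,
    successful: Set[String] = Set.empty,
    failed: Map[String, Throwable] = Map.empty,
    upstreamComplete: Boolean = false) {

  // One transition per event the actor could be told about.
  def succeeded(uri: String): Progress =
    copy(successful = successful + uri)

  def failedWith(uri: String, error: Throwable): Progress =
    copy(failed = failed + (uri -> error))

  def upstreamCompleted: Progress =
    copy(upstreamComplete = true)

  def processed: Int = successful.size + failed.size

  // Done only when upstream has finished AND every document has a result.
  def isDone: Boolean = upstreamComplete && processed == expected
}

object ProgressDemo {
  // Upstream completes while the second document's future is still pending.
  def run(): (Boolean, Boolean, Int) = {
    val afterComplete = Progress(expected = 2).succeeded("doc-1").upstreamCompleted
    val afterLast = afterComplete.failedWith("doc-2", new RuntimeException("boom"))
    (afterComplete.isDone, afterLast.isDone, afterLast.processed)
  }
}
```

Each method corresponds to one message the actor could handle. That matters
because a Future's onComplete callback runs on an execution context thread
rather than inside receive, so a value like this is only safe to update if
the results are delivered to the actor as messages.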
Problems that we are facing
----------------------------
1. Sequence of messages?
We are not sure about the order in which the actor receives events. Is it
possible for the actor to receive OnComplete while some OnNext messages are
still in the queue?
2. When to stop the actor?
What is the correct way to stop the actor? Right now we stop it in the
OnError case and in the OnNext case (for OnNext, we check whether we have
processed all documents and have already received OnComplete). If we
instead stop the actor in the OnError and OnComplete cases, will it work?
3. OutOfMemory issues?
We ran the stream with 2 GB of memory but hit an OutOfMemoryError before
the stream completed. Because backpressure is mandatory, we thought this
would not happen. After we increased memory to 4 GB, the program ran
without an OutOfMemoryError. Did we miss anything in our implementation?
How can we ensure that we never get an OutOfMemoryError, irrespective of
the memory available to the program?
4. Program dies abruptly
Right now our code stops at some point before it has processed all
documents. From what we observe, it stops after the OnComplete message is
received, even though we do not stop the actor in the OnComplete case. We
are not sure how to debug or fix this behavior.
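Regarding problem 3, one possible factor, offered as an assumption rather
than a diagnosis: the elements reaching the subscriber carry a Future, so the
map stage emits each element as soon as the future is created, and demand
from the subscriber does not limit how many saves are in flight at once.
Akka Streams has a mapAsync stage intended for future-returning steps;
whether it fits depends on the version in use. The plain-Scala sketch below
only illustrates the idea of bounding in-flight futures by working in
batches. `fakeSave` is a hypothetical stand-in for saveToDatabase, and the
counters exist only to make the bound observable.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BoundedSaves {
  implicit val ec: ExecutionContext = ExecutionContext.global

  private var inFlight = 0
  private var maxSeen = 0

  private def recordStart(): Unit = synchronized {
    inFlight += 1
    if (inFlight > maxSeen) maxSeen = inFlight
  }
  private def recordEnd(): Unit = synchronized { inFlight -= 1 }
  def maxInFlight: Int = synchronized { maxSeen }

  // Hypothetical stand-in for saveToDatabase: counts itself as in flight
  // from the moment it is called until its future body has finished.
  def fakeSave(doc: String): Future[String] = {
    recordStart()
    Future {
      try doc.toUpperCase
      finally recordEnd()
    }
  }

  // Start at most `parallelism` saves, wait for all of them to finish,
  // then start the next batch.
  def saveAll(docs: Seq[String], parallelism: Int): Future[Seq[String]] =
    docs.grouped(parallelism).foldLeft(Future.successful(Vector.empty[String])) {
      (acc, batch) =>
        acc.flatMap { done =>
          Future.sequence(batch.map(fakeSave)).map(saved => done ++ saved)
        }
    }

  def run(): (Int, Int) = {
    val docs = (1 to 100).map(i => s"doc-$i")
    val saved = Await.result(saveAll(docs, parallelism = 10), 30.seconds)
    (saved.size, maxInFlight)
  }
}
```

Batching is the simplest way to show the bound, not the most efficient one:
a slow save holds up the rest of its batch. It is here only to contrast with
mapping every document to a future up front, where the number of pending
saves is limited by nothing but the speed of the upstream stages.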
Appreciate any help/suggestions on this.
-Regards
Ajay