Hello All,

We are using Akka Streams to process 400,000 XML documents: each document is
fetched, run through a series of transformations and then saved to a
database. We are using basic transformations, and here is how our stream
code looks:

Source(allDocumentUris)
      .map(uri => getDocumentFromNetwork(uri))
      .map(doc => transformation1(doc))
      .map(doc => saveToDatabase(doc))
      .runWith(Sink(transformationStatusActorSubscriber))

We want to keep track of all successful and failed documents and print a
summary at the end of the transformation. We decided to use an
ActorSubscriber at the end of the stream because it lets us keep state and
keep updating it without worrying about thread safety. Here is how our actor
looks:

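import akka.stream.actor.{ActorSubscriber, RequestStrategy, WatermarkRequestStrategy}
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError, OnNext}
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

class SomeActor extends ActorSubscriber {
  import context.dispatcher // ExecutionContext for the scheduler and onComplete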

  private var successful = Set.empty[String]
  private var failed = Set.empty[TransformationFailure]

  private var numberOfDocumentsToProcess = 0
  private var complete = false

  context.system.scheduler.schedule(1.minute, 1.minute, self, EchoProgress)

  override protected def requestStrategy: RequestStrategy =
    WatermarkRequestStrategy(highWatermark = 10)

  override def receive: Receive = {
    case EchoProgress =>
      echoProgress()
    case Count(size) =>
      numberOfDocumentsToProcess = size
    case OnNext(element: (String, Future[Unit])) =>
      element._2 onComplete {
        case Success(_) =>
          successful = successful + element._1
          processComplete()
        case Failure(error) =>
          failed = failed + TransformationFailure(element._1, error)
          processComplete()
      }
    case OnError(error) =>
      context.stop(self)
    case OnComplete =>
      complete = true
  }

  private def isStreamComplete = {
    val totalDocumentsProcessedSoFar = successful.size + failed.size
    complete && (numberOfDocumentsToProcess == totalDocumentsProcessedSoFar)
  }

  private def processComplete(): Unit = {
    if (isStreamComplete) {
      echoSummary()
      context.stop(self)
    }
  }
  
  private def echoProgress() ...
  private def echoSummary() ...
  
}
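The thread-safety reasoning above applies to code that runs inside
`receive`. The `onComplete` callbacks registered in the `OnNext` handler,
however, are run later as separate tasks on an ExecutionContext, not as part
of the actor's message processing, so the writes to `successful` and `failed`
happen outside the actor's single-threaded guarantee. The stdlib-only sketch
below (no Akka; `CallbackThreadDemo` and the thread name "callback-pool" are
illustrative assumptions) shows a Future callback running on a pool thread
rather than on the thread that registered it.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object CallbackThreadDemo {
  // One named daemon thread, so we can see where the callback actually runs.
  private val factory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "callback-pool")
      t.setDaemon(true)
      t
    }
  }

  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor(factory))

  /** Returns (thread that registered the callback, thread that ran it). */
  def threads(): (String, String) = {
    val registeredOn = Thread.currentThread().getName
    val ranOn = Promise[String]()
    // Stands in for element._2 in the actor: a Future whose callback would mutate state.
    val work: Future[Unit] = Future.successful(())
    work.onComplete(_ => ranOn.success(Thread.currentThread().getName))
    (registeredOn, Await.result(ranOn.future, 5.seconds))
  }
}
```

In Akka, the usual way around this is to turn the Future's outcome into a
message, for example with `akka.pattern.pipe` (`future pipeTo self`), so that
`successful` and `failed` are only touched inside `receive`.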

Problems that we are facing
----------------------------

1. Sequence of messages?
   We are not sure about the order in which the actor receives messages. Is
   it possible that the actor receives OnComplete first while some OnNext
   messages are still in its mailbox?

2. When to stop the actor?
   What is the correct way to stop the actor? Right now we stop it in two
   places: on OnError, and after an OnNext element's result arrives (there
   we check whether all documents have been processed and OnComplete has
   already been received). If we instead stop the actor in the OnError and
   OnComplete handlers, will it work?

3. OutOfMemory issues?
   We ran the stream with 2 GB of memory and got an OutOfMemoryError before
   the stream completed. Because backpressure is mandatory, we thought this
   could not happen. After we increased memory to 4 GB, the program ran
   without an OutOfMemoryError. Did we miss anything in our implementation?
   How can we ensure that we never get an OutOfMemoryError, regardless of how
   much memory is available to the program?

4. Program dies abruptly
   Right now our code stops at some point before it has processed all
   documents. From our observations, it seems to stop after the OnComplete
   message is received, even though we do not stop the actor in the
   OnComplete handler. We are not sure how to debug or fix this behavior.
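Regarding questions 1 and 2: as far as we understand the Reactive Streams
rules, a subscriber sees its OnNext signals before OnComplete, but the
Futures carried inside those elements may still be running when OnComplete
arrives, so their results can show up afterwards. One way to make the stop
condition independent of that ordering is to treat every outcome as an event
and stop only once both upstream completion and the expected count have been
seen. The sketch below models that bookkeeping in plain Scala (no Akka); the
event names `Expected`, `Succeeded`, `Failed` and `UpstreamComplete` are
hypothetical stand-ins for the actor's messages.

```scala
// Hypothetical events mirroring Count, a Future success/failure, and OnComplete.
sealed trait Event
final case class Expected(n: Int) extends Event
final case class Succeeded(uri: String) extends Event
final case class Failed(uri: String, error: Throwable) extends Event
case object UpstreamComplete extends Event

// Immutable snapshot of the actor's bookkeeping.
final case class Progress(
    expected: Option[Int] = None,
    ok: Set[String] = Set.empty,
    failed: Map[String, Throwable] = Map.empty,
    upstreamDone: Boolean = false
) {
  def processed: Int = ok.size + failed.size

  // Done only when upstream has completed AND every expected document has an outcome.
  def isDone: Boolean = upstreamDone && expected.contains(processed)

  def step(e: Event): Progress = e match {
    case Expected(n)      => copy(expected = Some(n))
    case Succeeded(uri)   => copy(ok = ok + uri)
    case Failed(uri, err) => copy(failed = failed + (uri -> err))
    case UpstreamComplete => copy(upstreamDone = true)
  }
}
```

Using `Option[Int]` for the expected count makes the "count not known yet"
state explicit instead of relying on 0 as a sentinel.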
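Regarding question 3, one possible source of memory growth: if
`saveToDatabase` returns a `Future` (as the `(String, Future[Unit])` elements
the actor matches on suggest), `.map` emits that Future as soon as it is
created. Backpressure then limits how many elements are in the stream, but
not how many database writes are still pending after the actor has taken an
element and requested more. In Akka Streams, `mapAsync(parallelism)` is the
stage that limits how many Futures are pending at once. The stdlib-only
sketch below contrasts starting every write immediately with capping pending
writes at `parallelism` by working in batches; `Tracker.save` is a
hypothetical stand-in for `saveToDatabase` that records the peak number of
pending calls.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for saveToDatabase that records how many calls are pending at once.
final class Tracker {
  private val pending = new AtomicInteger(0)
  private var peak = 0

  private def record(now: Int): Unit = synchronized {
    if (now > peak) peak = now
  }

  def peakPending: Int = synchronized(peak)

  def save(uri: String)(implicit ec: ExecutionContext): Future[Unit] = {
    record(pending.incrementAndGet())
    // Simulated I/O; andThen decrements the counter before the returned Future completes.
    Future(Thread.sleep(2)).andThen { case _ => pending.decrementAndGet() }
  }
}

object Saves {
  // Starts every save immediately: nothing limits how many are pending.
  def unbounded(uris: Seq[String], t: Tracker)(implicit ec: ExecutionContext): Unit =
    Await.result(Future.sequence(uris.map(u => t.save(u))), 1.minute)

  // Starts at most `parallelism` saves, then waits for them before starting more.
  def bounded(uris: Seq[String], parallelism: Int, t: Tracker)(implicit ec: ExecutionContext): Unit =
    uris.grouped(parallelism).foreach { batch =>
      Await.result(Future.sequence(batch.map(u => t.save(u))), 1.minute)
    }
}
```

Batching is the simplest way to show the bound; `mapAsync` achieves a
similar cap inside the stream without blocking a thread on `Await`.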

Appreciate any help/suggestions on this.

-Regards
Ajay
