Hi, 

I'm using reactive-kafka (0.12) to consume Kafka topics in a batch style. 
Now I'm looking for a way to restart the stream in case of an error.

Reading the docs, recoverWithRetries seems to be what I need, but I get the 
following error at compile time:

[error]  found   : akka.stream.scaladsl.Source[Iterable[String],akka.kafka.scaladsl.Consumer.Control]
[error]  required: akka.stream.Graph[akka.stream.SourceShape[?],akka.NotUsed]
[error]       case _ : Error => kafkaProcessingSource
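
For reference, here is the operator's signature as I read it in the Akka Streams 
scaladoc (copied by hand, so it may not be verbatim); I assume the NotUsed in the 
fallback type is what I am hitting:

// akka.stream.scaladsl.FlowOps (Akka 2.4.x), hand-copied from the scaladoc:
def recoverWithRetries[T >: Out](
    attempts: Int,
    pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T]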


Below is how I define the processing of my Kafka source: 

def runStream: Unit = {

  /**
    * Get all the topics' last committed offsets
    */
  val topicOffsets = this.getOffsets

  system.log.info(
    "Akka Kafka stream started on topics {} with group id {} with offsets {}",
    topics.mkString("---"), this.groupId, topicOffsets)

  /**
    * Create our Kafka source
    */
  val kafkaSource =
    Consumer
      .committableSource(consumerSettings, Subscriptions.assignmentWithOffset(topicOffsets))

  val kafkaProcessingSource =
    kafkaSource
      .groupedWithin(maxBoundMessages, maxWaitMessagesMillis.millis)
      .mapAsync(1) { batch =>
        /**
          * First step: persist device data in the cache.
          * This returns a Future[Iterable[String]].
          */
        this
          .fromKafkaSourceToPersistentCache(batch)
          .map { deviceIds =>
            /**
              * Keep the last batch element in order to commit its offset
              */
            (batch.last, deviceIds)
          }
      }
      .mapAsync(1) { case (offsetToCommit, deviceIds) =>
        system.log.debug("Offset to commit : {}", offsetToCommit.record.offset())

        /**
          * Second step: commit and wait asynchronously for completion
          * before searching for jobs
          */
        offsetToCommit
          .committableOffset
          .commitScaladsl()
          .map(_ => deviceIds)
      }

  kafkaProcessingSource
    .recoverWithRetries(attempts = 1, {
      case _: Error => kafkaProcessingSource // <- the line the compiler rejects
    })
    .runForeach { deviceIds =>
      /**
        * Last step: search for jobs
        */
      this.scheduleJobsToSearch(deviceIds)
    }
}
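
If I read the error correctly, the mismatch is on the materialized value: 
kafkaProcessingSource materializes a Consumer.Control, while the fallback given 
to recoverWithRetries has to materialize NotUsed. The only direction I can see 
is a sketch like the one below, where I drop the Control of the fallback with 
mapMaterializedValue, but I am not sure this is the intended way or whether the 
consumer would really be restarted:

import akka.NotUsed

// Sketch only: adapt the fallback's materialized value so that it fits
// Graph[SourceShape[Iterable[String]], NotUsed]. This satisfies the types as
// far as I can tell, but it throws away the fallback's Consumer.Control.
kafkaProcessingSource
  .recoverWithRetries(attempts = 1, {
    case _: Error => kafkaProcessingSource.mapMaterializedValue(_ => NotUsed)
  })
  .runForeach { deviceIds =>
    this.scheduleJobsToSearch(deviceIds)
  }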

I found this thread about a similar question: 
https://stackoverflow.com/questions/42017184/reactive-kafka-how-to-pause-the-consumer-on-exception-and-retry-on-demand

How can I fix my compilation error and allow my source to be restarted?

Thanks,
Alifirat. 
