Hi Jan,
I have implemented one for DynamoDb. I have used a PublisherActor that use
an internal buffer, so it fetches x elements for each batch. And
publish them one by one, I am not sure if it a great example, but maybe you
can work form there. It is not OOS but I can give you a small snippet.


class Streamer(req: QueryRequest) extends ActorPublisher[Item] with
ActorLogging {

  implicit val ec = context.dispatcher

  var completed: Boolean = false
  var nextIndex: Option[NativeKey] = None
  var cache: Iterable[Item] = Vector.empty

  def request = req.clone()

  def receive = {

    case Request(n) =>
      cache = deliver(cache)

    case Cancel =>
      onCompleteThenStop()
  }

  @tailrec
  private def deliver(xs: Iterable[Item]): Iterable[Item] =
    if (totalDemand <= 0) {
      xs
    } else if (xs.isEmpty && completed) {
      onCompleteThenStop()
      xs
    } else if (xs.isEmpty) {
      val effectiveRequest = nextIndex.foldLeft(request)(_
withExclusiveStartKey _)
      val queryResult = amazonDynamoDBClient.query(effectiveRequest)
      nextIndex = Option(queryResult.getLastEvaluatedKey).flatMap(m =>
if (m.isEmpty) None else Some(m))
      completed = nextIndex.isEmpty

      deliver(queryResult.getItems.map(toItem))
    } else {

      val split = Math.min(totalDemand, Int.MaxValue.toLong).toInt
      val (use, keep) = xs.splitAt(split)

      use.foreach(onNext)

      deliver(keep)
    }

}




On 14 August 2015 at 16:49, algermissen1971 <[email protected]> wrote:

> Thanks, Roland. That is exactly the kind of input I needed to proceed.
>
> Jan
>
>
> On 14 Aug 2015, at 12:21, Akka Team <[email protected]> wrote:
>
> > Hi Jan,
> >
> > stream-compatible data sources are starting to appear, see for example
> Slick 3.0 with its DBIO model that can either return a Future[] or a
> Publisher[] of the query result. The former would be incorporated using
> mapAsync and the latter can be wrapped in a Source and (with some care,
> usually) integrated into a stream using `.flatten(FlattenStrategy.concat)`.
> Rate limiting is described in the documentation (see the Cookbook), for
> retries you’ll have to construct a feedback cycle in the stream graph (e.g.
> like in the HTTP client; it is a bit involved but we might later condense
> this pattern into a readily reusable flow element).
> >
> > When it comes to streaming data sinks there also is movement e.g. in the
> form of reactive Kafka drivers, we should see Cassandra and Spark as well
> soon, but this movement has just begun.
> >
> > I am not aware of an FTP implementation for Akka Streams yet, maybe
> someone has done that and would speak up?
> >
> > Regards,
> >
> > Roland
> >
> >
> > On Wed, Aug 12, 2015 at 12:23 PM, algermissen1971 <
> [email protected]> wrote:
> > Hi all,
> >
> > I am looking for an akka-streams example that shows how one would
> approach talking to databases (or an FTP server in my case).
> >
> > Besides the basic question of how to create a Source or Sink on top of a
> database driver (or SFTP connection for that matter) I am looking for best
> practices on rate limiting and retries, or re-establishing a connection if
> it gets lost.
> >
> > Can somone point me to an example to get started?
> >
> > Jan
> >
> >
> > --
> >>>>>>>>>>>     Read the docs: http://akka.io/docs/
> >>>>>>>>>>>     Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>>>     Search the archives:
> https://groups.google.com/group/akka-user
> > ---
> > You received this message because you are subscribed to the Google
> Groups "Akka User List" group.
> > To unsubscribe from this group and stop receiving emails from it, send
> an email to [email protected].
> > To post to this group, send email to [email protected].
> > Visit this group at http://groups.google.com/group/akka-user.
> > For more options, visit https://groups.google.com/d/optout.
> >
> >
> >
> > --
> > Akka Team
> > Typesafe - Reactive apps on the JVM
> > Blog: letitcrash.com
> > Twitter: @akkateam
> >
> > --
> >>>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> > ---
> > You received this message because you are subscribed to the Google
> Groups "Akka User List" group.
> > To unsubscribe from this group and stop receiving emails from it, send
> an email to [email protected].
> > To post to this group, send email to [email protected].
> > Visit this group at http://groups.google.com/group/akka-user.
> > For more options, visit https://groups.google.com/d/optout.
>
> --
> >>>>>>>>>>      Read the docs: http://akka.io/docs/
> >>>>>>>>>>      Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>>      Search the archives:
> https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 

[image: --]
Filippo De Luca
[image: http://]about.me/FilippoDeLuca
<http://about.me/FilippoDeLuca?promo=email_sig>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to