I made a Source for an Akka Stream based on a ReactiveStreams Publisher
like this:
object FlickrSource {
val apiKey = Play.current.configuration.getString("flickr.apikey")
val flickrUserId =
Play.current.configuration.getString("flickr.userId")
val flickrPhotoSearchUrl =
s"https://api.flickr.com/services/rest/?method=flickr.photos.search&api_key=$apiKey&user_id=$flickrUserId&min_taken_date=%s&max_taken_date=%s&format=json&nojsoncallback=1&page=%s&per_page=500"
def byDate(date: LocalDate): Source[JsValue, Unit] = {
Source(new FlickrPhotoSearchPublisher(date))
}
}
class FlickrPhotoSearchPublisher(date: LocalDate) extends
Publisher[JsValue] {
override def subscribe(subscriber: Subscriber[_ >: JsValue]) {
try {
val from = new LocalDate()
val fromSeconds = from.toDateTimeAtStartOfDay.getMillis
val toSeconds = from.plusDays(1).toDateTimeAtStartOfDay.getMillis
def pageGet(page: Int): Unit = {
val url = flickrPhotoSearchUrl format (fromSeconds, toSeconds,
page)
Logger.debug("Flickr search request: " + url)
val photosFound = WS.url(url).get().map { response =>
val json = response.json
val photosThisPage = (json \ "photos" \ "photo").as[JsArray]
val numPages = (json \ "photos" \
"pages").as[JsNumber].value.toInt
Logger.debug(s"pages: $numPages")
Logger.debug(s"photos this page:
${photosThisPage.value.size}")
photosThisPage.value.foreach { photo =>
Logger.debug(s"onNext")
subscriber.onNext(photo)
}
if (numPages > page) {
Logger.debug("nextPage")
pageGet(page + 1)
} else {
Logger.debug("onComplete")
subscriber.onComplete()
}
}
}
pageGet(1)
} catch {
case ex: Exception => {
subscriber.onError(ex)
}
}
}
}
It will make a search request to Flickr and source the results as *JsValue*s.
I tried to wire it to lots of different Flows and Sinks, but this would be
the most basic setup:
val source: Source[JsValue, Unit] = FlickrSource.byDate(date)
val sink: Sink[JsValue, Future[Unit]] = Sink.foreach(println)
val stream = source.toMat(sink)(Keep.right)
stream.run()
I see that the *onNext *gets called a couple of times, and then the
*onComplete*. However, the Sink does not receive anything. What am I
missing, is this not a valid way to create a Source?
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.