Hi,

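The Reactive Streams API is not intended to be an end-user API. It is in
fact very difficult to implement correctly, and should be implemented by
libraries such as Akka Streams. If you are a library developer and would
like to implement a pure Publisher, you should read the RS Specification
carefully and use the TCK to find issues with the implementation. In your
Publisher, for example, subscribe never calls onSubscribe on the subscriber
and calls onNext without waiting for demand via request(n). The
specification requires both, so that is most likely why the Sink receives
nothing.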

If you just want to integrate with the Flickr API, I think you can use a
Source of page numbers, then use mapAsync to perform the WS call for each
page and mapConcat to emit the photos one by one.
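
A minimal sketch of that idea follows. It is written against Akka 2.6,
where an implicit ActorSystem is enough to run a stream; older releases
need an explicit ActorMaterializer and differ in a few names. fetchPage,
totalPages and the photo names are hypothetical stand-ins for the real WS
call and its JSON parsing, and stopping at the first empty page is only one
possible way to end the stream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object FlickrPagesSketch {

  // Hypothetical: pretend the search has three pages of results.
  val totalPages = 3

  // Hypothetical stand-in for the WS call. A real version would do
  // WS.url(...).get() and extract the photos from the JSON response.
  def fetchPage(page: Int): Future[List[String]] =
    Future.successful(
      if (page <= totalPages) List(s"photo-$page-a", s"photo-$page-b")
      else Nil
    )

  // Page numbers 1, 2, 3, ... are only pulled when downstream asks for
  // more, so back-pressure is handled by the library.
  val photos: Source[String, NotUsed] =
    Source.fromIterator(() => Iterator.from(1))
      .mapAsync(parallelism = 1)(fetchPage) // one request at a time, in page order
      .takeWhile(_.nonEmpty)                // complete at the first empty page
      .mapConcat(page => page)              // emit the photos one by one

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("flickr-sketch")
    try {
      val result = Await.result(photos.runWith(Sink.seq), 5.seconds)
      result.foreach(println)
    } finally {
      system.terminate()
    }
  }
}
```

With these stand-ins, main prints six photo names, photo-1-a through
photo-3-b. Note that this variant makes one extra request past the last
page; if the response tells you the page count, you can bound the Source
of page numbers instead.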

Cheers,
Patrik

On Sun, Aug 30, 2015 at 7:58 PM, Jeroen Kransen <[email protected]> wrote:

> I made a Source for an Akka Stream based on a ReactiveStreams Publisher
> like this:
>
>     object FlickrSource {
>
>       val apiKey = Play.current.configuration.getString("flickr.apikey")
>       val flickrUserId = Play.current.configuration.getString("flickr.userId")
>       val flickrPhotoSearchUrl = s"https://api.flickr.com/services/rest/?method=flickr.photos.search&api_key=$apiKey&user_id=$flickrUserId&min_taken_date=%s&max_taken_date=%s&format=json&nojsoncallback=1&page=%s&per_page=500"
>
>       def byDate(date: LocalDate): Source[JsValue, Unit] = {
>         Source(new FlickrPhotoSearchPublisher(date))
>       }
>     }
>
>     class FlickrPhotoSearchPublisher(date: LocalDate) extends Publisher[JsValue] {
>
>       override def subscribe(subscriber: Subscriber[_ >: JsValue]) {
>         try {
>           val from = new LocalDate()
>           val fromSeconds = from.toDateTimeAtStartOfDay.getMillis
>           val toSeconds = from.plusDays(1).toDateTimeAtStartOfDay.getMillis
>
>           def pageGet(page: Int): Unit = {
>             val url = flickrPhotoSearchUrl format (fromSeconds, toSeconds, page)
>             Logger.debug("Flickr search request: " + url)
>             val photosFound = WS.url(url).get().map { response =>
>               val json = response.json
>               val photosThisPage = (json \ "photos" \ "photo").as[JsArray]
>               val numPages = (json \ "photos" \ "pages").as[JsNumber].value.toInt
>               Logger.debug(s"pages: $numPages")
>               Logger.debug(s"photos this page: ${photosThisPage.value.size}")
>               photosThisPage.value.foreach { photo =>
>                 Logger.debug(s"onNext")
>                 subscriber.onNext(photo)
>               }
>
>               if (numPages > page) {
>                 Logger.debug("nextPage")
>                 pageGet(page + 1)
>               } else {
>                 Logger.debug("onComplete")
>                 subscriber.onComplete()
>               }
>             }
>           }
>           pageGet(1)
>         } catch {
>           case ex: Exception => {
>             subscriber.onError(ex)
>           }
>         }
>       }
>     }
>
> It will make a search request to Flickr and source the results as
> *JsValue*s. I tried to wire it to lots of different Flows and Sinks, but
> this would be the most basic setup:
>
>     val source: Source[JsValue, Unit] = FlickrSource.byDate(date)
>     val sink: Sink[JsValue, Future[Unit]] = Sink.foreach(println)
>     val stream = source.toMat(sink)(Keep.right)
>     stream.run()
>
>
> I see that *onNext* gets called a couple of times, and then
> *onComplete*. However, the Sink does not receive anything. What am I
> missing? Is this not a valid way to create a Source?
>
> --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 

Patrik Nordwall
Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
Twitter: @patriknw
