David-

What if we pivot the problem slightly? Instead of framing token refresh as 
a reaction to failure, what if we consider it a normal, adjacent flow?

In other words, assume there's a source that provides a continuous stream 
of Access Tokens. Within the expiration window of a given Access Token, the 
source will always return the same token, as many times as needed. Once the 
Access Token expires, or shortly before, a refresh request will be 
initiated and the newly minted Access Token seamlessly emitted from the 
source.
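
To make the contract of such a source concrete, here is a minimal sketch in
plain Scala, without Akka. The names IssuedToken and CachingTokenSource, the
injected clock, and the 30-second default margin are assumptions made for
illustration; the refresh function only stands in for the real OAuth
refresh request.

```scala
import scala.concurrent.duration._

// Hypothetical names; not part of Akka or any OAuth library.
final case class IssuedToken(value: String, expiresIn: FiniteDuration)

// Hands out the same token until `margin` before it expires, then refreshes.
final class CachingTokenSource(
    refresh: () => IssuedToken,          // stand-in for the real refresh request
    now: () => Long,                     // current time in milliseconds (injectable for tests)
    margin: FiniteDuration = 30.seconds) {

  // The current token together with the time at which it must be replaced
  private var current: Option[(IssuedToken, Long)] = None

  def token(): IssuedToken = synchronized {
    current match {
      case Some((tok, deadline)) if now() < deadline =>
        tok // still inside the window: same token, as many times as needed
      case _ =>
        val tok = refresh()
        current = Some((tok, now() + (tok.expiresIn - margin).toMillis))
        tok
    }
  }
}
```

Callers only ever ask for the current token; whether a refresh happened
behind that call is invisible to them.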

Accessing a protected resource would then be a simple matter of adding the 
current Access Token to any outgoing request. The two concerns, generating 
Access Tokens and retrieving protected resources, are then duly separated. 
Failure to retrieve an Access Token should halt request processing, but 
failure to retrieve a protected resource should never trigger a token 
refresh (you can assume you have a valid token).
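
The separation described above can be sketched in plain Scala as follows.
SketchRequest, SketchResponse, and ProtectedFetch are hypothetical stand-ins
rather than Akka HTTP types, and the two function parameters are assumed
placeholders for the token source and the HTTP call.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical, library-free stand-ins for an HTTP request and response.
final case class SketchRequest(url: String, headers: Map[String, String] = Map.empty)
final case class SketchResponse(status: Int)

object ProtectedFetch {
  // currentToken: supplies the current Access Token, or fails.
  // send: performs the request and knows nothing about token refresh.
  def fetch(currentToken: () => Try[String], send: SketchRequest => Try[SketchResponse])(
      request: SketchRequest): Try[SketchResponse] =
    currentToken() match {
      case Failure(cause) =>
        Failure(cause) // no token: halt here, the request is never sent
      case Success(token) =>
        // A failure from `send` is returned as-is; it does not trigger a refresh.
        send(request.copy(headers = request.headers + ("Authorization" -> s"Bearer $token")))
    }
}
```

The token supplier is consulted exactly once per request, and nothing in the
request path calls back into the refresh logic.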

Below is some semi-code that demonstrates what I'm talking about. Note 
that the function signatures are missing an implicit parameter here and 
there. I'll try to get this into the form of a working Gist if I can find 
the time, but I wanted to get this out there to give you the idea.

Side note: I would reserve supervision for genuine error conditions rather 
than use it as a means of executing a standard flow. Doing otherwise is 
akin to using exceptions in Java land to govern normal business flow.

Let me know if you have questions.

Lance

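// Imports this semi-code relies on (package names assume the Akka Streams 1.0
// and Akka HTTP scaladsl layout)
import java.net.URL
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._
import akka.pattern.after
import akka.stream.scaladsl._
import akka.stream.scaladsl.FlowGraph.Implicits._
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}

abstract class Token(prefix: String, sequence: Long) {
  def code: String = prefix + sequence
}
case class RefreshToken(sequence: Long) extends Token("R", sequence) {
  def next = this.copy(sequence = sequence + 1)
}
// expiresIn is a FiniteDuration so it can be passed to the scheduler later on
case class AccessToken private (sequence: Long, expiresIn: FiniteDuration) extends Token("A", sequence) {
  def this(token: RefreshToken, expiresIn: FiniteDuration) = this(token.sequence, expiresIn)
}

case class RefreshResponse(access: AccessToken, refresh: Option[RefreshToken])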


// The basic OAuth refresh token request/response flow (section 6 of the OAuth 2.0 spec)
// This flow is the fundamental source of access and refresh tokens
// and would be a real HTTP flow in a working example
def refreshRequest(endpoint: URL, clientId: String, clientSecret: String): Flow[RefreshToken, Future[RefreshResponse], Unit] = {
  // val expiresIn = 10.minutes
  // This would be the real flow that makes the request to the OAuth refresh endpoint
  // using Http().singleRequest(...), for example
  Flow[RefreshToken].map(refresh =>
    Future.successful(RefreshResponse(new AccessToken(refresh, 10.minutes), Some(refresh.next))))
}

// Executes a rolling series of refresh requests with appropriate delay to serve as the
// basis for a continuous stream of Access Tokens
// The pair of futures emitted represents the current and next Access Tokens, respectively
// The current Access Token should be used until the future for the next Access Token completes
def refreshFlow(initial: RefreshToken, request: Flow[RefreshToken, Future[RefreshResponse], Unit]): Source[(Future[AccessToken], Future[AccessToken]), Unit] = {
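  Source(
    Source.single((initial, Promise[AccessToken]())),
    request,
    Merge[(RefreshToken, Promise[AccessToken])](2),
    Unzip[RefreshToken, Promise[AccessToken]](),
    // The request flow emits Future[RefreshResponse], so that is what gets zipped
    Zip[Future[RefreshResponse], Promise[AccessToken]](),
    Broadcast[(Future[RefreshResponse], Promise[AccessToken])](2))((mat, _, _, _, _, _) => mat)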
  {
    implicit b => (initial, request, merge, unzip, zip, bcast) =>

    // Complete current promise and create next promise
    val promise = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])].map {
      case (response, promise) => {
        promise.completeWith(response.map(_.access))
        (response, Promise[AccessToken]())
      }
    })

    // Feeds back the refresh token after delay to initiate the next access token refresh
    // Uses the Akka after pattern to schedule the Future about 30 seconds prior to expiration
    val feedback = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])].map {
      case (response, promise) => {
        Source(response).collect {
          case RefreshResponse(access, Some(refresh)) => {
            val delay = access.expiresIn.minus(30.seconds)
            // `after` takes its value by name, so pass the Future itself, not a function
            val future = after(delay, scheduler)(Future.successful((refresh, promise)))
            Source(future)
          }
        }.flatten(FlattenStrategy.concat)
      }
    // Flatten once more so that the stage emits (RefreshToken, Promise) pairs, not Sources
    }.flatten(FlattenStrategy.concat))

    // Output the current/next token future pair
    val output = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])].map {
      case (response, promise) => (response.map(_.access), promise.future)
    })

    // The rolling request flow, initiated with the initial refresh token
    initial ~> merge ~> unzip.in
                        unzip.left ~> request ~> zip.in0
                        unzip.right           ~> zip.in1
                                                 zip.out ~> promise ~> bcast ~> feedback
               merge                                                         <~ feedback
                                                                       bcast ~> output

    output.outlet
  }
}

// Converts a source of (current, next) Access Token future pairs into
// a continuous stream of Access Tokens, switching to the next token when available
def accessTokenSource(source: Source[(Future[AccessToken], Future[AccessToken]), Unit]): Source[AccessToken, Unit] = {
  source.map {
    case (current, next) =>
      // Repeat the current token until the next one is ready; takeWhile needs a predicate function
      Source(current)
        .map(token => Source.repeat(token).takeWhile(_ => !next.isCompleted))
        .flatten(FlattenStrategy.concat)
  }.flatten(FlattenStrategy.concat)
}


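// Decorate requests with the current access token
// (`protected` is a reserved word in Scala, hence the backticks; zip on Flow is
// assumed to be available in the Akka Streams version in use)
def `protected`(tokens: Source[AccessToken, Unit]): Flow[HttpRequest, HttpRequest, Unit] = {
  Flow[HttpRequest].zip(tokens).map {
    case (request, token) => request.addHeader(Authorization(OAuth2BearerToken(token.code)))
  }
}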




On Wednesday, August 5, 2015 at 9:46:05 AM UTC-4, David Pinn wrote:
>
> I have an application that pulls data from an external system using HTTP 
> GET requests, the headers of which include an OAuth 2.0 access token. I'm 
> trying to work out what to do when access tokens expire, as they do from 
> time to time.
>
> I suppose I could let the stream complete with failure, and the class that 
> created the stream could detect that the failure was due to token expiry, 
> and it could do the OAuth 2.0 refresh token dance to get a new access 
> token, and then re-create the stream. It feels wrong, though, because what 
> about all the nice little elements in transit at the time of the failure? 
> They'll just fall on the floor and die. Sad.
>
> And then I read about supervision strategies, and I'm thinking Yeah! 
> that's right, I only need to restart the stage, and all the nice little 
> elements can just wait while I do the refresh dance. But then I don't like 
> the idea of having the decider function - the function that chooses the 
> Supervision.Directive, have side effects like OAuth 2.0 refresh dances and 
> missile launches and stuff. So then I read about Custom Stream Processing: 
> PushPullStage, and PushStage, and in the Akka Streams code I found a thingy 
> called AsyncStage - maybe I could write a custom stage that extends that.
>
> I feel like I'm in a room with eight closed doors, and behind one is a 
> land flowing with milk and honey, or nice little elements at least, and 
> behind all of the others is ritual humiliation. 
>
> Please help me choose. Where do I put the OAuth 2.0 refresh token dance?
>
