David,

What if we pivot the problem slightly? Instead of framing token refresh as
a reaction to failure, what if we consider it a normal, adjacent flow?

In other words, assume there's a source that provides a continuous stream
of Access Tokens. Within the expiration window of a given Access Token, the
source will always return the same token, as many times as needed. Once the
Access Token expires, or shortly before, a refresh request will be
initiated and the newly minted Access Token seamlessly emitted from the
source.
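
As a rough illustration of that idea, here is a minimal sketch in plain Scala with no Akka involved. The names `IssuedToken`, `CachedTokenSource` and `mint` are hypothetical, and time is passed in explicitly so the behaviour is easy to follow. It is only meant to show the "same token inside the window, new token shortly before expiry" rule, not a real implementation.

```scala
import scala.concurrent.duration._

// Hypothetical token record: when it was issued and how long it lives
final case class IssuedToken(code: String, issuedAt: FiniteDuration, expiresIn: FiniteDuration) {
  // True once we are within `margin` of expiry, or past it
  def needsRefresh(now: FiniteDuration, margin: FiniteDuration): Boolean =
    now >= issuedAt + expiresIn - margin
}

// Hypothetical source: `mint` stands in for the real refresh request
final class CachedTokenSource(mint: FiniteDuration => IssuedToken, margin: FiniteDuration) {
  private var current: Option[IssuedToken] = None

  // Returns the cached token inside its window and mints a new one otherwise
  def token(now: FiniteDuration): IssuedToken = synchronized {
    current match {
      case Some(t) if !t.needsRefresh(now, margin) => t
      case _ =>
        val fresh = mint(now)
        current = Some(fresh)
        fresh
    }
  }
}

object CachedTokenSourceDemo extends App {
  var minted = 0
  val source = new CachedTokenSource(
    now => { minted += 1; IssuedToken("A" + minted, now, 10.minutes) },
    margin = 30.seconds)

  println(source.token(0.seconds).code)   // A1
  println(source.token(5.minutes).code)   // A1, same token inside the window
  println(source.token(570.seconds).code) // A2, refreshed 30 seconds before expiry
}
```

Callers never see the refresh happen; they just ask for the current token.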

Accessing a protected resource would then be a simple matter of adding the
current Access Token to any outgoing request. The two concerns, generating
Access Tokens and retrieving protected resources, are then duly separated.
Failure to retrieve an Access Token should halt request processing, but
failure to retrieve a protected resource should never trigger a token
refresh (you can assume you have a valid token).
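
The separation described above might look roughly like this in plain Scala. `Request`, `Response`, `ProtectedCall`, `currentToken` and `send` are hypothetical stand-ins for the real HTTP types and calls, so treat it as a sketch of the rule rather than working client code.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-ins for a real HTTP request and response
final case class Request(url: String, headers: Map[String, String] = Map.empty)
final case class Response(status: Int)

object ProtectedCall {
  // Concern 1: decorate the request with whatever token is current
  def authorize(request: Request, tokenCode: String): Request =
    request.copy(headers = request.headers + ("Authorization" -> s"Bearer $tokenCode"))

  // Concern 2: fetch the resource. There is deliberately no
  // refresh-and-retry branch here.
  def call(currentToken: () => Try[String], send: Request => Try[Response])(
      request: Request): Try[Response] =
    currentToken() match {
      // No token: halt, and never send the request
      case Failure(e) =>
        Failure(new IllegalStateException("no access token, request not sent", e))
      // Token available: send, and return whatever comes back
      case Success(code) => send(authorize(request, code))
    }
}

object ProtectedCallDemo extends App {
  import ProtectedCall._
  val request = Request("https://example.com/data")

  // A resource failure is passed through as-is; it does not trigger a refresh
  println(call(() => Success("A1"), _ => Success(Response(503)))(request))
  // prints Success(Response(503))

  // A token failure halts processing before anything is sent
  println(call(() => Failure(new RuntimeException("refresh endpoint down")),
    _ => Success(Response(200)))(request).isFailure)
  // prints true
}
```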

Following is some semi-code that demonstrates what I'm talking about.
Note that the function signatures are missing an implicit here and there.
I'll try to get this into the form of a working Gist if I can find the time,
but I wanted to get this out there to give you the idea.

Side note: I would reserve supervision for genuine error conditions rather
than use it as a means of executing a standard flow. Doing otherwise is akin
to using exceptions in Java land to govern normal business flow.

Let me know if you have questions.

Lance

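// Imports this semi-code assumes (Akka Streams 1.0-era APIs)
import java.net.URL
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._
import akka.pattern.after
import akka.stream.scaladsl._
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.headers.{Authorization, OAuth2BearerToken}

// Toy tokens: a prefix plus a sequence number stands in for a real token string
abstract class Token(prefix: String, sequence: Long) {
  def code: String = prefix + sequence
}
case class RefreshToken(sequence: Long) extends Token("R", sequence) {
  def next: RefreshToken = copy(sequence = sequence + 1)
}
// expiresIn is a FiniteDuration so it can be handed to akka.pattern.after later on
case class AccessToken private (sequence: Long, expiresIn: FiniteDuration)
    extends Token("A", sequence) {
  def this(token: RefreshToken, expiresIn: FiniteDuration) =
    this(token.sequence, expiresIn)
}
case class RefreshResponse(access: AccessToken, refresh: Option[RefreshToken])
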
// The basic OAuth refresh token request/response flow (section 6 of the
// OAuth 2.0 spec).
// This flow is the fundamental source of access and refresh tokens
// and would be a real HTTP flow in a working example.
def refreshRequest(endpoint: URL, clientId: String, clientSecret: String)
    : Flow[RefreshToken, Future[RefreshResponse], Unit] = {
  // Stubbed token lifetime for the fake response below
  val expiresIn = 10.minutes
  // This would be the real flow that makes the request to the OAuth
  // refresh endpoint using Http().singleRequest(...), for example
  Flow[RefreshToken].map { refresh =>
    Future.successful(
      RefreshResponse(new AccessToken(refresh, expiresIn), Some(refresh.next)))
  }
}

// Executes a rolling series of refresh requests with appropriate delay to
// serve as the basis for a continuous stream of Access Tokens.
// The pair of futures emitted represent the current and next Access
// Tokens, respectively.
// The current Access Token should be used until the future for the next
// Access Token completes.
// Still semi-code: assumes an implicit ExecutionContext and a `scheduler`
// (for example system.scheduler) are in scope.
def refreshFlow(initial: RefreshToken,
                request: Flow[RefreshToken, Future[RefreshResponse], Unit])
    : Source[(Future[AccessToken], Future[AccessToken]), Unit] = {
  Source(
    Source.single((initial, Promise[AccessToken]())),
    request,
    Merge[(RefreshToken, Promise[AccessToken])](2),
    Unzip[RefreshToken, Promise[AccessToken]](),
    Zip[Future[RefreshResponse], Promise[AccessToken]](),
    Broadcast[(Future[RefreshResponse], Promise[AccessToken])](2))((mat, _, _, _, _, _) => mat) {
    implicit b => (initial, request, merge, unzip, zip, bcast) =>
      import FlowGraph.Implicits._

      // Complete current promise and create next promise
      val promise = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])].map {
        case (response, promise) =>
          promise.completeWith(response.map(_.access))
          (response, Promise[AccessToken]())
      })

      // Feeds back the refresh token after a delay to initiate the next
      // access token refresh.
      // Uses the Akka after pattern to schedule the Future about 30 seconds
      // prior to expiration.
      val feedback = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])].map {
        case (response, promise) =>
          Source(response).collect {
            case RefreshResponse(access, Some(refresh)) =>
              val delay = access.expiresIn - 30.seconds
              // `after` takes its value by name, so no `() =>` wrapper is needed
              Source(after(delay, scheduler)(Future.successful((refresh, promise))))
          }.flatten(FlattenStrategy.concat)
      }.flatten(FlattenStrategy.concat))

      // Output the current/next token future pair
      val output = b.add(Flow[(Future[RefreshResponse], Promise[AccessToken])].map {
        case (response, promise) => (response.map(_.access), promise.future)
      })

      // The rolling request flow, initiated with the initial refresh token
      initial ~> merge ~> unzip.in
      unzip.left ~> request ~> zip.in0
      unzip.right ~> zip.in1
      zip.out ~> promise ~> bcast ~> feedback
      merge <~ feedback
      bcast ~> output
      output.outlet
  }
}

// Converts a source of (current, next) Access Token future pairs into
// a continuous stream of Access Tokens, switching to the next token when
// available
def accessTokenSource(source: Source[(Future[AccessToken], Future[AccessToken]), Unit])
    : Source[AccessToken, Unit] = {
  source.map {
    case (current, next) =>
      // Repeat the current token until the next token's future completes
      Source(current)
        .map(token => Source.repeat(token).takeWhile(_ => !next.isCompleted))
        .flatten(FlattenStrategy.concat)
  }.flatten(FlattenStrategy.concat)
}

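// Decorate each request with the current access token (zipped in from the token stream)
def `protected`(tokens: Source[AccessToken, Unit]): Flow[HttpRequest, HttpRequest, Unit] =
  Flow() { implicit b =>
    import FlowGraph.Implicits._
    val zip = b.add(Zip[HttpRequest, AccessToken]())
    val decorate = b.add(Flow[(HttpRequest, AccessToken)].map { case (request, token) =>
      request.withHeaders(request.headers :+ Authorization(OAuth2BearerToken(token.code)))
    })
    tokens ~> zip.in1
    zip.out ~> decorate
    (zip.in0, decorate.outlet)
  }
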
On Wednesday, August 5, 2015 at 9:46:05 AM UTC-4, David Pinn wrote:
>
> I have an application that pulls data from an external system using HTTP
> GET requests, the headers of which include an OAuth 2.0 access token. I'm
> trying to work out what to do when access tokens expire, as they do from
> time to time.
>
> I suppose I could let the stream complete with failure, and the class that
> created the stream could detect that the failure was due to token expiry,
> and it could do the OAuth 2.0 refresh token dance to get a new access
> token, and then re-create the stream. It feels wrong, though, because what
> about all the nice little elements in transit at the time of the failure?
> They'll just fall on the floor and die. Sad.
>
> And then I read about supervision strategies, and I'm thinking Yeah!
> that's right, I only need to restart the stage, and all the nice little
> elements can just wait while I do the refresh dance. But then I don't like
> the idea of having the decider function (the function that chooses the
> Supervision.Directive) have side effects like OAuth 2.0 refresh dances and
> missile launches and stuff. So then I read about Custom Stream Processing:
> PushPullStage, and PushStage, and in the Akka Streams code I found a thingy
> called AsyncStage - maybe I could write a custom stage that extends that.
>
> I feel like I'm in a room with eight closed doors, and behind one is a
> land flowing with milk and honey, or nice little elements at least, and
> behind all of the others is ritual humiliation.
>
> Please help me choose. Where do I put the OAuth 2.0 refresh token dance?
>