Hi Patrick-

Nice to meet you as well.

Can you help me understand what would start with an expired token? If 
possible, this would seem to be a design flaw in the token publisher, not 
the subscriber. The Access Token publisher should *always* be publishing 
valid tokens, if at all possible. There should only be two ways that 
doesn't happen - either the refresh endpoint is supplying bad tokens that 
expire prematurely (shame on them), or we're using a token past its stated 
lifetime (shame on us).

To avoid using tokens past their stated lifetime, which is the case I 
believe you're referring to, I would lean toward improving the design of 
the refresh flow, not forcing the token subscribers to compensate. Nobody 
likes a component that does half the job :)
Here's a couple of possibilities:

   - For the original design that was using pure streams to produce the 
   tokens, adding another takeWhile transformation to the internal flow should 
   do the trick. The condition on the takeWhile will return true until token 
   expiration, thus ensuring that expired tokens are never published. Note 
   that there will be two takeWhile conditionals - one for expiration and one 
   for retiring the current token once the next has arrived.
   - For the Agent-based design, something similar can be done, assuming 
   the Agent holds a Future and *not* the raw Access Token (see my previous 
   post). The following simple logic, which can be implemented via streams, if 
   desired, should do the trick.
      - Given: The Agent holds a currently valid Access Token via a Future 
      that is near expiration
      - Create a new Future[AccessToken] that will complete with the new 
      value for the AccessToken. The future on a Promise is sometimes handy for 
      this.
      - Upon completion, have the completion logic set the new Future on 
      the Agent. So, when the new token comes in, clients of the agent will see 
      it.
      - Upon expiration, set the new Future on the Agent if it hasn't 
      completed. This ensures that expired tokens never get used. Clients of 
the 
      Agent will wait for the Future to complete (for a new token to arrive). 
Of 
      course, if the future has already completed, no need to set it - the 
      completion logic will handle it.
      - Note that the two steps above accomplish the same as the two 
      takeWhile conditionals above
      - Also note that this assumes you're retrieving the latest value from 
      the Agent for each outbound request (not saving Access Tokens). If using 
      streams, this means having a source backed by the current future 
contained 
      in the Agent.
      - You may want to set an appropriate timeout duration for clients of 
      the Future[AccessToken]. A simple way to do this if you're using streams 
      would be Source(future).takeWithin(duration) to avoid stalling forever 
      waiting for a token to arrive.
   
Hope that helps.

Lance

On Friday, August 7, 2015 at 1:00:51 PM UTC-4, Patrick Li wrote:
>
> Hi Lance,
>
> Nice to meet you, I am working with David on this problem, so let me jump 
> in while he is sleeping :)
>
> I think one scenario of having an expired token (even with the constant 
> refresh happening in the background), is if the system goes down for a 
> period of time (either planned or otherwise) long enough for the current 
> access token to expire, then when the system is up again, it will start 
> with an expired token.
>
> Patrick
>
> On Friday, August 7, 2015 at 7:28:49 AM UTC-7, Lance Arlaus wrote:
>>
>> David-
>>
>> Excellent call on using an agent. It's a nice fit for this situation. My 
>> only hesitation would be ensuring that the agent is properly initialized 
>> with a valid token before you start slinging requests. The flow I 
>> originally posted ensures that by way of the Access Token source, but it 
>> can also be easily accomplished by having the agent hold a Future instead 
>> of the token itself (and using that Future as the basis for a Source that 
>> feeds into your request flow). More on this in a sec...
>>
>> As for using an independent Actor, that'll certainly work, though it's 
>> not strictly necessary if you want to stay in Streams land. You can put a 
>> Sink on the refresh flow that updates the agent. More readable and 
>> maintainable, IMHO, but there may be other considerations. Of course, if 
>> you go the flow route, you'll need some way to stop the refresh flow, if 
>> need be, since it'll run ad infinitum. Shouldn't be hard.
>>
>> As for buffering considerations, you're right to highlight it but I 
>> wouldn't worry too much. Buffering is strictly for performance and easy to 
>> turn off. The refresh flow should have no buffering on the flow to ensure a 
>> single outstanding request at any given time i.e. 
>> flow.withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
>>
>> Finally, on the expired tokens, can you please describe under what 
>> specific circumstances you foresee where streams would "sometimes fail"? 
>> Given that tokens will be refreshed, and a new token made available prior 
>> to expiration, how would the expired token scenario manifest? Forgive my 
>> insistence on this point, but failure modes tend to increase, not diminish, 
>> when responsibility diffuses across multiple components, especially in 
>> asynchronous systems.
>>
>> Coming back to distribution of the Access Token via an Agent holding a 
>> Future, this may be a good option for accomplishing two design objectives. 
>> First, eliminating initialization timing considerations, as described 
>> above. Second, handling retry delays in the refresh flow. Any delay in 
>> obtaining a new token will naturally pause consumers without the need for 
>> further coordination. Of course, consumers can dial in their tolerance for 
>> delay by selecting the appropriate timeout (using takeWithin, for example) 
>> and interpret a failed Access Token Future accordingly.
>>
>> Thanks for the interesting design problem you've raised. I'll try to 
>> break some of this down into code if I get the chance.
>>
>> Lance
>>
>> On Thursday, August 6, 2015 at 10:17:11 PM UTC-4, David Pinn wrote:
>>>
>>> Wow, Lance! You have blown my mind. You are so right about treating a 
>>> token refresh as a part of normal program flow, and not an error condition. 
>>> I'm spending today getting this right.
>>>
>>> I have a couple of thoughts on implementation. I'm wondering if Akka 
>>> Agents can help us out here. They're designed to hold a value, potentially 
>>> shared between multiple components, the value of which is accessed 
>>> synchronously, and altered asynchronously and atomically. Sounds perfect 
>>> for holding the OAuth tokens. Rather than weave a source of access tokens 
>>> into each stream that I set up, perhaps I can give each stream a reference 
>>> to an Agent<OAuth2Info>. Whenever the stream executes its HTTP GET request 
>>> against the source system, it grabs the latest value from the agent. An 
>>> independent AuthorizationManager actor could refresh the OAuthInfo in each 
>>> of those agents at the appropriate time. I'm thinking that this might be 
>>> simpler, and would avoid any potential complications caused by buffering 
>>> within the stream.
>>>
>>> It seems to me that streams will nevertheless sometimes fail because of 
>>> expired access tokens. Maybe the system time gets out of sync in such a way 
>>> that the AuthorizationManager is late with its refresh. Or maybe the system 
>>> gets shut down and re-started, and the refresh operations haven't caught up 
>>> yet. Whatever. Maybe the streams of data from external systems should, in 
>>> the case that they get a 401 UNAUTHORIZED from the source, emit an event on 
>>> the global Akka event bus, and the AuthorizationManager should listen for 
>>> those events and fire off a refresh. The data streams should just wait 30 
>>> seconds before retrying, and only stop() if that second attempt fails.
>>>
>>> Thank you, thank you for helping me work through this problem.
>>>
>>> David
>>>
>>>
>>>
>>>
>>>
>>> On 7 August 2015 at 01:12, Lance Arlaus <[email protected]> wrote:
>>>
>>>> David-
>>>>
>>>> What if we pivot the problem slightly? Instead of framing token refresh 
>>>> as a reaction to failure, what if we consider it a normal, adjacent flow?
>>>>
>>>> In other words, assume there's a source that provides a continuous 
>>>> stream of Access Tokens. Within the expiration window of a given Access 
>>>> Token, the source will always return the same token, as many times as 
>>>> needed. Once the Access Token expires, or shortly before, a refresh 
>>>> request 
>>>> will be initiated and the newly minted Access Token seamlessly emitted 
>>>> from 
>>>> the source.
>>>>
>>>> Accessing a protected resource would then be a simple matter of adding 
>>>> the current Access Token to any outgoing request. The two concerns, 
>>>> generating Access Tokens and retrieving protected resources, are then duly 
>>>> separated. Failure to retrieve an Access Token should halt request 
>>>> processing, but failure to retrieve a protected resource should never 
>>>> trigger a token refresh (you can assume you have a valid token).
>>>>
>>>> Following is some semi-code that demonstrates what I'm talking about. 
>>>> Note that the function signatures are missing an implicit here and 
>>>> there. I'll try to get this into the form of a working Gist if I can get 
>>>> the time, but I wanted to get this out there to give you the idea.
>>>>
>>>> Side note, I would reserve supervision for genuine error conditions, 
>>>> not as a means of executing a standard flow. Doing otherwise is akin to 
>>>> using exceptions in Java land to govern normal business flow.
>>>>
>>>> Let me know if you have questions.
>>>>
>>>> Lance
>>>>
>>>> abstract class Token(prefix: String, sequence: Long) {
>>>>   def code: String = prefix + sequence
>>>> }
>>>> case class RefreshToken(sequence: Long) extends Token("R", sequence) {
>>>>   def next = this.copy(sequence = sequence + 1)
>>>> }
>>>> case class AccessToken private (sequence: Long, expiresIn: Duration) 
>>>> extends Token("A", sequence) {
>>>>   def this(token: RefreshToken, expiresIn: Duration) = 
>>>> this(token.sequence, expiresIn)
>>>> }
>>>>
>>>> case class RefreshResponse(access: AccessToken, refresh: 
>>>> Option[RefreshToken])
>>>>
>>>>
>>>> // The basic OAuth refresh token request/response flow (section 6 of 
>>>> the OAuth 2.0 spec)
>>>> // This flow is the fundamental source of access and refresh tokens
>>>> // and would be a real HTTP flow in a working example
>>>> def refreshRequest(endpoint: URL, clientId: String, clientSecret: 
>>>> String): Flow[RefreshToken, Future[RefreshResponse], Unit] = {
>>>>   // val expiresIn = 10.minutes
>>>>   // This would be the real flow that makes the request to the OAuth 
>>>> refresh endpoint
>>>>   // using Http().singleRequest(...), for example
>>>>   Flow[RefreshToken].map(refresh => 
>>>> Future.successful(RefreshResponse(new AccessToken(refresh, 10.minutes), 
>>>> Some(refresh.next))))
>>>> }
>>>>
>>>> // Executes a rolling series of refresh requests with appropriate delay 
>>>> to serve as the
>>>> // basis for a continuous stream of Access Tokens
>>>> // The pair of futures emitted represent the current and next Access 
>>>> Tokens, respectively
>>>> // The current Access Token should be used until the future for the 
>>>> next Access Token completes
>>>> def refreshFlow(initial: RefreshToken, request: Flow[RefreshToken, 
>>>> Future[RefreshResponse], Unit]): Source[(Future[AccessToken], 
>>>> Future[AccessToken])] = {
>>>>   Source(
>>>>     Source(initial, Promise[AccessToken]), 
>>>>     request, 
>>>>     Merge[(RefreshToken, Promise[AccessToken])](2), 
>>>>     Unzip[RefreshToken, Promise[AccessToken]],
>>>>     Zip[RefreshResponse, Promise[AccessToken]],
>>>>     Broadcast(2))((mat, _, _, _, _, _) => mat)
>>>>   {
>>>>     implicit b => (initial, request, merge, unzip, zip, bcast) =>
>>>>
>>>>     // Complete current promise and create next promise
>>>>     val promise = b.add(Flow[(Future[RefreshResponse], 
>>>> Promise[AccessToken])].map {
>>>>       case (response, promise) => {
>>>>         promise.completeWith(response.map(_.access))
>>>>         (response, Promise[AccessToken])
>>>>       }
>>>>     })
>>>>
>>>>     // Feeds back the refresh token after delay to initiate the next 
>>>> access token refresh
>>>>     // Uses the Akka after pattern to schedule the Future about 30 
>>>> seconds prior to expiration
>>>>     val feedback = b.add(Flow[(Future[RefreshResponse], 
>>>> Promise[AccessToken])].map {
>>>>       case (response, promise) => {
>>>>         Source(response).collect { 
>>>>           case RefreshResponse(access, Some(refresh)) => {
>>>>             val delay = access.expiresIn.minus(30.seconds)
>>>>             val future = after(delay, scheduler)(() => 
>>>> Future.successful(refresh, promise))
>>>>             Source(future)
>>>>           }
>>>>         }.flatten(FlattenStrategy.concat)
>>>>       }
>>>>     })
>>>>
>>>>     // Output the current/next token future pair
>>>>     val output = b.add(Flow[(Future[RefreshResponse], 
>>>> Promise[AccessToken])].map {
>>>>       case (response, promise) => (response.map(_.access), 
>>>> promise.future)
>>>>     })
>>>>
>>>>     // The rolling request flow, initiated with the initial refresh 
>>>> token
>>>>     initial ~> merge ~> unzip.in
>>>>                         unzip.left ~> request ~> zip.in0 
>>>>                         unzip.right           ~> zip.in1
>>>>                                                  zip.out ~> promise ~> 
>>>> bcast ~> feedback
>>>>                merge                                                   
>>>>       <~ feedback
>>>>                                                                       
>>>>  bcast ~> output
>>>>
>>>>     output.outlet
>>>>   }
>>>> }
>>>>
>>>> // Converts a source of (current, next) Access Token future pairs into
>>>> // a continuous stream of Access Tokens, switching to the next token, 
>>>> when available
>>>> def accessTokenSource(source: Source[(Future[AccessToken], 
>>>> Future[AccessToken])]): Source[AccessToken, Unit] = {
>>>>   source.map {
>>>>     case (current, next) => Source(current).map(token => 
>>>> Source.repeat(token).takeWhile(!next.isCompleted))
>>>>   }.flatten(FlattenStrategy.concat)
>>>> }
>>>>
>>>>
>>>> // Decorate requests with the current access token
>>>> def protected(tokens: Source[AccessToken]): Flow[HttpRequest, 
>>>> HttpRequest] = {
>>>>   Flow[HttpRequest].map(_.withHeaders(new 
>>>> OAuth2BearerToken(AccessToken.code)))
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> On Wednesday, August 5, 2015 at 9:46:05 AM UTC-4, David Pinn wrote:
>>>>>
>>>>> I have an application that pulls data from an external system using 
>>>>> HTTP GET requests, the headers of which include an OAuth 2.0 access 
>>>>> token. 
>>>>> I'm trying to work out what to do when access tokens expire, as they do 
>>>>> from time to time.
>>>>>
>>>>> I suppose I could let the stream complete with failure, and the class 
>>>>> that created the stream could detect that the failure was due to token 
>>>>> expiry, and it could do the OAuth 2.0 refresh token dance to get a new 
>>>>> access token, and then re-create the stream. It feels wrong, though, 
>>>>> because what about all the nice little elements in transit at the time of 
>>>>> the failure? They'll just fall on the floor and die. Sad.
>>>>>
>>>>> And then I read about supervision strategies, and I'm thinking Yeah! 
>>>>> that's right, I only need to restart the stage, and all the nice little 
>>>>> elements can just wait while I do the refresh dance. But then I don't 
>>>>> like 
>>>>> the idea of having the decider function - the function that chooses the 
>>>>> Supervision.Directive, have side effects like OAuth 2.0 refresh dances 
>>>>> and 
>>>>> missile launches and stuff. So then I read about Custom Stream 
>>>>> Processing: 
>>>>> PushPullStage, and PushStage, and in the Akka Streams code I found a 
>>>>> thingy 
>>>>> called AsyncStage - maybe I could write a custom stage that extends that.
>>>>>
>>>>> I feel like I'm in a room with eight closed doors, and behind one is a 
>>>>> land flowing with milk and honey, or nice little elements at least, and 
>>>>> behind all of the others is ritual humiliation. 
>>>>>
>>>>> Please help me choose. Where do I put the OAuth 2.0 refresh token 
>>>>> dance?
>>>>>
>>>> -- 
>>>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>>>> >>>>>>>>>> Check the FAQ: 
>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>> >>>>>>>>>> Search the archives: 
>>>> https://groups.google.com/group/akka-user
>>>> --- 
>>>> You received this message because you are subscribed to a topic in the 
>>>> Google Groups "Akka User List" group.
>>>> To unsubscribe from this topic, visit 
>>>> https://groups.google.com/d/topic/akka-user/qh1ktrdbjbE/unsubscribe.
>>>> To unsubscribe from this group and all its topics, send an email to 
>>>> [email protected].
>>>> To post to this group, send email to [email protected].
>>>> Visit this group at http://groups.google.com/group/akka-user.
>>>> For more options, visit https://groups.google.com/d/optout.
>>>>
>>>
>>>

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
To post to this group, send email to [email protected].
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to