igalshilman opened a new pull request #241:
URL: https://github.com/apache/flink-statefun/pull/241


   ### This PR adds the ability to cancel delayed messages.
   
   - See [FLINK-21308](https://issues.apache.org/jira/browse/FLINK-21308) for the detailed use case that drives this PR.
   
   ## High level changes
   
   This PR introduces the following methods in the embedded SDK, with language-specific equivalents in the corresponding remote SDKs:
   
   
   ```java
   /**
    * Invokes another function with an input (associated with a {@code cancellationToken}),
    * identified by the target function's {@link Address}, after a given delay.
    *
    * <p>Providing a cancellation token for a message allows "unsending" this message later
    * (see {@link #cancelDelayedMessage(String)}).
    *
    * @param delay the amount of delay before invoking the target function. Value needs to be &gt;= 0.
    * @param to the target function's address.
    * @param message the input to provide for the delayed invocation.
    * @param cancellationToken the non-empty, non-null, unique token to attach to this message,
    *     to be used for message cancellation (see {@link #cancelDelayedMessage(String)}).
    */
   void sendAfter(Duration delay, Address to, Object message, String cancellationToken);

   /**
    * Cancels a delayed message (a message that was sent via {@link #sendAfter(Duration, Address,
    * Object, String)}).
    *
    * <p>NOTE: this is a best-effort operation, since the message might have already been delivered.
    * If the message was delivered, this is a no-op.
    *
    * @param cancellationToken the token of the message to un-send.
    */
   void cancelDelayedMessage(String cancellationToken);
   ```
   
   * The semantics of the cancellation token are opaque to StateFun; its content and uniqueness are entirely user-defined.
   * Violating the uniqueness constraint (using the same token for two different messages) results in a runtime error.
   * Once the delayed message has been dispatched, the cancellation token is forgotten.
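
The three rules above can be illustrated with a small, self-contained toy model. This is only a sketch of the described semantics, not the StateFun runtime: the class `DelayedMessageModel`, its simulated clock, and the `advanceTo` method are hypothetical, and the `Address` parameter is left out to keep the example standalone.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy, single-threaded model of the token semantics; NOT StateFun's implementation. */
final class DelayedMessageModel {

  /** A pending message and its absolute due time (hypothetical helper type). */
  private static final class Pending {
    final long dueAtMs;
    final Object message;

    Pending(long dueAtMs, Object message) {
      this.dueAtMs = dueAtMs;
      this.message = message;
    }
  }

  // Pending messages keyed by cancellation token, in insertion order.
  private final Map<String, Pending> pendingByToken = new LinkedHashMap<>();
  private long nowMs = 0L; // simulated clock

  void sendAfter(Duration delay, Object message, String cancellationToken) {
    if (delay.isNegative()) {
      throw new IllegalArgumentException("delay must be >= 0");
    }
    if (cancellationToken == null || cancellationToken.isEmpty()) {
      throw new IllegalArgumentException("token must be non-null and non-empty");
    }
    if (pendingByToken.containsKey(cancellationToken)) {
      // Reusing a token that is still pending violates the uniqueness constraint.
      throw new IllegalStateException("token already in use: " + cancellationToken);
    }
    pendingByToken.put(cancellationToken, new Pending(nowMs + delay.toMillis(), message));
  }

  /** Best effort: unknown or already-dispatched tokens are silently ignored. */
  void cancelDelayedMessage(String cancellationToken) {
    pendingByToken.remove(cancellationToken);
  }

  /** Advances the simulated clock and returns the messages that became due. */
  List<Object> advanceTo(long newNowMs) {
    nowMs = newNowMs;
    List<Object> dispatched = new ArrayList<>();
    Iterator<Pending> it = pendingByToken.values().iterator();
    while (it.hasNext()) {
      Pending p = it.next();
      if (p.dueAtMs <= nowMs) {
        dispatched.add(p.message);
        it.remove(); // the token is forgotten once the message is dispatched
      }
    }
    return dispatched;
  }
}
```

In this model, cancelling a token after its message was dispatched does nothing, which mirrors the best-effort wording in the Javadoc.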
   
   ### Internal Changes
   
   ## Request-reply protocol
   
   We attach the (optional) `cancellation_token` to the delayed invocation in the request-reply protocol.
   
   ```protobuf
   // DelayedInvocation represents a delayed remote function call with a target address,
   // an argument, and a delay in milliseconds after which this message is to be sent.
   message DelayedInvocation {
       // an optional cancellation token that can be used to request the "unsending" of a
       // delayed message.
       string cancellation_token = 10;
       // the amount of milliseconds to wait before sending this message
       int64 delay_in_ms = 1;
       // the target address to send this message to
       Address target = 2;
       // the invocation argument
       TypedValue argument = 3;
   }
   ```
   
   In addition, we introduce a new response message:
   
   ```protobuf
   // DelayCancellation represents a single delayed-message cancellation request.
   message DelayCancellation {
       string cancellation_token = 1;
   }
   ```
   
   ## State
   
   We add an additional state handle (`delayed-message-index`) that tracks the mapping between a `cancellation_token` and the absolute timestamp at which the message needs to be dispatched.
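
To show why a token-to-timestamp index is useful, here is a minimal in-memory sketch. It assumes that pending messages are buffered in buckets keyed by their absolute dispatch timestamp; that layout, the class `DelayedMessageIndexSketch`, and all method names are illustrative assumptions rather than the actual runtime code. Under this assumption, cancellation can jump straight to the right bucket instead of scanning every pending message.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sketch of a token -> timestamp index beside a timestamp-bucketed buffer (assumed layout). */
final class DelayedMessageIndexSketch {

  /** A buffered message; the token is null when no cancellation token was given. */
  private static final class Entry {
    final String token;
    final String payload;

    Entry(String token, String payload) {
      this.token = token;
      this.payload = payload;
    }
  }

  // Assumed buffer: absolute dispatch timestamp -> messages due at that time.
  private final TreeMap<Long, List<Entry>> buffer = new TreeMap<>();
  // The index: cancellation token -> absolute dispatch timestamp.
  private final Map<String, Long> index = new HashMap<>();

  void add(long dispatchAtMs, String payload, String tokenOrNull) {
    if (tokenOrNull != null && index.putIfAbsent(tokenOrNull, dispatchAtMs) != null) {
      throw new IllegalStateException("duplicate cancellation token: " + tokenOrNull);
    }
    buffer.computeIfAbsent(dispatchAtMs, k -> new ArrayList<>())
        .add(new Entry(tokenOrNull, payload));
  }

  void cancel(String token) {
    Long dispatchAtMs = index.remove(token);
    if (dispatchAtMs == null) {
      return; // unknown token or message already dispatched: nothing to do
    }
    List<Entry> bucket = buffer.get(dispatchAtMs);
    if (bucket != null) {
      bucket.removeIf(e -> token.equals(e.token));
      if (bucket.isEmpty()) {
        buffer.remove(dispatchAtMs);
      }
    }
  }

  /** Returns payloads due at or before nowMs and forgets their tokens. */
  List<String> dispatchUpTo(long nowMs) {
    List<String> out = new ArrayList<>();
    Map<Long, List<Entry>> due = buffer.headMap(nowMs, true);
    for (List<Entry> bucket : due.values()) {
      for (Entry e : bucket) {
        out.add(e.payload);
        if (e.token != null) {
          index.remove(e.token);
        }
      }
    }
    due.clear(); // the head-map view is backed by the buffer, so this removes the due buckets
    return out;
  }
}
```

Messages added without a token never enter the index, so in this sketch they cannot be cancelled.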
   
   
   
   These changes are then wired throughout the SDK and the runtime to make the 
magic happen.
   
     
   



