igalshilman opened a new pull request #241: URL: https://github.com/apache/flink-statefun/pull/241
### This PR adds the ability to cancel delayed messages.

- See [FLINK-21308](https://issues.apache.org/jira/browse/FLINK-21308) for the detailed use case that drives this PR.

## High level changes

This PR introduces the following methods in the embedded SDK, and the corresponding methods in the remote SDKs (in language-specific flavors):

```java
/**
 * Invokes another function with an input (associated with a {@code cancellationToken}),
 * identified by the target function's {@link Address}, after a given delay.
 *
 * <p>Providing an id for a message allows "unsending" this message later (see {@link
 * #cancelDelayedMessage(String)}).
 *
 * @param delay the amount of delay before invoking the target function. Value needs to be >=
 *     0.
 * @param to the target function's address.
 * @param message the input to provide for the delayed invocation.
 * @param cancellationToken the non-empty, non-null, unique token to attach to this message, to be
 *     used for message cancellation (see {@link #cancelDelayedMessage(String)}).
 */
void sendAfter(Duration delay, Address to, Object message, String cancellationToken);

/**
 * Cancels a delayed message (a message that was sent via {@link #sendAfter(Duration, Address,
 * Object, String)}).
 *
 * <p>NOTE: this is a best-effort operation, since the message might have already been delivered.
 * If the message was delivered, this is a no-op.
 *
 * @param cancellationToken the id of the message to un-send.
 */
void cancelDelayedMessage(String cancellationToken);
```

* The semantics of the cancellation token are opaque to StateFun; its content and uniqueness are completely user defined.
* Violating the uniqueness constraint, i.e. using the same token for two different messages, results in a runtime error.
* Once the delayed message has been dispatched, the cancellation token is forgotten.

### Internal Changes

## request reply protocol:

We attach the (optional) `cancellation_token` to the delayed invocation in the request-reply protocol.
```protobuf
// DelayedInvocation represents a delayed remote function call with a target address, an argument
// and a delay in milliseconds, after which this message is to be sent.
message DelayedInvocation {
    // an optional cancellation token that can be used to request the "unsending" of a delayed message.
    string cancellation_token = 10;
    // the amount of milliseconds to wait before sending this message
    int64 delay_in_ms = 1;
    // the target address to send this message to
    Address target = 2;
    // the invocation argument
    TypedValue argument = 3;
}
```

In addition, there is a new response message:

```protobuf
// DelayCancellation represents a single delayed-message cancellation request.
message DelayCancellation {
    string cancellation_token = 1;
}
```

## State

We add an additional state handle (`delayed-message-index`) that keeps track of the mapping between a `cancellation_token` and the absolute timestamp at which the corresponding message needs to be dispatched.

These changes are then wired throughout the SDK and the runtime to make the magic happen.
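To illustrate the token semantics and the role of the token-to-timestamp index described above, here is a minimal in-memory sketch. It is not the runtime code from this PR: the class `DelayedMessageSketch`, its `advanceTo` method, the manual clock, and the choice of `IllegalStateException` for a duplicate token are all hypothetical and exist only for illustration. The target `Address` is left out to keep the model small.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical in-memory model of cancellable delayed messages; not the StateFun runtime. */
final class DelayedMessageSketch {
  // token -> absolute dispatch timestamp (the role the PR assigns to `delayed-message-index`)
  private final Map<String, Long> tokenIndex = new HashMap<>();
  // absolute dispatch timestamp -> (token -> message) for messages still waiting
  private final TreeMap<Long, Map<String, Object>> pending = new TreeMap<>();
  // manual clock in milliseconds (assumption: a real runtime would use timers instead)
  private long nowMs = 0;

  void sendAfter(Duration delay, Object message, String cancellationToken) {
    if (delay.isNegative()) {
      throw new IllegalArgumentException("delay must be >= 0");
    }
    if (cancellationToken == null || cancellationToken.isEmpty()) {
      throw new IllegalArgumentException("token must be non-null and non-empty");
    }
    if (tokenIndex.containsKey(cancellationToken)) {
      // uniqueness violation; the concrete exception type is an assumption
      throw new IllegalStateException("token already in use: " + cancellationToken);
    }
    long dueAt = nowMs + delay.toMillis();
    tokenIndex.put(cancellationToken, dueAt);
    pending.computeIfAbsent(dueAt, k -> new LinkedHashMap<>()).put(cancellationToken, message);
  }

  void cancelDelayedMessage(String cancellationToken) {
    Long dueAt = tokenIndex.remove(cancellationToken);
    if (dueAt == null) {
      return; // unknown or already delivered: best-effort no-op
    }
    Map<String, Object> bucket = pending.get(dueAt);
    if (bucket != null) {
      bucket.remove(cancellationToken);
      if (bucket.isEmpty()) {
        pending.remove(dueAt);
      }
    }
  }

  /** Moves the clock forward and returns the messages that became due, in dispatch order. */
  List<Object> advanceTo(long timestampMs) {
    nowMs = Math.max(nowMs, timestampMs);
    List<Object> delivered = new ArrayList<>();
    while (!pending.isEmpty() && pending.firstKey() <= nowMs) {
      Map<String, Object> bucket = pending.pollFirstEntry().getValue();
      for (Map.Entry<String, Object> entry : bucket.entrySet()) {
        tokenIndex.remove(entry.getKey()); // the token is forgotten once dispatched
        delivered.add(entry.getValue());
      }
    }
    return delivered;
  }
}
```

In this model, cancelling a token before its timestamp is reached removes the message, cancelling after delivery does nothing, and a token can be reused once its message has been dispatched. Storing the absolute timestamp per token is what lets a cancellation locate the pending entry directly instead of scanning all delayed messages.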
