This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch docu-function-state
in repository https://gitbox.apache.org/repos/asf/streampipes-website.git

commit 440291a3b57a27c622577d45f21d1a8ffe981e94
Author: Philipp Zehnder <[email protected]>
AuthorDate: Mon Feb 23 15:46:09 2026 +0100

    docs: update function state management documentation
---
 docs/06_extend-sdk-functions.md | 72 +++++++++++++++++++++++++++++++++++++++--
 1 file changed, 70 insertions(+), 2 deletions(-)

diff --git a/docs/06_extend-sdk-functions.md b/docs/06_extend-sdk-functions.md
index 659690b71..150f2381d 100644
--- a/docs/06_extend-sdk-functions.md
+++ b/docs/06_extend-sdk-functions.md
@@ -81,7 +81,7 @@ The structure of a function class is easy to understand:
 ### Getting a stream ID
 
 Functions require a reference to all data streams that should be retrieved by the function.
-Currently, the only way to get the ID of a function is by navigating to the ``Asset Management`` view in the StreamPipes UI.
+Currently, the only way to get the ID of a stream is by navigating to the ``Asset Management`` view in the StreamPipes UI.
 Create a new asset, click on ``Edit Asset`` and open ``Add Link`` in the _Linked Resources_ panel.
 Choose ``Data Source`` as link type, select one of the available sources, copy the ``Resource ID`` and provide this ID in the ``requiredStreamIds`` method.
 
@@ -94,6 +94,74 @@ The ``onServiceStarted`` method provides a function context which provides sever
 * _getClient_ returns a reference to the StreamPipes client to interact with features from the REST API.
 * _getStreams_ returns the data model of all data streams defined in the ``requiredStreamIds`` method.
 * _getSchema_ returns the schema of a specific data stream by providing the ``streamId``
+* _getStateStore_ returns a typed ``StateStore<T>`` to load and persist function state.
+
+### Stateful Functions
+
+Functions can persist and restore operational state (for example counters, thresholds, or timestamps) through ``FunctionContext``.
+This enables functions to continue with the latest known state after a service restart.
+
+Use this lifecycle pattern:
+
+* ``onServiceStarted``: request a typed state store and call ``load(defaultState)``.
+* ``onEvent``: update in-memory state.
+* ``onServiceStopped``: call ``persist(state)``.
+
+```java
+public class ThroughputMonitorFunction extends StreamPipesFunction {
+
+  public static class ProcessingState {
+    private int schemaVersion;
+    private long processedEventCount;
+    private long lastProcessedEpochMs;
+    private double alertThreshold;
+
+    public ProcessingState() {}
+
+    public ProcessingState(int schemaVersion,
+                           long processedEventCount,
+                           long lastProcessedEpochMs,
+                           double alertThreshold) {
+      this.schemaVersion = schemaVersion;
+      this.processedEventCount = processedEventCount;
+      this.lastProcessedEpochMs = lastProcessedEpochMs;
+      this.alertThreshold = alertThreshold;
+    }
+    // getters/setters
+  }
+
+  private ProcessingState state;
+  private StateStore<ProcessingState> stateStore;
+
+  private ProcessingState createDefaultState() {
+    return new ProcessingState(1, 0L, 0L, 75.0);
+  }
+
+  @Override
+  public void onServiceStarted(FunctionContext context) {
+    stateStore = context.getStateStore(ProcessingState.class);
+    state = stateStore.load(createDefaultState());
+  }
+
+  @Override
+  public void onEvent(Event event, String streamId) {
+    state.setProcessedEventCount(state.getProcessedEventCount() + 1);
+    state.setLastProcessedEpochMs(System.currentTimeMillis());
+  }
+
+  @Override
+  public void onServiceStopped() {
+    stateStore.persist(state);
+  }
+}
+```
+
+### State Persistence Lifecycle
+
+* If state exists, ``load(defaultState)`` restores it during ``onServiceStarted``.
+* If no state exists, the value provided to ``load(defaultState)`` is used.
+* State is persisted on ``onServiceStopped`` when ``persist(state)`` is called.
+* State usage is optional. Existing functions that do not use state remain unchanged.
 
 ## Registering a function
 
@@ -122,6 +190,6 @@ provide an instance of your function.
 
 ## Metrics & Monitoring
 
-Similar to pipeline elements, function register at the StreamPipes core.
+Similar to pipeline elements, functions register at the StreamPipes core.
 Running functions can be seen in the pipeline view of the user interface under _Functions_, right below the list of available pipelines.
 Similar to pipelines, simple metrics, monitoring info and exceptions can be viewed in the _Details_ section of each function.
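
The load/update/persist lifecycle documented in this commit can be tried without a StreamPipes installation using the minimal sketch below. Everything in it is an assumption made for illustration: `SketchStateStore`, `InMemoryStateStore`, `Counter`, and `runOnce` are hypothetical names and are not part of the StreamPipes SDK. Only the two calls `load(defaultState)` and `persist(state)` are taken from the documentation above. The stand-in store keeps the last persisted object in memory, so a "restart" is simulated inside a single JVM rather than across real service restarts.

```java
// Illustrative sketch only: none of these types belong to the StreamPipes SDK.
final class StateLifecycleSketch {

    // Hypothetical stand-in modeling just the two StateStore calls named in the docs.
    interface SketchStateStore<T> {
        T load(T defaultState);
        void persist(T state);
    }

    // Holds the last persisted value in memory (assumption: a real store would serialize it).
    static final class InMemoryStateStore<T> implements SketchStateStore<T> {
        private T persisted; // null until persist(...) has been called once

        @Override
        public T load(T defaultState) {
            // Restore existing state if present, otherwise fall back to the default.
            return persisted != null ? persisted : defaultState;
        }

        @Override
        public void persist(T state) {
            this.persisted = state;
        }
    }

    // Minimal state object: a single counter.
    static final class Counter {
        long processedEventCount;

        Counter(long processedEventCount) {
            this.processedEventCount = processedEventCount;
        }
    }

    // Simulates one service run: start (load), a number of events (update), stop (persist).
    static long runOnce(SketchStateStore<Counter> store, int events) {
        Counter state = store.load(new Counter(0L)); // corresponds to onServiceStarted
        for (int i = 0; i < events; i++) {
            state.processedEventCount++;             // corresponds to onEvent
        }
        store.persist(state);                        // corresponds to onServiceStopped
        return state.processedEventCount;
    }

    public static void main(String[] args) {
        InMemoryStateStore<Counter> store = new InMemoryStateStore<>();
        System.out.println(runOnce(store, 3)); // first run starts from the default state
        System.out.println(runOnce(store, 2)); // second run continues from the persisted count
    }
}
```

In this sketch the first run prints 3 and the second prints 5, because the second `load` returns the counter persisted at the end of the first run. Note that the pattern only writes state when the stop hook runs, so in this sketch any updates made after the last `persist` call would not survive if the stop hook were skipped.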
