This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch blog-post-function-state in repository https://gitbox.apache.org/repos/asf/streampipes-website.git
commit db7d0978ff7bf268f8288bf7f0c509955bb41976
Author: Philipp Zehnder <[email protected]>
AuthorDate: Mon Feb 23 17:32:40 2026 +0100

    docs: Add blog post for function state
---
 .../blog/2026-02-23_streampipes-functions.md | 205 +++++++++++++++++++++
 1 file changed, 205 insertions(+)

---
title: Persistent State for StreamPipes Functions
author: Philipp Zehnder
authorURL: "https://github.com/tenthe"
authorImageURL: "/img/zehnder.png"
---

# Persistent State for StreamPipes Functions

With the introduction of **StreamPipes Functions**, Apache StreamPipes provides a lightweight way to attach fixed runtime logic to existing streams without creating a full pipeline element.

Functions are ideal when:

- A full processor would be overkill
- You want logic that runs with the extensions service lifecycle
- You need direct access to incoming events

Previously, functions could maintain in-memory state during runtime, but any restart of the extensions service resulted in a complete state reset.

A new feature, now available as a **developer preview**, adds typed state persistence to Functions. State survives service restarts, so a function can resume from the state it persisted when the service last stopped.

<!--truncate-->

## Why Stateful Functions?

Many real-world use cases require minimal but persistent runtime state:

- Counting processed events
- Tracking the last processed timestamp
- Storing threshold configurations
- Keeping lightweight usage statistics

Previously, developers had to implement external persistence manually.
Now, state management is built directly into the Function lifecycle.

---

## The New State Store API

The `FunctionContext` now provides access to a typed state store:

```java
<T> StateStore<T> getStateStore(Class<T> stateClass);
```

The returned `StateStore` lets you:

- `load` persisted state during startup
- `persist` state on shutdown

State handling is **fully optional and backward compatible**.
If you don’t request a state store, your function behaves exactly as before.

---

## Lifecycle Integration

Functions still follow the same lifecycle:

- `onServiceStarted(FunctionContext context)`
- `onEvent(Event event, String streamId)`
- `onServiceStopped()`

The only difference is that you can now obtain a `StateStore` from the `FunctionContext` in `onServiceStarted` and use it to persist state in `onServiceStopped`.

---

## Example: Threshold Monitoring Function

Let’s implement a simple function that:

- Counts processed events
- Tracks the last processed timestamp
- Logs a message when a threshold is reached

### 1. Define the State Object

State is defined as a typed POJO with a public no-argument constructor.

```java
public class ProcessingState {

    // Not read in this example; a version field lets you detect older persisted layouts later
    private int schemaVersion = 1;
    private long processedEventCount = 0;
    private long lastProcessedEpochMs = 0;
    private long alertThreshold = 1000;

    public ProcessingState() {
    }

    public int getSchemaVersion() { return schemaVersion; }
    public void setSchemaVersion(int schemaVersion) { this.schemaVersion = schemaVersion; }

    public long getProcessedEventCount() { return processedEventCount; }
    public void setProcessedEventCount(long processedEventCount) { this.processedEventCount = processedEventCount; }

    public long getLastProcessedEpochMs() { return lastProcessedEpochMs; }
    public void setLastProcessedEpochMs(long lastProcessedEpochMs) { this.lastProcessedEpochMs = lastProcessedEpochMs; }

    public long getAlertThreshold() { return alertThreshold; }
    public void setAlertThreshold(long alertThreshold) { this.alertThreshold = alertThreshold; }
}
```

---

### 2. Implement the Stateful Function

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

// StreamPipes imports (StreamPipesFunction, FunctionContext, Event, StateStore) omitted

public class ThresholdMonitoringFunction implements StreamPipesFunction {

    private static final Logger LOG = LoggerFactory.getLogger(ThresholdMonitoringFunction.class);

    private StateStore<ProcessingState> stateStore;
    private ProcessingState state;

    @Override
    public String getFunctionId() {
        return "threshold-monitoring-function";
    }

    @Override
    public List<String> requiredStreamIds() {
        return List.of("your-stream-resource-id");
    }

    @Override
    public void onServiceStarted(FunctionContext context) {

        // Request a typed state store for this function
        this.stateStore = context.getStateStore(ProcessingState.class);

        // Restore persisted state, or fall back to a fresh default object
        this.state = stateStore.load(new ProcessingState());
    }

    @Override
    public void onEvent(Event event, String streamId) {

        long count = state.getProcessedEventCount() + 1;
        state.setProcessedEventCount(count);
        state.setLastProcessedEpochMs(System.currentTimeMillis());

        // Use == (not >=) so the alert fires once, on the event that reaches
        // the threshold, instead of on every event after it
        if (count == state.getAlertThreshold()) {
            LOG.info("Processed {} events, alert threshold reached", count);
        }
    }

    @Override
    public void onServiceStopped() {

        // Persist state before shutdown so it can be restored on the next start
        stateStore.persist(state);
    }
}
```

---

## State Persistence Lifecycle

The persistence mechanism follows a simple and predictable pattern:

### Startup

```java
state = stateStore.load(new ProcessingState());
```

- If persisted state exists → it is restored
- If not → the provided default object is used

### Runtime

- State is modified in memory during `onEvent`

### Shutdown

```java
stateStore.persist(state);
```

- The current state is stored before the service stops

After a restart, the function continues with the restored values. In this example, state is written only when `onServiceStopped` calls `persist`. Changes made since the last successful `persist` are therefore not restored if the service stops without running this callback.

No external database setup.
No manual serialization logic.
No additional infrastructure required.

---

## Backward Compatibility

Stateful behavior is **opt-in**.
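
The following self-contained sketch makes the opt-in difference concrete. It simulates two runs of a counting function, once without a state store and once with one. It deliberately does not use the StreamPipes API. `SketchStateStore`, `CounterState`, `runOnce` and the static `STORAGE` map are hypothetical stand-ins that only mirror the `load(default)` / `persist(state)` pattern described in this post. Java serialization is used only to keep the example free of dependencies. The post does not say how or where StreamPipes actually stores function state.

```java
import java.io.*;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: NOT the StreamPipes API, only the load/persist pattern.
public class StateStoreSketch {

    // Hypothetical stand-in for durable storage that outlives a single service run.
    static final Map<String, byte[]> STORAGE = new HashMap<>();

    // Hypothetical minimal store mirroring load(default) / persist(state).
    static final class SketchStateStore<T extends Serializable> {
        private final String key;
        private final Class<T> type;

        SketchStateStore(String functionId, Class<T> type) {
            this.key = functionId + ":" + type.getName();
            this.type = type;
        }

        // Returns the persisted state if present, otherwise the given default.
        T load(T defaultState) {
            byte[] bytes = STORAGE.get(key);
            if (bytes == null) {
                return defaultState;
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return type.cast(in.readObject());
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException("Could not restore state", e);
            }
        }

        // Stores a serialized snapshot; later in-memory changes are not persisted.
        void persist(T state) {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(state);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            STORAGE.put(key, buffer.toByteArray());
        }
    }

    static final class CounterState implements Serializable {
        long processedEventCount = 0;
    }

    // Simulates one service run: start, process `events` events, stop.
    static long runOnce(boolean useStateStore, int events) {
        SketchStateStore<CounterState> store = null;
        CounterState state;
        if (useStateStore) {
            // Opt-in path: restore persisted state, or fall back to the default
            store = new SketchStateStore<>("demo-function", CounterState.class);
            state = store.load(new CounterState());
        } else {
            // Stateless path (pre-existing behavior): always start from scratch
            state = new CounterState();
        }
        for (int i = 0; i < events; i++) {
            state.processedEventCount++; // in-memory update, as in onEvent
        }
        if (store != null) {
            store.persist(state); // as in onServiceStopped
        }
        return state.processedEventCount;
    }

    public static void main(String[] args) {
        System.out.println("stateless: " + runOnce(false, 3) + ", then " + runOnce(false, 3));
        System.out.println("stateful: " + runOnce(true, 3) + ", then " + runOnce(true, 3));
    }
}
```

Running `main` prints `stateless: 3, then 3` followed by `stateful: 3, then 6`. Without a store, every run starts from a fresh object. The opted-in counter resumes from the snapshot written at the end of the previous run.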

Existing functions:

- Continue to work unchanged
- Require no migration
- Keep their current in-memory-only behavior unless `getStateStore(...)` is used

---

## Conclusion & Feedback

Functions and state persistence are currently available as a **developer preview**, and the API may evolve.

We welcome feedback from developers: if you try out stateful functions, let us know your thoughts and suggestions.
