This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6a1ff1daaa feat(#4185:) add persistent state support for streampipe functions (#4186)
6a1ff1daaa is described below

commit 6a1ff1daaa40e48c771c0056650629f757976366
Author: Philipp Zehnder <[email protected]>
AuthorDate: Tue Feb 24 08:14:30 2026 +0100

    feat(#4185:) add persistent state support for streampipe functions (#4186)
---
 .../apache/streampipes/client/api/IAdminApi.java   |   6 ++
 .../apache/streampipes/client/api/AdminApi.java    |  18 ++++
 .../commons/constants/GenericDocTypes.java         |   1 +
 .../api/declarer/IStreamPipesFunctionDeclarer.java |   6 ++
 .../function/FunctionDeregistrationHandler.java    |   0
 .../function/FunctionRegistrationHandler.java      |   0
 .../extensions/function/RegistrationHandler.java   |   0
 .../function/StreamPipesFunctionHandler.java       |  11 ++-
 .../model/function/FunctionShutdownResult.java     |  33 +++++--
 .../streampipes/model/function/FunctionState.java  |  86 ++++++++++++++++++
 .../model/function/FunctionsShutdownResponse.java  |  22 +++--
 .../execution/ExtensionServiceExecutions.java      |   9 ++
 .../manager/function/FunctionManager.java          | 101 +++++++++++++++++++++
 .../manager/setup/CouchDbInstallationStep.java     |   2 +
 .../setup/tasks/AddFunctionStateViewTask.java      |  26 ++++--
 .../function/FunctionShutdownResource.java         |  40 ++++++++
 .../streampipes/rest/impl/FunctionsResource.java   |  41 +++++++++
 streampipes-service-core/AGENTS.md                 |   2 +
 .../service/core/StreamPipesCoreApplication.java   |  12 ++-
 .../core/migrations/AvailableMigrations.java       |   4 +-
 .../v099/AddFunctionStateViewMigration.java        |  25 +++--
 .../core/storage/StorageApiConfiguration.java      |  17 ++--
 .../StreamPipesExtensionsServiceBase.java          |   1 -
 .../api/function/IFunctionStateStorage.java        |  16 +---
 .../impl/function/FunctionStateStorageImpl.java    |  21 +++--
 .../standalone/function/FunctionContext.java       |  37 ++++++++
 .../standalone/function/FunctionStateStore.java    |  86 ++++++++++++++++++
 .../wrapper/standalone/function/StateStore.java    |  15 +--
 .../standalone/function/StreamPipesFunction.java   |  29 ++++++
 29 files changed, 597 insertions(+), 70 deletions(-)

diff --git 
a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java
 
b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java
index ba776ce51c..11db107216 100644
--- 
a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java
+++ 
b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java
@@ -25,6 +25,8 @@ import 
org.apache.streampipes.model.function.FunctionDefinition;
 import org.apache.streampipes.model.migration.ModelMigratorConfig;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 public interface IAdminApi {
 
@@ -40,6 +42,10 @@ public interface IAdminApi {
 
   void deregisterFunction(String functionId);
 
+  Optional<Map<String, Object>> getFunctionState(String functionId);
+
+  void persistFunctionState(String functionId, Map<String, Object> state);
+
   void registerMigrations(List<ModelMigratorConfig> migrationConfigs, String 
serviceId);
 
   MessagingSettings getMessagingSettings();
diff --git 
a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java
 
b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java
index 970eb62249..54c84798be 100644
--- 
a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java
+++ 
b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java
@@ -27,6 +27,8 @@ import org.apache.streampipes.model.message.SuccessMessage;
 import org.apache.streampipes.model.migration.ModelMigratorConfig;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 public class AdminApi extends AbstractClientApi implements IAdminApi {
 
@@ -67,6 +69,18 @@ public class AdminApi extends AbstractClientApi implements 
IAdminApi {
     delete(getDeleteFunctionPath(functionId), SuccessMessage.class);
   }
 
+  @Override
+  public Optional<Map<String, Object>> getFunctionState(String functionId) {
+    return getSingleOpt(getFunctionStatePath(functionId), Map.class)
+        .map(state -> (Map<String, Object>) state);
+  }
+
+  @Override
+  public void persistFunctionState(String functionId,
+                                   Map<String, Object> state) {
+    put(getFunctionStatePath(functionId), state);
+  }
+
   /**
    * Register migration configs {@link ModelMigratorConfig} at the StreamPipes 
Core service.
    * @param migrationConfigs list of migration configs to be registered
@@ -109,6 +123,10 @@ public class AdminApi extends AbstractClientApi implements 
IAdminApi {
     return getFunctionsPath().addToPath(functionId);
   }
 
+  private StreamPipesApiPath getFunctionStatePath(String functionId) {
+    return getDeleteFunctionPath(functionId).addToPath("state");
+  }
+
   private StreamPipesApiPath getMigrationPath() {
     return StreamPipesApiPath
             .fromBaseApiPath()
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
index 3ae70c5c37..91bceb816d 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
@@ -23,6 +23,7 @@ public class GenericDocTypes {
   public static final String DOC_ASSET_MANAGEMENT = "asset-management";
   public static final String DOC_ASSET_LINK_TYPE = "asset-link-type";
   public static final String DOC_TRANSFORMATION_SCRIPT_TEMPLATE = 
"transformation-script-template";
+  public static final String DOC_FUNCTION_STATE = "function-state";
 
 
   public static final String DEFAULT_ASSET_DOC_ID = "default-asset";
diff --git 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
 
b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
index cdf0036377..4f552cce33 100644
--- 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
+++ 
b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
@@ -19,6 +19,8 @@
 package org.apache.streampipes.extensions.api.declarer;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 public interface IStreamPipesFunctionDeclarer {
 
@@ -30,4 +32,8 @@ public interface IStreamPipesFunctionDeclarer {
 
   void discardRuntime();
 
+  default Optional<Map<String, Object>> getRegisteredStatePayload() {
+    return Optional.empty();
+  }
+
 }
diff --git 
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/FunctionDeregistrationHandler.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/FunctionDeregistrationHandler.java
similarity index 100%
rename from 
streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/FunctionDeregistrationHandler.java
rename to 
streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/FunctionDeregistrationHandler.java
diff --git 
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/FunctionRegistrationHandler.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/FunctionRegistrationHandler.java
similarity index 100%
rename from 
streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/FunctionRegistrationHandler.java
rename to 
streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/FunctionRegistrationHandler.java
diff --git 
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/RegistrationHandler.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/RegistrationHandler.java
similarity index 100%
rename from 
streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/RegistrationHandler.java
rename to 
streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/RegistrationHandler.java
diff --git 
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
similarity index 88%
rename from 
streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
rename to 
streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
index a36d788a4d..8dd336d00e 100644
--- 
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
@@ -24,8 +24,11 @@ import 
org.apache.streampipes.extensions.api.pe.config.IDataStreamConfiguration;
 import 
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
 import org.apache.streampipes.extensions.management.init.DeclarersSingleton;
 import org.apache.streampipes.model.function.FunctionDefinition;
+import org.apache.streampipes.model.function.FunctionShutdownResult;
+import org.apache.streampipes.model.function.FunctionsShutdownResponse;
 import org.apache.streampipes.sdk.builder.stream.DataStreamConfiguration;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -92,11 +95,15 @@ public enum StreamPipesFunctionHandler {
     });
   }
 
-  public void cleanupFunctions() {
+  public FunctionsShutdownResponse shutdownFunctionsAndGetState() {
+    var shutdownResults = new ArrayList<FunctionShutdownResult>();
     this.runningInstances.forEach((key, value) -> {
       value.discardRuntime();
+      shutdownResults.add(new FunctionShutdownResult(key, 
value.getRegisteredStatePayload().orElse(null)));
     });
-    new FunctionDeregistrationHandler(getFunctionDefinitions()).run();
+    this.runningInstances.clear();
+
+    return new FunctionsShutdownResponse(shutdownResults);
   }
 
   private List<FunctionDefinition> getFunctionDefinitions() {
diff --git 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionShutdownResult.java
similarity index 54%
copy from 
streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
copy to 
streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionShutdownResult.java
index cdf0036377..c9fabcbf1b 100644
--- 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionShutdownResult.java
@@ -16,18 +16,37 @@
  *
  */
 
-package org.apache.streampipes.extensions.api.declarer;
+package org.apache.streampipes.model.function;
 
-import java.util.List;
+import java.util.Map;
 
-public interface IStreamPipesFunctionDeclarer {
+public class FunctionShutdownResult {
 
-  IFunctionConfig getFunctionConfig();
+  private String functionId;
+  private Map<String, Object> state;
 
-  List<String> requiredStreamIds();
+  public FunctionShutdownResult() {
+  }
 
-  void invokeRuntime(String serviceGroup);
+  public FunctionShutdownResult(String functionId,
+                                Map<String, Object> state) {
+    this.functionId = functionId;
+    this.state = state;
+  }
 
-  void discardRuntime();
+  public String getFunctionId() {
+    return functionId;
+  }
 
+  public void setFunctionId(String functionId) {
+    this.functionId = functionId;
+  }
+
+  public Map<String, Object> getState() {
+    return state;
+  }
+
+  public void setState(Map<String, Object> state) {
+    this.state = state;
+  }
 }
diff --git 
a/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionState.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionState.java
new file mode 100644
index 0000000000..12f99e4e88
--- /dev/null
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionState.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.model.function;
+
+import org.apache.streampipes.commons.constants.GenericDocTypes;
+import org.apache.streampipes.model.shared.api.Storable;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Map;
+
+public class FunctionState implements Storable {
+
+  public static final String APP_DOC_TYPE = GenericDocTypes.DOC_FUNCTION_STATE;
+
+  private final String appDocType = APP_DOC_TYPE;
+
+  protected @SerializedName("_rev") String rev;
+  private @SerializedName("_id") String functionId;
+  private Map<String, Object> state;
+
+  public FunctionState() {
+  }
+
+  public FunctionState(String functionId,
+                       Map<String, Object> state) {
+    this.functionId = functionId;
+    this.state = state;
+  }
+
+  public String getFunctionId() {
+    return functionId;
+  }
+
+  public void setFunctionId(String functionId) {
+    this.functionId = functionId;
+  }
+
+  public Map<String, Object> getState() {
+    return state;
+  }
+
+  public void setState(Map<String, Object> state) {
+    this.state = state;
+  }
+
+  @Override
+  public String getRev() {
+    return rev;
+  }
+
+  @Override
+  public void setRev(String rev) {
+    this.rev = rev;
+  }
+
+  @Override
+  public String getElementId() {
+    return functionId;
+  }
+
+  @Override
+  public void setElementId(String elementId) {
+    this.functionId = elementId;
+  }
+
+  public String getAppDocType() {
+    return appDocType;
+  }
+}
diff --git 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionsShutdownResponse.java
similarity index 60%
copy from 
streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
copy to 
streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionsShutdownResponse.java
index cdf0036377..802c1f978d 100644
--- 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionsShutdownResponse.java
@@ -16,18 +16,28 @@
  *
  */
 
-package org.apache.streampipes.extensions.api.declarer;
+package org.apache.streampipes.model.function;
 
+import java.util.ArrayList;
 import java.util.List;
 
-public interface IStreamPipesFunctionDeclarer {
+public class FunctionsShutdownResponse {
 
-  IFunctionConfig getFunctionConfig();
+  private List<FunctionShutdownResult> functions;
 
-  List<String> requiredStreamIds();
+  public FunctionsShutdownResponse() {
+    this.functions = new ArrayList<>();
+  }
 
-  void invokeRuntime(String serviceGroup);
+  public FunctionsShutdownResponse(List<FunctionShutdownResult> functions) {
+    this.functions = functions;
+  }
 
-  void discardRuntime();
+  public List<FunctionShutdownResult> getFunctions() {
+    return functions;
+  }
 
+  public void setFunctions(List<FunctionShutdownResult> functions) {
+    this.functions = functions;
+  }
 }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
index e4aca6b6c0..2164b2776c 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
@@ -35,6 +35,15 @@ public class ExtensionServiceExecutions {
         .socketTimeout(10000);
   }
 
+  public static Request extServicePostRequestAsServiceAdmin(String url) {
+    return Request
+        .Post(url)
+        .addHeader("Authorization", 
AuthTokenUtils.getAuthTokenForUser(getServiceAdminSid()))
+        .addHeader("Accept", "application/json")
+        .connectTimeout(10000)
+        .socketTimeout(10000);
+  }
+
   private static String getServiceAdminSid() {
     return new 
SpResourceManager().manageUsers().getServiceAdmin().getPrincipalId();
   }
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/function/FunctionManager.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/function/FunctionManager.java
new file mode 100644
index 0000000000..691714faba
--- /dev/null
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/function/FunctionManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.function;
+
+import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
+import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.function.FunctionState;
+import org.apache.streampipes.model.function.FunctionsShutdownResponse;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+public class FunctionManager {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FunctionManager.class);
+  private static final String FUNCTION_STOP_PATH = "/api/v1/functions/stop";
+
+  public static void stopAllFunctionsAndPersistState(IFunctionStateStorage 
functionStateStorage) {
+    var extensions = 
StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage().findAll();
+
+    LOG.info("Triggering function stop at {} extension services...", 
extensions.size());
+    extensions.forEach(service -> {
+      var shutdownResponse = triggerFunctionStop(service);
+      if (shutdownResponse != null) {
+        persistReturnedFunctionStates(functionStateStorage, shutdownResponse);
+      }
+    });
+  }
+
+  private static FunctionsShutdownResponse 
triggerFunctionStop(SpServiceRegistration service) {
+    var endpoint = service.getServiceUrl() + FUNCTION_STOP_PATH;
+
+    try {
+      LOG.info("Triggering function stop at {}", endpoint);
+      var response = 
ExtensionServiceExecutions.extServicePostRequestAsServiceAdmin(endpoint).execute();
+      var httpResponse = response.returnResponse();
+      int statusCode = httpResponse.getStatusLine().getStatusCode();
+
+      if (statusCode >= 200 && statusCode < 300) {
+        LOG.debug("Function stop triggered at {} (HTTP {})", 
service.getSvcId(), statusCode);
+        if (httpResponse.getEntity() == null) {
+          return null;
+        }
+
+        return JacksonSerializer.getObjectMapper().readValue(
+            EntityUtils.toString(httpResponse.getEntity(), 
StandardCharsets.UTF_8),
+            FunctionsShutdownResponse.class
+        );
+      } else {
+        LOG.warn("Function stop request returned non-success status at {} 
(HTTP {})",
+            service.getSvcId(), statusCode);
+        return null;
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not trigger function stop at {}: {}", endpoint, 
e.getMessage());
+      return null;
+    }
+  }
+
+  private static void persistReturnedFunctionStates(IFunctionStateStorage 
functionStateStorage,
+                                                    FunctionsShutdownResponse 
shutdownResponse) {
+    if (shutdownResponse == null || shutdownResponse.getFunctions() == null) {
+      return;
+    }
+
+    shutdownResponse.getFunctions().forEach(functionResult -> {
+      if (functionResult.getState() != null) {
+        var existingFunctionState = 
functionStateStorage.getElementById(functionResult.getFunctionId());
+        if (existingFunctionState != null) {
+          existingFunctionState.setState(functionResult.getState());
+          functionStateStorage.updateElement(existingFunctionState);
+        } else {
+          functionStateStorage.persist(new 
FunctionState(functionResult.getFunctionId(), functionResult.getState()));
+        }
+      }
+    });
+  }
+}
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
index 21ad1134ea..89fed8172a 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
@@ -22,6 +22,7 @@ import 
org.apache.streampipes.manager.setup.design.UserDesignDocument;
 import org.apache.streampipes.manager.setup.tasks.AddAssetManagementViewTask;
 import org.apache.streampipes.manager.setup.tasks.AddDataLakeMeasureViewTask;
 import 
org.apache.streampipes.manager.setup.tasks.AddDefaultPipelineTemplatesTask;
+import org.apache.streampipes.manager.setup.tasks.AddFunctionStateViewTask;
 import org.apache.streampipes.manager.setup.tasks.AddScriptTemplateViewTask;
 import org.apache.streampipes.manager.setup.tasks.CreateAssetLinkTypeTask;
 import org.apache.streampipes.storage.couchdb.utils.Utils;
@@ -81,6 +82,7 @@ public class CouchDbInstallationStep extends InstallationStep 
{
     new AddDataLakeMeasureViewTask().execute();
     new AddAssetManagementViewTask().execute();
     new AddScriptTemplateViewTask().execute();
+    new AddFunctionStateViewTask().execute();
   }
 
   private void addNotificationView() {
diff --git 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/tasks/AddFunctionStateViewTask.java
similarity index 54%
copy from 
streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
copy to 
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/tasks/AddFunctionStateViewTask.java
index cdf0036377..7f5aae32c2 100644
--- 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/tasks/AddFunctionStateViewTask.java
@@ -16,18 +16,28 @@
  *
  */
 
-package org.apache.streampipes.extensions.api.declarer;
+package org.apache.streampipes.manager.setup.tasks;
 
-import java.util.List;
+import org.apache.streampipes.commons.constants.GenericDocTypes;
 
-public interface IStreamPipesFunctionDeclarer {
+public class AddFunctionStateViewTask extends 
AbstractAddGenericStorageViewTask {
 
-  IFunctionConfig getFunctionConfig();
+  public static final String DESIGN_DOCUMENT = "_design/function-states";
+  public static final String VIEW_NAME = "all-function-states";
 
-  List<String> requiredStreamIds();
+  @Override
+  public String getDesignDocument() {
+    return DESIGN_DOCUMENT;
+  }
 
-  void invokeRuntime(String serviceGroup);
-
-  void discardRuntime();
+  @Override
+  public String getViewName() {
+    return VIEW_NAME;
+  }
 
+  @Override
+  public String getMapFunction() {
+    return String.format("function(doc) { if(doc.appDocType === '%s') { 
emit(doc._id, doc); } }",
+        GenericDocTypes.DOC_FUNCTION_STATE);
+  }
 }
diff --git 
a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/function/FunctionShutdownResource.java
 
b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/function/FunctionShutdownResource.java
new file mode 100644
index 0000000000..fd66fb8f70
--- /dev/null
+++ 
b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/function/FunctionShutdownResource.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.extensions.function;
+
+import org.apache.streampipes.model.function.FunctionsShutdownResponse;
+import org.apache.streampipes.rest.extensions.AbstractExtensionsResource;
+import 
org.apache.streampipes.service.extensions.function.StreamPipesFunctionHandler;
+
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("api/v1/functions")
+public class FunctionShutdownResource extends AbstractExtensionsResource {
+
+  @PostMapping(path = "stop", produces = MediaType.APPLICATION_JSON_VALUE)
+  public ResponseEntity<FunctionsShutdownResponse> shutdownFunctions() {
+    var shutdownResponse = 
StreamPipesFunctionHandler.INSTANCE.shutdownFunctionsAndGetState();
+    return ok(shutdownResponse);
+  }
+}
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java
index 6164bbecc8..56960a9a81 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java
@@ -21,29 +21,41 @@ package org.apache.streampipes.rest.impl;
 import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider;
 import org.apache.streampipes.manager.function.FunctionRegistrationService;
 import org.apache.streampipes.model.function.FunctionDefinition;
+import org.apache.streampipes.model.function.FunctionState;
 import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.model.message.SuccessMessage;
 import org.apache.streampipes.model.monitoring.SpLogEntry;
 import org.apache.streampipes.model.monitoring.SpMetricsEntry;
 import 
org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
+import org.apache.streampipes.rest.shared.exception.SpMessageException;
+import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
 
+import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.DeleteMapping;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 @RestController
 @RequestMapping("/api/v2/functions")
 public class FunctionsResource extends AbstractAuthGuardedRestResource {
 
+  private final IFunctionStateStorage functionStateStorage;
+
+  public FunctionsResource(IFunctionStateStorage functionStateStorage) {
+    this.functionStateStorage = functionStateStorage;
+  }
+
   @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE)
   public ResponseEntity<Collection<FunctionDefinition>> getActiveFunctions() {
     return ok(FunctionRegistrationService.INSTANCE.getAllFunctions());
@@ -78,4 +90,33 @@ public class FunctionsResource extends 
AbstractAuthGuardedRestResource {
   public ResponseEntity<List<SpLogEntry>> 
getFunctionLogs(@PathVariable("functionId") String functionId) {
     return 
ok(ExtensionsLogProvider.INSTANCE.getLogInfosForResource(functionId));
   }
+
+  /**
+   * Returns the stored state map of the given function.
+   *
+   * @param functionId id used to look up the element in the function state storage
+   * @return response wrapping the state map of the stored element
+   * @throws SpMessageException with {@code NOT_FOUND} when the storage returns {@code null} for the id
+   */
+  @GetMapping(path = "{functionId}/state", produces = MediaType.APPLICATION_JSON_VALUE)
+  public ResponseEntity<Map<String, Object>> getFunctionState(@PathVariable("functionId") String functionId) {
+    var storedState = functionStateStorage.getElementById(functionId);
+    if (storedState == null) {
+      throw new SpMessageException(HttpStatus.NOT_FOUND, Notifications.error("Function state not found"));
+    }
+    return ok(storedState.getState());
+  }
+
+  /**
+   * Stores the given state map for a function.
+   *
+   * <p>When the storage has no element for the id, a new {@link FunctionState} is persisted;
+   * otherwise the state of the existing element is replaced and the element is updated.
+   *
+   * @param functionId id of the function the state belongs to
+   * @param state state map taken from the request body
+   * @return success message once the state has been handed to the storage
+   */
+  @PutMapping(
+      path = "{functionId}/state",
+      produces = MediaType.APPLICATION_JSON_VALUE,
+      consumes = MediaType.APPLICATION_JSON_VALUE
+  )
+  public ResponseEntity<SuccessMessage> persistFunctionState(@PathVariable("functionId") String functionId,
+                                                             @RequestBody Map<String, Object> state) {
+    var storedState = functionStateStorage.getElementById(functionId);
+
+    if (storedState == null) {
+      // First write for this function: create the element.
+      functionStateStorage.persist(new FunctionState(functionId, state));
+    } else {
+      storedState.setState(state);
+      functionStateStorage.updateElement(storedState);
+    }
+
+    return ok(Notifications.success("Function state successfully persisted"));
+  }
 }
diff --git a/streampipes-service-core/AGENTS.md 
b/streampipes-service-core/AGENTS.md
index bc9965808a..453a644ae3 100644
--- a/streampipes-service-core/AGENTS.md
+++ b/streampipes-service-core/AGENTS.md
@@ -16,6 +16,8 @@ Applies to `streampipes-service-core/`.
 ## Best Practices
 - Keep this module orchestration-focused; domain behavior belongs in 
management modules.
 - New migrations must be idempotent and explicitly registered.
+- In `migrations/AvailableMigrations`, always append new migrations at the end 
of the list.
+- Do not insert new migrations between existing entries to avoid conflicting 
migration order.
 - Preserve existing startup behavior for OAuth and non-OAuth deployments.
 - Avoid introducing long-running/blocking work on startup thread paths.
 
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
index a893025c12..ed3859743f 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
@@ -29,6 +29,7 @@ import 
org.apache.streampipes.health.monitoring.ResourceProvider;
 import org.apache.streampipes.health.monitoring.ServiceHealthCheck;
 import org.apache.streampipes.loadbalance.LoadManager;
 import 
org.apache.streampipes.loadbalance.pipeline.ExtensionsServiceLogExecutor;
+import org.apache.streampipes.manager.function.FunctionManager;
 import org.apache.streampipes.manager.health.CoreInitialInstallationProgress;
 import org.apache.streampipes.manager.health.CoreServiceStatusManager;
 import org.apache.streampipes.manager.pipeline.PipelineManager;
@@ -51,6 +52,8 @@ import 
org.apache.streampipes.service.base.StreamPipesServiceBase;
 import org.apache.streampipes.service.core.migrations.AvailableMigrations;
 import org.apache.streampipes.service.core.migrations.Migration;
 import org.apache.streampipes.service.core.migrations.MigrationsHandler;
+import org.apache.streampipes.service.core.storage.StorageApiConfiguration;
+import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
 import org.apache.streampipes.storage.api.pipeline.IPipelineStorage;
 import org.apache.streampipes.storage.api.system.ISpCoreConfigurationStorage;
 import org.apache.streampipes.storage.couchdb.impl.user.UserStorage;
@@ -59,6 +62,7 @@ import 
org.apache.streampipes.storage.management.StorageDispatcher;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 import org.springframework.context.annotation.ComponentScan;
 import org.springframework.context.annotation.Configuration;
@@ -78,7 +82,8 @@ import java.util.function.Supplier;
 @EnableAutoConfiguration
 @EnableScheduling
 @Import({OpenApiConfiguration.class, StreamPipesPasswordEncoder.class,
-    StreamPipesPrometheusConfig.class, WebSecurityConfig.class, 
WelcomePageController.class})
+    StreamPipesPrometheusConfig.class, WebSecurityConfig.class, 
WelcomePageController.class,
+    StorageApiConfiguration.class})
 @ComponentScan({"org.apache.streampipes.rest.*", 
"org.apache.streampipes.service.core.oauth2",
     "org.apache.streampipes.service.core.scheduler"})
 public class StreamPipesCoreApplication extends StreamPipesServiceBase {
@@ -92,6 +97,9 @@ public class StreamPipesCoreApplication extends 
StreamPipesServiceBase {
   private final CoreServiceStatusManager coreStatusManager =
       new CoreServiceStatusManager(coreConfigStorage);
 
+  @Autowired
+  private IFunctionStateStorage functionStateStorage;
+
   public static void main(String[] args) {
     StreamPipesCoreApplication application = new StreamPipesCoreApplication();
     application.initialize(() -> List.of(new SpNatsProtocolFactory(), new 
SpKafkaProtocolFactory(),
@@ -233,6 +241,8 @@ public class StreamPipesCoreApplication extends 
StreamPipesServiceBase {
       }
     });
 
+    FunctionManager.stopAllFunctionsAndPersistState(functionStateStorage);
+
     LOG.info("Thanks for using Apache StreamPipes - see you next time!");
   }
 
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java
index b79f15b186..b09d4d7ec6 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java
@@ -30,6 +30,7 @@ import 
org.apache.streampipes.service.core.migrations.v0980.FixImportedPermissio
 import 
org.apache.streampipes.service.core.migrations.v0980.ModifyAssetLinkTypesMigration;
 import 
org.apache.streampipes.service.core.migrations.v0980.ModifyAssetLinksMigration;
 import 
org.apache.streampipes.service.core.migrations.v099.AddAssetManagementViewMigration;
+import 
org.apache.streampipes.service.core.migrations.v099.AddFunctionStateViewMigration;
 import 
org.apache.streampipes.service.core.migrations.v099.AddScriptTemplateViewMigration;
 import 
org.apache.streampipes.service.core.migrations.v099.ComputeCertificateThumbprintMigration;
 import 
org.apache.streampipes.service.core.migrations.v099.CreateAssetPermissionMigration;
@@ -80,7 +81,8 @@ public class AvailableMigrations {
         new ComputeCertificateThumbprintMigration(),
         new MigrateAdaptersToUseScript(),
         new ModifyAssetLinkIconMigration(),
-        new RemoveDuplicatedAssetPermissions()
+        new RemoveDuplicatedAssetPermissions(),
+        new AddFunctionStateViewMigration()
     );
   }
 }
diff --git 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/AddFunctionStateViewMigration.java
similarity index 54%
copy from 
streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
copy to 
streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/AddFunctionStateViewMigration.java
index cdf0036377..d96ec471d7 100644
--- 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/AddFunctionStateViewMigration.java
@@ -16,18 +16,27 @@
  *
  */
 
-package org.apache.streampipes.extensions.api.declarer;
+package org.apache.streampipes.service.core.migrations.v099;
 
-import java.util.List;
+import org.apache.streampipes.manager.setup.tasks.AddFunctionStateViewTask;
+import 
org.apache.streampipes.service.core.migrations.templates.AddGenericStorageViewMigration;
 
-public interface IStreamPipesFunctionDeclarer {
+import java.io.IOException;
 
-  IFunctionConfig getFunctionConfig();
+public class AddFunctionStateViewMigration extends 
AddGenericStorageViewMigration {
 
-  List<String> requiredStreamIds();
+  @Override
+  public String getDesignDocumentName() {
+    return AddFunctionStateViewTask.DESIGN_DOCUMENT;
+  }
 
-  void invokeRuntime(String serviceGroup);
-
-  void discardRuntime();
+  @Override
+  public String getViewName() {
+    return AddFunctionStateViewTask.VIEW_NAME;
+  }
 
+  @Override
+  public void executeMigration() throws IOException {
+    new AddFunctionStateViewTask().execute();
+  }
 }
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/storage/StorageApiConfiguration.java
similarity index 62%
copy from 
streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
copy to 
streampipes-service-core/src/main/java/org/apache/streampipes/service/core/storage/StorageApiConfiguration.java
index 3ae70c5c37..ae16f2e6ea 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/storage/StorageApiConfiguration.java
@@ -16,14 +16,19 @@
  *
  */
 
-package org.apache.streampipes.commons.constants;
+package org.apache.streampipes.service.core.storage;
 
-public class GenericDocTypes {
+import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
+import 
org.apache.streampipes.storage.couchdb.impl.function.FunctionStateStorageImpl;
 
-  public static final String DOC_ASSET_MANAGEMENT = "asset-management";
-  public static final String DOC_ASSET_LINK_TYPE = "asset-link-type";
-  public static final String DOC_TRANSFORMATION_SCRIPT_TEMPLATE = 
"transformation-script-template";
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
 
+@Configuration
+public class StorageApiConfiguration {
 
-  public static final String DEFAULT_ASSET_DOC_ID = "default-asset";
+  @Bean
+  public IFunctionStateStorage functionStateStorage() {
+    return new FunctionStateStorageImpl();
+  }
 }
diff --git 
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
 
b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
index b95136efbc..80e8ffde21 100644
--- 
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
+++ 
b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
@@ -190,7 +190,6 @@ public abstract class StreamPipesExtensionsServiceBase 
extends StreamPipesServic
   @PreDestroy
   public void onExit() {
     new ExtensionsServiceShutdownHandler().onShutdown();
-    StreamPipesFunctionHandler.INSTANCE.cleanupFunctions();
     deregisterService(DeclarersSingleton.getInstance().getServiceId());
   }
 
diff --git 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
 
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/function/IFunctionStateStorage.java
similarity index 74%
copy from 
streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
copy to 
streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/function/IFunctionStateStorage.java
index cdf0036377..1be7d41e13 100644
--- 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
+++ 
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/function/IFunctionStateStorage.java
@@ -16,18 +16,10 @@
  *
  */
 
-package org.apache.streampipes.extensions.api.declarer;
+package org.apache.streampipes.storage.api.function;
 
-import java.util.List;
-
-public interface IStreamPipesFunctionDeclarer {
-
-  IFunctionConfig getFunctionConfig();
-
-  List<String> requiredStreamIds();
-
-  void invokeRuntime(String serviceGroup);
-
-  void discardRuntime();
+import org.apache.streampipes.model.function.FunctionState;
+import org.apache.streampipes.storage.api.core.CRUDStorage;
 
+public interface IFunctionStateStorage extends CRUDStorage<FunctionState> {
 }
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
 
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/function/FunctionStateStorageImpl.java
similarity index 55%
copy from 
streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
copy to 
streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/function/FunctionStateStorageImpl.java
index 3ae70c5c37..5720c77de3 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
+++ 
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/function/FunctionStateStorageImpl.java
@@ -16,14 +16,21 @@
  *
  */
 
-package org.apache.streampipes.commons.constants;
+package org.apache.streampipes.storage.couchdb.impl.function;
 
-public class GenericDocTypes {
+import org.apache.streampipes.model.function.FunctionState;
+import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
+import org.apache.streampipes.storage.couchdb.impl.core.DefaultViewCrudStorage;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
 
-  public static final String DOC_ASSET_MANAGEMENT = "asset-management";
-  public static final String DOC_ASSET_LINK_TYPE = "asset-link-type";
-  public static final String DOC_TRANSFORMATION_SCRIPT_TEMPLATE = 
"transformation-script-template";
+public class FunctionStateStorageImpl extends 
DefaultViewCrudStorage<FunctionState>
+    implements IFunctionStateStorage {
 
-
-  public static final String DEFAULT_ASSET_DOC_ID = "default-asset";
+  public FunctionStateStorageImpl() {
+    super(
+        () -> Utils.getCouchDbGsonClient("genericstorage"),
+        FunctionState.class,
+        "function-states/all-function-states"
+    );
+  }
 }
diff --git 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java
 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java
index ff4326971e..2d9709377a 100644
--- 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java
+++ 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java
@@ -30,6 +30,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
 
 public class FunctionContext {
 
@@ -38,12 +40,15 @@ public class FunctionContext {
   private StreamPipesClient client;
   private String functionId;
   private ConfigExtractor config;
+  private final Map<Class<?>, StateStore<?>> stateStores;
+  private Supplier<Object> stateSupplier;
 
   private Map<String, SpOutputCollector> outputCollectors;
   private IExtensionsLogger extensionsLogger;
 
   public FunctionContext() {
     this.streams = new HashMap<>();
+    this.stateStores = new HashMap<>();
   }
 
   public FunctionContext(String functionId,
@@ -87,4 +92,36 @@ public class FunctionContext {
   public Map<String, SpOutputCollector> getOutputCollectors() {
     return outputCollectors;
   }
+
+  /**
+   * Returns the state store for the given state class, creating a {@link FunctionStateStore}
+   * the first time a class is requested and reusing that instance afterwards.
+   *
+   * @param stateClass class of the state object; also used as the cache key
+   * @param <T> type of the state object
+   * @return the cached or newly created store for {@code stateClass}
+   */
+  @SuppressWarnings("unchecked") // the entry for stateClass is created below as a FunctionStateStore<T>
+  public <T> StateStore<T> getStateStore(Class<T> stateClass) {
+    return (StateStore<T>) stateStores.computeIfAbsent(
+        stateClass,
+        key -> new FunctionStateStore<>(functionId, client, stateClass)
+    );
+  }
+
+  /**
+   * Registers the supplier that {@link #getRegisteredState()} queries for the current state.
+   * Calling this again replaces the previously registered supplier.
+   *
+   * @param stateSupplier supplier of the state object; a {@code null} result is reported as absent
+   */
+  public void registerStateSupplier(Supplier<Object> stateSupplier) {
+    this.stateSupplier = stateSupplier;
+  }
+
+  /**
+   * Asks the registered state supplier for the current state.
+   *
+   * @return the supplied state, or an empty {@link Optional} when no supplier is registered
+   *         or the supplier returns {@code null}
+   */
+  public Optional<Object> getRegisteredState() {
+    return Optional.ofNullable(stateSupplier).map(Supplier::get);
+  }
+
+  /**
+   * Returns the first non-null payload cached by a {@link FunctionStateStore} of this context.
+   *
+   * <p>The stores are held in a {@code HashMap}, so when more than one store holds a payload
+   * the one returned depends on the map's iteration order.
+   *
+   * @return the cached payload, or an empty {@link Optional} when no store has one
+   */
+  public Optional<Map<String, Object>> getPersistedStatePayload() {
+    return stateStores
+        .values()
+        .stream()
+        .filter(FunctionStateStore.class::isInstance)
+        // Wildcard instead of the raw type: the payload type does not depend on the state type,
+        // so no unchecked cast (and no warning suppression) is needed.
+        .map(store -> (FunctionStateStore<?>) store)
+        .map(FunctionStateStore::getPersistedStatePayload)
+        .filter(payload -> payload != null)
+        .findFirst();
+  }
 }
diff --git 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionStateStore.java
 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionStateStore.java
new file mode 100644
index 0000000000..8bdfccca55
--- /dev/null
+++ 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionStateStore.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.wrapper.standalone.function;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * {@link StateStore} that loads the state of a function through the StreamPipes client and keeps
+ * the most recently persisted state in memory.
+ *
+ * <p>{@link #persist(Object)} does not call the client itself; it only caches the wrapped payload
+ * so that it can be collected through {@link #getPersistedStatePayload()}.
+ *
+ * @param <T> type of the state object
+ */
+public class FunctionStateStore<T> implements StateStore<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FunctionStateStore.class);
+  // Key under which the state object is wrapped inside the stored state map.
+  private static final String STATE_PAYLOAD_KEY = "payload";
+
+  private final String functionId;
+  private final StreamPipesClient client;
+  private final Class<T> stateClass;
+  private final ObjectMapper objectMapper;
+  // Payload of the last successful persist() call; null until then.
+  private Map<String, Object> persistedStatePayload;
+
+  /**
+   * @param functionId id of the function whose state is loaded
+   * @param client client used to request the stored function state
+   * @param stateClass class the stored payload is converted to on load
+   */
+  public FunctionStateStore(String functionId,
+                            StreamPipesClient client,
+                            Class<T> stateClass) {
+    this.functionId = functionId;
+    this.client = client;
+    this.stateClass = stateClass;
+    this.objectMapper = JacksonSerializer.getObjectMapper();
+  }
+
+  /**
+   * Loads the stored state of the function and converts its payload to the state class.
+   *
+   * @param defaultState value returned when no usable state is available
+   * @return the converted state, or {@code defaultState} when no state is stored, the payload key
+   *         is missing, the payload converts to {@code null}, or loading fails with a
+   *         {@link RuntimeException}
+   */
+  @Override
+  public T load(T defaultState) {
+    try {
+      var functionState = client.adminApi().getFunctionState(functionId);
+      if (!functionState.isPresent()) {
+        return defaultState;
+      }
+
+      var persistedState = functionState.get();
+      if (!persistedState.containsKey(STATE_PAYLOAD_KEY)) {
+        LOG.warn("Could not load function state for {}: missing payload key", functionId);
+        return defaultState;
+      }
+
+      T restoredState = objectMapper.convertValue(persistedState.get(STATE_PAYLOAD_KEY), stateClass);
+      // The conversion may yield null (e.g. for a null payload); fall back to the default
+      // instead of handing null to a caller that supplied a default.
+      return restoredState != null ? restoredState : defaultState;
+    } catch (RuntimeException e) {
+      // Trailing throwable keeps the stack trace in the log; the message text is unchanged.
+      LOG.warn("Could not load function state for {}: {}", functionId, e.getMessage(), e);
+      return defaultState;
+    }
+  }
+
+  /**
+   * Caches the given state, wrapped under the payload key, as the latest persisted payload.
+   * A conversion failure is logged and leaves the previously cached payload untouched.
+   *
+   * @param state state object to cache
+   */
+  @Override
+  public void persist(T state) {
+    try {
+      Map<String, Object> payload = new HashMap<>();
+      payload.put(STATE_PAYLOAD_KEY, objectMapper.convertValue(state, Object.class));
+      this.persistedStatePayload = payload;
+    } catch (RuntimeException e) {
+      LOG.warn("Could not persist function state for {}: {}", functionId, e.getMessage(), e);
+    }
+  }
+
+  /**
+   * @return the payload cached by the last successful {@link #persist(Object)} call,
+   *         or {@code null} when nothing has been cached yet
+   */
+  public Map<String, Object> getPersistedStatePayload() {
+    return persistedStatePayload;
+  }
+}
diff --git 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StateStore.java
similarity index 74%
copy from 
streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
copy to 
streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StateStore.java
index cdf0036377..59b6068c84 100644
--- 
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
+++ 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StateStore.java
@@ -16,18 +16,11 @@
  *
  */
 
-package org.apache.streampipes.extensions.api.declarer;
+package org.apache.streampipes.wrapper.standalone.function;
 
-import java.util.List;
+public interface StateStore<T> {
 
-public interface IStreamPipesFunctionDeclarer {
-
-  IFunctionConfig getFunctionConfig();
-
-  List<String> requiredStreamIds();
-
-  void invokeRuntime(String serviceGroup);
-
-  void discardRuntime();
+  T load(T defaultState);
 
+  void persist(T state);
 }
diff --git 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
index 82e58d0587..fadf1fdff7 100644
--- 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
+++ 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
@@ -38,6 +38,7 @@ import org.apache.streampipes.model.runtime.EventFactory;
 import org.apache.streampipes.model.runtime.SchemaInfo;
 import org.apache.streampipes.model.runtime.SourceInfo;
 import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
 import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
 
 import org.slf4j.Logger;
@@ -47,6 +48,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public abstract class StreamPipesFunction implements 
IStreamPipesFunctionDeclarer, RawDataProcessor {
@@ -55,6 +57,7 @@ public abstract class StreamPipesFunction implements 
IStreamPipesFunctionDeclare
   private final Map<String, SourceInfo> sourceInfoMapper;
   private final Map<String, SchemaInfo> schemaInfoMapper;
   private Map<String, SpInputCollector> inputCollectors;
+  private FunctionContext functionContext;
 
   private Map<String, SpOutputCollector> outputCollectors;
 
@@ -76,6 +79,7 @@ public abstract class StreamPipesFunction implements 
IStreamPipesFunctionDeclare
         this.requiredStreamIds(),
         this.outputCollectors
     ).generate();
+    this.functionContext = context;
 
     // Creates a source info for each incoming SpDataStream
     // The index is used to create the selector prefix for the SourceInfo
@@ -215,4 +219,29 @@ public abstract class StreamPipesFunction implements 
IStreamPipesFunctionDeclare
 
   public abstract void onServiceStopped();
 
+  /**
+   * Collects the state payload of this function.
+   *
+   * <p>The state from the context's registered supplier is preferred and wrapped under the
+   * {@code "payload"} key; when it is absent, the payload cached by the context's state stores
+   * is used instead.
+   *
+   * @return the payload map, or an empty {@link Optional} when no function context is set, no
+   *         state is available, or collecting it fails with a {@link RuntimeException}
+   */
+  @Override
+  public Optional<Map<String, Object>> getRegisteredStatePayload() {
+    if (functionContext == null) {
+      return Optional.empty();
+    }
+
+    try {
+      return functionContext
+          .getRegisteredState()
+          .map(state -> {
+            Map<String, Object> wrapped = new HashMap<>();
+            wrapped.put("payload", JacksonSerializer.getObjectMapper().convertValue(state, Object.class));
+            return wrapped;
+          })
+          // Only consulted when the supplier produced no state.
+          .or(functionContext::getPersistedStatePayload);
+    } catch (RuntimeException e) {
+      LOG.warn("Could not collect registered state for function {}: {}", getFunctionConfig().getFunctionId().getId(),
+          e.getMessage());
+      return Optional.empty();
+    }
+  }
+
 }

Reply via email to