This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch 
4185-add-persistent-state-support-for-streampipe-functions
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 65e26fd4da305ab9a29367f5b6f20faf45bf1f8d
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Feb 20 20:42:47 2026 +0100

    feat(#4185): First version of function state workflow
---
 .../apache/streampipes/client/api/IAdminApi.java   |  6 ++
 .../apache/streampipes/client/api/AdminApi.java    | 18 +++++
 .../streampipes/model/function/FunctionState.java  | 79 ++++++++++++++++++++++
 .../streampipes/rest/impl/FunctionsResource.java   | 37 ++++++++++
 .../service/core/StreamPipesCoreApplication.java   | 42 ++++++++++++
 .../function/FunctionShutdownResource.java         | 44 ++++++++++++
 .../StreamPipesExtensionsServiceBase.java          |  1 -
 .../function/StreamPipesFunctionHandler.java       |  8 ++-
 .../storage/api/core/INoSqlStorage.java            |  3 +
 .../api/function/IFunctionStateStorage.java        | 25 +++++++
 .../storage/couchdb/CouchDbStorageManager.java     |  7 ++
 .../impl/function/FunctionStateStorageImpl.java    | 34 ++++++++++
 .../standalone/function/FunctionContext.java       |  9 +++
 .../standalone/function/FunctionStateStore.java    | 74 ++++++++++++++++++++
 .../wrapper/standalone/function/StateStore.java    | 26 +++++++
 15 files changed, 411 insertions(+), 2 deletions(-)

diff --git 
a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java
 
b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java
index ba776ce51c..11db107216 100644
--- 
a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java
+++ 
b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java
@@ -25,6 +25,8 @@ import 
org.apache.streampipes.model.function.FunctionDefinition;
 import org.apache.streampipes.model.migration.ModelMigratorConfig;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 public interface IAdminApi {
 
@@ -40,6 +42,10 @@ public interface IAdminApi {
 
   void deregisterFunction(String functionId);
 
+  Optional<Map<String, Object>> getFunctionState(String functionId);
+
+  void persistFunctionState(String functionId, Map<String, Object> state);
+
   void registerMigrations(List<ModelMigratorConfig> migrationConfigs, String 
serviceId);
 
   MessagingSettings getMessagingSettings();
diff --git 
a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java
 
b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java
index 970eb62249..54c84798be 100644
--- 
a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java
+++ 
b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java
@@ -27,6 +27,8 @@ import org.apache.streampipes.model.message.SuccessMessage;
 import org.apache.streampipes.model.migration.ModelMigratorConfig;
 
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 
 public class AdminApi extends AbstractClientApi implements IAdminApi {
 
@@ -67,6 +69,18 @@ public class AdminApi extends AbstractClientApi implements 
IAdminApi {
     delete(getDeleteFunctionPath(functionId), SuccessMessage.class);
   }
 
+  @Override
+  public Optional<Map<String, Object>> getFunctionState(String functionId) {
+    return getSingleOpt(getFunctionStatePath(functionId), Map.class)
+        .map(state -> (Map<String, Object>) state);
+  }
+
+  @Override
+  public void persistFunctionState(String functionId,
+                                   Map<String, Object> state) {
+    put(getFunctionStatePath(functionId), state);
+  }
+
   /**
    * Register migration configs {@link ModelMigratorConfig} at the StreamPipes 
Core service.
    * @param migrationConfigs list of migration configs to be registered
@@ -109,6 +123,10 @@ public class AdminApi extends AbstractClientApi implements 
IAdminApi {
     return getFunctionsPath().addToPath(functionId);
   }
 
+  private StreamPipesApiPath getFunctionStatePath(String functionId) {
+    return getDeleteFunctionPath(functionId).addToPath("state");
+  }
+
   private StreamPipesApiPath getMigrationPath() {
     return StreamPipesApiPath
             .fromBaseApiPath()
diff --git 
a/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionState.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionState.java
new file mode 100644
index 0000000000..9e607c71d6
--- /dev/null
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionState.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.model.function;
+
+import org.apache.streampipes.model.shared.annotation.TsModel;
+import org.apache.streampipes.model.shared.api.Storable;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Map;
+
+@TsModel
+public class FunctionState implements Storable {
+
+  protected @SerializedName("_rev") String rev;
+  private @SerializedName("_id") String functionId;
+  private Map<String, Object> state;
+
+  public FunctionState() {
+  }
+
+  public FunctionState(String functionId,
+                       Map<String, Object> state) {
+    this.functionId = functionId;
+    this.state = state;
+  }
+
+  public String getFunctionId() {
+    return functionId;
+  }
+
+  public void setFunctionId(String functionId) {
+    this.functionId = functionId;
+  }
+
+  public Map<String, Object> getState() {
+    return state;
+  }
+
+  public void setState(Map<String, Object> state) {
+    this.state = state;
+  }
+
+  @Override
+  public String getRev() {
+    return rev;
+  }
+
+  @Override
+  public void setRev(String rev) {
+    this.rev = rev;
+  }
+
+  @Override
+  public String getElementId() {
+    return functionId;
+  }
+
+  @Override
+  public void setElementId(String elementId) {
+    this.functionId = elementId;
+  }
+}
diff --git 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java
 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java
index 6164bbecc8..d3a1eb16b7 100644
--- 
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java
+++ 
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java
@@ -20,30 +20,38 @@ package org.apache.streampipes.rest.impl;
 
 import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider;
 import org.apache.streampipes.manager.function.FunctionRegistrationService;
+import org.apache.streampipes.model.function.FunctionState;
 import org.apache.streampipes.model.function.FunctionDefinition;
 import org.apache.streampipes.model.message.Notifications;
 import org.apache.streampipes.model.message.SuccessMessage;
 import org.apache.streampipes.model.monitoring.SpLogEntry;
 import org.apache.streampipes.model.monitoring.SpMetricsEntry;
 import 
org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
+import org.apache.streampipes.rest.shared.exception.SpMessageException;
+import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
 
+import org.springframework.http.HttpStatus;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.DeleteMapping;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 @RestController
 @RequestMapping("/api/v2/functions")
 public class FunctionsResource extends AbstractAuthGuardedRestResource {
 
+  private final IFunctionStateStorage functionStateStorage = 
getNoSqlStorage().getFunctionStateStorage();
+
   @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE)
   public ResponseEntity<Collection<FunctionDefinition>> getActiveFunctions() {
     return ok(FunctionRegistrationService.INSTANCE.getAllFunctions());
@@ -78,4 +86,33 @@ public class FunctionsResource extends 
AbstractAuthGuardedRestResource {
   public ResponseEntity<List<SpLogEntry>> 
getFunctionLogs(@PathVariable("functionId") String functionId) {
     return 
ok(ExtensionsLogProvider.INSTANCE.getLogInfosForResource(functionId));
   }
+
+  @GetMapping(path = "{functionId}/state", produces = 
MediaType.APPLICATION_JSON_VALUE)
+  public ResponseEntity<Map<String, Object>> 
getFunctionState(@PathVariable("functionId") String functionId) {
+    var functionState = functionStateStorage.getElementById(functionId);
+    if (functionState != null) {
+      return ok(functionState.getState());
+    } else {
+      throw new SpMessageException(HttpStatus.NOT_FOUND, 
Notifications.error("Function state not found"));
+    }
+  }
+
+  @PutMapping(
+      path = "{functionId}/state",
+      produces = MediaType.APPLICATION_JSON_VALUE,
+      consumes = MediaType.APPLICATION_JSON_VALUE
+  )
+  public ResponseEntity<SuccessMessage> 
persistFunctionState(@PathVariable("functionId") String functionId,
+                                                             @RequestBody 
Map<String, Object> state) {
+    var existingFunctionState = 
functionStateStorage.getElementById(functionId);
+
+    if (existingFunctionState != null) {
+      existingFunctionState.setState(state);
+      functionStateStorage.updateElement(existingFunctionState);
+    } else {
+      functionStateStorage.persist(new FunctionState(functionId, state));
+    }
+
+    return ok(Notifications.success("Function state successfully persisted"));
+  }
 }
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
index a893025c12..278821abf0 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
@@ -35,6 +35,7 @@ import 
org.apache.streampipes.manager.pipeline.PipelineManager;
 import org.apache.streampipes.manager.setup.AutoInstallation;
 import org.apache.streampipes.manager.setup.StreamPipesEnvChecker;
 import 
org.apache.streampipes.manager.setup.tasks.ApplyDefaultRolesAndPrivilegesTask;
+import org.apache.streampipes.manager.util.AuthTokenUtils;
 import org.apache.streampipes.messaging.SpProtocolManager;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
@@ -42,6 +43,7 @@ import 
org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
 import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory;
 import org.apache.streampipes.messaging.pulsar.SpPulsarProtocolFactory;
 import org.apache.streampipes.model.configuration.SpCoreConfigurationStatus;
+import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
 import org.apache.streampipes.model.pipeline.Pipeline;
 import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
 import org.apache.streampipes.resource.management.SpResourceManager;
@@ -57,6 +59,8 @@ import 
org.apache.streampipes.storage.couchdb.impl.user.UserStorage;
 import org.apache.streampipes.storage.couchdb.utils.CouchDbViewGenerator;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -68,6 +72,7 @@ import 
org.springframework.scheduling.annotation.EnableScheduling;
 import jakarta.annotation.PostConstruct;
 import jakarta.annotation.PreDestroy;
 
+import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.List;
 import java.util.concurrent.Executors;
@@ -85,6 +90,7 @@ public class StreamPipesCoreApplication extends 
StreamPipesServiceBase {
 
   private static final Logger LOG =
       
LoggerFactory.getLogger(StreamPipesCoreApplication.class.getCanonicalName());
+  private static final String FUNCTION_SHUTDOWN_PATH = 
"/api/v1/functions/shutdown";
 
   private final ISpCoreConfigurationStorage coreConfigStorage =
       
StorageDispatcher.INSTANCE.getNoSqlStore().getSpCoreConfigurationStorage();
@@ -233,9 +239,45 @@ public class StreamPipesCoreApplication extends 
StreamPipesServiceBase {
       }
     });
 
+    triggerExtensionFunctionShutdown();
+
     LOG.info("Thanks for using Apache StreamPipes - see you next time!");
   }
 
+  private void triggerExtensionFunctionShutdown() {
+    var extensions = 
StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage().findAll();
+    var serviceAdmin = new SpResourceManager().manageUsers().getServiceAdmin();
+    var authToken = AuthTokenUtils.getAuthTokenForUser(serviceAdmin);
+
+    LOG.info("Triggering function shutdown at {} extension services...", 
extensions.size());
+    extensions.forEach(service -> triggerFunctionShutdown(service, authToken));
+  }
+
+  private void triggerFunctionShutdown(SpServiceRegistration service,
+                                       String authToken) {
+    var endpoint = service.getServiceUrl() + FUNCTION_SHUTDOWN_PATH;
+
+    try {
+      LOG.info("Triggering function shutdown at {}", endpoint);
+      Response response = Request
+          .Post(endpoint)
+          .addHeader("Authorization", authToken)
+          .connectTimeout(5000)
+          .socketTimeout(10000)
+          .execute();
+      int statusCode = 
response.returnResponse().getStatusLine().getStatusCode();
+
+      if (statusCode >= 200 && statusCode < 300) {
+        LOG.debug("Function shutdown triggered at {} (HTTP {})", 
service.getSvcId(), statusCode);
+      } else {
+        LOG.warn("Function shutdown request returned non-success status at {} 
(HTTP {})",
+            service.getSvcId(), statusCode);
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not trigger function shutdown at {}: {}", endpoint, 
e.getMessage());
+    }
+  }
+
   private List<Pipeline> getAllPipelines() {
     return getPipelineStorage().findAll();
   }
diff --git 
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/rest/extensions/function/FunctionShutdownResource.java
 
b/streampipes-service-extensions/src/main/java/org/apache/streampipes/rest/extensions/function/FunctionShutdownResource.java
new file mode 100644
index 0000000000..877c6137b1
--- /dev/null
+++ 
b/streampipes-service-extensions/src/main/java/org/apache/streampipes/rest/extensions/function/FunctionShutdownResource.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.extensions.function;
+
+import org.apache.streampipes.model.Response;
+import org.apache.streampipes.rest.extensions.AbstractExtensionsResource;
+import 
org.apache.streampipes.service.extensions.function.StreamPipesFunctionHandler;
+
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("api/v1/functions")
+public class FunctionShutdownResource extends AbstractExtensionsResource {
+
+  @PostMapping(path = "shutdown", produces = MediaType.APPLICATION_JSON_VALUE)
+  public ResponseEntity<Response> shutdownFunctions() {
+    try {
+      StreamPipesFunctionHandler.INSTANCE.cleanupFunctions();
+      return ok(new Response("functions-shutdown", true, "Function shutdown 
triggered"));
+    } catch (RuntimeException e) {
+      return ok(new Response("functions-shutdown", false, e.getMessage()));
+    }
+  }
+}
diff --git 
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
 
b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
index b95136efbc..80e8ffde21 100644
--- 
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
+++ 
b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
@@ -190,7 +190,6 @@ public abstract class StreamPipesExtensionsServiceBase 
extends StreamPipesServic
   @PreDestroy
   public void onExit() {
     new ExtensionsServiceShutdownHandler().onShutdown();
-    StreamPipesFunctionHandler.INSTANCE.cleanupFunctions();
     deregisterService(DeclarersSingleton.getInstance().getServiceId());
   }
 
diff --git 
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
 
b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
index a36d788a4d..a6900ef299 100644
--- 
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
+++ 
b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
@@ -93,10 +93,16 @@ public enum StreamPipesFunctionHandler {
   }
 
   public void cleanupFunctions() {
+    if (this.runningInstances.isEmpty()) {
+      return;
+    }
+
+    var functionDefinitions = getFunctionDefinitions();
     this.runningInstances.forEach((key, value) -> {
       value.discardRuntime();
     });
-    new FunctionDeregistrationHandler(getFunctionDefinitions()).run();
+    new FunctionDeregistrationHandler(functionDefinitions).run();
+    this.runningInstances.clear();
   }
 
   private List<FunctionDefinition> getFunctionDefinitions() {
diff --git 
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/core/INoSqlStorage.java
 
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/core/INoSqlStorage.java
index 2e659aa65f..b80905355d 100644
--- 
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/core/INoSqlStorage.java
+++ 
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/core/INoSqlStorage.java
@@ -21,6 +21,7 @@ import 
org.apache.streampipes.storage.api.connect.IAdapterStorage;
 import 
org.apache.streampipes.storage.api.explorer.IDataExplorerDashboardStorage;
 import org.apache.streampipes.storage.api.explorer.IDataExplorerWidgetStorage;
 import org.apache.streampipes.storage.api.explorer.IDataLakeMeasureStorage;
+import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
 import 
org.apache.streampipes.storage.api.pipeline.ICompactPipelineTemplateStorage;
 import org.apache.streampipes.storage.api.pipeline.IDataProcessorStorage;
 import org.apache.streampipes.storage.api.pipeline.IDataSinkStorage;
@@ -108,4 +109,6 @@ public interface INoSqlStorage {
   IAssetStorage getAssetStorage();
 
   ITransformationScriptTemplateStorage 
getTransformationScriptTemplateStorage();
+
+  IFunctionStateStorage getFunctionStateStorage();
 }
diff --git 
a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/function/IFunctionStateStorage.java
 
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/function/IFunctionStateStorage.java
new file mode 100644
index 0000000000..1be7d41e13
--- /dev/null
+++ 
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/function/IFunctionStateStorage.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.storage.api.function;
+
+import org.apache.streampipes.model.function.FunctionState;
+import org.apache.streampipes.storage.api.core.CRUDStorage;
+
+public interface IFunctionStateStorage extends CRUDStorage<FunctionState> {
+}
diff --git 
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
 
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
index d7d07af389..1bf164bbfc 100644
--- 
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
+++ 
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.storage.api.core.INoSqlStorage;
 import 
org.apache.streampipes.storage.api.explorer.IDataExplorerDashboardStorage;
 import org.apache.streampipes.storage.api.explorer.IDataExplorerWidgetStorage;
 import org.apache.streampipes.storage.api.explorer.IDataLakeMeasureStorage;
+import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
 import 
org.apache.streampipes.storage.api.pipeline.ICompactPipelineTemplateStorage;
 import org.apache.streampipes.storage.api.pipeline.IDataProcessorStorage;
 import org.apache.streampipes.storage.api.pipeline.IDataSinkStorage;
@@ -53,6 +54,7 @@ import 
org.apache.streampipes.storage.couchdb.impl.connect.AdapterInstanceStorag
 import 
org.apache.streampipes.storage.couchdb.impl.explorer.DataExplorerDashboardStorageImpl;
 import 
org.apache.streampipes.storage.couchdb.impl.explorer.DataExplorerWidgetStorageImpl;
 import 
org.apache.streampipes.storage.couchdb.impl.explorer.DataLakeMeasureStorage;
+import 
org.apache.streampipes.storage.couchdb.impl.function.FunctionStateStorageImpl;
 import 
org.apache.streampipes.storage.couchdb.impl.pipeline.CompactPipelineTemplateStorageImpl;
 import 
org.apache.streampipes.storage.couchdb.impl.pipeline.DataProcessorStorageImpl;
 import 
org.apache.streampipes.storage.couchdb.impl.pipeline.DataSinkStorageImpl;
@@ -234,4 +236,9 @@ public class CouchDbStorageManager implements INoSqlStorage 
{
   public ITransformationScriptTemplateStorage 
getTransformationScriptTemplateStorage() {
     return new TransformationScriptTemplateStorageImpl();
   }
+
+  @Override
+  public IFunctionStateStorage getFunctionStateStorage() {
+    return new FunctionStateStorageImpl();
+  }
 }
diff --git 
a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/function/FunctionStateStorageImpl.java
 
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/function/FunctionStateStorageImpl.java
new file mode 100644
index 0000000000..488da13b31
--- /dev/null
+++ 
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/function/FunctionStateStorageImpl.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.storage.couchdb.impl.function;
+
+import org.apache.streampipes.model.function.FunctionState;
+import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
+import org.apache.streampipes.storage.couchdb.impl.core.DefaultCrudStorage;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
+
+public class FunctionStateStorageImpl extends 
DefaultCrudStorage<FunctionState> implements IFunctionStateStorage {
+
+  public FunctionStateStorageImpl() {
+    super(
+        () -> Utils.getCouchDbGsonClient("functions-state"),
+        FunctionState.class
+    );
+  }
+}
diff --git 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java
 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java
index ff4326971e..caced6b421 100644
--- 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java
+++ 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java
@@ -38,12 +38,14 @@ public class FunctionContext {
   private StreamPipesClient client;
   private String functionId;
   private ConfigExtractor config;
+  private final Map<Class<?>, StateStore<?>> stateStores;
 
   private Map<String, SpOutputCollector> outputCollectors;
   private IExtensionsLogger extensionsLogger;
 
   public FunctionContext() {
     this.streams = new HashMap<>();
+    this.stateStores = new HashMap<>();
   }
 
   public FunctionContext(String functionId,
@@ -87,4 +89,11 @@ public class FunctionContext {
   public Map<String, SpOutputCollector> getOutputCollectors() {
     return outputCollectors;
   }
+
+  public <T> StateStore<T> getStateStore(Class<T> stateClass) {
+    return (StateStore<T>) stateStores.computeIfAbsent(
+        stateClass,
+        key -> new FunctionStateStore<>(functionId, client, stateClass)
+    );
+  }
 }
diff --git 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionStateStore.java
 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionStateStore.java
new file mode 100644
index 0000000000..483d932efc
--- /dev/null
+++ 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionStateStore.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.wrapper.standalone.function;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class FunctionStateStore<T> implements StateStore<T> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FunctionStateStore.class);
+
+  private final String functionId;
+  private final StreamPipesClient client;
+  private final Class<T> stateClass;
+  private final ObjectMapper objectMapper;
+
+  public FunctionStateStore(String functionId,
+                            StreamPipesClient client,
+                            Class<T> stateClass) {
+    this.functionId = functionId;
+    this.client = client;
+    this.stateClass = stateClass;
+    this.objectMapper = JacksonSerializer.getObjectMapper();
+  }
+
+  @Override
+  public T load(T defaultState) {
+    try {
+      var functionState = client.adminApi().getFunctionState(functionId);
+      if (functionState.isPresent()) {
+        return objectMapper.convertValue(functionState.get(), stateClass);
+      } else {
+        return defaultState;
+      }
+    } catch (RuntimeException e) {
+      LOG.warn("Could not load function state for {}: {}", functionId, 
e.getMessage());
+      return defaultState;
+    }
+  }
+
+  @Override
+  public void persist(T state) {
+    try {
+      var payload = objectMapper.convertValue(state, new 
TypeReference<Map<String, Object>>() {
+      });
+      client.adminApi().persistFunctionState(functionId, payload);
+    } catch (RuntimeException e) {
+      LOG.warn("Could not persist function state for {}: {}", functionId, 
e.getMessage());
+    }
+  }
+}
diff --git 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StateStore.java
 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StateStore.java
new file mode 100644
index 0000000000..59b6068c84
--- /dev/null
+++ 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StateStore.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.wrapper.standalone.function;
+
+public interface StateStore<T> {
+
+  T load(T defaultState);
+
+  void persist(T state);
+}

Reply via email to