This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 6a1ff1daaa feat(#4185:) add persistent state support for streampipe
functions (#4186)
6a1ff1daaa is described below
commit 6a1ff1daaa40e48c771c0056650629f757976366
Author: Philipp Zehnder <[email protected]>
AuthorDate: Tue Feb 24 08:14:30 2026 +0100
feat(#4185:) add persistent state support for streampipe functions (#4186)
---
.../apache/streampipes/client/api/IAdminApi.java | 6 ++
.../apache/streampipes/client/api/AdminApi.java | 18 ++++
.../commons/constants/GenericDocTypes.java | 1 +
.../api/declarer/IStreamPipesFunctionDeclarer.java | 6 ++
.../function/FunctionDeregistrationHandler.java | 0
.../function/FunctionRegistrationHandler.java | 0
.../extensions/function/RegistrationHandler.java | 0
.../function/StreamPipesFunctionHandler.java | 11 ++-
.../model/function/FunctionShutdownResult.java | 33 +++++--
.../streampipes/model/function/FunctionState.java | 86 ++++++++++++++++++
.../model/function/FunctionsShutdownResponse.java | 22 +++--
.../execution/ExtensionServiceExecutions.java | 9 ++
.../manager/function/FunctionManager.java | 101 +++++++++++++++++++++
.../manager/setup/CouchDbInstallationStep.java | 2 +
.../setup/tasks/AddFunctionStateViewTask.java | 26 ++++--
.../function/FunctionShutdownResource.java | 40 ++++++++
.../streampipes/rest/impl/FunctionsResource.java | 41 +++++++++
streampipes-service-core/AGENTS.md | 2 +
.../service/core/StreamPipesCoreApplication.java | 12 ++-
.../core/migrations/AvailableMigrations.java | 4 +-
.../v099/AddFunctionStateViewMigration.java | 25 +++--
.../core/storage/StorageApiConfiguration.java | 17 ++--
.../StreamPipesExtensionsServiceBase.java | 1 -
.../api/function/IFunctionStateStorage.java | 16 +---
.../impl/function/FunctionStateStorageImpl.java | 21 +++--
.../standalone/function/FunctionContext.java | 37 ++++++++
.../standalone/function/FunctionStateStore.java | 86 ++++++++++++++++++
.../wrapper/standalone/function/StateStore.java | 15 +--
.../standalone/function/StreamPipesFunction.java | 29 ++++++
29 files changed, 597 insertions(+), 70 deletions(-)
diff --git
a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java
b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java
index ba776ce51c..11db107216 100644
---
a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java
+++
b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java
@@ -25,6 +25,8 @@ import
org.apache.streampipes.model.function.FunctionDefinition;
import org.apache.streampipes.model.migration.ModelMigratorConfig;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
public interface IAdminApi {
@@ -40,6 +42,10 @@ public interface IAdminApi {
void deregisterFunction(String functionId);
+ Optional<Map<String, Object>> getFunctionState(String functionId);
+
+ void persistFunctionState(String functionId, Map<String, Object> state);
+
void registerMigrations(List<ModelMigratorConfig> migrationConfigs, String
serviceId);
MessagingSettings getMessagingSettings();
diff --git
a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java
b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java
index 970eb62249..54c84798be 100644
---
a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java
+++
b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java
@@ -27,6 +27,8 @@ import org.apache.streampipes.model.message.SuccessMessage;
import org.apache.streampipes.model.migration.ModelMigratorConfig;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
public class AdminApi extends AbstractClientApi implements IAdminApi {
@@ -67,6 +69,18 @@ public class AdminApi extends AbstractClientApi implements
IAdminApi {
delete(getDeleteFunctionPath(functionId), SuccessMessage.class);
}
+ @Override
+ public Optional<Map<String, Object>> getFunctionState(String functionId) {
+ return getSingleOpt(getFunctionStatePath(functionId), Map.class)
+ .map(state -> (Map<String, Object>) state);
+ }
+
+ @Override
+ public void persistFunctionState(String functionId,
+ Map<String, Object> state) {
+ put(getFunctionStatePath(functionId), state);
+ }
+
/**
* Register migration configs {@link ModelMigratorConfig} at the StreamPipes
Core service.
* @param migrationConfigs list of migration configs to be registered
@@ -109,6 +123,10 @@ public class AdminApi extends AbstractClientApi implements
IAdminApi {
return getFunctionsPath().addToPath(functionId);
}
+ private StreamPipesApiPath getFunctionStatePath(String functionId) {
+ return getDeleteFunctionPath(functionId).addToPath("state");
+ }
+
private StreamPipesApiPath getMigrationPath() {
return StreamPipesApiPath
.fromBaseApiPath()
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
index 3ae70c5c37..91bceb816d 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
@@ -23,6 +23,7 @@ public class GenericDocTypes {
public static final String DOC_ASSET_MANAGEMENT = "asset-management";
public static final String DOC_ASSET_LINK_TYPE = "asset-link-type";
public static final String DOC_TRANSFORMATION_SCRIPT_TEMPLATE =
"transformation-script-template";
+ public static final String DOC_FUNCTION_STATE = "function-state";
public static final String DEFAULT_ASSET_DOC_ID = "default-asset";
diff --git
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
index cdf0036377..4f552cce33 100644
---
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
+++
b/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
@@ -19,6 +19,8 @@
package org.apache.streampipes.extensions.api.declarer;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
public interface IStreamPipesFunctionDeclarer {
@@ -30,4 +32,8 @@ public interface IStreamPipesFunctionDeclarer {
void discardRuntime();
+ default Optional<Map<String, Object>> getRegisteredStatePayload() {
+ return Optional.empty();
+ }
+
}
diff --git
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/FunctionDeregistrationHandler.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/FunctionDeregistrationHandler.java
similarity index 100%
rename from
streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/FunctionDeregistrationHandler.java
rename to
streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/FunctionDeregistrationHandler.java
diff --git
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/FunctionRegistrationHandler.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/FunctionRegistrationHandler.java
similarity index 100%
rename from
streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/FunctionRegistrationHandler.java
rename to
streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/FunctionRegistrationHandler.java
diff --git
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/RegistrationHandler.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/RegistrationHandler.java
similarity index 100%
rename from
streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/RegistrationHandler.java
rename to
streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/RegistrationHandler.java
diff --git
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
similarity index 88%
rename from
streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
rename to
streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
index a36d788a4d..8dd336d00e 100644
---
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java
@@ -24,8 +24,11 @@ import
org.apache.streampipes.extensions.api.pe.config.IDataStreamConfiguration;
import
org.apache.streampipes.extensions.management.client.StreamPipesClientResolver;
import org.apache.streampipes.extensions.management.init.DeclarersSingleton;
import org.apache.streampipes.model.function.FunctionDefinition;
+import org.apache.streampipes.model.function.FunctionShutdownResult;
+import org.apache.streampipes.model.function.FunctionsShutdownResponse;
import org.apache.streampipes.sdk.builder.stream.DataStreamConfiguration;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -92,11 +95,15 @@ public enum StreamPipesFunctionHandler {
});
}
- public void cleanupFunctions() {
+ public FunctionsShutdownResponse shutdownFunctionsAndGetState() {
+ var shutdownResults = new ArrayList<FunctionShutdownResult>();
this.runningInstances.forEach((key, value) -> {
value.discardRuntime();
+ shutdownResults.add(new FunctionShutdownResult(key,
value.getRegisteredStatePayload().orElse(null)));
});
- new FunctionDeregistrationHandler(getFunctionDefinitions()).run();
+ this.runningInstances.clear();
+
+ return new FunctionsShutdownResponse(shutdownResults);
}
private List<FunctionDefinition> getFunctionDefinitions() {
diff --git
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionShutdownResult.java
similarity index 54%
copy from
streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
copy to
streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionShutdownResult.java
index cdf0036377..c9fabcbf1b 100644
---
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionShutdownResult.java
@@ -16,18 +16,37 @@
*
*/
-package org.apache.streampipes.extensions.api.declarer;
+package org.apache.streampipes.model.function;
-import java.util.List;
+import java.util.Map;
-public interface IStreamPipesFunctionDeclarer {
+public class FunctionShutdownResult {
- IFunctionConfig getFunctionConfig();
+ private String functionId;
+ private Map<String, Object> state;
- List<String> requiredStreamIds();
+ public FunctionShutdownResult() {
+ }
- void invokeRuntime(String serviceGroup);
+ public FunctionShutdownResult(String functionId,
+ Map<String, Object> state) {
+ this.functionId = functionId;
+ this.state = state;
+ }
- void discardRuntime();
+ public String getFunctionId() {
+ return functionId;
+ }
+ public void setFunctionId(String functionId) {
+ this.functionId = functionId;
+ }
+
+ public Map<String, Object> getState() {
+ return state;
+ }
+
+ public void setState(Map<String, Object> state) {
+ this.state = state;
+ }
}
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionState.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionState.java
new file mode 100644
index 0000000000..12f99e4e88
--- /dev/null
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionState.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.model.function;
+
+import org.apache.streampipes.commons.constants.GenericDocTypes;
+import org.apache.streampipes.model.shared.api.Storable;
+
+import com.google.gson.annotations.SerializedName;
+
+import java.util.Map;
+
+public class FunctionState implements Storable {
+
+ public static final String APP_DOC_TYPE = GenericDocTypes.DOC_FUNCTION_STATE;
+
+ private final String appDocType = APP_DOC_TYPE;
+
+ protected @SerializedName("_rev") String rev;
+ private @SerializedName("_id") String functionId;
+ private Map<String, Object> state;
+
+ public FunctionState() {
+ }
+
+ public FunctionState(String functionId,
+ Map<String, Object> state) {
+ this.functionId = functionId;
+ this.state = state;
+ }
+
+ public String getFunctionId() {
+ return functionId;
+ }
+
+ public void setFunctionId(String functionId) {
+ this.functionId = functionId;
+ }
+
+ public Map<String, Object> getState() {
+ return state;
+ }
+
+ public void setState(Map<String, Object> state) {
+ this.state = state;
+ }
+
+ @Override
+ public String getRev() {
+ return rev;
+ }
+
+ @Override
+ public void setRev(String rev) {
+ this.rev = rev;
+ }
+
+ @Override
+ public String getElementId() {
+ return functionId;
+ }
+
+ @Override
+ public void setElementId(String elementId) {
+ this.functionId = elementId;
+ }
+
+ public String getAppDocType() {
+ return appDocType;
+ }
+}
diff --git
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionsShutdownResponse.java
similarity index 60%
copy from
streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
copy to
streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionsShutdownResponse.java
index cdf0036377..802c1f978d 100644
---
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionsShutdownResponse.java
@@ -16,18 +16,28 @@
*
*/
-package org.apache.streampipes.extensions.api.declarer;
+package org.apache.streampipes.model.function;
+import java.util.ArrayList;
import java.util.List;
-public interface IStreamPipesFunctionDeclarer {
+public class FunctionsShutdownResponse {
- IFunctionConfig getFunctionConfig();
+ private List<FunctionShutdownResult> functions;
- List<String> requiredStreamIds();
+ public FunctionsShutdownResponse() {
+ this.functions = new ArrayList<>();
+ }
- void invokeRuntime(String serviceGroup);
+ public FunctionsShutdownResponse(List<FunctionShutdownResult> functions) {
+ this.functions = functions;
+ }
- void discardRuntime();
+ public List<FunctionShutdownResult> getFunctions() {
+ return functions;
+ }
+ public void setFunctions(List<FunctionShutdownResult> functions) {
+ this.functions = functions;
+ }
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
index e4aca6b6c0..2164b2776c 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/ExtensionServiceExecutions.java
@@ -35,6 +35,15 @@ public class ExtensionServiceExecutions {
.socketTimeout(10000);
}
+ public static Request extServicePostRequestAsServiceAdmin(String url) {
+ return Request
+ .Post(url)
+ .addHeader("Authorization",
AuthTokenUtils.getAuthTokenForUser(getServiceAdminSid()))
+ .addHeader("Accept", "application/json")
+ .connectTimeout(10000)
+ .socketTimeout(10000);
+ }
+
private static String getServiceAdminSid() {
return new
SpResourceManager().manageUsers().getServiceAdmin().getPrincipalId();
}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/function/FunctionManager.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/function/FunctionManager.java
new file mode 100644
index 0000000000..691714faba
--- /dev/null
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/function/FunctionManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.manager.function;
+
+import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
+import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.function.FunctionState;
+import org.apache.streampipes.model.function.FunctionsShutdownResponse;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+public class FunctionManager {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FunctionManager.class);
+ private static final String FUNCTION_STOP_PATH = "/api/v1/functions/stop";
+
+ public static void stopAllFunctionsAndPersistState(IFunctionStateStorage
functionStateStorage) {
+ var extensions =
StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage().findAll();
+
+ LOG.info("Triggering function stop at {} extension services...",
extensions.size());
+ extensions.forEach(service -> {
+ var shutdownResponse = triggerFunctionStop(service);
+ if (shutdownResponse != null) {
+ persistReturnedFunctionStates(functionStateStorage, shutdownResponse);
+ }
+ });
+ }
+
+ private static FunctionsShutdownResponse
triggerFunctionStop(SpServiceRegistration service) {
+ var endpoint = service.getServiceUrl() + FUNCTION_STOP_PATH;
+
+ try {
+ LOG.info("Triggering function stop at {}", endpoint);
+ var response =
ExtensionServiceExecutions.extServicePostRequestAsServiceAdmin(endpoint).execute();
+ var httpResponse = response.returnResponse();
+ int statusCode = httpResponse.getStatusLine().getStatusCode();
+
+ if (statusCode >= 200 && statusCode < 300) {
+ LOG.debug("Function stop triggered at {} (HTTP {})",
service.getSvcId(), statusCode);
+ if (httpResponse.getEntity() == null) {
+ return null;
+ }
+
+ return JacksonSerializer.getObjectMapper().readValue(
+ EntityUtils.toString(httpResponse.getEntity(),
StandardCharsets.UTF_8),
+ FunctionsShutdownResponse.class
+ );
+ } else {
+ LOG.warn("Function stop request returned non-success status at {}
(HTTP {})",
+ service.getSvcId(), statusCode);
+ return null;
+ }
+ } catch (IOException e) {
+ LOG.warn("Could not trigger function stop at {}: {}", endpoint,
e.getMessage());
+ return null;
+ }
+ }
+
+ private static void persistReturnedFunctionStates(IFunctionStateStorage
functionStateStorage,
+ FunctionsShutdownResponse
shutdownResponse) {
+ if (shutdownResponse == null || shutdownResponse.getFunctions() == null) {
+ return;
+ }
+
+ shutdownResponse.getFunctions().forEach(functionResult -> {
+ if (functionResult.getState() != null) {
+ var existingFunctionState =
functionStateStorage.getElementById(functionResult.getFunctionId());
+ if (existingFunctionState != null) {
+ existingFunctionState.setState(functionResult.getState());
+ functionStateStorage.updateElement(existingFunctionState);
+ } else {
+ functionStateStorage.persist(new
FunctionState(functionResult.getFunctionId(), functionResult.getState()));
+ }
+ }
+ });
+ }
+}
diff --git
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
index 21ad1134ea..89fed8172a 100644
---
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/CouchDbInstallationStep.java
@@ -22,6 +22,7 @@ import
org.apache.streampipes.manager.setup.design.UserDesignDocument;
import org.apache.streampipes.manager.setup.tasks.AddAssetManagementViewTask;
import org.apache.streampipes.manager.setup.tasks.AddDataLakeMeasureViewTask;
import
org.apache.streampipes.manager.setup.tasks.AddDefaultPipelineTemplatesTask;
+import org.apache.streampipes.manager.setup.tasks.AddFunctionStateViewTask;
import org.apache.streampipes.manager.setup.tasks.AddScriptTemplateViewTask;
import org.apache.streampipes.manager.setup.tasks.CreateAssetLinkTypeTask;
import org.apache.streampipes.storage.couchdb.utils.Utils;
@@ -81,6 +82,7 @@ public class CouchDbInstallationStep extends InstallationStep
{
new AddDataLakeMeasureViewTask().execute();
new AddAssetManagementViewTask().execute();
new AddScriptTemplateViewTask().execute();
+ new AddFunctionStateViewTask().execute();
}
private void addNotificationView() {
diff --git
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/tasks/AddFunctionStateViewTask.java
similarity index 54%
copy from
streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
copy to
streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/tasks/AddFunctionStateViewTask.java
index cdf0036377..7f5aae32c2 100644
---
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
+++
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/setup/tasks/AddFunctionStateViewTask.java
@@ -16,18 +16,28 @@
*
*/
-package org.apache.streampipes.extensions.api.declarer;
+package org.apache.streampipes.manager.setup.tasks;
-import java.util.List;
+import org.apache.streampipes.commons.constants.GenericDocTypes;
-public interface IStreamPipesFunctionDeclarer {
+public class AddFunctionStateViewTask extends
AbstractAddGenericStorageViewTask {
- IFunctionConfig getFunctionConfig();
+ public static final String DESIGN_DOCUMENT = "_design/function-states";
+ public static final String VIEW_NAME = "all-function-states";
- List<String> requiredStreamIds();
+ @Override
+ public String getDesignDocument() {
+ return DESIGN_DOCUMENT;
+ }
- void invokeRuntime(String serviceGroup);
-
- void discardRuntime();
+ @Override
+ public String getViewName() {
+ return VIEW_NAME;
+ }
+ @Override
+ public String getMapFunction() {
+ return String.format("function(doc) { if(doc.appDocType === '%s') {
emit(doc._id, doc); } }",
+ GenericDocTypes.DOC_FUNCTION_STATE);
+ }
}
diff --git
a/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/function/FunctionShutdownResource.java
b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/function/FunctionShutdownResource.java
new file mode 100644
index 0000000000..fd66fb8f70
--- /dev/null
+++
b/streampipes-rest-extensions/src/main/java/org/apache/streampipes/rest/extensions/function/FunctionShutdownResource.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.rest.extensions.function;
+
+import org.apache.streampipes.model.function.FunctionsShutdownResponse;
+import org.apache.streampipes.rest.extensions.AbstractExtensionsResource;
+import
org.apache.streampipes.service.extensions.function.StreamPipesFunctionHandler;
+
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping("api/v1/functions")
+public class FunctionShutdownResource extends AbstractExtensionsResource {
+
+ @PostMapping(path = "stop", produces = MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity<FunctionsShutdownResponse> shutdownFunctions() {
+ var shutdownResponse =
StreamPipesFunctionHandler.INSTANCE.shutdownFunctionsAndGetState();
+ return ok(shutdownResponse);
+ }
+}
diff --git
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java
index 6164bbecc8..56960a9a81 100644
---
a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java
+++
b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java
@@ -21,29 +21,41 @@ package org.apache.streampipes.rest.impl;
import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider;
import org.apache.streampipes.manager.function.FunctionRegistrationService;
import org.apache.streampipes.model.function.FunctionDefinition;
+import org.apache.streampipes.model.function.FunctionState;
import org.apache.streampipes.model.message.Notifications;
import org.apache.streampipes.model.message.SuccessMessage;
import org.apache.streampipes.model.monitoring.SpLogEntry;
import org.apache.streampipes.model.monitoring.SpMetricsEntry;
import
org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
+import org.apache.streampipes.rest.shared.exception.SpMessageException;
+import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
+import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
@RestController
@RequestMapping("/api/v2/functions")
public class FunctionsResource extends AbstractAuthGuardedRestResource {
+ private final IFunctionStateStorage functionStateStorage;
+
+ public FunctionsResource(IFunctionStateStorage functionStateStorage) {
+ this.functionStateStorage = functionStateStorage;
+ }
+
@GetMapping(produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<Collection<FunctionDefinition>> getActiveFunctions() {
return ok(FunctionRegistrationService.INSTANCE.getAllFunctions());
@@ -78,4 +90,33 @@ public class FunctionsResource extends
AbstractAuthGuardedRestResource {
public ResponseEntity<List<SpLogEntry>>
getFunctionLogs(@PathVariable("functionId") String functionId) {
return
ok(ExtensionsLogProvider.INSTANCE.getLogInfosForResource(functionId));
}
+
+ @GetMapping(path = "{functionId}/state", produces =
MediaType.APPLICATION_JSON_VALUE)
+ public ResponseEntity<Map<String, Object>>
getFunctionState(@PathVariable("functionId") String functionId) {
+ var functionState = functionStateStorage.getElementById(functionId);
+ if (functionState != null) {
+ return ok(functionState.getState());
+ } else {
+ throw new SpMessageException(HttpStatus.NOT_FOUND,
Notifications.error("Function state not found"));
+ }
+ }
+
+ @PutMapping(
+ path = "{functionId}/state",
+ produces = MediaType.APPLICATION_JSON_VALUE,
+ consumes = MediaType.APPLICATION_JSON_VALUE
+ )
+ public ResponseEntity<SuccessMessage>
persistFunctionState(@PathVariable("functionId") String functionId,
+ @RequestBody
Map<String, Object> state) {
+ var existingFunctionState =
functionStateStorage.getElementById(functionId);
+
+ if (existingFunctionState != null) {
+ existingFunctionState.setState(state);
+ functionStateStorage.updateElement(existingFunctionState);
+ } else {
+ functionStateStorage.persist(new FunctionState(functionId, state));
+ }
+
+ return ok(Notifications.success("Function state successfully persisted"));
+ }
}
diff --git a/streampipes-service-core/AGENTS.md
b/streampipes-service-core/AGENTS.md
index bc9965808a..453a644ae3 100644
--- a/streampipes-service-core/AGENTS.md
+++ b/streampipes-service-core/AGENTS.md
@@ -16,6 +16,8 @@ Applies to `streampipes-service-core/`.
## Best Practices
- Keep this module orchestration-focused; domain behavior belongs in
management modules.
- New migrations must be idempotent and explicitly registered.
+- In `migrations/AvailableMigrations`, always append new migrations at the end
of the list.
+- Do not insert new migrations between existing entries to avoid conflicting
migration order.
- Preserve existing startup behavior for OAuth and non-OAuth deployments.
- Avoid introducing long-running/blocking work on startup thread paths.
diff --git
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
index a893025c12..ed3859743f 100644
---
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java
@@ -29,6 +29,7 @@ import
org.apache.streampipes.health.monitoring.ResourceProvider;
import org.apache.streampipes.health.monitoring.ServiceHealthCheck;
import org.apache.streampipes.loadbalance.LoadManager;
import
org.apache.streampipes.loadbalance.pipeline.ExtensionsServiceLogExecutor;
+import org.apache.streampipes.manager.function.FunctionManager;
import org.apache.streampipes.manager.health.CoreInitialInstallationProgress;
import org.apache.streampipes.manager.health.CoreServiceStatusManager;
import org.apache.streampipes.manager.pipeline.PipelineManager;
@@ -51,6 +52,8 @@ import
org.apache.streampipes.service.base.StreamPipesServiceBase;
import org.apache.streampipes.service.core.migrations.AvailableMigrations;
import org.apache.streampipes.service.core.migrations.Migration;
import org.apache.streampipes.service.core.migrations.MigrationsHandler;
+import org.apache.streampipes.service.core.storage.StorageApiConfiguration;
+import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
import org.apache.streampipes.storage.api.pipeline.IPipelineStorage;
import org.apache.streampipes.storage.api.system.ISpCoreConfigurationStorage;
import org.apache.streampipes.storage.couchdb.impl.user.UserStorage;
@@ -59,6 +62,7 @@ import
org.apache.streampipes.storage.management.StorageDispatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@@ -78,7 +82,8 @@ import java.util.function.Supplier;
@EnableAutoConfiguration
@EnableScheduling
@Import({OpenApiConfiguration.class, StreamPipesPasswordEncoder.class,
- StreamPipesPrometheusConfig.class, WebSecurityConfig.class,
WelcomePageController.class})
+ StreamPipesPrometheusConfig.class, WebSecurityConfig.class,
WelcomePageController.class,
+ StorageApiConfiguration.class})
@ComponentScan({"org.apache.streampipes.rest.*",
"org.apache.streampipes.service.core.oauth2",
"org.apache.streampipes.service.core.scheduler"})
public class StreamPipesCoreApplication extends StreamPipesServiceBase {
@@ -92,6 +97,9 @@ public class StreamPipesCoreApplication extends
StreamPipesServiceBase {
private final CoreServiceStatusManager coreStatusManager =
new CoreServiceStatusManager(coreConfigStorage);
+ @Autowired
+ private IFunctionStateStorage functionStateStorage;
+
public static void main(String[] args) {
StreamPipesCoreApplication application = new StreamPipesCoreApplication();
application.initialize(() -> List.of(new SpNatsProtocolFactory(), new
SpKafkaProtocolFactory(),
@@ -233,6 +241,8 @@ public class StreamPipesCoreApplication extends
StreamPipesServiceBase {
}
});
+ FunctionManager.stopAllFunctionsAndPersistState(functionStateStorage);
+
LOG.info("Thanks for using Apache StreamPipes - see you next time!");
}
diff --git
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java
index b79f15b186..b09d4d7ec6 100644
---
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/AvailableMigrations.java
@@ -30,6 +30,7 @@ import
org.apache.streampipes.service.core.migrations.v0980.FixImportedPermissio
import
org.apache.streampipes.service.core.migrations.v0980.ModifyAssetLinkTypesMigration;
import
org.apache.streampipes.service.core.migrations.v0980.ModifyAssetLinksMigration;
import
org.apache.streampipes.service.core.migrations.v099.AddAssetManagementViewMigration;
+import
org.apache.streampipes.service.core.migrations.v099.AddFunctionStateViewMigration;
import
org.apache.streampipes.service.core.migrations.v099.AddScriptTemplateViewMigration;
import
org.apache.streampipes.service.core.migrations.v099.ComputeCertificateThumbprintMigration;
import
org.apache.streampipes.service.core.migrations.v099.CreateAssetPermissionMigration;
@@ -80,7 +81,8 @@ public class AvailableMigrations {
new ComputeCertificateThumbprintMigration(),
new MigrateAdaptersToUseScript(),
new ModifyAssetLinkIconMigration(),
- new RemoveDuplicatedAssetPermissions()
+ new RemoveDuplicatedAssetPermissions(),
+ new AddFunctionStateViewMigration()
);
}
}
diff --git
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/AddFunctionStateViewMigration.java
similarity index 54%
copy from
streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
copy to
streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/AddFunctionStateViewMigration.java
index cdf0036377..d96ec471d7 100644
---
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v099/AddFunctionStateViewMigration.java
@@ -16,18 +16,27 @@
*
*/
-package org.apache.streampipes.extensions.api.declarer;
+package org.apache.streampipes.service.core.migrations.v099;
-import java.util.List;
+import org.apache.streampipes.manager.setup.tasks.AddFunctionStateViewTask;
+import
org.apache.streampipes.service.core.migrations.templates.AddGenericStorageViewMigration;
-public interface IStreamPipesFunctionDeclarer {
+import java.io.IOException;
- IFunctionConfig getFunctionConfig();
+public class AddFunctionStateViewMigration extends
AddGenericStorageViewMigration {
- List<String> requiredStreamIds();
+ @Override
+ public String getDesignDocumentName() {
+ return AddFunctionStateViewTask.DESIGN_DOCUMENT;
+ }
- void invokeRuntime(String serviceGroup);
-
- void discardRuntime();
+ @Override
+ public String getViewName() {
+ return AddFunctionStateViewTask.VIEW_NAME;
+ }
+ @Override
+ public void executeMigration() throws IOException {
+ new AddFunctionStateViewTask().execute();
+ }
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/storage/StorageApiConfiguration.java
similarity index 62%
copy from
streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
copy to
streampipes-service-core/src/main/java/org/apache/streampipes/service/core/storage/StorageApiConfiguration.java
index 3ae70c5c37..ae16f2e6ea 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/storage/StorageApiConfiguration.java
@@ -16,14 +16,19 @@
*
*/
-package org.apache.streampipes.commons.constants;
+package org.apache.streampipes.service.core.storage;
-public class GenericDocTypes {
+import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
+import
org.apache.streampipes.storage.couchdb.impl.function.FunctionStateStorageImpl;
- public static final String DOC_ASSET_MANAGEMENT = "asset-management";
- public static final String DOC_ASSET_LINK_TYPE = "asset-link-type";
- public static final String DOC_TRANSFORMATION_SCRIPT_TEMPLATE =
"transformation-script-template";
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+@Configuration
+public class StorageApiConfiguration {
- public static final String DEFAULT_ASSET_DOC_ID = "default-asset";
+ @Bean
+ public IFunctionStateStorage functionStateStorage() {
+ return new FunctionStateStorageImpl();
+ }
}
diff --git
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
index b95136efbc..80e8ffde21 100644
---
a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
+++
b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java
@@ -190,7 +190,6 @@ public abstract class StreamPipesExtensionsServiceBase
extends StreamPipesServic
@PreDestroy
public void onExit() {
new ExtensionsServiceShutdownHandler().onShutdown();
- StreamPipesFunctionHandler.INSTANCE.cleanupFunctions();
deregisterService(DeclarersSingleton.getInstance().getServiceId());
}
diff --git
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/function/IFunctionStateStorage.java
similarity index 74%
copy from
streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
copy to
streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/function/IFunctionStateStorage.java
index cdf0036377..1be7d41e13 100644
---
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
+++
b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/function/IFunctionStateStorage.java
@@ -16,18 +16,10 @@
*
*/
-package org.apache.streampipes.extensions.api.declarer;
+package org.apache.streampipes.storage.api.function;
-import java.util.List;
-
-public interface IStreamPipesFunctionDeclarer {
-
- IFunctionConfig getFunctionConfig();
-
- List<String> requiredStreamIds();
-
- void invokeRuntime(String serviceGroup);
-
- void discardRuntime();
+import org.apache.streampipes.model.function.FunctionState;
+import org.apache.streampipes.storage.api.core.CRUDStorage;
+public interface IFunctionStateStorage extends CRUDStorage<FunctionState> {
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/function/FunctionStateStorageImpl.java
similarity index 55%
copy from
streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
copy to
streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/function/FunctionStateStorageImpl.java
index 3ae70c5c37..5720c77de3 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/GenericDocTypes.java
+++
b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/function/FunctionStateStorageImpl.java
@@ -16,14 +16,21 @@
*
*/
-package org.apache.streampipes.commons.constants;
+package org.apache.streampipes.storage.couchdb.impl.function;
-public class GenericDocTypes {
+import org.apache.streampipes.model.function.FunctionState;
+import org.apache.streampipes.storage.api.function.IFunctionStateStorage;
+import org.apache.streampipes.storage.couchdb.impl.core.DefaultViewCrudStorage;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
- public static final String DOC_ASSET_MANAGEMENT = "asset-management";
- public static final String DOC_ASSET_LINK_TYPE = "asset-link-type";
- public static final String DOC_TRANSFORMATION_SCRIPT_TEMPLATE =
"transformation-script-template";
+public class FunctionStateStorageImpl extends
DefaultViewCrudStorage<FunctionState>
+ implements IFunctionStateStorage {
-
- public static final String DEFAULT_ASSET_DOC_ID = "default-asset";
+ public FunctionStateStorageImpl() {
+ super(
+ () -> Utils.getCouchDbGsonClient("genericstorage"),
+ FunctionState.class,
+ "function-states/all-function-states"
+ );
+ }
}
diff --git
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java
index ff4326971e..2d9709377a 100644
---
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java
+++
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java
@@ -30,6 +30,8 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
public class FunctionContext {
@@ -38,12 +40,15 @@ public class FunctionContext {
private StreamPipesClient client;
private String functionId;
private ConfigExtractor config;
+ private final Map<Class<?>, StateStore<?>> stateStores;
+ private Supplier<Object> stateSupplier;
private Map<String, SpOutputCollector> outputCollectors;
private IExtensionsLogger extensionsLogger;
public FunctionContext() {
this.streams = new HashMap<>();
+ this.stateStores = new HashMap<>();
}
public FunctionContext(String functionId,
@@ -87,4 +92,36 @@ public class FunctionContext {
public Map<String, SpOutputCollector> getOutputCollectors() {
return outputCollectors;
}
+
+  /** Returns the state store cached for {@code stateClass}, creating it on first request. */
+  @SuppressWarnings("unchecked") // safe: the store cached under stateClass is created with stateClass
+  public <T> StateStore<T> getStateStore(Class<T> stateClass) {
+    return (StateStore<T>) stateStores.computeIfAbsent(
+        stateClass, key -> new FunctionStateStore<>(functionId, client, stateClass));
+  }
+
+  /** Stores the supplier that {@link #getRegisteredState()} queries for the current state. */
+  public void registerStateSupplier(Supplier<Object> stateSupplier) {
+    this.stateSupplier = stateSupplier;
+  }
+
+  /** Returns the supplier's current value; empty if no supplier is registered or it yields null. */
+  public Optional<Object> getRegisteredState() {
+    // Optional#map turns a null supplier result into an empty Optional, like Optional.ofNullable.
+    return Optional.ofNullable(stateSupplier)
+        .map(Supplier::get);
+  }
+
+  /** Returns the first non-null payload cached by a {@link FunctionStateStore} of this context. */
+  public Optional<Map<String, Object>> getPersistedStatePayload() {
+    // NOTE(review): stateStores is a HashMap, so with several stores the selected payload is arbitrary.
+    return stateStores
+        .values()
+        .stream()
+        .filter(FunctionStateStore.class::isInstance)
+        // The wildcard cast keeps the payload typed, so no raw type or unchecked cast is involved.
+        .map(store -> ((FunctionStateStore<?>) store).getPersistedStatePayload())
+        .filter(payload -> payload != null)
+        .findFirst();
+  }
}
diff --git
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionStateStore.java
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionStateStore.java
new file mode 100644
index 0000000000..8bdfccca55
--- /dev/null
+++
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionStateStore.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.wrapper.standalone.function;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** {@link StateStore} that loads state through the admin API and caches persisted state in memory. */
+public class FunctionStateStore<T> implements StateStore<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FunctionStateStore.class);
+  private static final String STATE_PAYLOAD_KEY = "payload";
+
+  private final String functionId;
+  private final StreamPipesClient client;
+  private final Class<T> stateClass;
+  private final ObjectMapper objectMapper;
+  private Map<String, Object> persistedStatePayload;
+
+  public FunctionStateStore(String functionId, StreamPipesClient client, Class<T> stateClass) {
+    this.functionId = functionId;
+    this.client = client;
+    this.stateClass = stateClass;
+    this.objectMapper = JacksonSerializer.getObjectMapper();
+  }
+
+  /** Loads the state via the admin API; falls back to {@code defaultState} if absent or on failure. */
+  @Override
+  public T load(T defaultState) {
+    try {
+      var storedState = client.adminApi().getFunctionState(functionId);
+      if (!storedState.isPresent()) {
+        return defaultState;
+      }
+      var stateMap = storedState.get();
+      if (!stateMap.containsKey(STATE_PAYLOAD_KEY)) {
+        LOG.warn("Could not load function state for {}: missing payload key", functionId);
+        return defaultState;
+      }
+      return objectMapper.convertValue(stateMap.get(STATE_PAYLOAD_KEY), stateClass);
+    } catch (RuntimeException e) {
+      LOG.warn("Could not load function state for {}: {}", functionId, e.getMessage());
+      return defaultState;
+    }
+  }
+
+  /** Converts the state and caches it under the payload key; no remote call is made here. */
+  @Override
+  public void persist(T state) {
+    try {
+      Object convertedState = objectMapper.convertValue(state, Object.class);
+      Map<String, Object> wrapper = new HashMap<>();
+      wrapper.put(STATE_PAYLOAD_KEY, convertedState);
+      this.persistedStatePayload = wrapper;
+    } catch (RuntimeException e) {
+      LOG.warn("Could not persist function state for {}: {}", functionId, e.getMessage());
+    }
+  }
+
+  public Map<String, Object> getPersistedStatePayload() {
+    return persistedStatePayload;
+  }
+}
diff --git
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StateStore.java
similarity index 74%
copy from
streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
copy to
streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StateStore.java
index cdf0036377..59b6068c84 100644
---
a/streampipes-extensions-api/src/main/java/org/apache/streampipes/extensions/api/declarer/IStreamPipesFunctionDeclarer.java
+++
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StateStore.java
@@ -16,18 +16,11 @@
*
*/
-package org.apache.streampipes.extensions.api.declarer;
+package org.apache.streampipes.wrapper.standalone.function;
-import java.util.List;
+public interface StateStore<T> {
-public interface IStreamPipesFunctionDeclarer {
-
- IFunctionConfig getFunctionConfig();
-
- List<String> requiredStreamIds();
-
- void invokeRuntime(String serviceGroup);
-
- void discardRuntime();
+ T load(T defaultState);
+ void persist(T state);
}
diff --git
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
index 82e58d0587..fadf1fdff7 100644
---
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
+++
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
@@ -38,6 +38,7 @@ import org.apache.streampipes.model.runtime.EventFactory;
import org.apache.streampipes.model.runtime.SchemaInfo;
import org.apache.streampipes.model.runtime.SourceInfo;
import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.wrapper.standalone.manager.ProtocolManager;
import org.slf4j.Logger;
@@ -47,6 +48,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
public abstract class StreamPipesFunction implements
IStreamPipesFunctionDeclarer, RawDataProcessor {
@@ -55,6 +57,7 @@ public abstract class StreamPipesFunction implements
IStreamPipesFunctionDeclare
private final Map<String, SourceInfo> sourceInfoMapper;
private final Map<String, SchemaInfo> schemaInfoMapper;
private Map<String, SpInputCollector> inputCollectors;
+ private FunctionContext functionContext;
private Map<String, SpOutputCollector> outputCollectors;
@@ -76,6 +79,7 @@ public abstract class StreamPipesFunction implements
IStreamPipesFunctionDeclare
this.requiredStreamIds(),
this.outputCollectors
).generate();
+ this.functionContext = context;
// Creates a source info for each incoming SpDataStream
// The index is used to create the selector prefix for the SourceInfo
@@ -215,4 +219,29 @@ public abstract class StreamPipesFunction implements
IStreamPipesFunctionDeclare
public abstract void onServiceStopped();
+  /**
+   * Collects the state payload of this function: the value of the registered state supplier takes
+   * precedence, otherwise the payload cached by a state store of the function context is returned.
+   */
+  @Override
+  public Optional<Map<String, Object>> getRegisteredStatePayload() {
+    if (functionContext == null) {
+      return Optional.empty();
+    }
+
+    try {
+      Optional<Object> suppliedState = functionContext.getRegisteredState();
+      if (!suppliedState.isPresent()) {
+        return functionContext.getPersistedStatePayload();
+      }
+      Map<String, Object> wrapper = new HashMap<>();
+      wrapper.put("payload", JacksonSerializer.getObjectMapper().convertValue(suppliedState.get(), Object.class));
+      return Optional.of(wrapper);
+    } catch (RuntimeException e) {
+      LOG.warn("Could not collect registered state for function {}: {}", getFunctionConfig().getFunctionId().getId(),
+          e.getMessage());
+      return Optional.empty();
+    }
+  }
+
}