This is an automated email from the ASF dual-hosted git repository. zehnder pushed a commit to branch 4185-add-persistent-state-support-for-streampipe-functions in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 65e26fd4da305ab9a29367f5b6f20faf45bf1f8d Author: Philipp Zehnder <[email protected]> AuthorDate: Fri Feb 20 20:42:47 2026 +0100 feat(#4185): First version of function state workflow --- .../apache/streampipes/client/api/IAdminApi.java | 6 ++ .../apache/streampipes/client/api/AdminApi.java | 18 +++++ .../streampipes/model/function/FunctionState.java | 79 ++++++++++++++++++++++ .../streampipes/rest/impl/FunctionsResource.java | 37 ++++++++++ .../service/core/StreamPipesCoreApplication.java | 42 ++++++++++++ .../function/FunctionShutdownResource.java | 44 ++++++++++++ .../StreamPipesExtensionsServiceBase.java | 1 - .../function/StreamPipesFunctionHandler.java | 8 ++- .../storage/api/core/INoSqlStorage.java | 3 + .../api/function/IFunctionStateStorage.java | 25 +++++++ .../storage/couchdb/CouchDbStorageManager.java | 7 ++ .../impl/function/FunctionStateStorageImpl.java | 34 ++++++++++ .../standalone/function/FunctionContext.java | 9 +++ .../standalone/function/FunctionStateStore.java | 74 ++++++++++++++++++++ .../wrapper/standalone/function/StateStore.java | 26 +++++++ 15 files changed, 411 insertions(+), 2 deletions(-) diff --git a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java index ba776ce51c..11db107216 100644 --- a/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java +++ b/streampipes-client-api/src/main/java/org/apache/streampipes/client/api/IAdminApi.java @@ -25,6 +25,8 @@ import org.apache.streampipes.model.function.FunctionDefinition; import org.apache.streampipes.model.migration.ModelMigratorConfig; import java.util.List; +import java.util.Map; +import java.util.Optional; public interface IAdminApi { @@ -40,6 +42,10 @@ public interface IAdminApi { void deregisterFunction(String functionId); + Optional<Map<String, Object>> getFunctionState(String functionId); + + void persistFunctionState(String functionId, Map<String, Object> state); + void registerMigrations(List<ModelMigratorConfig> migrationConfigs, String serviceId); MessagingSettings getMessagingSettings(); diff --git a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java index 970eb62249..54c84798be 100644 --- a/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java +++ b/streampipes-client/src/main/java/org/apache/streampipes/client/api/AdminApi.java @@ -27,6 +27,8 @@ import org.apache.streampipes.model.message.SuccessMessage; import org.apache.streampipes.model.migration.ModelMigratorConfig; import java.util.List; +import java.util.Map; +import java.util.Optional; public class AdminApi extends AbstractClientApi implements IAdminApi { @@ -67,6 +69,18 @@ public class AdminApi extends AbstractClientApi implements IAdminApi { delete(getDeleteFunctionPath(functionId), SuccessMessage.class); } + @Override + public Optional<Map<String, Object>> getFunctionState(String functionId) { + return getSingleOpt(getFunctionStatePath(functionId), Map.class) + .map(state -> (Map<String, Object>) state); + } + + @Override + public void persistFunctionState(String functionId, + Map<String, Object> state) { + put(getFunctionStatePath(functionId), state); + } + /** * Register migration configs {@link ModelMigratorConfig} at the StreamPipes Core service. * @param migrationConfigs list of migration configs to be registered @@ -109,6 +123,10 @@ public class AdminApi extends AbstractClientApi implements IAdminApi { return getFunctionsPath().addToPath(functionId); } + private StreamPipesApiPath getFunctionStatePath(String functionId) { + return getDeleteFunctionPath(functionId).addToPath("state"); + } + private StreamPipesApiPath getMigrationPath() { return StreamPipesApiPath .fromBaseApiPath() diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionState.java b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionState.java new file mode 100644 index 0000000000..9e607c71d6 --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/function/FunctionState.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.function; + +import org.apache.streampipes.model.shared.annotation.TsModel; +import org.apache.streampipes.model.shared.api.Storable; + +import com.google.gson.annotations.SerializedName; + +import java.util.Map; + +@TsModel +public class FunctionState implements Storable { + + protected @SerializedName("_rev") String rev; + private @SerializedName("_id") String functionId; + private Map<String, Object> state; + + public FunctionState() { + } + + public FunctionState(String functionId, + Map<String, Object> state) { + this.functionId = functionId; + this.state = state; + } + + public String getFunctionId() { + return functionId; + } + + public void setFunctionId(String functionId) { + this.functionId = functionId; + } + + public Map<String, Object> getState() { + return state; + } + + public void setState(Map<String, Object> state) { + this.state = state; + } + + @Override + public String getRev() { + return rev; + } + + @Override + public void setRev(String rev) { + this.rev = rev; + } + + @Override + public String getElementId() { + return functionId; + } + + @Override + public void setElementId(String elementId) { + this.functionId = elementId; + } +} diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java index 6164bbecc8..d3a1eb16b7 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/FunctionsResource.java @@ -20,30 +20,38 @@ package org.apache.streampipes.rest.impl; import org.apache.streampipes.loadbalance.pipeline.ExtensionsLogProvider; import org.apache.streampipes.manager.function.FunctionRegistrationService; +import org.apache.streampipes.model.function.FunctionState; import org.apache.streampipes.model.function.FunctionDefinition; import org.apache.streampipes.model.message.Notifications; import org.apache.streampipes.model.message.SuccessMessage; import org.apache.streampipes.model.monitoring.SpLogEntry; import org.apache.streampipes.model.monitoring.SpMetricsEntry; import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource; +import org.apache.streampipes.rest.shared.exception.SpMessageException; +import org.apache.streampipes.storage.api.function.IFunctionStateStorage; +import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.PutMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Collection; import java.util.List; +import java.util.Map; @RestController @RequestMapping("/api/v2/functions") public class FunctionsResource extends AbstractAuthGuardedRestResource { + private final IFunctionStateStorage functionStateStorage = getNoSqlStorage().getFunctionStateStorage(); + @GetMapping(produces = MediaType.APPLICATION_JSON_VALUE) public ResponseEntity<Collection<FunctionDefinition>> getActiveFunctions() { return ok(FunctionRegistrationService.INSTANCE.getAllFunctions()); @@ -78,4 +86,33 @@ public class FunctionsResource extends AbstractAuthGuardedRestResource { public ResponseEntity<List<SpLogEntry>> getFunctionLogs(@PathVariable("functionId") String functionId) { return ok(ExtensionsLogProvider.INSTANCE.getLogInfosForResource(functionId)); } + + @GetMapping(path = "{functionId}/state", produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity<Map<String, Object>> getFunctionState(@PathVariable("functionId") String functionId) { + var functionState = functionStateStorage.getElementById(functionId); + if (functionState != null) { + return ok(functionState.getState()); + } else { + throw new SpMessageException(HttpStatus.NOT_FOUND, Notifications.error("Function state not found")); + } + } + + @PutMapping( + path = "{functionId}/state", + produces = MediaType.APPLICATION_JSON_VALUE, + consumes = MediaType.APPLICATION_JSON_VALUE + ) + public ResponseEntity<SuccessMessage> persistFunctionState(@PathVariable("functionId") String functionId, + @RequestBody Map<String, Object> state) { + var existingFunctionState = functionStateStorage.getElementById(functionId); + + if (existingFunctionState != null) { + existingFunctionState.setState(state); + functionStateStorage.updateElement(existingFunctionState); + } else { + functionStateStorage.persist(new FunctionState(functionId, state)); + } + + return ok(Notifications.success("Function state successfully persisted")); + } } diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java index a893025c12..278821abf0 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java @@ -35,6 +35,7 @@ import org.apache.streampipes.manager.pipeline.PipelineManager; import org.apache.streampipes.manager.setup.AutoInstallation; import org.apache.streampipes.manager.setup.StreamPipesEnvChecker; import org.apache.streampipes.manager.setup.tasks.ApplyDefaultRolesAndPrivilegesTask; +import org.apache.streampipes.manager.util.AuthTokenUtils; import org.apache.streampipes.messaging.SpProtocolManager; import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory; import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory; @@ -42,6 +43,7 @@ import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory; import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory; import org.apache.streampipes.messaging.pulsar.SpPulsarProtocolFactory; import org.apache.streampipes.model.configuration.SpCoreConfigurationStatus; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.pipeline.Pipeline; import org.apache.streampipes.model.pipeline.PipelineOperationStatus; import org.apache.streampipes.resource.management.SpResourceManager; @@ -57,6 +59,8 @@ import org.apache.streampipes.storage.couchdb.impl.user.UserStorage; import org.apache.streampipes.storage.couchdb.utils.CouchDbViewGenerator; import org.apache.streampipes.storage.management.StorageDispatcher; +import org.apache.http.client.fluent.Request; +import org.apache.http.client.fluent.Response; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; @@ -68,6 +72,7 @@ import org.springframework.scheduling.annotation.EnableScheduling; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; +import java.io.IOException; import java.net.UnknownHostException; import java.util.List; import java.util.concurrent.Executors; @@ -85,6 +90,7 @@ public class StreamPipesCoreApplication extends StreamPipesServiceBase { private static final Logger LOG = LoggerFactory.getLogger(StreamPipesCoreApplication.class.getCanonicalName()); + private static final String FUNCTION_SHUTDOWN_PATH = "/api/v1/functions/shutdown"; private final ISpCoreConfigurationStorage coreConfigStorage = StorageDispatcher.INSTANCE.getNoSqlStore().getSpCoreConfigurationStorage(); @@ -233,9 +239,45 @@ public class StreamPipesCoreApplication extends StreamPipesServiceBase { } }); + triggerExtensionFunctionShutdown(); + LOG.info("Thanks for using Apache StreamPipes - see you next time!"); } + private void triggerExtensionFunctionShutdown() { + var extensions = StorageDispatcher.INSTANCE.getNoSqlStore().getExtensionsServiceStorage().findAll(); + var serviceAdmin = new SpResourceManager().manageUsers().getServiceAdmin(); + var authToken = AuthTokenUtils.getAuthTokenForUser(serviceAdmin); + + LOG.info("Triggering function shutdown at {} extension services...", extensions.size()); + extensions.forEach(service -> triggerFunctionShutdown(service, authToken)); + } + + private void triggerFunctionShutdown(SpServiceRegistration service, + String authToken) { + var endpoint = service.getServiceUrl() + FUNCTION_SHUTDOWN_PATH; + + try { + LOG.info("Triggering function shutdown at {}", endpoint); + Response response = Request + .Post(endpoint) + .addHeader("Authorization", authToken) + .connectTimeout(5000) + .socketTimeout(10000) + .execute(); + int statusCode = response.returnResponse().getStatusLine().getStatusCode(); + + if (statusCode >= 200 && statusCode < 300) { + LOG.debug("Function shutdown triggered at {} (HTTP {})", service.getSvcId(), statusCode); + } else { + LOG.warn("Function shutdown request returned non-success status at {} (HTTP {})", + service.getSvcId(), statusCode); + } + } catch (IOException e) { + LOG.warn("Could not trigger function shutdown at {}: {}", endpoint, e.getMessage()); + } + } + private List<Pipeline> getAllPipelines() { return getPipelineStorage().findAll(); } diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/rest/extensions/function/FunctionShutdownResource.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/rest/extensions/function/FunctionShutdownResource.java new file mode 100644 index 0000000000..877c6137b1 --- /dev/null +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/rest/extensions/function/FunctionShutdownResource.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.rest.extensions.function; + +import org.apache.streampipes.model.Response; +import org.apache.streampipes.rest.extensions.AbstractExtensionsResource; +import org.apache.streampipes.service.extensions.function.StreamPipesFunctionHandler; + +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("api/v1/functions") +public class FunctionShutdownResource extends AbstractExtensionsResource { + + @PostMapping(path = "shutdown", produces = MediaType.APPLICATION_JSON_VALUE) + public ResponseEntity<Response> shutdownFunctions() { + try { + StreamPipesFunctionHandler.INSTANCE.cleanupFunctions(); + return ok(new Response("functions-shutdown", true, "Function shutdown triggered")); + } catch (RuntimeException e) { + return ok(new Response("functions-shutdown", false, e.getMessage())); + } + } +} diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java index b95136efbc..80e8ffde21 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java @@ -190,7 +190,6 @@ public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServic @PreDestroy public void onExit() { new ExtensionsServiceShutdownHandler().onShutdown(); - StreamPipesFunctionHandler.INSTANCE.cleanupFunctions(); deregisterService(DeclarersSingleton.getInstance().getServiceId()); } diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java index a36d788a4d..a6900ef299 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/function/StreamPipesFunctionHandler.java @@ -93,10 +93,16 @@ public enum StreamPipesFunctionHandler { } public void cleanupFunctions() { + if (this.runningInstances.isEmpty()) { + return; + } + + var functionDefinitions = getFunctionDefinitions(); this.runningInstances.forEach((key, value) -> { value.discardRuntime(); }); - new FunctionDeregistrationHandler(getFunctionDefinitions()).run(); + new FunctionDeregistrationHandler(functionDefinitions).run(); + this.runningInstances.clear(); } private List<FunctionDefinition> getFunctionDefinitions() { diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/core/INoSqlStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/core/INoSqlStorage.java index 2e659aa65f..b80905355d 100644 --- a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/core/INoSqlStorage.java +++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/core/INoSqlStorage.java @@ -21,6 +21,7 @@ import org.apache.streampipes.storage.api.connect.IAdapterStorage; import org.apache.streampipes.storage.api.explorer.IDataExplorerDashboardStorage; import org.apache.streampipes.storage.api.explorer.IDataExplorerWidgetStorage; import org.apache.streampipes.storage.api.explorer.IDataLakeMeasureStorage; +import org.apache.streampipes.storage.api.function.IFunctionStateStorage; import org.apache.streampipes.storage.api.pipeline.ICompactPipelineTemplateStorage; import org.apache.streampipes.storage.api.pipeline.IDataProcessorStorage; import org.apache.streampipes.storage.api.pipeline.IDataSinkStorage; @@ -108,4 +109,6 @@ public interface INoSqlStorage { IAssetStorage getAssetStorage(); ITransformationScriptTemplateStorage getTransformationScriptTemplateStorage(); + + IFunctionStateStorage getFunctionStateStorage(); } diff --git a/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/function/IFunctionStateStorage.java b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/function/IFunctionStateStorage.java new file mode 100644 index 0000000000..1be7d41e13 --- /dev/null +++ b/streampipes-storage-api/src/main/java/org/apache/streampipes/storage/api/function/IFunctionStateStorage.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.storage.api.function; + +import org.apache.streampipes.model.function.FunctionState; +import org.apache.streampipes.storage.api.core.CRUDStorage; + +public interface IFunctionStateStorage extends CRUDStorage<FunctionState> { +} diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java index d7d07af389..1bf164bbfc 100644 --- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java +++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/CouchDbStorageManager.java @@ -23,6 +23,7 @@ import org.apache.streampipes.storage.api.core.INoSqlStorage; import org.apache.streampipes.storage.api.explorer.IDataExplorerDashboardStorage; import org.apache.streampipes.storage.api.explorer.IDataExplorerWidgetStorage; import org.apache.streampipes.storage.api.explorer.IDataLakeMeasureStorage; +import org.apache.streampipes.storage.api.function.IFunctionStateStorage; import org.apache.streampipes.storage.api.pipeline.ICompactPipelineTemplateStorage; import org.apache.streampipes.storage.api.pipeline.IDataProcessorStorage; import org.apache.streampipes.storage.api.pipeline.IDataSinkStorage; @@ -53,6 +54,7 @@ import org.apache.streampipes.storage.couchdb.impl.connect.AdapterInstanceStorag import org.apache.streampipes.storage.couchdb.impl.explorer.DataExplorerDashboardStorageImpl; import org.apache.streampipes.storage.couchdb.impl.explorer.DataExplorerWidgetStorageImpl; import org.apache.streampipes.storage.couchdb.impl.explorer.DataLakeMeasureStorage; +import org.apache.streampipes.storage.couchdb.impl.function.FunctionStateStorageImpl; import org.apache.streampipes.storage.couchdb.impl.pipeline.CompactPipelineTemplateStorageImpl; import org.apache.streampipes.storage.couchdb.impl.pipeline.DataProcessorStorageImpl; import org.apache.streampipes.storage.couchdb.impl.pipeline.DataSinkStorageImpl; @@ -234,4 +236,9 @@ public class CouchDbStorageManager implements INoSqlStorage { public ITransformationScriptTemplateStorage getTransformationScriptTemplateStorage() { return new TransformationScriptTemplateStorageImpl(); } + + @Override + public IFunctionStateStorage getFunctionStateStorage() { + return new FunctionStateStorageImpl(); + } } diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/function/FunctionStateStorageImpl.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/function/FunctionStateStorageImpl.java new file mode 100644 index 0000000000..488da13b31 --- /dev/null +++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/impl/function/FunctionStateStorageImpl.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.storage.couchdb.impl.function; + +import org.apache.streampipes.model.function.FunctionState; +import org.apache.streampipes.storage.api.function.IFunctionStateStorage; +import org.apache.streampipes.storage.couchdb.impl.core.DefaultCrudStorage; +import org.apache.streampipes.storage.couchdb.utils.Utils; + +public class FunctionStateStorageImpl extends DefaultCrudStorage<FunctionState> implements IFunctionStateStorage { + + public FunctionStateStorageImpl() { + super( + () -> Utils.getCouchDbGsonClient("functions-state"), + FunctionState.class + ); + } +} diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java index ff4326971e..caced6b421 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionContext.java @@ -38,12 +38,14 @@ public class FunctionContext { private StreamPipesClient client; private String functionId; private ConfigExtractor config; + private final Map<Class<?>, StateStore<?>> stateStores; private Map<String, SpOutputCollector> outputCollectors; private IExtensionsLogger extensionsLogger; public FunctionContext() { this.streams = new HashMap<>(); + this.stateStores = new HashMap<>(); } public FunctionContext(String functionId, @@ -87,4 +89,11 @@ public class FunctionContext { public Map<String, SpOutputCollector> getOutputCollectors() { return outputCollectors; } + + public <T> StateStore<T> getStateStore(Class<T> stateClass) { + return (StateStore<T>) stateStores.computeIfAbsent( + stateClass, + key -> new FunctionStateStore<>(functionId, client, stateClass) + ); + } } diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionStateStore.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionStateStore.java new file mode 100644 index 0000000000..483d932efc --- /dev/null +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/FunctionStateStore.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.wrapper.standalone.function; + +import org.apache.streampipes.client.StreamPipesClient; +import org.apache.streampipes.serializers.json.JacksonSerializer; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class FunctionStateStore<T> implements StateStore<T> { + + private static final Logger LOG = LoggerFactory.getLogger(FunctionStateStore.class); + + private final String functionId; + private final StreamPipesClient client; + private final Class<T> stateClass; + private final ObjectMapper objectMapper; + + public FunctionStateStore(String functionId, + StreamPipesClient client, + Class<T> stateClass) { + this.functionId = functionId; + this.client = client; + this.stateClass = stateClass; + this.objectMapper = JacksonSerializer.getObjectMapper(); + } + + @Override + public T load(T defaultState) { + try { + var functionState = client.adminApi().getFunctionState(functionId); + if (functionState.isPresent()) { + return objectMapper.convertValue(functionState.get(), stateClass); + } else { + return defaultState; + } + } catch (RuntimeException e) { + LOG.warn("Could not load function state for {}: {}", functionId, e.getMessage()); + return defaultState; + } + } + + @Override + public void persist(T state) { + try { + var payload = objectMapper.convertValue(state, new TypeReference<Map<String, Object>>() { + }); + client.adminApi().persistFunctionState(functionId, payload); + } catch (RuntimeException e) { + LOG.warn("Could not persist function state for {}: {}", functionId, e.getMessage()); + } + } +} diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StateStore.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StateStore.java new file mode 100644 index 0000000000..59b6068c84 --- /dev/null +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StateStore.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.wrapper.standalone.function; + +public interface StateStore<T> { + + T load(T defaultState); + + void persist(T state); +}
