This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch add-broker-communication-mode in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit d2d214618087b0db6d1e655bb6e2305ee9db5499 Author: Dominik Riemer <[email protected]> AuthorDate: Fri Mar 13 17:26:48 2026 +0100 feat: Add initial version of broker communication mode --- pom.xml | 1 + .../apache/streampipes/commons/constants/Envs.java | 6 + .../commons/environment/DefaultEnvironment.java | 15 ++ .../commons/environment/Environment.java | 6 + .../sinks/internal/jvm/datalake/DataLakeSink.java | 2 +- .../messaging/kafka/SpKafkaConsumer.java | 2 +- .../messaging/kafka/SpKafkaProducer.java | 6 +- .../ExtensionServiceBrokerErrorEnvelope.java | 50 ++-- .../ExtensionServiceBrokerRequestEnvelope.java | 72 ++++++ .../ExtensionServiceBrokerResponseEnvelope.java | 72 ++++++ .../transport/ExtensionServiceBrokerTopics.java | 59 +++++ .../transport/ExtensionServiceTransportMode.java | 46 ++-- .../pom.xml | 52 ++-- .../extensions/ExtensionBrokerRequestReceiver.java | 239 ++++++++++++++++++ .../extensions/ExtensionServiceOperationType.java | 1 + .../extensions/ExtensionServiceRequestTarget.java | 15 +- .../extensions/ExtensionServiceRequestTargets.java | 9 + .../core/ExtensionServiceRequestConfiguration.java | 37 ++- .../service/core/StreamPipesCoreApplication.java | 2 +- .../extensions/CoreExtensionTransportMode.java | 38 +-- .../extensions/CoreNatsRequestReplyClient.java | 82 +++++++ ...ansportAwareExtensionServiceRequestManager.java | 267 +++++++++++++++++++++ streampipes-service-extensions/pom.xml | 5 + .../StreamPipesExtensionsServiceBase.java | 33 +++ .../standalone/manager/ProtocolManager.java | 2 +- 25 files changed, 1006 insertions(+), 113 deletions(-) diff --git a/pom.xml b/pom.xml index efbe0d1d6c..7d45f2e858 100644 --- a/pom.xml +++ b/pom.xml @@ -912,6 +912,7 @@ <module>streampipes-service-core-minimal</module> <module>streampipes-service-discovery</module> <module>streampipes-service-discovery-api</module> + <module>streampipes-nats-extensions</module> <module>streampipes-service-extensions</module> <module>streampipes-storage-api</module> <module>streampipes-storage-couchdb</module> diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java index 5974facc9c..c7cb5b9519 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java @@ -101,6 +101,12 @@ public enum Envs { SP_NATS_PORT("SP_NATS_PORT", "4222"), SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"), + SP_CORE_EXTENSION_TRANSPORT_MODE("SP_CORE_EXTENSION_TRANSPORT_MODE", "auto"), + SP_EXTENSION_TRANSPORT_MODE("SP_EXTENSION_TRANSPORT_MODE", "http"), + SP_EXTENSION_REQUEST_TOPIC_PREFIX( + "SP_EXTENSION_REQUEST_TOPIC_PREFIX", + "sp.extensions.request" + ), CPU_RESOURCE_WEIGHT("SP_CPU_RESOURCE_WEIGHT", "1.0"), MEMORY_RESOURCE_WEIGHT("SP_MEMORY_RESOURCE_WEIGHT", "1.0"), diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java index dd01aa8af7..5ff5b0e568 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java @@ -312,6 +312,21 @@ public class DefaultEnvironment implements Environment { return new StringEnvironmentVariable(Envs.SP_PULSAR_URL); } + @Override + public StringEnvironmentVariable getCoreExtensionTransportMode() { + return new StringEnvironmentVariable(Envs.SP_CORE_EXTENSION_TRANSPORT_MODE); + } + + @Override + public StringEnvironmentVariable getExtensionTransportMode() { + return new StringEnvironmentVariable(Envs.SP_EXTENSION_TRANSPORT_MODE); + } + + @Override + public StringEnvironmentVariable getExtensionRequestTopicPrefix() { + return new StringEnvironmentVariable(Envs.SP_EXTENSION_REQUEST_TOPIC_PREFIX); + } + @Override public StringEnvironmentVariable getCustomServiceTags() { return new StringEnvironmentVariable(Envs.SP_SERVICE_TAGS); diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java index 8949d4663a..e0e52234b6 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java @@ -156,6 +156,12 @@ public interface Environment { StringEnvironmentVariable getPulsarUrl(); + StringEnvironmentVariable getCoreExtensionTransportMode(); + + StringEnvironmentVariable getExtensionTransportMode(); + + StringEnvironmentVariable getExtensionRequestTopicPrefix(); + StringEnvironmentVariable getCustomServiceTags(); StringEnvironmentVariable getAllowedUploadFiletypes(); diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java index 0a1b5bf1ea..40230d9e5a 100644 --- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java +++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java @@ -194,7 +194,7 @@ public class DataLakeSink implements IStreamPipesDataSink, SupportsRuntimeConfig .peek(ep -> { // Set all properties to DIMENSION_PROPERTY when seleted in dimensions if (dimensions.contains(ep.getRuntimeName())) { - LOG.info("Using {} as dimension", ep.getRuntimeName()); + LOG.debug("Using {} as dimension", ep.getRuntimeName()); ep.setPropertyScope(PropertyScope.DIMENSION_PROPERTY.name()); } }) diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java index ec2adcb80b..3f3939cf2e 100644 --- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java +++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaConsumer.java @@ -88,7 +88,7 @@ public class SpKafkaConsumer implements EventConsumer, Runnable, @Override public void connect(InternalEventProcessor<byte[]> eventProcessor) throws SpRuntimeException { - LOG.info("Kafka consumer: Connecting to {}", protocol.getTopicDefinition().getActualTopicName()); + LOG.debug("Kafka consumer: Connecting to {}", protocol.getTopicDefinition().getActualTopicName()); var patternTopic = isPatternTopic(); this.eventProcessor = eventProcessor; this.isRunning = true; diff --git a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java index eb07b66c6b..68637be02c 100644 --- a/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java +++ b/streampipes-messaging-kafka/src/main/java/org/apache/streampipes/messaging/kafka/SpKafkaProducer.java @@ -93,7 +93,7 @@ public class SpKafkaProducer implements EventProducer, Serializable { @Override public void connect() { - LOG.info("Kafka producer: Connecting to " + protocol.getTopicDefinition().getActualTopicName()); + LOG.debug("Kafka producer: Connecting to " + protocol.getTopicDefinition().getActualTopicName()); this.brokerUrl = protocol.getBrokerHostname() + ":" + protocol.getKafkaPort(); this.topic = protocol.getTopicDefinition().getActualTopicName(); @@ -106,7 +106,7 @@ public class SpKafkaProducer implements EventProducer, Serializable { this.producer = new KafkaProducer<>(makeProperties(protocol, Collections.emptyList())); this.connected = true; - LOG.info("Successfully created Kafka producer for topic " + this.topic); + LOG.debug("Successfully created Kafka producer for topic " + this.topic); } /** @@ -136,7 +136,7 @@ public class SpKafkaProducer implements EventProducer, Serializable { LOG.info("Successfully created Kafka topic " + topic); } else { - LOG.info("Topic " + topic + "already exists in the broker, skipping topic creation"); + LOG.debug("Topic " + topic + "already exists in the broker, skipping topic creation"); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerErrorEnvelope.java similarity index 55% copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java copy to streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerErrorEnvelope.java index 53fa966da1..5bb2619fb6 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerErrorEnvelope.java @@ -16,24 +16,34 @@ * */ -package org.apache.streampipes.manager.api.extensions; - -public enum ExtensionServiceOperationType { - CONTAINER_PROVIDED_OPTIONS, - MIGRATION, - DESCRIPTION_UPDATE, - EXTENSION_DESCRIPTION, - FUNCTION_STOP, - ADAPTER_STATE_CHANGE, - RUNTIME_OPTIONS, - SAMPLE_DATA, - EXTENSION_INSTANCE_HEALTH, - SERVICE_HEALTH, - PIPELINE_ELEMENT_INVOCATION, - PIPELINE_ELEMENT_DETACH, - PIPELINE_ELEMENT_ASSETS, - ADAPTER_ASSETS, - ADAPTER_ICON_ASSET, - ADAPTER_DOCUMENTATION_ASSET, - OUTPUT_SCHEMA; +package org.apache.streampipes.model.extensions.transport; + +public class ExtensionServiceBrokerErrorEnvelope { + + private String errorType; + private String message; + + public ExtensionServiceBrokerErrorEnvelope() { + } + + public ExtensionServiceBrokerErrorEnvelope(String errorType, String message) { + this.errorType = errorType; + this.message = message; + } + + public String getErrorType() { + return errorType; + } + + public void setErrorType(String errorType) { + this.errorType = errorType; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } } diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerRequestEnvelope.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerRequestEnvelope.java new file mode 100644 index 0000000000..2f43dd734c --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerRequestEnvelope.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.extensions.transport; + +public class ExtensionServiceBrokerRequestEnvelope { + + private String requestId; + private String operation; + private String payload; + private String authToken; + + public ExtensionServiceBrokerRequestEnvelope() { + } + + public ExtensionServiceBrokerRequestEnvelope(String requestId, + String operation, + String payload, + String authToken) { + this.requestId = requestId; + this.operation = operation; + this.payload = payload; + this.authToken = authToken; + } + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public String getOperation() { + return operation; + } + + public void setOperation(String operation) { + this.operation = operation; + } + + public String getPayload() { + return payload; + } + + public void setPayload(String payload) { + this.payload = payload; + } + + public String getAuthToken() { + return authToken; + } + + public void setAuthToken(String authToken) { + this.authToken = authToken; + } +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerResponseEnvelope.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerResponseEnvelope.java new file mode 100644 index 0000000000..95b2965546 --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerResponseEnvelope.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.extensions.transport; + +public class ExtensionServiceBrokerResponseEnvelope { + + private String requestId; + private int statusCode; + private String payload; + private ExtensionServiceBrokerErrorEnvelope error; + + public ExtensionServiceBrokerResponseEnvelope() { + } + + public ExtensionServiceBrokerResponseEnvelope(String requestId, + int statusCode, + String payload, + ExtensionServiceBrokerErrorEnvelope error) { + this.requestId = requestId; + this.statusCode = statusCode; + this.payload = payload; + this.error = error; + } + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public int getStatusCode() { + return statusCode; + } + + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + public String getPayload() { + return payload; + } + + public void setPayload(String payload) { + this.payload = payload; + } + + public ExtensionServiceBrokerErrorEnvelope getError() { + return error; + } + + public void setError(ExtensionServiceBrokerErrorEnvelope error) { + this.error = error; + } +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerTopics.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerTopics.java new file mode 100644 index 0000000000..d345b8f97e --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceBrokerTopics.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.extensions.transport; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public final class ExtensionServiceBrokerTopics { + + public static final String DEFAULT_REQUEST_TOPIC_PREFIX = "sp.extensions.request"; + + public static final String TRANSPORT_TAG_HTTP = "transport:http"; + public static final String TRANSPORT_TAG_NATS = "transport:nats"; + + private ExtensionServiceBrokerTopics() { + } + + public static String serviceWildcard(String topicPrefix, String serviceId) { + return serviceTopic(topicPrefix, serviceId, List.of()) + ".>"; + } + + public static String serviceTopic(String topicPrefix, + String serviceId, + List<String> topicSegments) { + return Stream.concat( + Stream.of(topicPrefix, serviceId), + topicSegments.stream()) + .filter(Objects::nonNull) + .map(ExtensionServiceBrokerTopics::normalizeSegment) + .filter(part -> !part.isEmpty()) + .collect(Collectors.joining(".")); + } + + private static String normalizeSegment(String value) { + return trimSlashes(value).replace("/", "."); + } + + private static String trimSlashes(String value) { + return value.replaceAll("^/+", "").replaceAll("/+$", ""); + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceTransportMode.java similarity index 56% copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java copy to streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceTransportMode.java index 53fa966da1..2b48e5a0c4 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/extensions/transport/ExtensionServiceTransportMode.java @@ -16,24 +16,32 @@ * */ -package org.apache.streampipes.manager.api.extensions; +package org.apache.streampipes.model.extensions.transport; -public enum ExtensionServiceOperationType { - CONTAINER_PROVIDED_OPTIONS, - MIGRATION, - DESCRIPTION_UPDATE, - EXTENSION_DESCRIPTION, - FUNCTION_STOP, - ADAPTER_STATE_CHANGE, - RUNTIME_OPTIONS, - SAMPLE_DATA, - EXTENSION_INSTANCE_HEALTH, - SERVICE_HEALTH, - PIPELINE_ELEMENT_INVOCATION, - PIPELINE_ELEMENT_DETACH, - PIPELINE_ELEMENT_ASSETS, - ADAPTER_ASSETS, - ADAPTER_ICON_ASSET, - ADAPTER_DOCUMENTATION_ASSET, - OUTPUT_SCHEMA; +import java.util.Locale; + +public enum ExtensionServiceTransportMode { + HTTP, + NATS, + DUAL; + + public boolean supportsHttp() { + return this == HTTP || this == DUAL; + } + + public boolean supportsNats() { + return this == NATS || this == DUAL; + } + + public static ExtensionServiceTransportMode from(String value) { + if (value == null || value.isBlank()) { + return HTTP; + } + + try { + return ExtensionServiceTransportMode.valueOf(value.trim().toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + return HTTP; + } + } } diff --git a/streampipes-service-extensions/pom.xml b/streampipes-nats-extensions/pom.xml similarity index 58% copy from streampipes-service-extensions/pom.xml copy to streampipes-nats-extensions/pom.xml index d370d673d1..5dfc39b74a 100644 --- a/streampipes-service-extensions/pom.xml +++ b/streampipes-nats-extensions/pom.xml @@ -1,3 +1,4 @@ +<?xml version="1.0" encoding="UTF-8"?> <!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more ~ contributor license agreements. See the NOTICE file distributed with @@ -16,55 +17,37 @@ ~ --> - -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> <parent> - <artifactId>streampipes-parent</artifactId> <groupId>org.apache.streampipes</groupId> + <artifactId>streampipes-parent</artifactId> <version>0.99.0-SNAPSHOT</version> </parent> - <modelVersion>4.0.0</modelVersion> - <artifactId>streampipes-service-extensions</artifactId> + <artifactId>streampipes-nats-extensions</artifactId> <dependencies> - <!-- StreamPipes dependencies --> <dependency> <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-service-base</artifactId> + <artifactId>streampipes-commons</artifactId> <version>0.99.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-connect-transformer-api</artifactId> + <artifactId>streampipes-extensions-management</artifactId> <version>0.99.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-connect-transformer-groovy</artifactId> + <artifactId>streampipes-model</artifactId> <version>0.99.0-SNAPSHOT</version> </dependency> <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-connect-transformer-js</artifactId> - <version>0.99.0-SNAPSHOT</version> - </dependency> - - <dependency> - <groupId>org.apache.streampipes</groupId> - <artifactId>streampipes-rest-extensions</artifactId> - <version>0.99.0-SNAPSHOT</version> - <exclusions> - <exclusion> - <groupId>org.javassist</groupId> - <artifactId>javassist</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.springframework.boot</groupId> - <artifactId>spring-boot-starter-web</artifactId> + <groupId>io.nats</groupId> + <artifactId>jnats</artifactId> </dependency> <!-- Test dependencies --> @@ -73,17 +56,18 @@ <artifactId>junit-jupiter-api</artifactId> <scope>test</scope> </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <scope>test</scope> - </dependency> </dependencies> + <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <propertyExpansion> + checkstyle.config.base.path=${project.parent.basedir}/tools/maven + </propertyExpansion> + </configuration> </plugin> </plugins> </build> diff --git a/streampipes-nats-extensions/src/main/java/org/apache/streampipes/nats/extensions/ExtensionBrokerRequestReceiver.java b/streampipes-nats-extensions/src/main/java/org/apache/streampipes/nats/extensions/ExtensionBrokerRequestReceiver.java new file mode 100644 index 0000000000..df517d7c8b --- /dev/null +++ b/streampipes-nats-extensions/src/main/java/org/apache/streampipes/nats/extensions/ExtensionBrokerRequestReceiver.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.nats.extensions; + +import org.apache.streampipes.commons.environment.Environments; +import org.apache.streampipes.commons.exceptions.connect.AdapterException; +import org.apache.streampipes.extensions.management.connect.AdapterWorkerRequestManagement; +import org.apache.streampipes.extensions.management.monitoring.ServiceMonitorManagement; +import org.apache.streampipes.model.connect.adapter.AdapterDescription; +import org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerErrorEnvelope; +import org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerRequestEnvelope; +import org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerResponseEnvelope; +import org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerTopics; +import org.apache.streampipes.model.extensions.transport.ExtensionServiceTransportMode; +import org.apache.streampipes.serializers.json.JacksonSerializer; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.nats.client.Connection; +import io.nats.client.Dispatcher; +import io.nats.client.Message; +import io.nats.client.Nats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ExtensionBrokerRequestReceiver { + + private static final Logger LOG = LoggerFactory.getLogger(ExtensionBrokerRequestReceiver.class); + + private static final String ADAPTER_STATE_CHANGE_OPERATION = "ADAPTER_STATE_CHANGE"; + private static final String SERVICE_LOAD_OPERATION = "SERVICE_LOAD"; + private static final String STATE_CHANGE_START = "start"; + private static final String STATE_CHANGE_STOP = "stop"; + private static final int HTTP_STATUS_OK = 200; + private static final int HTTP_STATUS_BAD_REQUEST = 400; + private static final int HTTP_STATUS_INTERNAL_SERVER_ERROR = 500; + private static final int HTTP_STATUS_NOT_IMPLEMENTED = 501; + + private final ObjectMapper objectMapper; + private final ServiceMonitorManagement serviceMonitorManagement; + private final AdapterWorkerRequestManagement adapterWorkerRequestManagement; + + private Connection natsConnection; + private Dispatcher dispatcher; + + public ExtensionBrokerRequestReceiver() { + this(new ServiceMonitorManagement(), new AdapterWorkerRequestManagement()); + } + + public ExtensionBrokerRequestReceiver(ServiceMonitorManagement serviceMonitorManagement, + AdapterWorkerRequestManagement adapterWorkerRequestManagement) { + this.objectMapper = JacksonSerializer.getObjectMapper(); + this.serviceMonitorManagement = serviceMonitorManagement; + this.adapterWorkerRequestManagement = adapterWorkerRequestManagement; + } + + public synchronized boolean start(String serviceId, + ExtensionServiceTransportMode mode, + String topicPrefix) { + if (!mode.supportsNats()) { + return false; + } + + try { + var env = Environments.getEnvironment(); + String natsUrl = "nats://" + env.getNatsHost().getValueOrDefault() + + ":" + env.getNatsPort().getValueOrDefault(); + this.natsConnection = Nats.connect(natsUrl); + + String subscriptionTopic = ExtensionServiceBrokerTopics.serviceWildcard(topicPrefix, serviceId); + this.dispatcher = natsConnection.createDispatcher(this::onMessage); + this.dispatcher.subscribe(subscriptionTopic); + + LOG.info("Extension broker receiver listening on topic {}", subscriptionTopic); + return true; + } catch (Exception e) { + LOG.warn("Could not start extension broker receiver", e); + stop(); + return false; + } + } + + public synchronized void stop() { + if (natsConnection != null && dispatcher != null) { + natsConnection.closeDispatcher(dispatcher); + dispatcher = null; + } + + if (natsConnection != null) { + try { + natsConnection.close(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while closing extension broker receiver", e); + } finally { + natsConnection = null; + } + } + } + + private void onMessage(Message message) { + String replyTo = message.getReplyTo(); + if (replyTo == null || replyTo.isBlank()) { + return; + } + + ExtensionServiceBrokerResponseEnvelope response; + try { + var request = objectMapper.readValue(message.getData(), ExtensionServiceBrokerRequestEnvelope.class); + response = handleRequest(request, message.getSubject()); + } catch (Exception e) { + response = error(null, HTTP_STATUS_INTERNAL_SERVER_ERROR, e); + } + + publishResponse(replyTo, response); + } + + private ExtensionServiceBrokerResponseEnvelope handleRequest(ExtensionServiceBrokerRequestEnvelope request, + String topic) { + try { + if (SERVICE_LOAD_OPERATION.equals(request.getOperation())) { + var payload = objectMapper.writeValueAsString(serviceMonitorManagement.getCurrentReport()); + return new ExtensionServiceBrokerResponseEnvelope( + request.getRequestId(), + HTTP_STATUS_OK, + payload, + null + ); + } + + if (ADAPTER_STATE_CHANGE_OPERATION.equals(request.getOperation())) { + return handleAdapterStateChangeRequest(request, topic); + } + + return new ExtensionServiceBrokerResponseEnvelope( + request.getRequestId(), + HTTP_STATUS_NOT_IMPLEMENTED, + null, + new ExtensionServiceBrokerErrorEnvelope( + "UnsupportedOperation", + "No broker handler available for operation " + request.getOperation() + ) + ); + } catch (Exception e) { + return error(request.getRequestId(), HTTP_STATUS_INTERNAL_SERVER_ERROR, e); + } + } + + private ExtensionServiceBrokerResponseEnvelope handleAdapterStateChangeRequest( + ExtensionServiceBrokerRequestEnvelope request, + String topic + ) throws Exception { + if (request.getPayload() == null || request.getPayload().isBlank()) { + return new ExtensionServiceBrokerResponseEnvelope( + request.getRequestId(), + HTTP_STATUS_BAD_REQUEST, + null, + new ExtensionServiceBrokerErrorEnvelope("InvalidPayload", "Missing adapter payload") + ); + } + + var adapterDescription = objectMapper.readValue(request.getPayload(), AdapterDescription.class); + var command = extractStateChangeCommand(topic); + + try { + if (STATE_CHANGE_START.equals(command)) { + var payload = objectMapper.writeValueAsString(adapterWorkerRequestManagement.invokeAdapter(adapterDescription)); + return new ExtensionServiceBrokerResponseEnvelope(request.getRequestId(), HTTP_STATUS_OK, payload, null); + } + + if (STATE_CHANGE_STOP.equals(command)) { + var payload = objectMapper.writeValueAsString(adapterWorkerRequestManagement.stopAdapter(adapterDescription)); + return new ExtensionServiceBrokerResponseEnvelope(request.getRequestId(), HTTP_STATUS_OK, payload, null); + } + + return new ExtensionServiceBrokerResponseEnvelope( + request.getRequestId(), + HTTP_STATUS_BAD_REQUEST, + null, + new ExtensionServiceBrokerErrorEnvelope( + "InvalidCommand", + "Unknown adapter state change command in topic " + topic + ) + ); + } catch (AdapterException e) { + return new ExtensionServiceBrokerResponseEnvelope( + request.getRequestId(), + HTTP_STATUS_INTERNAL_SERVER_ERROR, + objectMapper.writeValueAsString(e), + new ExtensionServiceBrokerErrorEnvelope(e.getClass().getSimpleName(), e.getMessage()) + ); + } + } + + private String extractStateChangeCommand(String topic) { + int separatorIndex = topic.lastIndexOf('.'); + if (separatorIndex < 0 || separatorIndex + 1 >= topic.length()) { + return ""; + } + + return topic.substring(separatorIndex + 1); + } + + private ExtensionServiceBrokerResponseEnvelope error(String requestId, int statusCode, Exception e) { + return new ExtensionServiceBrokerResponseEnvelope( + requestId, + statusCode, + null, + new ExtensionServiceBrokerErrorEnvelope(e.getClass().getSimpleName(), e.getMessage()) + ); + } + + private void publishResponse(String replyTo, ExtensionServiceBrokerResponseEnvelope response) { + if (natsConnection == null) { + return; + } + + try { + natsConnection.publish(replyTo, objectMapper.writeValueAsBytes(response)); + } catch (Exception e) { + LOG.warn("Could not publish broker response to subject {}", replyTo, e); + } + } +} diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java index 53fa966da1..92350f1ca6 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java @@ -29,6 +29,7 @@ public enum ExtensionServiceOperationType { SAMPLE_DATA, EXTENSION_INSTANCE_HEALTH, SERVICE_HEALTH, + SERVICE_LOAD, PIPELINE_ELEMENT_INVOCATION, PIPELINE_ELEMENT_DETACH, PIPELINE_ELEMENT_ASSETS, diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTarget.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTarget.java index 2667b1ade6..31b311e82b 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTarget.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTarget.java @@ -18,12 +18,13 @@ package org.apache.streampipes.manager.api.extensions; +import org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerTopics; + import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.Objects; import java.util.stream.Collectors; -import java.util.stream.Stream; public record ExtensionServiceRequestTarget(String baseUrl, String serviceId, @@ -80,13 +81,7 @@ public record ExtensionServiceRequestTarget(String baseUrl, ? List.of(operation.name().toLowerCase(Locale.ROOT)) : topicSegments; - return Stream.concat( - Stream.of(topicPrefix, serviceId), - segments.stream()) - .filter(Objects::nonNull) - .map(ExtensionServiceRequestTarget::toTopicSegment) - .filter(part -> !part.isEmpty()) - .collect(Collectors.joining(".")); + return ExtensionServiceBrokerTopics.serviceTopic(topicPrefix, serviceId, segments); } private static String trimTrailingSlash(String value) { @@ -96,8 +91,4 @@ public record ExtensionServiceRequestTarget(String baseUrl, private static String trimSlashes(String value) { return value.replaceAll("^/+", "").replaceAll("/+$", ""); } - - private static String toTopicSegment(String value) { - return trimSlashes(value).replace("/", "."); - } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTargets.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTargets.java index 82f9972480..f1a11c56cb 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTargets.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceRequestTargets.java @@ -134,6 +134,15 @@ public final class ExtensionServiceRequestTargets { ); } + public static ExtensionServiceRequestTarget serviceLoad(SpServiceRegistration service) { + return forService( + service, + ExtensionServiceOperationType.SERVICE_LOAD, + path("serviceMonitor"), + topic("monitoring", "service-load") + ); + } + public static ExtensionServiceRequestTarget pipelineInvocation(String baseUrl, String serviceId, SpServiceUrlProvider provider, diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/ExtensionServiceRequestConfiguration.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/ExtensionServiceRequestConfiguration.java index 4e2bd33111..b5831d8ab6 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/ExtensionServiceRequestConfiguration.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/ExtensionServiceRequestConfiguration.java @@ -17,19 +17,52 @@ */ package org.apache.streampipes.service.core; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.connect.management.management.WorkerRestClient; import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; import org.apache.streampipes.manager.execution.HttpExtensionServiceRequestManager; +import org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerTopics; +import org.apache.streampipes.service.core.extensions.CoreExtensionTransportMode; +import org.apache.streampipes.service.core.extensions.CoreNatsRequestReplyClient; +import org.apache.streampipes.service.core.extensions.TransportAwareExtensionServiceRequestManager; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.time.Duration; + @Configuration public class ExtensionServiceRequestConfiguration { + @Bean(destroyMethod = "close") + public CoreNatsRequestReplyClient coreNatsRequestReplyClient() { + var env = Environments.getEnvironment(); + return new CoreNatsRequestReplyClient( + env.getNatsHost().getValueOrDefault(), + env.getNatsPort().getValueOrDefault(), + Duration.ofSeconds(2) + ); + } + @Bean - public ExtensionServiceRequestManager extensionServiceRequestManager() { - return new HttpExtensionServiceRequestManager(); + public ExtensionServiceRequestManager extensionServiceRequestManager( + CoreNatsRequestReplyClient coreNatsRequestReplyClient + ) { + var env = Environments.getEnvironment(); + + var transportMode = CoreExtensionTransportMode.from( + env.getCoreExtensionTransportMode().getValueOrDefault() + ); + + var topicPrefix = env.getExtensionRequestTopicPrefix() + .getValueOrReturn(ExtensionServiceBrokerTopics.DEFAULT_REQUEST_TOPIC_PREFIX); + + return new TransportAwareExtensionServiceRequestManager( + new HttpExtensionServiceRequestManager(), + coreNatsRequestReplyClient, + transportMode, + topicPrefix + ); } @Bean diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java index 1bff0837dd..9517e29109 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java @@ -159,7 +159,7 @@ public class StreamPipesCoreApplication extends StreamPipesServiceBase { var env = Environments.getEnvironment(); ExtensionsServiceReportExecutor.setServiceReportFetcher(serviceRegistration -> { - var target = ExtensionServiceRequestTargets.serviceHealth(serviceRegistration, "serviceMonitor"); + var target = ExtensionServiceRequestTargets.serviceLoad(serviceRegistration); var response = extensionServiceRequestManager.requestServiceLoad(target); if (!response.isSuccess()) { diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/CoreExtensionTransportMode.java similarity index 62% copy from streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java copy to streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/CoreExtensionTransportMode.java index 53fa966da1..45fa0b6225 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/api/extensions/ExtensionServiceOperationType.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/CoreExtensionTransportMode.java @@ -16,24 +16,24 @@ * */ -package org.apache.streampipes.manager.api.extensions; +package org.apache.streampipes.service.core.extensions; -public enum ExtensionServiceOperationType { - CONTAINER_PROVIDED_OPTIONS, - MIGRATION, - DESCRIPTION_UPDATE, - EXTENSION_DESCRIPTION, - FUNCTION_STOP, - ADAPTER_STATE_CHANGE, - RUNTIME_OPTIONS, - SAMPLE_DATA, - EXTENSION_INSTANCE_HEALTH, - SERVICE_HEALTH, - PIPELINE_ELEMENT_INVOCATION, - PIPELINE_ELEMENT_DETACH, - PIPELINE_ELEMENT_ASSETS, - ADAPTER_ASSETS, - ADAPTER_ICON_ASSET, - ADAPTER_DOCUMENTATION_ASSET, - OUTPUT_SCHEMA; +import java.util.Locale; + +public enum CoreExtensionTransportMode { + HTTP, + NATS, + AUTO; + + public static CoreExtensionTransportMode from(String value) { + if (value == null || value.isBlank()) { + return AUTO; + } + + try { + return CoreExtensionTransportMode.valueOf(value.trim().toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + return AUTO; + } + } } diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/CoreNatsRequestReplyClient.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/CoreNatsRequestReplyClient.java new file mode 100644 index 0000000000..c7fbaa3a45 --- /dev/null +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/CoreNatsRequestReplyClient.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.service.core.extensions; + +import io.nats.client.Connection; +import io.nats.client.Message; +import io.nats.client.Nats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.Duration; + +public class CoreNatsRequestReplyClient { + + private static final Logger LOG = LoggerFactory.getLogger(CoreNatsRequestReplyClient.class); + + private final String natsUrl; + private final Duration timeout; + private Connection natsConnection; + + public CoreNatsRequestReplyClient(String host, int port, Duration timeout) { + this.natsUrl = "nats://" + host + ":" + port; + this.timeout = timeout; + } + + public synchronized byte[] request(String subject, byte[] payload) throws IOException { + try { + Message response = getConnection().request(subject, payload, timeout); + if (response == null) { + throw new IOException("No NATS response received for subject " + subject); + } + return response.getData(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("NATS request was interrupted for subject " + subject, e); + } + } + + private Connection getConnection() throws IOException { + if (natsConnection == null || natsConnection.getStatus() != Connection.Status.CONNECTED) { + try { + natsConnection = Nats.connect(natsUrl); + LOG.info("Connected to NATS at {}", natsUrl); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Could not connect to NATS at " + natsUrl, e); + } + } + + return natsConnection; + } + + public synchronized void close() { + if (natsConnection != null) { + try { + natsConnection.close(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while closing NATS connection", e); + } finally { + natsConnection = null; + } + } + } +} diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/TransportAwareExtensionServiceRequestManager.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/TransportAwareExtensionServiceRequestManager.java new file mode 100644 index 0000000000..dba8764980 --- /dev/null +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/extensions/TransportAwareExtensionServiceRequestManager.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.service.core.extensions; + +import org.apache.streampipes.manager.api.extensions.ExtensionServiceOperationResult; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestManager; +import org.apache.streampipes.manager.api.extensions.ExtensionServiceRequestTarget; +import org.apache.streampipes.manager.util.AuthTokenUtils; +import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; +import org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerErrorEnvelope; +import org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerRequestEnvelope; +import org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerResponseEnvelope; +import org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerTopics; +import org.apache.streampipes.storage.management.StorageDispatcher; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +public class TransportAwareExtensionServiceRequestManager implements ExtensionServiceRequestManager { + + private static final Logger LOG = + LoggerFactory.getLogger(TransportAwareExtensionServiceRequestManager.class); + private static final int INTERNAL_SERVER_ERROR = 500; + + private final ObjectMapper objectMapper; + private final ExtensionServiceRequestManager httpRequestManager; + private final CoreNatsRequestReplyClient natsRequestReplyClient; + private final CoreExtensionTransportMode transportMode; + private final String topicPrefix; + + public TransportAwareExtensionServiceRequestManager( + ExtensionServiceRequestManager httpRequestManager, + CoreNatsRequestReplyClient natsRequestReplyClient, + CoreExtensionTransportMode transportMode, + String topicPrefix + ) { + this.objectMapper = new ObjectMapper(); + this.httpRequestManager = httpRequestManager; + this.natsRequestReplyClient = natsRequestReplyClient; + this.transportMode = transportMode; + this.topicPrefix = topicPrefix; + } + + @Override + public ExtensionServiceOperationResult requestContainerProvidedOptions(ExtensionServiceRequestTarget target, + String payload) throws IOException { + return httpRequestManager.requestContainerProvidedOptions(target, payload); + } + + @Override + public ExtensionServiceOperationResult requestMigration(ExtensionServiceRequestTarget target, + String payload) throws IOException { + return httpRequestManager.requestMigration(target, payload); + } + + @Override + public ExtensionServiceOperationResult requestDescriptionUpdate(ExtensionServiceRequestTarget target) + throws IOException { + return httpRequestManager.requestDescriptionUpdate(target); + } + + @Override + public ExtensionServiceOperationResult requestExtensionDescription(ExtensionServiceRequestTarget target) + throws IOException { + return httpRequestManager.requestExtensionDescription(target); + } + + @Override + public ExtensionServiceOperationResult requestFunctionStop(ExtensionServiceRequestTarget target) + throws IOException { + return httpRequestManager.requestFunctionStop(target); + } + + @Override + public ExtensionServiceOperationResult requestAdapterStateChange(ExtensionServiceRequestTarget target, + String elementId, + String payload) throws IOException { + if (useNats(target)) { + var authToken = AuthTokenUtils.getAuthToken(elementId); + return requestViaNats(target, payload, authToken); + } + + return httpRequestManager.requestAdapterStateChange(target, elementId, payload); + } + + @Override + public ExtensionServiceOperationResult requestRuntimeOptions(ExtensionServiceRequestTarget target, + String payload) throws IOException { + return httpRequestManager.requestRuntimeOptions(target, payload); + } + + @Override + public ExtensionServiceOperationResult requestSampleData(ExtensionServiceRequestTarget target, + String payload) throws IOException { + return httpRequestManager.requestSampleData(target, payload); + } + + @Override + public ExtensionServiceOperationResult requestExtensionInstanceHealth(ExtensionServiceRequestTarget target) + throws IOException { + return httpRequestManager.requestExtensionInstanceHealth(target); + } + + @Override + public ExtensionServiceOperationResult requestServiceHealth(ExtensionServiceRequestTarget target) + throws IOException { + return httpRequestManager.requestServiceHealth(target); + } + + @Override + public ExtensionServiceOperationResult requestServiceLoad(ExtensionServiceRequestTarget target) throws IOException { + if (useNats(target)) { + try { + var response = requestServiceLoadViaNats(target); + if (response.isSuccess() || transportMode == CoreExtensionTransportMode.NATS) { + return response; + } + + LOG.warn("NATS request for operation {} to service {} returned status {} - falling back to HTTP", + target.operation(), target.serviceId(), response.statusCode()); + } catch (IOException e) { + if (transportMode == CoreExtensionTransportMode.NATS) { + throw e; + } + + LOG.warn("NATS request for operation {} to service {} failed - falling back to HTTP", + target.operation(), target.serviceId(), e); + } + } + + return httpRequestManager.requestServiceLoad(target); + } + + @Override + public ExtensionServiceOperationResult requestPipelineElementInvocation(ExtensionServiceRequestTarget target, + String pipelineId, + String payload) throws IOException { + return httpRequestManager.requestPipelineElementInvocation(target, pipelineId, payload); + } + + @Override + public ExtensionServiceOperationResult requestPipelineElementDetach(ExtensionServiceRequestTarget target, + String pipelineId) throws IOException { + return httpRequestManager.requestPipelineElementDetach(target, pipelineId); + } + + @Override + public ExtensionServiceOperationResult requestPipelineElementAssets(ExtensionServiceRequestTarget target) + throws IOException { + return httpRequestManager.requestPipelineElementAssets(target); + } + + @Override + public ExtensionServiceOperationResult requestAdapterAssets(ExtensionServiceRequestTarget target) + throws IOException { + return httpRequestManager.requestAdapterAssets(target); + } + + @Override + public ExtensionServiceOperationResult requestAdapterIconAsset(ExtensionServiceRequestTarget target) + throws IOException { + return httpRequestManager.requestAdapterIconAsset(target); + } + + @Override + public ExtensionServiceOperationResult requestAdapterDocumentationAsset(ExtensionServiceRequestTarget target) + throws IOException { + return httpRequestManager.requestAdapterDocumentationAsset(target); + } + + @Override + public ExtensionServiceOperationResult requestOutputSchema(ExtensionServiceRequestTarget target, + String payload) throws IOException { + return httpRequestManager.requestOutputSchema(target, payload); + } + + private boolean useNats(ExtensionServiceRequestTarget target) { + return switch (transportMode) { + case HTTP -> false; + case NATS -> true; + case AUTO -> serviceSupportsNats(target); + }; + } + + private boolean serviceSupportsNats(ExtensionServiceRequestTarget target) { + var service = StorageDispatcher.INSTANCE + .getNoSqlStore() + .getExtensionsServiceStorage() + .getElementById(target.serviceId()); + + if (service == null || service.getTags() == null) { + return false; + } + + return service.getTags().stream().anyMatch(tag -> + tag.getPrefix() == SpServiceTagPrefix.CUSTOM + && ExtensionServiceBrokerTopics.TRANSPORT_TAG_NATS.equals(tag.getValue()) + ); + } + + private ExtensionServiceOperationResult requestServiceLoadViaNats(ExtensionServiceRequestTarget target) + throws IOException { + return requestViaNats(target, null, null); + } + + private ExtensionServiceOperationResult requestViaNats(ExtensionServiceRequestTarget target, + String payload, + String authToken) throws IOException { + String topic = target.toTopic(topicPrefix); + + var requestEnvelope = new ExtensionServiceBrokerRequestEnvelope( + UUID.randomUUID().toString(), + target.operation().name(), + payload, + authToken + ); + + byte[] responseBytes = natsRequestReplyClient.request(topic, objectMapper.writeValueAsBytes(requestEnvelope)); + var responseEnvelope = objectMapper.readValue(responseBytes, ExtensionServiceBrokerResponseEnvelope.class); + + return toOperationResult(responseEnvelope); + } + + private ExtensionServiceOperationResult toOperationResult(ExtensionServiceBrokerResponseEnvelope responseEnvelope) + throws IOException { + byte[] body = makeBody(responseEnvelope); + int statusCode = responseEnvelope.getStatusCode() == 0 + ? INTERNAL_SERVER_ERROR + : responseEnvelope.getStatusCode(); + + return new ExtensionServiceOperationResult(statusCode, body); + } + + private byte[] makeBody(ExtensionServiceBrokerResponseEnvelope responseEnvelope) throws IOException { + if (responseEnvelope.getPayload() != null) { + return responseEnvelope.getPayload().getBytes(StandardCharsets.UTF_8); + } + + ExtensionServiceBrokerErrorEnvelope error = responseEnvelope.getError(); + if (error != null) { + return objectMapper.writeValueAsBytes(error); + } + + return null; + } +} diff --git a/streampipes-service-extensions/pom.xml b/streampipes-service-extensions/pom.xml index d370d673d1..4d07bc7c9c 100644 --- a/streampipes-service-extensions/pom.xml +++ b/streampipes-service-extensions/pom.xml @@ -61,6 +61,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.streampipes</groupId> + <artifactId>streampipes-nats-extensions</artifactId> + <version>0.99.0-SNAPSHOT</version> + </dependency> <dependency> <groupId>org.springframework.boot</groupId> diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java index 995536e113..508f4d9cb8 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/StreamPipesExtensionsServiceBase.java @@ -36,6 +36,9 @@ import org.apache.streampipes.model.extensions.configuration.SpServiceConfigurat import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; +import org.apache.streampipes.model.extensions.transport.ExtensionServiceBrokerTopics; +import org.apache.streampipes.model.extensions.transport.ExtensionServiceTransportMode; +import org.apache.streampipes.nats.extensions.ExtensionBrokerRequestReceiver; import org.apache.streampipes.rest.shared.exception.SpRestExceptionHandler; import org.apache.streampipes.rest.shared.serializer.JacksonConfiguration; import org.apache.streampipes.service.base.BaseNetworkingConfig; @@ -75,6 +78,9 @@ import java.util.stream.Collectors; public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServiceBase { private static final Logger LOG = LoggerFactory.getLogger(StreamPipesExtensionsServiceBase.class); + private final ExtensionBrokerRequestReceiver extensionBrokerRequestReceiver = new ExtensionBrokerRequestReceiver(); + private ExtensionServiceTransportMode extensionTransportMode = ExtensionServiceTransportMode.HTTP; + private boolean natsBrokerReceiverActive = false; public void init() { SpServiceDefinition serviceDef = provideServiceDefinition(); @@ -123,6 +129,17 @@ public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServic public void startExtensionsService(Class<?> serviceClass, SpServiceDefinition serviceDef, BaseNetworkingConfig networkingConfig) throws UnknownHostException { + this.extensionTransportMode = ExtensionServiceTransportMode.from( + Environments.getEnvironment().getExtensionTransportMode().getValueOrDefault() + ); + this.natsBrokerReceiverActive = extensionBrokerRequestReceiver.start( + serviceId(), + extensionTransportMode, + Environments.getEnvironment() + .getExtensionRequestTopicPrefix() + .getValueOrReturn(ExtensionServiceBrokerTopics.DEFAULT_REQUEST_TOPIC_PREFIX) + ); + var extensions = new ExtensionItemProvider().getAllItemDescriptions(); var req = SpServiceRegistration.from( DefaultSpServiceTypes.EXT, @@ -159,10 +176,25 @@ public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServic DeclarersSingleton.getInstance().getServiceDefinition().getServiceGroup())); } tags.addAll(getExtensionsServiceTags(extensions)); + tags.addAll(getTransportServiceTags()); tags.addAll(new CustomServiceTagResolver(Environments.getEnvironment()).getCustomServiceTags()); return tags; } + private Set<SpServiceTag> getTransportServiceTags() { + Set<SpServiceTag> tags = new HashSet<>(); + + if (extensionTransportMode.supportsHttp()) { + tags.add(SpServiceTag.create(SpServiceTagPrefix.CUSTOM, ExtensionServiceBrokerTopics.TRANSPORT_TAG_HTTP)); + } + + if (extensionTransportMode.supportsNats() && natsBrokerReceiverActive) { + tags.add(SpServiceTag.create(SpServiceTagPrefix.CUSTOM, ExtensionServiceBrokerTopics.TRANSPORT_TAG_NATS)); + } + + return tags; + } + protected void deregisterService(String serviceId) { LOG.info("Deregistering service (id={})...", serviceId); StreamPipesClient client = new StreamPipesClientResolver().makeStreamPipesClientInstance(); @@ -187,6 +219,7 @@ public abstract class StreamPipesExtensionsServiceBase extends StreamPipesServic @PreDestroy public void onExit() { + extensionBrokerRequestReceiver.stop(); new ExtensionsServiceShutdownHandler().onShutdown(); deregisterService(DeclarersSingleton.getInstance().getServiceId()); } diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java index a6a448e894..6604d9ad81 100644 --- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java +++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/manager/ProtocolManager.java @@ -47,7 +47,7 @@ public class ProtocolManager { return consumers.get(topicName(protocol)); } else { consumers.put(topicName(protocol), makeInputCollector(protocol, singletonEngine)); - LOG.info("Adding new consumer to consumer map (size=" + consumers.size() + "): " + topicName(protocol)); + LOG.debug("Adding new consumer to consumer map (size=" + consumers.size() + "): " + topicName(protocol)); return consumers.get(topicName(protocol)); }
