This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new bdb8fe5b2b feat: add Java client E2E Test and load-balance E2E Test
(#4235)
bdb8fe5b2b is described below
commit bdb8fe5b2ba5b139386c836cb24e965c5bb6b91d
Author: xinhao liu <[email protected]>
AuthorDate: Tue Mar 24 23:27:04 2026 +0800
feat: add Java client E2E Test and load-balance E2E Test (#4235)
---
.github/workflows/java-client-e2e-test.yml | 80 +++++
streampipes-client-e2e/README.md | 29 +-
...-compose.yml => docker-compose-loadbalance.yml} | 99 +++++-
streampipes-client-e2e/docker-compose.yml | 9 +
streampipes-client-e2e/java-client-e2e/pom.xml | 87 ++++++
.../streampipes/client/e2e/JavaClientTest.java | 93 ++++++
.../streampipes/client/e2e/LoadBalanceTest.java | 120 ++++++++
.../client/e2e/utils/ClientTestSupport.java | 340 +++++++++++++++++++++
.../client/e2e/utils/PipelineTemplateHelper.java | 249 +++++++++++++++
.../client/e2e/utils/TestResourceHelper.java | 230 ++++++++++++++
streampipes-client-e2e/tool/install-element.sh | 47 ++-
streampipes-client-e2e/tool/java-client-e2e.sh | 79 +++++
.../tool/start-streampipes-client-e2e.sh | 11 +-
13 files changed, 1451 insertions(+), 22 deletions(-)
diff --git a/.github/workflows/java-client-e2e-test.yml
b/.github/workflows/java-client-e2e-test.yml
new file mode 100644
index 0000000000..e2f8fdba9a
--- /dev/null
+++ b/.github/workflows/java-client-e2e-test.yml
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: run-java-client-e2e-tests
+
+on:
+ pull_request:
+
+jobs:
+ build-and-test:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v5
+
+ - name: Set up Docker Buildx
+ uses: docker/setup-buildx-action@v1
+
+ - name: Cache Docker layers
+ uses: actions/cache@v4
+ with:
+ path: /tmp/.buildx-cache
+ key: ${{ runner.os }}-buildx-${{ github.sha }}
+ restore-keys: |
+ ${{ runner.os }}-buildx-
+
+ - name: Set up JDK 17
+ uses: actions/setup-java@v5
+ with:
+ distribution: 'temurin'
+ java-version: '17'
+ cache: 'maven'
+
+ - name: Build with Maven
+ run: mvn clean install
+ env:
+ DEVELOCITY_ACCESS_KEY: ${{ secrets.STREAMPIPES_DEVELOCITY_ACCESS_KEY
}}
+
+ - name: Build Docker images
+ run: |
+ docker build -t
streampipes_pipeline-elements-all-jvm:release-validation
./streampipes-extensions/streampipes-extensions-all-jvm
+ docker build -t streampipes_backend:release-validation
./streampipes-service-core
+
+ - name: Run Java client single-instance test
+ run: |
+ docker compose -f ./streampipes-client-e2e/docker-compose.yml -p
streampipes-client-e2e-single up -d
+ cd ./streampipes-client-e2e/tool
+ chmod +x ./start-streampipes-client-e2e.sh
+ chmod +x ./java-client-e2e.sh
+ chmod +x ./install-element.sh
+ ./start-streampipes-client-e2e.sh -t java-client-e2e.sh -s single
+
+ - name: Stop StreamPipes (single)
+ if: always()
+ run: docker compose -f ./streampipes-client-e2e/docker-compose.yml -p
streampipes-client-e2e-single down -v
+
+ - name: Run Java client load balancer test
+ run: |
+ docker compose -f
./streampipes-client-e2e/docker-compose-loadbalance.yml -p
streampipes-client-e2e-lb up -d
+ cd ./streampipes-client-e2e/tool
+ chmod +x ./start-streampipes-client-e2e.sh
+ chmod +x ./java-client-e2e.sh
+ chmod +x ./install-element.sh
+ ./start-streampipes-client-e2e.sh -t java-client-e2e.sh -s lb
+
+ - name: Stop StreamPipes (load balancer)
+ if: always()
+ run: docker compose -f
./streampipes-client-e2e/docker-compose-loadbalance.yml -p
streampipes-client-e2e-lb down -v
\ No newline at end of file
diff --git a/streampipes-client-e2e/README.md b/streampipes-client-e2e/README.md
index dcd05bc21b..0744a57dd6 100644
--- a/streampipes-client-e2e/README.md
+++ b/streampipes-client-e2e/README.md
@@ -21,18 +21,31 @@
## Environment Setup
Before running, ensure that StreamPipe is operational and the corresponding
configuration is input into the `start-streampipes-client-e2e.sh` script.
**Note**: This script must be executed from within the [tool](./tool)
directory.
+
+```shell
+cd ./tool
+./start-streampipes-client-e2e.sh -h 127.0.0.1 -p 8030 -u
[email protected] -pw admin -t go-client-e2e.sh
+```
+
+```shell
+cd ./tool
+./start-streampipes-client-e2e.sh -h 127.0.0.1 -p 8030 -u
[email protected] -pw admin -t java-client-e2e.sh -s single
+```
+
```shell
-./start-streampipes-client-e2e.sh -h 127.0.0.1 -p 8030 -u
[email protected] -pw admin -t go-client-e2e.sh
+cd ./tool
+./start-streampipes-client-e2e.sh -h 127.0.0.1 -p 8030 -u
[email protected] -pw admin -t java-client-e2e.sh -s lb
```
## Usage Instructions
-| Parameter | Default | Required | Description
|
-|-----------|-------------------|----------|-----------------------------------------------------------------------------------------------------------|
-| `-h` | `127.0.0.1` | No | Host address
|
-| `-p` | `8030` | No | Backend port
|
-| `-u` | `[email protected]` | No | User
|
-| `-pw` | `admin` | No | Password
|
-| `-t` | `""` | Yes | Name of the client's E2E test
startup script (the script must be located in the [tool](./tool) directory) |
+| Parameter | Default | Required | Description
|
+|-----------|-------------------|----------|----------------------------------------------------------------|
+| `-h` | `127.0.0.1` | No | Host address
|
+| `-p` | `8030` | No | Backend port
|
+| `-u` | `[email protected]` | No | User
|
+| `-pw` | `admin` | No | Password
|
+| `-t` | `""` | Yes | Name of the client's E2E test
startup script in [tool](./tool) |
+| `-s` | `single` | No | Java test scenario: `single` or
`lb` (ignored by Go scripts) |
## How to add E2E in a new language
1. If you need to define an E2E test in a new language, you need to download
the element you need to use in `install-element.sh`.
diff --git a/streampipes-client-e2e/docker-compose.yml
b/streampipes-client-e2e/docker-compose-loadbalance.yml
similarity index 56%
copy from streampipes-client-e2e/docker-compose.yml
copy to streampipes-client-e2e/docker-compose-loadbalance.yml
index 50092e7e58..ce29ead36b 100644
--- a/streampipes-client-e2e/docker-compose.yml
+++ b/streampipes-client-e2e/docker-compose-loadbalance.yml
@@ -25,20 +25,102 @@ services:
#### apache/streampipes
backend:
image: streampipes_backend:release-validation
+ hostname: backend
ports:
- "8030:8030"
- environment:
- - SP_PRIORITIZED_PROTOCOL=kafka
depends_on:
- couchdb
+ environment:
+ - SP_LOAD_MANAGER_ENABLE=true
+ - SP_SELECTOR=WeightedRandomSelector
+ - SP_MIGRATOR=ThresholdMigrator
+ - SP_INITIAL_SERVICE_USER=sp-service-client
+ -
SP_INITIAL_SERVICE_USER_SECRET=my-apache-streampipes-secret-key-change-me
+ - SP_CLIENT_USER=sp-service-client
+ - SP_CLIENT_SECRET=my-apache-streampipes-secret-key-change-me
+ # Other selector options:
+ # - SP_SELECTOR=WeightedRandomSelector
+ # - SP_SELECTOR=MinimumLoadSelector
+ # Other Migrator options:
+ # - SP_MIGRATOR=TransferMigrator
+ # - SP_MIGRATOR=OverloadMigrator
volumes:
- backend:/root/.streampipes
+ healthcheck:
+ test: ["CMD-SHELL", "nc -z localhost 8030"]
+ interval: 5s
+ timeout: 3s
+ retries: 30
+ start_period: 20s
+ logging: *default-logging
+ networks:
+ spnet:
+
+ extensions-all-jvm-1:
+ image: streampipes_pipeline-elements-all-jvm:release-validation
+ hostname: extensions-all-jvm-1
+ depends_on:
+ backend:
+ condition: service_healthy
+ environment:
+ - SP_CORE_SCHEME=http
+ - SP_CORE_HOST=backend
+ - SP_CORE_PORT=8030
+ - SP_CLIENT_USER=sp-service-client
+ - SP_CLIENT_SECRET=my-apache-streampipes-secret-key-change-me
+ restart: on-failure
+ healthcheck:
+ test: ["CMD-SHELL", "nc -z localhost 8090"]
+ interval: 5s
+ timeout: 3s
+ retries: 30
+ start_period: 20s
+ logging: *default-logging
+ networks:
+ spnet:
+
+ extensions-all-jvm-2:
+ image: streampipes_pipeline-elements-all-jvm:release-validation
+ hostname: extensions-all-jvm-2
+ depends_on:
+ extensions-all-jvm-1:
+ condition: service_healthy
+ environment:
+ - SP_CORE_SCHEME=http
+ - SP_CORE_HOST=backend
+ - SP_CORE_PORT=8030
+ - SP_CLIENT_USER=sp-service-client
+ - SP_CLIENT_SECRET=my-apache-streampipes-secret-key-change-me
+ restart: on-failure
+ healthcheck:
+ test: ["CMD-SHELL", "nc -z localhost 8090"]
+ interval: 5s
+ timeout: 3s
+ retries: 30
+ start_period: 20s
logging: *default-logging
networks:
spnet:
- extensions-all-jvm:
+ extensions-all-jvm-3:
image: streampipes_pipeline-elements-all-jvm:release-validation
+ hostname: extensions-all-jvm-3
+ depends_on:
+ extensions-all-jvm-2:
+ condition: service_healthy
+ environment:
+ - SP_CORE_SCHEME=http
+ - SP_CORE_HOST=backend
+ - SP_CORE_PORT=8030
+ - SP_CLIENT_USER=sp-service-client
+ - SP_CLIENT_SECRET=my-apache-streampipes-secret-key-change-me
+ restart: on-failure
+ healthcheck:
+ test: ["CMD-SHELL", "nc -z localhost 8090"]
+ interval: 5s
+ timeout: 3s
+ retries: 30
+ start_period: 20s
logging: *default-logging
networks:
spnet:
@@ -57,6 +139,8 @@ services:
kafka:
image: apache/kafka:4.1.0
hostname: kafka
+ ports:
+ - "9094:9094"
environment:
- KAFKA_NODE_ID=0
- KAFKA_PROCESS_ROLES=controller,broker
@@ -79,6 +163,15 @@ services:
networks:
spnet:
+ nats:
+ image: nats:2.10-alpine
+ hostname: nats
+ ports:
+ - "4222:4222"
+ logging: *default-logging
+ networks:
+ spnet:
+
influxdb:
image: influxdb:2.6
diff --git a/streampipes-client-e2e/docker-compose.yml
b/streampipes-client-e2e/docker-compose.yml
index 50092e7e58..69f0e8fbbd 100644
--- a/streampipes-client-e2e/docker-compose.yml
+++ b/streampipes-client-e2e/docker-compose.yml
@@ -79,6 +79,15 @@ services:
networks:
spnet:
+ nats:
+ image: nats:2.10-alpine
+ hostname: nats
+ ports:
+ - "4222:4222"
+ logging: *default-logging
+ networks:
+ spnet:
+
influxdb:
image: influxdb:2.6
diff --git a/streampipes-client-e2e/java-client-e2e/pom.xml
b/streampipes-client-e2e/java-client-e2e/pom.xml
new file mode 100644
index 0000000000..9210acbf93
--- /dev/null
+++ b/streampipes-client-e2e/java-client-e2e/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-parent</artifactId>
+ <version>0.99.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>java-client-e2e</artifactId>
+ <name>StreamPipes Java Client E2E</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streampipes</groupId>
+ <artifactId>streampipes-messaging-nats</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- Required at runtime by Apache HTTP Client (fluent-hc) used in
streampipes-client -->
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <useModulePath>false</useModulePath>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <configuration>
+ <propertyExpansion>
+ checkstyle.config.base.path=${project.parent.basedir}/tools/maven
+ </propertyExpansion>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/JavaClientTest.java
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/JavaClientTest.java
new file mode 100644
index 0000000000..76e4dbc23f
--- /dev/null
+++
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/JavaClientTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.e2e;
+
+import org.apache.streampipes.client.e2e.utils.ClientTestSupport;
+import org.apache.streampipes.client.e2e.utils.ClientTestSupport.SensorEvent;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * E2E test for the Java client: creates a Machine Simulator adapter and a
pipeline (Boolean Filter
+ * + NATS sink), publishes events to the input topic, consumes from the output
topic, and asserts
+ * that only events with {@code sensor_fault_flags == true} are received.
+ */
+class JavaClientTest {
+
+ private static final String TEST_PREFIX = "sp-e2e-java-semantic-";
+ private static final String TOPIC_PREFIX = "sp.e2e.java.semantic.topic.";
+ private static final int EXPECTED_RESOURCE_COUNT = 1;
+ private static final Duration ENDPOINT_READY_TIMEOUT =
Duration.ofSeconds(20);
+
+ private final ClientTestSupport support = new ClientTestSupport(TEST_PREFIX);
+
+ @AfterEach
+ void cleanup() {
+ support.cleanupTestResources();
+ }
+
+ /**
+ * Cleans up any leftover resources, creates one adapter and one pipeline
with unique topics,
+ * waits for endpoint assignment, publishes 6 events (3 expected to pass the
filter), consumes
+ * from the output topic, and asserts that exactly the 3 filtered event IDs
are received.
+ */
+ @Test
+ void testJavaClient() {
+ // Ensure clean state before starting the test
+ support.cleanupTestResources();
+
+ String runId = UUID.randomUUID().toString().replace("-", "");
+ String topicIn = TOPIC_PREFIX + "in." + runId;
+ String topicOut = TOPIC_PREFIX + "out." + runId;
+
+ // 1. Setup Resources
+ AdapterDescription adapter = support.createAndStartAdapter(topicIn);
+ support.createAndStartPipeline(adapter, topicIn, topicOut);
+
+ // 2. Wait for background synchronization (Endpoint assignment)
+ support.waitUntilEndpointsReady(
+ EXPECTED_RESOURCE_COUNT,
+ EXPECTED_RESOURCE_COUNT,
+ ENDPOINT_READY_TIMEOUT
+ );
+
+ // 3. Prepare Test Data
+ List<SensorEvent> inputEvents = support.buildBooleanEvents();
+ Set<String> expectedEventIds = support.expectedPassedEventIds(inputEvents);
+
+ // 4. Execution: Publish to input and consume from output via NATS
+ List<Map<String, Object>> consumed = support.publishAndConsumeNats(
+ topicIn,
+ topicOut,
+ inputEvents,
+ (long) expectedEventIds.size()
+ );
+
+ // 5. Verification
+ support.assertFilteredEvents(consumed, expectedEventIds);
+ }
+}
diff --git
a/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/LoadBalanceTest.java
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/LoadBalanceTest.java
new file mode 100644
index 0000000000..6d6d2cc597
--- /dev/null
+++
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/LoadBalanceTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.e2e;
+
+import org.apache.streampipes.client.e2e.utils.ClientTestSupport;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.pipeline.Pipeline;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * E2E test that verifies load balancing succeeds for adapters and pipelines.
+ */
+class LoadBalanceTest {
+
+ private static final int RESOURCE_COUNT = 7;
+ private static final int MIN_EXTENSION_INSTANCES = 2;
+ private static final String TEST_PREFIX = "sp-e2e-java-lb-";
+ private static final String EXTENSIONS_SERVICES_PATH =
"api/v2/extensions-services";
+
+ private static final Duration EXTENSION_REGISTRATION_TIMEOUT =
Duration.ofSeconds(90);
+ private static final Duration ENDPOINT_READY_TIMEOUT =
Duration.ofSeconds(30);
+ private static final Duration POLL_INTERVAL = Duration.ofSeconds(2);
+
+ private final ClientTestSupport support = new ClientTestSupport(TEST_PREFIX);
+
+ @AfterEach
+ void cleanup() {
+ support.cleanupTestResources();
+ }
+
+ /**
+ * Cleans up any leftover resources, creates {@value #RESOURCE_COUNT}
adapter+pipeline pairs with
+ * unique topics, waits until all have endpoint URLs, then asserts count and
that endpoints
+ * are spread across at least {@value #MIN_EXTENSION_INSTANCES} instances.
+ */
+ @Test
+ void testLoadBalance() {
+ support.cleanupTestResources();
+
+ // In CI, extension containers start sequentially; wait until at least
+ // MIN_EXTENSION_INSTANCES (2) are registered so that adapter/pipeline
+ // assignment can spread across instances.
+ await()
+ .pollInterval(POLL_INTERVAL)
+ .atMost(EXTENSION_REGISTRATION_TIMEOUT)
+ .until(() -> support.client().customRequest()
+ .getList(EXTENSIONS_SERVICES_PATH,
SpServiceRegistration.class).size() >= MIN_EXTENSION_INSTANCES);
+
+ for (int i = 0; i < RESOURCE_COUNT; i++) {
+ String runId = UUID.randomUUID().toString().replace("-", "");
+ String topicIn = "sp.e2e.java.lb.topic.in." + runId;
+ String topicOut = "sp.e2e.java.lb.topic.out." + runId;
+
+ AdapterDescription adapter = support.createAndStartAdapter(topicIn);
+ support.createAndStartPipeline(adapter, topicIn, topicOut);
+ }
+
+ support.waitUntilEndpointsReady(RESOURCE_COUNT, RESOURCE_COUNT,
ENDPOINT_READY_TIMEOUT);
+
+ List<AdapterDescription> createdAdapters =
support.client().adapters().all().stream()
+ .filter(a -> a.getName() != null &&
a.getName().startsWith(TEST_PREFIX))
+ .toList();
+
+ List<Pipeline> createdPipelines =
support.client().pipelines().all().stream()
+ .filter(p -> p.getName() != null &&
p.getName().startsWith(TEST_PREFIX))
+ .toList();
+
+ Assertions.assertEquals(RESOURCE_COUNT, createdAdapters.size(),
+ String.format("Expected %d adapters, but found %d.",
RESOURCE_COUNT, createdAdapters.size()));
+ Assertions.assertEquals(RESOURCE_COUNT, createdPipelines.size(),
+ String.format("Expected %d pipelines, but found %d.",
RESOURCE_COUNT, createdPipelines.size()));
+
+ // Distinct endpoint URLs: each unique URL implies a different instance
+ Set<String> adapterEndpoints = createdAdapters.stream()
+ .map(AdapterDescription::getSelectedEndpointUrl)
+ .filter(endpoint -> endpoint != null && !endpoint.isBlank())
+ .collect(Collectors.toCollection(HashSet::new));
+
+ Set<String> pipelineEndpoints = createdPipelines.stream()
+ .map(ClientTestSupport::extractProcessorEndpoint)
+ .filter(endpoint -> endpoint != null && !endpoint.isBlank())
+ .collect(Collectors.toCollection(HashSet::new));
+
+ Assertions.assertTrue(adapterEndpoints.size() >= MIN_EXTENSION_INSTANCES,
+ String.format("Adapter load balancing failed: expected >= %d
instances, but found %d. Endpoints: %s",
+ MIN_EXTENSION_INSTANCES, adapterEndpoints.size(),
adapterEndpoints));
+
+ Assertions.assertTrue(pipelineEndpoints.size() >= MIN_EXTENSION_INSTANCES,
+ String.format("Pipeline load balancing failed: expected >= %d
instances, but found %d. Endpoints: %s",
+ MIN_EXTENSION_INSTANCES, pipelineEndpoints.size(),
pipelineEndpoints));
+ }
+}
diff --git
a/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/ClientTestSupport.java
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/ClientTestSupport.java
new file mode 100644
index 0000000000..14b3415f61
--- /dev/null
+++
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/ClientTestSupport.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.e2e.utils;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.client.StreamPipesCredentials;
+import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.grounding.NatsTransportProtocol;
+import org.apache.streampipes.model.message.SuccessMessage;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import org.junit.jupiter.api.Assertions;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Test support for Java client E2E: creates adapters/pipelines, publishes and
consumes via NATS,
+ * and provides assertions for filtered events.
+ */
+public final class ClientTestSupport {
+
+ private static final String DEFAULT_NATS_URL = "nats://127.0.0.1:4222";
+ private static final int DEFAULT_NATS_PORT = 4222;
+ private static final String DEFAULT_NATS_HOST = "127.0.0.1";
+ private static final int WAIT_TIMEOUT_SECONDS = 20;
+
+ private final String testPrefix;
+ private final StreamPipesClient client;
+
+ public ClientTestSupport(String testPrefix) {
+ this.testPrefix = testPrefix;
+ this.client = buildClient();
+ }
+
+ /**
+  * Builds an adapter description from the template helper, submits it through the client
+  * and then looks it up by name via {@code TestResourceHelper.waitForAdapter}.
+  *
+  * @param topicIn topic passed to the adapter template
+  * @return the adapter returned by the wait helper
+  * @throws IllegalStateException wrapping any exception raised along the way
+  */
+ public AdapterDescription createAdapter(String topicIn) {
+   try {
+     AdapterDescription template = PipelineTemplateHelper.buildAdapter(testPrefix, topicIn);
+     client.adapters().create(template);
+     String adapterName = template.getName();
+     Duration waitLimit = Duration.ofSeconds(WAIT_TIMEOUT_SECONDS);
+     return TestResourceHelper.waitForAdapter(client, adapterName, waitLimit);
+   } catch (Exception e) {
+     throw new IllegalStateException("Failed to create adapter for topic: " + topicIn, e);
+   }
+ }
+
+ /**
+  * Convenience wrapper around {@link #createAdapter(String)} followed by
+  * {@link #assertAdapterStarted(AdapterDescription)}; the start assertion fails the test
+  * when the adapter cannot be started.
+  *
+  * @param topicIn topic forwarded to {@link #createAdapter(String)}
+  * @return the adapter that was created and started
+  */
+ public AdapterDescription createAndStartAdapter(String topicIn) {
+   final AdapterDescription created = createAdapter(topicIn);
+   assertAdapterStarted(created);
+   return created;
+ }
+
+ /**
+  * Builds a pipeline from the template helper, submits it through the client and then
+  * looks it up by name via {@code TestResourceHelper.waitForPipeline}.
+  *
+  * @param adapter  adapter handed to the pipeline template
+  * @param topicIn  input topic handed to the pipeline template
+  * @param topicOut output topic handed to the pipeline template
+  * @return the pipeline returned by the wait helper
+  * @throws IllegalStateException wrapping any exception raised along the way
+  */
+ public Pipeline createPipeline(AdapterDescription adapter, String topicIn, String topicOut) {
+   try {
+     Pipeline template =
+         PipelineTemplateHelper.buildPipeline(testPrefix, adapter, topicIn, topicOut);
+     client.pipelines().create(template);
+     String pipelineName = template.getName();
+     Duration waitLimit = Duration.ofSeconds(WAIT_TIMEOUT_SECONDS);
+     return TestResourceHelper.waitForPipeline(client, pipelineName, waitLimit);
+   } catch (Exception e) {
+     throw new IllegalStateException("[pipeline.create] Failed to create pipeline", e);
+   }
+ }
+
+ /**
+ * Creates a pipeline and starts it; fails the test if start fails.
+ *
+ * @param adapter the adapter whose stream the pipeline consumes from
+ * @param topicIn NATS topic for pipeline input
+ * @param topicOut NATS topic for pipeline output
+ * @return the started pipeline
+ */
+ public Pipeline createAndStartPipeline(AdapterDescription adapter, String
topicIn, String topicOut) {
+ Pipeline pipeline = createPipeline(adapter, topicIn, topicOut);
+ startPipelineWithRetry(pipeline, 3, Duration.ofSeconds(5));
+ return pipeline;
+ }
+
+ /**
+ * Starts the pipeline with retries so that extension service discovery can
complete.
+ *
+ * @param pipeline the pipeline to start
+ * @param maxAttempts number of start attempts
+ * @param delayBetween delay between attempts
+ */
+ public void startPipelineWithRetry(Pipeline pipeline, int maxAttempts,
Duration delayBetween) {
+ PipelineOperationStatus status = null;
+ for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+ status = client.pipelines().start(pipeline.getPipelineId());
+ if (status.isSuccess()) {
+ return;
+ }
+ if (attempt < maxAttempts) {
+ try {
+ Thread.sleep(delayBetween.toMillis());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException("Interrupted while waiting to retry
pipeline start", e);
+ }
+ }
+ }
+ String errorMsg = (status != null) ? status.getTitle() : "unknown";
+ Assertions.fail("[pipeline.start] Pipeline start failed after " +
maxAttempts + " attempts: "
+ + pipeline.getPipelineId() + " - " + errorMsg);
+ }
+
+ /**
+  * Issues a start request for the adapter through the client and asserts that the
+  * returned message reports success.
+  *
+  * @param adapter the adapter whose element id is used for the start request
+  */
+ public void assertAdapterStarted(AdapterDescription adapter) {
+   final SuccessMessage outcome = client.adapters().start(adapter.getElementId());
+   final boolean started = outcome.isSuccess();
+   final String failureMessage = "[adapter.start] Adapter start failed: " + adapter.getElementId();
+   Assertions.assertTrue(started, failureMessage);
+ }
+
+ /**
+  * Issues a single start request for the pipeline through the client and asserts that
+  * the returned status reports success; the status title is included in the failure message.
+  *
+  * @param pipeline the pipeline whose id is used for the start request
+  */
+ public void assertPipelineStarted(Pipeline pipeline) {
+   final PipelineOperationStatus outcome = client.pipelines().start(pipeline.getPipelineId());
+   final boolean started = outcome.isSuccess();
+   final String failureMessage = "[pipeline.start] Pipeline start failed: "
+       + pipeline.getPipelineId() + " - " + outcome.getTitle();
+   Assertions.assertTrue(started, failureMessage);
+ }
+
+ /**
+  * Produces the fixed six-event fixture used for Boolean Filter testing: events 1, 3 and 5
+  * carry {@code true} as their last constructor argument, events 2, 4 and 6 carry {@code false}.
+  *
+  * @return a new mutable list holding the six events in order
+  */
+ public List<SensorEvent> buildBooleanEvents() {
+   // Copy into an ArrayList so callers still receive a mutable list.
+   return new ArrayList<>(List.of(
+       new SensorEvent("event-1", 12.5, 1.2, "sensor-1", true),
+       new SensorEvent("event-2", 13.0, 1.3, "sensor-2", false),
+       new SensorEvent("event-3", 14.1, 1.4, "sensor-3", true),
+       new SensorEvent("event-4", 15.6, 1.5, "sensor-4", false),
+       new SensorEvent("event-5", 16.0, 1.6, "sensor-5", true),
+       new SensorEvent("event-6", 17.3, 1.7, "sensor-6", false)));
+ }
+
+ /**
+ * Returns event IDs that are expected to pass the Boolean Filter.
+ *
+ * @param inputEvents events sent into the pipeline
+ * @return set of event IDs that should appear in the output
+ */
+ public Set<String> expectedPassedEventIds(List<SensorEvent> inputEvents) {
+ return inputEvents.stream()
+ .filter(SensorEvent::sensorFaultFlags)
+ .map(SensorEvent::eventId)
+ .collect(Collectors.toSet());
+ }
+
+ /**
+  * Subscribes to {@code topicOut}, publishes every input event to {@code topicIn} and
+  * waits (at most {@code WAIT_TIMEOUT_SECONDS} seconds) until at least
+  * {@code expectedEventCount} events have been received.
+  *
+  * <p>The subscription is opened before anything is published so that no output event
+  * can be missed. Producer and subscription are released in nested {@code finally}
+  * blocks, so the subscription is cleaned up even if obtaining or closing the producer fails.
+  *
+  * @param topicIn            NATS topic to publish to
+  * @param topicOut           NATS topic to subscribe to
+  * @param inputEvents        events to publish
+  * @param expectedEventCount minimum number of events to wait for
+  * @return snapshot of the raw events received while the subscription was active
+  */
+ public List<Map<String, Object>> publishAndConsumeNats(
+     String topicIn,
+     String topicOut,
+     List<SensorEvent> inputEvents,
+     long expectedEventCount) {
+   NatsTransportProtocol protocolIn = natsProtocolForTopic(topicIn);
+   NatsTransportProtocol protocolOut = natsProtocolForTopic(topicOut);
+
+   // The callback presumably runs on a messaging-client thread while the await condition
+   // polls the size from another thread, so a plain ArrayList is not safe here.
+   List<Map<String, Object>> consumed = new java.util.concurrent.CopyOnWriteArrayList<>();
+   var subscription = client.streams().subscribe(
+       streamForTopic(protocolOut),
+       event -> consumed.add(event.getRaw()));
+   try {
+     var producer = client.streams().getProducer(streamForTopic(protocolIn));
+     try {
+       for (SensorEvent event : inputEvents) {
+         producer.publish(event.toMap());
+       }
+       await().atMost(Duration.ofSeconds(WAIT_TIMEOUT_SECONDS))
+           .until(() -> consumed.size() >= expectedEventCount);
+     } finally {
+       producer.close();
+     }
+   } finally {
+     subscription.unsubscribe();
+   }
+   // Return a detached copy so a late callback cannot mutate the caller's view.
+   return new ArrayList<>(consumed);
+ }
+
+  /**
+   * Wraps a transport protocol into an otherwise empty data stream so that it can be passed to
+   * the client's stream API.
+   *
+   * @param protocol NATS protocol describing broker address and topic
+   * @return new stream whose event grounding uses {@code protocol}
+   */
+  private static SpDataStream streamForTopic(NatsTransportProtocol protocol) {
+    EventGrounding grounding = new EventGrounding(protocol);
+    SpDataStream topicStream = new SpDataStream();
+    topicStream.setEventGrounding(grounding);
+    return topicStream;
+  }
+
+  /**
+   * Builds the NATS transport protocol for a topic, taking the broker address from the system
+   * property {@code test.nats.url} (default: {@code DEFAULT_NATS_URL}).
+   *
+   * <p>Accepted forms are {@code nats://host:port} and {@code nats://host}; in the latter case
+   * the default port is used. Any other value leaves default host and port in place.
+   *
+   * @param topic NATS topic (subject) of the protocol
+   * @return protocol pointing to the configured broker and {@code topic}
+   * @throws NumberFormatException if the port part of the URL is not a valid integer
+   */
+  private static NatsTransportProtocol natsProtocolForTopic(String topic) {
+    String natsUrl = System.getProperty("test.nats.url", DEFAULT_NATS_URL);
+    String host = DEFAULT_NATS_HOST;
+    int port = DEFAULT_NATS_PORT;
+
+    String scheme = "nats://";
+    if (natsUrl.startsWith(scheme)) {
+      String rest = natsUrl.substring(scheme.length());
+      int colon = rest.indexOf(':');
+      if (colon > 0) {
+        host = rest.substring(0, colon);
+        port = Integer.parseInt(rest.substring(colon + 1));
+      } else if (colon < 0 && !rest.isBlank()) {
+        // URL without a port: honor the configured host instead of silently falling back to
+        // the default host, and keep the default port.
+        host = rest;
+      }
+    }
+    return new NatsTransportProtocol(host, port, topic);
+  }
+
+ /**
+ * Asserts that consumed events contain at least the expected set of event
IDs.
+ *
+ * @param consumed events received from the output topic
+ * @param expectedEventIds expected set of event IDs
+ */
+ public void assertFilteredEvents(List<Map<String, Object>> consumed,
Set<String> expectedEventIds) {
+ Set<String> ourEventIdsInOutput = consumed.stream()
+ .map(event -> String.valueOf(event.get("eventId")))
+ .filter(expectedEventIds::contains)
+ .collect(Collectors.toSet());
+
+ Assertions.assertTrue(ourEventIdsInOutput.containsAll(expectedEventIds),
+ "[nats.consume] Output missing expected event IDs. Expected: " +
expectedEventIds
+ + ", Found: " + ourEventIdsInOutput);
+ }
+
+  /**
+   * Waits until all resources are ready. Delegates to
+   * {@code TestResourceHelper.waitUntilEndpointsReady}, passing this instance's client and
+   * test prefix.
+   *
+   * @param expectedAdapterCount  adapter count forwarded to the helper
+   * @param expectedPipelineCount pipeline count forwarded to the helper
+   * @param timeout               maximum wait duration forwarded to the helper
+   */
+  public void waitUntilEndpointsReady(int expectedAdapterCount,
+                                      int expectedPipelineCount,
+                                      Duration timeout) {
+    TestResourceHelper.waitUntilEndpointsReady(client, testPrefix, expectedAdapterCount,
+        expectedPipelineCount, timeout);
+  }
+
+  /**
+   * Cleans up test resources by delegating to {@code TestResourceHelper.cleanup} with this
+   * instance's client and test prefix.
+   */
+  public void cleanupTestResources() {
+    TestResourceHelper.cleanup(client, testPrefix);
+  }
+
+  /**
+   * @return the StreamPipes client held by this support instance
+   */
+  public StreamPipesClient client() {
+    return client;
+  }
+
+ public static String extractProcessorEndpoint(Pipeline pipeline) {
+ if (pipeline.getSepas() == null || pipeline.getSepas().isEmpty()) {
+ return null;
+ }
+ DataProcessorInvocation processor = pipeline.getSepas().get(0);
+ return processor.getSelectedEndpointUrl();
+ }
+
+  /**
+   * Creates the StreamPipes client from the system properties {@code test.host},
+   * {@code test.port}, {@code test.username} and {@code test.apikey} and registers the NATS
+   * protocol factory on it.
+   *
+   * @return configured client
+   * @throws IllegalStateException if one of the properties is missing or blank
+   * @throws NumberFormatException if {@code test.port} is not a valid integer
+   */
+  private StreamPipesClient buildClient() {
+    String host = requiredProperty("test.host");
+    int port = Integer.parseInt(requiredProperty("test.port"));
+    String user = requiredProperty("test.username");
+    String apiKey = requiredProperty("test.apikey");
+
+    var credentials = StreamPipesCredentials.withApiKey(user, apiKey);
+    // NOTE(review): the meaning of the trailing boolean is not visible here - confirm.
+    var configuredClient = StreamPipesClient.create(host, port, credentials, true);
+    configuredClient.registerProtocol(new SpNatsProtocolFactory());
+    return configuredClient;
+  }
+
+  /**
+   * Looks up a mandatory system property.
+   *
+   * @param key name of the system property
+   * @return the property value, never {@code null} or blank
+   * @throws IllegalStateException if the property is not set or blank
+   */
+  private static String requiredProperty(String key) {
+    String value = System.getProperty(key);
+    boolean present = value != null && !value.isBlank();
+    if (present) {
+      return value;
+    }
+    throw new IllegalStateException("Missing system property: " + key);
+  }
+
+  /**
+   * Test event payload matching the Machine Simulator schema.
+   *
+   * @param eventId          identifier used to recognize the event in the pipeline output
+   * @param density          value published as {@code density}
+   * @param massFlow         value published as {@code mass_flow}
+   * @param sensorId         value published as {@code sensorId}
+   * @param sensorFaultFlags value published as {@code sensor_fault_flags}
+   */
+  public record SensorEvent(String eventId,
+                            double density,
+                            double massFlow,
+                            String sensorId,
+                            boolean sensorFaultFlags) {
+
+    /**
+     * Converts the event into the map that is published. Besides the record components the
+     * map contains a fixed {@code temperature} and {@code volume_flow} and the current system
+     * time as {@code timestamp}.
+     *
+     * @return new mutable map with the event fields
+     */
+    public Map<String, Object> toMap() {
+      Map<String, Object> payload = new HashMap<>();
+      // values taken from the record
+      payload.put("eventId", eventId);
+      payload.put("sensorId", sensorId);
+      payload.put("density", density);
+      payload.put("mass_flow", massFlow);
+      payload.put("sensor_fault_flags", sensorFaultFlags);
+      // fields with fixed or generated values
+      payload.put("temperature", 22.5);
+      payload.put("volume_flow", 2.3);
+      payload.put("timestamp", System.currentTimeMillis());
+      return payload;
+    }
+  }
+}
diff --git
a/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/PipelineTemplateHelper.java
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/PipelineTemplateHelper.java
new file mode 100644
index 0000000000..861ca8ac06
--- /dev/null
+++
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/PipelineTemplateHelper.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.e2e.utils;
+
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.grounding.NatsTransportProtocol;
+import org.apache.streampipes.model.output.KeepOutputStrategy;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
+import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+import org.apache.streampipes.model.util.Cloner;
+import org.apache.streampipes.vocabulary.XSD;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Builds adapter and pipeline models for E2E tests: Machine Simulator
adapter, Boolean Filter processor,
+ * and NATS sink, using the StreamPipes model API.
+ */
+final class PipelineTemplateHelper {
+
+  // App ID of the Machine Simulator adapter set on the adapter description.
+  private static final String MACHINE_SIMULATOR_APP_ID =
+      "org.apache.streampipes.connect.iiot.adapters.simulator.machine";
+
+  // App ID of the Boolean Filter processor and the "belongsTo" reference derived from it.
+  private static final String BOOLEAN_FILTER_APP_ID =
+      "org.apache.streampipes.processors.filters.jvm.processor.booleanfilter";
+  private static final String BOOLEAN_FILTER_BELONGS_TO = "sp:" + BOOLEAN_FILTER_APP_ID;
+
+  // App ID of the NATS sink and the "belongsTo" reference derived from it.
+  private static final String NATS_SINK_APP_ID = "org.apache.streampipes.sinks.brokers.jvm.nats";
+  private static final String NATS_SINK_BELONGS_TO = "sp:" + NATS_SINK_APP_ID;
+
+  // Broker address written into every NATS transport protocol built by this class.
+  // NOTE(review): presumably the broker's hostname inside the docker network - confirm.
+  private static final String NATS_HOST = "nats";
+  private static final int NATS_PORT = 4222;
+
+  // XSD datatypes used for static properties (URI) and event properties (String).
+  private static final URI XSD_INTEGER = XSD.INTEGER;
+  private static final String XSD_STRING = XSD.STRING.toString();
+  private static final String XSD_FLOAT = XSD.FLOAT.toString();
+  private static final String XSD_BOOLEAN = XSD.BOOLEAN.toString();
+  private static final String XSD_LONG = XSD.LONG.toString();
+
+  // Static helper class, not meant to be instantiated.
+  private PipelineTemplateHelper() {
+  }
+
+  /**
+   * Builds a Machine Simulator adapter description.
+   *
+   * @param testPrefix prefix of the generated adapter name; a random UUID is appended
+   * @param topicIn    NATS topic used for the adapter's event grounding and data stream
+   * @return adapter description with empty element ID, no revision and not running
+   */
+  static AdapterDescription buildAdapter(String testPrefix, String topicIn) {
+    String adapterName = testPrefix + "adapter-" + UUID.randomUUID();
+    NatsTransportProtocol inputProtocol = new NatsTransportProtocol(NATS_HOST, NATS_PORT, topicIn);
+
+    AdapterDescription description = new AdapterDescription();
+    description.setName(adapterName);
+    description.setDescription("Java client e2e adapter");
+    description.setAppId(MACHINE_SIMULATOR_APP_ID);
+
+    // Initial values of an adapter that has not been stored or started yet.
+    description.setElementId("");
+    description.setRev(null);
+    description.setRunning(false);
+    description.setSelectedEndpointUrl(null);
+    description.setVersion(0);
+
+    description.setEventGrounding(new EventGrounding(inputProtocol));
+    description.setConfig(buildAdapterConfig());
+    description.setDataStream(buildMachineSimulatorDataStream(topicIn));
+    return description;
+  }
+
+  /**
+   * Builds a pipeline with Boolean Filter and NATS Sink. The pipeline consists of a copy of
+   * the adapter's data stream (DOM id {@code s0}), the processor ({@code p0}) connected to
+   * the stream and the sink ({@code sink0}) connected to the processor.
+   *
+   * @param testPrefix prefix of the generated pipeline name; a random UUID is appended
+   * @param adapter    adapter whose data stream feeds the pipeline
+   * @param topicIn    NATS topic of the input stream
+   * @param topicOut   NATS topic of the processor output and the sink subject
+   * @return pipeline with generated pipeline ID and no revision
+   */
+  static Pipeline buildPipeline(String testPrefix, AdapterDescription adapter,
+                                String topicIn, String topicOut) {
+    // source stream
+    SpDataStream source = new Cloner().stream(adapter.getDataStream());
+    NatsTransportProtocol sourceProtocol = new NatsTransportProtocol(NATS_HOST, NATS_PORT, topicIn);
+    source.getEventGrounding().setTransportProtocol(sourceProtocol);
+    source.setDom("s0");
+    if (source.getConnectedTo() == null) {
+      source.setConnectedTo(new ArrayList<>());
+    }
+
+    // processor reading from the source stream
+    DataProcessorInvocation filter = buildBooleanFilterProcessor(source, topicOut);
+    filter.setDom("p0");
+    filter.setConnectedTo(List.of("s0"));
+    filter.setElementId("sp:processor:" + generateShortUuid());
+    filter.getOutputStream().setDom("p0-out");
+
+    // sink reading from the processor
+    DataSinkInvocation natsSink = buildNatsSink(filter.getOutputStream(), topicOut);
+    natsSink.setDom("sink0");
+    natsSink.setConnectedTo(List.of("p0"));
+    natsSink.setElementId("sp:sink:" + generateShortUuid());
+
+    Pipeline result = new Pipeline();
+    result.setName(testPrefix + "pipeline-" + UUID.randomUUID());
+    result.setDescription("Java client e2e pipeline");
+    String compactId = UUID.randomUUID().toString().replace("-", "");
+    result.setPipelineId(compactId);
+    result.setRev(null);
+    result.setStreams(List.of(source));
+    result.setSepas(List.of(filter));
+    result.setActions(List.of(natsSink));
+    return result;
+  }
+
+  /**
+   * Builds the adapter configuration: wait time of 200 ms, one sensor and the
+   * {@code flowrate} simulator option selected.
+   *
+   * @return mutable list of the three static properties
+   */
+  private static List<StaticProperty> buildAdapterConfig() {
+    OneOfStaticProperty simulatorOption = new OneOfStaticProperty(
+        "selected-simulator-option", "Simulator", "Select simulator type");
+    simulatorOption.addOption(new Option("flowrate", true));
+    for (String unselected : List.of("pressure", "waterlevel", "diagnostics")) {
+      simulatorOption.addOption(new Option(unselected, false));
+    }
+
+    List<StaticProperty> config = new ArrayList<>();
+    config.add(integerFreeText("wait-time-ms", "200"));
+    config.add(integerFreeText("numberOfSensors", "1"));
+    config.add(simulatorOption);
+    return config;
+  }
+
+  /** Creates an integer-typed free text property with empty label and description. */
+  private static FreeTextStaticProperty integerFreeText(String internalName, String value) {
+    FreeTextStaticProperty property = new FreeTextStaticProperty(internalName, "", "", XSD_INTEGER);
+    property.setValue(value);
+    return property;
+  }
+
+  /**
+   * Builds the data stream of the Machine Simulator adapter.
+   *
+   * @param topicIn NATS topic of the stream's event grounding
+   * @return stream with NATS grounding and the flow simulator event schema
+   */
+  private static SpDataStream buildMachineSimulatorDataStream(String topicIn) {
+    NatsTransportProtocol protocol = new NatsTransportProtocol(NATS_HOST, NATS_PORT, topicIn);
+    EventGrounding grounding = new EventGrounding(protocol);
+
+    SpDataStream simulatorStream = new SpDataStream();
+    simulatorStream.setEventGrounding(grounding);
+    simulatorStream.setEventSchema(buildFlowSimulatorEventSchema());
+    return simulatorStream;
+  }
+
+  /**
+   * Builds the event schema of the flow rate simulator: eight primitive properties, each
+   * with runtime name, XSD runtime type and property scope.
+   *
+   * @return schema containing the properties in the order listed below
+   */
+  private static EventSchema buildFlowSimulatorEventSchema() {
+    // runtime name, runtime type, property scope
+    String[][] fields = {
+        {"eventId", XSD_STRING, "DIMENSION_PROPERTY"},
+        {"timestamp", XSD_LONG, "HEADER_PROPERTY"},
+        {"sensorId", XSD_STRING, "DIMENSION_PROPERTY"},
+        {"mass_flow", XSD_FLOAT, "MEASUREMENT_PROPERTY"},
+        {"volume_flow", XSD_FLOAT, "MEASUREMENT_PROPERTY"},
+        {"temperature", XSD_FLOAT, "MEASUREMENT_PROPERTY"},
+        {"density", XSD_FLOAT, "MEASUREMENT_PROPERTY"},
+        {"sensor_fault_flags", XSD_BOOLEAN, "MEASUREMENT_PROPERTY"},
+    };
+
+    EventSchema schema = new EventSchema();
+    for (String[] field : fields) {
+      schema.addEventProperty(primitive(field[0], field[1], field[2]));
+    }
+    return schema;
+  }
+
+  /**
+   * Creates a primitive event property.
+   *
+   * @param runtimeName field name of the property in the event
+   * @param runtimeType XSD type of the property as string
+   * @param scope       property scope to set
+   * @return the new property
+   */
+  private static EventPropertyPrimitive primitive(String runtimeName,
+                                                  String runtimeType,
+                                                  String scope) {
+    EventPropertyPrimitive property =
+        new EventPropertyPrimitive(runtimeType, runtimeName, null, null);
+    property.setPropertyScope(scope);
+    return property;
+  }
+
+  /**
+   * Builds the Boolean Filter processor invocation. Input stream, stream requirement and
+   * output stream are independent clones of {@code inputStream}; the output stream is
+   * re-grounded to {@code outputTopic}.
+   *
+   * @param inputStream stream the processor reads from
+   * @param outputTopic NATS topic of the processor's output stream
+   * @return processor invocation without selected endpoint
+   */
+  private static DataProcessorInvocation buildBooleanFilterProcessor(SpDataStream inputStream,
+                                                                     String outputTopic) {
+    DataProcessorInvocation filter = new DataProcessorInvocation();
+    filter.setAppId(BOOLEAN_FILTER_APP_ID);
+    filter.setName("Boolean Filter");
+    filter.setDescription("Retains events with a selected boolean value");
+    filter.setBelongsTo(BOOLEAN_FILTER_BELONGS_TO);
+
+    SpDataStream inputCopy = new Cloner().stream(inputStream);
+    filter.setInputStreams(List.of(inputCopy));
+    SpDataStream requirementCopy = new Cloner().stream(inputStream);
+    filter.setStreamRequirements(List.of(requirementCopy));
+
+    filter.setStaticProperties(buildBooleanFilterStaticProperties());
+    filter.setSelectedEndpointUrl(null);
+    filter.setOutputStrategies(List.of(new KeepOutputStrategy()));
+
+    SpDataStream filtered = new Cloner().stream(inputStream);
+    NatsTransportProtocol outputProtocol =
+        new NatsTransportProtocol(NATS_HOST, NATS_PORT, outputTopic);
+    filtered.getEventGrounding().setTransportProtocol(outputProtocol);
+    filter.setOutputStream(filtered);
+    return filter;
+  }
+
+  /**
+   * Builds the Boolean Filter configuration: the filter field is mapped to
+   * {@code s0::sensor_fault_flags} and the option {@code True} is selected.
+   *
+   * @return mutable list with the mapping property followed by the value property
+   */
+  private static List<StaticProperty> buildBooleanFilterStaticProperties() {
+    String mappedField = "s0::sensor_fault_flags";
+
+    MappingPropertyUnary fieldMapping = new MappingPropertyUnary(
+        "boolean-mapping", "Boolean Field", "The field to filter on");
+    fieldMapping.setRequirementSelector("r0::boolean-mapping");
+    fieldMapping.setSelectedProperty(mappedField);
+    fieldMapping.setMapsFromOptions(List.of(mappedField));
+    fieldMapping.setPropertyScope("NONE");
+
+    OneOfStaticProperty passedValue =
+        new OneOfStaticProperty("value", "Value", "Boolean value to pass through");
+    passedValue.addOption(new Option("True", true));
+    passedValue.addOption(new Option("False", false));
+
+    List<StaticProperty> properties = new ArrayList<>();
+    properties.add(fieldMapping);
+    properties.add(passedValue);
+    return properties;
+  }
+
+  /**
+   * Builds the NATS sink invocation.
+   *
+   * @param inputStream stream the sink reads from (used as is, not cloned)
+   * @param outputTopic NATS subject the sink is configured with
+   * @return sink invocation without selected endpoint
+   */
+  private static DataSinkInvocation buildNatsSink(SpDataStream inputStream,
+                                                  String outputTopic) {
+    List<StaticProperty> sinkConfig = buildNatsSinkStaticProperties(outputTopic);
+
+    DataSinkInvocation natsSink = new DataSinkInvocation();
+    natsSink.setName("NATS Sink");
+    natsSink.setDescription("Publishes events to a NATS subject.");
+    natsSink.setAppId(NATS_SINK_APP_ID);
+    natsSink.setBelongsTo(NATS_SINK_BELONGS_TO);
+    natsSink.setVersion(0);
+    natsSink.setInputStreams(List.of(inputStream));
+    natsSink.setStaticProperties(sinkConfig);
+    natsSink.setSelectedEndpointUrl(null);
+    return natsSink;
+  }
+
+  /**
+   * Builds the NATS sink configuration: subject, broker URL, anonymous access and no custom
+   * connection properties.
+   *
+   * @param outputTopic NATS subject the sink publishes to
+   * @return mutable list of the four static properties
+   */
+  private static List<StaticProperty> buildNatsSinkStaticProperties(String outputTopic) {
+    // Derived from the shared constants so that the sink cannot drift away from the broker
+    // address used for the transport protocols.
+    String natsUrl = "nats://" + NATS_HOST + ":" + NATS_PORT;
+
+    List<StaticProperty> props = new ArrayList<>();
+    props.add(FreeTextStaticProperty.of("subject", outputTopic));
+    props.add(FreeTextStaticProperty.of("natsUrls", natsUrl));
+    props.add(alternativesProperty("access-mode", "anonymous-alternative",
+        "username-alternative"));
+    props.add(alternativesProperty("connection-properties", "none-properties-alternative",
+        "custom-properties-alternative"));
+    return props;
+  }
+
+  /**
+   * Creates an alternatives property with exactly two alternatives, the first one selected.
+   * The internal name is also used as label and description.
+   *
+   * @param internalName internal name of the property
+   * @param selectedId   internal name of the selected alternative
+   * @param unselectedId internal name of the alternative that is not selected
+   * @return the alternatives property
+   */
+  private static StaticPropertyAlternatives alternativesProperty(String internalName,
+                                                                 String selectedId,
+                                                                 String unselectedId) {
+    List<StaticPropertyAlternative> choices = new ArrayList<>(List.of(
+        alternative(selectedId, true),
+        alternative(unselectedId, false)));
+
+    StaticPropertyAlternatives property =
+        new StaticPropertyAlternatives(internalName, internalName, internalName);
+    property.setAlternatives(choices);
+    return property;
+  }
+
+  /**
+   * Creates a single alternative whose internal name is also used as label and description.
+   *
+   * @param internalName internal name of the alternative
+   * @param selected     whether the alternative is selected
+   * @return the alternative
+   */
+  private static StaticPropertyAlternative alternative(String internalName, boolean selected) {
+    StaticPropertyAlternative choice =
+        new StaticPropertyAlternative(internalName, internalName, internalName);
+    choice.setSelected(selected);
+    return choice;
+  }
+
+  /**
+   * @return the first 8 characters of a random UUID written without dashes
+   */
+  private static String generateShortUuid() {
+    String compact = UUID.randomUUID().toString().replace("-", "");
+    return compact.substring(0, 8);
+  }
+}
diff --git
a/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/TestResourceHelper.java
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/TestResourceHelper.java
new file mode 100644
index 0000000000..c2ce7616de
--- /dev/null
+++
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/TestResourceHelper.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.e2e.utils;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.pipeline.Pipeline;
+
+import org.awaitility.core.ConditionTimeoutException;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Helper for waiting on and cleaning up adapters and pipelines in E2E tests.
+ */
+final class TestResourceHelper {
+
+  // Poll interval (milliseconds) used by waitFor.
+  private static final long POLL_INTERVAL_MS = 500L;
+  // Poll interval (seconds) used by waitUntilEndpointsReady.
+  private static final long DEFAULT_POLL_INTERVAL_SEC = 1L;
+
+  // Static helper class, not meant to be instantiated.
+  private TestResourceHelper() {
+  }
+
+  /**
+   * Repeatedly fetches all adapters until one with the given name is listed, or the timeout
+   * is reached.
+   *
+   * @param client      StreamPipes client
+   * @param adapterName name of the adapter to find
+   * @param timeout     maximum wait duration
+   * @return the adapter description
+   * @throws IllegalStateException if not found within the timeout
+   */
+  static AdapterDescription waitForAdapter(StreamPipesClient client,
+                                           String adapterName,
+                                           Duration timeout) {
+    String notFoundMessage = "Adapter not found after creation: " + adapterName;
+    Supplier<AdapterDescription> lookup = () -> client.adapters().all().stream()
+        .filter(candidate -> adapterName.equals(candidate.getName()))
+        .findFirst()
+        .orElse(null);
+    return waitFor(lookup, timeout, notFoundMessage);
+  }
+
+  /**
+   * Repeatedly fetches all pipelines until one with the given name is listed, or the timeout
+   * is reached.
+   *
+   * @param client       StreamPipes client
+   * @param pipelineName name of the pipeline to find
+   * @param timeout      maximum wait duration
+   * @return the pipeline
+   * @throws IllegalStateException if not found within the timeout
+   */
+  static Pipeline waitForPipeline(StreamPipesClient client,
+                                  String pipelineName,
+                                  Duration timeout) {
+    String notFoundMessage = "Pipeline not found after creation: " + pipelineName;
+    Supplier<Pipeline> lookup = () -> client.pipelines().all().stream()
+        .filter(candidate -> pipelineName.equals(candidate.getName()))
+        .findFirst()
+        .orElse(null);
+    return waitFor(lookup, timeout, notFoundMessage);
+  }
+
+  /**
+   * Waits until all adapters and pipelines with the given prefix have non-blank endpoint
+   * URLs. For pipelines the endpoint of the first processor is checked. Fails the test if
+   * the condition is not met within the timeout.
+   *
+   * @param client                StreamPipes client
+   * @param testPrefix            name prefix to filter resources
+   * @param expectedAdapterCount  minimum number of adapters required
+   * @param expectedPipelineCount minimum number of pipelines required
+   * @param timeout               maximum wait duration
+   */
+  static void waitUntilEndpointsReady(StreamPipesClient client,
+                                      String testPrefix,
+                                      int expectedAdapterCount,
+                                      int expectedPipelineCount,
+                                      Duration timeout) {
+    try {
+      await()
+          .pollInterval(Duration.ofSeconds(DEFAULT_POLL_INTERVAL_SEC))
+          .atMost(timeout)
+          .until(() -> {
+            // Fetch both resource lists first, then evaluate them.
+            List<AdapterDescription> adapters = client.adapters().all().stream()
+                .filter(a -> a.getName() != null)
+                .filter(a -> a.getName().startsWith(testPrefix))
+                .toList();
+            List<Pipeline> pipelines = client.pipelines().all().stream()
+                .filter(p -> p.getName() != null)
+                .filter(p -> p.getName().startsWith(testPrefix))
+                .toList();
+
+            boolean enoughAdapters = adapters.size() >= expectedAdapterCount;
+            boolean adaptersReady = enoughAdapters && adapters.stream()
+                .map(AdapterDescription::getSelectedEndpointUrl)
+                .allMatch(url -> url != null && !url.isBlank());
+
+            boolean enoughPipelines = pipelines.size() >= expectedPipelineCount;
+            boolean pipelinesReady = enoughPipelines && pipelines.stream()
+                .map(ClientTestSupport::extractProcessorEndpoint)
+                .allMatch(url -> url != null && !url.isBlank());
+
+            return adaptersReady && pipelinesReady;
+          });
+    } catch (ConditionTimeoutException e) {
+      long waitedSeconds = timeout.toSeconds();
+      Assertions.fail("Endpoint assignment did not finish in " + waitedSeconds + " seconds");
+    }
+  }
+
+  /**
+   * Stops and deletes all pipelines and adapters whose names start with {@code testPrefix}.
+   * Pipelines are handled before adapters, each group in name order. Failures of single
+   * operations are collected and reported together as one test failure at the end.
+   *
+   * @param client     StreamPipes client
+   * @param testPrefix name prefix to filter resources
+   */
+  static void cleanup(StreamPipesClient client, String testPrefix) {
+    List<String> errors = new ArrayList<>();
+
+    removePipelines(client, testPrefix, errors);
+    removeAdapters(client, testPrefix, errors);
+
+    if (errors.isEmpty()) {
+      return;
+    }
+    Assertions.fail("Cleanup errors:\n" + String.join("\n", errors));
+  }
+
+  /** Stops and deletes the matching pipelines, recording failures in {@code errors}. */
+  private static void removePipelines(StreamPipesClient client, String testPrefix,
+                                      List<String> errors) {
+    try {
+      client.pipelines().all().stream()
+          .filter(p -> p.getName() != null && p.getName().startsWith(testPrefix))
+          .sorted(Comparator.comparing(Pipeline::getName))
+          .toList()
+          .forEach(pipeline -> {
+            String id = pipeline.getPipelineId();
+            capture(errors, "stop pipeline " + id, () -> client.pipelines().stop(id));
+            capture(errors, "delete pipeline " + id, () -> client.pipelines().delete(id));
+          });
+    } catch (Exception e) {
+      errors.add("scan pipelines failed: " + e.getMessage());
+    }
+  }
+
+  /** Stops and deletes the matching adapters, recording failures in {@code errors}. */
+  private static void removeAdapters(StreamPipesClient client, String testPrefix,
+                                     List<String> errors) {
+    try {
+      client.adapters().all().stream()
+          .filter(a -> a.getName() != null && a.getName().startsWith(testPrefix))
+          .sorted(Comparator.comparing(AdapterDescription::getName))
+          .toList()
+          .forEach(adapter -> {
+            String id = adapter.getElementId();
+            capture(errors, "stop adapter " + id, () -> client.adapters().stop(id));
+            capture(errors, "delete adapter " + id, () -> client.adapters().delete(id));
+          });
+    } catch (Exception e) {
+      errors.add("scan adapters failed: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Polls a supplier every {@code POLL_INTERVAL_MS} milliseconds until it yields a non-null
+   * value and returns exactly that value.
+   *
+   * @param poll         lookup that returns {@code null} while the value is not available
+   * @param timeout      maximum wait duration
+   * @param errorMessage message of the exception thrown on timeout
+   * @param <T>          type of the awaited value
+   * @return the first non-null value returned by {@code poll}
+   * @throws IllegalStateException if no value became available within the timeout
+   */
+  private static <T> T waitFor(Supplier<T> poll, Duration timeout, String errorMessage) {
+    try {
+      // The value-returning variant of until() hands back the value that satisfied the
+      // predicate. Polling the supplier once more after the wait would cost an extra request
+      // and could return null again, failing although the wait had succeeded.
+      return await()
+          .pollInterval(Duration.ofMillis(POLL_INTERVAL_MS))
+          .atMost(timeout)
+          .until(poll::get, value -> value != null);
+    } catch (ConditionTimeoutException e) {
+      throw new IllegalStateException(errorMessage, e);
+    }
+  }
+
+ private static void capture(List<String> errors, String operation,
ThrowingRunnable action) {
+ try {
+ if (operation.startsWith("delete pipeline ")) {
+ runWithSuppressedStderr(action);
+ } else {
+ action.run();
+ }
+ } catch (Exception e) {
+ // Handle known deserialization issue in client response during deletion
+ if (operation.startsWith("delete ")
+ && e.getMessage() != null
+ && e.getMessage().contains("Cannot construct instance of
`org.apache.streampipes.model.message.Message`")) {
+ return;
+ }
+ errors.add(operation + " failed: " + e.getMessage());
+ }
+ }
+
+  /**
+   * Runs an action while {@code System.err} is redirected to a stream that discards all
+   * output. The previous {@code System.err} is restored afterwards, also if the action
+   * throws.
+   *
+   * @param action action to run
+   * @throws Exception whatever the action throws
+   */
+  private static void runWithSuppressedStderr(ThrowingRunnable action) throws Exception {
+    PrintStream previousErr = System.err;
+    PrintStream discarding = new PrintStream(OutputStream.nullOutputStream());
+    try {
+      System.setErr(discarding);
+      action.run();
+    } finally {
+      discarding.close();
+      System.setErr(previousErr);
+    }
+  }
+
+  /**
+   * Variant of {@link Runnable} whose {@code run} method may throw a checked exception.
+   */
+  @FunctionalInterface
+  private interface ThrowingRunnable {
+    void run() throws Exception;
+  }
+}
diff --git a/streampipes-client-e2e/tool/install-element.sh
b/streampipes-client-e2e/tool/install-element.sh
old mode 100644
new mode 100755
index 2b3c340f7a..59956a1ca1
--- a/streampipes-client-e2e/tool/install-element.sh
+++ b/streampipes-client-e2e/tool/install-element.sh
@@ -52,10 +52,9 @@ while true; do
esac
done
-
######################Adapters######################
-#machine
+# machine
installRequestBody='{
"appId":"org.apache.streampipes.connect.iiot.adapters.simulator.machine",
"publicElement":true,
@@ -72,11 +71,9 @@ if [ $? -ne 0 ]; then
exit 1
fi
+######################processors######################
-
-######################processor######################
-
-#bool_inverter
+# bool_inverter
installRequestBody='{
"appId":"org.apache.streampipes.processors.transformation.jvm.booloperator.inverter",
"publicElement":true,
@@ -93,9 +90,26 @@ if [ $? -ne 0 ]; then
exit 1
fi
+# booleanfilter
+installRequestBody='{
+
"appId":"org.apache.streampipes.processors.filters.jvm.processor.booleanfilter",
+ "publicElement":true,
+ "serviceTagPrefix":"DATA_PROCESSOR"
+ }'
+
+response=$(curl -s -X POST "http://$HOST:$PORT$INSTALL_ELEMENT_URL" \
+ -H "Content-Type: application/json" \
+ -H "authorization: Bearer $TOKEN" \
+ -d "$installRequestBody")
+if [ $? -ne 0 ]; then
+ echo "$response"
+ echo "Error install boolean filter processor"
+ exit 1
+fi
-######################sink######################
-#datalake
+######################sinks######################
+
+# datalake
installRequestBody='{
"appId":"org.apache.streampipes.sinks.internal.jvm.datalake",
"publicElement":true,
@@ -110,4 +124,21 @@ if [ $? -ne 0 ]; then
echo "$response"
echo "Error install datalake"
exit 1
+fi
+
+# nats sink
+installRequestBody='{
+ "appId":"org.apache.streampipes.sinks.brokers.jvm.nats",
+ "publicElement":true,
+ "serviceTagPrefix":"DATA_SINK"
+ }'
+
+response=$(curl -s -X POST "http://$HOST:$PORT$INSTALL_ELEMENT_URL" \
+ -H "Content-Type: application/json" \
+ -H "authorization: Bearer $TOKEN" \
+ -d "$installRequestBody")
+if [ $? -ne 0 ]; then
+ echo "$response"
+ echo "Error install nats sink"
+ exit 1
fi
\ No newline at end of file
diff --git a/streampipes-client-e2e/tool/java-client-e2e.sh
b/streampipes-client-e2e/tool/java-client-e2e.sh
new file mode 100755
index 0000000000..cbdb9beb0c
--- /dev/null
+++ b/streampipes-client-e2e/tool/java-client-e2e.sh
@@ -0,0 +1,79 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+E2E_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
+
+HOST=""
+PORT=""
+APIKEY=""
+API_KEY_USER_NAME=""
+SCENARIO="${E2E_SCENARIO:-single}"
+TEST_CLASS=""
+
+while true; do
+ case "$1" in
+ -h)
+ HOST="$2"
+ shift 2
+ ;;
+ -p)
+ PORT="$2"
+ shift 2
+ ;;
+ -u)
+ API_KEY_USER_NAME="$2"
+ shift 2
+ ;;
+ -k)
+ APIKEY="$2"
+ shift 2
+ ;;
+ -s)
+ SCENARIO="$2"
+ shift 2
+ ;;
+ -c)
+ TEST_CLASS="$2"
+ shift 2
+ ;;
+ "")
+ break
+ ;;
+ *)
+ shift
+ ;;
+ esac
+done
+
+# Derive the test class from the scenario unless one was chosen explicitly with -c.
+if [ "$TEST_CLASS" = "" ]; then
+  if [ "$SCENARIO" = "single" ]; then
+    TEST_CLASS="JavaClientTest"
+  elif [ "$SCENARIO" = "lb" ]; then
+    TEST_CLASS="LoadBalanceTest"
+  else
+    echo "Error: unknown scenario '$SCENARIO', expected single|lb"
+    exit 1
+  fi
+fi
+
+# Run the selected test class from the Maven module, handing the connection settings to
+# the tests as system properties.
+cd "$E2E_ROOT/java-client-e2e" || exit
+if ! mvn test \
+  -Dtest="$TEST_CLASS" \
+  -Dtest.host="$HOST" \
+  -Dtest.port="$PORT" \
+  -Dtest.apikey="$APIKEY" \
+  -Dtest.username="$API_KEY_USER_NAME"; then
+  echo "Error: java test failed"
+  exit 1
+fi
+echo "All tests passed successfully"
diff --git a/streampipes-client-e2e/tool/start-streampipes-client-e2e.sh
b/streampipes-client-e2e/tool/start-streampipes-client-e2e.sh
old mode 100644
new mode 100755
index cbab40355f..16523d14de
--- a/streampipes-client-e2e/tool/start-streampipes-client-e2e.sh
+++ b/streampipes-client-e2e/tool/start-streampipes-client-e2e.sh
@@ -20,6 +20,7 @@ E2E_TEST=""
HOST="127.0.0.1"
PORT="8030"
LOGIN_URL="/streampipes-backend/api/v2/auth/login"
+SCENARIO="${E2E_SCENARIO:-single}"
SP_USERNAME="[email protected]"
SP_PASSWORD="admin"
@@ -46,8 +47,12 @@ while true; do
E2E_TEST="$2"
shift 2
;;
+ -s)
+ SCENARIO="$2"
+ shift 2
+ ;;
--help)
- echo "Usage: $0 [-h <ip>] [-p <port>] [-u <username>] [-pw
<password>] [-t <E2E_TEST>]"
+ echo "Usage: $0 [-h <ip>] [-p <port>] [-u <username>] [-pw
<password>] [-t <E2E_TEST>] [-s <single|lb>]"
exit 0
;;
"")
@@ -61,7 +66,7 @@ while true; do
esac
done
-if [ E2E_TEST == "" ]; then
+if [ -z "$E2E_TEST" ]; then
echo "-t is empty"
exit 1
fi
@@ -139,7 +144,7 @@ fi
echo "start e2e test"
chmod +x ./"$E2E_TEST"
-./"$E2E_TEST" -h "$HOST" -p "$PORT" -u "$API_KEY_USER_NAME" -k "$APIKEY"
+./"$E2E_TEST" -h "$HOST" -p "$PORT" -u "$API_KEY_USER_NAME" -k "$APIKEY" -s
"$SCENARIO"
if [ $? -ne 0 ]; then
echo "start $E2E_TEST failed"
exit 1