This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new bdb8fe5b2b feat: add Java client E2E Test and load-balance E2E Test 
(#4235)
bdb8fe5b2b is described below

commit bdb8fe5b2ba5b139386c836cb24e965c5bb6b91d
Author: xinhao liu <[email protected]>
AuthorDate: Tue Mar 24 23:27:04 2026 +0800

    feat: add Java client E2E Test and load-balance E2E Test (#4235)
---
 .github/workflows/java-client-e2e-test.yml         |  80 +++++
 streampipes-client-e2e/README.md                   |  29 +-
 ...-compose.yml => docker-compose-loadbalance.yml} |  99 +++++-
 streampipes-client-e2e/docker-compose.yml          |   9 +
 streampipes-client-e2e/java-client-e2e/pom.xml     |  87 ++++++
 .../streampipes/client/e2e/JavaClientTest.java     |  93 ++++++
 .../streampipes/client/e2e/LoadBalanceTest.java    | 120 ++++++++
 .../client/e2e/utils/ClientTestSupport.java        | 340 +++++++++++++++++++++
 .../client/e2e/utils/PipelineTemplateHelper.java   | 249 +++++++++++++++
 .../client/e2e/utils/TestResourceHelper.java       | 230 ++++++++++++++
 streampipes-client-e2e/tool/install-element.sh     |  47 ++-
 streampipes-client-e2e/tool/java-client-e2e.sh     |  79 +++++
 .../tool/start-streampipes-client-e2e.sh           |  11 +-
 13 files changed, 1451 insertions(+), 22 deletions(-)

diff --git a/.github/workflows/java-client-e2e-test.yml 
b/.github/workflows/java-client-e2e-test.yml
new file mode 100644
index 0000000000..e2f8fdba9a
--- /dev/null
+++ b/.github/workflows/java-client-e2e-test.yml
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: run-java-client-e2e-tests
+
+on:
+  pull_request:
+
+jobs:
+  build-and-test:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v5
+
+      - name: Set up Docker Buildx
+        uses: docker/setup-buildx-action@v1
+
+      - name: Cache Docker layers
+        uses: actions/cache@v4
+        with:
+          path: /tmp/.buildx-cache
+          key: ${{ runner.os }}-buildx-${{ github.sha }}
+          restore-keys: |
+            ${{ runner.os }}-buildx-
+
+      - name: Set up JDK 17
+        uses: actions/setup-java@v5
+        with:
+          distribution: 'temurin'
+          java-version: '17'
+          cache: 'maven'
+
+      - name: Build with Maven
+        run: mvn clean install
+        env:
+          DEVELOCITY_ACCESS_KEY: ${{ secrets.STREAMPIPES_DEVELOCITY_ACCESS_KEY 
}}
+
+      - name: Build Docker images
+        run: |
+          docker build -t 
streampipes_pipeline-elements-all-jvm:release-validation 
./streampipes-extensions/streampipes-extensions-all-jvm
+          docker build -t streampipes_backend:release-validation 
./streampipes-service-core
+
+      - name: Run Java client single-instance test
+        run: |
+          docker compose -f ./streampipes-client-e2e/docker-compose.yml -p 
streampipes-client-e2e-single up -d
+          cd ./streampipes-client-e2e/tool
+          chmod +x ./start-streampipes-client-e2e.sh
+          chmod +x ./java-client-e2e.sh
+          chmod +x ./install-element.sh
+          ./start-streampipes-client-e2e.sh -t java-client-e2e.sh -s single
+
+      - name: Stop StreamPipes (single)
+        if: always()
+        run: docker compose -f ./streampipes-client-e2e/docker-compose.yml -p 
streampipes-client-e2e-single down -v
+
+      - name: Run Java client load balancer test
+        run: |
+          docker compose -f 
./streampipes-client-e2e/docker-compose-loadbalance.yml -p 
streampipes-client-e2e-lb up -d
+          cd ./streampipes-client-e2e/tool
+          chmod +x ./start-streampipes-client-e2e.sh
+          chmod +x ./java-client-e2e.sh
+          chmod +x ./install-element.sh
+          ./start-streampipes-client-e2e.sh -t java-client-e2e.sh -s lb
+
+      - name: Stop StreamPipes (load balancer)
+        if: always()
+        run: docker compose -f 
./streampipes-client-e2e/docker-compose-loadbalance.yml -p 
streampipes-client-e2e-lb down -v
\ No newline at end of file
diff --git a/streampipes-client-e2e/README.md b/streampipes-client-e2e/README.md
index dcd05bc21b..0744a57dd6 100644
--- a/streampipes-client-e2e/README.md
+++ b/streampipes-client-e2e/README.md
@@ -21,18 +21,31 @@
 ## Environment Setup
 Before running, ensure that StreamPipes is up and running and pass the 
corresponding configuration to the `start-streampipes-client-e2e.sh` script.
 **Note**: This script must be executed from within the [tool](./tool) 
directory.
+
+```shell
+cd ./tool
+./start-streampipes-client-e2e.sh -h 127.0.0.1 -p 8030 -u 
[email protected] -pw admin -t go-client-e2e.sh
+```
+
+```shell
+cd ./tool
+./start-streampipes-client-e2e.sh -h 127.0.0.1 -p 8030 -u 
[email protected] -pw admin -t java-client-e2e.sh -s single
+```
+
 ```shell
-./start-streampipes-client-e2e.sh -h 127.0.0.1 -p 8030 -u 
[email protected]   -pw admin -t go-client-e2e.sh
+cd ./tool
+./start-streampipes-client-e2e.sh -h 127.0.0.1 -p 8030 -u 
[email protected] -pw admin -t java-client-e2e.sh -s lb
 ```
 
 ## Usage Instructions
-| Parameter | Default          | Required | Description                        
                                                                       |
-|-----------|-------------------|----------|-----------------------------------------------------------------------------------------------------------|
-| `-h`      | `127.0.0.1`       | No       | Host address                      
                                                                        |
-| `-p`      | `8030`            | No       | Backend port                      
                                                                        |
-| `-u`      | `[email protected]` | No       | User                 
                                                                                
     |
-| `-pw`     | `admin`           | No       | Password                          
                                                                        |
-| `-t`      | `""`              | Yes      | Name of the client's E2E test 
startup script (the script must be located in the [tool](./tool) directory) |
+| Parameter | Default          | Required | Description                        
                            |
+|-----------|-------------------|----------|----------------------------------------------------------------|
+| `-h`      | `127.0.0.1`       | No       | Host address                      
                             |
+| `-p`      | `8030`            | No       | Backend port                      
                             |
+| `-u`      | `[email protected]` | No       | User                 
                                          |
+| `-pw`     | `admin`           | No       | Password                          
                             |
+| `-t`      | `""`              | Yes      | Name of the client's E2E test 
startup script in [tool](./tool) |
+| `-s`      | `single`          | No       | Java test scenario: `single` or 
`lb` (ignored by Go scripts)   |
 
 ## How to add E2E in a new language
 1. If you need to define an E2E test in a new language, you need to download 
the element you need to use in `install-element.sh`.
diff --git a/streampipes-client-e2e/docker-compose.yml 
b/streampipes-client-e2e/docker-compose-loadbalance.yml
similarity index 56%
copy from streampipes-client-e2e/docker-compose.yml
copy to streampipes-client-e2e/docker-compose-loadbalance.yml
index 50092e7e58..ce29ead36b 100644
--- a/streampipes-client-e2e/docker-compose.yml
+++ b/streampipes-client-e2e/docker-compose-loadbalance.yml
@@ -25,20 +25,102 @@ services:
   #### apache/streampipes
   backend:
     image: streampipes_backend:release-validation
+    hostname: backend
     ports:
       - "8030:8030"
-    environment:
-      - SP_PRIORITIZED_PROTOCOL=kafka
     depends_on:
       - couchdb
+    environment:
+      - SP_LOAD_MANAGER_ENABLE=true
+      - SP_SELECTOR=WeightedRandomSelector
+      - SP_MIGRATOR=ThresholdMigrator
+      - SP_INITIAL_SERVICE_USER=sp-service-client
+      - 
SP_INITIAL_SERVICE_USER_SECRET=my-apache-streampipes-secret-key-change-me
+      - SP_CLIENT_USER=sp-service-client
+      - SP_CLIENT_SECRET=my-apache-streampipes-secret-key-change-me
+      # Selector options (WeightedRandomSelector is the one enabled above):
+      # - SP_SELECTOR=WeightedRandomSelector
+      # - SP_SELECTOR=MinimumLoadSelector
+      # Other migrator options (ThresholdMigrator is the one enabled above):
+      # - SP_MIGRATOR=TransferMigrator
+      # - SP_MIGRATOR=OverloadMigrator
     volumes:
       - backend:/root/.streampipes
+    healthcheck:
+      test: ["CMD-SHELL", "nc -z localhost 8030"]
+      interval: 5s
+      timeout: 3s
+      retries: 30
+      start_period: 20s
+    logging: *default-logging
+    networks:
+      spnet:
+
+  extensions-all-jvm-1:
+    image: streampipes_pipeline-elements-all-jvm:release-validation
+    hostname: extensions-all-jvm-1
+    depends_on:
+      backend:
+        condition: service_healthy
+    environment:
+      - SP_CORE_SCHEME=http
+      - SP_CORE_HOST=backend
+      - SP_CORE_PORT=8030
+      - SP_CLIENT_USER=sp-service-client
+      - SP_CLIENT_SECRET=my-apache-streampipes-secret-key-change-me
+    restart: on-failure
+    healthcheck:
+      test: ["CMD-SHELL", "nc -z localhost 8090"]
+      interval: 5s
+      timeout: 3s
+      retries: 30
+      start_period: 20s
+    logging: *default-logging
+    networks:
+      spnet:
+
+  extensions-all-jvm-2:
+    image: streampipes_pipeline-elements-all-jvm:release-validation
+    hostname: extensions-all-jvm-2
+    depends_on:
+      extensions-all-jvm-1:
+        condition: service_healthy
+    environment:
+      - SP_CORE_SCHEME=http
+      - SP_CORE_HOST=backend
+      - SP_CORE_PORT=8030
+      - SP_CLIENT_USER=sp-service-client
+      - SP_CLIENT_SECRET=my-apache-streampipes-secret-key-change-me
+    restart: on-failure
+    healthcheck:
+      test: ["CMD-SHELL", "nc -z localhost 8090"]
+      interval: 5s
+      timeout: 3s
+      retries: 30
+      start_period: 20s
     logging: *default-logging
     networks:
       spnet:
 
-  extensions-all-jvm:
+  extensions-all-jvm-3:
     image: streampipes_pipeline-elements-all-jvm:release-validation
+    hostname: extensions-all-jvm-3
+    depends_on:
+      extensions-all-jvm-2:
+        condition: service_healthy
+    environment:
+      - SP_CORE_SCHEME=http
+      - SP_CORE_HOST=backend
+      - SP_CORE_PORT=8030
+      - SP_CLIENT_USER=sp-service-client
+      - SP_CLIENT_SECRET=my-apache-streampipes-secret-key-change-me
+    restart: on-failure
+    healthcheck:
+      test: ["CMD-SHELL", "nc -z localhost 8090"]
+      interval: 5s
+      timeout: 3s
+      retries: 30
+      start_period: 20s
     logging: *default-logging
     networks:
       spnet:
@@ -57,6 +139,8 @@ services:
   kafka:
     image: apache/kafka:4.1.0
     hostname: kafka
+    ports:
+      - "9094:9094"
     environment:
       - KAFKA_NODE_ID=0
       - KAFKA_PROCESS_ROLES=controller,broker
@@ -79,6 +163,15 @@ services:
     networks:
       spnet:
 
+  nats:
+    image: nats:2.10-alpine
+    hostname: nats
+    ports:
+      - "4222:4222"
+    logging: *default-logging
+    networks:
+      spnet:
+
 
   influxdb:
     image: influxdb:2.6
diff --git a/streampipes-client-e2e/docker-compose.yml 
b/streampipes-client-e2e/docker-compose.yml
index 50092e7e58..69f0e8fbbd 100644
--- a/streampipes-client-e2e/docker-compose.yml
+++ b/streampipes-client-e2e/docker-compose.yml
@@ -79,6 +79,15 @@ services:
     networks:
       spnet:
 
+  nats:
+    image: nats:2.10-alpine
+    hostname: nats
+    ports:
+      - "4222:4222"
+    logging: *default-logging
+    networks:
+      spnet:
+
 
   influxdb:
     image: influxdb:2.6
diff --git a/streampipes-client-e2e/java-client-e2e/pom.xml 
b/streampipes-client-e2e/java-client-e2e/pom.xml
new file mode 100644
index 0000000000..9210acbf93
--- /dev/null
+++ b/streampipes-client-e2e/java-client-e2e/pom.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+         http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.streampipes</groupId>
+    <artifactId>streampipes-parent</artifactId>
+    <version>0.99.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>java-client-e2e</artifactId>
+  <name>StreamPipes Java Client E2E</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.streampipes</groupId>
+      <artifactId>streampipes-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.streampipes</groupId>
+      <artifactId>streampipes-messaging-nats</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <!-- Required at runtime by Apache HTTP Client (fluent-hc) used in 
streampipes-client -->
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.awaitility</groupId>
+      <artifactId>awaitility</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <useModulePath>false</useModulePath>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <configuration>
+          <propertyExpansion>
+            checkstyle.config.base.path=${project.parent.basedir}/tools/maven
+          </propertyExpansion>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git 
a/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/JavaClientTest.java
 
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/JavaClientTest.java
new file mode 100644
index 0000000000..76e4dbc23f
--- /dev/null
+++ 
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/JavaClientTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.e2e;
+
+import org.apache.streampipes.client.e2e.utils.ClientTestSupport;
+import org.apache.streampipes.client.e2e.utils.ClientTestSupport.SensorEvent;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * E2E test for the Java client: creates a Machine Simulator adapter and a
+ * pipeline (Boolean Filter + NATS sink), publishes events to the input topic,
+ * consumes from the output topic, and asserts that only events with
+ * {@code sensor_fault_flags == true} are received.
+ *
+ * <p>All StreamPipes interaction goes through {@link ClientTestSupport}, which
+ * is constructed with this test's name prefix. Test resources are cleaned up
+ * both at the start of the test method and after each test, so the test
+ * starts from a clean state even if an earlier run left resources behind.
+ */
+class JavaClientTest {
+
+  private static final String TEST_PREFIX = "sp-e2e-java-semantic-";
+  private static final String TOPIC_PREFIX = "sp.e2e.java.semantic.topic.";
+  private static final int EXPECTED_RESOURCE_COUNT = 1;
+  private static final Duration ENDPOINT_READY_TIMEOUT = 
Duration.ofSeconds(20);
+
+  private final ClientTestSupport support = new ClientTestSupport(TEST_PREFIX);
+
+  @AfterEach
+  void cleanup() {
+    support.cleanupTestResources();
+  }
+
+  /**
+   * Cleans up any leftover resources, creates one adapter and one pipeline
+   * with unique topics (the topic prefix plus "in."/"out." and a random UUID
+   * with the dashes removed), waits up to {@code ENDPOINT_READY_TIMEOUT} for
+   * endpoint assignment, publishes the events built by
+   * {@code ClientTestSupport#buildBooleanEvents()}, consumes from the output
+   * topic, and asserts that exactly the expected filtered event IDs are
+   * received. The number of events to consume is the size of the expected
+   * event ID set.
+   *
+   * <p>NOTE(review): the original description states that 6 events are
+   * published and 3 pass the filter; those counts are defined inside
+   * {@code ClientTestSupport} and are not enforced by this class, so confirm
+   * them there before relying on them.
+   */
+  @Test
+  void testJavaClient() {
+    // Ensure clean state before starting the test
+    support.cleanupTestResources();
+
+    String runId = UUID.randomUUID().toString().replace("-", "");
+    String topicIn = TOPIC_PREFIX + "in." + runId;
+    String topicOut = TOPIC_PREFIX + "out." + runId;
+
+    // 1. Setup Resources
+    AdapterDescription adapter = support.createAndStartAdapter(topicIn);
+    support.createAndStartPipeline(adapter, topicIn, topicOut);
+
+    // 2. Wait for background synchronization (Endpoint assignment)
+    support.waitUntilEndpointsReady(
+            EXPECTED_RESOURCE_COUNT,
+            EXPECTED_RESOURCE_COUNT,
+            ENDPOINT_READY_TIMEOUT
+    );
+
+    // 3. Prepare Test Data
+    List<SensorEvent> inputEvents = support.buildBooleanEvents();
+    Set<String> expectedEventIds = support.expectedPassedEventIds(inputEvents);
+
+    // 4. Execution: Publish to input and consume from output via NATS
+    List<Map<String, Object>> consumed = support.publishAndConsumeNats(
+            topicIn,
+            topicOut,
+            inputEvents,
+            (long) expectedEventIds.size()
+    );
+
+    // 5. Verification
+    support.assertFilteredEvents(consumed, expectedEventIds);
+  }
+}
diff --git 
a/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/LoadBalanceTest.java
 
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/LoadBalanceTest.java
new file mode 100644
index 0000000000..6d6d2cc597
--- /dev/null
+++ 
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/LoadBalanceTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.e2e;
+
+import org.apache.streampipes.client.e2e.utils.ClientTestSupport;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import 
org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
+import org.apache.streampipes.model.pipeline.Pipeline;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * E2E test that verifies load balancing succeeds for adapters and pipelines.
+ *
+ * <p>The test creates {@value #RESOURCE_COUNT} adapter+pipeline pairs and
+ * passes when the selected endpoint URLs of the adapters, and separately of
+ * the pipeline processors, each contain at least
+ * {@value #MIN_EXTENSION_INSTANCES} distinct non-blank values.
+ */
+class LoadBalanceTest {
+
+  private static final int RESOURCE_COUNT = 7;
+  private static final int MIN_EXTENSION_INSTANCES = 2;
+  private static final String TEST_PREFIX = "sp-e2e-java-lb-";
+  private static final String EXTENSIONS_SERVICES_PATH = 
"api/v2/extensions-services";
+
+  private static final Duration EXTENSION_REGISTRATION_TIMEOUT = 
Duration.ofSeconds(90);
+  private static final Duration ENDPOINT_READY_TIMEOUT = 
Duration.ofSeconds(30);
+  private static final Duration POLL_INTERVAL = Duration.ofSeconds(2);
+
+  private final ClientTestSupport support = new ClientTestSupport(TEST_PREFIX);
+
+  @AfterEach
+  void cleanup() {
+    support.cleanupTestResources();
+  }
+
+  /**
+   * Cleans up any leftover resources, polls the extensions-services endpoint
+   * until it returns at least {@value #MIN_EXTENSION_INSTANCES} entries,
+   * creates {@value #RESOURCE_COUNT} adapter+pipeline pairs with unique
+   * topics, waits until all have endpoint URLs, then asserts count and that
+   * endpoints are spread across at least {@value #MIN_EXTENSION_INSTANCES}
+   * instances.
+   *
+   * <p>Adapters and pipelines are selected for the assertions by name: only
+   * those whose name starts with the test prefix are counted.
+   */
+  @Test
+  void testLoadBalance() {
+    support.cleanupTestResources();
+
+    // In CI, extension containers start sequentially; wait until at least MIN_EXTENSION_INSTANCES 
are registered
+    // so that adapter/pipeline assignment can spread across instances.
+    await()
+            .pollInterval(POLL_INTERVAL)
+            .atMost(EXTENSION_REGISTRATION_TIMEOUT)
+            .until(() -> support.client().customRequest()
+                    .getList(EXTENSIONS_SERVICES_PATH, 
SpServiceRegistration.class).size() >= MIN_EXTENSION_INSTANCES);
+
+    for (int i = 0; i < RESOURCE_COUNT; i++) {
+      String runId = UUID.randomUUID().toString().replace("-", "");
+      String topicIn = "sp.e2e.java.lb.topic.in." + runId;
+      String topicOut = "sp.e2e.java.lb.topic.out." + runId;
+
+      AdapterDescription adapter = support.createAndStartAdapter(topicIn);
+      support.createAndStartPipeline(adapter, topicIn, topicOut);
+    }
+
+    support.waitUntilEndpointsReady(RESOURCE_COUNT, RESOURCE_COUNT, 
ENDPOINT_READY_TIMEOUT);
+
+    List<AdapterDescription> createdAdapters = 
support.client().adapters().all().stream()
+            .filter(a -> a.getName() != null && 
a.getName().startsWith(TEST_PREFIX))
+            .toList();
+
+    List<Pipeline> createdPipelines = 
support.client().pipelines().all().stream()
+            .filter(p -> p.getName() != null && 
p.getName().startsWith(TEST_PREFIX))
+            .toList();
+
+    Assertions.assertEquals(RESOURCE_COUNT, createdAdapters.size(),
+            String.format("Expected %d adapters, but found %d.", 
RESOURCE_COUNT, createdAdapters.size()));
+    Assertions.assertEquals(RESOURCE_COUNT, createdPipelines.size(),
+            String.format("Expected %d pipelines, but found %d.", 
RESOURCE_COUNT, createdPipelines.size()));
+
+    // Distinct endpoint URLs: each unique URL implies a different instance
+    Set<String> adapterEndpoints = createdAdapters.stream()
+            .map(AdapterDescription::getSelectedEndpointUrl)
+            .filter(endpoint -> endpoint != null && !endpoint.isBlank())
+            .collect(Collectors.toCollection(HashSet::new));
+
+    Set<String> pipelineEndpoints = createdPipelines.stream()
+            .map(ClientTestSupport::extractProcessorEndpoint)
+            .filter(endpoint -> endpoint != null && !endpoint.isBlank())
+            .collect(Collectors.toCollection(HashSet::new));
+
+    Assertions.assertTrue(adapterEndpoints.size() >= MIN_EXTENSION_INSTANCES,
+            String.format("Adapter load balancing failed: expected >= %d 
instances, but found %d. Endpoints: %s",
+                    MIN_EXTENSION_INSTANCES, adapterEndpoints.size(), 
adapterEndpoints));
+
+    Assertions.assertTrue(pipelineEndpoints.size() >= MIN_EXTENSION_INSTANCES,
+            String.format("Pipeline load balancing failed: expected >= %d 
instances, but found %d. Endpoints: %s",
+                    MIN_EXTENSION_INSTANCES, pipelineEndpoints.size(), 
pipelineEndpoints));
+  }
+}
diff --git 
a/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/ClientTestSupport.java
 
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/ClientTestSupport.java
new file mode 100644
index 0000000000..14b3415f61
--- /dev/null
+++ 
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/ClientTestSupport.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.e2e.utils;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.client.StreamPipesCredentials;
+import org.apache.streampipes.messaging.nats.SpNatsProtocolFactory;
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.grounding.NatsTransportProtocol;
+import org.apache.streampipes.model.message.SuccessMessage;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.pipeline.PipelineOperationStatus;
+
+import org.junit.jupiter.api.Assertions;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Test support for Java client E2E: creates adapters/pipelines, publishes and 
consumes via NATS,
+ * and provides assertions for filtered events.
+ */
+public final class ClientTestSupport {
+
+  private static final String DEFAULT_NATS_URL = "nats://127.0.0.1:4222";
+  private static final int DEFAULT_NATS_PORT = 4222;
+  private static final String DEFAULT_NATS_HOST = "127.0.0.1";
+  private static final int WAIT_TIMEOUT_SECONDS = 20;
+
+  private final String testPrefix;
+  private final StreamPipesClient client;
+
+  public ClientTestSupport(String testPrefix) {
+    this.testPrefix = testPrefix;
+    this.client = buildClient();
+  }
+
+  /**
+   * Builds an adapter from the template helper, submits it through the client
+   * and polls the adapter list until an adapter with the generated name shows up.
+   *
+   * @param topicIn NATS topic handed to the adapter template
+   * @return the adapter description returned by the wait helper
+   * @throws IllegalStateException wrapping any exception raised on the way
+   */
+  public AdapterDescription createAdapter(String topicIn) {
+    final Duration waitLimit = Duration.ofSeconds(WAIT_TIMEOUT_SECONDS);
+    try {
+      AdapterDescription template =
+              PipelineTemplateHelper.buildAdapter(testPrefix, topicIn);
+      client.adapters().create(template);
+      String adapterName = template.getName();
+      return TestResourceHelper.waitForAdapter(client, adapterName, waitLimit);
+    } catch (Exception cause) {
+      throw new IllegalStateException(
+              "Failed to create adapter for topic: " + topicIn, cause);
+    }
+  }
+
+  /**
+   * Convenience wrapper: calls {@link #createAdapter(String)} and then
+   * {@link #assertAdapterStarted(AdapterDescription)} on the result.
+   *
+   * @param topicIn NATS topic handed to {@code createAdapter}
+   * @return the adapter description obtained from {@code createAdapter}
+   */
+  public AdapterDescription createAndStartAdapter(String topicIn) {
+    final AdapterDescription created = createAdapter(topicIn);
+    assertAdapterStarted(created);
+    return created;
+  }
+
+  /**
+   * Creates a pipeline (Boolean Filter + NATS Sink) and waits until it 
appears in the pipeline list.
+   *
+   * @param adapter  the adapter whose stream the pipeline consumes from
+   * @param topicIn  NATS topic for pipeline input
+   * @param topicOut NATS topic for pipeline output
+   * @return the created pipeline from the backend
+   */
+  public Pipeline createPipeline(AdapterDescription adapter, String topicIn, 
String topicOut) {
+    try {
+      Pipeline pipeline = PipelineTemplateHelper.buildPipeline(testPrefix, 
adapter, topicIn, topicOut);
+      client.pipelines().create(pipeline);
+      return TestResourceHelper.waitForPipeline(client, pipeline.getName(), 
Duration.ofSeconds(WAIT_TIMEOUT_SECONDS));
+    } catch (Exception e) {
+      throw new IllegalStateException("[pipeline.create] Failed to create 
pipeline", e);
+    }
+  }
+
+  /**
+   * Creates a pipeline and starts it; fails the test if start fails.
+   *
+   * @param adapter  the adapter whose stream the pipeline consumes from
+   * @param topicIn  NATS topic for pipeline input
+   * @param topicOut NATS topic for pipeline output
+   * @return the started pipeline
+   */
+  public Pipeline createAndStartPipeline(AdapterDescription adapter, String 
topicIn, String topicOut) {
+    Pipeline pipeline = createPipeline(adapter, topicIn, topicOut);
+    startPipelineWithRetry(pipeline, 3, Duration.ofSeconds(5));
+    return pipeline;
+  }
+
+  /**
+   * Starts the pipeline with retries so that extension service discovery can 
complete.
+   *
+   * @param pipeline   the pipeline to start
+   * @param maxAttempts number of start attempts
+   * @param delayBetween delay between attempts
+   */
+  public void startPipelineWithRetry(Pipeline pipeline, int maxAttempts, 
Duration delayBetween) {
+    PipelineOperationStatus status = null;
+    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+      status = client.pipelines().start(pipeline.getPipelineId());
+      if (status.isSuccess()) {
+        return;
+      }
+      if (attempt < maxAttempts) {
+        try {
+          Thread.sleep(delayBetween.toMillis());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new IllegalStateException("Interrupted while waiting to retry 
pipeline start", e);
+        }
+      }
+    }
+    String errorMsg = (status != null) ? status.getTitle() : "unknown";
+    Assertions.fail("[pipeline.start] Pipeline start failed after " + 
maxAttempts + " attempts: "
+            + pipeline.getPipelineId() + " - " + errorMsg);
+  }
+
+  /**
+   * Sends a start request for the adapter and asserts that the returned
+   * message reports success.
+   *
+   * @param adapter the adapter to start
+   */
+  public void assertAdapterStarted(AdapterDescription adapter) {
+    final String adapterId = adapter.getElementId();
+    SuccessMessage response = client.adapters().start(adapterId);
+    String failureText = "[adapter.start] Adapter start failed: " + adapterId;
+    Assertions.assertTrue(response.isSuccess(), failureText);
+  }
+
+  /**
+   * Sends a single start request for the pipeline and asserts that the
+   * returned status reports success.
+   *
+   * @param pipeline the pipeline to start
+   */
+  public void assertPipelineStarted(Pipeline pipeline) {
+    final String pipelineId = pipeline.getPipelineId();
+    PipelineOperationStatus outcome = client.pipelines().start(pipelineId);
+    String failureText = "[pipeline.start] Pipeline start failed: "
+            + pipelineId + " - " + outcome.getTitle();
+    Assertions.assertTrue(outcome.isSuccess(), failureText);
+  }
+
+  /**
+   * Returns a fresh, mutable list of six fixed sensor events whose fault flag
+   * alternates true/false, starting with true.
+   *
+   * @return list of 6 events, 3 with {@code sensor_fault_flags == true}
+   */
+  public List<SensorEvent> buildBooleanEvents() {
+    return new ArrayList<>(List.of(
+            new SensorEvent("event-1", 12.5, 1.2, "sensor-1", true),
+            new SensorEvent("event-2", 13.0, 1.3, "sensor-2", false),
+            new SensorEvent("event-3", 14.1, 1.4, "sensor-3", true),
+            new SensorEvent("event-4", 15.6, 1.5, "sensor-4", false),
+            new SensorEvent("event-5", 16.0, 1.6, "sensor-5", true),
+            new SensorEvent("event-6", 17.3, 1.7, "sensor-6", false)));
+  }
+
+  /**
+   * Collects the IDs of all input events whose fault flag is {@code true}.
+   *
+   * @param inputEvents events sent into the pipeline
+   * @return set of event IDs that should appear in the output
+   */
+  public Set<String> expectedPassedEventIds(List<SensorEvent> inputEvents) {
+    return inputEvents.stream()
+            .collect(Collectors.filtering(
+                    SensorEvent::sensorFaultFlags,
+                    Collectors.mapping(SensorEvent::eventId, Collectors.toSet())));
+  }
+
+  /**
+   * Publishes events to {@code topicIn} and collects what arrives on
+   * {@code topicOut}.
+   *
+   * <p>The subscription is opened before anything is published. Received
+   * events are gathered in a thread-safe list, because the subscriber
+   * callback and the Awaitility poll are not guaranteed to run on the calling
+   * thread. The producer and the subscription are released in separate
+   * {@code finally} blocks so that a failure while obtaining or closing the
+   * producer cannot leave the subscription open.
+   *
+   * @param topicIn            NATS topic to publish to
+   * @param topicOut           NATS topic to subscribe to
+   * @param inputEvents        events to publish
+   * @param expectedEventCount minimum number of events to wait for
+   * @return a mutable snapshot of the events consumed up to unsubscription
+   * @throws org.awaitility.core.ConditionTimeoutException if fewer than
+   *         {@code expectedEventCount} events arrive within the wait timeout
+   */
+  public List<Map<String, Object>> publishAndConsumeNats(
+          String topicIn,
+          String topicOut,
+          List<SensorEvent> inputEvents,
+          long expectedEventCount) {
+    NatsTransportProtocol protocolIn = natsProtocolForTopic(topicIn);
+    NatsTransportProtocol protocolOut = natsProtocolForTopic(topicOut);
+
+    // Written by the subscriber callback, read by the polling condition.
+    List<Map<String, Object>> consumed =
+            new java.util.concurrent.CopyOnWriteArrayList<>();
+    var subscription = client.streams().subscribe(
+            streamForTopic(protocolOut),
+            event -> consumed.add(event.getRaw()));
+    try {
+      var producer = client.streams().getProducer(streamForTopic(protocolIn));
+      try {
+        for (SensorEvent event : inputEvents) {
+          producer.publish(event.toMap());
+        }
+        await().atMost(Duration.ofSeconds(WAIT_TIMEOUT_SECONDS))
+                .until(() -> consumed.size() >= expectedEventCount);
+      } finally {
+        producer.close();
+      }
+    } finally {
+      subscription.unsubscribe();
+    }
+    // Hand back a plain list so callers keep getting an ArrayList.
+    return new ArrayList<>(consumed);
+  }
+
+  private static SpDataStream streamForTopic(NatsTransportProtocol protocol) {
+    SpDataStream stream = new SpDataStream();
+    stream.setEventGrounding(new EventGrounding(protocol));
+    return stream;
+  }
+
+  private static NatsTransportProtocol natsProtocolForTopic(String topic) {
+    String natsUrl = System.getProperty("test.nats.url", DEFAULT_NATS_URL);
+    String host = DEFAULT_NATS_HOST;
+    int port = DEFAULT_NATS_PORT;
+
+    if (natsUrl.startsWith("nats://")) {
+      String rest = natsUrl.substring(7);
+      int colon = rest.indexOf(':');
+      if (colon > 0) {
+        host = rest.substring(0, colon);
+        port = Integer.parseInt(rest.substring(colon + 1));
+      }
+    }
+    return new NatsTransportProtocol(host, port, topic);
+  }
+
+  /**
+   * Asserts that consumed events contain at least the expected set of event 
IDs.
+   *
+   * @param consumed          events received from the output topic
+   * @param expectedEventIds  expected set of event IDs
+   */
+  public void assertFilteredEvents(List<Map<String, Object>> consumed, 
Set<String> expectedEventIds) {
+    Set<String> ourEventIdsInOutput = consumed.stream()
+            .map(event -> String.valueOf(event.get("eventId")))
+            .filter(expectedEventIds::contains)
+            .collect(Collectors.toSet());
+
+    Assertions.assertTrue(ourEventIdsInOutput.containsAll(expectedEventIds),
+            "[nats.consume] Output missing expected event IDs. Expected: " + 
expectedEventIds
+                    + ", Found: " + ourEventIdsInOutput);
+  }
+
+  /**
+   * Delegates to {@code TestResourceHelper.waitUntilEndpointsReady} using this
+   * instance's client and test prefix.
+   *
+   * @param expectedAdapterCount  minimum number of adapters required
+   * @param expectedPipelineCount minimum number of pipelines required
+   * @param timeout               maximum wait duration
+   */
+  public void waitUntilEndpointsReady(int expectedAdapterCount,
+                                      int expectedPipelineCount,
+                                      Duration timeout) {
+    TestResourceHelper.waitUntilEndpointsReady(
+            client,
+            testPrefix,
+            expectedAdapterCount,
+            expectedPipelineCount,
+            timeout);
+  }
+
+  /**
+   * Delegates to {@code TestResourceHelper.cleanup} for this instance's
+   * client and test prefix.
+   */
+  public void cleanupTestResources() {
+    TestResourceHelper.cleanup(this.client, this.testPrefix);
+  }
+
+  /**
+   * @return the client built for this support instance
+   */
+  public StreamPipesClient client() {
+    return this.client;
+  }
+
+  public static String extractProcessorEndpoint(Pipeline pipeline) {
+    if (pipeline.getSepas() == null || pipeline.getSepas().isEmpty()) {
+      return null;
+    }
+    DataProcessorInvocation processor = pipeline.getSepas().get(0);
+    return processor.getSelectedEndpointUrl();
+  }
+
+  private StreamPipesClient buildClient() {
+    String host = requiredProperty("test.host");
+    int port = Integer.parseInt(requiredProperty("test.port"));
+    String user = requiredProperty("test.username");
+    String apiKey = requiredProperty("test.apikey");
+
+    var streamPipesClient = StreamPipesClient.create(host, port,
+            StreamPipesCredentials.withApiKey(user, apiKey), true);
+    streamPipesClient.registerProtocol(new SpNatsProtocolFactory());
+    return streamPipesClient;
+  }
+
+  private static String requiredProperty(String key) {
+    String value = System.getProperty(key);
+    if (value == null || value.isBlank()) {
+      throw new IllegalStateException("Missing system property: " + key);
+    }
+    return value;
+  }
+
+  /**
+   * Test event payload matching the Machine Simulator schema.
+   *
+   * <p>{@link #toMap()} adds fixed {@code temperature} and {@code volume_flow}
+   * values and the current time as {@code timestamp} to the record's fields.
+   */
+  public record SensorEvent(String eventId,
+                            double density,
+                            double massFlow,
+                            String sensorId,
+                            boolean sensorFaultFlags) {
+
+    /**
+     * @return a new mutable map holding this event keyed by runtime name
+     */
+    public Map<String, Object> toMap() {
+      final Map<String, Object> payload = new HashMap<>();
+      payload.put("eventId", this.eventId);
+      payload.put("density", this.density);
+      payload.put("mass_flow", this.massFlow);
+      payload.put("sensorId", this.sensorId);
+      payload.put("sensor_fault_flags", this.sensorFaultFlags);
+      // Values not carried by the record are filled with constants.
+      payload.put("temperature", 22.5);
+      payload.put("timestamp", System.currentTimeMillis());
+      payload.put("volume_flow", 2.3);
+      return payload;
+    }
+  }
+}
diff --git 
a/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/PipelineTemplateHelper.java
 
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/PipelineTemplateHelper.java
new file mode 100644
index 0000000000..861ca8ac06
--- /dev/null
+++ 
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/PipelineTemplateHelper.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.e2e.utils;
+
+import org.apache.streampipes.model.SpDataStream;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.graph.DataSinkInvocation;
+import org.apache.streampipes.model.grounding.EventGrounding;
+import org.apache.streampipes.model.grounding.NatsTransportProtocol;
+import org.apache.streampipes.model.output.KeepOutputStrategy;
+import org.apache.streampipes.model.pipeline.Pipeline;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.MappingPropertyUnary;
+import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.StaticProperty;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternative;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+import org.apache.streampipes.model.util.Cloner;
+import org.apache.streampipes.vocabulary.XSD;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Builds adapter and pipeline models for E2E tests: Machine Simulator 
adapter, Boolean Filter processor,
+ * and NATS sink, using the StreamPipes model API.
+ */
+final class PipelineTemplateHelper {
+
+  private static final String MACHINE_SIMULATOR_APP_ID =
+          "org.apache.streampipes.connect.iiot.adapters.simulator.machine";
+
+  private static final String BOOLEAN_FILTER_APP_ID =
+          
"org.apache.streampipes.processors.filters.jvm.processor.booleanfilter";
+  private static final String BOOLEAN_FILTER_BELONGS_TO = "sp:" + 
BOOLEAN_FILTER_APP_ID;
+
+  private static final String NATS_SINK_APP_ID = 
"org.apache.streampipes.sinks.brokers.jvm.nats";
+  private static final String NATS_SINK_BELONGS_TO = "sp:" + NATS_SINK_APP_ID;
+
+  private static final String NATS_HOST = "nats";
+  private static final int NATS_PORT = 4222;
+
+  private static final URI XSD_INTEGER = XSD.INTEGER;
+  private static final String XSD_STRING = XSD.STRING.toString();
+  private static final String XSD_FLOAT = XSD.FLOAT.toString();
+  private static final String XSD_BOOLEAN = XSD.BOOLEAN.toString();
+  private static final String XSD_LONG = XSD.LONG.toString();
+
+  private PipelineTemplateHelper() {
+  }
+
+  /**
+   * Builds a Machine Simulator adapter description.
+   */
+  static AdapterDescription buildAdapter(String testPrefix, String topicIn) {
+    AdapterDescription adapter = new AdapterDescription();
+    adapter.setName(testPrefix + "adapter-" + UUID.randomUUID());
+    adapter.setDescription("Java client e2e adapter");
+    adapter.setAppId(MACHINE_SIMULATOR_APP_ID);
+    adapter.setElementId("");
+    adapter.setRev(null);
+    adapter.setRunning(false);
+    adapter.setSelectedEndpointUrl(null);
+    adapter.setVersion(0);
+
+    adapter.setEventGrounding(new EventGrounding(new 
NatsTransportProtocol(NATS_HOST, NATS_PORT, topicIn)));
+    adapter.setConfig(buildAdapterConfig());
+    adapter.setDataStream(buildMachineSimulatorDataStream(topicIn));
+    return adapter;
+  }
+
+  /**
+   * Builds a pipeline with Boolean Filter and NATS Sink.
+   *
+   * <p>The graph is wired through DOM ids: stream {@code s0}, processor
+   * {@code p0} connected to {@code s0}, and sink {@code sink0} connected to
+   * {@code p0}.
+   *
+   * @param testPrefix prefix for the generated pipeline name
+   * @param adapter    adapter whose data stream is cloned as the pipeline input
+   * @param topicIn    NATS topic set on the cloned input stream
+   * @param topicOut   NATS topic passed to both the processor and the sink builder
+   * @return the assembled pipeline with a random pipeline id
+   */
+  static Pipeline buildPipeline(String testPrefix, AdapterDescription adapter, 
String topicIn, String topicOut) {
+    // Work on a clone so the adapter's own data stream is left untouched.
+    SpDataStream stream = new Cloner().stream(adapter.getDataStream());
+    stream.getEventGrounding().setTransportProtocol(new 
NatsTransportProtocol(NATS_HOST, NATS_PORT, topicIn));
+    stream.setDom("s0");
+    if (stream.getConnectedTo() == null) {
+      stream.setConnectedTo(new ArrayList<>());
+    }
+
+    // NOTE(review): the processor's output grounding and the sink's "subject"
+    // are both set to topicOut - confirm this is intended.
+    DataProcessorInvocation processor = buildBooleanFilterProcessor(stream, 
topicOut);
+    processor.setDom("p0");
+    processor.setConnectedTo(List.of("s0"));
+    processor.setElementId("sp:processor:" + generateShortUuid());
+    processor.getOutputStream().setDom("p0-out");
+
+    DataSinkInvocation sink = buildNatsSink(processor.getOutputStream(), 
topicOut);
+    sink.setDom("sink0");
+    sink.setConnectedTo(List.of("p0"));
+    sink.setElementId("sp:sink:" + generateShortUuid());
+
+    Pipeline pipeline = new Pipeline();
+    pipeline.setName(testPrefix + "pipeline-" + UUID.randomUUID());
+    pipeline.setDescription("Java client e2e pipeline");
+    // Pipeline id is a random UUID with the dashes removed.
+    pipeline.setPipelineId(UUID.randomUUID().toString().replace("-", ""));
+    pipeline.setRev(null);
+    pipeline.setStreams(List.of(stream));
+    pipeline.setSepas(List.of(processor));
+    pipeline.setActions(List.of(sink));
+    return pipeline;
+  }
+
+  /**
+   * Builds the adapter configuration: a 200 ms wait time, one sensor, and the
+   * simulator selection with {@code flowrate} selected.
+   */
+  private static List<StaticProperty> buildAdapterConfig() {
+    FreeTextStaticProperty waitTime =
+            new FreeTextStaticProperty("wait-time-ms", "", "", XSD_INTEGER);
+    waitTime.setValue("200");
+
+    FreeTextStaticProperty sensorCount =
+            new FreeTextStaticProperty("numberOfSensors", "", "", XSD_INTEGER);
+    sensorCount.setValue("1");
+
+    OneOfStaticProperty simulatorChoice = new OneOfStaticProperty(
+            "selected-simulator-option", "Simulator", "Select simulator type");
+    simulatorChoice.addOption(new Option("flowrate", true));
+    for (String unselected : List.of("pressure", "waterlevel", "diagnostics")) {
+      simulatorChoice.addOption(new Option(unselected, false));
+    }
+
+    List<StaticProperty> config = new ArrayList<>();
+    config.add(waitTime);
+    config.add(sensorCount);
+    config.add(simulatorChoice);
+    return config;
+  }
+
+  /**
+   * Builds a data stream grounded on {@code topicIn} that carries the flow
+   * simulator event schema.
+   */
+  private static SpDataStream buildMachineSimulatorDataStream(String topicIn) {
+    NatsTransportProtocol inputProtocol =
+            new NatsTransportProtocol(NATS_HOST, NATS_PORT, topicIn);
+
+    SpDataStream simulated = new SpDataStream();
+    simulated.setEventGrounding(new EventGrounding(inputProtocol));
+    simulated.setEventSchema(buildFlowSimulatorEventSchema());
+    return simulated;
+  }
+
+  private static EventSchema buildFlowSimulatorEventSchema() {
+    EventSchema schema = new EventSchema();
+    schema.addEventProperty(primitive("eventId", XSD_STRING, 
"DIMENSION_PROPERTY"));
+    schema.addEventProperty(primitive("timestamp", XSD_LONG, 
"HEADER_PROPERTY"));
+    schema.addEventProperty(primitive("sensorId", XSD_STRING, 
"DIMENSION_PROPERTY"));
+    schema.addEventProperty(primitive("mass_flow", XSD_FLOAT, 
"MEASUREMENT_PROPERTY"));
+    schema.addEventProperty(primitive("volume_flow", XSD_FLOAT, 
"MEASUREMENT_PROPERTY"));
+    schema.addEventProperty(primitive("temperature", XSD_FLOAT, 
"MEASUREMENT_PROPERTY"));
+    schema.addEventProperty(primitive("density", XSD_FLOAT, 
"MEASUREMENT_PROPERTY"));
+    schema.addEventProperty(primitive("sensor_fault_flags", XSD_BOOLEAN, 
"MEASUREMENT_PROPERTY"));
+    return schema;
+  }
+
+  /**
+   * Creates a primitive event property with the given runtime name and type
+   * and assigns it the given property scope.
+   */
+  private static EventPropertyPrimitive primitive(String runtimeName,
+                                                  String runtimeType,
+                                                  String scope) {
+    EventPropertyPrimitive property =
+            new EventPropertyPrimitive(runtimeType, runtimeName, null, null);
+    property.setPropertyScope(scope);
+    return property;
+  }
+
+  /**
+   * Builds the Boolean Filter processor invocation. Input, requirement and
+   * output streams are separate clones of {@code inputStream}; the output
+   * clone is re-grounded on {@code outputTopic}.
+   */
+  private static DataProcessorInvocation buildBooleanFilterProcessor(
+          SpDataStream inputStream, String outputTopic) {
+    DataProcessorInvocation filter = new DataProcessorInvocation();
+    filter.setAppId(BOOLEAN_FILTER_APP_ID);
+    filter.setName("Boolean Filter");
+    filter.setDescription("Retains events with a selected boolean value");
+    filter.setBelongsTo(BOOLEAN_FILTER_BELONGS_TO);
+
+    SpDataStream inputCopy = new Cloner().stream(inputStream);
+    filter.setInputStreams(List.of(inputCopy));
+    SpDataStream requirementCopy = new Cloner().stream(inputStream);
+    filter.setStreamRequirements(List.of(requirementCopy));
+
+    filter.setStaticProperties(buildBooleanFilterStaticProperties());
+    filter.setSelectedEndpointUrl(null);
+    filter.setOutputStrategies(List.of(new KeepOutputStrategy()));
+
+    SpDataStream filtered = new Cloner().stream(inputStream);
+    NatsTransportProtocol outputProtocol =
+            new NatsTransportProtocol(NATS_HOST, NATS_PORT, outputTopic);
+    filtered.getEventGrounding().setTransportProtocol(outputProtocol);
+    filter.setOutputStream(filtered);
+    return filter;
+  }
+
+  private static List<StaticProperty> buildBooleanFilterStaticProperties() {
+    List<StaticProperty> props = new ArrayList<>();
+    MappingPropertyUnary mapping = new MappingPropertyUnary("boolean-mapping",
+            "Boolean Field", "The field to filter on");
+    mapping.setRequirementSelector("r0::boolean-mapping");
+    mapping.setSelectedProperty("s0::sensor_fault_flags");
+    mapping.setMapsFromOptions(List.of("s0::sensor_fault_flags"));
+    mapping.setPropertyScope("NONE");
+    props.add(mapping);
+
+    OneOfStaticProperty valueProp = new OneOfStaticProperty("value", "Value", 
"Boolean value to pass through");
+    valueProp.addOption(new Option("True", true));
+    valueProp.addOption(new Option("False", false));
+    props.add(valueProp);
+    return props;
+  }
+
+  private static DataSinkInvocation buildNatsSink(SpDataStream inputStream, 
String outputTopic) {
+    DataSinkInvocation sink = new DataSinkInvocation();
+    sink.setName("NATS Sink");
+    sink.setDescription("Publishes events to a NATS subject.");
+    sink.setAppId(NATS_SINK_APP_ID);
+    sink.setBelongsTo(NATS_SINK_BELONGS_TO);
+    sink.setVersion(0);
+    sink.setInputStreams(List.of(inputStream));
+    sink.setStaticProperties(buildNatsSinkStaticProperties(outputTopic));
+    sink.setSelectedEndpointUrl(null);
+    return sink;
+  }
+
+  private static List<StaticProperty> buildNatsSinkStaticProperties(String 
outputTopic) {
+    List<StaticProperty> props = new ArrayList<>();
+    props.add(FreeTextStaticProperty.of("subject", outputTopic));
+    props.add(FreeTextStaticProperty.of("natsUrls", "nats://nats:4222"));
+    props.add(alternativesProperty("access-mode", "anonymous-alternative", 
"username-alternative"));
+    props.add(alternativesProperty("connection-properties", 
"none-properties-alternative",
+            "custom-properties-alternative"));
+    return props;
+  }
+
+  /**
+   * Builds an alternatives property with two alternatives: {@code selectedId}
+   * (selected) followed by {@code unselectedId} (not selected).
+   */
+  private static StaticPropertyAlternatives alternativesProperty(
+          String internalName,
+          String selectedId,
+          String unselectedId) {
+    List<StaticPropertyAlternative> choices = new ArrayList<>();
+    choices.add(alternative(selectedId, true));
+    choices.add(alternative(unselectedId, false));
+
+    StaticPropertyAlternatives property =
+            new StaticPropertyAlternatives(internalName, internalName, internalName);
+    property.setAlternatives(choices);
+    return property;
+  }
+
+  /**
+   * Builds a single alternative whose three constructor arguments are all
+   * {@code internalName}, with the given selection state.
+   */
+  private static StaticPropertyAlternative alternative(String internalName,
+                                                       boolean selected) {
+    StaticPropertyAlternative choice = new StaticPropertyAlternative(
+            internalName, internalName, internalName);
+    choice.setSelected(selected);
+    return choice;
+  }
+
+  private static String generateShortUuid() {
+    return UUID.randomUUID().toString().replace("-", "").substring(0, 8);
+  }
+}
diff --git 
a/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/TestResourceHelper.java
 
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/TestResourceHelper.java
new file mode 100644
index 0000000000..c2ce7616de
--- /dev/null
+++ 
b/streampipes-client-e2e/java-client-e2e/src/test/java/org/apache/streampipes/client/e2e/utils/TestResourceHelper.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.client.e2e.utils;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.pipeline.Pipeline;
+
+import org.awaitility.core.ConditionTimeoutException;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Supplier;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Helper for waiting on and cleaning up adapters and pipelines in E2E tests.
+ */
+final class TestResourceHelper {
+
+  private static final long POLL_INTERVAL_MS = 500L;
+  private static final long DEFAULT_POLL_INTERVAL_SEC = 1L;
+
+  private TestResourceHelper() {
+  }
+
+  /**
+   * Polls the adapter list until an adapter with the given name appears, or 
the timeout is reached.
+   *
+   * @param client       StreamPipes client
+   * @param adapterName  name of the adapter to find
+   * @param timeout      maximum wait duration
+   * @return the adapter description
+   * @throws IllegalStateException if not found within the timeout
+   */
+  static AdapterDescription waitForAdapter(StreamPipesClient client,
+                                          String adapterName,
+                                          Duration timeout) {
+    return waitFor(
+            () -> client.adapters().all().stream()
+                    .filter(a -> adapterName.equals(a.getName()))
+                    .findFirst()
+                    .orElse(null),
+            timeout,
+            "Adapter not found after creation: " + adapterName
+    );
+  }
+
+  /**
+   * Polls the pipeline list until a pipeline with the given name appears, or 
the timeout is reached.
+   *
+   * @param client        StreamPipes client
+   * @param pipelineName  name of the pipeline to find
+   * @param timeout       maximum wait duration
+   * @return the pipeline
+   * @throws IllegalStateException if not found within the timeout
+   */
+  static Pipeline waitForPipeline(StreamPipesClient client,
+                                  String pipelineName,
+                                  Duration timeout) {
+    return waitFor(
+            () -> client.pipelines().all().stream()
+                    .filter(p -> pipelineName.equals(p.getName()))
+                    .findFirst()
+                    .orElse(null),
+            timeout,
+            "Pipeline not found after creation: " + pipelineName
+    );
+  }
+
+  /**
+   * Waits until enough adapters and pipelines with the given name prefix
+   * exist and every one of them has a non-blank endpoint URL. For pipelines
+   * the endpoint of the first processor is checked.
+   *
+   * <p>On timeout the test is failed, with the Awaitility timeout attached as
+   * the cause so its diagnostics are not lost.
+   *
+   * @param client                 StreamPipes client
+   * @param testPrefix             name prefix to filter resources
+   * @param expectedAdapterCount   minimum number of adapters required
+   * @param expectedPipelineCount  minimum number of pipelines required
+   * @param timeout                maximum wait duration
+   */
+  static void waitUntilEndpointsReady(StreamPipesClient client,
+                                      String testPrefix,
+                                      int expectedAdapterCount,
+                                      int expectedPipelineCount,
+                                      Duration timeout) {
+    try {
+      await()
+              .pollInterval(Duration.ofSeconds(DEFAULT_POLL_INTERVAL_SEC))
+              .atMost(timeout)
+              .until(() -> {
+                List<AdapterDescription> adapters = client.adapters().all().stream()
+                        .filter(a -> a.getName() != null
+                                && a.getName().startsWith(testPrefix))
+                        .toList();
+                List<Pipeline> pipelines = client.pipelines().all().stream()
+                        .filter(p -> p.getName() != null
+                                && p.getName().startsWith(testPrefix))
+                        .toList();
+
+                boolean adaptersReady = adapters.size() >= expectedAdapterCount
+                        && adapters.stream().allMatch(a ->
+                                a.getSelectedEndpointUrl() != null
+                                        && !a.getSelectedEndpointUrl().isBlank());
+
+                boolean pipelinesReady = pipelines.size() >= expectedPipelineCount
+                        && pipelines.stream().allMatch(p -> {
+                          String endpoint =
+                                  ClientTestSupport.extractProcessorEndpoint(p);
+                          return endpoint != null && !endpoint.isBlank();
+                        });
+
+                return adaptersReady && pipelinesReady;
+              });
+    } catch (ConditionTimeoutException e) {
+      Assertions.fail("Endpoint assignment did not finish in "
+              + timeout.toSeconds() + " seconds", e);
+    }
+  }
+
+  /**
+   * Stops and deletes all pipelines and adapters whose names start with 
{@code testPrefix}.
+   *
+   * <p>Pipelines are handled before adapters, each group sorted by name.
+   * Failures of individual operations are collected rather than thrown
+   * immediately; if any were collected, the test is failed once at the end
+   * with all of them listed.
+   *
+   * @param client     StreamPipes client
+   * @param testPrefix name prefix to filter resources
+   */
+  static void cleanup(StreamPipesClient client, String testPrefix) {
+    List<String> errors = new ArrayList<>();
+
+    // Stop and delete pipelines first
+    try {
+      List<Pipeline> pipelines = client.pipelines().all().stream()
+              .filter(p -> p.getName() != null && 
p.getName().startsWith(testPrefix))
+              .sorted(Comparator.comparing(Pipeline::getName))
+              .toList();
+      for (Pipeline pipeline : pipelines) {
+        // Delete is attempted even when stop was recorded as failed.
+        capture(errors, "stop pipeline " + pipeline.getPipelineId(),
+                () -> client.pipelines().stop(pipeline.getPipelineId()));
+        capture(errors, "delete pipeline " + pipeline.getPipelineId(),
+                () -> client.pipelines().delete(pipeline.getPipelineId()));
+      }
+    } catch (Exception e) {
+      // Listing failed; record it and still try to clean up the adapters.
+      errors.add("scan pipelines failed: " + e.getMessage());
+    }
+
+    // Stop and delete adapters
+    try {
+      List<AdapterDescription> adapters = client.adapters().all().stream()
+              .filter(a -> a.getName() != null && 
a.getName().startsWith(testPrefix))
+              .sorted(Comparator.comparing(AdapterDescription::getName))
+              .toList();
+      for (AdapterDescription adapter : adapters) {
+        capture(errors, "stop adapter " + adapter.getElementId(),
+                () -> client.adapters().stop(adapter.getElementId()));
+        capture(errors, "delete adapter " + adapter.getElementId(),
+                () -> client.adapters().delete(adapter.getElementId()));
+      }
+    } catch (Exception e) {
+      errors.add("scan adapters failed: " + e.getMessage());
+    }
+
+    // Report everything that went wrong in a single failure.
+    if (!errors.isEmpty()) {
+      Assertions.fail("Cleanup errors:\n" + String.join("\n", errors));
+    }
+  }
+
+  /**
+   * Polls {@code poll} every {@code POLL_INTERVAL_MS} milliseconds until it
+   * yields a non-null value or {@code timeout} elapses.
+   *
+   * <p>The value that satisfied the wait is returned directly. The supplier
+   * is not invoked again afterwards, which avoids an extra call and a
+   * spurious failure should a follow-up poll come back empty.
+   *
+   * @param poll         supplier returning the value, or {@code null} if absent
+   * @param timeout      maximum wait duration
+   * @param errorMessage message for the exception raised on timeout
+   * @return the first non-null value produced by {@code poll}
+   * @throws IllegalStateException if no value appears within the timeout
+   */
+  private static <T> T waitFor(Supplier<T> poll,
+                               Duration timeout,
+                               String errorMessage) {
+    try {
+      return await()
+              .pollInterval(Duration.ofMillis(POLL_INTERVAL_MS))
+              .atMost(timeout)
+              .until(poll::get, candidate -> candidate != null);
+    } catch (ConditionTimeoutException e) {
+      throw new IllegalStateException(errorMessage, e);
+    }
+  }
+
+  private static void capture(List<String> errors, String operation, 
ThrowingRunnable action) {
+    try {
+      if (operation.startsWith("delete pipeline ")) {
+        runWithSuppressedStderr(action);
+      } else {
+        action.run();
+      }
+    } catch (Exception e) {
+      // Handle known deserialization issue in client response during deletion
+      if (operation.startsWith("delete ")
+              && e.getMessage() != null
+              && e.getMessage().contains("Cannot construct instance of 
`org.apache.streampipes.model.message.Message`")) {
+        return;
+      }
+      errors.add(operation + " failed: " + e.getMessage());
+    }
+  }
+
+  private static void runWithSuppressedStderr(ThrowingRunnable action) throws 
Exception {
+    PrintStream originalErr = System.err;
+    try (PrintStream suppressedErr = new 
PrintStream(OutputStream.nullOutputStream())) {
+      System.setErr(suppressedErr);
+      action.run();
+    } finally {
+      System.setErr(originalErr);
+    }
+  }
+
+  /**
+   * A {@code Runnable}-like action that may throw a checked exception; used
+   * for the operations passed to {@code capture}.
+   */
+  @FunctionalInterface
+  private interface ThrowingRunnable {
+    void run() throws Exception;
+  }
+}
diff --git a/streampipes-client-e2e/tool/install-element.sh 
b/streampipes-client-e2e/tool/install-element.sh
old mode 100644
new mode 100755
index 2b3c340f7a..59956a1ca1
--- a/streampipes-client-e2e/tool/install-element.sh
+++ b/streampipes-client-e2e/tool/install-element.sh
@@ -52,10 +52,9 @@ while true; do
     esac
 done
 
-
 ######################Adapters######################
 
-#machine
+# machine
 installRequestBody='{
   "appId":"org.apache.streampipes.connect.iiot.adapters.simulator.machine",
   "publicElement":true,
@@ -72,11 +71,9 @@ if [ $? -ne 0 ]; then
     exit 1
 fi
 
+######################processors######################
 
-
-######################processor######################
-
-#bool_inverter
+# bool_inverter
 installRequestBody='{
   
"appId":"org.apache.streampipes.processors.transformation.jvm.booloperator.inverter",
   "publicElement":true,
@@ -93,9 +90,26 @@ if [ $? -ne 0 ]; then
     exit 1
 fi
 
+# booleanfilter
+installRequestBody='{
+  
"appId":"org.apache.streampipes.processors.filters.jvm.processor.booleanfilter",
+  "publicElement":true,
+  "serviceTagPrefix":"DATA_PROCESSOR"
+  }'
+
+response=$(curl -s -X POST "http://$HOST:$PORT$INSTALL_ELEMENT_URL"; \
+   -H "Content-Type: application/json" \
+   -H "authorization: Bearer $TOKEN" \
+   -d "$installRequestBody")
+if [ $? -ne 0 ]; then
+    echo "$response"
+    echo "Error install boolean filter processor"
+    exit 1
+fi
 
-######################sink######################
-#datalake
+######################sinks######################
+
+# datalake
 installRequestBody='{
   "appId":"org.apache.streampipes.sinks.internal.jvm.datalake",
   "publicElement":true,
@@ -110,4 +124,21 @@ if [ $? -ne 0 ]; then
     echo "$response"
     echo "Error install datalake"
     exit 1
+fi
+
+# nats sink
+installRequestBody='{
+  "appId":"org.apache.streampipes.sinks.brokers.jvm.nats",
+  "publicElement":true,
+  "serviceTagPrefix":"DATA_SINK"
+  }'
+
+response=$(curl -s -X POST "http://$HOST:$PORT$INSTALL_ELEMENT_URL"; \
+   -H "Content-Type: application/json" \
+   -H "authorization: Bearer $TOKEN" \
+   -d "$installRequestBody")
+if [ $? -ne 0 ]; then
+    echo "$response"
+    echo "Error install nats sink"
+    exit 1
 fi
\ No newline at end of file
diff --git a/streampipes-client-e2e/tool/java-client-e2e.sh 
b/streampipes-client-e2e/tool/java-client-e2e.sh
new file mode 100755
index 0000000000..cbdb9beb0c
--- /dev/null
+++ b/streampipes-client-e2e/tool/java-client-e2e.sh
@@ -0,0 +1,79 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Runs one Java client E2E test class with Maven from the java-client-e2e module.
+# Usage: java-client-e2e.sh -h <host> -p <port> -u <api-key-user> -k <api-key>
+#                           [-s single|lb] [-c <test-class>]
+# -s selects the scenario (default: $E2E_SCENARIO, else "single"); -c overrides
+# the test class that the scenario would otherwise select.
+
+E2E_ROOT="$(cd "$(dirname "$0")/.." && pwd)"
+
+HOST=""
+PORT=""
+APIKEY=""
+API_KEY_USER_NAME=""
+SCENARIO="${E2E_SCENARIO:-single}"
+TEST_CLASS=""
+
+# Loop on the argument count rather than on "$1" being empty, so an option
+# given without its value is reported instead of spinning forever on a
+# failing "shift 2".
+while [ $# -gt 0 ]; do
+    case "$1" in
+        -h | -p | -u | -k | -s | -c)
+            if [ $# -lt 2 ]; then
+                echo "Error: option '$1' requires a value"
+                exit 1
+            fi
+            case "$1" in
+                -h) HOST="$2" ;;
+                -p) PORT="$2" ;;
+                -u) API_KEY_USER_NAME="$2" ;;
+                -k) APIKEY="$2" ;;
+                -s) SCENARIO="$2" ;;
+                -c) TEST_CLASS="$2" ;;
+            esac
+            shift 2
+        ;;
+        *)
+            # Unknown arguments are ignored.
+            shift
+        ;;
+    esac
+done
+
+# Without -c, derive the test class from the scenario.
+if [ -z "$TEST_CLASS" ]; then
+    case "$SCENARIO" in
+        single) TEST_CLASS="JavaClientTest" ;;
+        lb) TEST_CLASS="LoadBalanceTest" ;;
+        *)
+            echo "Error: unknown scenario '$SCENARIO', expected single|lb"
+            exit 1
+            ;;
+    esac
+fi
+
+cd "$E2E_ROOT/java-client-e2e" || exit 1
+
+if ! mvn test -Dtest="$TEST_CLASS" -Dtest.host="$HOST" -Dtest.port="$PORT" \
+    -Dtest.apikey="$APIKEY" -Dtest.username="$API_KEY_USER_NAME"; then
+    echo "Error: java test failed"
+    exit 1
+fi
+echo "All tests passed successfully"
diff --git a/streampipes-client-e2e/tool/start-streampipes-client-e2e.sh 
b/streampipes-client-e2e/tool/start-streampipes-client-e2e.sh
old mode 100644
new mode 100755
index cbab40355f..16523d14de
--- a/streampipes-client-e2e/tool/start-streampipes-client-e2e.sh
+++ b/streampipes-client-e2e/tool/start-streampipes-client-e2e.sh
@@ -20,6 +20,7 @@ E2E_TEST=""
 HOST="127.0.0.1"
 PORT="8030"
 LOGIN_URL="/streampipes-backend/api/v2/auth/login"
+SCENARIO="${E2E_SCENARIO:-single}"
 
 SP_USERNAME="[email protected]"
 SP_PASSWORD="admin"
@@ -46,8 +47,12 @@ while true; do
             E2E_TEST="$2"
             shift 2
         ;;
+        -s)
+            SCENARIO="$2"
+            shift 2
+        ;;
         --help)
-            echo "Usage: $0 [-h <ip>] [-p <port>] [-u <username>] [-pw 
<password>] [-t <E2E_TEST>]"
+            echo "Usage: $0 [-h <ip>] [-p <port>] [-u <username>] [-pw 
<password>] [-t <E2E_TEST>] [-s <single|lb>]"
             exit 0
         ;;
         "")
@@ -61,7 +66,7 @@ while true; do
     esac
 done
 
-if [ E2E_TEST == "" ]; then
+if [ -z "$E2E_TEST" ]; then
     echo "-t is empty"
     exit 1
 fi
@@ -139,7 +144,7 @@ fi
 
 echo "start e2e test"
 chmod +x ./"$E2E_TEST"
-./"$E2E_TEST" -h "$HOST" -p "$PORT" -u "$API_KEY_USER_NAME" -k "$APIKEY"
+./"$E2E_TEST" -h "$HOST" -p "$PORT" -u "$API_KEY_USER_NAME" -k "$APIKEY" -s 
"$SCENARIO"
 if [ $? -ne 0 ]; then
     echo "start $E2E_TEST failed"
     exit 1

Reply via email to