This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch rel/0.98.0
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/rel/0.98.0 by this push:
     new b421732ef1 feat: Bump PLC4X version, improve connection cache 
management (#3744)
b421732ef1 is described below

commit b421732ef1d0010e17b5df38b50f02ec448f74d1
Author: Dominik Riemer <[email protected]>
AuthorDate: Fri Nov 7 08:14:39 2025 +0100

    feat: Bump PLC4X version, improve connection cache management (#3744)
    
    Co-authored-by: Philipp Zehnder <[email protected]>
---
 pom.xml                                            |   4 +-
 .../management/connect/PullAdapterScheduler.java   |   5 +-
 .../streampipes-connectors-plc/pom.xml             |   6 +-
 .../connectors/plc/PlcConnectorsModuleExport.java  |   4 +-
 .../connection/ContinuousPlcRequestReader.java     |  32 +-
 .../generic/connection/PlcEventGenerator.java      |   7 +-
 .../generic/model/Plc4xConnectionExtractor.java    |  11 +-
 .../connectors/plc/adapter/s7/Plc4xS7Adapter.java  |   2 +-
 .../plc/cache/SpCachedPlcConnectionManager.java    | 178 +++++++
 .../plc/cache/SpConnectionContainer.java           | 235 +++++++++
 .../plc/cache/SpLeasedPlcConnection.java           | 575 +++++++++++++++++++++
 .../plc/adapter/ConnectionContainerReproTest.java  | 167 ++++++
 12 files changed, 1197 insertions(+), 29 deletions(-)

diff --git a/pom.xml b/pom.xml
index 58b4663faa..95e529b458 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,7 +111,7 @@
         <okhttp.version>3.13.1</okhttp.version>
         <opencsv.version>5.9</opencsv.version>
         <opennlp.version>2.3.1</opennlp.version>
-        <plc4x.version>0.12.0</plc4x.version>
+        <plc4x.version>0.13.1</plc4x.version>
         
<plexus-component-annotations.version>2.2.0</plexus-component-annotations.version>
         
<plexus-interactivity-api.version>1.1</plexus-interactivity-api.version>
         <plexus-utils.version>4.0.0</plexus-utils.version>
@@ -1185,8 +1185,6 @@
     </profiles>
 
     <!-- Build Settings -->
-
-
     <build>
 
         <!-- Plugin Management -->
diff --git 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java
 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java
index 0daec7c47b..1c501fe288 100644
--- 
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java
+++ 
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java
@@ -26,6 +26,7 @@ import org.apache.streampipes.model.monitoring.SpLogMessage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -47,8 +48,8 @@ public class PullAdapterScheduler {
     final Runnable task = () -> {
       try {
         pullAdapter.pullData();
-      } catch (ExecutionException | InterruptedException | TimeoutException e) 
{
-        LOG.error("Error while pulling data", e);
+      } catch (ExecutionException | InterruptedException | TimeoutException | 
CompletionException e) {
+        LOG.error("Error while pulling data: {}", e.getMessage());
         SpMonitoringManager.INSTANCE.addErrorMessage(
             adapterElementId,
             SpLogEntry.from(System.currentTimeMillis(), SpLogMessage.from(e))
diff --git a/streampipes-extensions/streampipes-connectors-plc/pom.xml 
b/streampipes-extensions/streampipes-connectors-plc/pom.xml
index 4cdc9dadeb..9841f8065f 100644
--- a/streampipes-extensions/streampipes-connectors-plc/pom.xml
+++ b/streampipes-extensions/streampipes-connectors-plc/pom.xml
@@ -52,6 +52,11 @@
             <groupId>org.apache.plc4x</groupId>
             <artifactId>plc4j-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.plc4x</groupId>
+            <artifactId>plc4j-spi</artifactId>
+            <version>${plc4x.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.plc4x</groupId>
             <artifactId>plc4j-driver-all</artifactId>
@@ -78,7 +83,6 @@
             <version>${plc4x.version}</version>
         </dependency>
 
-
         <!-- Test dependencies -->
         <dependency>
             <groupId>org.mockito</groupId>
diff --git 
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
index 59db43c181..6f53414919 100644
--- 
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
+++ 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
@@ -28,9 +28,9 @@ import 
org.apache.streampipes.extensions.connectors.plc.adapter.migration.Plc4xM
 import 
org.apache.streampipes.extensions.connectors.plc.adapter.migration.Plc4xS7AdapterMigrationV1;
 import 
org.apache.streampipes.extensions.connectors.plc.adapter.modbus.Plc4xModbusAdapter;
 import 
org.apache.streampipes.extensions.connectors.plc.adapter.s7.Plc4xS7Adapter;
+import 
org.apache.streampipes.extensions.connectors.plc.cache.SpCachedPlcConnectionManager;
 
 import org.apache.plc4x.java.api.PlcDriverManager;
-import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
 
 import java.time.Duration;
 import java.util.ArrayList;
@@ -43,7 +43,7 @@ public class PlcConnectorsModuleExport implements 
IExtensionModuleExport {
   public List<StreamPipesAdapter> adapters() {
     var env = Environments.getEnvironment();
     var driverManager = PlcDriverManager.getDefault();
-    var cachedConnectionManager =  CachedPlcConnectionManager
+    var cachedConnectionManager =  SpCachedPlcConnectionManager
         .getBuilder(driverManager.getConnectionManager())
         
.withMaxWaitTime(Duration.ofMillis(env.getPlc4xMaxWaitTimeMs().getValueOrDefault()))
         
.withMaxLeaseTime(Duration.ofMillis(env.getPlc4xMaxLeaseTimeMs().getValueOrDefault()))
diff --git 
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
index 639f4e0534..7306ead5a5 100644
--- 
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
+++ 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
@@ -69,22 +69,20 @@ public class ContinuousPlcRequestReader
 
   private void connectAndReadPlcData() {
     try (PlcConnection plcConnection = 
connectionManager.getConnection(settings.connectionString())) {
-      var readRequest = requestProvider.makeReadRequest(plcConnection, 
settings.nodes());
-      var readResponse = readRequest.execute()
-                                    .get(5000, TimeUnit.MILLISECONDS);
-      processPlcReadResponse(readResponse);
+      if (plcConnection.isConnected()) {
+        var readRequest = requestProvider.makeReadRequest(plcConnection, 
settings.nodes());
+        var readResponse = readRequest.execute()
+            .get(5000, TimeUnit.MILLISECONDS);
+        processPlcReadResponse(readResponse);
+      } else {
+        handleFailingPlcRead("Not connected");
+      }
     } catch (Exception e) {
-      handleFailingPlcRead(e);
+      handleFailingPlcRead(e.getMessage());
     }
   }
 
-  private void processPlcReadResponse(PlcReadResponse readResponse) {
-    var event = eventGenerator.makeEvent(readResponse);
-    collector.collect(event);
-    this.resetIdlePulls();
-  }
-
-  private void handleFailingPlcRead(Exception e) {
+  private void handleFailingPlcRead(String problem) {
     // ensure that the cached connection manager removes the broken connection
     if (connectionManager instanceof CachedPlcConnectionManager) {
       ((CachedPlcConnectionManager) 
connectionManager).removeCachedConnection(settings.connectionString());
@@ -98,13 +96,19 @@ public class ContinuousPlcRequestReader
     }
 
     LOG.error(
-        "Error while reading from PLC with connection string {}. Setting 
adapter to idle for {} attemtps. {} ",
-        settings.connectionString(), idlePullsBeforeNextAttempt, e.getMessage()
+        "Error while reading from PLC with connection string {}. Setting 
adapter to idle for {} attempts. {} ",
+        settings.connectionString(), idlePullsBeforeNextAttempt, problem
     );
 
     currentIdlePulls = 0;
   }
 
+  private void processPlcReadResponse(PlcReadResponse readResponse) {
+    var event = eventGenerator.makeEvent(readResponse);
+    collector.collect(event);
+    this.resetIdlePulls();
+  }
+
   private void idleRead() {
     LOG.debug("Skipping pullData call for {}. Idle pulls left: {}",
               settings.connectionString(), idlePullsBeforeNextAttempt - 
currentIdlePulls);
diff --git 
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/PlcEventGenerator.java
 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/PlcEventGenerator.java
index d0dabef4f3..5f0d3099e2 100644
--- 
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/PlcEventGenerator.java
+++ 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/PlcEventGenerator.java
@@ -25,7 +25,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 public class PlcEventGenerator {
@@ -43,9 +42,8 @@ public class PlcEventGenerator {
 
     for (String key : nodes.keySet()) {
       if (response.getResponseCode(key) == PlcResponseCode.OK) {
-
         // if the response is a list, add each element to the result
-        if (response.getObject(key) instanceof List) {
+        if (response.getAsPlcValue().getValue(key).isList()) {
           event.put(key,
               response.getAsPlcValue()
                   .getValue(key)
@@ -57,8 +55,7 @@ public class PlcEventGenerator {
           event.put(key, response.getObject(key));
         }
       } else {
-        LOG.error("Error[" + key + "]: "
-            + response.getResponseCode(key).name());
+        LOG.error("Error[{}]: {}", key, response.getResponseCode(key).name());
       }
     }
     return event;
diff --git 
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/model/Plc4xConnectionExtractor.java
 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/model/Plc4xConnectionExtractor.java
index 2ed2a13a2b..ce928b1eeb 100644
--- 
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/model/Plc4xConnectionExtractor.java
+++ 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/model/Plc4xConnectionExtractor.java
@@ -44,6 +44,7 @@ public class Plc4xConnectionExtractor {
   }
 
   public Plc4xConnectionSettings makeSettings() {
+    var modifiedProtocolCode = checkAndOverrideProtocolCode();
     var host = extractHost();
     var transportCode = extractTransportCode();
     var transportConfigs = extractTransportMetadata(transportCode);
@@ -51,12 +52,20 @@ public class Plc4xConnectionExtractor {
     var configParameters = makeConfigParameters(transportConfigs, 
protocolConfigs);
 
     return new Plc4xConnectionSettings(
-        getConnectionString(host, transportCode, protocolCode, 
configParameters),
+        getConnectionString(host, transportCode, modifiedProtocolCode, 
configParameters),
         extractor.singleValueParameter(Plc4xLabels.PLC_POLLING_INTERVAL, 
Integer.class),
         extractNodes()
     );
   }
 
+  private String checkAndOverrideProtocolCode() {
+    if (protocolCode.equals("s7")) {
+      return "s7-light";
+    } else {
+      return protocolCode;
+    }
+  }
+
   private String getConnectionString(String host,
                                      String transportCode,
                                      String protocolCode,
diff --git 
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java
 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java
index 1c7657a79a..9355b9f79f 100644
--- 
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java
+++ 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java
@@ -65,7 +65,7 @@ public class Plc4xS7Adapter implements StreamPipesAdapter {
    */
   public static final String ID = 
"org.apache.streampipes.connect.iiot.adapters.plc4x.s7";
 
-  private static final String S7_URL = "s7://";
+  private static final String S7_URL = "s7-light://";
 
   /**
    * Keys of user configuration parameters
diff --git 
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpCachedPlcConnectionManager.java
 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpCachedPlcConnectionManager.java
new file mode 100644
index 0000000000..f21f008899
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpCachedPlcConnectionManager.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.plc.cache;
+
+import org.apache.plc4x.java.DefaultPlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.plc4x.java.api.authentication.PlcAuthentication;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import 
org.apache.plc4x.java.utils.cache.exceptions.PlcConnectionManagerClosedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Connection manager that keeps one {@link SpConnectionContainer} per connection URL and hands out
+ * leases on the connection held by that container.
+ *
+ * <p>Every access to the container map is guarded by synchronizing on the map itself. The lock order
+ * is always "map first, container second" (container methods never call back into the map while
+ * holding their own monitor).
+ */
+public class SpCachedPlcConnectionManager implements PlcConnectionManager, AutoCloseable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SpCachedPlcConnectionManager.class);
+
+  private final PlcConnectionManager connectionManager;
+  private final Duration maxLeaseTime;
+  private final Duration maxWaitTime;
+  private final Duration maxIdleTime;
+
+  // Guarded by synchronized (connectionContainers).
+  private final Map<String, SpConnectionContainer> connectionContainers;
+
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  /**
+   * @return a builder that delegates to a freshly created {@link DefaultPlcDriverManager}.
+   */
+  public static SpCachedPlcConnectionManager.Builder getBuilder() {
+    return new SpCachedPlcConnectionManager.Builder(new DefaultPlcDriverManager());
+  }
+
+  /**
+   * @param connectionManager connection manager used to open the underlying connections.
+   * @return a builder that delegates to the given connection manager.
+   */
+  public static SpCachedPlcConnectionManager.Builder getBuilder(PlcConnectionManager connectionManager) {
+    return new SpCachedPlcConnectionManager.Builder(connectionManager);
+  }
+
+  /**
+   * @param connectionManager connection manager used to open the underlying connections.
+   * @param maxLeaseTime      lease duration passed on to every connection container.
+   * @param maxWaitTime       maximum time {@link #getConnection(String)} waits for a lease.
+   * @param maxIdleTime       idle duration passed on to every connection container.
+   */
+  public SpCachedPlcConnectionManager(PlcConnectionManager connectionManager,
+                                      Duration maxLeaseTime,
+                                      Duration maxWaitTime,
+                                      Duration maxIdleTime) {
+    this.connectionManager = connectionManager;
+    this.maxLeaseTime = maxLeaseTime;
+    this.maxWaitTime = maxWaitTime;
+    this.maxIdleTime = maxIdleTime;
+    this.connectionContainers = new HashMap<>();
+  }
+
+  /**
+   * @return immutable snapshot of the connection-urls this connection manager is currently managing.
+   */
+  public Set<String> getCachedConnections() {
+    synchronized (connectionContainers) {
+      // Copy while holding the lock: the live keySet() view would escape the synchronized block and
+      // could then be read while another thread modifies the map.
+      return Set.copyOf(connectionContainers.keySet());
+    }
+  }
+
+  /**
+   * Removes a given connection from the cache (Should only be used in order to remove somehow broken
+   * connections). The container is closed as part of the removal.
+   *
+   * @param url url of the connection that should be removed.
+   */
+  public void removeCachedConnection(String url) {
+    synchronized (connectionContainers) {
+      SpConnectionContainer removed = connectionContainers.remove(url);
+      // Make sure the connection is closed when it is dropped from the cache.
+      if (removed != null) {
+        removed.close();
+      }
+    }
+  }
+
+  /**
+   * Returns a lease on the cached connection for the given url, creating the connection container on
+   * first use.
+   *
+   * @param url connection url.
+   * @return the leased connection.
+   * @throws PlcConnectionException if the manager is closed, or if no lease could be obtained within
+   *                                the configured maximum wait time (the original failure is attached
+   *                                as cause).
+   */
+  public PlcConnection getConnection(String url) throws PlcConnectionException {
+    // If the connection manager is already closed, abort.
+    if (closed.get()) {
+      throw new PlcConnectionManagerClosedException();
+    }
+
+    // Get a connection container for the given url.
+    SpConnectionContainer connectionContainer;
+    synchronized (connectionContainers) {
+      connectionContainer = connectionContainers.get(url);
+      if (connectionContainer == null) {
+        LOG.debug("Creating new connection");
+
+        // Create a connection container to manage handling this connection
+        connectionContainer = new SpConnectionContainer(connectionManager, url, maxLeaseTime, maxIdleTime,
+            closeConnection -> {
+              removeCachedConnection(closeConnection);
+              return null;
+            });
+        connectionContainers.put(url, connectionContainer);
+      } else {
+        LOG.debug("Reusing exising connection");
+      }
+    }
+
+    // Get a lease (a future for a connection); this happens outside the map lock.
+    Future<PlcConnection> leaseFuture = connectionContainer.lease();
+    try {
+      return leaseFuture.get(this.maxWaitTime.toMillis(), TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers further up can still observe the interruption.
+      Thread.currentThread().interrupt();
+      throw new PlcConnectionException("Error acquiring lease for connection", e);
+    } catch (ExecutionException | TimeoutException e) {
+      // Keep the cause so the real reason (connect failure, timeout, ...) is not lost.
+      throw new PlcConnectionException("Error acquiring lease for connection", e);
+    }
+  }
+
+  /**
+   * Not supported by this manager.
+   *
+   * @throws PlcConnectionException always.
+   */
+  public PlcConnection getConnection(String url, PlcAuthentication authentication) throws PlcConnectionException {
+    throw new PlcConnectionException("the cached driver manager currently doesn't support authentication");
+  }
+
+  /**
+   * Marks the manager as closed and closes every cached connection container. The containers stay in
+   * the map; further {@link #getConnection(String)} calls are rejected by the closed flag.
+   */
+  @Override
+  public void close() throws Exception {
+    // Set the cache to "closed" so no new connections can be requested.
+    closed.set(true);
+
+    // Tell all connections to close themselves. Iterate under the same lock as every other map
+    // access so a concurrent removal cannot break the iteration.
+    synchronized (connectionContainers) {
+      connectionContainers.values().forEach(SpConnectionContainer::close);
+    }
+  }
+
+  /**
+   * Builder for {@link SpCachedPlcConnectionManager}. Defaults: lease time 4 seconds, wait time
+   * 20 seconds, idle time 5 minutes.
+   */
+  public static class Builder {
+
+    private final PlcConnectionManager connectionManager;
+    private Duration maxLeaseTime;
+    private Duration maxWaitTime;
+    private Duration maxIdleTime;
+
+    public Builder(PlcConnectionManager connectionManager) {
+      this.connectionManager = connectionManager;
+      this.maxLeaseTime = Duration.ofSeconds(4);
+      this.maxWaitTime = Duration.ofSeconds(20);
+      this.maxIdleTime = Duration.ofMinutes(5);
+    }
+
+    /**
+     * @return a new manager configured with the values collected so far.
+     */
+    public SpCachedPlcConnectionManager build() {
+      return new SpCachedPlcConnectionManager(
+          this.connectionManager, this.maxLeaseTime, this.maxWaitTime, this.maxIdleTime);
+    }
+
+    public SpCachedPlcConnectionManager.Builder withMaxLeaseTime(Duration maxLeaseTime) {
+      this.maxLeaseTime = maxLeaseTime;
+      return this;
+    }
+
+    public SpCachedPlcConnectionManager.Builder withMaxWaitTime(Duration maxWaitTime) {
+      this.maxWaitTime = maxWaitTime;
+      return this;
+    }
+
+    public SpCachedPlcConnectionManager.Builder withMaxIdleTime(Duration maxIdleTime) {
+      this.maxIdleTime = maxIdleTime;
+      return this;
+    }
+  }
+
+}
+
diff --git 
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpConnectionContainer.java
 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpConnectionContainer.java
new file mode 100644
index 0000000000..6d9e324889
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpConnectionContainer.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.plc.cache;
+
+import io.netty.channel.ConnectTimeoutException;
+import org.apache.plc4x.java.api.EventPlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.listener.EventListener;
+import 
org.apache.plc4x.java.utils.cache.exceptions.PlcConnectionManagerClosedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+
+/**
+ * Holds the single connection for one connection URL and serializes access to it through leases.
+ *
+ * <p>At most one {@link SpLeasedPlcConnection} is active at a time; further lease requests are queued
+ * as futures and completed in order when the active lease is returned. When a lease is returned and
+ * nobody is waiting, an idle timer is started that closes the connection and invokes the
+ * close-connection handler after {@code maxIdleTime}.
+ *
+ * <p>{@link #close()}, {@link #lease()} and {@link #returnConnection} are synchronized on this
+ * instance.
+ */
+public class SpConnectionContainer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SpConnectionContainer.class);
+  private final PlcConnectionManager connectionManager;
+  private final String connectionUrl;
+  private final Duration maxLeaseTime;
+  private final Duration maxIdleTime;
+  private final Function<String, Void> closeConnectionHandler;
+  private final Queue<CompletableFuture<PlcConnection>> queue;
+
+  private PlcConnection connection;
+  private SpLeasedPlcConnection leasedConnection;
+  private Timer idleTimer;
+
+  /**
+   * @param connectionManager      manager used to open the underlying connection.
+   * @param connectionUrl          url of the connection managed by this container.
+   * @param maxLeaseTime           lease duration handed to every created lease.
+   * @param maxIdleTime            delay of the idle timer started when the connection is unused.
+   * @param closeConnectionHandler callback invoked with the connection url when the idle timer fires.
+   */
+  public SpConnectionContainer(PlcConnectionManager connectionManager, String connectionUrl,
+                               Duration maxLeaseTime, Duration maxIdleTime,
+                               Function<String, Void> closeConnectionHandler) {
+    this.connectionManager = connectionManager;
+    this.connectionUrl = connectionUrl;
+    this.maxLeaseTime = maxLeaseTime;
+    this.maxIdleTime = maxIdleTime;
+    this.closeConnectionHandler = closeConnectionHandler;
+    this.queue = new LinkedList<>();
+    this.connection = null;
+    this.leasedConnection = null;
+  }
+
+  /**
+   * Fails all waiting lease requests, stops the idle timer and closes the connection (through the
+   * active lease if there is one, directly otherwise). Failures while closing are logged at DEBUG
+   * level and otherwise ignored.
+   */
+  public synchronized void close() {
+    // Close all waiting clients exceptionally.
+    queue.forEach(plcConnectionCompletableFuture ->
+        plcConnectionCompletableFuture.completeExceptionally(new PlcConnectionManagerClosedException()));
+
+    // Clear the queue.
+    queue.clear();
+
+    cancelIdleTimer();
+
+    if (leasedConnection != null) {
+      // The connection is currently used: close it through the lease.
+      try {
+        leasedConnection.closeConnection();
+        leasedConnection = null;
+      } catch (Exception e) {
+        // Best effort: the state of the connection is unknown at this point.
+        LOGGER.debug("Ignoring exception while closing leased connection to {}", connectionUrl, e);
+      }
+    } else if (connection != null) {
+      // No lease active. The connection may legitimately be null here (e.g. the first connect
+      // attempt failed), so guard instead of relying on a swallowed NullPointerException.
+      try {
+        connection.close();
+        connection = null;
+      } catch (Exception e) {
+        // Best effort: the state of the connection is unknown at this point.
+        LOGGER.debug("Ignoring exception while closing connection to {}", connectionUrl, e);
+      }
+    }
+  }
+
+  /**
+   * Requests a lease on the connection, opening the connection first if none exists yet.
+   *
+   * @return a future that is already completed with a lease when the connection is idle, that is
+   *         queued until the active lease is returned otherwise, or that is completed exceptionally
+   *         with the {@link PlcConnectionException} when the connection could not be opened.
+   */
+  public synchronized Future<PlcConnection> lease() {
+    CompletableFuture<PlcConnection> connectionFuture = new CompletableFuture<>();
+
+    // Try to get a new connection, if we haven't got one yet.
+    if (connection == null) {
+      try {
+        connection = connectionManager.getConnection(connectionUrl);
+      } catch (PlcConnectionException e) {
+        Throwable root = rootCause(e);
+        if (root instanceof ConnectTimeoutException) {
+          // one-liner — no stacktrace at WARN
+          LOGGER.warn("PLC connect timeout to {}: {}", connectionUrl, root.getMessage());
+          // full detail only when debugging
+          LOGGER.debug("Connect timeout details", e);
+        } else {
+          // concise by default, full at DEBUG
+          LOGGER.warn("Failed to connect to {}: {}", connectionUrl,
+              (root != null ? root.toString() : e.toString()));
+          LOGGER.debug("Connection failure details", e);
+        }
+        connectionFuture.completeExceptionally(e);
+        return connectionFuture;
+      }
+    }
+
+    // If the connection is currently idle, return the connection immediately.
+    if (leasedConnection == null) {
+      leasedConnection = new SpLeasedPlcConnection(this, connection, maxLeaseTime);
+      connectionFuture.complete(leasedConnection);
+    } else {
+      queue.add(connectionFuture);
+    }
+
+    // The connection is in use (or requested) again, so it must not be closed for being idle.
+    cancelIdleTimer();
+
+    return connectionFuture;
+  }
+
+  /**
+   * Returns the active lease. Depending on the state this re-opens the connection, hands the
+   * connection to the next waiting request, or starts the idle timer.
+   *
+   * @param returnedLeasedConnection the lease being returned; must be the currently active lease.
+   * @param invalidateConnection     if {@code true}, the current connection is closed and a new one is
+   *                                 opened before it is handed out again.
+   * @throws PlcRuntimeException if the returned lease is not the currently active lease.
+   */
+  public synchronized void returnConnection(SpLeasedPlcConnection returnedLeasedConnection,
+                                            boolean invalidateConnection) {
+    if (returnedLeasedConnection != leasedConnection) {
+      LOGGER.error("Error trying to return lease from invalid connection: returned={} leased={}",
+          returnedLeasedConnection, leasedConnection);
+      throw new PlcRuntimeException("Error trying to return lease from invalid connection");
+    }
+
+    // If something happened while using the connection, invalidate this one and create a new connection.
+    if (invalidateConnection) {
+      // Close the old connection.
+      try {
+        connection.close();
+      } catch (Exception e) {
+        // We're ignoring this as we have no idea, what state the connection is in.
+        // Nevertheless, it is polite to say something in logs about this situation.
+        LOGGER.warn("Exception while closing connection", e);
+      }
+
+      // Try to get a new connection.
+      try {
+        connection = connectionManager.getConnection(connectionUrl);
+      } catch (PlcConnectionException e) {
+        // If something goes wrong, close all waiting futures exceptionally.
+        LOGGER.warn("Can't get connection for {} complete queue items exceptionally", connectionUrl, e);
+        queue.forEach(future -> future.completeExceptionally(e));
+        queue.clear();
+        leasedConnection = null;
+        connection = null;
+      }
+    }
+
+    // If the queue is empty, simply return.
+    if (queue.isEmpty()) {
+      leasedConnection = null;
+
+      // Start a timer to invalidate this connection if it's idle for too long. The timer thread is a
+      // daemon so that a pending idle timeout never keeps the JVM from shutting down.
+      idleTimer = new Timer("CC-Idle-Timer-" + Thread.currentThread().getId(), true);
+      idleTimer.schedule(new TimerTask() {
+        @Override
+        public void run() {
+          // NOTE(review): this runs on the timer thread without holding the container's monitor.
+          if (connection != null) {
+            try {
+              connection.close();
+            } catch (Exception e) {
+              // Ignore ...
+            }
+          }
+          closeConnectionHandler.apply(connectionUrl);
+        }
+      }, maxIdleTime.toMillis());
+      return;
+    }
+
+    if (connection == null) {
+      // Defensive: should not happen, but avoid creating a zombie.
+      queue.forEach(f -> f.completeExceptionally(new PlcRuntimeException("No connection available")));
+      queue.clear();
+      leasedConnection = null;
+      return;
+    }
+
+    // Create a new lease and complete the next future in the queue with this.
+    leasedConnection = new SpLeasedPlcConnection(this, connection, maxLeaseTime);
+    CompletableFuture<PlcConnection> leaseFuture = queue.poll();
+    if (leaseFuture != null) {
+      leaseFuture.complete(leasedConnection);
+    }
+  }
+
+  /**
+   * Registers the listener on the current connection if that connection is an
+   * {@link EventPlcConnection}; does nothing otherwise (including when there is no connection).
+   */
+  public void addEventListener(EventListener listener) {
+    // instanceof is false for null, so no separate null check is needed.
+    if (connection instanceof EventPlcConnection) {
+      ((EventPlcConnection) connection).addEventListener(listener);
+    }
+  }
+
+  /**
+   * Removes the listener from the current connection if that connection is an
+   * {@link EventPlcConnection}; does nothing otherwise (including when there is no connection).
+   */
+  public void removeEventListener(EventListener listener) {
+    if (connection instanceof EventPlcConnection) {
+      ((EventPlcConnection) connection).removeEventListener(listener);
+    }
+  }
+
+  /** Cancels and discards the idle timer, if one is currently scheduled. */
+  private void cancelIdleTimer() {
+    if (idleTimer != null) {
+      idleTimer.cancel();
+      idleTimer.purge();
+      idleTimer = null;
+    }
+  }
+
+  /** Walks the cause chain of {@code t} and returns the last throwable in it. */
+  private static Throwable rootCause(Throwable t) {
+    Throwable c = t;
+    while (c.getCause() != null && c.getCause() != c) {
+      c = c.getCause();
+    }
+    return c;
+  }
+
+
+}
+
diff --git 
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpLeasedPlcConnection.java
 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpLeasedPlcConnection.java
new file mode 100644
index 0000000000..c008518a2b
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpLeasedPlcConnection.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.plc.cache;
+
+import org.apache.plc4x.java.api.EventPlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.listener.EventListener;
+import org.apache.plc4x.java.api.messages.PlcBrowseRequest;
+import org.apache.plc4x.java.api.messages.PlcBrowseRequestInterceptor;
+import org.apache.plc4x.java.api.messages.PlcBrowseResponse;
+import org.apache.plc4x.java.api.messages.PlcPingResponse;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.metadata.PlcConnectionMetadata;
+import org.apache.plc4x.java.api.model.PlcQuery;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
+import org.apache.plc4x.java.api.model.PlcSubscriptionTag;
+import org.apache.plc4x.java.api.model.PlcTag;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.api.value.PlcValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+/**
+ * A time-limited lease on a {@link PlcConnection} owned by a {@link SpConnectionContainer}.
+ *
+ * <p>All operations are delegated to the underlying connection for as long as the lease is
+ * active. The lease ends when {@link #close()} is called, either by the user or by an
+ * internal timer once the maximum use time has elapsed; afterwards every delegating method
+ * throws a {@link PlcRuntimeException}. When the lease ends, the connection is handed back
+ * to the container together with a flag telling whether any request executed through this
+ * lease completed exceptionally.
+ */
+public class SpLeasedPlcConnection implements EventPlcConnection {
+
+  private static final Logger log = LoggerFactory.getLogger(SpLeasedPlcConnection.class);
+  private static final String LEASE_RETURNED_MESSAGE =
+      "Error using leased connection after returning it to the cache.";
+
+  private final SpConnectionContainer connectionContainer;
+  private final AtomicReference<PlcConnection> connection;
+  // Set from the threads completing request futures and read in close(), hence volatile.
+  private volatile boolean invalidateConnection;
+  private final Timer usageTimer;
+  private final Duration maxUseDuration;
+
+  /**
+   * Creates the lease and starts the timer that ends it automatically.
+   *
+   * @param connectionContainer container the connection is returned to when the lease ends
+   * @param connection          the underlying connection to delegate to
+   * @param maxUseTime          time after which the lease is closed automatically
+   */
+  SpLeasedPlcConnection(SpConnectionContainer connectionContainer, PlcConnection connection, Duration maxUseTime) {
+    this.connectionContainer = connectionContainer;
+    this.connection = new AtomicReference<>(connection);
+    this.invalidateConnection = false;
+    this.maxUseDuration = maxUseTime;
+    // Daemon thread: an unreturned lease must not keep the JVM alive until it expires.
+    this.usageTimer = new Timer("CC-Usage-Timer-" + Thread.currentThread().getId(), true);
+    // Schedule by relative delay; deriving an absolute time from the local wall clock
+    // is ambiguous around daylight-saving transitions.
+    this.usageTimer.schedule(new TimerTask() {
+      @Override
+      public void run() {
+        close();
+      }
+    }, Math.max(0L, maxUseTime.toMillis()));
+  }
+
+  /**
+   * Closes the underlying connection and then ends this lease.
+   *
+   * @throws Exception if closing the underlying connection fails; the lease is ended
+   *                   regardless
+   */
+  public synchronized void closeConnection() throws Exception {
+    PlcConnection plcConnection = connection.get();
+    try {
+      if (plcConnection != null) {
+        plcConnection.close();
+      }
+    } finally {
+      // End the lease even if the physical close failed, so the container is always notified.
+      close();
+    }
+  }
+
+  /**
+   * Ends the lease and hands the connection back to the container. Calling this on a
+   * lease that has already ended (possibly by the timer) does nothing.
+   */
+  @Override
+  public synchronized void close() {
+    // In this case the connection was already closed (possibly by the timer)
+    if (connection.get() == null) {
+      return;
+    }
+
+    // Cancel automatically timing out.
+    usageTimer.cancel();
+
+    // Make the connection unusable.
+    connection.set(null);
+
+    // Tell the connection container that the connection is free to be reused.
+    connectionContainer.returnConnection(this, invalidateConnection);
+  }
+
+  @Override
+  public Optional<PlcTag> parseTagAddress(String tagAddress) {
+    return requireConnection().parseTagAddress(tagAddress);
+  }
+
+  @Override
+  public Optional<PlcValue> parseTagValue(PlcTag tag, Object... values) {
+    return requireConnection().parseTagValue(tag, values);
+  }
+
+  /**
+   * Not supported on a lease.
+   *
+   * @throws PlcConnectionException always
+   */
+  @Override
+  public void connect() throws PlcConnectionException {
+    throw new PlcConnectionException("Error connecting leased connection");
+  }
+
+  @Override
+  public boolean isConnected() {
+    return requireConnection().isConnected();
+  }
+
+  @Override
+  public PlcConnectionMetadata getMetadata() {
+    return requireConnection().getMetadata();
+  }
+
+  @Override
+  public CompletableFuture<? extends PlcPingResponse> ping() {
+    return requireConnection().ping();
+  }
+
+  /**
+   * Returns a read-request builder whose requests time out after at most one second
+   * (or the maximum use time, if shorter) and mark the connection as invalid when
+   * they complete exceptionally.
+   */
+  @Override
+  public PlcReadRequest.Builder readRequestBuilder() {
+    final PlcReadRequest.Builder innerBuilder = requireConnection().readRequestBuilder();
+    return new PlcReadRequest.Builder() {
+      @Override
+      public PlcReadRequest build() {
+        final PlcReadRequest innerRequest = innerBuilder.build();
+        return new PlcReadRequest() {
+          @Override
+          public CompletableFuture<? extends PlcReadResponse> execute() {
+            final long timeoutMillis = Math.min(1000, maxUseDuration.toMillis());
+            return SpLeasedPlcConnection.this.<PlcReadResponse>invalidateOnFailure(
+                "ReadRequest", innerRequest.execute().orTimeout(timeoutMillis, TimeUnit.MILLISECONDS));
+          }
+
+          @Override
+          public int getNumberOfTags() {
+            return innerRequest.getNumberOfTags();
+          }
+
+          @Override
+          public LinkedHashSet<String> getTagNames() {
+            return innerRequest.getTagNames();
+          }
+
+          @Override
+          public PlcResponseCode getTagResponseCode(String tagName) {
+            return innerRequest.getTagResponseCode(tagName);
+          }
+
+          @Override
+          public PlcTag getTag(String name) {
+            return innerRequest.getTag(name);
+          }
+
+          @Override
+          public List<PlcTag> getTags() {
+            return innerRequest.getTags();
+          }
+        };
+      }
+
+      // The add* methods return this wrapper (not the inner builder) so that fluent
+      // chains still end in the wrapping build() above.
+      @Override
+      public PlcReadRequest.Builder addTagAddress(String name, String tagAddress) {
+        innerBuilder.addTagAddress(name, tagAddress);
+        return this;
+      }
+
+      @Override
+      public PlcReadRequest.Builder addTag(String name, PlcTag tag) {
+        innerBuilder.addTag(name, tag);
+        return this;
+      }
+    };
+  }
+
+  /**
+   * Returns a write-request builder whose requests mark the connection as invalid when
+   * they complete exceptionally.
+   */
+  @Override
+  public PlcWriteRequest.Builder writeRequestBuilder() {
+    final PlcWriteRequest.Builder innerBuilder = requireConnection().writeRequestBuilder();
+    return new PlcWriteRequest.Builder() {
+      @Override
+      public PlcWriteRequest build() {
+        final PlcWriteRequest innerRequest = innerBuilder.build();
+        return new PlcWriteRequest() {
+          @Override
+          public CompletableFuture<? extends PlcWriteResponse> execute() {
+            return SpLeasedPlcConnection.this.<PlcWriteResponse>invalidateOnFailure(
+                "WriteRequest", innerRequest.execute());
+          }
+
+          @Override
+          public int getNumberOfValues(String name) {
+            return innerRequest.getNumberOfValues(name);
+          }
+
+          @Override
+          public PlcValue getPlcValue(String name) {
+            return innerRequest.getPlcValue(name);
+          }
+
+          @Override
+          public int getNumberOfTags() {
+            return innerRequest.getNumberOfTags();
+          }
+
+          @Override
+          public LinkedHashSet<String> getTagNames() {
+            return innerRequest.getTagNames();
+          }
+
+          @Override
+          public PlcResponseCode getTagResponseCode(String tagName) {
+            return innerRequest.getTagResponseCode(tagName);
+          }
+
+          @Override
+          public PlcTag getTag(String name) {
+            return innerRequest.getTag(name);
+          }
+
+          @Override
+          public List<PlcTag> getTags() {
+            return innerRequest.getTags();
+          }
+        };
+      }
+
+      // Return this wrapper so fluent chains keep using the wrapping build().
+      @Override
+      public PlcWriteRequest.Builder addTagAddress(String name, String tagAddress, Object... values) {
+        innerBuilder.addTagAddress(name, tagAddress, values);
+        return this;
+      }
+
+      @Override
+      public PlcWriteRequest.Builder addTag(String name, PlcTag tag, Object... values) {
+        innerBuilder.addTag(name, tag, values);
+        return this;
+      }
+    };
+  }
+
+  /**
+   * Returns a subscription-request builder whose requests mark the connection as invalid
+   * when they complete exceptionally.
+   */
+  @Override
+  public PlcSubscriptionRequest.Builder subscriptionRequestBuilder() {
+    final PlcSubscriptionRequest.Builder innerBuilder = requireConnection().subscriptionRequestBuilder();
+    return new PlcSubscriptionRequest.Builder() {
+      @Override
+      public PlcSubscriptionRequest build() {
+        final PlcSubscriptionRequest innerRequest = innerBuilder.build();
+        return new PlcSubscriptionRequest() {
+          @Override
+          public CompletableFuture<? extends PlcSubscriptionResponse> execute() {
+            return SpLeasedPlcConnection.this.<PlcSubscriptionResponse>invalidateOnFailure(
+                "SubscriptionRequest", innerRequest.execute());
+          }
+
+          @Override
+          public int getNumberOfTags() {
+            return innerRequest.getNumberOfTags();
+          }
+
+          @Override
+          public LinkedHashSet<String> getTagNames() {
+            return innerRequest.getTagNames();
+          }
+
+          @Override
+          public PlcSubscriptionTag getTag(String name) {
+            return innerRequest.getTag(name);
+          }
+
+          @Override
+          public PlcResponseCode getTagResponseCode(String tagName) {
+            return innerRequest.getTagResponseCode(tagName);
+          }
+
+          @Override
+          public List<PlcSubscriptionTag> getTags() {
+            return innerRequest.getTags();
+          }
+
+          @Override
+          public Consumer<PlcSubscriptionEvent> getConsumer() {
+            return innerRequest.getConsumer();
+          }
+
+          @Override
+          public Consumer<PlcSubscriptionEvent> getTagConsumer(String name) {
+            return innerRequest.getTagConsumer(name);
+          }
+        };
+      }
+
+      // All configuration methods return this wrapper so fluent chains keep using
+      // the wrapping build().
+      @Override
+      public PlcSubscriptionRequest.Builder setConsumer(Consumer<PlcSubscriptionEvent> consumer) {
+        innerBuilder.setConsumer(consumer);
+        return this;
+      }
+
+      @Override
+      public PlcSubscriptionRequest.Builder addCyclicTagAddress(String name, String tagAddress,
+                                                                Duration pollingInterval) {
+        innerBuilder.addCyclicTagAddress(name, tagAddress, pollingInterval);
+        return this;
+      }
+
+      @Override
+      public PlcSubscriptionRequest.Builder addCyclicTagAddress(String name, String tagAddress,
+                                                                Duration pollingInterval,
+                                                                Consumer<PlcSubscriptionEvent> consumer) {
+        innerBuilder.addCyclicTagAddress(name, tagAddress, pollingInterval, consumer);
+        return this;
+      }
+
+      @Override
+      public PlcSubscriptionRequest.Builder addCyclicTag(String name, PlcTag tag, Duration pollingInterval) {
+        innerBuilder.addCyclicTag(name, tag, pollingInterval);
+        return this;
+      }
+
+      @Override
+      public PlcSubscriptionRequest.Builder addCyclicTag(String name, PlcTag tag, Duration pollingInterval,
+                                                         Consumer<PlcSubscriptionEvent> consumer) {
+        innerBuilder.addCyclicTag(name, tag, pollingInterval, consumer);
+        return this;
+      }
+
+      @Override
+      public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress) {
+        innerBuilder.addChangeOfStateTagAddress(name, tagAddress);
+        return this;
+      }
+
+      @Override
+      public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String name, String tagAddress,
+                                                                       Consumer<PlcSubscriptionEvent> consumer) {
+        innerBuilder.addChangeOfStateTagAddress(name, tagAddress, consumer);
+        return this;
+      }
+
+      @Override
+      public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag) {
+        innerBuilder.addChangeOfStateTag(name, tag);
+        return this;
+      }
+
+      @Override
+      public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name, PlcTag tag,
+                                                                Consumer<PlcSubscriptionEvent> consumer) {
+        innerBuilder.addChangeOfStateTag(name, tag, consumer);
+        return this;
+      }
+
+      @Override
+      public PlcSubscriptionRequest.Builder addEventTagAddress(String name, String tagAddress) {
+        innerBuilder.addEventTagAddress(name, tagAddress);
+        return this;
+      }
+
+      @Override
+      public PlcSubscriptionRequest.Builder addEventTagAddress(String name, String tagAddress,
+                                                               Consumer<PlcSubscriptionEvent> consumer) {
+        innerBuilder.addEventTagAddress(name, tagAddress, consumer);
+        return this;
+      }
+
+      @Override
+      public PlcSubscriptionRequest.Builder addEventTag(String name, PlcTag tag) {
+        innerBuilder.addEventTag(name, tag);
+        return this;
+      }
+
+      @Override
+      public PlcSubscriptionRequest.Builder addEventTag(String name, PlcTag tag,
+                                                        Consumer<PlcSubscriptionEvent> consumer) {
+        innerBuilder.addEventTag(name, tag, consumer);
+        return this;
+      }
+    };
+  }
+
+  /**
+   * Returns an unsubscription-request builder whose requests mark the connection as
+   * invalid when they complete exceptionally.
+   */
+  @Override
+  public PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder() {
+    final PlcUnsubscriptionRequest.Builder innerBuilder = requireConnection().unsubscriptionRequestBuilder();
+    return new PlcUnsubscriptionRequest.Builder() {
+      @Override
+      public PlcUnsubscriptionRequest build() {
+        final PlcUnsubscriptionRequest innerRequest = innerBuilder.build();
+        return new PlcUnsubscriptionRequest() {
+          @Override
+          public CompletableFuture<PlcUnsubscriptionResponse> execute() {
+            return SpLeasedPlcConnection.this.<PlcUnsubscriptionResponse>invalidateOnFailure(
+                "UnsubscriptionRequest", innerRequest.execute());
+          }
+
+          @Override
+          public List<PlcSubscriptionHandle> getSubscriptionHandles() {
+            return innerRequest.getSubscriptionHandles();
+          }
+        };
+      }
+
+      // Return this wrapper so fluent chains keep using the wrapping build().
+      @Override
+      public PlcUnsubscriptionRequest.Builder addHandles(PlcSubscriptionHandle plcSubscriptionHandle) {
+        innerBuilder.addHandles(plcSubscriptionHandle);
+        return this;
+      }
+
+      @Override
+      public PlcUnsubscriptionRequest.Builder addHandles(PlcSubscriptionHandle plcSubscriptionHandle1,
+                                                         PlcSubscriptionHandle... plcSubscriptionHandles) {
+        innerBuilder.addHandles(plcSubscriptionHandle1, plcSubscriptionHandles);
+        return this;
+      }
+
+      @Override
+      public PlcUnsubscriptionRequest.Builder addHandles(Collection<PlcSubscriptionHandle> plcSubscriptionHandle) {
+        innerBuilder.addHandles(plcSubscriptionHandle);
+        return this;
+      }
+    };
+  }
+
+  /**
+   * Returns a browse-request builder whose requests mark the connection as invalid when
+   * they complete exceptionally.
+   */
+  @Override
+  public PlcBrowseRequest.Builder browseRequestBuilder() {
+    final PlcBrowseRequest.Builder innerBuilder = requireConnection().browseRequestBuilder();
+    return new PlcBrowseRequest.Builder() {
+      @Override
+      public PlcBrowseRequest build() {
+        final PlcBrowseRequest innerRequest = innerBuilder.build();
+        return new PlcBrowseRequest() {
+          @Override
+          public CompletableFuture<? extends PlcBrowseResponse> execute() {
+            return SpLeasedPlcConnection.this.<PlcBrowseResponse>invalidateOnFailure(
+                "BrowseRequest", innerRequest.execute());
+          }
+
+          @Override
+          public CompletableFuture<? extends PlcBrowseResponse> executeWithInterceptor(
+              PlcBrowseRequestInterceptor interceptor) {
+            return SpLeasedPlcConnection.this.<PlcBrowseResponse>invalidateOnFailure(
+                "BrowseRequest", innerRequest.executeWithInterceptor(interceptor));
+          }
+
+          @Override
+          public LinkedHashSet<String> getQueryNames() {
+            return innerRequest.getQueryNames();
+          }
+
+          @Override
+          public PlcQuery getQuery(String name) {
+            return innerRequest.getQuery(name);
+          }
+        };
+      }
+
+      // Return this wrapper so fluent chains keep using the wrapping build().
+      @Override
+      public PlcBrowseRequest.Builder addQuery(String name, String query) {
+        innerBuilder.addQuery(name, query);
+        return this;
+      }
+    };
+  }
+
+  @Override
+  public void addEventListener(EventListener listener) {
+    connectionContainer.addEventListener(listener);
+  }
+
+  @Override
+  public void removeEventListener(EventListener listener) {
+    connectionContainer.removeEventListener(listener);
+  }
+
+  /**
+   * Returns the underlying connection while the lease is active.
+   *
+   * @throws PlcRuntimeException if the lease has already ended
+   */
+  private PlcConnection requireConnection() {
+    final PlcConnection plcConnection = connection.get();
+    if (plcConnection == null) {
+      throw new PlcRuntimeException(LEASE_RETURNED_MESSAGE);
+    }
+    return plcConnection;
+  }
+
+  /**
+   * Mirrors the outcome of {@code future} into a new future and, if that outcome is a
+   * failure, flags the underlying connection as invalid so that the flag is passed to
+   * the container when the lease ends.
+   *
+   * @param operation label of the request kind, used in the debug log message
+   * @param future    the future returned by the underlying request
+   * @return a future completing with the same result or exception as {@code future}
+   */
+  private <T> CompletableFuture<T> invalidateOnFailure(String operation, CompletableFuture<? extends T> future) {
+    final CompletableFuture<T> responseFuture = new CompletableFuture<>();
+    future.handle((response, throwable) -> {
+      if (throwable == null) {
+        responseFuture.complete(response);
+      } else {
+        // Mark the connection as invalid.
+        invalidateConnection = true;
+        log.debug("{} execution completed exceptionally invalidateConnection=true", operation, throwable);
+        responseFuture.completeExceptionally(throwable);
+      }
+      return null;
+    });
+    return responseFuture;
+  }
+
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-plc/src/test/java/org/apache/streampipes/extensions/connectors/plc/adapter/ConnectionContainerReproTest.java
 
b/streampipes-extensions/streampipes-connectors-plc/src/test/java/org/apache/streampipes/extensions/connectors/plc/adapter/ConnectionContainerReproTest.java
new file mode 100644
index 0000000000..8dfc646e06
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-plc/src/test/java/org/apache/streampipes/extensions/connectors/plc/adapter/ConnectionContainerReproTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.plc.adapter;
+
+import 
org.apache.streampipes.extensions.connectors.plc.cache.SpConnectionContainer;
+import 
org.apache.streampipes.extensions.connectors.plc.cache.SpLeasedPlcConnection;
+
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.plc4x.java.api.authentication.PlcAuthentication;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcBrowseRequest;
+import org.apache.plc4x.java.api.messages.PlcPingResponse;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.metadata.PlcConnectionMetadata;
+import org.apache.plc4x.java.api.model.PlcTag;
+import org.apache.plc4x.java.api.value.PlcValue;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Reproduction test for {@link SpConnectionContainer}: checks that the container keeps
+ * serving leases after a connection was returned as invalid and the following
+ * reconnect attempt failed.
+ */
+class ConnectionContainerReproTest {
+
+  /**
+   * Connection manager stub that counts calls to {@link #getConnection(String)}:
+   * the first call returns {@code c1}, the second call throws, every later call
+   * returns {@code c2}.
+   */
+  static class FlakyManager implements PlcConnectionManager {
+    final AtomicInteger calls = new AtomicInteger();
+    final PlcConnection c1 = new DummyConnection();
+    final PlcConnection c2 = new DummyConnection();
+
+    @Override
+    public PlcConnection getConnection(String url) throws 
PlcConnectionException {
+      int n = calls.getAndIncrement();
+      if (n == 0) return c1;                           // initial success
+      // reconnect fails once
+      if (n == 1) throw new PlcConnectionException("PLC down");
+      return c2;                                       // would succeed later
+    }
+
+    // Authenticated variant is not exercised by the test in this class.
+    @Override
+    public PlcConnection getConnection(String s, PlcAuthentication 
plcAuthentication) throws PlcConnectionException {
+      return null;
+    }
+  }
+
+  /**
+   * Minimal connection stub: always reports itself as connected, does nothing on
+   * connect/close, and returns empty or {@code null} values from every other method.
+   */
+  static class DummyConnection implements PlcConnection {
+    @Override
+    public void connect() {
+    }
+
+    @Override
+    public boolean isConnected() {
+      return true;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public Optional<PlcTag> parseTagAddress(String s) {
+      return Optional.empty();
+    }
+
+    @Override
+    public Optional<PlcValue> parseTagValue(PlcTag plcTag, Object... objects) {
+      return Optional.empty();
+    }
+
+    @Override
+    public PlcConnectionMetadata getMetadata() {
+      return null;
+    }
+
+    @Override
+    public CompletableFuture<? extends PlcPingResponse> ping() {
+      return null;
+    }
+
+    @Override
+    public PlcReadRequest.Builder readRequestBuilder() {
+      return null;
+    }
+
+    @Override
+    public PlcWriteRequest.Builder writeRequestBuilder() {
+      return null;
+    }
+
+    @Override
+    public PlcSubscriptionRequest.Builder subscriptionRequestBuilder() {
+      return null;
+    }
+
+    @Override
+    public PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder() {
+      return null;
+    }
+
+    @Override
+    public PlcBrowseRequest.Builder browseRequestBuilder() {
+      return null;
+    }
+    // implement other methods as no-ops if your interface requires them
+  }
+
+  /**
+   * Leases a connection, queues a second lease request, returns the first lease as
+   * invalid while the manager's reconnect attempt throws, and then verifies that the
+   * queued request fails and that subsequent lease requests succeed again.
+   */
+  @Test
+  void recoversAfterFailedReconnectAndServesNewLeases() throws Exception {
+    FlakyManager mgr = new FlakyManager();
+    // NOTE(review): the two 30s durations are presumably the max lease and max idle
+    // times -- confirm their order against the SpConnectionContainer constructor.
+    SpConnectionContainer cc = new SpConnectionContainer(
+        mgr, "mock://plc",
+        Duration.ofSeconds(30), Duration.ofSeconds(30),
+        url -> null // closeConnectionHandler
+    );
+
+    // 1) First caller gets a lease immediately.
+    SpLeasedPlcConnection lease1 =
+        (SpLeasedPlcConnection) cc.lease().get(500, TimeUnit.MILLISECONDS);
+
+    // 2) Second caller queues up (does not complete yet).
+    Future<PlcConnection> queued = cc.lease();
+
+    // 3) Return with invalidate=true while reconnect will THROW.
+    cc.returnConnection(lease1, true);
+
+    // 3a) The queued future should have been completed exceptionally
+    //     (queue drained).
+    assertThrows(ExecutionException.class, () -> queued.get(200, 
TimeUnit.MILLISECONDS));
+
+    // 4) Now a new lease should succeed quickly
+    //    (manager will succeed on next call).
+    PlcConnection lease2 = cc.lease().get(500, TimeUnit.MILLISECONDS);
+    assertNotNull(lease2, "Expected a fresh lease after recovery");
+    assertEquals(3, mgr.calls.get(), "Expected 3 getConnection() calls: 
success, fail, success");
+
+    // 5) Return normally and ensure subsequent leasing still works.
+    cc.returnConnection((SpLeasedPlcConnection) lease2, false);
+    PlcConnection lease3 = cc.lease().get(500, TimeUnit.MILLISECONDS);
+    assertNotNull(lease3);
+  }
+}

Reply via email to