This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new d866764436 feat: Bump PLC4X version, improve connection cache
management (#3744)
d866764436 is described below
commit d866764436340c9ab91cb0e9422026e76ff0fc56
Author: Dominik Riemer <[email protected]>
AuthorDate: Fri Nov 7 08:14:39 2025 +0100
feat: Bump PLC4X version, improve connection cache management (#3744)
Co-authored-by: Philipp Zehnder <[email protected]>
---
pom.xml | 4 +-
.../management/connect/PullAdapterScheduler.java | 5 +-
.../streampipes-connectors-plc/pom.xml | 6 +-
.../connectors/plc/PlcConnectorsModuleExport.java | 4 +-
.../connection/ContinuousPlcRequestReader.java | 32 +-
.../generic/connection/PlcEventGenerator.java | 7 +-
.../generic/model/Plc4xConnectionExtractor.java | 11 +-
.../connectors/plc/adapter/s7/Plc4xS7Adapter.java | 2 +-
.../plc/cache/SpCachedPlcConnectionManager.java | 178 +++++++
.../plc/cache/SpConnectionContainer.java | 235 +++++++++
.../plc/cache/SpLeasedPlcConnection.java | 575 +++++++++++++++++++++
.../plc/adapter/ConnectionContainerReproTest.java | 167 ++++++
12 files changed, 1197 insertions(+), 29 deletions(-)
diff --git a/pom.xml b/pom.xml
index 367abf1482..87b3e4324a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -112,7 +112,7 @@
<okhttp.version>3.13.1</okhttp.version>
<opencsv.version>5.9</opencsv.version>
<opennlp.version>2.3.1</opennlp.version>
- <plc4x.version>0.12.0</plc4x.version>
+ <plc4x.version>0.13.1</plc4x.version>
<plexus-component-annotations.version>2.2.0</plexus-component-annotations.version>
<plexus-interactivity-api.version>1.1</plexus-interactivity-api.version>
<plexus-utils.version>4.0.0</plexus-utils.version>
@@ -1193,8 +1193,6 @@
</profiles>
<!-- Build Settings -->
-
-
<build>
<!-- Plugin Management -->
diff --git
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java
index 0daec7c47b..1c501fe288 100644
---
a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java
+++
b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java
@@ -26,6 +26,7 @@ import org.apache.streampipes.model.monitoring.SpLogMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -47,8 +48,8 @@ public class PullAdapterScheduler {
final Runnable task = () -> {
try {
pullAdapter.pullData();
- } catch (ExecutionException | InterruptedException | TimeoutException e)
{
- LOG.error("Error while pulling data", e);
+ } catch (ExecutionException | InterruptedException | TimeoutException |
CompletionException e) {
+ LOG.error("Error while pulling data: {}", e.getMessage());
SpMonitoringManager.INSTANCE.addErrorMessage(
adapterElementId,
SpLogEntry.from(System.currentTimeMillis(), SpLogMessage.from(e))
diff --git a/streampipes-extensions/streampipes-connectors-plc/pom.xml
b/streampipes-extensions/streampipes-connectors-plc/pom.xml
index 2431d0b668..5754d4b732 100644
--- a/streampipes-extensions/streampipes-connectors-plc/pom.xml
+++ b/streampipes-extensions/streampipes-connectors-plc/pom.xml
@@ -52,6 +52,11 @@
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.plc4x</groupId>
+ <artifactId>plc4j-spi</artifactId>
+ <version>${plc4x.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.plc4x</groupId>
<artifactId>plc4j-driver-all</artifactId>
@@ -78,7 +83,6 @@
<version>${plc4x.version}</version>
</dependency>
-
<!-- Test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
diff --git
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
index 59db43c181..6f53414919 100644
---
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
+++
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
@@ -28,9 +28,9 @@ import
org.apache.streampipes.extensions.connectors.plc.adapter.migration.Plc4xM
import
org.apache.streampipes.extensions.connectors.plc.adapter.migration.Plc4xS7AdapterMigrationV1;
import
org.apache.streampipes.extensions.connectors.plc.adapter.modbus.Plc4xModbusAdapter;
import
org.apache.streampipes.extensions.connectors.plc.adapter.s7.Plc4xS7Adapter;
+import
org.apache.streampipes.extensions.connectors.plc.cache.SpCachedPlcConnectionManager;
import org.apache.plc4x.java.api.PlcDriverManager;
-import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
import java.time.Duration;
import java.util.ArrayList;
@@ -43,7 +43,7 @@ public class PlcConnectorsModuleExport implements
IExtensionModuleExport {
public List<StreamPipesAdapter> adapters() {
var env = Environments.getEnvironment();
var driverManager = PlcDriverManager.getDefault();
- var cachedConnectionManager = CachedPlcConnectionManager
+ var cachedConnectionManager = SpCachedPlcConnectionManager
.getBuilder(driverManager.getConnectionManager())
.withMaxWaitTime(Duration.ofMillis(env.getPlc4xMaxWaitTimeMs().getValueOrDefault()))
.withMaxLeaseTime(Duration.ofMillis(env.getPlc4xMaxLeaseTimeMs().getValueOrDefault()))
diff --git
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
index 639f4e0534..7306ead5a5 100644
---
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
+++
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
@@ -69,22 +69,20 @@ public class ContinuousPlcRequestReader
private void connectAndReadPlcData() {
try (PlcConnection plcConnection =
connectionManager.getConnection(settings.connectionString())) {
- var readRequest = requestProvider.makeReadRequest(plcConnection,
settings.nodes());
- var readResponse = readRequest.execute()
- .get(5000, TimeUnit.MILLISECONDS);
- processPlcReadResponse(readResponse);
+ if (plcConnection.isConnected()) {
+ var readRequest = requestProvider.makeReadRequest(plcConnection,
settings.nodes());
+ var readResponse = readRequest.execute()
+ .get(5000, TimeUnit.MILLISECONDS);
+ processPlcReadResponse(readResponse);
+ } else {
+ handleFailingPlcRead("Not connected");
+ }
} catch (Exception e) {
- handleFailingPlcRead(e);
+ handleFailingPlcRead(e.getMessage());
}
}
- private void processPlcReadResponse(PlcReadResponse readResponse) {
- var event = eventGenerator.makeEvent(readResponse);
- collector.collect(event);
- this.resetIdlePulls();
- }
-
- private void handleFailingPlcRead(Exception e) {
+ private void handleFailingPlcRead(String problem) {
// ensure that the cached connection manager removes the broken connection
if (connectionManager instanceof CachedPlcConnectionManager) {
((CachedPlcConnectionManager)
connectionManager).removeCachedConnection(settings.connectionString());
@@ -98,13 +96,19 @@ public class ContinuousPlcRequestReader
}
LOG.error(
- "Error while reading from PLC with connection string {}. Setting
adapter to idle for {} attemtps. {} ",
- settings.connectionString(), idlePullsBeforeNextAttempt, e.getMessage()
+ "Error while reading from PLC with connection string {}. Setting
adapter to idle for {} attempts. {} ",
+ settings.connectionString(), idlePullsBeforeNextAttempt, problem
);
currentIdlePulls = 0;
}
+ private void processPlcReadResponse(PlcReadResponse readResponse) {
+ var event = eventGenerator.makeEvent(readResponse);
+ collector.collect(event);
+ this.resetIdlePulls();
+ }
+
private void idleRead() {
LOG.debug("Skipping pullData call for {}. Idle pulls left: {}",
settings.connectionString(), idlePullsBeforeNextAttempt -
currentIdlePulls);
diff --git
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/PlcEventGenerator.java
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/PlcEventGenerator.java
index d0dabef4f3..5f0d3099e2 100644
---
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/PlcEventGenerator.java
+++
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/PlcEventGenerator.java
@@ -25,7 +25,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
public class PlcEventGenerator {
@@ -43,9 +42,8 @@ public class PlcEventGenerator {
for (String key : nodes.keySet()) {
if (response.getResponseCode(key) == PlcResponseCode.OK) {
-
// if the response is a list, add each element to the result
- if (response.getObject(key) instanceof List) {
+ if (response.getAsPlcValue().getValue(key).isList()) {
event.put(key,
response.getAsPlcValue()
.getValue(key)
@@ -57,8 +55,7 @@ public class PlcEventGenerator {
event.put(key, response.getObject(key));
}
} else {
- LOG.error("Error[" + key + "]: "
- + response.getResponseCode(key).name());
+ LOG.error("Error[{}]: {}", key, response.getResponseCode(key).name());
}
}
return event;
diff --git
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/model/Plc4xConnectionExtractor.java
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/model/Plc4xConnectionExtractor.java
index 2ed2a13a2b..ce928b1eeb 100644
---
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/model/Plc4xConnectionExtractor.java
+++
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/model/Plc4xConnectionExtractor.java
@@ -44,6 +44,7 @@ public class Plc4xConnectionExtractor {
}
public Plc4xConnectionSettings makeSettings() {
+ var modifiedProtocolCode = checkAndOverrideProtocolCode();
var host = extractHost();
var transportCode = extractTransportCode();
var transportConfigs = extractTransportMetadata(transportCode);
@@ -51,12 +52,20 @@ public class Plc4xConnectionExtractor {
var configParameters = makeConfigParameters(transportConfigs,
protocolConfigs);
return new Plc4xConnectionSettings(
- getConnectionString(host, transportCode, protocolCode,
configParameters),
+ getConnectionString(host, transportCode, modifiedProtocolCode,
configParameters),
extractor.singleValueParameter(Plc4xLabels.PLC_POLLING_INTERVAL,
Integer.class),
extractNodes()
);
}
+ private String checkAndOverrideProtocolCode() {
+ if (protocolCode.equals("s7")) {
+ return "s7-light";
+ } else {
+ return protocolCode;
+ }
+ }
+
private String getConnectionString(String host,
String transportCode,
String protocolCode,
diff --git
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java
index 1c7657a79a..9355b9f79f 100644
---
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java
+++
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/s7/Plc4xS7Adapter.java
@@ -65,7 +65,7 @@ public class Plc4xS7Adapter implements StreamPipesAdapter {
*/
public static final String ID =
"org.apache.streampipes.connect.iiot.adapters.plc4x.s7";
- private static final String S7_URL = "s7://";
+ private static final String S7_URL = "s7-light://";
/**
* Keys of user configuration parameters
diff --git
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpCachedPlcConnectionManager.java
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpCachedPlcConnectionManager.java
new file mode 100644
index 0000000000..f21f008899
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpCachedPlcConnectionManager.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.plc.cache;
+
+import org.apache.plc4x.java.DefaultPlcDriverManager;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.plc4x.java.api.authentication.PlcAuthentication;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import
org.apache.plc4x.java.utils.cache.exceptions.PlcConnectionManagerClosedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpCachedPlcConnectionManager implements PlcConnectionManager,
AutoCloseable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SpCachedPlcConnectionManager.class);
+
+ private final PlcConnectionManager connectionManager;
+ private final Duration maxLeaseTime;
+ private final Duration maxWaitTime;
+ private final Duration maxIdleTime;
+
+ private final Map<String, SpConnectionContainer> connectionContainers;
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ public static SpCachedPlcConnectionManager.Builder getBuilder() {
+ return new SpCachedPlcConnectionManager.Builder(new
DefaultPlcDriverManager());
+ }
+
+ public static SpCachedPlcConnectionManager.Builder
getBuilder(PlcConnectionManager connectionManager) {
+ return new SpCachedPlcConnectionManager.Builder(connectionManager);
+ }
+
+ public SpCachedPlcConnectionManager(PlcConnectionManager connectionManager,
Duration maxLeaseTime, Duration maxWaitTime, Duration maxIdleTime) {
+ this.connectionManager = connectionManager;
+ this.maxLeaseTime = maxLeaseTime;
+ this.maxWaitTime = maxWaitTime;
+ this.maxIdleTime = maxIdleTime;
+ this.connectionContainers = new HashMap<>();
+ }
+
+ /**
+ * @return set of connection-urls the CachedPlcConnectionManager is
currently managing.
+ */
+ public Set<String> getCachedConnections() {
+ synchronized (connectionContainers) {
+ return connectionContainers.keySet();
+ }
+ }
+
+ /**
+ * Removes a given connection from the cache (Should only be used in order
to remove somehow broken connections).
+ *
+ * @param url url of the connection that should be removed.
+ */
+ public void removeCachedConnection(String url) {
+ synchronized (connectionContainers) {
+ // Make sure the connection is closed before removing it.
+ if (connectionContainers.containsKey(url)) {
+ connectionContainers.get(url).close();
+ }
+ connectionContainers.remove(url);
+ }
+ }
+
+ public PlcConnection getConnection(String url) throws PlcConnectionException
{
+ // If the connection manager is already closed, abort.
+ if (closed.get()) {
+ throw new PlcConnectionManagerClosedException();
+ }
+
+ // Get a connection container for the given url.
+ SpConnectionContainer connectionContainer;
+ synchronized (connectionContainers) {
+ connectionContainer = connectionContainers.get(url);
+ if (connectionContainer == null) {
+ LOG.debug("Creating new connection");
+
+ // Crate a connection container to manage handling this connection
+ connectionContainer = new SpConnectionContainer(connectionManager,
url, maxLeaseTime, maxIdleTime,
+ closeConnection -> {
+ removeCachedConnection(closeConnection);
+ return null;
+ });
+ connectionContainers.put(url, connectionContainer);
+ } else {
+ LOG.debug("Reusing exising connection");
+ }
+ }
+
+ // Get a lease (a future for a connection)
+ Future<PlcConnection> leaseFuture = connectionContainer.lease();
+ try {
+ return leaseFuture.get(this.maxWaitTime.toMillis(),
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException | InterruptedException | TimeoutException e) {
+ throw new PlcConnectionException("Error acquiring lease for connection");
+ }
+ }
+
+ public PlcConnection getConnection(String url, PlcAuthentication
authentication) throws PlcConnectionException {
+ throw new PlcConnectionException("the cached driver manager currently
doesn't support authentication");
+ }
+
+ @Override
+ public void close() throws Exception {
+ // Set the cache to "closed" so no new connections can be requested.
+ closed.set(true);
+
+ // Tell all connections to close themselves.
+ connectionContainers.forEach((connectionString, connectionContainer) -> {
+ connectionContainer.close();
+ });
+ }
+
+ public static class Builder {
+
+ private final PlcConnectionManager connectionManager;
+ private Duration maxLeaseTime;
+ private Duration maxWaitTime;
+ private Duration maxIdleTime;
+
+ public Builder(PlcConnectionManager connectionManager) {
+ this.connectionManager = connectionManager;
+ this.maxLeaseTime = Duration.ofSeconds(4);
+ this.maxWaitTime = Duration.ofSeconds(20);
+ this.maxIdleTime = Duration.ofMinutes(5);
+ }
+
+ public SpCachedPlcConnectionManager build() {
+ return new SpCachedPlcConnectionManager(
+ this.connectionManager, this.maxLeaseTime, this.maxWaitTime,
this.maxIdleTime);
+ }
+
+ public SpCachedPlcConnectionManager.Builder withMaxLeaseTime(Duration
maxLeaseTime) {
+ this.maxLeaseTime = maxLeaseTime;
+ return this;
+ }
+
+ public SpCachedPlcConnectionManager.Builder withMaxWaitTime(Duration
maxWaitTime) {
+ this.maxWaitTime = maxWaitTime;
+ return this;
+ }
+
+ public SpCachedPlcConnectionManager.Builder withMaxIdleTime(Duration
maxIdleTime) {
+ this.maxIdleTime = maxIdleTime;
+ return this;
+ }
+ }
+
+}
+
diff --git
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpConnectionContainer.java
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpConnectionContainer.java
new file mode 100644
index 0000000000..6d9e324889
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpConnectionContainer.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.plc.cache;
+
+import io.netty.channel.ConnectTimeoutException;
+import org.apache.plc4x.java.api.EventPlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.listener.EventListener;
+import
org.apache.plc4x.java.utils.cache.exceptions.PlcConnectionManagerClosedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.function.Function;
+
+public class SpConnectionContainer {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SpConnectionContainer.class);
+ private final PlcConnectionManager connectionManager;
+ private final String connectionUrl;
+ private final Duration maxLeaseTime;
+ private final Duration maxIdleTime;
+ private final Function<String, Void> closeConnectionHandler;
+ private final Queue<CompletableFuture<PlcConnection>> queue;
+
+ private PlcConnection connection;
+ private SpLeasedPlcConnection leasedConnection;
+ private Timer idleTimer;
+
+ public SpConnectionContainer(PlcConnectionManager connectionManager, String
connectionUrl,
+ Duration maxLeaseTime, Duration maxIdleTime,
+ Function<String, Void> closeConnectionHandler) {
+ this.connectionManager = connectionManager;
+ this.connectionUrl = connectionUrl;
+ this.maxLeaseTime = maxLeaseTime;
+ this.maxIdleTime = maxIdleTime;
+ this.closeConnectionHandler = closeConnectionHandler;
+ this.queue = new LinkedList<>();
+ this.connection = null;
+ this.leasedConnection = null;
+ }
+
+ public synchronized void close() {
+ // Close all waiting clients exceptionally.
+ queue.forEach(plcConnectionCompletableFuture ->
+ plcConnectionCompletableFuture.completeExceptionally(new
PlcConnectionManagerClosedException()));
+
+ // Clear the queue.
+ queue.clear();
+
+ // Stop the idle timer.
+ if (idleTimer != null) {
+ idleTimer.cancel();
+ idleTimer.purge();
+ idleTimer = null;
+ }
+
+ // If the connection is currently used, close it.
+ if (leasedConnection != null) {
+ try {
+ leasedConnection.closeConnection();
+ leasedConnection = null;
+ } catch (Exception e) {
+ // Ignore this ...
+ }
+ } else {
+ try {
+ connection.close();
+ connection = null;
+ } catch (Exception e) {
+ // Ignore this ...
+ }
+ }
+ }
+
+ public synchronized Future<PlcConnection> lease() {
+ CompletableFuture<PlcConnection> connectionFuture = new
CompletableFuture<>();
+
+ // Try to get a new connection, if we haven't got one yet.
+ if (connection == null) {
+ try {
+ connection = connectionManager.getConnection(connectionUrl);
+ } catch (PlcConnectionException e) {
+ Throwable root = rootCause(e);
+ if (root instanceof ConnectTimeoutException) {
+ // one-liner — no stacktrace at WARN
+ LOGGER.warn("PLC connect timeout to {}: {}", connectionUrl,
root.getMessage());
+ // full detail only when debugging
+ LOGGER.debug("Connect timeout details", e);
+ } else {
+ // concise by default, full at DEBUG
+ LOGGER.warn("Failed to connect to {}: {}", connectionUrl,
+ (root != null ? root.toString() : e.toString()));
+ LOGGER.debug("Connection failure details", e);
+ }
+ connectionFuture.completeExceptionally(e);
+ return connectionFuture;
+ }
+ }
+
+ // If the connection is currently idle, return the connection immediately.
+ if (leasedConnection == null) {
+ leasedConnection = new SpLeasedPlcConnection(this, connection,
maxLeaseTime);
+ connectionFuture.complete(leasedConnection);
+ } else {
+ queue.add(connectionFuture);
+ }
+
+ // Stop the idle timer.
+ if (idleTimer != null) {
+ idleTimer.cancel();
+ idleTimer.purge();
+ idleTimer = null;
+ }
+
+ return connectionFuture;
+ }
+
+ public synchronized void returnConnection(SpLeasedPlcConnection
returnedLeasedConnection, boolean invalidateConnection) {
+ if (returnedLeasedConnection != leasedConnection) {
+ LOGGER.error("Error trying to return lease from invalid connection:
returned={} leased={}",
+ returnedLeasedConnection, leasedConnection);
+ throw new PlcRuntimeException("Error trying to return lease from invalid
connection");
+ }
+
+ // If something happened while using the connection, invalidate this one
and create a new connection.
+ if (invalidateConnection) {
+ // Close the old connection.
+ try {
+ connection.close();
+ } catch (Exception e) {
+ // We're ignoring this as we have no idea, what state the connection
is in.
+ // Nevertheless, it is polite to say something in logs about this
situation.
+ LOGGER.warn("Exception while closing connection", e);
+ }
+
+ // Try to get a new connection.
+ try {
+ connection = connectionManager.getConnection(connectionUrl);
+ } catch (PlcConnectionException e) {
+ // If something goes wrong, close all waiting futures exceptionally.
+ LOGGER.warn("Can't get connection for {} complete queue items
exceptionally", connectionUrl, e);
+ queue.forEach(future -> future.completeExceptionally(e));
+ queue.clear();
+ leasedConnection = null;
+ connection = null;
+ }
+ }
+
+ // If the queue is empty, simply return.
+ if (queue.isEmpty()) {
+ leasedConnection = null;
+
+ // Start a timer to invalidate this connection if it's idle for too long.
+ idleTimer = new Timer("CC-Idle-Timer-" + Thread.currentThread().getId());
+ idleTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (Exception e) {
+ // Ignore ...
+ }
+ }
+ closeConnectionHandler.apply(connectionUrl);
+ }
+ }, maxIdleTime.toMillis());
+ return;
+ }
+
+ if (connection == null) {
+ // Defensive: should not happen, but avoid creating a zombie.
+ queue.forEach(f -> f.completeExceptionally(new PlcRuntimeException("No
connection available")));
+ queue.clear();
+ leasedConnection = null;
+ return;
+ }
+
+ // Create a new lease and complete the next future in the queue with this.
+ leasedConnection = new SpLeasedPlcConnection(this, connection,
maxLeaseTime);
+ CompletableFuture<PlcConnection> leaseFuture = queue.poll();
+ if (leaseFuture != null) {
+ leaseFuture.complete(leasedConnection);
+ }
+ }
+
+
+ public void addEventListener(EventListener listener) {
+ if ((connection != null) && (connection instanceof EventPlcConnection)) {
+ ((EventPlcConnection) connection).addEventListener(listener);
+ }
+ }
+
+ public void removeEventListener(EventListener listener) {
+ if ((connection != null) && (connection instanceof EventPlcConnection)) {
+ ((EventPlcConnection) connection).removeEventListener(listener);
+ }
+ }
+
+ private static Throwable rootCause(Throwable t) {
+ Throwable c = t;
+ while (c.getCause() != null && c.getCause() != c) {
+ c = c.getCause();
+ }
+ return c;
+ }
+
+
+}
+
diff --git
a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpLeasedPlcConnection.java
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpLeasedPlcConnection.java
new file mode 100644
index 0000000000..c008518a2b
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/cache/SpLeasedPlcConnection.java
@@ -0,0 +1,575 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.plc.cache;
+
+import org.apache.plc4x.java.api.EventPlcConnection;
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.exceptions.PlcRuntimeException;
+import org.apache.plc4x.java.api.listener.EventListener;
+import org.apache.plc4x.java.api.messages.PlcBrowseRequest;
+import org.apache.plc4x.java.api.messages.PlcBrowseRequestInterceptor;
+import org.apache.plc4x.java.api.messages.PlcBrowseResponse;
+import org.apache.plc4x.java.api.messages.PlcPingResponse;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionEvent;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionResponse;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteResponse;
+import org.apache.plc4x.java.api.metadata.PlcConnectionMetadata;
+import org.apache.plc4x.java.api.model.PlcQuery;
+import org.apache.plc4x.java.api.model.PlcSubscriptionHandle;
+import org.apache.plc4x.java.api.model.PlcSubscriptionTag;
+import org.apache.plc4x.java.api.model.PlcTag;
+import org.apache.plc4x.java.api.types.PlcResponseCode;
+import org.apache.plc4x.java.api.value.PlcValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+public class SpLeasedPlcConnection implements EventPlcConnection {
+
+ private static final Logger log =
LoggerFactory.getLogger(SpLeasedPlcConnection.class);
+ private final SpConnectionContainer connectionContainer;
+ private final AtomicReference<PlcConnection> connection;
+ private boolean invalidateConnection;
+ private final Timer usageTimer;
+ private final Duration maxUseDuration;
+
+ SpLeasedPlcConnection(SpConnectionContainer connectionContainer,
PlcConnection connection, Duration maxUseTime) {
+ this.connectionContainer = connectionContainer;
+ this.connection = new AtomicReference<>(connection);
+ this.invalidateConnection = false;
+ this.usageTimer = new Timer("CC-Usage-Timer-" +
Thread.currentThread().getId());
+ this.maxUseDuration = maxUseTime;
+ this.usageTimer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ close();
+ }
+ },
Date.from(LocalDateTime.now().plusNanos(maxUseTime.toNanos()).atZone(ZoneId.systemDefault()).toInstant()));
+ }
+
+ public synchronized void closeConnection() throws Exception {
+ // Get the real connection and close it.
+ PlcConnection plcConnection = connection.get();
+ if (plcConnection != null) {
+ plcConnection.close();
+ }
+
+ // Close the LeasedPlcConnection.
+ close();
+ }
+
+ @Override
+ public synchronized void close() {
+ // In this case the connection was already closed (possibly by the timer)
+ if (connection.get() == null) {
+ return;
+ }
+
+ // Cancel automatically timing out.
+ usageTimer.cancel();
+
+ // Make the connection unusable.
+ connection.set(null);
+
+ // Tell the connection container that the connection is free to be reused.
+ connectionContainer.returnConnection(this, invalidateConnection);
+ }
+
+ @Override
+ public Optional<PlcTag> parseTagAddress(String tagAddress) {
+ PlcConnection plcConnection = connection.get();
+ if (plcConnection == null) {
+ throw new PlcRuntimeException("Error using leased connection after
returning it to the cache.");
+ }
+ return plcConnection.parseTagAddress(tagAddress);
+ }
+
+ @Override
+ public Optional<PlcValue> parseTagValue(PlcTag tag, Object... values) {
+ PlcConnection plcConnection = connection.get();
+ if (plcConnection == null) {
+ throw new PlcRuntimeException("Error using leased connection after
returning it to the cache.");
+ }
+ return plcConnection.parseTagValue(tag, values);
+ }
+
+ @Override
+ public void connect() throws PlcConnectionException {
+ throw new PlcConnectionException("Error connecting leased connection");
+ }
+
+ @Override
+ public boolean isConnected() {
+ PlcConnection plcConnection = connection.get();
+ if (plcConnection == null) {
+ throw new PlcRuntimeException("Error using leased connection after
returning it to the cache.");
+ }
+ return plcConnection.isConnected();
+ }
+
+ @Override
+ public PlcConnectionMetadata getMetadata() {
+ PlcConnection plcConnection = connection.get();
+ if (plcConnection == null) {
+ throw new PlcRuntimeException("Error using leased connection after
returning it to the cache.");
+ }
+ return plcConnection.getMetadata();
+ }
+
+ @Override
+ public CompletableFuture<? extends PlcPingResponse> ping() {
+ PlcConnection plcConnection = connection.get();
+ if (plcConnection == null) {
+ throw new PlcRuntimeException("Error using leased connection after
returning it to the cache.");
+ }
+ return plcConnection.ping();
+ }
+
+ @Override
+ public PlcReadRequest.Builder readRequestBuilder() {
+ PlcConnection plcConnection = connection.get();
+ if (plcConnection == null) {
+ throw new PlcRuntimeException("Error using leased connection after
returning it to the cache.");
+ }
+ final PlcReadRequest.Builder innerBuilder =
plcConnection.readRequestBuilder();
+ return new PlcReadRequest.Builder() {
+ @Override
+ public PlcReadRequest build() {
+ final PlcReadRequest innerPlcReadRequest = innerBuilder.build();
+ return new PlcReadRequest() {
+ @Override
+ public CompletableFuture<? extends PlcReadResponse> execute() {
+ CompletableFuture<? extends PlcReadResponse> future =
+ innerPlcReadRequest.execute().orTimeout(Math.min(1000,
maxUseDuration.toMillis()), TimeUnit.MILLISECONDS);
+ final CompletableFuture<PlcReadResponse> responseFuture = new
CompletableFuture<>();
+ future.handle((plcReadResponse, throwable) -> {
+ if (throwable == null) {
+ responseFuture.complete(plcReadResponse);
+ } else {
+ // Mark the connection as invalid.
+ invalidateConnection = true;
+ log.debug("ReadRequest execution completed exceptionally
invalidateConnection=true",
+ throwable);
+ responseFuture.completeExceptionally(throwable);
+ }
+ return null;
+ });
+ return responseFuture;
+ }
+
+ @Override
+ public int getNumberOfTags() {
+ return innerPlcReadRequest.getNumberOfTags();
+ }
+
+ @Override
+ public LinkedHashSet<String> getTagNames() {
+ return innerPlcReadRequest.getTagNames();
+ }
+
+ @Override
+ public PlcResponseCode getTagResponseCode(String tagName) {
+ return innerPlcReadRequest.getTagResponseCode(tagName);
+ }
+
+ @Override
+ public PlcTag getTag(String name) {
+ return innerPlcReadRequest.getTag(name);
+ }
+
+ @Override
+ public List<PlcTag> getTags() {
+ return innerPlcReadRequest.getTags();
+ }
+ };
+ }
+
+ @Override
+ public PlcReadRequest.Builder addTagAddress(String name, String
tagAddress) {
+ return innerBuilder.addTagAddress(name, tagAddress);
+ }
+
+ @Override
+ public PlcReadRequest.Builder addTag(String name, PlcTag tag) {
+ return innerBuilder.addTag(name, tag);
+ }
+ };
+ }
+
+ @Override
+ public PlcWriteRequest.Builder writeRequestBuilder() {
+ PlcConnection plcConnection = connection.get();
+ if (plcConnection == null) {
+ throw new PlcRuntimeException("Error using leased connection after
returning it to the cache.");
+ }
+ final PlcWriteRequest.Builder innerBuilder =
plcConnection.writeRequestBuilder();
+ return new PlcWriteRequest.Builder() {
+ @Override
+ public PlcWriteRequest build() {
+ PlcWriteRequest innerPlcWriteRequest = innerBuilder.build();
+ return new PlcWriteRequest() {
+ @Override
+ public CompletableFuture<? extends PlcWriteResponse> execute() {
+ CompletableFuture<? extends PlcWriteResponse> future =
innerPlcWriteRequest.execute();
+ final CompletableFuture<PlcWriteResponse> responseFuture = new
CompletableFuture<>();
+ future.handle((plcWriteResponse, throwable) -> {
+ if (throwable == null) {
+ responseFuture.complete(plcWriteResponse);
+ } else {
+ // Mark the connection as invalid.
+ invalidateConnection = true;
+ responseFuture.completeExceptionally(throwable);
+ }
+ return null;
+ });
+ return responseFuture;
+ }
+
+ @Override
+ public int getNumberOfValues(String name) {
+ return innerPlcWriteRequest.getNumberOfValues(name);
+ }
+
+ @Override
+ public PlcValue getPlcValue(String name) {
+ return innerPlcWriteRequest.getPlcValue(name);
+ }
+
+ @Override
+ public int getNumberOfTags() {
+ return innerPlcWriteRequest.getNumberOfTags();
+ }
+
+ @Override
+ public LinkedHashSet<String> getTagNames() {
+ return innerPlcWriteRequest.getTagNames();
+ }
+
+ @Override
+ public PlcResponseCode getTagResponseCode(String tagName) {
+ return innerPlcWriteRequest.getTagResponseCode(tagName);
+ }
+
+ @Override
+ public PlcTag getTag(String name) {
+ return innerPlcWriteRequest.getTag(name);
+ }
+
+ @Override
+ public List<PlcTag> getTags() {
+ return innerPlcWriteRequest.getTags();
+ }
+ };
+ }
+
+ @Override
+ public PlcWriteRequest.Builder addTagAddress(String name, String
tagAddress, Object... values) {
+ return innerBuilder.addTagAddress(name, tagAddress, values);
+ }
+
+ @Override
+ public PlcWriteRequest.Builder addTag(String name, PlcTag tag, Object...
values) {
+ return innerBuilder.addTag(name, tag, values);
+ }
+ };
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder subscriptionRequestBuilder() {
+ PlcConnection plcConnection = connection.get();
+ if (plcConnection == null) {
+ throw new PlcRuntimeException("Error using leased connection after
returning it to the cache.");
+ }
+ final PlcSubscriptionRequest.Builder innerBuilder =
plcConnection.subscriptionRequestBuilder();
+ return new PlcSubscriptionRequest.Builder() {
+ @Override
+ public PlcSubscriptionRequest build() {
+ PlcSubscriptionRequest innerPlcSubscriptionRequest =
innerBuilder.build();
+ return new PlcSubscriptionRequest() {
+ @Override
+ public CompletableFuture<? extends PlcSubscriptionResponse>
execute() {
+ CompletableFuture<? extends PlcSubscriptionResponse> future =
innerPlcSubscriptionRequest.execute();
+ final CompletableFuture<PlcSubscriptionResponse> responseFuture =
new CompletableFuture<>();
+ future.handle((plcSubscriptionResponse, throwable) -> {
+ if (throwable == null) {
+ responseFuture.complete(plcSubscriptionResponse);
+ } else {
+ // Mark the connection as invalid.
+ invalidateConnection = true;
+ responseFuture.completeExceptionally(throwable);
+ }
+ return null;
+ });
+ return responseFuture;
+ }
+
+ @Override
+ public int getNumberOfTags() {
+ return innerPlcSubscriptionRequest.getNumberOfTags();
+ }
+
+ @Override
+ public LinkedHashSet<String> getTagNames() {
+ return innerPlcSubscriptionRequest.getTagNames();
+ }
+
+ @Override
+ public PlcSubscriptionTag getTag(String name) {
+ return innerPlcSubscriptionRequest.getTag(name);
+ }
+
+ @Override
+ public PlcResponseCode getTagResponseCode(String tagName) {
+ return innerPlcSubscriptionRequest.getTagResponseCode(tagName);
+ }
+
+ @Override
+ public List<PlcSubscriptionTag> getTags() {
+ return innerPlcSubscriptionRequest.getTags();
+ }
+
+ @Override
+ public Consumer<PlcSubscriptionEvent> getConsumer() {
+ return innerPlcSubscriptionRequest.getConsumer();
+ }
+
+ @Override
+ public Consumer<PlcSubscriptionEvent> getTagConsumer(String name) {
+ return innerPlcSubscriptionRequest.getTagConsumer(name);
+ }
+ };
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder
setConsumer(Consumer<PlcSubscriptionEvent> consumer) {
+ return innerBuilder.setConsumer(consumer);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addCyclicTagAddress(String name,
String tagAddress, Duration pollingInterval) {
+ return innerBuilder.addCyclicTagAddress(name, tagAddress,
pollingInterval);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addCyclicTagAddress(String name,
String tagAddress, Duration pollingInterval, Consumer<PlcSubscriptionEvent>
consumer) {
+ return innerBuilder.addCyclicTagAddress(name, tagAddress,
pollingInterval, consumer);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addCyclicTag(String name, PlcTag
tag, Duration pollingInterval) {
+ return innerBuilder.addCyclicTag(name, tag, pollingInterval);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addCyclicTag(String name, PlcTag
tag, Duration pollingInterval, Consumer<PlcSubscriptionEvent> consumer) {
+ return innerBuilder.addCyclicTag(name, tag, pollingInterval, consumer);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String
name, String tagAddress) {
+ return innerBuilder.addChangeOfStateTagAddress(name, tagAddress);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addChangeOfStateTagAddress(String
name, String tagAddress, Consumer<PlcSubscriptionEvent> consumer) {
+ return innerBuilder.addChangeOfStateTagAddress(name, tagAddress,
consumer);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name,
PlcTag tag) {
+ return innerBuilder.addChangeOfStateTag(name, tag);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addChangeOfStateTag(String name,
PlcTag tag, Consumer<PlcSubscriptionEvent> consumer) {
+ return innerBuilder.addChangeOfStateTag(name, tag, consumer);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addEventTagAddress(String name,
String tagAddress) {
+ return innerBuilder.addEventTagAddress(name, tagAddress);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addEventTagAddress(String name,
String tagAddress, Consumer<PlcSubscriptionEvent> consumer) {
+ return innerBuilder.addEventTagAddress(name, tagAddress, consumer);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addEventTag(String name, PlcTag
tag) {
+ return innerBuilder.addEventTag(name, tag);
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder addEventTag(String name, PlcTag
tag, Consumer<PlcSubscriptionEvent> consumer) {
+ return innerBuilder.addEventTag(name, tag, consumer);
+ }
+ };
+ }
+
+ @Override
+ public PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder() {
+ PlcConnection plcConnection = connection.get();
+ if (plcConnection == null) {
+ throw new PlcRuntimeException("Error using leased connection after
returning it to the cache.");
+ }
+ final PlcUnsubscriptionRequest.Builder innerBuilder =
plcConnection.unsubscriptionRequestBuilder();
+ return new PlcUnsubscriptionRequest.Builder() {
+ @Override
+ public PlcUnsubscriptionRequest build() {
+ PlcUnsubscriptionRequest innerPlcUnsubscriptionRequest =
innerBuilder.build();
+ return new PlcUnsubscriptionRequest() {
+ @Override
+ public CompletableFuture<PlcUnsubscriptionResponse> execute() {
+ CompletableFuture<? extends PlcUnsubscriptionResponse> future =
innerPlcUnsubscriptionRequest.execute();
+ final CompletableFuture<PlcUnsubscriptionResponse> responseFuture
= new CompletableFuture<>();
+ future.handle((plcUnsubscriptionResponse, throwable) -> {
+ if (throwable == null) {
+ responseFuture.complete(plcUnsubscriptionResponse);
+ } else {
+ // Mark the connection as invalid.
+ invalidateConnection = true;
+ responseFuture.completeExceptionally(throwable);
+ }
+ return null;
+ });
+ return responseFuture;
+ }
+
+ @Override
+ public List<PlcSubscriptionHandle> getSubscriptionHandles() {
+ return innerPlcUnsubscriptionRequest.getSubscriptionHandles();
+ }
+ };
+ }
+
+ @Override
+ public PlcUnsubscriptionRequest.Builder addHandles(PlcSubscriptionHandle
plcSubscriptionHandle) {
+ return innerBuilder.addHandles(plcSubscriptionHandle);
+ }
+
+ @Override
+ public PlcUnsubscriptionRequest.Builder addHandles(PlcSubscriptionHandle
plcSubscriptionHandle1, PlcSubscriptionHandle... plcSubscriptionHandles) {
+ return innerBuilder.addHandles(plcSubscriptionHandle1,
plcSubscriptionHandles);
+ }
+
+ @Override
+ public PlcUnsubscriptionRequest.Builder
addHandles(Collection<PlcSubscriptionHandle> plcSubscriptionHandle) {
+ return innerBuilder.addHandles(plcSubscriptionHandle);
+ }
+ };
+ }
+
+ @Override
+ public PlcBrowseRequest.Builder browseRequestBuilder() {
+ PlcConnection plcConnection = connection.get();
+ if (plcConnection == null) {
+ throw new PlcRuntimeException("Error using leased connection after
returning it to the cache.");
+ }
+ final PlcBrowseRequest.Builder innerBuilder =
plcConnection.browseRequestBuilder();
+ return new PlcBrowseRequest.Builder() {
+ @Override
+ public PlcBrowseRequest build() {
+ PlcBrowseRequest innerPlcBrowseRequest = innerBuilder.build();
+ return new PlcBrowseRequest() {
+ @Override
+ public CompletableFuture<? extends PlcBrowseResponse> execute() {
+ CompletableFuture<? extends PlcBrowseResponse> future =
innerPlcBrowseRequest.execute();
+ final CompletableFuture<PlcBrowseResponse> responseFuture = new
CompletableFuture<>();
+ future.handle((plcBrowseResponse, throwable) -> {
+ if (throwable == null) {
+ responseFuture.complete(plcBrowseResponse);
+ } else {
+ // Mark the connection as invalid.
+ invalidateConnection = true;
+ responseFuture.completeExceptionally(throwable);
+ }
+ return null;
+ });
+ return responseFuture;
+ }
+
+ @Override
+ public CompletableFuture<? extends PlcBrowseResponse>
executeWithInterceptor(PlcBrowseRequestInterceptor interceptor) {
+ CompletableFuture<? extends PlcBrowseResponse> future =
innerPlcBrowseRequest.executeWithInterceptor(interceptor);
+ final CompletableFuture<PlcBrowseResponse> responseFuture = new
CompletableFuture<>();
+ future.handle((plcBrowseResponse, throwable) -> {
+ if (throwable == null) {
+ responseFuture.complete(plcBrowseResponse);
+ } else {
+ // Mark the connection as invalid.
+ invalidateConnection = true;
+ responseFuture.completeExceptionally(throwable);
+ }
+ return null;
+ });
+ return responseFuture;
+ }
+
+ @Override
+ public LinkedHashSet<String> getQueryNames() {
+ return innerPlcBrowseRequest.getQueryNames();
+ }
+
+ @Override
+ public PlcQuery getQuery(String name) {
+ return innerPlcBrowseRequest.getQuery(name);
+ }
+ };
+ }
+
+ @Override
+ public PlcBrowseRequest.Builder addQuery(String name, String query) {
+ return innerBuilder.addQuery(name, query);
+ }
+ };
+ }
+
+ @Override
+ public void addEventListener(EventListener listener) {
+ connectionContainer.addEventListener(listener);
+ }
+
+ @Override
+ public void removeEventListener(EventListener listener) {
+ connectionContainer.removeEventListener(listener);
+ }
+
+}
diff --git
a/streampipes-extensions/streampipes-connectors-plc/src/test/java/org/apache/streampipes/extensions/connectors/plc/adapter/ConnectionContainerReproTest.java
b/streampipes-extensions/streampipes-connectors-plc/src/test/java/org/apache/streampipes/extensions/connectors/plc/adapter/ConnectionContainerReproTest.java
new file mode 100644
index 0000000000..8dfc646e06
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-plc/src/test/java/org/apache/streampipes/extensions/connectors/plc/adapter/ConnectionContainerReproTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.plc.adapter;
+
+import
org.apache.streampipes.extensions.connectors.plc.cache.SpConnectionContainer;
+import
org.apache.streampipes.extensions.connectors.plc.cache.SpLeasedPlcConnection;
+
+import org.apache.plc4x.java.api.PlcConnection;
+import org.apache.plc4x.java.api.PlcConnectionManager;
+import org.apache.plc4x.java.api.authentication.PlcAuthentication;
+import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
+import org.apache.plc4x.java.api.messages.PlcBrowseRequest;
+import org.apache.plc4x.java.api.messages.PlcPingResponse;
+import org.apache.plc4x.java.api.messages.PlcReadRequest;
+import org.apache.plc4x.java.api.messages.PlcSubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcUnsubscriptionRequest;
+import org.apache.plc4x.java.api.messages.PlcWriteRequest;
+import org.apache.plc4x.java.api.metadata.PlcConnectionMetadata;
+import org.apache.plc4x.java.api.model.PlcTag;
+import org.apache.plc4x.java.api.value.PlcValue;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class ConnectionContainerReproTest {
+
+ static class FlakyManager implements PlcConnectionManager {
+ final AtomicInteger calls = new AtomicInteger();
+ final PlcConnection c1 = new DummyConnection();
+ final PlcConnection c2 = new DummyConnection();
+
+ @Override
+ public PlcConnection getConnection(String url) throws
PlcConnectionException {
+ int n = calls.getAndIncrement();
+ if (n == 0) return c1; // initial success
+ if (n == 1) throw new PlcConnectionException("PLC down"); // reconnect
fails once
+ return c2; // would succeed later
+ }
+
+ @Override
+ public PlcConnection getConnection(String s, PlcAuthentication
plcAuthentication) throws PlcConnectionException {
+ return null;
+ }
+ }
+
+ static class DummyConnection implements PlcConnection {
+ @Override
+ public void connect() {
+ }
+
+ @Override
+ public boolean isConnected() {
+ return true;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Optional<PlcTag> parseTagAddress(String s) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<PlcValue> parseTagValue(PlcTag plcTag, Object... objects) {
+ return Optional.empty();
+ }
+
+ @Override
+ public PlcConnectionMetadata getMetadata() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<? extends PlcPingResponse> ping() {
+ return null;
+ }
+
+ @Override
+ public PlcReadRequest.Builder readRequestBuilder() {
+ return null;
+ }
+
+ @Override
+ public PlcWriteRequest.Builder writeRequestBuilder() {
+ return null;
+ }
+
+ @Override
+ public PlcSubscriptionRequest.Builder subscriptionRequestBuilder() {
+ return null;
+ }
+
+ @Override
+ public PlcUnsubscriptionRequest.Builder unsubscriptionRequestBuilder() {
+ return null;
+ }
+
+ @Override
+ public PlcBrowseRequest.Builder browseRequestBuilder() {
+ return null;
+ }
+ // implement other methods as no-ops if your interface requires them
+ }
+
+ @Test
+ void recoversAfterFailedReconnectAndServesNewLeases() throws Exception {
+ FlakyManager mgr = new FlakyManager();
+ SpConnectionContainer cc = new SpConnectionContainer(
+ mgr, "mock://plc",
+ Duration.ofSeconds(30), Duration.ofSeconds(30),
+ url -> null // closeConnectionHandler
+ );
+
+ // 1) First caller gets a lease immediately.
+ SpLeasedPlcConnection lease1 =
+ (SpLeasedPlcConnection) cc.lease().get(500, TimeUnit.MILLISECONDS);
+
+ // 2) Second caller queues up (does not complete yet).
+ Future<PlcConnection> queued = cc.lease();
+
+ // 3) Return with invalidate=true while reconnect will THROW.
+ cc.returnConnection(lease1, true);
+
+ // 3a) The queued future should have been completed exceptionally (queue
drained).
+ assertThrows(ExecutionException.class, () -> queued.get(200,
TimeUnit.MILLISECONDS));
+
+ // 4) Now a new lease should succeed quickly (manager will succeed on next
call).
+ PlcConnection lease2 = cc.lease().get(500, TimeUnit.MILLISECONDS);
+ assertNotNull(lease2, "Expected a fresh lease after recovery");
+ assertEquals(3, mgr.calls.get(), "Expected 3 getConnection() calls:
success, fail, success");
+
+ // 5) Return normally and ensure subsequent leasing still works.
+ cc.returnConnection((SpLeasedPlcConnection) lease2, false);
+ PlcConnection lease3 = cc.lease().get(500, TimeUnit.MILLISECONDS);
+ assertNotNull(lease3);
+ }
+}