This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 4023061a69 feat: Add configurable wait and lease time to PLC connectors (#3553)
4023061a69 is described below
commit 4023061a695cff835e93484c323f56a8152672f9
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Apr 3 21:53:40 2025 +0200
feat: Add configurable wait and lease time to PLC connectors (#3553)
---
.../apache/streampipes/commons/constants/Envs.java | 6 ++-
.../commons/environment/DefaultEnvironment.java | 10 +++++
.../commons/environment/Environment.java | 3 ++
.../management/connect/PullAdapterScheduler.java | 49 ++++++++++++----------
.../connectors/plc/PlcConnectorsModuleExport.java | 5 +++
.../connection/ContinuousPlcRequestReader.java | 3 +-
6 files changed, 53 insertions(+), 23 deletions(-)
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 2339475adb..19123ced95 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -133,7 +133,11 @@ public enum Envs {
"/streampipes-security/truststore.pfx"),
SP_SECURITY_TRUSTSTORE_PASSWORD("SP_SECURITY_TRUSTSTORE_PASSWORD", ""),
SP_SECURITY_TRUSTSTORE_TYPE("SP_SECURITY_TRUSTSTORE_TYPE", "PKCS12"),
- SP_SECURITY_ALLOW_SELFSIGNED("SP_SECURITY_ALLOW_SELFSIGNED", "false");
+ SP_SECURITY_ALLOW_SELFSIGNED("SP_SECURITY_ALLOW_SELFSIGNED", "false"),
+
+ // PLC4X connection cache
+ SP_PLC4X_CONN_MAX_WAIT_TIME_MS("SP_PLC4X_CONN_MAX_WAIT_TIME_MS", "20000"),
+ SP_PLC4X_CONN_MAX_LEASE_TIME_MS("SP_PLC4X_CONN_MAX_LEASE_TIME_MS", "4000");
private final String envVariableName;
private String defaultValue;
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index 3434924e3e..473f625562 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -397,4 +397,14 @@ public class DefaultEnvironment implements Environment {
public BooleanEnvironmentVariable getAllowSelfSignedCertificates() {
return new BooleanEnvironmentVariable(Envs.SP_SECURITY_ALLOW_SELFSIGNED);
}
+
+ @Override
+ public IntEnvironmentVariable getPlc4xMaxWaitTimeMs() {
+ return new IntEnvironmentVariable(Envs.SP_PLC4X_CONN_MAX_WAIT_TIME_MS);
+ }
+
+ @Override
+ public IntEnvironmentVariable getPlc4xMaxLeaseTimeMs() {
+ return new IntEnvironmentVariable(Envs.SP_PLC4X_CONN_MAX_LEASE_TIME_MS);
+ }
}
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index d1c4adf6ae..187a03587d 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -155,4 +155,7 @@ public interface Environment {
StringEnvironmentVariable getTruststorePassword();
StringEnvironmentVariable getTruststoreType();
BooleanEnvironmentVariable getAllowSelfSignedCertificates();
+
+ IntEnvironmentVariable getPlc4xMaxWaitTimeMs();
+ IntEnvironmentVariable getPlc4xMaxLeaseTimeMs();
}
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java
index 7795d40bfb..0daec7c47b 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java
@@ -29,46 +29,53 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class PullAdapterScheduler {
protected static final Logger LOG =
LoggerFactory.getLogger(PullAdapterScheduler.class);
- private ScheduledExecutorService scheduler;
+ private final ScheduledExecutorService scheduler;
+
+ public PullAdapterScheduler() {
+ this.scheduler = Executors.newSingleThreadScheduledExecutor();
+ }
public void schedule(IPullAdapter pullAdapter,
String adapterElementId) {
- scheduler = Executors.newSingleThreadScheduledExecutor();
- final Runnable task = new Runnable() {
- @Override
- public void run() {
- try {
- pullAdapter.pullData();
- } catch (ExecutionException | InterruptedException | TimeoutException e) {
- LOG.error("Error while pulling data", e);
- SpMonitoringManager.INSTANCE.addErrorMessage(
- adapterElementId,
- SpLogEntry.from(System.currentTimeMillis(), SpLogMessage.from(e))
- );
- } finally {
- scheduler.schedule(
- this,
- pullAdapter.getPollingInterval().value(),
- pullAdapter.getPollingInterval().timeUnit()
- );
+ final Runnable task = () -> {
+ try {
+ pullAdapter.pullData();
+ } catch (ExecutionException | InterruptedException | TimeoutException e) {
+ LOG.error("Error while pulling data", e);
+ SpMonitoringManager.INSTANCE.addErrorMessage(
+ adapterElementId,
+ SpLogEntry.from(System.currentTimeMillis(), SpLogMessage.from(e))
+ );
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
}
}
};
- scheduler.schedule(
+ scheduler.scheduleWithFixedDelay(
task,
+ 0,
pullAdapter.getPollingInterval().value(),
pullAdapter.getPollingInterval().timeUnit()
);
}
public void shutdown() {
- scheduler.shutdownNow();
+ scheduler.shutdown();
+ try {
+ if (!scheduler.awaitTermination(2, TimeUnit.SECONDS)) {
+ scheduler.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ scheduler.shutdownNow();
+ Thread.currentThread().interrupt();
+ }
}
}
diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
index a62d2a84c5..59db43c181 100644
--- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
+++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.extensions.connectors.plc;
+import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
import org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport;
import org.apache.streampipes.extensions.api.migration.IModelMigrator;
@@ -31,6 +32,7 @@ import org.apache.streampipes.extensions.connectors.plc.adapter.s7.Plc4xS7Adapte
import org.apache.plc4x.java.api.PlcDriverManager;
import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -39,9 +41,12 @@ public class PlcConnectorsModuleExport implements IExtensionModuleExport {
@Override
public List<StreamPipesAdapter> adapters() {
+ var env = Environments.getEnvironment();
var driverManager = PlcDriverManager.getDefault();
var cachedConnectionManager = CachedPlcConnectionManager
.getBuilder(driverManager.getConnectionManager())
+ .withMaxWaitTime(Duration.ofMillis(env.getPlc4xMaxWaitTimeMs().getValueOrDefault()))
+ .withMaxLeaseTime(Duration.ofMillis(env.getPlc4xMaxLeaseTimeMs().getValueOrDefault()))
.build();
var adapters = new ArrayList<>(List.of(
new Plc4xModbusAdapter(cachedConnectionManager),
diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
index 80086d20d2..b75cafc7bd 100644
--- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
+++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
@@ -59,7 +59,8 @@ public class ContinuousPlcRequestReader
if (connectionManager instanceof CachedPlcConnectionManager) {
((CachedPlcConnectionManager) connectionManager).removeCachedConnection(settings.connectionString());
}
- LOG.error("Error while reading from PLC with connection string {} ", settings.connectionString(), e);
+ LOG.error("Error while reading from PLC with connection string {}: {} ",
+ settings.connectionString(), e.getMessage());
}
}