This is an automated email from the ASF dual-hosted git repository. riemer pushed a commit to branch improve-pull-adapter-plc in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit 4618300f8212930724ab82791f0fec959526f355 Author: Dominik Riemer <[email protected]> AuthorDate: Thu Apr 3 21:19:39 2025 +0200 feat: Add configurable wait and lease time to PLC connectors --- .../apache/streampipes/commons/constants/Envs.java | 6 ++- .../commons/environment/DefaultEnvironment.java | 10 +++++ .../commons/environment/Environment.java | 3 ++ .../management/connect/PullAdapterScheduler.java | 49 ++++++++++++---------- .../connectors/plc/PlcConnectorsModuleExport.java | 5 +++ .../connection/ContinuousPlcRequestReader.java | 3 +- 6 files changed, 53 insertions(+), 23 deletions(-) diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java index 2339475adb..19123ced95 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java @@ -133,7 +133,11 @@ public enum Envs { "/streampipes-security/truststore.pfx"), SP_SECURITY_TRUSTSTORE_PASSWORD("SP_SECURITY_TRUSTSTORE_PASSWORD", ""), SP_SECURITY_TRUSTSTORE_TYPE("SP_SECURITY_TRUSTSTORE_TYPE", "PKCS12"), - SP_SECURITY_ALLOW_SELFSIGNED("SP_SECURITY_ALLOW_SELFSIGNED", "false"); + SP_SECURITY_ALLOW_SELFSIGNED("SP_SECURITY_ALLOW_SELFSIGNED", "false"), + + // PLC4X connection cache + SP_PLC4X_CONN_MAX_WAIT_TIME_MS("SP_PLC4X_CONN_MAX_WAIT_TIME_MS", "20000"), + SP_PLC4X_CONN_MAX_LEASE_TIME_MS("SP_PLC4X_CONN_MAX_LEASE_TIME_MS", "4000"); private final String envVariableName; private String defaultValue; diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java index 3434924e3e..473f625562 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java @@ -397,4 +397,14 @@ public class DefaultEnvironment implements Environment { public BooleanEnvironmentVariable getAllowSelfSignedCertificates() { return new BooleanEnvironmentVariable(Envs.SP_SECURITY_ALLOW_SELFSIGNED); } + + @Override + public IntEnvironmentVariable getPlc4xMaxWaitTimeMs() { + return new IntEnvironmentVariable(Envs.SP_PLC4X_CONN_MAX_WAIT_TIME_MS); + } + + @Override + public IntEnvironmentVariable getPlc4xMaxLeaseTimeMs() { + return new IntEnvironmentVariable(Envs.SP_PLC4X_CONN_MAX_LEASE_TIME_MS); + } } diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java index d1c4adf6ae..187a03587d 100644 --- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java +++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java @@ -155,4 +155,7 @@ public interface Environment { StringEnvironmentVariable getTruststorePassword(); StringEnvironmentVariable getTruststoreType(); BooleanEnvironmentVariable getAllowSelfSignedCertificates(); + + IntEnvironmentVariable getPlc4xMaxWaitTimeMs(); + IntEnvironmentVariable getPlc4xMaxLeaseTimeMs(); } diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java index 7795d40bfb..0daec7c47b 100644 --- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java +++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java @@ -29,46 +29,53 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class PullAdapterScheduler { protected static final Logger LOG = LoggerFactory.getLogger(PullAdapterScheduler.class); - private ScheduledExecutorService scheduler; + private final ScheduledExecutorService scheduler; + + public PullAdapterScheduler() { + this.scheduler = Executors.newSingleThreadScheduledExecutor(); + } public void schedule(IPullAdapter pullAdapter, String adapterElementId) { - scheduler = Executors.newSingleThreadScheduledExecutor(); - final Runnable task = new Runnable() { - @Override - public void run() { - try { - pullAdapter.pullData(); - } catch (ExecutionException | InterruptedException | TimeoutException e) { - LOG.error("Error while pulling data", e); - SpMonitoringManager.INSTANCE.addErrorMessage( - adapterElementId, - SpLogEntry.from(System.currentTimeMillis(), SpLogMessage.from(e)) - ); - } finally { - scheduler.schedule( - this, - pullAdapter.getPollingInterval().value(), - pullAdapter.getPollingInterval().timeUnit() - ); + final Runnable task = () -> { + try { + pullAdapter.pullData(); + } catch (ExecutionException | InterruptedException | TimeoutException e) { + LOG.error("Error while pulling data", e); + SpMonitoringManager.INSTANCE.addErrorMessage( + adapterElementId, + SpLogEntry.from(System.currentTimeMillis(), SpLogMessage.from(e)) + ); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); } } }; - scheduler.schedule( + scheduler.scheduleWithFixedDelay( task, + 0, pullAdapter.getPollingInterval().value(), pullAdapter.getPollingInterval().timeUnit() ); } public void shutdown() { - scheduler.shutdownNow(); + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(2, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + Thread.currentThread().interrupt(); + } } } diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java index a62d2a84c5..59db43c181 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java @@ -18,6 +18,7 @@ package org.apache.streampipes.extensions.connectors.plc; +import org.apache.streampipes.commons.environment.Environments; import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter; import org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport; import org.apache.streampipes.extensions.api.migration.IModelMigrator; @@ -31,6 +32,7 @@ import org.apache.streampipes.extensions.connectors.plc.adapter.s7.Plc4xS7Adapte import org.apache.plc4x.java.api.PlcDriverManager; import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -39,9 +41,12 @@ public class PlcConnectorsModuleExport implements IExtensionModuleExport { @Override public List<StreamPipesAdapter> adapters() { + var env = Environments.getEnvironment(); var driverManager = PlcDriverManager.getDefault(); var cachedConnectionManager = CachedPlcConnectionManager .getBuilder(driverManager.getConnectionManager()) + .withMaxWaitTime(Duration.ofMillis(env.getPlc4xMaxWaitTimeMs().getValueOrDefault())) + .withMaxLeaseTime(Duration.ofMillis(env.getPlc4xMaxLeaseTimeMs().getValueOrDefault())) .build(); var adapters = new ArrayList<>(List.of( new Plc4xModbusAdapter(cachedConnectionManager), diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java index 80086d20d2..b75cafc7bd 100644 --- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java +++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java @@ -59,7 +59,8 @@ public class ContinuousPlcRequestReader if (connectionManager instanceof CachedPlcConnectionManager) { ((CachedPlcConnectionManager) connectionManager).removeCachedConnection(settings.connectionString()); } - LOG.error("Error while reading from PLC with connection string {} ", settings.connectionString(), e); + LOG.error("Error while reading from PLC with connection string {}: {} ", + settings.connectionString(), e.getMessage()); } }
