This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch improve-pull-adapter-plc
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 4618300f8212930724ab82791f0fec959526f355
Author: Dominik Riemer <[email protected]>
AuthorDate: Thu Apr 3 21:19:39 2025 +0200

    feat: Add configurable wait and lease time to PLC connectors
---
 .../apache/streampipes/commons/constants/Envs.java |  6 ++-
 .../commons/environment/DefaultEnvironment.java    | 10 +++++
 .../commons/environment/Environment.java           |  3 ++
 .../management/connect/PullAdapterScheduler.java   | 49 ++++++++++++----------
 .../connectors/plc/PlcConnectorsModuleExport.java  |  5 +++
 .../connection/ContinuousPlcRequestReader.java     |  3 +-
 6 files changed, 53 insertions(+), 23 deletions(-)

diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 2339475adb..19123ced95 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -133,7 +133,11 @@ public enum Envs {
       "/streampipes-security/truststore.pfx"),
   SP_SECURITY_TRUSTSTORE_PASSWORD("SP_SECURITY_TRUSTSTORE_PASSWORD", ""),
   SP_SECURITY_TRUSTSTORE_TYPE("SP_SECURITY_TRUSTSTORE_TYPE", "PKCS12"),
-  SP_SECURITY_ALLOW_SELFSIGNED("SP_SECURITY_ALLOW_SELFSIGNED", "false");
+  SP_SECURITY_ALLOW_SELFSIGNED("SP_SECURITY_ALLOW_SELFSIGNED", "false"),
+
+  // PLC4X connection cache
+  SP_PLC4X_CONN_MAX_WAIT_TIME_MS("SP_PLC4X_CONN_MAX_WAIT_TIME_MS", "20000"),
+  SP_PLC4X_CONN_MAX_LEASE_TIME_MS("SP_PLC4X_CONN_MAX_LEASE_TIME_MS", "4000");
 
   private final String envVariableName;
   private String defaultValue;
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index 3434924e3e..473f625562 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -397,4 +397,14 @@ public class DefaultEnvironment implements Environment {
   public BooleanEnvironmentVariable getAllowSelfSignedCertificates() {
     return new BooleanEnvironmentVariable(Envs.SP_SECURITY_ALLOW_SELFSIGNED);
   }
+
+  @Override
+  public IntEnvironmentVariable getPlc4xMaxWaitTimeMs() {
+    return new IntEnvironmentVariable(Envs.SP_PLC4X_CONN_MAX_WAIT_TIME_MS);
+  }
+
+  @Override
+  public IntEnvironmentVariable getPlc4xMaxLeaseTimeMs() {
+    return new IntEnvironmentVariable(Envs.SP_PLC4X_CONN_MAX_LEASE_TIME_MS);
+  }
 }
diff --git a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index d1c4adf6ae..187a03587d 100644
--- a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++ b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -155,4 +155,7 @@ public interface Environment {
   StringEnvironmentVariable getTruststorePassword();
   StringEnvironmentVariable getTruststoreType();
   BooleanEnvironmentVariable getAllowSelfSignedCertificates();
+
+  IntEnvironmentVariable getPlc4xMaxWaitTimeMs();
+  IntEnvironmentVariable getPlc4xMaxLeaseTimeMs();
 }
diff --git a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java
index 7795d40bfb..0daec7c47b 100644
--- a/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java
+++ b/streampipes-extensions-management/src/main/java/org/apache/streampipes/extensions/management/connect/PullAdapterScheduler.java
@@ -29,46 +29,53 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 public class PullAdapterScheduler {
 
   protected static final Logger LOG = LoggerFactory.getLogger(PullAdapterScheduler.class);
 
-  private ScheduledExecutorService scheduler;
+  private final ScheduledExecutorService scheduler;
+
+  public PullAdapterScheduler() {
+    this.scheduler = Executors.newSingleThreadScheduledExecutor();
+  }
 
   public void schedule(IPullAdapter pullAdapter,
                        String adapterElementId) {
-    scheduler = Executors.newSingleThreadScheduledExecutor();
-    final Runnable task = new Runnable() {
-      @Override
-      public void run() {
-        try {
-          pullAdapter.pullData();
-        } catch (ExecutionException | InterruptedException | TimeoutException e) {
-          LOG.error("Error while pulling data", e);
-          SpMonitoringManager.INSTANCE.addErrorMessage(
-              adapterElementId,
-              SpLogEntry.from(System.currentTimeMillis(), SpLogMessage.from(e))
-          );
-        } finally {
-          scheduler.schedule(
-              this,
-              pullAdapter.getPollingInterval().value(),
-              pullAdapter.getPollingInterval().timeUnit()
-          );
+    final Runnable task = () -> {
+      try {
+        pullAdapter.pullData();
+      } catch (ExecutionException | InterruptedException | TimeoutException e) {
+        LOG.error("Error while pulling data", e);
+        SpMonitoringManager.INSTANCE.addErrorMessage(
+            adapterElementId,
+            SpLogEntry.from(System.currentTimeMillis(), SpLogMessage.from(e))
+        );
+        if (e instanceof InterruptedException) {
+          Thread.currentThread().interrupt();
         }
       }
     };
 
-    scheduler.schedule(
+    scheduler.scheduleWithFixedDelay(
         task,
+        0,
         pullAdapter.getPollingInterval().value(),
         pullAdapter.getPollingInterval().timeUnit()
     );
   }
 
   public void shutdown() {
-    scheduler.shutdownNow();
+    scheduler.shutdown();
+    try {
+      if (!scheduler.awaitTermination(2, TimeUnit.SECONDS)) {
+        scheduler.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      scheduler.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
   }
 }
diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
index a62d2a84c5..59db43c181 100644
--- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
+++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/PlcConnectorsModuleExport.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.extensions.connectors.plc;
 
+import org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
 import org.apache.streampipes.extensions.api.declarer.IExtensionModuleExport;
 import org.apache.streampipes.extensions.api.migration.IModelMigrator;
@@ -31,6 +32,7 @@ import org.apache.streampipes.extensions.connectors.plc.adapter.s7.Plc4xS7Adapte
 import org.apache.plc4x.java.api.PlcDriverManager;
 import org.apache.plc4x.java.utils.cache.CachedPlcConnectionManager;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -39,9 +41,12 @@ public class PlcConnectorsModuleExport implements IExtensionModuleExport {
 
   @Override
   public List<StreamPipesAdapter> adapters() {
+    var env = Environments.getEnvironment();
     var driverManager = PlcDriverManager.getDefault();
     var cachedConnectionManager =  CachedPlcConnectionManager
         .getBuilder(driverManager.getConnectionManager())
+        .withMaxWaitTime(Duration.ofMillis(env.getPlc4xMaxWaitTimeMs().getValueOrDefault()))
+        .withMaxLeaseTime(Duration.ofMillis(env.getPlc4xMaxLeaseTimeMs().getValueOrDefault()))
         .build();
     var adapters = new ArrayList<>(List.of(
         new Plc4xModbusAdapter(cachedConnectionManager),
diff --git a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
index 80086d20d2..b75cafc7bd 100644
--- a/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
+++ b/streampipes-extensions/streampipes-connectors-plc/src/main/java/org/apache/streampipes/extensions/connectors/plc/adapter/generic/connection/ContinuousPlcRequestReader.java
@@ -59,7 +59,8 @@ public class ContinuousPlcRequestReader
       if (connectionManager instanceof CachedPlcConnectionManager) {
         ((CachedPlcConnectionManager) connectionManager).removeCachedConnection(settings.connectionString());
       }
-      LOG.error("Error while reading from PLC with connection string {} ", settings.connectionString(), e);
+      LOG.error("Error while reading from PLC with connection string {}: {} ",
+          settings.connectionString(), e.getMessage());
     }
   }
 

Reply via email to