This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new b5567617ad fix(#4095): Add Email notification for expired opc ua 
certificates (#4100)
b5567617ad is described below

commit b5567617ad2b133ee8bcc2fa47bb3459ab0f45b1
Author: Philipp Zehnder <[email protected]>
AuthorDate: Fri Jan 16 10:32:43 2026 +0100

    fix(#4095): Add Email notification for expired opc ua certificates (#4100)
    
    Co-authored-by: Dominik Riemer <[email protected]>
---
 .../apache/streampipes/commons/constants/Envs.java |   4 +
 .../commons/environment/DefaultEnvironment.java    |  11 ++
 .../commons/environment/Environment.java           |   4 +
 .../model/opcua/CertificateBuilder.java            |   4 +
 .../service/core/scheduler/DataLakeScheduler.java  |   8 +-
 .../CertificateExpiryEmailComposer.java            |  85 ++++++++++++
 .../CertificateExpiryEmailScheduler.java           | 148 +++++++++++++++++++++
 .../certificates/ExpiringCertificateFinder.java    | 125 +++++++++++++++++
 .../CertificateExpiryEmailComposerTest.java        |  91 +++++++++++++
 .../ExpiringCertificateFinderTest.java             | 136 +++++++++++++++++++
 10 files changed, 612 insertions(+), 4 deletions(-)

diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 990629b0f1..c1b04d1193 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -158,6 +158,10 @@ public enum Envs {
   // *") //Cron Job in Dev Setting; Running every 2 min
   SP_RETENTION_LOG_LENGTH("SP_RETENTION_LOG_LENGTH", "10"),
 
+  SP_CERTIFICATE_EXPIRY_CRON("SP_CERTIFICATE_EXPIRY_CRON", "0 2 0 * * *"),
+  SP_CERTIFICATE_EXPIRY_EMAIL_DAYS("SP_CERTIFICATE_EXPIRY_EMAIL_DAYS", null),
+
+
   // Logging
   SP_LOGGING_FILE_ENABLED("SP_LOGGING_FILE_ENABLED", "false"),
   SP_LOGGING_CONSOLE_ENABLED("SP_LOGGING_CONSOLE_ENABLED", "true"),
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index 9f5a2d997a..04d790fee9 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -596,4 +596,15 @@ public class DefaultEnvironment implements Environment {
   public IntEnvironmentVariable getDatalakeRetentionLogLength() {
     return new IntEnvironmentVariable(Envs.SP_RETENTION_LOG_LENGTH);
   }
+
+  /**
+   * Returns the environment variable that holds the cron expression of the certificate expiry check.
+   * The variable is backed by {@code Envs.SP_CERTIFICATE_EXPIRY_CRON}, whose declared default is
+   * {@code "0 2 0 * * *"}.
+   */
+  @Override
+  public StringEnvironmentVariable getCertificateExpiryCron() {
+    return new StringEnvironmentVariable(Envs.SP_CERTIFICATE_EXPIRY_CRON);
+  }
+
+  /**
+   * Returns the environment variable that lists the expiry periods (in days) for which a notification
+   * email is sent. The variable is backed by {@code Envs.SP_CERTIFICATE_EXPIRY_EMAIL_DAYS}, which is
+   * declared with a {@code null} default.
+   */
+  @Override
+  public StringEnvironmentVariable getCertificateExpiryEmailDays() {
+    return new 
StringEnvironmentVariable(Envs.SP_CERTIFICATE_EXPIRY_EMAIL_DAYS);
+  }
+
 }
diff --git 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index 30567d9214..3f4b6c5291 100644
--- 
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++ 
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -232,6 +232,10 @@ public interface Environment {
 
   BooleanEnvironmentVariable getLoadManagerEnable();
 
+  // Certificate expiration email reminder
+  StringEnvironmentVariable getCertificateExpiryCron();
+  StringEnvironmentVariable getCertificateExpiryEmailDays();
+
   //SpRateLimiter
   LongEnvironmentVariable getRateLimiterDefaultWarmupPeriod();
   IntEnvironmentVariable getRateLimiterSchedulerInitialDelaySeconds();
diff --git 
a/streampipes-model/src/main/java/org/apache/streampipes/model/opcua/CertificateBuilder.java
 
b/streampipes-model/src/main/java/org/apache/streampipes/model/opcua/CertificateBuilder.java
index 191ff612dc..fc784eeb4c 100644
--- 
a/streampipes-model/src/main/java/org/apache/streampipes/model/opcua/CertificateBuilder.java
+++ 
b/streampipes-model/src/main/java/org/apache/streampipes/model/opcua/CertificateBuilder.java
@@ -113,6 +113,10 @@ public final class CertificateBuilder {
     return cert;
   }
 
+  /**
+   * Static factory that returns a fresh {@code CertificateBuilder} instance.
+   *
+   * @return a new builder
+   */
+  public static CertificateBuilder create() {
+    return new CertificateBuilder();
+  }
+
   public static Certificate fromX509(X509Certificate cert, CertificateState 
state) {
     Objects.requireNonNull(cert, "cert");
     var b = new CertificateBuilder();
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
index 581f886b63..229ea35eaf 100644
--- 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
@@ -35,7 +35,7 @@ import java.util.List;
 @Configuration
 public class DataLakeScheduler implements SchedulingConfigurer {
 
-    private static DataLakeExportManager dataLakeExportManager = new 
DataLakeExportManager();
+    private static final DataLakeExportManager dataLakeExportManager = new 
DataLakeExportManager();
     private static final Logger LOG = 
LoggerFactory.getLogger(DataLakeExportManager.class);
 
     private final IDataExplorerSchemaManagement dataExplorerSchemaManagement = 
new DataExplorerDispatcher()
@@ -43,8 +43,9 @@ public class DataLakeScheduler implements 
SchedulingConfigurer {
             .getSchemaManagement();
 
     public void cleanupMeasurements() {
+        LOG.info("Retention CRON Job triggered.");
         List<DataLakeMeasure> allMeasurements = 
this.dataExplorerSchemaManagement.getAllMeasurements();
-        LOG.info("GET ALL Measurements");
+        LOG.debug("GET ALL Measurements");
         for (DataLakeMeasure dataLakeMeasure : allMeasurements) {
             try {
                 
dataLakeExportManager.cleanupSingleMeasurement(dataLakeMeasure);
@@ -54,12 +55,12 @@ public class DataLakeScheduler implements 
SchedulingConfigurer {
             }
 
         }
+        LOG.info("Retention CRON Job finished.");
     }
 
     @Override
     public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
         var env = Environments.getEnvironment();
-        LOG.info("Retention CRON Job triggered.");
         taskRegistrar.addTriggerTask(
 
                 this::cleanupMeasurements,
@@ -69,7 +70,6 @@ public class DataLakeScheduler implements 
SchedulingConfigurer {
 
         );
 
-        LOG.info("Retention CRON Job finished.");
 
     }
 }
\ No newline at end of file
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailComposer.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailComposer.java
new file mode 100644
index 0000000000..ce9495c546
--- /dev/null
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailComposer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.core.scheduler.certificates;
+
+import org.apache.streampipes.model.opcua.Certificate;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Composes the HTML body of the email that reports certificates which are about to expire.
+ */
+public class CertificateExpiryEmailComposer {
+
+  private static final String UNKNOWN_ISSUER = "unknown issuer";
+
+  /**
+   * Renders one HTML table per expiry period that has at least one certificate. Each table consists of
+   * a headline naming the period and a bullet list with the HTML-escaped issuer DN of every certificate.
+   *
+   * @param expiringCertificates certificates grouped by the number of days until they expire; may be null
+   * @return the HTML fragment, or an empty string if there is no certificate to report
+   */
+  public String composeMessage(Map<Integer, List<Certificate>> expiringCertificates) {
+    if (expiringCertificates == null || expiringCertificates.isEmpty()) {
+      return "";
+    }
+
+    StringBuilder html = new StringBuilder();
+
+    for (var entry : expiringCertificates.entrySet()) {
+      List<Certificate> certs = entry.getValue();
+
+      // Periods without certificates do not get a section of their own
+      if (certs == null || certs.isEmpty()) {
+        continue;
+      }
+
+      appendSection(html, entry.getKey(), certs);
+    }
+
+    return html.toString();
+  }
+
+  /**
+   * Appends the table for a single expiry period to the given builder.
+   */
+  private void appendSection(StringBuilder html, Integer days, List<Certificate> certs) {
+    html.append("<table width=\"100%\" cellpadding=\"0\" cellspacing=\"0\" style=\"max-width:600px;\">")
+        .append("<tr>")
+        .append("<td bgcolor=\"#ffffff\" style=\"padding:24px; "
+                    + "font-family:Helvetica,Arial,sans-serif; font-size:16px; line-height:24px;\">")
+        .append("<p style=\"margin:0 0 12px 0;\"><strong>")
+        .append("Following certificates expire in ")
+        .append(days)
+        // Use the singular form for exactly one day so the headline does not read "1 days"
+        .append(Integer.valueOf(1).equals(days) ? " day:" : " days:")
+        .append("</strong></p>")
+        .append("<ul style=\"margin:0; padding-left:20px;\">");
+
+    for (Certificate cert : certs) {
+      html.append("<li style=\"margin-bottom:6px;\">")
+          .append(escapeHtml(extractIssuer(cert)))
+          .append("</li>");
+    }
+
+    html.append("</ul>")
+        .append("</td>")
+        .append("</tr>")
+        .append("</table>");
+  }
+
+  /**
+   * Returns the issuer DN of the certificate, or a placeholder if the certificate or its issuer is missing.
+   */
+  private String extractIssuer(Certificate cert) {
+    if (cert == null) {
+      return UNKNOWN_ISSUER;
+    }
+    String issuer = cert.getIssuerDn();
+    return (issuer == null || issuer.isBlank()) ? UNKNOWN_ISSUER : issuer;
+  }
+
+  /**
+   * Escapes the characters that are significant in HTML. The ampersand is replaced first so that the
+   * entities produced by the later replacements are not escaped a second time.
+   */
+  private String escapeHtml(String value) {
+    return value
+        .replace("&", "&amp;")
+        .replace("<", "&lt;")
+        .replace(">", "&gt;")
+        .replace("\"", "&quot;")
+        .replace("'", "&#39;");
+  }
+}
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailScheduler.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailScheduler.java
new file mode 100644
index 0000000000..42ad863df0
--- /dev/null
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailScheduler.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.core.scheduler.certificates;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.mail.MailSender;
+import org.apache.streampipes.model.client.user.DefaultRole;
+import org.apache.streampipes.model.client.user.Principal;
+import org.apache.streampipes.model.mail.SpEmail;
+import org.apache.streampipes.model.opcua.Certificate;
+import org.apache.streampipes.storage.api.IUserStorage;
+import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.SchedulingConfigurer;
+import org.springframework.scheduling.config.ScheduledTaskRegistrar;
+import org.springframework.scheduling.support.CronTrigger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Registers a cron-triggered task that emails all admin users a list of certificates that are about
+ * to expire.
+ *
+ * <p>The cron expression is read from the environment via {@code getCertificateExpiryCron()}, the
+ * comma-separated notification periods (in days) via {@code getCertificateExpiryEmailDays()}. If no
+ * periods are configured, the task only logs a debug message.
+ */
+@Configuration
+public class CertificateExpiryEmailScheduler implements SchedulingConfigurer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CertificateExpiryEmailScheduler.class);
+
+  private static final String SUBJECT = "Upcoming certificate expirations — action required";
+
+  /**
+   * Entry point of the scheduled task: runs the expiry check if notification periods are configured.
+   */
+  public void checkForExpiringCertificates() {
+
+    var certificateExpiryEmailDays = Environments.getEnvironment()
+                                                 .getCertificateExpiryEmailDays()
+                                                 .getValueOrDefault();
+    if (certificateExpiryEmailDays != null) {
+      executeIfEnvVariableIsConfigured(certificateExpiryEmailDays);
+    } else {
+      LOG.debug("No certificate expiry email notification configured.");
+    }
+  }
+
+  /**
+   * Registers {@link #checkForExpiringCertificates()} with a trigger that evaluates the configured
+   * cron expression each time the next execution is computed.
+   */
+  @Override
+  public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
+    var env = Environments.getEnvironment();
+    taskRegistrar.addTriggerTask(
+
+        this::checkForExpiringCertificates,
+        triggerContext -> new CronTrigger(env.getCertificateExpiryCron()
+                                             .getValueOrDefault())
+            .nextExecution(triggerContext)
+    );
+  }
+
+  /**
+   * Looks up the certificates expiring in the configured periods and notifies the admins if any exist.
+   *
+   * @param certificateExpiryEmailDays comma-separated list of periods in days
+   */
+  private void executeIfEnvVariableIsConfigured(String certificateExpiryEmailDays) {
+    LOG.info("Certificate expiration email notification CRON Job triggered.");
+    var expirePeriodsInDays = parseCommaSeparatedIntegers(certificateExpiryEmailDays);
+
+    var expiringCertificates = getExpiringCertificates(expirePeriodsInDays);
+
+    if (checkIfEmailShouldBeSent(expiringCertificates)) {
+      var adminEmails = getEmailAddressesOfAdmins();
+
+      if (adminEmails.isEmpty()) {
+        // Without a recipient there is nothing to send; make the missed notification visible instead
+        LOG.warn("Certificates are about to expire, but no admin user was found to notify.");
+      } else {
+        var message = new CertificateExpiryEmailComposer().composeMessage(expiringCertificates);
+        sendEmail(adminEmails, message);
+      }
+    }
+
+    LOG.info("Certificate expiration email notification CRON Job finished.");
+  }
+
+  /**
+   * Returns true if at least one period has a non-empty list of certificates.
+   */
+  private boolean checkIfEmailShouldBeSent(Map<Integer, List<Certificate>> expiringCertificates) {
+    return expiringCertificates.values()
+                               .stream()
+                               .anyMatch(list -> !list.isEmpty());
+  }
+
+  /**
+   * Collects the usernames of all user accounts that have the admin role; these are used as the
+   * recipients of the notification.
+   */
+  private List<String> getEmailAddressesOfAdmins() {
+    return getUserStorageAPI()
+        .getAllUserAccounts()
+        .stream()
+        .filter(u -> u.getRoles().contains(DefaultRole.ROLE_ADMIN.name()))
+        .map(Principal::getUsername)
+        .collect(Collectors.toList());
+
+  }
+
+  /**
+   * Returns a map with one list of certificates per requested period (days).
+   */
+  private Map<Integer, List<Certificate>> getExpiringCertificates(List<Integer> expirePeriodsInDays) {
+    return new ExpiringCertificateFinder()
+        .findCertificates(expirePeriodsInDays);
+  }
+
+  /**
+   * Sends the notification and logs the outcome. A failure is logged and not rethrown so that the
+   * scheduled job finishes normally.
+   */
+  private void sendEmail(List<String> recipients, String message) {
+    var email = SpEmail.from(recipients, SUBJECT, message);
+    try {
+      new MailSender().sendEmail(email);
+      // Report success only after the mail was handed over without an error
+      LOG.info("Certificate expiration email notification email sent to all admins.");
+    } catch (IOException e) {
+      LOG.error("Failed to send certificate expiry email to {}", recipients, e);
+    }
+
+  }
+
+  /**
+   * Parses a comma-separated list of integers. Blank items are skipped; items that are not valid
+   * integers are skipped with a warning.
+   *
+   * @param csv the raw value, may be null or blank
+   * @return the parsed integers in input order, never null
+   */
+  private List<Integer> parseCommaSeparatedIntegers(String csv) {
+    if (csv == null || csv.isBlank()) {
+      return List.of();
+    }
+
+    return Arrays.stream(csv.split(","))
+                 .map(String::trim)
+                 .filter(s -> !s.isEmpty())
+                 .flatMap(this::parseIntegerOrSkip)
+                 .toList();
+  }
+
+  /**
+   * Returns a single-element stream with the parsed value, or an empty stream if the token is invalid.
+   */
+  private Stream<Integer> parseIntegerOrSkip(String token) {
+    try {
+      return Stream.of(Integer.parseInt(token));
+    } catch (NumberFormatException e) {
+      LOG.warn("Invalid integer in env variable SP_CERTIFICATE_EXPIRY_EMAIL_DAYS: '{}'", token);
+      return Stream.empty();
+    }
+  }
+
+  private IUserStorage getUserStorageAPI() {
+    return CouchDbStorageManager.INSTANCE
+        .getUserStorageAPI();
+  }
+
+}
diff --git 
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/ExpiringCertificateFinder.java
 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/ExpiringCertificateFinder.java
new file mode 100644
index 0000000000..54b5131e03
--- /dev/null
+++ 
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/ExpiringCertificateFinder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.core.scheduler.certificates;
+
+import org.apache.streampipes.model.opcua.Certificate;
+import org.apache.streampipes.storage.api.CRUDStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.ChronoUnit;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Finds stored certificates whose {@code notAfter} timestamp falls on a day that lies a given number
+ * of days in the future.
+ */
+public class ExpiringCertificateFinder {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExpiringCertificateFinder.class);
+
+  // Same layout as the text produced by java.util.Date#toString(), e.g. "Thu Jan 15 12:00:00 UTC 2026"
+  private static final DateTimeFormatter NOT_AFTER_FORMATTER =
+      DateTimeFormatter.ofPattern("EEE MMM dd HH:mm:ss zzz yyyy", Locale.ENGLISH);
+
+  private final CRUDStorage<Certificate> certificateStorage;
+  private final Clock clock;
+
+  /**
+   * @param certificateStorage storage the certificates are read from
+   * @param clock              source of the current instant; injectable so that tests can fix the time
+   */
+  public ExpiringCertificateFinder(CRUDStorage<Certificate> certificateStorage, Clock clock) {
+    this.certificateStorage = certificateStorage;
+    this.clock = clock;
+  }
+
+  /**
+   * Creates a finder that reads from the certificate storage of the NoSQL store and uses the UTC
+   * system clock.
+   */
+  public ExpiringCertificateFinder() {
+    this(
+        StorageDispatcher.INSTANCE.getNoSqlStore()
+                                  .getCertificateStorage(),
+        Clock.systemUTC()
+    );
+  }
+
+
+  /**
+   * Returns a map with one entry per valid requested period (days), in the order of the input list.
+   * Each value contains the certificates whose notAfter timestamp falls within the UTC day that lies
+   * that many days after the current instant.
+   * Invalid periods (null/negative) are ignored and logged.
+   *
+   * @param expirePeriods periods in days; may be null
+   * @return certificates per period; an empty map if no periods were given
+   */
+  public Map<Integer, List<Certificate>> findCertificates(List<Integer> expirePeriods) {
+    if (expirePeriods == null || expirePeriods.isEmpty()) {
+      return Map.of();
+    }
+
+    var allCertificates = certificateStorage.findAll();
+
+    var now = clock.instant();
+
+    Map<Integer, List<Certificate>> result = new LinkedHashMap<>();
+
+    // Iterate over the boxed type: unboxing a null element in the loop header would throw
+    for (Integer days : expirePeriods) {
+      if (days == null || days < 0) {
+        LOG.warn("Ignoring invalid certificate expiry period: {}", days);
+        continue;
+      }
+
+      // Truncating an Instant to DAYS yields midnight UTC, so the window is exactly one UTC day
+      var windowStart = now.plus(days, ChronoUnit.DAYS)
+                           .truncatedTo(ChronoUnit.DAYS);
+      var windowEnd = windowStart.plus(1, ChronoUnit.DAYS);
+
+      result.put(days, filterCertificatesExpiringBetween(allCertificates, windowStart, windowEnd));
+    }
+
+    return result;
+  }
+
+  /**
+   * Returns the non-null certificates whose notAfter lies in the half-open interval
+   * [windowStartInclusive, windowEndExclusive).
+   */
+  private List<Certificate> filterCertificatesExpiringBetween(
+      List<Certificate> certificates,
+      Instant windowStartInclusive,
+      Instant windowEndExclusive
+  ) {
+    if (certificates == null || certificates.isEmpty()) {
+      return List.of();
+    }
+
+    return certificates.stream()
+                       .filter(Objects::nonNull)
+                       .filter(c -> expiresWithin(c, windowStartInclusive, windowEndExclusive))
+                       .toList();
+  }
+
+  /**
+   * Returns true if the certificate has a parseable notAfter that lies within the given window.
+   */
+  private boolean expiresWithin(Certificate certificate, Instant startInclusive, Instant endExclusive) {
+    return parseNotAfterInstant(certificate)
+        .map(ts -> !ts.isBefore(startInclusive) && ts.isBefore(endExclusive))
+        .orElse(false);
+  }
+
+  /**
+   * Parses the notAfter text of the certificate. Missing, blank or unparseable values yield an empty
+   * Optional; unparseable values are additionally logged.
+   */
+  private Optional<Instant> parseNotAfterInstant(Certificate certificate) {
+    String notAfter = certificate.getNotAfter();
+    if (notAfter == null || notAfter.isBlank()) {
+      return Optional.empty();
+    }
+
+    try {
+      return Optional.of(ZonedDateTime.parse(notAfter, NOT_AFTER_FORMATTER)
+                                      .toInstant());
+    } catch (DateTimeParseException e) {
+      LOG.warn("Unable to parse notAfter for certificate {} -> '{}'", certificate, notAfter, e);
+      return Optional.empty();
+    }
+  }
+
+
+}
diff --git 
a/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailComposerTest.java
 
b/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailComposerTest.java
new file mode 100644
index 0000000000..7573680f5a
--- /dev/null
+++ 
b/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailComposerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.core.scheduler.certificates;
+
+import org.apache.streampipes.model.opcua.Certificate;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for the HTML message produced by {@link CertificateExpiryEmailComposer}.
+ */
+class CertificateExpiryEmailComposerTest {
+
+  private CertificateExpiryEmailComposer composer;
+
+  @BeforeEach
+  void setUp() {
+    composer = new CertificateExpiryEmailComposer();
+  }
+
+  @Test
+  public void composeMessage_shouldReturnEmptyString_whenNoExpiringCertificates() {
+    assertEquals("", composer.composeMessage(Map.of()));
+  }
+
+  @Test
+  public void composeMessage_shouldListSingleExpiringCertificate() {
+    var issuerDn = "C=US,ST=CA,L=Folsom,OU=dev,O=digitalpetri,CN=Eclipse Milo OPC UA Demo Server";
+
+    var html = composer.composeMessage(Map.of(5, List.of(getCertificateForIssuer(issuerDn))));
+
+    assertContainsAll(html, issuerDn, "5 days");
+  }
+
+  @Test
+  public void composeMessage_shouldListMultipleExpiringCertificatesAcrossPeriods() {
+    var issuerA = "C=US,ST=CA,L=Folsom,OU=dev,O=digitalpetri,CN=Server A";
+    var issuerB = "C=US,ST=CA,L=Folsom,OU=dev,O=digitalpetri,CN=Server B";
+    var issuerC = "C=US,ST=CA,L=Folsom,OU=dev,O=digitalpetri,CN=Server C";
+    var issuerD = "C=US,ST=CA,L=Folsom,OU=dev,O=digitalpetri,CN=Server D";
+
+    // Two certificates expire in five days, two more in seven days
+    var certificatesByPeriod = new HashMap<Integer, List<Certificate>>();
+    certificatesByPeriod.put(5, List.of(getCertificateForIssuer(issuerA), getCertificateForIssuer(issuerB)));
+    certificatesByPeriod.put(7, List.of(getCertificateForIssuer(issuerC), getCertificateForIssuer(issuerD)));
+
+    var html = composer.composeMessage(certificatesByPeriod);
+
+    assertContainsAll(html, issuerA, issuerB, issuerC, issuerD, "5 days", "7 days");
+  }
+
+  /**
+   * Asserts that every given fragment occurs in the text.
+   */
+  private void assertContainsAll(String text, String... fragments) {
+    for (String fragment : fragments) {
+      assertTrue(text.contains(fragment));
+    }
+  }
+
+  /**
+   * Creates a certificate that only has its issuer DN set.
+   */
+  private Certificate getCertificateForIssuer(String issuerDn) {
+    var certificate = new Certificate();
+    certificate.setIssuerDn(issuerDn);
+    return certificate;
+  }
+
+}
\ No newline at end of file
diff --git 
a/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/scheduler/certificates/ExpiringCertificateFinderTest.java
 
b/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/scheduler/certificates/ExpiringCertificateFinderTest.java
new file mode 100644
index 0000000000..bcfe0e513b
--- /dev/null
+++ 
b/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/scheduler/certificates/ExpiringCertificateFinderTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.core.scheduler.certificates;
+
+
+import org.apache.streampipes.model.opcua.Certificate;
+import org.apache.streampipes.model.opcua.CertificateBuilder;
+import org.apache.streampipes.storage.api.CRUDStorage;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link ExpiringCertificateFinder} using a mocked storage and a fixed clock.
+ */
+class ExpiringCertificateFinderTest {
+
+  private CRUDStorage<Certificate> storage;
+
+  private static final Instant FIXED_NOW = Instant.parse("2026-01-10T12:00:00Z");
+  private Clock clock;
+
+  @BeforeEach
+  void setUp() {
+    storage = mock(CRUDStorage.class);
+    clock = Clock.fixed(FIXED_NOW, ZoneOffset.UTC);
+  }
+
+  @Test
+  void findCertificates_nullCheck() {
+    var finder = finderWithStoredCertificates(createCertificateExpiringInDays(5));
+
+    assertTrue(finder.findCertificates(null).isEmpty());
+  }
+
+  @Test
+  void findCertificates_emptyExpirePeriodsList() {
+    var finder = finderWithStoredCertificates(createCertificateExpiringInDays(5));
+
+    assertTrue(finder.findCertificates(List.of()).isEmpty());
+  }
+
+  @Test
+  void findCertificates_noCertificateExpiresInProvidedPeriods() {
+    var finder = finderWithStoredCertificates(createCertificateExpiringInDays(10));
+
+    var found = finder.findCertificates(List.of(5, 7));
+
+    assertEquals(2, found.size());
+    assertTrue(found.containsKey(5));
+    assertTrue(found.get(5).isEmpty());
+    assertTrue(found.containsKey(7));
+    assertTrue(found.get(7).isEmpty());
+  }
+
+  @Test
+  void findCertificates_onePeriodHasExpiringCertificate() {
+    var finder = finderWithStoredCertificates(
+        createCertificateExpiringInDays(5),
+        createCertificateExpiringInDays(10)
+    );
+
+    var found = finder.findCertificates(List.of(5, 7));
+
+    assertEquals(2, found.size());
+    assertTrue(found.containsKey(5));
+    assertEquals(1, found.get(5).size());
+    assertTrue(found.containsKey(7));
+    assertTrue(found.get(7).isEmpty());
+  }
+
+  @Test
+  void findCertificates_bothPeriodsHaveExpiringCertificates() {
+    var finder = finderWithStoredCertificates(
+        createCertificateExpiringInDays(5),
+        createCertificateExpiringInDays(7)
+    );
+
+    var found = finder.findCertificates(List.of(5, 7));
+
+    assertEquals(2, found.size());
+    assertTrue(found.containsKey(5));
+    assertEquals(1, found.get(5).size());
+    assertTrue(found.containsKey(7));
+    assertEquals(1, found.get(7).size());
+  }
+
+  /**
+   * Stubs the storage to return the given certificates and creates a finder on top of it.
+   */
+  private ExpiringCertificateFinder finderWithStoredCertificates(Certificate... certificates) {
+    when(storage.findAll()).thenReturn(List.of(certificates));
+    return new ExpiringCertificateFinder(storage, clock);
+  }
+
+  /**
+   * Builds a certificate whose notAfter is the fixed test time plus the given number of days,
+   * rendered via {@link Date#toString()}.
+   */
+  private Certificate createCertificateExpiringInDays(int days) {
+    var notAfterText = Date.from(FIXED_NOW.plus(days, ChronoUnit.DAYS)).toString();
+
+    return CertificateBuilder.create()
+                             .notAfter(notAfterText)
+                             .build();
+  }
+
+}
\ No newline at end of file

Reply via email to