This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch
4095-send-email-notifications-for-expiring-certificates
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/4095-send-email-notifications-for-expiring-certificates by this push:
new 93a7254607 fix(#4095): Add Email notification for expired opc ua
certificates
93a7254607 is described below
commit 93a7254607d302aa1a48d1b6b9f94a004c1cf70d
Author: Philipp Zehnder <[email protected]>
AuthorDate: Thu Jan 15 16:21:18 2026 +0100
fix(#4095): Add Email notification for expired opc ua certificates
---
.../apache/streampipes/commons/constants/Envs.java | 4 +
.../commons/environment/DefaultEnvironment.java | 11 ++
.../commons/environment/Environment.java | 4 +
.../model/opcua/CertificateBuilder.java | 4 +
.../service/core/scheduler/DataLakeScheduler.java | 6 +-
.../CertificateExpiryEmailComposer.java | 85 +++++++++++
.../CertificateExpiryEmailScheduler.java | 166 +++++++++++++++++++++
.../certificates/ExpiringCertificateFinder.java | 125 ++++++++++++++++
.../CertificateExpiryEmailComposerTest.java | 91 +++++++++++
.../ExpiringCertificateFinderTest.java | 136 +++++++++++++++++
10 files changed, 629 insertions(+), 3 deletions(-)
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
index 990629b0f1..c1b04d1193 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java
@@ -158,6 +158,10 @@ public enum Envs {
// *") //Cron Job in Dev Setting; Running every 2 min
SP_RETENTION_LOG_LENGTH("SP_RETENTION_LOG_LENGTH", "10"),
+ SP_CERTIFICATE_EXPIRY_CRON("SP_CERTIFICATE_EXPIRY_CRON", "0 2 0 * * *"),
+ SP_CERTIFICATE_EXPIRY_EMAIL_DAYS("SP_CERTIFICATE_EXPIRY_EMAIL_DAYS", null),
+
+
// Logging
SP_LOGGING_FILE_ENABLED("SP_LOGGING_FILE_ENABLED", "false"),
SP_LOGGING_CONSOLE_ENABLED("SP_LOGGING_CONSOLE_ENABLED", "true"),
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
index 9f5a2d997a..04d790fee9 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java
@@ -596,4 +596,15 @@ public class DefaultEnvironment implements Environment {
public IntEnvironmentVariable getDatalakeRetentionLogLength() {
return new IntEnvironmentVariable(Envs.SP_RETENTION_LOG_LENGTH);
}
+
+ @Override
+ public StringEnvironmentVariable getCertificateExpiryCron() {
+ return new StringEnvironmentVariable(Envs.SP_CERTIFICATE_EXPIRY_CRON);
+ }
+
+ @Override
+ public StringEnvironmentVariable getCertificateExpiryEmailDays() {
+ return new
StringEnvironmentVariable(Envs.SP_CERTIFICATE_EXPIRY_EMAIL_DAYS);
+ }
+
}
diff --git
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
index 30567d9214..3f4b6c5291 100644
---
a/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
+++
b/streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java
@@ -232,6 +232,10 @@ public interface Environment {
BooleanEnvironmentVariable getLoadManagerEnable();
+ // Certificate expiration email reminder
+ StringEnvironmentVariable getCertificateExpiryCron();
+ StringEnvironmentVariable getCertificateExpiryEmailDays();
+
//SpRateLimiter
LongEnvironmentVariable getRateLimiterDefaultWarmupPeriod();
IntEnvironmentVariable getRateLimiterSchedulerInitialDelaySeconds();
diff --git
a/streampipes-model/src/main/java/org/apache/streampipes/model/opcua/CertificateBuilder.java
b/streampipes-model/src/main/java/org/apache/streampipes/model/opcua/CertificateBuilder.java
index 191ff612dc..fc784eeb4c 100644
---
a/streampipes-model/src/main/java/org/apache/streampipes/model/opcua/CertificateBuilder.java
+++
b/streampipes-model/src/main/java/org/apache/streampipes/model/opcua/CertificateBuilder.java
@@ -113,6 +113,10 @@ public final class CertificateBuilder {
return cert;
}
+ public static CertificateBuilder create() {
+ return new CertificateBuilder();
+ }
+
public static Certificate fromX509(X509Certificate cert, CertificateState
state) {
Objects.requireNonNull(cert, "cert");
var b = new CertificateBuilder();
diff --git
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
index 581f886b63..251e750ffb 100644
---
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java
@@ -35,7 +35,7 @@ import java.util.List;
@Configuration
public class DataLakeScheduler implements SchedulingConfigurer {
- private static DataLakeExportManager dataLakeExportManager = new
DataLakeExportManager();
+ private static final DataLakeExportManager dataLakeExportManager = new
DataLakeExportManager();
private static final Logger LOG =
LoggerFactory.getLogger(DataLakeExportManager.class);
private final IDataExplorerSchemaManagement dataExplorerSchemaManagement =
new DataExplorerDispatcher()
@@ -43,6 +43,7 @@ public class DataLakeScheduler implements
SchedulingConfigurer {
.getSchemaManagement();
public void cleanupMeasurements() {
+ LOG.info("Retention CRON Job triggered.");
List<DataLakeMeasure> allMeasurements =
this.dataExplorerSchemaManagement.getAllMeasurements();
LOG.info("GET ALL Measurements");
for (DataLakeMeasure dataLakeMeasure : allMeasurements) {
@@ -54,12 +55,12 @@ public class DataLakeScheduler implements
SchedulingConfigurer {
}
}
+ LOG.info("Retention CRON Job finished.");
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
var env = Environments.getEnvironment();
- LOG.info("Retention CRON Job triggered.");
taskRegistrar.addTriggerTask(
this::cleanupMeasurements,
@@ -69,7 +70,6 @@ public class DataLakeScheduler implements
SchedulingConfigurer {
);
- LOG.info("Retention CRON Job finished.");
}
}
\ No newline at end of file
diff --git
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailComposer.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailComposer.java
new file mode 100644
index 0000000000..ce9495c546
--- /dev/null
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailComposer.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.core.scheduler.certificates;
+
+import org.apache.streampipes.model.opcua.Certificate;
+
+import java.util.List;
+import java.util.Map;
+
+public class CertificateExpiryEmailComposer {
+
+ public String composeMessage(Map<Integer, List<Certificate>>
expiringCertificates) {
+ if (expiringCertificates == null || expiringCertificates.isEmpty()) {
+ return "";
+ }
+
+ StringBuilder html = new StringBuilder();
+
+ for (var entry : expiringCertificates.entrySet()) {
+ Integer days = entry.getKey();
+ List<Certificate> certs = entry.getValue();
+
+ if (certs == null || certs.isEmpty()) {
+ continue;
+ }
+
+ html.append("<table width=\"100%\" cellpadding=\"0\" cellspacing=\"0\"
style=\"max-width:600px;\">")
+ .append("<tr>")
+ .append("<td bgcolor=\"#ffffff\" style=\"padding:24px;
font-family:Helvetica,Arial,sans-serif; font-size:16px; line-height:24px;\">")
+ .append("<p style=\"margin:0 0 12px 0;\"><strong>")
+ .append("Following certificates expire in ")
+ .append(days)
+ .append(" days:")
+ .append("</strong></p>")
+ .append("<ul style=\"margin:0; padding-left:20px;\">");
+
+ for (Certificate cert : certs) {
+ String issuer = extractIssuer(cert);
+ html.append("<li style=\"margin-bottom:6px;\">")
+ .append(escapeHtml(issuer))
+ .append("</li>");
+ }
+
+ html.append("</ul>")
+ .append("</td>")
+ .append("</tr>")
+ .append("</table>");
+ }
+
+ return html.toString();
+ }
+
+ private String extractIssuer(Certificate cert) {
+ if (cert == null) {
+ return "unknown issuer";
+ }
+ String issuer = cert.getIssuerDn();
+ return (issuer == null || issuer.isBlank()) ? "unknown issuer" : issuer;
+ }
+
+ private String escapeHtml(String value) {
+ return value
+ .replace("&", "&")
+ .replace("<", "<")
+ .replace(">", ">")
+ .replace("\"", """)
+ .replace("'", "'");
+ }
+}
diff --git
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailScheduler.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailScheduler.java
new file mode 100644
index 0000000000..4bde4b1c60
--- /dev/null
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailScheduler.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.core.scheduler.certificates;
+
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.mail.MailSender;
+import org.apache.streampipes.model.client.user.DefaultRole;
+import org.apache.streampipes.model.client.user.Principal;
+import org.apache.streampipes.model.mail.SpEmail;
+import org.apache.streampipes.model.opcua.Certificate;
+import org.apache.streampipes.storage.api.IUserStorage;
+import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.SchedulingConfigurer;
+import org.springframework.scheduling.config.ScheduledTaskRegistrar;
+import org.springframework.scheduling.support.CronTrigger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Configuration
+public class CertificateExpiryEmailScheduler implements SchedulingConfigurer {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(CertificateExpiryEmailScheduler.class);
+
+ private static final String SUBJECT = "Upcoming certificate expirations —
action required";
+
+ public void checkForExpiringCertificates() {
+
+ var certificateExpiryEmailDays =
Environments.getEnvironment().getCertificateExpiryEmailDays().getValueOrDefault();
+ if (certificateExpiryEmailDays != null) {
+ executeIfEnvVariableIsConfigured(certificateExpiryEmailDays);
+ } else {
+ LOG.debug("No certificate expiry email notification configured.");
+ }
+ }
+
+ @Override
+ public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
+ var env = Environments.getEnvironment();
+ taskRegistrar.addTriggerTask(
+
+ this::checkForExpiringCertificates,
+ triggerContext -> new CronTrigger(env.getCertificateExpiryCron()
+ .getValueOrDefault())
+ .nextExecution(triggerContext)
+ );
+ }
+
+ private void executeIfEnvVariableIsConfigured(String
certificateExpiryEmailDays) {
+ LOG.info("Certificate expiration email notification CRON Job triggered.");
+ var expirePeriodsInDays =
parseCommaSeparatedIntegers(certificateExpiryEmailDays);
+
+ var expiringCertificates = getExpiringCertificates(expirePeriodsInDays);
+
+ if (checkIfEmailShouldBeSent(expiringCertificates)) {
+ var adminEmails = getEmailAddressesOfAdmins();
+
+ var message = new
CertificateExpiryEmailComposer().composeMessage(expiringCertificates);
+ sendEmail(adminEmails, message);
+ LOG.info("Certificate expiration email notification email sent to all
admins.");
+ }
+
+ LOG.info("Certificate expiration email notification CRON Job finished.");
+ }
+
+ private boolean checkIfEmailShouldBeSent(Map<Integer, List<Certificate>>
expiringCertificates) {
+ return expiringCertificates.values()
+ .stream()
+ .anyMatch(list -> !list.isEmpty());
+ }
+
+ private List<String> getEmailAddressesOfAdmins() {
+ return getUserStorageAPI()
+ .getAllUserAccounts()
+ .stream()
+ .filter(u -> u.getRoles().contains(DefaultRole.ROLE_ADMIN.name()))
+ .map(Principal::getUsername)
+ .collect(Collectors.toList());
+
+ }
+
+ /**
+ * Returns a map with one list entry of certificates per requested period
(days).
+ */
+ private Map<Integer, List<Certificate>>
getExpiringCertificates(List<Integer> expirePeriodsInDays) {
+ return new ExpiringCertificateFinder()
+ .findCertificates(expirePeriodsInDays);
+ }
+
+ private void sendEmail(List<String> recipients, String message) {
+ var email = SpEmail.from(recipients, SUBJECT, message);
+ try {
+ new MailSender().sendEmail(email);
+ } catch (IOException e) {
+ LOG.error("Failed to send certificate expiry email to {}", recipients,
e);
+ }
+
+ }
+
+ private List<Integer> parseCommaSeparatedIntegers(String csv) {
+ if (csv == null || csv.isBlank()) {
+ return List.of();
+ }
+
+ return Arrays.stream(csv.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .flatMap(s -> {
+ try {
+ return Stream.of(Integer.parseInt(s));
+ } catch (NumberFormatException e) {
+ LOG.warn("Invalid integer in env variable
SP_CERTIFICATE_EXPIRY_EMAIL_DAYS: '{}'", s);
+ return Stream.empty();
+ }
+ })
+ .toList();
+ }
+
+ private IUserStorage getUserStorageAPI() {
+ return CouchDbStorageManager.INSTANCE
+ .getUserStorageAPI();
+ }
+
+}
diff --git
a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/ExpiringCertificateFinder.java
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/ExpiringCertificateFinder.java
new file mode 100644
index 0000000000..54b5131e03
--- /dev/null
+++
b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/certificates/ExpiringCertificateFinder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.core.scheduler.certificates;
+
+import org.apache.streampipes.model.opcua.Certificate;
+import org.apache.streampipes.storage.api.CRUDStorage;
+import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.time.temporal.ChronoUnit;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class ExpiringCertificateFinder {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ExpiringCertificateFinder.class);
+
+ private static final DateTimeFormatter NOT_AFTER_FORMATTER =
+ DateTimeFormatter.ofPattern("EEE MMM dd HH:mm:ss zzz yyyy",
Locale.ENGLISH);
+
+ private final CRUDStorage<Certificate> certificateStorage;
+ private final Clock clock;
+
+ public ExpiringCertificateFinder(CRUDStorage<Certificate>
certificateStorage, Clock clock) {
+ this.certificateStorage = certificateStorage;
+ this.clock = clock;
+ }
+
+ public ExpiringCertificateFinder() {
+ this(
+ StorageDispatcher.INSTANCE.getNoSqlStore()
+ .getCertificateStorage(),
+ Clock.systemUTC()
+ );
+ }
+
+
+ /**
+ * Returns a map with one entry per requested period (days).
+ * Each value contains certificates whose notAfter timestamp falls within
that UTC day.
+ * Invalid periods (null/negative) are ignored.
+ */
+ public Map<Integer, List<Certificate>> findCertificates(List<Integer>
expirePeriods) {
+ if (expirePeriods == null || expirePeriods.isEmpty()) {
+ return Map.of();
+ }
+
+ var allCertificates = certificateStorage.findAll();
+
+ var now = clock.instant();
+
+ Map<Integer, List<Certificate>> result = new LinkedHashMap<>();
+
+ for (int days : expirePeriods) {
+ var windowStart = now.plus(days, ChronoUnit.DAYS)
+ .truncatedTo(ChronoUnit.DAYS);
+ var windowEnd = windowStart.plus(1, ChronoUnit.DAYS);
+
+ result.put(days, filterCertificatesExpiringBetween(allCertificates,
windowStart, windowEnd));
+ }
+
+ return result;
+ }
+
+ private List<Certificate> filterCertificatesExpiringBetween(
+ List<Certificate> certificates,
+ Instant windowStartInclusive,
+ Instant windowEndExclusive
+ ) {
+ if (certificates == null || certificates.isEmpty()) {
+ return List.of();
+ }
+
+ return certificates.stream()
+ .filter(Objects::nonNull)
+ .filter(c ->
+ parseNotAfterInstant(c).map(ts ->
!ts.isBefore(windowStartInclusive) && ts.isBefore(windowEndExclusive))
+ .orElse(false))
+ .toList();
+ }
+
+ private Optional<Instant> parseNotAfterInstant(Certificate certificate) {
+ String notAfter = certificate.getNotAfter();
+ if (notAfter == null || notAfter.isBlank()) {
+ return Optional.empty();
+ }
+
+ try {
+ return Optional.of(ZonedDateTime.parse(notAfter, NOT_AFTER_FORMATTER)
+ .toInstant());
+ } catch (DateTimeParseException e) {
+ LOG.warn("Unable to parse notAfter for certificate {} -> '{}'",
certificate, notAfter, e);
+ return Optional.empty();
+ }
+ }
+
+
+}
diff --git
a/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailComposerTest.java
b/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailComposerTest.java
new file mode 100644
index 0000000000..7573680f5a
--- /dev/null
+++
b/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/scheduler/certificates/CertificateExpiryEmailComposerTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.core.scheduler.certificates;
+
+import org.apache.streampipes.model.opcua.Certificate;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class CertificateExpiryEmailComposerTest {
+
+ private CertificateExpiryEmailComposer composer;
+
+ @BeforeEach
+ void setUp() {
+ composer = new CertificateExpiryEmailComposer();
+ }
+
+ @Test
+ public void
composeMessage_shouldReturnEmptyString_whenNoExpiringCertificates() {
+ var result = composer.composeMessage(Map.of());
+ assertEquals("", result);
+ }
+
+ @Test
+ public void composeMessage_shouldListSingleExpiringCertificate() {
+ var server = "C=US,ST=CA,L=Folsom,OU=dev,O=digitalpetri,CN=Eclipse Milo
OPC UA Demo Server";
+ var cert = getCertificateForIssuer(server);
+
+ var result = composer.composeMessage(java.util.Map.of(5,
java.util.List.of(cert)));
+
+ assertTrue(result.contains(server));
+ assertTrue(result.contains("5 days"));
+ }
+
+ @Test
+ public void
composeMessage_shouldListMultipleExpiringCertificatesAcrossPeriods() {
+ var serverA = "C=US,ST=CA,L=Folsom,OU=dev,O=digitalpetri,CN=Server A";
+ var serverB = "C=US,ST=CA,L=Folsom,OU=dev,O=digitalpetri,CN=Server B";
+ var cert5a = getCertificateForIssuer(serverA);
+ var cert5b = getCertificateForIssuer(serverB);
+
+ var serverC = "C=US,ST=CA,L=Folsom,OU=dev,O=digitalpetri,CN=Server C";
+ var serverD = "C=US,ST=CA,L=Folsom,OU=dev,O=digitalpetri,CN=Server D";
+ var cert7a = getCertificateForIssuer(serverC);
+ var cert7b = getCertificateForIssuer(serverD);
+
+ var map = new HashMap<Integer, List<Certificate>>();
+ map.put(5, java.util.List.of(cert5a, cert5b));
+ map.put(7, java.util.List.of(cert7a, cert7b));
+
+ var result = composer.composeMessage(map);
+
+ assertTrue(result.contains(serverA));
+ assertTrue(result.contains(serverB));
+ assertTrue(result.contains(serverC));
+ assertTrue(result.contains(serverD));
+ assertTrue(result.contains("5 days"));
+ assertTrue(result.contains("7 days"));
+ }
+
+ private Certificate getCertificateForIssuer(String issuerDn) {
+ var cert = new Certificate();
+ cert.setIssuerDn(issuerDn);
+ return cert;
+ }
+
+}
\ No newline at end of file
diff --git
a/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/scheduler/certificates/ExpiringCertificateFinderTest.java
b/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/scheduler/certificates/ExpiringCertificateFinderTest.java
new file mode 100644
index 0000000000..bcfe0e513b
--- /dev/null
+++
b/streampipes-service-core/src/test/java/org/apache/streampipes/service/core/scheduler/certificates/ExpiringCertificateFinderTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.service.core.scheduler.certificates;
+
+
+import org.apache.streampipes.model.opcua.Certificate;
+import org.apache.streampipes.model.opcua.CertificateBuilder;
+import org.apache.streampipes.storage.api.CRUDStorage;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.Date;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class ExpiringCertificateFinderTest {
+
+ private CRUDStorage<Certificate> storage;
+
+ private static final Instant FIXED_NOW =
Instant.parse("2026-01-10T12:00:00Z");
+ private Clock clock;
+
+ @BeforeEach
+ void setUp() {
+ storage = mock(CRUDStorage.class);
+ clock = Clock.fixed(FIXED_NOW, ZoneOffset.UTC);
+ }
+
+ @Test
+ void findCertificates_nullCheck() {
+ var certificate = createCertificateExpiringInDays(5);
+ when(storage.findAll()).thenReturn(List.of(certificate));
+
+ var finder = new ExpiringCertificateFinder(storage, clock);
+ var result = finder.findCertificates(null);
+
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ void findCertificates_emptyExpirePeriodsList() {
+ var certificate = createCertificateExpiringInDays(5);
+ when(storage.findAll()).thenReturn(List.of(certificate));
+
+ var finder = new ExpiringCertificateFinder(storage, clock);
+ var result = finder.findCertificates(List.of());
+
+ assertTrue(result.isEmpty());
+
+ }
+
+ @Test
+ void findCertificates_noCertificateExpiresInProvidedPeriods() {
+ var certificate = createCertificateExpiringInDays(10);
+ when(storage.findAll()).thenReturn(List.of(certificate));
+
+ var finder = new ExpiringCertificateFinder(storage, clock);
+ var periods = List.of(5, 7);
+ var result = finder.findCertificates(periods);
+
+ assertEquals(2, result.size());
+ assertTrue(result.containsKey(5));
+ assertTrue(result.get(5).isEmpty());
+ assertTrue(result.containsKey(7));
+ assertTrue(result.get(7).isEmpty());
+ }
+
+ @Test
+ void findCertificates_onePeriodHasExpiringCertificate() {
+ var certExpiringIn5 = createCertificateExpiringInDays(5);
+ var certExpiringIn10 = createCertificateExpiringInDays(10);
+ when(storage.findAll()).thenReturn(List.of(certExpiringIn5,
certExpiringIn10));
+
+ var finder = new ExpiringCertificateFinder(storage, clock);
+ var periods = List.of(5, 7);
+ var result = finder.findCertificates(periods);
+
+ assertEquals(2, result.size());
+ assertTrue(result.containsKey(5));
+ assertEquals(1, result.get(5).size());
+ assertTrue(result.containsKey(7));
+ assertTrue(result.get(7).isEmpty());
+ }
+
+ @Test
+ void findCertificates_bothPeriodsHaveExpiringCertificates() {
+ var certExpiringIn5 = createCertificateExpiringInDays(5);
+ var certExpiringIn7 = createCertificateExpiringInDays(7);
+ when(storage.findAll()).thenReturn(List.of(certExpiringIn5,
certExpiringIn7));
+
+ var finder = new ExpiringCertificateFinder(storage, clock);
+ var periods = List.of(5, 7);
+ var result = finder.findCertificates(periods);
+
+ assertEquals(2, result.size());
+ assertTrue(result.containsKey(5));
+ assertEquals(1, result.get(5).size());
+ assertTrue(result.containsKey(7));
+ assertEquals(1, result.get(7).size());
+ }
+
+ private Certificate createCertificateExpiringInDays(int days) {
+ Instant notAfter = FIXED_NOW
+ .plus(days, ChronoUnit.DAYS);
+
+ return CertificateBuilder.create()
+ .notAfter(Date.from(notAfter).toString())
+ .build();
+ }
+
+}
\ No newline at end of file