This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch refactor-add-logging-adapter-health-check
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit f044e19966c131d509fd25405ba5e44f8060e586 Author: Philipp Zehnder <[email protected]> AuthorDate: Mon Oct 27 15:05:53 2025 +0100 refactor: Add logging in adapter health check --- .../connect/management/health/AdapterHealthCheck.java | 17 +++++++++++++++++ .../connect/iiot/adapters/oi4/Oi4Adapter.java | 6 ++++++ 2 files changed, 23 insertions(+) diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java index b992aa7aac..0dda5cde3e 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java @@ -66,6 +66,7 @@ public class AdapterHealthCheck implements Runnable { * {@link org.apache.streampipes.manager.health.PipelineHealthCheck}). */ public void checkAndRestoreAdapters() { + LOG.info("Adapter health check started"); // Get all adapters that are supposed to run according to the backend storage Map<String, AdapterDescription> adapterInstancesSupposedToRun = this.getAllAdaptersSupposedToRun(); @@ -78,6 +79,11 @@ public class AdapterHealthCheck implements Runnable { Map<String, AdapterDescription> allAdaptersToRecover = this.getAdaptersToRecover(groupByWorker, adapterInstancesSupposedToRun); + allAdaptersToRecover + .keySet() + .forEach(adapterId -> + LOG.info("Adapter instance with id {} needs to be recovered", adapterId)); + try { if (!adapterInstancesSupposedToRun.isEmpty()) { // Filter adapters so that only healthy and running adapters are updated in the metrics endpoint @@ -97,6 +103,8 @@ public class AdapterHealthCheck implements Runnable { LOG.error("Could not update adapter metrics due to an invalid state. 
({})", e.getMessage()); } + LOG.info("Monitoring metrics updated for running adapters."); + // Recover Adapters this.recoverAdapters(allAdaptersToRecover); } @@ -242,10 +250,19 @@ public class AdapterHealthCheck implements Runnable { // Invoke all adapters that were running when the adapter container was stopped try { if (adapterDescription.isRunning()) { + LOG.info("Start recovering adapter {} ", adapterDescription.getElementId()); this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId()); + LOG.info("Adapter {} is recovered", adapterDescription.getElementId()); + } } catch (AdapterException e) { LOG.warn("Could not start adapter {} ({})", adapterDescription.getName(), e.getMessage()); + } catch (Exception e) { + LOG.error( + "Unexpected error while recovering adapter {} ({})", + adapterDescription.getName(), + e.getMessage() + ); } } diff --git a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java index aa24c3ec5d..b191a47922 100644 --- a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java +++ b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java @@ -140,6 +140,9 @@ public class Oi4Adapter implements StreamPipesAdapter { IEventCollector collector, IAdapterRuntimeContext adapterRuntimeContext ) throws AdapterException { + LOG.info("Adapter type {} starting", ID); + LOG.info("Adapter with id {} starting", extractor.getAdapterDescription().getElementId()); + this.applyConfiguration(extractor.getStaticPropertyExtractor()); this.mqttConsumer = new MqttConsumer( @@ -160,6 +163,9 @@ public class Oi4Adapter implements StreamPipesAdapter { Thread thread = new 
Thread(this.mqttConsumer); thread.start(); + + LOG.info("Adapter {} started", ID); + LOG.info("Adapter with id {} started", extractor.getAdapterDescription().getElementId()); } private InputStream convertByte(byte[] event) {
