This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 5aec1ac0fa refactor: Add additional logging in adapter health check
(#3882)
5aec1ac0fa is described below
commit 5aec1ac0fa8d1e425d2b3dfcbdfc55e50256d33c
Author: Philipp Zehnder <[email protected]>
AuthorDate: Mon Oct 27 15:11:11 2025 +0100
refactor: Add additional logging in adapter health check (#3882)
---
.../connect/management/health/AdapterHealthCheck.java | 17 +++++++++++++++++
.../connect/iiot/adapters/oi4/Oi4Adapter.java | 6 ++++++
2 files changed, 23 insertions(+)
diff --git
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
index b992aa7aac..0dda5cde3e 100644
---
a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
+++
b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java
@@ -66,6 +66,7 @@ public class AdapterHealthCheck implements Runnable {
* {@link org.apache.streampipes.manager.health.PipelineHealthCheck}).
*/
public void checkAndRestoreAdapters() {
+ LOG.info("Adapter health check started");
// Get all adapters that are supposed to run according to the backend
storage
Map<String, AdapterDescription> adapterInstancesSupposedToRun =
this.getAllAdaptersSupposedToRun();
@@ -78,6 +79,11 @@ public class AdapterHealthCheck implements Runnable {
Map<String, AdapterDescription> allAdaptersToRecover =
this.getAdaptersToRecover(groupByWorker,
adapterInstancesSupposedToRun);
+ allAdaptersToRecover
+ .keySet()
+ .forEach(adapterId ->
+ LOG.info("Adapter instance with id {} needs to be
recovered", adapterId));
+
try {
if (!adapterInstancesSupposedToRun.isEmpty()) {
// Filter adapters so that only healthy and running adapters are
updated in the metrics endpoint
@@ -97,6 +103,8 @@ public class AdapterHealthCheck implements Runnable {
LOG.error("Could not update adapter metrics due to an invalid state.
({})", e.getMessage());
}
+ LOG.info("Monitoring metrics updated for running adapters.");
+
// Recover Adapters
this.recoverAdapters(allAdaptersToRecover);
}
@@ -242,10 +250,19 @@ public class AdapterHealthCheck implements Runnable {
// Invoke all adapters that were running when the adapter container was
stopped
try {
if (adapterDescription.isRunning()) {
+ LOG.info("Start recovering adapter {} ",
adapterDescription.getElementId());
this.adapterMasterManagement.startStreamAdapter(adapterDescription.getElementId());
+ LOG.info("Adapter {} is recovered",
adapterDescription.getElementId());
+
}
} catch (AdapterException e) {
LOG.warn("Could not start adapter {} ({})",
adapterDescription.getName(), e.getMessage());
+ } catch (Exception e) {
+ LOG.error(
+ "Unexpected error while recovering adapter {} ({})",
+ adapterDescription.getName(),
+ e.getMessage()
+ );
}
}
diff --git
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
index aa24c3ec5d..b191a47922 100644
---
a/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
+++
b/streampipes-extensions/streampipes-connect-adapters-iiot/src/main/java/org/apache/streampipes/connect/iiot/adapters/oi4/Oi4Adapter.java
@@ -140,6 +140,9 @@ public class Oi4Adapter implements StreamPipesAdapter {
IEventCollector collector,
IAdapterRuntimeContext adapterRuntimeContext
) throws AdapterException {
+ LOG.info("Adapter type {} starting", ID);
+ LOG.info("Adapter with id {} starting",
extractor.getAdapterDescription().getElementId());
+
this.applyConfiguration(extractor.getStaticPropertyExtractor());
this.mqttConsumer = new MqttConsumer(
@@ -160,6 +163,9 @@ public class Oi4Adapter implements StreamPipesAdapter {
Thread thread = new Thread(this.mqttConsumer);
thread.start();
+
+ LOG.info("Adapter {} started", ID);
+ LOG.info("Adapter with id {} started",
extractor.getAdapterDescription().getElementId());
}
private InputStream convertByte(byte[] event) {