This is an automated email from the ASF dual-hosted git repository.

gomm pushed a commit to branch edge-extensions
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit f037eff63036a2cdba14453c574d672c1a10da92
Author: daniel-gomm <[email protected]>
AuthorDate: Wed Oct 13 20:48:46 2021 +0200

    update logger and fix logging inconsistencies
---
 .../logging/evaluation/EvaluationLogger.java        | 21 ++++++++++++++-------
 .../controller/api/InvocableEntityResource.java     |  6 ++----
 .../offloading/OffloadingPolicyManager.java         |  9 +++------
 .../ThresholdViolationOffloadingPolicy.java         |  3 +--
 .../statscollector/DockerStatsCollector.java        |  2 +-
 .../apache/streampipes/performance/TestFactory.java |  2 +-
 .../pipeline/PipelineMigrationExecutor.java         | 15 +++++----------
 7 files changed, 27 insertions(+), 31 deletions(-)

diff --git 
a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
 
b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
index ccd0a7a..1f9807c 100644
--- 
a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
+++ 
b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java
@@ -26,7 +26,6 @@ import java.nio.charset.StandardCharsets;
 
 public class EvaluationLogger {
     private static EvaluationLogger instance = null;
-    private final MQTT mqtt;
     private final BlockingConnection connection;
     private final String deviceId;
 
@@ -43,7 +42,7 @@ public class EvaluationLogger {
         }else {
             this.deviceId = "default";
         }
-        mqtt = new MQTT();
+        MQTT mqtt = new MQTT();
         try {
             mqtt.setHost(loggingUrl);
         } catch (URISyntaxException e) {
@@ -57,21 +56,29 @@ public class EvaluationLogger {
         }
     }
 
-    public void logMQTT(String topic, Object[] elements){
+    /**public void logMQTT(String topic, Object[] elements){
         String message = System.currentTimeMillis() + "," + this.deviceId + 
",";
         for(Object element:elements)
             message += element + ",";
         message = message.substring(0, message.length()-1);
         publish(topic, message);
+    }**/
+
+    public void logMQTT(String topic, Object ... elements){
+        StringBuilder message = new StringBuilder(System.currentTimeMillis() + 
"," + this.deviceId + ",");
+        for(Object element:elements)
+            message.append(element).append(",");
+        message = new StringBuilder(message.substring(0, message.length() - 
1));
+        publish(topic, message.toString());
     }
 
     public void logHeader(String topic, Object[] elements){
-        String message = "";
+        StringBuilder message = new StringBuilder();
         for(Object element:elements)
-            message += element + ",";
+            message.append(element).append(",");
         if (message.length() > 0)
-            message = message.substring(0, message.length()-1);
-        publish(topic, message);
+            message = new StringBuilder(message.substring(0, message.length() 
- 1));
+        publish(topic, message.toString());
     }
 
     private void publish(String topic, String message){
diff --git 
a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
 
b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
index b005ab7..f950fa5 100644
--- 
a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
+++ 
b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java
@@ -84,8 +84,7 @@ public class InvocableEntityResource extends AbstractResource 
{
         //TODO: Remove Logger after debugging
         InvocableStreamPipesEntity graph = 
RunningInvocableInstances.INSTANCE.get(runningInstanceId);
         EvaluationLogger logger = EvaluationLogger.getInstance();
-        Object[] line = {"Element detached"};
-        logger.logMQTT("Offloading", line);
+        logger.logMQTT("Offloading", "Element detached");
         Response resp = PipelineElementManager.getInstance().detach(graph, 
runningInstanceId);
         RunningInvocableInstances.INSTANCE.remove(runningInstanceId);
 
@@ -114,8 +113,7 @@ public class InvocableEntityResource extends 
AbstractResource {
                     .get(0))
                     .getValue();
         }
-        Object[] line = {"reconfiguration request received", nrRuns++, value};
-        logger.logMQTT("Reconfiguration", line);
+        logger.logMQTT("Reconfiguration", "reconfiguration request received", 
nrRuns++, value);
         InvocableStreamPipesEntity graph = 
RunningInvocableInstances.INSTANCE.get(runningInstanceId);
         return ok(PipelineElementManager.getInstance().reconfigure(graph, 
reconfigurationEntity));
     }
diff --git 
a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
 
b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
index 5d745ce..2fc1271 100644
--- 
a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
+++ 
b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java
@@ -70,8 +70,7 @@ public class OffloadingPolicyManager {
             //Currently uses the first violated policy. Could be extended to 
take the degree of policy violation into
             // account
             //TODO: Remove Logger after debugging
-            Object[] line = {"offloading triggered", 
violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName()};
-            EvaluationLogger.getInstance().logMQTT("Offloading", line);
+            EvaluationLogger.getInstance().logMQTT("Offloading", "offloading 
triggered", 
violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName());
             triggerOffloading(violatedPolicies.get(0));
         }
         //Blacklist of entities is cleared when no policies were violated.
@@ -80,16 +79,14 @@ public class OffloadingPolicyManager {
 
     private void triggerOffloading(OffloadingStrategy strategy){
         InvocableStreamPipesEntity offloadEntity = 
strategy.getSelectionStrategy().select(this.unsuccessfullyTriedEntities);
-        Object[] line = {"entity to offload selected"};
-        EvaluationLogger.getInstance().logMQTT("Offloading", line);
+        EvaluationLogger.getInstance().logMQTT("Offloading", "entity to 
offload selected");
         if(offloadEntity != null){
             Response resp = 
PipelineElementManager.getInstance().offload(offloadEntity);
 
             String appId = offloadEntity.getAppId();
             String pipelineName = offloadEntity.getCorrespondingPipeline();
 
-            Object[] line_done = {"offloading done", 
strategy.getOffloadingPolicy().getClass().getSimpleName(), appId};
-            EvaluationLogger.getInstance().logMQTT("Offloading", line_done);
+            EvaluationLogger.getInstance().logMQTT("Offloading", "offloading 
done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId);
 
             if(resp.isSuccess()){
                 LOG.info("Successfully offloaded: {} of pipeline: {}", appId, 
pipelineName);
diff --git 
a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
 
b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
index e7c60fb..95eb8ff 100644
--- 
a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
+++ 
b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java
@@ -62,8 +62,7 @@ public class ThresholdViolationOffloadingPolicy<T extends 
Comparable<T>> impleme
                         numViolations++;
                     }
                 }
-                Object[] line = {"policy violation #" + numViolations};
-                EvaluationLogger.getInstance().logMQTT("Offloading", line);
+                EvaluationLogger.getInstance().logMQTT("Offloading", "policy 
violation #" + numViolations);
             }
         }
     }
diff --git 
a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
 
b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
index 2d038fc..e812b02 100644
--- 
a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
+++ 
b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java
@@ -72,7 +72,7 @@ public class DockerStatsCollector {
                 "netTxHumanReadable"
         };
 
-        EvaluationLogger.getInstance().logMQTT(LOGGING_TOPIC, header);
+        EvaluationLogger.getInstance().logHeader(LOGGING_TOPIC, header);
     }
 
     private final Runnable collect = () -> {
diff --git 
a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
 
b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
index 4cb6dbe..717095f 100644
--- 
a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
+++ 
b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java
@@ -42,7 +42,7 @@ public class TestFactory {
                 logger.logHeader("Reconfiguration", header_reconfigure);
                 return getReconfigurationTest();
             case "Offloading":
-                Object[] header_offloading = {"timestampInMillis", "deviceId", 
"event"};
+                Object[] header_offloading = {"timestampInMillis", "deviceId", 
"event", "policy", "selectedProcessor"};
                 logger.logHeader("Offloading", header_offloading);
                 return getOffloadingTest();
             default:
diff --git 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
index 2de5bc3..e1f40e4 100644
--- 
a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
+++ 
b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java
@@ -103,8 +103,7 @@ public class PipelineMigrationExecutor extends 
AbstractPipelineExecutor {
             return status;
         }
         long start_target_duration = System.nanoTime() - before_start_target;
-        Object[] line_start_target = {"start target 
element","",start_target_duration,start_target_duration/1000000000.0};
-        logger.logMQTT("Migration", line_start_target);
+        logger.logMQTT("Migration", "start target 
element","",start_target_duration,start_target_duration/1000000000.0);
 
         // Stop relays from origin predecessor
         long downtime_beginning = System.nanoTime();
@@ -114,8 +113,7 @@ public class PipelineMigrationExecutor extends 
AbstractPipelineExecutor {
             return status;
         }
         long stop_relays_origin_duration = System.nanoTime() - 
downtime_beginning;
-        Object[] line_stop_relay = {"stop relay from 
origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0};
-        logger.logMQTT("Migration", line_stop_relay);
+        logger.logMQTT("Migration", "stop relay from 
origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0);
 
         // Start relays to target after migration
         long before_start_relay_target = System.nanoTime();
@@ -125,12 +123,10 @@ public class PipelineMigrationExecutor extends 
AbstractPipelineExecutor {
             return status;
         }
         long start_relay_target_duration = System.nanoTime() - 
before_start_relay_target;
-        Object[] line_start_relay = {"start relay to 
target","",start_relay_target_duration,start_relay_target_duration/1000000000.0};
-        logger.logMQTT("Migration", line_start_relay);
+        logger.logMQTT("Migration", "start relay to 
target","",start_relay_target_duration,start_relay_target_duration/1000000000.0);
 
         long downtime = System.nanoTime() - downtime_beginning;
-        Object[] line_downtime = {"downtime", "", downtime, 
downtime/1000000000.0};
-        logger.logMQTT("Migration", line_downtime);
+        logger.logMQTT("Migration", "downtime", "", downtime, 
downtime/1000000000.0);
 
         //Stop origin and associated relay
         long before_stop_origin = System.nanoTime();
@@ -140,8 +136,7 @@ public class PipelineMigrationExecutor extends 
AbstractPipelineExecutor {
             return status;
         }
         long stop_origin_duration = System.nanoTime() - before_stop_origin;
-        Object[] line_stop_origin = {"stop origin 
element","",stop_origin_duration,stop_origin_duration/1000000000.0};
-        logger.logMQTT("Migration", line_stop_origin);
+        logger.logMQTT("Migration", "stop origin 
element","",stop_origin_duration,stop_origin_duration/1000000000.0);
 
         List<InvocableStreamPipesEntity> graphs = new ArrayList<>();
         graphs.addAll(pipeline.getActions());

Reply via email to