This is an automated email from the ASF dual-hosted git repository. gomm pushed a commit to branch edge-extensions in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit f037eff63036a2cdba14453c574d672c1a10da92 Author: daniel-gomm <[email protected]> AuthorDate: Wed Oct 13 20:48:46 2021 +0200 update logger and fix logging inconsistencies --- .../logging/evaluation/EvaluationLogger.java | 21 ++++++++++++++------- .../controller/api/InvocableEntityResource.java | 6 ++---- .../offloading/OffloadingPolicyManager.java | 9 +++------ .../ThresholdViolationOffloadingPolicy.java | 3 +-- .../statscollector/DockerStatsCollector.java | 2 +- .../apache/streampipes/performance/TestFactory.java | 2 +- .../pipeline/PipelineMigrationExecutor.java | 15 +++++---------- 7 files changed, 27 insertions(+), 31 deletions(-) diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java index ccd0a7a..1f9807c 100644 --- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java +++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java @@ -26,7 +26,6 @@ import java.nio.charset.StandardCharsets; public class EvaluationLogger { private static EvaluationLogger instance = null; - private final MQTT mqtt; private final BlockingConnection connection; private final String deviceId; @@ -43,7 +42,7 @@ public class EvaluationLogger { }else { this.deviceId = "default"; } - mqtt = new MQTT(); + MQTT mqtt = new MQTT(); try { mqtt.setHost(loggingUrl); } catch (URISyntaxException e) { @@ -57,21 +56,29 @@ public class EvaluationLogger { } } - public void logMQTT(String topic, Object[] elements){ + /**public void logMQTT(String topic, Object[] elements){ String message = System.currentTimeMillis() + "," + this.deviceId + ","; for(Object element:elements) message += element + ","; message = message.substring(0, message.length()-1); publish(topic, message); + }**/ + + public void logMQTT(String topic, Object ... elements){ + StringBuilder message = new StringBuilder(System.currentTimeMillis() + "," + this.deviceId + ","); + for(Object element:elements) + message.append(element).append(","); + message = new StringBuilder(message.substring(0, message.length() - 1)); + publish(topic, message.toString()); } public void logHeader(String topic, Object[] elements){ - String message = ""; + StringBuilder message = new StringBuilder(); for(Object element:elements) - message += element + ","; + message.append(element).append(","); if (message.length() > 0) - message = message.substring(0, message.length()-1); - publish(topic, message); + message = new StringBuilder(message.substring(0, message.length() - 1)); + publish(topic, message.toString()); } private void publish(String topic, String message){ diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java index b005ab7..f950fa5 100644 --- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java +++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/api/InvocableEntityResource.java @@ -84,8 +84,7 @@ public class InvocableEntityResource extends AbstractResource { //TODO: Remove Logger after debugging InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId); EvaluationLogger logger = EvaluationLogger.getInstance(); - Object[] line = {"Element detached"}; - logger.logMQTT("Offloading", line); + logger.logMQTT("Offloading", "Element detached"); Response resp = PipelineElementManager.getInstance().detach(graph, runningInstanceId); RunningInvocableInstances.INSTANCE.remove(runningInstanceId); @@ -114,8 +113,7 @@ public class InvocableEntityResource extends AbstractResource { .get(0)) .getValue(); } - Object[] line = {"reconfiguration request received", nrRuns++, value}; - logger.logMQTT("Reconfiguration", line); + logger.logMQTT("Reconfiguration", "reconfiguration request received", nrRuns++, value); InvocableStreamPipesEntity graph = RunningInvocableInstances.INSTANCE.get(runningInstanceId); return ok(PipelineElementManager.getInstance().reconfigure(graph, reconfigurationEntity)); } diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java index 5d745ce..2fc1271 100644 --- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java +++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/OffloadingPolicyManager.java @@ -70,8 +70,7 @@ public class OffloadingPolicyManager { //Currently uses the first violated policy. Could be extended to take the degree of policy violation into // account //TODO: Remove Logger after debugging - Object[] line = {"offloading triggered", violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName()}; - EvaluationLogger.getInstance().logMQTT("Offloading", line); + EvaluationLogger.getInstance().logMQTT("Offloading", "offloading triggered", violatedPolicies.get(0).getOffloadingPolicy().getClass().getSimpleName()); triggerOffloading(violatedPolicies.get(0)); } //Blacklist of entities is cleared when no policies were violated. @@ -80,16 +79,14 @@ public class OffloadingPolicyManager { private void triggerOffloading(OffloadingStrategy strategy){ InvocableStreamPipesEntity offloadEntity = strategy.getSelectionStrategy().select(this.unsuccessfullyTriedEntities); - Object[] line = {"entity to offload selected"}; - EvaluationLogger.getInstance().logMQTT("Offloading", line); + EvaluationLogger.getInstance().logMQTT("Offloading", "entity to offload selected"); if(offloadEntity != null){ Response resp = PipelineElementManager.getInstance().offload(offloadEntity); String appId = offloadEntity.getAppId(); String pipelineName = offloadEntity.getCorrespondingPipeline(); - Object[] line_done = {"offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId}; - EvaluationLogger.getInstance().logMQTT("Offloading", line_done); + EvaluationLogger.getInstance().logMQTT("Offloading", "offloading done", strategy.getOffloadingPolicy().getClass().getSimpleName(), appId); if(resp.isSuccess()){ LOG.info("Successfully offloaded: {} of pipeline: {}", appId, pipelineName); diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java index e7c60fb..95eb8ff 100644 --- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java +++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java @@ -62,8 +62,7 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme numViolations++; } } - Object[] line = {"policy violation #" + numViolations}; - EvaluationLogger.getInstance().logMQTT("Offloading", line); + EvaluationLogger.getInstance().logMQTT("Offloading", "policy violation #" + numViolations); } } } diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java index 2d038fc..e812b02 100644 --- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java +++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/statscollector/DockerStatsCollector.java @@ -72,7 +72,7 @@ public class DockerStatsCollector { "netTxHumanReadable" }; - EvaluationLogger.getInstance().logMQTT(LOGGING_TOPIC, header); + EvaluationLogger.getInstance().logHeader(LOGGING_TOPIC, header); } private final Runnable collect = () -> { diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java index 4cb6dbe..717095f 100644 --- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java +++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java @@ -42,7 +42,7 @@ public class TestFactory { logger.logHeader("Reconfiguration", header_reconfigure); return getReconfigurationTest(); case "Offloading": - Object[] header_offloading = {"timestampInMillis", "deviceId", "event"}; + Object[] header_offloading = {"timestampInMillis", "deviceId", "event", "policy", "selectedProcessor"}; logger.logHeader("Offloading", header_offloading); return getOffloadingTest(); default: diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java index 2de5bc3..e1f40e4 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/execution/pipeline/PipelineMigrationExecutor.java @@ -103,8 +103,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor { return status; } long start_target_duration = System.nanoTime() - before_start_target; - Object[] line_start_target = {"start target element","",start_target_duration,start_target_duration/1000000000.0}; - logger.logMQTT("Migration", line_start_target); + logger.logMQTT("Migration", "start target element","",start_target_duration,start_target_duration/1000000000.0); // Stop relays from origin predecessor long downtime_beginning = System.nanoTime(); @@ -114,8 +113,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor { return status; } long stop_relays_origin_duration = System.nanoTime() - downtime_beginning; - Object[] line_stop_relay = {"stop relay from origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0}; - logger.logMQTT("Migration", line_stop_relay); + logger.logMQTT("Migration", "stop relay from origin","",stop_relays_origin_duration,stop_relays_origin_duration/1000000000.0); // Start relays to target after migration long before_start_relay_target = System.nanoTime(); @@ -125,12 +123,10 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor { return status; } long start_relay_target_duration = System.nanoTime() - before_start_relay_target; - Object[] line_start_relay = {"start relay to target","",start_relay_target_duration,start_relay_target_duration/1000000000.0}; - logger.logMQTT("Migration", line_start_relay); + logger.logMQTT("Migration", "start relay to target","",start_relay_target_duration,start_relay_target_duration/1000000000.0); long downtime = System.nanoTime() - downtime_beginning; - Object[] line_downtime = {"downtime", "", downtime, downtime/1000000000.0}; - logger.logMQTT("Migration", line_downtime); + logger.logMQTT("Migration", "downtime", "", downtime, downtime/1000000000.0); //Stop origin and associated relay long before_stop_origin = System.nanoTime(); @@ -140,8 +136,7 @@ public class PipelineMigrationExecutor extends AbstractPipelineExecutor { return status; } long stop_origin_duration = System.nanoTime() - before_stop_origin; - Object[] line_stop_origin = {"stop origin element","",stop_origin_duration,stop_origin_duration/1000000000.0}; - logger.logMQTT("Migration", line_stop_origin); + logger.logMQTT("Migration", "stop origin element","",stop_origin_duration,stop_origin_duration/1000000000.0); List<InvocableStreamPipesEntity> graphs = new ArrayList<>(); graphs.addAll(pipeline.getActions());
