This is an automated email from the ASF dual-hosted git repository. gomm pushed a commit to branch edge-extensions in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 98c810138f65e3bd5c2ae7db337029ccc9812da9 Author: daniel-gomm <[email protected]> AuthorDate: Fri Oct 8 09:40:12 2021 +0200 fixed issues with evaluation logging --- .../streampipes/logging/evaluation/EvaluationLogger.java | 2 +- .../policies/ThresholdViolationOffloadingPolicy.java | 15 +++++++++++---- .../org/apache/streampipes/performance/TestFactory.java | 4 ++-- .../performance/performancetest/GenericTest.java | 15 +++++++++++++-- 4 files changed, 27 insertions(+), 9 deletions(-) diff --git a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java index 19ccb4b..ccd0a7a 100644 --- a/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java +++ b/streampipes-logging/src/main/java/org/apache/streampipes/logging/evaluation/EvaluationLogger.java @@ -37,7 +37,7 @@ public class EvaluationLogger { private EvaluationLogger(){ String loggingUrl = System.getenv("SP_LOGGING_MQTT_URL"); - String nodeId = System.getenv("SP_LOGGING_MQTT_URL"); + String nodeId = System.getenv("SP_NODE_CONTROLLER_ID"); if (nodeId != null){ this.deviceId = nodeId; }else { diff --git a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java index 39965d9..e7c60fb 100644 --- a/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java +++ b/streampipes-node-controller/src/main/java/org/apache/streampipes/node/controller/management/offloading/strategies/policies/ThresholdViolationOffloadingPolicy.java @@ -54,6 +54,17 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme if(!this.history.offer(value)) { this.history.poll(); this.history.offer(value); + //Only for logging; can be removed later + if(value.compareTo(this.threshold) > 0){ + int numViolations = 0; + for(T val : this.history){ + if(val.compareTo(this.threshold) > 0){ + numViolations++; + } + } + Object[] line = {"policy violation #" + numViolations}; + EvaluationLogger.getInstance().logMQTT("Offloading", line); + } } } @@ -65,8 +76,6 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme for(T value : this.history){ if(value.compareTo(this.threshold) > 0){ numViolations++; - Object[] line = {"policy violation #" + numViolations}; - EvaluationLogger.getInstance().logMQTT("Offloading", line); } } break; @@ -74,8 +83,6 @@ public class ThresholdViolationOffloadingPolicy<T extends Comparable<T>> impleme for(T value : this.history){ if(value.compareTo(this.threshold) < 0){ numViolations++; - Object[] line = {"policy violation #" + numViolations}; - EvaluationLogger.getInstance().logMQTT("Offloading", line); } } break; diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java index 951a540..4cb6dbe 100644 --- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java +++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/TestFactory.java @@ -34,7 +34,7 @@ public class TestFactory { case "Latency": return getLatencyTest(); case "Migration": - Object[] header_migration = {"timestampInMillis", "deviceId", "event", "numberOfRuns", "durationInNanos", "durationInSecs", "originNode", "targetNode"}; + Object[] header_migration = {"timestampInMillis", "deviceId", "event", "numberOfRuns", "durationInNanos", "durationInSecs", "originNode", "targetNode", "success"}; logger.logHeader("Migration", header_migration); return getMigrationTest(); case "Reconfiguration": @@ -73,7 +73,7 @@ public class TestFactory { public static Test getOffloadingTest(){ return new GenericTest(getPipelineName(), false, false, - true, 20000, 600000); + true, 20000, 1500000); } //Helpers diff --git a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java index 55ce48b..892a5c6 100644 --- a/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java +++ b/streampipes-performance-tests/src/main/java/org/apache/streampipes/performance/performancetest/GenericTest.java @@ -98,7 +98,7 @@ public class GenericTest implements Test{ PipelineOperationStatus migrationMessage = client.pipelines().migrate(pipeline); long migrationDuration = System.nanoTime() - beforeMigration; if(testType.equals("Migration")){ - line = new Object[]{System.currentTimeMillis(), "Migration duration", nrRuns, migrationDuration, migrationDuration/1000000000.0,migrationNodes.a, migrationNodes.b, true}; + line = new Object[]{"Migration duration", nrRuns, migrationDuration, migrationDuration/1000000000.0,migrationNodes.a, migrationNodes.b, true}; } System.out.println(migrationMessage.getTitle()); if (!migrationMessage.isSuccess()) { @@ -108,7 +108,8 @@ public class GenericTest implements Test{ } //Reconfiguration if (shouldBeReconfigured) { - pipeline.getSepas().forEach(p -> p.getStaticProperties().stream() + if (testType.equals("Reconfiguration")) + pipeline.getSepas().forEach(p -> p.getStaticProperties().stream() .filter(FreeTextStaticProperty.class::isInstance) .map(FreeTextStaticProperty.class::cast) .filter(FreeTextStaticProperty::isReconfigurable) @@ -117,6 +118,16 @@ public class GenericTest implements Test{ sp.setValue(Float.toString(this.reconfigurableValue++)); } })); + else if (testType.equals("Offloading")) + pipeline.getSepas().forEach(p -> p.getStaticProperties().stream() + .filter(FreeTextStaticProperty.class::isInstance) + .map(FreeTextStaticProperty.class::cast) + .filter(FreeTextStaticProperty::isReconfigurable) + .forEach(sp -> { + if (sp.getInternalName().equals("load")) { + sp.setValue(Float.toString(0.9f)); + } + })); line = new Object[]{"Reconfiguration triggered", nrRuns, (this.reconfigurableValue - 1), true}; System.out.println("Reconfiguration triggered with value " + (this.reconfigurableValue-1)); PipelineOperationStatus message = client.pipelines().reconfigure(pipeline);
