mxm commented on code in PR #533:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/533#discussion_r1111853051


##########
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java:
##########
@@ -212,6 +213,70 @@ public void testUtilizationBoundaryComputation() {
         assertEquals(Tuple2.of(778.0, 1350.0), getThresholds(700, 0, conf));
     }
 
+    @Test
+    public void testVertexesExclusionForEvaluation() {
+        var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
+        var source = JobVertexID.fromHexString(sourceHexString);
+        var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
+        var sink = JobVertexID.fromHexString(sinkHexString);
+
+        var topology =
+                new JobTopology(
+                        new VertexInfo(source, Collections.emptySet(), 1, 1),
+                        new VertexInfo(sink, Set.of(source), 1, 1));
+
+        var evaluator = new ScalingMetricEvaluator();
+
+        var metricHistory = new TreeMap<Instant, Map<JobVertexID, 
Map<ScalingMetric, Double>>>();
+
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                SOURCE_DATA_RATE,
+                                100.,
+                                LAG,
+                                0.,
+                                OUTPUT_RATIO,
+                                2.,
+                                TRUE_OUTPUT_RATE,
+                                200.,
+                                TRUE_PROCESSING_RATE,
+                                200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                SOURCE_DATA_RATE, 200.,
+                                LAG, 1000.,
+                                OUTPUT_RATIO, 2.,
+                                TRUE_OUTPUT_RATE, 200.,
+                                TRUE_PROCESSING_RATE, 200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        var conf = new Configuration();
+
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
+        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+        conf.set(AutoScalerOptions.EXCLUDE_VERTEX_IDS, List.of(sinkHexString));
+        var evaluatedMetrics =
+                evaluator.evaluate(conf, new CollectedMetrics(topology, 
metricHistory));
+        // sink should not get evaluated
+        assertEquals(evaluatedMetrics.size(), 1);

Review Comment:
   ```suggestion
           assertEquals(1, evaluatedMetrics.size());
   ```
   
   In `assertEquals`, the first argument is the expected value and the second is the actual value, so the arguments here are swapped.



##########
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java:
##########
@@ -212,6 +213,70 @@ public void testUtilizationBoundaryComputation() {
         assertEquals(Tuple2.of(778.0, 1350.0), getThresholds(700, 0, conf));
     }
 
+    @Test
+    public void testVertexesExclusionForEvaluation() {
+        var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
+        var source = JobVertexID.fromHexString(sourceHexString);
+        var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
+        var sink = JobVertexID.fromHexString(sinkHexString);
+
+        var topology =
+                new JobTopology(
+                        new VertexInfo(source, Collections.emptySet(), 1, 1),
+                        new VertexInfo(sink, Set.of(source), 1, 1));
+
+        var evaluator = new ScalingMetricEvaluator();
+
+        var metricHistory = new TreeMap<Instant, Map<JobVertexID, 
Map<ScalingMetric, Double>>>();
+
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                SOURCE_DATA_RATE,
+                                100.,
+                                LAG,
+                                0.,
+                                OUTPUT_RATIO,
+                                2.,
+                                TRUE_OUTPUT_RATE,
+                                200.,
+                                TRUE_PROCESSING_RATE,
+                                200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                SOURCE_DATA_RATE, 200.,
+                                LAG, 1000.,
+                                OUTPUT_RATIO, 2.,
+                                TRUE_OUTPUT_RATE, 200.,
+                                TRUE_PROCESSING_RATE, 200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        var conf = new Configuration();
+
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
+        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);

Review Comment:
   Do we actually need to set `CATCH_UP_DURATION` and `RESTART_TIME` here, or would the defaults suffice for this test?



##########
flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingMetricEvaluatorTest.java:
##########
@@ -212,6 +213,70 @@ public void testUtilizationBoundaryComputation() {
         assertEquals(Tuple2.of(778.0, 1350.0), getThresholds(700, 0, conf));
     }
 
+    @Test
+    public void testVertexesExclusionForEvaluation() {
+        var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
+        var source = JobVertexID.fromHexString(sourceHexString);
+        var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
+        var sink = JobVertexID.fromHexString(sinkHexString);
+
+        var topology =
+                new JobTopology(
+                        new VertexInfo(source, Collections.emptySet(), 1, 1),
+                        new VertexInfo(sink, Set.of(source), 1, 1));
+
+        var evaluator = new ScalingMetricEvaluator();
+
+        var metricHistory = new TreeMap<Instant, Map<JobVertexID, 
Map<ScalingMetric, Double>>>();
+
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                SOURCE_DATA_RATE,
+                                100.,
+                                LAG,
+                                0.,
+                                OUTPUT_RATIO,
+                                2.,
+                                TRUE_OUTPUT_RATE,
+                                200.,
+                                TRUE_PROCESSING_RATE,
+                                200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        metricHistory.put(
+                Instant.now(),
+                Map.of(
+                        source,
+                        Map.of(
+                                SOURCE_DATA_RATE, 200.,
+                                LAG, 1000.,
+                                OUTPUT_RATIO, 2.,
+                                TRUE_OUTPUT_RATE, 200.,
+                                TRUE_PROCESSING_RATE, 200.),
+                        sink,
+                        Map.of(TRUE_PROCESSING_RATE, 2000.)));
+
+        var conf = new Configuration();
+
+        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
+        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
+        conf.set(AutoScalerOptions.EXCLUDE_VERTEX_IDS, List.of(sinkHexString));
+        var evaluatedMetrics =
+                evaluator.evaluate(conf, new CollectedMetrics(topology, 
metricHistory));
+        // sink should not get evaluated
+        assertEquals(evaluatedMetrics.size(), 1);
+        assertEquals(
+                new EvaluatedScalingMetric(200, 150),
+                evaluatedMetrics.get(source).get(TARGET_DATA_RATE));
+        assertEquals(
+                EvaluatedScalingMetric.of(500),
+                evaluatedMetrics.get(source).get(CATCH_UP_DATA_RATE));

Review Comment:
   Is it necessary to check these or can we just check that the source vertex 
is contained in the map keys?
   
   ```suggestion
           assertNotNull(evaluatedMetrics.get(source));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to