zentol commented on a change in pull request #16951:
URL: https://github.com/apache/flink/pull/16951#discussion_r694770859



##########
File path: 
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/MiniClusterTestEnvironment.java
##########
@@ -128,6 +142,38 @@ private void startTaskManager() throws Exception {
         LOG.debug("New TaskManager {} has been launched.", latestTMIndex);
     }
 
+    @SuppressWarnings("unchecked")
+    private void waitUntilJobRestarted(JobClient jobClient, Deadline timeout) throws Exception {
+        checkNotNull(this.metricReporter, "In-memory metric reporter should exists");
+
+        // Get metric group of the running job
+        final Optional<JobManagerJobMetricGroup> jobMetricGroup =
+                metricReporter.findGroups("jobmanager").stream()
+                        .filter(group -> group instanceof JobManagerJobMetricGroup)

Review comment:
       Ideally we avoid references to the internal metric groups; can we not 
search for a pattern like `*<jobID>*numRestarts` instead?
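
A minimal sketch of what such a pattern search could look like, written against plain Java types rather than the Flink reporter API. The class `NumRestartsLookup`, the method `findJobMetric`, the map of identifier to value supplier, and the identifier layout used in `main` are all hypothetical; the real reporter may expose metrics differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.regex.Pattern;

public class NumRestartsLookup {

    /**
     * Returns the first metric whose identifier matches the glob
     * "*<jobId>*<metricName>", without touching any metric group classes.
     * The map of identifier to value supplier is an assumed stand-in for
     * whatever the in-memory reporter actually stores.
     */
    public static Optional<Supplier<Long>> findJobMetric(
            Map<String, Supplier<Long>> metricsByIdentifier, String jobId, String metricName) {
        // Quote both parts so characters in the job ID or name are matched literally.
        Pattern pattern =
                Pattern.compile(".*" + Pattern.quote(jobId) + ".*" + Pattern.quote(metricName));
        return metricsByIdentifier.entrySet().stream()
                .filter(entry -> pattern.matcher(entry.getKey()).matches())
                .map(Map.Entry::getValue)
                .findFirst();
    }

    public static void main(String[] args) {
        // Hypothetical identifiers; the real scope format depends on the configuration.
        Map<String, Supplier<Long>> metrics = new LinkedHashMap<>();
        metrics.put("localhost.jobmanager.jobA.numRestarts", () -> 0L);
        metrics.put("localhost.jobmanager.jobB.numRestarts", () -> 3L);
        metrics.put("localhost.jobmanager.jobB.uptime", () -> 1000L);

        findJobMetric(metrics, "jobB", "numRestarts")
                .ifPresent(gauge -> System.out.println("numRestarts = " + gauge.get()));
    }
}
```

Matching on the identifier string keeps the test utility independent of internal classes such as `JobManagerJobMetricGroup`, at the cost of relying on the job ID appearing in the metric scope.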

##########
File path: 
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/MiniClusterTestEnvironment.java
##########
@@ -103,6 +113,9 @@ public void startUp() throws Exception {
             return;
         }
         this.miniCluster.before();
+        this.metricReporter =

Review comment:
       I'm sorry, but how does this even work? Where are we configuring the mini 
cluster to use the reporter?

##########
File path: 
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/MiniClusterTestEnvironment.java
##########
@@ -128,6 +142,38 @@ private void startTaskManager() throws Exception {
         LOG.debug("New TaskManager {} has been launched.", latestTMIndex);
     }
 
+    @SuppressWarnings("unchecked")
+    private void waitUntilJobRestarted(JobClient jobClient, Deadline timeout) throws Exception {
+        checkNotNull(this.metricReporter, "In-memory metric reporter should exists");
+
+        // Get metric group of the running job
+        final Optional<JobManagerJobMetricGroup> jobMetricGroup =
+                metricReporter.findGroups("jobmanager").stream()
+                        .filter(group -> group instanceof JobManagerJobMetricGroup)
+                        .map(group -> (JobManagerJobMetricGroup) group)
+                        .filter(group -> group.jobId().equals(jobClient.getJobID()))
+                        .findFirst();
+        if (!jobMetricGroup.isPresent()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot find metric group of the running job with ID %s",
+                            jobClient.getJobID()));
+        }
+
+        // Get numRestarts gauge
+        final Map<String, Metric> metrics = metricReporter.getMetricsByGroup(jobMetricGroup.get());
+        if (!metrics.containsKey(MetricNames.NUM_RESTARTS)) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot find metric \"%s\" in job metric group",
+                            MetricNames.NUM_RESTARTS));
+        }
+        final Gauge<Long> numRestarts = (Gauge<Long>) metrics.get(MetricNames.NUM_RESTARTS);
+
+        // Wait until the job is restarted at least once
+        CommonTestUtils.waitUntilCondition(() -> numRestarts.getValue() > 0, timeout);

Review comment:
       I'm not fond of this implementation because it relies heavily on the 
assumption that this method is called only once, and that the job has not 
already restarted for any other reason.
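
One possible way to drop that assumption, sketched with plain Java types: read the restart count before triggering the failure and wait for it to exceed that baseline instead of zero. The class `RestartWaiter`, the method `waitForNewRestart`, and the `LongSupplier` standing in for the `numRestarts` gauge are hypothetical names for illustration, not part of the PR or of Flink.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class RestartWaiter {

    /**
     * Polls the restart counter until it exceeds the given baseline, so earlier
     * restarts (or earlier calls of this method) do not satisfy the condition.
     * Throws IllegalStateException if the timeout elapses or the thread is interrupted.
     */
    public static long waitForNewRestart(
            LongSupplier numRestarts, long baseline, Duration timeout) {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            long current = numRestarts.getAsLong();
            if (current > baseline) {
                return current;
            }
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new IllegalStateException(
                        "Job did not restart within " + timeout + " (baseline " + baseline + ")");
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for restart", e);
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in gauge: the job has already restarted once for an unrelated reason.
        AtomicLong gauge = new AtomicLong(1);

        // Capture the baseline before triggering the failure.
        long baseline = gauge.get();

        // Simulate the restart caused by the failure under test.
        new Thread(gauge::incrementAndGet).start();

        long observed = waitForNewRestart(gauge::get, baseline, Duration.ofSeconds(5));
        System.out.println("Restart observed, numRestarts = " + observed);
    }
}
```

The caller has to capture the baseline before triggering the failure; capturing it afterwards would race with the restart itself.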



