zentol commented on a change in pull request #16951:
URL: https://github.com/apache/flink/pull/16951#discussion_r694770859
##########
File path:
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/MiniClusterTestEnvironment.java
##########
@@ -128,6 +142,38 @@ private void startTaskManager() throws Exception {
LOG.debug("New TaskManager {} has been launched.", latestTMIndex);
}
+ @SuppressWarnings("unchecked")
+ private void waitUntilJobRestarted(JobClient jobClient, Deadline timeout) throws Exception {
+ checkNotNull(this.metricReporter, "In-memory metric reporter should exists");
+
+ // Get metric group of the running job
+ final Optional<JobManagerJobMetricGroup> jobMetricGroup =
+ metricReporter.findGroups("jobmanager").stream()
+ .filter(group -> group instanceof JobManagerJobMetricGroup)
Review comment:
Ideally we would avoid referencing the internal metric groups; can we not
search for a pattern like `*<jobID>*numRestarts` instead?
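
A rough sketch of what such a pattern-based lookup could look like, in plain Java and without touching any internal metric group classes. Everything here is an assumption made for illustration: the class and method names are hypothetical, the metrics are modelled as a simple map from a dotted identifier string to a value supplier, and the identifier layout (`<host>.jobmanager.<jobID>.numRestarts`) is only a guess at what the reporter would expose.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: looks up a metric purely by its identifier string.
class MetricPatternLookup {

    /**
     * Finds the first metric whose identifier matches the glob *<jobId>*numRestarts.
     * The map of identifier -> value supplier is an assumed stand-in for whatever
     * the in-memory reporter exposes.
     */
    static Optional<Supplier<Long>> findNumRestarts(
            Map<String, Supplier<Long>> metricsByIdentifier, String jobId) {
        // Translate the glob into a regex; quote the job ID so it is matched literally.
        final Pattern pattern =
                Pattern.compile(".*" + Pattern.quote(jobId) + ".*numRestarts");
        return metricsByIdentifier.entrySet().stream()
                .filter(entry -> pattern.matcher(entry.getKey()).matches())
                .map(Map.Entry::getValue)
                .findFirst();
    }
}
```

With a lookup like this the test utility would depend only on the metric name and the job ID, not on `JobManagerJobMetricGroup`.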
##########
File path:
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/MiniClusterTestEnvironment.java
##########
@@ -103,6 +113,9 @@ public void startUp() throws Exception {
return;
}
this.miniCluster.before();
+ this.metricReporter =
Review comment:
I'm sorry, but how does this even work? Where are we configuring the mini
cluster to use the reporter?
##########
File path:
flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/environment/MiniClusterTestEnvironment.java
##########
@@ -128,6 +142,38 @@ private void startTaskManager() throws Exception {
LOG.debug("New TaskManager {} has been launched.", latestTMIndex);
}
+ @SuppressWarnings("unchecked")
+ private void waitUntilJobRestarted(JobClient jobClient, Deadline timeout) throws Exception {
+ checkNotNull(this.metricReporter, "In-memory metric reporter should exists");
+
+ // Get metric group of the running job
+ final Optional<JobManagerJobMetricGroup> jobMetricGroup =
+ metricReporter.findGroups("jobmanager").stream()
+ .filter(group -> group instanceof JobManagerJobMetricGroup)
+ .map(group -> (JobManagerJobMetricGroup) group)
+ .filter(group -> group.jobId().equals(jobClient.getJobID()))
+ .findFirst();
+ if (!jobMetricGroup.isPresent()) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot find metric group of the running job with ID %s",
+ jobClient.getJobID()));
+ }
+
+ // Get numRestarts gauge
+ final Map<String, Metric> metrics = metricReporter.getMetricsByGroup(jobMetricGroup.get());
+ if (!metrics.containsKey(MetricNames.NUM_RESTARTS)) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot find metric \"%s\" in job metric group",
+ MetricNames.NUM_RESTARTS));
+ }
+ final Gauge<Long> numRestarts = (Gauge<Long>) metrics.get(MetricNames.NUM_RESTARTS);
+
+ // Wait until the job is restarted at least once
+ CommonTestUtils.waitUntilCondition(() -> numRestarts.getValue() > 0, timeout);
Review comment:
I'm not fond of this implementation because it relies heavily on the
assumptions that this method is called only once and that the job has not
already restarted for some other reason.
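
One way to drop both assumptions would be to compare the restart count against a baseline recorded before the failure is triggered, rather than against zero. The following is only a minimal sketch in plain Java: `RestartWaiter` and `waitUntilRestartedSince` are hypothetical names, the gauge is modelled as a `Supplier<Long>`, and the polling loop stands in for whatever wait utility the test framework already provides.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper, not part of Flink: waits for the restart count to exceed a baseline.
class RestartWaiter {

    /**
     * Blocks until numRestarts reports a value greater than the given baseline.
     * The caller is expected to read the baseline BEFORE triggering the failure,
     * so earlier restarts and repeated calls do not satisfy the condition by accident.
     */
    static void waitUntilRestartedSince(
            long baseline, Supplier<Long> numRestarts, long timeoutMillis) {
        final long deadlineNanos =
                System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (numRestarts.get() <= baseline) {
            if (System.nanoTime() - deadlineNanos > 0) {
                throw new IllegalStateException(
                        "Job did not restart within " + timeoutMillis + " ms");
            }
            try {
                Thread.sleep(10); // poll interval chosen arbitrarily for this sketch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for restart", e);
            }
        }
    }
}
```

The intended usage is to read the gauge once, trigger the TaskManager failure, and then call `waitUntilRestartedSince` with the value read earlier, so that the condition holds only for a restart that happened after the trigger.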