azagrebin commented on a change in pull request #12679:
URL: https://github.com/apache/flink/pull/12679#discussion_r440942354
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
##########
@@ -161,6 +161,26 @@ public void testReporterScheduling() throws Exception {
registry.shutdown().get();
}
+ @Test
+ public void testReporterIntervalParsingErrorDoesNotResultInPartialApplication() throws Exception {
Review comment:
```suggestion
public void testReporterIntervalParsingErrorFallbacksToDefaultValue() throws Exception {
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
##########
@@ -161,6 +161,26 @@ public void testReporterScheduling() throws Exception {
registry.shutdown().get();
}
+ @Test
+ public void testReporterIntervalParsingErrorDoesNotResultInPartialApplication() throws Exception {
+ TestReporter3.reportCount = 0;
+
+ MetricConfig config = new MetricConfig();
+ config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 UNICORN");
+
+ MetricRegistryImpl registry = new MetricRegistryImpl(
+ MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
+ Collections.singletonList(ReporterSetup.forReporter("test", config, new TestReporter3())));
+ try {
+ // in a prior implementation the time amount was applied even if the time unit was invalid
+ // in this case this would imply using 1 SECOND as the interval (seconds is the default)
+ Thread.sleep(2000);
+ Assert.assertEquals(0, TestReporter3.reportCount);
Review comment:
Although it is not very probable, test instability is still possible if the
worker hangs for more than 10 sec.
In general, it would be nice to add a manual `Clock` to
`ManuallyTriggeredScheduledExecutor` and inject it into the `MetricRegistryImpl`
constructor.
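To illustrate the idea, here is a minimal sketch of a scheduler driven by manually advanced virtual time, so a test never has to sleep. Everything in it (`ManualClockSchedulerSketch`, `advanceTime`) is hypothetical and written for this comment only; it is not the existing `ManuallyTriggeredScheduledExecutor` API, and the 10 second period is an assumed value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not a Flink class: periodic tasks run on virtual time.
final class ManualClockSchedulerSketch {

    /** A periodic task together with the virtual time at which it runs next. */
    private static final class Periodic {
        final Runnable task;
        final long periodMillis;
        long nextRunMillis;

        Periodic(Runnable task, long firstRunMillis, long periodMillis) {
            this.task = task;
            this.periodMillis = periodMillis;
            this.nextRunMillis = firstRunMillis;
        }
    }

    private final List<Periodic> tasks = new ArrayList<>();
    private long nowMillis = 0;

    void scheduleWithFixedDelay(Runnable task, long initialDelayMillis, long periodMillis) {
        if (periodMillis <= 0) {
            throw new IllegalArgumentException("period must be positive");
        }
        tasks.add(new Periodic(task, nowMillis + initialDelayMillis, periodMillis));
    }

    /** Advances virtual time and runs every task that became due, without sleeping. */
    void advanceTime(long millis) {
        nowMillis += millis;
        for (Periodic p : tasks) {
            while (p.nextRunMillis <= nowMillis) {
                p.task.run();
                p.nextRunMillis += p.periodMillis;
            }
        }
    }

    public static void main(String[] args) {
        int[] reportCount = {0};
        ManualClockSchedulerSketch scheduler = new ManualClockSchedulerSketch();
        // Assumed 10 second period, standing in for the default reporter interval.
        scheduler.scheduleWithFixedDelay(() -> reportCount[0]++, 10_000, 10_000);

        scheduler.advanceTime(2_000);
        System.out.println(reportCount[0]); // prints 0: the task is not due yet
        scheduler.advanceTime(8_000);
        System.out.println(reportCount[0]); // prints 1: the first period has elapsed
    }
}
```

With something like this injected, the assertion would no longer depend on how long the worker thread actually takes.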
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
##########
@@ -119,14 +121,11 @@ public MetricRegistryImpl(MetricRegistryConfiguration config, Collection<Reporte
try {
Optional<String> configuredPeriod = reporterSetup.getIntervalSettings();
- TimeUnit timeunit = TimeUnit.SECONDS;
- long period = MetricOptions.REPORTER_INTERVAL.defaultValue().getSeconds();
+ Duration period = MetricOptions.REPORTER_INTERVAL.defaultValue();
Review comment:
nit: imo it would be nice to factor out the period parsing into a method and
call it inside the `if (reporterInstance instanceof Scheduled)` block, where it
is actually needed.
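A rough sketch of what such a helper could look like, assuming the configured value has the form `<amount> <TIME_UNIT>` (as in the "1 UNICORN" test input). The class name, method name, and hard-coded 10 second default are placeholders for illustration; this is not the parsing logic used in the PR.

```java
import java.time.Duration;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only.
final class ReporterPeriodParsingSketch {

    // Assumed default; the real code would read MetricOptions.REPORTER_INTERVAL.
    static final Duration DEFAULT_PERIOD = Duration.ofSeconds(10);

    /**
     * Parses "<amount> <TIME_UNIT>" and returns the default if anything is malformed,
     * so that an invalid unit never leads to a partially applied setting.
     */
    static Duration parsePeriodOrDefault(Optional<String> configuredPeriod) {
        if (!configuredPeriod.isPresent()) {
            return DEFAULT_PERIOD;
        }
        try {
            String[] parts = configuredPeriod.get().trim().split("\\s+");
            if (parts.length != 2) {
                return DEFAULT_PERIOD;
            }
            long amount = Long.parseLong(parts[0]);
            TimeUnit unit = TimeUnit.valueOf(parts[1].toUpperCase(Locale.ROOT));
            // Note: units finer than milliseconds are truncated by toMillis.
            return Duration.ofMillis(unit.toMillis(amount));
        } catch (IllegalArgumentException e) {
            // Covers both an unknown unit and a non-numeric amount
            // (NumberFormatException is an IllegalArgumentException).
            return DEFAULT_PERIOD;
        }
    }

    public static void main(String[] args) {
        System.out.println(parsePeriodOrDefault(Optional.of("5 SECONDS"))); // PT5S
        System.out.println(parsePeriodOrDefault(Optional.of("1 UNICORN"))); // PT10S
    }
}
```

The amount and the unit are only combined after both have parsed, which is the property the new test checks.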
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
##########
@@ -161,6 +161,26 @@ public void testReporterScheduling() throws Exception {
registry.shutdown().get();
}
+ @Test
+ public void testReporterIntervalParsingErrorDoesNotResultInPartialApplication() throws Exception {
+ TestReporter3.reportCount = 0;
+
+ MetricConfig config = new MetricConfig();
+ config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 UNICORN");
+
+ MetricRegistryImpl registry = new MetricRegistryImpl(
+ MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
+ Collections.singletonList(ReporterSetup.forReporter("test", config, new TestReporter3())));
+ try {
+ // in a prior implementation the time amount was applied even if the time unit was invalid
+ // in this case this would imply using 1 SECOND as the interval (seconds is the default)
+ Thread.sleep(2000);
Review comment:
```suggestion
Thread.sleep(waitMillis);
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
##########
@@ -161,6 +161,26 @@ public void testReporterScheduling() throws Exception {
registry.shutdown().get();
}
+ @Test
+ public void testReporterIntervalParsingErrorDoesNotResultInPartialApplication() throws Exception {
+ TestReporter3.reportCount = 0;
+
+ MetricConfig config = new MetricConfig();
+ config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 UNICORN");
Review comment:
or at least explain the magic numbers a bit more in comments
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
##########
@@ -161,6 +161,26 @@ public void testReporterScheduling() throws Exception {
registry.shutdown().get();
}
+ @Test
+ public void testReporterIntervalParsingErrorDoesNotResultInPartialApplication() throws Exception {
+ TestReporter3.reportCount = 0;
+
+ MetricConfig config = new MetricConfig();
+ config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 UNICORN");
Review comment:
```suggestion
long defaultIntervalMillis = MetricOptions.REPORTER_INTERVAL.defaultValue().toMillis();
long waitMillis = defaultIntervalMillis / 5;
long customMalformedIntervalSec = defaultIntervalMillis / 10 / 1000;
config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, customMalformedIntervalSec + " UNICORN");
```
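To spell out the arithmetic behind the suggestion: assuming the default interval is 10 seconds (an assumption here, in line with the 10 sec mentioned in the other comment), it reproduces the current magic numbers, i.e. a 2000 ms wait and the value "1 UNICORN". The class below is only a worked example with a made-up name, not test code from the PR.

```java
import java.time.Duration;

// Hypothetical worked example of the numbers derived in the suggestion.
final class MalformedIntervalArithmeticSketch {

    /** The test waits for a fifth of the default interval. */
    static long waitMillis(Duration defaultInterval) {
        return defaultInterval.toMillis() / 5;
    }

    /** The malformed setting uses a tenth of the default interval, in whole seconds. */
    static long customMalformedIntervalSec(Duration defaultInterval) {
        return defaultInterval.toMillis() / 10 / 1000;
    }

    public static void main(String[] args) {
        Duration assumedDefault = Duration.ofSeconds(10); // assumption, see above

        System.out.println(waitMillis(assumedDefault));                              // 2000
        System.out.println(customMalformedIntervalSec(assumedDefault) + " UNICORN"); // 1 UNICORN
    }
}
```

If the amount were wrongly applied with seconds as the unit, the reporter would fire after 1 second, well inside the 2 second wait. If parsing falls back to the default, nothing is reported within the wait, so the expected report count is 0.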
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
##########
@@ -139,10 +138,10 @@ public MetricRegistryImpl(MetricRegistryConfiguration config, Collection<Reporte
final String className = reporterInstance.getClass().getName();
if (reporterInstance instanceof Scheduled) {
- LOG.info("Periodically reporting metrics in intervals of {} {} for reporter {} of type {}.", period, timeunit.name(), namedReporter, className);
+ LOG.info("Periodically reporting metrics in intervals of {} for reporter {} of type {}.", TimeUtils.formatWithHighestUnit(period), namedReporter, className);
executor.scheduleWithFixedDelay(
- new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, timeunit);
+ new MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period.toMillis(), period.toMillis(), TimeUnit.MILLISECONDS);
Review comment:
👍
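For context, a small sketch of the difference between the two conversions visible in this diff: `Duration.getSeconds()` drops any sub-second part, while `toMillis()` keeps millisecond precision. The class name and the 1500 ms value are made up for illustration.

```java
import java.time.Duration;

// Hypothetical illustration of the two ways of turning a Duration into a schedule period.
final class PeriodUnitSketch {

    /** Whole seconds only, as in the removed code path; the sub-second part is dropped. */
    static long periodInSeconds(Duration period) {
        return period.getSeconds();
    }

    /** Milliseconds, as passed to the scheduler together with TimeUnit.MILLISECONDS. */
    static long periodInMillis(Duration period) {
        return period.toMillis();
    }

    public static void main(String[] args) {
        Duration period = Duration.ofMillis(1500); // made-up example value

        System.out.println(periodInSeconds(period)); // 1
        System.out.println(periodInMillis(period));  // 1500
    }
}
```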