azagrebin commented on a change in pull request #12679:
URL: https://github.com/apache/flink/pull/12679#discussion_r440942354



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
##########
@@ -161,6 +161,26 @@ public void testReporterScheduling() throws Exception {
                registry.shutdown().get();
        }
 
+       @Test
+       public void 
testReporterIntervalParsingErrorDoesNotResultInPartialApplication() throws 
Exception {

Review comment:
       ```suggestion
        public void testReporterIntervalParsingErrorFallbacksToDefaultValue() 
throws Exception {
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
##########
@@ -161,6 +161,26 @@ public void testReporterScheduling() throws Exception {
                registry.shutdown().get();
        }
 
+       @Test
+       public void 
testReporterIntervalParsingErrorDoesNotResultInPartialApplication() throws 
Exception {
+               TestReporter3.reportCount = 0;
+
+               MetricConfig config = new MetricConfig();
+               
config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 
UNICORN");
+
+               MetricRegistryImpl registry = new MetricRegistryImpl(
+                       
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
+                       
Collections.singletonList(ReporterSetup.forReporter("test", config, new 
TestReporter3())));
+               try {
+                       // in a prior implementation the time amount was 
applied even if the time unit was invalid
+                       // in this case this would imply using 1 SECOND as the 
interval (seconds is the default)
+                       Thread.sleep(2000);
+                       Assert.assertEquals(0, TestReporter3.reportCount);

Review comment:
       Although, it is not very probable but possible test instability if the 
worker hangs for more than 10 sec.
   In general, it would be nice to add a manual `Clock` to 
`ManuallyTriggeredScheduledExecutor` and inject it into `MetricRegistryImpl` 
constructor.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
##########
@@ -119,14 +121,11 @@ public MetricRegistryImpl(MetricRegistryConfiguration 
config, Collection<Reporte
 
                                try {
                                        Optional<String> configuredPeriod = 
reporterSetup.getIntervalSettings();
-                                       TimeUnit timeunit = TimeUnit.SECONDS;
-                                       long period = 
MetricOptions.REPORTER_INTERVAL.defaultValue().getSeconds();
+                                       Duration period = 
MetricOptions.REPORTER_INTERVAL.defaultValue();

Review comment:
       nit: imo it would be nice to factor out period parsing into a method and 
use it in the `if (reporterInstance instanceof Scheduled)` where it is actually 
needed.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
##########
@@ -161,6 +161,26 @@ public void testReporterScheduling() throws Exception {
                registry.shutdown().get();
        }
 
+       @Test
+       public void 
testReporterIntervalParsingErrorDoesNotResultInPartialApplication() throws 
Exception {
+               TestReporter3.reportCount = 0;
+
+               MetricConfig config = new MetricConfig();
+               
config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 
UNICORN");
+
+               MetricRegistryImpl registry = new MetricRegistryImpl(
+                       
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
+                       
Collections.singletonList(ReporterSetup.forReporter("test", config, new 
TestReporter3())));
+               try {
+                       // in a prior implementation the time amount was 
applied even if the time unit was invalid
+                       // in this case this would imply using 1 SECOND as the 
interval (seconds is the default)
+                       Thread.sleep(2000);

Review comment:
       ```suggestion
                        Thread.sleep(waitMillis);
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
##########
@@ -161,6 +161,26 @@ public void testReporterScheduling() throws Exception {
                registry.shutdown().get();
        }
 
+       @Test
+       public void 
testReporterIntervalParsingErrorDoesNotResultInPartialApplication() throws 
Exception {
+               TestReporter3.reportCount = 0;
+
+               MetricConfig config = new MetricConfig();
+               
config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 
UNICORN");

Review comment:
       or at least explain a bit more the magic numbers in comments

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
##########
@@ -161,6 +161,26 @@ public void testReporterScheduling() throws Exception {
                registry.shutdown().get();
        }
 
+       @Test
+       public void 
testReporterIntervalParsingErrorDoesNotResultInPartialApplication() throws 
Exception {
+               TestReporter3.reportCount = 0;
+
+               MetricConfig config = new MetricConfig();
+               
config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 
UNICORN");

Review comment:
       ```suggestion
           long defaultIntervalMillis = 
MetricOptions.REPORTER_INTERVAL.defaultValue().toMillis();
           long waitMillis = defaultIntervalMillis / 5;
           long customMalformedIntervalSec =  defaultIntervalMillis / 10 / 1000;
                
config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, 
customMalformedIntervalSec + " UNICORN");
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricRegistryImpl.java
##########
@@ -139,10 +138,10 @@ public MetricRegistryImpl(MetricRegistryConfiguration 
config, Collection<Reporte
                                        final String className = 
reporterInstance.getClass().getName();
 
                                        if (reporterInstance instanceof 
Scheduled) {
-                                               LOG.info("Periodically 
reporting metrics in intervals of {} {} for reporter {} of type {}.", period, 
timeunit.name(), namedReporter, className);
+                                               LOG.info("Periodically 
reporting metrics in intervals of {} for reporter {} of type {}.", 
TimeUtils.formatWithHighestUnit(period), namedReporter, className);
 
                                                executor.scheduleWithFixedDelay(
-                                                               new 
MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), period, period, 
timeunit);
+                                                               new 
MetricRegistryImpl.ReporterTask((Scheduled) reporterInstance), 
period.toMillis(), period.toMillis(), TimeUnit.MILLISECONDS);

Review comment:
       👍 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to