kfaraz commented on code in PR #13238:
URL: https://github.com/apache/druid/pull/13238#discussion_r998862251


##########
core/src/main/java/org/apache/druid/java/util/metrics/Monitor.java:
##########
@@ -30,6 +30,14 @@
 
   void stop();
 
+
+  /**
+   * Useful to push a last round of metrics before stopping the monitor
+   *
+   * @param emitter to use
+   */
+  void stopAfterLastRoundOfMetricsEmission(ServiceEmitter emitter);

Review Comment:
   Given how the other methods are named, you should just call this `monitorAndStop`.



##########
indexing-service/src/test/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitorTest.java:
##########
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.stats;
-
-import com.google.common.collect.ImmutableMap;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.segment.incremental.NoopRowIngestionMeters;
-import org.apache.druid.segment.incremental.RowIngestionMeters;
-import org.apache.druid.segment.indexing.DataSchema;
-import org.apache.druid.segment.realtime.FireDepartment;
-import org.apache.druid.segment.realtime.FireDepartmentMetrics;
-import org.easymock.EasyMock;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-public class TaskRealtimeMetricsMonitorTest
-{
-  private ServiceEmitter emitter;
-  private FireDepartment fireDepartment;
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  @Before
-  public void setup()
-  {
-    emitter = EasyMock.mock(ServiceEmitter.class);
-    fireDepartment = EasyMock.mock(FireDepartment.class);
-  }
-
-  @Test
-  public void testLastRoundMetricsEmission()

Review Comment:
   We could still use this test to verify the new method.



##########
indexing-service/src/main/java/org/apache/druid/indexing/common/stats/TaskRealtimeMetricsMonitor.java:
##########
@@ -65,27 +63,6 @@ public TaskRealtimeMetricsMonitor(
     previousRowIngestionMetersTotals = new RowIngestionMetersTotals(0, 0, 0, 0);
   }
 
-  @Override
-  public void start()
-  {
-    super.start();
-    lastRoundMetricsToBePushed = true;
-  }
-
-  @Override
-  public boolean monitor(ServiceEmitter emitter)

Review Comment:
   For the behaviour to stay the same, every call site that previously called 
`stop` on this object must now call the new method.
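
To illustrate the concern, here is a minimal sketch of what changes at a call site. Everything in it is a hypothetical stand-in: `RecordingEmitter` replaces `ServiceEmitter`, `SketchMonitor` is a reduced form of the `Monitor` contract, and the metric name is only an example. It assumes the new method emits one more round and then stops, as its Javadoc in this PR describes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ServiceEmitter: it only records metric names.
class RecordingEmitter
{
  final List<String> emitted = new ArrayList<>();

  void emit(String metric)
  {
    emitted.add(metric);
  }
}

// Hypothetical, reduced form of the Monitor contract discussed in this PR.
interface SketchMonitor
{
  void start();

  boolean monitor(RecordingEmitter emitter);

  void stop();

  void stopAfterLastRoundOfMetricsEmission(RecordingEmitter emitter);
}

class SketchRealtimeMonitor implements SketchMonitor
{
  private boolean started;

  @Override
  public void start()
  {
    started = true;
  }

  @Override
  public boolean monitor(RecordingEmitter emitter)
  {
    if (!started) {
      return false;
    }
    // Illustrative metric name only.
    emitter.emit("ingest/events/processed");
    return true;
  }

  @Override
  public void stop()
  {
    // With the start()/monitor() overrides removed, stop() no longer emits anything.
    started = false;
  }

  @Override
  public void stopAfterLastRoundOfMetricsEmission(RecordingEmitter emitter)
  {
    // Push one final round of metrics, then stop.
    monitor(emitter);
    stop();
  }
}
```

In this sketch, a caller that still invokes `stop()` silently loses the final round, while one that switches to `stopAfterLastRoundOfMetricsEmission(emitter)` keeps the previous behaviour.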



##########
server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java:
##########
@@ -29,6 +29,8 @@
 {
   private static final long DEFAULT_PROCESSING_COMPLETION_TIME = -1L;
 
+  private static final long INVALID_SEGMENT_HANDOFF_TIME = -1L;

Review Comment:
   Nit: rename for consistency with the existing default (`DEFAULT_PROCESSING_COMPLETION_TIME`).
   ```suggestion
     private static final long DEFAULT_SEGMENT_HANDOFF_TIME = -1L;
   ```



##########
server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java:
##########
@@ -261,7 +275,11 @@ public FireDepartmentMetrics snapshot()
     retVal.handOffCount.set(handOffCount.get());
     retVal.sinkCount.set(sinkCount.get());
     retVal.messageMaxTimestamp.set(messageMaxTimestamp.get());
+    retVal.maxSegmentHandoffTime.set(maxSegmentHandoffTime.get());
+    // Resetting to invalid value so that it's reported only once after it has been set.
+    maxSegmentHandoffTime.set(INVALID_SEGMENT_HANDOFF_TIME);

Review Comment:
   Please don't interleave resetting the current values with populating the 
snapshot.
   Prepare the snapshot first, then reset the values separately (maybe in a 
separate method).
   
   I also wonder whether some of the other metrics should be reset as well. The 
count metrics should be fine, as we check the difference before reporting them.
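
One way to read the suggestion above, as a rough sketch rather than the actual change: `HandoffMetricsSketch` is a hypothetical, trimmed-down stand-in for `FireDepartmentMetrics` with only two fields, and `resetOneShotMetrics` is an invented method name. The constant uses the name proposed in the earlier nit.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, trimmed-down stand-in for FireDepartmentMetrics (two fields only).
class HandoffMetricsSketch
{
  private static final long DEFAULT_SEGMENT_HANDOFF_TIME = -1L;

  private final AtomicLong handOffCount = new AtomicLong(0);
  private final AtomicLong maxSegmentHandoffTime = new AtomicLong(DEFAULT_SEGMENT_HANDOFF_TIME);

  void incrementHandOffCount()
  {
    handOffCount.incrementAndGet();
  }

  void reportSegmentHandoffTime(long millis)
  {
    // Keep the largest handoff time seen since the last reset.
    maxSegmentHandoffTime.accumulateAndGet(millis, Math::max);
  }

  long getHandOffCount()
  {
    return handOffCount.get();
  }

  long getMaxSegmentHandoffTime()
  {
    return maxSegmentHandoffTime.get();
  }

  // Step 1: copy the current values and do nothing else.
  HandoffMetricsSketch snapshot()
  {
    HandoffMetricsSketch retVal = new HandoffMetricsSketch();
    retVal.handOffCount.set(handOffCount.get());
    retVal.maxSegmentHandoffTime.set(maxSegmentHandoffTime.get());
    return retVal;
  }

  // Step 2 (hypothetical method name): reset only the report-once values.
  // Counters are left alone because callers report them as differences.
  void resetOneShotMetrics()
  {
    maxSegmentHandoffTime.set(DEFAULT_SEGMENT_HANDOFF_TIME);
  }
}
```

A caller would then invoke `snapshot()` followed by `resetOneShotMetrics()`, which keeps the copy logic free of side effects.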



##########
core/src/main/java/org/apache/druid/java/util/metrics/ParametrizedUriEmitterMonitor.java:
##########
@@ -65,6 +65,13 @@ public void stop()
     super.stop();
   }
 
+  @Override
+  public void stopAfterLastRoundOfMetricsEmission(ServiceEmitter emitter)
+  {
+    monitors.values().forEach(monitor -> monitor.stopAfterLastRoundOfMetricsEmission(emitter));

Review Comment:
   Should this be preceded by a call to `updateMonitors`?
   Or is it okay as we are going down anyway?



##########
server/src/test/java/org/apache/druid/segment/realtime/RealtimeMetricsMonitorTest.java:
##########
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.segment.realtime;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.java.util.emitter.service.ServiceEmitter;
-import org.apache.druid.segment.indexing.DataSchema;
-import org.easymock.EasyMock;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-public class RealtimeMetricsMonitorTest
-{
-  private ServiceEmitter emitter;
-  private FireDepartment fireDepartment;
-
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
-  @Before
-  public void setup()
-  {
-    emitter = EasyMock.mock(ServiceEmitter.class);
-    fireDepartment = EasyMock.mock(FireDepartment.class);
-  }
-
-  @Test
-  public void testLastRoundMetricsEmission()

Review Comment:
   You can retain the test and try to verify the new method with it.



##########
server/src/main/java/org/apache/druid/segment/realtime/FireDepartmentMetrics.java:
##########
@@ -261,7 +275,11 @@ public FireDepartmentMetrics snapshot()
     retVal.handOffCount.set(handOffCount.get());
     retVal.sinkCount.set(sinkCount.get());
     retVal.messageMaxTimestamp.set(messageMaxTimestamp.get());
+    retVal.maxSegmentHandoffTime.set(maxSegmentHandoffTime.get());
+    // Resetting to invalid value so that it's reported only once after it has been set.
+    maxSegmentHandoffTime.set(INVALID_SEGMENT_HANDOFF_TIME);

Review Comment:
   Maybe also add a test that demonstrates why the resetting is important.
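
A possible shape for such a test, sketched in plain Java without JUnit or EasyMock so that it stands alone. `OneShotHandoffTime` and `HandoffTimeResetCheck` are hypothetical names, and the assumption that a monitor skips emission when it reads `-1` is mine, not something this diff shows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical miniature of the handoff-time metric; not the real FireDepartmentMetrics.
class OneShotHandoffTime
{
  static final long UNSET = -1L;

  private final AtomicLong maxSegmentHandoffTime = new AtomicLong(UNSET);

  void report(long millis)
  {
    maxSegmentHandoffTime.accumulateAndGet(millis, Math::max);
  }

  // Reads the value; resets it afterwards only when asked to.
  long read(boolean resetAfterRead)
  {
    long value = maxSegmentHandoffTime.get();
    if (resetAfterRead) {
      maxSegmentHandoffTime.set(UNSET);
    }
    return value;
  }
}

class HandoffTimeResetCheck
{
  // Simulates `rounds` monitoring rounds after a single handoff and
  // returns the values that would have been emitted (UNSET is skipped).
  static List<Long> emittedValues(boolean resetAfterRead, int rounds)
  {
    OneShotHandoffTime metric = new OneShotHandoffTime();
    metric.report(1500L);

    List<Long> emitted = new ArrayList<>();
    for (int i = 0; i < rounds; i++) {
      long value = metric.read(resetAfterRead);
      if (value != OneShotHandoffTime.UNSET) {
        emitted.add(value);
      }
    }
    return emitted;
  }
}
```

With the reset, three rounds after one handoff yield a single emitted value; without it, the same stale value is emitted in every round.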



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

