abhishekrb19 commented on code in PR #16691:
URL: https://github.com/apache/druid/pull/16691#discussion_r1668029686
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -542,6 +557,24 @@ private void onRequestCompleted(SegmentHolder holder,
RequestStatus status)
executeCallbacks(holder, status == RequestStatus.SUCCESS);
}
+ @GuardedBy("lock")
+ private void updateLoadProgress(
+ List<DataSegmentChangeResponse> responses,
+ long requestCompleteTimeMillis
+ )
+ {
+ final long loadSize =
+ responses.stream()
+ .filter(response -> response.getStatus().getState() ==
SegmentChangeStatus.State.SUCCESS)
+ .map(DataSegmentChangeResponse::getRequest)
+ .filter(req -> req instanceof SegmentChangeRequestLoad)
Review Comment:
Is it also worth tracking the drop rate separately?
##########
server/src/test/java/org/apache/druid/server/http/CoordinatorResourceTest.java:
##########
@@ -73,4 +74,29 @@ public void testIsLeader()
Assert.assertEquals(ImmutableMap.of("leader", false),
response2.getEntity());
Assert.assertEquals(404, response2.getStatus());
}
+
+ @Test
+ public void testGetLoadStatusSimple()
+ {
+ EasyMock.expect(mock.getLoadManagementPeons())
+ .andReturn(ImmutableMap.of("hist1", new TestLoadQueuePeon()))
+ .anyTimes();
Review Comment:
```suggestion
.once();
```
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/LoadingRateTracker.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import com.google.common.collect.EvictingQueue;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks the current segment loading rate for a single server.
+ * <p>
+ * The loading rate is computed as a moving average of the last
+ * {@link #MOVING_AVERAGE_WINDOW_SIZE} progress updates (or more if any of the
+ * updates was smaller than {@link #MIN_ENTRY_SIZE_BYTES}).
Review Comment:
```suggestion
* updates were smaller than {@link #MIN_ENTRY_SIZE_BYTES}).
```
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/LoadingRateTracker.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import com.google.common.collect.EvictingQueue;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks the current segment loading rate for a single server.
+ * <p>
+ * The loading rate is computed as a moving average of the last
+ * {@link #MOVING_AVERAGE_WINDOW_SIZE} progress updates (or more if any of the
+ * updates was smaller than {@link #MIN_ENTRY_SIZE_BYTES}).
+ */
+@ThreadSafe
+public class LoadingRateTracker
+{
+ public static final int MOVING_AVERAGE_WINDOW_SIZE = 10;
+ public static final long MIN_ENTRY_SIZE_BYTES = 1_000_000_000;
+
+ private final EvictingQueue<Entry> window =
EvictingQueue.create(MOVING_AVERAGE_WINDOW_SIZE);
+ private final AtomicReference<Entry> windowTotal = new AtomicReference<>(new
Entry());
+ private Entry currentTail;
+
+ public synchronized void updateProgress(long bytes, long millisElapsed)
+ {
+ if (bytes >= 0 && millisElapsed > 0) {
Review Comment:
Should this be `bytes > 0`?
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -542,6 +557,24 @@ private void onRequestCompleted(SegmentHolder holder,
RequestStatus status)
executeCallbacks(holder, status == RequestStatus.SUCCESS);
}
+ @GuardedBy("lock")
+ private void updateLoadProgress(
+ List<DataSegmentChangeResponse> responses,
+ long requestCompleteTimeMillis
+ )
+ {
+ final long loadSize =
+ responses.stream()
+ .filter(response -> response.getStatus().getState() ==
SegmentChangeStatus.State.SUCCESS)
+ .map(DataSegmentChangeResponse::getRequest)
Review Comment:
Minor optimization: To minimize the time spent inside the lock, we can avoid
iterating over the `responses` again by computing the `loadSize` in the caller
itself where we are already switching based on the response state.
What do you think?
##########
server/src/main/java/org/apache/druid/server/http/CoordinatorResource.java:
##########
@@ -111,14 +111,23 @@ public Response getLoadQueue(
return Response.ok(
Maps.transformValues(
coordinator.getLoadManagementPeons(),
- input -> {
- long loadSize = input.getSizeOfSegmentsToLoad();
- long dropSize =
input.getSegmentsToDrop().stream().mapToLong(DataSegment::getSize).sum();
+ peon -> {
+ long loadSize = peon.getSizeOfSegmentsToLoad();
+ long dropSize =
peon.getSegmentsToDrop().stream().mapToLong(DataSegment::getSize).sum();
+
+ // 1 kbps = 1/8 kB/s = 1/8 B/ms
+ long loadRateKbps = peon.getLoadRateKbps();
+ long expectedLoadTimeMillis
+ = loadRateKbps > 0 && loadSize > 0
+ ? (8 * loadSize) / loadRateKbps
+ : 0;
+
return new ImmutableMap.Builder<>()
- .put("segmentsToLoad", input.getSegmentsToLoad().size())
- .put("segmentsToDrop", input.getSegmentsToDrop().size())
+ .put("segmentsToLoad", peon.getSegmentsToLoad().size())
+ .put("segmentsToDrop", peon.getSegmentsToDrop().size())
.put("segmentsToLoadSize", loadSize)
.put("segmentsToDropSize", dropSize)
+ .put("expectedLoadTimeMillis", expectedLoadTimeMillis)
.build();
}
Review Comment:
Curious why some of these stats are only available in the `simple` mode. I
see that with `full`, the response currently being returned makes it harder to
add new fields in this code...
##########
server/src/main/java/org/apache/druid/server/coordinator/loading/LoadingRateTracker.java:
##########
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import com.google.common.collect.EvictingQueue;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks the current segment loading rate for a single server.
+ * <p>
+ * The loading rate is computed as a moving average of the last
+ * {@link #MOVING_AVERAGE_WINDOW_SIZE} progress updates (or more if any of the
+ * updates was smaller than {@link #MIN_ENTRY_SIZE_BYTES}).
+ */
+@ThreadSafe
+public class LoadingRateTracker
+{
+ public static final int MOVING_AVERAGE_WINDOW_SIZE = 10;
+ public static final long MIN_ENTRY_SIZE_BYTES = 1_000_000_000;
+
+ private final EvictingQueue<Entry> window =
EvictingQueue.create(MOVING_AVERAGE_WINDOW_SIZE);
+ private final AtomicReference<Entry> windowTotal = new AtomicReference<>(new
Entry());
+ private Entry currentTail;
+
+ public synchronized void updateProgress(long bytes, long millisElapsed)
+ {
+ if (bytes >= 0 && millisElapsed > 0) {
+ final Entry updatedTotal = new Entry();
+ final Entry currentTotal = windowTotal.get();
+ if (currentTotal != null) {
+ updatedTotal.increment(currentTotal.bytes, currentTotal.millisElapsed);
+ }
+
+ updatedTotal.increment(bytes, millisElapsed);
+
+ final Entry evictedHead = addToTail(bytes, millisElapsed);
+ if (evictedHead != null) {
+ updatedTotal.increment(-evictedHead.bytes, -evictedHead.millisElapsed);
+ }
+
+ if (updatedTotal.bytes > 0 && updatedTotal.millisElapsed > 0) {
Review Comment:
This check looks redundant: it will always be true, given that we already
validate the input `bytes` and `millisElapsed` on line 46.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]