abhishekrb19 commented on code in PR #16691:
URL: https://github.com/apache/druid/pull/16691#discussion_r1701150353


##########
server/src/main/java/org/apache/druid/server/coordinator/loading/LoadingRateTracker.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.loading;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.EvictingQueue;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.Stopwatch;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks the current segment loading rate for a single server.
+ * <p>
+ * The loading rate is computed as a moving average of the last
+ * {@link #MOVING_AVERAGE_WINDOW_SIZE} segment batches (or more if any batch was
+ * smaller than {@link #MIN_ENTRY_SIZE_BYTES}). A batch is defined as a set of
+ * segments added to the load queue together.
+ * <pre>
+ *   batchDurationMillis
+ *   = t(load queue becomes empty) - t(first load request in batch is sent to server)
+ *
+ *   batchBytes = total bytes successfully loaded in batch
+ *
+ *   avg loading rate in batch (kbps) = (8 * batchBytes) / batchDurationMillis
+ *
+ *   overall avg loading rate (kbps)
+ *   = (8 * sumOverWindow(batchBytes)) / sumOverWindow(batchDurationMillis)
+ * </pre>
+ * <p>
+ * This class is currently not required to be thread-safe as the caller
+ * {@link HttpLoadQueuePeon} itself ensures that the write methods of this class
+ * are only accessed by one thread at a time.
+ */
+@NotThreadSafe
+public class LoadingRateTracker
+{
+  public static final int MOVING_AVERAGE_WINDOW_SIZE = 10;
+  public static final long MIN_ENTRY_SIZE_BYTES = 1 << 30;
+
+  private final EvictingQueue<Entry> window = EvictingQueue.create(MOVING_AVERAGE_WINDOW_SIZE);
+
+  /**
+   * Total stats for the whole window. This includes the total from the current batch as well.
+   */
+  private final AtomicReference<Entry> windowTotal = new AtomicReference<>(null);

Review Comment:
   nit: they're equivalent:
   ```suggestion
     private final AtomicReference<Entry> windowTotal = new AtomicReference<>();
   ```
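For reference, the `AtomicReference` javadoc describes the no-arg constructor as creating a reference with a null initial value. A tiny standalone check of the equivalence (the class name `AtomicReferenceDefaults` is hypothetical, not part of the PR):

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper used only to illustrate the nit above.
class AtomicReferenceDefaults
{
  static boolean bothStartAsNull()
  {
    // No-arg constructor: the initial value is null.
    final AtomicReference<String> implicitNull = new AtomicReference<>();
    // Explicit null argument: the same initial value.
    final AtomicReference<String> explicitNull = new AtomicReference<>(null);
    return implicitNull.get() == null && explicitNull.get() == null;
  }
}
```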



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/LoadingRateTracker.java:
##########
@@ -0,0 +1,193 @@
+
+package org.apache.druid.server.coordinator.loading;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.EvictingQueue;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.Stopwatch;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks the current segment loading rate for a single server.
+ * <p>
+ * The loading rate is computed as a moving average of the last
+ * {@link #MOVING_AVERAGE_WINDOW_SIZE} segment batches (or more if any batch was
+ * smaller than {@link #MIN_ENTRY_SIZE_BYTES}). A batch is defined as a set of
+ * segments added to the load queue together.
+ * <pre>
+ *   batchDurationMillis
+ *   = t(load queue becomes empty) - t(first load request in batch is sent to server)
+ *
+ *   batchBytes = total bytes successfully loaded in batch
+ *
+ *   avg loading rate in batch (kbps) = (8 * batchBytes) / batchDurationMillis
+ *
+ *   overall avg loading rate (kbps)
+ *   = (8 * sumOverWindow(batchBytes)) / sumOverWindow(batchDurationMillis)
+ * </pre>
+ * <p>
+ * This class is currently not required to be thread-safe as the caller
+ * {@link HttpLoadQueuePeon} itself ensures that the write methods of this class
+ * are only accessed by one thread at a time.
+ */
+@NotThreadSafe
+public class LoadingRateTracker
+{
+  public static final int MOVING_AVERAGE_WINDOW_SIZE = 10;
+  public static final long MIN_ENTRY_SIZE_BYTES = 1 << 30;
+
+  private final EvictingQueue<Entry> window = EvictingQueue.create(MOVING_AVERAGE_WINDOW_SIZE);
+
+  /**
+   * Total stats for the whole window. This includes the total from the current batch as well.
+   */
+  private final AtomicReference<Entry> windowTotal = new AtomicReference<>(null);
+
+  private Entry currentBatchTotal;
+  private Entry currentTail;
+
+  private final Stopwatch currentBatchDuration = Stopwatch.createUnstarted();
+
+  /**
+   * Marks the start of loading of a batch of segments. This should be called when
+   * the first request in a batch is sent to the server.
+   */
+  public void markBatchLoadingStarted()
+  {
+    if (isLoadingBatch()) {
+      // Do nothing
+      return;
+    }
+
+    currentBatchDuration.restart();
+    currentBatchTotal = new Entry();
+
+    // Add a fresh entry at the tail for this batch
+    final Entry evictedHead = addNewEntryIfTailIsFull();
+    if (evictedHead != null) {
+      final Entry delta = new Entry();
+      delta.bytes -= evictedHead.bytes;
+      delta.millisElapsed -= evictedHead.millisElapsed;
+
+      windowTotal.updateAndGet(delta::incrementBy);
+    }
+  }
+
+  public boolean isLoadingBatch()
+  {
+    return currentBatchDuration.isRunning();
+  }
+
+  /**
+   * Adds the given number of bytes to the total data successfully loaded in the
+   * current batch. This causes an update of the current load rate.
+   */
+  public void incrementBytesLoadedInBatch(long loadedBytes)
+  {
+    incrementBytesLoadedInBatch(loadedBytes, currentBatchDuration.millisElapsed());
+  }
+
+  @VisibleForTesting
+  void incrementBytesLoadedInBatch(final long bytes, final long batchDurationMillis)
+  {
+    if (!isLoadingBatch()) {
+      throw DruidException.defensive("markBatchLoadingStarted() must be called before tracking load progress.");
+    }
+
+    final Entry delta = new Entry();
+    delta.bytes = bytes;
+    delta.millisElapsed = batchDurationMillis - currentBatchTotal.millisElapsed;
+
+    currentTail.incrementBy(delta);
+    currentBatchTotal.incrementBy(delta);
+    windowTotal.updateAndGet(delta::incrementBy);
+  }
+
+  /**
+   * Marks the end of loading of a batch of segments. This method should be called
+   * when all the requests in the batch have been processed by the server.
+   */
+  public void markBatchLoadingFinished()
+  {
+    if (isLoadingBatch()) {
+      currentBatchDuration.reset();
+      currentBatchTotal = null;
+    }
+  }
+
+  public void reset()

Review Comment:
   Should we call this `stop()` to avoid any confusion with `markBatchLoadingFinished()`? `stop()` would also align with how it's used in the `LoadQueuePeon`. At the very least, a javadoc here would be helpful.



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/LoadingRateTracker.java:
##########
@@ -0,0 +1,193 @@
+
+package org.apache.druid.server.coordinator.loading;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.EvictingQueue;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.Stopwatch;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks the current segment loading rate for a single server.
+ * <p>
+ * The loading rate is computed as a moving average of the last
+ * {@link #MOVING_AVERAGE_WINDOW_SIZE} segment batches (or more if any batch was
+ * smaller than {@link #MIN_ENTRY_SIZE_BYTES}). A batch is defined as a set of
+ * segments added to the load queue together.

Review Comment:
   I think a few more things should be called out in this class-level javadoc, specifically around how the methods are meant to be used.
   
   1. It would be helpful to document what a batch is more concretely in terms of this specific implementation. For example, something like: `multiple updates invoked between a markBatchLoadingStarted() and a markBatchLoadingFinished() call constitute a batch`.
   
   2. Clarify the difference between `reset()` and `markBatchLoadingFinished()`, or when each should be used; this can just be method-level javadocs if you prefer. For instance, in `HttpLoadQueuePeon`, I expected to see a `reset()` to reset the tracker's state once there are no more segments to be loaded, but we explicitly call `markBatchLoadingFinished()`. Also, what do you think about renaming `reset()` to `stop()`?
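One way to pin down both points is a small self-contained model of the intended semantics. The sketch below is hypothetical and simplified: `BatchRateSketch` is an illustrative name, it assumes `stop()` discards all history (the body of the PR's `reset()` is not shown in this hunk), it keeps only completed batches in the window, and it does not merge batches smaller than `MIN_ENTRY_SIZE_BYTES`. It does follow the formula from the class javadoc, `8 * sum(batchBytes) / sum(batchDurationMillis)`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical, simplified model (not the PR's code). A batch is every
 * incrementBytesLoadedInBatch() call made between a markBatchLoadingStarted()
 * and the following markBatchLoadingFinished().
 */
class BatchRateSketch
{
  static final int WINDOW_SIZE = 10;

  // Each element is {bytes, millis} for one completed batch, oldest first.
  private final Deque<long[]> window = new ArrayDeque<>();
  private boolean loadingBatch = false;
  private long batchBytes = 0;
  private long batchMillis = 0;

  /** Starts a new batch; a no-op if a batch is already in progress. */
  void markBatchLoadingStarted()
  {
    if (!loadingBatch) {
      loadingBatch = true;
      batchBytes = 0;
      batchMillis = 0;
    }
  }

  /** batchDurationMillis is the time since the batch started, passed explicitly for testability. */
  void incrementBytesLoadedInBatch(long bytes, long batchDurationMillis)
  {
    if (!loadingBatch) {
      throw new IllegalStateException("markBatchLoadingStarted() must be called first");
    }
    batchBytes += bytes;
    batchMillis = batchDurationMillis;
  }

  /** Closes the current batch only; completed batches stay in the window. */
  void markBatchLoadingFinished()
  {
    if (loadingBatch) {
      if (window.size() == WINDOW_SIZE) {
        window.removeFirst(); // evict the oldest batch
      }
      window.addLast(new long[]{batchBytes, batchMillis});
      loadingBatch = false;
    }
  }

  /** Stops tracking altogether (e.g. when the peon stops) and discards all history. */
  void stop()
  {
    loadingBatch = false;
    window.clear();
  }

  /** overall rate (kbps) = 8 * sum(batchBytes) / sum(batchDurationMillis) */
  long getMovingAverageLoadRateKbps()
  {
    long totalBytes = 0;
    long totalMillis = 0;
    for (long[] batch : window) {
      totalBytes += batch[0];
      totalMillis += batch[1];
    }
    return totalMillis == 0 ? 0 : (8 * totalBytes) / totalMillis;
  }
}
```

For example, a batch of 1,000,000 bytes over 1,000 ms followed by one of 1,000,000 bytes over 4,000 ms gives 8 * 2,000,000 / 5,000 = 3,200 kbps, not the unweighted mean of the per-batch rates (8,000 and 2,000 kbps, i.e. 5,000 kbps). Summing bytes and durations separately weights each batch by its duration.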
   
   



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/HttpLoadQueuePeon.java:
##########
@@ -173,7 +175,7 @@ private void doSegmentManagement()
       while (newRequests.size() < batchSize && queuedSegmentIterator.hasNext()) {

Review Comment:
   The `currentTimeMillis` variable above can be removed, as it is now unused.



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/LoadingRateTracker.java:
##########
@@ -0,0 +1,193 @@
+
+package org.apache.druid.server.coordinator.loading;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.EvictingQueue;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.Stopwatch;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks the current segment loading rate for a single server.
+ * <p>
+ * The loading rate is computed as a moving average of the last
+ * {@link #MOVING_AVERAGE_WINDOW_SIZE} segment batches (or more if any batch was
+ * smaller than {@link #MIN_ENTRY_SIZE_BYTES}). A batch is defined as a set of
+ * segments added to the load queue together.
+ * <pre>
+ *   batchDurationMillis
+ *   = t(load queue becomes empty) - t(first load request in batch is sent to server)
+ *
+ *   batchBytes = total bytes successfully loaded in batch
+ *
+ *   avg loading rate in batch (kbps) = (8 * batchBytes) / batchDurationMillis
+ *
+ *   overall avg loading rate (kbps)
+ *   = (8 * sumOverWindow(batchBytes)) / sumOverWindow(batchDurationMillis)
+ * </pre>
+ * <p>
+ * This class is currently not required to be thread-safe as the caller
+ * {@link HttpLoadQueuePeon} itself ensures that the write methods of this class
+ * are only accessed by one thread at a time.
+ */
+@NotThreadSafe
+public class LoadingRateTracker
+{
+  public static final int MOVING_AVERAGE_WINDOW_SIZE = 10;
+  public static final long MIN_ENTRY_SIZE_BYTES = 1 << 30;
+
+  private final EvictingQueue<Entry> window = EvictingQueue.create(MOVING_AVERAGE_WINDOW_SIZE);
+
+  /**
+   * Total stats for the whole window. This includes the total from the current batch as well.
+   */
+  private final AtomicReference<Entry> windowTotal = new AtomicReference<>(null);

Review Comment:
   Is `windowTotal` an atomic reference because it's also used by the API `getMovingAverageLoadRateKbps()`? As the javadoc notes, all the other state keeping is made thread-safe by `HttpLoadQueuePeon`, which ensures only one thread writes at a time.
   
   Asking because `LoadingRateTracker` is marked `@NotThreadSafe`, so it would be useful to add a comment on `windowTotal` explaining why it uses a synchronization primitive, to avoid confusion.
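If that is the reason, the comment could spell out the pattern: one thread writes, another may read, and each update publishes a new object through the reference, so a reader never sees `bytes` from one update paired with `millis` from another. A minimal sketch under those assumptions (`WindowTotalSketch` and `Totals` are hypothetical names; the PR's `Entry` is mutable, so this is a variation rather than its actual code):

```java
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch: a single writer thread updates the totals while other
 * threads may read them. Each update publishes a fresh immutable snapshot.
 */
class WindowTotalSketch
{
  /** Immutable pair of totals; a new instance is created on every update. */
  static final class Totals
  {
    final long bytes;
    final long millis;

    Totals(long bytes, long millis)
    {
      this.bytes = bytes;
      this.millis = millis;
    }

    Totals plus(long deltaBytes, long deltaMillis)
    {
      return new Totals(bytes + deltaBytes, millis + deltaMillis);
    }
  }

  private final AtomicReference<Totals> windowTotal = new AtomicReference<>(new Totals(0, 0));

  /** Intended to be called only by the single writer thread. */
  void increment(long deltaBytes, long deltaMillis)
  {
    // The function is side-effect-free, so a retry under contention would be harmless.
    windowTotal.updateAndGet(current -> current.plus(deltaBytes, deltaMillis));
  }

  /** Safe to call from any thread: reads one consistent snapshot. */
  long getMovingAverageLoadRateKbps()
  {
    final Totals snapshot = windowTotal.get();
    return snapshot.millis == 0 ? 0 : (8 * snapshot.bytes) / snapshot.millis;
  }
}
```

With a strictly single writer, a `volatile` field holding an immutable snapshot would give the same visibility guarantee; `AtomicReference` additionally keeps the read-modify-write atomic if more writers ever appear. The `updateAndGet` javadoc also asks for a side-effect-free function, since it may be re-applied under contention. If `Entry.incrementBy` mutates its receiver (as its use on `currentTail` suggests), `delta::incrementBy` is only safe while there is exactly one writer.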
   
   



##########
server/src/main/java/org/apache/druid/server/coordinator/loading/LoadingRateTracker.java:
##########
@@ -0,0 +1,193 @@
+
+package org.apache.druid.server.coordinator.loading;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.EvictingQueue;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.java.util.common.Stopwatch;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tracks the current segment loading rate for a single server.
+ * <p>
+ * The loading rate is computed as a moving average of the last
+ * {@link #MOVING_AVERAGE_WINDOW_SIZE} segment batches (or more if any batch was
+ * smaller than {@link #MIN_ENTRY_SIZE_BYTES}). A batch is defined as a set of
+ * segments added to the load queue together.
+ * <pre>
+ *   batchDurationMillis
+ *   = t(load queue becomes empty) - t(first load request in batch is sent to server)
+ *
+ *   batchBytes = total bytes successfully loaded in batch
+ *
+ *   avg loading rate in batch (kbps) = (8 * batchBytes) / batchDurationMillis
+ *
+ *   overall avg loading rate (kbps)
+ *   = (8 * sumOverWindow(batchBytes)) / sumOverWindow(batchDurationMillis)
+ * </pre>
+ * <p>
+ * This class is currently not required to be thread-safe as the caller
+ * {@link HttpLoadQueuePeon} itself ensures that the write methods of this class
+ * are only accessed by one thread at a time.
+ */
+@NotThreadSafe
+public class LoadingRateTracker
+{
+  public static final int MOVING_AVERAGE_WINDOW_SIZE = 10;
+  public static final long MIN_ENTRY_SIZE_BYTES = 1 << 30;

Review Comment:
   Add a comment that this is 1 GiB? Or maybe initialize it as `1024 * 1024 * 1024`, which is a bit more straightforward.
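Both spellings evaluate to 1,073,741,824 bytes (1 GiB). One thing worth keeping in mind either way: the expressions are computed in `int` arithmetic and only then widened to `long`, which is fine for 1 GiB but silently overflows at 2 GiB. A standalone illustration (all constant names are hypothetical):

```java
/**
 * Hypothetical constants showing that the spellings are equivalent for 1 GiB,
 * and where int arithmetic starts to overflow.
 */
class SizeConstants
{
  static final long ONE_GIB_SHIFT = 1 << 30;              // int shift, then widened to long
  static final long ONE_GIB_PRODUCT = 1024 * 1024 * 1024; // int product, then widened to long
  static final long ONE_GIB_LONG = 1024L * 1024 * 1024;   // long arithmetic throughout

  // Pitfall: 2 GiB does not fit in an int, so this int product overflows
  // to Integer.MIN_VALUE before being widened to long.
  static final long TWO_GIB_INT_OVERFLOW = 2 * 1024 * 1024 * 1024;
  // A long literal as the first operand keeps the whole product in long.
  static final long TWO_GIB_LONG = 2L * 1024 * 1024 * 1024;
}
```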



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
