adoroszlai commented on code in PR #7776:
URL: https://github.com/apache/ozone/pull/7776#discussion_r1935379009
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java:
##########
@@ -232,14 +232,38 @@ private ReplicationSupervisor(StateContext context,
ExecutorService executor,
* Queue an asynchronous download of the given container.
*/
public void addTask(AbstractReplicationTask task) {
+ if (queueHasRoomFor(task)){
Review Comment:
[checkstyle](https://github.com/len548/ozone/actions/runs/13049848933/job/36407507462#step:8:16)
complains:
```
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
235: '{' is not preceded with whitespace.
```
```suggestion
if (queueHasRoomFor(task)) {
```
##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java:
##########
@@ -232,14 +232,38 @@ private ReplicationSupervisor(StateContext context,
ExecutorService executor,
* Queue an asynchronous download of the given container.
*/
public void addTask(AbstractReplicationTask task) {
+ if (queueHasRoomFor(task)){
+ initCounters(task);
+ addToQueue(task);
+ }
+ }
+
+ private boolean queueHasRoomFor(AbstractReplicationTask task) {
final int max = maxQueueSize;
if (getTotalInFlightReplications() >= max) {
LOG.warn("Ignored {} command for container {} in Replication Supervisor"
- + "as queue reached max size of {}.",
- task.getClass(), task.getContainerId(), max);
- return;
+ + "as queue reached max size of {}.",
+ task.getClass(), task.getContainerId(), max);
+ return false;
+ }
+ return true;
+ }
+
+ private void addToQueue(AbstractReplicationTask task) {
+ if (inFlight.add(task)) {
+ if (task.getPriority() != ReplicationCommandPriority.LOW) {
+ // Low priority tasks are not included in the replication queue sizes
+ // returned to SCM in the heartbeat, so we only update the count for
+ // priorities other than low.
+ taskCounter.computeIfAbsent(task.getClass(),
+ k -> new AtomicInteger()).incrementAndGet();
+ }
+ queuedCounter.get(task.getMetricName()).incrementAndGet();
+ executor.execute(new TaskRunner(task));
}
+ }
Review Comment:
Please move this after `initCounters` (as mentioned
[previously](https://github.com/apache/ozone/pull/7776#discussion_r1934228059)).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]