kfaraz commented on code in PR #13197:
URL: https://github.com/apache/druid/pull/13197#discussion_r991339473
##########
server/src/main/java/org/apache/druid/server/coordinator/DruidCoordinator.java:
##########
@@ -405,111 +405,6 @@ public String getCurrentLeader()
return coordLeaderSelector.getCurrentLeader();
}
- public void moveSegment(
Review Comment:
Change summary:
Moved this method to `SegmentLoader`/`SegmentStateManager`.
##########
server/src/main/java/org/apache/druid/server/coordinator/LoadQueuePeon.java:
##########
@@ -21,37 +21,48 @@
import org.apache.druid.timeline.DataSegment;
+import java.util.Map;
import java.util.Set;
/**
* This interface exists only to support configurable load queue management
via curator or http. Once HttpLoadQueuePeon
* has been verified enough in production, CuratorLoadQueuePeon and this
interface would be removed.
*/
@Deprecated
-public abstract class LoadQueuePeon
+public interface LoadQueuePeon
Review Comment:
Change summary:
Added the methods `cancelLoad`, `cancelDrop`, and `loadSegment(segment,
type)`, where the type is primary, replica or balance.
##########
server/src/main/java/org/apache/druid/server/coordinator/CostBalancerStrategy.java:
##########
@@ -184,20 +193,17 @@ public CostBalancerStrategy(ListeningExecutorService exec)
}
@Override
- public ServerHolder findNewSegmentHomeReplicator(DataSegment
proposalSegment, List<ServerHolder> serverHolders)
+ public Iterator<ServerHolder> findNewSegmentHomeReplicator(DataSegment
proposalSegment, List<ServerHolder> serverHolders)
Review Comment:
Change summary:
Find all the best servers to place replicas in one go.
##########
server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java:
##########
@@ -46,13 +46,18 @@
ServerHolder findNewSegmentHomeBalancer(DataSegment proposalSegment,
List<ServerHolder> serverHolders);
/**
- * Find the best server on which to place a {@link DataSegment} replica
according to the balancing strategy
+ * Finds the best servers on which to place a replica of the {@code
proposalSegment}
+ * according to the balancing strategy.
+ *
* @param proposalSegment segment to replicate
- * @param serverHolders servers to consider as replica holders
- * @return The server to replicate to, or null if no suitable server is found
+ * @param serverHolders servers to consider as replica holders
+ * @return Iterator over the best servers (in order) on which the replica(s)
+ * can be placed.
*/
- @Nullable
- ServerHolder findNewSegmentHomeReplicator(DataSegment proposalSegment,
List<ServerHolder> serverHolders);
+ Iterator<ServerHolder> findNewSegmentHomeReplicator(
Review Comment:
Change summary:
Find all the best servers to place replicas in one go rather than multiple
strategy computations.
##########
server/src/main/java/org/apache/druid/server/coordinator/BalancerSegmentHolder.java:
##########
@@ -19,29 +19,28 @@
package org.apache.druid.server.coordinator;
-import org.apache.druid.client.ImmutableDruidServer;
import org.apache.druid.timeline.DataSegment;
/**
*/
public class BalancerSegmentHolder
Review Comment:
Change summary:
Added `ServerHolder` to avoid having to find the corresponding peon later in
the flow.
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/BalanceSegments.java:
##########
@@ -26,85 +26,65 @@
import org.apache.druid.server.coordinator.BalancerSegmentHolder;
import org.apache.druid.server.coordinator.BalancerStrategy;
import org.apache.druid.server.coordinator.CoordinatorStats;
-import org.apache.druid.server.coordinator.DruidCoordinator;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
-import org.apache.druid.server.coordinator.LoadPeonCallback;
-import org.apache.druid.server.coordinator.LoadQueuePeon;
+import org.apache.druid.server.coordinator.SegmentLoader;
+import org.apache.druid.server.coordinator.SegmentStateManager;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.SegmentId;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.SortedSet;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
*/
public class BalanceSegments implements CoordinatorDuty
Review Comment:
Change summary:
Move state of `currentlyMovingSegments` to `SegmentStateManager`.
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/RunRules.java:
##########
@@ -46,38 +45,16 @@ public class RunRules implements CoordinatorDuty
private static final EmittingLogger log = new EmittingLogger(RunRules.class);
private static final int MAX_MISSING_RULES = 10;
- private final ReplicationThrottler replicatorThrottler;
+ private final SegmentStateManager stateManager;
- private final DruidCoordinator coordinator;
-
- public RunRules(DruidCoordinator coordinator)
Review Comment:
Change summary:
Move replication throttling logic to `SegmentStateManager`.
Move check on `maxNonPrimaryReplicantsToLoad` to `ReplicationThrottler`
(requires revisit).
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java:
##########
@@ -322,18 +310,14 @@ private void
emitStatsForHistoricalManagementDuties(DruidCluster cluster, Coordi
);
});
-
coordinator.computeNumsUnavailableUsedSegmentsPerDataSource().object2IntEntrySet().forEach(
- (final Object2IntMap.Entry<String> entry) -> {
- final String dataSource = entry.getKey();
- final int numUnavailableUsedSegmentsInDataSource =
entry.getIntValue();
- emitter.emit(
- new ServiceMetricEvent.Builder()
- .setDimension(DruidMetrics.DUTY_GROUP, groupName)
- .setDimension(DruidMetrics.DATASOURCE, dataSource).build(
- "segment/unavailable/count",
numUnavailableUsedSegmentsInDataSource
- )
- );
- }
+ coordinator.computeNumsUnavailableUsedSegmentsPerDataSource().forEach(
Review Comment:
Style change: Use simpler iterator.
##########
server/src/test/java/org/apache/druid/server/coordinator/simulate/SegmentLoadingNegativeTest.java:
##########
@@ -91,130 +88,4 @@ public void testImmediateLoadingViolatesThrottleLimit()
verifyDatasourceIsFullyLoaded(datasource);
}
- /**
- * Correct behaviour: The first replica on any tier should not be throttled.
- * <p>
- * Fix Apache #12881 to fix this test.
- */
- @Test
Review Comment:
Change summary:
The underlying behaviour for these tests is now fixed!
The corresponding positive test cases can be found in `SegmentLoadingTest`.
##########
server/src/main/java/org/apache/druid/server/coordinator/duty/EmitClusterStatsAndMetrics.java:
##########
@@ -234,36 +218,40 @@ private void
emitStatsForHistoricalManagementDuties(DruidCluster cluster, Coordi
}
);
+ // Log load queue status of all replication or broadcast targets
log.info("Load Queues:");
- for (Iterable<ServerHolder> serverHolders :
cluster.getSortedHistoricalsByTier()) {
Review Comment:
Change summary: Report the queue status of all replication or broadcast
targets, not just of historicals.
##########
server/src/main/java/org/apache/druid/server/coordinator/CuratorLoadQueuePeon.java:
##########
@@ -63,7 +56,7 @@
* of the same or different methods.
*/
@Deprecated
-public class CuratorLoadQueuePeon extends LoadQueuePeon
+public class CuratorLoadQueuePeon implements LoadQueuePeon
Review Comment:
Change summary:
Extracted the class `SegmentHolder` into a common class `QueuedSegment`, which
represents an item in a load queue and is used by both `HttpLoadQueuePeon` and
`CuratorLoadQueuePeon`.
Implemented new methods in `LoadQueuePeon`.
##########
server/src/main/java/org/apache/druid/server/coordinator/ReplicationThrottler.java:
##########
@@ -36,74 +39,89 @@
{
private static final EmittingLogger log = new
EmittingLogger(ReplicationThrottler.class);
- private final Map<String, Boolean> replicatingLookup = new HashMap<>();
Review Comment:
Change summary:
Extracted `ReplicationSegmentHolder` into a common class `TierLoadingState`,
which is used for tracking both `currentlyReplicatingSegments` and
`currentlyMovingSegments` (in `SegmentStateManager`).
##########
server/src/main/java/org/apache/druid/server/coordinator/HttpLoadQueuePeon.java:
##########
@@ -65,7 +66,7 @@
/**
*/
-public class HttpLoadQueuePeon extends LoadQueuePeon
+public class HttpLoadQueuePeon implements LoadQueuePeon
Review Comment:
Change summary:
Used `QueuedSegment` instead of `SegmentHolder` and implemented new
`LoadQueuePeon` methods for cancellation and loading with type.
##########
server/src/main/java/org/apache/druid/server/coordinator/ServerHolder.java:
##########
@@ -20,31 +20,61 @@
package org.apache.druid.server.coordinator;
import org.apache.druid.client.ImmutableDruidServer;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
/**
+ *
*/
public class ServerHolder implements Comparable<ServerHolder>
Review Comment:
Change summary:
Maintain a map containing:
- segments that were in load or drop queue when this run started
- segments that were queued for load or drop during this run
This helps the `SegmentLoader` determine the current state of a segment on a
server and make load/drop decisions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]