This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch relocation_fix in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 854f566e4553bd67fe33531e9e1b20c58040d0cb Author: Neha Pawar <[email protected]> AuthorDate: Mon Nov 12 15:49:30 2018 -0800 Fix relocation logic to allow relocation from any servers --- .../core/relocation/RealtimeSegmentRelocator.java | 45 ++++++++++------------ .../relocation/RealtimeSegmentRelocatorTest.java | 5 ++- 2 files changed, 25 insertions(+), 25 deletions(-) diff --git a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java index f71a314..b4d5b23 100644 --- a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java +++ b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java @@ -42,12 +42,12 @@ import org.slf4j.LoggerFactory; /** - * Manager to relocate completed segments from consuming servers to completed servers + * Manager to relocate completed segments to completed servers * Segment relocation will be done by this manager, instead of directly moving segments to completed servers on completion, * so that we don't get segment downtime when a segment is in transition * * We only relocate segments for realtime tables, and only if tenant config indicates that relocation is required - * A segment will be relocated, one replica at a time, once all of its replicas are in ONLINE state and on consuming servers + * A segment will be relocated, one replica at a time, once all of its replicas are in ONLINE state and all/some are on servers other than completed servers */ public class RealtimeSegmentRelocator extends ControllerPeriodicTask { private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeSegmentRelocator.class); @@ -107,7 +107,7 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask { /** * Given a realtime tag config and an ideal state, relocate the segments - * which are completed but still hanging around on consuming servers, one replica at a time + * which are completed but not yet moved to completed servers, one replica at a time * @param realtimeTagConfig * @param idealState */ @@ -115,12 +115,7 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask { final HelixManager helixManager = _pinotHelixResourceManager.getHelixZkManager(); - List<String> consumingServers = getInstancesWithTag(helixManager, realtimeTagConfig.getConsumingServerTag()); - if (consumingServers.isEmpty()) { - throw new IllegalStateException( - "Found no realtime consuming servers with tag " + realtimeTagConfig.getConsumingServerTag()); - } - List<String> completedServers = getInstancesWithTag(helixManager, realtimeTagConfig.getCompletedServerTag()); + final List<String> completedServers = getInstancesWithTag(helixManager, realtimeTagConfig.getCompletedServerTag()); if (completedServers.isEmpty()) { throw new IllegalStateException( "Found no realtime completed servers with tag " + realtimeTagConfig.getCompletedServerTag()); @@ -152,7 +147,7 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask { completedServersQueue.addAll(completedServerToNumSegments.entrySet()); // get new mapping for segments that need relocation - createNewIdealState(realtimeTagConfig, idealState, consumingServers, completedServersQueue); + createNewIdealState(realtimeTagConfig, idealState, completedServers, completedServersQueue); } @VisibleForTesting @@ -161,26 +156,26 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask { } /** - * Given an ideal state, list of consuming serves and completed servers, - * create a mapping for those segments that should relocate a replica from consuming to completed server + * Given an ideal state find the segments that need to relocate a replica to completed servers, + * and create a new instance state map for those segments * * @param realtimeTagConfig * @param idealState - * @param consumingServers + * @param completedServers * @param completedServersQueue * @return */ - private void createNewIdealState(RealtimeTagConfig realtimeTagConfig, IdealState idealState, - List<String> consumingServers, MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) { + private void createNewIdealState(final RealtimeTagConfig realtimeTagConfig, IdealState idealState, + final List<String> completedServers, MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) { // TODO: we are scanning the entire segments list every time. This is unnecessary because only the latest segments will need relocation // Can we do something to avoid this? // 1. Time boundary: scan only last day whereas runFrequency = hourly // 2. For each partition, scan in descending order, and stop when the first segment not needing relocation is found for (String segmentName : idealState.getPartitionSet()) { - Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName); + final Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segmentName); Map<String, String> newInstanceStateMap = - createNewInstanceStateMap(realtimeTagConfig, segmentName, instanceStateMap, consumingServers, + createNewInstanceStateMap(realtimeTagConfig, segmentName, instanceStateMap, completedServers, completedServersQueue); if (MapUtils.isNotEmpty(newInstanceStateMap)) { idealState.setInstanceStateMap(segmentName, newInstanceStateMap); @@ -189,30 +184,32 @@ public class RealtimeSegmentRelocator extends ControllerPeriodicTask { } /** - * Given the instanceStateMap and a list of consuming and completed servers for a realtime resource, - * creates a new instanceStateMap, where one replica's instance is replaced from a consuming server to a completed server + * Given the instance state map of a segment, relocate one replica to a completed server if needed + * Relocation should be done only if all replicas are ONLINE and at least one replica is not on the completed servers * * @param realtimeTagConfig * @param instanceStateMap - * @param consumingServers + * @param completedServers * @param completedServersQueue * @return */ - private Map<String, String> createNewInstanceStateMap(RealtimeTagConfig realtimeTagConfig, String segmentName, - Map<String, String> instanceStateMap, List<String> consumingServers, + private Map<String, String> createNewInstanceStateMap(final RealtimeTagConfig realtimeTagConfig, + final String segmentName, final Map<String, String> instanceStateMap, final List<String> completedServers, MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) { Map<String, String> newInstanceStateMap = null; - // proceed only if all segments are ONLINE, and at least 1 server is from consuming list + // proceed only if all segments are ONLINE for (String state : instanceStateMap.values()) { if (!state.equals(PinotHelixSegmentOnlineOfflineStateModelGenerator.ONLINE_STATE)) { return newInstanceStateMap; } } + // if an instance is found in the instance state map which is not a completed server, + // replace the instance with one from the completed servers queue for (String instance : instanceStateMap.keySet()) { - if (consumingServers.contains(instance)) { + if (!completedServers.contains(instance)) { // Decide best strategy to pick completed server. // 1. pick random from list of completed servers // 2. pick completed server with minimum segments, based on ideal state of this resource diff --git a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java index 1b0ae09..45a1212 100644 --- a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java +++ b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java @@ -103,6 +103,7 @@ public class RealtimeSegmentRelocatorTest { ZNRecordSerializer znRecordSerializer = new ZNRecordSerializer(); // no consuming instances found + // TODO: this should not be tested anymore _realtimeSegmentRelocator.setTagToInstance(serverTenantConsuming, new ArrayList<>()); _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, completedInstanceList); boolean exception = false; @@ -116,7 +117,7 @@ public class RealtimeSegmentRelocatorTest { // no completed instances found _realtimeSegmentRelocator.setTagToInstance(serverTenantConsuming, consumingInstanceList); - _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, new ArrayList<String>()); + _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, new ArrayList<>()); try { _realtimeSegmentRelocator.relocateSegments(realtimeTagConfig, idealState); } catch (Exception e) { @@ -229,6 +230,8 @@ public class RealtimeSegmentRelocatorTest { segmentNameToExpectedNumCompletedInstances.put("segment2", 1); verifySegmentAssignment(idealState, prevIdealState, completedInstanceList, consumingInstanceList, nReplicas, segmentNameToExpectedNumCompletedInstances); + + // add test to check that segments not on consuming tag also relocate } private void verifySegmentAssignment(IdealState updatedIdealState, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
