This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2f7ddf8 Relocation manager should relocate segments from any servers
(irrespective of the tag) to completed servers (#3466)
2f7ddf8 is described below
commit 2f7ddf840492d4d28b04ddad604888e5d7449b47
Author: Neha Pawar <[email protected]>
AuthorDate: Tue Nov 13 10:40:00 2018 -0800
Relocation manager should relocate segments from any servers (irrespective
of the tag) to completed servers (#3466)
---
.../core/relocation/RealtimeSegmentRelocator.java | 50 +++++++++----------
.../relocation/RealtimeSegmentRelocatorTest.java | 58 +++++++++++-----------
2 files changed, 52 insertions(+), 56 deletions(-)
diff --git
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
index f71a314..0954f5b 100644
---
a/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
+++
b/pinot-controller/src/main/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocator.java
@@ -42,12 +42,12 @@ import org.slf4j.LoggerFactory;
/**
- * Manager to relocate completed segments from consuming servers to completed
servers
+ * Manager to relocate completed segments to completed servers
* Segment relocation will be done by this manager, instead of directly moving
segments to completed servers on completion,
* so that we don't get segment downtime when a segment is in transition
*
* We only relocate segments for realtime tables, and only if tenant config
indicates that relocation is required
- * A segment will be relocated, one replica at a time, once all of its
replicas are in ONLINE state and on consuming servers
+ * A segment will be relocated, one replica at a time, once all of its
replicas are in ONLINE state and all/some are on servers other than completed
servers
*/
public class RealtimeSegmentRelocator extends ControllerPeriodicTask {
private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeSegmentRelocator.class);
@@ -107,7 +107,7 @@ public class RealtimeSegmentRelocator extends
ControllerPeriodicTask {
/**
* Given a realtime tag config and an ideal state, relocate the segments
- * which are completed but still hanging around on consuming servers, one
replica at a time
+ * which are completed but not yet moved to completed servers, one replica
at a time
* @param realtimeTagConfig
* @param idealState
*/
@@ -115,12 +115,7 @@ public class RealtimeSegmentRelocator extends
ControllerPeriodicTask {
final HelixManager helixManager =
_pinotHelixResourceManager.getHelixZkManager();
- List<String> consumingServers = getInstancesWithTag(helixManager,
realtimeTagConfig.getConsumingServerTag());
- if (consumingServers.isEmpty()) {
- throw new IllegalStateException(
- "Found no realtime consuming servers with tag " +
realtimeTagConfig.getConsumingServerTag());
- }
- List<String> completedServers = getInstancesWithTag(helixManager,
realtimeTagConfig.getCompletedServerTag());
+ final List<String> completedServers = getInstancesWithTag(helixManager,
realtimeTagConfig.getCompletedServerTag());
if (completedServers.isEmpty()) {
throw new IllegalStateException(
"Found no realtime completed servers with tag " +
realtimeTagConfig.getCompletedServerTag());
@@ -152,7 +147,7 @@ public class RealtimeSegmentRelocator extends
ControllerPeriodicTask {
completedServersQueue.addAll(completedServerToNumSegments.entrySet());
// get new mapping for segments that need relocation
- createNewIdealState(realtimeTagConfig, idealState, consumingServers,
completedServersQueue);
+ createNewIdealState(realtimeTagConfig, idealState, completedServers,
completedServersQueue);
}
@VisibleForTesting
@@ -161,26 +156,26 @@ public class RealtimeSegmentRelocator extends
ControllerPeriodicTask {
}
/**
- * Given an ideal state, list of consuming serves and completed servers,
- * create a mapping for those segments that should relocate a replica from
consuming to completed server
+ * Given an ideal state find the segments that need to relocate a replica to
completed servers,
+ * and create a new instance state map for those segments
*
* @param realtimeTagConfig
* @param idealState
- * @param consumingServers
+ * @param completedServers
* @param completedServersQueue
* @return
*/
- private void createNewIdealState(RealtimeTagConfig realtimeTagConfig,
IdealState idealState,
- List<String> consumingServers, MinMaxPriorityQueue<Map.Entry<String,
Integer>> completedServersQueue) {
+ private void createNewIdealState(final RealtimeTagConfig realtimeTagConfig,
IdealState idealState,
+ final List<String> completedServers,
MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) {
// TODO: we are scanning the entire segments list every time. This is
unnecessary because only the latest segments will need relocation
// Can we do something to avoid this?
// 1. Time boundary: scan only last day whereas runFrequency = hourly
// 2. For each partition, scan in descending order, and stop when the
first segment not needing relocation is found
for (String segmentName : idealState.getPartitionSet()) {
- Map<String, String> instanceStateMap =
idealState.getInstanceStateMap(segmentName);
+ final Map<String, String> instanceStateMap =
idealState.getInstanceStateMap(segmentName);
Map<String, String> newInstanceStateMap =
- createNewInstanceStateMap(realtimeTagConfig, segmentName,
instanceStateMap, consumingServers,
+ createNewInstanceStateMap(realtimeTagConfig, segmentName,
instanceStateMap, completedServers,
completedServersQueue);
if (MapUtils.isNotEmpty(newInstanceStateMap)) {
idealState.setInstanceStateMap(segmentName, newInstanceStateMap);
@@ -189,30 +184,32 @@ public class RealtimeSegmentRelocator extends
ControllerPeriodicTask {
}
/**
- * Given the instanceStateMap and a list of consuming and completed servers
for a realtime resource,
- * creates a new instanceStateMap, where one replica's instance is replaced
from a consuming server to a completed server
+ * Given the instance state map of a segment, relocate one replica to a
completed server if needed
+ * Relocation should be done only if all replicas are ONLINE and at least
one replica is not on the completed servers
*
* @param realtimeTagConfig
* @param instanceStateMap
- * @param consumingServers
+ * @param completedServers
* @param completedServersQueue
* @return
*/
- private Map<String, String> createNewInstanceStateMap(RealtimeTagConfig
realtimeTagConfig, String segmentName,
- Map<String, String> instanceStateMap, List<String> consumingServers,
+ private Map<String, String> createNewInstanceStateMap(final
RealtimeTagConfig realtimeTagConfig,
+ final String segmentName, final Map<String, String> instanceStateMap,
final List<String> completedServers,
MinMaxPriorityQueue<Map.Entry<String, Integer>> completedServersQueue) {
Map<String, String> newInstanceStateMap = null;
- // proceed only if all segments are ONLINE, and at least 1 server is from
consuming list
+ // proceed only if all segments are ONLINE
for (String state : instanceStateMap.values()) {
if
(!state.equals(PinotHelixSegmentOnlineOfflineStateModelGenerator.ONLINE_STATE))
{
return newInstanceStateMap;
}
}
+ // if an instance is found in the instance state map which is not a
completed server,
+ // replace the instance with one from the completed servers queue
for (String instance : instanceStateMap.keySet()) {
- if (consumingServers.contains(instance)) {
+ if (!completedServers.contains(instance)) {
// Decide best strategy to pick completed server.
// 1. pick random from list of completed servers
// 2. pick completed server with minimum segments, based on ideal
state of this resource
@@ -245,9 +242,8 @@ public class RealtimeSegmentRelocator extends
ControllerPeriodicTask {
chosenServer.setValue(chosenServer.getValue() + 1);
completedServersQueue.add(chosenServer);
- LOGGER.info("Relocating segment {} from consuming server {} (tag {})
to completed server {} (tag {})",
- segmentName, instance, realtimeTagConfig.getConsumingServerTag(),
chosenServer,
- realtimeTagConfig.getCompletedServerTag());
+ LOGGER.info("Relocating segment {} from server {} to completed server
{} (tag {})", segmentName, instance,
+ chosenServer, realtimeTagConfig.getCompletedServerTag());
break;
}
}
diff --git
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
index 1b0ae09..ab79cf7 100644
---
a/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
+++
b/pinot-controller/src/test/java/com/linkedin/pinot/controller/helix/core/relocation/RealtimeSegmentRelocatorTest.java
@@ -102,21 +102,11 @@ public class RealtimeSegmentRelocatorTest {
Map<String, Integer> segmentNameToExpectedNumCompletedInstances = new
HashMap<>(1);
ZNRecordSerializer znRecordSerializer = new ZNRecordSerializer();
- // no consuming instances found
- _realtimeSegmentRelocator.setTagToInstance(serverTenantConsuming, new
ArrayList<>());
- _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted,
completedInstanceList);
boolean exception = false;
- try {
- _realtimeSegmentRelocator.relocateSegments(realtimeTagConfig,
idealState);
- } catch (Exception e) {
- exception = true;
- }
- Assert.assertTrue(exception);
- exception = false;
// no completed instances found
_realtimeSegmentRelocator.setTagToInstance(serverTenantConsuming,
consumingInstanceList);
- _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, new
ArrayList<String>());
+ _realtimeSegmentRelocator.setTagToInstance(serverTenantCompleted, new
ArrayList<>());
try {
_realtimeSegmentRelocator.relocateSegments(realtimeTagConfig,
idealState);
} catch (Exception e) {
@@ -164,8 +154,7 @@ public class RealtimeSegmentRelocatorTest {
_realtimeSegmentRelocator.relocateSegments(realtimeTagConfig, idealState);
Assert.assertNotSame(idealState, prevIdealState);
segmentNameToExpectedNumCompletedInstances.put("segment0", 1);
- verifySegmentAssignment(idealState, prevIdealState, completedInstanceList,
consumingInstanceList,
- nReplicas, segmentNameToExpectedNumCompletedInstances);
+ verifySegmentAssignment(idealState, prevIdealState, completedInstanceList,
nReplicas, segmentNameToExpectedNumCompletedInstances);
// 2 replicas ONLINE on consuming servers, and 1 already relocated.
relocate 1 replica from consuming to completed
prevIdealState = new IdealState(
@@ -173,8 +162,7 @@ public class RealtimeSegmentRelocatorTest {
_realtimeSegmentRelocator.relocateSegments(realtimeTagConfig, idealState);
Assert.assertNotSame(idealState, prevIdealState);
segmentNameToExpectedNumCompletedInstances.put("segment0", 2);
- verifySegmentAssignment(idealState, prevIdealState, completedInstanceList,
consumingInstanceList,
- nReplicas, segmentNameToExpectedNumCompletedInstances);
+ verifySegmentAssignment(idealState, prevIdealState, completedInstanceList,
nReplicas, segmentNameToExpectedNumCompletedInstances);
// 1 replica ONLINE on consuming, 2 already relocated. relocate the 3rd
replica.
// However, only 2 completed servers, which is less than num replicas
@@ -198,8 +186,7 @@ public class RealtimeSegmentRelocatorTest {
_realtimeSegmentRelocator.relocateSegments(realtimeTagConfig, idealState);
Assert.assertNotSame(idealState, prevIdealState);
segmentNameToExpectedNumCompletedInstances.put("segment0", 3);
- verifySegmentAssignment(idealState, prevIdealState, completedInstanceList,
consumingInstanceList,
- nReplicas, segmentNameToExpectedNumCompletedInstances);
+ verifySegmentAssignment(idealState, prevIdealState, completedInstanceList,
nReplicas, segmentNameToExpectedNumCompletedInstances);
// new segment, all CONSUMING, no move necessary
Map<String, String> instanceStateMap1 = new HashMap<>(3);
@@ -227,29 +214,42 @@ public class RealtimeSegmentRelocatorTest {
Assert.assertNotSame(idealState, prevIdealState);
segmentNameToExpectedNumCompletedInstances.put("segment1", 1);
segmentNameToExpectedNumCompletedInstances.put("segment2", 1);
- verifySegmentAssignment(idealState, prevIdealState, completedInstanceList,
consumingInstanceList,
- nReplicas, segmentNameToExpectedNumCompletedInstances);
+ verifySegmentAssignment(idealState, prevIdealState, completedInstanceList,
nReplicas, segmentNameToExpectedNumCompletedInstances);
+
+ // a segment with instances that are not consuming tagged instances.
Relocate them as well
+ Map<String, String> instanceStateMap3 = new HashMap<>(3);
+ instanceStateMap3.put("notAConsumingServer_0", "ONLINE");
+ instanceStateMap3.put("notAConsumingServer_1", "ONLINE");
+ instanceStateMap3.put("notAConsumingServer_2", "ONLINE");
+ idealState.setInstanceStateMap("segment3", instanceStateMap3);
+ prevIdealState = new IdealState(
+ (ZNRecord)
znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
+ _realtimeSegmentRelocator.relocateSegments(realtimeTagConfig, idealState);
+ Assert.assertNotSame(idealState, prevIdealState);
+ segmentNameToExpectedNumCompletedInstances.put("segment1", 2);
+ segmentNameToExpectedNumCompletedInstances.put("segment2", 2);
+ segmentNameToExpectedNumCompletedInstances.put("segment3", 1);
+ verifySegmentAssignment(idealState, prevIdealState, completedInstanceList,
nReplicas, segmentNameToExpectedNumCompletedInstances);
}
- private void verifySegmentAssignment(IdealState updatedIdealState,
- IdealState prevIdealState, List<String> completedInstanceList,
List<String> consumingInstanceList, int nReplicas,
- Map<String, Integer> segmentNameToNumCompletedInstances) {
+ private void verifySegmentAssignment(IdealState updatedIdealState,
IdealState prevIdealState,
+ List<String> completedInstanceList, int nReplicas, Map<String, Integer>
segmentNameToNumCompletedInstances) {
Assert.assertEquals(updatedIdealState.getPartitionSet().size(),
prevIdealState.getPartitionSet().size());
Assert.assertTrue(prevIdealState.getPartitionSet().containsAll(updatedIdealState.getPartitionSet()));
for (String segmentName : updatedIdealState.getPartitionSet()) {
Map<String, String> newInstanceStateMap =
updatedIdealState.getInstanceStateMap(segmentName);
- int completed = 0;
- int consuming = 0;
+ int onCompleted = 0;
+ int notOnCompleted = 0;
for (String instance : newInstanceStateMap.keySet()) {
if (completedInstanceList.contains(instance)) {
- completed++;
- } else if (consumingInstanceList.contains(instance)) {
- consuming++;
+ onCompleted++;
+ } else {
+ notOnCompleted++;
}
}
int expectedOnCompletedServers =
segmentNameToNumCompletedInstances.get(segmentName).intValue();
- Assert.assertEquals(completed, expectedOnCompletedServers);
- Assert.assertEquals(consuming, nReplicas - expectedOnCompletedServers);
+ Assert.assertEquals(onCompleted, expectedOnCompletedServers);
+ Assert.assertEquals(notOnCompleted, nReplicas -
expectedOnCompletedServers);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]