This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1c81dfb Take OFFLINE segment into account for real-time rebalancer
(#4337)
1c81dfb is described below
commit 1c81dfbfdfb8f5afd37993025c0caaec765383f0
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Jul 30 13:01:29 2019 -0700
Take OFFLINE segment into account for real-time rebalancer (#4337)
When the consuming segment encounters error, the ideal state can be turned
into OFFLINE
If the instance states for a segment is all OFFLINE, the segment is counted
OFFLINE and won't be rebalanced
RealtimeSegmentValidationManager will periodically detect the OFFLINE
segments and re-assign them
---
...ealtimeBalanceNumSegmentAssignmentStrategy.java | 14 ++++++++++----
...ltimeReplicaGroupSegmentAssignmentStrategy.java | 14 ++++++++++----
.../assignment/segment/SegmentAssignmentUtils.java | 22 +++++++++++++++++-----
...imeBalanceNumSegmentAssignmentStrategyTest.java | 9 +++++++++
...eReplicaGroupSegmentAssignmentStrategyTest.java | 9 +++++++++
5 files changed, 55 insertions(+), 13 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
index 72bdf29..3319ad7 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategy.java
@@ -27,7 +27,6 @@ import org.apache.pinot.common.config.TableConfig;
import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.RealtimeSegmentOnlineOfflineStateModel;
import org.apache.pinot.common.utils.InstancePartitionsType;
import org.apache.pinot.common.utils.LLCSegmentName;
-import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair;
import
org.apache.pinot.controller.helix.core.rebalance.RebalanceUserConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,10 +94,12 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy
implements SegmentAssig
@Override
public Map<String, Map<String, String>> rebalanceTable(Map<String,
Map<String, String>> currentAssignment,
Configuration config) {
- CompletedConsumingSegmentAssignmentPair pair = new
CompletedConsumingSegmentAssignmentPair(currentAssignment);
+ SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment
completedConsumingOfflineSegmentAssignment =
+ new
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
// Rebalance COMPLETED segments first
- Map<String, Map<String, String>> completedSegmentAssignment =
pair.getCompletedSegmentAssignment();
+ Map<String, Map<String, String>> completedSegmentAssignment =
+
completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
List<String> instancesForCompletedSegments = SegmentAssignmentUtils
.getInstancesForBalanceNumStrategy(_helixManager, _tableConfig,
_replication, InstancePartitionsType.COMPLETED);
Map<String, Map<String, String>> newAssignment = SegmentAssignmentUtils
@@ -106,7 +107,8 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy
implements SegmentAssig
_replication);
// Rebalance CONSUMING segments if needed
- Map<String, Map<String, String>> consumingSegmentAssignment =
pair.getConsumingSegmentAssignment();
+ Map<String, Map<String, String>> consumingSegmentAssignment =
+
completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
List<String> instancesForConsumingSegments = SegmentAssignmentUtils
@@ -132,6 +134,10 @@ public class RealtimeBalanceNumSegmentAssignmentStrategy
implements SegmentAssig
newAssignment.putAll(consumingSegmentAssignment);
}
+ // Keep the OFFLINE segments not moved, and
RealtimeSegmentValidationManager will periodically detect the OFFLINE
+ // segments and re-assign them
+
newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
+
return newAssignment;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
index afd74be..bd5126e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategy.java
@@ -90,11 +90,12 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy
implements SegmentAss
@Override
public Map<String, Map<String, String>> rebalanceTable(Map<String,
Map<String, String>> currentAssignment,
Configuration config) {
- SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair pair =
- new
SegmentAssignmentUtils.CompletedConsumingSegmentAssignmentPair(currentAssignment);
+ SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment
completedConsumingOfflineSegmentAssignment =
+ new
SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(currentAssignment);
// Rebalance COMPLETED segments first
- Map<String, Map<String, String>> completedSegmentAssignment =
pair.getCompletedSegmentAssignment();
+ Map<String, Map<String, String>> completedSegmentAssignment =
+
completedConsumingOfflineSegmentAssignment.getCompletedSegmentAssignment();
InstancePartitions instancePartitionsForCompletedSegments =
InstancePartitionsUtils
.fetchOrComputeInstancePartitions(_helixManager, _tableConfig,
InstancePartitionsType.COMPLETED);
Map<Integer, Set<String>> partitionIdToSegmentsMap = new HashMap<>();
@@ -107,7 +108,8 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy
implements SegmentAss
partitionIdToSegmentsMap);
// Rebalance CONSUMING segments if needed
- Map<String, Map<String, String>> consumingSegmentAssignment =
pair.getConsumingSegmentAssignment();
+ Map<String, Map<String, String>> consumingSegmentAssignment =
+
completedConsumingOfflineSegmentAssignment.getConsumingSegmentAssignment();
if (config.getBoolean(RebalanceUserConfigConstants.INCLUDE_CONSUMING,
RebalanceUserConfigConstants.DEFAULT_INCLUDE_CONSUMING)) {
InstancePartitions instancePartitionsForConsumingSegments =
InstancePartitionsUtils
@@ -134,6 +136,10 @@ public class RealtimeReplicaGroupSegmentAssignmentStrategy
implements SegmentAss
newAssignment.putAll(consumingSegmentAssignment);
}
+ // Keep the OFFLINE segments not moved, and
RealtimeSegmentValidationManager will periodically detect the OFFLINE
+ // segments and re-assign them
+
newAssignment.putAll(completedConsumingOfflineSegmentAssignment.getOfflineSegmentAssignment());
+
return newAssignment;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
index 4333f91..0f3fe6f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java
@@ -248,19 +248,27 @@ class SegmentAssignmentUtils {
}
/**
- * Class that splits segment assignment into CONSUMING segments and
COMPLETED segments.
+ * Class that splits segment assignment into COMPLETED, CONSUMING and
OFFLINE segments.
*/
- static class CompletedConsumingSegmentAssignmentPair {
+ static class CompletedConsumingOfflineSegmentAssignment {
private final Map<String, Map<String, String>> _completedSegmentAssignment
= new TreeMap<>();
private final Map<String, Map<String, String>> _consumingSegmentAssignment
= new TreeMap<>();
+ private final Map<String, Map<String, String>> _offlineSegmentAssignment =
new TreeMap<>();
- CompletedConsumingSegmentAssignmentPair(Map<String, Map<String, String>>
segmentAssignment) {
+ // NOTE: split the segments based on the following criteria:
+ // 1. At least one instance ONLINE -> COMPLETED segment
+ // 2. At least one instance CONSUMING -> CONSUMING segment
+ // 3. All instances OFFLINE (all instances encountered error while
consuming) -> OFFLINE segment
+ CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String,
String>> segmentAssignment) {
for (Map.Entry<String, Map<String, String>> entry :
segmentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
if
(instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.ONLINE))
{
- _completedSegmentAssignment.put(entry.getKey(), instanceStateMap);
+ _completedSegmentAssignment.put(segmentName, instanceStateMap);
+ } else if
(instanceStateMap.values().contains(RealtimeSegmentOnlineOfflineStateModel.CONSUMING))
{
+ _consumingSegmentAssignment.put(segmentName, instanceStateMap);
} else {
- _consumingSegmentAssignment.put(entry.getKey(), instanceStateMap);
+ _offlineSegmentAssignment.put(segmentName, instanceStateMap);
}
}
}
@@ -272,5 +280,9 @@ class SegmentAssignmentUtils {
Map<String, Map<String, String>> getConsumingSegmentAssignment() {
return _consumingSegmentAssignment;
}
+
+ Map<String, Map<String, String>> getOfflineSegmentAssignment() {
+ return _offlineSegmentAssignment;
+ }
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
index 87ef462..0aa454c 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeBalanceNumSegmentAssignmentStrategyTest.java
@@ -188,6 +188,15 @@ public class
RealtimeBalanceNumSegmentAssignmentStrategyTest {
BaseConfiguration config = new BaseConfiguration();
config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true);
assertEquals(_strategy.rebalanceTable(currentAssignment, config),
newAssignment);
+
+ // Rebalance should not change the assignment for the OFFLINE segments
+ String offlineSegmentName = "offlineSegment";
+ Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils
+
.getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_",
NUM_REPLICAS),
+ RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+ newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+ assertEquals(_strategy.rebalanceTable(currentAssignment, config),
newAssignment);
}
private void addToAssignment(Map<String, Map<String, String>>
currentAssignment, int segmentId,
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
index c4c9a82..6d83cc6 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentStrategyTest.java
@@ -210,6 +210,15 @@ public class
RealtimeReplicaGroupSegmentAssignmentStrategyTest {
BaseConfiguration config = new BaseConfiguration();
config.setProperty(RebalanceUserConfigConstants.INCLUDE_CONSUMING, true);
assertEquals(_strategy.rebalanceTable(currentAssignment, config),
newAssignment);
+
+ // Rebalance should not change the assignment for the OFFLINE segments
+ String offlineSegmentName = "offlineSegment";
+ Map<String, String> offlineSegmentInstanceStateMap = SegmentAssignmentUtils
+
.getInstanceStateMap(SegmentAssignmentTestUtils.getNameList("badInstance_",
NUM_REPLICAS),
+ RealtimeSegmentOnlineOfflineStateModel.OFFLINE);
+ currentAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+ newAssignment.put(offlineSegmentName, offlineSegmentInstanceStateMap);
+ assertEquals(_strategy.rebalanceTable(currentAssignment, config),
newAssignment);
}
private void addToAssignment(Map<String, Map<String, String>>
currentAssignment, int segmentId,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]