somandal commented on code in PR #16791:
URL: https://github.com/apache/pinot/pull/16791#discussion_r2356715451
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -156,66 +185,85 @@ public synchronized void processClusterChange(ChangeType
changeType) {
}
private void processSegmentAssignmentChange() {
- LOGGER.info("Processing segment assignment change");
- long startTimeMs = System.currentTimeMillis();
-
- int numTables = _routingEntryMap.size();
- if (numTables == 0) {
- LOGGER.info("No table exists in the routing, skipping processing segment
assignment change");
- return;
- }
-
- List<RoutingEntry> routingEntries = new ArrayList<>(numTables);
- List<String> idealStatePaths = new ArrayList<>(numTables);
- List<String> externalViewPaths = new ArrayList<>(numTables);
- for (Map.Entry<String, RoutingEntry> entry : _routingEntryMap.entrySet()) {
- routingEntries.add(entry.getValue());
- idealStatePaths.add(entry.getValue()._idealStatePath);
- externalViewPaths.add(entry.getValue()._externalViewPath);
- }
- Stat[] idealStateStats = _zkDataAccessor.getStats(idealStatePaths,
AccessOption.PERSISTENT);
- Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths,
AccessOption.PERSISTENT);
- long fetchStatsEndTimeMs = System.currentTimeMillis();
-
- List<String> tablesToUpdate = new ArrayList<>();
- for (int i = 0; i < numTables; i++) {
- Stat idealStateStat = idealStateStats[i];
- Stat externalViewStat = externalViewStats[i];
- if (idealStateStat != null && externalViewStat != null) {
- RoutingEntry routingEntry = routingEntries.get(i);
- if (idealStateStat.getVersion() !=
routingEntry.getLastUpdateIdealStateVersion()
- || externalViewStat.getVersion() !=
routingEntry.getLastUpdateExternalViewVersion()) {
- String tableNameWithType = routingEntry.getTableNameWithType();
- tablesToUpdate.add(tableNameWithType);
- try {
- IdealState idealState =
getIdealState(routingEntry._idealStatePath);
- if (idealState == null) {
- LOGGER.warn("Failed to find ideal state for table: {}, skipping
updating routing entry",
- tableNameWithType);
- continue;
- }
- ExternalView externalView =
getExternalView(routingEntry._externalViewPath);
- if (externalView == null) {
- LOGGER.warn("Failed to find external view for table: {},
skipping updating routing entry",
- tableNameWithType);
- continue;
+ _globalLock.readLock().lock();
+ try {
+ LOGGER.info("Processing segment assignment change");
+ long startTimeMs = System.currentTimeMillis();
+
+ Map<String, RoutingEntry> routingEntrySnapshot = new
HashMap<>(_routingEntryMap);
+
+ int numTables = routingEntrySnapshot.size();
+ if (numTables == 0) {
+ LOGGER.info("No table exists in the routing, skipping processing
segment assignment change");
+ return;
+ }
+
+ List<RoutingEntry> routingEntries = new ArrayList<>(numTables);
+ List<String> idealStatePaths = new ArrayList<>(numTables);
+ List<String> externalViewPaths = new ArrayList<>(numTables);
+ for (Map.Entry<String, RoutingEntry> entry :
routingEntrySnapshot.entrySet()) {
+ routingEntries.add(entry.getValue());
+ idealStatePaths.add(entry.getValue()._idealStatePath);
+ externalViewPaths.add(entry.getValue()._externalViewPath);
+ }
+ Stat[] idealStateStats = _zkDataAccessor.getStats(idealStatePaths,
AccessOption.PERSISTENT);
+ Stat[] externalViewStats = _zkDataAccessor.getStats(externalViewPaths,
AccessOption.PERSISTENT);
+ long fetchStatsEndTimeMs = System.currentTimeMillis();
+
+ List<String> tablesToUpdate = new ArrayList<>();
+ for (int i = 0; i < numTables; i++) {
+ Stat idealStateStat = idealStateStats[i];
+ Stat externalViewStat = externalViewStats[i];
+ if (idealStateStat != null && externalViewStat != null) {
+ RoutingEntry cachedRoutingEntry = routingEntries.get(i);
+ // The routingEntry may have been removed from the _routingEntryMap
by the time we get here in case
+ // one of the other functions such as 'removeRouting' was called
since taking the snapshot. Check for
+ // existence before proceeding. Also note that if new entries were
added since the snapshot was taken, we
Review Comment:
As discussed offline, we can add an additional check in `buildRouting()` to detect whether the IS/EV changed while
`processSegmentAssignmentChange()` was running at the same time (by comparing timestamps and comparing the EV/IS
versions).
I put this check in a do-while loop, so even if `processSegmentAssignmentChange()` runs many times, we keep retrying
to update the routing entry built by `buildRouting()`. Let me know what you think.
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -109,12 +116,25 @@ public class BrokerRoutingManager implements
RoutingManager, ClusterChangeHandle
private final BrokerMetrics _brokerMetrics;
private final Map<String, RoutingEntry> _routingEntryMap = new
ConcurrentHashMap<>();
private final Map<String, ServerInstance> _enabledServerInstanceMap = new
ConcurrentHashMap<>();
- // NOTE: _excludedServers doesn't need to be concurrent because it is only
accessed within the synchronized block
- private final Set<String> _excludedServers = new HashSet<>();
+ // Thread-safe set because it can be read/modified concurrently from
instance/server change paths
+ private final Set<String> _excludedServers = ConcurrentHashMap.newKeySet();
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]