jackjlli commented on a change in pull request #4553: Refactor ControllerLeaderLocator
URL: https://github.com/apache/incubator-pinot/pull/4553#discussion_r326303691
 
 

 ##########
 File path: 
pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java
 ##########
 @@ -80,75 +87,127 @@ public static ControllerLeaderLocator getInstance() {
 
   /**
    * Locates the controller leader so that we can send LLC segment completion 
requests to it.
-   * Checks the {@link 
ControllerLeaderLocator::_cachedControllerLeaderInvalid} flag and fetches the 
leader from helix if cached value is invalid
+   * Checks the {@link ControllerLeaderLocator::_cachedControllerLeaderValid} 
flag and fetches the leaders to {@link 
ControllerLeaderLocator::_cachedControllerLeaderMap} from helix if cached value 
is invalid
    * @param rawTableName table name without type.
    * @return The host-port pair of the current controller leader.
    */
   public synchronized Pair<String, Integer> getControllerLeader(String 
rawTableName) {
-    if (!_cachedControllerLeaderInvalid) {
-      return _controllerLeaderHostPort;
+    int partitionId = LeadControllerUtils.getPartitionIdForTable(rawTableName);
+    if (_cachedControllerLeaderValid) {
+      return _cachedControllerLeaderMap.get(partitionId);
     }
 
-    Pair<String, Integer> leaderForTable = getLeaderForTable(rawTableName);
-    if (leaderForTable == null) {
-      LOGGER.warn("Failed to find a leader for Table: {}", rawTableName);
-      _cachedControllerLeaderInvalid = true;
-      return null;
-    } else {
-      _controllerLeaderHostPort = leaderForTable;
-      _cachedControllerLeaderInvalid = false;
-      LOGGER.info("Setting controller leader to be {}:{}", 
_controllerLeaderHostPort.getFirst(),
-          _controllerLeaderHostPort.getSecond());
-      return _controllerLeaderHostPort;
-    }
+    // No controller leader cached, fetches a fresh copy of external view and 
then gets the leader for the given table.
+    refreshControllerLeaderMap();
+    return _cachedControllerLeaderValid ? 
_cachedControllerLeaderMap.get(partitionId) : null;
   }
 
   /**
-   * Firstly checks whether lead controller resource has been enabled or not.
-   * If yes, use this as the leader for realtime segment completion once 
partition leader exists.
-   * Otherwise, try to use Helix leader.
-   * @param rawTableName table name without type.
-   * @return the controller leader id with hostname and port for this table, 
e.g. localhost_9000
+   * Checks whether lead controller resource has been enabled or not.
+   * If yes, updates lead controller pairs from the external view of lead 
controller resource.
+   * Otherwise, updates lead controller pairs from Helix cluster leader.
    */
-  private Pair<String, Integer> getLeaderForTable(String rawTableName) {
+  private void refreshControllerLeaderMap() {
     // Checks whether lead controller resource has been enabled or not.
     if (isLeadControllerResourceEnabled()) {
-      // Gets leader from lead controller resource.
-      return getLeaderFromLeadControllerResource(rawTableName);
+      refreshControllerLeaderMapFromLeadControllerResource();
     } else {
-      // Gets Helix leader to be the leader to this table, otherwise returns 
null.
-      return getHelixClusterLeader();
+      refreshControllerLeaderMapFromHelixClusterLeader();
     }
   }
 
   /**
-   * Checks whether lead controller resource is enabled or not. The switch is 
in resource config.
+   * Updates lead controller pairs from the external view of lead controller 
resource.
    */
-  private boolean isLeadControllerResourceEnabled() {
-    BaseDataAccessor<ZNRecord> dataAccessor = 
_helixManager.getHelixDataAccessor().getBaseDataAccessor();
-    Stat stat = new Stat();
+  private void refreshControllerLeaderMapFromLeadControllerResource() {
+    boolean refreshSucceeded = false;
     try {
-      ZNRecord znRecord = dataAccessor.get("/" + 
_helixManager.getClusterName() + "/CONFIGS/RESOURCE/"
-          + CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, stat, 
AccessOption.THROW_EXCEPTION_IFNOTEXIST);
-      return 
Boolean.parseBoolean(znRecord.getSimpleField(CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_ENABLED_KEY));
+      ExternalView leadControllerResourceExternalView = 
_helixManager.getClusterManagmentTool()
+          .getResourceExternalView(_helixManager.getClusterName(), 
Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+      if (leadControllerResourceExternalView == null) {
+        LOGGER.warn("External view of {} is null.", 
Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+        return;
+      }
+      Set<String> partitionNames = 
leadControllerResourceExternalView.getPartitionSet();
+      if (partitionNames.isEmpty()) {
+        LOGGER.warn("The partition set in the external view of {} is empty.", 
Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+        return;
+      }
+      if (partitionNames.size() != 
Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE) {
+        LOGGER.warn("The partition size of {} is not {}. Actual size: {}", 
Helix.LEAD_CONTROLLER_RESOURCE_NAME,
+            Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE, 
partitionNames.size());
+        return;
+      }
+      for (String partitionName : partitionNames) {
+        int partitionId = 
LeadControllerUtils.extractPartitionId(partitionName);
+        Map<String, String> partitionStateMap = 
leadControllerResourceExternalView.getStateMap(partitionName);
+        boolean masterFound = false;
+        // Get master host from partition map. Return null if no master found.
+        for (Map.Entry<String, String> entry : partitionStateMap.entrySet()) {
+          if (MasterSlaveSMD.States.MASTER.name().equals(entry.getValue())) {
+            // Found the controller in master state.
+            // Converts participant id (with Prefix "Controller_") to 
controller id and assigns it as the leader,
+            // since realtime segment completion protocol doesn't need the 
prefix in controller instance id.
+            String participantInstanceId = entry.getKey();
+            String controllerInstanceId = 
LeadControllerUtils.extractControllerInstanceId(participantInstanceId);
+            Pair<String, Integer> leadControllerPair = 
convertToHostAndPortPair(controllerInstanceId);
+            masterFound = true;
+            _cachedControllerLeaderMap.put(partitionId, leadControllerPair);
+          }
+        }
+        if (!masterFound) {
+          // It's ok to log a warning since we can be in this state for some 
small time during the migration.
+          // Otherwise, we are attempted to mark this as an error.
+          LOGGER.warn("There is no controller in MASTER state for partition: 
{} in {}", partitionName,
+              Helix.LEAD_CONTROLLER_RESOURCE_NAME);
+          return;
+        }
+      }
+      LOGGER.info("Refreshed controller leader map successfully.");
+      refreshSucceeded = true;
     } catch (Exception e) {
-      LOGGER.warn("Could not get whether lead controller resource is enabled 
or not.", e);
-      return false;
+      LOGGER.warn("Caught exception when getting lead controller instance Id 
from external view of {}",
+          Helix.LEAD_CONTROLLER_RESOURCE_NAME, e);
+    } finally {
+      _cachedControllerLeaderValid = refreshSucceeded;
     }
   }
 
   /**
-   * Gets leader from lead controller resource. Null if there is no leader.
-   * @param rawTableName raw table name.
-   * @return pair of instance hostname and port of Helix cluster leader, e.g. 
{localhost, 9000}.
+   * Updates lead controller pairs from Helix cluster leader.
    */
-  private Pair<String, Integer> getLeaderFromLeadControllerResource(String 
rawTableName) {
-    Pair<String, Integer> leaderHostAndPortPair = 
getLeadControllerInstanceIdForTable(rawTableName);
-    if (leaderHostAndPortPair != null) {
-      return leaderHostAndPortPair;
-    } else {
-      LOGGER.warn("Could not locate leader for table: {}", rawTableName);
-      return null;
+  private void refreshControllerLeaderMapFromHelixClusterLeader() {
+    boolean refreshSucceeded = false;
+    try {
+      Pair<String, Integer> helixClusterLeader = getHelixClusterLeader();
+      if (helixClusterLeader == null) {
+        LOGGER.error("Failed to refresh the controller leader map.");
+        return;
+      }
+      for (int i = 0; i < 
Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE; i++) {
+        _cachedControllerLeaderMap.put(i, helixClusterLeader);
+      }
+      refreshSucceeded = true;
+      LOGGER.info("Refreshed controller leader map successfully.");
+    } finally {
+      _cachedControllerLeaderValid = refreshSucceeded;
+    }
+  }
+
+  /**
+   * Checks whether lead controller resource is enabled or not. The switch is 
in resource config.
+   */
+  private boolean isLeadControllerResourceEnabled() {
+    BaseDataAccessor<ZNRecord> dataAccessor = 
_helixManager.getHelixDataAccessor().getBaseDataAccessor();
+    Stat stat = new Stat();
+    try {
+      ZNRecord znRecord = dataAccessor
 
 Review comment:
  There are two ways to get the Helix leader: the one presented in this PR, and
fetching the live instance from `helixDataAccessor`.
  Both the controller and the server need to fetch the Helix leader. I have another
PR that moves this logic from the locator into a utils class. Once that lands, we can
merge the two approaches and unify the logic.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to