This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fa615df  Put Helix manager out of resource manager (#4506)
fa615df is described below

commit fa615df5c9cb696e01e86a2669bfeddcc6844413
Author: Jialiang Li <[email protected]>
AuthorDate: Fri Aug 9 11:09:13 2019 -0700

    Put Helix manager out of resource manager (#4506)
    
    * Put Helix manager out of resource manager
---
 .../apache/pinot/controller/ControllerStarter.java | 41 +++++++++++++++++---
 .../helix/core/PinotHelixResourceManager.java      | 44 +++++-----------------
 .../pinot/tools/perf/PerfBenchmarkDriver.java      | 30 ++++++++++++++-
 3 files changed, 74 insertions(+), 41 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 77fbbe7..310748b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -38,8 +38,12 @@ import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
 import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.api.listeners.ControllerChangeListener;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.task.TaskDriver;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metrics.ControllerMeter;
@@ -94,7 +98,8 @@ public class ControllerStarter {
 
   private final String _helixZkURL;
   private final String _helixClusterName;
-  private final String _instanceId;
+  private final String _helixControllerInstanceId;
+  private final String _helixParticipantInstanceId;
   private final boolean _isUpdateStateModel;
   private final boolean _enableBatchMessageMode;
   private final ControllerConf.ControllerMode _controllerMode;
@@ -125,7 +130,8 @@ public class ControllerStarter {
     // Helix related settings.
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(_config.getZkStr());
     _helixClusterName = _config.getHelixClusterName();
-    _instanceId = conf.getControllerHost() + "_" + conf.getControllerPort();
+    _helixControllerInstanceId = conf.getControllerHost() + "_" + 
conf.getControllerPort();
+    _helixParticipantInstanceId = 
CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + 
_helixControllerInstanceId;
     _isUpdateStateModel = _config.isUpdateSegmentStateModel();
     _enableBatchMessageMode = _config.getEnableBatchMessageMode();
 
@@ -221,7 +227,8 @@ public class ControllerStarter {
   private void setUpHelixController() {
     // Register and connect instance as Helix controller.
     LOGGER.info("Starting Helix controller");
-    _helixControllerManager = 
HelixSetupUtils.setupHelixController(_helixClusterName, _helixZkURL, 
_instanceId);
+    _helixControllerManager = 
HelixSetupUtils.setupHelixController(_helixClusterName, _helixZkURL,
+        _helixControllerInstanceId);
 
     // Emit helix controller metrics
     
_controllerMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME,
@@ -251,9 +258,11 @@ public class ControllerStarter {
     initSegmentFetcherFactory();
     initPinotCrypterFactory();
 
+    LOGGER.info("Starting Helix manager as Helix participant");
+    HelixManager helixParticipantManager = 
registerAndConnectAsHelixParticipant();
+
     LOGGER.info("Starting Pinot Helix resource manager and connecting to 
Zookeeper");
-    _helixResourceManager.start();
-    HelixManager helixParticipantManager = 
_helixResourceManager.getHelixZkManager();
+    _helixResourceManager.start(helixParticipantManager);
 
     LOGGER.info("Registering controller leadership manager");
     // TODO: when Helix separation is completed, leadership only depends on 
the master in leadControllerResource, remove
@@ -426,6 +435,28 @@ public class ControllerStarter {
     }
   }
 
+  /**
+   * Register and connect to Helix cluster as PARTICIPANT role.
+   */
+  private HelixManager registerAndConnectAsHelixParticipant() {
+    HelixManager helixManager = HelixManagerFactory
+        .getZKHelixManager(_helixClusterName, _helixParticipantInstanceId, 
InstanceType.PARTICIPANT, _helixZkURL);
+
+    // Registers Master-Slave state model to state machine engine, which is 
for calculating participant assignment in lead controller resource.
+    helixManager.getStateMachineEngine()
+        .registerStateModelFactory(MasterSlaveSMD.name, new 
MasterSlaveStateModelFactory());
+
+    try {
+      helixManager.connect();
+      return helixManager;
+    } catch (Exception e) {
+      String errorMsg = String.format("Exception when connecting the instance 
%s as Participant role to Helix.",
+          _helixParticipantInstanceId);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg);
+    }
+  }
+
   public ControllerConf.ControllerMode getControllerMode() {
     return _controllerMode;
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 3c8eb62..cb69b93 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -127,7 +127,6 @@ public class PinotHelixResourceManager {
 
   private final String _helixZkURL;
   private final String _helixClusterName;
-  private final String _instanceId;
   private final String _dataDir;
   private final long _externalViewOnlineToOfflineTimeoutMillis;
   private final boolean _isSingleTenantCluster;
@@ -135,6 +134,7 @@ public class PinotHelixResourceManager {
   private final boolean _allowHLCTables;
 
   private HelixManager _helixZkManager;
+  private String _instanceId;
   private HelixAdmin _helixAdmin;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private HelixDataAccessor _helixDataAccessor;
@@ -145,12 +145,11 @@ public class PinotHelixResourceManager {
   private RebalanceSegmentStrategyFactory _rebalanceSegmentStrategyFactory;
   private TableRebalancer _tableRebalancer;
 
-  public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String 
helixClusterName,
-      @Nonnull String controllerInstanceId, String dataDir, long 
externalViewOnlineToOfflineTimeoutMillis,
-      boolean isSingleTenantCluster, boolean enableBatchMessageMode, boolean 
allowHLCTables) {
+  public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String 
helixClusterName, String dataDir,
+      long externalViewOnlineToOfflineTimeoutMillis, boolean 
isSingleTenantCluster, boolean enableBatchMessageMode,
+      boolean allowHLCTables) {
     _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
     _helixClusterName = helixClusterName;
-    _instanceId = controllerInstanceId;
     _dataDir = dataDir;
     _externalViewOnlineToOfflineTimeoutMillis = 
externalViewOnlineToOfflineTimeoutMillis;
     _isSingleTenantCluster = isSingleTenantCluster;
@@ -159,18 +158,17 @@ public class PinotHelixResourceManager {
   }
 
   public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
-    this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
-        CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE + 
controllerConf.getControllerHost() + "_" + controllerConf
-            .getControllerPort(), controllerConf.getDataDir(), 
controllerConf.getExternalViewOnlineToOfflineTimeout(),
-        controllerConf.tenantIsolationEnabled(), 
controllerConf.getEnableBatchMessageMode(),
-        controllerConf.getHLCTablesAllowed());
+    this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), 
controllerConf.getDataDir(),
+        controllerConf.getExternalViewOnlineToOfflineTimeout(), 
controllerConf.tenantIsolationEnabled(),
+        controllerConf.getEnableBatchMessageMode(), 
controllerConf.getHLCTablesAllowed());
   }
 
   /**
    * Create Helix cluster if needed, and then start a Pinot controller 
instance.
    */
-  public synchronized void start() {
-    _helixZkManager = registerAndConnectAsHelixParticipant();
+  public synchronized void start(HelixManager helixZkManager) {
+    _helixZkManager = helixZkManager;
+    _instanceId = _helixZkManager.getInstanceName();
     _helixAdmin = _helixZkManager.getClusterManagmentTool();
     _propertyStore = _helixZkManager.getHelixPropertyStore();
     _helixDataAccessor = _helixZkManager.getHelixDataAccessor();
@@ -256,28 +254,6 @@ public class PinotHelixResourceManager {
   }
 
   /**
-   * Register and connect to Helix cluster as PARTICIPANT role.
-   */
-  private HelixManager registerAndConnectAsHelixParticipant() {
-    HelixManager helixManager =
-        HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId, 
InstanceType.PARTICIPANT, _helixZkURL);
-
-    // Registers Master-Slave state model to state machine engine, which is 
for calculating participant assignment in lead controller resource.
-    helixManager.getStateMachineEngine()
-        .registerStateModelFactory(MasterSlaveSMD.name, new 
MasterSlaveStateModelFactory());
-
-    try {
-      helixManager.connect();
-      return helixManager;
-    } catch (Exception e) {
-      String errorMsg =
-          String.format("Exception when connecting the instance %s as 
Participant to Helix.", _instanceId);
-      LOGGER.error(errorMsg, e);
-      throw new RuntimeException(errorMsg);
-    }
-  }
-
-  /**
    * Add instance group tag for controller so that pinot controller can be 
assigned to lead controller resource.
    */
   private void addInstanceGroupTagIfNeeded() {
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
index be7a679..c91067e 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -38,7 +38,12 @@ import javax.annotation.Nullable;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
 import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
 import org.apache.pinot.common.config.TableConfig;
@@ -252,7 +257,9 @@ public class PerfBenchmarkDriver {
       ControllerConf controllerConf = getControllerConf();
       
controllerConf.setControllerPort(Integer.toString(_conf.getControllerPort() + 
1));
       _helixResourceManager = new PinotHelixResourceManager(controllerConf);
-      _helixResourceManager.start();
+      String instanceId = controllerConf.getControllerHost() + "_" + 
controllerConf.getControllerPort();
+      HelixManager helixManager = 
registerAndConnectAsHelixSpectator(instanceId);
+      _helixResourceManager.start(helixManager);
     }
 
     // Create server tenants if required
@@ -265,11 +272,30 @@ public class PerfBenchmarkDriver {
 
     // Create broker tenant if required
     if (_conf.shouldStartBroker()) {
-      Tenant brokerTenant = new 
TenantBuilder(_brokerTenantName).setRole(TenantRole.BROKER).setTotalInstances(1).build();
+      Tenant brokerTenant =
+          new 
TenantBuilder(_brokerTenantName).setRole(TenantRole.BROKER).setTotalInstances(1).build();
       _helixResourceManager.createBrokerTenant(brokerTenant);
     }
   }
 
+  /**
+   * Register and connect to Helix cluster as Spectator role.
+   */
+  private HelixManager registerAndConnectAsHelixSpectator(String instanceId) {
+    HelixManager helixManager =
+        HelixManagerFactory.getZKHelixManager(_clusterName, instanceId, 
InstanceType.SPECTATOR, _zkAddress);
+
+    try {
+      helixManager.connect();
+      return helixManager;
+    } catch (Exception e) {
+      String errorMsg =
+          String.format("Exception when connecting the instance %s as 
Spectator role to Helix.", instanceId);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg);
+    }
+  }
+
   private void configureResources()
       throws Exception {
     if (!_conf.isConfigureResources()) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to