This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fa615df Put Helix manager out of resource manager (#4506)
fa615df is described below
commit fa615df5c9cb696e01e86a2669bfeddcc6844413
Author: Jialiang Li <[email protected]>
AuthorDate: Fri Aug 9 11:09:13 2019 -0700
Put Helix manager out of resource manager (#4506)
* Put Helix manager out of resource manager
---
.../apache/pinot/controller/ControllerStarter.java | 41 +++++++++++++++++---
.../helix/core/PinotHelixResourceManager.java | 44 +++++-----------------
.../pinot/tools/perf/PerfBenchmarkDriver.java | 30 ++++++++++++++-
3 files changed, 74 insertions(+), 41 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
index 77fbbe7..310748b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java
@@ -38,8 +38,12 @@ import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.io.FileUtils;
import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.api.listeners.ControllerChangeListener;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
+import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.task.TaskDriver;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metrics.ControllerMeter;
@@ -94,7 +98,8 @@ public class ControllerStarter {
private final String _helixZkURL;
private final String _helixClusterName;
- private final String _instanceId;
+ private final String _helixControllerInstanceId;
+ private final String _helixParticipantInstanceId;
private final boolean _isUpdateStateModel;
private final boolean _enableBatchMessageMode;
private final ControllerConf.ControllerMode _controllerMode;
@@ -125,7 +130,8 @@ public class ControllerStarter {
// Helix related settings.
_helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(_config.getZkStr());
_helixClusterName = _config.getHelixClusterName();
- _instanceId = conf.getControllerHost() + "_" + conf.getControllerPort();
+ _helixControllerInstanceId = conf.getControllerHost() + "_" +
conf.getControllerPort();
+ _helixParticipantInstanceId =
CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE +
_helixControllerInstanceId;
_isUpdateStateModel = _config.isUpdateSegmentStateModel();
_enableBatchMessageMode = _config.getEnableBatchMessageMode();
@@ -221,7 +227,8 @@ public class ControllerStarter {
private void setUpHelixController() {
// Register and connect instance as Helix controller.
LOGGER.info("Starting Helix controller");
- _helixControllerManager =
HelixSetupUtils.setupHelixController(_helixClusterName, _helixZkURL,
_instanceId);
+ _helixControllerManager =
HelixSetupUtils.setupHelixController(_helixClusterName, _helixZkURL,
+ _helixControllerInstanceId);
// Emit helix controller metrics
_controllerMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME,
@@ -251,9 +258,11 @@ public class ControllerStarter {
initSegmentFetcherFactory();
initPinotCrypterFactory();
+ LOGGER.info("Starting Helix manager as Helix participant");
+ HelixManager helixParticipantManager =
registerAndConnectAsHelixParticipant();
+
LOGGER.info("Starting Pinot Helix resource manager and connecting to
Zookeeper");
- _helixResourceManager.start();
- HelixManager helixParticipantManager =
_helixResourceManager.getHelixZkManager();
+ _helixResourceManager.start(helixParticipantManager);
LOGGER.info("Registering controller leadership manager");
// TODO: when Helix separation is completed, leadership only depends on
the master in leadControllerResource, remove
@@ -426,6 +435,28 @@ public class ControllerStarter {
}
}
+ /**
+ * Register and connect to Helix cluster as PARTICIPANT role.
+ */
+ private HelixManager registerAndConnectAsHelixParticipant() {
+ HelixManager helixManager = HelixManagerFactory
+ .getZKHelixManager(_helixClusterName, _helixParticipantInstanceId,
InstanceType.PARTICIPANT, _helixZkURL);
+
+ // Registers Master-Slave state model to state machine engine, which is
for calculating participant assignment in lead controller resource.
+ helixManager.getStateMachineEngine()
+ .registerStateModelFactory(MasterSlaveSMD.name, new
MasterSlaveStateModelFactory());
+
+ try {
+ helixManager.connect();
+ return helixManager;
+ } catch (Exception e) {
+ String errorMsg = String.format("Exception when connecting the instance
%s as Participant role to Helix.",
+ _helixParticipantInstanceId);
+ LOGGER.error(errorMsg, e);
+ throw new RuntimeException(errorMsg);
+ }
+ }
+
public ControllerConf.ControllerMode getControllerMode() {
return _controllerMode;
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 3c8eb62..cb69b93 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -127,7 +127,6 @@ public class PinotHelixResourceManager {
private final String _helixZkURL;
private final String _helixClusterName;
- private final String _instanceId;
private final String _dataDir;
private final long _externalViewOnlineToOfflineTimeoutMillis;
private final boolean _isSingleTenantCluster;
@@ -135,6 +134,7 @@ public class PinotHelixResourceManager {
private final boolean _allowHLCTables;
private HelixManager _helixZkManager;
+ private String _instanceId;
private HelixAdmin _helixAdmin;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
private HelixDataAccessor _helixDataAccessor;
@@ -145,12 +145,11 @@ public class PinotHelixResourceManager {
private RebalanceSegmentStrategyFactory _rebalanceSegmentStrategyFactory;
private TableRebalancer _tableRebalancer;
- public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String
helixClusterName,
- @Nonnull String controllerInstanceId, String dataDir, long
externalViewOnlineToOfflineTimeoutMillis,
- boolean isSingleTenantCluster, boolean enableBatchMessageMode, boolean
allowHLCTables) {
+ public PinotHelixResourceManager(@Nonnull String zkURL, @Nonnull String
helixClusterName, String dataDir,
+ long externalViewOnlineToOfflineTimeoutMillis, boolean
isSingleTenantCluster, boolean enableBatchMessageMode,
+ boolean allowHLCTables) {
_helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
_helixClusterName = helixClusterName;
- _instanceId = controllerInstanceId;
_dataDir = dataDir;
_externalViewOnlineToOfflineTimeoutMillis =
externalViewOnlineToOfflineTimeoutMillis;
_isSingleTenantCluster = isSingleTenantCluster;
@@ -159,18 +158,17 @@ public class PinotHelixResourceManager {
}
public PinotHelixResourceManager(@Nonnull ControllerConf controllerConf) {
- this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
- CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE +
controllerConf.getControllerHost() + "_" + controllerConf
- .getControllerPort(), controllerConf.getDataDir(),
controllerConf.getExternalViewOnlineToOfflineTimeout(),
- controllerConf.tenantIsolationEnabled(),
controllerConf.getEnableBatchMessageMode(),
- controllerConf.getHLCTablesAllowed());
+ this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
controllerConf.getDataDir(),
+ controllerConf.getExternalViewOnlineToOfflineTimeout(),
controllerConf.tenantIsolationEnabled(),
+ controllerConf.getEnableBatchMessageMode(),
controllerConf.getHLCTablesAllowed());
}
/**
* Create Helix cluster if needed, and then start a Pinot controller
instance.
*/
- public synchronized void start() {
- _helixZkManager = registerAndConnectAsHelixParticipant();
+ public synchronized void start(HelixManager helixZkManager) {
+ _helixZkManager = helixZkManager;
+ _instanceId = _helixZkManager.getInstanceName();
_helixAdmin = _helixZkManager.getClusterManagmentTool();
_propertyStore = _helixZkManager.getHelixPropertyStore();
_helixDataAccessor = _helixZkManager.getHelixDataAccessor();
@@ -256,28 +254,6 @@ public class PinotHelixResourceManager {
}
/**
- * Register and connect to Helix cluster as PARTICIPANT role.
- */
- private HelixManager registerAndConnectAsHelixParticipant() {
- HelixManager helixManager =
- HelixManagerFactory.getZKHelixManager(_helixClusterName, _instanceId,
InstanceType.PARTICIPANT, _helixZkURL);
-
- // Registers Master-Slave state model to state machine engine, which is
for calculating participant assignment in lead controller resource.
- helixManager.getStateMachineEngine()
- .registerStateModelFactory(MasterSlaveSMD.name, new
MasterSlaveStateModelFactory());
-
- try {
- helixManager.connect();
- return helixManager;
- } catch (Exception e) {
- String errorMsg =
- String.format("Exception when connecting the instance %s as
Participant to Helix.", _instanceId);
- LOGGER.error(errorMsg, e);
- throw new RuntimeException(errorMsg);
- }
- }
-
- /**
* Add instance group tag for controller so that pinot controller can be
assigned to lead controller resource.
*/
private void addInstanceGroupTagIfNeeded() {
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
index be7a679..c91067e 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/perf/PerfBenchmarkDriver.java
@@ -38,7 +38,12 @@ import javax.annotation.Nullable;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.examples.MasterSlaveStateModelFactory;
import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.tools.ClusterVerifiers.StrictMatchExternalViewVerifier;
import org.apache.pinot.broker.broker.helix.HelixBrokerStarter;
import org.apache.pinot.common.config.TableConfig;
@@ -252,7 +257,9 @@ public class PerfBenchmarkDriver {
ControllerConf controllerConf = getControllerConf();
controllerConf.setControllerPort(Integer.toString(_conf.getControllerPort() +
1));
_helixResourceManager = new PinotHelixResourceManager(controllerConf);
- _helixResourceManager.start();
+ String instanceId = controllerConf.getControllerHost() + "_" +
controllerConf.getControllerPort();
+ HelixManager helixManager =
registerAndConnectAsHelixSpectator(instanceId);
+ _helixResourceManager.start(helixManager);
}
// Create server tenants if required
@@ -265,11 +272,30 @@ public class PerfBenchmarkDriver {
// Create broker tenant if required
if (_conf.shouldStartBroker()) {
- Tenant brokerTenant = new
TenantBuilder(_brokerTenantName).setRole(TenantRole.BROKER).setTotalInstances(1).build();
+ Tenant brokerTenant =
+ new
TenantBuilder(_brokerTenantName).setRole(TenantRole.BROKER).setTotalInstances(1).build();
_helixResourceManager.createBrokerTenant(brokerTenant);
}
}
+ /**
+ * Register and connect to Helix cluster as Spectator role.
+ */
+ private HelixManager registerAndConnectAsHelixSpectator(String instanceId) {
+ HelixManager helixManager =
+ HelixManagerFactory.getZKHelixManager(_clusterName, instanceId,
InstanceType.SPECTATOR, _zkAddress);
+
+ try {
+ helixManager.connect();
+ return helixManager;
+ } catch (Exception e) {
+ String errorMsg =
+ String.format("Exception when connecting the instance %s as
Spectator role to Helix.", instanceId);
+ LOGGER.error(errorMsg, e);
+ throw new RuntimeException(errorMsg);
+ }
+ }
+
private void configureResources()
throws Exception {
if (!_conf.isConfigureResources()) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]