This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch instance_pojo in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 0a9d18c8dac2a5a6e8b0e26034a109da438c20c6 Author: Jackie (Xiaotian) Jiang <[email protected]> AuthorDate: Mon Aug 12 13:20:56 2019 -0700 [Instance Assignment] Enhance Instance class to include the pool config - Enhance Instance class to include the pool config - Allow multiple tags in the Instance - Support pools and multiple tags for instance restlet API - Make the behavior of joining the cluster via instance restlet API and auto-join consistent - HELIX_HOST will have the instance type prefix per Helix behavior --- .../apache/pinot/common/utils/CommonConstants.java | 29 ++-- .../pinot/controller/api/pojos/Instance.java | 160 +++++++++++---------- .../resources/PinotInstanceRestletResource.java | 6 +- .../helix/core/PinotHelixResourceManager.java | 68 ++++----- .../core/minion/PinotHelixTaskResourceManager.java | 2 +- .../helix/core/util/HelixSetupUtils.java | 2 +- .../api/PinotInstanceRestletResourceTest.java | 136 +++++++++--------- .../api/PinotTenantRestletResourceTest.java | 78 +++------- .../helix/core/PinotHelixResourceManagerTest.java | 3 +- ...umSegmentAssignmentStrategyIntegrationTest.java | 22 ++- .../pinot/integration/tests/ClusterTest.java | 12 +- .../org/apache/pinot/minion/MinionStarter.java | 12 +- 12 files changed, 243 insertions(+), 287 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index 386621b..d55b72b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -31,13 +31,10 @@ public class CommonConstants { public static final String INSTANCE_CONNECTED_METRIC_NAME = "helix.connected"; - public static final String PREFIX_OF_SERVER_INSTANCE = "Server_"; - public static final String PREFIX_OF_BROKER_INSTANCE = "Broker_"; public static final String PREFIX_OF_CONTROLLER_INSTANCE = "Controller_"; - - public static final String SERVER_INSTANCE_TYPE = "server"; - public static final String BROKER_INSTANCE_TYPE = "broker"; - public static final String CONTROLLER_INSTANCE_TYPE = "controller"; + public static final String PREFIX_OF_BROKER_INSTANCE = "Broker_"; + public static final String PREFIX_OF_SERVER_INSTANCE = "Server_"; + public static final String PREFIX_OF_MINION_INSTANCE = "Minion_"; public static final String BROKER_RESOURCE_INSTANCE = "brokerResource"; public static final String LEAD_CONTROLLER_RESOURCE_NAME = "leadControllerResource"; @@ -49,8 +46,11 @@ public class CommonConstants { public static final int MIN_ACTIVE_REPLICAS = 0; public static final int REBALANCE_DELAY_MS = 300_000; // 5 minutes. - public static final String UNTAGGED_SERVER_INSTANCE = "server_untagged"; + // Instance tags + public static final String CONTROLLER_INSTANCE = "controller"; public static final String UNTAGGED_BROKER_INSTANCE = "broker_untagged"; + public static final String UNTAGGED_SERVER_INSTANCE = "server_untagged"; + public static final String UNTAGGED_MINION_INSTANCE = "minion_untagged"; public static class StateModel { public static class SegmentOnlineOfflineStateModel { @@ -92,6 +92,10 @@ public class CommonConstants { public static final String ADMIN_PORT_KEY = "adminPort"; } + public enum InstanceType { + CONTROLLER, BROKER, SERVER, MINION + } + public enum TableType { OFFLINE, REALTIME; @@ -298,9 +302,6 @@ public class CommonConstants { } public static class Minion { - public static final String INSTANCE_PREFIX = "Minion_"; - public static final String INSTANCE_TYPE = "minion"; - public static final String UNTAGGED_INSTANCE = "minion_untagged"; public static final String CONFIG_OF_METRICS_PREFIX = "pinot.minion."; public static final String METADATA_EVENT_OBSERVER_PREFIX = "metadata.event.notifier"; @@ -319,14 +320,6 @@ public class CommonConstants { public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = "crypter"; } - public static class Metric { - public static class Server { - public static final String CURRENT_NUMBER_OF_SEGMENTS = "currentNumberOfSegments"; - public static final String CURRENT_NUMBER_OF_DOCUMENTS = "currentNumberOfDocuments"; - public static final String NUMBER_OF_DELETED_SEGMENTS = "numberOfDeletedSegments"; - } - } - public static class Segment { public static class Realtime { public enum Status { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/pojos/Instance.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/pojos/Instance.java index 7a8ac23..29017ef 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/pojos/Instance.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/pojos/Instance.java @@ -19,119 +19,127 @@ package org.apache.pinot.controller.api.pojos; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import javax.annotation.Nullable; import org.apache.helix.model.InstanceConfig; -import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.CommonConstants.Helix; +import org.apache.pinot.common.utils.CommonConstants.Helix.InstanceType; +import org.apache.pinot.common.utils.JsonUtils; /** * Instance POJO, used as part of the API to create instances. + * <pre> + * Example: + * { + * "host": "hostname.example.com", + * "port": 1234, + * "type": "SERVER", + * "tags": ["example_OFFLINE"], + * "pools": { + * "example_OFFLINE": 0 + * } + * } + * </pre> */ -//@Example("{\n" + "\t\"host\": \"hostname.example.com\",\n" + "\t\"port\": \"1234\",\n" + "\t\"type\": \"server\"\n" + "}") +@JsonIgnoreProperties(ignoreUnknown = true) public class Instance { public static final String POOL_KEY = "pool"; private final String _host; - private final String _port; - private final String _type; - private final String _tag; - private final String _instancePrefix; - - public static Instance fromInstanceConfig(InstanceConfig instanceConfig) { - InstanceConfig ic = instanceConfig; - String instanceName = ic.getInstanceName(); - String type; - if (instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) { - type = CommonConstants.Helix.SERVER_INSTANCE_TYPE; - } else if (instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)) { - type = CommonConstants.Helix.BROKER_INSTANCE_TYPE; - } else { - throw new RuntimeException("Unknown instance type for: " + instanceName); - } - - Instance instance = - new Instance(ic.getHostName(), ic.getPort(), type, org.apache.commons.lang.StringUtils.join(ic.getTags(), ',')); - return instance; - } + private final int _port; + private final InstanceType _type; + private final List<String> _tags; + private final Map<String, Integer> _pools; @JsonCreator public Instance(@JsonProperty(value = "host", required = true) String host, - @JsonProperty(value = "port", required = true) String port, - @JsonProperty(value = "type", required = true) String type, - @JsonProperty(value = "tag", required = false) String tag) { + @JsonProperty(value = "port", required = true) int port, + @JsonProperty(value = "type", required = true) InstanceType type, + @JsonProperty(value = "tags") @Nullable List<String> tags, + @JsonProperty(value = "pools") @Nullable Map<String, Integer> pools) { _host = host; _port = port; - _tag = tag; - - if (CommonConstants.Helix.SERVER_INSTANCE_TYPE.equalsIgnoreCase(type)) { - _instancePrefix = CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE; - _type = CommonConstants.Helix.SERVER_INSTANCE_TYPE; - } else if (CommonConstants.Helix.BROKER_INSTANCE_TYPE.equalsIgnoreCase(type)) { - _instancePrefix = CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE; - _type = CommonConstants.Helix.BROKER_INSTANCE_TYPE; - } else if (CommonConstants.Minion.INSTANCE_TYPE.equalsIgnoreCase(type)) { - _instancePrefix = CommonConstants.Minion.INSTANCE_PREFIX; - _type = CommonConstants.Minion.INSTANCE_TYPE; - } else { - throw new IllegalArgumentException("Invalid instance type " + type + ", expected either server or broker"); - } + _type = type; + _tags = tags; + _pools = pools; } + @JsonProperty public String getHost() { return _host; } - public String getPort() { + @JsonProperty + public int getPort() { return _port; } - public String getTag() { - return _tag; + @JsonProperty + public InstanceType getType() { + return _type; } - public String getType() { - return _type; + @JsonProperty + public List<String> getTags() { + return _tags; } - public String toInstanceId() { - return _instancePrefix + _host + "_" + _port; + @JsonProperty + public Map<String, Integer> getPools() { + return _pools; } - @Override - public String toString() { - final StringBuilder bld = new StringBuilder(); - bld.append("host : " + _host + "\n"); - bld.append("port : " + _port + "\n"); - bld.append("type : " + _type + "\n"); - if (_tag != null) { - bld.append("tag : " + _tag + "\n"); + @JsonIgnore + public String getInstanceId() { + String prefix; + switch (_type) { + case CONTROLLER: + prefix = Helix.PREFIX_OF_CONTROLLER_INSTANCE; + break; + case BROKER: + prefix = Helix.PREFIX_OF_BROKER_INSTANCE; + break; + case SERVER: + prefix = Helix.PREFIX_OF_SERVER_INSTANCE; + break; + case MINION: + prefix = Helix.PREFIX_OF_MINION_INSTANCE; + break; + default: + throw new IllegalStateException(); } - return bld.toString(); + return prefix + _host + "_" + _port; } public InstanceConfig toInstanceConfig() { - final InstanceConfig iConfig = new InstanceConfig(toInstanceId()); - iConfig.setHostName(_host); - iConfig.setPort(_port); - iConfig.setInstanceEnabled(true); - iConfig.addTag(getTagOrDefaultTag()); - return iConfig; + InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(getInstanceId()); + if (_tags != null) { + for (String tag : _tags) { + instanceConfig.addTag(tag); + } + } + if (_pools != null && !_pools.isEmpty()) { + Map<String, String> mapValue = new TreeMap<>(); + for (Map.Entry<String, Integer> entry : _pools.entrySet()) { + mapValue.put(entry.getKey(), entry.getValue().toString()); + } + instanceConfig.getRecord().setMapField(POOL_KEY, mapValue); + } + return instanceConfig; } - private String getTagOrDefaultTag() { - if (_tag != null) { - return _tag; - } else { - switch (_type) { - case CommonConstants.Helix.SERVER_INSTANCE_TYPE: - return CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE; - case CommonConstants.Helix.BROKER_INSTANCE_TYPE: - return CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE; - case CommonConstants.Minion.INSTANCE_TYPE: - return CommonConstants.Minion.UNTAGGED_INSTANCE; - default: - throw new RuntimeException("Unknown instance type " + _type + ", was expecting either server or broker"); - } + public String toJsonString() { + try { + return JsonUtils.objectToString(this); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java index 6ff9f16..309c55e 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java @@ -85,8 +85,7 @@ public class PinotInstanceRestletResource { @ApiOperation(value = "Get instance information", produces = MediaType.APPLICATION_JSON) @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 404, message = "Instance not found"), @ApiResponse(code = 500, message = "Internal error")}) public String getInstance( - @ApiParam(value = "Instance name", required = true, example = "Server_a.b.com_20000 | Broker_my.broker.com_30000") - @PathParam("instanceName") String instanceName) { + @ApiParam(value = "Instance name", required = true, example = "Server_a.b.com_20000 | Broker_my.broker.com_30000") @PathParam("instanceName") String instanceName) { InstanceConfig instanceConfig = pinotHelixResourceManager.getHelixInstanceConfig(instanceName); if (instanceConfig == null) { throw new ControllerApplicationException(LOGGER, "Instance " + instanceName + " not found", @@ -98,6 +97,7 @@ public class PinotInstanceRestletResource { response.put("enabled", instanceConfig.getInstanceEnabled()); response.put("port", instanceConfig.getPort()); response.set("tags", JsonUtils.objectToJsonNode(instanceConfig.getTags())); + response.set("pools", JsonUtils.objectToJsonNode(instanceConfig.getRecord().getMapField(Instance.POOL_KEY))); return response.toString(); } @@ -108,7 +108,7 @@ public class PinotInstanceRestletResource { @ApiOperation(value = "Create a new instance", consumes = MediaType.APPLICATION_JSON, notes = "Creates a new instance with given instance config") @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 409, message = "Instance already exists"), @ApiResponse(code = 500, message = "Internal error")}) public SuccessResponse addInstance(Instance instance) { - LOGGER.info("Instance creation request received for instance " + instance.toInstanceId()); + LOGGER.info("Instance creation request received for instance: {}", instance.getInstanceId()); if (!pinotHelixResourceManager.addInstance(instance).isSuccessful()) { throw new ControllerApplicationException(LOGGER, "Instance already exists", Response.Status.CONFLICT); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index cb69b93..4a4adf1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -48,13 +48,11 @@ import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.PropertyPathBuilder; import org.apache.helix.ZNRecord; -import org.apache.helix.examples.MasterSlaveStateModelFactory; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor; import org.apache.helix.model.CurrentState; @@ -62,7 +60,6 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; -import org.apache.helix.model.MasterSlaveSMD; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.config.IndexingConfig; import org.apache.pinot.common.config.OfflineTagConfig; @@ -89,7 +86,7 @@ import org.apache.pinot.common.partition.ReplicaGroupPartitionAssignment; import org.apache.pinot.common.partition.ReplicaGroupPartitionAssignmentGenerator; import org.apache.pinot.common.restlet.resources.RebalanceResult; import org.apache.pinot.common.segment.SegmentMetadata; -import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.CommonConstants.Helix; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerOnlineOfflineStateModel; import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel; import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; @@ -258,10 +255,9 @@ public class PinotHelixResourceManager { */ private void addInstanceGroupTagIfNeeded() { InstanceConfig instanceConfig = getHelixInstanceConfig(_instanceId); - if (!instanceConfig.containsTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE)) { - LOGGER.info("Controller: {} doesn't contain group tag: {}. Adding one.", _instanceId, - CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE); - instanceConfig.addTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE); + if (!instanceConfig.containsTag(Helix.CONTROLLER_INSTANCE)) { + LOGGER.info("Controller: {} doesn't contain group tag: {}. Adding one.", _instanceId, Helix.CONTROLLER_INSTANCE); + instanceConfig.addTag(Helix.CONTROLLER_INSTANCE); HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor(); accessor.setProperty(accessor.keyBuilder().instanceConfig(_instanceId), instanceConfig); } @@ -360,7 +356,7 @@ public class PinotHelixResourceManager { @Nonnull public synchronized PinotResourceManagerResponse addInstance(@Nonnull Instance instance) { List<String> instances = getAllInstances(); - String instanceIdToAdd = instance.toInstanceId(); + String instanceIdToAdd = instance.getInstanceId(); if (instances.contains(instanceIdToAdd)) { return PinotResourceManagerResponse.failure("Instance " + instanceIdToAdd + " already exists"); } else { @@ -589,7 +585,7 @@ public class PinotHelixResourceManager { } for (int i = 0; i < numberOfInstancesToAdd; ++i) { String instanceName = unTaggedInstanceList.get(i); - retagInstance(instanceName, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE, brokerTenantTag); + retagInstance(instanceName, Helix.UNTAGGED_BROKER_INSTANCE, brokerTenantTag); // Update idealState by adding new instance to table mapping. addInstanceToBrokerIdealState(brokerTenantTag, instanceName); } @@ -625,7 +621,7 @@ public class PinotHelixResourceManager { // Update ideal state with the new broker instances try { - HelixHelper.updateIdealState(getHelixZkManager(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState -> { + HelixHelper.updateIdealState(getHelixZkManager(), Helix.BROKER_RESOURCE_INSTANCE, idealState -> { assert idealState != null; Map<String, String> instanceStateMap = idealState.getInstanceStateMap(tableNameWithType); if (instanceStateMap != null) { @@ -646,8 +642,7 @@ public class PinotHelixResourceManager { } private void addInstanceToBrokerIdealState(String brokerTenantTag, String instanceName) { - IdealState tableIdealState = - _helixAdmin.getResourceIdealState(_helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + IdealState tableIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, Helix.BROKER_RESOURCE_INSTANCE); for (String tableNameWithType : tableIdealState.getPartitionSet()) { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); Preconditions.checkNotNull(tableConfig); @@ -656,15 +651,14 @@ public class PinotHelixResourceManager { tableIdealState.setPartitionState(tableNameWithType, instanceName, BrokerOnlineOfflineStateModel.ONLINE); } } - _helixAdmin - .setResourceIdealState(_helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, tableIdealState); + _helixAdmin.setResourceIdealState(_helixClusterName, Helix.BROKER_RESOURCE_INSTANCE, tableIdealState); } private PinotResourceManagerResponse scaleDownBroker(Tenant tenant, String brokerTenantTag, List<String> instancesInClusterWithTag) { int numberBrokersToUntag = instancesInClusterWithTag.size() - tenant.getNumberOfInstances(); for (int i = 0; i < numberBrokersToUntag; ++i) { - retagInstance(instancesInClusterWithTag.get(i), brokerTenantTag, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE); + retagInstance(instancesInClusterWithTag.get(i), brokerTenantTag, Helix.UNTAGGED_BROKER_INSTANCE); } return PinotResourceManagerResponse.SUCCESS; } @@ -727,11 +721,11 @@ public class PinotHelixResourceManager { int incOffline = serverTenant.getOfflineInstances() - taggedOfflineServers.size(); int incRealtime = serverTenant.getRealtimeInstances() - taggedRealtimeServers.size(); for (int i = 0; i < incOffline; ++i) { - retagInstance(unTaggedInstanceList.get(i), CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag); + retagInstance(unTaggedInstanceList.get(i), Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag); } for (int i = incOffline; i < incOffline + incRealtime; ++i) { String instanceName = unTaggedInstanceList.get(i); - retagInstance(instanceName, CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag); + retagInstance(instanceName, Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag); // TODO: update idealStates & instanceZkMetadata } return PinotResourceManagerResponse.SUCCESS; @@ -746,14 +740,14 @@ public class PinotHelixResourceManager { taggedOfflineServers.removeAll(taggedRealtimeServers); for (int i = 0; i < incOffline; ++i) { if (i < incInstances) { - retagInstance(unTaggedInstanceList.get(i), CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag); + retagInstance(unTaggedInstanceList.get(i), Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag); } else { _helixAdmin.addInstanceTag(_helixClusterName, taggedRealtimeServers.get(i - incInstances), offlineServerTag); } } for (int i = incOffline; i < incOffline + incRealtime; ++i) { if (i < incInstances) { - retagInstance(unTaggedInstanceList.get(i), CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag); + retagInstance(unTaggedInstanceList.get(i), Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag); // TODO: update idealStates & instanceZkMetadata } else { _helixAdmin.addInstanceTag(_helixClusterName, taggedOfflineServers.get(i - Math.max(incInstances, incOffline)), @@ -777,7 +771,7 @@ public class PinotHelixResourceManager { public boolean isBrokerTenantDeletable(String tenantName) { String brokerTag = TagNameUtils.getBrokerTagForTenant(tenantName); Set<String> taggedInstances = new HashSet<>(HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag)); - String brokerName = CommonConstants.Helix.BROKER_RESOURCE_INSTANCE; + String brokerName = Helix.BROKER_RESOURCE_INSTANCE; IdealState brokerIdealState = _helixAdmin.getResourceIdealState(_helixClusterName, brokerName); for (String partition : brokerIdealState.getPartitionSet()) { for (String instance : brokerIdealState.getInstanceSet(partition)) { @@ -866,12 +860,12 @@ public class PinotHelixResourceManager { List<String> unTaggedInstanceList) { String offlineServerTag = TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName()); for (int i = 0; i < serverTenant.getOfflineInstances(); i++) { - retagInstance(unTaggedInstanceList.get(i), CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag); + retagInstance(unTaggedInstanceList.get(i), Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag); } String realtimeServerTag = TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName()); for (int i = 0; i < serverTenant.getRealtimeInstances(); i++) { - retagInstance(unTaggedInstanceList.get(i + serverTenant.getOfflineInstances()), - CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag); + retagInstance(unTaggedInstanceList.get(i + serverTenant.getOfflineInstances()), Helix.UNTAGGED_SERVER_INSTANCE, + realtimeServerTag); } } @@ -880,11 +874,11 @@ public class PinotHelixResourceManager { int cnt = 0; String offlineServerTag = TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName()); for (int i = 0; i < serverTenant.getOfflineInstances(); i++) { - retagInstance(unTaggedInstanceList.get(cnt++), CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag); + retagInstance(unTaggedInstanceList.get(cnt++), Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag); } String realtimeServerTag = TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName()); for (int i = 0; i < serverTenant.getRealtimeInstances(); i++) { - retagInstance(unTaggedInstanceList.get(cnt++), CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag); + retagInstance(unTaggedInstanceList.get(cnt++), Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag); if (cnt == numberOfInstances) { cnt = 0; } @@ -903,7 +897,7 @@ public class PinotHelixResourceManager { } String brokerTag = TagNameUtils.getBrokerTagForTenant(brokerTenant.getTenantName()); for (int i = 0; i < brokerTenant.getNumberOfInstances(); ++i) { - retagInstance(unTaggedInstanceList.get(i), CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE, brokerTag); + retagInstance(unTaggedInstanceList.get(i), Helix.UNTAGGED_BROKER_INSTANCE, brokerTag); } return PinotResourceManagerResponse.SUCCESS; } @@ -914,7 +908,7 @@ public class PinotHelixResourceManager { for (String instanceName : instancesInClusterWithTag) { _helixAdmin.removeInstanceTag(_helixClusterName, instanceName, offlineTenantTag); if (getTagsForInstance(instanceName).isEmpty()) { - _helixAdmin.addInstanceTag(_helixClusterName, instanceName, CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE); + _helixAdmin.addInstanceTag(_helixClusterName, instanceName, Helix.UNTAGGED_SERVER_INSTANCE); } } return PinotResourceManagerResponse.SUCCESS; @@ -926,7 +920,7 @@ public class PinotHelixResourceManager { for (String instanceName : instancesInClusterWithTag) { _helixAdmin.removeInstanceTag(_helixClusterName, instanceName, realtimeTenantTag); if (getTagsForInstance(instanceName).isEmpty()) { - _helixAdmin.addInstanceTag(_helixClusterName, instanceName, CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE); + _helixAdmin.addInstanceTag(_helixClusterName, instanceName, Helix.UNTAGGED_SERVER_INSTANCE); } } return PinotResourceManagerResponse.SUCCESS; @@ -936,7 +930,7 @@ public class PinotHelixResourceManager { String brokerTag = TagNameUtils.getBrokerTagForTenant(tenantName); List<String> instancesInClusterWithTag = HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag); for (String instance : instancesInClusterWithTag) { - retagInstance(instance, brokerTag, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE); + retagInstance(instance, brokerTag, Helix.UNTAGGED_BROKER_INSTANCE); } return PinotResourceManagerResponse.SUCCESS; } @@ -1361,8 +1355,8 @@ public class PinotHelixResourceManager { private void handleBrokerResource(@Nonnull final String tableName, @Nonnull final List<String> brokersForTenant) { LOGGER.info("Updating BrokerResource IdealState for table: {}", tableName); - HelixHelper.updateIdealState(_helixZkManager, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, - new Function<IdealState, IdealState>() { + HelixHelper + .updateIdealState(_helixZkManager, Helix.BROKER_RESOURCE_INSTANCE, new Function<IdealState, IdealState>() { @Override public IdealState apply(@Nullable IdealState idealState) { Preconditions.checkNotNull(idealState); @@ -1592,7 +1586,7 @@ public class PinotHelixResourceManager { recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); recipientCriteria.setInstanceName("%"); recipientCriteria.setSessionSpecific(true); - recipientCriteria.setResource(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); + recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE); recipientCriteria.setDataSource(Criteria.DataSource.EXTERNALVIEW); // The brokerResource field in the EXTERNALVIEW stores the offline table name in the Partition subfield. recipientCriteria.setPartition(tableNameWithType); @@ -2240,8 +2234,7 @@ public class PinotHelixResourceManager { */ public List<String> getOnlineUnTaggedBrokerInstanceList() { - final List<String> instanceList = - HelixHelper.getInstancesWithTag(_helixZkManager, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE); + final List<String> instanceList = HelixHelper.getInstancesWithTag(_helixZkManager, Helix.UNTAGGED_BROKER_INSTANCE); final List<String> liveInstances = _helixDataAccessor.getChildNames(_keyBuilder.liveInstances()); instanceList.retainAll(liveInstances); return instanceList; @@ -2252,8 +2245,7 @@ public class PinotHelixResourceManager { * @return List of untagged online server instances. */ public List<String> getOnlineUnTaggedServerInstanceList() { - final List<String> instanceList = - HelixHelper.getInstancesWithTag(_helixZkManager, CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE); + final List<String> instanceList = HelixHelper.getInstancesWithTag(_helixZkManager, Helix.UNTAGGED_SERVER_INSTANCE); final List<String> liveInstances = _helixDataAccessor.getChildNames(_keyBuilder.liveInstances()); instanceList.retainAll(liveInstances); return instanceList; @@ -2285,7 +2277,7 @@ public class PinotHelixResourceManager { ZNRecord record = helixInstanceConfig.getRecord(); String[] hostnameSplit = helixInstanceConfig.getHostName().split("_"); Preconditions.checkState(hostnameSplit.length >= 2); - String adminPort = record.getSimpleField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY); + String adminPort = record.getSimpleField(Helix.Instance.ADMIN_PORT_KEY); // If admin port is missing, there's no point to calculate the remaining table size. // Thus, throwing an exception will be good here. if (Strings.isNullOrEmpty(adminPort)) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java index e5b4090..47d6bd9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java @@ -175,7 +175,7 @@ public class PinotHelixTaskResourceManager { @Nonnull public synchronized String submitTask(@Nonnull List<PinotTaskConfig> pinotTaskConfigs, int numConcurrentTasksPerInstance) { - return submitTask(pinotTaskConfigs, CommonConstants.Minion.UNTAGGED_INSTANCE, numConcurrentTasksPerInstance); + return submitTask(pinotTaskConfigs, CommonConstants.Helix.UNTAGGED_MINION_INSTANCE, numConcurrentTasksPerInstance); } /** diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java index a501767..37dec22 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java @@ -170,7 +170,7 @@ public class HelixSetupUtils { idealStateBuilder.enableDelayRebalance(); // Set instance group tag IdealState idealState = idealStateBuilder.build(); - idealState.setInstanceGroupTag(CONTROLLER_INSTANCE_TYPE); + idealState.setInstanceGroupTag(CONTROLLER_INSTANCE); // Set batch message mode idealState.setBatchMessageMode(enableBatchMessageMode); // Explicitly disable this resource when creating this new resource. diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java index cb06fc4..3146693 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java @@ -19,12 +19,17 @@ package org.apache.pinot.controller.api; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.IOException; -import org.apache.pinot.common.utils.CommonConstants; +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; +import org.apache.pinot.common.utils.CommonConstants.Helix; +import org.apache.pinot.common.utils.CommonConstants.Helix.InstanceType; import org.apache.pinot.common.utils.JsonUtils; +import org.apache.pinot.controller.api.pojos.Instance; import org.apache.pinot.controller.helix.ControllerTest; -import org.apache.pinot.util.TestUtils; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -37,6 +42,7 @@ import static org.testng.Assert.fail; * Tests for the instances Restlet. */ public class PinotInstanceRestletResourceTest extends ControllerTest { + @BeforeClass public void setUp() { startZk(); @@ -46,109 +52,99 @@ public class PinotInstanceRestletResourceTest extends ControllerTest { @Test public void testInstanceListingAndCreation() throws Exception { - // Check that there is only one instance, which is the controller instance. - JsonNode instanceList = JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())); - assertEquals(instanceList.get("instances").size(), 1, "Expected only one instance at beginning of test"); + // Check that there is only one CONTROLLER instance in the cluster + String listInstancesUrl = _controllerRequestURLBuilder.forInstanceList(); + JsonNode instanceList = JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl)); + assertEquals(instanceList.get("instances").size(), 1); + assertTrue(instanceList.get("instances").get(0).asText().startsWith(Helix.PREFIX_OF_CONTROLLER_INSTANCE)); // Create untagged broker and server instances - ObjectNode brokerInstance = - (ObjectNode) JsonUtils.stringToJsonNode("{\"host\":\"1.2.3.4\", \"type\":\"broker\", \"port\":\"1234\"}"); - sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), brokerInstance.toString()); - - ObjectNode serverInstance = - (ObjectNode) JsonUtils.stringToJsonNode("{\"host\":\"1.2.3.4\", \"type\":\"server\", \"port\":\"2345\"}"); - sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), serverInstance.toString()); - - // Check that there are three instances - TestUtils.waitForCondition(aVoid -> { - try { - // Check that there are two instances - return - JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances") - .size() == 3; - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 10_000L, "Expected three instances after creation of tagged instances"); - - // Create tagged broker and server instances - brokerInstance.put("tag", "someTag"); - brokerInstance.put("host", "2.3.4.5"); - sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), brokerInstance.toString()); - - serverInstance.put("tag", "someTag"); - serverInstance.put("host", "2.3.4.5"); - sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), serverInstance.toString()); - - // It may take some time for cache data accessor to update its data. - TestUtils.waitForCondition(aVoid -> { - try { - // Check that there are five instances - return - JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances") - .size() == 5; - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 10_000L, "Expected five instances after creation of tagged instances"); - - // Create duplicate broker and server instances (both calls should fail) + String createInstanceUrl = _controllerRequestURLBuilder.forInstanceCreate(); + Instance brokerInstance = new Instance("1.2.3.4", 1234, InstanceType.BROKER, null, null); + sendPostRequest(createInstanceUrl, brokerInstance.toJsonString()); + + Instance serverInstance = new Instance("1.2.3.4", 2345, InstanceType.SERVER, null, null); + sendPostRequest(createInstanceUrl, serverInstance.toJsonString()); + + // Check that there are 3 instances + assertEquals(JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl)).get("instances").size(), 3); + + // Create broker and server instances with tags and pools + brokerInstance = new Instance("2.3.4.5", 1234, InstanceType.BROKER, Collections.singletonList("tag_BROKER"), null); + sendPostRequest(createInstanceUrl, brokerInstance.toJsonString()); + + Map<String, Integer> serverPools = new TreeMap<>(); + serverPools.put("tag_OFFLINE", 0); + serverPools.put("tag_REALTIME", 1); + serverInstance = + new Instance("2.3.4.5", 2345, InstanceType.SERVER, Arrays.asList("tag_OFFLINE", "tag_REALTIME"), serverPools); + sendPostRequest(createInstanceUrl, serverInstance.toJsonString()); + + // Check that there are 5 instances + assertEquals(JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl)).get("instances").size(), 5); + + // Create duplicate broker and server instances should fail try { - sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), brokerInstance.toString()); + sendPostRequest(createInstanceUrl, brokerInstance.toJsonString()); fail("Duplicate broker instance creation did not fail"); } catch (IOException e) { // Expected } try { - sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), serverInstance.toString()); + sendPostRequest(createInstanceUrl, serverInstance.toJsonString()); fail("Duplicate server instance creation did not fail"); } catch (IOException e) { // Expected } - // Check that there are five instances - JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())); - assertEquals( - JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances") - .size(), 5, "Expected five instances after creation of duplicate instances"); + // Check that there are still 5 instances + assertEquals(JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl)).get("instances").size(), 5); // Check that the instances are properly created JsonNode instance = JsonUtils .stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceInformation("Broker_1.2.3.4_1234"))); assertEquals(instance.get("instanceName").asText(), "Broker_1.2.3.4_1234"); - assertEquals(instance.get("hostName").asText(), "1.2.3.4"); + assertEquals(instance.get("hostName").asText(), "Broker_1.2.3.4"); assertEquals(instance.get("port").asText(), "1234"); assertTrue(instance.get("enabled").asBoolean()); - assertEquals(instance.get("tags").size(), 1); - assertEquals(instance.get("tags").get(0).asText(), CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE); + assertEquals(instance.get("tags").size(), 0); + assertTrue(instance.get("pools").isNull()); instance = JsonUtils .stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceInformation("Server_1.2.3.4_2345"))); assertEquals(instance.get("instanceName").asText(), "Server_1.2.3.4_2345"); - assertEquals(instance.get("hostName").asText(), "1.2.3.4"); + assertEquals(instance.get("hostName").asText(), "Server_1.2.3.4"); assertEquals(instance.get("port").asText(), "2345"); assertTrue(instance.get("enabled").asBoolean()); - assertEquals(instance.get("tags").size(), 1); - assertEquals(instance.get("tags").get(0).asText(), CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE); + assertEquals(instance.get("tags").size(), 0); + assertTrue(instance.get("pools").isNull()); instance = JsonUtils .stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceInformation("Broker_2.3.4.5_1234"))); assertEquals(instance.get("instanceName").asText(), "Broker_2.3.4.5_1234"); - assertEquals(instance.get("hostName").asText(), "2.3.4.5"); + assertEquals(instance.get("hostName").asText(), "Broker_2.3.4.5"); assertEquals(instance.get("port").asText(), "1234"); assertTrue(instance.get("enabled").asBoolean()); - assertEquals(instance.get("tags").size(), 1); - assertEquals(instance.get("tags").get(0).asText(), "someTag"); + JsonNode tags = instance.get("tags"); + assertEquals(tags.size(), 1); + assertEquals(tags.get(0).asText(), "tag_BROKER"); + assertTrue(instance.get("pools").isNull()); instance = JsonUtils .stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceInformation("Server_2.3.4.5_2345"))); assertEquals(instance.get("instanceName").asText(), "Server_2.3.4.5_2345"); - assertEquals(instance.get("hostName").asText(), "2.3.4.5"); + assertEquals(instance.get("hostName").asText(), "Server_2.3.4.5"); assertEquals(instance.get("port").asText(), "2345"); assertTrue(instance.get("enabled").asBoolean()); - assertEquals(instance.get("tags").size(), 1); - assertEquals(instance.get("tags").get(0).asText(), "someTag"); + tags = instance.get("tags"); + assertEquals(tags.size(), 2); + assertEquals(tags.get(0).asText(), "tag_OFFLINE"); + assertEquals(tags.get(1).asText(), "tag_REALTIME"); + JsonNode pools = instance.get("pools"); + assertEquals(pools.size(), 2); + assertEquals(pools.get("tag_OFFLINE").asText(), "0"); + assertEquals(pools.get("tag_REALTIME").asText(), "1"); // Check that an error is given for an instance that does not exist try { @@ -158,4 +154,10 @@ public class PinotInstanceRestletResourceTest extends ControllerTest { // Expected } } + + @AfterClass + public void tearDown() { + stopController(); + stopZk(); + } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java index fa38d8a..f5d61d6 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java @@ -19,9 +19,9 @@ package org.apache.pinot.controller.api; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.pinot.common.config.TableConfig; -import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.config.TagNameUtils; +import org.apache.pinot.common.utils.CommonConstants.Helix.TableType; import org.apache.pinot.common.utils.JsonUtils; import org.apache.pinot.controller.helix.ControllerTest; import org.testng.annotations.AfterClass; @@ -31,76 +31,40 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; -/** - * Test for table to tenant mapping. Real working test is inside offline cluster integration test because it requires - * segment upload. - */ public class PinotTenantRestletResourceTest extends ControllerTest { - private final TableConfig.Builder _offlineBuilder = new TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE); - private static final int NUM_BROKER_INSTANCES = 2; - private static final int NUM_SERVER_INSTANCES = 6; + private static final int NUM_BROKER_INSTANCES = 1; + private static final int NUM_SERVER_INSTANCES = 3; @BeforeClass - public void setUp() { + public void setUp() + throws Exception { startZk(); startController(); + addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true); + addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true); } @Test public void testTableListForTenant() throws Exception { - // Create untagged broker and server instances - ObjectNode brokerInstance = - (ObjectNode) JsonUtils.stringToJsonNode("{\"host\":\"1.2.3.4\", \"type\":\"broker\", \"port\":\"1234\"}"); - sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), brokerInstance.toString()); - - ObjectNode serverInstance = - (ObjectNode) JsonUtils.stringToJsonNode("{\"host\":\"1.2.3.4\", \"type\":\"server\", \"port\":\"2345\"}"); - sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), serverInstance.toString()); - - // Create tagged broker and server instances - brokerInstance.put("tag", "someTag"); - brokerInstance.put("host", "2.3.4.5"); - sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), brokerInstance.toString()); - - serverInstance.put("tag", "server_REALTIME"); - serverInstance.put("host", "2.3.4.5"); - sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), serverInstance.toString()); - // Check that no tables on tenant works - JsonNode tableList = - JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTablesFromTenant("server_REALTIME"))); - assertEquals(tableList.get("tables").size(), 0, "Expected no tables"); - - // Try to make sure both kinds of tags work - tableList = JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTablesFromTenant("server"))); - assertEquals(tableList.get("tables").size(), 0, "Expected no tables"); - - // Add a table to the server - String createTableUrl = _controllerRequestURLBuilder.forTableCreate(); - - addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true); - addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true); - - _offlineBuilder.setTableName("testOfflineTable").setTimeColumnName("timeColumn").setTimeType("DAYS") - .setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setServerTenant("DefaultTenant"); - - TableConfig offlineTableConfig = _offlineBuilder.build(); - offlineTableConfig.setTableName("mytable_OFFLINE"); - String offlineTableJSONConfigString = offlineTableConfig.toJsonConfigString(); - sendPostRequest(createTableUrl, offlineTableJSONConfigString); - - // Try to make sure both kinds of tags work - tableList = - JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTablesFromTenant("DefaultTenant"))); - assertEquals(tableList.get("tables").size(), 1, "Expected 1 table"); - assertEquals(tableList.get("tables").get(0).asText(), "mytable_OFFLINE"); - - stopFakeInstances(); + String listTablesUrl = _controllerRequestURLBuilder.forTablesFromTenant(TagNameUtils.DEFAULT_TENANT_NAME); + JsonNode tableList = JsonUtils.stringToJsonNode(sendGetRequest(listTablesUrl)); + assertEquals(tableList.get("tables").size(), 0); + + // Add a table + sendPostRequest(_controllerRequestURLBuilder.forTableCreate(), + new TableConfig.Builder(TableType.OFFLINE).setTableName("testTable").build().toJsonConfigString()); + + // There should be 1 table on the tenant + tableList = JsonUtils.stringToJsonNode(sendGetRequest(listTablesUrl)); + assertEquals(tableList.get("tables").size(), 1); + assertEquals(tableList.get("tables").get(0).asText(), "testTable_OFFLINE"); } @AfterClass public void tearDown() { + stopFakeInstances(); stopController(); stopZk(); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java index ed76582..cdf3d9e 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java @@ -55,7 +55,6 @@ import org.apache.pinot.controller.helix.ControllerTest; import org.apache.pinot.util.TestUtils; import org.testng.Assert; import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -407,7 +406,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest { Assert.assertTrue(leadControllerResourceIdealState.isValid()); Assert.assertTrue(leadControllerResourceIdealState.isEnabled()); Assert.assertEquals(leadControllerResourceIdealState.getInstanceGroupTag(), - CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE); + CommonConstants.Helix.CONTROLLER_INSTANCE); Assert.assertEquals(leadControllerResourceIdealState.getNumPartitions(), CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE); Assert.assertEquals(leadControllerResourceIdealState.getReplicas(), diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BalanceNumSegmentAssignmentStrategyIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BalanceNumSegmentAssignmentStrategyIntegrationTest.java index b8ac99a..b6538dd 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BalanceNumSegmentAssignmentStrategyIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BalanceNumSegmentAssignmentStrategyIntegrationTest.java @@ -18,13 +18,14 @@ */ package org.apache.pinot.integration.tests; -import com.fasterxml.jackson.databind.node.ObjectNode; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.helix.model.IdealState; -import org.apache.pinot.common.utils.JsonUtils; +import org.apache.pinot.common.utils.CommonConstants.Helix.InstanceType; +import org.apache.pinot.controller.api.pojos.Instance; import org.apache.pinot.core.indexsegment.generator.SegmentVersion; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; @@ -38,9 +39,9 @@ import static org.testng.Assert.assertEquals; */ // TODO: clean up this test public class BalanceNumSegmentAssignmentStrategyIntegrationTest extends UploadRefreshDeleteIntegrationTest { - private final String serverTenant = "DefaultTenant_OFFLINE"; - private final String hostName = "1.2.3.4"; - private final int basePort = 1234; + private static final String HOST = "1.2.3.4"; + private static final int BASE_PORT = 1234; + private static final String SERVER_TAG = "DefaultTenant_OFFLINE"; @BeforeClass public void setUp() @@ -49,12 +50,9 @@ public class BalanceNumSegmentAssignmentStrategyIntegrationTest extends UploadRe // Create eight dummy server instances for (int i = 0; i < 8; ++i) { - ObjectNode serverInstance = JsonUtils.newObjectNode(); - serverInstance.put("host", hostName); - serverInstance.put("port", Integer.toString(basePort + i)); - serverInstance.put("tag", serverTenant); - serverInstance.put("type", "server"); - sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), serverInstance.toString()); + Instance serverInstance = + new Instance(HOST, BASE_PORT + i, InstanceType.SERVER, Collections.singletonList(SERVER_TAG), null); + sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), serverInstance.toJsonString()); } } @@ -100,7 +98,7 @@ public class BalanceNumSegmentAssignmentStrategyIntegrationTest extends UploadRe @Test(dataProvider = "tableNameProvider") public void testNoAssignmentToDisabledInstances(String tableName, SegmentVersion version) throws Exception { - List<String> instances = _helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), serverTenant); + List<String> instances = _helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), SERVER_TAG); List<String> disabledInstances = new ArrayList<>(); // disable 6 instances assertEquals(instances.size(), 9); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index f8baa86..c36def8 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -187,7 +187,7 @@ public abstract class ClusterTest extends ControllerTest { for (int i = 0; i < minionCount; i++) { Configuration config = new PropertiesConfiguration(); config.setProperty(Helix.Instance.INSTANCE_ID_KEY, - Minion.INSTANCE_PREFIX + "minion" + i + "_" + (Minion.DEFAULT_HELIX_PORT + i)); + Helix.PREFIX_OF_MINION_INSTANCE + "minion" + i + "_" + (Minion.DEFAULT_HELIX_PORT + i)); config.setProperty(Helix.Instance.DATA_DIR_KEY, Minion.DEFAULT_INSTANCE_DATA_DIR + "-" + i); MinionStarter minionStarter = new MinionStarter(ZkStarter.DEFAULT_ZK_STR, getHelixClusterName(), config); @@ -405,11 +405,11 @@ public abstract class ClusterTest extends ControllerTest { String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String timeColumnName, String timeType, String schemaName, String brokerTenant, String serverTenant, String loadMode, String sortedColumn, List<String> invertedIndexColumns, List<String> bloomFilterColumns, List<String> noDictionaryColumns, - TableTaskConfig taskConfig, String streamConsumerFactoryName) throws Exception { - addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushRows, - avroFile, timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, - invertedIndexColumns, bloomFilterColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName, - 1); + TableTaskConfig taskConfig, String streamConsumerFactoryName) + throws Exception { + addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, kafkaTopic, realtimeSegmentFlushRows, avroFile, + timeColumnName, timeType, schemaName, brokerTenant, serverTenant, loadMode, sortedColumn, invertedIndexColumns, + bloomFilterColumns, noDictionaryColumns, taskConfig, streamConsumerFactoryName, 1); } protected void addRealtimeTable(String tableName, boolean useLlc, String kafkaBrokerList, String kafkaZkUrl, diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java index 3ef52e5..f6bc0b1 100644 --- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java +++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java @@ -76,7 +76,7 @@ public class MinionStarter { _helixClusterName = helixClusterName; _config = config; _instanceId = config.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY, - CommonConstants.Minion.INSTANCE_PREFIX + NetUtil.getHostAddress() + "_" + CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE + NetUtil.getHostAddress() + "_" + CommonConstants.Minion.DEFAULT_HELIX_PORT); setupHelixSystemProperties(); _helixManager = new ZKHelixManager(_helixClusterName, _instanceId, InstanceType.PARTICIPANT, zkAddress); @@ -141,9 +141,9 @@ public class MinionStarter { MetricsHelper.initializeMetrics(_config); MetricsRegistry metricsRegistry = new MetricsRegistry(); MetricsHelper.registerMetricsRegistry(metricsRegistry); - final MinionMetrics minionMetrics = new MinionMetrics( - _config.getString(CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX_KEY, CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX), - metricsRegistry); + final MinionMetrics minionMetrics = new MinionMetrics(_config + .getString(CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX_KEY, + CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX), metricsRegistry); minionMetrics.initializeGlobalMeters(); minionContext.setMinionMetrics(minionMetrics); @@ -222,8 +222,8 @@ public class MinionStarter { private void addInstanceTagIfNeeded() { InstanceConfig instanceConfig = _helixAdmin.getInstanceConfig(_helixClusterName, _instanceId); if (instanceConfig.getTags().isEmpty()) { - LOGGER.info("Adding default Helix tag: {} to Pinot minion", CommonConstants.Minion.UNTAGGED_INSTANCE); - _helixAdmin.addInstanceTag(_helixClusterName, _instanceId, CommonConstants.Minion.UNTAGGED_INSTANCE); + LOGGER.info("Adding default Helix tag: {} to Pinot minion", CommonConstants.Helix.UNTAGGED_MINION_INSTANCE); + _helixAdmin.addInstanceTag(_helixClusterName, _instanceId, CommonConstants.Helix.UNTAGGED_MINION_INSTANCE); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
