This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d7b4a5  [Instance Assignment] Enhance Instance class to include the 
pool config (#4522)
1d7b4a5 is described below

commit 1d7b4a5763771f93b9abf25fef25aab7c83e4268
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Aug 12 17:29:26 2019 -0700

    [Instance Assignment] Enhance Instance class to include the pool config 
(#4522)
    
    - Enhance Instance class to include the pool config
    - Allow multiple tags in the Instance
    - Support pools and multiple tags for instance restlet API
    - Make the behavior of joining the cluster via instance restlet API and 
auto-join consistent
      - HELIX_HOST will have the instance type prefix per Helix behavior
---
 .../org/apache/pinot/common/config/Instance.java   | 145 +++++++++++++++++++++
 .../apache/pinot/common/utils/CommonConstants.java |  29 ++---
 .../pinot/controller/api/pojos/Instance.java       | 137 -------------------
 .../resources/PinotInstanceRestletResource.java    |   8 +-
 .../helix/core/PinotHelixResourceManager.java      |  70 +++++-----
 .../instance/InstanceTagPoolSelector.java          |   2 +-
 .../core/minion/PinotHelixTaskResourceManager.java |   4 +-
 .../helix/core/util/HelixSetupUtils.java           |   2 +-
 .../api/PinotInstanceRestletResourceTest.java      | 136 +++++++++----------
 .../api/PinotTenantRestletResourceTest.java        |  78 +++--------
 .../helix/core/PinotHelixResourceManagerTest.java  |   3 +-
 .../instance/InstanceAssignmentTest.java           |   2 +-
 ...umSegmentAssignmentStrategyIntegrationTest.java |  22 ++--
 .../pinot/integration/tests/ClusterTest.java       |  12 +-
 .../org/apache/pinot/minion/MinionStarter.java     |  12 +-
 15 files changed, 309 insertions(+), 353 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/config/Instance.java 
b/pinot-common/src/main/java/org/apache/pinot/common/config/Instance.java
new file mode 100644
index 0000000..dc37e20
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/Instance.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.CommonConstants.Helix;
+import org.apache.pinot.common.utils.CommonConstants.Helix.InstanceType;
+import org.apache.pinot.common.utils.JsonUtils;
+
+
+/**
+ * Instance configuration.
+ * <pre>
+ * Example:
+ * {
+ *   "host": "hostname.example.com",
+ *   "port": 1234,
+ *   "type": "SERVER",
+ *   "tags": ["example_OFFLINE"],
+ *   "pools": {
+ *     "example_OFFLINE": 0
+ *   }
+ * }
+ * </pre>
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Instance {
+  public static final String POOL_KEY = "pool";
+
+  private final String _host;
+  private final int _port;
+  private final InstanceType _type;
+  private final List<String> _tags;
+  private final Map<String, Integer> _pools;
+
+  @JsonCreator
+  public Instance(@JsonProperty(value = "host", required = true) String host,
+      @JsonProperty(value = "port", required = true) int port,
+      @JsonProperty(value = "type", required = true) InstanceType type,
+      @JsonProperty(value = "tags") @Nullable List<String> tags,
+      @JsonProperty(value = "pools") @Nullable Map<String, Integer> pools) {
+    _host = host;
+    _port = port;
+    _type = type;
+    _tags = tags;
+    _pools = pools;
+  }
+
+  @JsonProperty
+  public String getHost() {
+    return _host;
+  }
+
+  @JsonProperty
+  public int getPort() {
+    return _port;
+  }
+
+  @JsonProperty
+  public InstanceType getType() {
+    return _type;
+  }
+
+  @JsonProperty
+  public List<String> getTags() {
+    return _tags;
+  }
+
+  @JsonProperty
+  public Map<String, Integer> getPools() {
+    return _pools;
+  }
+
+  @JsonIgnore
+  public String getInstanceId() {
+    String prefix;
+    switch (_type) {
+      case CONTROLLER:
+        prefix = Helix.PREFIX_OF_CONTROLLER_INSTANCE;
+        break;
+      case BROKER:
+        prefix = Helix.PREFIX_OF_BROKER_INSTANCE;
+        break;
+      case SERVER:
+        prefix = Helix.PREFIX_OF_SERVER_INSTANCE;
+        break;
+      case MINION:
+        prefix = Helix.PREFIX_OF_MINION_INSTANCE;
+        break;
+      default:
+        throw new IllegalStateException();
+    }
+    return prefix + _host + "_" + _port;
+  }
+
+  public InstanceConfig toInstanceConfig() {
+    InstanceConfig instanceConfig = 
InstanceConfig.toInstanceConfig(getInstanceId());
+    if (_tags != null) {
+      for (String tag : _tags) {
+        instanceConfig.addTag(tag);
+      }
+    }
+    if (_pools != null && !_pools.isEmpty()) {
+      Map<String, String> mapValue = new TreeMap<>();
+      for (Map.Entry<String, Integer> entry : _pools.entrySet()) {
+        mapValue.put(entry.getKey(), entry.getValue().toString());
+      }
+      instanceConfig.getRecord().setMapField(POOL_KEY, mapValue);
+    }
+    return instanceConfig;
+  }
+
+  public String toJsonString() {
+    try {
+      return JsonUtils.objectToString(this);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 663557e..d17eb79 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -31,13 +31,10 @@ public class CommonConstants {
 
     public static final String INSTANCE_CONNECTED_METRIC_NAME = 
"helix.connected";
 
-    public static final String PREFIX_OF_SERVER_INSTANCE = "Server_";
-    public static final String PREFIX_OF_BROKER_INSTANCE = "Broker_";
     public static final String PREFIX_OF_CONTROLLER_INSTANCE = "Controller_";
-
-    public static final String SERVER_INSTANCE_TYPE = "server";
-    public static final String BROKER_INSTANCE_TYPE = "broker";
-    public static final String CONTROLLER_INSTANCE_TYPE = "controller";
+    public static final String PREFIX_OF_BROKER_INSTANCE = "Broker_";
+    public static final String PREFIX_OF_SERVER_INSTANCE = "Server_";
+    public static final String PREFIX_OF_MINION_INSTANCE = "Minion_";
 
     public static final String BROKER_RESOURCE_INSTANCE = "brokerResource";
     public static final String LEAD_CONTROLLER_RESOURCE_NAME = 
"leadControllerResource";
@@ -49,8 +46,11 @@ public class CommonConstants {
     public static final int MIN_ACTIVE_REPLICAS = 0;
     public static final int REBALANCE_DELAY_MS = 300_000; // 5 minutes.
 
-    public static final String UNTAGGED_SERVER_INSTANCE = "server_untagged";
+    // Instance tags
+    public static final String CONTROLLER_INSTANCE = "controller";
     public static final String UNTAGGED_BROKER_INSTANCE = "broker_untagged";
+    public static final String UNTAGGED_SERVER_INSTANCE = "server_untagged";
+    public static final String UNTAGGED_MINION_INSTANCE = "minion_untagged";
 
     public static class StateModel {
       public static class SegmentOnlineOfflineStateModel {
@@ -92,6 +92,10 @@ public class CommonConstants {
       public static final String ADMIN_PORT_KEY = "adminPort";
     }
 
+    public enum InstanceType {
+      CONTROLLER, BROKER, SERVER, MINION
+    }
+
     public enum TableType {
       OFFLINE, REALTIME;
 
@@ -299,9 +303,6 @@ public class CommonConstants {
   }
 
   public static class Minion {
-    public static final String INSTANCE_PREFIX = "Minion_";
-    public static final String INSTANCE_TYPE = "minion";
-    public static final String UNTAGGED_INSTANCE = "minion_untagged";
     public static final String CONFIG_OF_METRICS_PREFIX = "pinot.minion.";
     public static final String METADATA_EVENT_OBSERVER_PREFIX = 
"metadata.event.notifier";
 
@@ -320,14 +321,6 @@ public class CommonConstants {
     public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = "crypter";
   }
 
-  public static class Metric {
-    public static class Server {
-      public static final String CURRENT_NUMBER_OF_SEGMENTS = 
"currentNumberOfSegments";
-      public static final String CURRENT_NUMBER_OF_DOCUMENTS = 
"currentNumberOfDocuments";
-      public static final String NUMBER_OF_DELETED_SEGMENTS = 
"numberOfDeletedSegments";
-    }
-  }
-
   public static class Segment {
     public static class Realtime {
       public enum Status {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/pojos/Instance.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/pojos/Instance.java
deleted file mode 100644
index 7a8ac23..0000000
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/pojos/Instance.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.api.pojos;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.pinot.common.utils.CommonConstants;
-
-
-/**
- * Instance POJO, used as part of the API to create instances.
- */
-//@Example("{\n" + "\t\"host\": \"hostname.example.com\",\n" + "\t\"port\": 
\"1234\",\n" + "\t\"type\": \"server\"\n" + "}")
-public class Instance {
-  public static final String POOL_KEY = "pool";
-
-  private final String _host;
-  private final String _port;
-  private final String _type;
-  private final String _tag;
-  private final String _instancePrefix;
-
-  public static Instance fromInstanceConfig(InstanceConfig instanceConfig) {
-    InstanceConfig ic = instanceConfig;
-    String instanceName = ic.getInstanceName();
-    String type;
-    if 
(instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
-      type = CommonConstants.Helix.SERVER_INSTANCE_TYPE;
-    } else if 
(instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)) {
-      type = CommonConstants.Helix.BROKER_INSTANCE_TYPE;
-    } else {
-      throw new RuntimeException("Unknown instance type for: " + instanceName);
-    }
-
-    Instance instance =
-        new Instance(ic.getHostName(), ic.getPort(), type, 
org.apache.commons.lang.StringUtils.join(ic.getTags(), ','));
-    return instance;
-  }
-
-  @JsonCreator
-  public Instance(@JsonProperty(value = "host", required = true) String host,
-      @JsonProperty(value = "port", required = true) String port,
-      @JsonProperty(value = "type", required = true) String type,
-      @JsonProperty(value = "tag", required = false) String tag) {
-    _host = host;
-    _port = port;
-    _tag = tag;
-
-    if (CommonConstants.Helix.SERVER_INSTANCE_TYPE.equalsIgnoreCase(type)) {
-      _instancePrefix = CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE;
-      _type = CommonConstants.Helix.SERVER_INSTANCE_TYPE;
-    } else if 
(CommonConstants.Helix.BROKER_INSTANCE_TYPE.equalsIgnoreCase(type)) {
-      _instancePrefix = CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE;
-      _type = CommonConstants.Helix.BROKER_INSTANCE_TYPE;
-    } else if (CommonConstants.Minion.INSTANCE_TYPE.equalsIgnoreCase(type)) {
-      _instancePrefix = CommonConstants.Minion.INSTANCE_PREFIX;
-      _type = CommonConstants.Minion.INSTANCE_TYPE;
-    } else {
-      throw new IllegalArgumentException("Invalid instance type " + type + ", 
expected either server or broker");
-    }
-  }
-
-  public String getHost() {
-    return _host;
-  }
-
-  public String getPort() {
-    return _port;
-  }
-
-  public String getTag() {
-    return _tag;
-  }
-
-  public String getType() {
-    return _type;
-  }
-
-  public String toInstanceId() {
-    return _instancePrefix + _host + "_" + _port;
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder bld = new StringBuilder();
-    bld.append("host : " + _host + "\n");
-    bld.append("port : " + _port + "\n");
-    bld.append("type : " + _type + "\n");
-    if (_tag != null) {
-      bld.append("tag : " + _tag + "\n");
-    }
-    return bld.toString();
-  }
-
-  public InstanceConfig toInstanceConfig() {
-    final InstanceConfig iConfig = new InstanceConfig(toInstanceId());
-    iConfig.setHostName(_host);
-    iConfig.setPort(_port);
-    iConfig.setInstanceEnabled(true);
-    iConfig.addTag(getTagOrDefaultTag());
-    return iConfig;
-  }
-
-  private String getTagOrDefaultTag() {
-    if (_tag != null) {
-      return _tag;
-    } else {
-      switch (_type) {
-        case CommonConstants.Helix.SERVER_INSTANCE_TYPE:
-          return CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
-        case CommonConstants.Helix.BROKER_INSTANCE_TYPE:
-          return CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE;
-        case CommonConstants.Minion.INSTANCE_TYPE:
-          return CommonConstants.Minion.UNTAGGED_INSTANCE;
-        default:
-          throw new RuntimeException("Unknown instance type " + _type + ", was 
expecting either server or broker");
-      }
-    }
-  }
-}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
index 6ff9f16..79788c1 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
@@ -37,8 +37,8 @@ import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.config.Instance;
 import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.controller.api.pojos.Instance;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
 import org.slf4j.Logger;
@@ -85,8 +85,7 @@ public class PinotInstanceRestletResource {
   @ApiOperation(value = "Get instance information", produces = 
MediaType.APPLICATION_JSON)
   @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"), 
@ApiResponse(code = 404, message = "Instance not found"), @ApiResponse(code = 
500, message = "Internal error")})
   public String getInstance(
-      @ApiParam(value = "Instance name", required = true, example = 
"Server_a.b.com_20000 | Broker_my.broker.com_30000")
-      @PathParam("instanceName") String instanceName) {
+      @ApiParam(value = "Instance name", required = true, example = 
"Server_a.b.com_20000 | Broker_my.broker.com_30000") @PathParam("instanceName") 
String instanceName) {
     InstanceConfig instanceConfig = 
pinotHelixResourceManager.getHelixInstanceConfig(instanceName);
     if (instanceConfig == null) {
       throw new ControllerApplicationException(LOGGER, "Instance " + 
instanceName + " not found",
@@ -98,6 +97,7 @@ public class PinotInstanceRestletResource {
     response.put("enabled", instanceConfig.getInstanceEnabled());
     response.put("port", instanceConfig.getPort());
     response.set("tags", JsonUtils.objectToJsonNode(instanceConfig.getTags()));
+    response.set("pools", 
JsonUtils.objectToJsonNode(instanceConfig.getRecord().getMapField(Instance.POOL_KEY)));
     return response.toString();
   }
 
@@ -108,7 +108,7 @@ public class PinotInstanceRestletResource {
   @ApiOperation(value = "Create a new instance", consumes = 
MediaType.APPLICATION_JSON, notes = "Creates a new instance with given instance 
config")
   @ApiResponses(value = {@ApiResponse(code = 200, message = "Success"), 
@ApiResponse(code = 409, message = "Instance already exists"), 
@ApiResponse(code = 500, message = "Internal error")})
   public SuccessResponse addInstance(Instance instance) {
-    LOGGER.info("Instance creation request received for instance " + 
instance.toInstanceId());
+    LOGGER.info("Instance creation request received for instance: {}", 
instance.getInstanceId());
     if (!pinotHelixResourceManager.addInstance(instance).isSuccessful()) {
       throw new ControllerApplicationException(LOGGER, "Instance already 
exists", Response.Status.CONFLICT);
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index cb69b93..6714044 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -48,13 +48,11 @@ import org.apache.helix.HelixAdmin;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.examples.MasterSlaveStateModelFactory;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
 import org.apache.helix.model.CurrentState;
@@ -62,9 +60,9 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.MasterSlaveSMD;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.config.IndexingConfig;
+import org.apache.pinot.common.config.Instance;
 import org.apache.pinot.common.config.OfflineTagConfig;
 import org.apache.pinot.common.config.RealtimeTagConfig;
 import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
@@ -89,7 +87,7 @@ import 
org.apache.pinot.common.partition.ReplicaGroupPartitionAssignment;
 import 
org.apache.pinot.common.partition.ReplicaGroupPartitionAssignmentGenerator;
 import org.apache.pinot.common.restlet.resources.RebalanceResult;
 import org.apache.pinot.common.segment.SegmentMetadata;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Helix;
 import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerOnlineOfflineStateModel;
 import 
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
@@ -99,7 +97,6 @@ import 
org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvid
 import org.apache.pinot.common.utils.retry.RetryPolicies;
 import org.apache.pinot.common.utils.retry.RetryPolicy;
 import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.api.pojos.Instance;
 import org.apache.pinot.controller.api.resources.StateType;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import 
org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy;
@@ -258,10 +255,9 @@ public class PinotHelixResourceManager {
    */
   private void addInstanceGroupTagIfNeeded() {
     InstanceConfig instanceConfig = getHelixInstanceConfig(_instanceId);
-    if 
(!instanceConfig.containsTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE)) {
-      LOGGER.info("Controller: {} doesn't contain group tag: {}. Adding one.", 
_instanceId,
-          CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
-      instanceConfig.addTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+    if (!instanceConfig.containsTag(Helix.CONTROLLER_INSTANCE)) {
+      LOGGER.info("Controller: {} doesn't contain group tag: {}. Adding one.", 
_instanceId, Helix.CONTROLLER_INSTANCE);
+      instanceConfig.addTag(Helix.CONTROLLER_INSTANCE);
       HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
       accessor.setProperty(accessor.keyBuilder().instanceConfig(_instanceId), 
instanceConfig);
     }
@@ -360,7 +356,7 @@ public class PinotHelixResourceManager {
   @Nonnull
   public synchronized PinotResourceManagerResponse addInstance(@Nonnull 
Instance instance) {
     List<String> instances = getAllInstances();
-    String instanceIdToAdd = instance.toInstanceId();
+    String instanceIdToAdd = instance.getInstanceId();
     if (instances.contains(instanceIdToAdd)) {
       return PinotResourceManagerResponse.failure("Instance " + 
instanceIdToAdd + " already exists");
     } else {
@@ -589,7 +585,7 @@ public class PinotHelixResourceManager {
     }
     for (int i = 0; i < numberOfInstancesToAdd; ++i) {
       String instanceName = unTaggedInstanceList.get(i);
-      retagInstance(instanceName, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE, brokerTenantTag);
+      retagInstance(instanceName, Helix.UNTAGGED_BROKER_INSTANCE, 
brokerTenantTag);
       // Update idealState by adding new instance to table mapping.
       addInstanceToBrokerIdealState(brokerTenantTag, instanceName);
     }
@@ -625,7 +621,7 @@ public class PinotHelixResourceManager {
 
     // Update ideal state with the new broker instances
     try {
-      HelixHelper.updateIdealState(getHelixZkManager(), 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState -> {
+      HelixHelper.updateIdealState(getHelixZkManager(), 
Helix.BROKER_RESOURCE_INSTANCE, idealState -> {
         assert idealState != null;
         Map<String, String> instanceStateMap = 
idealState.getInstanceStateMap(tableNameWithType);
         if (instanceStateMap != null) {
@@ -646,8 +642,7 @@ public class PinotHelixResourceManager {
   }
 
   private void addInstanceToBrokerIdealState(String brokerTenantTag, String 
instanceName) {
-    IdealState tableIdealState =
-        _helixAdmin.getResourceIdealState(_helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    IdealState tableIdealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, 
Helix.BROKER_RESOURCE_INSTANCE);
     for (String tableNameWithType : tableIdealState.getPartitionSet()) {
       TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
       Preconditions.checkNotNull(tableConfig);
@@ -656,15 +651,14 @@ public class PinotHelixResourceManager {
         tableIdealState.setPartitionState(tableNameWithType, instanceName, 
BrokerOnlineOfflineStateModel.ONLINE);
       }
     }
-    _helixAdmin
-        .setResourceIdealState(_helixClusterName, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, tableIdealState);
+    _helixAdmin.setResourceIdealState(_helixClusterName, 
Helix.BROKER_RESOURCE_INSTANCE, tableIdealState);
   }
 
   private PinotResourceManagerResponse scaleDownBroker(Tenant tenant, String 
brokerTenantTag,
       List<String> instancesInClusterWithTag) {
     int numberBrokersToUntag = instancesInClusterWithTag.size() - 
tenant.getNumberOfInstances();
     for (int i = 0; i < numberBrokersToUntag; ++i) {
-      retagInstance(instancesInClusterWithTag.get(i), brokerTenantTag, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+      retagInstance(instancesInClusterWithTag.get(i), brokerTenantTag, 
Helix.UNTAGGED_BROKER_INSTANCE);
     }
     return PinotResourceManagerResponse.SUCCESS;
   }
@@ -727,11 +721,11 @@ public class PinotHelixResourceManager {
     int incOffline = serverTenant.getOfflineInstances() - 
taggedOfflineServers.size();
     int incRealtime = serverTenant.getRealtimeInstances() - 
taggedRealtimeServers.size();
     for (int i = 0; i < incOffline; ++i) {
-      retagInstance(unTaggedInstanceList.get(i), 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
+      retagInstance(unTaggedInstanceList.get(i), 
Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
     }
     for (int i = incOffline; i < incOffline + incRealtime; ++i) {
       String instanceName = unTaggedInstanceList.get(i);
-      retagInstance(instanceName, 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
+      retagInstance(instanceName, Helix.UNTAGGED_SERVER_INSTANCE, 
realtimeServerTag);
       // TODO: update idealStates & instanceZkMetadata
     }
     return PinotResourceManagerResponse.SUCCESS;
@@ -746,14 +740,14 @@ public class PinotHelixResourceManager {
     taggedOfflineServers.removeAll(taggedRealtimeServers);
     for (int i = 0; i < incOffline; ++i) {
       if (i < incInstances) {
-        retagInstance(unTaggedInstanceList.get(i), 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
+        retagInstance(unTaggedInstanceList.get(i), 
Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
       } else {
         _helixAdmin.addInstanceTag(_helixClusterName, 
taggedRealtimeServers.get(i - incInstances), offlineServerTag);
       }
     }
     for (int i = incOffline; i < incOffline + incRealtime; ++i) {
       if (i < incInstances) {
-        retagInstance(unTaggedInstanceList.get(i), 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
+        retagInstance(unTaggedInstanceList.get(i), 
Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
         // TODO: update idealStates & instanceZkMetadata
       } else {
         _helixAdmin.addInstanceTag(_helixClusterName, 
taggedOfflineServers.get(i - Math.max(incInstances, incOffline)),
@@ -777,7 +771,7 @@ public class PinotHelixResourceManager {
   public boolean isBrokerTenantDeletable(String tenantName) {
     String brokerTag = TagNameUtils.getBrokerTagForTenant(tenantName);
     Set<String> taggedInstances = new 
HashSet<>(HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag));
-    String brokerName = CommonConstants.Helix.BROKER_RESOURCE_INSTANCE;
+    String brokerName = Helix.BROKER_RESOURCE_INSTANCE;
     IdealState brokerIdealState = 
_helixAdmin.getResourceIdealState(_helixClusterName, brokerName);
     for (String partition : brokerIdealState.getPartitionSet()) {
       for (String instance : brokerIdealState.getInstanceSet(partition)) {
@@ -866,12 +860,12 @@ public class PinotHelixResourceManager {
       List<String> unTaggedInstanceList) {
     String offlineServerTag = 
TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
     for (int i = 0; i < serverTenant.getOfflineInstances(); i++) {
-      retagInstance(unTaggedInstanceList.get(i), 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
+      retagInstance(unTaggedInstanceList.get(i), 
Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
     }
     String realtimeServerTag = 
TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
     for (int i = 0; i < serverTenant.getRealtimeInstances(); i++) {
-      retagInstance(unTaggedInstanceList.get(i + 
serverTenant.getOfflineInstances()),
-          CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
+      retagInstance(unTaggedInstanceList.get(i + 
serverTenant.getOfflineInstances()), Helix.UNTAGGED_SERVER_INSTANCE,
+          realtimeServerTag);
     }
   }
 
@@ -880,11 +874,11 @@ public class PinotHelixResourceManager {
     int cnt = 0;
     String offlineServerTag = 
TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
     for (int i = 0; i < serverTenant.getOfflineInstances(); i++) {
-      retagInstance(unTaggedInstanceList.get(cnt++), 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
+      retagInstance(unTaggedInstanceList.get(cnt++), 
Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
     }
     String realtimeServerTag = 
TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
     for (int i = 0; i < serverTenant.getRealtimeInstances(); i++) {
-      retagInstance(unTaggedInstanceList.get(cnt++), 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
+      retagInstance(unTaggedInstanceList.get(cnt++), 
Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
       if (cnt == numberOfInstances) {
         cnt = 0;
       }
@@ -903,7 +897,7 @@ public class PinotHelixResourceManager {
     }
     String brokerTag = 
TagNameUtils.getBrokerTagForTenant(brokerTenant.getTenantName());
     for (int i = 0; i < brokerTenant.getNumberOfInstances(); ++i) {
-      retagInstance(unTaggedInstanceList.get(i), 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE, brokerTag);
+      retagInstance(unTaggedInstanceList.get(i), 
Helix.UNTAGGED_BROKER_INSTANCE, brokerTag);
     }
     return PinotResourceManagerResponse.SUCCESS;
   }
@@ -914,7 +908,7 @@ public class PinotHelixResourceManager {
     for (String instanceName : instancesInClusterWithTag) {
       _helixAdmin.removeInstanceTag(_helixClusterName, instanceName, 
offlineTenantTag);
       if (getTagsForInstance(instanceName).isEmpty()) {
-        _helixAdmin.addInstanceTag(_helixClusterName, instanceName, 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE);
+        _helixAdmin.addInstanceTag(_helixClusterName, instanceName, 
Helix.UNTAGGED_SERVER_INSTANCE);
       }
     }
     return PinotResourceManagerResponse.SUCCESS;
@@ -926,7 +920,7 @@ public class PinotHelixResourceManager {
     for (String instanceName : instancesInClusterWithTag) {
       _helixAdmin.removeInstanceTag(_helixClusterName, instanceName, 
realtimeTenantTag);
       if (getTagsForInstance(instanceName).isEmpty()) {
-        _helixAdmin.addInstanceTag(_helixClusterName, instanceName, 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE);
+        _helixAdmin.addInstanceTag(_helixClusterName, instanceName, 
Helix.UNTAGGED_SERVER_INSTANCE);
       }
     }
     return PinotResourceManagerResponse.SUCCESS;
@@ -936,7 +930,7 @@ public class PinotHelixResourceManager {
     String brokerTag = TagNameUtils.getBrokerTagForTenant(tenantName);
     List<String> instancesInClusterWithTag = 
HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag);
     for (String instance : instancesInClusterWithTag) {
-      retagInstance(instance, brokerTag, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+      retagInstance(instance, brokerTag, Helix.UNTAGGED_BROKER_INSTANCE);
     }
     return PinotResourceManagerResponse.SUCCESS;
   }
@@ -1361,8 +1355,8 @@ public class PinotHelixResourceManager {
 
   private void handleBrokerResource(@Nonnull final String tableName, @Nonnull 
final List<String> brokersForTenant) {
     LOGGER.info("Updating BrokerResource IdealState for table: {}", tableName);
-    HelixHelper.updateIdealState(_helixZkManager, 
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
-        new Function<IdealState, IdealState>() {
+    HelixHelper
+        .updateIdealState(_helixZkManager, Helix.BROKER_RESOURCE_INSTANCE, new 
Function<IdealState, IdealState>() {
           @Override
           public IdealState apply(@Nullable IdealState idealState) {
             Preconditions.checkNotNull(idealState);
@@ -1592,7 +1586,7 @@ public class PinotHelixResourceManager {
     recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
     recipientCriteria.setInstanceName("%");
     recipientCriteria.setSessionSpecific(true);
-    
recipientCriteria.setResource(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+    recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
     recipientCriteria.setDataSource(Criteria.DataSource.EXTERNALVIEW);
     // The brokerResource field in the EXTERNALVIEW stores the offline table 
name in the Partition subfield.
     recipientCriteria.setPartition(tableNameWithType);
@@ -2240,8 +2234,7 @@ public class PinotHelixResourceManager {
    */
   public List<String> getOnlineUnTaggedBrokerInstanceList() {
 
-    final List<String> instanceList =
-        HelixHelper.getInstancesWithTag(_helixZkManager, 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+    final List<String> instanceList = 
HelixHelper.getInstancesWithTag(_helixZkManager, 
Helix.UNTAGGED_BROKER_INSTANCE);
     final List<String> liveInstances = 
_helixDataAccessor.getChildNames(_keyBuilder.liveInstances());
     instanceList.retainAll(liveInstances);
     return instanceList;
@@ -2252,8 +2245,7 @@ public class PinotHelixResourceManager {
    * @return List of untagged online server instances.
    */
   public List<String> getOnlineUnTaggedServerInstanceList() {
-    final List<String> instanceList =
-        HelixHelper.getInstancesWithTag(_helixZkManager, 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE);
+    final List<String> instanceList = 
HelixHelper.getInstancesWithTag(_helixZkManager, 
Helix.UNTAGGED_SERVER_INSTANCE);
     final List<String> liveInstances = 
_helixDataAccessor.getChildNames(_keyBuilder.liveInstances());
     instanceList.retainAll(liveInstances);
     return instanceList;
@@ -2285,7 +2277,7 @@ public class PinotHelixResourceManager {
       ZNRecord record = helixInstanceConfig.getRecord();
       String[] hostnameSplit = helixInstanceConfig.getHostName().split("_");
       Preconditions.checkState(hostnameSplit.length >= 2);
-      String adminPort = 
record.getSimpleField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY);
+      String adminPort = record.getSimpleField(Helix.Instance.ADMIN_PORT_KEY);
       // If admin port is missing, there's no point to calculate the remaining 
table size.
       // Thus, throwing an exception will be good here.
       if (Strings.isNullOrEmpty(adminPort)) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 5651b89..d7176db 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -26,8 +26,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.config.Instance;
 import org.apache.pinot.common.config.instance.InstanceTagPoolConfig;
-import org.apache.pinot.controller.api.pojos.Instance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index e5b4090..9899dfa 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -36,7 +36,7 @@ import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.pinot.common.config.PinotTaskConfig;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Helix;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -175,7 +175,7 @@ public class PinotHelixTaskResourceManager {
   @Nonnull
   public synchronized String submitTask(@Nonnull List<PinotTaskConfig> 
pinotTaskConfigs,
       int numConcurrentTasksPerInstance) {
-    return submitTask(pinotTaskConfigs, 
CommonConstants.Minion.UNTAGGED_INSTANCE, numConcurrentTasksPerInstance);
+    return submitTask(pinotTaskConfigs, Helix.UNTAGGED_MINION_INSTANCE, 
numConcurrentTasksPerInstance);
   }
 
   /**
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index a501767..37dec22 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -170,7 +170,7 @@ public class HelixSetupUtils {
       idealStateBuilder.enableDelayRebalance();
       // Set instance group tag
       IdealState idealState = idealStateBuilder.build();
-      idealState.setInstanceGroupTag(CONTROLLER_INSTANCE_TYPE);
+      idealState.setInstanceGroupTag(CONTROLLER_INSTANCE);
       // Set batch message mode
       idealState.setBatchMessageMode(enableBatchMessageMode);
       // Explicitly disable this resource when creating this new resource.
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
index cb06fc4..a8dac6c 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
@@ -19,12 +19,17 @@
 package org.apache.pinot.controller.api;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.IOException;
-import org.apache.pinot.common.utils.CommonConstants;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.pinot.common.utils.CommonConstants.Helix;
+import org.apache.pinot.common.utils.CommonConstants.Helix.InstanceType;
 import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.config.Instance;
 import org.apache.pinot.controller.helix.ControllerTest;
-import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -37,6 +42,7 @@ import static org.testng.Assert.fail;
  * Tests for the instances Restlet.
  */
 public class PinotInstanceRestletResourceTest extends ControllerTest {
+
   @BeforeClass
   public void setUp() {
     startZk();
@@ -46,109 +52,99 @@ public class PinotInstanceRestletResourceTest extends 
ControllerTest {
   @Test
   public void testInstanceListingAndCreation()
       throws Exception {
-    // Check that there is only one instance, which is the controller instance.
-    JsonNode instanceList = 
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList()));
-    assertEquals(instanceList.get("instances").size(), 1, "Expected only one 
instance at beginning of test");
+    // Check that there is only one CONTROLLER instance in the cluster
+    String listInstancesUrl = _controllerRequestURLBuilder.forInstanceList();
+    JsonNode instanceList = 
JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl));
+    assertEquals(instanceList.get("instances").size(), 1);
+    
assertTrue(instanceList.get("instances").get(0).asText().startsWith(Helix.PREFIX_OF_CONTROLLER_INSTANCE));
 
     // Create untagged broker and server instances
-    ObjectNode brokerInstance =
-        (ObjectNode) JsonUtils.stringToJsonNode("{\"host\":\"1.2.3.4\", 
\"type\":\"broker\", \"port\":\"1234\"}");
-    sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), 
brokerInstance.toString());
-
-    ObjectNode serverInstance =
-        (ObjectNode) JsonUtils.stringToJsonNode("{\"host\":\"1.2.3.4\", 
\"type\":\"server\", \"port\":\"2345\"}");
-    sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), 
serverInstance.toString());
-
-    // Check that there are three instances
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        // Check that there are two instances
-        return
-            
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances")
-                .size() == 3;
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 10_000L, "Expected three instances after creation of tagged instances");
-
-    // Create tagged broker and server instances
-    brokerInstance.put("tag", "someTag");
-    brokerInstance.put("host", "2.3.4.5");
-    sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), 
brokerInstance.toString());
-
-    serverInstance.put("tag", "someTag");
-    serverInstance.put("host", "2.3.4.5");
-    sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), 
serverInstance.toString());
-
-    // It may take some time for cache data accessor to update its data.
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        // Check that there are five instances
-        return
-            
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances")
-                .size() == 5;
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 10_000L, "Expected five instances after creation of tagged instances");
-
-    // Create duplicate broker and server instances (both calls should fail)
+    String createInstanceUrl = 
_controllerRequestURLBuilder.forInstanceCreate();
+    Instance brokerInstance = new Instance("1.2.3.4", 1234, 
InstanceType.BROKER, null, null);
+    sendPostRequest(createInstanceUrl, brokerInstance.toJsonString());
+
+    Instance serverInstance = new Instance("1.2.3.4", 2345, 
InstanceType.SERVER, null, null);
+    sendPostRequest(createInstanceUrl, serverInstance.toJsonString());
+
+    // Check that there are 3 instances
+    
assertEquals(JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl)).get("instances").size(),
 3);
+
+    // Create broker and server instances with tags and pools
+    brokerInstance = new Instance("2.3.4.5", 1234, InstanceType.BROKER, 
Collections.singletonList("tag_BROKER"), null);
+    sendPostRequest(createInstanceUrl, brokerInstance.toJsonString());
+
+    Map<String, Integer> serverPools = new TreeMap<>();
+    serverPools.put("tag_OFFLINE", 0);
+    serverPools.put("tag_REALTIME", 1);
+    serverInstance =
+        new Instance("2.3.4.5", 2345, InstanceType.SERVER, 
Arrays.asList("tag_OFFLINE", "tag_REALTIME"), serverPools);
+    sendPostRequest(createInstanceUrl, serverInstance.toJsonString());
+
+    // Check that there are 5 instances
+    
assertEquals(JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl)).get("instances").size(),
 5);
+
+    // Create duplicate broker and server instances should fail
     try {
-      sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), 
brokerInstance.toString());
+      sendPostRequest(createInstanceUrl, brokerInstance.toJsonString());
       fail("Duplicate broker instance creation did not fail");
     } catch (IOException e) {
       // Expected
     }
 
     try {
-      sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), 
serverInstance.toString());
+      sendPostRequest(createInstanceUrl, serverInstance.toJsonString());
       fail("Duplicate server instance creation did not fail");
     } catch (IOException e) {
       // Expected
     }
 
-    // Check that there are five instances
-    
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList()));
-    assertEquals(
-        
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances")
-            .size(), 5, "Expected five instances after creation of duplicate 
instances");
+    // Check that there are still 5 instances
+    
assertEquals(JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl)).get("instances").size(),
 5);
 
     // Check that the instances are properly created
     JsonNode instance = JsonUtils
         
.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceInformation("Broker_1.2.3.4_1234")));
     assertEquals(instance.get("instanceName").asText(), "Broker_1.2.3.4_1234");
-    assertEquals(instance.get("hostName").asText(), "1.2.3.4");
+    assertEquals(instance.get("hostName").asText(), "Broker_1.2.3.4");
     assertEquals(instance.get("port").asText(), "1234");
     assertTrue(instance.get("enabled").asBoolean());
-    assertEquals(instance.get("tags").size(), 1);
-    assertEquals(instance.get("tags").get(0).asText(), 
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+    assertEquals(instance.get("tags").size(), 0);
+    assertTrue(instance.get("pools").isNull());
 
     instance = JsonUtils
         
.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceInformation("Server_1.2.3.4_2345")));
     assertEquals(instance.get("instanceName").asText(), "Server_1.2.3.4_2345");
-    assertEquals(instance.get("hostName").asText(), "1.2.3.4");
+    assertEquals(instance.get("hostName").asText(), "Server_1.2.3.4");
     assertEquals(instance.get("port").asText(), "2345");
     assertTrue(instance.get("enabled").asBoolean());
-    assertEquals(instance.get("tags").size(), 1);
-    assertEquals(instance.get("tags").get(0).asText(), 
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE);
+    assertEquals(instance.get("tags").size(), 0);
+    assertTrue(instance.get("pools").isNull());
 
     instance = JsonUtils
         
.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceInformation("Broker_2.3.4.5_1234")));
     assertEquals(instance.get("instanceName").asText(), "Broker_2.3.4.5_1234");
-    assertEquals(instance.get("hostName").asText(), "2.3.4.5");
+    assertEquals(instance.get("hostName").asText(), "Broker_2.3.4.5");
     assertEquals(instance.get("port").asText(), "1234");
     assertTrue(instance.get("enabled").asBoolean());
-    assertEquals(instance.get("tags").size(), 1);
-    assertEquals(instance.get("tags").get(0).asText(), "someTag");
+    JsonNode tags = instance.get("tags");
+    assertEquals(tags.size(), 1);
+    assertEquals(tags.get(0).asText(), "tag_BROKER");
+    assertTrue(instance.get("pools").isNull());
 
     instance = JsonUtils
         
.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceInformation("Server_2.3.4.5_2345")));
     assertEquals(instance.get("instanceName").asText(), "Server_2.3.4.5_2345");
-    assertEquals(instance.get("hostName").asText(), "2.3.4.5");
+    assertEquals(instance.get("hostName").asText(), "Server_2.3.4.5");
     assertEquals(instance.get("port").asText(), "2345");
     assertTrue(instance.get("enabled").asBoolean());
-    assertEquals(instance.get("tags").size(), 1);
-    assertEquals(instance.get("tags").get(0).asText(), "someTag");
+    tags = instance.get("tags");
+    assertEquals(tags.size(), 2);
+    assertEquals(tags.get(0).asText(), "tag_OFFLINE");
+    assertEquals(tags.get(1).asText(), "tag_REALTIME");
+    JsonNode pools = instance.get("pools");
+    assertEquals(pools.size(), 2);
+    assertEquals(pools.get("tag_OFFLINE").asText(), "0");
+    assertEquals(pools.get("tag_REALTIME").asText(), "1");
 
     // Check that an error is given for an instance that does not exist
     try {
@@ -158,4 +154,10 @@ public class PinotInstanceRestletResourceTest extends 
ControllerTest {
       // Expected
     }
   }
+
+  @AfterClass
+  public void tearDown() {
+    stopController();
+    stopZk();
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
index fa38d8a..f5d61d6 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
@@ -19,9 +19,9 @@
 package org.apache.pinot.controller.api;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.config.TagNameUtils;
+import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.controller.helix.ControllerTest;
 import org.testng.annotations.AfterClass;
@@ -31,76 +31,40 @@ import org.testng.annotations.Test;
 import static org.testng.Assert.assertEquals;
 
 
-/**
- * Test for table to tenant mapping. Real working test is inside offline 
cluster integration test because it requires
- * segment upload.
- */
 public class PinotTenantRestletResourceTest extends ControllerTest {
-  private final TableConfig.Builder _offlineBuilder = new 
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE);
-  private static final int NUM_BROKER_INSTANCES = 2;
-  private static final int NUM_SERVER_INSTANCES = 6;
+  private static final int NUM_BROKER_INSTANCES = 1;
+  private static final int NUM_SERVER_INSTANCES = 3;
 
   @BeforeClass
-  public void setUp() {
+  public void setUp()
+      throws Exception {
     startZk();
     startController();
+    addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true);
+    addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true);
   }
 
   @Test
   public void testTableListForTenant()
       throws Exception {
-    // Create untagged broker and server instances
-    ObjectNode brokerInstance =
-        (ObjectNode) JsonUtils.stringToJsonNode("{\"host\":\"1.2.3.4\", 
\"type\":\"broker\", \"port\":\"1234\"}");
-    sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), 
brokerInstance.toString());
-
-    ObjectNode serverInstance =
-        (ObjectNode) JsonUtils.stringToJsonNode("{\"host\":\"1.2.3.4\", 
\"type\":\"server\", \"port\":\"2345\"}");
-    sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), 
serverInstance.toString());
-
-    // Create tagged broker and server instances
-    brokerInstance.put("tag", "someTag");
-    brokerInstance.put("host", "2.3.4.5");
-    sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), 
brokerInstance.toString());
-
-    serverInstance.put("tag", "server_REALTIME");
-    serverInstance.put("host", "2.3.4.5");
-    sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), 
serverInstance.toString());
-
     // Check that no tables on tenant works
-    JsonNode tableList =
-        
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTablesFromTenant("server_REALTIME")));
-    assertEquals(tableList.get("tables").size(), 0, "Expected no tables");
-
-    // Try to make sure both kinds of tags work
-    tableList = 
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTablesFromTenant("server")));
-    assertEquals(tableList.get("tables").size(), 0, "Expected no tables");
-
-    // Add a table to the server
-    String createTableUrl = _controllerRequestURLBuilder.forTableCreate();
-
-    addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true);
-    addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true);
-
-    
_offlineBuilder.setTableName("testOfflineTable").setTimeColumnName("timeColumn").setTimeType("DAYS")
-        
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setServerTenant("DefaultTenant");
-
-    TableConfig offlineTableConfig = _offlineBuilder.build();
-    offlineTableConfig.setTableName("mytable_OFFLINE");
-    String offlineTableJSONConfigString = 
offlineTableConfig.toJsonConfigString();
-    sendPostRequest(createTableUrl, offlineTableJSONConfigString);
-
-    // Try to make sure both kinds of tags work
-    tableList =
-        
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTablesFromTenant("DefaultTenant")));
-    assertEquals(tableList.get("tables").size(), 1, "Expected 1 table");
-    assertEquals(tableList.get("tables").get(0).asText(), "mytable_OFFLINE");
-
-    stopFakeInstances();
+    String listTablesUrl = 
_controllerRequestURLBuilder.forTablesFromTenant(TagNameUtils.DEFAULT_TENANT_NAME);
+    JsonNode tableList = 
JsonUtils.stringToJsonNode(sendGetRequest(listTablesUrl));
+    assertEquals(tableList.get("tables").size(), 0);
+
+    // Add a table
+    sendPostRequest(_controllerRequestURLBuilder.forTableCreate(),
+        new 
TableConfig.Builder(TableType.OFFLINE).setTableName("testTable").build().toJsonConfigString());
+
+    // There should be 1 table on the tenant
+    tableList = JsonUtils.stringToJsonNode(sendGetRequest(listTablesUrl));
+    assertEquals(tableList.get("tables").size(), 1);
+    assertEquals(tableList.get("tables").get(0).asText(), "testTable_OFFLINE");
   }
 
   @AfterClass
   public void tearDown() {
+    stopFakeInstances();
     stopController();
     stopZk();
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index ed76582..cdf3d9e 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -55,7 +55,6 @@ import org.apache.pinot.controller.helix.ControllerTest;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -407,7 +406,7 @@ public class PinotHelixResourceManagerTest extends 
ControllerTest {
     Assert.assertTrue(leadControllerResourceIdealState.isValid());
     Assert.assertTrue(leadControllerResourceIdealState.isEnabled());
     Assert.assertEquals(leadControllerResourceIdealState.getInstanceGroupTag(),
-        CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+        CommonConstants.Helix.CONTROLLER_INSTANCE);
     Assert.assertEquals(leadControllerResourceIdealState.getNumPartitions(),
         
CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
     Assert.assertEquals(leadControllerResourceIdealState.getReplicas(),
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 599091d..8639b00 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.common.config.ColumnPartitionConfig;
+import org.apache.pinot.common.config.Instance;
 import org.apache.pinot.common.config.ReplicaGroupStrategyConfig;
 import org.apache.pinot.common.config.SegmentPartitionConfig;
 import org.apache.pinot.common.config.TableConfig;
@@ -35,7 +36,6 @@ import 
org.apache.pinot.common.config.instance.InstanceTagPoolConfig;
 import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
 import 
org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy;
 import org.apache.pinot.common.utils.InstancePartitionsType;
-import org.apache.pinot.controller.api.pojos.Instance;
 import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
 import org.testng.annotations.Test;
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BalanceNumSegmentAssignmentStrategyIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BalanceNumSegmentAssignmentStrategyIntegrationTest.java
index b8ac99a..9ce1391 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BalanceNumSegmentAssignmentStrategyIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BalanceNumSegmentAssignmentStrategyIntegrationTest.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pinot.integration.tests;
 
-import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.helix.model.IdealState;
-import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.config.Instance;
+import org.apache.pinot.common.utils.CommonConstants.Helix.InstanceType;
 import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -38,9 +39,9 @@ import static org.testng.Assert.assertEquals;
  */
 // TODO: clean up this test
 public class BalanceNumSegmentAssignmentStrategyIntegrationTest extends 
UploadRefreshDeleteIntegrationTest {
-  private final String serverTenant = "DefaultTenant_OFFLINE";
-  private final String hostName = "1.2.3.4";
-  private final int basePort = 1234;
+  private static final String HOST = "1.2.3.4";
+  private static final int BASE_PORT = 1234;
+  private static final String SERVER_TAG = "DefaultTenant_OFFLINE";
 
   @BeforeClass
   public void setUp()
@@ -49,12 +50,9 @@ public class 
BalanceNumSegmentAssignmentStrategyIntegrationTest extends UploadRe
 
     // Create eight dummy server instances
     for (int i = 0; i < 8; ++i) {
-      ObjectNode serverInstance = JsonUtils.newObjectNode();
-      serverInstance.put("host", hostName);
-      serverInstance.put("port", Integer.toString(basePort + i));
-      serverInstance.put("tag", serverTenant);
-      serverInstance.put("type", "server");
-      sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), 
serverInstance.toString());
+      Instance serverInstance =
+          new Instance(HOST, BASE_PORT + i, InstanceType.SERVER, 
Collections.singletonList(SERVER_TAG), null);
+      sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(), 
serverInstance.toJsonString());
     }
   }
 
@@ -100,7 +98,7 @@ public class 
BalanceNumSegmentAssignmentStrategyIntegrationTest extends UploadRe
   @Test(dataProvider = "tableNameProvider")
   public void testNoAssignmentToDisabledInstances(String tableName, 
SegmentVersion version)
       throws Exception {
-    List<String> instances = 
_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), serverTenant);
+    List<String> instances = 
_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), SERVER_TAG);
     List<String> disabledInstances = new ArrayList<>();
     // disable 6 instances
     assertEquals(instances.size(), 9);
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index f8baa86..c36def8 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -187,7 +187,7 @@ public abstract class ClusterTest extends ControllerTest {
       for (int i = 0; i < minionCount; i++) {
         Configuration config = new PropertiesConfiguration();
         config.setProperty(Helix.Instance.INSTANCE_ID_KEY,
-            Minion.INSTANCE_PREFIX + "minion" + i + "_" + 
(Minion.DEFAULT_HELIX_PORT + i));
+            Helix.PREFIX_OF_MINION_INSTANCE + "minion" + i + "_" + 
(Minion.DEFAULT_HELIX_PORT + i));
         config.setProperty(Helix.Instance.DATA_DIR_KEY, 
Minion.DEFAULT_INSTANCE_DATA_DIR + "-" + i);
         MinionStarter minionStarter = new 
MinionStarter(ZkStarter.DEFAULT_ZK_STR, getHelixClusterName(), config);
 
@@ -405,11 +405,11 @@ public abstract class ClusterTest extends ControllerTest {
       String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String 
timeColumnName, String timeType,
       String schemaName, String brokerTenant, String serverTenant, String 
loadMode, String sortedColumn,
       List<String> invertedIndexColumns, List<String> bloomFilterColumns, 
List<String> noDictionaryColumns,
-      TableTaskConfig taskConfig, String streamConsumerFactoryName) throws 
Exception {
-    addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, 
kafkaTopic, realtimeSegmentFlushRows,
-        avroFile, timeColumnName, timeType, schemaName, brokerTenant, 
serverTenant, loadMode, sortedColumn,
-        invertedIndexColumns, bloomFilterColumns, noDictionaryColumns, 
taskConfig, streamConsumerFactoryName,
-        1);
+      TableTaskConfig taskConfig, String streamConsumerFactoryName)
+      throws Exception {
+    addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl, 
kafkaTopic, realtimeSegmentFlushRows, avroFile,
+        timeColumnName, timeType, schemaName, brokerTenant, serverTenant, 
loadMode, sortedColumn, invertedIndexColumns,
+        bloomFilterColumns, noDictionaryColumns, taskConfig, 
streamConsumerFactoryName, 1);
   }
 
   protected void addRealtimeTable(String tableName, boolean useLlc, String 
kafkaBrokerList, String kafkaZkUrl,
diff --git 
a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java 
b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
index 3ef52e5..f6bc0b1 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
@@ -76,7 +76,7 @@ public class MinionStarter {
     _helixClusterName = helixClusterName;
     _config = config;
     _instanceId = 
config.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
-        CommonConstants.Minion.INSTANCE_PREFIX + NetUtil.getHostAddress() + "_"
+        CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE + 
NetUtil.getHostAddress() + "_"
             + CommonConstants.Minion.DEFAULT_HELIX_PORT);
     setupHelixSystemProperties();
     _helixManager = new ZKHelixManager(_helixClusterName, _instanceId, 
InstanceType.PARTICIPANT, zkAddress);
@@ -141,9 +141,9 @@ public class MinionStarter {
     MetricsHelper.initializeMetrics(_config);
     MetricsRegistry metricsRegistry = new MetricsRegistry();
     MetricsHelper.registerMetricsRegistry(metricsRegistry);
-    final MinionMetrics minionMetrics = new MinionMetrics(
-        _config.getString(CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX_KEY, 
CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX),
-        metricsRegistry);
+    final MinionMetrics minionMetrics = new MinionMetrics(_config
+        .getString(CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX_KEY,
+            CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX), metricsRegistry);
     minionMetrics.initializeGlobalMeters();
     minionContext.setMinionMetrics(minionMetrics);
 
@@ -222,8 +222,8 @@ public class MinionStarter {
   private void addInstanceTagIfNeeded() {
     InstanceConfig instanceConfig = 
_helixAdmin.getInstanceConfig(_helixClusterName, _instanceId);
     if (instanceConfig.getTags().isEmpty()) {
-      LOGGER.info("Adding default Helix tag: {} to Pinot minion", 
CommonConstants.Minion.UNTAGGED_INSTANCE);
-      _helixAdmin.addInstanceTag(_helixClusterName, _instanceId, 
CommonConstants.Minion.UNTAGGED_INSTANCE);
+      LOGGER.info("Adding default Helix tag: {} to Pinot minion", 
CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+      _helixAdmin.addInstanceTag(_helixClusterName, _instanceId, 
CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to