This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1d7b4a5 [Instance Assignment] Enhance Instance class to include the
pool config (#4522)
1d7b4a5 is described below
commit 1d7b4a5763771f93b9abf25fef25aab7c83e4268
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Aug 12 17:29:26 2019 -0700
[Instance Assignment] Enhance Instance class to include the pool config
(#4522)
- Enhance Instance class to include the pool config
- Allow multiple tags in the Instance
- Support pools and multiple tags for instance restlet API
- Make the behavior of joining the cluster via instance restlet API and
auto-join consistent
- HELIX_HOST will have the instance type prefix per Helix behavior
---
.../org/apache/pinot/common/config/Instance.java | 145 +++++++++++++++++++++
.../apache/pinot/common/utils/CommonConstants.java | 29 ++---
.../pinot/controller/api/pojos/Instance.java | 137 -------------------
.../resources/PinotInstanceRestletResource.java | 8 +-
.../helix/core/PinotHelixResourceManager.java | 70 +++++-----
.../instance/InstanceTagPoolSelector.java | 2 +-
.../core/minion/PinotHelixTaskResourceManager.java | 4 +-
.../helix/core/util/HelixSetupUtils.java | 2 +-
.../api/PinotInstanceRestletResourceTest.java | 136 +++++++++----------
.../api/PinotTenantRestletResourceTest.java | 78 +++--------
.../helix/core/PinotHelixResourceManagerTest.java | 3 +-
.../instance/InstanceAssignmentTest.java | 2 +-
...umSegmentAssignmentStrategyIntegrationTest.java | 22 ++--
.../pinot/integration/tests/ClusterTest.java | 12 +-
.../org/apache/pinot/minion/MinionStarter.java | 12 +-
15 files changed, 309 insertions(+), 353 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/Instance.java b/pinot-common/src/main/java/org/apache/pinot/common/config/Instance.java
new file mode 100644
index 0000000..dc37e20
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/Instance.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.CommonConstants.Helix;
+import org.apache.pinot.common.utils.CommonConstants.Helix.InstanceType;
+import org.apache.pinot.common.utils.JsonUtils;
+
+
+/**
+ * Instance configuration.
+ * <pre>
+ * Example:
+ * {
+ * "host": "hostname.example.com",
+ * "port": 1234,
+ * "type": "SERVER",
+ * "tags": ["example_OFFLINE"],
+ * "pools": {
+ * "example_OFFLINE": 0
+ * }
+ * }
+ * </pre>
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Instance {
+ public static final String POOL_KEY = "pool";
+
+ private final String _host;
+ private final int _port;
+ private final InstanceType _type;
+ private final List<String> _tags;
+ private final Map<String, Integer> _pools;
+
+ @JsonCreator
+ public Instance(@JsonProperty(value = "host", required = true) String host,
+ @JsonProperty(value = "port", required = true) int port,
+ @JsonProperty(value = "type", required = true) InstanceType type,
+ @JsonProperty(value = "tags") @Nullable List<String> tags,
+ @JsonProperty(value = "pools") @Nullable Map<String, Integer> pools) {
+ _host = host;
+ _port = port;
+ _type = type;
+ _tags = tags;
+ _pools = pools;
+ }
+
+ @JsonProperty
+ public String getHost() {
+ return _host;
+ }
+
+ @JsonProperty
+ public int getPort() {
+ return _port;
+ }
+
+ @JsonProperty
+ public InstanceType getType() {
+ return _type;
+ }
+
+ @JsonProperty
+ public List<String> getTags() {
+ return _tags;
+ }
+
+ @JsonProperty
+ public Map<String, Integer> getPools() {
+ return _pools;
+ }
+
+ @JsonIgnore
+ public String getInstanceId() {
+ String prefix;
+ switch (_type) {
+ case CONTROLLER:
+ prefix = Helix.PREFIX_OF_CONTROLLER_INSTANCE;
+ break;
+ case BROKER:
+ prefix = Helix.PREFIX_OF_BROKER_INSTANCE;
+ break;
+ case SERVER:
+ prefix = Helix.PREFIX_OF_SERVER_INSTANCE;
+ break;
+ case MINION:
+ prefix = Helix.PREFIX_OF_MINION_INSTANCE;
+ break;
+ default:
+ throw new IllegalStateException();
+ }
+ return prefix + _host + "_" + _port;
+ }
+
+ public InstanceConfig toInstanceConfig() {
+ InstanceConfig instanceConfig = InstanceConfig.toInstanceConfig(getInstanceId());
+ if (_tags != null) {
+ for (String tag : _tags) {
+ instanceConfig.addTag(tag);
+ }
+ }
+ if (_pools != null && !_pools.isEmpty()) {
+ Map<String, String> mapValue = new TreeMap<>();
+ for (Map.Entry<String, Integer> entry : _pools.entrySet()) {
+ mapValue.put(entry.getKey(), entry.getValue().toString());
+ }
+ instanceConfig.getRecord().setMapField(POOL_KEY, mapValue);
+ }
+ return instanceConfig;
+ }
+
+ public String toJsonString() {
+ try {
+ return JsonUtils.objectToString(this);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 663557e..d17eb79 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -31,13 +31,10 @@ public class CommonConstants {
public static final String INSTANCE_CONNECTED_METRIC_NAME =
"helix.connected";
- public static final String PREFIX_OF_SERVER_INSTANCE = "Server_";
- public static final String PREFIX_OF_BROKER_INSTANCE = "Broker_";
public static final String PREFIX_OF_CONTROLLER_INSTANCE = "Controller_";
-
- public static final String SERVER_INSTANCE_TYPE = "server";
- public static final String BROKER_INSTANCE_TYPE = "broker";
- public static final String CONTROLLER_INSTANCE_TYPE = "controller";
+ public static final String PREFIX_OF_BROKER_INSTANCE = "Broker_";
+ public static final String PREFIX_OF_SERVER_INSTANCE = "Server_";
+ public static final String PREFIX_OF_MINION_INSTANCE = "Minion_";
public static final String BROKER_RESOURCE_INSTANCE = "brokerResource";
public static final String LEAD_CONTROLLER_RESOURCE_NAME =
"leadControllerResource";
@@ -49,8 +46,11 @@ public class CommonConstants {
public static final int MIN_ACTIVE_REPLICAS = 0;
public static final int REBALANCE_DELAY_MS = 300_000; // 5 minutes.
- public static final String UNTAGGED_SERVER_INSTANCE = "server_untagged";
+ // Instance tags
+ public static final String CONTROLLER_INSTANCE = "controller";
public static final String UNTAGGED_BROKER_INSTANCE = "broker_untagged";
+ public static final String UNTAGGED_SERVER_INSTANCE = "server_untagged";
+ public static final String UNTAGGED_MINION_INSTANCE = "minion_untagged";
public static class StateModel {
public static class SegmentOnlineOfflineStateModel {
@@ -92,6 +92,10 @@ public class CommonConstants {
public static final String ADMIN_PORT_KEY = "adminPort";
}
+ public enum InstanceType {
+ CONTROLLER, BROKER, SERVER, MINION
+ }
+
public enum TableType {
OFFLINE, REALTIME;
@@ -299,9 +303,6 @@ public class CommonConstants {
}
public static class Minion {
- public static final String INSTANCE_PREFIX = "Minion_";
- public static final String INSTANCE_TYPE = "minion";
- public static final String UNTAGGED_INSTANCE = "minion_untagged";
public static final String CONFIG_OF_METRICS_PREFIX = "pinot.minion.";
public static final String METADATA_EVENT_OBSERVER_PREFIX =
"metadata.event.notifier";
@@ -320,14 +321,6 @@ public class CommonConstants {
public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = "crypter";
}
- public static class Metric {
- public static class Server {
- public static final String CURRENT_NUMBER_OF_SEGMENTS =
"currentNumberOfSegments";
- public static final String CURRENT_NUMBER_OF_DOCUMENTS =
"currentNumberOfDocuments";
- public static final String NUMBER_OF_DELETED_SEGMENTS =
"numberOfDeletedSegments";
- }
- }
-
public static class Segment {
public static class Realtime {
public enum Status {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/pojos/Instance.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/pojos/Instance.java
deleted file mode 100644
index 7a8ac23..0000000
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/pojos/Instance.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.controller.api.pojos;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.pinot.common.utils.CommonConstants;
-
-
-/**
- * Instance POJO, used as part of the API to create instances.
- */
-//@Example("{\n" + "\t\"host\": \"hostname.example.com\",\n" + "\t\"port\":
\"1234\",\n" + "\t\"type\": \"server\"\n" + "}")
-public class Instance {
- public static final String POOL_KEY = "pool";
-
- private final String _host;
- private final String _port;
- private final String _type;
- private final String _tag;
- private final String _instancePrefix;
-
- public static Instance fromInstanceConfig(InstanceConfig instanceConfig) {
- InstanceConfig ic = instanceConfig;
- String instanceName = ic.getInstanceName();
- String type;
- if
(instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE)) {
- type = CommonConstants.Helix.SERVER_INSTANCE_TYPE;
- } else if
(instanceName.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE)) {
- type = CommonConstants.Helix.BROKER_INSTANCE_TYPE;
- } else {
- throw new RuntimeException("Unknown instance type for: " + instanceName);
- }
-
- Instance instance =
- new Instance(ic.getHostName(), ic.getPort(), type,
org.apache.commons.lang.StringUtils.join(ic.getTags(), ','));
- return instance;
- }
-
- @JsonCreator
- public Instance(@JsonProperty(value = "host", required = true) String host,
- @JsonProperty(value = "port", required = true) String port,
- @JsonProperty(value = "type", required = true) String type,
- @JsonProperty(value = "tag", required = false) String tag) {
- _host = host;
- _port = port;
- _tag = tag;
-
- if (CommonConstants.Helix.SERVER_INSTANCE_TYPE.equalsIgnoreCase(type)) {
- _instancePrefix = CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE;
- _type = CommonConstants.Helix.SERVER_INSTANCE_TYPE;
- } else if
(CommonConstants.Helix.BROKER_INSTANCE_TYPE.equalsIgnoreCase(type)) {
- _instancePrefix = CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE;
- _type = CommonConstants.Helix.BROKER_INSTANCE_TYPE;
- } else if (CommonConstants.Minion.INSTANCE_TYPE.equalsIgnoreCase(type)) {
- _instancePrefix = CommonConstants.Minion.INSTANCE_PREFIX;
- _type = CommonConstants.Minion.INSTANCE_TYPE;
- } else {
- throw new IllegalArgumentException("Invalid instance type " + type + ",
expected either server or broker");
- }
- }
-
- public String getHost() {
- return _host;
- }
-
- public String getPort() {
- return _port;
- }
-
- public String getTag() {
- return _tag;
- }
-
- public String getType() {
- return _type;
- }
-
- public String toInstanceId() {
- return _instancePrefix + _host + "_" + _port;
- }
-
- @Override
- public String toString() {
- final StringBuilder bld = new StringBuilder();
- bld.append("host : " + _host + "\n");
- bld.append("port : " + _port + "\n");
- bld.append("type : " + _type + "\n");
- if (_tag != null) {
- bld.append("tag : " + _tag + "\n");
- }
- return bld.toString();
- }
-
- public InstanceConfig toInstanceConfig() {
- final InstanceConfig iConfig = new InstanceConfig(toInstanceId());
- iConfig.setHostName(_host);
- iConfig.setPort(_port);
- iConfig.setInstanceEnabled(true);
- iConfig.addTag(getTagOrDefaultTag());
- return iConfig;
- }
-
- private String getTagOrDefaultTag() {
- if (_tag != null) {
- return _tag;
- } else {
- switch (_type) {
- case CommonConstants.Helix.SERVER_INSTANCE_TYPE:
- return CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
- case CommonConstants.Helix.BROKER_INSTANCE_TYPE:
- return CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE;
- case CommonConstants.Minion.INSTANCE_TYPE:
- return CommonConstants.Minion.UNTAGGED_INSTANCE;
- default:
- throw new RuntimeException("Unknown instance type " + _type + ", was
expecting either server or broker");
- }
- }
- }
-}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
index 6ff9f16..79788c1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java
@@ -37,8 +37,8 @@ import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.config.Instance;
import org.apache.pinot.common.utils.JsonUtils;
-import org.apache.pinot.controller.api.pojos.Instance;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import org.slf4j.Logger;
@@ -85,8 +85,7 @@ public class PinotInstanceRestletResource {
@ApiOperation(value = "Get instance information", produces =
MediaType.APPLICATION_JSON)
@ApiResponses(value = {@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 404, message = "Instance not found"), @ApiResponse(code =
500, message = "Internal error")})
public String getInstance(
- @ApiParam(value = "Instance name", required = true, example =
"Server_a.b.com_20000 | Broker_my.broker.com_30000")
- @PathParam("instanceName") String instanceName) {
+ @ApiParam(value = "Instance name", required = true, example =
"Server_a.b.com_20000 | Broker_my.broker.com_30000") @PathParam("instanceName")
String instanceName) {
InstanceConfig instanceConfig =
pinotHelixResourceManager.getHelixInstanceConfig(instanceName);
if (instanceConfig == null) {
throw new ControllerApplicationException(LOGGER, "Instance " +
instanceName + " not found",
@@ -98,6 +97,7 @@ public class PinotInstanceRestletResource {
response.put("enabled", instanceConfig.getInstanceEnabled());
response.put("port", instanceConfig.getPort());
response.set("tags", JsonUtils.objectToJsonNode(instanceConfig.getTags()));
+ response.set("pools",
JsonUtils.objectToJsonNode(instanceConfig.getRecord().getMapField(Instance.POOL_KEY)));
return response.toString();
}
@@ -108,7 +108,7 @@ public class PinotInstanceRestletResource {
@ApiOperation(value = "Create a new instance", consumes =
MediaType.APPLICATION_JSON, notes = "Creates a new instance with given instance
config")
@ApiResponses(value = {@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 409, message = "Instance already exists"),
@ApiResponse(code = 500, message = "Internal error")})
public SuccessResponse addInstance(Instance instance) {
- LOGGER.info("Instance creation request received for instance " +
instance.toInstanceId());
+ LOGGER.info("Instance creation request received for instance: {}",
instance.getInstanceId());
if (!pinotHelixResourceManager.addInstance(instance).isSuccessful()) {
throw new ControllerApplicationException(LOGGER, "Instance already
exists", Response.Status.CONFLICT);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index cb69b93..6714044 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -48,13 +48,11 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.ZNRecord;
-import org.apache.helix.examples.MasterSlaveStateModelFactory;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor;
import org.apache.helix.model.CurrentState;
@@ -62,9 +60,9 @@ import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
-import org.apache.helix.model.MasterSlaveSMD;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.config.IndexingConfig;
+import org.apache.pinot.common.config.Instance;
import org.apache.pinot.common.config.OfflineTagConfig;
import org.apache.pinot.common.config.RealtimeTagConfig;
import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
@@ -89,7 +87,7 @@ import
org.apache.pinot.common.partition.ReplicaGroupPartitionAssignment;
import
org.apache.pinot.common.partition.ReplicaGroupPartitionAssignmentGenerator;
import org.apache.pinot.common.restlet.resources.RebalanceResult;
import org.apache.pinot.common.segment.SegmentMetadata;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Helix;
import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.BrokerOnlineOfflineStateModel;
import
org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentOnlineOfflineStateModel;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
@@ -99,7 +97,6 @@ import
org.apache.pinot.common.utils.helix.PinotHelixPropertyStoreZnRecordProvid
import org.apache.pinot.common.utils.retry.RetryPolicies;
import org.apache.pinot.common.utils.retry.RetryPolicy;
import org.apache.pinot.controller.ControllerConf;
-import org.apache.pinot.controller.api.pojos.Instance;
import org.apache.pinot.controller.api.resources.StateType;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import
org.apache.pinot.controller.helix.core.rebalance.RebalanceSegmentStrategy;
@@ -258,10 +255,9 @@ public class PinotHelixResourceManager {
*/
private void addInstanceGroupTagIfNeeded() {
InstanceConfig instanceConfig = getHelixInstanceConfig(_instanceId);
- if
(!instanceConfig.containsTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE)) {
- LOGGER.info("Controller: {} doesn't contain group tag: {}. Adding one.",
_instanceId,
- CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
- instanceConfig.addTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+ if (!instanceConfig.containsTag(Helix.CONTROLLER_INSTANCE)) {
+ LOGGER.info("Controller: {} doesn't contain group tag: {}. Adding one.",
_instanceId, Helix.CONTROLLER_INSTANCE);
+ instanceConfig.addTag(Helix.CONTROLLER_INSTANCE);
HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
accessor.setProperty(accessor.keyBuilder().instanceConfig(_instanceId),
instanceConfig);
}
@@ -360,7 +356,7 @@ public class PinotHelixResourceManager {
@Nonnull
public synchronized PinotResourceManagerResponse addInstance(@Nonnull
Instance instance) {
List<String> instances = getAllInstances();
- String instanceIdToAdd = instance.toInstanceId();
+ String instanceIdToAdd = instance.getInstanceId();
if (instances.contains(instanceIdToAdd)) {
return PinotResourceManagerResponse.failure("Instance " +
instanceIdToAdd + " already exists");
} else {
@@ -589,7 +585,7 @@ public class PinotHelixResourceManager {
}
for (int i = 0; i < numberOfInstancesToAdd; ++i) {
String instanceName = unTaggedInstanceList.get(i);
- retagInstance(instanceName,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE, brokerTenantTag);
+ retagInstance(instanceName, Helix.UNTAGGED_BROKER_INSTANCE,
brokerTenantTag);
// Update idealState by adding new instance to table mapping.
addInstanceToBrokerIdealState(brokerTenantTag, instanceName);
}
@@ -625,7 +621,7 @@ public class PinotHelixResourceManager {
// Update ideal state with the new broker instances
try {
- HelixHelper.updateIdealState(getHelixZkManager(),
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState -> {
+ HelixHelper.updateIdealState(getHelixZkManager(),
Helix.BROKER_RESOURCE_INSTANCE, idealState -> {
assert idealState != null;
Map<String, String> instanceStateMap =
idealState.getInstanceStateMap(tableNameWithType);
if (instanceStateMap != null) {
@@ -646,8 +642,7 @@ public class PinotHelixResourceManager {
}
private void addInstanceToBrokerIdealState(String brokerTenantTag, String
instanceName) {
- IdealState tableIdealState =
- _helixAdmin.getResourceIdealState(_helixClusterName,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ IdealState tableIdealState =
_helixAdmin.getResourceIdealState(_helixClusterName,
Helix.BROKER_RESOURCE_INSTANCE);
for (String tableNameWithType : tableIdealState.getPartitionSet()) {
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
Preconditions.checkNotNull(tableConfig);
@@ -656,15 +651,14 @@ public class PinotHelixResourceManager {
tableIdealState.setPartitionState(tableNameWithType, instanceName,
BrokerOnlineOfflineStateModel.ONLINE);
}
}
- _helixAdmin
- .setResourceIdealState(_helixClusterName,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, tableIdealState);
+ _helixAdmin.setResourceIdealState(_helixClusterName,
Helix.BROKER_RESOURCE_INSTANCE, tableIdealState);
}
private PinotResourceManagerResponse scaleDownBroker(Tenant tenant, String
brokerTenantTag,
List<String> instancesInClusterWithTag) {
int numberBrokersToUntag = instancesInClusterWithTag.size() -
tenant.getNumberOfInstances();
for (int i = 0; i < numberBrokersToUntag; ++i) {
- retagInstance(instancesInClusterWithTag.get(i), brokerTenantTag,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+ retagInstance(instancesInClusterWithTag.get(i), brokerTenantTag,
Helix.UNTAGGED_BROKER_INSTANCE);
}
return PinotResourceManagerResponse.SUCCESS;
}
@@ -727,11 +721,11 @@ public class PinotHelixResourceManager {
int incOffline = serverTenant.getOfflineInstances() -
taggedOfflineServers.size();
int incRealtime = serverTenant.getRealtimeInstances() -
taggedRealtimeServers.size();
for (int i = 0; i < incOffline; ++i) {
- retagInstance(unTaggedInstanceList.get(i),
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
+ retagInstance(unTaggedInstanceList.get(i),
Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
}
for (int i = incOffline; i < incOffline + incRealtime; ++i) {
String instanceName = unTaggedInstanceList.get(i);
- retagInstance(instanceName,
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
+ retagInstance(instanceName, Helix.UNTAGGED_SERVER_INSTANCE,
realtimeServerTag);
// TODO: update idealStates & instanceZkMetadata
}
return PinotResourceManagerResponse.SUCCESS;
@@ -746,14 +740,14 @@ public class PinotHelixResourceManager {
taggedOfflineServers.removeAll(taggedRealtimeServers);
for (int i = 0; i < incOffline; ++i) {
if (i < incInstances) {
- retagInstance(unTaggedInstanceList.get(i),
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
+ retagInstance(unTaggedInstanceList.get(i),
Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
} else {
_helixAdmin.addInstanceTag(_helixClusterName,
taggedRealtimeServers.get(i - incInstances), offlineServerTag);
}
}
for (int i = incOffline; i < incOffline + incRealtime; ++i) {
if (i < incInstances) {
- retagInstance(unTaggedInstanceList.get(i),
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
+ retagInstance(unTaggedInstanceList.get(i),
Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
// TODO: update idealStates & instanceZkMetadata
} else {
_helixAdmin.addInstanceTag(_helixClusterName,
taggedOfflineServers.get(i - Math.max(incInstances, incOffline)),
@@ -777,7 +771,7 @@ public class PinotHelixResourceManager {
public boolean isBrokerTenantDeletable(String tenantName) {
String brokerTag = TagNameUtils.getBrokerTagForTenant(tenantName);
Set<String> taggedInstances = new
HashSet<>(HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag));
- String brokerName = CommonConstants.Helix.BROKER_RESOURCE_INSTANCE;
+ String brokerName = Helix.BROKER_RESOURCE_INSTANCE;
IdealState brokerIdealState =
_helixAdmin.getResourceIdealState(_helixClusterName, brokerName);
for (String partition : brokerIdealState.getPartitionSet()) {
for (String instance : brokerIdealState.getInstanceSet(partition)) {
@@ -866,12 +860,12 @@ public class PinotHelixResourceManager {
List<String> unTaggedInstanceList) {
String offlineServerTag =
TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
for (int i = 0; i < serverTenant.getOfflineInstances(); i++) {
- retagInstance(unTaggedInstanceList.get(i),
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
+ retagInstance(unTaggedInstanceList.get(i),
Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
}
String realtimeServerTag =
TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
for (int i = 0; i < serverTenant.getRealtimeInstances(); i++) {
- retagInstance(unTaggedInstanceList.get(i +
serverTenant.getOfflineInstances()),
- CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
+ retagInstance(unTaggedInstanceList.get(i +
serverTenant.getOfflineInstances()), Helix.UNTAGGED_SERVER_INSTANCE,
+ realtimeServerTag);
}
}
@@ -880,11 +874,11 @@ public class PinotHelixResourceManager {
int cnt = 0;
String offlineServerTag =
TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
for (int i = 0; i < serverTenant.getOfflineInstances(); i++) {
- retagInstance(unTaggedInstanceList.get(cnt++),
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
+ retagInstance(unTaggedInstanceList.get(cnt++),
Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
}
String realtimeServerTag =
TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
for (int i = 0; i < serverTenant.getRealtimeInstances(); i++) {
- retagInstance(unTaggedInstanceList.get(cnt++),
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
+ retagInstance(unTaggedInstanceList.get(cnt++),
Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
if (cnt == numberOfInstances) {
cnt = 0;
}
@@ -903,7 +897,7 @@ public class PinotHelixResourceManager {
}
String brokerTag =
TagNameUtils.getBrokerTagForTenant(brokerTenant.getTenantName());
for (int i = 0; i < brokerTenant.getNumberOfInstances(); ++i) {
- retagInstance(unTaggedInstanceList.get(i),
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE, brokerTag);
+ retagInstance(unTaggedInstanceList.get(i),
Helix.UNTAGGED_BROKER_INSTANCE, brokerTag);
}
return PinotResourceManagerResponse.SUCCESS;
}
@@ -914,7 +908,7 @@ public class PinotHelixResourceManager {
for (String instanceName : instancesInClusterWithTag) {
_helixAdmin.removeInstanceTag(_helixClusterName, instanceName,
offlineTenantTag);
if (getTagsForInstance(instanceName).isEmpty()) {
- _helixAdmin.addInstanceTag(_helixClusterName, instanceName,
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE);
+ _helixAdmin.addInstanceTag(_helixClusterName, instanceName,
Helix.UNTAGGED_SERVER_INSTANCE);
}
}
return PinotResourceManagerResponse.SUCCESS;
@@ -926,7 +920,7 @@ public class PinotHelixResourceManager {
for (String instanceName : instancesInClusterWithTag) {
_helixAdmin.removeInstanceTag(_helixClusterName, instanceName,
realtimeTenantTag);
if (getTagsForInstance(instanceName).isEmpty()) {
- _helixAdmin.addInstanceTag(_helixClusterName, instanceName,
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE);
+ _helixAdmin.addInstanceTag(_helixClusterName, instanceName,
Helix.UNTAGGED_SERVER_INSTANCE);
}
}
return PinotResourceManagerResponse.SUCCESS;
@@ -936,7 +930,7 @@ public class PinotHelixResourceManager {
String brokerTag = TagNameUtils.getBrokerTagForTenant(tenantName);
List<String> instancesInClusterWithTag =
HelixHelper.getInstancesWithTag(_helixZkManager, brokerTag);
for (String instance : instancesInClusterWithTag) {
- retagInstance(instance, brokerTag,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+ retagInstance(instance, brokerTag, Helix.UNTAGGED_BROKER_INSTANCE);
}
return PinotResourceManagerResponse.SUCCESS;
}
@@ -1361,8 +1355,8 @@ public class PinotHelixResourceManager {
private void handleBrokerResource(@Nonnull final String tableName, @Nonnull
final List<String> brokersForTenant) {
LOGGER.info("Updating BrokerResource IdealState for table: {}", tableName);
- HelixHelper.updateIdealState(_helixZkManager,
CommonConstants.Helix.BROKER_RESOURCE_INSTANCE,
- new Function<IdealState, IdealState>() {
+ HelixHelper
+ .updateIdealState(_helixZkManager, Helix.BROKER_RESOURCE_INSTANCE, new
Function<IdealState, IdealState>() {
@Override
public IdealState apply(@Nullable IdealState idealState) {
Preconditions.checkNotNull(idealState);
@@ -1592,7 +1586,7 @@ public class PinotHelixResourceManager {
recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
recipientCriteria.setInstanceName("%");
recipientCriteria.setSessionSpecific(true);
-
recipientCriteria.setResource(CommonConstants.Helix.BROKER_RESOURCE_INSTANCE);
+ recipientCriteria.setResource(Helix.BROKER_RESOURCE_INSTANCE);
recipientCriteria.setDataSource(Criteria.DataSource.EXTERNALVIEW);
// The brokerResource field in the EXTERNALVIEW stores the offline table
name in the Partition subfield.
recipientCriteria.setPartition(tableNameWithType);
@@ -2240,8 +2234,7 @@ public class PinotHelixResourceManager {
*/
public List<String> getOnlineUnTaggedBrokerInstanceList() {
- final List<String> instanceList =
- HelixHelper.getInstancesWithTag(_helixZkManager,
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+ final List<String> instanceList =
HelixHelper.getInstancesWithTag(_helixZkManager,
Helix.UNTAGGED_BROKER_INSTANCE);
final List<String> liveInstances =
_helixDataAccessor.getChildNames(_keyBuilder.liveInstances());
instanceList.retainAll(liveInstances);
return instanceList;
@@ -2252,8 +2245,7 @@ public class PinotHelixResourceManager {
* @return List of untagged online server instances.
*/
public List<String> getOnlineUnTaggedServerInstanceList() {
- final List<String> instanceList =
- HelixHelper.getInstancesWithTag(_helixZkManager,
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE);
+ final List<String> instanceList =
HelixHelper.getInstancesWithTag(_helixZkManager,
Helix.UNTAGGED_SERVER_INSTANCE);
final List<String> liveInstances =
_helixDataAccessor.getChildNames(_keyBuilder.liveInstances());
instanceList.retainAll(liveInstances);
return instanceList;
@@ -2285,7 +2277,7 @@ public class PinotHelixResourceManager {
ZNRecord record = helixInstanceConfig.getRecord();
String[] hostnameSplit = helixInstanceConfig.getHostName().split("_");
Preconditions.checkState(hostnameSplit.length >= 2);
- String adminPort =
record.getSimpleField(CommonConstants.Helix.Instance.ADMIN_PORT_KEY);
+ String adminPort = record.getSimpleField(Helix.Instance.ADMIN_PORT_KEY);
// If admin port is missing, there's no point to calculate the remaining
table size.
// Thus, throwing an exception will be good here.
if (Strings.isNullOrEmpty(adminPort)) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
index 5651b89..d7176db 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceTagPoolSelector.java
@@ -26,8 +26,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.config.Instance;
import org.apache.pinot.common.config.instance.InstanceTagPoolConfig;
-import org.apache.pinot.controller.api.pojos.Instance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
index e5b4090..9899dfa 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/PinotHelixTaskResourceManager.java
@@ -36,7 +36,7 @@ import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
import org.apache.pinot.common.config.PinotTaskConfig;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Helix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -175,7 +175,7 @@ public class PinotHelixTaskResourceManager {
@Nonnull
public synchronized String submitTask(@Nonnull List<PinotTaskConfig>
pinotTaskConfigs,
int numConcurrentTasksPerInstance) {
- return submitTask(pinotTaskConfigs,
CommonConstants.Minion.UNTAGGED_INSTANCE, numConcurrentTasksPerInstance);
+ return submitTask(pinotTaskConfigs, Helix.UNTAGGED_MINION_INSTANCE,
numConcurrentTasksPerInstance);
}
/**
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
index a501767..37dec22 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java
@@ -170,7 +170,7 @@ public class HelixSetupUtils {
idealStateBuilder.enableDelayRebalance();
// Set instance group tag
IdealState idealState = idealStateBuilder.build();
- idealState.setInstanceGroupTag(CONTROLLER_INSTANCE_TYPE);
+ idealState.setInstanceGroupTag(CONTROLLER_INSTANCE);
// Set batch message mode
idealState.setBatchMessageMode(enableBatchMessageMode);
// Explicitly disable this resource when creating this new resource.
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
index cb06fc4..a8dac6c 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java
@@ -19,12 +19,17 @@
package org.apache.pinot.controller.api;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
-import org.apache.pinot.common.utils.CommonConstants;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.pinot.common.utils.CommonConstants.Helix;
+import org.apache.pinot.common.utils.CommonConstants.Helix.InstanceType;
import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.config.Instance;
import org.apache.pinot.controller.helix.ControllerTest;
-import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -37,6 +42,7 @@ import static org.testng.Assert.fail;
* Tests for the instances Restlet.
*/
public class PinotInstanceRestletResourceTest extends ControllerTest {
+
@BeforeClass
public void setUp() {
startZk();
@@ -46,109 +52,99 @@ public class PinotInstanceRestletResourceTest extends
ControllerTest {
@Test
public void testInstanceListingAndCreation()
throws Exception {
- // Check that there is only one instance, which is the controller instance.
- JsonNode instanceList =
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList()));
- assertEquals(instanceList.get("instances").size(), 1, "Expected only one
instance at beginning of test");
+ // Check that there is only one CONTROLLER instance in the cluster
+ String listInstancesUrl = _controllerRequestURLBuilder.forInstanceList();
+ JsonNode instanceList =
JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl));
+ assertEquals(instanceList.get("instances").size(), 1);
+
assertTrue(instanceList.get("instances").get(0).asText().startsWith(Helix.PREFIX_OF_CONTROLLER_INSTANCE));
// Create untagged broker and server instances
- ObjectNode brokerInstance =
- (ObjectNode) JsonUtils.stringToJsonNode("{\"host\":\"1.2.3.4\",
\"type\":\"broker\", \"port\":\"1234\"}");
- sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(),
brokerInstance.toString());
-
- ObjectNode serverInstance =
- (ObjectNode) JsonUtils.stringToJsonNode("{\"host\":\"1.2.3.4\",
\"type\":\"server\", \"port\":\"2345\"}");
- sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(),
serverInstance.toString());
-
- // Check that there are three instances
- TestUtils.waitForCondition(aVoid -> {
- try {
- // Check that there are two instances
- return
-
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances")
- .size() == 3;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 10_000L, "Expected three instances after creation of tagged instances");
-
- // Create tagged broker and server instances
- brokerInstance.put("tag", "someTag");
- brokerInstance.put("host", "2.3.4.5");
- sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(),
brokerInstance.toString());
-
- serverInstance.put("tag", "someTag");
- serverInstance.put("host", "2.3.4.5");
- sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(),
serverInstance.toString());
-
- // It may take some time for cache data accessor to update its data.
- TestUtils.waitForCondition(aVoid -> {
- try {
- // Check that there are five instances
- return
-
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances")
- .size() == 5;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }, 10_000L, "Expected five instances after creation of tagged instances");
-
- // Create duplicate broker and server instances (both calls should fail)
+ String createInstanceUrl =
_controllerRequestURLBuilder.forInstanceCreate();
+ Instance brokerInstance = new Instance("1.2.3.4", 1234,
InstanceType.BROKER, null, null);
+ sendPostRequest(createInstanceUrl, brokerInstance.toJsonString());
+
+ Instance serverInstance = new Instance("1.2.3.4", 2345,
InstanceType.SERVER, null, null);
+ sendPostRequest(createInstanceUrl, serverInstance.toJsonString());
+
+ // Check that there are 3 instances
+
assertEquals(JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl)).get("instances").size(),
3);
+
+ // Create broker and server instances with tags and pools
+ brokerInstance = new Instance("2.3.4.5", 1234, InstanceType.BROKER,
Collections.singletonList("tag_BROKER"), null);
+ sendPostRequest(createInstanceUrl, brokerInstance.toJsonString());
+
+ Map<String, Integer> serverPools = new TreeMap<>();
+ serverPools.put("tag_OFFLINE", 0);
+ serverPools.put("tag_REALTIME", 1);
+ serverInstance =
+ new Instance("2.3.4.5", 2345, InstanceType.SERVER,
Arrays.asList("tag_OFFLINE", "tag_REALTIME"), serverPools);
+ sendPostRequest(createInstanceUrl, serverInstance.toJsonString());
+
+ // Check that there are 5 instances
+
assertEquals(JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl)).get("instances").size(),
5);
+
+ // Create duplicate broker and server instances should fail
try {
- sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(),
brokerInstance.toString());
+ sendPostRequest(createInstanceUrl, brokerInstance.toJsonString());
fail("Duplicate broker instance creation did not fail");
} catch (IOException e) {
// Expected
}
try {
- sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(),
serverInstance.toString());
+ sendPostRequest(createInstanceUrl, serverInstance.toJsonString());
fail("Duplicate server instance creation did not fail");
} catch (IOException e) {
// Expected
}
- // Check that there are five instances
-
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList()));
- assertEquals(
-
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceList())).get("instances")
- .size(), 5, "Expected five instances after creation of duplicate
instances");
+ // Check that there are still 5 instances
+
assertEquals(JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl)).get("instances").size(),
5);
// Check that the instances are properly created
JsonNode instance = JsonUtils
.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceInformation("Broker_1.2.3.4_1234")));
assertEquals(instance.get("instanceName").asText(), "Broker_1.2.3.4_1234");
- assertEquals(instance.get("hostName").asText(), "1.2.3.4");
+ assertEquals(instance.get("hostName").asText(), "Broker_1.2.3.4");
assertEquals(instance.get("port").asText(), "1234");
assertTrue(instance.get("enabled").asBoolean());
- assertEquals(instance.get("tags").size(), 1);
- assertEquals(instance.get("tags").get(0).asText(),
CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE);
+ assertEquals(instance.get("tags").size(), 0);
+ assertTrue(instance.get("pools").isNull());
instance = JsonUtils
.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceInformation("Server_1.2.3.4_2345")));
assertEquals(instance.get("instanceName").asText(), "Server_1.2.3.4_2345");
- assertEquals(instance.get("hostName").asText(), "1.2.3.4");
+ assertEquals(instance.get("hostName").asText(), "Server_1.2.3.4");
assertEquals(instance.get("port").asText(), "2345");
assertTrue(instance.get("enabled").asBoolean());
- assertEquals(instance.get("tags").size(), 1);
- assertEquals(instance.get("tags").get(0).asText(),
CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE);
+ assertEquals(instance.get("tags").size(), 0);
+ assertTrue(instance.get("pools").isNull());
instance = JsonUtils
.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceInformation("Broker_2.3.4.5_1234")));
assertEquals(instance.get("instanceName").asText(), "Broker_2.3.4.5_1234");
- assertEquals(instance.get("hostName").asText(), "2.3.4.5");
+ assertEquals(instance.get("hostName").asText(), "Broker_2.3.4.5");
assertEquals(instance.get("port").asText(), "1234");
assertTrue(instance.get("enabled").asBoolean());
- assertEquals(instance.get("tags").size(), 1);
- assertEquals(instance.get("tags").get(0).asText(), "someTag");
+ JsonNode tags = instance.get("tags");
+ assertEquals(tags.size(), 1);
+ assertEquals(tags.get(0).asText(), "tag_BROKER");
+ assertTrue(instance.get("pools").isNull());
instance = JsonUtils
.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forInstanceInformation("Server_2.3.4.5_2345")));
assertEquals(instance.get("instanceName").asText(), "Server_2.3.4.5_2345");
- assertEquals(instance.get("hostName").asText(), "2.3.4.5");
+ assertEquals(instance.get("hostName").asText(), "Server_2.3.4.5");
assertEquals(instance.get("port").asText(), "2345");
assertTrue(instance.get("enabled").asBoolean());
- assertEquals(instance.get("tags").size(), 1);
- assertEquals(instance.get("tags").get(0).asText(), "someTag");
+ tags = instance.get("tags");
+ assertEquals(tags.size(), 2);
+ assertEquals(tags.get(0).asText(), "tag_OFFLINE");
+ assertEquals(tags.get(1).asText(), "tag_REALTIME");
+ JsonNode pools = instance.get("pools");
+ assertEquals(pools.size(), 2);
+ assertEquals(pools.get("tag_OFFLINE").asText(), "0");
+ assertEquals(pools.get("tag_REALTIME").asText(), "1");
// Check that an error is given for an instance that does not exist
try {
@@ -158,4 +154,10 @@ public class PinotInstanceRestletResourceTest extends
ControllerTest {
// Expected
}
}
+
+ @AfterClass
+ public void tearDown() {
+ stopController();
+ stopZk();
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
index fa38d8a..f5d61d6 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotTenantRestletResourceTest.java
@@ -19,9 +19,9 @@
package org.apache.pinot.controller.api;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.config.TagNameUtils;
+import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.controller.helix.ControllerTest;
import org.testng.annotations.AfterClass;
@@ -31,76 +31,40 @@ import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
-/**
- * Test for table to tenant mapping. Real working test is inside offline
cluster integration test because it requires
- * segment upload.
- */
public class PinotTenantRestletResourceTest extends ControllerTest {
- private final TableConfig.Builder _offlineBuilder = new
TableConfig.Builder(CommonConstants.Helix.TableType.OFFLINE);
- private static final int NUM_BROKER_INSTANCES = 2;
- private static final int NUM_SERVER_INSTANCES = 6;
+ private static final int NUM_BROKER_INSTANCES = 1;
+ private static final int NUM_SERVER_INSTANCES = 3;
@BeforeClass
- public void setUp() {
+ public void setUp()
+ throws Exception {
startZk();
startController();
+ addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true);
+ addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true);
}
@Test
public void testTableListForTenant()
throws Exception {
- // Create untagged broker and server instances
- ObjectNode brokerInstance =
- (ObjectNode) JsonUtils.stringToJsonNode("{\"host\":\"1.2.3.4\",
\"type\":\"broker\", \"port\":\"1234\"}");
- sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(),
brokerInstance.toString());
-
- ObjectNode serverInstance =
- (ObjectNode) JsonUtils.stringToJsonNode("{\"host\":\"1.2.3.4\",
\"type\":\"server\", \"port\":\"2345\"}");
- sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(),
serverInstance.toString());
-
- // Create tagged broker and server instances
- brokerInstance.put("tag", "someTag");
- brokerInstance.put("host", "2.3.4.5");
- sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(),
brokerInstance.toString());
-
- serverInstance.put("tag", "server_REALTIME");
- serverInstance.put("host", "2.3.4.5");
- sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(),
serverInstance.toString());
-
// Check that no tables on tenant works
- JsonNode tableList =
-
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTablesFromTenant("server_REALTIME")));
- assertEquals(tableList.get("tables").size(), 0, "Expected no tables");
-
- // Try to make sure both kinds of tags work
- tableList =
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTablesFromTenant("server")));
- assertEquals(tableList.get("tables").size(), 0, "Expected no tables");
-
- // Add a table to the server
- String createTableUrl = _controllerRequestURLBuilder.forTableCreate();
-
- addFakeBrokerInstancesToAutoJoinHelixCluster(NUM_BROKER_INSTANCES, true);
- addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVER_INSTANCES, true);
-
-
_offlineBuilder.setTableName("testOfflineTable").setTimeColumnName("timeColumn").setTimeType("DAYS")
-
.setRetentionTimeUnit("DAYS").setRetentionTimeValue("5").setServerTenant("DefaultTenant");
-
- TableConfig offlineTableConfig = _offlineBuilder.build();
- offlineTableConfig.setTableName("mytable_OFFLINE");
- String offlineTableJSONConfigString =
offlineTableConfig.toJsonConfigString();
- sendPostRequest(createTableUrl, offlineTableJSONConfigString);
-
- // Try to make sure both kinds of tags work
- tableList =
-
JsonUtils.stringToJsonNode(sendGetRequest(_controllerRequestURLBuilder.forTablesFromTenant("DefaultTenant")));
- assertEquals(tableList.get("tables").size(), 1, "Expected 1 table");
- assertEquals(tableList.get("tables").get(0).asText(), "mytable_OFFLINE");
-
- stopFakeInstances();
+ String listTablesUrl =
_controllerRequestURLBuilder.forTablesFromTenant(TagNameUtils.DEFAULT_TENANT_NAME);
+ JsonNode tableList =
JsonUtils.stringToJsonNode(sendGetRequest(listTablesUrl));
+ assertEquals(tableList.get("tables").size(), 0);
+
+ // Add a table
+ sendPostRequest(_controllerRequestURLBuilder.forTableCreate(),
+ new
TableConfig.Builder(TableType.OFFLINE).setTableName("testTable").build().toJsonConfigString());
+
+ // There should be 1 table on the tenant
+ tableList = JsonUtils.stringToJsonNode(sendGetRequest(listTablesUrl));
+ assertEquals(tableList.get("tables").size(), 1);
+ assertEquals(tableList.get("tables").get(0).asText(), "testTable_OFFLINE");
}
@AfterClass
public void tearDown() {
+ stopFakeInstances();
stopController();
stopZk();
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index ed76582..cdf3d9e 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -55,7 +55,6 @@ import org.apache.pinot.controller.helix.ControllerTest;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -407,7 +406,7 @@ public class PinotHelixResourceManagerTest extends
ControllerTest {
Assert.assertTrue(leadControllerResourceIdealState.isValid());
Assert.assertTrue(leadControllerResourceIdealState.isEnabled());
Assert.assertEquals(leadControllerResourceIdealState.getInstanceGroupTag(),
- CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE);
+ CommonConstants.Helix.CONTROLLER_INSTANCE);
Assert.assertEquals(leadControllerResourceIdealState.getNumPartitions(),
CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE);
Assert.assertEquals(leadControllerResourceIdealState.getReplicas(),
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 599091d..8639b00 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.config.ColumnPartitionConfig;
+import org.apache.pinot.common.config.Instance;
import org.apache.pinot.common.config.ReplicaGroupStrategyConfig;
import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.config.TableConfig;
@@ -35,7 +36,6 @@ import
org.apache.pinot.common.config.instance.InstanceTagPoolConfig;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
import
org.apache.pinot.common.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.common.utils.InstancePartitionsType;
-import org.apache.pinot.controller.api.pojos.Instance;
import org.apache.pinot.controller.helix.core.assignment.InstancePartitions;
import org.testng.annotations.Test;
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BalanceNumSegmentAssignmentStrategyIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BalanceNumSegmentAssignmentStrategyIntegrationTest.java
index b8ac99a..9ce1391 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BalanceNumSegmentAssignmentStrategyIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BalanceNumSegmentAssignmentStrategyIntegrationTest.java
@@ -18,13 +18,14 @@
*/
package org.apache.pinot.integration.tests;
-import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.helix.model.IdealState;
-import org.apache.pinot.common.utils.JsonUtils;
+import org.apache.pinot.common.config.Instance;
+import org.apache.pinot.common.utils.CommonConstants.Helix.InstanceType;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -38,9 +39,9 @@ import static org.testng.Assert.assertEquals;
*/
// TODO: clean up this test
public class BalanceNumSegmentAssignmentStrategyIntegrationTest extends
UploadRefreshDeleteIntegrationTest {
- private final String serverTenant = "DefaultTenant_OFFLINE";
- private final String hostName = "1.2.3.4";
- private final int basePort = 1234;
+ private static final String HOST = "1.2.3.4";
+ private static final int BASE_PORT = 1234;
+ private static final String SERVER_TAG = "DefaultTenant_OFFLINE";
@BeforeClass
public void setUp()
@@ -49,12 +50,9 @@ public class
BalanceNumSegmentAssignmentStrategyIntegrationTest extends UploadRe
// Create eight dummy server instances
for (int i = 0; i < 8; ++i) {
- ObjectNode serverInstance = JsonUtils.newObjectNode();
- serverInstance.put("host", hostName);
- serverInstance.put("port", Integer.toString(basePort + i));
- serverInstance.put("tag", serverTenant);
- serverInstance.put("type", "server");
- sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(),
serverInstance.toString());
+ Instance serverInstance =
+ new Instance(HOST, BASE_PORT + i, InstanceType.SERVER,
Collections.singletonList(SERVER_TAG), null);
+ sendPostRequest(_controllerRequestURLBuilder.forInstanceCreate(),
serverInstance.toJsonString());
}
}
@@ -100,7 +98,7 @@ public class
BalanceNumSegmentAssignmentStrategyIntegrationTest extends UploadRe
@Test(dataProvider = "tableNameProvider")
public void testNoAssignmentToDisabledInstances(String tableName,
SegmentVersion version)
throws Exception {
- List<String> instances =
_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), serverTenant);
+ List<String> instances =
_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(), SERVER_TAG);
List<String> disabledInstances = new ArrayList<>();
// disable 6 instances
assertEquals(instances.size(), 9);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index f8baa86..c36def8 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -187,7 +187,7 @@ public abstract class ClusterTest extends ControllerTest {
for (int i = 0; i < minionCount; i++) {
Configuration config = new PropertiesConfiguration();
config.setProperty(Helix.Instance.INSTANCE_ID_KEY,
- Minion.INSTANCE_PREFIX + "minion" + i + "_" +
(Minion.DEFAULT_HELIX_PORT + i));
+ Helix.PREFIX_OF_MINION_INSTANCE + "minion" + i + "_" +
(Minion.DEFAULT_HELIX_PORT + i));
config.setProperty(Helix.Instance.DATA_DIR_KEY,
Minion.DEFAULT_INSTANCE_DATA_DIR + "-" + i);
MinionStarter minionStarter = new
MinionStarter(ZkStarter.DEFAULT_ZK_STR, getHelixClusterName(), config);
@@ -405,11 +405,11 @@ public abstract class ClusterTest extends ControllerTest {
String kafkaTopic, int realtimeSegmentFlushRows, File avroFile, String
timeColumnName, String timeType,
String schemaName, String brokerTenant, String serverTenant, String
loadMode, String sortedColumn,
List<String> invertedIndexColumns, List<String> bloomFilterColumns,
List<String> noDictionaryColumns,
- TableTaskConfig taskConfig, String streamConsumerFactoryName) throws
Exception {
- addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl,
kafkaTopic, realtimeSegmentFlushRows,
- avroFile, timeColumnName, timeType, schemaName, brokerTenant,
serverTenant, loadMode, sortedColumn,
- invertedIndexColumns, bloomFilterColumns, noDictionaryColumns,
taskConfig, streamConsumerFactoryName,
- 1);
+ TableTaskConfig taskConfig, String streamConsumerFactoryName)
+ throws Exception {
+ addRealtimeTable(tableName, useLlc, kafkaBrokerList, kafkaZkUrl,
kafkaTopic, realtimeSegmentFlushRows, avroFile,
+ timeColumnName, timeType, schemaName, brokerTenant, serverTenant,
loadMode, sortedColumn, invertedIndexColumns,
+ bloomFilterColumns, noDictionaryColumns, taskConfig,
streamConsumerFactoryName, 1);
}
protected void addRealtimeTable(String tableName, boolean useLlc, String
kafkaBrokerList, String kafkaZkUrl,
diff --git
a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
index 3ef52e5..f6bc0b1 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionStarter.java
@@ -76,7 +76,7 @@ public class MinionStarter {
_helixClusterName = helixClusterName;
_config = config;
_instanceId =
config.getString(CommonConstants.Helix.Instance.INSTANCE_ID_KEY,
- CommonConstants.Minion.INSTANCE_PREFIX + NetUtil.getHostAddress() + "_"
+ CommonConstants.Helix.PREFIX_OF_MINION_INSTANCE +
NetUtil.getHostAddress() + "_"
+ CommonConstants.Minion.DEFAULT_HELIX_PORT);
setupHelixSystemProperties();
_helixManager = new ZKHelixManager(_helixClusterName, _instanceId,
InstanceType.PARTICIPANT, zkAddress);
@@ -141,9 +141,9 @@ public class MinionStarter {
MetricsHelper.initializeMetrics(_config);
MetricsRegistry metricsRegistry = new MetricsRegistry();
MetricsHelper.registerMetricsRegistry(metricsRegistry);
- final MinionMetrics minionMetrics = new MinionMetrics(
- _config.getString(CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX_KEY,
CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX),
- metricsRegistry);
+ final MinionMetrics minionMetrics = new MinionMetrics(_config
+ .getString(CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX_KEY,
+ CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX), metricsRegistry);
minionMetrics.initializeGlobalMeters();
minionContext.setMinionMetrics(minionMetrics);
@@ -222,8 +222,8 @@ public class MinionStarter {
private void addInstanceTagIfNeeded() {
InstanceConfig instanceConfig =
_helixAdmin.getInstanceConfig(_helixClusterName, _instanceId);
if (instanceConfig.getTags().isEmpty()) {
- LOGGER.info("Adding default Helix tag: {} to Pinot minion",
CommonConstants.Minion.UNTAGGED_INSTANCE);
- _helixAdmin.addInstanceTag(_helixClusterName, _instanceId,
CommonConstants.Minion.UNTAGGED_INSTANCE);
+ LOGGER.info("Adding default Helix tag: {} to Pinot minion",
CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
+ _helixAdmin.addInstanceTag(_helixClusterName, _instanceId,
CommonConstants.Helix.UNTAGGED_MINION_INSTANCE);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]