This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 25aa780 Adding controller APIs to fetch brokers information (#5685)
25aa780 is described below
commit 25aa780814809c3290d278bb2136373865f5c947
Author: Xiang Fu <[email protected]>
AuthorDate: Tue Jul 14 16:25:58 2020 -0700
Adding controller APIs to fetch brokers information (#5685)
* Adding controller APIs to fetch brokers information
* Address comments
* Address comments
---
.../pinot/controller/api/resources/Constants.java | 1 +
.../api/resources/PinotBrokerRestletResource.java | 145 +++++++++++++++++
.../helix/ControllerRequestURLBuilder.java | 42 +++++
.../api/PinotBrokerRestletResourceTest.java | 171 +++++++++++++++++++++
.../pinot/controller/helix/ControllerTest.java | 10 ++
5 files changed, 369 insertions(+)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index 5e463e0..88658b0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -35,6 +35,7 @@ public class Constants {
public static final String INSTANCE_TAG = "Instance";
public static final String SCHEMA_TAG = "Schema";
public static final String TENANT_TAG = "Tenant";
+ public static final String BROKER_TAG = "Broker";
public static final String SEGMENT_TAG = "Segment";
public static final String TASK_TAG = "Task";
public static final String LEAD_CONTROLLER_TAG = "Leader";
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
new file mode 100644
index 0000000..3416769
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.google.common.collect.ImmutableList;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@Api(tags = Constants.BROKER_TAG)
+@Path("/")
+public class PinotBrokerRestletResource {
+ public static final Logger LOGGER =
LoggerFactory.getLogger(PinotBrokerRestletResource.class);
+
+ @Inject
+ PinotHelixResourceManager _pinotHelixResourceManager;
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers")
+ @ApiOperation(value = "List tenants and tables to brokers mappings", notes =
"List tenants and tables to brokers mappings")
+ public Map<String, Map<String, List<String>>> listBrokersMapping(
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ Map<String, Map<String, List<String>>> resultMap = new HashMap<>();
+ resultMap.put("tenants", getTenantsToBrokersMapping(state));
+ resultMap.put("tables", getTablesToBrokersMapping(state));
+ return resultMap;
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers/tenants")
+ @ApiOperation(value = "List tenants to brokers mappings", notes = "List
tenants to brokers mappings")
+ public Map<String, List<String>> getTenantsToBrokersMapping(
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ Map<String, List<String>> resultMap = new HashMap<>();
+ _pinotHelixResourceManager.getAllBrokerTenantNames().stream()
+ .forEach(tenant -> resultMap.put(tenant, getBrokersForTenant(tenant,
state)));
+ return resultMap;
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers/tenants/{tenantName}")
+ @ApiOperation(value = "List brokers for a given tenant", notes = "List
brokers for a given tenant")
+ public List<String> getBrokersForTenant(
+ @ApiParam(value = "Name of the tenant", required = true)
@PathParam("tenantName") String tenantName,
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ if
(!_pinotHelixResourceManager.getAllBrokerTenantNames().contains(tenantName)) {
+ throw new ControllerApplicationException(LOGGER, String.format("Tenant
'%s' not found.", tenantName),
+ Response.Status.NOT_FOUND);
+ }
+ Set<String> tenantBrokers = new
HashSet<>(_pinotHelixResourceManager.getAllInstancesForBrokerTenant(tenantName));
+ applyStateChanges(tenantBrokers, state);
+ return ImmutableList.copyOf(tenantBrokers);
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers/tables")
+ @ApiOperation(value = "List tables to brokers mappings", notes = "List
tables to brokers mappings")
+ public Map<String, List<String>> getTablesToBrokersMapping(
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ Map<String, List<String>> resultMap = new HashMap<>();
+ _pinotHelixResourceManager.getAllRawTables().stream()
+ .forEach(table -> resultMap.put(table, getBrokersForTable(table, null,
state)));
+ return resultMap;
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers/tables/{tableName}")
+ @ApiOperation(value = "List brokers for a given table", notes = "List
brokers for a given table")
+ public List<String> getBrokersForTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
+ @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String
tableTypeStr,
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ try {
+ List<String> tableNamesWithType = _pinotHelixResourceManager
+ .getExistingTableNamesWithType(tableName,
Constants.validateTableType(tableTypeStr));
+ if (tableNamesWithType.isEmpty()) {
+ throw new ControllerApplicationException(LOGGER, String.format("Table
'%s' not found.", tableName),
+ Response.Status.NOT_FOUND);
+ }
+ Set<String> tableBrokers =
+ new
HashSet<>(_pinotHelixResourceManager.getBrokerInstancesFor(tableNamesWithType.get(0)));
+ applyStateChanges(tableBrokers, state);
+ return ImmutableList.copyOf(tableBrokers);
+ } catch (TableNotFoundException e) {
+ throw new ControllerApplicationException(LOGGER, String.format("Table
'%s' not found.", tableName),
+ Response.Status.NOT_FOUND);
+ } catch (IllegalArgumentException e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.FORBIDDEN);
+ }
+ }
+
+ private void applyStateChanges(Set<String> brokers, String state) {
+ if (state == null) {
+ return;
+ }
+ switch (state) {
+ case CommonConstants.Helix.StateModel.BrokerResourceStateModel.ONLINE:
+ brokers.retainAll(_pinotHelixResourceManager.getOnlineInstanceList());
+ break;
+ case CommonConstants.Helix.StateModel.BrokerResourceStateModel.OFFLINE:
+ brokers.removeAll(_pinotHelixResourceManager.getOnlineInstanceList());
+ break;
+ }
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
index 802d085..069b91b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestURLBuilder.java
@@ -90,6 +90,48 @@ public class ControllerRequestURLBuilder {
return StringUtil.join("/", _baseUrl, "tenants", tenantName,
"?type=server");
}
+ public String forBrokersGet(String state) {
+ if (state == null) {
+ return StringUtil.join("/", _baseUrl, "brokers");
+ }
+ return StringUtil.join("/", _baseUrl, "brokers", "?state=" + state);
+ }
+
+ public String forBrokerTenantsGet(String state) {
+ if (state == null) {
+ return StringUtil.join("/", _baseUrl, "brokers", "tenants");
+ }
+ return StringUtil.join("/", _baseUrl, "brokers", "tenants", "?state=" +
state);
+ }
+
+ public String forBrokerTenantGet(String tenant, String state) {
+ if (state == null) {
+ return StringUtil.join("/", _baseUrl, "brokers", "tenants", tenant);
+ }
+ return StringUtil.join("/", _baseUrl, "brokers", "tenants", tenant,
"?state=" + state);
+ }
+
+ public String forBrokerTablesGet(String state) {
+ if (state == null) {
+ return StringUtil.join("/", _baseUrl, "brokers", "tables");
+ }
+ return StringUtil.join("/", _baseUrl, "brokers", "tables", "?state=" +
state);
+ }
+
+ public String forBrokerTableGet(String table, String tableType, String
state) {
+ StringBuilder params = new StringBuilder();
+ if (tableType != null) {
+ params.append("?type=" + tableType);
+ }
+ if (state != null) {
+ if (params.length() > 0) {
+ params.append("&");
+ }
+ params.append("?state=" + state);
+ }
+ return StringUtil.join("/", _baseUrl, "brokers", "tables", table,
params.toString());
+ }
+
public String forTableCreate() {
return StringUtil.join("/", _baseUrl, "tables");
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotBrokerRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotBrokerRestletResourceTest.java
new file mode 100644
index 0000000..9101f1e
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotBrokerRestletResourceTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class PinotBrokerRestletResourceTest extends ControllerTest {
+ private static final String TABLE_NAME_1 = "testTable1";
+ private static final String TABLE_NAME_2 = "testTable2";
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ startZk();
+ startController();
+ addFakeServerInstancesToAutoJoinHelixCluster(1, true);
+
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_OFFLINE").size(),
+ 1);
+ }
+
+ public void testGetBrokersHelper(String state, int onlineServers, int
offlineServers)
+ throws Exception {
+ List<String> expectedBrokers = new ArrayList<>();
+ if (state == null) {
+ for (int i = 0; i < onlineServers + offlineServers; i++) {
+ expectedBrokers.add("Broker_localhost_" + i);
+ }
+ } else {
+ switch (state) {
+ case "OFFLINE":
+ for (int i = onlineServers; i < onlineServers + offlineServers; i++)
{
+ expectedBrokers.add("Broker_localhost_" + i);
+ }
+ break;
+ default:
+ for (int i = 0; i < onlineServers; i++) {
+ expectedBrokers.add("Broker_localhost_" + i);
+ }
+ break;
+ }
+ }
+ Map<String, Map<String, List<String>>> allMap =
+
JsonUtils.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokersGet(state)),
Map.class);
+
+ for (String expectedBroker : expectedBrokers) {
+
Assert.assertTrue(allMap.get("tenants").get("DefaultTenant").contains(expectedBroker));
+
Assert.assertTrue(allMap.get("tables").get("testTable1").contains(expectedBroker));
+
Assert.assertTrue(allMap.get("tables").get("testTable2").contains(expectedBroker));
+ }
+
+ Map<String, List<String>> tenantsMap =
+
JsonUtils.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTenantsGet(state)),
Map.class);
+ for (String expectedBroker : expectedBrokers) {
+
Assert.assertTrue(tenantsMap.get("DefaultTenant").contains(expectedBroker));
+ }
+
+ List<String> tenantBrokers = JsonUtils
+
.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTenantGet("DefaultTenant",
state)),
+ List.class);
+ for (String expectedBroker : expectedBrokers) {
+ Assert.assertTrue(tenantBrokers.contains(expectedBroker));
+ }
+
+ try {
+
sendGetRequest(_controllerRequestURLBuilder.forBrokerTenantGet("nonExistTenant",
state));
+ Assert.fail("Shouldn't reach here");
+ } catch (Exception e) {
+ }
+
+ Map<String, List<String>> tablesMap =
+
JsonUtils.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTablesGet(state)),
Map.class);
+ for (String expectedBroker : expectedBrokers) {
+ Assert.assertTrue(tablesMap.get("testTable1").contains(expectedBroker));
+ Assert.assertTrue(tablesMap.get("testTable2").contains(expectedBroker));
+ }
+
+ List<String> tableBrokers = JsonUtils
+
.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("testTable1",
"OFFLINE", state)),
+ List.class);
+ for (String expectedBroker : expectedBrokers) {
+ Assert.assertTrue(tableBrokers.contains(expectedBroker));
+ }
+ tableBrokers = JsonUtils
+
.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("testTable1",
null, state)),
+ List.class);
+ for (String expectedBroker : expectedBrokers) {
+ Assert.assertTrue(tableBrokers.contains(expectedBroker));
+ }
+ tableBrokers = JsonUtils
+
.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("testTable2",
"OFFLINE", state)),
+ List.class);
+ for (String expectedBroker : expectedBrokers) {
+ Assert.assertTrue(tableBrokers.contains(expectedBroker));
+ }
+ tableBrokers = JsonUtils
+
.stringToObject(sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("testTable2",
null, state)),
+ List.class);
+ for (String expectedBroker : expectedBrokers) {
+ Assert.assertTrue(tableBrokers.contains(expectedBroker));
+ }
+ try {
+
sendGetRequest(_controllerRequestURLBuilder.forBrokerTableGet("nonExistTable",
null, state));
+ Assert.fail("Shouldn't reach here");
+ } catch (Exception e) {
+ }
+ }
+
+ @Test
+ public void testGetBrokers()
+ throws Exception {
+ addFakeBrokerInstancesToAutoJoinHelixCluster(10, true);
+
Assert.assertEquals(_helixAdmin.getInstancesInClusterWithTag(getHelixClusterName(),
"DefaultTenant_BROKER").size(),
+ 10);
+
+ // Adding table
+ _helixResourceManager
+ .addTable(new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_1).setNumReplicas(1).build());
+ _helixResourceManager
+ .addTable(new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_2).setNumReplicas(1).build());
+
+ // Wait for the table addition
+ while (!_helixResourceManager.hasOfflineTable(TABLE_NAME_1) &&
!_helixResourceManager
+ .hasOfflineTable(TABLE_NAME_2)) {
+ Thread.sleep(100);
+ }
+
+ testGetBrokersHelper(null, 10, 0);
+ testGetBrokersHelper("ONLINE", 10, 0);
+ testGetBrokersHelper("OFFLINE", 10, 0);
+ for (int i = 9; i >= 0; i--) {
+ stopFakeInstance(BROKER_INSTANCE_ID_PREFIX + i);
+ testGetBrokersHelper(null, i, 10 - i);
+ testGetBrokersHelper("ONLINE", i, 10 - i);
+ testGetBrokersHelper("OFFLINE", i, 10 - i);
+ }
+ }
+
+ @AfterClass
+ public void tearDown() {
+ stopFakeInstances();
+ stopController();
+ stopZk();
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 76c75fa..0513566 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -396,6 +396,16 @@ public abstract class ControllerTest {
_fakeInstanceHelixManagers.clear();
}
+ protected void stopFakeInstance(String instanceId) {
+ for (HelixManager helixManager : _fakeInstanceHelixManagers) {
+ if (helixManager.getInstanceName().equalsIgnoreCase(instanceId)) {
+ helixManager.disconnect();
+ _fakeInstanceHelixManagers.remove(helixManager);
+ return;
+ }
+ }
+ }
+
protected Schema createDummySchema(String tableName) {
Schema schema = new Schema();
schema.setSchemaName(tableName);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]