This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d437d3a804 Extract one interface for Pinot Controller API protocol
(#8852)
d437d3a804 is described below
commit d437d3a80494828889be7b07d61ddce21a2d781e
Author: Xiaoman Dong <[email protected]>
AuthorDate: Thu Jun 16 15:56:22 2022 -0700
Extract one interface for Pinot Controller API protocol (#8852)
---
.../api/resources/PinotBrokerRestletResource.java | 135 +++----------
.../api/services/PinotBrokerService.java | 217 +++++++++++++++++++++
2 files changed, 241 insertions(+), 111 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
index 758d7860f1..347b850ecd 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotBrokerRestletResource.java
@@ -19,15 +19,6 @@
package org.apache.pinot.controller.api.resources;
import com.google.common.collect.ImmutableList;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiKeyAuthDefinition;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiParam;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
-import io.swagger.annotations.Authorization;
-import io.swagger.annotations.SecurityDefinition;
-import io.swagger.annotations.SwaggerDefinition;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -35,133 +26,81 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.inject.Inject;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.common.exception.TableNotFoundException;
-import org.apache.pinot.controller.api.access.AccessType;
-import org.apache.pinot.controller.api.access.Authenticate;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.api.services.PinotBrokerService;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
-
-@Api(tags = Constants.BROKER_TAG, authorizations = {@Authorization(value =
SWAGGER_AUTHORIZATION_KEY)})
-@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
- HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY)))
@Path("/")
-public class PinotBrokerRestletResource {
+public class PinotBrokerRestletResource implements PinotBrokerService {
public static final Logger LOGGER =
LoggerFactory.getLogger(PinotBrokerRestletResource.class);
@Inject
PinotHelixResourceManager _pinotHelixResourceManager;
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- @Path("/brokers")
- @ApiOperation(value = "List tenants and tables to brokers mappings",
- notes = "List tenants and tables to brokers mappings")
- public Map<String, Map<String, List<String>>> listBrokersMapping(
- @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ @Override
+ public Map<String, Map<String, List<String>>> listBrokersMapping(String
state) {
Map<String, Map<String, List<String>>> resultMap = new HashMap<>();
resultMap.put("tenants", getTenantsToBrokersMapping(state));
resultMap.put("tables", getTablesToBrokersMapping(state));
return resultMap;
}
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- @Path("/brokers/tenants")
- @ApiOperation(value = "List tenants to brokers mappings", notes = "List
tenants to brokers mappings")
- public Map<String, List<String>> getTenantsToBrokersMapping(
- @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ @Override
+ public Map<String, List<String>> getTenantsToBrokersMapping(String state) {
Map<String, List<String>> resultMap = new HashMap<>();
_pinotHelixResourceManager.getAllBrokerTenantNames().stream()
.forEach(tenant -> resultMap.put(tenant, getBrokersForTenant(tenant,
state)));
return resultMap;
}
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- @Path("/brokers/tenants/{tenantName}")
- @ApiOperation(value = "List brokers for a given tenant", notes = "List
brokers for a given tenant")
- public List<String> getBrokersForTenant(
- @ApiParam(value = "Name of the tenant", required = true)
@PathParam("tenantName") String tenantName,
- @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ @Override
+ public List<String> getBrokersForTenant(String tenantName, String state) {
List<InstanceInfo> instanceInfoList = getBrokersForTenantV2(tenantName,
state);
List<String> tenantBrokers =
instanceInfoList.stream().map(InstanceInfo::getInstanceName).collect(Collectors.toList());
return tenantBrokers;
}
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- @Path("/brokers/tables")
- @ApiOperation(value = "List tables to brokers mappings", notes = "List
tables to brokers mappings")
- public Map<String, List<String>> getTablesToBrokersMapping(
- @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ @Override
+ public Map<String, List<String>> getTablesToBrokersMapping(String state) {
Map<String, List<String>> resultMap = new HashMap<>();
_pinotHelixResourceManager.getAllRawTables().stream()
.forEach(table -> resultMap.put(table, getBrokersForTable(table, null,
state)));
return resultMap;
}
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- @Path("/brokers/tables/{tableName}")
- @ApiOperation(value = "List brokers for a given table", notes = "List
brokers for a given table")
- public List<String> getBrokersForTable(
- @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
- @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String
tableTypeStr,
- @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ @Override
+ public List<String> getBrokersForTable(String tableName, String
tableTypeStr, String state) {
List<InstanceInfo> instanceInfoList = getBrokersForTableV2(tableName,
tableTypeStr, state);
return
instanceInfoList.stream().map(InstanceInfo::getInstanceName).collect(Collectors.toList());
}
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- @Path("/v2/brokers")
- @ApiOperation(value = "List tenants and tables to brokers mappings",
- notes = "List tenants and tables to brokers mappings")
- public Map<String, Map<String, List<InstanceInfo>>> listBrokersMappingV2(
- @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ @Override
+ public Map<String, Map<String, List<InstanceInfo>>>
listBrokersMappingV2(String state) {
Map<String, Map<String, List<InstanceInfo>>> resultMap = new HashMap<>();
resultMap.put("tenants", getTenantsToBrokersMappingV2(state));
resultMap.put("tables", getTablesToBrokersMappingV2(state));
return resultMap;
}
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- @Path("/v2/brokers/tenants")
- @ApiOperation(value = "List tenants to brokers mappings", notes = "List
tenants to brokers mappings")
- public Map<String, List<InstanceInfo>> getTenantsToBrokersMappingV2(
- @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ @Override
+ public Map<String, List<InstanceInfo>> getTenantsToBrokersMappingV2(String
state) {
Map<String, List<InstanceInfo>> resultMap = new HashMap<>();
_pinotHelixResourceManager.getAllBrokerTenantNames().stream()
.forEach(tenant -> resultMap.put(tenant, getBrokersForTenantV2(tenant,
state)));
return resultMap;
}
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- @Path("/v2/brokers/tenants/{tenantName}")
- @ApiOperation(value = "List brokers for a given tenant", notes = "List
brokers for a given tenant")
- public List<InstanceInfo> getBrokersForTenantV2(
- @ApiParam(value = "Name of the tenant", required = true)
@PathParam("tenantName") String tenantName,
- @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ @Override
+ public List<InstanceInfo> getBrokersForTenantV2(String tenantName, String
state) {
if
(!_pinotHelixResourceManager.getAllBrokerTenantNames().contains(tenantName)) {
throw new ControllerApplicationException(LOGGER, String.format("Tenant
'%s' not found.", tenantName),
Response.Status.NOT_FOUND);
@@ -175,26 +114,16 @@ public class PinotBrokerRestletResource {
return ImmutableList.copyOf(instanceInfoSet);
}
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- @Path("/v2/brokers/tables")
- @ApiOperation(value = "List tables to brokers mappings", notes = "List
tables to brokers mappings")
- public Map<String, List<InstanceInfo>> getTablesToBrokersMappingV2(
- @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ @Override
+ public Map<String, List<InstanceInfo>> getTablesToBrokersMappingV2(String
state) {
Map<String, List<InstanceInfo>> resultMap = new HashMap<>();
_pinotHelixResourceManager.getAllRawTables().stream()
.forEach(table -> resultMap.put(table, getBrokersForTableV2(table,
null, state)));
return resultMap;
}
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- @Path("/v2/brokers/tables/{tableName}")
- @ApiOperation(value = "List brokers for a given table", notes = "List
brokers for a given table")
- public List<InstanceInfo> getBrokersForTableV2(
- @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
- @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String
tableTypeStr,
- @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state) {
+ @Override
+ public List<InstanceInfo> getBrokersForTableV2(String tableName, String
tableTypeStr, String state) {
try {
List<String> tableNamesWithType = _pinotHelixResourceManager
.getExistingTableNamesWithType(tableName,
Constants.validateTableType(tableTypeStr));
@@ -217,24 +146,8 @@ public class PinotBrokerRestletResource {
}
}
- @POST
- @Path("/brokers/instances/{instanceName}/qps")
- @Authenticate(AccessType.UPDATE)
- @Produces(MediaType.APPLICATION_JSON)
- @Consumes(MediaType.TEXT_PLAIN)
- @ApiOperation(value = "Enable/disable the query rate limiting for a broker
instance",
- notes = "Enable/disable the query rate limiting for a broker instance")
- @ApiResponses(value = {
- @ApiResponse(code = 200, message = "Success"),
- @ApiResponse(code = 400, message = "Bad Request"),
- @ApiResponse(code = 404, message = "Instance not found"),
- @ApiResponse(code = 500, message = "Internal error")
- })
- public SuccessResponse toggleQueryRateLimiting(
- @ApiParam(value = "Broker instance name", required = true, example =
"Broker_my.broker.com_30000")
- @PathParam("instanceName") String brokerInstanceName,
- @ApiParam(value = "ENABLE|DISABLE", allowableValues = "ENABLE, DISABLE",
required = true) @QueryParam("state")
- String state) {
+ @Override
+ public SuccessResponse toggleQueryRateLimiting(String brokerInstanceName,
String state) {
if (brokerInstanceName == null ||
!brokerInstanceName.startsWith("Broker_")) {
throw new ControllerApplicationException(LOGGER,
String.format("'%s' is not a valid broker instance name.",
brokerInstanceName), Response.Status.BAD_REQUEST);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/services/PinotBrokerService.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/services/PinotBrokerService.java
new file mode 100644
index 0000000000..bf25dcf71a
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/services/PinotBrokerService.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.services;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import org.apache.pinot.controller.api.access.AccessType;
+import org.apache.pinot.controller.api.access.Authenticate;
+import org.apache.pinot.controller.api.resources.Constants;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.api.resources.SuccessResponse;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * The interface defining the broker instance operations in Pinot Controller
+ */
+@Api(tags = Constants.BROKER_TAG, authorizations = {@Authorization(value =
CommonConstants.SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = @SecurityDefinition(
+ apiKeyAuthDefinitions = @ApiKeyAuthDefinition(
+ name = HttpHeaders.AUTHORIZATION,
+ in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
+ key = CommonConstants.SWAGGER_AUTHORIZATION_KEY)))
+@Path("/")
+public interface PinotBrokerService {
+
+ /**
+ * List tenants and tables to brokers mappings
+ * @param state whether the broker is online or offline
+ * @return the Map of tenants and tables to brokers mappings
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers")
+ @ApiOperation(value = "List tenants and tables to brokers mappings",
+ notes = "List tenants and tables to brokers mappings")
+ Map<String, Map<String, List<String>>> listBrokersMapping(
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state);
+
+ /**
+ * List tenants to brokers mappings
+ * @param state whether the brokers are online or offline
+ * @return the map of tenants to brokers mappings
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers/tenants")
+ @ApiOperation(value = "List tenants to brokers mappings", notes = "List
tenants to brokers mappings")
+ Map<String, List<String>> getTenantsToBrokersMapping(
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state);
+
+ /**
+ * List brokers for a given tenant
+ * @param tenantName Name of the tenant
+ * @param state whether the brokers are online or offline
+ * @return brokers for a given tenant
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers/tenants/{tenantName}")
+ @ApiOperation(value = "List brokers for a given tenant", notes = "List
brokers for a given tenant")
+ List<String> getBrokersForTenant(
+ @ApiParam(value = "Name of the tenant", required = true)
@PathParam("tenantName") String tenantName,
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state);
+
+ /**
+ * List tables to brokers mappings
+ * @param state whether the brokers are online or offline
+ * @return tables to brokers mapping
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers/tables")
+ @ApiOperation(value = "List tables to brokers mappings", notes = "List
tables to brokers mappings")
+ Map<String, List<String>> getTablesToBrokersMapping(
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state);
+
+ /**
+ * List brokers for a given table
+ * @param tableName Name of the table
+ * @param tableTypeStr Whether table is OFFLINE or REALTIME
+ * @param state whether the brokers are online or offline
+ * @return brokers for a given table
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/brokers/tables/{tableName}")
+ @ApiOperation(value = "List brokers for a given table", notes = "List
brokers for a given table")
+ List<String> getBrokersForTable(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
+ @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String
tableTypeStr,
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state);
+
+ /**
+ * List tenants and tables to brokers mappings
+ * @param state whether the brokers are online or offline
+ * @return tenants and tables to brokers mappings
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/v2/brokers")
+ @ApiOperation(value = "List tenants and tables to brokers mappings",
+ notes = "List tenants and tables to brokers mappings")
+ Map<String, Map<String, List<InstanceInfo>>> listBrokersMappingV2(
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state);
+
+ /**
+ * List tenants to brokers mappings
+ * @param state whether the brokers are online or offline
+ * @return tenants to brokers mappings
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/v2/brokers/tenants")
+ @ApiOperation(value = "List tenants to brokers mappings", notes = "List
tenants to brokers mappings")
+ Map<String, List<InstanceInfo>> getTenantsToBrokersMappingV2(
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state);
+
+ /**
+ * List brokers for a given tenant
+ * @param tenantName Name of the tenant
+ * @param state whether the brokers are online or offline
+ * @return brokers for a given tenant
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/v2/brokers/tenants/{tenantName}")
+ @ApiOperation(value = "List brokers for a given tenant", notes = "List
brokers for a given tenant")
+ List<InstanceInfo> getBrokersForTenantV2(
+ @ApiParam(value = "Name of the tenant", required = true)
@PathParam("tenantName") String tenantName,
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state);
+
+ /**
+ * List tables to brokers mappings
+ * @param state whether the brokers are online or offline
+ * @return tables to brokers mappings
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/v2/brokers/tables")
+ @ApiOperation(value = "List tables to brokers mappings", notes = "List
tables to brokers mappings")
+ Map<String, List<InstanceInfo>> getTablesToBrokersMappingV2(
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state);
+
+ /**
+ * List brokers for a given table
+ * @param tableName Name of the table
+ * @param tableTypeStr wheter the table is OFFLINE or REALTIME
+ * @param state whether the brokers are online or offline
+ * @return brokers for a given table
+ */
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ @Path("/v2/brokers/tables/{tableName}")
+ @ApiOperation(value = "List brokers for a given table", notes = "List
brokers for a given table")
+ List<InstanceInfo> getBrokersForTableV2(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
+ @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String
tableTypeStr,
+ @ApiParam(value = "ONLINE|OFFLINE") @QueryParam("state") String state);
+
+ /**
+ * Enable/disable the query rate limiting for a broker instance
+ * @param brokerInstanceName Broker instance name
+ * @param state whether the brokers are online or offline
+ * @return {@link org.apache.pinot.controller.api.resources.SuccessResponse}
object indicating the operation results
+ */
+ @POST
+ @Path("/brokers/instances/{instanceName}/qps")
+ @Authenticate(AccessType.UPDATE)
+ @Produces(MediaType.APPLICATION_JSON)
+ @Consumes(MediaType.TEXT_PLAIN)
+ @ApiOperation(value = "Enable/disable the query rate limiting for a broker
instance",
+ notes = "Enable/disable the query rate limiting for a broker instance")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 400,
message = "Bad Request"),
+ @ApiResponse(code = 404, message = "Instance not found"),
@ApiResponse(code = 500, message = "Internal error")
+ })
+ SuccessResponse toggleQueryRateLimiting(
+ @ApiParam(value = "Broker instance name", required = true, example =
"Broker_my.broker.com_30000")
+ @PathParam("instanceName") String brokerInstanceName,
+ @ApiParam(value = "ENABLE|DISABLE", allowableValues = "ENABLE, DISABLE",
required = true) @QueryParam("state")
+ String state);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]