sajjad-moradi commented on code in PR #8986:
URL: https://github.com/apache/pinot/pull/8986#discussion_r918209300
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -49,27 +50,71 @@
HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY)))
@Path("/")
public class PinotRealtimeTableResource {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PinotRealtimeTableResource.class);
+
@Inject
PinotHelixResourceManager _pinotHelixResourceManager;
+ @Inject
+ PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+
+ @POST
+ @Path("/tables/{tableName}/pauseConsumption")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Pause consumption of a realtime table",
+ notes = "Pause the consumption of a realtime table")
+ public Response pauseConsumption(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName) {
+ String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ validate(tableNameWithType);
+ try {
+ return
Response.ok(_pinotLLCRealtimeSegmentManager.pauseConsumption(tableNameWithType)).build();
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
@POST
@Path("/tables/{tableName}/resumeConsumption")
@Produces(MediaType.APPLICATION_JSON)
- @Consumes(MediaType.APPLICATION_JSON)
- @ApiOperation(value = "Resume the consumption of a realtime table",
- notes = "Resume the consumption of a realtime table")
- public String resumeConsumption(
- @ApiParam(value = "Name of the table", required = true)
- @PathParam("tableName") String tableName) throws JsonProcessingException
{
- // TODO: Add util method for invoking periodic tasks
+ @ApiOperation(value = "Resume consumption of a realtime table",
+ notes = "Resume the consumption for a realtime table")
+ public Response resumeConsumption(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName) {
String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
- Map<String, String> taskProperties = new HashMap<>();
-
taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY,
"true");
+ validate(tableNameWithType);
+ try {
+ return
Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType)).build();
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
- Pair<String, Integer> taskExecutionDetails = _pinotHelixResourceManager
- .invokeControllerPeriodicTask(tableNameWithType,
Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties);
+ @GET
+ @Path("/tables/{tableName}/pauseStatus")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Return pause status of a realtime table",
+ notes = "Return pause status of a realtime table along with list of
consuming segments.")
+ public Response getConsumptionStatus(
+ @ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName) {
+ String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+ validate(tableNameWithType);
+ try {
+ return
Response.ok().entity(_pinotLLCRealtimeSegmentManager.getPauseStatus(tableNameWithType)).build();
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
- return "{\"Log Request Id\": \"" + taskExecutionDetails.getLeft()
- + "\",\"Controllers notified\":" + (taskExecutionDetails.getRight() >
0) + "}";
+ private void validate(String tableNameWithType) {
+ IdealState idealState =
_pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+ if (idealState == null) {
+ throw new ControllerApplicationException(LOGGER, "Ideal State is null
for table " + tableNameWithType,
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]