This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 441d335dcf0 Isolate potentially long-running minion task API resources
on controller (#17494)
441d335dcf0 is described below
commit 441d335dcf04647c34353aec9213c8361f4009a0
Author: Jhow <[email protected]>
AuthorDate: Tue Feb 3 14:35:15 2026 -0800
Isolate potentially long-running minion task API resources on controller
(#17494)
---
.../pinot/common/metrics/ControllerGauge.java | 4 +-
.../api/ControllerAdminApiApplication.java | 46 +-
.../pinot/controller/api/resources/Constants.java | 2 +
.../PinotSegmentUploadDownloadRestletResource.java | 61 +--
.../api/resources/PinotTaskRestletResource.java | 470 +++++++++++++++------
.../resources/PinotTaskRestletResourceTest.java | 54 ++-
6 files changed, 464 insertions(+), 173 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 0e95f1915e5..2cb716a0af1 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -221,7 +221,9 @@ public enum ControllerGauge implements
AbstractMetrics.Gauge {
// The progress of a certain table rebalance job of a table
TABLE_REBALANCE_JOB_PROGRESS_PERCENT("percent", false),
// HTTP thread utilization
- HTTP_THREAD_UTILIZATION("httpThreadUtilization", true);
+ HTTP_THREAD_UTILIZATION("httpThreadUtilization", true),
+ // Track the concurrent executions of the API resources that use
@ManagedAsync
+ MANAGED_ASYNC_ACTIVE_THREADS("threads", true);
private final String _gaugeName;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/ControllerAdminApiApplication.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/ControllerAdminApiApplication.java
index 74abcbf813d..dddaba1881d 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/ControllerAdminApiApplication.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/ControllerAdminApiApplication.java
@@ -18,15 +18,20 @@
*/
package org.apache.pinot.controller.api;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.swagger.jaxrs.listing.SwaggerSerializers;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.container.ContainerResponseFilter;
+import javax.ws.rs.ext.Provider;
import org.apache.pinot.common.audit.AuditLogFilter;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
@@ -50,7 +55,9 @@ import org.glassfish.grizzly.threadpool.ThreadPoolProbe;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.jackson.JacksonFeature;
import org.glassfish.jersey.media.multipart.MultiPartFeature;
+import org.glassfish.jersey.server.ManagedAsyncExecutor;
import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.spi.ExecutorServiceProvider;
public class ControllerAdminApiApplication extends ResourceConfig {
@@ -62,6 +69,7 @@ public class ControllerAdminApiApplication extends
ResourceConfig {
private final boolean _useHttps;
private final boolean _enableSwagger;
private HttpServer _httpServer;
+ private final ThreadPoolExecutor _managedAsyncExecutor;
public ControllerAdminApiApplication(ControllerConf conf) {
super();
@@ -83,6 +91,8 @@ public class ControllerAdminApiApplication extends
ResourceConfig {
register(new CorsFilter());
register(AuthenticationFilter.class);
register(AuditLogFilter.class);
+ _managedAsyncExecutor = createManagedAsyncExecutor();
+ register(new ManagedAsyncExecutorServiceProvider(_managedAsyncExecutor));
// property("jersey.config.server.tracing.type", "ALL");
// property("jersey.config.server.tracing.threshold", "VERBOSE");
}
@@ -117,6 +127,7 @@ public class ControllerAdminApiApplication extends
ResourceConfig {
.addHttpHandler(new CLStaticHttpHandler(classLoader,
"/webapp/images/"), "/images/");
_httpServer.getServerConfiguration().addHttpHandler(new
CLStaticHttpHandler(classLoader, "/webapp/js/"), "/js/");
registerHttpThreadUtilizationGauge(controllerMetrics);
+ registerManagedAsyncThreadGauges(controllerMetrics);
}
public void stop() {
@@ -124,6 +135,7 @@ public class ControllerAdminApiApplication extends
ResourceConfig {
return;
}
_httpServer.shutdownNow();
+ _managedAsyncExecutor.shutdown();
}
private class CorsFilter implements ContainerResponseFilter {
@@ -145,8 +157,8 @@ public class ControllerAdminApiApplication extends
ResourceConfig {
}
/**
- * Registers a gauge that tracks HTTP thread pool utilization without using
reflection.
- * Instead, it uses a custom ThreadPoolProbe to count active threads.
+ * Registers a gauge that tracks HTTP thread pool utilization without using
reflection.
+ * Instead, it uses a custom ThreadPoolProbe to count active threads.
*/
private void registerHttpThreadUtilizationGauge(ControllerMetrics metrics) {
NetworkListener listener = _httpServer.getListeners().iterator().next();
@@ -170,6 +182,16 @@ public class ControllerAdminApiApplication extends
ResourceConfig {
});
}
+ private void registerManagedAsyncThreadGauges(ControllerMetrics metrics) {
+
metrics.setOrUpdateGauge(ControllerGauge.MANAGED_ASYNC_ACTIVE_THREADS.getGaugeName(),
+ () -> (long) _managedAsyncExecutor.getActiveCount());
+ }
+
+ private ThreadPoolExecutor createManagedAsyncExecutor() {
+ ThreadFactory threadFactory = new
ThreadFactoryBuilder().setNameFormat("managed-async-%d").build();
+ return (ThreadPoolExecutor) Executors.newCachedThreadPool(threadFactory);
+ }
+
/**
* Custom probe to track busy threads in Grizzly thread pools without using
reflection.
*/
@@ -193,4 +215,24 @@ public class ControllerAdminApiApplication extends
ResourceConfig {
return _active.get();
}
}
+
+ @Provider
+ @ManagedAsyncExecutor
+ private static final class ManagedAsyncExecutorServiceProvider implements
ExecutorServiceProvider {
+ private final ExecutorService _executorService;
+
+ private ManagedAsyncExecutorServiceProvider(ExecutorService
executorService) {
+ _executorService = executorService;
+ }
+
+ @Override
+ public ExecutorService getExecutorService() {
+ return _executorService;
+ }
+
+ @Override
+ public void dispose(ExecutorService executorService) {
+ // managed in ControllerAdminApiApplication.stop()
+ }
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
index ef30166c5d8..77964ada533 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/Constants.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.api.resources;
+import javax.annotation.Nullable;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
@@ -63,6 +64,7 @@ public class Constants {
public static final String REALTIME_SEGMENT_VALIDATION_MANAGER =
"RealtimeSegmentValidationManager";
public static final String REALTIME_OFFSET_AUTO_RESET_MANAGER =
"RealtimeOffsetAutoResetManager";
+ @Nullable
public static TableType validateTableType(String tableTypeStr) {
if (StringUtils.isBlank(tableTypeStr)) {
return null;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index bb9c2151410..75ff7eb298f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -126,15 +126,18 @@ import static
org.apache.pinot.spi.utils.CommonConstants.DATABASE;
import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
-@Api(tags = Constants.SEGMENT_TAG, authorizations = {@Authorization(value =
SWAGGER_AUTHORIZATION_KEY),
- @Authorization(value = DATABASE)})
+@Api(tags = Constants.SEGMENT_TAG, authorizations = {
+ @Authorization(value = SWAGGER_AUTHORIZATION_KEY),
+ @Authorization(value = DATABASE)
+})
@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = {
@ApiKeyAuthDefinition(name = HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER,
key = SWAGGER_AUTHORIZATION_KEY,
description = "The format of the key is ```\"Basic <token>\" or
\"Bearer <token>\"```"),
@ApiKeyAuthDefinition(name = DATABASE, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = DATABASE,
description = "Database context passed through http header. If no
context is provided 'default' database "
- + "context will be considered.")}))
+ + "context will be considered.")
+}))
@Path("/")
public class PinotSegmentUploadDownloadRestletResource {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotSegmentUploadDownloadRestletResource.class);
@@ -580,7 +583,7 @@ public class PinotSegmentUploadDownloadRestletResource {
try {
int entryCount = 0;
- for (Map.Entry<String, SegmentMetadataInfo> entry:
segmentsMetadataInfoMap.entrySet()) {
+ for (Map.Entry<String, SegmentMetadataInfo> entry :
segmentsMetadataInfoMap.entrySet()) {
String segmentName = entry.getKey();
SegmentMetadataInfo segmentMetadataInfo = entry.getValue();
segmentNames.add(segmentName);
@@ -808,7 +811,7 @@ public class PinotSegmentUploadDownloadRestletResource {
boolean enableParallelPushProtection,
@ApiParam(value = "Whether to refresh if the segment already exists")
@DefaultValue("true")
@QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH)
boolean allowRefresh,
- @Context HttpHeaders headers, @Context Request request, @Suspended final
AsyncResponse asyncResponse) {
+ @Context HttpHeaders headers, @Context Request request, @Suspended
AsyncResponse asyncResponse) {
try {
asyncResponse.resume(uploadSegment(tableName,
TableType.valueOf(tableType.toUpperCase()), null, false,
enableParallelPushProtection, allowRefresh, headers, request));
@@ -847,7 +850,7 @@ public class PinotSegmentUploadDownloadRestletResource {
boolean enableParallelPushProtection,
@ApiParam(value = "Whether to refresh if the segment already exists")
@DefaultValue("true")
@QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH)
boolean allowRefresh,
- @Context HttpHeaders headers, @Context Request request, @Suspended final
AsyncResponse asyncResponse) {
+ @Context HttpHeaders headers, @Context Request request, @Suspended
AsyncResponse asyncResponse) {
try {
asyncResponse.resume(uploadSegment(tableName,
TableType.valueOf(tableType.toUpperCase()), multiPart, true,
enableParallelPushProtection, allowRefresh, headers, request));
@@ -895,7 +898,7 @@ public class PinotSegmentUploadDownloadRestletResource {
boolean allowRefresh,
@Context HttpHeaders headers,
@Context Request request,
- @Suspended final AsyncResponse asyncResponse) {
+ @Suspended AsyncResponse asyncResponse) {
if (StringUtils.isEmpty(tableName)) {
throw new ControllerApplicationException(LOGGER,
"tableName is a required field while uploading segments in batch
mode.", Response.Status.BAD_REQUEST);
@@ -949,7 +952,7 @@ public class PinotSegmentUploadDownloadRestletResource {
boolean enableParallelPushProtection,
@ApiParam(value = "Whether to refresh if the segment already exists")
@DefaultValue("true")
@QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH)
boolean allowRefresh,
- @Context HttpHeaders headers, @Context Request request, @Suspended final
AsyncResponse asyncResponse) {
+ @Context HttpHeaders headers, @Context Request request, @Suspended
AsyncResponse asyncResponse) {
try {
asyncResponse.resume(
uploadSegment(tableName, TableType.valueOf(tableType.toUpperCase()),
null, true, enableParallelPushProtection,
@@ -989,7 +992,7 @@ public class PinotSegmentUploadDownloadRestletResource {
boolean enableParallelPushProtection,
@ApiParam(value = "Whether to refresh if the segment already exists")
@DefaultValue("true")
@QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH)
boolean allowRefresh,
- @Context HttpHeaders headers, @Context Request request, @Suspended final
AsyncResponse asyncResponse) {
+ @Context HttpHeaders headers, @Context Request request, @Suspended
AsyncResponse asyncResponse) {
try {
asyncResponse.resume(uploadSegment(tableName,
TableType.valueOf(tableType.toUpperCase()), multiPart, true,
enableParallelPushProtection, allowRefresh, headers, request));
@@ -1004,12 +1007,14 @@ public class PinotSegmentUploadDownloadRestletResource {
@Authenticate(AccessType.UPDATE)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Start to replace segments", notes = "Start to replace
segments")
- public Response startReplaceSegments(
+ @ManagedAsync
+ public void startReplaceSegments(
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME", required = true)
@QueryParam("type") String tableTypeStr,
@ApiParam(value = "Force cleanup") @QueryParam("forceCleanup")
@DefaultValue("false") boolean forceCleanup,
@ApiParam(value = "Fields belonging to start replace segment request",
required = true)
- StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Context
HttpHeaders headers) {
+ StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Context
HttpHeaders headers,
+ @Suspended AsyncResponse asyncResponse) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
TableType tableType = Constants.validateTableType(tableTypeStr);
if (tableType == null) {
@@ -1022,10 +1027,12 @@ public class PinotSegmentUploadDownloadRestletResource {
String segmentLineageEntryId =
_pinotHelixResourceManager.startReplaceSegments(tableNameWithType,
startReplaceSegmentsRequest.getSegmentsFrom(),
startReplaceSegmentsRequest.getSegmentsTo(), forceCleanup,
startReplaceSegmentsRequest.getCustomMap());
- return
Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId",
segmentLineageEntryId)).build();
+ asyncResponse.resume(
+ Response.ok(JsonUtils.newObjectNode().put("segmentLineageEntryId",
segmentLineageEntryId)).build());
} catch (Exception e) {
_controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.NUMBER_START_REPLACE_FAILURE, 1);
- throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ asyncResponse.resume(
+ new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e));
}
}
@@ -1035,7 +1042,8 @@ public class PinotSegmentUploadDownloadRestletResource {
@Authenticate(AccessType.UPDATE)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "End to replace segments", notes = "End to replace
segments")
- public Response endReplaceSegments(
+ @ManagedAsync
+ public void endReplaceSegments(
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME", required = true)
@QueryParam("type") String tableTypeStr,
@ApiParam(value = "Segment lineage entry id returned by
startReplaceSegments API", required = true)
@@ -1043,7 +1051,8 @@ public class PinotSegmentUploadDownloadRestletResource {
@ApiParam(value = "Trigger an immediate segment cleanup")
@QueryParam("cleanup") @DefaultValue("false")
boolean cleanupSegments,
@ApiParam(value = "Fields belonging to end replace segment request")
- EndReplaceSegmentsRequest endReplaceSegmentsRequest, @Context
HttpHeaders headers) {
+ EndReplaceSegmentsRequest endReplaceSegmentsRequest, @Context
HttpHeaders headers,
+ @Suspended AsyncResponse asyncResponse) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
TableType tableType = Constants.validateTableType(tableTypeStr);
if (tableType == null) {
@@ -1060,10 +1069,11 @@ public class PinotSegmentUploadDownloadRestletResource {
if (cleanupSegments) {
_pinotHelixResourceManager.invokeControllerPeriodicTask(tableNameWithType,
RetentionManager.TASK_NAME, null);
}
- return Response.ok().build();
+ asyncResponse.resume(Response.ok().build());
} catch (Exception e) {
_controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.NUMBER_END_REPLACE_FAILURE, 1);
- throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ asyncResponse.resume(
+ new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e));
}
}
@@ -1073,7 +1083,8 @@ public class PinotSegmentUploadDownloadRestletResource {
@Authenticate(AccessType.UPDATE)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Revert segments replacement", notes = "Revert
segments replacement")
- public Response revertReplaceSegments(
+ @ManagedAsync
+ public void revertReplaceSegments(
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME", required = true)
@QueryParam("type") String tableTypeStr,
@ApiParam(value = "Segment lineage entry id to revert", required = true)
@QueryParam("segmentLineageEntryId")
@@ -1081,7 +1092,8 @@ public class PinotSegmentUploadDownloadRestletResource {
@ApiParam(value = "Force revert in case the user knows that the lineage
entry is interrupted")
@QueryParam("forceRevert") @DefaultValue("false") boolean forceRevert,
@ApiParam(value = "Fields belonging to revert replace segment request")
- RevertReplaceSegmentsRequest revertReplaceSegmentsRequest, @Context
HttpHeaders headers) {
+ RevertReplaceSegmentsRequest revertReplaceSegmentsRequest, @Context
HttpHeaders headers,
+ @Suspended AsyncResponse asyncResponse) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
TableType tableType = Constants.validateTableType(tableTypeStr);
if (tableType == null) {
@@ -1095,10 +1107,11 @@ public class PinotSegmentUploadDownloadRestletResource {
Preconditions.checkNotNull(segmentLineageEntryId,
"'segmentLineageEntryId' should not be null");
_pinotHelixResourceManager.revertReplaceSegments(tableNameWithType,
segmentLineageEntryId, forceRevert,
revertReplaceSegmentsRequest);
- return Response.ok().build();
+ asyncResponse.resume(Response.ok().build());
} catch (Exception e) {
_controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.NUMBER_REVERT_REPLACE_FAILURE, 1);
- throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ asyncResponse.resume(
+ new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.INTERNAL_SERVER_ERROR, e));
}
}
@@ -1184,8 +1197,8 @@ public class PinotSegmentUploadDownloadRestletResource {
}
// The multipart input would contain a single multipart and this part would
contain the segment metadata
- // files (creation.meta, metadata.properties), and an additional mapping
file names 'all_segments_metadata' which
- // would contain the mappings from segment names to segment download URI's.
+// files (creation.meta, metadata.properties), and an additional mapping file
names 'all_segments_metadata' which
+// would contain the mappings from segment names to segment download URI's.
private static Map<String, SegmentMetadataInfo>
createSegmentsMetadataInfoMap(FormDataMultiPart multiPart) {
List<BodyPart> bodyParts = multiPart.getBodyParts();
validateMultiPartForBatchSegmentUpload(bodyParts);
@@ -1217,7 +1230,7 @@ public class PinotSegmentUploadDownloadRestletResource {
}
Map<String, SegmentMetadataInfo> segmentsMetadataInfoMap = new HashMap<>();
- for (File file: segmentsMetadataFiles) {
+ for (File file : segmentsMetadataFiles) {
String fileName = file.getName();
if
(fileName.equalsIgnoreCase(SegmentUploadConstants.ALL_SEGMENTS_METADATA_FILENAME))
{
try (InputStream inputStream = FileUtils.openInputStream(file)) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index a1c4e7912da..78d70bb7790 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -180,8 +180,14 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("List all task types")
- public Set<String> listTaskTypes() {
- return _pinotHelixTaskResourceManager.getTaskTypes();
+ @ManagedAsync
+ public void listTaskTypes(@Suspended AsyncResponse asyncResponse) {
+ try {
+ Set<String> taskTypes = _pinotHelixTaskResourceManager.getTaskTypes();
+ asyncResponse.resume(taskTypes);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@GET
@@ -190,10 +196,18 @@ public class PinotTaskRestletResource {
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Get summary of all tasks across all task types, grouped by
tenant. "
+ "Optionally filter by server tenant name to get tasks for a specific
tenant only.")
- public PinotHelixTaskResourceManager.TaskSummaryResponse getTasksSummary(
+ @ManagedAsync
+ public void getTasksSummary(
@ApiParam(value = "Server tenant name to filter tasks. If not specified,
returns all tenants grouped.")
- @QueryParam("tenant") @Nullable String tenantName) {
- return _pinotHelixTaskResourceManager.getTasksSummary(tenantName);
+ @QueryParam("tenant") @Nullable String tenantName,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ PinotHelixTaskResourceManager.TaskSummaryResponse response =
+ _pinotHelixTaskResourceManager.getTasksSummary(tenantName);
+ asyncResponse.resume(response);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@GET
@@ -201,9 +215,16 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Get the state (task queue state) for the given task type")
- public TaskState getTaskQueueState(
- @ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType) {
- return _pinotHelixTaskResourceManager.getTaskQueueState(taskType);
+ @ManagedAsync
+ public void getTaskQueueState(
+ @ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ TaskState state =
_pinotHelixTaskResourceManager.getTaskQueueState(taskType);
+ asyncResponse.resume(state);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@GET
@@ -211,13 +232,18 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("List all tasks for the given task type")
- public Set<String> getTasks(@ApiParam(value = "Task type", required = true)
@PathParam("taskType") String taskType) {
- Set<String> tasks = _pinotHelixTaskResourceManager.getTasks(taskType);
- if (tasks == null) {
- throw new NotFoundException("No tasks found for task type: " + taskType);
+ @ManagedAsync
+ public void getTasks(@ApiParam(value = "Task type", required = true)
@PathParam("taskType") String taskType,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ Set<String> tasks = _pinotHelixTaskResourceManager.getTasks(taskType);
+ if (tasks == null) {
+ throw new NotFoundException("No tasks found for task type: " +
taskType);
+ }
+ asyncResponse.resume(tasks);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
}
-
- return tasks;
}
@GET
@@ -225,8 +251,15 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_TASK_COUNT)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Count of all tasks for the given task type")
- public int getTasksCount(@ApiParam(value = "Task type", required = true)
@PathParam("taskType") String taskType) {
- return _pinotHelixTaskResourceManager.getTasks(taskType).size();
+ @ManagedAsync
+ public void getTasksCount(@ApiParam(value = "Task type", required = true)
@PathParam("taskType") String taskType,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ int count = _pinotHelixTaskResourceManager.getTasks(taskType).size();
+ asyncResponse.resume(count);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@GET
@@ -234,12 +267,19 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("List all tasks for the given task type")
- public Map<String, TaskState> getTaskStatesByTable(
+ @ManagedAsync
+ public void getTaskStatesByTable(
@ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType,
@ApiParam(value = "Table name with type", required = true)
@PathParam("tableNameWithType")
- String tableNameWithType, @Context HttpHeaders headers) {
- tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
- return _pinotHelixTaskResourceManager.getTaskStatesByTable(taskType,
tableNameWithType);
+ String tableNameWithType, @Context HttpHeaders headers, @Suspended
AsyncResponse asyncResponse) {
+ try {
+ String translatedTableName =
DatabaseUtils.translateTableName(tableNameWithType, headers);
+ Map<String, TaskState> states =
+ _pinotHelixTaskResourceManager.getTaskStatesByTable(taskType,
translatedTableName);
+ asyncResponse.resume(states);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@GET
@@ -274,7 +314,8 @@ public class PinotTaskRestletResource {
tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
_pinotHelixTaskResourceManager.deleteTaskMetadataByTable(taskType,
tableNameWithType);
return new SuccessResponse(
- String.format("Successfully deleted metadata for task type: %s from
table: %s", taskType, tableNameWithType));
+ String.format("Successfully deleted metadata for task type: %s from
table: %s", taskType,
+ tableNameWithType));
}
@GET
@@ -282,7 +323,8 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Fetch count of sub-tasks for each of the tasks for the given
task type")
- public Map<String, PinotHelixTaskResourceManager.TaskCount> getTaskCounts(
+ @ManagedAsync
+ public void getTaskCounts(
@ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType,
@ApiParam(value = "Task state(s) to filter by. Can be single state or
comma-separated multiple states "
+ "(NOT_STARTED, IN_PROGRESS, STOPPED, STOPPING, FAILED, COMPLETED,
ABORTED, TIMED_OUT, TIMING_OUT, "
@@ -290,13 +332,19 @@ public class PinotTaskRestletResource {
@QueryParam("state") @Nullable String state,
@ApiParam(value = "Table name with type (e.g., 'myTable_OFFLINE') to
filter tasks by table. "
+ "Only tasks that have subtasks for this table will be returned.")
- @QueryParam("table") @Nullable String table, @Context HttpHeaders
headers) {
- String tableNameWithType = table != null ?
DatabaseUtils.translateTableName(table, headers) : null;
-
- if (StringUtils.isNotEmpty(state) ||
StringUtils.isNotEmpty(tableNameWithType)) {
- return _pinotHelixTaskResourceManager.getTaskCounts(taskType, state,
tableNameWithType);
- } else {
- return _pinotHelixTaskResourceManager.getTaskCounts(taskType);
+ @QueryParam("table") @Nullable String table, @Context HttpHeaders
headers,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ String tableNameWithType = table != null ?
DatabaseUtils.translateTableName(table, headers) : null;
+ Map<String, PinotHelixTaskResourceManager.TaskCount> counts;
+ if (StringUtils.isNotEmpty(state) ||
StringUtils.isNotEmpty(tableNameWithType)) {
+ counts = _pinotHelixTaskResourceManager.getTaskCounts(taskType, state,
tableNameWithType);
+ } else {
+ counts = _pinotHelixTaskResourceManager.getTaskCounts(taskType);
+ }
+ asyncResponse.resume(counts);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
}
}
@@ -305,13 +353,21 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.DEBUG_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Fetch information for all the tasks for the given task type")
- public Map<String, PinotHelixTaskResourceManager.TaskDebugInfo>
getTasksDebugInfo(
+ @ManagedAsync
+ public void getTasksDebugInfo(
@ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType,
@ApiParam(value = "verbosity (Prints information for all the tasks for
the given task type."
+ "By default, only prints subtask details for running and error
tasks. "
+ "Value of > 0 prints subtask details for all tasks)")
- @DefaultValue("0") @QueryParam("verbosity") int verbosity) {
- return _pinotHelixTaskResourceManager.getTasksDebugInfo(taskType,
verbosity);
+ @DefaultValue("0") @QueryParam("verbosity") int verbosity,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ Map<String, PinotHelixTaskResourceManager.TaskDebugInfo> debugInfo =
+ _pinotHelixTaskResourceManager.getTasksDebugInfo(taskType,
verbosity);
+ asyncResponse.resume(debugInfo);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@GET
@@ -319,16 +375,24 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.DEBUG_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Fetch information for all the tasks for the given task type
and table")
- public Map<String, PinotHelixTaskResourceManager.TaskDebugInfo>
getTasksDebugInfo(
+ @ManagedAsync
+ public void getTasksDebugInfo(
@ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType,
@ApiParam(value = "Table name with type", required = true)
@PathParam("tableNameWithType")
String tableNameWithType,
@ApiParam(value = "verbosity (Prints information for all the tasks for
the given task type and table."
+ "By default, only prints subtask details for running and error
tasks. "
+ "Value of > 0 prints subtask details for all tasks)")
- @DefaultValue("0") @QueryParam("verbosity") int verbosity, @Context
HttpHeaders headers) {
- tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
- return _pinotHelixTaskResourceManager.getTasksDebugInfoByTable(taskType,
tableNameWithType, verbosity);
+ @DefaultValue("0") @QueryParam("verbosity") int verbosity, @Context
HttpHeaders headers,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ String translatedTableName =
DatabaseUtils.translateTableName(tableNameWithType, headers);
+ Map<String, PinotHelixTaskResourceManager.TaskDebugInfo> debugInfo =
+ _pinotHelixTaskResourceManager.getTasksDebugInfoByTable(taskType,
translatedTableName, verbosity);
+ asyncResponse.resume(debugInfo);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@GET
@@ -336,22 +400,32 @@ public class PinotTaskRestletResource {
@Path("/tasks/generator/{tableNameWithType}/{taskType}/debug")
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_TASK)
@ApiOperation("Fetch task generation information for the recent runs of the
given task for the given table")
- public String getTaskGenerationDebugInto(
+ @ManagedAsync
+ public void getTaskGenerationDebugInfo(
@ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType,
@ApiParam(value = "Table name with type", required = true)
@PathParam("tableNameWithType")
String tableNameWithType,
@ApiParam(value = "Whether to only lookup local cache for logs",
defaultValue = "false") @QueryParam("localOnly")
- boolean localOnly, @Context HttpHeaders httpHeaders)
+ boolean localOnly, @Context HttpHeaders httpHeaders, @Suspended
AsyncResponse asyncResponse) {
+ try {
+ String result = getTaskGenerationDebugInfoSync(taskType,
tableNameWithType, localOnly, httpHeaders);
+ asyncResponse.resume(result);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
+ }
+
+ private String getTaskGenerationDebugInfoSync(String taskType, String
tableNameWithType, boolean localOnly,
+ HttpHeaders httpHeaders)
throws JsonProcessingException {
- tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
httpHeaders);
+ String translatedTableName =
DatabaseUtils.translateTableName(tableNameWithType, httpHeaders);
if (localOnly) {
BaseTaskGeneratorInfo taskGeneratorMostRecentRunInfo =
- _taskManagerStatusCache.fetchTaskGeneratorInfo(tableNameWithType,
taskType);
+ _taskManagerStatusCache.fetchTaskGeneratorInfo(translatedTableName,
taskType);
if (taskGeneratorMostRecentRunInfo == null) {
throw new ControllerApplicationException(LOGGER, "Task generation
information not found",
Response.Status.NOT_FOUND);
}
-
return JsonUtils.objectToString(taskGeneratorMostRecentRunInfo);
}
@@ -360,7 +434,7 @@ public class PinotTaskRestletResource {
// Relying on original schema that was used to query the controller
URI uri = _uriInfo.getRequestUri();
String scheme = uri.getScheme();
- String finalTableNameWithType = tableNameWithType;
+ String finalTableNameWithType = translatedTableName;
List<String> controllerUrls = controllers.stream().map(controller -> String
.format("%s://%s:%d/tasks/generator/%s/%s/debug?localOnly=true",
scheme, controller.getHostName(),
Integer.parseInt(controller.getPort()), finalTableNameWithType,
taskType)).collect(Collectors.toList());
@@ -392,7 +466,8 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.DEBUG_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Fetch information for the given task name")
- public PinotHelixTaskResourceManager.TaskDebugInfo getTaskDebugInfo(
+ @ManagedAsync
+ public void getTaskDebugInfo(
@ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName,
@ApiParam(value = "verbosity (Prints information for the given task
name."
+ "By default, only prints subtask details for running and error
tasks. "
@@ -400,11 +475,18 @@ public class PinotTaskRestletResource {
@DefaultValue("0") @QueryParam("verbosity") int verbosity,
@ApiParam(value = "Table name with type (e.g., 'myTable_OFFLINE') to
filter subtasks by table. "
+ "Only subtasks for this table will be returned.")
- @QueryParam("tableName") @Nullable String tableNameWithType, @Context
HttpHeaders httpHeaders) {
- if (tableNameWithType != null) {
- tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
httpHeaders);
+ @QueryParam("tableName") @Nullable String tableNameWithType, @Context
HttpHeaders httpHeaders,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ String translatedTableName = tableNameWithType != null
+ ? DatabaseUtils.translateTableName(tableNameWithType, httpHeaders)
+ : null;
+ PinotHelixTaskResourceManager.TaskDebugInfo debugInfo =
+ _pinotHelixTaskResourceManager.getTaskDebugInfo(taskName,
translatedTableName, verbosity);
+ asyncResponse.resume(debugInfo);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
}
- return _pinotHelixTaskResourceManager.getTaskDebugInfo(taskName,
tableNameWithType, verbosity);
}
@GET
@@ -412,9 +494,16 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Get a map from task to task state for the given task type")
- public Map<String, TaskState> getTaskStates(
- @ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType) {
- return _pinotHelixTaskResourceManager.getTaskStates(taskType);
+ @ManagedAsync
+ public void getTaskStates(
+ @ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ Map<String, TaskState> states =
_pinotHelixTaskResourceManager.getTaskStates(taskType);
+ asyncResponse.resume(states);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@GET
@@ -422,9 +511,16 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Get the task state for the given task")
- public TaskState getTaskState(
- @ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName) {
- return _pinotHelixTaskResourceManager.getTaskState(taskName);
+ @ManagedAsync
+ public void getTaskState(
+ @ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ TaskState state = _pinotHelixTaskResourceManager.getTaskState(taskName);
+ asyncResponse.resume(state);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@GET
@@ -432,9 +528,16 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Get the states of all the sub tasks for the given task")
- public Map<String, TaskPartitionState> getSubtaskStates(
- @ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName) {
- return _pinotHelixTaskResourceManager.getSubtaskStates(taskName);
+ @ManagedAsync
+ public void getSubtaskStates(
+ @ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ Map<String, TaskPartitionState> states =
_pinotHelixTaskResourceManager.getSubtaskStates(taskName);
+ asyncResponse.resume(states);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@GET
@@ -442,9 +545,16 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Get the task config (a list of child task configs) for the
given task")
- public List<PinotTaskConfig> getTaskConfigs(
- @ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName) {
- return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
+ @ManagedAsync
+ public void getTaskConfigs(
+ @ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ List<PinotTaskConfig> configs =
_pinotHelixTaskResourceManager.getSubtaskConfigs(taskName);
+ asyncResponse.resume(configs);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@GET
@@ -452,9 +562,16 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Get the task runtime config for the given task")
- public Map<String, String> getTaskConfig(
- @ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName) {
- return _pinotHelixTaskResourceManager.getTaskRuntimeConfig(taskName);
+ @ManagedAsync
+ public void getTaskConfig(
+ @ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ Map<String, String> config =
_pinotHelixTaskResourceManager.getTaskRuntimeConfig(taskName);
+ asyncResponse.resume(config);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@GET
@@ -462,11 +579,19 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Get the configs of specified sub tasks for the given task")
- public Map<String, PinotTaskConfig> getSubtaskConfigs(
+ @ManagedAsync
+ public void getSubtaskConfigs(
@ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName,
@ApiParam(value = "Sub task names separated by comma")
@QueryParam("subtaskNames") @Nullable
- String subtaskNames) {
- return _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName,
subtaskNames);
+ String subtaskNames,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ Map<String, PinotTaskConfig> configs =
+ _pinotHelixTaskResourceManager.getSubtaskConfigs(taskName,
subtaskNames);
+ asyncResponse.resume(configs);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@GET
@@ -474,10 +599,23 @@ public class PinotTaskRestletResource {
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_TASK)
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation("Get progress of specified sub tasks for the given task
tracked by minion worker in memory")
- public String getSubtaskProgress(@Context HttpHeaders httpHeaders,
+ @ManagedAsync
+ public void getSubtaskProgress(@Context HttpHeaders httpHeaders,
@ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName,
@ApiParam(value = "Sub task names separated by comma")
@QueryParam("subtaskNames") @Nullable
- String subtaskNames) {
+ String subtaskNames,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ String progress = getSubtaskProgressSync(httpHeaders, taskName,
subtaskNames);
+ asyncResponse.resume(progress);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
+ }
+
+ private String getSubtaskProgressSync(HttpHeaders httpHeaders, String
taskName,
+ @Nullable String subtaskNames)
+ throws Exception {
// Relying on original schema that was used to query the controller
String scheme = _uriInfo.getRequestUri().getScheme();
List<InstanceConfig> workers =
_pinotHelixResourceManager.getAllMinionInstanceConfigs();
@@ -513,11 +651,24 @@ public class PinotTaskRestletResource {
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500,
message = "Internal server error")
})
- public String getSubtaskOnWorkerProgress(@Context HttpHeaders httpHeaders,
+ @ManagedAsync
+ public void getSubtaskOnWorkerProgress(@Context HttpHeaders httpHeaders,
@ApiParam(value = "Subtask state
(UNKNOWN,IN_PROGRESS,SUCCEEDED,CANCELLED,ERROR)", required = true)
@QueryParam("subTaskState") String subTaskState,
@ApiParam(value = "Minion worker IDs separated by comma")
@QueryParam("minionWorkerIds") @Nullable
- String minionWorkerIds) {
+ String minionWorkerIds,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ String progress = getSubtaskOnWorkerProgressSync(httpHeaders,
subTaskState, minionWorkerIds);
+ asyncResponse.resume(progress);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
+ }
+
+ private String getSubtaskOnWorkerProgressSync(HttpHeaders httpHeaders,
String subTaskState,
+ @Nullable String minionWorkerIds)
+ throws Exception {
Set<String> selectedMinionWorkers = new HashSet<>();
if (StringUtils.isNotEmpty(minionWorkerIds)) {
selectedMinionWorkers.addAll(
@@ -673,71 +824,83 @@ public class PinotTaskRestletResource {
@ApiOperation("Schedule tasks and return a map from task type to task name
scheduled. If task type is missing, "
+ "schedules all tasks. If table name is missing, schedules tasks for
all tables in the database. If database "
+ "is missing in headers, uses default.")
- @Nullable
- public Map<String, String> scheduleTasks(
+ @ManagedAsync
+ public void scheduleTasks(
@ApiParam(value = "Task type. If missing, schedules all tasks.")
@QueryParam("taskType") @Nullable
String taskType,
@ApiParam(value = "Table name (with type suffix). If missing, schedules
tasks for all tables in the database.")
@QueryParam("tableName") @Nullable String tableName,
@ApiParam(value = "Minion Instance tag to schedule the task explicitly
on") @QueryParam("minionInstanceTag")
- @Nullable String minionInstanceTag, @Context HttpHeaders headers) {
- String database = headers != null ? headers.getHeaderString(DATABASE) :
DEFAULT_DATABASE;
- Map<String, String> response = new HashMap<>();
- List<String> generationErrors = new ArrayList<>();
- List<String> schedulingErrors = new ArrayList<>();
- TaskSchedulingContext context = new TaskSchedulingContext()
- .setTriggeredBy(CommonConstants.TaskTriggers.MANUAL_TRIGGER.name())
- .setMinionInstanceTag(minionInstanceTag)
- .setLeader(false);
- if (taskType != null) {
- context.setTasksToSchedule(Collections.singleton(taskType));
- }
- if (tableName != null) {
-
context.setTablesToSchedule(Collections.singleton(DatabaseUtils.translateTableName(tableName,
headers)));
- } else {
- context.setDatabasesToSchedule(Collections.singleton(database));
- }
- Map<String, TaskSchedulingInfo> allTaskInfos =
_pinotTaskManager.scheduleTasks(context);
- allTaskInfos.forEach((key, value) -> {
- if (value.getScheduledTaskNames() != null) {
- response.put(key, String.join(",", value.getScheduledTaskNames()));
+ @Nullable String minionInstanceTag, @Context HttpHeaders headers,
@Suspended AsyncResponse asyncResponse) {
+ try {
+ String database = headers != null ? headers.getHeaderString(DATABASE) :
DEFAULT_DATABASE;
+ Map<String, String> response = new HashMap<>();
+ List<String> generationErrors = new ArrayList<>();
+ List<String> schedulingErrors = new ArrayList<>();
+ TaskSchedulingContext context = new TaskSchedulingContext()
+ .setTriggeredBy(CommonConstants.TaskTriggers.MANUAL_TRIGGER.name())
+ .setMinionInstanceTag(minionInstanceTag)
+ .setLeader(false);
+ if (taskType != null) {
+ context.setTasksToSchedule(Collections.singleton(taskType));
}
- generationErrors.addAll(value.getGenerationErrors());
- schedulingErrors.addAll(value.getSchedulingErrors());
- });
- response.put(GENERATION_ERRORS_KEY, String.join(",", generationErrors));
- response.put(SCHEDULING_ERRORS_KEY, String.join(",", schedulingErrors));
- return response;
+ if (tableName != null) {
+
context.setTablesToSchedule(Collections.singleton(DatabaseUtils.translateTableName(tableName,
headers)));
+ } else {
+ context.setDatabasesToSchedule(Collections.singleton(database));
+ }
+ Map<String, TaskSchedulingInfo> allTaskInfos =
_pinotTaskManager.scheduleTasks(context);
+ allTaskInfos.forEach((key, value) -> {
+ if (value.getScheduledTaskNames() != null) {
+ response.put(key, String.join(",", value.getScheduledTaskNames()));
+ }
+ generationErrors.addAll(value.getGenerationErrors());
+ schedulingErrors.addAll(value.getSchedulingErrors());
+ });
+ response.put(GENERATION_ERRORS_KEY, String.join(",", generationErrors));
+ response.put(SCHEDULING_ERRORS_KEY, String.join(",", schedulingErrors));
+ asyncResponse.resume(response);
+ } catch (Exception e) {
+ asyncResponse.resume(new ControllerApplicationException(LOGGER,
+ String.format("Failed to schedule tasks due to error: %s",
ExceptionUtils.getStackTrace(e)),
+ Response.Status.INTERNAL_SERVER_ERROR, e));
+ }
}
@POST
- @ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@Path("/tasks/execute")
@Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.EXECUTE_TASK)
@Authenticate(AccessType.CREATE)
@ApiOperation("Execute a task on minion")
+ @ManagedAsync
public void executeAdhocTask(AdhocTaskConfig adhocTaskConfig, @Suspended
AsyncResponse asyncResponse,
@Context Request requestContext) {
try {
-
asyncResponse.resume(_pinotTaskManager.createTask(adhocTaskConfig.getTaskType(),
adhocTaskConfig.getTableName(),
- adhocTaskConfig.getTaskName(), adhocTaskConfig.getTaskConfigs()));
- } catch (TableNotFoundException e) {
- throw new ControllerApplicationException(LOGGER, "Failed to find table:
" + adhocTaskConfig.getTableName(),
- Response.Status.NOT_FOUND, e);
- } catch (TaskAlreadyExistsException e) {
- throw new ControllerApplicationException(LOGGER, "Task already exists: "
+ adhocTaskConfig.getTaskName(),
- Response.Status.CONFLICT, e);
- } catch (UnknownTaskTypeException e) {
- throw new ControllerApplicationException(LOGGER, "Unknown task type: " +
adhocTaskConfig.getTaskType(),
- Response.Status.NOT_FOUND, e);
- } catch (NoTaskScheduledException e) {
- throw new ControllerApplicationException(LOGGER,
- "No task is generated for table: " + adhocTaskConfig.getTableName()
+ ", with task type: "
- + adhocTaskConfig.getTaskType(), Response.Status.BAD_REQUEST, e);
+ try {
+ asyncResponse.resume(
+ _pinotTaskManager.createTask(adhocTaskConfig.getTaskType(),
adhocTaskConfig.getTableName(),
+ adhocTaskConfig.getTaskName(),
adhocTaskConfig.getTaskConfigs()));
+ } catch (TableNotFoundException e) {
+ throw new ControllerApplicationException(LOGGER, "Failed to find
table: " + adhocTaskConfig.getTableName(),
+ Response.Status.NOT_FOUND, e);
+ } catch (TaskAlreadyExistsException e) {
+ throw new ControllerApplicationException(LOGGER, "Task already exists:
" + adhocTaskConfig.getTaskName(),
+ Response.Status.CONFLICT, e);
+ } catch (UnknownTaskTypeException e) {
+ throw new ControllerApplicationException(LOGGER, "Unknown task type: "
+ adhocTaskConfig.getTaskType(),
+ Response.Status.NOT_FOUND, e);
+ } catch (NoTaskScheduledException e) {
+ throw new ControllerApplicationException(LOGGER,
+ "No task is generated for table: " +
adhocTaskConfig.getTableName() + ", with task type: "
+ + adhocTaskConfig.getTaskType(), Response.Status.BAD_REQUEST,
e);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
+ "Failed to create adhoc task: " + ExceptionUtils.getStackTrace(e),
Response.Status.INTERNAL_SERVER_ERROR,
+ e);
+ }
} catch (Exception e) {
- throw new ControllerApplicationException(LOGGER,
- "Failed to create adhoc task: " + ExceptionUtils.getStackTrace(e),
Response.Status.INTERNAL_SERVER_ERROR, e);
+ asyncResponse.resume(e);
}
}
@@ -747,10 +910,17 @@ public class PinotTaskRestletResource {
@Produces(MediaType.APPLICATION_JSON)
@Authenticate(AccessType.UPDATE)
@ApiOperation("Clean up finished tasks (COMPLETED, FAILED) for the given
task type")
- public SuccessResponse cleanUpTasks(
- @ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType) {
- _pinotHelixTaskResourceManager.cleanUpTaskQueue(taskType);
- return new SuccessResponse("Successfully cleaned up tasks for task type: "
+ taskType);
+ @ManagedAsync
+ public void cleanUpTasks(
+ @ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ _pinotHelixTaskResourceManager.cleanUpTaskQueue(taskType);
+ SuccessResponse response = new SuccessResponse("Successfully cleaned up
tasks for task type: " + taskType);
+ asyncResponse.resume(response);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@PUT
@@ -759,10 +929,17 @@ public class PinotTaskRestletResource {
@Produces(MediaType.APPLICATION_JSON)
@Authenticate(AccessType.UPDATE)
@ApiOperation("Stop all running/pending tasks (as well as the task queue)
for the given task type")
- public SuccessResponse stopTasks(
- @ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType) {
- _pinotHelixTaskResourceManager.stopTaskQueue(taskType);
- return new SuccessResponse("Successfully stopped tasks for task type: " +
taskType);
+ @ManagedAsync
+ public void stopTasks(
+ @ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ _pinotHelixTaskResourceManager.stopTaskQueue(taskType);
+ SuccessResponse response = new SuccessResponse("Successfully stopped
tasks for task type: " + taskType);
+ asyncResponse.resume(response);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@PUT
@@ -771,10 +948,17 @@ public class PinotTaskRestletResource {
@Produces(MediaType.APPLICATION_JSON)
@Authenticate(AccessType.UPDATE)
@ApiOperation("Resume all stopped tasks (as well as the task queue) for the
given task type")
- public SuccessResponse resumeTasks(
- @ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType) {
- _pinotHelixTaskResourceManager.resumeTaskQueue(taskType);
- return new SuccessResponse("Successfully resumed tasks for task type: " +
taskType);
+ @ManagedAsync
+ public void resumeTasks(
+ @ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ _pinotHelixTaskResourceManager.resumeTaskQueue(taskType);
+ SuccessResponse response = new SuccessResponse("Successfully resumed
tasks for task type: " + taskType);
+ asyncResponse.resume(response);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@DELETE
@@ -783,12 +967,19 @@ public class PinotTaskRestletResource {
@Produces(MediaType.APPLICATION_JSON)
@Authenticate(AccessType.DELETE)
@ApiOperation("Delete all tasks (as well as the task queue) for the given
task type")
- public SuccessResponse deleteTasks(
+ @ManagedAsync
+ public void deleteTasks(
@ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType,
@ApiParam(value = "Whether to force deleting the tasks (expert only
option, enable with cautious")
- @DefaultValue("false") @QueryParam("forceDelete") boolean forceDelete) {
- _pinotHelixTaskResourceManager.deleteTaskQueue(taskType, forceDelete);
- return new SuccessResponse("Successfully deleted tasks for task type: " +
taskType);
+ @DefaultValue("false") @QueryParam("forceDelete") boolean forceDelete,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ _pinotHelixTaskResourceManager.deleteTaskQueue(taskType, forceDelete);
+ SuccessResponse response = new SuccessResponse("Successfully deleted
tasks for task type: " + taskType);
+ asyncResponse.resume(response);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@DELETE
@@ -797,12 +988,19 @@ public class PinotTaskRestletResource {
@Produces(MediaType.APPLICATION_JSON)
@Authenticate(AccessType.DELETE)
@ApiOperation("Delete a single task given its task name")
- public SuccessResponse deleteTask(
+ @ManagedAsync
+ public void deleteTask(
@ApiParam(value = "Task name", required = true) @PathParam("taskName")
String taskName,
@ApiParam(value = "Whether to force deleting the task (expert only
option, enable with cautious")
- @DefaultValue("false") @QueryParam("forceDelete") boolean forceDelete) {
- _pinotHelixTaskResourceManager.deleteTask(taskName, forceDelete);
- return new SuccessResponse("Successfully deleted task: " + taskName);
+ @DefaultValue("false") @QueryParam("forceDelete") boolean forceDelete,
+ @Suspended AsyncResponse asyncResponse) {
+ try {
+ _pinotHelixTaskResourceManager.deleteTask(taskName, forceDelete);
+ SuccessResponse response = new SuccessResponse("Successfully deleted
task: " + taskName);
+ asyncResponse.resume(response);
+ } catch (Exception e) {
+ asyncResponse.resume(e);
+ }
}
@DELETE
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
index 673554c4caf..8fa53df4dc2 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResourceTest.java
@@ -25,9 +25,12 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
+import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedHashMap;
import javax.ws.rs.core.UriInfo;
+import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.controller.ControllerConf;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
@@ -45,10 +48,10 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
@@ -61,6 +64,10 @@ public class PinotTaskRestletResourceTest {
ControllerConf _controllerConf;
@Mock
UriInfo _uriInfo;
+ @Mock
+ Executor _executor;
+ @Mock
+ HttpClientConnectionManager _connectionManager;
@InjectMocks
PinotTaskRestletResource _pinotTaskRestletResource;
@@ -108,13 +115,18 @@ public class PinotTaskRestletResourceTest {
when(_pinotHelixResourceManager.getAllMinionInstanceConfigs()).thenReturn(List.of(minion1,
minion2));
HttpHeaders httpHeaders = Mockito.mock(HttpHeaders.class);
when(httpHeaders.getRequestHeaders()).thenReturn(new
MultivaluedHashMap<>());
+ when(_controllerConf.getMinionAdminRequestTimeoutSeconds()).thenReturn(10);
+ @SuppressWarnings("unchecked")
ArgumentCaptor<Map<String, String>> minionWorkerEndpointsCaptor =
ArgumentCaptor.forClass(Map.class);
when(_pinotHelixTaskResourceManager.getSubtaskOnWorkerProgress(anyString(),
any(), any(),
minionWorkerEndpointsCaptor.capture(), anyMap(), anyInt()))
.thenReturn(Collections.emptyMap());
- String progress =
- _pinotTaskRestletResource.getSubtaskOnWorkerProgress(httpHeaders,
"IN_PROGRESS", minionWorkerIds);
- assertEquals(progress, "{}");
+ AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
+ ArgumentCaptor<Object> responseCaptor =
ArgumentCaptor.forClass(Object.class);
+ _pinotTaskRestletResource.getSubtaskOnWorkerProgress(httpHeaders,
"IN_PROGRESS", minionWorkerIds, asyncResponse);
+
+ verify(asyncResponse).resume(responseCaptor.capture());
+ assertEquals(responseCaptor.getValue(), "{}");
return minionWorkerEndpointsCaptor.getValue();
}
@@ -131,11 +143,16 @@ public class PinotTaskRestletResourceTest {
when(_pinotHelixResourceManager.getAllMinionInstanceConfigs()).thenReturn(Collections.emptyList());
HttpHeaders httpHeaders = Mockito.mock(HttpHeaders.class);
when(httpHeaders.getRequestHeaders()).thenReturn(new
MultivaluedHashMap<>());
+ when(_controllerConf.getMinionAdminRequestTimeoutSeconds()).thenReturn(10);
when(_pinotHelixTaskResourceManager
.getSubtaskOnWorkerProgress(anyString(), any(), any(), anyMap(),
anyMap(), anyInt()))
.thenThrow(new RuntimeException());
- assertThrows(ControllerApplicationException.class,
- () ->
_pinotTaskRestletResource.getSubtaskOnWorkerProgress(httpHeaders,
"IN_PROGRESS", null));
+ AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
+ ArgumentCaptor<Throwable> responseCaptor =
ArgumentCaptor.forClass(Throwable.class);
+ _pinotTaskRestletResource.getSubtaskOnWorkerProgress(httpHeaders,
"IN_PROGRESS", null, asyncResponse);
+
+ verify(asyncResponse).resume(responseCaptor.capture());
+ assertTrue(responseCaptor.getValue() instanceof
ControllerApplicationException);
}
@Test
@@ -146,8 +163,14 @@ public class PinotTaskRestletResourceTest {
when(_pinotHelixTaskResourceManager.getTasksSummary(null)).thenReturn(emptyResponse);
- PinotHelixTaskResourceManager.TaskSummaryResponse response =
_pinotTaskRestletResource.getTasksSummary(null);
+ AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
+ ArgumentCaptor<Object> responseCaptor =
ArgumentCaptor.forClass(Object.class);
+ _pinotTaskRestletResource.getTasksSummary(null, asyncResponse);
+
+ verify(asyncResponse).resume(responseCaptor.capture());
+ PinotHelixTaskResourceManager.TaskSummaryResponse response =
+ (PinotHelixTaskResourceManager.TaskSummaryResponse)
responseCaptor.getValue();
assertNotNull(response);
assertEquals(response.getTotalRunningTasks(), 0);
assertEquals(response.getTotalWaitingTasks(), 0);
@@ -183,8 +206,14 @@ public class PinotTaskRestletResourceTest {
when(_pinotHelixTaskResourceManager.getTasksSummary(null)).thenReturn(response);
- PinotHelixTaskResourceManager.TaskSummaryResponse actualResponse =
_pinotTaskRestletResource.getTasksSummary(null);
+ AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
+ ArgumentCaptor<Object> responseCaptor =
ArgumentCaptor.forClass(Object.class);
+ _pinotTaskRestletResource.getTasksSummary(null, asyncResponse);
+ verify(asyncResponse).resume(responseCaptor.capture());
+
+ PinotHelixTaskResourceManager.TaskSummaryResponse actualResponse =
+ (PinotHelixTaskResourceManager.TaskSummaryResponse)
responseCaptor.getValue();
assertNotNull(actualResponse);
assertEquals(actualResponse.getTotalRunningTasks(), 150);
assertEquals(actualResponse.getTotalWaitingTasks(), 50);
@@ -228,9 +257,14 @@ public class PinotTaskRestletResourceTest {
when(_pinotHelixTaskResourceManager.getTasksSummary("defaultTenant")).thenReturn(response);
- PinotHelixTaskResourceManager.TaskSummaryResponse actualResponse =
- _pinotTaskRestletResource.getTasksSummary("defaultTenant");
+ AsyncResponse asyncResponse = Mockito.mock(AsyncResponse.class);
+ ArgumentCaptor<Object> responseCaptor =
ArgumentCaptor.forClass(Object.class);
+ _pinotTaskRestletResource.getTasksSummary("defaultTenant", asyncResponse);
+ verify(asyncResponse).resume(responseCaptor.capture());
+
+ PinotHelixTaskResourceManager.TaskSummaryResponse actualResponse =
+ (PinotHelixTaskResourceManager.TaskSummaryResponse)
responseCaptor.getValue();
assertNotNull(actualResponse);
assertEquals(actualResponse.getTotalRunningTasks(), 100);
assertEquals(actualResponse.getTotalWaitingTasks(), 30);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]