This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 73843b57c7 Add support for Cursors through API Query Params (#14110)
73843b57c7 is described below

commit 73843b57c724c55246daa01309d5b4f5046c63f1
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Tue Dec 24 10:45:33 2024 +0530

    Add support for Cursors through API Query Params (#14110)
---
 .../broker/api/resources/PinotClientRequest.java   |  39 +-
 .../api/resources/ResponseStoreResource.java       | 202 ++++++++++
 .../broker/broker/BrokerAdminApiApplication.java   |   5 +-
 .../broker/broker/helix/BaseBrokerStarter.java     |  33 +-
 .../pinot/broker/cursors/FsResponseStore.java      | 248 ++++++++++++
 .../pinot/broker/cursors/JsonResponseSerde.java    |  37 ++
 .../BrokerRequestHandlerDelegate.java              |  35 +-
 .../common/cursors/AbstractResponseStore.java      | 243 ++++++++++++
 .../apache/pinot/common/metrics/BrokerMeter.java   |  22 +-
 .../pinot/common/response/CursorResponse.java      | 132 +++++++
 .../response/broker/CursorResponseNative.java      | 182 +++++++++
 .../common/utils/config/QueryOptionsUtils.java     |   9 +
 .../pinot/controller/BaseControllerStarter.java    |  14 +
 .../apache/pinot/controller/ControllerConf.java    |   1 +
 .../controller/cursors/ResponseStoreCleaner.java   | 222 +++++++++++
 ...ControllerPeriodicTaskStarterStatelessTest.java |   2 +-
 .../java/org/apache/pinot/core/auth/Actions.java   |   2 +
 .../integration/tests/CursorFsIntegrationTest.java |  45 +++
 .../integration/tests/CursorIntegrationTest.java   | 425 +++++++++++++++++++++
 .../tests/CursorWithAuthIntegrationTest.java       | 207 ++++++++++
 .../tests/cursors/MemoryResponseStore.java         | 105 +++++
 .../apache/pinot/spi/cursors/ResponseStore.java    |  82 ++++
 .../pinot/spi/cursors/ResponseStoreService.java    |  77 ++++
 .../apache/pinot/spi/utils/CommonConstants.java    |  24 ++
 24 files changed, 2378 insertions(+), 15 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index bc8c6a5f3c..44da5f962d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -77,9 +77,11 @@ import 
org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.trace.RequestContext;
 import org.apache.pinot.spi.trace.RequestScope;
 import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.sql.parsers.PinotSqlType;
@@ -100,6 +102,9 @@ import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
 public class PinotClientRequest {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotClientRequest.class);
 
+  @Inject
+  PinotConfiguration _brokerConf;
+
   @Inject
   SqlQueryExecutor _sqlQueryExecutor;
 
@@ -157,6 +162,10 @@ public class PinotClientRequest {
   })
   @ManualAuthorization
   public void processSqlQueryPost(String query, @Suspended AsyncResponse 
asyncResponse,
+      @ApiParam(value = "Return a cursor instead of complete result set") 
@QueryParam("getCursor")
+      @DefaultValue("false") boolean getCursor,
+      @ApiParam(value = "Number of rows to fetch. Applicable only when 
getCursor is true") @QueryParam("numRows")
+      @DefaultValue("0") int numRows,
       @Context org.glassfish.grizzly.http.server.Request requestContext,
       @Context HttpHeaders httpHeaders) {
     try {
@@ -165,7 +174,8 @@ public class PinotClientRequest {
         throw new IllegalStateException("Payload is missing the query string 
field 'sql'");
       }
       BrokerResponse brokerResponse =
-          executeSqlQuery((ObjectNode) requestJson, 
makeHttpIdentity(requestContext), false, httpHeaders);
+          executeSqlQuery((ObjectNode) requestJson, 
makeHttpIdentity(requestContext), false, httpHeaders, false,
+              getCursor, numRows);
       asyncResponse.resume(getPinotQueryResponse(brokerResponse));
     } catch (WebApplicationException wae) {
       asyncResponse.resume(wae);
@@ -221,6 +231,10 @@ public class PinotClientRequest {
   })
   @ManualAuthorization
   public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended 
AsyncResponse asyncResponse,
+      @ApiParam(value = "Return a cursor instead of complete result set") 
@QueryParam("getCursor")
+      @DefaultValue("false") boolean getCursor,
+      @ApiParam(value = "Number of rows to fetch. Applicable only getCursor is 
true") @QueryParam("numRows")
+      @DefaultValue("0") int numRows,
       @Context org.glassfish.grizzly.http.server.Request requestContext,
       @Context HttpHeaders httpHeaders) {
     try {
@@ -229,7 +243,8 @@ public class PinotClientRequest {
         throw new IllegalStateException("Payload is missing the query string 
field 'sql'");
       }
       BrokerResponse brokerResponse =
-          executeSqlQuery((ObjectNode) requestJson, 
makeHttpIdentity(requestContext), false, httpHeaders, true);
+          executeSqlQuery((ObjectNode) requestJson, 
makeHttpIdentity(requestContext), false, httpHeaders, true,
+              getCursor, numRows);
       asyncResponse.resume(getPinotQueryResponse(brokerResponse));
     } catch (WebApplicationException wae) {
       asyncResponse.resume(wae);
@@ -427,6 +442,12 @@ public class PinotClientRequest {
   private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, 
HttpRequesterIdentity httpRequesterIdentity,
       boolean onlyDql, HttpHeaders httpHeaders, boolean forceUseMultiStage)
       throws Exception {
+    return executeSqlQuery(sqlRequestJson, httpRequesterIdentity, onlyDql, 
httpHeaders, forceUseMultiStage, false, 0);
+  }
+
+  private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson, 
HttpRequesterIdentity httpRequesterIdentity,
+      boolean onlyDql, HttpHeaders httpHeaders, boolean forceUseMultiStage, 
boolean getCursor, int numRows)
+      throws Exception {
     long requestArrivalTimeMs = System.currentTimeMillis();
     SqlNodeAndOptions sqlNodeAndOptions;
     try {
@@ -437,6 +458,16 @@ public class PinotClientRequest {
     if (forceUseMultiStage) {
       
sqlNodeAndOptions.setExtraOptions(ImmutableMap.of(Request.QueryOptionKey.USE_MULTISTAGE_ENGINE,
 "true"));
     }
+    if (getCursor) {
+      if (numRows == 0) {
+        numRows = 
_brokerConf.getProperty(CommonConstants.CursorConfigs.CURSOR_FETCH_ROWS,
+            CommonConstants.CursorConfigs.DEFAULT_CURSOR_FETCH_ROWS);
+      }
+      sqlNodeAndOptions.setExtraOptions(
+          ImmutableMap.of(Request.QueryOptionKey.GET_CURSOR, "true", 
Request.QueryOptionKey.CURSOR_NUM_ROWS,
+              Integer.toString(numRows)));
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_QUERIES_GLOBAL, 
1);
+    }
     PinotSqlType sqlType = sqlNodeAndOptions.getSqlType();
     if (onlyDql && sqlType != PinotSqlType.DQL) {
       return new 
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
@@ -475,7 +506,7 @@ public class PinotClientRequest {
     return _requestHandler.handleTimeSeriesRequest(language, queryString, 
requestContext);
   }
 
-  private static HttpRequesterIdentity 
makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
+  public static HttpRequesterIdentity 
makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
     Multimap<String, String> headers = ArrayListMultimap.create();
     context.getHeaderNames().forEach(key -> 
context.getHeaders(key).forEach(value -> headers.put(key, value)));
 
@@ -497,7 +528,7 @@ public class PinotClientRequest {
    * @throws Exception
    */
   @VisibleForTesting
-  static Response getPinotQueryResponse(BrokerResponse brokerResponse)
+  public static Response getPinotQueryResponse(BrokerResponse brokerResponse)
       throws Exception {
     int queryErrorCodeHeaderValue = -1; // default value of the header.
     List<QueryProcessingException> exceptions = brokerResponse.getExceptions();
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/ResponseStoreResource.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/ResponseStoreResource.java
new file mode 100644
index 0000000000..afc8ceebf4
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/ResponseStoreResource.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Collection;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.ManualAuthorization;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.auth.TableAuthorizationResult;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.glassfish.grizzly.http.server.Request;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ * This resource provides APIs to read cursors as well as admin functions such as listing, reading and deleting
+ * response stores.
+ */
+@Api(tags = "ResponseStore", authorizations = {@Authorization(value = 
SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition = 
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+    HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY,
+    description = "The format of the key is  ```\"Basic <token>\" or \"Bearer 
<token>\"```")))
+@Path("/responseStore")
+public class ResponseStoreResource {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreResource.class);
+
+  @Inject
+  private PinotConfiguration _brokerConf;
+
+  @Inject
+  private BrokerMetrics _brokerMetrics;
+
+  @Inject
+  private AbstractResponseStore _responseStore;
+
+  @Inject
+  AccessControlFactory _accessControlFactory;
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.GET_RESPONSE_STORE)
+  @ApiOperation(value = "Get metadata of all response stores.", notes = "Get 
metadata of all response stores")
+  public Collection<CursorResponse> getResults(@Context HttpHeaders headers) {
+    try {
+      return _responseStore.getAllStoredResponses();
+    } catch (Exception e) {
+      throw new WebApplicationException(e,
+          
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build());
+    }
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("{requestId}")
+  @ApiOperation(value = "Response without ResultTable of a query")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Query response"), @ApiResponse(code 
= 500, message = "Internal Server Error")
+  })
+  @ManualAuthorization
+  public BrokerResponse getSqlQueryMetadata(
+      @ApiParam(value = "Request ID of the query", required = true) 
@PathParam("requestId") String requestId,
+      @Context org.glassfish.grizzly.http.server.Request requestContext) {
+    try {
+      checkRequestExistsAndAuthorized(requestId, requestContext);
+      return _responseStore.readResponse(requestId);
+    } catch (WebApplicationException wae) {
+      throw wae;
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing GET request", e);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_GET_EXCEPTIONS, 1L);
+      throw new WebApplicationException(e,
+          
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build());
+    }
+  }
+
+  @GET
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("{requestId}/results")
+  @ApiOperation(value = "Get result set from the query's response store")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Query response"), @ApiResponse(code 
= 500, message = "Internal Server Error")
+  })
+  @ManualAuthorization
+  public void getSqlQueryResult(
+      @ApiParam(value = "Request ID of the query", required = true) 
@PathParam("requestId") String requestId,
+      @ApiParam(value = "Offset in the result set", required = true) 
@QueryParam("offset") int offset,
+      @ApiParam(value = "Number of rows to fetch") @QueryParam("numRows") 
Integer numRows,
+      @Context org.glassfish.grizzly.http.server.Request requestContext,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      checkRequestExistsAndAuthorized(requestId, requestContext);
+      if (numRows == null) {
+        numRows = 
_brokerConf.getProperty(CommonConstants.CursorConfigs.CURSOR_FETCH_ROWS,
+            CommonConstants.CursorConfigs.DEFAULT_CURSOR_FETCH_ROWS);
+      }
+      asyncResponse.resume(
+          
PinotClientRequest.getPinotQueryResponse(_responseStore.handleCursorRequest(requestId,
 offset, numRows)));
+    } catch (WebApplicationException wae) {
+      asyncResponse.resume(wae);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing GET request", e);
+      
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_GET_EXCEPTIONS, 1L);
+      asyncResponse.resume(new WebApplicationException(e,
+          
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build()));
+    }
+  }
+
+  @DELETE
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/{requestId}")
+  @Authorize(targetType = TargetType.CLUSTER, action = 
Actions.Cluster.DELETE_RESPONSE_STORE)
+  @ApiOperation(value = "Delete the response store of a query", notes = 
"Delete the response store of a query")
+  public String deleteResponse(
+      @ApiParam(value = "Request ID of the query", required = true) 
@PathParam("requestId") String requestId,
+      @Context HttpHeaders headers) {
+    try {
+      if (_responseStore.deleteResponse(requestId)) {
+        return "Query Results for " + requestId + " deleted.";
+      }
+    } catch (Exception e) {
+      throw new WebApplicationException(e,
+          
Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build());
+    }
+
+    // Query Result not found. Throw error.
+    throw new WebApplicationException(
+        Response.status(Response.Status.NOT_FOUND).entity(String.format("Query 
results for %s not found.", requestId))
+            .build());
+  }
+
+  private void checkRequestExistsAndAuthorized(String requestId, Request 
requestContext)
+      throws Exception {
+    if (_responseStore.exists(requestId)) {
+      CursorResponse response = _responseStore.readResponse(requestId);
+      AccessControl accessControl = _accessControlFactory.create();
+      TableAuthorizationResult result = accessControl.authorize(
+          PinotClientRequest.makeHttpIdentity(requestContext),
+          response.getTablesQueried());
+      if (!result.hasAccess()) {
+        throw new WebApplicationException(
+            
Response.status(Response.Status.FORBIDDEN).entity(result.getFailureMessage()).build());
+      }
+    } else {
+      throw new 
WebApplicationException(Response.status(Response.Status.NOT_FOUND)
+          .entity(String.format("Query results for %s not found.", 
requestId)).build());
+    }
+  }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
index fc443caab0..64e6cb837b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
@@ -35,6 +35,7 @@ import org.apache.helix.HelixManager;
 import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
 import org.apache.pinot.common.http.PoolingHttpClientConnectionManagerHelper;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.swagger.SwaggerApiListingResource;
@@ -75,7 +76,7 @@ public class BrokerAdminApiApplication extends ResourceConfig 
{
   public BrokerAdminApiApplication(BrokerRoutingManager routingManager, 
BrokerRequestHandler brokerRequestHandler,
       BrokerMetrics brokerMetrics, PinotConfiguration brokerConf, 
SqlQueryExecutor sqlQueryExecutor,
       ServerRoutingStatsManager serverRoutingStatsManager, 
AccessControlFactory accessFactory,
-      HelixManager helixManager, QueryQuotaManager queryQuotaManager) {
+      HelixManager helixManager, QueryQuotaManager queryQuotaManager, 
AbstractResponseStore responseStore) {
     _brokerResourcePackages = 
brokerConf.getProperty(CommonConstants.Broker.BROKER_RESOURCE_PACKAGES,
         CommonConstants.Broker.DEFAULT_BROKER_RESOURCE_PACKAGES);
     String[] pkgs = _brokerResourcePackages.split(",");
@@ -116,6 +117,8 @@ public class BrokerAdminApiApplication extends 
ResourceConfig {
         bind(queryQuotaManager).to(QueryQuotaManager.class);
         bind(accessFactory).to(AccessControlFactory.class);
         bind(startTime).named(BrokerAdminApiApplication.START_TIME);
+        bind(responseStore).to(AbstractResponseStore.class);
+        bind(brokerConf).to(PinotConfiguration.class);
       }
     });
     boolean enableBoundedJerseyThreadPoolExecutor =
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 368e409e23..2aab062730 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.broker.helix;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -56,6 +57,7 @@ import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.config.NettyConfig;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
 import org.apache.pinot.common.function.FunctionRegistry;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metrics.BrokerGauge;
@@ -78,8 +80,10 @@ import org.apache.pinot.core.util.ListenerConfigUtil;
 import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.cursors.ResponseStoreService;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import 
org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
 import org.apache.pinot.spi.services.ServiceRole;
@@ -139,6 +143,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
   protected ServerRoutingStatsManager _serverRoutingStatsManager;
   protected HelixExternalViewBasedQueryQuotaManager _queryQuotaManager;
   protected MultiStageQueryThrottler _multiStageQueryThrottler;
+  protected AbstractResponseStore _responseStore;
 
   @Override
   public void init(PinotConfiguration brokerConf)
@@ -353,9 +358,26 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
       timeSeriesRequestHandler = new TimeSeriesRequestHandler(_brokerConf, 
brokerId, _routingManager,
           _accessControlFactory, _queryQuotaManager, tableCache, 
queryDispatcher);
     }
+
+    LOGGER.info("Initializing PinotFSFactory");
+    
PinotFSFactory.init(_brokerConf.subset(CommonConstants.Broker.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY));
+
+    LOGGER.info("Initialize ResponseStore");
+    PinotConfiguration responseStoreConfiguration =
+        
_brokerConf.subset(CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE);
+
+    String expirationTime = 
_brokerConf.getProperty(CommonConstants.CursorConfigs.RESULTS_EXPIRATION_INTERVAL,
+        CommonConstants.CursorConfigs.DEFAULT_RESULTS_EXPIRATION_INTERVAL);
+
+    _responseStore = (AbstractResponseStore) 
ResponseStoreService.getInstance().getResponseStore(
+        
responseStoreConfiguration.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_TYPE,
+            CommonConstants.CursorConfigs.DEFAULT_RESPONSE_STORE_TYPE));
+    
_responseStore.init(responseStoreConfiguration.subset(_responseStore.getType()),
 _hostname, _port, brokerId,
+        _brokerMetrics, expirationTime);
+
     _brokerRequestHandler =
         new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler, 
multiStageBrokerRequestHandler,
-            timeSeriesRequestHandler);
+            timeSeriesRequestHandler, _responseStore);
     _brokerRequestHandler.start();
 
     // Enable/disable thread CPU time measurement through instance config.
@@ -604,6 +626,13 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     _brokerRequestHandler.shutDown();
     _brokerAdminApplication.stop();
 
+    LOGGER.info("Close PinotFs");
+    try {
+      PinotFSFactory.shutdown();
+    } catch (IOException e) {
+      LOGGER.error("Caught exception when shutting down PinotFsFactory", e);
+    }
+
     LOGGER.info("Disconnecting spectator Helix manager");
     _spectatorHelixManager.disconnect();
 
@@ -650,7 +679,7 @@ public abstract class BaseBrokerStarter implements 
ServiceStartable {
     BrokerAdminApiApplication brokerAdminApiApplication =
         new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler, 
_brokerMetrics, _brokerConf,
             _sqlQueryExecutor, _serverRoutingStatsManager, 
_accessControlFactory, _spectatorHelixManager,
-            _queryQuotaManager);
+            _queryQuotaManager, _responseStore);
     registerExtraComponents(brokerAdminApiApplication);
     return brokerAdminApiApplication;
   }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java b/pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java
new file mode 100644
index 0000000000..8da7b0a33c
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS 
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FsResponseStore.class);
+  private static final String TYPE = "file";
+  private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+  private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+  private static final String URI_SEPARATOR = "/";
+
+  public static final String TEMP_DIR = "temp.dir";
+  public static final String DATA_DIR = "data.dir";
+  public static final String FILE_NAME_EXTENSION = "extension";
+  public static final Path DEFAULT_ROOT_DIR = 
Path.of(System.getProperty("java.io.tmpdir"), "broker", "responseStore");
+  public static final Path DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR.resolve("temp");
+  public static final URI DEFAULT_DATA_DIR = 
DEFAULT_ROOT_DIR.resolve("data").toUri();
+  public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+  private Path _localTempDir;
+  private URI _dataDir;
+  private JsonResponseSerde _responseSerde;
+  private String _fileExtension;
+
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String newPath =
+        baseUri.getPath().endsWith(URI_SEPARATOR) ? baseUri.getPath() + path : 
baseUri.getPath() + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getHost(), newPath, null);
+  }
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  public void init(PinotConfiguration config, String brokerHost, int 
brokerPort, String brokerId,
+      BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    init(brokerHost, brokerPort, brokerId, brokerMetrics, expirationTime);
+
+    _responseSerde = new JsonResponseSerde();
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, 
DEFAULT_FILE_NAME_EXTENSION);
+    _localTempDir = config.containsKey(TEMP_DIR) ? 
Path.of(config.getProperty(TEMP_DIR)) : DEFAULT_TEMP_DIR;
+    Files.createDirectories(_localTempDir);
+
+    _dataDir = config.containsKey(DATA_DIR) ? new 
URI(config.getProperty(DATA_DIR)) : DEFAULT_DATA_DIR;
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    pinotFS.mkdir(_dataDir);
+  }
+
+  private Path getTempPath(String... nameParts) {
+    StringBuilder filename = new StringBuilder();
+    for (String part : nameParts) {
+      filename.append(part).append("_");
+    }
+    filename.append(Thread.currentThread().getId());
+    return _localTempDir.resolve(filename.toString());
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    return pinotFS.exists(queryDir);
+  }
+
+  /**
+   * Lists the request ids of all responses under the data directory that were created by this broker.
+   * Every directory that contains a response metadata file is inspected; the response is included only when its
+   * broker id matches the id of this broker. A directory that fails to be processed is logged and skipped.
+   * @return Request ids of the responses stored by this broker
+   * @throws Exception Thrown if the data directory cannot be listed
+   */
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug("Found {} paths.", queryPaths.size());
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug("Processing query path: {}", metadata.toString());
+      if (metadata.isDirectory()) {
+        try {
+          URI queryDir = new URI(metadata.getFilePath());
+          URI metadataFile = combinePath(queryDir, String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+          boolean metadataFileExists = pinotFS.exists(metadataFile);
+          LOGGER.debug("Checking for query dir {} & metadata file: {}. Metadata file exists: {}", queryDir,
+              metadataFile, metadataFileExists);
+          if (metadataFileExists) {
+            BrokerResponse response;
+            // Close the stream explicitly (as readResponse does) instead of relying on the serde to close it.
+            try (InputStream metadataIS = pinotFS.open(metadataFile)) {
+              response = _responseSerde.deserialize(metadataIS, CursorResponseNative.class);
+            }
+            if (response.getBrokerId().equals(_brokerId)) {
+              requestIdList.add(response.getRequestId());
+              LOGGER.debug("Added response store {}", queryDir);
+            }
+          }
+        } catch (Exception e) {
+          // One unreadable directory must not prevent the remaining responses from being listed.
+          LOGGER.error("Error when processing {}", metadata, e);
+        }
+      }
+    }
+
+    return requestIdList;
+  }
+
+  /**
+   * Deletes the directory that holds the response of the given request id, with the force flag set.
+   * @return true when the directory existed and the delete was issued, false when there was nothing to delete
+   */
+  @Override
+  protected boolean deleteResponseImpl(String requestId)
+      throws Exception {
+    PinotFS fileSystem = PinotFSFactory.create(_dataDir.getScheme());
+    URI responseDir = combinePath(_dataDir, requestId);
+    if (!fileSystem.exists(responseDir)) {
+      return false;
+    }
+    fileSystem.delete(responseDir, true);
+    return true;
+  }
+
+  /**
+   * Writes the response metadata of a query to its directory in the store.
+   * The response is first serialized to a local temp file which is then copied to the store. The temp file is
+   * removed whether serialization and copy succeed or fail.
+   * @param requestId Request ID of the response
+   * @param response The response to write
+   * @throws Exception Thrown if there is any error while writing the response
+   */
+  @Override
+  protected void writeResponse(String requestId, CursorResponse response)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResponseFile = getTempPath("response", requestId);
+    URI metadataFile = combinePath(queryDir, String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+
+    try {
+      try (OutputStream tempResponseFileOS = Files.newOutputStream(tempResponseFile)) {
+        _responseSerde.serialize(response, tempResponseFileOS);
+      }
+      pinotFS.copyFromLocalFile(tempResponseFile.toFile(), metadataFile);
+    } finally {
+      // The temp file may not exist if it could not even be created, hence deleteIfExists.
+      Files.deleteIfExists(tempResponseFile);
+    }
+  }
+
+  /**
+   * Writes the result table of a query to its directory in the store.
+   * The table is first serialized to a local temp file which is then copied to the store. The temp file is
+   * removed whether serialization and copy succeed or fail.
+   * @param requestId Request ID of the response
+   * @param resultTable The result table of the query
+   * @return Size in bytes of the serialized result table
+   * @throws Exception Thrown if there is any error while writing the result table
+   */
+  @Override
+  protected long writeResultTable(String requestId, ResultTable resultTable)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResultTableFile = getTempPath("resultTable", requestId);
+    URI dataFile = combinePath(queryDir, String.format(RESULT_TABLE_FILE_NAME_FORMAT, _fileExtension));
+
+    try {
+      try (OutputStream tempResultTableFileOS = Files.newOutputStream(tempResultTableFile)) {
+        _responseSerde.serialize(resultTable, tempResultTableFileOS);
+      }
+      File tempFile = tempResultTableFile.toFile();
+      pinotFS.copyFromLocalFile(tempFile, dataFile);
+      return tempFile.length();
+    } finally {
+      // The temp file may not exist if it could not even be created, hence deleteIfExists.
+      Files.deleteIfExists(tempResultTableFile);
+    }
+  }
+
+  /**
+   * Loads the response metadata file of the given request id and deserializes it.
+   * @param requestId Request ID of the response
+   * @return The stored response, deserialized as a CursorResponseNative
+   * @throws Exception Thrown if the metadata file cannot be opened or deserialized
+   */
+  @Override
+  public CursorResponse readResponse(String requestId)
+      throws Exception {
+    PinotFS fileSystem = PinotFSFactory.create(_dataDir.getScheme());
+    URI responseDir = combinePath(_dataDir, requestId);
+    String responseFileName = String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension);
+    URI responseFile = combinePath(responseDir, responseFileName);
+    try (InputStream responseStream = fileSystem.open(responseFile)) {
+      CursorResponse storedResponse = _responseSerde.deserialize(responseStream, CursorResponseNative.class);
+      return storedResponse;
+    }
+  }
+
+  /**
+   * Reads a slice of the stored result table of a query.
+   * The whole result table is deserialized and the rows in [offset, offset + numRows) are returned; the end of the
+   * slice is capped at the number of rows recorded in the response metadata.
+   * @param requestId Request ID of the query
+   * @param offset Offset of the first row of the slice
+   * @param numRows Number of rows requested in the slice
+   * @return A result table with the schema of the stored table and the requested rows
+   * @throws Exception Thrown if there is any error while reading the result table
+   */
+  @Override
+  protected ResultTable readResultTable(String requestId, int offset, int numRows)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    URI dataFile = combinePath(queryDir, String.format(RESULT_TABLE_FILE_NAME_FORMAT, _fileExtension));
+    CursorResponse response = readResponse(requestId);
+    int totalTableRows = response.getNumRowsResultSet();
+
+    try (InputStream dataIS = pinotFS.open(dataFile)) {
+      ResultTable resultTable = _responseSerde.deserialize(dataIS, ResultTable.class);
+
+      // Add in long arithmetic: offset + numRows overflows int for a large numRows, which would produce a negative
+      // slice end. The result is bounded by totalTableRows, so the narrowing cast is safe.
+      int sliceEnd = (int) Math.min((long) offset + numRows, totalTableRows);
+
+      return new ResultTable(resultTable.getDataSchema(), resultTable.getRows().subList(offset, sliceEnd));
+    }
+  }
+}
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/cursors/JsonResponseSerde.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/cursors/JsonResponseSerde.java
new file mode 100644
index 0000000000..eb8083cbc5
--- /dev/null
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/cursors/JsonResponseSerde.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Serializes and deserializes objects as JSON by delegating to {@link JsonUtils}.
+ */
+public class JsonResponseSerde {
+  /**
+   * Writes the JSON form of an object to a stream using {@link JsonUtils#objectToOutputStream}.
+   * @param object Object to serialize
+   * @param stream Stream the JSON is written to
+   * @throws IOException Propagated from the underlying JSON utility
+   */
+  public void serialize(Object object, OutputStream stream)
+      throws IOException {
+    JsonUtils.objectToOutputStream(object, stream);
+  }
+
+  /**
+   * Reads an object of the given type from a stream using {@link JsonUtils#inputStreamToObject}.
+   * @param stream Stream the JSON is read from
+   * @param valueType Class of the object to create
+   * @return The deserialized object
+   * @throws IOException Propagated from the underlying JSON utility
+   */
+  public <T> T deserialize(InputStream stream, Class<T> valueType)
+      throws IOException {
+    return JsonUtils.inputStreamToObject(stream, valueType);
+  }
+}
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index e3a814365a..561e79abb4 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -25,8 +25,10 @@ import javax.annotation.Nullable;
 import javax.ws.rs.core.HttpHeaders;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
 import org.apache.pinot.broker.api.RequesterIdentity;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
 import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -46,13 +48,15 @@ public class BrokerRequestHandlerDelegate implements 
BrokerRequestHandler {
   private final BaseSingleStageBrokerRequestHandler 
_singleStageBrokerRequestHandler;
   private final MultiStageBrokerRequestHandler _multiStageBrokerRequestHandler;
   private final TimeSeriesRequestHandler _timeSeriesRequestHandler;
+  private final AbstractResponseStore _responseStore;
 
   public BrokerRequestHandlerDelegate(BaseSingleStageBrokerRequestHandler 
singleStageBrokerRequestHandler,
       @Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler,
-      @Nullable TimeSeriesRequestHandler timeSeriesRequestHandler) {
+      @Nullable TimeSeriesRequestHandler timeSeriesRequestHandler, 
AbstractResponseStore responseStore) {
     _singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
     _multiStageBrokerRequestHandler = multiStageBrokerRequestHandler;
     _timeSeriesRequestHandler = timeSeriesRequestHandler;
+    _responseStore = responseStore;
   }
 
   @Override
@@ -99,18 +103,23 @@ public class BrokerRequestHandlerDelegate implements 
BrokerRequestHandler {
       }
     }
 
+    BaseBrokerRequestHandler requestHandler = _singleStageBrokerRequestHandler;
     if 
(QueryOptionsUtils.isUseMultistageEngine(sqlNodeAndOptions.getOptions())) {
       if (_multiStageBrokerRequestHandler != null) {
-        return _multiStageBrokerRequestHandler.handleRequest(request, 
sqlNodeAndOptions, requesterIdentity,
-            requestContext, httpHeaders);
+        requestHandler = _multiStageBrokerRequestHandler;
       } else {
         return new 
BrokerResponseNative(QueryException.getException(QueryException.INTERNAL_ERROR,
             "V2 Multi-Stage query engine not enabled."));
       }
-    } else {
-      return _singleStageBrokerRequestHandler.handleRequest(request, 
sqlNodeAndOptions, requesterIdentity,
-          requestContext, httpHeaders);
     }
+
+    BrokerResponse response = requestHandler.handleRequest(request, 
sqlNodeAndOptions, requesterIdentity,
+        requestContext, httpHeaders);
+
+    if (response.getExceptionsSize() == 0 && 
QueryOptionsUtils.isGetCursor(sqlNodeAndOptions.getOptions())) {
+      response = 
getCursorResponse(QueryOptionsUtils.getCursorNumRows(sqlNodeAndOptions.getOptions()),
 response);
+    }
+    return response;
   }
 
   @Override
@@ -138,4 +147,18 @@ public class BrokerRequestHandlerDelegate implements 
BrokerRequestHandler {
     //       not found, try on the singleStaged engine.
     return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs, 
executor, connMgr, serverResponses);
   }
+
+  /**
+   * Persists a broker response in the response store and returns its first page as a cursor response.
+   * The time spent storing the response is recorded on the returned cursor response.
+   * @param numRows Number of rows of the first page; must not be null
+   * @param response Broker response to store
+   * @return Cursor response starting at offset 0
+   * @throws Exception Thrown if storing or reading back the response fails
+   */
+  private CursorResponse getCursorResponse(Integer numRows, BrokerResponse response)
+      throws Exception {
+    if (numRows == null) {
+      throw new RuntimeException(
+          "numRows not specified when requesting a cursor for request id: " + response.getRequestId());
+    }
+
+    long storeStartMs = System.currentTimeMillis();
+    _responseStore.storeResponse(response);
+    long storeElapsedMs = System.currentTimeMillis() - storeStartMs;
+
+    String requestId = response.getRequestId();
+    CursorResponse firstPage = _responseStore.handleCursorRequest(requestId, 0, numRows);
+    firstPage.setCursorResultWriteTimeMs(storeElapsedMs);
+    return firstPage;
+  }
 }
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java
new file mode 100644
index 0000000000..186a668d65
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.TimeUtils;
+
+
+public abstract class AbstractResponseStore implements ResponseStore {
+
+  protected String _brokerHost;
+  protected int _brokerPort;
+  protected String _brokerId;
+  protected BrokerMetrics _brokerMetrics;
+  protected long _expirationIntervalInMs;
+
+  protected void init(String brokerHost, int brokerPort, String brokerId, 
BrokerMetrics brokerMetrics,
+      String expirationTime) {
+    _brokerMetrics = brokerMetrics;
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _brokerId = brokerId;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+  }
+
+  /**
+   * Initialize the store.
+   * @param config Subset configuration of pinot.broker.cursor.response.store.&lt;type&gt;
+   * @param brokerHost Hostname of the broker where ResponseStore is created
+   * @param brokerPort Port of the broker where the ResponseStore is created
+   * @param brokerId ID of the broker where the ResponseStore is created.
+   * @param brokerMetrics Metrics utility to track cursor metrics.
+   * @param expirationTime Expiration period of a stored response; presumably a period string accepted by
+   *                       TimeUtils#convertPeriodToMillis, as used by the protected init overload.
+   */
+  public abstract void init(PinotConfiguration config, String brokerHost, int 
brokerPort, String brokerId,
+      BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception;
+
+  /**
+   * Get the hostname of the broker where the query is executed
+   * @return String containing the hostname
+   */
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  /**
+   * Get the port of the broker where the query is executed
+   * @return int containing the port
+   */
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  /**
+   * Get the expiration interval of a query response.
+   * @return long containing the expiration interval.
+   */
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  /**
+   * Write a CursorResponse
+   * @param requestId Request ID of the response
+   * @param response The response to write
+   * @throws Exception Thrown if there is any error while writing the response
+   */
+  protected abstract void writeResponse(String requestId, CursorResponse 
response)
+      throws Exception;
+
+  /**
+   * Write a {@link ResultTable} to the store
+   * @param requestId Request ID of the response
+   * @param resultTable The {@link ResultTable} of the query
+   * @throws Exception Thrown if there is any error while writing the result 
table.
+   * @return Returns the number of bytes written
+   */
+  protected abstract long writeResultTable(String requestId, ResultTable 
resultTable)
+      throws Exception;
+
+  /**
+   * Read the response (excluding the {@link ResultTable}) from the store
+   * @param requestId Request ID of the response
+   * @return CursorResponse (without the {@link ResultTable})
+   * @throws Exception Thrown if there is any error while reading the response
+   */
+  public abstract CursorResponse readResponse(String requestId)
+      throws Exception;
+
+  /**
+   * Read the {@link ResultTable} of a query response
+   * @param requestId Request ID of the query
+   * @param offset Offset of the result slice
+   * @param numRows Number of rows required in the slice
+   * @return {@link ResultTable} of the query
+   * @throws Exception Thrown if there is any error while reading the result 
table
+   */
+  protected abstract ResultTable readResultTable(String requestId, int offset, 
int numRows)
+      throws Exception;
+
+  protected abstract boolean deleteResponseImpl(String requestId)
+      throws Exception;
+
+  /**
+   * Stores the response in the store. {@link CursorResponse} and {@link ResultTable} are stored separately.
+   * If a write fails, the write-exception meter is incremented, anything partially written is removed on a
+   * best-effort basis and the original exception is rethrown.
+   * @param response Response to be stored
+   * @throws Exception Thrown if there is any error while storing the response.
+   */
+  public void storeResponse(BrokerResponse response)
+      throws Exception {
+    String requestId = response.getRequestId();
+
+    CursorResponse cursorResponse = new CursorResponseNative(response);
+
+    long submissionTimeMs = System.currentTimeMillis();
+    // Initialize all CursorResponse specific metadata
+    cursorResponse.setBrokerHost(getBrokerHost());
+    cursorResponse.setBrokerPort(getBrokerPort());
+    cursorResponse.setSubmissionTimeMs(submissionTimeMs);
+    cursorResponse.setExpirationTimeMs(submissionTimeMs + getExpirationIntervalInMs());
+    cursorResponse.setOffset(0);
+    cursorResponse.setNumRows(response.getNumRowsResultSet());
+
+    try {
+      long bytesWritten = writeResultTable(requestId, response.getResultTable());
+
+      // Remove the resultTable from the response as it is serialized in a data file.
+      cursorResponse.setResultTable(null);
+      cursorResponse.setBytesWritten(bytesWritten);
+      writeResponse(requestId, cursorResponse);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_RESPONSE_STORE_SIZE, bytesWritten);
+    } catch (Exception e) {
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_WRITE_EXCEPTION, 1);
+      try {
+        // Call deleteResponseImpl rather than deleteResponse: the response metadata may not have been written yet,
+        // so reading it back would fail, and the store size meter must not be decremented for a failed store.
+        deleteResponseImpl(requestId);
+      } catch (Exception cleanupException) {
+        // Keep the write failure as the primary exception; do not let a cleanup failure mask it.
+        e.addSuppressed(cleanupException);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Reads the response from the store and populates it with a slice of the 
{@link ResultTable}
+   * @param requestId Request ID of the query
+   * @param offset Offset of the result slice
+   * @param numRows Number of rows required in the slice
+   * @return A CursorResponse with a slice of the {@link ResultTable}
+   * @throws Exception Thrown if there is any error during the operation.
+   */
+  public CursorResponse handleCursorRequest(String requestId, int offset, int 
numRows)
+      throws Exception {
+
+    CursorResponse response;
+    ResultTable resultTable;
+
+    try {
+      response = readResponse(requestId);
+    } catch (Exception e) {
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 
1);
+      throw e;
+    }
+
+    int totalTableRows = response.getNumRowsResultSet();
+
+    if (totalTableRows == 0 && offset == 0) {
+      // If sum records is 0, then result set is empty.
+      response.setResultTable(null);
+      response.setOffset(0);
+      response.setNumRows(0);
+      return response;
+    } else if (offset >= totalTableRows) {
+      throw new RuntimeException("Offset " + offset + " should be lesser than 
totalRecords " + totalTableRows);
+    }
+
+    long fetchStartTime = System.currentTimeMillis();
+    try {
+      resultTable = readResultTable(requestId, offset, numRows);
+    } catch (Exception e) {
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 
1);
+      throw e;
+    }
+
+    response.setResultTable(resultTable);
+    response.setCursorFetchTimeMs(System.currentTimeMillis() - fetchStartTime);
+    response.setOffset(offset);
+    response.setNumRows(resultTable.getRows().size());
+    response.setNumRowsResultSet(totalTableRows);
+    return response;
+  }
+
+  /**
+   * Returns the list of responses created by the broker.
+   * Note that the ResponseStore object in a broker should only return 
responses created by it.
+   * @return A list of CursorResponse objects created by the specific broker
+   * @throws Exception Thrown if there is an error during an operation.
+   */
+  public List<CursorResponse> getAllStoredResponses()
+      throws Exception {
+    List<CursorResponse> responses = new ArrayList<>();
+
+    for (String requestId : getAllStoredRequestIds()) {
+      responses.add(readResponse(requestId));
+    }
+
+    return responses;
+  }
+
+  /**
+   * Deletes a stored response and subtracts its recorded size from the response store size meter.
+   * When the response metadata cannot be read, the read-exception meter is incremented and the response is still
+   * deleted; the size meter is then left unchanged because the stored size is unknown.
+   * @param requestId Request ID of the response to delete
+   * @return true if the response was deleted, false if it does not exist or could not be deleted
+   * @throws Exception Thrown if the existence check or the delete itself fails
+   */
+  @Override
+  public boolean deleteResponse(String requestId) throws Exception {
+    if (!exists(requestId)) {
+      return false;
+    }
+
+    long bytesWritten = 0;
+    try {
+      bytesWritten = readResponse(requestId).getBytesWritten();
+    } catch (Exception e) {
+      // The metadata can be missing or unreadable (e.g. a store attempt that failed part-way). Record the failed
+      // read but carry on with the delete, otherwise such a response could never be removed.
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+    }
+
+    boolean isSucceeded = deleteResponseImpl(requestId);
+    if (isSucceeded) {
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_RESPONSE_STORE_SIZE, bytesWritten * -1);
+    }
+    return isSucceeded;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index ea6a66251c..22be35405f 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -169,7 +169,27 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
    * For each query with at least one window function, this meter is increased 
as many times as window functions in the
    * query.
    */
-  WINDOW_COUNT("queries", true),;
+  WINDOW_COUNT("queries", true),
+
+  /**
+   * Number of queries executed with cursors. This count includes queries that 
use SSE and MSE
+   */
+  CURSOR_QUERIES_GLOBAL("queries", true),
+
+  /**
+   * Number of exceptions when writing a response to the response store
+   */
+  CURSOR_WRITE_EXCEPTION("exceptions", true),
+
+  /**
+   * Number of exceptions when reading a response and result table from the 
response store
+   */
+  CURSOR_READ_EXCEPTION("exceptions", true),
+
+  /**
+   * The number of bytes stored in the response store. Only the size of the 
result table is tracked.
+   */
+  CURSOR_RESPONSE_STORE_SIZE("bytes", true);
 
   private final String _brokerMeterName;
   private final String _unit;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/CursorResponse.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/CursorResponse.java
new file mode 100644
index 0000000000..14e65f6fbb
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/CursorResponse.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response;
+
+public interface CursorResponse extends BrokerResponse {
+
+  void setBrokerHost(String brokerHost);
+
+  /**
+   * Get the hostname of the processing broker.
+   * @return String containing the hostname
+   */
+  String getBrokerHost();
+
+  void setBrokerPort(int brokerPort);
+
+  /**
+   * Get the port of the processing broker.
+   * @return int containing the port.
+   */
+  int getBrokerPort();
+
+  /**
+   * Set the starting offset of result table slice
+   * @param offset Offset of the result table slice
+   */
+  void setOffset(int offset);
+
+  /**
+   * Current offset in the query result.
+   * Starts from 0.
+   * @return current offset.
+   */
+  int getOffset();
+
+  /**
+   * Set the number of rows in the result table slice.
+   * @param numRows Number of rows in the result table slice
+   */
+  void setNumRows(int numRows);
+
+  /**
+   * Number of rows in the current response.
+   * @return Number of rows in the current response.
+   */
+  int getNumRows();
+
+  /**
+   * Return the time to write the results to the response store.
+   * @return time in milliseconds
+   */
+  long getCursorResultWriteTimeMs();
+
+  /**
+   * Time taken to write cursor results to query storage.
+   * @param cursorResultWriteMs Time in milliseconds.
+   */
+  void setCursorResultWriteTimeMs(long cursorResultWriteMs);
+
+  /**
+   * Return the time to fetch results from the response store.
+   * @return time in milliseconds.
+   */
+  long getCursorFetchTimeMs();
+
+  /**
+   * Set the time taken to fetch a cursor. The time is specific to the current 
call.
+   * @param cursorFetchTimeMs time in milliseconds
+   */
+  void setCursorFetchTimeMs(long cursorFetchTimeMs);
+
+  /**
+   * Unix timestamp when the query was submitted. The timestamp is used to 
calculate the expiration time when the
+   * response will be deleted from the response store.
+   * @param submissionTimeMs Unix timestamp when the query was submitted.
+   */
+  void setSubmissionTimeMs(long submissionTimeMs);
+
+  /**
+   * Get the unix timestamp when the query was submitted
+   * @return Submission unix timestamp when the query was submitted
+   */
+  long getSubmissionTimeMs();
+
+  /**
+   * Set the expiration time (unix timestamp) when the response will be 
deleted from the response store.
+   * @param expirationTimeMs unix timestamp when the response expires in the 
response store
+   */
+  void setExpirationTimeMs(long expirationTimeMs);
+
+  /**
+   * Get the expiration time (unix timestamp) when the response will be 
deleted from the response store.
+   * @return  expirationTimeMs unix timestamp when the response expires in the 
response store
+   */
+  long getExpirationTimeMs();
+
+  /**
+   * Set the number of rows in the result set. This is required because 
BrokerResponse checks the ResultTable
+   * to get the number of rows. However the ResultTable is set to null in 
CursorResponse. So the numRowsResultSet has to
+   * be remembered.
+   * @param numRowsResultSet Number of rows in the result set.
+   */
+  void setNumRowsResultSet(int numRowsResultSet);
+
+  /**
+   * Set the number of bytes written to the response store when storing the 
result table.
+   * @param bytesWritten Number of bytes written
+   */
+  void setBytesWritten(long bytesWritten);
+
+  /**
+   * Get the number of bytes written when storing the result table
+   * @return number of bytes written
+   */
+  long getBytesWritten();
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
new file mode 100644
index 0000000000..d4c2203749
--- /dev/null
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+
+
+@JsonPropertyOrder({
+    "resultTable", "numRowsResultSet", "partialResult", "exceptions", 
"numGroupsLimitReached", "timeUsedMs",
+    "requestId", "brokerId", "numDocsScanned", "totalDocs", 
"numEntriesScannedInFilter", "numEntriesScannedPostFilter",
+    "numServersQueried", "numServersResponded", "numSegmentsQueried", 
"numSegmentsProcessed", "numSegmentsMatched",
+    "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", 
"numConsumingSegmentsMatched",
+    "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker", 
"numSegmentsPrunedByServer",
+    "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", 
"numSegmentsPrunedByValue", "brokerReduceTimeMs",
+    "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", 
"offlineSystemActivitiesCpuTimeNs",
+    "realtimeSystemActivitiesCpuTimeNs", 
"offlineResponseSerializationCpuTimeNs",
+    "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", 
"realtimeTotalCpuTimeNs",
+    "explainPlanNumEmptyFilterSegments", 
"explainPlanNumMatchAllFilterSegments", "traceInfo", "tableQueries",
+    // Fields specific to CursorResponse
+    "offset", "numRows", "cursorResultWriteTimeMs", "cursorFetchTimeMs", 
"submissionTimeMs", "expirationTimeMs",
+    "brokerHost", "brokerPort", "bytesWritten"
+})
+public class CursorResponseNative extends BrokerResponseNative implements 
CursorResponse {
+  private int _offset;
+  private int _numRows;
+  private long _cursorResultWriteTimeMs;
+  private long _cursorFetchTimeMs;
+  private long _submissionTimeMs;
+  private long _expirationTimeMs;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _bytesWritten;
+
+  public CursorResponseNative() {
+  }
+
+  public CursorResponseNative(BrokerResponse response) {
+    // Copy all the member variables of BrokerResponse to CursorResponse.
+    setResultTable(response.getResultTable());
+    setNumRowsResultSet(response.getNumRowsResultSet());
+    setExceptions(response.getExceptions());
+    setNumGroupsLimitReached(response.isNumGroupsLimitReached());
+    setTimeUsedMs(response.getTimeUsedMs());
+    setRequestId(response.getRequestId());
+    setBrokerId(response.getBrokerId());
+    setNumDocsScanned(response.getNumDocsScanned());
+    setTotalDocs(response.getTotalDocs());
+    setNumEntriesScannedInFilter(response.getNumEntriesScannedInFilter());
+    setNumEntriesScannedPostFilter(response.getNumEntriesScannedPostFilter());
+    setNumServersQueried(response.getNumServersQueried());
+    setNumServersResponded(response.getNumServersResponded());
+    setNumSegmentsQueried(response.getNumSegmentsQueried());
+    setNumSegmentsProcessed(response.getNumSegmentsProcessed());
+    setNumSegmentsMatched(response.getNumSegmentsMatched());
+    setNumConsumingSegmentsQueried(response.getNumConsumingSegmentsQueried());
+    
setNumConsumingSegmentsProcessed(response.getNumConsumingSegmentsProcessed());
+    setNumConsumingSegmentsMatched(response.getNumConsumingSegmentsMatched());
+    setMinConsumingFreshnessTimeMs(response.getMinConsumingFreshnessTimeMs());
+    setNumSegmentsPrunedByBroker(response.getNumSegmentsPrunedByBroker());
+    setNumSegmentsPrunedByServer(response.getNumSegmentsPrunedByServer());
+    setNumSegmentsPrunedInvalid(response.getNumSegmentsPrunedInvalid());
+    setNumSegmentsPrunedByLimit(response.getNumSegmentsPrunedByLimit());
+    setNumSegmentsPrunedByValue(response.getNumSegmentsPrunedByValue());
+    setBrokerReduceTimeMs(response.getBrokerReduceTimeMs());
+    setOfflineThreadCpuTimeNs(response.getOfflineThreadCpuTimeNs());
+    setRealtimeThreadCpuTimeNs(response.getRealtimeThreadCpuTimeNs());
+    
setOfflineSystemActivitiesCpuTimeNs(response.getOfflineSystemActivitiesCpuTimeNs());
+    
setRealtimeSystemActivitiesCpuTimeNs(response.getRealtimeSystemActivitiesCpuTimeNs());
+    
setOfflineResponseSerializationCpuTimeNs(response.getOfflineResponseSerializationCpuTimeNs());
+    
setRealtimeResponseSerializationCpuTimeNs(response.getRealtimeResponseSerializationCpuTimeNs());
+    
setExplainPlanNumEmptyFilterSegments(response.getExplainPlanNumEmptyFilterSegments());
+    
setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments());
+    setTraceInfo(response.getTraceInfo());
+    setTablesQueried(response.getTablesQueried());
+  }
+
+  @Override
+  public String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  public void setBrokerHost(String brokerHost) {
+    _brokerHost = brokerHost;
+  }
+
+  @Override
+  public int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  public void setBrokerPort(int brokerPort) {
+    _brokerPort = brokerPort;
+  }
+
+  @Override
+  public void setOffset(int offset) {
+    _offset = offset;
+  }
+
+  @Override
+  public void setNumRows(int numRows) {
+    _numRows = numRows;
+  }
+
+  @Override
+  public void setCursorFetchTimeMs(long cursorFetchTimeMs) {
+    _cursorFetchTimeMs = cursorFetchTimeMs;
+  }
+
+  // Implements CursorResponse#getSubmissionTimeMs; annotated like the other interface methods of this class.
+  @Override
+  public long getSubmissionTimeMs() {
+    return _submissionTimeMs;
+  }
+
+  @Override
+  public void setSubmissionTimeMs(long submissionTimeMs) {
+    _submissionTimeMs = submissionTimeMs;
+  }
+
+  // Implements CursorResponse#getExpirationTimeMs; annotated like the other interface methods of this class.
+  @Override
+  public long getExpirationTimeMs() {
+    return _expirationTimeMs;
+  }
+
+  @Override
+  public void setBytesWritten(long bytesWritten) {
+    _bytesWritten = bytesWritten;
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return _bytesWritten;
+  }
+
+  @Override
+  public void setExpirationTimeMs(long expirationTimeMs) {
+    _expirationTimeMs = expirationTimeMs;
+  }
+
+  @Override
+  public int getOffset() {
+    return _offset;
+  }
+
+  @Override
+  public int getNumRows() {
+    return _numRows;
+  }
+
+  @Override
+  public long getCursorResultWriteTimeMs() {
+    return _cursorResultWriteTimeMs;
+  }
+
+  @Override
+  public void setCursorResultWriteTimeMs(long cursorResultWriteMs) {
+    _cursorResultWriteTimeMs = cursorResultWriteMs;
+  }
+
+  @Override
+  public long getCursorFetchTimeMs() {
+    return _cursorFetchTimeMs;
+  }
+}
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 8dbd4bb402..32f97a0c14 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -190,6 +190,15 @@ public class QueryOptionsUtils {
     return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.USE_MULTISTAGE_ENGINE));
   }
 
+  /**
+   * Reads the {@code GET_CURSOR} query option as a boolean.
+   * A missing option yields {@code false}, because {@link Boolean#parseBoolean(String)} maps {@code null} to
+   * {@code false}.
+   */
+  public static boolean isGetCursor(Map<String, String> queryOptions) {
+    String getCursorOption = queryOptions.get(QueryOptionKey.GET_CURSOR);
+    return Boolean.parseBoolean(getCursorOption);
+  }
+
+  /**
+   * Looks up the {@code CURSOR_NUM_ROWS} query option and hands it to {@code checkedParseIntPositive}, returning
+   * whatever that helper produces for the (possibly absent) option value.
+   */
+  public static Integer getCursorNumRows(Map<String, String> queryOptions) {
+    return checkedParseIntPositive(QueryOptionKey.CURSOR_NUM_ROWS, queryOptions.get(QueryOptionKey.CURSOR_NUM_ROWS));
+  }
+
   public static Optional<Boolean> isExplainAskingServers(Map<String, String> 
queryOptions) {
     String value = queryOptions.get(QueryOptionKey.EXPLAIN_ASKING_SERVERS);
     if (value == null) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 0326f97a7b..171e850638 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -91,6 +91,7 @@ import 
org.apache.pinot.controller.api.access.AccessControlFactory;
 import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
 import org.apache.pinot.controller.api.resources.ControllerFilePathProvider;
 import 
org.apache.pinot.controller.api.resources.InvalidControllerConfigException;
+import org.apache.pinot.controller.cursors.ResponseStoreCleaner;
 import org.apache.pinot.controller.helix.RealtimeConsumerMonitor;
 import org.apache.pinot.controller.helix.SegmentStatusChecker;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -895,6 +896,10 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
         new TaskMetricsEmitter(_helixResourceManager, 
_helixTaskResourceManager, _leadControllerManager, _config,
             _controllerMetrics);
     periodicTasks.add(_taskMetricsEmitter);
+    PeriodicTask responseStoreCleaner = new ResponseStoreCleaner(_config, 
_helixResourceManager, _leadControllerManager,
+        _controllerMetrics, _executorService, _connectionManager);
+    periodicTasks.add(responseStoreCleaner);
+
     return periodicTasks;
   }
 
@@ -977,4 +982,13 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
   protected ControllerAdminApiApplication createControllerAdminApp() {
     return new ControllerAdminApiApplication(_config);
   }
+
+  /**
+   * Exposes the controller's PeriodicTaskScheduler so that tests can reach the registered periodic tasks.
+   * @return the scheduler held in {@code _periodicTaskScheduler}.
+   */
+  @VisibleForTesting
+  public PeriodicTaskScheduler getPeriodicTaskScheduler() {
+    return _periodicTaskScheduler;
+  }
 }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 612aa9bafe..f7d0deb137 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -51,6 +51,7 @@ public class ControllerConf extends PinotConfiguration {
   public static final String CONTROLLER_BROKER_PROTOCOL = 
"controller.broker.protocol";
   public static final String CONTROLLER_BROKER_PORT_OVERRIDE = 
"controller.broker.port.override";
   public static final String CONTROLLER_BROKER_TLS_PREFIX = 
"controller.broker.tls";
+  public static final String CONTROLLER_BROKER_AUTH_PREFIX = 
"controller.broker.auth";
   public static final String CONTROLLER_TLS_PREFIX = "controller.tls";
   public static final String CONTROLLER_HOST = "controller.host";
   public static final String CONTROLLER_PORT = "controller.port";
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
new file mode 100644
index 0000000000..220533d235
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpDelete;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.http.MultiHttpRequest;
+import org.apache.pinot.common.http.MultiHttpRequestResponse;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ResponseStoreCleaner periodically gets all responses stored in a response store and deletes the ones that have
+ * expired. From each broker, it gets the list of responses. Each response has an expiration unix timestamp.
+ * If that expiration timestamp is less than or equal to the current timestamp, the cleaner calls the DELETE API
+ * for that response.
+ */
+public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ResponseStoreCleaner.class);
+  private static final int TIMEOUT_MS = 3000;
+  private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
+  private static final String DELETE_QUERY_RESULT = 
"%s://%s:%d/responseStore/%s";
+  // Used in tests to trigger the delete instead of waiting for the wall clock 
to move to an appropriate time.
+  public static final String CLEAN_AT_TIME = 
"response.store.cleaner.clean.at.ms";
+  private final ControllerConf _controllerConf;
+  private final Executor _executor;
+  private final PoolingHttpClientConnectionManager _connectionManager;
+  private final AuthProvider _authProvider;
+
+  /**
+   * Creates the cleaner task named "ResponseStoreCleaner".
+   * The run frequency and initial delay are derived from {@code config} via getFrequencyInSeconds and
+   * getInitialDelayInSeconds. The executor and connection manager are kept for the HTTP calls made to brokers, and
+   * the auth provider is extracted from the config entries under {@code CONTROLLER_BROKER_AUTH_PREFIX}.
+   */
+  public ResponseStoreCleaner(ControllerConf config, PinotHelixResourceManager 
pinotHelixResourceManager,
+      LeadControllerManager leadControllerManager, ControllerMetrics 
controllerMetrics, Executor executor,
+      PoolingHttpClientConnectionManager connectionManager) {
+    super("ResponseStoreCleaner", getFrequencyInSeconds(config), 
getInitialDelayInSeconds(config),
+        pinotHelixResourceManager, leadControllerManager, controllerMetrics);
+    _controllerConf = config;
+    _executor = executor;
+    _connectionManager = connectionManager;
+    _authProvider =
+        AuthProviderUtils.extractAuthProvider(config, 
ControllerConf.CONTROLLER_BROKER_AUTH_PREFIX);
+  }
+
+  /**
+   * Resolves the initial delay, in seconds, before the cleaner first runs.
+   * The controller's periodic task initial delay is used unless {@code RESPONSE_STORE_CLEANER_INITIAL_DELAY} is
+   * configured, in which case that period string is converted to whole seconds.
+   */
+  private static long getInitialDelayInSeconds(ControllerConf config) {
+    long fallbackDelaySeconds = config.getPeriodicTaskInitialDelayInSeconds();
+    String configuredDelay = config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_INITIAL_DELAY);
+    if (configuredDelay == null) {
+      return fallbackDelaySeconds;
+    }
+    return TimeUnit.MILLISECONDS.toSeconds(TimeUtils.convertPeriodToMillis(configuredDelay));
+  }
+
+  /**
+   * Resolves how often the cleaner runs, in seconds.
+   * Starts from {@code DEFAULT_RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD} and replaces it with the period configured
+   * under {@code RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD} when that property is present.
+   */
+  private static long getFrequencyInSeconds(ControllerConf config) {
+    String defaultPeriod = CommonConstants.CursorConfigs.DEFAULT_RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD;
+    long defaultFrequencySeconds = TimeUnit.MILLISECONDS.toSeconds(TimeUtils.convertPeriodToMillis(defaultPeriod));
+
+    String configuredPeriod =
+        config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD);
+    if (configuredPeriod == null) {
+      return defaultFrequencySeconds;
+    }
+    return TimeUnit.MILLISECONDS.toSeconds(TimeUtils.convertPeriodToMillis(configuredPeriod));
+  }
+
+  /**
+   * Periodic task entry point. The table list is not used; the task cleans the response store instead.
+   * The reference time is the wall clock unless the {@code CLEAN_AT_TIME} task property supplies an explicit
+   * timestamp in milliseconds.
+   */
+  @Override
+  protected void processTables(List<String> tableNamesWithType, Properties periodicTaskProperties) {
+    String cleanAtOverride = periodicTaskProperties.getProperty(CLEAN_AT_TIME);
+    long referenceTimeMs = cleanAtOverride == null ? System.currentTimeMillis() : Long.parseLong(cleanAtOverride);
+    doClean(referenceTimeMs);
+  }
+
+  /**
+   * Deletes every stored response whose expiration timestamp is less than or equal to {@code currentTime}.
+   *
+   * <p>The stored responses are listed from all brokers, one DELETE URL is collected per expired response, and the
+   * deletes are then issued as a single batch so that no URL is sent more than once. Failures while listing or
+   * deleting are logged and not propagated to the caller.
+   *
+   * @param currentTime timestamp in milliseconds that each response's expiration time is compared against.
+   */
+  public void doClean(long currentTime) {
+    String protocol = _controllerConf.getControllerBrokerProtocol();
+    int portOverride = _controllerConf.getControllerBrokerPortOverride();
+
+    List<InstanceConfig> brokerList = _pinotHelixResourceManager.getAllBrokerInstanceConfigs();
+    Map<String, InstanceInfo> brokers = new HashMap<>();
+    for (InstanceConfig broker : brokerList) {
+      // Key by the port the requests are actually sent to. getResponseMap() keys its results by the host and port
+      // of the request URI, which is the override port whenever one is configured.
+      String effectivePort = portOverride > 0 ? Integer.toString(portOverride) : broker.getPort();
+      brokers.put(getInstanceKey(broker.getHostName(), effectivePort),
+          new InstanceInfo(broker.getInstanceName(), broker.getHostName(), Integer.parseInt(broker.getPort())));
+    }
+
+    try {
+      Map<String, String> requestHeaders = AuthProviderUtils.makeAuthHeadersMap(_authProvider);
+
+      Map<String, List<CursorResponseNative>> brokerCursorsMap = getAllQueryResults(brokers, requestHeaders);
+
+      List<String> deleteUrls = new ArrayList<>();
+      for (Map.Entry<String, List<CursorResponseNative>> entry : brokerCursorsMap.entrySet()) {
+        InstanceInfo broker = brokers.get(entry.getKey());
+        if (broker == null) {
+          // Should not happen, but a missing entry must not abort the cleanup of the remaining brokers.
+          LOGGER.warn("Skipping response store cleanup for unknown broker key: {}", entry.getKey());
+          continue;
+        }
+        int port = portOverride > 0 ? portOverride : broker.getPort();
+        for (CursorResponse response : entry.getValue()) {
+          if (response.getExpirationTimeMs() <= currentTime) {
+            deleteUrls.add(String.format(DELETE_QUERY_RESULT, protocol, broker.getHost(), port,
+                response.getRequestId()));
+          }
+        }
+      }
+
+      if (deleteUrls.isEmpty()) {
+        return;
+      }
+
+      // Issue the deletes once, after all brokers were inspected. Sending the accumulated list per broker would
+      // repeat the deletes already sent for earlier brokers.
+      Map<String, String> deleteStatus = getResponseMap(requestHeaders, deleteUrls, "DELETE", HttpDelete::new);
+      deleteStatus.forEach(
+          (key, value) -> LOGGER.info("ResponseStore delete response - Broker: {}. Response: {}", key, value));
+    } catch (Exception e) {
+      // Pass the exception itself so the stack trace is kept; e.getMessage() alone may even be null.
+      LOGGER.error("Failed to clean up expired responses from the response store", e);
+    }
+  }
+
+  /**
+   * Asks every broker for the list of responses it currently stores.
+   * One GET URL is built per broker (using the port override when it is positive) and the JSON body returned by
+   * each broker is parsed into a list of CursorResponseNative.
+   *
+   * @param brokers brokers to query; only the map values are used here.
+   * @param requestHeaders headers sent with every request.
+   * @return parsed responses keyed by the key that getResponseMap assigns to each broker response.
+   * @throws Exception if getResponseMap fails; a JSON parsing failure surfaces as a RuntimeException wrapping the
+   *     IOException.
+   */
+  private Map<String, List<CursorResponseNative>> 
getAllQueryResults(Map<String, InstanceInfo> brokers,
+      Map<String, String> requestHeaders)
+      throws Exception {
+    String protocol = _controllerConf.getControllerBrokerProtocol();
+    int portOverride = _controllerConf.getControllerBrokerPortOverride();
+    List<String> brokerUrls = new ArrayList<>();
+    for (InstanceInfo broker : brokers.values()) {
+      int port = portOverride > 0 ? portOverride : broker.getPort();
+      brokerUrls.add(String.format(QUERY_RESULT_STORE, protocol, 
broker.getHost(), port));
+    }
+    // NOTE(review): the message below talks about "running queries" although the URLs list stored responses.
+    LOGGER.debug("Getting running queries via broker urls: {}", brokerUrls);
+    Map<String, String> strResponseMap = getResponseMap(requestHeaders, 
brokerUrls, "GET", HttpGet::new);
+    return 
strResponseMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, 
e -> {
+      try {
+        return JsonUtils.stringToObject(e.getValue(), new TypeReference<>() {
+        });
+      } catch (IOException ex) {
+        // The lambda cannot throw a checked exception, hence the unchecked wrapper.
+        throw new RuntimeException(ex);
+      }
+    }));
+  }
+
+  /**
+   * Sends one request with an empty body to each of the given URLs and gathers the response bodies.
+   *
+   * @param requestHeaders headers sent with every request.
+   * @param brokerUrls URLs to call.
+   * @param methodName HTTP method name passed to MultiHttpRequest and used in the error log.
+   * @param httpRequestBaseSupplier creates the request object for a URL.
+   * @return response bodies keyed by "host:port" of the request URI. Several URLs that share host and port
+   *     therefore end up under the same key, and only one of their bodies is kept.
+   * @throws Exception after all responses were consumed, if any request failed or returned a status other than 200.
+   */
+  private <T extends HttpUriRequestBase> Map<String, String> 
getResponseMap(Map<String, String> requestHeaders,
+      List<String> brokerUrls, String methodName, Function<String, T> 
httpRequestBaseSupplier)
+      throws Exception {
+    List<Pair<String, String>> urlsAndRequestBodies = new 
ArrayList<>(brokerUrls.size());
+    brokerUrls.forEach((url) -> urlsAndRequestBodies.add(Pair.of(url, "")));
+
+    CompletionService<MultiHttpRequestResponse> completionService =
+        new MultiHttpRequest(_executor, 
_connectionManager).execute(urlsAndRequestBodies, requestHeaders,
+            ResponseStoreCleaner.TIMEOUT_MS, methodName, 
httpRequestBaseSupplier);
+    Map<String, String> responseMap = new HashMap<>();
+    List<String> errMessages = new ArrayList<>(brokerUrls.size());
+    // Take exactly one completed result per submitted URL.
+    for (int i = 0; i < brokerUrls.size(); i++) {
+      try (MultiHttpRequestResponse httpRequestResponse = 
completionService.take().get()) {
+        // The completion order is different from brokerUrls, thus use uri in 
the response.
+        URI uri = httpRequestResponse.getURI();
+        int status = httpRequestResponse.getResponse().getCode();
+        String responseString = 
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+        // Unexpected server responses are collected and returned as exception.
+        if (status != 200) {
+          throw new Exception(
+              String.format("Unexpected status=%d and response='%s' from 
uri='%s'", status, responseString, uri));
+        }
+        responseMap.put((getInstanceKey(uri.getHost(), 
Integer.toString(uri.getPort()))), responseString);
+      } catch (Exception e) {
+        LOGGER.error("Failed to execute {} op. ", methodName, e);
+        // Can't just throw exception from here as there is a need to release 
the other connections.
+        // So just collect the error msg to throw them together after the 
for-loop.
+        errMessages.add(e.getMessage());
+      }
+    }
+    if (!errMessages.isEmpty()) {
+      throw new Exception("Unexpected responses from brokers: " + 
StringUtils.join(errMessages, ","));
+    }
+    return responseMap;
+  }
+
+  /** Builds the "host:port" key under which brokers and their responses are tracked. */
+  private static String getInstanceKey(String hostname, String port) {
+    return String.join(":", hostname, port);
+  }
+}
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
index e3014b82a8..305c0a26a0 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
@@ -57,7 +57,7 @@ public class ControllerPeriodicTaskStarterStatelessTest 
extends ControllerTest {
   }
 
   private class MockControllerStarter extends ControllerStarter {
-    private static final int NUM_PERIODIC_TASKS = 11;
+    private static final int NUM_PERIODIC_TASKS = 12;
 
     public MockControllerStarter() {
       super();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java 
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index d92ee5f1b4..96e4f27790 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -97,6 +97,8 @@ public class Actions {
     public static final String UPLOAD_SEGMENT = "UploadSegment";
     public static final String GET_INSTANCE_PARTITIONS = 
"GetInstancePartitions";
     public static final String UPDATE_INSTANCE_PARTITIONS = 
"UpdateInstancePartitions";
+    public static final String GET_RESPONSE_STORE = "GetResponseStore";
+    public static final String DELETE_RESPONSE_STORE = "DeleteResponseStore";
   }
 
   // Action names for table
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorFsIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorFsIntegrationTest.java
new file mode 100644
index 0000000000..6dac55deca
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorFsIntegrationTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+/**
+ * Runs the tests inherited from CursorIntegrationTest with the broker's response store configured with the "file"
+ * protocol, using directories under the test's temp dir.
+ */
+public class CursorFsIntegrationTest extends CursorIntegrationTest {
+  // Sets the response store protocol to "file" and points its temp and data dirs below _tempDir.
+  // NOTE(review): the temp dir is passed as a File while the data dir is passed as a "file://" string - confirm
+  // that this difference is intended.
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration configuration) {
+    
configuration.setProperty(CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE
 + ".protocol", "file");
+    File tmpPath = new File(_tempDir, "tmp");
+    File dataPath = new File(_tempDir, "data");
+    
configuration.setProperty(CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE
 + ".file.temp.dir",
+        tmpPath);
+    configuration.setProperty(
+        CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE + 
".file.data.dir", "file://" + dataPath);
+  }
+
+  // Each row is {useMultiStageEngine, pageSize}; fewer combinations than the parent class provides.
+  @Override
+  protected Object[][] getPageSizesAndQueryEngine() {
+    return new Object[][]{
+        {false, 1000}, {false, 0}, // 0 triggers default behaviour
+        {true, 1000}, {true, 0}, // 0 triggers default behaviour
+    };
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorIntegrationTest.java
new file mode 100644
index 0000000000..116654395f
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorIntegrationTest.java
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.cursors.ResponseStoreCleaner;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class CursorIntegrationTest extends BaseClusterIntegrationTestSet {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(CursorIntegrationTest.class);
+  private static final int NUM_OFFLINE_SEGMENTS = 8;
+  private static final int COUNT_STAR_RESULT = 79003;
+  private static final String TEST_QUERY_ONE =
+      "SELECT SUM(CAST(CAST(ArrTime AS varchar) AS LONG)) FROM mytable WHERE 
DaysSinceEpoch <> 16312 AND Carrier = "
+          + "'DL'";
+  private static final String TEST_QUERY_TWO =
+      "SELECT CAST(CAST(ArrTime AS varchar) AS LONG) FROM mytable WHERE 
DaysSinceEpoch <> 16312 AND Carrier = 'DL' "
+          + "ORDER BY ArrTime DESC";
+  private static final String TEST_QUERY_THREE =
+      "SELECT ArrDelay, CarrierDelay, (ArrDelay - CarrierDelay) AS diff FROM 
mytable WHERE ArrDelay > CarrierDelay "
+          + "ORDER BY diff, ArrDelay, CarrierDelay LIMIT 100000";
+  private static final String EMPTY_RESULT_QUERY =
+      "SELECT SUM(CAST(CAST(ArrTime AS varchar) AS LONG)) FROM mytable WHERE 
DaysSinceEpoch <> 16312 AND 1 != 1";
+
+  private static int _resultSize;
+
+  // Sets the response store cleaner frequency period of the test controller to "5m".
+  @Override
+  protected void overrideControllerConf(Map<String, Object> properties) {
+    
properties.put(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD,
 "5m");
+  }
+
+  // Configures the test broker's response store with type "memory".
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration configuration) {
+    
configuration.setProperty(CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE
 + ".type", "memory");
+  }
+
+  // Returns COUNT_STAR_RESULT, the constant this test declares for the expected document count.
+  // NOTE(review): presumably overrides a base class hook used by waitForAllDocsLoaded - confirm and add @Override.
+  protected long getCountStarResult() {
+    return COUNT_STAR_RESULT;
+  }
+
+  // Brings up the cluster, creates the offline table, uploads its segments and waits for the documents to load.
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start Zk and the Pinot controller, broker and server (no Kafka is started by this test)
+    startZk();
+    startController();
+    startBroker();
+    startServer();
+
+    List<File> avroFiles = getAllAvroFiles();
+    List<File> offlineAvroFiles = getOfflineAvroFiles(avroFiles, 
NUM_OFFLINE_SEGMENTS);
+
+    // Create and upload the schema and table config
+    Schema schema = createSchema();
+    getControllerRequestClient().addSchema(schema);
+    TableConfig offlineTableConfig = createOfflineTableConfig();
+    addTableConfig(offlineTableConfig);
+
+    // Create and upload segments
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, 
offlineTableConfig, schema, 0, _segmentDir,
+        _tarDir);
+    uploadSegments(getTableName(), _tarDir);
+
+    // Initialize the query generator
+    setUpQueryGenerator(avroFiles);
+
+    // Wait for all documents loaded
+    waitForAllDocsLoaded(100_000L);
+  }
+
+  // URL of the broker endpoint that lists all stored responses: <base>/responseStore
+  protected String getBrokerGetAllResponseStoresApiUrl(String 
brokerBaseApiUrl) {
+    return brokerBaseApiUrl + "/responseStore";
+  }
+
+  // URL used to fetch result pages of one stored response: <base>/responseStore/<requestId>/results
+  protected String getBrokerResponseApiUrl(String brokerBaseApiUrl, String 
requestId) {
+    return getBrokerGetAllResponseStoresApiUrl(brokerBaseApiUrl) + "/" + 
requestId + "/results";
+  }
+
+  // URL used to delete one stored response: <base>/responseStore/<requestId>
+  protected String getBrokerDeleteResponseStoresApiUrl(String 
brokerBaseApiUrl, String requestId) {
+    return getBrokerGetAllResponseStoresApiUrl(brokerBaseApiUrl) + "/" + 
requestId;
+  }
+
+  // Query string that asks the query endpoint for a cursor response with the given page size.
+  protected String getCursorQueryProperties(int numRows) {
+    return String.format("?getCursor=true&numRows=%d", numRows);
+  }
+
+  // Query string selecting a page by offset only; no numRows parameter is sent.
+  protected String getCursorOffset(int offset) {
+    return String.format("?offset=%d", offset);
+  }
+
+  // Query string selecting a page by offset and page size.
+  protected String getCursorOffset(int offset, int numRows) {
+    return String.format("?offset=%d&numRows=%d", offset, numRows);
+  }
+
+  // Headers sent with every request of this test; empty here, subclasses may override.
+  protected Map<String, String> getHeaders() {
+    return Collections.emptyMap();
+  }
+
+  /*
+   * This test does not use H2 to compare results; the h2Query argument is ignored. Instead, it runs the query once
+   * without a cursor and once with a cursor, iterating through all pages of the cursor.
+   * Right now, it only compares the number of rows: the numRowsResultSet of the plain response must equal the sum
+   * of numRows over all cursor pages. Column and row contents are not compared.
+   */
+  @Override
+  protected void testQuery(String pinotQuery, String h2Query)
+      throws Exception {
+    String queryResourceUrl = getBrokerBaseApiUrl();
+    Map<String, String> headers = getHeaders();
+    Map<String, String> extraJsonProperties = getExtraQueryProperties();
+
+    // Get Pinot BrokerResponse without cursors
+    JsonNode pinotResponse;
+    pinotResponse = ClusterTest.postQuery(pinotQuery,
+        ClusterIntegrationTestUtils.getBrokerQueryApiUrl(queryResourceUrl, 
useMultiStageQueryEngine()), headers,
+        extraJsonProperties);
+    if (!pinotResponse.get("exceptions").isEmpty()) {
+      throw new RuntimeException("Got Exceptions from Query Response: " + 
pinotResponse);
+    }
+    int brokerResponseSize = pinotResponse.get("numRowsResultSet").asInt();
+
+    // Get a list of responses using cursors.
+    // The page size comes from the static _resultSize that the calling test sets beforehand.
+    CursorResponse pinotPagingResponse;
+    pinotPagingResponse = 
JsonUtils.jsonNodeToObject(ClusterTest.postQuery(pinotQuery,
+        ClusterIntegrationTestUtils.getBrokerQueryApiUrl(queryResourceUrl, 
useMultiStageQueryEngine())
+            + getCursorQueryProperties(_resultSize), headers, 
getExtraQueryProperties()), CursorResponseNative.class);
+    if (!pinotPagingResponse.getExceptions().isEmpty()) {
+      throw new RuntimeException("Got Exceptions from Query Response: " + 
pinotPagingResponse.getExceptions().get(0));
+    }
+    List<CursorResponse> resultPages = getAllResultPages(queryResourceUrl, 
headers, pinotPagingResponse, _resultSize);
+
+    int brokerPagingResponseSize = 0;
+    for (CursorResponse response : resultPages) {
+      brokerPagingResponseSize += response.getNumRows();
+    }
+
+    // Compare the number of rows.
+    if (brokerResponseSize != brokerPagingResponseSize) {
+      throw new RuntimeException(
+          "Pinot # of rows from paging API " + brokerPagingResponseSize + " 
doesn't match # of rows from default API "
+              + brokerResponseSize);
+    }
+  }
+
+  /**
+   * Collects the first cursor response plus every following page of the same stored response.
+   * Pages are fetched from the broker until the accumulated row count reaches the total reported by the first
+   * response.
+   *
+   * @param queryResourceUrl broker base API URL.
+   * @param headers headers sent with every page request.
+   * @param firstResponse response returned when the query was submitted; supplies request id and total row count.
+   * @param numRows page size to request; 0 selects {@code DEFAULT_CURSOR_FETCH_ROWS}.
+   * @return all pages in fetch order, starting with {@code firstResponse}.
+   * @throws IllegalStateException if a page reports no rows although rows are still outstanding.
+   */
+  private List<CursorResponse> getAllResultPages(String queryResourceUrl, Map<String, String> headers,
+      CursorResponse firstResponse, int numRows)
+      throws Exception {
+    numRows = numRows == 0 ? CommonConstants.CursorConfigs.DEFAULT_CURSOR_FETCH_ROWS : numRows;
+
+    List<CursorResponse> resultPages = new ArrayList<>();
+    resultPages.add(firstResponse);
+    int totalRows = firstResponse.getNumRowsResultSet();
+
+    int offset = firstResponse.getNumRows();
+    while (offset < totalRows) {
+      CursorResponse response = JsonUtils.stringToObject(ClusterTest.sendGetRequest(
+          getBrokerResponseApiUrl(queryResourceUrl, firstResponse.getRequestId()) + getCursorOffset(offset, numRows),
+          headers), CursorResponseNative.class);
+      int pageRows = response.getNumRows();
+      if (pageRows <= 0) {
+        // Without progress the offset would never reach totalRows and the test would hang forever.
+        throw new IllegalStateException(
+            "Cursor page at offset " + offset + " of request " + firstResponse.getRequestId() + " returned "
+                + pageRows + " rows although " + (totalRows - offset) + " rows are still outstanding");
+      }
+      resultPages.add(response);
+      offset += pageRows;
+    }
+    return resultPages;
+  }
+
+  // Rows of {useMultiStageEngine, pageSize} fed to testHardcodedQueries; subclasses may override the set.
+  protected Object[][] getPageSizesAndQueryEngine() {
+    return new Object[][]{
+        {false, 2}, {false, 3}, {false, 10}, {false, 0}, //0 trigger default 
behaviour
+        {true, 2}, {true, 3}, {true, 10}, {true, 0} //0 trigger default 
behaviour
+    };
+  }
+
+  // TestNG data provider that delegates to the overridable getPageSizesAndQueryEngine().
+  @DataProvider(name = "pageSizeAndQueryEngineProvider")
+  public Object[][] pageSizeAndQueryEngineProvider() {
+    return getPageSizesAndQueryEngine();
+  }
+
+  // Test hard coded queries with SSE/MSE AND different cursor response sizes.
+  // The page size is published through the static _resultSize, which testQuery reads.
+  @Test(dataProvider = "pageSizeAndQueryEngineProvider")
+  public void testHardcodedQueries(boolean useMultiStageEngine, int pageSize)
+      throws Exception {
+    _resultSize = pageSize;
+    setUseMultiStageQueryEngine(useMultiStageEngine);
+    super.testHardcodedQueries();
+  }
+
+  // Test a simple cursor workflow: submit a query with a cursor, page through all results and delete the stored
+  // response. The stored response is deleted even when an assertion fails, so that it cannot leak into tests
+  // that count the stored responses.
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testCursorWorkflow(boolean useMultiStageQueryEngine)
+      throws Exception {
+    _resultSize = 10000;
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    // Submit query
+    JsonNode jsonNode = ClusterTest.postQuery(TEST_QUERY_THREE,
+        ClusterIntegrationTestUtils.getBrokerQueryApiUrl(getBrokerBaseApiUrl(), useMultiStageQueryEngine())
+            + getCursorQueryProperties(_resultSize), getHeaders(), getExtraQueryProperties());
+
+    CursorResponse pinotPagingResponse = JsonUtils.jsonNodeToObject(jsonNode, CursorResponseNative.class);
+    if (!pinotPagingResponse.getExceptions().isEmpty()) {
+      throw new RuntimeException("Got Exceptions from Query Response: " + pinotPagingResponse.getExceptions().get(0));
+    }
+    String requestId = pinotPagingResponse.getRequestId();
+
+    try {
+      Assert.assertFalse(pinotPagingResponse.getBrokerHost().isEmpty());
+      Assert.assertTrue(pinotPagingResponse.getBrokerPort() > 0);
+      Assert.assertTrue(pinotPagingResponse.getCursorFetchTimeMs() >= 0);
+      Assert.assertTrue(pinotPagingResponse.getCursorResultWriteTimeMs() >= 0);
+
+      int totalRows = pinotPagingResponse.getNumRowsResultSet();
+      int offset = pinotPagingResponse.getNumRows();
+      while (offset < totalRows) {
+        pinotPagingResponse = JsonUtils.stringToObject(ClusterTest.sendGetRequest(
+            getBrokerResponseApiUrl(getBrokerBaseApiUrl(), requestId) + getCursorOffset(offset, _resultSize),
+            getHeaders()), CursorResponseNative.class);
+
+        Assert.assertFalse(pinotPagingResponse.getBrokerHost().isEmpty());
+        Assert.assertTrue(pinotPagingResponse.getBrokerPort() > 0);
+        Assert.assertTrue(pinotPagingResponse.getCursorFetchTimeMs() >= 0);
+
+        // Advance by the rows actually returned, so that a short page cannot make the loop skip rows.
+        int pageRows = pinotPagingResponse.getNumRows();
+        Assert.assertTrue(pageRows > 0, "Cursor page at offset " + offset + " returned no rows");
+        offset += pageRows;
+      }
+    } finally {
+      ClusterTest.sendDeleteRequest(getBrokerDeleteResponseStoresApiUrl(getBrokerBaseApiUrl(), requestId),
+          getHeaders());
+    }
+  }
+
+  // Runs two queries through the cursor API, checks that the broker lists two stored responses, deletes one of
+  // them and checks that only the other one remains.
+  // NOTE(review): the size assertions assume that no other test left stored responses behind on the broker;
+  // testQuery itself never deletes the responses it creates - confirm the test ordering makes this safe.
+  @Test
+  public void testGetAndDelete()
+      throws Exception {
+    _resultSize = 100000;
+    testQuery(TEST_QUERY_ONE);
+    testQuery(TEST_QUERY_TWO);
+
+    List<CursorResponseNative> requestIds = JsonUtils.stringToObject(
+        
ClusterTest.sendGetRequest(getBrokerGetAllResponseStoresApiUrl(getBrokerBaseApiUrl()),
 getHeaders()),
+        new TypeReference<>() {
+        });
+
+    Assert.assertEquals(requestIds.size(), 2);
+
+    // Delete the first one
+    String deleteRequestId = requestIds.get(0).getRequestId();
+    
ClusterTest.sendDeleteRequest(getBrokerDeleteResponseStoresApiUrl(getBrokerBaseApiUrl(),
 deleteRequestId),
+        getHeaders());
+
+    requestIds = JsonUtils.stringToObject(
+        
ClusterTest.sendGetRequest(getBrokerGetAllResponseStoresApiUrl(getBrokerBaseApiUrl()),
 getHeaders()),
+        new TypeReference<>() {
+        });
+
+    Assert.assertEquals(requestIds.size(), 1);
+    Assert.assertNotEquals(requestIds.get(0).getRequestId(), deleteRequestId);
+  }
+
+  /**
+   * Fetching a cursor page for a request id that was never stored must be rejected with HTTP 404.
+   */
+  @Test
+  public void testBadGet() {
+    try {
+      ClusterTest.sendGetRequest(getBrokerResponseApiUrl(getBrokerBaseApiUrl(), "dummy") + getCursorOffset(0),
+          getHeaders());
+      // Without an explicit failure the test would pass silently if the broker accepted the unknown request id.
+      Assert.fail("Expected the GET for an unknown request id to fail with HTTP 404");
+    } catch (IOException e) {
+      HttpErrorStatusException h = (HttpErrorStatusException) e.getCause();
+      Assert.assertEquals(h.getStatusCode(), 404);
+      Assert.assertTrue(h.getMessage().contains("Query results for dummy not found"));
+    }
+  }
+
+  /**
+   * Deleting the response of a request id that was never stored must be rejected with HTTP 404.
+   */
+  @Test
+  public void testBadDelete() {
+    try {
+      ClusterTest.sendDeleteRequest(getBrokerDeleteResponseStoresApiUrl(getBrokerBaseApiUrl(), "dummy"),
+          getHeaders());
+      // Without an explicit failure the test would pass silently if the broker accepted the unknown request id.
+      Assert.fail("Expected the DELETE for an unknown request id to fail with HTTP 404");
+    } catch (IOException e) {
+      HttpErrorStatusException h = (HttpErrorStatusException) e.getCause();
+      Assert.assertEquals(h.getStatusCode(), 404);
+      Assert.assertTrue(h.getMessage().contains("Query results for dummy not found"));
+    }
+  }
+
+  @Test
+  public void testQueryWithEmptyResult()
+      throws Exception {
+    JsonNode pinotResponse = ClusterTest.postQuery(EMPTY_RESULT_QUERY,
+        
ClusterIntegrationTestUtils.getBrokerQueryApiUrl(getBrokerBaseApiUrl(), 
useMultiStageQueryEngine())
+            + getCursorQueryProperties(1000), getHeaders(), 
getExtraQueryProperties());
+
+    // There should be no resultTable.
+    Assert.assertNull(pinotResponse.get("resultTable"));
+    // Total Rows in result set should be 0.
+    Assert.assertEquals(pinotResponse.get("numRowsResultSet").asInt(), 0);
+    // Rows in the current response should be 0
+    Assert.assertEquals(pinotResponse.get("numRows").asInt(), 0);
+    Assert.assertTrue(pinotResponse.get("exceptions").isEmpty());
+  }
+
+  @DataProvider(name = "InvalidOffsetQueryProvider")
+  public Object[][] invalidOffsetQueryProvider() {
+    return new Object[][]{{TEST_QUERY_ONE}, {EMPTY_RESULT_QUERY}};
+  }
+
+  /**
+   * Submits the query as a cursor query and then requests an offset one past the total number of rows. The fetch is
+   * expected to fail with an IOException whose message reports the out-of-range offset.
+   */
+  @Test(dataProvider = "InvalidOffsetQueryProvider", expectedExceptions = IOException.class,
+      expectedExceptionsMessageRegExp = ".*Offset \\d+ should be lesser than totalRecords \\d+.*")
+  public void testGetInvalidOffset(String query)
+      throws Exception {
+    String cursorQueryUrl =
+        ClusterIntegrationTestUtils.getBrokerQueryApiUrl(getBrokerBaseApiUrl(), useMultiStageQueryEngine())
+            + getCursorQueryProperties(_resultSize);
+    JsonNode queryResult = ClusterTest.postQuery(query, cursorQueryUrl, getHeaders(), getExtraQueryProperties());
+    CursorResponse firstPage = JsonUtils.jsonNodeToObject(queryResult, CursorResponseNative.class);
+    Assert.assertTrue(firstPage.getExceptions().isEmpty());
+
+    // Ask for a page that starts beyond the end of the stored result set.
+    String responseUrl = getBrokerResponseApiUrl(getBrokerBaseApiUrl(), firstPage.getRequestId());
+    String offsetParams = getCursorOffset(firstPage.getNumRowsResultSet() + 1);
+    ClusterTest.sendGetRequest(responseUrl + offsetParams, getHeaders());
+  }
+
+  /**
+   * Posts a query with the FROM keyword missing as a cursor query and checks that the response reports a
+   * QueryValidationError (error code 700), names the broker and carries no result table.
+   */
+  @Test
+  public void testQueryWithRuntimeError()
+      throws Exception {
+    String queryWithFromMissing = "SELECT * mytable limit 100";
+    String cursorQueryUrl =
+        ClusterIntegrationTestUtils.getBrokerQueryApiUrl(getBrokerBaseApiUrl(), useMultiStageQueryEngine())
+            + getCursorQueryProperties(_resultSize);
+    JsonNode pinotResponse =
+        ClusterTest.postQuery(queryWithFromMissing, cursorQueryUrl, getHeaders(), getExtraQueryProperties());
+
+    Assert.assertFalse(pinotResponse.get("exceptions").isEmpty());
+    JsonNode firstException = pinotResponse.get("exceptions").get(0);
+    String exceptionMessage = firstException.get("message").asText();
+    Assert.assertTrue(exceptionMessage.startsWith("QueryValidationError:"));
+    Assert.assertEquals(firstException.get("errorCode").asInt(), 700);
+    String brokerId = pinotResponse.get("brokerId").asText();
+    Assert.assertTrue(brokerId.startsWith("Broker_"));
+    // There should be no resultTable.
+    Assert.assertNull(pinotResponse.get("resultTable"));
+  }
+
+  /**
+   * Exercises the controller's "ResponseStoreCleaner" periodic task: two more responses are stored, the task is
+   * scheduled immediately with its clean-at time set to the earlier of two expiration times, and the test then waits
+   * until the broker lists fewer stored responses than before the task ran.
+   */
+  @Test
+  public void testResponseStoreCleaner()
+      throws Exception {
+    List<CursorResponseNative> requestIds = JsonUtils.stringToObject(
+        ClusterTest.sendGetRequest(getBrokerGetAllResponseStoresApiUrl(getBrokerBaseApiUrl()), getHeaders()),
+        new TypeReference<>() {
+        });
+
+    int numQueryResults = requestIds.size();
+
+    _resultSize = 100000;
+    this.testQuery(TEST_QUERY_ONE);
+    // Sleep so that both the queries do not have the same submission time.
+    Thread.sleep(50);
+    this.testQuery(TEST_QUERY_TWO);
+
+    requestIds = JsonUtils.stringToObject(
+        ClusterTest.sendGetRequest(getBrokerGetAllResponseStoresApiUrl(getBrokerBaseApiUrl()), getHeaders()),
+        new TypeReference<>() {
+        });
+
+    int numQueryResultsAfter = requestIds.size();
+    Assert.assertEquals(numQueryResultsAfter - numQueryResults, 2);
+
+    CursorResponseNative cursorResponse0 = JsonUtils.stringToObject(
+        ClusterTest.sendGetRequest(getBrokerResponseApiUrl(getBrokerBaseApiUrl(), requestIds.get(0).getRequestId()),
+            getHeaders()), new TypeReference<>() {
+        });
+
+    CursorResponseNative cursorResponse1 = JsonUtils.stringToObject(
+        ClusterTest.sendGetRequest(getBrokerResponseApiUrl(getBrokerBaseApiUrl(), requestIds.get(1).getRequestId()),
+            getHeaders()), new TypeReference<>() {
+        });
+
+    // Get the lower expiration time; it is handed to the cleaner as its clean-at time.
+    long expirationTime0 = cursorResponse0.getExpirationTimeMs();
+    long expirationTime1 = cursorResponse1.getExpirationTimeMs();
+
+    Properties periodicTaskProperties = new Properties();
+    periodicTaskProperties.setProperty("requestId", "CursorIntegrationTest");
+    periodicTaskProperties.setProperty(ResponseStoreCleaner.CLEAN_AT_TIME,
+        Long.toString(Math.min(expirationTime0, expirationTime1)));
+    _controllerStarter.getPeriodicTaskScheduler().scheduleNow("ResponseStoreCleaner", periodicTaskProperties);
+
+    // The periodic task is run in an executor thread. Give the thread some time to run the cleaner.
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        // Deserialize with the same element type as the listings above instead of a raw List.
+        List<CursorResponseNative> remainingResponses = JsonUtils.stringToObject(
+            ClusterTest.sendGetRequest(getBrokerGetAllResponseStoresApiUrl(getBrokerBaseApiUrl()), getHeaders()),
+            new TypeReference<List<CursorResponseNative>>() {
+            });
+        return remainingResponses.size() < numQueryResultsAfter;
+      } catch (Exception e) {
+        // Keep the stack trace; the condition is simply retried on the next poll.
+        LOGGER.error("Failed to list stored responses while waiting for the cleaner", e);
+        return false;
+      }
+    }, 500L, 100_000L, "Failed to delete expired query results", true);
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorWithAuthIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorWithAuthIntegrationTest.java
new file mode 100644
index 0000000000..ebac46edcf
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorWithAuthIntegrationTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicNameValuePair;
+import org.apache.http.HttpStatus;
+import org.apache.pinot.client.Connection;
+import org.apache.pinot.client.ConnectionFactory;
+import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
+import org.apache.pinot.common.auth.UrlAuthProvider;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerRequestClient;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.Test;
+
+import static 
org.apache.pinot.integration.tests.BasicAuthTestUtils.AUTH_HEADER;
+import static org.apache.pinot.integration.tests.BasicAuthTestUtils.AUTH_TOKEN;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+@Test
+public class CursorWithAuthIntegrationTest extends CursorIntegrationTest {
+  final static String AUTH_PROVIDER_CLASS = 
UrlAuthProvider.class.getCanonicalName();
+  final static URL AUTH_URL = 
CursorWithAuthIntegrationTest.class.getResource("/url-auth-token.txt");
+  final static String AUTH_PREFIX = "Basic";
+
+  protected Object[][] getPageSizesAndQueryEngine() {
+    return new Object[][]{
+        {false, 1000},
+        {true, 1000}
+    };
+  }
+
+  @Override
+  protected void overrideControllerConf(Map<String, Object> properties) {
+    BasicAuthTestUtils.addControllerConfiguration(properties);
+    properties.put("controller.segment.fetcher.auth.provider.class", 
AUTH_PROVIDER_CLASS);
+    properties.put("controller.segment.fetcher.auth.url", AUTH_URL);
+    properties.put("controller.segment.fetcher.auth.prefix", AUTH_PREFIX);
+    properties.put(ControllerConf.CONTROLLER_BROKER_AUTH_PREFIX + 
".provider.class", AUTH_PROVIDER_CLASS);
+    properties.put(ControllerConf.CONTROLLER_BROKER_AUTH_PREFIX + ".url", 
AUTH_URL);
+    properties.put(ControllerConf.CONTROLLER_BROKER_AUTH_PREFIX + ".prefix", 
AUTH_PREFIX);
+    
properties.put(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD,
 "5m");
+  }
+
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration configuration) {
+    super.overrideBrokerConf(configuration);
+    BasicAuthTestUtils.addBrokerConfiguration(configuration);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    BasicAuthTestUtils.addServerConfiguration(serverConf);
+    serverConf.setProperty("pinot.server.segment.fetcher.auth.provider.class", 
AUTH_PROVIDER_CLASS);
+    serverConf.setProperty("pinot.server.segment.fetcher.auth.url", AUTH_URL);
+    serverConf.setProperty("pinot.server.segment.fetcher.auth.prefix", 
AUTH_PREFIX);
+    
serverConf.setProperty("pinot.server.segment.uploader.auth.provider.class", 
AUTH_PROVIDER_CLASS);
+    serverConf.setProperty("pinot.server.segment.uploader.auth.url", AUTH_URL);
+    serverConf.setProperty("pinot.server.segment.uploader.auth.prefix", 
AUTH_PREFIX);
+    serverConf.setProperty("pinot.server.instance.auth.provider.class", 
AUTH_PROVIDER_CLASS);
+    serverConf.setProperty("pinot.server.instance.auth.url", AUTH_URL);
+    serverConf.setProperty("pinot.server.instance.auth.prefix", AUTH_PREFIX);
+  }
+
+  @Override
+  protected Map<String, String> getHeaders() {
+    return BasicAuthTestUtils.AUTH_HEADER;
+  }
+
+  @Override
+  public ControllerRequestClient getControllerRequestClient() {
+    if (_controllerRequestClient == null) {
+      _controllerRequestClient =
+          new ControllerRequestClient(_controllerRequestURLBuilder, 
getHttpClient(), AUTH_HEADER);
+    }
+    return _controllerRequestClient;
+  }
+
+  @Override
+  protected Connection getPinotConnection() {
+    if (_pinotConnection == null) {
+      JsonAsyncHttpPinotClientTransportFactory factory = new 
JsonAsyncHttpPinotClientTransportFactory();
+      factory.setHeaders(AUTH_HEADER);
+
+      _pinotConnection =
+          ConnectionFactory.fromZookeeper(getZkUrl() + "/" + 
getHelixClusterName(), factory.buildTransport());
+    }
+    return _pinotConnection;
+  }
+
+  /**
+   * Upload all segments inside the given directories to the cluster.
+   */
+  @Override
+  protected void uploadSegments(String tableName, TableType tableType, 
List<File> tarDirs)
+      throws Exception {
+    List<File> segmentTarFiles = new ArrayList<>();
+    for (File tarDir : tarDirs) {
+      File[] tarFiles = tarDir.listFiles();
+      assertNotNull(tarFiles);
+      Collections.addAll(segmentTarFiles, tarFiles);
+    }
+    int numSegments = segmentTarFiles.size();
+    assertTrue(numSegments > 0);
+
+    URI uploadSegmentHttpURI = 
URI.create(getControllerRequestURLBuilder().forSegmentUpload());
+    NameValuePair
+        tableNameValuePair = new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, 
tableName);
+    NameValuePair tableTypeValuePair = new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
+        tableType.name());
+    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair, 
tableTypeValuePair);
+    List<Header> headers = List.of(new BasicHeader("Authorization", 
AUTH_TOKEN));
+
+    try (FileUploadDownloadClient fileUploadDownloadClient = new 
FileUploadDownloadClient()) {
+      if (numSegments == 1) {
+        File segmentTarFile = segmentTarFiles.get(0);
+        if (System.currentTimeMillis() % 2 == 0) {
+          assertEquals(
+              fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, 
segmentTarFile.getName(), segmentTarFile,
+                  headers, parameters, 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode(), HttpStatus.SC_OK);
+        } else {
+          assertEquals(
+              uploadSegmentWithOnlyMetadata(tableName, tableType, 
uploadSegmentHttpURI, fileUploadDownloadClient,
+                  segmentTarFile), HttpStatus.SC_OK);
+        }
+      } else {
+        // Upload all segments in parallel
+        ExecutorService executorService = 
Executors.newFixedThreadPool(numSegments);
+        List<Future<Integer>> futures = new ArrayList<>(numSegments);
+        for (File segmentTarFile : segmentTarFiles) {
+          futures.add(executorService.submit(() -> {
+            if (System.currentTimeMillis() % 2 == 0) {
+              return 
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, 
segmentTarFile.getName(),
+                  segmentTarFile, headers, parameters, 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
+            } else {
+              return uploadSegmentWithOnlyMetadata(tableName, tableType, 
uploadSegmentHttpURI, fileUploadDownloadClient,
+                  segmentTarFile);
+            }
+          }));
+        }
+        executorService.shutdown();
+        for (Future<Integer> future : futures) {
+          assertEquals((int) future.get(), HttpStatus.SC_OK);
+        }
+      }
+    }
+  }
+
+  private int uploadSegmentWithOnlyMetadata(String tableName, TableType 
tableType, URI uploadSegmentHttpURI,
+      FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile)
+      throws IOException, HttpErrorStatusException {
+    List<Header> headers = List.of(new 
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
+            String.format("file://%s/%s", 
segmentTarFile.getParentFile().getAbsolutePath(),
+                URIUtils.encode(segmentTarFile.getName()))),
+        new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+            FileUploadDownloadClient.FileUploadType.METADATA.toString()),
+        new BasicHeader("Authorization", AUTH_TOKEN));
+    // Add table name and table type as request parameters
+    NameValuePair tableNameValuePair =
+        new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, 
tableName);
+    NameValuePair tableTypeValuePair =
+        new 
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, 
tableType.name());
+    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair, 
tableTypeValuePair);
+    return 
fileUploadDownloadClient.uploadSegmentMetadata(uploadSegmentHttpURI, 
segmentTarFile.getName(),
+        segmentTarFile, headers, parameters, 
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/cursors/MemoryResponseStore.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/cursors/MemoryResponseStore.java
new file mode 100644
index 0000000000..e8cb3fb24e
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/cursors/MemoryResponseStore.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.cursors;
+
+import com.google.auto.service.AutoService;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+@AutoService(ResponseStore.class)
+public class MemoryResponseStore extends AbstractResponseStore {
+  private final Map<String, CursorResponse> _cursorResponseMap = new 
HashMap<>();
+  private final Map<String, ResultTable> _resultTableMap = new HashMap<>();
+
+  private static final String TYPE = "memory";
+
+  @Override
+  public String getType() {
+    return TYPE;
+  }
+
+  @Override
+  protected void writeResponse(String requestId, CursorResponse response) {
+    _cursorResponseMap.put(requestId, response);
+  }
+
+  @Override
+  protected long writeResultTable(String requestId, ResultTable resultTable) {
+    _resultTableMap.put(requestId, resultTable);
+    return 0;
+  }
+
+  @Override
+  public CursorResponse readResponse(String requestId) {
+    CursorResponse response = _cursorResponseMap.get(requestId);
+    CursorResponse responseCopy = new CursorResponseNative(response);
+
+    responseCopy.setBrokerHost(response.getBrokerHost());
+    responseCopy.setBrokerPort(response.getBrokerPort());
+    responseCopy.setSubmissionTimeMs(response.getSubmissionTimeMs());
+    responseCopy.setExpirationTimeMs(response.getExpirationTimeMs());
+    return responseCopy;
+  }
+
+  @Override
+  protected ResultTable readResultTable(String requestId, int offset, int 
numRows) {
+    CursorResponse response = _cursorResponseMap.get(requestId);
+    int totalTableRows = response.getNumRowsResultSet();
+    ResultTable resultTable = _resultTableMap.get(requestId);
+    int sliceEnd = offset + numRows;
+    if (sliceEnd > totalTableRows) {
+      sliceEnd = totalTableRows;
+    }
+
+    return new ResultTable(resultTable.getDataSchema(), 
resultTable.getRows().subList(offset, sliceEnd));
+  }
+
+  @Override
+  public void init(@NotNull PinotConfiguration config, @NotNull String 
brokerHost, int brokerPort, String brokerId,
+      @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    init(brokerHost, brokerPort, brokerId, brokerMetrics, expirationTime);
+  }
+
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    return _cursorResponseMap.containsKey(requestId) && 
_resultTableMap.containsKey(requestId);
+  }
+
+  @Override
+  public Collection<String> getAllStoredRequestIds() {
+    return _cursorResponseMap.keySet();
+  }
+
+  @Override
+  protected boolean deleteResponseImpl(String requestId) {
+    return _cursorResponseMap.remove(requestId) != null && 
_resultTableMap.remove(requestId) != null;
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStore.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStore.java
new file mode 100644
index 0000000000..e02067045c
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStore.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.cursors;
+
+import java.util.Collection;
+
+
+/**
+ * ResponseStore stores the response of a query. A response is identified by the request id of the query.
+ * There is one instance of a response store in every broker. An instance of the response store contains responses
+ * of queries submitted to that broker. An implementation of a response store may use a shared storage system.
+ * Regardless, a response store is expected to operate only on responses created by it.
+ *
+ * Since BrokerResponse cannot be moved to the SPI package, some of the functions are declared in
+ * AbstractResponseStore.
+ * <br/>
+ * Concurrency Model:
+ * <br/>
+ * There are 3 possible roles - writer, reader and deleter.
+ * <br/>
+ * There can only be ONE writer, and no other role can execute concurrently with it.
+ * A response store is written during query execution. During execution, there can be no reads or deletes as the
+ * query id would not have been provided to the client.
+ * <br/>
+ * There can be multiple readers. There may be concurrent deletes but no concurrent writes.
+ * Multiple clients can potentially iterate through the result set.
+ * <br/>
+ * There can be multiple deleters. There may be concurrent reads but no concurrent writes.
+ * Multiple clients can potentially call the delete API.
+ * <br/>
+ * Implementations should ensure that concurrent read/delete and delete/delete operations are handled correctly.
+ */
+public interface ResponseStore {
+  /**
+   * Get the type of the ResponseStore
+   * @return Type of the store
+   */
+  String getType();
+
+  /**
+   * Checks if the response for a requestId exists.
+   * @param requestId The ID of the request
+   * @return True if response exists else false
+   * @throws Exception Thrown if an error occurs when checking if the response exists.
+   */
+  boolean exists(String requestId)
+    throws Exception;
+
+  /**
+   * Get all request ids of responses in the ResponseStore.
+   * Note that a broker should only return request ids that are created by it even if it has access to others in a
+   * shared storage.
+   * @return Collection of request ids
+   * @throws Exception Thrown if an error occurs when listing the request ids.
+   */
+  Collection<String> getAllStoredRequestIds()
+      throws Exception;
+
+  /**
+   * Delete a response.
+   *
+   * @param requestId Request id of the query.
+   * @return True if response was found and deleted.
+   * @throws Exception Exception is thrown if response cannot be deleted by response store.
+   */
+  boolean deleteResponse(String requestId)
+      throws Exception;
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStoreService.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStoreService.java
new file mode 100644
index 0000000000..7c4d2c94b0
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStoreService.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.cursors;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+
+
+/**
+ * Holds the {@link ResponseStore} implementations found through {@link ServiceLoader} and indexes them by the value
+ * of {@link ResponseStore#getType()}. A process-wide instance is created when the class is initialized and can be
+ * replaced through {@link #setInstance(ResponseStoreService)}.
+ */
+public class ResponseStoreService {
+  private static volatile ResponseStoreService _instance = fromServiceLoader();
+
+  private final Set<ResponseStore> _allResponseStores;
+  private final Map<String, ResponseStore> _responseStoreByType;
+
+  private ResponseStoreService(Set<ResponseStore> storeSet) {
+    _allResponseStores = storeSet;
+    _responseStoreByType = new HashMap<>();
+    // When two stores report the same type, the one iterated last wins.
+    storeSet.forEach(store -> _responseStoreByType.put(store.getType(), store));
+  }
+
+  /**
+   * @return the current process-wide service instance
+   */
+  public static ResponseStoreService getInstance() {
+    return _instance;
+  }
+
+  /**
+   * Replaces the process-wide service instance.
+   *
+   * @param service instance returned by subsequent {@link #getInstance()} calls
+   */
+  public static void setInstance(ResponseStoreService service) {
+    _instance = service;
+  }
+
+  /**
+   * Builds a new service from every {@link ResponseStore} provider that {@link ServiceLoader} can load.
+   *
+   * @return a service holding the loaded stores
+   */
+  public static ResponseStoreService fromServiceLoader() {
+    Set<ResponseStore> discoveredStores = new HashSet<>();
+    ServiceLoader.load(ResponseStore.class).forEach(discoveredStores::add);
+    return new ResponseStoreService(discoveredStores);
+  }
+
+  /**
+   * @return the set of stores this service was created with (the internal set, not a copy)
+   */
+  public Set<ResponseStore> getAllResponseStores() {
+    return _allResponseStores;
+  }
+
+  /**
+   * @return the stores keyed by their type (the internal map, not a copy)
+   */
+  public Map<String, ResponseStore> getResponseStoresByType() {
+    return _responseStoreByType;
+  }
+
+  /**
+   * Looks up the store registered for the given type.
+   *
+   * @param type store type, as reported by {@link ResponseStore#getType()}
+   * @return the matching store
+   * @throws IllegalArgumentException if no store is registered for the type
+   */
+  public ResponseStore getResponseStore(String type) {
+    ResponseStore store = _responseStoreByType.get(type);
+    if (store != null) {
+      return store;
+    }
+    throw new IllegalArgumentException("Unknown ResponseStore type: " + type);
+  }
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 67bd6191c3..8e27bbccef 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -499,6 +499,11 @@ public class CommonConstants {
         // possible.
         public static final String OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY 
=
             "optimizeMaxInitialResultHolderCapacity";
+
+        // Set to true if a cursor should be returned instead of the complete 
result set
+        public static final String GET_CURSOR = "getCursor";
+        // Number of rows that the cursor should contain
+        public static final String CURSOR_NUM_ROWS = "cursorNumRows";
       }
 
       public static class QueryOptionValue {
@@ -617,6 +622,8 @@ public class CommonConstants {
           CONFIG_PREFIX + ".stats.manager.threadpool.size";
       public static final int DEFAULT_STATS_MANAGER_THREADPOOL_SIZE = 2;
     }
+
+    public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY = 
"pinot.broker.storage.factory";
   }
 
   public static class Server {
@@ -1315,4 +1322,21 @@ public class CommonConstants {
     public static final byte[][] BYTES_ARRAY = new byte[0][];
     public static final Object MAP = Collections.emptyMap();
   }
+
+  public static class CursorConfigs {
+    public static final String PREFIX_OF_CONFIG_OF_CURSOR = 
"pinot.broker.cursor";
+    public static final String PREFIX_OF_CONFIG_OF_RESPONSE_STORE = 
"pinot.broker.cursor.response.store";
+    public static final String DEFAULT_RESPONSE_STORE_TYPE = "file";
+    public static final String RESPONSE_STORE_TYPE = "type";
+    public static final int DEFAULT_CURSOR_FETCH_ROWS = 10000;
+    public static final String CURSOR_FETCH_ROWS = PREFIX_OF_CONFIG_OF_CURSOR 
+ ".fetch.rows";
+    public static final String DEFAULT_RESULTS_EXPIRATION_INTERVAL = "1h"; // 
1 hour.
+    public static final String RESULTS_EXPIRATION_INTERVAL = 
PREFIX_OF_CONFIG_OF_RESPONSE_STORE + ".expiration";
+
+    public static final String RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD =
+        "controller.cluster.response.store.cleaner.frequencyPeriod";
+    public static final String DEFAULT_RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD 
= "1h";
+    public static final String RESPONSE_STORE_CLEANER_INITIAL_DELAY =
+        "controller.cluster.response.store.cleaner.initialDelay";
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to