This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 73843b57c7 Add support for Cursors through API Query Params (#14110)
73843b57c7 is described below
commit 73843b57c724c55246daa01309d5b4f5046c63f1
Author: Rajat Venkatesh <[email protected]>
AuthorDate: Tue Dec 24 10:45:33 2024 +0530
Add support for Cursors through API Query Params (#14110)
---
.../broker/api/resources/PinotClientRequest.java | 39 +-
.../api/resources/ResponseStoreResource.java | 202 ++++++++++
.../broker/broker/BrokerAdminApiApplication.java | 5 +-
.../broker/broker/helix/BaseBrokerStarter.java | 33 +-
.../pinot/broker/cursors/FsResponseStore.java | 248 ++++++++++++
.../pinot/broker/cursors/JsonResponseSerde.java | 37 ++
.../BrokerRequestHandlerDelegate.java | 35 +-
.../common/cursors/AbstractResponseStore.java | 243 ++++++++++++
.../apache/pinot/common/metrics/BrokerMeter.java | 22 +-
.../pinot/common/response/CursorResponse.java | 132 +++++++
.../response/broker/CursorResponseNative.java | 182 +++++++++
.../common/utils/config/QueryOptionsUtils.java | 9 +
.../pinot/controller/BaseControllerStarter.java | 14 +
.../apache/pinot/controller/ControllerConf.java | 1 +
.../controller/cursors/ResponseStoreCleaner.java | 222 +++++++++++
...ControllerPeriodicTaskStarterStatelessTest.java | 2 +-
.../java/org/apache/pinot/core/auth/Actions.java | 2 +
.../integration/tests/CursorFsIntegrationTest.java | 45 +++
.../integration/tests/CursorIntegrationTest.java | 425 +++++++++++++++++++++
.../tests/CursorWithAuthIntegrationTest.java | 207 ++++++++++
.../tests/cursors/MemoryResponseStore.java | 105 +++++
.../apache/pinot/spi/cursors/ResponseStore.java | 82 ++++
.../pinot/spi/cursors/ResponseStoreService.java | 77 ++++
.../apache/pinot/spi/utils/CommonConstants.java | 24 ++
24 files changed, 2378 insertions(+), 15 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index bc8c6a5f3c..44da5f962d 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -77,9 +77,11 @@ import
org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.RequestScope;
import org.apache.pinot.spi.trace.Tracing;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.PinotSqlType;
@@ -100,6 +102,9 @@ import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
public class PinotClientRequest {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotClientRequest.class);
+ @Inject
+ PinotConfiguration _brokerConf;
+
@Inject
SqlQueryExecutor _sqlQueryExecutor;
@@ -157,6 +162,10 @@ public class PinotClientRequest {
})
@ManualAuthorization
public void processSqlQueryPost(String query, @Suspended AsyncResponse
asyncResponse,
+ @ApiParam(value = "Return a cursor instead of complete result set")
@QueryParam("getCursor")
+ @DefaultValue("false") boolean getCursor,
+ @ApiParam(value = "Number of rows to fetch. Applicable only when
getCursor is true") @QueryParam("numRows")
+ @DefaultValue("0") int numRows,
@Context org.glassfish.grizzly.http.server.Request requestContext,
@Context HttpHeaders httpHeaders) {
try {
@@ -165,7 +174,8 @@ public class PinotClientRequest {
throw new IllegalStateException("Payload is missing the query string
field 'sql'");
}
BrokerResponse brokerResponse =
- executeSqlQuery((ObjectNode) requestJson,
makeHttpIdentity(requestContext), false, httpHeaders);
+ executeSqlQuery((ObjectNode) requestJson,
makeHttpIdentity(requestContext), false, httpHeaders, false,
+ getCursor, numRows);
asyncResponse.resume(getPinotQueryResponse(brokerResponse));
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
@@ -221,6 +231,10 @@ public class PinotClientRequest {
})
@ManualAuthorization
public void processSqlWithMultiStageQueryEnginePost(String query, @Suspended
AsyncResponse asyncResponse,
+ @ApiParam(value = "Return a cursor instead of complete result set")
@QueryParam("getCursor")
+ @DefaultValue("false") boolean getCursor,
+ @ApiParam(value = "Number of rows to fetch. Applicable only getCursor is
true") @QueryParam("numRows")
+ @DefaultValue("0") int numRows,
@Context org.glassfish.grizzly.http.server.Request requestContext,
@Context HttpHeaders httpHeaders) {
try {
@@ -229,7 +243,8 @@ public class PinotClientRequest {
throw new IllegalStateException("Payload is missing the query string
field 'sql'");
}
BrokerResponse brokerResponse =
- executeSqlQuery((ObjectNode) requestJson,
makeHttpIdentity(requestContext), false, httpHeaders, true);
+ executeSqlQuery((ObjectNode) requestJson,
makeHttpIdentity(requestContext), false, httpHeaders, true,
+ getCursor, numRows);
asyncResponse.resume(getPinotQueryResponse(brokerResponse));
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
@@ -427,6 +442,12 @@ public class PinotClientRequest {
private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson,
HttpRequesterIdentity httpRequesterIdentity,
boolean onlyDql, HttpHeaders httpHeaders, boolean forceUseMultiStage)
throws Exception {
+ return executeSqlQuery(sqlRequestJson, httpRequesterIdentity, onlyDql,
httpHeaders, forceUseMultiStage, false, 0);
+ }
+
+ private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson,
HttpRequesterIdentity httpRequesterIdentity,
+ boolean onlyDql, HttpHeaders httpHeaders, boolean forceUseMultiStage,
boolean getCursor, int numRows)
+ throws Exception {
long requestArrivalTimeMs = System.currentTimeMillis();
SqlNodeAndOptions sqlNodeAndOptions;
try {
@@ -437,6 +458,16 @@ public class PinotClientRequest {
if (forceUseMultiStage) {
sqlNodeAndOptions.setExtraOptions(ImmutableMap.of(Request.QueryOptionKey.USE_MULTISTAGE_ENGINE,
"true"));
}
+ if (getCursor) {
+ if (numRows == 0) {
+ numRows =
_brokerConf.getProperty(CommonConstants.CursorConfigs.CURSOR_FETCH_ROWS,
+ CommonConstants.CursorConfigs.DEFAULT_CURSOR_FETCH_ROWS);
+ }
+ sqlNodeAndOptions.setExtraOptions(
+ ImmutableMap.of(Request.QueryOptionKey.GET_CURSOR, "true",
Request.QueryOptionKey.CURSOR_NUM_ROWS,
+ Integer.toString(numRows)));
+ _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_QUERIES_GLOBAL,
1);
+ }
PinotSqlType sqlType = sqlNodeAndOptions.getSqlType();
if (onlyDql && sqlType != PinotSqlType.DQL) {
return new
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
@@ -475,7 +506,7 @@ public class PinotClientRequest {
return _requestHandler.handleTimeSeriesRequest(language, queryString,
requestContext);
}
- private static HttpRequesterIdentity
makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
+ public static HttpRequesterIdentity
makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
Multimap<String, String> headers = ArrayListMultimap.create();
context.getHeaderNames().forEach(key ->
context.getHeaders(key).forEach(value -> headers.put(key, value)));
@@ -497,7 +528,7 @@ public class PinotClientRequest {
* @throws Exception
*/
@VisibleForTesting
- static Response getPinotQueryResponse(BrokerResponse brokerResponse)
+ public static Response getPinotQueryResponse(BrokerResponse brokerResponse)
throws Exception {
int queryErrorCodeHeaderValue = -1; // default value of the header.
List<QueryProcessingException> exceptions = brokerResponse.getExceptions();
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/ResponseStoreResource.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/ResponseStoreResource.java
new file mode 100644
index 0000000000..afc8ceebf4
--- /dev/null
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/ResponseStoreResource.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Collection;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
+import javax.ws.rs.container.Suspended;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.broker.api.AccessControl;
+import org.apache.pinot.broker.broker.AccessControlFactory;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.core.auth.Actions;
+import org.apache.pinot.core.auth.Authorize;
+import org.apache.pinot.core.auth.ManualAuthorization;
+import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.spi.auth.TableAuthorizationResult;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.glassfish.grizzly.http.server.Request;
+import org.glassfish.jersey.server.ManagedAsync;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ * This resource provides APIs to read cursors as well as admin functions such as listing, reading and deleting
+ * response stores.
+ */
+@Api(tags = "ResponseStore", authorizations = {@Authorization(value =
SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+ HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY,
+ description = "The format of the key is ```\"Basic <token>\" or \"Bearer
<token>\"```")))
+@Path("/responseStore")
+public class ResponseStoreResource {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ResponseStoreResource.class);
+
+ // Broker configuration; read for the default number of rows to fetch when a request does not specify numRows.
+ @Inject
+ private PinotConfiguration _brokerConf;
+
+ // Metrics sink used to meter unexpected failures of the GET handlers.
+ @Inject
+ private BrokerMetrics _brokerMetrics;
+
+ // Store that holds the persisted query responses and result tables served by this resource.
+ @Inject
+ private AbstractResponseStore _responseStore;
+
+ // Factory for the AccessControl used to authorize a caller against the tables of a stored response.
+ @Inject
+ AccessControlFactory _accessControlFactory;
+
+  /**
+   * Lists the metadata of all responses known to the response store.
+   *
+   * @param headers HTTP headers of the request
+   * @return the stored responses as returned by the response store
+   * @throws WebApplicationException with status 500 if the response store fails
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/")
+  @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.GET_RESPONSE_STORE)
+  @ApiOperation(value = "Get metadata of all response stores.", notes = "Get metadata of all response stores")
+  public Collection<CursorResponse> getResults(@Context HttpHeaders headers) {
+    try {
+      return _responseStore.getAllStoredResponses();
+    } catch (Exception e) {
+      // Log and meter the failure like the other GET handlers of this resource, so that a broken store is
+      // visible on the broker and not only in the HTTP response.
+      LOGGER.error("Caught exception while processing GET request", e);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_GET_EXCEPTIONS, 1L);
+      throw new WebApplicationException(e,
+          Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build());
+    }
+  }
+
+  /**
+   * Returns the stored response of a query as read from the response store.
+   *
+   * @param requestId request id of the query
+   * @param requestContext HTTP request, used to build the caller identity for authorization
+   * @return the stored response for {@code requestId}
+   * @throws WebApplicationException 404 if no response is stored, 403 if the caller is not authorized, 500 on any
+   *     other failure
+   */
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("{requestId}")
+  @ApiOperation(value = "Response without ResultTable of a query")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Query response"),
+      @ApiResponse(code = 500, message = "Internal Server Error")
+  })
+  @ManualAuthorization
+  public BrokerResponse getSqlQueryMetadata(
+      @ApiParam(value = "Request ID of the query", required = true) @PathParam("requestId") String requestId,
+      @Context org.glassfish.grizzly.http.server.Request requestContext) {
+    try {
+      checkRequestExistsAndAuthorized(requestId, requestContext);
+      BrokerResponse storedResponse = _responseStore.readResponse(requestId);
+      return storedResponse;
+    } catch (WebApplicationException expectedFailure) {
+      // Not-found / forbidden responses are already well-formed; pass them through untouched.
+      throw expectedFailure;
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing GET request", e);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_GET_EXCEPTIONS, 1L);
+      Response errorResponse =
+          Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();
+      throw new WebApplicationException(e, errorResponse);
+    }
+  }
+
+  /**
+   * Asynchronously returns a slice of the stored result set of a query.
+   *
+   * <p>When {@code numRows} is not supplied, the broker config
+   * {@code CommonConstants.CursorConfigs.CURSOR_FETCH_ROWS} (or its default) is used.
+   *
+   * @param requestId request id of the query
+   * @param offset zero-based offset of the first row to return; must not be negative
+   * @param numRows number of rows to fetch, or {@code null} to use the configured default; must not be negative
+   * @param requestContext HTTP request, used to build the caller identity for authorization
+   * @param asyncResponse resumed with the cursor response, or with a {@link WebApplicationException}
+   *     (400 for invalid paging parameters, 403, 404, or 500 on unexpected failures)
+   */
+  @GET
+  @ManagedAsync
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("{requestId}/results")
+  @ApiOperation(value = "Get result set from the query's response store")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Query response"),
+      @ApiResponse(code = 500, message = "Internal Server Error")
+  })
+  @ManualAuthorization
+  public void getSqlQueryResult(
+      @ApiParam(value = "Request ID of the query", required = true) @PathParam("requestId") String requestId,
+      @ApiParam(value = "Offset in the result set", required = true) @QueryParam("offset") int offset,
+      @ApiParam(value = "Number of rows to fetch") @QueryParam("numRows") Integer numRows,
+      @Context org.glassfish.grizzly.http.server.Request requestContext,
+      @Suspended AsyncResponse asyncResponse) {
+    try {
+      checkRequestExistsAndAuthorized(requestId, requestContext);
+      if (numRows == null) {
+        numRows = _brokerConf.getProperty(CommonConstants.CursorConfigs.CURSOR_FETCH_ROWS,
+            CommonConstants.CursorConfigs.DEFAULT_CURSOR_FETCH_ROWS);
+      }
+      // Reject invalid paging parameters as a client error instead of letting them fail inside the response
+      // store, which would be reported as a 500 and counted as an uncaught exception.
+      if (offset < 0 || numRows < 0) {
+        throw new WebApplicationException(Response.status(Response.Status.BAD_REQUEST)
+            .entity(String.format("Invalid offset %d or numRows %d. Both must be non-negative.", offset, numRows))
+            .build());
+      }
+      asyncResponse.resume(
+          PinotClientRequest.getPinotQueryResponse(_responseStore.handleCursorRequest(requestId, offset, numRows)));
+    } catch (WebApplicationException wae) {
+      asyncResponse.resume(wae);
+    } catch (Exception e) {
+      LOGGER.error("Caught exception while processing GET request", e);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.UNCAUGHT_GET_EXCEPTIONS, 1L);
+      asyncResponse.resume(new WebApplicationException(e,
+          Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build()));
+    }
+  }
+
+  /**
+   * Deletes the stored response of a query.
+   *
+   * @param requestId request id of the query
+   * @param headers HTTP headers of the request
+   * @return a confirmation message when the response was deleted
+   * @throws WebApplicationException 404 if the store reports nothing was deleted, 500 if the store fails
+   */
+  @DELETE
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/{requestId}")
+  @Authorize(targetType = TargetType.CLUSTER, action = Actions.Cluster.DELETE_RESPONSE_STORE)
+  @ApiOperation(value = "Delete the response store of a query", notes = "Delete the response store of a query")
+  public String deleteResponse(
+      @ApiParam(value = "Request ID of the query", required = true) @PathParam("requestId") String requestId,
+      @Context HttpHeaders headers) {
+    boolean deleted;
+    try {
+      deleted = _responseStore.deleteResponse(requestId);
+    } catch (Exception e) {
+      throw new WebApplicationException(e,
+          Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()).build());
+    }
+
+    if (!deleted) {
+      // Query Result not found. Throw error.
+      throw new WebApplicationException(
+          Response.status(Response.Status.NOT_FOUND).entity(String.format("Query results for %s not found.", requestId))
+              .build());
+    }
+    return "Query Results for " + requestId + " deleted.";
+  }
+
+  /**
+   * Verifies that a response is stored for {@code requestId} and that the caller may access the tables it queried.
+   *
+   * @param requestId request id of the query
+   * @param requestContext HTTP request from which the caller identity is built
+   * @throws WebApplicationException 404 if no response is stored, 403 if access is denied
+   * @throws Exception if the response store fails
+   */
+  private void checkRequestExistsAndAuthorized(String requestId, Request requestContext)
+      throws Exception {
+    if (!_responseStore.exists(requestId)) {
+      throw new WebApplicationException(Response.status(Response.Status.NOT_FOUND)
+          .entity(String.format("Query results for %s not found.", requestId)).build());
+    }
+
+    CursorResponse storedResponse = _responseStore.readResponse(requestId);
+    AccessControl accessControl = _accessControlFactory.create();
+    TableAuthorizationResult authorization =
+        accessControl.authorize(PinotClientRequest.makeHttpIdentity(requestContext),
+            storedResponse.getTablesQueried());
+    if (authorization.hasAccess()) {
+      return;
+    }
+    throw new WebApplicationException(
+        Response.status(Response.Status.FORBIDDEN).entity(authorization.getFailureMessage()).build());
+  }
+}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
index fc443caab0..64e6cb837b 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
@@ -35,6 +35,7 @@ import org.apache.helix.HelixManager;
import org.apache.pinot.broker.queryquota.QueryQuotaManager;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
import org.apache.pinot.common.http.PoolingHttpClientConnectionManagerHelper;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.swagger.SwaggerApiListingResource;
@@ -75,7 +76,7 @@ public class BrokerAdminApiApplication extends ResourceConfig
{
public BrokerAdminApiApplication(BrokerRoutingManager routingManager,
BrokerRequestHandler brokerRequestHandler,
BrokerMetrics brokerMetrics, PinotConfiguration brokerConf,
SqlQueryExecutor sqlQueryExecutor,
ServerRoutingStatsManager serverRoutingStatsManager,
AccessControlFactory accessFactory,
- HelixManager helixManager, QueryQuotaManager queryQuotaManager) {
+ HelixManager helixManager, QueryQuotaManager queryQuotaManager,
AbstractResponseStore responseStore) {
_brokerResourcePackages =
brokerConf.getProperty(CommonConstants.Broker.BROKER_RESOURCE_PACKAGES,
CommonConstants.Broker.DEFAULT_BROKER_RESOURCE_PACKAGES);
String[] pkgs = _brokerResourcePackages.split(",");
@@ -116,6 +117,8 @@ public class BrokerAdminApiApplication extends
ResourceConfig {
bind(queryQuotaManager).to(QueryQuotaManager.class);
bind(accessFactory).to(AccessControlFactory.class);
bind(startTime).named(BrokerAdminApiApplication.START_TIME);
+ bind(responseStore).to(AbstractResponseStore.class);
+ bind(brokerConf).to(PinotConfiguration.class);
}
});
boolean enableBoundedJerseyThreadPoolExecutor =
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
index 368e409e23..2aab062730 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java
@@ -20,6 +20,7 @@ package org.apache.pinot.broker.broker.helix;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
@@ -56,6 +57,7 @@ import org.apache.pinot.common.Utils;
import org.apache.pinot.common.config.NettyConfig;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
import org.apache.pinot.common.function.FunctionRegistry;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metrics.BrokerGauge;
@@ -78,8 +80,10 @@ import org.apache.pinot.core.util.ListenerConfigUtil;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
+import org.apache.pinot.spi.cursors.ResponseStoreService;
import org.apache.pinot.spi.env.PinotConfiguration;
import
org.apache.pinot.spi.eventlistener.query.BrokerQueryEventListenerFactory;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.metrics.PinotMetricUtils;
import org.apache.pinot.spi.metrics.PinotMetricsRegistry;
import org.apache.pinot.spi.services.ServiceRole;
@@ -139,6 +143,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
protected ServerRoutingStatsManager _serverRoutingStatsManager;
protected HelixExternalViewBasedQueryQuotaManager _queryQuotaManager;
protected MultiStageQueryThrottler _multiStageQueryThrottler;
+ protected AbstractResponseStore _responseStore;
@Override
public void init(PinotConfiguration brokerConf)
@@ -353,9 +358,26 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
timeSeriesRequestHandler = new TimeSeriesRequestHandler(_brokerConf,
brokerId, _routingManager,
_accessControlFactory, _queryQuotaManager, tableCache,
queryDispatcher);
}
+
+ LOGGER.info("Initializing PinotFSFactory");
+
PinotFSFactory.init(_brokerConf.subset(CommonConstants.Broker.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY));
+
+ LOGGER.info("Initialize ResponseStore");
+ PinotConfiguration responseStoreConfiguration =
+
_brokerConf.subset(CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE);
+
+ String expirationTime =
_brokerConf.getProperty(CommonConstants.CursorConfigs.RESULTS_EXPIRATION_INTERVAL,
+ CommonConstants.CursorConfigs.DEFAULT_RESULTS_EXPIRATION_INTERVAL);
+
+ _responseStore = (AbstractResponseStore)
ResponseStoreService.getInstance().getResponseStore(
+
responseStoreConfiguration.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_TYPE,
+ CommonConstants.CursorConfigs.DEFAULT_RESPONSE_STORE_TYPE));
+
_responseStore.init(responseStoreConfiguration.subset(_responseStore.getType()),
_hostname, _port, brokerId,
+ _brokerMetrics, expirationTime);
+
_brokerRequestHandler =
new BrokerRequestHandlerDelegate(singleStageBrokerRequestHandler,
multiStageBrokerRequestHandler,
- timeSeriesRequestHandler);
+ timeSeriesRequestHandler, _responseStore);
_brokerRequestHandler.start();
// Enable/disable thread CPU time measurement through instance config.
@@ -604,6 +626,13 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
_brokerRequestHandler.shutDown();
_brokerAdminApplication.stop();
+ LOGGER.info("Close PinotFs");
+ try {
+ PinotFSFactory.shutdown();
+ } catch (IOException e) {
+ LOGGER.error("Caught exception when shutting down PinotFsFactory", e);
+ }
+
LOGGER.info("Disconnecting spectator Helix manager");
_spectatorHelixManager.disconnect();
@@ -650,7 +679,7 @@ public abstract class BaseBrokerStarter implements
ServiceStartable {
BrokerAdminApiApplication brokerAdminApiApplication =
new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler,
_brokerMetrics, _brokerConf,
_sqlQueryExecutor, _serverRoutingStatsManager,
_accessControlFactory, _spectatorHelixManager,
- _queryQuotaManager);
+ _queryQuotaManager, _responseStore);
registerExtraComponents(brokerAdminApiApplication);
return brokerAdminApiApplication;
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java
new file mode 100644
index 0000000000..8da7b0a33c
--- /dev/null
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/cursors/FsResponseStore.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import com.google.auto.service.AutoService;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Stores responses in a file system. All storage schemes supported by PinotFS
can be used.
+ * Responses are stored in "data.dir" directory with the following structure:
+ * - A directory is created for every request id.
+ * - Response metadata is stored with filename "response"
+ * - Results are stored with filename "resultTable"
+ * The extension of the file is determined by the config "extension"
+ *
+ */
+@AutoService(ResponseStore.class)
+public class FsResponseStore extends AbstractResponseStore {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FsResponseStore.class);
+ // Type identifier returned by getType().
+ private static final String TYPE = "file";
+ // File name patterns; "%s" is filled with the configured file extension.
+ private static final String RESULT_TABLE_FILE_NAME_FORMAT = "resultTable.%s";
+ private static final String RESPONSE_FILE_NAME_FORMAT = "response.%s";
+ private static final String URI_SEPARATOR = "/";
+
+ // Configuration keys read in init().
+ public static final String TEMP_DIR = "temp.dir";
+ public static final String DATA_DIR = "data.dir";
+ public static final String FILE_NAME_EXTENSION = "extension";
+ // Defaults used when the corresponding configuration key is absent.
+ public static final Path DEFAULT_ROOT_DIR =
Path.of(System.getProperty("java.io.tmpdir"), "broker", "responseStore");
+ public static final Path DEFAULT_TEMP_DIR = DEFAULT_ROOT_DIR.resolve("temp");
+ public static final URI DEFAULT_DATA_DIR =
DEFAULT_ROOT_DIR.resolve("data").toUri();
+ public static final String DEFAULT_FILE_NAME_EXTENSION = "json";
+
+ // Local directory where files are serialized before being copied into the data directory.
+ private Path _localTempDir;
+ // Root URI under which one directory per request id is created.
+ private URI _dataDir;
+ private JsonResponseSerde _responseSerde;
+ private String _fileExtension;
+
+  /**
+   * Appends {@code path} to the path of {@code baseUri}, inserting a separator when the base path does not already
+   * end with one.
+   *
+   * <p>The scheme and the complete authority (user-info, host and port) of {@code baseUri} are preserved. Using the
+   * host alone would drop the port, and would drop the authority entirely for URIs whose authority is not a
+   * server-style host. Query and fragment are not carried over.
+   *
+   * @param baseUri URI to extend
+   * @param path path segment(s) to append
+   * @return the combined URI
+   * @throws URISyntaxException if the combined components do not form a valid URI
+   */
+  private static URI combinePath(URI baseUri, String path)
+      throws URISyntaxException {
+    String basePath = baseUri.getPath();
+    String newPath = basePath.endsWith(URI_SEPARATOR) ? basePath + path : basePath + URI_SEPARATOR + path;
+    return new URI(baseUri.getScheme(), baseUri.getAuthority(), newPath, null, null);
+  }
+
+ /** Returns the type identifier of this response store, the constant {@code "file"}. */
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+  /**
+   * Initializes the store: runs the base initialization, reads the file extension, temp directory and data directory
+   * from {@code config} (falling back to the defaults), and creates both directories.
+   *
+   * @param config configuration holding the optional keys {@code extension}, {@code temp.dir} and {@code data.dir}
+   * @param brokerHost host of the broker
+   * @param brokerPort port of the broker
+   * @param brokerId id of the broker
+   * @param brokerMetrics broker metrics
+   * @param expirationTime expiration time passed to the base initialization
+   * @throws Exception if a directory cannot be created or the data directory is not a valid URI
+   */
+  @Override
+  public void init(PinotConfiguration config, String brokerHost, int brokerPort, String brokerId,
+      BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception {
+    init(brokerHost, brokerPort, brokerId, brokerMetrics, expirationTime);
+
+    _responseSerde = new JsonResponseSerde();
+    _fileExtension = config.getProperty(FILE_NAME_EXTENSION, DEFAULT_FILE_NAME_EXTENSION);
+
+    // Local scratch space for serialized files.
+    if (config.containsKey(TEMP_DIR)) {
+      _localTempDir = Path.of(config.getProperty(TEMP_DIR));
+    } else {
+      _localTempDir = DEFAULT_TEMP_DIR;
+    }
+    Files.createDirectories(_localTempDir);
+
+    // Final location of the responses, on whichever file system the URI scheme selects.
+    if (config.containsKey(DATA_DIR)) {
+      _dataDir = new URI(config.getProperty(DATA_DIR));
+    } else {
+      _dataDir = DEFAULT_DATA_DIR;
+    }
+    PinotFSFactory.create(_dataDir.getScheme()).mkdir(_dataDir);
+  }
+
+  /**
+   * Builds a path in the local temp directory whose file name is the given parts, each followed by {@code "_"},
+   * with the id of the current thread appended.
+   *
+   * @param nameParts leading parts of the file name
+   * @return the temp file path
+   */
+  private Path getTempPath(String... nameParts) {
+    String prefix = nameParts.length == 0 ? "" : String.join("_", nameParts) + "_";
+    long threadId = Thread.currentThread().getId();
+    return _localTempDir.resolve(prefix + threadId);
+  }
+
+  /**
+   * Checks whether a directory for {@code requestId} exists under the data directory.
+   *
+   * @param requestId request id of the query
+   * @return {@code true} if the request directory exists
+   * @throws Exception if the file system lookup fails
+   */
+  @Override
+  public boolean exists(String requestId)
+      throws Exception {
+    PinotFS fileSystem = PinotFSFactory.create(_dataDir.getScheme());
+    return fileSystem.exists(combinePath(_dataDir, requestId));
+  }
+
+  /**
+   * Lists the request ids of all responses under the data directory that were written by this broker.
+   *
+   * <p>Every directory that contains a response metadata file is inspected; the request id is reported when the
+   * broker id recorded in the metadata equals the id of this broker. Directories that cannot be processed are
+   * logged and skipped.
+   *
+   * @return request ids of the responses owned by this broker
+   * @throws Exception if the data directory cannot be listed
+   */
+  @Override
+  public Collection<String> getAllStoredRequestIds()
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    List<FileMetadata> queryPaths = pinotFS.listFilesWithMetadata(_dataDir, true);
+    List<String> requestIdList = new ArrayList<>(queryPaths.size());
+
+    LOGGER.debug("Found {} paths.", queryPaths.size());
+
+    for (FileMetadata metadata : queryPaths) {
+      LOGGER.debug("Processing query path: {}", metadata.toString());
+      if (!metadata.isDirectory()) {
+        continue;
+      }
+      try {
+        URI queryDir = new URI(metadata.getFilePath());
+        URI metadataFile = combinePath(queryDir, String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+        boolean metadataFileExists = pinotFS.exists(metadataFile);
+        LOGGER.debug("Checking for query dir {} & metadata file: {}. Metadata file exists: {}", queryDir,
+            metadataFile, metadataFileExists);
+        if (metadataFileExists) {
+          BrokerResponse response;
+          // Close the stream once the metadata is parsed; otherwise one stream leaks per stored response.
+          try (InputStream metadataIS = pinotFS.open(metadataFile)) {
+            response = _responseSerde.deserialize(metadataIS, CursorResponseNative.class);
+          }
+          if (response.getBrokerId().equals(_brokerId)) {
+            requestIdList.add(response.getRequestId());
+            LOGGER.debug("Added response store {}", queryDir);
+          }
+        }
+      } catch (Exception e) {
+        // Best effort: a broken entry must not prevent listing the remaining ones.
+        LOGGER.error("Error when processing {}", metadata, e);
+      }
+    }
+
+    return requestIdList;
+  }
+
+  /**
+   * Deletes the directory of {@code requestId}, including its contents.
+   *
+   * @param requestId request id of the query
+   * @return {@code true} if the directory existed and a delete was issued, {@code false} if it did not exist
+   * @throws Exception if the file system operation fails
+   */
+  @Override
+  protected boolean deleteResponseImpl(String requestId)
+      throws Exception {
+    PinotFS fileSystem = PinotFSFactory.create(_dataDir.getScheme());
+    URI responseDir = combinePath(_dataDir, requestId);
+    if (!fileSystem.exists(responseDir)) {
+      return false;
+    }
+    fileSystem.delete(responseDir, true);
+    return true;
+  }
+
+  /**
+   * Writes the response metadata of a query into the request directory.
+   *
+   * <p>The response is first serialized into a local temp file and then copied to its final location. The temp file
+   * is removed in all cases, including when serialization or the copy fails.
+   *
+   * @param requestId request id of the query
+   * @param response response metadata to store
+   * @throws Exception if serialization or any file system operation fails
+   */
+  @Override
+  protected void writeResponse(String requestId, CursorResponse response)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResponseFile = getTempPath("response", requestId);
+    URI metadataFile = combinePath(queryDir, String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension));
+
+    try {
+      try (OutputStream tempResponseFileOS = Files.newOutputStream(tempResponseFile)) {
+        _responseSerde.serialize(response, tempResponseFileOS);
+      }
+      pinotFS.copyFromLocalFile(tempResponseFile.toFile(), metadataFile);
+    } finally {
+      // deleteIfExists: if the temp file was never created, cleanup must not mask the original failure.
+      Files.deleteIfExists(tempResponseFile);
+    }
+  }
+
+  /**
+   * Writes the result table of a query into the request directory.
+   *
+   * <p>The table is first serialized into a local temp file and then copied to its final location. The temp file is
+   * removed in all cases, including when serialization or the copy fails.
+   *
+   * @param requestId request id of the query
+   * @param resultTable result table to store
+   * @return size in bytes of the serialized result table
+   * @throws Exception if serialization or any file system operation fails
+   */
+  @Override
+  protected long writeResultTable(String requestId, ResultTable resultTable)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+
+    // Create a directory for this query.
+    pinotFS.mkdir(queryDir);
+
+    Path tempResultTableFile = getTempPath("resultTable", requestId);
+    URI dataFile = combinePath(queryDir, String.format(RESULT_TABLE_FILE_NAME_FORMAT, _fileExtension));
+
+    try {
+      try (OutputStream tempResultTableFileOS = Files.newOutputStream(tempResultTableFile)) {
+        _responseSerde.serialize(resultTable, tempResultTableFileOS);
+      }
+      File tempFile = tempResultTableFile.toFile();
+      pinotFS.copyFromLocalFile(tempFile, dataFile);
+      // The length is read before the finally block removes the temp file.
+      return tempFile.length();
+    } finally {
+      // deleteIfExists: if the temp file was never created, cleanup must not mask the original failure.
+      Files.deleteIfExists(tempResultTableFile);
+    }
+  }
+
+  /**
+   * Reads the response metadata file of {@code requestId} and deserializes it.
+   *
+   * @param requestId request id of the query
+   * @return the stored response metadata
+   * @throws Exception if the file cannot be opened or deserialized
+   */
+  @Override
+  public CursorResponse readResponse(String requestId)
+      throws Exception {
+    PinotFS fileSystem = PinotFSFactory.create(_dataDir.getScheme());
+    URI responseDir = combinePath(_dataDir, requestId);
+    String responseFileName = String.format(RESPONSE_FILE_NAME_FORMAT, _fileExtension);
+    URI responseFile = combinePath(responseDir, responseFileName);
+    try (InputStream responseStream = fileSystem.open(responseFile)) {
+      CursorResponse storedResponse = _responseSerde.deserialize(responseStream, CursorResponseNative.class);
+      return storedResponse;
+    }
+  }
+
+  /**
+   * Reads a slice of the stored result table of a query.
+   *
+   * <p>The slice covers rows {@code [offset, offset + numRows)}, with the end clamped to the number of rows recorded
+   * in the response metadata. The end is computed in {@code long} arithmetic so that a large {@code numRows} cannot
+   * overflow into a negative index. An {@code offset} outside the stored rows, or a negative {@code numRows}, is
+   * still rejected by {@link List#subList(int, int)}.
+   *
+   * @param requestId request id of the query
+   * @param offset zero-based index of the first row to return
+   * @param numRows maximum number of rows to return
+   * @return a result table with the stored schema and the requested rows
+   * @throws Exception if the stored files cannot be read or the requested range is invalid
+   */
+  @Override
+  protected ResultTable readResultTable(String requestId, int offset, int numRows)
+      throws Exception {
+    PinotFS pinotFS = PinotFSFactory.create(_dataDir.getScheme());
+    URI queryDir = combinePath(_dataDir, requestId);
+    URI dataFile = combinePath(queryDir, String.format(RESULT_TABLE_FILE_NAME_FORMAT, _fileExtension));
+    CursorResponse response = readResponse(requestId);
+    int totalTableRows = response.getNumRowsResultSet();
+
+    try (InputStream dataIS = pinotFS.open(dataFile)) {
+      ResultTable resultTable = _responseSerde.deserialize(dataIS, ResultTable.class);
+
+      // Widen before adding: offset + numRows may exceed Integer.MAX_VALUE.
+      int sliceEnd = (int) Math.min((long) offset + numRows, totalTableRows);
+
+      return new ResultTable(resultTable.getDataSchema(), resultTable.getRows().subList(offset, sliceEnd));
+    }
+  }
+}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/cursors/JsonResponseSerde.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/cursors/JsonResponseSerde.java
new file mode 100644
index 0000000000..eb8083cbc5
--- /dev/null
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/cursors/JsonResponseSerde.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.cursors;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * JSON serializer/deserializer for cursor responses. Both operations delegate to {@link JsonUtils}.
+ */
+public class JsonResponseSerde {
+ /**
+ * Writes the given object to the stream using {@link JsonUtils#objectToOutputStream}.
+ * @param object Object to serialize
+ * @param stream Stream that receives the serialized form
+ * @throws IOException Propagated from {@link JsonUtils}
+ */
+ public void serialize(Object object, OutputStream stream)
+ throws IOException {
+ JsonUtils.objectToOutputStream(object, stream);
+ }
+
+ /**
+ * Reads an object of the requested type from the stream using {@link JsonUtils#inputStreamToObject}.
+ * @param stream Stream holding the serialized form
+ * @param valueType Class of the object to produce
+ * @return The deserialized object
+ * @throws IOException Propagated from {@link JsonUtils}
+ */
+ public <T> T deserialize(InputStream stream, Class<T> valueType)
+ throws IOException {
+ return JsonUtils.inputStreamToObject(stream, valueType);
+ }
+}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index e3a814365a..561e79abb4 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -25,8 +25,10 @@ import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.pinot.broker.api.RequesterIdentity;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
@@ -46,13 +48,15 @@ public class BrokerRequestHandlerDelegate implements
BrokerRequestHandler {
private final BaseSingleStageBrokerRequestHandler
_singleStageBrokerRequestHandler;
private final MultiStageBrokerRequestHandler _multiStageBrokerRequestHandler;
private final TimeSeriesRequestHandler _timeSeriesRequestHandler;
+ private final AbstractResponseStore _responseStore;
public BrokerRequestHandlerDelegate(BaseSingleStageBrokerRequestHandler
singleStageBrokerRequestHandler,
@Nullable MultiStageBrokerRequestHandler multiStageBrokerRequestHandler,
- @Nullable TimeSeriesRequestHandler timeSeriesRequestHandler) {
+ @Nullable TimeSeriesRequestHandler timeSeriesRequestHandler,
AbstractResponseStore responseStore) {
_singleStageBrokerRequestHandler = singleStageBrokerRequestHandler;
_multiStageBrokerRequestHandler = multiStageBrokerRequestHandler;
_timeSeriesRequestHandler = timeSeriesRequestHandler;
+ _responseStore = responseStore;
}
@Override
@@ -99,18 +103,23 @@ public class BrokerRequestHandlerDelegate implements
BrokerRequestHandler {
}
}
+ BaseBrokerRequestHandler requestHandler = _singleStageBrokerRequestHandler;
if
(QueryOptionsUtils.isUseMultistageEngine(sqlNodeAndOptions.getOptions())) {
if (_multiStageBrokerRequestHandler != null) {
- return _multiStageBrokerRequestHandler.handleRequest(request,
sqlNodeAndOptions, requesterIdentity,
- requestContext, httpHeaders);
+ requestHandler = _multiStageBrokerRequestHandler;
} else {
return new
BrokerResponseNative(QueryException.getException(QueryException.INTERNAL_ERROR,
"V2 Multi-Stage query engine not enabled."));
}
- } else {
- return _singleStageBrokerRequestHandler.handleRequest(request,
sqlNodeAndOptions, requesterIdentity,
- requestContext, httpHeaders);
}
+
+ BrokerResponse response = requestHandler.handleRequest(request,
sqlNodeAndOptions, requesterIdentity,
+ requestContext, httpHeaders);
+
+ if (response.getExceptionsSize() == 0 &&
QueryOptionsUtils.isGetCursor(sqlNodeAndOptions.getOptions())) {
+ response =
getCursorResponse(QueryOptionsUtils.getCursorNumRows(sqlNodeAndOptions.getOptions()),
response);
+ }
+ return response;
}
@Override
@@ -138,4 +147,18 @@ public class BrokerRequestHandlerDelegate implements
BrokerRequestHandler {
// not found, try on the singleStaged engine.
return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs,
executor, connMgr, serverResponses);
}
+
+  /**
+   * Stores the given response in the response store and returns the first page of its cursor.
+   *
+   * @param numRows Number of rows requested for the first page; must not be null
+   * @param response Response to store
+   * @return Cursor response starting at offset 0, carrying the time spent storing the response
+   * @throws Exception If storing the response or reading the first page fails
+   */
+  private CursorResponse getCursorResponse(Integer numRows, BrokerResponse response)
+      throws Exception {
+    String requestId = response.getRequestId();
+    if (numRows == null) {
+      throw new RuntimeException("numRows not specified when requesting a cursor for request id: " + requestId);
+    }
+
+    // Only the store call is timed; fetching the first page is accounted for separately by the store.
+    long storeStartMs = System.currentTimeMillis();
+    _responseStore.storeResponse(response);
+    long storeElapsedMs = System.currentTimeMillis() - storeStartMs;
+
+    CursorResponse firstPage = _responseStore.handleCursorRequest(requestId, 0, numRows);
+    firstPage.setCursorResultWriteTimeMs(storeElapsedMs);
+    return firstPage;
+  }
}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java
b/pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java
new file mode 100644
index 0000000000..186a668d65
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/cursors/AbstractResponseStore.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.cursors;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.TimeUtils;
+
+
+/**
+ * Base class for response stores used by cursors. It splits a {@link BrokerResponse} into metadata (a
+ * {@link CursorResponse} without result table) and the {@link ResultTable}, delegates the actual persistence to
+ * subclasses, and keeps the cursor related broker metrics up to date.
+ */
+public abstract class AbstractResponseStore implements ResponseStore {
+
+  protected String _brokerHost;
+  protected int _brokerPort;
+  protected String _brokerId;
+  protected BrokerMetrics _brokerMetrics;
+  protected long _expirationIntervalInMs;
+
+  /**
+   * Records the broker identity, the metrics handle and the expiration interval shared by all implementations.
+   * @param brokerHost Hostname of the broker where ResponseStore is created
+   * @param brokerPort Port of the broker where the ResponseStore is created
+   * @param brokerId ID of the broker where the ResponseStore is created.
+   * @param brokerMetrics Metrics utility to track cursor metrics.
+   * @param expirationTime Period string that is converted to milliseconds with {@link TimeUtils}
+   */
+  protected void init(String brokerHost, int brokerPort, String brokerId, BrokerMetrics brokerMetrics,
+      String expirationTime) {
+    _brokerMetrics = brokerMetrics;
+    _brokerHost = brokerHost;
+    _brokerPort = brokerPort;
+    _brokerId = brokerId;
+    _expirationIntervalInMs = TimeUtils.convertPeriodToMillis(expirationTime);
+  }
+
+  /**
+   * Initialize the store.
+   * @param config Subset configuration of {@code pinot.broker.cursor.response.store.<type>}
+   * @param brokerHost Hostname of the broker where ResponseStore is created
+   * @param brokerPort Port of the broker where the ResponseStore is created
+   * @param brokerId ID of the broker where the ResponseStore is created.
+   * @param brokerMetrics Metrics utility to track cursor metrics.
+   * @param expirationTime Period string describing how long a response is kept before it expires.
+   * @throws Exception Thrown if the store cannot be initialized
+   */
+  public abstract void init(PinotConfiguration config, String brokerHost, int brokerPort, String brokerId,
+      BrokerMetrics brokerMetrics, String expirationTime)
+      throws Exception;
+
+  /**
+   * Get the hostname of the broker where the query is executed
+   * @return String containing the hostname
+   */
+  protected String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  /**
+   * Get the port of the broker where the query is executed
+   * @return int containing the port
+   */
+  protected int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  /**
+   * Get the expiration interval of a query response.
+   * @return long containing the expiration interval in milliseconds.
+   */
+  protected long getExpirationIntervalInMs() {
+    return _expirationIntervalInMs;
+  }
+
+  /**
+   * Write a CursorResponse
+   * @param requestId Request ID of the response
+   * @param response The response to write
+   * @throws Exception Thrown if there is any error while writing the response
+   */
+  protected abstract void writeResponse(String requestId, CursorResponse response)
+      throws Exception;
+
+  /**
+   * Write a {@link ResultTable} to the store
+   * @param requestId Request ID of the response
+   * @param resultTable The {@link ResultTable} of the query
+   * @throws Exception Thrown if there is any error while writing the result table.
+   * @return Returns the number of bytes written
+   */
+  protected abstract long writeResultTable(String requestId, ResultTable resultTable)
+      throws Exception;
+
+  /**
+   * Read the response (excluding the {@link ResultTable}) from the store
+   * @param requestId Request ID of the response
+   * @return CursorResponse (without the {@link ResultTable})
+   * @throws Exception Thrown if there is any error while reading the response
+   */
+  public abstract CursorResponse readResponse(String requestId)
+      throws Exception;
+
+  /**
+   * Read the {@link ResultTable} of a query response
+   * @param requestId Request ID of the query
+   * @param offset Offset of the result slice
+   * @param numRows Number of rows required in the slice
+   * @return {@link ResultTable} of the query
+   * @throws Exception Thrown if there is any error while reading the result table
+   */
+  protected abstract ResultTable readResultTable(String requestId, int offset, int numRows)
+      throws Exception;
+
+  /**
+   * Delete everything stored for a request. Called by {@link #deleteResponse(String)} once the request is known to
+   * exist.
+   * @param requestId Request ID of the response to delete
+   * @return true if the response was deleted
+   * @throws Exception Thrown if there is any error while deleting the response
+   */
+  protected abstract boolean deleteResponseImpl(String requestId)
+      throws Exception;
+
+  /**
+   * Stores the response in the store. {@link CursorResponse} and {@link ResultTable} are stored separately.
+   * If any write fails, whatever was already written for the request is removed on a best-effort basis and the
+   * original failure is rethrown.
+   * @param response Response to be stored
+   * @throws Exception Thrown if there is any error while storing the response.
+   */
+  public void storeResponse(BrokerResponse response)
+      throws Exception {
+    String requestId = response.getRequestId();
+
+    CursorResponse cursorResponse = new CursorResponseNative(response);
+
+    long submissionTimeMs = System.currentTimeMillis();
+    // Initialize all CursorResponse specific metadata
+    cursorResponse.setBrokerHost(getBrokerHost());
+    cursorResponse.setBrokerPort(getBrokerPort());
+    cursorResponse.setSubmissionTimeMs(submissionTimeMs);
+    cursorResponse.setExpirationTimeMs(submissionTimeMs + getExpirationIntervalInMs());
+    cursorResponse.setOffset(0);
+    cursorResponse.setNumRows(response.getNumRowsResultSet());
+
+    try {
+      long bytesWritten = writeResultTable(requestId, response.getResultTable());
+
+      // Remove the resultTable from the response as it is serialized in a data file.
+      cursorResponse.setResultTable(null);
+      cursorResponse.setBytesWritten(bytesWritten);
+      writeResponse(requestId, cursorResponse);
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_RESPONSE_STORE_SIZE, bytesWritten);
+    } catch (Exception e) {
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_WRITE_EXCEPTION, 1);
+      // Clean up with deleteResponseImpl rather than deleteResponse: the metadata may be missing or incomplete, so
+      // reading it back could fail and hide the write failure, and the store size meter was not incremented for
+      // this request, so it must not be decremented either.
+      try {
+        if (exists(requestId)) {
+          deleteResponseImpl(requestId);
+        }
+      } catch (Exception cleanupException) {
+        e.addSuppressed(cleanupException);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Reads the response from the store and populates it with a slice of the {@link ResultTable}
+   * @param requestId Request ID of the query
+   * @param offset Offset of the result slice. Must not be negative.
+   * @param numRows Number of rows required in the slice. Must not be negative.
+   * @return A CursorResponse with a slice of the {@link ResultTable}
+   * @throws IllegalArgumentException Thrown if offset or numRows is negative
+   * @throws Exception Thrown if there is any error during the operation.
+   */
+  public CursorResponse handleCursorRequest(String requestId, int offset, int numRows)
+      throws Exception {
+    // Reject invalid paging arguments up front so they are reported as caller errors instead of surfacing as
+    // sub-list failures that are counted as store read exceptions.
+    if (offset < 0) {
+      throw new IllegalArgumentException("Offset " + offset + " should not be negative");
+    }
+    if (numRows < 0) {
+      throw new IllegalArgumentException("numRows " + numRows + " should not be negative");
+    }
+
+    CursorResponse response;
+    ResultTable resultTable;
+
+    try {
+      response = readResponse(requestId);
+    } catch (Exception e) {
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+      throw e;
+    }
+
+    int totalTableRows = response.getNumRowsResultSet();
+
+    if (totalTableRows == 0 && offset == 0) {
+      // The stored result set has no rows, so there is no result table to read.
+      response.setResultTable(null);
+      response.setOffset(0);
+      response.setNumRows(0);
+      return response;
+    } else if (offset >= totalTableRows) {
+      throw new RuntimeException("Offset " + offset + " should be lesser than totalRecords " + totalTableRows);
+    }
+
+    long fetchStartTime = System.currentTimeMillis();
+    try {
+      resultTable = readResultTable(requestId, offset, numRows);
+    } catch (Exception e) {
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_READ_EXCEPTION, 1);
+      throw e;
+    }
+
+    response.setResultTable(resultTable);
+    response.setCursorFetchTimeMs(System.currentTimeMillis() - fetchStartTime);
+    response.setOffset(offset);
+    response.setNumRows(resultTable.getRows().size());
+    response.setNumRowsResultSet(totalTableRows);
+    return response;
+  }
+
+  /**
+   * Returns the list of responses created by the broker.
+   * Note that the ResponseStore object in a broker should only return responses created by it.
+   * @return A list of CursorResponse objects created by the specific broker
+   * @throws Exception Thrown if there is an error during an operation.
+   */
+  public List<CursorResponse> getAllStoredResponses()
+      throws Exception {
+    List<CursorResponse> responses = new ArrayList<>();
+
+    for (String requestId : getAllStoredRequestIds()) {
+      responses.add(readResponse(requestId));
+    }
+
+    return responses;
+  }
+
+  /**
+   * Deletes a stored response and, on success, subtracts its recorded size from the store size meter.
+   * @param requestId Request ID of the response to delete
+   * @return false if no response exists for the request ID or the deletion did not succeed, true otherwise
+   * @throws Exception Thrown if the stored metadata cannot be read or the deletion fails
+   */
+  @Override
+  public boolean deleteResponse(String requestId)
+      throws Exception {
+    if (!exists(requestId)) {
+      return false;
+    }
+
+    // Read the size before deleting; afterwards the metadata is gone.
+    long bytesWritten = readResponse(requestId).getBytesWritten();
+    boolean isSucceeded = deleteResponseImpl(requestId);
+    if (isSucceeded) {
+      _brokerMetrics.addMeteredGlobalValue(BrokerMeter.CURSOR_RESPONSE_STORE_SIZE, bytesWritten * -1);
+    }
+    return isSucceeded;
+  }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
index ea6a66251c..22be35405f 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerMeter.java
@@ -169,7 +169,27 @@ public enum BrokerMeter implements AbstractMetrics.Meter {
* For each query with at least one window function, this meter is increased
as many times as window functions in the
* query.
*/
- WINDOW_COUNT("queries", true),;
+ WINDOW_COUNT("queries", true),
+
+ /**
+ * Number of queries executed with cursors. This count includes queries that
use SSE and MSE
+ */
+ CURSOR_QUERIES_GLOBAL("queries", true),
+
+ /**
+ * Number of exceptions when writing a response to the response store
+ */
+ CURSOR_WRITE_EXCEPTION("exceptions", true),
+
+ /**
+ * Number of exceptions when reading a response and result table from the
response store
+ */
+ CURSOR_READ_EXCEPTION("exceptions", true),
+
+ /**
+ * The number of bytes stored in the response store. Only the size of the
result table is tracked.
+ */
+ CURSOR_RESPONSE_STORE_SIZE("bytes", true);
private final String _brokerMeterName;
private final String _unit;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/CursorResponse.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/CursorResponse.java
new file mode 100644
index 0000000000..14e65f6fbb
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/CursorResponse.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response;
+
+/**
+ * A {@link BrokerResponse} extended with the metadata needed to page through a stored query result: the broker
+ * that holds the result, the current slice (offset and number of rows), timing of store operations, and the
+ * submission/expiration timestamps of the stored response.
+ */
+public interface CursorResponse extends BrokerResponse {
+
+ /**
+ * Set the hostname of the processing broker
+ * @param brokerHost String containing the hostname
+ */
+ void setBrokerHost(String brokerHost);
+
+ /**
+ * get hostname of the processing broker
+ * @return String containing the hostname
+ */
+ String getBrokerHost();
+
+ /**
+ * Set the port of the processing broker
+ * @param brokerPort int containing the port
+ */
+ void setBrokerPort(int brokerPort);
+
+ /**
+ * get port of the processing broker
+ * @return int containing the port.
+ */
+ int getBrokerPort();
+
+ /**
+ * Set the starting offset of result table slice
+ * @param offset Offset of the result table slice
+ */
+ void setOffset(int offset);
+
+ /**
+ * Current offset in the query result.
+ * Starts from 0.
+ * @return current offset.
+ */
+ int getOffset();
+
+ /**
+ * Set the number of rows in the result table slice.
+ * @param numRows Number of rows in the result table slice
+ */
+ void setNumRows(int numRows);
+
+ /**
+ * Number of rows in the current response.
+ * @return Number of rows in the current response.
+ */
+ int getNumRows();
+
+ /**
+ * Return the time taken to write the results to the response store.
+ * @return time in milliseconds
+ */
+ long getCursorResultWriteTimeMs();
+
+ /**
+ * Set the time taken to write the results to the response store.
+ * @param cursorResultWriteMs Time in milliseconds.
+ */
+ void setCursorResultWriteTimeMs(long cursorResultWriteMs);
+
+ /**
+ * Return the time taken to fetch results from the response store.
+ * @return time in milliseconds.
+ */
+ long getCursorFetchTimeMs();
+
+ /**
+ * Set the time taken to fetch a cursor. The time is specific to the current call.
+ * @param cursorFetchTimeMs time in milliseconds
+ */
+ void setCursorFetchTimeMs(long cursorFetchTimeMs);
+
+ /**
+ * Unix timestamp when the query was submitted. The timestamp is used to calculate the expiration time when the
+ * response will be deleted from the response store.
+ * @param submissionTimeMs Unix timestamp when the query was submitted.
+ */
+ void setSubmissionTimeMs(long submissionTimeMs);
+
+ /**
+ * Get the unix timestamp when the query was submitted
+ * @return Submission unix timestamp when the query was submitted
+ */
+ long getSubmissionTimeMs();
+
+ /**
+ * Set the expiration time (unix timestamp) when the response will be deleted from the response store.
+ * @param expirationTimeMs unix timestamp when the response expires in the response store
+ */
+ void setExpirationTimeMs(long expirationTimeMs);
+
+ /**
+ * Get the expiration time (unix timestamp) when the response will be deleted from the response store.
+ * @return expirationTimeMs unix timestamp when the response expires in the response store
+ */
+ long getExpirationTimeMs();
+
+ /**
+ * Set the number of rows in the result set. This is required because BrokerResponse checks the ResultTable
+ * to get the number of rows. However the ResultTable is set to null in CursorResponse. So the numRowsResultSet
+ * has to be remembered.
+ * @param numRowsResultSet Number of rows in the result set.
+ */
+ void setNumRowsResultSet(int numRowsResultSet);
+
+ /**
+ * Set the number of bytes written to the response store when storing the result table.
+ * @param bytesWritten Number of bytes written
+ */
+ void setBytesWritten(long bytesWritten);
+
+ /**
+ * Get the number of bytes written when storing the result table
+ * @return number of bytes written
+ */
+ long getBytesWritten();
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
new file mode 100644
index 0000000000..d4c2203749
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/CursorResponseNative.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.broker;
+
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import org.apache.pinot.common.response.BrokerResponse;
+import org.apache.pinot.common.response.CursorResponse;
+
+
+/**
+ * {@link CursorResponse} implementation built on {@link BrokerResponseNative}. It adds the cursor specific fields
+ * (slice position, store timings, submission/expiration timestamps, owning broker and stored size) to the regular
+ * broker response.
+ */
+@JsonPropertyOrder({
+    "resultTable", "numRowsResultSet", "partialResult", "exceptions", "numGroupsLimitReached", "timeUsedMs",
+    "requestId", "brokerId", "numDocsScanned", "totalDocs", "numEntriesScannedInFilter", "numEntriesScannedPostFilter",
+    "numServersQueried", "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched",
+    "numConsumingSegmentsQueried", "numConsumingSegmentsProcessed", "numConsumingSegmentsMatched",
+    "minConsumingFreshnessTimeMs", "numSegmentsPrunedByBroker", "numSegmentsPrunedByServer",
+    "numSegmentsPrunedInvalid", "numSegmentsPrunedByLimit", "numSegmentsPrunedByValue", "brokerReduceTimeMs",
+    "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs",
+    "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs",
+    "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs",
+    "explainPlanNumEmptyFilterSegments", "explainPlanNumMatchAllFilterSegments", "traceInfo", "tablesQueried",
+    // Fields specific to CursorResponse
+    "offset", "numRows", "cursorResultWriteTimeMs", "cursorFetchTimeMs", "submissionTimeMs", "expirationTimeMs",
+    "brokerHost", "brokerPort", "bytesWritten"
+})
+public class CursorResponseNative extends BrokerResponseNative implements CursorResponse {
+  private int _offset;
+  private int _numRows;
+  private long _cursorResultWriteTimeMs;
+  private long _cursorFetchTimeMs;
+  private long _submissionTimeMs;
+  private long _expirationTimeMs;
+  private String _brokerHost;
+  private int _brokerPort;
+  private long _bytesWritten;
+
+  /**
+   * Creates an empty response with every cursor specific field at its Java default value.
+   */
+  public CursorResponseNative() {
+  }
+
+  /**
+   * Creates a cursor response from a regular broker response. The cursor specific fields are left at their default
+   * values and have to be set by the caller.
+   * @param response Broker response whose values are copied
+   */
+  public CursorResponseNative(BrokerResponse response) {
+    // Copy all the member variables of BrokerResponse to CursorResponse.
+    setResultTable(response.getResultTable());
+    setNumRowsResultSet(response.getNumRowsResultSet());
+    setExceptions(response.getExceptions());
+    setNumGroupsLimitReached(response.isNumGroupsLimitReached());
+    setTimeUsedMs(response.getTimeUsedMs());
+    setRequestId(response.getRequestId());
+    setBrokerId(response.getBrokerId());
+    setNumDocsScanned(response.getNumDocsScanned());
+    setTotalDocs(response.getTotalDocs());
+    setNumEntriesScannedInFilter(response.getNumEntriesScannedInFilter());
+    setNumEntriesScannedPostFilter(response.getNumEntriesScannedPostFilter());
+    setNumServersQueried(response.getNumServersQueried());
+    setNumServersResponded(response.getNumServersResponded());
+    setNumSegmentsQueried(response.getNumSegmentsQueried());
+    setNumSegmentsProcessed(response.getNumSegmentsProcessed());
+    setNumSegmentsMatched(response.getNumSegmentsMatched());
+    setNumConsumingSegmentsQueried(response.getNumConsumingSegmentsQueried());
+    setNumConsumingSegmentsProcessed(response.getNumConsumingSegmentsProcessed());
+    setNumConsumingSegmentsMatched(response.getNumConsumingSegmentsMatched());
+    setMinConsumingFreshnessTimeMs(response.getMinConsumingFreshnessTimeMs());
+    setNumSegmentsPrunedByBroker(response.getNumSegmentsPrunedByBroker());
+    setNumSegmentsPrunedByServer(response.getNumSegmentsPrunedByServer());
+    setNumSegmentsPrunedInvalid(response.getNumSegmentsPrunedInvalid());
+    setNumSegmentsPrunedByLimit(response.getNumSegmentsPrunedByLimit());
+    setNumSegmentsPrunedByValue(response.getNumSegmentsPrunedByValue());
+    setBrokerReduceTimeMs(response.getBrokerReduceTimeMs());
+    setOfflineThreadCpuTimeNs(response.getOfflineThreadCpuTimeNs());
+    setRealtimeThreadCpuTimeNs(response.getRealtimeThreadCpuTimeNs());
+    setOfflineSystemActivitiesCpuTimeNs(response.getOfflineSystemActivitiesCpuTimeNs());
+    setRealtimeSystemActivitiesCpuTimeNs(response.getRealtimeSystemActivitiesCpuTimeNs());
+    setOfflineResponseSerializationCpuTimeNs(response.getOfflineResponseSerializationCpuTimeNs());
+    setRealtimeResponseSerializationCpuTimeNs(response.getRealtimeResponseSerializationCpuTimeNs());
+    setExplainPlanNumEmptyFilterSegments(response.getExplainPlanNumEmptyFilterSegments());
+    setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments());
+    setTraceInfo(response.getTraceInfo());
+    setTablesQueried(response.getTablesQueried());
+  }
+
+  @Override
+  public String getBrokerHost() {
+    return _brokerHost;
+  }
+
+  @Override
+  public void setBrokerHost(String brokerHost) {
+    _brokerHost = brokerHost;
+  }
+
+  @Override
+  public int getBrokerPort() {
+    return _brokerPort;
+  }
+
+  @Override
+  public void setBrokerPort(int brokerPort) {
+    _brokerPort = brokerPort;
+  }
+
+  @Override
+  public int getOffset() {
+    return _offset;
+  }
+
+  @Override
+  public void setOffset(int offset) {
+    _offset = offset;
+  }
+
+  @Override
+  public int getNumRows() {
+    return _numRows;
+  }
+
+  @Override
+  public void setNumRows(int numRows) {
+    _numRows = numRows;
+  }
+
+  @Override
+  public long getCursorResultWriteTimeMs() {
+    return _cursorResultWriteTimeMs;
+  }
+
+  @Override
+  public void setCursorResultWriteTimeMs(long cursorResultWriteMs) {
+    _cursorResultWriteTimeMs = cursorResultWriteMs;
+  }
+
+  @Override
+  public long getCursorFetchTimeMs() {
+    return _cursorFetchTimeMs;
+  }
+
+  @Override
+  public void setCursorFetchTimeMs(long cursorFetchTimeMs) {
+    _cursorFetchTimeMs = cursorFetchTimeMs;
+  }
+
+  @Override
+  public long getSubmissionTimeMs() {
+    return _submissionTimeMs;
+  }
+
+  @Override
+  public void setSubmissionTimeMs(long submissionTimeMs) {
+    _submissionTimeMs = submissionTimeMs;
+  }
+
+  @Override
+  public long getExpirationTimeMs() {
+    return _expirationTimeMs;
+  }
+
+  @Override
+  public void setExpirationTimeMs(long expirationTimeMs) {
+    _expirationTimeMs = expirationTimeMs;
+  }
+
+  @Override
+  public long getBytesWritten() {
+    return _bytesWritten;
+  }
+
+  @Override
+  public void setBytesWritten(long bytesWritten) {
+    _bytesWritten = bytesWritten;
+  }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 8dbd4bb402..32f97a0c14 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -190,6 +190,15 @@ public class QueryOptionsUtils {
return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.USE_MULTISTAGE_ENGINE));
}
+ public static boolean isGetCursor(Map<String, String> queryOptions) {
+ return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.GET_CURSOR));
+ }
+
+  /**
+   * Reads the {@code CURSOR_NUM_ROWS} query option and parses it with {@code checkedParseIntPositive}.
+   * @param queryOptions Query options of the request
+   * @return The parsed option value (presumably null when the option is absent; verify against
+   *         {@code checkedParseIntPositive})
+   */
+  public static Integer getCursorNumRows(Map<String, String> queryOptions) {
+    return checkedParseIntPositive(QueryOptionKey.CURSOR_NUM_ROWS, queryOptions.get(QueryOptionKey.CURSOR_NUM_ROWS));
+  }
+
public static Optional<Boolean> isExplainAskingServers(Map<String, String>
queryOptions) {
String value = queryOptions.get(QueryOptionKey.EXPLAIN_ASKING_SERVERS);
if (value == null) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 0326f97a7b..171e850638 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -91,6 +91,7 @@ import
org.apache.pinot.controller.api.access.AccessControlFactory;
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
import org.apache.pinot.controller.api.resources.ControllerFilePathProvider;
import
org.apache.pinot.controller.api.resources.InvalidControllerConfigException;
+import org.apache.pinot.controller.cursors.ResponseStoreCleaner;
import org.apache.pinot.controller.helix.RealtimeConsumerMonitor;
import org.apache.pinot.controller.helix.SegmentStatusChecker;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -895,6 +896,10 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
new TaskMetricsEmitter(_helixResourceManager,
_helixTaskResourceManager, _leadControllerManager, _config,
_controllerMetrics);
periodicTasks.add(_taskMetricsEmitter);
+ PeriodicTask responseStoreCleaner = new ResponseStoreCleaner(_config,
_helixResourceManager, _leadControllerManager,
+ _controllerMetrics, _executorService, _connectionManager);
+ periodicTasks.add(responseStoreCleaner);
+
return periodicTasks;
}
@@ -977,4 +982,13 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
protected ControllerAdminApiApplication createControllerAdminApp() {
return new ControllerAdminApiApplication(_config);
}
+
+ /**
+ * Return the PeriodicTaskScheduler instance so that the periodic tasks can be tested.
+ * @return the PeriodicTaskScheduler held in the {@code _periodicTaskScheduler} field.
+ */
+ @VisibleForTesting
+ public PeriodicTaskScheduler getPeriodicTaskScheduler() {
+ return _periodicTaskScheduler;
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 612aa9bafe..f7d0deb137 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -51,6 +51,7 @@ public class ControllerConf extends PinotConfiguration {
public static final String CONTROLLER_BROKER_PROTOCOL =
"controller.broker.protocol";
public static final String CONTROLLER_BROKER_PORT_OVERRIDE =
"controller.broker.port.override";
public static final String CONTROLLER_BROKER_TLS_PREFIX =
"controller.broker.tls";
+ public static final String CONTROLLER_BROKER_AUTH_PREFIX =
"controller.broker.auth";
public static final String CONTROLLER_TLS_PREFIX = "controller.tls";
public static final String CONTROLLER_HOST = "controller.host";
public static final String CONTROLLER_PORT = "controller.port";
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
new file mode 100644
index 0000000000..220533d235
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/cursors/ResponseStoreCleaner.java
@@ -0,0 +1,222 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.cursors;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hc.client5.http.classic.methods.HttpDelete;
+import org.apache.hc.client5.http.classic.methods.HttpGet;
+import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
+import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
+import org.apache.hc.core5.http.io.entity.EntityUtils;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.auth.AuthProviderUtils;
+import org.apache.pinot.common.http.MultiHttpRequest;
+import org.apache.pinot.common.http.MultiHttpRequestResponse;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.LeadControllerManager;
+import org.apache.pinot.controller.api.resources.InstanceInfo;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
+import org.apache.pinot.spi.auth.AuthProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * ResponseStoreCleaner periodically gets all responses stored in a response
store and deletes the ones that have
+ * expired. From each broker, it gets the list of responses. Each
response has an expiration unix timestamp.
+ * If the current timestamp is greater, it calls a DELETE API for every
response that has expired.
+ */
+public class ResponseStoreCleaner extends ControllerPeriodicTask<Void> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ResponseStoreCleaner.class);
+ private static final int TIMEOUT_MS = 3000;
+ private static final String QUERY_RESULT_STORE = "%s://%s:%d/responseStore";
+ private static final String DELETE_QUERY_RESULT =
"%s://%s:%d/responseStore/%s";
+ // Used in tests to trigger the delete instead of waiting for the wall clock
to move to an appropriate time.
+ public static final String CLEAN_AT_TIME =
"response.store.cleaner.clean.at.ms";
+ private final ControllerConf _controllerConf;
+ private final Executor _executor;
+ private final PoolingHttpClientConnectionManager _connectionManager;
+ private final AuthProvider _authProvider;
+
+ public ResponseStoreCleaner(ControllerConf config, PinotHelixResourceManager
pinotHelixResourceManager,
+ LeadControllerManager leadControllerManager, ControllerMetrics
controllerMetrics, Executor executor,
+ PoolingHttpClientConnectionManager connectionManager) {
+ super("ResponseStoreCleaner", getFrequencyInSeconds(config),
getInitialDelayInSeconds(config),
+ pinotHelixResourceManager, leadControllerManager, controllerMetrics);
+ _controllerConf = config;
+ _executor = executor;
+ _connectionManager = connectionManager;
+ _authProvider =
+ AuthProviderUtils.extractAuthProvider(config,
ControllerConf.CONTROLLER_BROKER_AUTH_PREFIX);
+ }
+
+ private static long getInitialDelayInSeconds(ControllerConf config) {
+ long initialDelay = config.getPeriodicTaskInitialDelayInSeconds();
+ String responseStoreCleanerTaskInitialDelay =
+
config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_INITIAL_DELAY);
+ if (responseStoreCleanerTaskInitialDelay != null) {
+ initialDelay =
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(responseStoreCleanerTaskInitialDelay),
+ TimeUnit.MILLISECONDS);
+ }
+ return initialDelay;
+ }
+
+ private static long getFrequencyInSeconds(ControllerConf config) {
+ long frequencyInSeconds = TimeUnit.SECONDS.convert(
+
TimeUtils.convertPeriodToMillis(CommonConstants.CursorConfigs.DEFAULT_RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD),
+ TimeUnit.MILLISECONDS);
+ String responseStoreCleanerTaskPeriod =
+
config.getProperty(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD);
+ if (responseStoreCleanerTaskPeriod != null) {
+ frequencyInSeconds =
TimeUnit.SECONDS.convert(TimeUtils.convertPeriodToMillis(responseStoreCleanerTaskPeriod),
+ TimeUnit.MILLISECONDS);
+ }
+
+ return frequencyInSeconds;
+ }
+
+ @Override
+ protected void processTables(List<String> tableNamesWithType, Properties
periodicTaskProperties) {
+ long cleanAtMs = System.currentTimeMillis();
+ String cleanAtMsStr = periodicTaskProperties.getProperty(CLEAN_AT_TIME);
+ if (cleanAtMsStr != null) {
+ cleanAtMs = Long.parseLong(cleanAtMsStr);
+ }
+ doClean(cleanAtMs);
+ }
+
+ /**
+  * Deletes every stored response whose expiration timestamp is at or before {@code currentTime}.
+  *
+  * <p>The responses held by each broker are listed first; one DELETE URL is collected per expired response and
+  * all deletes are then issued in a single batch. Errors raised while talking to the brokers are logged and not
+  * propagated to the caller.
+  *
+  * @param currentTime cut-off in epoch milliseconds; a response is deleted when its
+  *                    {@code expirationTimeMs <= currentTime}
+  */
+ public void doClean(long currentTime) {
+   List<InstanceConfig> brokerList = _pinotHelixResourceManager.getAllBrokerInstanceConfigs();
+   Map<String, InstanceInfo> brokers = new HashMap<>();
+   for (InstanceConfig broker : brokerList) {
+     brokers.put(getInstanceKey(broker.getHostName(), broker.getPort()),
+         new InstanceInfo(broker.getInstanceName(), broker.getHostName(), Integer.parseInt(broker.getPort())));
+   }
+
+   try {
+     Map<String, String> requestHeaders = AuthProviderUtils.makeAuthHeadersMap(_authProvider);
+
+     Map<String, List<CursorResponseNative>> brokerCursorsMap = getAllQueryResults(brokers, requestHeaders);
+
+     String protocol = _controllerConf.getControllerBrokerProtocol();
+     int portOverride = _controllerConf.getControllerBrokerPortOverride();
+
+     List<String> deleteUrls = new ArrayList<>();
+     for (Map.Entry<String, List<CursorResponseNative>> entry : brokerCursorsMap.entrySet()) {
+       InstanceInfo broker = brokers.get(entry.getKey());
+       if (broker == null) {
+         // The key is rebuilt from the request URI, so it may not match the key derived from the Helix
+         // instance config (for example when a broker port override is configured).
+         LOGGER.warn("Skipping response store cleanup for unknown broker: {}", entry.getKey());
+         continue;
+       }
+       int port = portOverride > 0 ? portOverride : broker.getPort();
+       for (CursorResponse response : entry.getValue()) {
+         if (response.getExpirationTimeMs() <= currentTime) {
+           deleteUrls.add(
+               String.format(DELETE_QUERY_RESULT, protocol, broker.getHost(), port, response.getRequestId()));
+         }
+       }
+     }
+
+     if (deleteUrls.isEmpty()) {
+       return;
+     }
+
+     // Issue the deletes exactly once, after every broker has been scanned. Sending the accumulated list from
+     // inside the loop would re-send the URLs of earlier brokers for each later broker.
+     Map<String, String> deleteStatus = getResponseMap(requestHeaders, deleteUrls, "DELETE", HttpDelete::new);
+
+     deleteStatus.forEach(
+         (key, value) -> LOGGER.info("ResponseStore delete response - Broker: {}. Response: {}", key, value));
+   } catch (Exception e) {
+     // Keep the cause: the message alone may be null and hides where the failure happened.
+     LOGGER.error("Failed to clean up expired responses from the response store", e);
+   }
+ }
+
+ /**
+  * Fetches the list of stored responses from every broker.
+  *
+  * @param brokers brokers to query
+  * @param requestHeaders headers sent with every request
+  * @return the responses stored by each broker, keyed by the {@code host:port} of the request URI
+  * @throws Exception if a broker request fails or returns a status other than 200, or if a response body cannot
+  *                   be parsed
+  */
+ private Map<String, List<CursorResponseNative>> getAllQueryResults(Map<String, InstanceInfo> brokers,
+     Map<String, String> requestHeaders)
+     throws Exception {
+   String protocol = _controllerConf.getControllerBrokerProtocol();
+   int portOverride = _controllerConf.getControllerBrokerPortOverride();
+   List<String> brokerUrls = new ArrayList<>();
+   for (InstanceInfo broker : brokers.values()) {
+     int port = portOverride > 0 ? portOverride : broker.getPort();
+     brokerUrls.add(String.format(QUERY_RESULT_STORE, protocol, broker.getHost(), port));
+   }
+   LOGGER.debug("Getting stored responses via broker urls: {}", brokerUrls);
+   Map<String, String> strResponseMap = getResponseMap(requestHeaders, brokerUrls, "GET", HttpGet::new);
+   return strResponseMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> {
+     try {
+       return JsonUtils.stringToObject(e.getValue(), new TypeReference<>() {
+       });
+     } catch (IOException ex) {
+       throw new RuntimeException(ex);
+     }
+   }));
+ }
+
+ private <T extends HttpUriRequestBase> Map<String, String>
getResponseMap(Map<String, String> requestHeaders,
+ List<String> brokerUrls, String methodName, Function<String, T>
httpRequestBaseSupplier)
+ throws Exception {
+ List<Pair<String, String>> urlsAndRequestBodies = new
ArrayList<>(brokerUrls.size());
+ brokerUrls.forEach((url) -> urlsAndRequestBodies.add(Pair.of(url, "")));
+
+ CompletionService<MultiHttpRequestResponse> completionService =
+ new MultiHttpRequest(_executor,
_connectionManager).execute(urlsAndRequestBodies, requestHeaders,
+ ResponseStoreCleaner.TIMEOUT_MS, methodName,
httpRequestBaseSupplier);
+ Map<String, String> responseMap = new HashMap<>();
+ List<String> errMessages = new ArrayList<>(brokerUrls.size());
+ for (int i = 0; i < brokerUrls.size(); i++) {
+ try (MultiHttpRequestResponse httpRequestResponse =
completionService.take().get()) {
+ // The completion order is different from brokerUrls, thus use uri in
the response.
+ URI uri = httpRequestResponse.getURI();
+ int status = httpRequestResponse.getResponse().getCode();
+ String responseString =
EntityUtils.toString(httpRequestResponse.getResponse().getEntity());
+ // Unexpected server responses are collected and returned as exception.
+ if (status != 200) {
+ throw new Exception(
+ String.format("Unexpected status=%d and response='%s' from
uri='%s'", status, responseString, uri));
+ }
+ responseMap.put((getInstanceKey(uri.getHost(),
Integer.toString(uri.getPort()))), responseString);
+ } catch (Exception e) {
+ LOGGER.error("Failed to execute {} op. ", methodName, e);
+ // Can't just throw exception from here as there is a need to release
the other connections.
+ // So just collect the error msg to throw them together after the
for-loop.
+ errMessages.add(e.getMessage());
+ }
+ }
+ if (!errMessages.isEmpty()) {
+ throw new Exception("Unexpected responses from brokers: " +
StringUtils.join(errMessages, ","));
+ }
+ return responseMap;
+ }
+
+ private static String getInstanceKey(String hostname, String port) {
+ return hostname + ":" + port;
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
index e3014b82a8..305c0a26a0 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterStatelessTest.java
@@ -57,7 +57,7 @@ public class ControllerPeriodicTaskStarterStatelessTest
extends ControllerTest {
}
private class MockControllerStarter extends ControllerStarter {
- private static final int NUM_PERIODIC_TASKS = 11;
+ private static final int NUM_PERIODIC_TASKS = 12;
public MockControllerStarter() {
super();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index d92ee5f1b4..96e4f27790 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -97,6 +97,8 @@ public class Actions {
public static final String UPLOAD_SEGMENT = "UploadSegment";
public static final String GET_INSTANCE_PARTITIONS =
"GetInstancePartitions";
public static final String UPDATE_INSTANCE_PARTITIONS =
"UpdateInstancePartitions";
+ public static final String GET_RESPONSE_STORE = "GetResponseStore";
+ public static final String DELETE_RESPONSE_STORE = "DeleteResponseStore";
}
// Action names for table
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorFsIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorFsIntegrationTest.java
new file mode 100644
index 0000000000..6dac55deca
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorFsIntegrationTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class CursorFsIntegrationTest extends CursorIntegrationTest {
+ @Override
+ protected void overrideBrokerConf(PinotConfiguration configuration) {
+
configuration.setProperty(CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE
+ ".protocol", "file");
+ File tmpPath = new File(_tempDir, "tmp");
+ File dataPath = new File(_tempDir, "data");
+
configuration.setProperty(CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE
+ ".file.temp.dir",
+ tmpPath);
+ configuration.setProperty(
+ CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE +
".file.data.dir", "file://" + dataPath);
+ }
+
+ @Override
+ protected Object[][] getPageSizesAndQueryEngine() {
+ return new Object[][]{
+ {false, 1000}, {false, 0}, // 0 triggers default behaviour
+ {true, 1000}, {true, 0}, // 0 triggers default behaviour
+ };
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorIntegrationTest.java
new file mode 100644
index 0000000000..116654395f
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorIntegrationTest.java
@@ -0,0 +1,425 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.controller.cursors.ResponseStoreCleaner;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.util.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class CursorIntegrationTest extends BaseClusterIntegrationTestSet {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CursorIntegrationTest.class);
+ private static final int NUM_OFFLINE_SEGMENTS = 8;
+ private static final int COUNT_STAR_RESULT = 79003;
+ private static final String TEST_QUERY_ONE =
+ "SELECT SUM(CAST(CAST(ArrTime AS varchar) AS LONG)) FROM mytable WHERE
DaysSinceEpoch <> 16312 AND Carrier = "
+ + "'DL'";
+ private static final String TEST_QUERY_TWO =
+ "SELECT CAST(CAST(ArrTime AS varchar) AS LONG) FROM mytable WHERE
DaysSinceEpoch <> 16312 AND Carrier = 'DL' "
+ + "ORDER BY ArrTime DESC";
+ private static final String TEST_QUERY_THREE =
+ "SELECT ArrDelay, CarrierDelay, (ArrDelay - CarrierDelay) AS diff FROM
mytable WHERE ArrDelay > CarrierDelay "
+ + "ORDER BY diff, ArrDelay, CarrierDelay LIMIT 100000";
+ private static final String EMPTY_RESULT_QUERY =
+ "SELECT SUM(CAST(CAST(ArrTime AS varchar) AS LONG)) FROM mytable WHERE
DaysSinceEpoch <> 16312 AND 1 != 1";
+
+ private static int _resultSize;
+
+ @Override
+ protected void overrideControllerConf(Map<String, Object> properties) {
+
properties.put(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD,
"5m");
+ }
+
+ @Override
+ protected void overrideBrokerConf(PinotConfiguration configuration) {
+
configuration.setProperty(CommonConstants.CursorConfigs.PREFIX_OF_CONFIG_OF_RESPONSE_STORE
+ ".type", "memory");
+ }
+
+ protected long getCountStarResult() {
+ return COUNT_STAR_RESULT;
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start Zk, Kafka and Pinot
+ startZk();
+ startController();
+ startBroker();
+ startServer();
+
+ List<File> avroFiles = getAllAvroFiles();
+ List<File> offlineAvroFiles = getOfflineAvroFiles(avroFiles,
NUM_OFFLINE_SEGMENTS);
+
+ // Create and upload the schema and table config
+ Schema schema = createSchema();
+ getControllerRequestClient().addSchema(schema);
+ TableConfig offlineTableConfig = createOfflineTableConfig();
+ addTableConfig(offlineTableConfig);
+
+ // Create and upload segments
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles,
offlineTableConfig, schema, 0, _segmentDir,
+ _tarDir);
+ uploadSegments(getTableName(), _tarDir);
+
+ // Initialize the query generator
+ setUpQueryGenerator(avroFiles);
+
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(100_000L);
+ }
+
+ protected String getBrokerGetAllResponseStoresApiUrl(String
brokerBaseApiUrl) {
+ return brokerBaseApiUrl + "/responseStore";
+ }
+
+ protected String getBrokerResponseApiUrl(String brokerBaseApiUrl, String
requestId) {
+ return getBrokerGetAllResponseStoresApiUrl(brokerBaseApiUrl) + "/" +
requestId + "/results";
+ }
+
+ protected String getBrokerDeleteResponseStoresApiUrl(String
brokerBaseApiUrl, String requestId) {
+ return getBrokerGetAllResponseStoresApiUrl(brokerBaseApiUrl) + "/" +
requestId;
+ }
+
+ protected String getCursorQueryProperties(int numRows) {
+ return String.format("?getCursor=true&numRows=%d", numRows);
+ }
+
+ protected String getCursorOffset(int offset) {
+ return String.format("?offset=%d", offset);
+ }
+
+ protected String getCursorOffset(int offset, int numRows) {
+ return String.format("?offset=%d&numRows=%d", offset, numRows);
+ }
+
+ protected Map<String, String> getHeaders() {
+ return Collections.emptyMap();
+ }
+
+ /*
+ * This test does not use H2 to compare results. Instead, it compares
results got from iterating through a
+ * cursor AND the complete result set.
+ * Right now, it only compares the number of rows and all columns and rows.
+ */
+ @Override
+ protected void testQuery(String pinotQuery, String h2Query)
+ throws Exception {
+ String queryResourceUrl = getBrokerBaseApiUrl();
+ Map<String, String> headers = getHeaders();
+ Map<String, String> extraJsonProperties = getExtraQueryProperties();
+
+ // Get Pinot BrokerResponse without cursors
+ JsonNode pinotResponse;
+ pinotResponse = ClusterTest.postQuery(pinotQuery,
+ ClusterIntegrationTestUtils.getBrokerQueryApiUrl(queryResourceUrl,
useMultiStageQueryEngine()), headers,
+ extraJsonProperties);
+ if (!pinotResponse.get("exceptions").isEmpty()) {
+ throw new RuntimeException("Got Exceptions from Query Response: " +
pinotResponse);
+ }
+ int brokerResponseSize = pinotResponse.get("numRowsResultSet").asInt();
+
+ // Get a list of responses using cursors.
+ CursorResponse pinotPagingResponse;
+ pinotPagingResponse =
JsonUtils.jsonNodeToObject(ClusterTest.postQuery(pinotQuery,
+ ClusterIntegrationTestUtils.getBrokerQueryApiUrl(queryResourceUrl,
useMultiStageQueryEngine())
+ + getCursorQueryProperties(_resultSize), headers,
getExtraQueryProperties()), CursorResponseNative.class);
+ if (!pinotPagingResponse.getExceptions().isEmpty()) {
+ throw new RuntimeException("Got Exceptions from Query Response: " +
pinotPagingResponse.getExceptions().get(0));
+ }
+ List<CursorResponse> resultPages = getAllResultPages(queryResourceUrl,
headers, pinotPagingResponse, _resultSize);
+
+ int brokerPagingResponseSize = 0;
+ for (CursorResponse response : resultPages) {
+ brokerPagingResponseSize += response.getNumRows();
+ }
+
+ // Compare the number of rows.
+ if (brokerResponseSize != brokerPagingResponseSize) {
+ throw new RuntimeException(
+ "Pinot # of rows from paging API " + brokerPagingResponseSize + "
doesn't match # of rows from default API "
+ + brokerResponseSize);
+ }
+ }
+
+ private List<CursorResponse> getAllResultPages(String queryResourceUrl,
Map<String, String> headers,
+ CursorResponse firstResponse, int numRows)
+ throws Exception {
+ numRows = numRows == 0 ?
CommonConstants.CursorConfigs.DEFAULT_CURSOR_FETCH_ROWS : numRows;
+
+ List<CursorResponse> resultPages = new ArrayList<>();
+ resultPages.add(firstResponse);
+ int totalRows = firstResponse.getNumRowsResultSet();
+
+ int offset = firstResponse.getNumRows();
+ while (offset < totalRows) {
+ CursorResponse response =
JsonUtils.stringToObject(ClusterTest.sendGetRequest(
+ getBrokerResponseApiUrl(queryResourceUrl,
firstResponse.getRequestId()) + getCursorOffset(offset, numRows),
+ headers), CursorResponseNative.class);
+ resultPages.add(response);
+ offset += response.getNumRows();
+ }
+ return resultPages;
+ }
+
+ protected Object[][] getPageSizesAndQueryEngine() {
+ return new Object[][]{
+ {false, 2}, {false, 3}, {false, 10}, {false, 0}, //0 trigger default
behaviour
+ {true, 2}, {true, 3}, {true, 10}, {true, 0} //0 trigger default
behaviour
+ };
+ }
+
+ @DataProvider(name = "pageSizeAndQueryEngineProvider")
+ public Object[][] pageSizeAndQueryEngineProvider() {
+ return getPageSizesAndQueryEngine();
+ }
+
+ // Test hard coded queries with SSE/MSE AND different cursor response sizes.
+ @Test(dataProvider = "pageSizeAndQueryEngineProvider")
+ public void testHardcodedQueries(boolean useMultiStageEngine, int pageSize)
+ throws Exception {
+ _resultSize = pageSize;
+ setUseMultiStageQueryEngine(useMultiStageEngine);
+ super.testHardcodedQueries();
+ }
+
+ // Test a simple cursor workflow.
+ @Test(dataProvider = "useBothQueryEngines")
+ public void testCursorWorkflow(boolean useMultiStageQueryEngine)
+ throws Exception {
+ _resultSize = 10000;
+ setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+ // Submit query
+ CursorResponse pinotPagingResponse;
+ JsonNode jsonNode = ClusterTest.postQuery(TEST_QUERY_THREE,
+
ClusterIntegrationTestUtils.getBrokerQueryApiUrl(getBrokerBaseApiUrl(),
useMultiStageQueryEngine())
+ + getCursorQueryProperties(_resultSize), getHeaders(),
getExtraQueryProperties());
+
+ pinotPagingResponse = JsonUtils.jsonNodeToObject(jsonNode,
CursorResponseNative.class);
+ if (!pinotPagingResponse.getExceptions().isEmpty()) {
+ throw new RuntimeException("Got Exceptions from Query Response: " +
pinotPagingResponse.getExceptions().get(0));
+ }
+ String requestId = pinotPagingResponse.getRequestId();
+
+ Assert.assertFalse(pinotPagingResponse.getBrokerHost().isEmpty());
+ Assert.assertTrue(pinotPagingResponse.getBrokerPort() > 0);
+ Assert.assertTrue(pinotPagingResponse.getCursorFetchTimeMs() >= 0);
+ Assert.assertTrue(pinotPagingResponse.getCursorResultWriteTimeMs() >= 0);
+
+ int totalRows = pinotPagingResponse.getNumRowsResultSet();
+ int offset = pinotPagingResponse.getNumRows();
+ while (offset < totalRows) {
+ pinotPagingResponse =
JsonUtils.stringToObject(ClusterTest.sendGetRequest(
+ getBrokerResponseApiUrl(getBrokerBaseApiUrl(), requestId) +
getCursorOffset(offset, _resultSize),
+ getHeaders()), CursorResponseNative.class);
+
+ Assert.assertFalse(pinotPagingResponse.getBrokerHost().isEmpty());
+ Assert.assertTrue(pinotPagingResponse.getBrokerPort() > 0);
+ Assert.assertTrue(pinotPagingResponse.getCursorFetchTimeMs() >= 0);
+ offset += _resultSize;
+ }
+
ClusterTest.sendDeleteRequest(getBrokerDeleteResponseStoresApiUrl(getBrokerBaseApiUrl(),
requestId), getHeaders());
+ }
+
+ @Test
+ public void testGetAndDelete()
+ throws Exception {
+ _resultSize = 100000;
+ testQuery(TEST_QUERY_ONE);
+ testQuery(TEST_QUERY_TWO);
+
+ List<CursorResponseNative> requestIds = JsonUtils.stringToObject(
+
ClusterTest.sendGetRequest(getBrokerGetAllResponseStoresApiUrl(getBrokerBaseApiUrl()),
getHeaders()),
+ new TypeReference<>() {
+ });
+
+ Assert.assertEquals(requestIds.size(), 2);
+
+ // Delete the first one
+ String deleteRequestId = requestIds.get(0).getRequestId();
+
ClusterTest.sendDeleteRequest(getBrokerDeleteResponseStoresApiUrl(getBrokerBaseApiUrl(),
deleteRequestId),
+ getHeaders());
+
+ requestIds = JsonUtils.stringToObject(
+
ClusterTest.sendGetRequest(getBrokerGetAllResponseStoresApiUrl(getBrokerBaseApiUrl()),
getHeaders()),
+ new TypeReference<>() {
+ });
+
+ Assert.assertEquals(requestIds.size(), 1);
+ Assert.assertNotEquals(requestIds.get(0).getRequestId(), deleteRequestId);
+ }
+
+ // Fetching results for an unknown request id must be rejected with a 404.
+ @Test
+ public void testBadGet() {
+   try {
+     ClusterTest.sendGetRequest(getBrokerResponseApiUrl(getBrokerBaseApiUrl(), "dummy") + getCursorOffset(0),
+         getHeaders());
+     // Without this the test would pass silently if the broker accepted the request.
+     Assert.fail("Expected the GET for an unknown request id to fail");
+   } catch (IOException e) {
+     HttpErrorStatusException h = (HttpErrorStatusException) e.getCause();
+     Assert.assertEquals(h.getStatusCode(), 404);
+     Assert.assertTrue(h.getMessage().contains("Query results for dummy not found"));
+   }
+ }
+
+ // Deleting an unknown request id must be rejected with a 404.
+ @Test
+ public void testBadDelete() {
+   try {
+     ClusterTest.sendDeleteRequest(getBrokerDeleteResponseStoresApiUrl(getBrokerBaseApiUrl(), "dummy"),
+         getHeaders());
+     // Without this the test would pass silently if the broker accepted the request.
+     Assert.fail("Expected the DELETE for an unknown request id to fail");
+   } catch (IOException e) {
+     HttpErrorStatusException h = (HttpErrorStatusException) e.getCause();
+     Assert.assertEquals(h.getStatusCode(), 404);
+     Assert.assertTrue(h.getMessage().contains("Query results for dummy not found"));
+   }
+ }
+
+ @Test
+ public void testQueryWithEmptyResult()
+ throws Exception {
+ JsonNode pinotResponse = ClusterTest.postQuery(EMPTY_RESULT_QUERY,
+
ClusterIntegrationTestUtils.getBrokerQueryApiUrl(getBrokerBaseApiUrl(),
useMultiStageQueryEngine())
+ + getCursorQueryProperties(1000), getHeaders(),
getExtraQueryProperties());
+
+ // There should be no resultTable.
+ Assert.assertNull(pinotResponse.get("resultTable"));
+ // Total Rows in result set should be 0.
+ Assert.assertEquals(pinotResponse.get("numRowsResultSet").asInt(), 0);
+ // Rows in the current response should be 0
+ Assert.assertEquals(pinotResponse.get("numRows").asInt(), 0);
+ Assert.assertTrue(pinotResponse.get("exceptions").isEmpty());
+ }
+
+ @DataProvider(name = "InvalidOffsetQueryProvider")
+ public Object[][] invalidOffsetQueryProvider() {
+ return new Object[][]{{TEST_QUERY_ONE}, {EMPTY_RESULT_QUERY}};
+ }
+
+ @Test(dataProvider = "InvalidOffsetQueryProvider", expectedExceptions =
IOException.class,
+ expectedExceptionsMessageRegExp = ".*Offset \\d+ should be lesser than
totalRecords \\d+.*")
+ public void testGetInvalidOffset(String query)
+ throws Exception {
+ CursorResponse pinotPagingResponse;
+ pinotPagingResponse =
JsonUtils.jsonNodeToObject(ClusterTest.postQuery(query,
+
ClusterIntegrationTestUtils.getBrokerQueryApiUrl(getBrokerBaseApiUrl(),
useMultiStageQueryEngine())
+ + getCursorQueryProperties(_resultSize), getHeaders(),
getExtraQueryProperties()),
+ CursorResponseNative.class);
+ Assert.assertTrue(pinotPagingResponse.getExceptions().isEmpty());
+ ClusterTest.sendGetRequest(
+ getBrokerResponseApiUrl(getBrokerBaseApiUrl(),
pinotPagingResponse.getRequestId()) + getCursorOffset(
+ pinotPagingResponse.getNumRowsResultSet() + 1), getHeaders());
+ }
+
+ @Test
+ public void testQueryWithRuntimeError()
+ throws Exception {
+ String queryWithFromMissing = "SELECT * mytable limit 100";
+ JsonNode pinotResponse;
+ pinotResponse = ClusterTest.postQuery(queryWithFromMissing,
+
ClusterIntegrationTestUtils.getBrokerQueryApiUrl(getBrokerBaseApiUrl(),
useMultiStageQueryEngine())
+ + getCursorQueryProperties(_resultSize), getHeaders(),
getExtraQueryProperties());
+ Assert.assertFalse(pinotResponse.get("exceptions").isEmpty());
+ JsonNode exception = pinotResponse.get("exceptions").get(0);
+
Assert.assertTrue(exception.get("message").asText().startsWith("QueryValidationError:"));
+ Assert.assertEquals(exception.get("errorCode").asInt(), 700);
+
Assert.assertTrue(pinotResponse.get("brokerId").asText().startsWith("Broker_"));
+ // There should be no resultTable.
+ Assert.assertNull(pinotResponse.get("resultTable"));
+ }
+
+ @Test
+ public void testResponseStoreCleaner()
+ throws Exception {
+ List<CursorResponseNative> requestIds = JsonUtils.stringToObject(
+
ClusterTest.sendGetRequest(getBrokerGetAllResponseStoresApiUrl(getBrokerBaseApiUrl()),
getHeaders()),
+ new TypeReference<>() {
+ });
+
+ int numQueryResults = requestIds.size();
+
+ _resultSize = 100000;
+ this.testQuery(TEST_QUERY_ONE);
+ // Sleep so that both the queries do not have the same submission time.
+ Thread.sleep(50);
+ this.testQuery(TEST_QUERY_TWO);
+
+ requestIds = JsonUtils.stringToObject(
+
ClusterTest.sendGetRequest(getBrokerGetAllResponseStoresApiUrl(getBrokerBaseApiUrl()),
getHeaders()),
+ new TypeReference<>() {
+ });
+
+ int numQueryResultsAfter = requestIds.size();
+ Assert.assertEquals(requestIds.size() - numQueryResults, 2);
+
+ CursorResponseNative cursorResponse0 = JsonUtils.stringToObject(
+
ClusterTest.sendGetRequest(getBrokerResponseApiUrl(getBrokerBaseApiUrl(),
requestIds.get(0).getRequestId()),
+ getHeaders()), new TypeReference<>() {
+ });
+
+ CursorResponseNative cursorResponse1 = JsonUtils.stringToObject(
+
ClusterTest.sendGetRequest(getBrokerResponseApiUrl(getBrokerBaseApiUrl(),
requestIds.get(1).getRequestId()),
+ getHeaders()), new TypeReference<>() {
+ });
+
+ // Get the lower submission time.
+ long expirationTime0 = cursorResponse0.getExpirationTimeMs();
+ long expirationTime1 = cursorResponse1.getExpirationTimeMs();
+
+ Properties perodicTaskProperties = new Properties();
+ perodicTaskProperties.setProperty("requestId", "CursorIntegrationTest");
+ perodicTaskProperties.setProperty(ResponseStoreCleaner.CLEAN_AT_TIME,
+ Long.toString(Math.min(expirationTime0, expirationTime1)));
+
_controllerStarter.getPeriodicTaskScheduler().scheduleNow("ResponseStoreCleaner",
perodicTaskProperties);
+
+ // The periodic task is run in an executor thread. Give the thread some
time to run the cleaner.
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ List<CursorResponse> getNumQueryResults = JsonUtils.stringToObject(
+
ClusterTest.sendGetRequest(getBrokerGetAllResponseStoresApiUrl(getBrokerBaseApiUrl()),
getHeaders()),
+ List.class);
+ return getNumQueryResults.size() < numQueryResultsAfter;
+ } catch (Exception e) {
+ LOGGER.error(e.getMessage());
+ return false;
+ }
+ }, 500L, 100_000L, "Failed to load delete query results", true);
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorWithAuthIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorWithAuthIntegrationTest.java
new file mode 100644
index 0000000000..ebac46edcf
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CursorWithAuthIntegrationTest.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicNameValuePair;
+import org.apache.http.HttpStatus;
+import org.apache.pinot.client.Connection;
+import org.apache.pinot.client.ConnectionFactory;
+import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
+import org.apache.pinot.common.auth.UrlAuthProvider;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.helix.ControllerRequestClient;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.Test;
+
+import static
org.apache.pinot.integration.tests.BasicAuthTestUtils.AUTH_HEADER;
+import static org.apache.pinot.integration.tests.BasicAuthTestUtils.AUTH_TOKEN;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Runs the cursor integration tests inherited from {@link CursorIntegrationTest} with auth settings applied to the
+ * controller, broker and server configurations through {@link BasicAuthTestUtils}. The helpers overridden here
+ * attach the auth header/token from {@link BasicAuthTestUtils} to the requests they issue.
+ */
+@Test
+public class CursorWithAuthIntegrationTest extends CursorIntegrationTest {
+  final static String AUTH_PROVIDER_CLASS = UrlAuthProvider.class.getCanonicalName();
+  final static URL AUTH_URL = CursorWithAuthIntegrationTest.class.getResource("/url-auth-token.txt");
+  final static String AUTH_PREFIX = "Basic";
+
+  /**
+   * Returns the parameter rows used by the inherited tests.
+   * NOTE(review): judging by the method name each row is presumably (query engine flag, page size); confirm
+   * against the consumer in {@link CursorIntegrationTest}.
+   */
+  protected Object[][] getPageSizesAndQueryEngine() {
+    return new Object[][]{
+        {false, 1000},
+        {true, 1000}
+    };
+  }
+
+  /**
+   * Adds the auth settings to the controller configuration (segment fetcher and controller-to-broker auth) and
+   * sets the response store cleaner frequency period to "5m".
+   */
+  @Override
+  protected void overrideControllerConf(Map<String, Object> properties) {
+    BasicAuthTestUtils.addControllerConfiguration(properties);
+    properties.put("controller.segment.fetcher.auth.provider.class", AUTH_PROVIDER_CLASS);
+    properties.put("controller.segment.fetcher.auth.url", AUTH_URL);
+    properties.put("controller.segment.fetcher.auth.prefix", AUTH_PREFIX);
+    properties.put(ControllerConf.CONTROLLER_BROKER_AUTH_PREFIX + ".provider.class", AUTH_PROVIDER_CLASS);
+    properties.put(ControllerConf.CONTROLLER_BROKER_AUTH_PREFIX + ".url", AUTH_URL);
+    properties.put(ControllerConf.CONTROLLER_BROKER_AUTH_PREFIX + ".prefix", AUTH_PREFIX);
+    properties.put(CommonConstants.CursorConfigs.RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD, "5m");
+  }
+
+  /**
+   * Applies the parent's broker overrides first, then adds the auth settings to the broker configuration.
+   */
+  @Override
+  protected void overrideBrokerConf(PinotConfiguration configuration) {
+    super.overrideBrokerConf(configuration);
+    BasicAuthTestUtils.addBrokerConfiguration(configuration);
+  }
+
+  /**
+   * Adds the auth settings to the server configuration: segment fetcher, segment uploader and instance auth.
+   */
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    BasicAuthTestUtils.addServerConfiguration(serverConf);
+    serverConf.setProperty("pinot.server.segment.fetcher.auth.provider.class", AUTH_PROVIDER_CLASS);
+    serverConf.setProperty("pinot.server.segment.fetcher.auth.url", AUTH_URL);
+    serverConf.setProperty("pinot.server.segment.fetcher.auth.prefix", AUTH_PREFIX);
+    serverConf.setProperty("pinot.server.segment.uploader.auth.provider.class", AUTH_PROVIDER_CLASS);
+    serverConf.setProperty("pinot.server.segment.uploader.auth.url", AUTH_URL);
+    serverConf.setProperty("pinot.server.segment.uploader.auth.prefix", AUTH_PREFIX);
+    serverConf.setProperty("pinot.server.instance.auth.provider.class", AUTH_PROVIDER_CLASS);
+    serverConf.setProperty("pinot.server.instance.auth.url", AUTH_URL);
+    serverConf.setProperty("pinot.server.instance.auth.prefix", AUTH_PREFIX);
+  }
+
+  /**
+   * @return the auth headers from {@link BasicAuthTestUtils}
+   */
+  @Override
+  protected Map<String, String> getHeaders() {
+    return BasicAuthTestUtils.AUTH_HEADER;
+  }
+
+  /**
+   * Lazily creates the controller request client so that it is built with the auth header.
+   */
+  @Override
+  public ControllerRequestClient getControllerRequestClient() {
+    if (_controllerRequestClient == null) {
+      _controllerRequestClient =
+          new ControllerRequestClient(_controllerRequestURLBuilder, getHttpClient(), AUTH_HEADER);
+    }
+    return _controllerRequestClient;
+  }
+
+  /**
+   * Lazily creates the Pinot client connection using a transport that carries the auth header.
+   */
+  @Override
+  protected Connection getPinotConnection() {
+    if (_pinotConnection == null) {
+      JsonAsyncHttpPinotClientTransportFactory factory = new JsonAsyncHttpPinotClientTransportFactory();
+      factory.setHeaders(AUTH_HEADER);
+
+      _pinotConnection =
+          ConnectionFactory.fromZookeeper(getZkUrl() + "/" + getHelixClusterName(), factory.buildTransport());
+    }
+    return _pinotConnection;
+  }
+
+  /**
+   * Upload all segments inside the given directories to the cluster.
+   * <p>
+   * A single segment is uploaded on the calling thread; several segments are uploaded in parallel with one thread
+   * per segment. Every upload is asserted to return {@link HttpStatus#SC_OK}.
+   *
+   * @param tableName name of the table the segments belong to
+   * @param tableType type of the table the segments belong to
+   * @param tarDirs directories whose files are the segment tar files to upload
+   * @throws Exception if listing, uploading or waiting for an upload fails
+   */
+  @Override
+  protected void uploadSegments(String tableName, TableType tableType, List<File> tarDirs)
+      throws Exception {
+    List<File> segmentTarFiles = new ArrayList<>();
+    for (File tarDir : tarDirs) {
+      File[] tarFiles = tarDir.listFiles();
+      assertNotNull(tarFiles);
+      Collections.addAll(segmentTarFiles, tarFiles);
+    }
+    int numSegments = segmentTarFiles.size();
+    assertTrue(numSegments > 0);
+
+    URI uploadSegmentHttpURI = URI.create(getControllerRequestURLBuilder().forSegmentUpload());
+    List<NameValuePair> parameters = buildTableParameters(tableName, tableType);
+    List<Header> headers = List.of(new BasicHeader("Authorization", AUTH_TOKEN));
+
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      if (numSegments == 1) {
+        assertEquals(
+            uploadSegmentWithRandomMode(tableName, tableType, uploadSegmentHttpURI, fileUploadDownloadClient,
+                segmentTarFiles.get(0), headers, parameters), HttpStatus.SC_OK);
+      } else {
+        // Upload all segments in parallel
+        ExecutorService executorService = Executors.newFixedThreadPool(numSegments);
+        List<Future<Integer>> futures = new ArrayList<>(numSegments);
+        for (File segmentTarFile : segmentTarFiles) {
+          futures.add(executorService.submit(
+              () -> uploadSegmentWithRandomMode(tableName, tableType, uploadSegmentHttpURI,
+                  fileUploadDownloadClient, segmentTarFile, headers, parameters)));
+        }
+        // No further tasks are submitted; the already submitted uploads still run to completion.
+        executorService.shutdown();
+        for (Future<Integer> future : futures) {
+          assertEquals((int) future.get(), HttpStatus.SC_OK);
+        }
+      }
+    }
+  }
+
+  /**
+   * Uploads one segment, choosing between a full segment upload and a metadata-only upload based on the parity of
+   * the current time in milliseconds, so that both upload paths get exercised across runs.
+   *
+   * @return the HTTP status code of the upload response
+   */
+  private int uploadSegmentWithRandomMode(String tableName, TableType tableType, URI uploadSegmentHttpURI,
+      FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile, List<Header> headers,
+      List<NameValuePair> parameters)
+      throws Exception {
+    if (System.currentTimeMillis() % 2 == 0) {
+      return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
+          segmentTarFile, headers, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
+    }
+    return uploadSegmentWithOnlyMetadata(tableName, tableType, uploadSegmentHttpURI, fileUploadDownloadClient,
+        segmentTarFile);
+  }
+
+  /**
+   * Uploads only the metadata of the given segment; the download URI header points at the local tar file.
+   *
+   * @return the HTTP status code of the upload response
+   */
+  private int uploadSegmentWithOnlyMetadata(String tableName, TableType tableType, URI uploadSegmentHttpURI,
+      FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile)
+      throws IOException, HttpErrorStatusException {
+    List<Header> headers = List.of(
+        new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
+            String.format("file://%s/%s", segmentTarFile.getParentFile().getAbsolutePath(),
+                URIUtils.encode(segmentTarFile.getName()))),
+        new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+            FileUploadDownloadClient.FileUploadType.METADATA.toString()),
+        new BasicHeader("Authorization", AUTH_TOKEN));
+    // Add table name and table type as request parameters
+    List<NameValuePair> parameters = buildTableParameters(tableName, tableType);
+    return fileUploadDownloadClient.uploadSegmentMetadata(uploadSegmentHttpURI, segmentTarFile.getName(),
+        segmentTarFile, headers, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
+  }
+
+  /**
+   * Builds the request parameters carrying the table name and table type of an upload.
+   */
+  private static List<NameValuePair> buildTableParameters(String tableName, TableType tableType) {
+    NameValuePair tableNameValuePair =
+        new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
+    NameValuePair tableTypeValuePair =
+        new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, tableType.name());
+    return Arrays.asList(tableNameValuePair, tableTypeValuePair);
+  }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/cursors/MemoryResponseStore.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/cursors/MemoryResponseStore.java
new file mode 100644
index 0000000000..e8cb3fb24e
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/cursors/MemoryResponseStore.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests.cursors;
+
+import com.google.auto.service.AutoService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import javax.validation.constraints.NotNull;
+import org.apache.pinot.common.cursors.AbstractResponseStore;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.CursorResponse;
+import org.apache.pinot.common.response.broker.CursorResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.spi.cursors.ResponseStore;
+import org.apache.pinot.spi.env.PinotConfiguration;
+
+
+@AutoService(ResponseStore.class)
+public class MemoryResponseStore extends AbstractResponseStore {
+ private final Map<String, CursorResponse> _cursorResponseMap = new
HashMap<>();
+ private final Map<String, ResultTable> _resultTableMap = new HashMap<>();
+
+ private static final String TYPE = "memory";
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ @Override
+ protected void writeResponse(String requestId, CursorResponse response) {
+ _cursorResponseMap.put(requestId, response);
+ }
+
+ @Override
+ protected long writeResultTable(String requestId, ResultTable resultTable) {
+ _resultTableMap.put(requestId, resultTable);
+ return 0;
+ }
+
+ @Override
+ public CursorResponse readResponse(String requestId) {
+ CursorResponse response = _cursorResponseMap.get(requestId);
+ CursorResponse responseCopy = new CursorResponseNative(response);
+
+ responseCopy.setBrokerHost(response.getBrokerHost());
+ responseCopy.setBrokerPort(response.getBrokerPort());
+ responseCopy.setSubmissionTimeMs(response.getSubmissionTimeMs());
+ responseCopy.setExpirationTimeMs(response.getExpirationTimeMs());
+ return responseCopy;
+ }
+
+ @Override
+ protected ResultTable readResultTable(String requestId, int offset, int
numRows) {
+ CursorResponse response = _cursorResponseMap.get(requestId);
+ int totalTableRows = response.getNumRowsResultSet();
+ ResultTable resultTable = _resultTableMap.get(requestId);
+ int sliceEnd = offset + numRows;
+ if (sliceEnd > totalTableRows) {
+ sliceEnd = totalTableRows;
+ }
+
+ return new ResultTable(resultTable.getDataSchema(),
resultTable.getRows().subList(offset, sliceEnd));
+ }
+
+ @Override
+ public void init(@NotNull PinotConfiguration config, @NotNull String
brokerHost, int brokerPort, String brokerId,
+ @NotNull BrokerMetrics brokerMetrics, String expirationTime)
+ throws Exception {
+ init(brokerHost, brokerPort, brokerId, brokerMetrics, expirationTime);
+ }
+
+ @Override
+ public boolean exists(String requestId)
+ throws Exception {
+ return _cursorResponseMap.containsKey(requestId) &&
_resultTableMap.containsKey(requestId);
+ }
+
+ @Override
+ public Collection<String> getAllStoredRequestIds() {
+ return _cursorResponseMap.keySet();
+ }
+
+ @Override
+ protected boolean deleteResponseImpl(String requestId) {
+ return _cursorResponseMap.remove(requestId) != null &&
_resultTableMap.remove(requestId) != null;
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStore.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStore.java
new file mode 100644
index 0000000000..e02067045c
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStore.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.cursors;
+
+import java.util.Collection;
+
+
+/**
+ * ResponseStore stores the response of a query. It is identified by the request id of the query.
+ * There is one instance of a response store in every broker. An instance of the response store contains responses
+ * of queries submitted to that broker. An implementation of a response store may use a shared storage system.
+ * Regardless, a response store is expected to operate only on responses created by it.
+ *
+ * Since BrokerResponse cannot be moved to the SPI package, some of the functions are declared in
+ * AbstractResponseStore.
+ * <br/>
+ * Concurrency Model:
+ * <br/>
+ * There are 3 possible roles - writer, reader and deleter.
+ * <br/>
+ * There can only be ONE writer, and no other role can execute concurrently with it.
+ * A response store is written during query execution. During execution, there can be no reads or deletes as the
+ * query id would not have been provided to the client.
+ * <br/>
+ * There can be multiple readers. There may be concurrent deletes but no concurrent writes.
+ * Multiple clients can potentially iterate through the result set.
+ * <br/>
+ * There can be multiple deleters. There may be concurrent reads but no concurrent writes.
+ * Multiple clients can potentially call the delete API.
+ * <br/>
+ * Implementations should ensure that concurrent read/delete and delete/delete operations are handled correctly.
+ */
+public interface ResponseStore {
+ /**
+ * Get the type of the ResponseStore
+ * @return Type of the store
+ */
+ String getType();
+
+ /**
+ * Checks if the response for a requestId exists.
+ * @param requestId The ID of the request
+ * @return True if response exists else false
+ * @throws Exception Thrown if an error occurs when checking if the response exists.
+ */
+ boolean exists(String requestId)
+ throws Exception;
+
+ /**
+ * Get all request ids of responses in the ResponseStore.
+ * Note that a broker should only return request ids that are created by it even if it has access to others in a
+ * shared storage.
+ * @return Collection of request ids
+ * @throws Exception Thrown if an error occurs when listing the request ids.
+ */
+ Collection<String> getAllStoredRequestIds()
+ throws Exception;
+
+ /**
+ * Delete a response.
+ *
+ * @param requestId Request id of the query.
+ * @return True if response was found and deleted.
+ * @throws Exception Exception is thrown if response cannot be deleted by response store.
+ */
+ boolean deleteResponse(String requestId)
+ throws Exception;
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStoreService.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStoreService.java
new file mode 100644
index 0000000000..7c4d2c94b0
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/cursors/ResponseStoreService.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.cursors;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+
+
+/**
+ * Holds the {@link ResponseStore} implementations known to this process and allows looking them up by the type
+ * string each one reports through {@link ResponseStore#getType()}.
+ * <p>
+ * The default instance is populated through {@link ServiceLoader}; it can be replaced with
+ * {@link #setInstance(ResponseStoreService)}.
+ */
+public class ResponseStoreService {
+  private static volatile ResponseStoreService _instance = fromServiceLoader();
+
+  private final Set<ResponseStore> _allResponseStores;
+  private final Map<String, ResponseStore> _responseStoreByType;
+
+  private ResponseStoreService(Set<ResponseStore> storeSet) {
+    _allResponseStores = storeSet;
+    _responseStoreByType = new HashMap<>();
+    // Index every store by its type; if two stores report the same type, the one iterated last is kept.
+    storeSet.forEach(store -> _responseStoreByType.put(store.getType(), store));
+  }
+
+  /**
+   * @return the currently active service instance
+   */
+  public static ResponseStoreService getInstance() {
+    return _instance;
+  }
+
+  /**
+   * Replaces the active service instance.
+   *
+   * @param service the instance subsequently returned by {@link #getInstance()}
+   */
+  public static void setInstance(ResponseStoreService service) {
+    _instance = service;
+  }
+
+  /**
+   * Creates a service holding every {@link ResponseStore} provider found by {@link ServiceLoader}.
+   *
+   * @return a new service instance
+   */
+  public static ResponseStoreService fromServiceLoader() {
+    Set<ResponseStore> discoveredStores = new HashSet<>();
+    ServiceLoader.load(ResponseStore.class).forEach(discoveredStores::add);
+    return new ResponseStoreService(discoveredStores);
+  }
+
+  /**
+   * @return the set of stores this service was created with
+   */
+  public Set<ResponseStore> getAllResponseStores() {
+    return _allResponseStores;
+  }
+
+  /**
+   * @return the stores keyed by their type
+   */
+  public Map<String, ResponseStore> getResponseStoresByType() {
+    return _responseStoreByType;
+  }
+
+  /**
+   * Looks up the store registered for the given type.
+   *
+   * @param type the type string of the wanted store
+   * @return the matching store
+   * @throws IllegalArgumentException if no store is registered for the type
+   */
+  public ResponseStore getResponseStore(String type) {
+    ResponseStore matchingStore = _responseStoreByType.get(type);
+    if (matchingStore != null) {
+      return matchingStore;
+    }
+    throw new IllegalArgumentException("Unknown ResponseStore type: " + type);
+  }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 67bd6191c3..8e27bbccef 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -499,6 +499,11 @@ public class CommonConstants {
// possible.
public static final String OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY
=
"optimizeMaxInitialResultHolderCapacity";
+
+ // Set to true if a cursor should be returned instead of the complete
result set
+ public static final String GET_CURSOR = "getCursor";
+ // Number of rows that the cursor should contain
+ public static final String CURSOR_NUM_ROWS = "cursorNumRows";
}
public static class QueryOptionValue {
@@ -617,6 +622,8 @@ public class CommonConstants {
CONFIG_PREFIX + ".stats.manager.threadpool.size";
public static final int DEFAULT_STATS_MANAGER_THREADPOOL_SIZE = 2;
}
+
+ public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY =
"pinot.broker.storage.factory";
}
public static class Server {
@@ -1315,4 +1322,21 @@ public class CommonConstants {
public static final byte[][] BYTES_ARRAY = new byte[0][];
public static final Object MAP = Collections.emptyMap();
}
+
+ // Configuration keys and default values related to cursors, the response store and the response store cleaner.
+ public static class CursorConfigs {
+ // Prefix of the broker configs related to cursors.
+ public static final String PREFIX_OF_CONFIG_OF_CURSOR =
"pinot.broker.cursor";
+ // Prefix of the broker configs related to the response store.
+ public static final String PREFIX_OF_CONFIG_OF_RESPONSE_STORE =
"pinot.broker.cursor.response.store";
+ // Response store type used by default.
+ public static final String DEFAULT_RESPONSE_STORE_TYPE = "file";
+ // Key of the response store type config.
+ // NOTE(review): presumably resolved relative to PREFIX_OF_CONFIG_OF_RESPONSE_STORE - confirm against usage.
+ public static final String RESPONSE_STORE_TYPE = "type";
+ // Default value for CURSOR_FETCH_ROWS.
+ public static final int DEFAULT_CURSOR_FETCH_ROWS = 10000;
+ // Resolves to "pinot.broker.cursor.fetch.rows".
+ public static final String CURSOR_FETCH_ROWS = PREFIX_OF_CONFIG_OF_CURSOR
+ ".fetch.rows";
+ // Default value for RESULTS_EXPIRATION_INTERVAL.
+ public static final String DEFAULT_RESULTS_EXPIRATION_INTERVAL = "1h"; //
1 hour.
+ // Resolves to "pinot.broker.cursor.response.store.expiration".
+ public static final String RESULTS_EXPIRATION_INTERVAL =
PREFIX_OF_CONFIG_OF_RESPONSE_STORE + ".expiration";
+
+ // Controller-side config key for how often the response store cleaner runs.
+ public static final String RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD =
+ "controller.cluster.response.store.cleaner.frequencyPeriod";
+ // Default value for RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD.
+ public static final String DEFAULT_RESPONSE_STORE_CLEANER_FREQUENCY_PERIOD
= "1h";
+ // Controller-side config key for the initial delay of the response store cleaner.
+ public static final String RESPONSE_STORE_CLEANER_INITIAL_DELAY =
+ "controller.cluster.response.store.cleaner.initialDelay";
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]