This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7bd4dd4be1 Allow headers when making multi get requests + set headers
when making controller API calls (#9170)
7bd4dd4be1 is described below
commit 7bd4dd4be1a49f42b904aba783980b893b5ddba9
Author: Saurabh Dubey <[email protected]>
AuthorDate: Sat Aug 6 03:42:42 2022 +0530
Allow headers when making multi get requests + set headers when making
controller API calls (#9170)
---
.../java/org/apache/pinot/common/http/MultiGetRequest.java | 9 ++++++++-
.../org/apache/pinot/common/http/MultiGetRequestTest.java | 2 +-
.../controller/api/resources/PinotTaskRestletResource.java | 7 ++++++-
.../apache/pinot/controller/util/CompletionServiceHelper.java | 11 +++++++++--
4 files changed, 24 insertions(+), 5 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/http/MultiGetRequest.java
b/pinot-common/src/main/java/org/apache/pinot/common/http/MultiGetRequest.java
index f9e765fd7c..9f6e3f3158 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/http/MultiGetRequest.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/http/MultiGetRequest.java
@@ -19,9 +19,11 @@
package org.apache.pinot.common.http;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
+import javax.annotation.Nullable;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.methods.GetMethod;
@@ -90,11 +92,13 @@ public class MultiGetRequest {
/**
* GET urls in parallel using the executor service.
* @param urls absolute URLs to GET
+ * @param requestHeaders headers to set when making the request
* @param timeoutMs timeout in milliseconds for each GET request
* @return instance of CompletionService. Completion service will provide
* results as they arrive. The order is NOT same as the order of URLs
*/
- public CompletionService<GetMethod> execute(List<String> urls, int
timeoutMs) {
+ public CompletionService<GetMethod> execute(List<String> urls, @Nullable
Map<String, String> requestHeaders,
+ int timeoutMs) {
HttpClientParams clientParams = new HttpClientParams();
clientParams.setConnectionManagerTimeout(timeoutMs);
HttpClient client = new HttpClient(clientParams, _connectionManager);
@@ -104,6 +108,9 @@ public class MultiGetRequest {
completionService.submit(() -> {
try {
GetMethod getMethod = new GetMethod(url);
+ if (requestHeaders != null) {
+ requestHeaders.forEach(getMethod::setRequestHeader);
+ }
getMethod.getParams().setSoTimeout(timeoutMs);
client.executeMethod(getMethod);
return getMethod;
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/http/MultiGetRequestTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/http/MultiGetRequestTest.java
index a021211fa2..8cfd5a4cfe 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/http/MultiGetRequestTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/http/MultiGetRequestTest.java
@@ -116,7 +116,7 @@ public class MultiGetRequestTest {
// timeout value needs to be less than 5000ms set above for
// third server
final int requestTimeoutMs = 1000;
- CompletionService<GetMethod> completionService = mget.execute(urls,
requestTimeoutMs);
+ CompletionService<GetMethod> completionService = mget.execute(urls, null,
requestTimeoutMs);
int success = 0;
int errors = 0;
int timeouts = 0;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
index 4337c740a9..cae4a3502f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTaskRestletResource.java
@@ -265,6 +265,7 @@ public class PinotTaskRestletResource {
@Path("/tasks/generator/{tableNameWithType}/{taskType}/debug")
@ApiOperation("Fetch task generation information for the recent runs of the
given task for the given table")
public String getTaskGenerationDebugInto(
+ @Context HttpHeaders httpHeaders,
@ApiParam(value = "Task type", required = true) @PathParam("taskType")
String taskType,
@ApiParam(value = "Table name with type", required = true)
@PathParam("tableNameWithType")
String tableNameWithType,
@@ -294,8 +295,12 @@ public class PinotTaskRestletResource {
CompletionServiceHelper completionServiceHelper =
new CompletionServiceHelper(_executor, _connectionManager,
HashBiMap.create(0));
+ Map<String, String> requestHeaders = new HashMap<>();
+ httpHeaders.getRequestHeaders().keySet().forEach(header -> {
+ requestHeaders.put(header, httpHeaders.getHeaderString(header));
+ });
CompletionServiceHelper.CompletionServiceResponse serviceResponse =
- completionServiceHelper.doMultiGetRequest(controllerUrls, null, true,
10000);
+ completionServiceHelper.doMultiGetRequest(controllerUrls, null, true,
requestHeaders, 10000);
List<JsonNode> result = new ArrayList<>();
serviceResponse._httpResponses.values().forEach(resp -> {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
index 34cc030a30..4a1bbb2623 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
+import javax.annotation.Nullable;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.URI;
import org.apache.commons.httpclient.methods.GetMethod;
@@ -54,6 +55,11 @@ public class CompletionServiceHelper {
_endpointsToServers = endpointsToServers;
}
+ public CompletionServiceResponse doMultiGetRequest(List<String> serverURLs,
String tableNameWithType,
+ boolean multiRequestPerServer, int timeoutMs) {
+ return doMultiGetRequest(serverURLs, tableNameWithType,
multiRequestPerServer, null, timeoutMs);
+ }
+
/**
* This method makes a MultiGet call to all given URLs.
* @param serverURLs server urls to send GET request.
@@ -62,17 +68,18 @@ public class CompletionServiceHelper {
* If multiRequestPerServer is set as false,
return as long as one of the requests get
* response; If multiRequestPerServer is set as
true, wait until all requests
* get response.
+ * @param requestHeaders Headers to be set when making the http calls.
* @param timeoutMs timeout in milliseconds to wait per request.
* @return CompletionServiceResponse Map of the endpoint(server instance, or
full request path if
* multiRequestPerServer is true) to the response from that endpoint.
*/
public CompletionServiceResponse doMultiGetRequest(List<String> serverURLs,
String tableNameWithType,
- boolean multiRequestPerServer, int timeoutMs) {
+ boolean multiRequestPerServer, @Nullable Map<String, String>
requestHeaders, int timeoutMs) {
CompletionServiceResponse completionServiceResponse = new
CompletionServiceResponse();
// TODO: use some service other than completion service so that we know
which server encounters the error
CompletionService<GetMethod> completionService =
- new MultiGetRequest(_executor,
_httpConnectionManager).execute(serverURLs, timeoutMs);
+ new MultiGetRequest(_executor,
_httpConnectionManager).execute(serverURLs, requestHeaders, timeoutMs);
for (int i = 0; i < serverURLs.size(); i++) {
GetMethod getMethod = null;
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]