This is an automated email from the ASF dual-hosted git repository.
kishoreg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0dfa0e8 Add support for passing headers in pinot client (#4243)
0dfa0e8 is described below
commit 0dfa0e89f2e8dc25feff9642bc24adb964c305e0
Author: Jitendra Kushwaha <[email protected]>
AuthorDate: Wed Jun 5 23:27:12 2019 +0530
Add support for passing headers in pinot client (#4243)
* Add support for passing headers in pinot client
There are use cases where the service using the Pinot client needs to send
user-defined headers to the HTTP endpoint, which sits behind a load balancer.
This adds support for that in the client.
* Add support for passing headers in pinot client
There are use cases where the service using the Pinot client needs to send
user-defined headers to the HTTP endpoint, which sits behind a load balancer.
This adds support for that in the client.
* Add annotation for interface and default implementation for new interface
method
* remove unused includes
* Keep PinotClientTransportFactory interface annotation as Private
Currently this interface is not exposed as public
* Annotate ConnectionFactory appropriately which is exposed outside of the
library
* Add pinot-common as dependency and use annotations of it
* Revert "Add pinot-common as dependency and use annotations of it"
This reverts commit 223e4ad31092d2e7a28cbe7380bdaf64e14ee95f.
* Remove annotations for interface from pinot-api module
---
.../org/apache/pinot/client/ConnectionFactory.java | 19 ++++++++++++++++---
.../client/JsonAsyncHttpPinotClientTransport.java | 17 ++++++++++++++++-
.../JsonAsyncHttpPinotClientTransportFactory.java | 7 +++++++
.../pinot/client/PinotClientTransportFactory.java | 14 ++++++++++++++
.../apache/pinot/client/ConnectionFactoryTest.java | 17 +++++++++++++++++
5 files changed, 70 insertions(+), 4 deletions(-)
diff --git
a/pinot-api/src/main/java/org/apache/pinot/client/ConnectionFactory.java
b/pinot-api/src/main/java/org/apache/pinot/client/ConnectionFactory.java
index 9992b9f..666ca7c 100644
--- a/pinot-api/src/main/java/org/apache/pinot/client/ConnectionFactory.java
+++ b/pinot-api/src/main/java/org/apache/pinot/client/ConnectionFactory.java
@@ -19,6 +19,8 @@
package org.apache.pinot.client;
import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
@@ -40,7 +42,7 @@ public class ConnectionFactory {
public static Connection fromZookeeper(String zkUrl) {
try {
DynamicBrokerSelector dynamicBrokerSelector = new
DynamicBrokerSelector(zkUrl);
- return new Connection(dynamicBrokerSelector,
_transportFactory.buildTransport());
+ return new Connection(dynamicBrokerSelector,
_transportFactory.buildTransport(null));
} catch (Exception e) {
throw new PinotClientException(e);
}
@@ -54,7 +56,7 @@ public class ConnectionFactory {
*/
public static Connection fromProperties(Properties properties) {
return new
Connection(Arrays.asList(properties.getProperty("brokerList").split(",")),
- _transportFactory.buildTransport());
+ _transportFactory.buildTransport(null));
}
/**
@@ -64,6 +66,17 @@ public class ConnectionFactory {
* @return A connection to the set of brokers specified
*/
public static Connection fromHostList(String... brokers) {
- return new Connection(Arrays.asList(brokers),
_transportFactory.buildTransport());
+ return new Connection(Arrays.asList(brokers),
_transportFactory.buildTransport(null));
+ }
+
+ /**
+ * Creates a connection which sends queries randomly between the specified
brokers.
+ *
+ * @param brokers The list of brokers to send queries to
+ * @param headers Map of key and values of header which need to be used
during http call
+ * @return A connection to the set of brokers specified
+ */
+ public static Connection fromHostList(List<String> brokers, Map<String,
String> headers) {
+ return new Connection(brokers, _transportFactory.buildTransport(headers));
}
}
diff --git
a/pinot-api/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
b/pinot-api/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
index 2c02357..e6d4472 100644
---
a/pinot-api/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
+++
b/pinot-api/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransport.java
@@ -24,6 +24,8 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.Response;
+
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -39,6 +41,13 @@ class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport {
private static final ObjectReader OBJECT_READER = new
ObjectMapper().reader();
AsyncHttpClient _httpClient = new AsyncHttpClient();
+ Map<String, String> _headers;
+
+ public JsonAsyncHttpPinotClientTransport() {}
+
+ public JsonAsyncHttpPinotClientTransport(Map<String, String> headers) {
+ _headers = headers;
+ }
@Override
public BrokerResponse executeQuery(String brokerAddress, String query)
@@ -58,7 +67,13 @@ class JsonAsyncHttpPinotClientTransport implements
PinotClientTransport {
final String url = "http://" + brokerAddress + "/query";
- final Future<Response> response = _httpClient.preparePost(url)
+ AsyncHttpClient.BoundRequestBuilder request =
_httpClient.preparePost(url);
+
+ if(_headers != null) {
+ _headers.forEach((k, v) -> request.addHeader(k, v));
+ }
+
+ final Future<Response> response = request
.addHeader("Content-Type", "application/json; charset=utf-8")
.setBody(json.toString()).execute();
diff --git
a/pinot-api/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportFactory.java
b/pinot-api/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportFactory.java
index da2ff12..7dda9d8 100644
---
a/pinot-api/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportFactory.java
+++
b/pinot-api/src/main/java/org/apache/pinot/client/JsonAsyncHttpPinotClientTransportFactory.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.client;
+import java.util.Map;
+
/**
* Pinot client transport factory for JSON encoded BrokerResults through HTTP.
*/
@@ -26,4 +28,9 @@ class JsonAsyncHttpPinotClientTransportFactory implements
PinotClientTransportFa
public PinotClientTransport buildTransport() {
return new JsonAsyncHttpPinotClientTransport();
}
+
+ @Override
+ public PinotClientTransport buildTransport(Map<String, String> headers) {
+ return new JsonAsyncHttpPinotClientTransport(headers);
+ }
}
diff --git
a/pinot-api/src/main/java/org/apache/pinot/client/PinotClientTransportFactory.java
b/pinot-api/src/main/java/org/apache/pinot/client/PinotClientTransportFactory.java
index ce74ce8..4d4dd5f 100644
---
a/pinot-api/src/main/java/org/apache/pinot/client/PinotClientTransportFactory.java
+++
b/pinot-api/src/main/java/org/apache/pinot/client/PinotClientTransportFactory.java
@@ -18,9 +18,23 @@
*/
package org.apache.pinot.client;
+import java.util.Map;
+
/**
* Factory for client transports.
*/
interface PinotClientTransportFactory {
+ /**
+ * This method is deprecating. Method with headers can be used in place of
this by passing null headers.
+ */
+ @Deprecated
PinotClientTransport buildTransport();
+
+ /**
+ * Fetch pinot client transport
+ * @param headers custom headers to be passed in the pinot client call.
+ */
+ default PinotClientTransport buildTransport(Map<String, String> headers) {
+ return buildTransport();
+ }
}
diff --git
a/pinot-api/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
b/pinot-api/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
index 81a26bc..ce685c0 100644
--- a/pinot-api/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
+++ b/pinot-api/src/test/java/org/apache/pinot/client/ConnectionFactoryTest.java
@@ -19,7 +19,10 @@
package org.apache.pinot.client;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -58,6 +61,20 @@ public class ConnectionFactoryTest {
Assert.assertEquals(connection.getBrokerList(), brokers);
}
+ @Test
+ public void testBrokerListWithHeaders() {
+ // Create the connection
+ List<String> brokers = new ArrayList<>();
+ brokers.add("127.0.0.1:1234");
+ brokers.add("localhost:2345");
+ Map<String, String> headers = new HashMap<>();
+ headers.put("Caller", "curl");
+ Connection connection = ConnectionFactory.fromHostList(brokers, headers);
+
+ // Check that the broker list has the right length and has the same servers
+ Assert.assertEquals(connection.getBrokerList(), brokers);
+ }
+
// For testing DynamicBrokerSelector
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]