This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 07d4226 [proxy][functions] Issue #2154: proxy should be able to
forward rest requests to function workers cluster (#2560)
07d4226 is described below
commit 07d42261eb2d685de6f9a5ab214f4d08944ffb6e
Author: Sijie Guo <[email protected]>
AuthorDate: Fri Sep 14 01:10:35 2018 -0700
[proxy][functions] Issue #2154: proxy should be able to forward rest
requests to function workers cluster (#2560)
*Motivation*
Function workers can be deployed as a separate cluster. In that case, the
proxy is not able to forward functions-related REST calls to the correct
server.
*Changes*
Add two settings to the proxy configuration that allow the proxy to
forward functions-related REST calls to the function worker cluster.
*Tests*
Verified manually with changes to the integration tests. It is hard to add
automated tests for this on top of the current integration tests; they will
be added in a separate PR.
---
conf/proxy.conf | 5 +++++
.../apache/pulsar/proxy/server/AdminProxyHandler.java | 16 ++++++++++++++--
.../apache/pulsar/proxy/server/ProxyConfiguration.java | 12 ++++++++++++
3 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/conf/proxy.conf b/conf/proxy.conf
index 9b307cc..b95c4d3 100644
--- a/conf/proxy.conf
+++ b/conf/proxy.conf
@@ -23,6 +23,11 @@ zookeeperServers=
# Configuration store connection string (as a comma-separated list)
configurationStoreServers=
+# If function workers are setup in a separate cluster, configure the following 2 settings
+# to point to the function workers cluster
+functionWorkerWebServiceURL=
+functionWorkerWebServiceURLTLS=
+
# ZooKeeper session timeout (in milliseconds)
zookeeperSessionTimeoutMs=30000
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
index 49c789c..d6c32df 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/AdminProxyHandler.java
@@ -49,12 +49,15 @@ class AdminProxyHandler extends AsyncProxyServlet {
private final ProxyConfiguration config;
private final BrokerDiscoveryProvider discoveryProvider;
private final String brokerWebServiceUrl;
+ private final String functionWorkerWebServiceUrl;
AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider
discoveryProvider) {
this.config = config;
this.discoveryProvider = discoveryProvider;
this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ?
config.getBrokerWebServiceURLTLS()
: config.getBrokerWebServiceURL();
+ this.functionWorkerWebServiceUrl = config.isTlsEnabledWithBroker() ?
config.getFunctionWorkerWebServiceURLTLS()
+ : config.getFunctionWorkerWebServiceURL();
}
@Override
@@ -122,7 +125,16 @@ class AdminProxyHandler extends AsyncProxyServlet {
protected String rewriteTarget(HttpServletRequest request) {
StringBuilder url = new StringBuilder();
- if (isBlank(brokerWebServiceUrl)) {
+ boolean isFunctionsRestRequest = false;
+ String requestUri = request.getRequestURI();
+ if (requestUri.startsWith("/admin/v2/functions")
+ || requestUri.startsWith("/admin/functions")) {
+ isFunctionsRestRequest = true;
+ }
+
+ if (isFunctionsRestRequest && !isBlank(functionWorkerWebServiceUrl)) {
+ url.append(functionWorkerWebServiceUrl);
+ } else if (isBlank(brokerWebServiceUrl)) {
try {
ServiceLookupData availableBroker =
discoveryProvider.nextBroker();
@@ -148,7 +160,7 @@ class AdminProxyHandler extends AsyncProxyServlet {
if (url.lastIndexOf("/") == url.length() - 1) {
url.deleteCharAt(url.lastIndexOf("/"));
}
- url.append(request.getRequestURI());
+ url.append(requestUri);
String query = request.getQueryString();
if (query != null) {
diff --git
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
index b4e8afb..0155baa 100644
---
a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
+++
b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConfiguration.java
@@ -48,6 +48,10 @@ public class ProxyConfiguration implements
PulsarConfiguration {
private String brokerWebServiceURL;
private String brokerWebServiceURLTLS;
+ // function worker web services
+ private String functionWorkerWebServiceURL;
+ private String functionWorkerWebServiceURLTLS;
+
// Port to use to server binary-proto request
private int servicePort = 6650;
// Port to use to server binary-proto-tls request
@@ -158,6 +162,14 @@ public class ProxyConfiguration implements
PulsarConfiguration {
this.brokerWebServiceURLTLS = brokerWebServiceURLTLS;
}
+ public String getFunctionWorkerWebServiceURL() {
+ return functionWorkerWebServiceURL;
+ }
+
+ public String getFunctionWorkerWebServiceURLTLS() {
+ return functionWorkerWebServiceURLTLS;
+ }
+
public String getZookeeperServers() {
return zookeeperServers;
}