This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch branch-0.5.0
in repository https://gitbox.apache.org/repos/asf/pulsar-manager.git

commit adf65a98e6c09bc5a8e2657addd8262e4e4c8412
Author: Bäm <benjamin.per...@switch.ch>
AuthorDate: Mon May 6 14:01:33 2024 +0200

    fix the broker stats with different environments (#570)
    
    * feat: extend broker stats with environment
    
    * tests: fix environment changes
    
    * fix: settings to mitigate potential hikari pool errors
    
    * fix: reset dockerfiles
    
    (cherry picked from commit 5b4eda203d0765db3b2d5d549142d2858c1c304d)
---
 front-end/package.json                             |  1 +
 front-end/src/utils/request.js                     |  6 ++-
 .../pulsar/manager/service/PulsarAdminService.java |  3 +-
 .../service/impl/BrokerStatsServiceImpl.java       | 54 ++++++++++------------
 .../service/impl/EnvironmentCacheServiceImpl.java  |  6 +--
 .../service/impl/PulsarAdminServiceImpl.java       | 29 +++++++-----
 src/main/resources/application.properties          |  5 ++
 .../service/BrokerStatsServiceImplTest.java        |  4 +-
 .../service/PulsarAdminServiceImplTest.java        |  2 +-
 9 files changed, 60 insertions(+), 50 deletions(-)

diff --git a/front-end/package.json b/front-end/package.json
index 7b4853e..4a7989e 100644
--- a/front-end/package.json
+++ b/front-end/package.json
@@ -49,6 +49,7 @@
     "mockjs": "1.0.1-beta3",
     "normalize.css": "7.0.0",
     "nprogress": "0.2.0",
+    "qs": "^6.12.1",
     "showdown": "1.9.1",
     "sortablejs": "1.7.0",
     "vue": "2.6.0",
diff --git a/front-end/src/utils/request.js b/front-end/src/utils/request.js
index 1a55e73..5a21e67 100644
--- a/front-end/src/utils/request.js
+++ b/front-end/src/utils/request.js
@@ -21,11 +21,15 @@ import { getEnvironment } from '@/utils/environment'
 import { getTenant } from '@/utils/tenant'
 import router from '../router'
 import { getCsrfToken } from '@/utils/csrfToken'
+import qs from "qs";
 
 // create an axios instance
 const service = axios.create({
   baseURL: process.env.BASE_API, // api 的 base_url
-  timeout: 60000 // request timeout
+  timeout: 60000, // request timeout
+  paramsSerializer: function(params) {
+    return qs.stringify(params, { arrayFormat: 'repeat' })
+  }
 })
 
 // request interceptor
diff --git 
a/src/main/java/org/apache/pulsar/manager/service/PulsarAdminService.java 
b/src/main/java/org/apache/pulsar/manager/service/PulsarAdminService.java
index 0afdb6f..da8fe52 100644
--- a/src/main/java/org/apache/pulsar/manager/service/PulsarAdminService.java
+++ b/src/main/java/org/apache/pulsar/manager/service/PulsarAdminService.java
@@ -24,7 +24,8 @@ import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.client.admin.Topics;
 
 public interface PulsarAdminService {
-    PulsarAdmin getPulsarAdmin(String url);
+    PulsarAdmin getPulsarAdmin(String url, String env, String token);
+    BrokerStats brokerStats(String url, String env);
     BrokerStats brokerStats(String url);
     Clusters clusters(String url);
     Clusters clusters(String url, String token);
diff --git 
a/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java
 
b/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java
index ae0f083..ab85bbe 100644
--- 
a/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java
+++ 
b/src/main/java/org/apache/pulsar/manager/service/impl/BrokerStatsServiceImpl.java
@@ -54,6 +54,7 @@ import 
org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Service;
+import org.springframework.web.util.UriComponents;
 import org.springframework.web.util.UriComponentsBuilder;
 
 @Service
@@ -122,24 +123,24 @@ public class BrokerStatsServiceImpl implements 
BrokerStatsService {
         List<EnvironmentEntity> environmentEntities = 
environmentsRepository.getAllEnvironments();
         Map<Pair<String, String>, String> collectStatsServiceUrls = new 
HashMap<>();
         for (EnvironmentEntity env : environmentEntities) {
-            String serviceUrl = checkServiceUrl(null, env.getBroker());
+            String brokerUrl = env.getBroker();
             Map<String, Object> clusterObject =
-                clustersService.getClustersList(0, 0, serviceUrl, (c) -> 
serviceUrl);
+                clustersService.getClustersList(0, 0, brokerUrl, (c) -> 
brokerUrl);
             List<HashMap<String, Object>> clusterLists = (List<HashMap<String, 
Object>>) clusterObject.get("data");
             clusterLists.forEach((clusterMap) -> {
                 String cluster = (String) clusterMap.get("cluster");
                 Pair<String, String> envCluster = Pair.of(env.getName(), 
cluster);
 
+                log.debug(envCluster.toString());
+
                 String serviceUrlTls = (String) 
clusterMap.get("serviceUrlTls");
-                tlsEnabled = tlsEnabled && 
StringUtils.isNotBlank(serviceUrlTls);
-                String webServiceUrl = tlsEnabled ? serviceUrlTls : (String) 
clusterMap.get("serviceUrl");
+                String serviceUrl = (String) clusterMap.get("serviceUrl");
+
+                String webServiceUrl = StringUtils.isNotBlank(serviceUrlTls) ? 
serviceUrlTls : serviceUrl;
                 if (webServiceUrl.contains(",")) {
                     String[] webServiceUrlList = webServiceUrl.split(",");
                     for (String url : webServiceUrlList) {
-                        // making sure the protocol is appended in case the 
env was added without the protocol
-                        if (!tlsEnabled && !url.contains("http://")) {
-                            url = (tlsEnabled ? "https://" : "http://") + url;
-                        }
+
                         try {
                             Brokers brokers = pulsarAdminService.brokers(url);
                             brokers.healthcheck();
@@ -150,14 +151,10 @@ public class BrokerStatsServiceImpl implements 
BrokerStatsService {
                         }
                     }
                 }
-                collectStatsServiceUrls.put(envCluster, webServiceUrl);
+                log.info("Start collecting stats from env {} / cluster {} @ 
{}", envCluster.getLeft(), envCluster.getRight(), serviceUrl);
+                collectStatsToDB(unixTime, envCluster.getLeft(), 
envCluster.getRight(), webServiceUrl);
             });
         }
-        collectStatsServiceUrls.forEach((envCluster, serviceUrl) -> {
-            log.info("Start collecting stats from env {} / cluster {} @ {}",
-                envCluster.getLeft(), envCluster.getRight(), serviceUrl);
-            collectStatsToDB(unixTime, envCluster.getLeft(), 
envCluster.getRight(), serviceUrl);
-        });
 
         log.info("Start clearing stats from broker");
         clearStats(unixTime, clearStatsInterval / 1000);
@@ -168,18 +165,21 @@ public class BrokerStatsServiceImpl implements 
BrokerStatsService {
         List<HashMap<String, Object>> brokerLists = (List<HashMap<String, 
Object>>) brokerObject.get("data");
         brokerLists.forEach((brokerMap) -> {
             // returns [Broker Hostname]:[Broker non Tls port]
-            String tempBroker = (String) brokerMap.get("broker");
-            //default to http
-            String broker = "http://" + tempBroker;
-            // if tls enabled the protocol and port is extracted from service 
url
-            if (tlsEnabled && tempBroker.contains(":")) {
-                String brokerHost = tempBroker.substring(0, 
tempBroker.indexOf(":"));
-                UriComponentsBuilder builder = 
UriComponentsBuilder.fromHttpUrl(serviceUrl);
-                broker = builder.host(brokerHost).toUriString();
-            }
+            String broker = (String) brokerMap.get("broker");
+            log.info("processing broker: {}", broker);
+
+            // use web service url scheme to replace host part with broker
+            UriComponents serviceURI = 
UriComponentsBuilder.fromHttpUrl(serviceUrl).build();
+            UriComponentsBuilder builder = UriComponentsBuilder.newInstance()
+                    .scheme(serviceURI.getScheme())
+                    .host(broker.split(":")[0])
+                    .port(serviceURI.getPort());
+            String finalBroker = builder.toUriString();
+
             JsonObject result;
             try {
-                result = pulsarAdminService.brokerStats(broker).getTopics();
+                log.info("Start collecting stats from broker {}", finalBroker);
+                result = pulsarAdminService.brokerStats(finalBroker, 
env).getTopics();
             } catch(PulsarAdminException e) {
                 log.error("Failed to get broker metrics.", e);
                 return;
@@ -197,7 +197,7 @@ public class BrokerStatsServiceImpl implements 
BrokerStatsService {
                             String[] topicPath = this.parseTopic(topic);
                             topicStatsEntity.setEnvironment(env);
                             topicStatsEntity.setCluster(cluster);
-                            topicStatsEntity.setBroker(tempBroker);
+                            topicStatsEntity.setBroker(finalBroker);
                             topicStatsEntity.setTenant(topicPath[0]);
                             topicStatsEntity.setNamespace(topicPath[1]);
                             topicStatsEntity.setBundle(bundle);
@@ -309,10 +309,6 @@ public class BrokerStatsServiceImpl implements 
BrokerStatsService {
         if (serviceUrl == null || serviceUrl.length() <= 0) {
             serviceUrl = requestHost;
         }
-
-        if (!serviceUrl.startsWith("http")) {
-            serviceUrl = "http://" + serviceUrl;
-        }
         return serviceUrl;
     }
 
diff --git 
a/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java
 
b/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java
index c75e267..17c87e3 100644
--- 
a/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java
+++ 
b/src/main/java/org/apache/pulsar/manager/service/impl/EnvironmentCacheServiceImpl.java
@@ -114,9 +114,6 @@ public class EnvironmentCacheServiceImpl implements 
EnvironmentCacheService {
             String[] webServiceUrlList = webServiceUrl.split(",");
             int index = ThreadLocalRandom.current().nextInt(0, 
webServiceUrlList.length);
             String url = webServiceUrlList[index];
-            if (!url.contains("http://")) {
-                url = "http://" + url;
-            }
             log.info("pick web url:{}", url);
             return url;
         }
@@ -141,7 +138,8 @@ public class EnvironmentCacheServiceImpl implements 
EnvironmentCacheService {
             throw new RuntimeException(
                     "No cluster '" + cluster + "' found in environment '" + 
environment + "'");
         }
-        return tlsEnabled && 
StringUtils.isNotBlank(clusterData.getServiceUrlTls()) ? 
clusterData.getServiceUrlTls() : clusterData.getServiceUrl();
+
+        return StringUtils.isNotBlank(clusterData.getServiceUrlTls()) ? 
clusterData.getServiceUrlTls() : clusterData.getServiceUrl();
     }
 
     @Scheduled(
diff --git 
a/src/main/java/org/apache/pulsar/manager/service/impl/PulsarAdminServiceImpl.java
 
b/src/main/java/org/apache/pulsar/manager/service/impl/PulsarAdminServiceImpl.java
index c269dfa..da9ad9f 100644
--- 
a/src/main/java/org/apache/pulsar/manager/service/impl/PulsarAdminServiceImpl.java
+++ 
b/src/main/java/org/apache/pulsar/manager/service/impl/PulsarAdminServiceImpl.java
@@ -79,19 +79,25 @@ public class PulsarAdminServiceImpl implements 
PulsarAdminService {
         pulsarAdmins.values().forEach(value -> value.close());
     }
 
-    public synchronized PulsarAdmin getPulsarAdmin(String url) {
-        if (!pulsarAdmins.containsKey(url)) {
-            pulsarAdmins.put(url, this.createPulsarAdmin(url, null));
-        }
-        return pulsarAdmins.get(url);
+
+    public PulsarAdmin getPulsarAdmin(String url) {
+        return this.createPulsarAdmin(url, null, null);
     }
 
     public PulsarAdmin getPulsarAdmin(String url, String token) {
-        return this.createPulsarAdmin(url, token);
+        return this.createPulsarAdmin(url, null, token);
+    }
+
+    public PulsarAdmin getPulsarAdmin(String url, String env, String token) {
+        return this.createPulsarAdmin(url, env, token);
     }
 
     public BrokerStats brokerStats(String url) {
-        return getPulsarAdmin(url).brokerStats();
+        return getPulsarAdmin(url, null, null).brokerStats();
+    }
+
+    public BrokerStats brokerStats(String url, String env) {
+        return getPulsarAdmin(url, env, null).brokerStats();
     }
 
     public Clusters clusters(String url) {
@@ -149,24 +155,23 @@ public class PulsarAdminServiceImpl implements 
PulsarAdminService {
         return result;
     }
 
-    private String getEnvironmentToken(String url) {
+    private String getEnvironmentToken(String url, String env) {
         Optional<EnvironmentEntity> optionalEnvironmentEntity = 
environmentsRepository.findByBroker(url);
         if (optionalEnvironmentEntity.isPresent()) {
             return optionalEnvironmentEntity.get().getToken();
         }
-        String environment = environmentCacheService.getEnvironment(url);
-        Optional<EnvironmentEntity> environmentEntityOptional = 
environmentsRepository.findByName(environment);
+        Optional<EnvironmentEntity> environmentEntityOptional = 
environmentsRepository.findByName(env);
         return 
environmentEntityOptional.map(EnvironmentEntity::getToken).orElse(null);
     }
 
-    private PulsarAdmin createPulsarAdmin(String url, String token) {
+    private PulsarAdmin createPulsarAdmin(String url, String env, String 
token) {
         try {
             log.info("Create Pulsar Admin instance. url={}, authPlugin={}, 
authParams={}, tlsAllowInsecureConnection={}, tlsTrustCertsFilePath={}, 
tlsEnableHostnameVerification={}",
                     url, authPlugin, authParams, tlsAllowInsecureConnection, 
tlsTrustCertsFilePath, tlsEnableHostnameVerification);
             PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder();
             pulsarAdminBuilder.serviceHttpUrl(url);
             if (null == token) {
-                token = getEnvironmentToken(url);
+                token = getEnvironmentToken(url, env);
             }
             if (StringUtils.isNotBlank(token)) {
                 
pulsarAdminBuilder.authentication(AuthenticationFactory.token(token));
diff --git a/src/main/resources/application.properties 
b/src/main/resources/application.properties
index 80828a2..1ae3a96 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -59,6 +59,11 @@ spring.datasource.initialization-mode=always
 #spring.datasource.username=postgres
 #spring.datasource.password=postgres
 
+# hikari configuration
+spring.datasource.hikari.connectionTimeout=10000
+spring.datasource.hikari.idleTimeout=60000
+spring.datasource.hikari.maxLifetime=300000
+
 # zuul config
 # 
https://cloud.spring.io/spring-cloud-static/Dalston.SR5/multi/multi__router_and_filter_zuul.html
 # By Default Zuul adds  Authorization to be dropped headers list. Below we are 
manually setting it
diff --git 
a/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java
 
b/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java
index a15e312..2aab625 100644
--- 
a/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java
+++ 
b/src/test/java/org/apache/pulsar/manager/service/BrokerStatsServiceImplTest.java
@@ -234,7 +234,7 @@ public class BrokerStatsServiceImplTest {
         brokersMap.put("data", brokersArray);
         Mockito.when(brokersService.getBrokersList(0,0, cluster, serviceUrl))
                 .thenReturn(brokersMap);
-        
Mockito.when(pulsarAdminService.brokerStats(serviceUrl)).thenReturn(stats);
+        Mockito.when(pulsarAdminService.brokerStats(serviceUrl, 
environment)).thenReturn(stats);
         JsonObject data = new Gson().fromJson(testData, JsonObject.class);
         Mockito.when(stats.getTopics())
                 .thenReturn(data);
@@ -310,7 +310,7 @@ public class BrokerStatsServiceImplTest {
         brokersMap.put("data", brokersArray);
         Mockito.when(brokersService.getBrokersList(0,0, cluster, serviceUrl))
                 .thenReturn(brokersMap);
-        
Mockito.when(pulsarAdminService.brokerStats(serviceUrl)).thenReturn(stats);
+        Mockito.when(pulsarAdminService.brokerStats(serviceUrl, 
environment)).thenReturn(stats);
         JsonObject data = new Gson().fromJson(testData, JsonObject.class);
         Mockito.when(stats.getTopics())
                 .thenReturn(data);
diff --git 
a/src/test/java/org/apache/pulsar/manager/service/PulsarAdminServiceImplTest.java
 
b/src/test/java/org/apache/pulsar/manager/service/PulsarAdminServiceImplTest.java
index a819e86..4600a08 100644
--- 
a/src/test/java/org/apache/pulsar/manager/service/PulsarAdminServiceImplTest.java
+++ 
b/src/test/java/org/apache/pulsar/manager/service/PulsarAdminServiceImplTest.java
@@ -50,7 +50,7 @@ public class PulsarAdminServiceImplTest {
 
     @Test
     public void getPulsarAdminTest() {
-        String serviceUrl = 
pulsarAdminService.getPulsarAdmin("http://localhost:8080").getServiceUrl();
+        String serviceUrl = 
pulsarAdminService.getPulsarAdmin("http://localhost:8080", null, 
null).getServiceUrl();
         Assert.assertEquals("http://localhost:8080", serviceUrl);
     }
 

Reply via email to