This is an automated email from the ASF dual-hosted git repository.

guangning pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-manager.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dde739  Support peek messages from pulsar broker (#241)
9dde739 is described below

commit 9dde739477b743392b325ecdfcaa55932f2e450d
Author: Guangning <[email protected]>
AuthorDate: Thu Feb 6 09:37:47 2020 +0800

    Support peek messages from pulsar broker (#241)
    
    ### Motivation
    
    At present, pulsar-manager does not support a peek message. After the pr is 
merged, pulsar-manager will support a peek message.
    
    ### Modifications
    
    * Support peek messages, include the batch message and non-batch message
    * Upgrade pulsar-client version to 2.4.2
    
    
![image](https://user-images.githubusercontent.com/1907867/71344369-4c64f780-259d-11ea-8c99-ae6a2b670088.png)
---
 build.gradle                                       |   1 +
 front-end/src/api/subscriptions.js                 |  14 ++
 front-end/src/lang/en.js                           |  12 ++
 front-end/src/lang/zh.js                           |  12 ++
 .../management/subscriptions/subscription.vue      | 145 ++++++++++++---------
 .../views/management/topics/partitionedTopic.vue   |   3 +
 front-end/src/views/management/topics/topic.vue    |   3 +
 front-end/src/views/management/users/index.vue     |   2 +-
 gradle.properties                                  |   2 +-
 .../manager/controller/TopicsController.java       |  86 ++++++++++++
 .../pulsar/manager/zuul/EnvironmentForward.java    |  35 +++--
 src/main/resources/application.properties          |   3 +
 12 files changed, 242 insertions(+), 76 deletions(-)

diff --git a/build.gradle b/build.gradle
index 5aeb716..36a0f1d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -107,6 +107,7 @@ dependencies {
     compile group: 'com.google.guava', name: 'guava', version: guavaVersion
     compile group: 'com.google.code.gson', name: 'gson', version: gsonVersion
     compile group: 'org.apache.pulsar', name: 'pulsar-common', version: 
pulsarVersion
+    compile group: 'org.apache.pulsar', name: 'pulsar-client-admin', version: 
pulsarVersion
     compile group: 'io.springfox', name: 'springfox-swagger2', version: 
swagger2Version
     compile group: 'io.springfox', name: 'springfox-swagger-ui', version: 
swaggeruiVersion
     compile group: 'org.apache.pulsar', name: 'pulsar-broker', version: 
brokerVersion
diff --git a/front-end/src/api/subscriptions.js 
b/front-end/src/api/subscriptions.js
index 5a002f2..72aa641 100644
--- a/front-end/src/api/subscriptions.js
+++ b/front-end/src/api/subscriptions.js
@@ -15,6 +15,8 @@ import request from '@/utils/request'
 
 const BASE_URL_V2 = '/admin/v2'
 
+const SPRING_BASE_URL = '/pulsar-manager/admin/v2'
+
 export function fetchSubscriptions(persistent, tenantNamespaceTopic) {
   return request({
     url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/subscriptions`,
@@ -57,3 +59,15 @@ export function deleteSubscriptionOnCluster(cluster, 
persistent, tenantNamespace
     method: 'delete'
   })
 }
+
+export function peekMessagesOnCluster(cluster, persistent, 
tenantNamespaceTopic, subName, messagePosition) {
+  return request({
+    headers: {
+      'Content-Type': 'application/json',
+      'x-pulsar-cluster': cluster
+    },
+    url: SPRING_BASE_URL + 
`/${persistent}/${tenantNamespaceTopic}/subscription/${subName}/${messagePosition}`,
+    method: 'get'
+  })
+}
+
diff --git a/front-end/src/lang/en.js b/front-end/src/lang/en.js
index 3a2d729..5f5fd59 100644
--- a/front-end/src/lang/en.js
+++ b/front-end/src/lang/en.js
@@ -558,6 +558,18 @@ export default {
       storageSize: 'Storage Size',
       entries: 'Entries',
       segments: 'Segments',
+      peek: 'PEEK',
+      peekMessages: 'messages',
+      entryId: 'Entry Id',
+      message: 'Message',
+      messageGreaterThanZero: 'Messages should greater than 0',
+      peekMessageError: 'Peek messages error, please check',
+      messageSkipSuccess: 'Messages skip successfully',
+      expireMessageSuccess: 'Messages expire successfully',
+      clearMessageSuccess: 'Clear messages successfully',
+      minutesNotLessThanZero: 'Minutes cannot be less than 0',
+      resetCursorSuccess: 'Reset cursor successfully',
+      messageIdNotLessThanZero: 'Message Id cannot be less than 0',
       newSub: 'New Subscription',
       sub: 'Subscribe',
       unsub: 'Unsubscribe',
diff --git a/front-end/src/lang/zh.js b/front-end/src/lang/zh.js
index d09b9b3..59afe53 100644
--- a/front-end/src/lang/zh.js
+++ b/front-end/src/lang/zh.js
@@ -558,6 +558,18 @@ export default {
       storageSize: 'Storage Size',
       entries: 'Entries',
       segments: 'Segments',
+      peek: 'PEEK',
+      peekMessages: 'messages',
+      entryId: 'Entry Id',
+      message: 'Message',
+      messageGreaterThanZero: 'Messages should greater than 0',
+      peekMessageError: 'Peek messages error, please check',
+      messageSkipSuccess: 'Messages skip successfully',
+      expireMessageSuccess: 'Messages expire successfully',
+      clearMessageSuccess: 'Clear messages successfully',
+      minutesNotLessThanZero: 'Minutes cannot be less than 0',
+      resetCursorSuccess: 'Reset cursor successfully',
+      messageIdNotLessThanZero: 'Message Id cannot be less than 0',
       newSub: 'New Subscription',
       sub: 'Subscribe',
       unsub: 'Unsubscribe',
diff --git a/front-end/src/views/management/subscriptions/subscription.vue 
b/front-end/src/views/management/subscriptions/subscription.vue
index d890be8..afb4a32 100644
--- a/front-end/src/views/management/subscriptions/subscription.vue
+++ b/front-end/src/views/management/subscriptions/subscription.vue
@@ -102,38 +102,6 @@
       </el-tab-pane>
       <el-tab-pane :label="$t('topic.backlogOpeartion')" 
name="backlogOperation">
         <el-tabs v-model="leftActiveName" :tab-position="tabPosition" 
@tab-click="handleLeftTabClick">
-          <!-- <el-tab-pane label="INSPECT" name="inspect">
-            <el-form :inline="true" :model="form">
-              <el-button type="primary" 
@click="handlePeekMessages">Peek</el-button>
-              <el-form-item>
-                <el-input v-model="form.peekNumMessages" 
placeholder="messages"/>
-              </el-form-item>
-              <span>messages</span>
-            </el-form>
-            <el-row :gutter="24" style="margin-top:15px">
-              <el-col :xs="{span: 24}" :sm="{span: 24}" :md="{span: 24}" 
:lg="{span: 24}" :xl="{span: 24}" style="padding-right:8px;margin-bottom:30px;">
-                <el-table
-                  v-loading="inspectListLoading"
-                  :key="inspectTableKey"
-                  :data="inspectsList"
-                  border
-                  fit
-                  highlight-current-row
-                  style="width: 100%;">
-                  <el-table-column label="Message ID" min-width="10px" 
align="center">
-                    <template slot-scope="scope">
-                      <span>{{ scope.row.messageId }}</span>
-                    </template>
-                  </el-table-column>
-                  <el-table-column label="data" min-width="30px" 
align="center">
-                    <template slot-scope="scope">
-                      <span>{{ scope.row.data }}</span>
-                    </template>
-                  </el-table-column>
-                </el-table>
-              </el-col>
-            </el-row>
-          </el-tab-pane> -->
           <el-tab-pane :label="$t('topic.subscription.skip')" name="skip">
             <el-form :inline="true" :model="form">
               <el-button type="primary" @click="handleSkipMessages">{{ 
$t('topic.subscription.skip') }}</el-button>
@@ -181,6 +149,43 @@
               </el-form-item>
             </el-form>
           </el-tab-pane>
+          <el-tab-pane label="INSPECT" name="peek">
+            <el-form :inline="true" :model="form">
+              <el-button type="primary" @click="handlePeekMessages">{{ 
$t('topic.subscription.peek') }}</el-button>
+              <el-form-item>
+                <el-input v-model="form.peekNumMessages" 
placeholder="messages"/>
+              </el-form-item>
+              <span>{{ $t('topic.subscription.peekMessages') }}</span>
+            </el-form>
+            <el-row :gutter="24" style="margin-top:15px">
+              <el-col :xs="{span: 24}" :sm="{span: 24}" :md="{span: 24}" 
:lg="{span: 24}" :xl="{span: 24}" 
style="padding-right:20px;margin-bottom:30px;">
+                <el-table
+                  v-loading="inspectListLoading"
+                  :key="inspectTableKey"
+                  :data="inspectsList"
+                  border
+                  fit
+                  highlight-current-row
+                  style="width: 100%;">
+                  <el-table-column :label="$t('topic.segment.ledgerId')" 
min-width="10px" align="center">
+                    <template slot-scope="scope">
+                      <span>{{ scope.row.ledgerId }}</span>
+                    </template>
+                  </el-table-column>
+                  <el-table-column :label="$t('topic.subscription.entryId')" 
min-width="10px" align="center">
+                    <template slot-scope="scope">
+                      <span>{{ scope.row.entryId }}</span>
+                    </template>
+                  </el-table-column>
+                  <el-table-column :label="$t('topic.subscription.message')" 
min-width="30px" align="center">
+                    <template slot-scope="scope">
+                      <span>{{ scope.row.data }}</span>
+                    </template>
+                  </el-table-column>
+                </el-table>
+              </el-col>
+            </el-row>
+          </el-tab-pane>
         </el-tabs>
       </el-tab-pane>
     </el-tabs>
@@ -193,7 +198,6 @@ import { fetchNamespaces, getClusters } from 
'@/api/namespaces'
 import {
   fetchTopicsByPulsarManager,
   fetchTopicStats,
-  // peekMessages,
   skipOnCluster,
   expireMessageOnCluster,
   clearBacklogOnCluster,
@@ -201,7 +205,7 @@ import {
   resetCursorByTimestampOnCluster,
   resetCursorByPositionOnCluster
 } from '@/api/topics'
-import { fetchSubscriptions } from '@/api/subscriptions'
+import { fetchSubscriptions, peekMessagesOnCluster } from '@/api/subscriptions'
 import { formatBytes } from '@/utils/index'
 import { numberFormatter } from '@/filters/index'
 
@@ -457,28 +461,49 @@ export default {
     },
     getConsumers() {
     },
-    // To do, parse message
-    // handlePeekMessages() {
-    //   if (this.form.peekNumMessages <= 0) {
-    //     this.$notify({
-    //       title: 'error',
-    //       message: 'Messages should greater than 0',
-    //       type: 'error',
-    //       duration: 3000
-    //     })
-    //     return
-    //   }
-    //   peekMessages(this.postForm.persistent, this.tenantNamespaceTopic, 
this.postForm.subscription, this.form.peekNumMessages).then(response => {
-    //     if (!response.data) return
-    //     console.log(response)
-    //     console.log(response.data)
-    //   })
-    // },
+    handlePeekMessages() {
+      if (this.form.peekNumMessages <= 0) {
+        this.$notify({
+          title: 'error',
+          message: this.$i18n.t('topic.subscription.messageGreaterThanZero'),
+          type: 'error',
+          duration: 3000
+        })
+        return
+      }
+      peekMessagesOnCluster(
+        this.getCurrentCluster(),
+        this.postForm.persistent,
+        this.tenantNamespaceTopic,
+        this.postForm.subscription,
+        this.form.peekNumMessages).then(response => {
+        if (!response.data) return
+        if (!response.data.data) return
+        if (response.data.data.hasOwnProperty('error')) {
+          this.$notify({
+            title: 'error',
+            message: this.$i18n.t('topic.subscription.peekMessageError'),
+            type: 'error',
+            duration: 3000
+          })
+          return
+        }
+        this.inspectsList = []
+        for (var i = 0; i < response.data.data.length; i++) {
+          this.inspectsList.push({
+            'messageId': i,
+            'entryId': response.data.data[i].entryId,
+            'ledgerId': response.data.data[i].ledgerId,
+            'data': window.atob(response.data.data[i].data)
+          })
+        }
+      })
+    },
     handleSkipMessages() {
       if (this.form.skipNumMessages <= 0) {
         this.$notify({
           title: 'error',
-          message: 'Messages should greater than 0',
+          message: this.$i18n.t('topic.subscription.messageGreaterThanZero'),
           type: 'error',
           duration: 3000
         })
@@ -487,7 +512,7 @@ export default {
       skipOnCluster(this.getCurrentCluster(), this.postForm.persistent, 
this.getFullTopic(), this.postForm.subscription, 
this.form.skipNumMessages).then(response => {
         this.$notify({
           title: 'success',
-          message: 'Messages skip success',
+          message: this.$i18n.t('topic.subscription.messageSkipSuccess'),
           type: 'success',
           duration: 3000
         })
@@ -497,7 +522,7 @@ export default {
       if (this.form.expireMessages <= 0) {
         this.$notify({
           title: 'error',
-          message: 'Messages should greater than 0',
+          message: this.$i18n.t('topic.subscription.messageGreaterThanZero'),
           type: 'error',
           duration: 3000
         })
@@ -506,7 +531,7 @@ export default {
       expireMessageOnCluster(this.getCurrentCluster(), 
this.postForm.persistent, this.getFullTopic(), this.postForm.subscription, 
this.form.expireNumMessages).then(response => {
         this.$notify({
           title: 'success',
-          message: 'Messages expire success',
+          message: this.$i18n.t('topic.subscription.expireMessageSuccess'),
           type: 'success',
           duration: 3000
         })
@@ -516,7 +541,7 @@ export default {
       clearBacklogOnCluster(this.getCurrentCluster(), 
this.postForm.persistent, this.getFullTopic(), 
this.postForm.subscription).then(response => {
         this.$notify({
           title: 'success',
-          message: 'Clear messages success',
+          message: this.$i18n.t('topic.subscription.clearMessageSuccess'),
           type: 'success',
           duration: 3000
         })
@@ -526,7 +551,7 @@ export default {
       if (parseInt(this.form.minutes) <= 0) {
         this.$notify({
           title: 'error',
-          message: 'Minutes cannot be less than 0',
+          message: this.$i18n.t('topic.subscription.minutesNotLessThanZero'),
           type: 'error',
           duration: 3000
         })
@@ -541,7 +566,7 @@ export default {
         this.postForm.subscription, timestamp).then(response => {
         this.$notify({
           title: 'success',
-          message: 'Reset cursor success',
+          message: this.$i18n.t('topic.subscription.resetCursorSuccess'),
           type: 'success',
           duration: 3000
         })
@@ -551,7 +576,7 @@ export default {
       if (this.form.messagesId.length <= 0 && this.form.ledgerValue != null) {
         this.$notify({
           title: 'error',
-          message: 'Message Id cannot be less than 0',
+          message: this.$i18n.t('topic.subscription.messageIdNotLessThanZero'),
           type: 'error',
           duration: 3000
         })
@@ -564,7 +589,7 @@ export default {
       resetCursorByPositionOnCluster(this.getCurrentCluster(), 
this.postForm.persistent, this.getFullTopic(), this.postForm.subscription, 
data).then(response => {
         this.$notify({
           title: 'success',
-          message: 'Reset cursor success',
+          message: this.$i18n.t('topic.subscription.resetCursorSuccess'),
           type: 'success',
           duration: 3000
         })
diff --git a/front-end/src/views/management/topics/partitionedTopic.vue 
b/front-end/src/views/management/topics/partitionedTopic.vue
index 1601773..b0857a3 100644
--- a/front-end/src/views/management/topics/partitionedTopic.vue
+++ b/front-end/src/views/management/topics/partitionedTopic.vue
@@ -121,6 +121,9 @@
                       <router-link :to="scope.row.subscriptionLink + 
'?topTab=backlogOperation&leftTab=reset'" class="link-type">
                         <el-dropdown-item command="reset">{{ 
$t('topic.subscription.reset') }}</el-dropdown-item>
                       </router-link>
+                      <router-link :to="scope.row.subscriptionLink + 
'?topTab=backlogOperation&leftTab=peek'" class="link-type">
+                        <el-dropdown-item command="peek">{{ 
$t('topic.subscription.peek') }}</el-dropdown-item>
+                      </router-link>
                     </el-dropdown-menu>
                   </el-dropdown>
                   <el-dropdown v-else @command="handleAllSub">
diff --git a/front-end/src/views/management/topics/topic.vue 
b/front-end/src/views/management/topics/topic.vue
index 43a0b3a..1d211c9 100644
--- a/front-end/src/views/management/topics/topic.vue
+++ b/front-end/src/views/management/topics/topic.vue
@@ -326,6 +326,9 @@
                       <router-link :to="scope.row.subscriptionLink + 
'?topTab=backlogOperation&leftTab=reset'" class="link-type">
                         <el-dropdown-item command="reset">{{ 
$t('topic.subscription.reset') }}</el-dropdown-item>
                       </router-link>
+                      <router-link :to="scope.row.subscriptionLink + 
'?topTab=backlogOperation&leftTab=peek'" class="link-type">
+                        <el-dropdown-item command="peek">{{ 
$t('topic.subscription.peek') }}</el-dropdown-item>
+                      </router-link>
                       <el-dropdown-item command="unsub">{{ 
$t('topic.subscription.unsub') }}</el-dropdown-item>
                     </el-dropdown-menu>
                   </el-dropdown>
diff --git a/front-end/src/views/management/users/index.vue 
b/front-end/src/views/management/users/index.vue
index 91019fe..2ce567f 100644
--- a/front-end/src/views/management/users/index.vue
+++ b/front-end/src/views/management/users/index.vue
@@ -52,7 +52,7 @@
               <span>{{ scope.row.location }}</span>
             </template>
           </el-table-column>
-          <el-table-column :label="$t('usesr.colUserCompany')" align="center" 
min-width="100px">
+          <el-table-column :label="$t('user.colUserCompany')" align="center" 
min-width="100px">
             <template slot-scope="scope">
               <span>{{ scope.row.company }}</span>
             </template>
diff --git a/gradle.properties b/gradle.properties
index e8fb19a..369e6f6 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -9,7 +9,7 @@ lombokVersion=1.18.10
 pageHelperVersion=1.2.4
 mockitoVersion=1.10.19
 guavaVersion=21.0
-pulsarVersion=2.4.0
+pulsarVersion=2.4.2
 swagger2Version=2.9.2
 swaggeruiVersion=2.9.2
 apiMockitoVersion=1.7.1
diff --git 
a/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java 
b/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java
index 8e94b4d..56531fb 100644
--- a/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java
+++ b/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java
@@ -13,6 +13,15 @@
  */
 package org.apache.pulsar.manager.controller;
 
+import com.google.common.collect.Maps;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.manager.service.EnvironmentCacheService;
 import org.apache.pulsar.manager.service.TopicsService;
 import io.swagger.annotations.Api;
@@ -22,6 +31,8 @@ import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import org.hibernate.validator.constraints.Range;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.ResponseEntity;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestMapping;
@@ -32,6 +43,8 @@ import org.springframework.web.bind.annotation.RestController;
 import javax.servlet.http.HttpServletRequest;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.Size;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -47,6 +60,12 @@ public class TopicsController {
     private final EnvironmentCacheService environmentCacheService;
     private final HttpServletRequest request;
 
+    @Value("${backend.jwt.token}")
+    private String token;
+
+    @Value("${pulsar.peek.message}")
+    private boolean peekMessage;
+
     @Autowired
     public TopicsController(
             TopicsService topicsService,
@@ -110,4 +129,71 @@ public class TopicsController {
             tenant, namespace,
             env, serviceUrl);
     }
+
+    @ApiOperation(value = "Peek messages from pulsar broker")
+    @ApiResponses({
+            @ApiResponse(code = 200, message = "ok"),
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
+    @RequestMapping(
+            value = 
"/{persistent}/{tenant}/{namespace}/{topic}/subscription/{subName}/{messagePosition}",
+            method = RequestMethod.GET)
+    public ResponseEntity<Map<String, Object>> peekMessages(
+            @PathVariable String persistent,
+            @PathVariable String tenant,
+            @PathVariable String namespace,
+            @PathVariable String topic,
+            @PathVariable String subName,
+            @PathVariable Integer messagePosition) {
+        String requestHost = environmentCacheService.getServiceUrl(request);
+        Map<String, Object> result = Maps.newHashMap();
+        if (!peekMessage) {
+            result.put("error", "If you want to support peek message," +
+                    "turn on option pulsar.peek.message in file 
application.properties");
+            return ResponseEntity.ok(result);
+        }
+        PulsarAdmin pulsarAdmin = null;
+        // to do check permission for non super, waiting for 
https://github.com/apache/pulsar-manager/pull/238
+        try {
+            PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder();
+            if (token != null && token.length() > 0) {
+                
pulsarAdminBuilder.authentication(AuthenticationFactory.token(token));
+            }
+            pulsarAdmin = 
pulsarAdminBuilder.serviceHttpUrl(requestHost).build();
+            String topicFullPath = persistent + "://" + tenant + "/" + 
namespace + "/" + topic;
+            List<Message<byte[]>> messages = 
pulsarAdmin.topics().peekMessages(topicFullPath, subName, messagePosition);
+            List<Map<String, Object>> mapList = new ArrayList<>();
+            for (Message<byte[]> msg: messages) {
+                Map<String, Object> message = Maps.newHashMap();
+                if (msg.getMessageId() instanceof BatchMessageIdImpl) {
+                    BatchMessageIdImpl msgId = (BatchMessageIdImpl) 
msg.getMessageId();
+                    message.put("ledgerId", msgId.getLedgerId());
+                    message.put("entryId", msgId.getEntryId());
+                    message.put("batchIndex", msgId.getBatchIndex());
+                    message.put("batch", true);
+                } else {
+                    MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+                    message.put("batch", false);
+                    message.put("ledgerId", msgId.getLedgerId());
+                    message.put("entryId", msgId.getEntryId());
+                }
+                if (msg.getProperties().size() > 0) {
+                    msg.getProperties().forEach((k, v) -> {
+                        message.put(k, v);
+                    });
+                }
+                message.put("data", msg.getData());
+                mapList.add(message);
+            }
+            result.put("data", mapList);
+        } catch (PulsarClientException clientException) {
+            result.put("error", clientException.getMessage());
+        } catch (PulsarAdminException adminException) {
+            result.put("error", adminException.getMessage());
+        }
+        if (pulsarAdmin != null) {
+            pulsarAdmin.close();
+        }
+        return ResponseEntity.ok(result);
+    }
 }
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java 
b/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java
index e17f154..0965f3d 100644
--- a/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java
+++ b/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java
@@ -18,6 +18,7 @@ import com.netflix.zuul.ZuulFilter;
 import com.netflix.zuul.context.RequestContext;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.manager.service.PulsarEvent;
+import org.apache.pulsar.manager.service.RolesService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -48,11 +49,14 @@ public class EnvironmentForward extends ZuulFilter {
 
     private final PulsarEvent pulsarEvent;
 
+    private final RolesService rolesService;
+
     @Autowired
     public EnvironmentForward(
-            EnvironmentCacheService environmentCacheService, PulsarEvent 
pulsarEvent) {
+            EnvironmentCacheService environmentCacheService, PulsarEvent 
pulsarEvent, RolesService rolesService) {
         this.environmentCacheService = environmentCacheService;
         this.pulsarEvent = pulsarEvent;
+        this.rolesService = rolesService;
     }
 
     @Override
@@ -80,22 +84,25 @@ public class EnvironmentForward extends ZuulFilter {
         String requestUri = request.getRequestURI();
         String token = request.getHeader("token");
 
-        if (!pulsarEvent.validateRoutePermission(requestUri, token)) {
-            ctx.setResponseBody("This operation does not have permission");
-            return null;
-        }
-        if (requestUri.startsWith("/admin/v2/tenants/")
-                || requestUri.startsWith("/admin/v2/namespaces")
-                || requestUri.startsWith("/admin/v2/persistent")
-                || requestUri.startsWith("/admin/v2/non-persistent")) {
-            Map<String, String> result = pulsarEvent.validateTenantPermission(
-                    requestUri, token);
-            if (result.get("error") != null) {
-                log.error("This operation does not have permission");
-                ctx.setResponseBody(result.get("error"));
+        if (!rolesService.isSuperUser(token)) {
+            if (!pulsarEvent.validateRoutePermission(requestUri, token)) {
+                ctx.setResponseBody("This operation does not have permission");
                 return null;
             }
+            if (requestUri.startsWith("/admin/v2/tenants/")
+                    || requestUri.startsWith("/admin/v2/namespaces")
+                    || requestUri.startsWith("/admin/v2/persistent")
+                    || requestUri.startsWith("/admin/v2/non-persistent")) {
+                Map<String, String> result = 
pulsarEvent.validateTenantPermission(
+                        requestUri, token);
+                if (result.get("error") != null) {
+                    log.error("This operation does not have permission");
+                    ctx.setResponseBody(result.get("error"));
+                    return null;
+                }
+            }
         }
+
         if (redirect != null && redirect.equals("true")) {
             String redirectScheme = request.getParameter("redirect.scheme");
             String redirectHost = request.getParameter("redirect.host");
diff --git a/src/main/resources/application.properties 
b/src/main/resources/application.properties
index fd9f41a..777891e 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -134,3 +134,6 @@ spring.thymeleaf.mode=HTML5
 # default environment configuration
 default.environment.name=
 default.environment.service_url=
+
+# support peek message, default false
+pulsar.peek.message=false

Reply via email to