This is an automated email from the ASF dual-hosted git repository.
guangning pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-manager.git
The following commit(s) were added to refs/heads/master by this push:
new 9dde739 Support peek messages from pulsar broker (#241)
9dde739 is described below
commit 9dde739477b743392b325ecdfcaa55932f2e450d
Author: Guangning <[email protected]>
AuthorDate: Thu Feb 6 09:37:47 2020 +0800
Support peek messages from pulsar broker (#241)
### Motivation
At present, pulsar-manager does not support peeking messages. After this PR is
merged, pulsar-manager will support peeking messages.
### Modifications
* Support peeking messages, including both batch and non-batch messages
* Upgrade pulsar-client version to 2.4.2

---
build.gradle | 1 +
front-end/src/api/subscriptions.js | 14 ++
front-end/src/lang/en.js | 12 ++
front-end/src/lang/zh.js | 12 ++
.../management/subscriptions/subscription.vue | 145 ++++++++++++---------
.../views/management/topics/partitionedTopic.vue | 3 +
front-end/src/views/management/topics/topic.vue | 3 +
front-end/src/views/management/users/index.vue | 2 +-
gradle.properties | 2 +-
.../manager/controller/TopicsController.java | 86 ++++++++++++
.../pulsar/manager/zuul/EnvironmentForward.java | 35 +++--
src/main/resources/application.properties | 3 +
12 files changed, 242 insertions(+), 76 deletions(-)
diff --git a/build.gradle b/build.gradle
index 5aeb716..36a0f1d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -107,6 +107,7 @@ dependencies {
compile group: 'com.google.guava', name: 'guava', version: guavaVersion
compile group: 'com.google.code.gson', name: 'gson', version: gsonVersion
compile group: 'org.apache.pulsar', name: 'pulsar-common', version:
pulsarVersion
+ compile group: 'org.apache.pulsar', name: 'pulsar-client-admin', version:
pulsarVersion
compile group: 'io.springfox', name: 'springfox-swagger2', version:
swagger2Version
compile group: 'io.springfox', name: 'springfox-swagger-ui', version:
swaggeruiVersion
compile group: 'org.apache.pulsar', name: 'pulsar-broker', version:
brokerVersion
diff --git a/front-end/src/api/subscriptions.js
b/front-end/src/api/subscriptions.js
index 5a002f2..72aa641 100644
--- a/front-end/src/api/subscriptions.js
+++ b/front-end/src/api/subscriptions.js
@@ -15,6 +15,8 @@ import request from '@/utils/request'
const BASE_URL_V2 = '/admin/v2'
+const SPRING_BASE_URL = '/pulsar-manager/admin/v2'
+
export function fetchSubscriptions(persistent, tenantNamespaceTopic) {
return request({
url: BASE_URL_V2 + `/${persistent}/${tenantNamespaceTopic}/subscriptions`,
@@ -57,3 +59,15 @@ export function deleteSubscriptionOnCluster(cluster,
persistent, tenantNamespace
method: 'delete'
})
}
+
+export function peekMessagesOnCluster(cluster, persistent,
tenantNamespaceTopic, subName, messagePosition) {
+ return request({
+ headers: {
+ 'Content-Type': 'application/json',
+ 'x-pulsar-cluster': cluster
+ },
+ url: SPRING_BASE_URL +
`/${persistent}/${tenantNamespaceTopic}/subscription/${subName}/${messagePosition}`,
+ method: 'get'
+ })
+}
+
diff --git a/front-end/src/lang/en.js b/front-end/src/lang/en.js
index 3a2d729..5f5fd59 100644
--- a/front-end/src/lang/en.js
+++ b/front-end/src/lang/en.js
@@ -558,6 +558,18 @@ export default {
storageSize: 'Storage Size',
entries: 'Entries',
segments: 'Segments',
+ peek: 'PEEK',
+ peekMessages: 'messages',
+ entryId: 'Entry Id',
+ message: 'Message',
+ messageGreaterThanZero: 'Messages should greater than 0',
+ peekMessageError: 'Peek messages error, please check',
+ messageSkipSuccess: 'Messages skip successfully',
+ expireMessageSuccess: 'Messages expire successfully',
+ clearMessageSuccess: 'Clear messages successfully',
+ minutesNotLessThanZero: 'Minutes cannot be less than 0',
+ resetCursorSuccess: 'Reset cursor successfully',
+ messageIdNotLessThanZero: 'Message Id cannot be less than 0',
newSub: 'New Subscription',
sub: 'Subscribe',
unsub: 'Unsubscribe',
diff --git a/front-end/src/lang/zh.js b/front-end/src/lang/zh.js
index d09b9b3..59afe53 100644
--- a/front-end/src/lang/zh.js
+++ b/front-end/src/lang/zh.js
@@ -558,6 +558,18 @@ export default {
storageSize: 'Storage Size',
entries: 'Entries',
segments: 'Segments',
+ peek: 'PEEK',
+ peekMessages: 'messages',
+ entryId: 'Entry Id',
+ message: 'Message',
+ messageGreaterThanZero: 'Messages should greater than 0',
+ peekMessageError: 'Peek messages error, please check',
+ messageSkipSuccess: 'Messages skip successfully',
+ expireMessageSuccess: 'Messages expire successfully',
+ clearMessageSuccess: 'Clear messages successfully',
+ minutesNotLessThanZero: 'Minutes cannot be less than 0',
+ resetCursorSuccess: 'Reset cursor successfully',
+ messageIdNotLessThanZero: 'Message Id cannot be less than 0',
newSub: 'New Subscription',
sub: 'Subscribe',
unsub: 'Unsubscribe',
diff --git a/front-end/src/views/management/subscriptions/subscription.vue
b/front-end/src/views/management/subscriptions/subscription.vue
index d890be8..afb4a32 100644
--- a/front-end/src/views/management/subscriptions/subscription.vue
+++ b/front-end/src/views/management/subscriptions/subscription.vue
@@ -102,38 +102,6 @@
</el-tab-pane>
<el-tab-pane :label="$t('topic.backlogOpeartion')"
name="backlogOperation">
<el-tabs v-model="leftActiveName" :tab-position="tabPosition"
@tab-click="handleLeftTabClick">
- <!-- <el-tab-pane label="INSPECT" name="inspect">
- <el-form :inline="true" :model="form">
- <el-button type="primary"
@click="handlePeekMessages">Peek</el-button>
- <el-form-item>
- <el-input v-model="form.peekNumMessages"
placeholder="messages"/>
- </el-form-item>
- <span>messages</span>
- </el-form>
- <el-row :gutter="24" style="margin-top:15px">
- <el-col :xs="{span: 24}" :sm="{span: 24}" :md="{span: 24}"
:lg="{span: 24}" :xl="{span: 24}" style="padding-right:8px;margin-bottom:30px;">
- <el-table
- v-loading="inspectListLoading"
- :key="inspectTableKey"
- :data="inspectsList"
- border
- fit
- highlight-current-row
- style="width: 100%;">
- <el-table-column label="Message ID" min-width="10px"
align="center">
- <template slot-scope="scope">
- <span>{{ scope.row.messageId }}</span>
- </template>
- </el-table-column>
- <el-table-column label="data" min-width="30px"
align="center">
- <template slot-scope="scope">
- <span>{{ scope.row.data }}</span>
- </template>
- </el-table-column>
- </el-table>
- </el-col>
- </el-row>
- </el-tab-pane> -->
<el-tab-pane :label="$t('topic.subscription.skip')" name="skip">
<el-form :inline="true" :model="form">
<el-button type="primary" @click="handleSkipMessages">{{
$t('topic.subscription.skip') }}</el-button>
@@ -181,6 +149,43 @@
</el-form-item>
</el-form>
</el-tab-pane>
+ <el-tab-pane label="INSPECT" name="peek">
+ <el-form :inline="true" :model="form">
+ <el-button type="primary" @click="handlePeekMessages">{{
$t('topic.subscription.peek') }}</el-button>
+ <el-form-item>
+ <el-input v-model="form.peekNumMessages"
placeholder="messages"/>
+ </el-form-item>
+ <span>{{ $t('topic.subscription.peekMessages') }}</span>
+ </el-form>
+ <el-row :gutter="24" style="margin-top:15px">
+ <el-col :xs="{span: 24}" :sm="{span: 24}" :md="{span: 24}"
:lg="{span: 24}" :xl="{span: 24}"
style="padding-right:20px;margin-bottom:30px;">
+ <el-table
+ v-loading="inspectListLoading"
+ :key="inspectTableKey"
+ :data="inspectsList"
+ border
+ fit
+ highlight-current-row
+ style="width: 100%;">
+ <el-table-column :label="$t('topic.segment.ledgerId')"
min-width="10px" align="center">
+ <template slot-scope="scope">
+ <span>{{ scope.row.ledgerId }}</span>
+ </template>
+ </el-table-column>
+ <el-table-column :label="$t('topic.subscription.entryId')"
min-width="10px" align="center">
+ <template slot-scope="scope">
+ <span>{{ scope.row.entryId }}</span>
+ </template>
+ </el-table-column>
+ <el-table-column :label="$t('topic.subscription.message')"
min-width="30px" align="center">
+ <template slot-scope="scope">
+ <span>{{ scope.row.data }}</span>
+ </template>
+ </el-table-column>
+ </el-table>
+ </el-col>
+ </el-row>
+ </el-tab-pane>
</el-tabs>
</el-tab-pane>
</el-tabs>
@@ -193,7 +198,6 @@ import { fetchNamespaces, getClusters } from
'@/api/namespaces'
import {
fetchTopicsByPulsarManager,
fetchTopicStats,
- // peekMessages,
skipOnCluster,
expireMessageOnCluster,
clearBacklogOnCluster,
@@ -201,7 +205,7 @@ import {
resetCursorByTimestampOnCluster,
resetCursorByPositionOnCluster
} from '@/api/topics'
-import { fetchSubscriptions } from '@/api/subscriptions'
+import { fetchSubscriptions, peekMessagesOnCluster } from '@/api/subscriptions'
import { formatBytes } from '@/utils/index'
import { numberFormatter } from '@/filters/index'
@@ -457,28 +461,49 @@ export default {
},
getConsumers() {
},
- // To do, parse message
- // handlePeekMessages() {
- // if (this.form.peekNumMessages <= 0) {
- // this.$notify({
- // title: 'error',
- // message: 'Messages should greater than 0',
- // type: 'error',
- // duration: 3000
- // })
- // return
- // }
- // peekMessages(this.postForm.persistent, this.tenantNamespaceTopic,
this.postForm.subscription, this.form.peekNumMessages).then(response => {
- // if (!response.data) return
- // console.log(response)
- // console.log(response.data)
- // })
- // },
+ handlePeekMessages() {
+ if (this.form.peekNumMessages <= 0) {
+ this.$notify({
+ title: 'error',
+ message: this.$i18n.t('topic.subscription.messageGreaterThanZero'),
+ type: 'error',
+ duration: 3000
+ })
+ return
+ }
+ peekMessagesOnCluster(
+ this.getCurrentCluster(),
+ this.postForm.persistent,
+ this.tenantNamespaceTopic,
+ this.postForm.subscription,
+ this.form.peekNumMessages).then(response => {
+ if (!response.data) return
+ if (!response.data.data) return
+ if (response.data.data.hasOwnProperty('error')) {
+ this.$notify({
+ title: 'error',
+ message: this.$i18n.t('topic.subscription.peekMessageError'),
+ type: 'error',
+ duration: 3000
+ })
+ return
+ }
+ this.inspectsList = []
+ for (var i = 0; i < response.data.data.length; i++) {
+ this.inspectsList.push({
+ 'messageId': i,
+ 'entryId': response.data.data[i].entryId,
+ 'ledgerId': response.data.data[i].ledgerId,
+ 'data': window.atob(response.data.data[i].data)
+ })
+ }
+ })
+ },
handleSkipMessages() {
if (this.form.skipNumMessages <= 0) {
this.$notify({
title: 'error',
- message: 'Messages should greater than 0',
+ message: this.$i18n.t('topic.subscription.messageGreaterThanZero'),
type: 'error',
duration: 3000
})
@@ -487,7 +512,7 @@ export default {
skipOnCluster(this.getCurrentCluster(), this.postForm.persistent,
this.getFullTopic(), this.postForm.subscription,
this.form.skipNumMessages).then(response => {
this.$notify({
title: 'success',
- message: 'Messages skip success',
+ message: this.$i18n.t('topic.subscription.messageSkipSuccess'),
type: 'success',
duration: 3000
})
@@ -497,7 +522,7 @@ export default {
if (this.form.expireMessages <= 0) {
this.$notify({
title: 'error',
- message: 'Messages should greater than 0',
+ message: this.$i18n.t('topic.subscription.messageGreaterThanZero'),
type: 'error',
duration: 3000
})
@@ -506,7 +531,7 @@ export default {
expireMessageOnCluster(this.getCurrentCluster(),
this.postForm.persistent, this.getFullTopic(), this.postForm.subscription,
this.form.expireNumMessages).then(response => {
this.$notify({
title: 'success',
- message: 'Messages expire success',
+ message: this.$i18n.t('topic.subscription.expireMessageSuccess'),
type: 'success',
duration: 3000
})
@@ -516,7 +541,7 @@ export default {
clearBacklogOnCluster(this.getCurrentCluster(),
this.postForm.persistent, this.getFullTopic(),
this.postForm.subscription).then(response => {
this.$notify({
title: 'success',
- message: 'Clear messages success',
+ message: this.$i18n.t('topic.subscription.clearMessageSuccess'),
type: 'success',
duration: 3000
})
@@ -526,7 +551,7 @@ export default {
if (parseInt(this.form.minutes) <= 0) {
this.$notify({
title: 'error',
- message: 'Minutes cannot be less than 0',
+ message: this.$i18n.t('topic.subscription.minutesNotLessThanZero'),
type: 'error',
duration: 3000
})
@@ -541,7 +566,7 @@ export default {
this.postForm.subscription, timestamp).then(response => {
this.$notify({
title: 'success',
- message: 'Reset cursor success',
+ message: this.$i18n.t('topic.subscription.resetCursorSuccess'),
type: 'success',
duration: 3000
})
@@ -551,7 +576,7 @@ export default {
if (this.form.messagesId.length <= 0 && this.form.ledgerValue != null) {
this.$notify({
title: 'error',
- message: 'Message Id cannot be less than 0',
+ message: this.$i18n.t('topic.subscription.messageIdNotLessThanZero'),
type: 'error',
duration: 3000
})
@@ -564,7 +589,7 @@ export default {
resetCursorByPositionOnCluster(this.getCurrentCluster(),
this.postForm.persistent, this.getFullTopic(), this.postForm.subscription,
data).then(response => {
this.$notify({
title: 'success',
- message: 'Reset cursor success',
+ message: this.$i18n.t('topic.subscription.resetCursorSuccess'),
type: 'success',
duration: 3000
})
diff --git a/front-end/src/views/management/topics/partitionedTopic.vue
b/front-end/src/views/management/topics/partitionedTopic.vue
index 1601773..b0857a3 100644
--- a/front-end/src/views/management/topics/partitionedTopic.vue
+++ b/front-end/src/views/management/topics/partitionedTopic.vue
@@ -121,6 +121,9 @@
<router-link :to="scope.row.subscriptionLink +
'?topTab=backlogOperation&leftTab=reset'" class="link-type">
<el-dropdown-item command="reset">{{
$t('topic.subscription.reset') }}</el-dropdown-item>
</router-link>
+ <router-link :to="scope.row.subscriptionLink +
'?topTab=backlogOperation&leftTab=peek'" class="link-type">
+ <el-dropdown-item command="peek">{{
$t('topic.subscription.peek') }}</el-dropdown-item>
+ </router-link>
</el-dropdown-menu>
</el-dropdown>
<el-dropdown v-else @command="handleAllSub">
diff --git a/front-end/src/views/management/topics/topic.vue
b/front-end/src/views/management/topics/topic.vue
index 43a0b3a..1d211c9 100644
--- a/front-end/src/views/management/topics/topic.vue
+++ b/front-end/src/views/management/topics/topic.vue
@@ -326,6 +326,9 @@
<router-link :to="scope.row.subscriptionLink +
'?topTab=backlogOperation&leftTab=reset'" class="link-type">
<el-dropdown-item command="reset">{{
$t('topic.subscription.reset') }}</el-dropdown-item>
</router-link>
+ <router-link :to="scope.row.subscriptionLink +
'?topTab=backlogOperation&leftTab=peek'" class="link-type">
+ <el-dropdown-item command="peek">{{
$t('topic.subscription.peek') }}</el-dropdown-item>
+ </router-link>
<el-dropdown-item command="unsub">{{
$t('topic.subscription.unsub') }}</el-dropdown-item>
</el-dropdown-menu>
</el-dropdown>
diff --git a/front-end/src/views/management/users/index.vue
b/front-end/src/views/management/users/index.vue
index 91019fe..2ce567f 100644
--- a/front-end/src/views/management/users/index.vue
+++ b/front-end/src/views/management/users/index.vue
@@ -52,7 +52,7 @@
<span>{{ scope.row.location }}</span>
</template>
</el-table-column>
- <el-table-column :label="$t('usesr.colUserCompany')" align="center"
min-width="100px">
+ <el-table-column :label="$t('user.colUserCompany')" align="center"
min-width="100px">
<template slot-scope="scope">
<span>{{ scope.row.company }}</span>
</template>
diff --git a/gradle.properties b/gradle.properties
index e8fb19a..369e6f6 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -9,7 +9,7 @@ lombokVersion=1.18.10
pageHelperVersion=1.2.4
mockitoVersion=1.10.19
guavaVersion=21.0
-pulsarVersion=2.4.0
+pulsarVersion=2.4.2
swagger2Version=2.9.2
swaggeruiVersion=2.9.2
apiMockitoVersion=1.7.1
diff --git
a/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java
b/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java
index 8e94b4d..56531fb 100644
--- a/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java
+++ b/src/main/java/org/apache/pulsar/manager/controller/TopicsController.java
@@ -13,6 +13,15 @@
*/
package org.apache.pulsar.manager.controller;
+import com.google.common.collect.Maps;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.manager.service.EnvironmentCacheService;
import org.apache.pulsar.manager.service.TopicsService;
import io.swagger.annotations.Api;
@@ -22,6 +31,8 @@ import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.hibernate.validator.constraints.Range;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
@@ -32,6 +43,8 @@ import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import javax.validation.constraints.Min;
import javax.validation.constraints.Size;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
/**
@@ -47,6 +60,12 @@ public class TopicsController {
private final EnvironmentCacheService environmentCacheService;
private final HttpServletRequest request;
+ @Value("${backend.jwt.token}")
+ private String token;
+
+ @Value("${pulsar.peek.message}")
+ private boolean peekMessage;
+
@Autowired
public TopicsController(
TopicsService topicsService,
@@ -110,4 +129,71 @@ public class TopicsController {
tenant, namespace,
env, serviceUrl);
}
+
+ @ApiOperation(value = "Peek messages from pulsar broker")
+ @ApiResponses({
+ @ApiResponse(code = 200, message = "ok"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ @RequestMapping(
+ value =
"/{persistent}/{tenant}/{namespace}/{topic}/subscription/{subName}/{messagePosition}",
+ method = RequestMethod.GET)
+ public ResponseEntity<Map<String, Object>> peekMessages(
+ @PathVariable String persistent,
+ @PathVariable String tenant,
+ @PathVariable String namespace,
+ @PathVariable String topic,
+ @PathVariable String subName,
+ @PathVariable Integer messagePosition) {
+ String requestHost = environmentCacheService.getServiceUrl(request);
+ Map<String, Object> result = Maps.newHashMap();
+ if (!peekMessage) {
+ result.put("error", "If you want to support peek message," +
+ "turn on option pulsar.peek.message in file
application.properties");
+ return ResponseEntity.ok(result);
+ }
+ PulsarAdmin pulsarAdmin = null;
+ // to do check permission for non super, waiting for
https://github.com/apache/pulsar-manager/pull/238
+ try {
+ PulsarAdminBuilder pulsarAdminBuilder = PulsarAdmin.builder();
+ if (token != null && token.length() > 0) {
+
pulsarAdminBuilder.authentication(AuthenticationFactory.token(token));
+ }
+ pulsarAdmin =
pulsarAdminBuilder.serviceHttpUrl(requestHost).build();
+ String topicFullPath = persistent + "://" + tenant + "/" +
namespace + "/" + topic;
+ List<Message<byte[]>> messages =
pulsarAdmin.topics().peekMessages(topicFullPath, subName, messagePosition);
+ List<Map<String, Object>> mapList = new ArrayList<>();
+ for (Message<byte[]> msg: messages) {
+ Map<String, Object> message = Maps.newHashMap();
+ if (msg.getMessageId() instanceof BatchMessageIdImpl) {
+ BatchMessageIdImpl msgId = (BatchMessageIdImpl)
msg.getMessageId();
+ message.put("ledgerId", msgId.getLedgerId());
+ message.put("entryId", msgId.getEntryId());
+ message.put("batchIndex", msgId.getBatchIndex());
+ message.put("batch", true);
+ } else {
+ MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+ message.put("batch", false);
+ message.put("ledgerId", msgId.getLedgerId());
+ message.put("entryId", msgId.getEntryId());
+ }
+ if (msg.getProperties().size() > 0) {
+ msg.getProperties().forEach((k, v) -> {
+ message.put(k, v);
+ });
+ }
+ message.put("data", msg.getData());
+ mapList.add(message);
+ }
+ result.put("data", mapList);
+ } catch (PulsarClientException clientException) {
+ result.put("error", clientException.getMessage());
+ } catch (PulsarAdminException adminException) {
+ result.put("error", adminException.getMessage());
+ }
+ if (pulsarAdmin != null) {
+ pulsarAdmin.close();
+ }
+ return ResponseEntity.ok(result);
+ }
}
\ No newline at end of file
diff --git
a/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java
b/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java
index e17f154..0965f3d 100644
--- a/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java
+++ b/src/main/java/org/apache/pulsar/manager/zuul/EnvironmentForward.java
@@ -18,6 +18,7 @@ import com.netflix.zuul.ZuulFilter;
import com.netflix.zuul.context.RequestContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.manager.service.PulsarEvent;
+import org.apache.pulsar.manager.service.RolesService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -48,11 +49,14 @@ public class EnvironmentForward extends ZuulFilter {
private final PulsarEvent pulsarEvent;
+ private final RolesService rolesService;
+
@Autowired
public EnvironmentForward(
- EnvironmentCacheService environmentCacheService, PulsarEvent
pulsarEvent) {
+ EnvironmentCacheService environmentCacheService, PulsarEvent
pulsarEvent, RolesService rolesService) {
this.environmentCacheService = environmentCacheService;
this.pulsarEvent = pulsarEvent;
+ this.rolesService = rolesService;
}
@Override
@@ -80,22 +84,25 @@ public class EnvironmentForward extends ZuulFilter {
String requestUri = request.getRequestURI();
String token = request.getHeader("token");
- if (!pulsarEvent.validateRoutePermission(requestUri, token)) {
- ctx.setResponseBody("This operation does not have permission");
- return null;
- }
- if (requestUri.startsWith("/admin/v2/tenants/")
- || requestUri.startsWith("/admin/v2/namespaces")
- || requestUri.startsWith("/admin/v2/persistent")
- || requestUri.startsWith("/admin/v2/non-persistent")) {
- Map<String, String> result = pulsarEvent.validateTenantPermission(
- requestUri, token);
- if (result.get("error") != null) {
- log.error("This operation does not have permission");
- ctx.setResponseBody(result.get("error"));
+ if (!rolesService.isSuperUser(token)) {
+ if (!pulsarEvent.validateRoutePermission(requestUri, token)) {
+ ctx.setResponseBody("This operation does not have permission");
return null;
}
+ if (requestUri.startsWith("/admin/v2/tenants/")
+ || requestUri.startsWith("/admin/v2/namespaces")
+ || requestUri.startsWith("/admin/v2/persistent")
+ || requestUri.startsWith("/admin/v2/non-persistent")) {
+ Map<String, String> result =
pulsarEvent.validateTenantPermission(
+ requestUri, token);
+ if (result.get("error") != null) {
+ log.error("This operation does not have permission");
+ ctx.setResponseBody(result.get("error"));
+ return null;
+ }
+ }
}
+
if (redirect != null && redirect.equals("true")) {
String redirectScheme = request.getParameter("redirect.scheme");
String redirectHost = request.getParameter("redirect.host");
diff --git a/src/main/resources/application.properties
b/src/main/resources/application.properties
index fd9f41a..777891e 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -134,3 +134,6 @@ spring.thymeleaf.mode=HTML5
# default environment configuration
default.environment.name=
default.environment.service_url=
+
+# support peek message, default false
+pulsar.peek.message=false