This is an automated email from the ASF dual-hosted git repository.
crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 9366a0ea [Improve] Admin GetStats: Fill in missing fields (#1309)
9366a0ea is described below
commit 9366a0eaff772b7097ab1bb2147e287460dc61ba
Author: crossoverJie <[email protected]>
AuthorDate: Thu Nov 21 16:58:18 2024 +0800
[Improve] Admin GetStats: Fill in missing fields (#1309)
* admin append topic stats
* lint
---
pulsaradmin/pkg/admin/topic_test.go | 52 +++++++++++++++++++++-
pulsaradmin/pkg/utils/data.go | 89 ++++++++++++++++++++++++++++---------
2 files changed, 119 insertions(+), 22 deletions(-)
diff --git a/pulsaradmin/pkg/admin/topic_test.go b/pulsaradmin/pkg/admin/topic_test.go
index fced0542..734cce1c 100644
--- a/pulsaradmin/pkg/admin/topic_test.go
+++ b/pulsaradmin/pkg/admin/topic_test.go
@@ -123,18 +123,21 @@ func TestPartitionState(t *testing.T) {
assert.Nil(t, err)
defer client.Close()
+ subName := "my-sub"
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
- SubscriptionName: "my-sub",
+ SubscriptionName: subName,
Type: pulsar.Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()
// create producer
+ producerName := "test-producer"
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: topic,
DisableBatching: false,
+ Name: producerName,
})
assert.Nil(t, err)
defer producer.Close()
@@ -173,6 +176,53 @@ func TestPartitionState(t *testing.T) {
assert.Equal(t, len(subscriptionStats.Consumers), 0)
}
+ partition, err := topicName.GetPartition(0)
+ assert.Nil(t, err)
+ topicState, err := admin.Topics().GetStats(*partition)
+ assert.Nil(t, err)
+ assert.Equal(t, len(topicState.Publishers), 1)
+ publisher := topicState.Publishers[0]
+ assert.Equal(t, publisher.AccessModel, utils.ProduceModeShared)
+ assert.Equal(t, publisher.IsSupportsPartialProducer, false)
+ assert.Equal(t, publisher.ProducerName, producerName)
+ assert.Contains(t, publisher.Address, "127.0.0.1")
+ assert.Contains(t, publisher.ClientVersion, "Pulsar Go version")
+
+ sub := topicState.Subscriptions[subName]
+ assert.Equal(t, sub.BytesOutCounter, int64(0))
+ assert.Equal(t, sub.MsgOutCounter, int64(0))
+ assert.Equal(t, sub.MessageAckRate, float64(0))
+ assert.Equal(t, sub.ChunkedMessageRate, float64(0))
+ assert.Equal(t, sub.BacklogSize, int64(0))
+ assert.Equal(t, sub.EarliestMsgPublishTimeInBacklog, int64(0))
+ assert.Equal(t, sub.LastExpireTimestamp, int64(0))
+ assert.Equal(t, sub.TotalMsgExpired, int64(0))
+ assert.Equal(t, sub.LastMarkDeleteAdvancedTimestamp, int64(0))
+ assert.Equal(t, sub.IsDurable, true)
+ assert.Equal(t, sub.AllowOutOfOrderDelivery, false)
+ assert.Equal(t, sub.ConsumersAfterMarkDeletePosition,
map[string]string{})
+ assert.Equal(t, sub.NonContiguousDeletedMessagesRanges, 0)
+ assert.Equal(t, sub.NonContiguousDeletedMessagesRangesSrzSize, 0)
+ assert.Equal(t, sub.DelayedMessageIndexSizeInBytes, int64(0))
+ assert.Equal(t, sub.SubscriptionProperties, map[string]string{})
+ assert.Equal(t, sub.FilterProcessedMsgCount, int64(0))
+ assert.Equal(t, sub.FilterAcceptedMsgCount, int64(0))
+ assert.Equal(t, sub.FilterRejectedMsgCount, int64(0))
+ assert.Equal(t, sub.FilterRescheduledMsgCount, int64(0))
+
+ assert.Equal(t, len(sub.Consumers), 1)
+ consumerState := sub.Consumers[0]
+ assert.Equal(t, consumerState.BytesOutCounter, int64(0))
+ assert.Equal(t, consumerState.MsgOutCounter, int64(0))
+ assert.Equal(t, consumerState.MessageAckRate, float64(0))
+ assert.Equal(t, consumerState.ChunkedMessageRate, float64(0))
+ assert.Equal(t, consumerState.AvgMessagesPerEntry, int(0))
+ assert.Contains(t, consumerState.Address, "127.0.0.1")
+ assert.Contains(t, consumerState.ClientVersion, "Pulsar Go version")
+ assert.Equal(t, consumerState.LastAckedTimestamp, int64(0))
+ assert.Equal(t, consumerState.LastConsumedTimestamp, int64(0))
+ assert.True(t, consumerState.LastConsumedFlowTimestamp > 0)
+
}
func TestNonPartitionState(t *testing.T) {
randomName := newTopicName()
diff --git a/pulsaradmin/pkg/utils/data.go b/pulsaradmin/pkg/utils/data.go
index c9664e71..58673dcc 100644
--- a/pulsaradmin/pkg/utils/data.go
+++ b/pulsaradmin/pkg/utils/data.go
@@ -244,31 +244,67 @@ type TopicStats struct {
DeDuplicationStatus string
`json:"deduplicationStatus"`
}
+// ProducerAccessMode is the string type of the PublisherStats "accessMode" field.
+type ProducerAccessMode string
+const (
+	ProduceModeShared               ProducerAccessMode = "Shared"
+	ProduceModeExclusive            ProducerAccessMode = "Exclusive" // typed explicitly: a bare `= "..."` would be an untyped string constant
+	ProduceModeExclusiveWithFencing ProducerAccessMode = "ExclusiveWithFencing"
+	ProduceModeWaitForExclusive     ProducerAccessMode = "WaitForExclusive"
+)
+
type PublisherStats struct {
- ProducerID int64 `json:"producerId"`
- MsgRateIn float64 `json:"msgRateIn"`
- MsgThroughputIn float64 `json:"msgThroughputIn"`
- AverageMsgSize float64 `json:"averageMsgSize"`
- Metadata map[string]string `json:"metadata"`
+ AccessModel ProducerAccessMode `json:"accessMode"`
+ ProducerID int64 `json:"producerId"`
+ MsgRateIn float64 `json:"msgRateIn"`
+ MsgThroughputIn float64 `json:"msgThroughputIn"`
+ AverageMsgSize float64 `json:"averageMsgSize"`
+ ChunkedMessageRate float64 `json:"chunkedMessageRate"`
+ IsSupportsPartialProducer bool
`json:"supportsPartialProducer"`
+ ProducerName string `json:"producerName"`
+ Address string `json:"address"`
+ ConnectedSince string `json:"connectedSince"`
+ ClientVersion string `json:"clientVersion"`
+ Metadata map[string]string `json:"metadata"`
}
type SubscriptionStats struct {
- BlockedSubscriptionOnUnackedMsgs bool
`json:"blockedSubscriptionOnUnackedMsgs"`
- IsReplicated bool `json:"isReplicated"`
- LastConsumedFlowTimestamp int64
`json:"lastConsumedFlowTimestamp"`
- LastConsumedTimestamp int64
`json:"lastConsumedTimestamp"`
- LastAckedTimestamp int64
`json:"lastAckedTimestamp"`
- MsgRateOut float64 `json:"msgRateOut"`
- MsgThroughputOut float64
`json:"msgThroughputOut"`
- MsgRateRedeliver float64
`json:"msgRateRedeliver"`
- MsgRateExpired float64 `json:"msgRateExpired"`
- MsgBacklog int64 `json:"msgBacklog"`
- MsgBacklogNoDelayed int64
`json:"msgBacklogNoDelayed"`
- MsgDelayed int64 `json:"msgDelayed"`
- UnAckedMessages int64
`json:"unackedMessages"`
- SubType string `json:"type"`
- ActiveConsumerName string
`json:"activeConsumerName"`
- Consumers []ConsumerStats `json:"consumers"`
+ BlockedSubscriptionOnUnackedMsgs bool
`json:"blockedSubscriptionOnUnackedMsgs"`
+ IsReplicated bool
`json:"isReplicated"`
+ LastConsumedFlowTimestamp int64
`json:"lastConsumedFlowTimestamp"`
+ LastConsumedTimestamp int64
`json:"lastConsumedTimestamp"`
+ LastAckedTimestamp int64
`json:"lastAckedTimestamp"`
+ MsgRateOut float64
`json:"msgRateOut"`
+ MsgThroughputOut float64
`json:"msgThroughputOut"`
+ MsgRateRedeliver float64
`json:"msgRateRedeliver"`
+ MsgRateExpired float64
`json:"msgRateExpired"`
+ MsgBacklog int64
`json:"msgBacklog"`
+ MsgBacklogNoDelayed int64
`json:"msgBacklogNoDelayed"`
+ MsgDelayed int64
`json:"msgDelayed"`
+ UnAckedMessages int64
`json:"unackedMessages"`
+ SubType string
`json:"type"`
+ ActiveConsumerName string
`json:"activeConsumerName"`
+ BytesOutCounter int64
`json:"bytesOutCounter"`
+ MsgOutCounter int64
`json:"msgOutCounter"`
+ MessageAckRate float64
`json:"messageAckRate"`
+ ChunkedMessageRate float64
`json:"chunkedMessageRate"`
+ BacklogSize int64
`json:"backlogSize"`
+ EarliestMsgPublishTimeInBacklog int64
`json:"earliestMsgPublishTimeInBacklog"`
+ TotalMsgExpired int64
`json:"totalMsgExpired"`
+ LastExpireTimestamp int64
`json:"lastExpireTimestamp"`
+ LastMarkDeleteAdvancedTimestamp int64
`json:"lastMarkDeleteAdvancedTimestamp"`
+ Consumers []ConsumerStats
`json:"consumers"`
+ IsDurable bool
`json:"isDurable"`
+ AllowOutOfOrderDelivery bool
`json:"allowOutOfOrderDelivery"`
+ ConsumersAfterMarkDeletePosition map[string]string
`json:"consumersAfterMarkDeletePosition"`
+ NonContiguousDeletedMessagesRanges int
`json:"nonContiguousDeletedMessagesRanges"`
+ NonContiguousDeletedMessagesRangesSrzSize int
`json:"nonContiguousDeletedMessagesRangesSerializedSize"`
+ DelayedMessageIndexSizeInBytes int64
`json:"delayedMessageIndexSizeInBytes"`
+ SubscriptionProperties map[string]string
`json:"subscriptionProperties"`
+ FilterProcessedMsgCount int64
`json:"filterProcessedMsgCount"`
+ FilterAcceptedMsgCount int64
`json:"filterAcceptedMsgCount"`
+ FilterRejectedMsgCount int64
`json:"filterRejectedMsgCount"`
+ FilterRescheduledMsgCount int64
`json:"filterRescheduledMsgCount"`
}
type ConsumerStats struct {
@@ -279,6 +315,17 @@ type ConsumerStats struct {
MsgThroughputOut float64 `json:"msgThroughputOut"`
MsgRateRedeliver float64 `json:"msgRateRedeliver"`
ConsumerName string `json:"consumerName"`
+ BytesOutCounter int64 `json:"bytesOutCounter"`
+ MsgOutCounter int64 `json:"msgOutCounter"`
+ MessageAckRate float64 `json:"messageAckRate"`
+ ChunkedMessageRate float64
`json:"chunkedMessageRate"`
+ AvgMessagesPerEntry int
`json:"avgMessagesPerEntry"`
+ Address string `json:"address"`
+ ConnectedSince string `json:"connectedSince"`
+ ClientVersion string `json:"clientVersion"`
+ LastAckedTimestamp int64
`json:"lastAckedTimestamp"`
+ LastConsumedTimestamp int64
`json:"lastConsumedTimestamp"`
+ LastConsumedFlowTimestamp int64
`json:"lastConsumedFlowTimestamp"`
Metadata map[string]string `json:"metadata"`
}