This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e643c5e  Update message backlog figures in the database when 
clearing/deleting subscriptions. Keeping the UI up to date pending the next 
collector run. (#4559)
e643c5e is described below

commit e643c5e4968484b06f64e216cb110a733b1aa5db
Author: rshermanTHG <[email protected]>
AuthorDate: Thu Jun 20 11:18:02 2019 +0100

    Update message backlog figures in the database when clearing/deleting 
subscriptions. Keeping the UI up to date pending the next collector run. (#4559)
    
    Motivation
    When a subscription's backlog is cleared or the subscription is deleted the 
UI doesn't reflect the change in the backlog until the next run of the 
collector. This change updates the database with the new backlog count at the 
subscription and topic level.
    
    Modifications
    If the deletion of a subscription has a 204 response reduce the topic's 
message backlog count by the amount on the subscription.
    If the clearing of a subscription has a 204 response reduce the topic's 
message backlog count by the amount on the subscription and reduce the 
subscriptions backlog count to 0.
---
 dashboard/django/stats/models.py |  2 +-
 dashboard/django/stats/views.py  | 32 +++++++++++++++++++++++++-------
 2 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/dashboard/django/stats/models.py b/dashboard/django/stats/models.py
index 8daaa5b..86918f7 100644
--- a/dashboard/django/stats/models.py
+++ b/dashboard/django/stats/models.py
@@ -169,7 +169,7 @@ class Subscription(Model):
     unackedMessages  = BigIntegerField(default=0)
 
     class Meta:
-        index_together = ('name', 'topic', 'timestamp', 'deleted')
+        unique_together = ('name', 'topic', 'timestamp')
 
     def __str__(self):
         return self.name
diff --git a/dashboard/django/stats/views.py b/dashboard/django/stats/views.py
index a371aa0a..338d08a 100644
--- a/dashboard/django/stats/views.py
+++ b/dashboard/django/stats/views.py
@@ -143,7 +143,7 @@ def deleteNamespace(request, namespace_name):
 
 def topic(request, topic_name):
     timestamp = get_timestamp()
-    topic_name = 'persistent://' + topic_name.split('persistent/', 1)[1]
+    topic_name = extract_topic_db_name(topic_name)
     cluster_name = request.GET.get('cluster')
     clusters = []
 
@@ -309,7 +309,16 @@ def clusters(request):
 
 def clearSubscription(request, topic_name, subscription_name):
     url = settings.SERVICE_URL + '/admin/v2/' + topic_name + '/subscription/' 
+ subscription_name + '/skip_all'
-    requests.post(url)
+    response = requests.post(url)
+    if response.status_code == 204:
+        ts = get_timestamp()
+        topic_db_name = extract_topic_db_name(topic_name)
+        topic = Topic.objects.get(name=topic_db_name, timestamp=ts)
+        subscription = Subscription.objects.get(name=subscription_name, 
topic=topic, timestamp=ts)
+        topic.backlog = topic.backlog - subscription.msgBacklog
+        topic.save(update_fields=['backlog'])
+        subscription.msgBacklog = 0
+        subscription.save(update_fields=['msgBacklog'])
     return redirect('topic', topic_name=topic_name)
 
 def deleteSubscription(request, topic_name, subscription_name):
@@ -318,9 +327,11 @@ def deleteSubscription(request, topic_name, 
subscription_name):
     status = response.status_code
     if status == 204:
         ts = get_timestamp()
-        topic_db_name = 'persistent://' + topic_name.split('persistent/', 1)[1]
-        topic = Topic.objects.filter(name=topic_db_name, timestamp=ts)[0]
-        Subscription.objects.filter(name=subscription_name, topic=topic, 
timestamp=ts).update(deleted=True)
+        topic_db_name = extract_topic_db_name(topic_name)
+        topic = Topic.objects.get(name=topic_db_name, timestamp=ts)
+        deleted_subscription = 
Subscription.objects.get(name=subscription_name, topic=topic, timestamp=ts)
+        deleted_subscription.deleted = True
+        deleted_subscription.save(update_fields=['deleted'])
         subscriptions = Subscription.objects.filter(topic=topic, 
deleted=False, timestamp=ts)
         if not subscriptions:
             topic.deleted=True
@@ -328,10 +339,13 @@ def deleteSubscription(request, topic_name, 
subscription_name):
             m = re.search(r"persistent/(?P<namespace>.*)/.*", topic_name)
             namespace_name = m.group("namespace")
             return redirect('namespace', namespace_name=namespace_name)
+        else:
+            topic.backlog = topic.backlog - deleted_subscription.msgBacklog
+            topic.save(update_fields=['backlog'])
     return redirect('topic', topic_name=topic_name)
 
 def messages(request, topic_name, subscription_name):
-    topic_name = 'persistent://' + topic_name.split('persistent/', 1)[1]
+    topic_name = extract_topic_db_name(topic_name)
     timestamp = get_timestamp()
     cluster_name = request.GET.get('cluster')
 
@@ -356,4 +370,8 @@ def peek(request, topic_name, subscription_name, 
message_number):
     context = {
         'message_body' : json.dumps(json.loads(message), indent=4),
     }
-    return render(request, 'stats/peek.html', context)
\ No newline at end of file
+    return render(request, 'stats/peek.html', context)
+
+
+def extract_topic_db_name(topic_name):
+    return 'persistent://' + topic_name.split('persistent/', 1)[1]
\ No newline at end of file

Reply via email to