lucperkins commented on a change in pull request #1865: CLI for offload
URL: https://github.com/apache/incubator-pulsar/pull/1865#discussion_r191882225
 
 

 ##########
 File path: 
pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
 ##########
 @@ -614,6 +618,93 @@ void run() throws PulsarAdminException {
         }
     }
 
+    static MessageId 
findFirstLedgerWithinThreshold(List<PersistentTopicInternalStats.LedgerInfo> 
ledgers,
+                                                    long sizeThreshold) {
+        long suffixSize = 0L;
+
+        ledgers = Lists.reverse(ledgers);
+        for (PersistentTopicInternalStats.LedgerInfo l : ledgers) {
+            suffixSize += l.size;
+            if (suffixSize >= sizeThreshold) {
+                return new MessageIdImpl(l.ledgerId, 0L, -1);
+            }
+        }
+        return null;
+    }
+
+    @Parameters(commandDescription = "Trigger offload data from a topic to 
long term storage")
+    private class Offload extends CliCommand {
+        @Parameter(names = { "-s", "--size-threshold" },
+                   description = "Max amount of data to keep in bookkeeper for 
topic", required = true)
+        private Long sizeThreshold;
+
+        @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+
+            PersistentTopicInternalStats stats = 
topics.getInternalStats(persistentTopic);
+            if (stats.ledgers.size() < 1) {
+                throw new PulsarAdminException("Topic doesn't have any data");
+            }
+
+            LinkedList<PersistentTopicInternalStats.LedgerInfo> ledgers = new 
LinkedList(stats.ledgers);
+            ledgers.get(ledgers.size()-1).size = stats.currentLedgerSize; // 
doesn't get filled in now it seems
+            MessageId messageId = findFirstLedgerWithinThreshold(ledgers, 
sizeThreshold);
+
+            if (messageId == null) {
+                System.out.println("Nothing to offload");
+                return;
+            }
+
+            topics.triggerOffload(persistentTopic, messageId);
+            System.out.println("Offload triggered for " + persistentTopic + " 
for messages before " + messageId);
+        }
+    }
+
+    @Parameters(commandDescription = "Status of offloading on a topic")
 
 Review comment:
   "Check the status of data offloading from a topic to long-term storage"

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to