sijie closed pull request #1962: Human readable sizes when triggering offload
URL: https://github.com/apache/incubator-pulsar/pull/1962
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index eb90342b40..a85df44b30 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -622,11 +622,13 @@ static MessageId 
findFirstLedgerWithinThreshold(List<PersistentTopicInternalStat
         long suffixSize = 0L;
 
         ledgers = Lists.reverse(ledgers);
+        long previousLedger = ledgers.get(0).ledgerId;
         for (PersistentTopicInternalStats.LedgerInfo l : ledgers) {
             suffixSize += l.size;
-            if (suffixSize >= sizeThreshold) {
-                return new MessageIdImpl(l.ledgerId, 0L, -1);
+            if (suffixSize > sizeThreshold) {
+                return new MessageIdImpl(previousLedger, 0L, -1);
             }
+            previousLedger = l.ledgerId;
         }
         return null;
     }
@@ -634,15 +636,16 @@ static MessageId 
findFirstLedgerWithinThreshold(List<PersistentTopicInternalStat
     @Parameters(commandDescription = "Trigger offload of data from a topic to 
long-term storage (e.g. Amazon S3)")
     private class Offload extends CliCommand {
         @Parameter(names = { "-s", "--size-threshold" },
-                   description = "Maximum amount of data to keep in BookKeeper 
for the specified topic",
+                   description = "Maximum amount of data to keep in BookKeeper 
for the specified topic (e.g. 10M, 5G).",
                    required = true)
-        private Long sizeThreshold;
+        private String sizeThresholdStr;
 
         @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
         private java.util.List<String> params;
 
         @Override
         void run() throws PulsarAdminException {
+            long sizeThreshold = validateSizeString(sizeThresholdStr);
             String persistentTopic = validatePersistentTopic(params);
 
             PersistentTopicInternalStats stats = 
topics.getInternalStats(persistentTopic);
diff --git 
a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
 
b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
index e3ee495d69..b993aa5500 100644
--- 
a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
+++ 
b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
@@ -22,6 +22,7 @@
 import com.google.common.collect.ImmutableMap;
 
 import java.net.URL;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -55,7 +56,8 @@
     private static final Logger log = 
LoggerFactory.getLogger(TestS3Offload.class);
 
     private static final String CLUSTER_NAME = "test";
-    private static final int ENTRIES_PER_LEDGER = 10;
+    private static final int ENTRY_SIZE = 1024;
+    private static final int ENTRIES_PER_LEDGER = 1024;
 
     @ArquillianResource
     DockerClient docker;
@@ -92,6 +94,16 @@ public void teardownBrokers() throws Exception {
 
     }
 
+    private static byte[] buildEntry(String pattern) {
+        byte[] entry = new byte[ENTRY_SIZE];
+        byte[] patternBytes = pattern.getBytes();
+
+        for (int i = 0; i < entry.length; i++) {
+            entry[i] = patternBytes[i % patternBytes.length];
+        }
+        return entry;
+    }
+
     @Test
     public void testPublishOffloadAndConsumeViaCLI() throws Exception {
         PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
@@ -114,15 +126,17 @@ public void testPublishOffloadAndConsumeViaCLI() throws 
Exception {
         bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker, 
CLUSTER_NAME));
 
         long firstLedger = -1;
-        try(PulsarClient client = PulsarClient.create(serviceUrl);
-            Producer producer = client.createProducer(topic)) {
+        try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
+            Producer producer = client.newProducer().topic(topic)
+                .blockIfQueueFull(true).enableBatching(false).create()) {
             client.subscribe(topic, "my-sub").close();
+
             // write enough to topic to make it roll
             int i = 0;
             for (; i < ENTRIES_PER_LEDGER*1.5; i++) {
-                producer.send(("offload-message"+i).getBytes());
+                producer.sendAsync(buildEntry("offload-message"+i));
             }
-            MessageId latestMessage = 
producer.send(("offload-message"+i).getBytes());
+            MessageId latestMessage = 
producer.send(buildEntry("offload-message"+i));
 
             // read managed ledger info, check ledgers exist
             ManagedLedgerFactory mlf = new ManagedLedgerFactoryImpl(bkConf);
@@ -134,7 +148,7 @@ public void testPublishOffloadAndConsumeViaCLI() throws 
Exception {
             // first offload with a high threshold, nothing should offload
             String output = DockerUtils.runCommand(docker, broker,
                     "/pulsar/bin/pulsar-admin", "topics",
-                    "offload", "--size-threshold", 
String.valueOf(Long.MAX_VALUE),
+                    "offload", "--size-threshold", "100G",
                     topic);
             Assert.assertTrue(output.contains("Nothing to offload"));
 
@@ -145,7 +159,7 @@ public void testPublishOffloadAndConsumeViaCLI() throws 
Exception {
             // offload with a low threshold
             output = DockerUtils.runCommand(docker, broker,
                     "/pulsar/bin/pulsar-admin", "topics",
-                    "offload", "--size-threshold", "0",
+                    "offload", "--size-threshold", "1M",
                     topic);
             Assert.assertTrue(output.contains("Offload triggered"));
 
@@ -167,12 +181,12 @@ public void testPublishOffloadAndConsumeViaCLI() throws 
Exception {
         Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker, 
CLUSTER_NAME));
 
         log.info("Read back the data (which would be in that first ledger)");
-        try(PulsarClient client = PulsarClient.create(serviceUrl);
-            Consumer consumer = client.subscribe(topic, "my-sub")) {
+        try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
+            Consumer consumer = 
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
             // read back from topic
             for (int i = 0; i < ENTRIES_PER_LEDGER*1.5; i++) {
-                Message m = consumer.receive();
-                Assert.assertEquals("offload-message"+i, new 
String(m.getData()));
+                Message m = consumer.receive(1, TimeUnit.MINUTES);
+                Assert.assertEquals(buildEntry("offload-message"+i), 
m.getData());
             }
         }
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to