This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e104085  Human readable sizes when triggering offload (#1962)
e104085 is described below

commit e10408504d8c7c77e3c20849e90416d0b14fb80a
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Jun 14 18:21:05 2018 +0200

    Human readable sizes when triggering offload (#1962)
    
    When triggering offload from the CLI, allow the user to specify
    human-readable sizes such as 10M or 100G.
    
    This patch also fixes a bug where the size threshold wasn't being used
    correctly to find the message ID at which to offload.
    
    Master Issue: #1511
---
 .../org/apache/pulsar/admin/cli/CmdTopics.java     | 11 ++++---
 .../pulsar/tests/integration/TestS3Offload.java    | 36 +++++++++++++++-------
 2 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index eb90342..a85df44 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -622,11 +622,13 @@ public class CmdTopics extends CmdBase {
         long suffixSize = 0L;
 
         ledgers = Lists.reverse(ledgers);
+        long previousLedger = ledgers.get(0).ledgerId;
         for (PersistentTopicInternalStats.LedgerInfo l : ledgers) {
             suffixSize += l.size;
-            if (suffixSize >= sizeThreshold) {
-                return new MessageIdImpl(l.ledgerId, 0L, -1);
+            if (suffixSize > sizeThreshold) {
+                return new MessageIdImpl(previousLedger, 0L, -1);
             }
+            previousLedger = l.ledgerId;
         }
         return null;
     }
@@ -634,15 +636,16 @@ public class CmdTopics extends CmdBase {
     @Parameters(commandDescription = "Trigger offload of data from a topic to 
long-term storage (e.g. Amazon S3)")
     private class Offload extends CliCommand {
         @Parameter(names = { "-s", "--size-threshold" },
-                   description = "Maximum amount of data to keep in BookKeeper 
for the specified topic",
+                   description = "Maximum amount of data to keep in BookKeeper 
for the specified topic (e.g. 10M, 5G).",
                    required = true)
-        private Long sizeThreshold;
+        private String sizeThresholdStr;
 
         @Parameter(description = "persistent://tenant/namespace/topic", 
required = true)
         private java.util.List<String> params;
 
         @Override
         void run() throws PulsarAdminException {
+            long sizeThreshold = validateSizeString(sizeThresholdStr);
             String persistentTopic = validatePersistentTopic(params);
 
             PersistentTopicInternalStats stats = 
topics.getInternalStats(persistentTopic);
diff --git a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
index e3ee495..b993aa5 100644
--- a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
+++ b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
@@ -22,6 +22,7 @@ import com.github.dockerjava.api.DockerClient;
 import com.google.common.collect.ImmutableMap;
 
 import java.net.URL;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -55,7 +56,8 @@ public class TestS3Offload extends Arquillian {
     private static final Logger log = 
LoggerFactory.getLogger(TestS3Offload.class);
 
     private static final String CLUSTER_NAME = "test";
-    private static final int ENTRIES_PER_LEDGER = 10;
+    private static final int ENTRY_SIZE = 1024;
+    private static final int ENTRIES_PER_LEDGER = 1024;
 
     @ArquillianResource
     DockerClient docker;
@@ -92,6 +94,16 @@ public class TestS3Offload extends Arquillian {
 
     }
 
+    private static byte[] buildEntry(String pattern) {
+        byte[] entry = new byte[ENTRY_SIZE];
+        byte[] patternBytes = pattern.getBytes();
+
+        for (int i = 0; i < entry.length; i++) {
+            entry[i] = patternBytes[i % patternBytes.length];
+        }
+        return entry;
+    }
+
     @Test
     public void testPublishOffloadAndConsumeViaCLI() throws Exception {
         PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
@@ -114,15 +126,17 @@ public class TestS3Offload extends Arquillian {
         bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker, 
CLUSTER_NAME));
 
         long firstLedger = -1;
-        try(PulsarClient client = PulsarClient.create(serviceUrl);
-            Producer producer = client.createProducer(topic)) {
+        try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
+            Producer producer = client.newProducer().topic(topic)
+                .blockIfQueueFull(true).enableBatching(false).create()) {
             client.subscribe(topic, "my-sub").close();
+
             // write enough to topic to make it roll
             int i = 0;
             for (; i < ENTRIES_PER_LEDGER*1.5; i++) {
-                producer.send(("offload-message"+i).getBytes());
+                producer.sendAsync(buildEntry("offload-message"+i));
             }
-            MessageId latestMessage = 
producer.send(("offload-message"+i).getBytes());
+            MessageId latestMessage = 
producer.send(buildEntry("offload-message"+i));
 
             // read managed ledger info, check ledgers exist
             ManagedLedgerFactory mlf = new ManagedLedgerFactoryImpl(bkConf);
@@ -134,7 +148,7 @@ public class TestS3Offload extends Arquillian {
             // first offload with a high threshold, nothing should offload
             String output = DockerUtils.runCommand(docker, broker,
                     "/pulsar/bin/pulsar-admin", "topics",
-                    "offload", "--size-threshold", 
String.valueOf(Long.MAX_VALUE),
+                    "offload", "--size-threshold", "100G",
                     topic);
             Assert.assertTrue(output.contains("Nothing to offload"));
 
@@ -145,7 +159,7 @@ public class TestS3Offload extends Arquillian {
             // offload with a low threshold
             output = DockerUtils.runCommand(docker, broker,
                     "/pulsar/bin/pulsar-admin", "topics",
-                    "offload", "--size-threshold", "0",
+                    "offload", "--size-threshold", "1M",
                     topic);
             Assert.assertTrue(output.contains("Offload triggered"));
 
@@ -167,12 +181,12 @@ public class TestS3Offload extends Arquillian {
         Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker, 
CLUSTER_NAME));
 
         log.info("Read back the data (which would be in that first ledger)");
-        try(PulsarClient client = PulsarClient.create(serviceUrl);
-            Consumer consumer = client.subscribe(topic, "my-sub")) {
+        try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
+            Consumer consumer = 
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
             // read back from topic
             for (int i = 0; i < ENTRIES_PER_LEDGER*1.5; i++) {
-                Message m = consumer.receive();
-                Assert.assertEquals("offload-message"+i, new 
String(m.getData()));
+                Message m = consumer.receive(1, TimeUnit.MINUTES);
+                Assert.assertEquals(buildEntry("offload-message"+i), 
m.getData());
             }
         }
     }

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to