This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e104085 Human readable sizes when triggering offload (#1962)
e104085 is described below
commit e10408504d8c7c77e3c20849e90416d0b14fb80a
Author: Ivan Kelly <[email protected]>
AuthorDate: Thu Jun 14 18:21:05 2018 +0200
Human readable sizes when triggering offload (#1962)
When triggering offload from the CLI, allow the user to specify
human-readable sizes, such as 10M or 100G.
This patch also fixes a bug where the size threshold wasn't being used
correctly to find the message ID at which to offload.
Master Issue: #1511
---
.../org/apache/pulsar/admin/cli/CmdTopics.java | 11 ++++---
.../pulsar/tests/integration/TestS3Offload.java | 36 +++++++++++++++-------
2 files changed, 32 insertions(+), 15 deletions(-)
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index eb90342..a85df44 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -622,11 +622,13 @@ public class CmdTopics extends CmdBase {
long suffixSize = 0L;
ledgers = Lists.reverse(ledgers);
+ long previousLedger = ledgers.get(0).ledgerId;
for (PersistentTopicInternalStats.LedgerInfo l : ledgers) {
suffixSize += l.size;
- if (suffixSize >= sizeThreshold) {
- return new MessageIdImpl(l.ledgerId, 0L, -1);
+ if (suffixSize > sizeThreshold) {
+ return new MessageIdImpl(previousLedger, 0L, -1);
}
+ previousLedger = l.ledgerId;
}
return null;
}
@@ -634,15 +636,16 @@ public class CmdTopics extends CmdBase {
@Parameters(commandDescription = "Trigger offload of data from a topic to
long-term storage (e.g. Amazon S3)")
private class Offload extends CliCommand {
@Parameter(names = { "-s", "--size-threshold" },
- description = "Maximum amount of data to keep in BookKeeper
for the specified topic",
+ description = "Maximum amount of data to keep in BookKeeper
for the specified topic (e.g. 10M, 5G).",
required = true)
- private Long sizeThreshold;
+ private String sizeThresholdStr;
@Parameter(description = "persistent://tenant/namespace/topic",
required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
+ long sizeThreshold = validateSizeString(sizeThresholdStr);
String persistentTopic = validatePersistentTopic(params);
PersistentTopicInternalStats stats =
topics.getInternalStats(persistentTopic);
diff --git
a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
index e3ee495..b993aa5 100644
---
a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
+++
b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
@@ -22,6 +22,7 @@ import com.github.dockerjava.api.DockerClient;
import com.google.common.collect.ImmutableMap;
import java.net.URL;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -55,7 +56,8 @@ public class TestS3Offload extends Arquillian {
private static final Logger log =
LoggerFactory.getLogger(TestS3Offload.class);
private static final String CLUSTER_NAME = "test";
- private static final int ENTRIES_PER_LEDGER = 10;
+ private static final int ENTRY_SIZE = 1024;
+ private static final int ENTRIES_PER_LEDGER = 1024;
@ArquillianResource
DockerClient docker;
@@ -92,6 +94,16 @@ public class TestS3Offload extends Arquillian {
}
+ private static byte[] buildEntry(String pattern) {
+ byte[] entry = new byte[ENTRY_SIZE];
+ byte[] patternBytes = pattern.getBytes();
+
+ for (int i = 0; i < entry.length; i++) {
+ entry[i] = patternBytes[i % patternBytes.length];
+ }
+ return entry;
+ }
+
@Test
public void testPublishOffloadAndConsumeViaCLI() throws Exception {
PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
@@ -114,15 +126,17 @@ public class TestS3Offload extends Arquillian {
bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker,
CLUSTER_NAME));
long firstLedger = -1;
- try(PulsarClient client = PulsarClient.create(serviceUrl);
- Producer producer = client.createProducer(topic)) {
+ try(PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
+ Producer producer = client.newProducer().topic(topic)
+ .blockIfQueueFull(true).enableBatching(false).create()) {
client.subscribe(topic, "my-sub").close();
+
// write enough to topic to make it roll
int i = 0;
for (; i < ENTRIES_PER_LEDGER*1.5; i++) {
- producer.send(("offload-message"+i).getBytes());
+ producer.sendAsync(buildEntry("offload-message"+i));
}
- MessageId latestMessage =
producer.send(("offload-message"+i).getBytes());
+ MessageId latestMessage =
producer.send(buildEntry("offload-message"+i));
// read managed ledger info, check ledgers exist
ManagedLedgerFactory mlf = new ManagedLedgerFactoryImpl(bkConf);
@@ -134,7 +148,7 @@ public class TestS3Offload extends Arquillian {
// first offload with a high threshold, nothing should offload
String output = DockerUtils.runCommand(docker, broker,
"/pulsar/bin/pulsar-admin", "topics",
- "offload", "--size-threshold",
String.valueOf(Long.MAX_VALUE),
+ "offload", "--size-threshold", "100G",
topic);
Assert.assertTrue(output.contains("Nothing to offload"));
@@ -145,7 +159,7 @@ public class TestS3Offload extends Arquillian {
// offload with a low threshold
output = DockerUtils.runCommand(docker, broker,
"/pulsar/bin/pulsar-admin", "topics",
- "offload", "--size-threshold", "0",
+ "offload", "--size-threshold", "1M",
topic);
Assert.assertTrue(output.contains("Offload triggered"));
@@ -167,12 +181,12 @@ public class TestS3Offload extends Arquillian {
Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker,
CLUSTER_NAME));
log.info("Read back the data (which would be in that first ledger)");
- try(PulsarClient client = PulsarClient.create(serviceUrl);
- Consumer consumer = client.subscribe(topic, "my-sub")) {
+ try(PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
+ Consumer consumer =
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
// read back from topic
for (int i = 0; i < ENTRIES_PER_LEDGER*1.5; i++) {
- Message m = consumer.receive();
- Assert.assertEquals("offload-message"+i, new
String(m.getData()));
+ Message m = consumer.receive(1, TimeUnit.MINUTES);
+ Assert.assertEquals(buildEntry("offload-message"+i),
m.getData());
}
}
}
--
To stop receiving notification emails like this one, please contact
[email protected].