sijie closed pull request #1962: Human readable sizes when triggering offload
URL: https://github.com/apache/incubator-pulsar/pull/1962
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index eb90342b40..a85df44b30 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -622,11 +622,13 @@ static MessageId
findFirstLedgerWithinThreshold(List<PersistentTopicInternalStat
long suffixSize = 0L;
ledgers = Lists.reverse(ledgers);
+ long previousLedger = ledgers.get(0).ledgerId;
for (PersistentTopicInternalStats.LedgerInfo l : ledgers) {
suffixSize += l.size;
- if (suffixSize >= sizeThreshold) {
- return new MessageIdImpl(l.ledgerId, 0L, -1);
+ if (suffixSize > sizeThreshold) {
+ return new MessageIdImpl(previousLedger, 0L, -1);
}
+ previousLedger = l.ledgerId;
}
return null;
}
@@ -634,15 +636,16 @@ static MessageId
findFirstLedgerWithinThreshold(List<PersistentTopicInternalStat
@Parameters(commandDescription = "Trigger offload of data from a topic to
long-term storage (e.g. Amazon S3)")
private class Offload extends CliCommand {
@Parameter(names = { "-s", "--size-threshold" },
- description = "Maximum amount of data to keep in BookKeeper
for the specified topic",
+ description = "Maximum amount of data to keep in BookKeeper
for the specified topic (e.g. 10M, 5G).",
required = true)
- private Long sizeThreshold;
+ private String sizeThresholdStr;
@Parameter(description = "persistent://tenant/namespace/topic",
required = true)
private java.util.List<String> params;
@Override
void run() throws PulsarAdminException {
+ long sizeThreshold = validateSizeString(sizeThresholdStr);
String persistentTopic = validatePersistentTopic(params);
PersistentTopicInternalStats stats =
topics.getInternalStats(persistentTopic);
diff --git
a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
index e3ee495d69..b993aa5500 100644
---
a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
+++
b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
@@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import java.net.URL;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -55,7 +56,8 @@
private static final Logger log =
LoggerFactory.getLogger(TestS3Offload.class);
private static final String CLUSTER_NAME = "test";
- private static final int ENTRIES_PER_LEDGER = 10;
+ private static final int ENTRY_SIZE = 1024;
+ private static final int ENTRIES_PER_LEDGER = 1024;
@ArquillianResource
DockerClient docker;
@@ -92,6 +94,16 @@ public void teardownBrokers() throws Exception {
}
+ private static byte[] buildEntry(String pattern) {
+ byte[] entry = new byte[ENTRY_SIZE];
+ byte[] patternBytes = pattern.getBytes();
+
+ for (int i = 0; i < entry.length; i++) {
+ entry[i] = patternBytes[i % patternBytes.length];
+ }
+ return entry;
+ }
+
@Test
public void testPublishOffloadAndConsumeViaCLI() throws Exception {
PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
@@ -114,15 +126,17 @@ public void testPublishOffloadAndConsumeViaCLI() throws
Exception {
bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker,
CLUSTER_NAME));
long firstLedger = -1;
- try(PulsarClient client = PulsarClient.create(serviceUrl);
- Producer producer = client.createProducer(topic)) {
+ try(PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
+ Producer producer = client.newProducer().topic(topic)
+ .blockIfQueueFull(true).enableBatching(false).create()) {
client.subscribe(topic, "my-sub").close();
+
// write enough to topic to make it roll
int i = 0;
for (; i < ENTRIES_PER_LEDGER*1.5; i++) {
- producer.send(("offload-message"+i).getBytes());
+ producer.sendAsync(buildEntry("offload-message"+i));
}
- MessageId latestMessage =
producer.send(("offload-message"+i).getBytes());
+ MessageId latestMessage =
producer.send(buildEntry("offload-message"+i));
// read managed ledger info, check ledgers exist
ManagedLedgerFactory mlf = new ManagedLedgerFactoryImpl(bkConf);
@@ -134,7 +148,7 @@ public void testPublishOffloadAndConsumeViaCLI() throws
Exception {
// first offload with a high threshold, nothing should offload
String output = DockerUtils.runCommand(docker, broker,
"/pulsar/bin/pulsar-admin", "topics",
- "offload", "--size-threshold",
String.valueOf(Long.MAX_VALUE),
+ "offload", "--size-threshold", "100G",
topic);
Assert.assertTrue(output.contains("Nothing to offload"));
@@ -145,7 +159,7 @@ public void testPublishOffloadAndConsumeViaCLI() throws
Exception {
// offload with a low threshold
output = DockerUtils.runCommand(docker, broker,
"/pulsar/bin/pulsar-admin", "topics",
- "offload", "--size-threshold", "0",
+ "offload", "--size-threshold", "1M",
topic);
Assert.assertTrue(output.contains("Offload triggered"));
@@ -167,12 +181,12 @@ public void testPublishOffloadAndConsumeViaCLI() throws
Exception {
Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker,
CLUSTER_NAME));
log.info("Read back the data (which would be in that first ledger)");
- try(PulsarClient client = PulsarClient.create(serviceUrl);
- Consumer consumer = client.subscribe(topic, "my-sub")) {
+ try(PulsarClient client =
PulsarClient.builder().serviceUrl(serviceUrl).build();
+ Consumer consumer =
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
// read back from topic
for (int i = 0; i < ENTRIES_PER_LEDGER*1.5; i++) {
- Message m = consumer.receive();
- Assert.assertEquals("offload-message"+i, new
String(m.getData()));
+ Message m = consumer.receive(1, TimeUnit.MINUTES);
+ Assert.assertEquals(buildEntry("offload-message"+i),
m.getData());
}
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services