sijie closed pull request #1973: Offload threshold policy for namespaces
URL: https://github.com/apache/incubator-pulsar/pull/1973
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index bba3dc7ef2..b8a4c53a38 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1570,5 +1570,42 @@ protected void internalSetCompactionThreshold(long newThreshold) {
         }
     }
 
+    protected long internalGetOffloadThreshold() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        return getNamespacePolicies(namespaceName).offload_threshold;
+    }
+
+    protected void internalSetOffloadThreshold(long newThreshold) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, 
Policies.class);
+            policies.offload_threshold = newThreshold;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), 
nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, 
namespaceName.toString()));
+            log.info("[{}] Successfully updated offloadThreshold 
configuration: namespace={}, value={}",
+                     clientAppId(), namespaceName, 
policies.compaction_threshold);
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update offloadThreshold configuration for 
namespace {}: does not exist",
+                     clientAppId(), namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update offloadThreshold configuration for 
namespace {}: concurrent modification",
+                     clientAppId(), namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent 
modification");
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update offloadThreshold configuration 
for namespace {}",
+                      clientAppId(), namespaceName, e);
+            throw new RestException(e);
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(NamespacesBase.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
index f8edcf126c..bbc4438626 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java
@@ -746,5 +746,36 @@ public void setCompactionThreshold(@PathParam("property") String property,
         internalSetCompactionThreshold(newThreshold);
     }
 
+    @GET
+    @Path("/{property}/{cluster}/{namespace}/offloadThreshold")
+    @ApiOperation(value = "Maximum number of bytes stored on the pulsar 
cluster for a topic,"
+                          + " before the broker will start offloading to 
longterm storage",
+                  notes = "A negative value disables automatic offloading")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist") })
+    public long getOffloadThreshold(@PathParam("property") String property,
+                                    @PathParam("cluster") String cluster,
+                                    @PathParam("namespace") String namespace) {
+        validateNamespaceName(property, cluster, namespace);
+        return internalGetOffloadThreshold();
+    }
+
+    @PUT
+    @Path("/{property}/{cluster}/{namespace}/offloadThreshold")
+    @ApiOperation(value = "Set maximum number of bytes stored on the pulsar 
cluster for a topic,"
+                          + " before the broker will start offloading to 
longterm storage",
+                  notes = "A negative value disables automatic offloading")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent 
modification"),
+                            @ApiResponse(code = 412, message = 
"offloadThreshold value is not valid") })
+    public void setOffloadThreshold(@PathParam("property") String property,
+                                    @PathParam("cluster") String cluster,
+                                    @PathParam("namespace") String namespace,
+                                    long newThreshold) {
+        validateNamespaceName(property, cluster, namespace);
+        internalSetOffloadThreshold(newThreshold);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(Namespaces.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index bd467ed764..0fcd4aa6f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -686,5 +686,34 @@ public void setCompactionThreshold(@PathParam("property") String property,
         internalSetCompactionThreshold(newThreshold);
     }
 
+    @GET
+    @Path("/{property}/{namespace}/offloadThreshold")
+    @ApiOperation(value = "Maximum number of bytes stored on the pulsar 
cluster for a topic,"
+                          + " before the broker will start offloading to 
longterm storage",
+                  notes = "A negative value disables automatic offloading")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist") })
+    public long getOffloadThreshold(@PathParam("property") String property,
+                                       @PathParam("namespace") String 
namespace) {
+        validateNamespaceName(property, namespace);
+        return internalGetOffloadThreshold();
+    }
+
+    @PUT
+    @Path("/{property}/{namespace}/offloadThreshold")
+    @ApiOperation(value = "Set maximum number of bytes stored on the pulsar 
cluster for a topic,"
+                          + " before the broker will start offloading to 
longterm storage",
+                  notes = "A negative value disables automatic offloading")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent 
modification"),
+                            @ApiResponse(code = 412, message = 
"offloadThreshold value is not valid") })
+    public void setOffloadThreshold(@PathParam("property") String property,
+                                    @PathParam("namespace") String namespace,
+                                    long newThreshold) {
+        validateNamespaceName(property, namespace);
+        internalSetOffloadThreshold(newThreshold);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(Namespaces.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 63d74dd736..5bdf8129a3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -746,6 +746,8 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
             
managedLedgerConfig.setOffloadLedgerDeletionLag(serviceConfig.getManagedLedgerOffloadDeletionLagMs(),
                                                             
TimeUnit.MILLISECONDS);
 
+            policies.ifPresent(p -> 
managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(p.offload_threshold));
+
             future.complete(managedLedgerConfig);
         }, (exception) -> future.completeExceptionally(exception)));
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4768fa8967..7db3e71181 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1405,6 +1405,7 @@ public PersistentTopicInternalStats getInternalStats() {
             info.ledgerId = li.getLedgerId();
             info.entries = li.getEntries();
             info.size = li.getSize();
+            info.offloaded = li.hasOffloadContext() && 
li.getOffloadContext().getComplete();
             stats.ledgers.add(info);
         });
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 03cb52d030..5ee2f2227e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -1164,4 +1164,52 @@ void clearNamespaceBundleBacklogForSubscription(String namespace, String bundle,
      */
     void setCompactionThreshold(String namespace, long compactionThreshold) 
throws PulsarAdminException;
 
+    /**
+     * Get the offloadThreshold for a namespace. The maximum number of bytes 
stored on the pulsar cluster for topics
+     * in the namespace before data starts being offloaded to longterm storage.
+     *
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>10000000</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    long getOffloadThreshold(String namespace) throws PulsarAdminException;
+
+    /**
+     * Set the offloadThreshold for a namespace. The maximum number of bytes 
stored on the pulsar cluster for topics
+     * in the namespace before data starts being offloaded to longterm storage.
+     *
+     * Negative values disabled automatic offloading. Setting a threshold of 0 
will offload data as soon as possible.
+     * <p>
+     * Request example:
+     *
+     * <pre>
+     * <code>10000000</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param offloadThreshold
+     *            maximum number of bytes stored before offloading is triggered
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setOffloadThreshold(String namespace, long compactionThreshold) 
throws PulsarAdminException;
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 6c79a06a0c..4ed0b8acf9 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -679,6 +679,28 @@ public void setCompactionThreshold(String namespace, long compactionThreshold) t
         }
     }
 
+    @Override
+    public long getOffloadThreshold(String namespace) throws 
PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "offloadThreshold");
+            return request(path).get(Long.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setOffloadThreshold(String namespace, long offloadThreshold) 
throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "offloadThreshold");
+            request(path).put(Entity.entity(offloadThreshold, 
MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2Namespaces : 
adminNamespaces;
         WebTarget namespacePath = base.path(namespace.toString());
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 1d881ab359..0eaf81b184 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -796,6 +796,38 @@ void run() throws PulsarAdminException {
         }
     }
 
+    @Parameters(commandDescription = "Get offloadThreshold for a namespace")
+    private class GetOffloadThreshold extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            print(admin.namespaces().getOffloadThreshold(namespace));
+        }
+    }
+
+    @Parameters(commandDescription = "Set offloadThreshold for a namespace")
+    private class SetOffloadThreshold extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--size", "-s" },
+                   description = "Maximum number of bytes stored in the pulsar 
cluster for a topic before data will"
+                                 + " start being automatically offloaded to 
longterm storage (eg: 10M, 16G, 3T, 100)."
+                                 + " Negative values disable automatic 
offload."
+                                 + " 0 triggers offloading as soon as 
possible.",
+                   required = true)
+        private String threshold = "-1";
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setOffloadThreshold(namespace, 
validateSizeString(threshold));
+        }
+    }
+
     public CmdNamespaces(PulsarAdmin admin) {
         super("namespaces", admin);
         jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -857,5 +889,9 @@ public CmdNamespaces(PulsarAdmin admin) {
 
         jcommander.addCommand("get-compaction-threshold", new 
GetCompactionThreshold());
         jcommander.addCommand("set-compaction-threshold", new 
SetCompactionThreshold());
+
+        jcommander.addCommand("get-offload-threshold", new 
GetOffloadThreshold());
+        jcommander.addCommand("set-offload-threshold", new 
SetOffloadThreshold());
+
     }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
index 26545157bb..c0416f5322 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/PersistentTopicInternalStats.java
@@ -45,6 +45,7 @@
         public long ledgerId;
         public long entries;
         public long size;
+        public boolean offloaded;
     }
 
     public static class CursorStats {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index a33119e6b0..35a752b78f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -58,6 +58,7 @@
     public int max_consumers_per_subscription = 0;
 
     public long compaction_threshold = 0;
+    public long offload_threshold = -1;
 
     @Override
     public boolean equals(Object obj) {
@@ -78,7 +79,8 @@ public boolean equals(Object obj) {
                     && max_producers_per_topic == other.max_producers_per_topic
                     && max_consumers_per_topic == other.max_consumers_per_topic
                     && max_consumers_per_subscription == 
other.max_consumers_per_subscription
-                    && compaction_threshold == other.compaction_threshold;
+                    && compaction_threshold == other.compaction_threshold
+                    && offload_threshold == other.offload_threshold;
         }
 
         return false;
@@ -109,6 +111,7 @@ public String toString() {
                 .add("max_producers_per_topic", max_producers_per_topic)
                 .add("max_consumers_per_topic", max_consumers_per_topic)
                 .add("max_consumers_per_subscription", max_consumers_per_topic)
-                .add("compaction_threshold", compaction_threshold).toString();
+                .add("compaction_threshold", compaction_threshold)
+                .add("offload_threshold", offload_threshold).toString();
     }
 }
diff --git a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
index b993aa5500..42489767ea 100644
--- a/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
+++ b/tests/integration/s3-offload/src/test/java/org/apache/pulsar/tests/integration/TestS3Offload.java
@@ -21,17 +21,11 @@
 import com.github.dockerjava.api.DockerClient;
 import com.google.common.collect.ImmutableMap;
 
-import java.net.URL;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 
-import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
-import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
-import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
-
-import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -91,7 +85,6 @@ public void configureAndStartBrokers() throws Exception {
     public void teardownBrokers() throws Exception {
         PulsarClusterUtils.stopAllProxies(docker, CLUSTER_NAME);
         Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker, 
CLUSTER_NAME));
-
     }
 
     private static byte[] buildEntry(String pattern) {
@@ -106,30 +99,30 @@ public void teardownBrokers() throws Exception {
 
     @Test
     public void testPublishOffloadAndConsumeViaCLI() throws Exception {
+        final String TENANT = "s3-offload-test-cli";
+        final String NAMESPACE = "s3-offload-test-cli/ns1";
+        final String TOPIC = "persistent://s3-offload-test-cli/ns1/topic1";
+
         PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
                 "/pulsar/bin/pulsar-admin", "tenants",
                 "create", "--allowed-clusters", CLUSTER_NAME,
-                "--admin-roles", "offload-admin", "s3-offload-test");
+                "--admin-roles", "offload-admin", TENANT);
         PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
                 "/pulsar/bin/pulsar-admin", "namespaces",
-                "create", "--clusters", CLUSTER_NAME, "s3-offload-test/ns1");
+                "create", "--clusters", CLUSTER_NAME, NAMESPACE);
 
-        String broker = PulsarClusterUtils.brokerSet(docker, 
CLUSTER_NAME).stream().findAny().get();
-        String brokerIp = DockerUtils.getContainerIP(docker, broker);
-        String proxyIp  = PulsarClusterUtils.proxySet(docker, CLUSTER_NAME)
+        String broker = PulsarClusterUtils.brokerSet(docker, 
CLUSTER_NAME).stream().findFirst().get();
+        String proxyIp = PulsarClusterUtils.proxySet(docker, CLUSTER_NAME)
             .stream().map((c) -> DockerUtils.getContainerIP(docker, 
c)).findFirst().get();
         String serviceUrl = "pulsar://" + proxyIp + ":6650";
-        String adminUrl = "http://" + brokerIp + ":8080";
-        String topic = "persistent://s3-offload-test/ns1/topic1";
-
-        ClientConfiguration bkConf = new ClientConfiguration();
-        bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker, 
CLUSTER_NAME));
+        String adminUrl = "http://" + proxyIp + ":8080";
 
         long firstLedger = -1;
         try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Producer producer = client.newProducer().topic(topic)
-                .blockIfQueueFull(true).enableBatching(false).create()) {
-            client.subscribe(topic, "my-sub").close();
+            Producer producer = client.newProducer().topic(TOPIC)
+                .blockIfQueueFull(true).enableBatching(false).create();
+            PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+            
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe().close();
 
             // write enough to topic to make it roll
             int i = 0;
@@ -139,40 +132,35 @@ public void testPublishOffloadAndConsumeViaCLI() throws 
Exception {
             MessageId latestMessage = 
producer.send(buildEntry("offload-message"+i));
 
             // read managed ledger info, check ledgers exist
-            ManagedLedgerFactory mlf = new ManagedLedgerFactoryImpl(bkConf);
-            ManagedLedgerInfo info = 
mlf.getManagedLedgerInfo("s3-offload-test/ns1/persistent/topic1");
-            Assert.assertEquals(info.ledgers.size(), 2);
-
-            firstLedger = info.ledgers.get(0).ledgerId;
+            firstLedger = 
admin.topics().getInternalStats(TOPIC).ledgers.get(0).ledgerId;
 
             // first offload with a high threshold, nothing should offload
             String output = DockerUtils.runCommand(docker, broker,
                     "/pulsar/bin/pulsar-admin", "topics",
-                    "offload", "--size-threshold", "100G",
-                    topic);
+                    "offload", "--size-threshold", "100G", TOPIC);
             Assert.assertTrue(output.contains("Nothing to offload"));
 
             output = DockerUtils.runCommand(docker, broker,
-                    "/pulsar/bin/pulsar-admin", "topics", "offload-status", 
topic);
+                    "/pulsar/bin/pulsar-admin", "topics", "offload-status", 
TOPIC);
             Assert.assertTrue(output.contains("Offload has not been run"));
 
             // offload with a low threshold
             output = DockerUtils.runCommand(docker, broker,
                     "/pulsar/bin/pulsar-admin", "topics",
-                    "offload", "--size-threshold", "1M",
-                    topic);
+                    "offload", "--size-threshold", "1M", TOPIC);
             Assert.assertTrue(output.contains("Offload triggered"));
 
             output = DockerUtils.runCommand(docker, broker,
-                    "/pulsar/bin/pulsar-admin", "topics", "offload-status", 
"-w", topic);
+                    "/pulsar/bin/pulsar-admin", "topics", "offload-status", 
"-w", TOPIC);
             Assert.assertTrue(output.contains("Offload was a success"));
         }
 
-        log.info("Kill ledger");
         // stop brokers to clear all caches, open handles, etc
         Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker, 
CLUSTER_NAME));
 
         // delete the first ledger, so that we cannot possibly read from it
+        ClientConfiguration bkConf = new ClientConfiguration();
+        bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker, 
CLUSTER_NAME));
         try (BookKeeper bk = new BookKeeper(bkConf)) {
             bk.deleteLedger(firstLedger);
         }
@@ -182,7 +170,7 @@ public void testPublishOffloadAndConsumeViaCLI() throws 
Exception {
 
         log.info("Read back the data (which would be in that first ledger)");
         try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
-            Consumer consumer = 
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
+            Consumer consumer = 
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe()) {
             // read back from topic
             for (int i = 0; i < ENTRIES_PER_LEDGER*1.5; i++) {
                 Message m = consumer.receive(1, TimeUnit.MINUTES);
@@ -190,4 +178,84 @@ public void testPublishOffloadAndConsumeViaCLI() throws 
Exception {
             }
         }
     }
+
+    @Test
+    public void testPublishOffloadAndConsumeViaThreshold() throws Exception {
+        final String TENANT = "s3-offload-test-threshold";
+        final String NAMESPACE = "s3-offload-test-threshold/ns1";
+        final String TOPIC = 
"persistent://s3-offload-test-threshold/ns1/topic1";
+
+        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
+                "/pulsar/bin/pulsar-admin", "tenants",
+                "create", "--allowed-clusters", CLUSTER_NAME,
+                "--admin-roles", "offload-admin", TENANT);
+        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
+                "/pulsar/bin/pulsar-admin", "namespaces",
+                "create", "--clusters", CLUSTER_NAME, NAMESPACE);
+        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
+                "/pulsar/bin/pulsar-admin", "namespaces",
+                "set-offload-threshold", "--size", "1M", NAMESPACE);
+
+        String proxyIp  = PulsarClusterUtils.proxySet(docker, CLUSTER_NAME)
+            .stream().map((c) -> DockerUtils.getContainerIP(docker, 
c)).findFirst().get();
+        String serviceUrl = "pulsar://" + proxyIp + ":6650";
+        String adminUrl = "http://" + proxyIp + ":8080";
+
+        long firstLedger = 0;
+        try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
+            Producer producer = client.newProducer().topic(TOPIC)
+                .blockIfQueueFull(true).enableBatching(false).create();
+            PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+
+            
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe().close();
+
+            // write enough to topic to make it roll twice
+            for (int i = 0; i < ENTRIES_PER_LEDGER*2.5; i++) {
+                producer.sendAsync(buildEntry("offload-message"+i));
+            }
+            producer.send(buildEntry("final-offload-message"));
+
+            firstLedger = 
admin.topics().getInternalStats(TOPIC).ledgers.get(0).ledgerId;
+
+            // wait up to 30 seconds for offload to occur
+            for (int i = 0; i < 300 && 
!admin.topics().getInternalStats(TOPIC).ledgers.get(0).offloaded; i++) {
+                Thread.sleep(100);
+            }
+            
Assert.assertTrue(admin.topics().getInternalStats(TOPIC).ledgers.get(0).offloaded);
+        }
+
+        // stop brokers to clear all caches, open handles, etc
+        Assert.assertTrue(PulsarClusterUtils.stopAllBrokers(docker, 
CLUSTER_NAME));
+
+        // delete the first ledger, so that we cannot possibly read from it
+        ClientConfiguration bkConf = new ClientConfiguration();
+        bkConf.setZkServers(PulsarClusterUtils.zookeeperConnectString(docker, 
CLUSTER_NAME));
+        try (BookKeeper bk = new BookKeeper(bkConf)) {
+            bk.deleteLedger(firstLedger);
+        }
+
+        // start all brokers again
+        Assert.assertTrue(PulsarClusterUtils.startAllBrokers(docker, 
CLUSTER_NAME));
+
+        log.info("Read back the data (which would be in that first ledger)");
+        try (PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
+             Consumer consumer = 
client.newConsumer().topic(TOPIC).subscriptionName("my-sub").subscribe()) {
+            // read back from topic
+            for (int i = 0; i < ENTRIES_PER_LEDGER*2.5; i++) {
+                Message m = consumer.receive(1, TimeUnit.MINUTES);
+                Assert.assertEquals(buildEntry("offload-message"+i), 
m.getData());
+            }
+        }
+
+        // try disabling
+        PulsarClusterUtils.runOnAnyBroker(docker, CLUSTER_NAME,
+                "/pulsar/bin/pulsar-admin", "namespaces",
+                "set-offload-threshold", "--size", "-1", NAMESPACE);
+
+        // hard to validate that it has been disabled as we'd be waiting for
+        // something _not_ to happen (i.e. waiting for ages), so just check
+        try (PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+            
Assert.assertEquals(admin.namespaces().getOffloadThreshold(NAMESPACE), -1L);
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to