sijie closed pull request #2256: Namespace level policy for offload deletion lag
URL: https://github.com/apache/incubator-pulsar/pull/2256
 
 
   

This is a pull request merged from a forked repository. Because GitHub
hides the original diff of a foreign (fork-based) pull request once it
is merged, the diff is reproduced below for the sake of provenance:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 553f53ab90..c4202c68db 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1590,7 +1590,7 @@ protected void internalSetOffloadThreshold(long 
newThreshold) {
             globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), 
nodeStat.getVersion());
             policiesCache().invalidate(path(POLICIES, 
namespaceName.toString()));
             log.info("[{}] Successfully updated offloadThreshold 
configuration: namespace={}, value={}",
-                     clientAppId(), namespaceName, 
policies.compaction_threshold);
+                     clientAppId(), namespaceName, policies.offload_threshold);
 
         } catch (KeeperException.NoNodeException e) {
             log.warn("[{}] Failed to update offloadThreshold configuration for 
namespace {}: does not exist",
@@ -1609,5 +1609,42 @@ protected void internalSetOffloadThreshold(long 
newThreshold) {
         }
     }
 
+    protected Long internalGetOffloadDeletionLag() {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        return getNamespacePolicies(namespaceName).offload_deletion_lag_ms;
+    }
+
+    protected void internalSetOffloadDeletionLag(Long newDeletionLagMs) {
+        validateSuperUserAccess();
+        validatePoliciesReadOnlyAccess();
+
+        try {
+            Stat nodeStat = new Stat();
+            final String path = path(POLICIES, namespaceName.toString());
+            byte[] content = globalZk().getData(path, null, nodeStat);
+            Policies policies = jsonMapper().readValue(content, 
Policies.class);
+            policies.offload_deletion_lag_ms = newDeletionLagMs;
+            globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), 
nodeStat.getVersion());
+            policiesCache().invalidate(path(POLICIES, 
namespaceName.toString()));
+            log.info("[{}] Successfully updated offloadDeletionLagMs 
configuration: namespace={}, value={}",
+                     clientAppId(), namespaceName, 
policies.offload_deletion_lag_ms);
+
+        } catch (KeeperException.NoNodeException e) {
+            log.warn("[{}] Failed to update offloadDeletionLagMs configuration 
for namespace {}: does not exist",
+                     clientAppId(), namespaceName);
+            throw new RestException(Status.NOT_FOUND, "Namespace does not 
exist");
+        } catch (KeeperException.BadVersionException e) {
+            log.warn("[{}] Failed to update offloadDeletionLagMs configuration 
for namespace {}: concurrent modification",
+                     clientAppId(), namespaceName);
+            throw new RestException(Status.CONFLICT, "Concurrent 
modification");
+        } catch (RestException pfe) {
+            throw pfe;
+        } catch (Exception e) {
+            log.error("[{}] Failed to update offloadDeletionLag configuration 
for namespace {}",
+                      clientAppId(), namespaceName, e);
+            throw new RestException(e);
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(NamespacesBase.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 0fcd4aa6f9..07e26ae0da 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -715,5 +715,49 @@ public void setOffloadThreshold(@PathParam("property") 
String property,
         internalSetOffloadThreshold(newThreshold);
     }
 
+    @GET
+    @Path("/{property}/{namespace}/offloadDeletionLagMs")
+    @ApiOperation(value = "Number of milliseconds to wait before deleting a 
ledger segment which has been offloaded"
+                          + " from the Pulsar cluster's local storage (i.e. 
BookKeeper)",
+                  notes = "A negative value denotes that deletion has been 
completely disabled."
+                          + " 'null' denotes that the topics in the namespace 
will fall back to the"
+                          + " broker default for deletion lag.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist") })
+    public Long getOffloadDeletionLag(@PathParam("property") String property,
+                                      @PathParam("namespace") String 
namespace) {
+        validateNamespaceName(property, namespace);
+        return internalGetOffloadDeletionLag();
+    }
+
+    @PUT
+    @Path("/{property}/{namespace}/offloadDeletionLagMs")
+    @ApiOperation(value = "Set number of milliseconds to wait before deleting 
a ledger segment which has been offloaded"
+                          + " from the Pulsar cluster's local storage (i.e. 
BookKeeper)",
+                  notes = "A negative value disables the deletion completely.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent 
modification"),
+                            @ApiResponse(code = 412, message = 
"offloadDeletionLagMs value is not valid") })
+    public void setOffloadDeletionLag(@PathParam("property") String property,
+                                      @PathParam("namespace") String namespace,
+                                      long newDeletionLagMs) {
+        validateNamespaceName(property, namespace);
+        internalSetOffloadDeletionLag(newDeletionLagMs);
+    }
+
+    @DELETE
+    @Path("/{property}/{namespace}/offloadDeletionLagMs")
+    @ApiOperation(value = "Clear the namespace configured offload deletion 
lag. The topics in the namespace"
+                          + " will fallback to using the default configured 
deletion lag for the broker")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 404, message = "Namespace 
doesn't exist"),
+                            @ApiResponse(code = 409, message = "Concurrent 
modification") })
+    public void clearOffloadDeletionLag(@PathParam("property") String property,
+                                        @PathParam("namespace") String 
namespace) {
+        validateNamespaceName(property, namespace);
+        internalSetOffloadDeletionLag(null);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(Namespaces.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 182f3cc152..42893a7676 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -766,10 +766,14 @@ public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
             
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
 
             
managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader());
-            
managedLedgerConfig.setOffloadLedgerDeletionLag(serviceConfig.getManagedLedgerOffloadDeletionLagMs(),
-                                                            
TimeUnit.MILLISECONDS);
-
-            policies.ifPresent(p -> 
managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(p.offload_threshold));
+            policies.ifPresent(p -> {
+                    long lag = 
serviceConfig.getManagedLedgerOffloadDeletionLagMs();
+                    if (p.offload_deletion_lag_ms != null) {
+                        lag = p.offload_deletion_lag_ms;
+                    }
+                    managedLedgerConfig.setOffloadLedgerDeletionLag(lag, 
TimeUnit.MILLISECONDS);
+                    
managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(p.offload_threshold);
+                });
 
             future.complete(managedLedgerConfig);
         }, (exception) -> future.completeExceptionally(exception)));
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 5ee2f2227e..8a319c6969 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
 import 
org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
@@ -1212,4 +1213,72 @@ void clearNamespaceBundleBacklogForSubscription(String 
namespace, String bundle,
      *             Unexpected error
      */
     void setOffloadThreshold(String namespace, long compactionThreshold) 
throws PulsarAdminException;
+
+    /**
+     * Get the offload deletion lag for a namespace, in milliseconds.
+     * The number of milliseconds to wait before deleting a ledger segment 
which has been offloaded from
+     * the Pulsar cluster's local storage (i.e. BookKeeper).
+     *
+     * If the offload deletion lag has not been set for the namespace, the 
method returns 'null'
+     * and the namespace will use the configured default of the pulsar broker.
+     *
+     * A negative value disables deletion of the local ledger completely, 
though it will still be deleted
+     * if it exceeds the topics retention policy, along with the offloaded 
copy.
+     *
+     * <p>
+     * Response example:
+     *
+     * <pre>
+     * <code>3600000</code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @return the offload deletion lag for the namespace in milliseconds, or 
null if not set
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    Long getOffloadDeleteLagMs(String namespace) throws PulsarAdminException;
+
+    /**
+     * Set the offload deletion lag for a namespace.
+     *
+     * The offload deletion lag is the amount of time to wait after offloading 
a ledger segment to long term storage,
+     * before deleting its copy stored on the Pulsar cluster's local storage 
(i.e. BookKeeper).
+     *
+     * A negative value disables deletion of the local ledger completely, 
though it will still be deleted
+     * if it exceeds the topics retention policy, along with the offloaded 
copy.
+     *
+     * @param namespace
+     *            Namespace name
+     * @param lag the duration to wait before deleting the local copy
+     * @param unit the timeunit of the duration
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setOffloadDeleteLag(String namespace, long lag, TimeUnit unit) throws 
PulsarAdminException;
+
+    /**
+     * Clear the offload deletion lag for a namespace.
+     *
+     * The namespace will fall back to using the configured default of the 
pulsar broker.
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void clearOffloadDeleteLag(String namespace) throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 4ed0b8acf9..ba2950efe5 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
@@ -701,6 +702,40 @@ public void setOffloadThreshold(String namespace, long 
offloadThreshold) throws
         }
     }
 
+    @Override
+    public Long getOffloadDeleteLagMs(String namespace) throws 
PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+            return request(path).get(Long.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void setOffloadDeleteLag(String namespace, long lag, TimeUnit unit) 
throws PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+            request(path).put(Entity.entity(TimeUnit.MILLISECONDS.convert(lag, 
unit), MediaType.APPLICATION_JSON),
+                              ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void clearOffloadDeleteLag(String namespace) throws 
PulsarAdminException {
+        try {
+            NamespaceName ns = NamespaceName.get(namespace);
+            WebTarget path = namespacePath(ns, "offloadDeletionLagMs");
+            request(path).delete(ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2Namespaces : 
adminNamespaces;
         WebTarget namespacePath = base.path(namespace.toString());
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 4f898a576d..341b24995f 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -29,6 +29,7 @@
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.pulsar.admin.cli.utils.IOUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -832,6 +833,53 @@ void run() throws PulsarAdminException {
         }
     }
 
+    @Parameters(commandDescription = "Get offloadDeletionLag, in minutes, for 
a namespace")
+    private class GetOffloadDeletionLag extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            Long lag = admin.namespaces().getOffloadDeleteLagMs(namespace);
+            if (lag != null) {
+                System.out.println(TimeUnit.MINUTES.convert(lag, 
TimeUnit.MILLISECONDS) + " minute(s)");
+            } else {
+                System.out.println("Unset for namespace. Defaulting to broker 
setting.");
+            }
+        }
+    }
+
+    @Parameters(commandDescription = "Set offloadDeletionLag for a namespace")
+    private class SetOffloadDeletionLag extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--lag", "-l" },
+                   description = "Duration to wait after offloading a ledger 
segment, before deleting the copy of that"
+                                  + " segment from cluster local storage. (eg: 
10m, 5h, 3d, 2w).",
+                   required = true)
+        private String lag = "-1";
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().setOffloadDeleteLag(namespace, 
validateTimeString(lag), TimeUnit.MINUTES);
+        }
+    }
+
+    @Parameters(commandDescription = "Clear offloadDeletionLag for a 
namespace")
+    private class ClearOffloadDeletionLag extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().clearOffloadDeleteLag(namespace);
+        }
+    }
+
     public CmdNamespaces(PulsarAdmin admin) {
         super("namespaces", admin);
         jcommander.addCommand("list", new GetNamespacesPerProperty());
@@ -897,5 +945,9 @@ public CmdNamespaces(PulsarAdmin admin) {
         jcommander.addCommand("get-offload-threshold", new 
GetOffloadThreshold());
         jcommander.addCommand("set-offload-threshold", new 
SetOffloadThreshold());
 
+        jcommander.addCommand("get-offload-deletion-lag", new 
GetOffloadDeletionLag());
+        jcommander.addCommand("set-offload-deletion-lag", new 
SetOffloadDeletionLag());
+        jcommander.addCommand("clear-offload-deletion-lag", new 
ClearOffloadDeletionLag());
+
     }
 }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 35a752b78f..9c7efeb6a9 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -59,6 +59,7 @@
 
     public long compaction_threshold = 0;
     public long offload_threshold = -1;
+    public Long offload_deletion_lag_ms = null;
 
     @Override
     public boolean equals(Object obj) {
@@ -80,7 +81,8 @@ public boolean equals(Object obj) {
                     && max_consumers_per_topic == other.max_consumers_per_topic
                     && max_consumers_per_subscription == 
other.max_consumers_per_subscription
                     && compaction_threshold == other.compaction_threshold
-                    && offload_threshold == other.offload_threshold;
+                    && offload_threshold == other.offload_threshold
+                    && offload_deletion_lag_ms == 
other.offload_deletion_lag_ms;
         }
 
         return false;
@@ -112,6 +114,7 @@ public String toString() {
                 .add("max_consumers_per_topic", max_consumers_per_topic)
                 .add("max_consumers_per_subscription", max_consumers_per_topic)
                 .add("compaction_threshold", compaction_threshold)
-                .add("offload_threshold", offload_threshold).toString();
+                .add("offload_threshold", offload_threshold)
+                .add("offload_deletion_lag_ms", 
offload_deletion_lag_ms).toString();
     }
 }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
index b595dd3729..dd1cb7ae84 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/offload/TestS3Offload.java
@@ -18,12 +18,15 @@
  */
 package org.apache.pulsar.tests.integration.offload;
 
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableMap;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -32,6 +35,7 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
 
 import org.apache.pulsar.tests.integration.containers.BrokerContainer;
 import org.apache.pulsar.tests.integration.containers.S3Container;
@@ -241,4 +245,109 @@ public void 
testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String a
             
Assert.assertEquals(admin.namespaces().getOffloadThreshold(namespace), -1L);
         }
     }
+
+    private boolean ledgerOffloaded(List<LedgerInfo> ledgers, long ledgerId) {
+        return ledgers.stream().filter(l -> l.ledgerId == ledgerId)
+            .map(l -> l.offloaded).findFirst().get();
+    }
+
+    private long writeAndWaitForOffload(String serviceUrl, String adminUrl, 
String topic) throws Exception {
+        try(PulsarClient client = 
PulsarClient.builder().serviceUrl(serviceUrl).build();
+            Producer producer = client.newProducer().topic(topic)
+                .blockIfQueueFull(true).enableBatching(false).create();
+            PulsarAdmin admin = 
PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
+
+            List<LedgerInfo> ledgers = 
admin.topics().getInternalStats(topic).ledgers;
+            long currentLedger = ledgers.get(ledgers.size() - 1).ledgerId;
+
+            
client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();
+
+            // write enough to topic to make it roll twice
+            for (int i = 0; i < ENTRIES_PER_LEDGER * 2.5; i++) {
+                producer.sendAsync(buildEntry("offload-message" + i));
+            }
+            producer.send(buildEntry("final-offload-message"));
+
+            // wait up to 30 seconds for offload to occur
+            for (int i = 0;
+                 i < 300 && 
!ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers, currentLedger);
+                 i++) {
+                Thread.sleep(100);
+            }
+            
Assert.assertTrue(ledgerOffloaded(admin.topics().getInternalStats(topic).ledgers,
 currentLedger));
+
+            return currentLedger;
+        }
+    }
+
+    public boolean ledgerExistsInBookKeeper(long ledgerId) throws Exception {
+        ClientConfiguration bkConf = new ClientConfiguration();
+        bkConf.setZkServers(pulsarCluster.getZKConnString());
+        try (BookKeeperAdmin bk = new BookKeeperAdmin(bkConf)) {
+            try {
+                bk.openLedger(ledgerId).close();
+                return true;
+            } catch (BKException.BKNoSuchLedgerExistsException e) {
+                return false;
+            }
+        }
+    }
+
+    @Test(dataProvider =  "ServiceAndAdminUrls")
+    public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, 
String adminUrl) throws Exception {
+        final String tenant = "s3-offload-test-deletion-lag-" + randomName(4);
+        final String namespace = tenant + "/ns1";
+        final String topic = "persistent://" + namespace + "/topic1";
+
+        pulsarCluster.runAdminCommandOnAnyBroker("tenants",
+                "create", "--allowed-clusters", pulsarCluster.getClusterName(),
+                "--admin-roles", "offload-admin", tenant);
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                "create", "--clusters", pulsarCluster.getClusterName(), 
namespace);
+
+        // set threshold to offload runs immediately after role
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
+                                                 "set-offload-threshold", 
"--size", "0", namespace);
+
+        String output = pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces", "get-offload-deletion-lag", 
namespace).getStdout();
+        Assert.assertTrue(output.contains("Unset for namespace"));
+
+        long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, 
topic);
+        // give it up to 5 seconds to delete, it shouldn't
+        // so we wait this every time
+        Thread.sleep(5000);
+        Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", 
"set-offload-deletion-lag", namespace,
+                                                 "--lag", "0m");
+        output = pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces", "get-offload-deletion-lag", 
namespace).getStdout();
+        Assert.assertTrue(output.contains("0 minute(s)"));
+
+        offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+        // wait up to 10 seconds for ledger to be deleted
+        for (int i = 0; i < 10 && ledgerExistsInBookKeeper(offloadedLedger); 
i++) {
+            writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+            Thread.sleep(1000);
+        }
+        Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger));
+
+        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", 
"clear-offload-deletion-lag", namespace);
+
+        Thread.sleep(5); // wait 5 seconds to allow broker to see update
+
+        output = pulsarCluster.runAdminCommandOnAnyBroker(
+                "namespaces", "get-offload-deletion-lag", 
namespace).getStdout();
+        Assert.assertTrue(output.contains("Unset for namespace"));
+
+        offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
+
+        // give it up to 5 seconds to delete, it shouldn't
+        // so we wait this every time
+        Thread.sleep(5000);
+        Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to