sijie closed pull request #1639: Rest API for Ledger Offloading
URL: https://github.com/apache/incubator-pulsar/pull/1639
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 3d14b8b08f..6909a82a99 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -46,7 +46,9 @@
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -635,6 +637,10 @@ public ManagedLedgerFactory getManagedLedgerFactory() {
         return managedLedgerClientFactory.getManagedLedgerFactory();
     }
 
+    public LedgerOffloader getManagedLedgerOffloader() {
+        return NullLedgerOffloader.INSTANCE;
+    }
+
     public ZooKeeperCache getLocalZkCache() {
         return localZkCache;
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 21b14d075f..44f80cf166 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -54,6 +54,7 @@
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -74,6 +75,8 @@
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
@@ -85,7 +88,6 @@
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.compression.CompressionCodec;
 import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.apache.pulsar.common.naming.TopicDomain;
@@ -1090,12 +1092,30 @@ protected void internalTriggerCompaction(boolean 
authoritative) {
         }
     }
 
-    protected CompactionStatus internalCompactionStatus(boolean authoritative) 
{
+    protected LongRunningProcessStatus internalCompactionStatus(boolean 
authoritative) {
         validateAdminOperationOnTopic(authoritative);
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
         return topic.compactionStatus();
     }
 
+    protected void internalTriggerOffload(boolean authoritative, MessageIdImpl 
messageId) {
+        validateAdminOperationOnTopic(authoritative);
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        try {
+            topic.triggerOffload(messageId);
+        } catch (AlreadyRunningException e) {
+            throw new RestException(Status.CONFLICT, e.getMessage());
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+    }
+
+    protected OffloadProcessStatus internalOffloadStatus(boolean 
authoritative) {
+        validateAdminOperationOnTopic(authoritative);
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        return topic.offloadStatus();
+    }
+
     public static CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(PulsarService pulsar,
             String clientAppId, AuthenticationDataSource authenticationData, 
TopicName topicName) {
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new 
CompletableFuture<>();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 470ee45214..28e403b7d7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -38,9 +38,10 @@
 import javax.ws.rs.core.Response;
 
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -442,11 +443,43 @@ public void compact(@PathParam("property") String 
property, @PathParam("cluster"
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
                             @ApiResponse(code = 405, message = "Operation not 
allowed on persistent topic"),
                             @ApiResponse(code = 404, message = "Topic does not 
exist, or compaction hasn't run") })
-    public CompactionStatus compactionStatus(
+    public LongRunningProcessStatus compactionStatus(
             @PathParam("property") String property, @PathParam("cluster") 
String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(property, cluster, namespace, encodedTopic);
         return internalCompactionStatus(authoritative);
     }
+
+    @PUT
+    @Path("/{tenant}/{cluster}/{namespace}/{topic}/offload")
+    @ApiOperation(value = "Offload a prefix of a topic to long term storage")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 405, message = "Operation not 
allowed on persistent topic"),
+                            @ApiResponse(code = 404, message = "Topic does not 
exist"),
+                            @ApiResponse(code = 409, message = "Offload 
already running")})
+    public void triggerOffload(@PathParam("tenant") String tenant,
+                               @PathParam("cluster") String cluster,
+                               @PathParam("namespace") String namespace,
+                               @PathParam("topic") @Encoded String 
encodedTopic,
+                               @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative,
+                               MessageIdImpl messageId) {
+        validateTopicName(tenant, cluster, namespace, encodedTopic);
+        internalTriggerOffload(authoritative, messageId);
+    }
+
+    @GET
+    @Path("/{tenant}/{cluster}/{namespace}/{topic}/offload")
+    @ApiOperation(value = "Offload a prefix of a topic to long term storage")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 405, message = "Operation not 
allowed on persistent topic"),
+                            @ApiResponse(code = 404, message = "Topic does not 
exist")})
+    public OffloadProcessStatus offloadStatus(@PathParam("tenant") String 
tenant,
+                                              @PathParam("cluster") String 
cluster,
+                                              @PathParam("namespace") String 
namespace,
+                                              @PathParam("topic") @Encoded 
String encodedTopic,
+                                              @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
+        validateTopicName(tenant, cluster, namespace, encodedTopic);
+        return internalOffloadStatus(authoritative);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 5e1a98b1a7..879b730025 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -38,9 +38,10 @@
 import javax.ws.rs.core.Response;
 
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -428,11 +429,41 @@ public void compact(@PathParam("tenant") String tenant,
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
                             @ApiResponse(code = 405, message = "Operation not 
allowed on persistent topic"),
                             @ApiResponse(code = 404, message = "Topic does not 
exist, or compaction hasn't run") })
-    public CompactionStatus compactionStatus(
+    public LongRunningProcessStatus compactionStatus(
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace, @PathParam("topic") 
@Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         return internalCompactionStatus(authoritative);
     }
+
+    @PUT
+    @Path("/{tenant}/{namespace}/{topic}/offload")
+    @ApiOperation(value = "Offload a prefix of a topic to long term storage")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 405, message = "Operation not 
allowed on persistent topic"),
+                            @ApiResponse(code = 404, message = "Topic does not 
exist"),
+                            @ApiResponse(code = 409, message = "Offload 
already running")})
+    public void triggerOffload(@PathParam("tenant") String tenant,
+                               @PathParam("namespace") String namespace,
+                               @PathParam("topic") @Encoded String 
encodedTopic,
+                               @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative,
+                               MessageIdImpl messageId) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        internalTriggerOffload(authoritative, messageId);
+    }
+
+    @GET
+    @Path("/{tenant}/{namespace}/{topic}/offload")
+    @ApiOperation(value = "Offload a prefix of a topic to long term storage")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 405, message = "Operation not 
allowed on persistent topic"),
+                            @ApiResponse(code = 404, message = "Topic does not 
exist")})
+    public OffloadProcessStatus offloadStatus(@PathParam("tenant") String 
tenant,
+                                              @PathParam("namespace") String 
namespace,
+                                              @PathParam("topic") @Encoded 
String encodedTopic,
+                                              @QueryParam("authoritative") 
@DefaultValue("false") boolean authoritative) {
+        validateTopicName(tenant, namespace, encodedTopic);
+        return internalOffloadStatus(authoritative);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index ad709db65a..57a20dea59 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -714,6 +714,8 @@ public void openLedgerFailed(ManagedLedgerException 
exception, Object ctx) {
             
managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(),
 TimeUnit.MINUTES);
             
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
 
+            
managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader());
+
             future.complete(managedLedgerConfig);
         }, (exception) -> future.completeExceptionally(exception)));
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 9c59bd9839..1bbbd82757 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -40,6 +40,7 @@
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
 import org.apache.bookkeeper.mledger.Entry;
@@ -81,13 +82,14 @@
 import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
 import org.apache.pulsar.broker.stats.NamespaceStats;
 import org.apache.pulsar.broker.stats.ReplicationMetrics;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.MessageImpl;
 import 
org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
@@ -173,6 +175,9 @@
     CompletableFuture<Long> currentCompaction = 
CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
     final CompactedTopic compactedTopic;
 
+    CompletableFuture<MessageIdImpl> currentOffload = 
CompletableFuture.completedFuture(
+            (MessageIdImpl)MessageId.earliest);
+
     // Whether messages published must be encrypted or not in this topic
     private volatile boolean isEncryptionRequired = false;
 
@@ -1669,23 +1674,61 @@ public synchronized void triggerCompaction()
         }
     }
 
-
-    public synchronized CompactionStatus compactionStatus() {
+    public synchronized LongRunningProcessStatus compactionStatus() {
         final CompletableFuture<Long> current;
         synchronized (this) {
             current = currentCompaction;
         }
         if (!current.isDone()) {
-            return CompactionStatus.forStatus(CompactionStatus.Status.RUNNING);
+            return 
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
         } else {
             try {
                 if (current.join() == COMPACTION_NEVER_RUN) {
-                    return 
CompactionStatus.forStatus(CompactionStatus.Status.NOT_RUN);
+                    return 
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
+                } else {
+                    return 
LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
+                }
+            } catch (CancellationException | CompletionException e) {
+                return LongRunningProcessStatus.forError(e.getMessage());
+            }
+        }
+    }
+
+    public synchronized void triggerOffload(MessageIdImpl messageId) throws 
AlreadyRunningException {
+        if (currentOffload.isDone()) {
+            CompletableFuture<MessageIdImpl> promise = currentOffload = new 
CompletableFuture<>();
+            getManagedLedger().asyncOffloadPrefix(
+                    PositionImpl.get(messageId.getLedgerId(), 
messageId.getEntryId()),
+                    new OffloadCallback() {
+                        @Override
+                        public void offloadComplete(Position pos, Object ctx) {
+                            PositionImpl impl = (PositionImpl)pos;
+
+                            promise.complete(new 
MessageIdImpl(impl.getLedgerId(), impl.getEntryId(), -1));
+                        }
+
+                        @Override
+                        public void offloadFailed(ManagedLedgerException 
exception, Object ctx) {
+                            promise.completeExceptionally(exception);
+                        }
+                    }, null);
+        } else {
+            throw new AlreadyRunningException("Offload already in progress");
+        }
+    }
+
+    public synchronized OffloadProcessStatus offloadStatus() {
+        if (!currentOffload.isDone()) {
+            return 
OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
+        } else {
+            try {
+                if (currentOffload.join() == MessageId.earliest) {
+                    return 
OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
                 } else {
-                    return 
CompactionStatus.forStatus(CompactionStatus.Status.SUCCESS);
+                    return 
OffloadProcessStatus.forSuccess(currentOffload.join());
                 }
             } catch (CancellationException | CompletionException e) {
-                return CompactionStatus.forError(e.getMessage());
+                return OffloadProcessStatus.forError(e.getMessage());
             }
         }
     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
new file mode 100644
index 0000000000..dc925a2756
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.Sets;
+
+import java.util.concurrent.CompletableFuture;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
+import org.apache.bookkeeper.mledger.LedgerOffloader;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AdminApiOffloadTest.class);
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        conf.setManagedLedgerMaxEntriesPerLedger(10);
+        conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+
+        super.internalSetup();
+
+        // Setup namespaces
+        admin.clusters().createCluster("test", new 
ClusterData("http://127.0.0.1"; + ":" + BROKER_WEBSERVICE_PORT));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", 
"role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("prop-xyz", tenantInfo);
+        admin.namespaces().createNamespace("prop-xyz/ns1", 
Sets.newHashSet("test"));
+    }
+
+    @AfterMethod
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    private void testOffload(String topicName, String mlName) throws Exception 
{
+        LedgerOffloader offloader = mock(LedgerOffloader.class);
+        doReturn(offloader).when(pulsar).getManagedLedgerOffloader();
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        doReturn(promise).when(offloader).offload(anyObject(), anyObject(), 
anyObject());
+
+        MessageId currentId = MessageId.latest;
+        try (Producer p = 
pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) {
+            for (int i = 0; i < 15; i++) {
+                currentId = p.send("Foobar".getBytes());
+            }
+        }
+
+        ManagedLedgerInfo info = 
pulsar.getManagedLedgerFactory().getManagedLedgerInfo(mlName);
+        Assert.assertEquals(info.ledgers.size(), 2);
+
+        
Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status,
+                            LongRunningProcessStatus.Status.NOT_RUN);
+
+        admin.persistentTopics().triggerOffload(topicName, currentId);
+
+        
Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status,
+                            LongRunningProcessStatus.Status.RUNNING);
+
+        try {
+            admin.persistentTopics().triggerOffload(topicName, currentId);
+            Assert.fail("Should have failed");
+        } catch (ConflictException e) {
+            // expected
+        }
+
+        // fail first time
+        promise.completeExceptionally(new Exception("Some random failure"));
+
+        
Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status,
+                            LongRunningProcessStatus.Status.ERROR);
+        
Assert.assertTrue(admin.persistentTopics().offloadStatus(topicName).lastError.contains("Some
 random failure"));
+
+        // Try again
+        doReturn(CompletableFuture.completedFuture(null))
+            .when(offloader).offload(anyObject(), anyObject(), anyObject());
+
+        admin.persistentTopics().triggerOffload(topicName, currentId);
+
+        
Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status,
+                            LongRunningProcessStatus.Status.SUCCESS);
+        MessageIdImpl firstUnoffloaded = 
admin.persistentTopics().offloadStatus(topicName).firstUnoffloadedMessage;
+        // First unoffloaded is the first entry of current ledger
+        Assert.assertEquals(firstUnoffloaded.getLedgerId(), 
info.ledgers.get(1).ledgerId);
+        Assert.assertEquals(firstUnoffloaded.getEntryId(), 0);
+
+        verify(offloader, times(2)).offload(anyObject(), anyObject(), 
anyObject());
+    }
+
+
+    @Test
+    public void testOffloadV2() throws Exception {
+        String topicName = "persistent://prop-xyz/ns1/topic1";
+        String mlName = "prop-xyz/ns1/persistent/topic1";
+        testOffload(topicName, mlName);
+    }
+
+    @Test
+    public void testOffloadV1() throws Exception {
+        String topicName = "persistent://prop-xyz/test/ns1/topic2";
+        String mlName = "prop-xyz/test/ns1/persistent/topic2";
+        testOffload(topicName, mlName);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index cd00d1c5c1..c9d973dde6 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -63,6 +63,7 @@
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -79,7 +80,6 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -1959,7 +1959,8 @@ public void testCompactionStatus() throws Exception {
         pulsarClient.newProducer().topic(topicName).create().close();
         assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
 
-        assertEquals(admin.topics().compactionStatus(topicName).status, 
CompactionStatus.Status.NOT_RUN);
+        assertEquals(admin.topics().compactionStatus(topicName).status,
+                     LongRunningProcessStatus.Status.NOT_RUN);
 
         // mock actual compaction, we don't need to really run it
         CompletableFuture<Long> promise = new CompletableFuture<Long>();
@@ -1967,18 +1968,21 @@ public void testCompactionStatus() throws Exception {
         doReturn(promise).when(compactor).compact(topicName);
         admin.topics().triggerCompaction(topicName);
 
-        assertEquals(admin.topics().compactionStatus(topicName).status, 
CompactionStatus.Status.RUNNING);
+        assertEquals(admin.topics().compactionStatus(topicName).status,
+                     LongRunningProcessStatus.Status.RUNNING);
 
         promise.complete(1L);
 
-        assertEquals(admin.topics().compactionStatus(topicName).status, 
CompactionStatus.Status.SUCCESS);
+        assertEquals(admin.topics().compactionStatus(topicName).status,
+                     LongRunningProcessStatus.Status.SUCCESS);
 
         CompletableFuture<Long> errorPromise = new CompletableFuture<Long>();
         doReturn(errorPromise).when(compactor).compact(topicName);
         admin.topics().triggerCompaction(topicName);
         errorPromise.completeExceptionally(new Exception("Failed at 
something"));
 
-        assertEquals(admin.topics().compactionStatus(topicName).status, 
CompactionStatus.Status.ERROR);
+        assertEquals(admin.topics().compactionStatus(topicName).status,
+                     LongRunningProcessStatus.Status.ERROR);
         
assertTrue(admin.topics().compactionStatus(topicName).lastError.contains("Failed
 at something"));
     }
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 7a514cddc5..d2fb996a19 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -54,6 +54,7 @@
 import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
@@ -70,7 +71,6 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -1977,7 +1977,7 @@ public void testCompactionStatus() throws Exception {
         assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
 
         assertEquals(admin.topics().compactionStatus(topicName).status,
-                     CompactionStatus.Status.NOT_RUN);
+                     LongRunningProcessStatus.Status.NOT_RUN);
 
         // mock actual compaction, we don't need to really run it
         CompletableFuture<Long> promise = new CompletableFuture<Long>();
@@ -1986,12 +1986,12 @@ public void testCompactionStatus() throws Exception {
         admin.topics().triggerCompaction(topicName);
 
         assertEquals(admin.topics().compactionStatus(topicName).status,
-                     CompactionStatus.Status.RUNNING);
+                     LongRunningProcessStatus.Status.RUNNING);
 
         promise.complete(1L);
 
         assertEquals(admin.topics().compactionStatus(topicName).status,
-                     CompactionStatus.Status.SUCCESS);
+                     LongRunningProcessStatus.Status.SUCCESS);
 
         CompletableFuture<Long> errorPromise = new CompletableFuture<Long>();
         doReturn(errorPromise).when(compactor).compact(topicName);
@@ -1999,7 +1999,7 @@ public void testCompactionStatus() throws Exception {
         errorPromise.completeExceptionally(new Exception("Failed at 
something"));
 
         assertEquals(admin.topics().compactionStatus(topicName).status,
-                     CompactionStatus.Status.ERROR);
+                     LongRunningProcessStatus.Status.ERROR);
         assertTrue(admin.topics().compactionStatus(topicName)
                    .lastError.contains("Failed at something"));
     }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/LongRunningProcessStatus.java
similarity index 69%
rename from 
pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java
rename to 
pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/LongRunningProcessStatus.java
index 9020c21331..4a10289884 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/LongRunningProcessStatus.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.compaction;
+package org.apache.pulsar.client.admin;
 
 /**
- * Status of compaction for a topic.
+ * Status of long running process.
  */
-public class CompactionStatus {
+public class LongRunningProcessStatus {
     public enum Status {
         NOT_RUN,
         RUNNING,
@@ -32,21 +32,21 @@
     public Status status;
     public String lastError;
 
-    public CompactionStatus() {
+    public LongRunningProcessStatus() {
         this.status = Status.NOT_RUN;
         this.lastError = "";
     }
 
-    private CompactionStatus(Status status, String lastError) {
+    LongRunningProcessStatus(Status status, String lastError) {
         this.status = status;
         this.lastError = lastError;
     }
 
-    public static CompactionStatus forStatus(Status status) {
-        return new CompactionStatus(status, "");
+    public static LongRunningProcessStatus forStatus(Status status) {
+        return new LongRunningProcessStatus(status, "");
     }
 
-    public static CompactionStatus forError(String lastError) {
-        return new CompactionStatus(Status.ERROR, lastError);
+    public static LongRunningProcessStatus forError(String lastError) {
+        return new LongRunningProcessStatus(Status.ERROR, lastError);
     }
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java
new file mode 100644
index 0000000000..0644ee1842
--- /dev/null
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.admin;
+
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+
+/**
+ * Status of offload process.
+ */
+public class OffloadProcessStatus extends LongRunningProcessStatus {
+
+    public MessageIdImpl firstUnoffloadedMessage;
+
+    public OffloadProcessStatus() {
+        super(Status.NOT_RUN, "");
+        firstUnoffloadedMessage = (MessageIdImpl)MessageId.earliest;
+    }
+
+    private OffloadProcessStatus(Status status, String lastError,
+                                 MessageIdImpl firstUnoffloadedMessage) {
+        this.status = status;
+        this.lastError = lastError;
+        this.firstUnoffloadedMessage = firstUnoffloadedMessage;
+    }
+
+    public static OffloadProcessStatus forStatus(Status status) {
+        return new OffloadProcessStatus(status, "", 
(MessageIdImpl)MessageId.earliest);
+    }
+
+    public static OffloadProcessStatus forError(String lastError) {
+        return new OffloadProcessStatus(Status.ERROR, lastError,
+                                        (MessageIdImpl)MessageId.earliest);
+    }
+
+    public static OffloadProcessStatus forSuccess(MessageIdImpl messageId) {
+        return new OffloadProcessStatus(Status.SUCCESS, "", messageId);
+    }
+}
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 2d0d5e5dbc..aa825e8a6a 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -30,7 +30,6 @@
 import 
org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
@@ -936,5 +935,21 @@ void createSubscription(String topic, String 
subscriptionName, MessageId message
      *
      * @param topic The topic whose compaction status we wish to check
      */
-    CompactionStatus compactionStatus(String topic) throws 
PulsarAdminException;
+    LongRunningProcessStatus compactionStatus(String topic) throws 
PulsarAdminException;
+
+    /**
+     * Trigger offloading messages in topic to longterm storage.
+     *
+     * @param topic the topic to offload
+     * @param messageId ID of maximum message which should be offloaded
+     */
+    void triggerOffload(String topic, MessageId messageId) throws 
PulsarAdminException;
+
+    /**
+     * Check the status of an ongoing offloading operation for a topic.
+     *
+     * @param topic the topic being offloaded
+     * @return the status of the offload operation
+     */
+    OffloadProcessStatus offloadStatus(String topic) throws 
PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 8749d6ae6f..3f5456687a 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -52,6 +52,8 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.admin.OffloadProcessStatus;
 import org.apache.pulsar.client.admin.PersistentTopics;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
@@ -66,7 +68,6 @@
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata;
-import org.apache.pulsar.common.compaction.CompactionStatus;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -780,12 +781,35 @@ public void triggerCompaction(String topic)
     }
 
     @Override
-    public CompactionStatus compactionStatus(String topic)
+    public LongRunningProcessStatus compactionStatus(String topic)
             throws PulsarAdminException {
         try {
             TopicName tn = validateTopic(topic);
             return request(topicPath(tn, "compaction"))
-                .get(CompactionStatus.class);
+                .get(LongRunningProcessStatus.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void triggerOffload(String topic, MessageId messageId) throws 
PulsarAdminException {
+        try {
+            TopicName tn = validateTopic(topic);
+            WebTarget path = topicPath(tn, "offload");
+            request(path).put(Entity.entity(messageId, 
MediaType.APPLICATION_JSON), MessageIdImpl.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public OffloadProcessStatus offloadStatus(String topic)
+            throws PulsarAdminException {
+        try {
+            TopicName tn = validateTopic(topic);
+            return request(topicPath(tn, "offload"))
+                .get(OffloadProcessStatus.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
index 5b652561a6..80372f7868 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PersistentTopics;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -30,7 +31,7 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.compaction.CompactionStatus;
+
 
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
@@ -572,8 +573,8 @@ void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
 
             try {
-                CompactionStatus status = 
persistentTopics.compactionStatus(persistentTopic);
-                while (wait && status.status == 
CompactionStatus.Status.RUNNING) {
+                LongRunningProcessStatus status = 
persistentTopics.compactionStatus(persistentTopic);
+                while (wait && status.status == 
LongRunningProcessStatus.Status.RUNNING) {
                     Thread.sleep(1000);
                     status = 
persistentTopics.compactionStatus(persistentTopic);
                 }
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 04d96902fa..0b8e6c2355 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -36,6 +36,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Topics;
@@ -43,7 +44,7 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.common.compaction.CompactionStatus;
+
 
 @Parameters(commandDescription = "Operations on persistent topics")
 public class CmdTopics extends CmdBase {
@@ -578,8 +579,8 @@ void run() throws PulsarAdminException {
             String persistentTopic = validatePersistentTopic(params);
 
             try {
-                CompactionStatus status = 
topics.compactionStatus(persistentTopic);
-                while (wait && status.status == 
CompactionStatus.Status.RUNNING) {
+                LongRunningProcessStatus status = 
topics.compactionStatus(persistentTopic);
+                while (wait && status.status == 
LongRunningProcessStatus.Status.RUNNING) {
                     Thread.sleep(1000);
                     status = topics.compactionStatus(persistentTopic);
                 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to