sijie closed pull request #1639: Rest API for Ledger Offloading URL: https://github.com/apache/incubator-pulsar/pull/1639
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 3d14b8b08f..6909a82a99 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -46,7 +46,9 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.util.ZkUtils; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.pulsar.broker.admin.AdminResource; @@ -635,6 +637,10 @@ public ManagedLedgerFactory getManagedLedgerFactory() { return managedLedgerClientFactory.getManagedLedgerFactory(); } + public LedgerOffloader getManagedLedgerOffloader() { + return NullLedgerOffloader.INSTANCE; + } + public ZooKeeperCache getLocalZkCache() { return localZkCache; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 21b14d075f..44f80cf166 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -54,6 +54,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -74,6 +75,8 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; +import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; @@ -85,7 +88,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; -import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.common.naming.TopicDomain; @@ -1090,12 +1092,30 @@ protected void internalTriggerCompaction(boolean authoritative) { } } - protected CompactionStatus internalCompactionStatus(boolean authoritative) { + protected LongRunningProcessStatus internalCompactionStatus(boolean authoritative) { validateAdminOperationOnTopic(authoritative); PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); return topic.compactionStatus(); } + protected void internalTriggerOffload(boolean authoritative, MessageIdImpl messageId) { + validateAdminOperationOnTopic(authoritative); + PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); + try { + topic.triggerOffload(messageId); + } catch (AlreadyRunningException e) { + throw new RestException(Status.CONFLICT, e.getMessage()); + } catch (Exception e) { + throw new RestException(e); + } + } + + protected OffloadProcessStatus internalOffloadStatus(boolean authoritative) { + validateAdminOperationOnTopic(authoritative); + PersistentTopic topic = (PersistentTopic) getTopicReference(topicName); + return topic.offloadStatus(); + } + public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(PulsarService pulsar, String clientAppId, AuthenticationDataSource authenticationData, TopicName topicName) { CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 470ee45214..28e403b7d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -38,9 +38,10 @@ import javax.ws.rs.core.Response; import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; +import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; @@ -442,11 +443,43 @@ public void compact(@PathParam("property") String property, @PathParam("cluster" @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), @ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run") }) - public CompactionStatus compactionStatus( + public LongRunningProcessStatus compactionStatus( @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(property, cluster, namespace, encodedTopic); return internalCompactionStatus(authoritative); } + + @PUT + @Path("/{tenant}/{cluster}/{namespace}/{topic}/offload") + @ApiOperation(value = "Offload a prefix of a topic to long term storage") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 409, message = "Offload already running")}) + public void triggerOffload(@PathParam("tenant") String tenant, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + MessageIdImpl messageId) { + validateTopicName(tenant, cluster, namespace, encodedTopic); + internalTriggerOffload(authoritative, messageId); + } + + @GET + @Path("/{tenant}/{cluster}/{namespace}/{topic}/offload") + @ApiOperation(value = "Offload a prefix of a topic to long term storage") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), + @ApiResponse(code = 404, message = "Topic does not exist")}) + public OffloadProcessStatus offloadStatus(@PathParam("tenant") String tenant, + @PathParam("cluster") String cluster, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, cluster, namespace, encodedTopic); + return internalOffloadStatus(authoritative); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 5e1a98b1a7..879b730025 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -38,9 +38,10 @@ import javax.ws.rs.core.Response; import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; +import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; @@ -428,11 +429,41 @@ public void compact(@PathParam("tenant") String tenant, @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), @ApiResponse(code = 404, message = "Topic does not exist, or compaction hasn't run") }) - public CompactionStatus compactionStatus( + public LongRunningProcessStatus compactionStatus( @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); return internalCompactionStatus(authoritative); } + + @PUT + @Path("/{tenant}/{namespace}/{topic}/offload") + @ApiOperation(value = "Offload a prefix of a topic to long term storage") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 409, message = "Offload already running")}) + public void triggerOffload(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + MessageIdImpl messageId) { + validateTopicName(tenant, namespace, encodedTopic); + internalTriggerOffload(authoritative, messageId); + } + + @GET + @Path("/{tenant}/{namespace}/{topic}/offload") + @ApiOperation(value = "Offload a prefix of a topic to long term storage") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 405, message = "Operation not allowed on persistent topic"), + @ApiResponse(code = 404, message = "Topic does not exist")}) + public OffloadProcessStatus offloadStatus(@PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + return internalOffloadStatus(authoritative); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ad709db65a..57a20dea59 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -714,6 +714,8 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES); managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB()); + managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader()); + future.complete(managedLedgerConfig); }, (exception) -> future.completeExceptionally(exception))); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9c59bd9839..1bbbd82757 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -40,6 +40,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; import org.apache.bookkeeper.mledger.Entry; @@ -81,13 +82,14 @@ import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; import org.apache.pulsar.broker.stats.NamespaceStats; import org.apache.pulsar.broker.stats.ReplicationMetrics; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; +import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; -import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ConsumerStats; @@ -173,6 +175,9 @@ CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN); final CompactedTopic compactedTopic; + CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture( + (MessageIdImpl)MessageId.earliest); + // Whether messages published must be encrypted or not in this topic private volatile boolean isEncryptionRequired = false; @@ -1669,23 +1674,61 @@ public synchronized void triggerCompaction() } } - - public synchronized CompactionStatus compactionStatus() { + public synchronized LongRunningProcessStatus compactionStatus() { final CompletableFuture<Long> current; synchronized (this) { current = currentCompaction; } if (!current.isDone()) { - return CompactionStatus.forStatus(CompactionStatus.Status.RUNNING); + return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING); } else { try { if (current.join() == COMPACTION_NEVER_RUN) { - return CompactionStatus.forStatus(CompactionStatus.Status.NOT_RUN); + return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN); + } else { + return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS); + } + } catch (CancellationException | CompletionException e) { + return LongRunningProcessStatus.forError(e.getMessage()); + } + } + } + + public synchronized void triggerOffload(MessageIdImpl messageId) throws AlreadyRunningException { + if (currentOffload.isDone()) { + CompletableFuture<MessageIdImpl> promise = currentOffload = new CompletableFuture<>(); + getManagedLedger().asyncOffloadPrefix( + PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()), + new OffloadCallback() { + @Override + public void offloadComplete(Position pos, Object ctx) { + PositionImpl impl = (PositionImpl)pos; + + promise.complete(new MessageIdImpl(impl.getLedgerId(), impl.getEntryId(), -1)); + } + + @Override + public void offloadFailed(ManagedLedgerException exception, Object ctx) { + promise.completeExceptionally(exception); + } + }, null); + } else { + throw new AlreadyRunningException("Offload already in progress"); + } + } + + public synchronized OffloadProcessStatus offloadStatus() { + if (!currentOffload.isDone()) { + return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING); + } else { + try { + if (currentOffload.join() == MessageId.earliest) { + return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN); } else { - return CompactionStatus.forStatus(CompactionStatus.Status.SUCCESS); + return OffloadProcessStatus.forSuccess(currentOffload.join()); } } catch (CancellationException | CompletionException e) { - return CompactionStatus.forError(e.getMessage()); + return OffloadProcessStatus.forError(e.getMessage()); } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java new file mode 100644 index 0000000000..dc925a2756 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiOffloadTest.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.admin; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import com.google.common.collect.Sets; + +import java.util.concurrent.CompletableFuture; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.bookkeeper.mledger.ManagedLedgerInfo; +import org.apache.bookkeeper.mledger.LedgerOffloader; +import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; +import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Slf4j +public class AdminApiOffloadTest extends MockedPulsarServiceBaseTest { + + private static final Logger LOG = LoggerFactory.getLogger(AdminApiOffloadTest.class); + + @BeforeMethod + @Override + public void setup() throws Exception { + conf.setManagedLedgerMaxEntriesPerLedger(10); + conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + + super.internalSetup(); + + // Setup namespaces + admin.clusters().createCluster("test", new ClusterData("http://127.0.0.1" + ":" + BROKER_WEBSERVICE_PORT)); + TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test")); + admin.tenants().createTenant("prop-xyz", tenantInfo); + admin.namespaces().createNamespace("prop-xyz/ns1", Sets.newHashSet("test")); + } + + @AfterMethod + @Override + public void cleanup() throws Exception { + super.internalCleanup(); + } + + private void testOffload(String topicName, String mlName) throws Exception { + LedgerOffloader offloader = mock(LedgerOffloader.class); + doReturn(offloader).when(pulsar).getManagedLedgerOffloader(); + + CompletableFuture<Void> promise = new CompletableFuture<>(); + doReturn(promise).when(offloader).offload(anyObject(), anyObject(), anyObject()); + + MessageId currentId = MessageId.latest; + try (Producer p = pulsarClient.newProducer().topic(topicName).enableBatching(false).create()) { + for (int i = 0; i < 15; i++) { + currentId = p.send("Foobar".getBytes()); + } + } + + ManagedLedgerInfo info = pulsar.getManagedLedgerFactory().getManagedLedgerInfo(mlName); + Assert.assertEquals(info.ledgers.size(), 2); + + Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status, + LongRunningProcessStatus.Status.NOT_RUN); + + admin.persistentTopics().triggerOffload(topicName, currentId); + + Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status, + LongRunningProcessStatus.Status.RUNNING); + + try { + admin.persistentTopics().triggerOffload(topicName, currentId); + Assert.fail("Should have failed"); + } catch (ConflictException e) { + // expected + } + + // fail first time + promise.completeExceptionally(new Exception("Some random failure")); + + Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status, + LongRunningProcessStatus.Status.ERROR); + Assert.assertTrue(admin.persistentTopics().offloadStatus(topicName).lastError.contains("Some random failure")); + + // Try again + doReturn(CompletableFuture.completedFuture(null)) + .when(offloader).offload(anyObject(), anyObject(), anyObject()); + + admin.persistentTopics().triggerOffload(topicName, currentId); + + Assert.assertEquals(admin.persistentTopics().offloadStatus(topicName).status, + LongRunningProcessStatus.Status.SUCCESS); + MessageIdImpl firstUnoffloaded = admin.persistentTopics().offloadStatus(topicName).firstUnoffloadedMessage; + // First unoffloaded is the first entry of current ledger + Assert.assertEquals(firstUnoffloaded.getLedgerId(), info.ledgers.get(1).ledgerId); + Assert.assertEquals(firstUnoffloaded.getEntryId(), 0); + + verify(offloader, times(2)).offload(anyObject(), anyObject(), anyObject()); + } + + + @Test + public void testOffloadV2() throws Exception { + String topicName = "persistent://prop-xyz/ns1/topic1"; + String mlName = "prop-xyz/ns1/persistent/topic1"; + testOffload(topicName, mlName); + } + + @Test + public void testOffloadV1() throws Exception { + String topicName = "persistent://prop-xyz/test/ns1/topic2"; + String mlName = "prop-xyz/test/ns1/persistent/topic2"; + testOffload(topicName, mlName); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index cd00d1c5c1..c9d973dde6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -63,6 +63,7 @@ import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; @@ -79,7 +80,6 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; @@ -1959,7 +1959,8 @@ public void testCompactionStatus() throws Exception { pulsarClient.newProducer().topic(topicName).create().close(); assertNotNull(pulsar.getBrokerService().getTopicReference(topicName)); - assertEquals(admin.topics().compactionStatus(topicName).status, CompactionStatus.Status.NOT_RUN); + assertEquals(admin.topics().compactionStatus(topicName).status, + LongRunningProcessStatus.Status.NOT_RUN); // mock actual compaction, we don't need to really run it CompletableFuture<Long> promise = new CompletableFuture<Long>(); @@ -1967,18 +1968,21 @@ public void testCompactionStatus() throws Exception { doReturn(promise).when(compactor).compact(topicName); admin.topics().triggerCompaction(topicName); - assertEquals(admin.topics().compactionStatus(topicName).status, CompactionStatus.Status.RUNNING); + assertEquals(admin.topics().compactionStatus(topicName).status, + LongRunningProcessStatus.Status.RUNNING); promise.complete(1L); - assertEquals(admin.topics().compactionStatus(topicName).status, CompactionStatus.Status.SUCCESS); + assertEquals(admin.topics().compactionStatus(topicName).status, + LongRunningProcessStatus.Status.SUCCESS); CompletableFuture<Long> errorPromise = new CompletableFuture<Long>(); doReturn(errorPromise).when(compactor).compact(topicName); admin.topics().triggerCompaction(topicName); errorPromise.completeExceptionally(new Exception("Failed at something")); - assertEquals(admin.topics().compactionStatus(topicName).status, CompactionStatus.Status.ERROR); + assertEquals(admin.topics().compactionStatus(topicName).status, + LongRunningProcessStatus.Status.ERROR); assertTrue(admin.topics().compactionStatus(topicName).lastError.contains("Failed at something")); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 7a514cddc5..d2fb996a19 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -54,6 +54,7 @@ import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; @@ -70,7 +71,6 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.lookup.data.LookupData; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; @@ -1977,7 +1977,7 @@ public void testCompactionStatus() throws Exception { assertNotNull(pulsar.getBrokerService().getTopicReference(topicName)); assertEquals(admin.topics().compactionStatus(topicName).status, - CompactionStatus.Status.NOT_RUN); + LongRunningProcessStatus.Status.NOT_RUN); // mock actual compaction, we don't need to really run it CompletableFuture<Long> promise = new CompletableFuture<Long>(); @@ -1986,12 +1986,12 @@ public void testCompactionStatus() throws Exception { admin.topics().triggerCompaction(topicName); assertEquals(admin.topics().compactionStatus(topicName).status, - CompactionStatus.Status.RUNNING); + LongRunningProcessStatus.Status.RUNNING); promise.complete(1L); assertEquals(admin.topics().compactionStatus(topicName).status, - CompactionStatus.Status.SUCCESS); + LongRunningProcessStatus.Status.SUCCESS); CompletableFuture<Long> errorPromise = new CompletableFuture<Long>(); doReturn(errorPromise).when(compactor).compact(topicName); @@ -1999,7 +1999,7 @@ public void testCompactionStatus() throws Exception { errorPromise.completeExceptionally(new Exception("Failed at something")); assertEquals(admin.topics().compactionStatus(topicName).status, - CompactionStatus.Status.ERROR); + LongRunningProcessStatus.Status.ERROR); assertTrue(admin.topics().compactionStatus(topicName) .lastError.contains("Failed at something")); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/LongRunningProcessStatus.java similarity index 69% rename from pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java rename to pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/LongRunningProcessStatus.java index 9020c21331..4a10289884 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/compaction/CompactionStatus.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/LongRunningProcessStatus.java @@ -16,12 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.common.compaction; +package org.apache.pulsar.client.admin; /** - * Status of compaction for a topic. + * Status of long running process. */ -public class CompactionStatus { +public class LongRunningProcessStatus { public enum Status { NOT_RUN, RUNNING, @@ -32,21 +32,21 @@ public Status status; public String lastError; - public CompactionStatus() { + public LongRunningProcessStatus() { this.status = Status.NOT_RUN; this.lastError = ""; } - private CompactionStatus(Status status, String lastError) { + LongRunningProcessStatus(Status status, String lastError) { this.status = status; this.lastError = lastError; } - public static CompactionStatus forStatus(Status status) { - return new CompactionStatus(status, ""); + public static LongRunningProcessStatus forStatus(Status status) { + return new LongRunningProcessStatus(status, ""); } - public static CompactionStatus forError(String lastError) { - return new CompactionStatus(Status.ERROR, lastError); + public static LongRunningProcessStatus forError(String lastError) { + return new LongRunningProcessStatus(Status.ERROR, lastError); } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java new file mode 100644 index 0000000000..0644ee1842 --- /dev/null +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/OffloadProcessStatus.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin; + +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.impl.MessageIdImpl; + +/** + * Status of offload process. + */ +public class OffloadProcessStatus extends LongRunningProcessStatus { + + public MessageIdImpl firstUnoffloadedMessage; + + public OffloadProcessStatus() { + super(Status.NOT_RUN, ""); + firstUnoffloadedMessage = (MessageIdImpl)MessageId.earliest; + } + + private OffloadProcessStatus(Status status, String lastError, + MessageIdImpl firstUnoffloadedMessage) { + this.status = status; + this.lastError = lastError; + this.firstUnoffloadedMessage = firstUnoffloadedMessage; + } + + public static OffloadProcessStatus forStatus(Status status) { + return new OffloadProcessStatus(status, "", (MessageIdImpl)MessageId.earliest); + } + + public static OffloadProcessStatus forError(String lastError) { + return new OffloadProcessStatus(Status.ERROR, lastError, + (MessageIdImpl)MessageId.earliest); + } + + public static OffloadProcessStatus forSuccess(MessageIdImpl messageId) { + return new OffloadProcessStatus(Status.SUCCESS, "", messageId); + } +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java index 2d0d5e5dbc..aa825e8a6a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -30,7 +30,6 @@ import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; @@ -936,5 +935,21 @@ void createSubscription(String topic, String subscriptionName, MessageId message * * @param topic The topic whose compaction status we wish to check */ - CompactionStatus compactionStatus(String topic) throws PulsarAdminException; + LongRunningProcessStatus compactionStatus(String topic) throws PulsarAdminException; + + /** + * Trigger offloading messages in topic to longterm storage. + * + * @param topic the topic to offload + * @param messageId ID of maximum message which should be offloaded + */ + void triggerOffload(String topic, MessageId messageId) throws PulsarAdminException; + + /** + * Check the status of an ongoing offloading operation for a topic. + * + * @param topic the topic being offloaded + * @return the status of the offload operation + */ + OffloadProcessStatus offloadStatus(String topic) throws PulsarAdminException; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 8749d6ae6f..3f5456687a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -52,6 +52,8 @@ import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; +import org.apache.pulsar.client.admin.OffloadProcessStatus; import org.apache.pulsar.client.admin.PersistentTopics; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; @@ -66,7 +68,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata; -import org.apache.pulsar.common.compaction.CompactionStatus; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -780,12 +781,35 @@ public void triggerCompaction(String topic) } @Override - public CompactionStatus compactionStatus(String topic) + public LongRunningProcessStatus compactionStatus(String topic) throws PulsarAdminException { try { TopicName tn = validateTopic(topic); return request(topicPath(tn, "compaction")) - .get(CompactionStatus.class); + .get(LongRunningProcessStatus.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void triggerOffload(String topic, MessageId messageId) throws PulsarAdminException { + try { + TopicName tn = validateTopic(topic); + WebTarget path = topicPath(tn, "offload"); + request(path).put(Entity.entity(messageId, MediaType.APPLICATION_JSON), MessageIdImpl.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public OffloadProcessStatus offloadStatus(String topic) + throws PulsarAdminException { + try { + TopicName tn = validateTopic(topic); + return request(topicPath(tn, "offload")) + .get(OffloadProcessStatus.class); } catch (Exception e) { throw getApiException(e); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java index 5b652561a6..80372f7868 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdPersistentTopics.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PersistentTopics; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -30,7 +31,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.common.compaction.CompactionStatus; + import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; @@ -572,8 +573,8 @@ void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); try { - CompactionStatus status = persistentTopics.compactionStatus(persistentTopic); - while (wait && status.status == CompactionStatus.Status.RUNNING) { + LongRunningProcessStatus status = persistentTopics.compactionStatus(persistentTopic); + while (wait && status.status == LongRunningProcessStatus.Status.RUNNING) { Thread.sleep(1000); status = persistentTopics.compactionStatus(persistentTopic); } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 04d96902fa..0b8e6c2355 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -36,6 +36,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.Topics; @@ -43,7 +44,7 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.common.compaction.CompactionStatus; + @Parameters(commandDescription = "Operations on persistent topics") public class CmdTopics extends CmdBase { @@ -578,8 +579,8 @@ void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); try { - CompactionStatus status = topics.compactionStatus(persistentTopic); - while (wait && status.status == CompactionStatus.Status.RUNNING) { + LongRunningProcessStatus status = topics.compactionStatus(persistentTopic); + while (wait && status.status == LongRunningProcessStatus.Status.RUNNING) { Thread.sleep(1000); status = topics.compactionStatus(persistentTopic); } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services