merlimat closed pull request #1367: Rest endpoint for triggering compaction
URL: https://github.com/apache/incubator-pulsar/pull/1367
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 47753e6fc..0fd81a7fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 import com.google.common.collect.Lists;
@@ -42,6 +43,8 @@
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.pulsar.compaction.Compactor;
+import org.apache.pulsar.compaction.TwoPhaseCompactor;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
@@ -60,6 +63,8 @@
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
 import org.apache.pulsar.broker.web.WebService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.configuration.VipStatus;
 import org.apache.pulsar.common.naming.TopicName;
@@ -103,6 +108,8 @@
     private ZooKeeperCache localZkCache;
     private GlobalZooKeeperCache globalZkCache;
     private LocalZooKeeperConnectionService localZooKeeperConnectionProvider;
+    private Compactor compactor;
+
     private final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(20,
             new DefaultThreadFactory("pulsar"));
     private final ScheduledExecutorService cacheExecutor = 
Executors.newScheduledThreadPool(10,
@@ -110,11 +117,13 @@
     private final OrderedExecutor orderedExecutor = 
OrderedExecutor.newBuilder().numThreads(8).name("pulsar-ordered")
             .build();
     private final ScheduledExecutorService loadManagerExecutor;
+    private ScheduledExecutorService compactorExecutor;
     private ScheduledFuture<?> loadReportTask = null;
     private ScheduledFuture<?> loadSheddingTask = null;
     private ScheduledFuture<?> loadResourceQuotaTask = null;
     private final AtomicReference<LoadManager> loadManager = new 
AtomicReference<>();
     private PulsarAdmin adminClient = null;
+    private PulsarClient client = null;
     private ZooKeeperClientFactory zkClientFactory = null;
     private final String bindAddress;
     private final String advertisedAddress;
@@ -218,8 +227,17 @@ public void close() throws PulsarServerException {
                 adminClient = null;
             }
 
+            if (client != null) {
+                client.close();
+                client = null;
+            }
+
             nsservice = null;
 
+            if (compactorExecutor != null) {
+                compactorExecutor.shutdown();
+            }
+
             // executor is not initialized in mocks even when real close 
method is called
             // guard against null executors
             if (executor != null) {
@@ -632,6 +650,47 @@ public BookKeeperClientFactory 
getBookKeeperClientFactory() {
         return new BookKeeperClientFactoryImpl();
     }
 
+    protected synchronized ScheduledExecutorService getCompactorExecutor() {
+        if (this.compactorExecutor == null) {
+            compactorExecutor = Executors.newSingleThreadScheduledExecutor(new 
DefaultThreadFactory("compaction"));
+        }
+        return this.compactorExecutor;
+    }
+
+    public synchronized Compactor getCompactor() throws PulsarServerException {
+        if (this.compactor == null) {
+            try {
+                this.compactor = new TwoPhaseCompactor(this.getConfiguration(),
+                                                       getClient(), 
getBookKeeperClient(),
+                                                       getCompactorExecutor());
+            } catch (Exception e) {
+                throw new PulsarServerException(e);
+            }
+        }
+        return this.compactor;
+    }
+
+    public synchronized PulsarClient getClient() throws PulsarServerException {
+        if (this.client == null) {
+            try {
+                ClientBuilder builder = PulsarClient.builder()
+                    .serviceUrl(this.getConfiguration().isTlsEnabled()
+                                ? this.brokerServiceUrlTls : 
this.brokerServiceUrl)
+                    .enableTls(this.getConfiguration().isTlsEnabled())
+                    
.allowTlsInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection())
+                    
.tlsTrustCertsFilePath(this.getConfiguration().getTlsCertificateFilePath());
+                if 
(isNotBlank(this.getConfiguration().getBrokerClientAuthenticationPlugin())) {
+                    
builder.authentication(this.getConfiguration().getBrokerClientAuthenticationPlugin(),
+                                           
this.getConfiguration().getBrokerClientAuthenticationParameters());
+                }
+                this.client = builder.build();
+            } catch (Exception e) {
+                throw new PulsarServerException(e);
+            }
+        }
+        return this.client;
+    }
+
     public synchronized PulsarAdmin getAdminClient() throws 
PulsarServerException {
         if (this.adminClient == null) {
             try {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 87d1b3ad5..86e68583d 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -52,6 +52,7 @@
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.admin.ZkAdminPaths;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
@@ -1072,6 +1073,19 @@ protected void internalExpireMessages(String subName, 
int expireTimeInSeconds, b
         }
     }
 
+    protected void internalTriggerCompaction(boolean authoritative) {
+        validateAdminOperationOnTopic(authoritative);
+
+        PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
+        try {
+            topic.triggerCompaction();
+        } catch (AlreadyRunningException e) {
+            throw new RestException(Status.CONFLICT, e.getMessage());
+        } catch (Exception e) {
+            throw new RestException(e);
+        }
+    }
+
     public static CompletableFuture<PartitionedTopicMetadata> 
getPartitionedTopicMetadata(PulsarService pulsar,
             String clientAppId, AuthenticationDataSource authenticationData, 
TopicName topicName) {
         CompletableFuture<PartitionedTopicMetadata> metadataFuture = new 
CompletableFuture<>();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 8e105424c..f28d229e0 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -420,4 +420,19 @@ public MessageId terminate(@PathParam("property") String 
property, @PathParam("c
         validateTopicName(property, cluster, namespace, encodedTopic);
         return internalTerminate(authoritative);
     }
+
+    @PUT
+    @Path("/{property}/{cluster}/{namespace}/{topic}/compaction")
+    @ApiOperation(value = "Trigger a compaction operation on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 405, message = "Operation not 
allowed on persistent topic"),
+                            @ApiResponse(code = 404, message = "Topic does not 
exist"),
+                            @ApiResponse(code = 409, message = "Compaction 
already running")})
+   public void compact(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
+                       @PathParam("namespace") String namespace, 
@PathParam("topic") @Encoded String encodedTopic,
+                       @QueryParam("authoritative") @DefaultValue("false") 
boolean authoritative) {
+        validateTopicName(property, cluster, namespace, encodedTopic);
+        internalTriggerCompaction(authoritative);
+    }
+
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 0f89bfd82..3ffdde872 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -393,4 +393,18 @@ public MessageId terminate(@PathParam("property") String 
property, @PathParam("n
         validateTopicName(property, namespace, encodedTopic);
         return internalTerminate(authoritative);
     }
+
+    @PUT
+    @Path("/{property}/{namespace}/{topic}/compaction")
+    @ApiOperation(value = "Trigger a compaction operation on a topic.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have 
admin permission"),
+                            @ApiResponse(code = 405, message = "Operation not 
allowed on persistent topic"),
+                            @ApiResponse(code = 404, message = "Topic does not 
exist"),
+                            @ApiResponse(code = 409, message = "Compaction 
already running")})
+    public void compact(@PathParam("property") String property,
+                        @PathParam("namespace") String namespace, 
@PathParam("topic") @Encoded String encodedTopic,
+                        @QueryParam("authoritative") @DefaultValue("false") 
boolean authoritative) {
+        validateTopicName(property, namespace, encodedTopic);
+        internalTriggerCompaction(authoritative);
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 1e72bf6c3..31a4ac783 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -139,6 +139,12 @@ public TooManyRequestsException(String msg) {
         }
     }
 
+    public static class AlreadyRunningException extends BrokerServiceException 
{
+        public AlreadyRunningException(String msg) {
+            super(msg);
+        }
+    }
+
     public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
         if (t instanceof ServerMetadataException) {
             return PulsarApi.ServerError.MetadataError;
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index e5b79bf7a..77180b32e 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -52,9 +52,11 @@
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import 
org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import 
org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
@@ -162,6 +164,9 @@
     public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
 
     private final MessageDeduplication messageDeduplication;
+
+    private static final long COMPACTION_NEVER_RUN = -0xfebecffeL;
+    CompletableFuture<Long> currentCompaction = 
CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
     final CompactedTopic compactedTopic;
 
     // Whether messages published must be encrypted or not in this topic
@@ -1608,6 +1613,15 @@ public Position getLastMessageId() {
         return ledger.getLastConfirmedEntry();
     }
 
+    public synchronized void triggerCompaction()
+            throws PulsarServerException, AlreadyRunningException {
+        if (currentCompaction.isDone()) {
+            currentCompaction = 
brokerService.pulsar().getCompactor().compact(topic);
+        } else {
+            throw new AlreadyRunningException("Compaction already in 
progress");
+        }
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PersistentTopic.class);
 
     @Override
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 426a04cca..aca965efe 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -18,10 +18,14 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -96,6 +100,7 @@
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.compaction.Compactor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -1889,4 +1894,38 @@ public void testTopicBundleRangeLookup() throws 
PulsarAdminException, PulsarServ
         assertEquals(bundleRange, 
pulsar.getNamespaceService().getBundle(TopicName.get(topicName)).getBundleRange());
     }
 
+    @Test
+    public void testTriggerCompaction() throws Exception {
+        String topicName = "persistent://prop-xyz/use/ns1/topic1";
+
+        // create a topic by creating a producer
+        pulsarClient.newProducer().topic(topicName).create().close();
+        assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
+
+        // mock actual compaction, we don't need to really run it
+        CompletableFuture<Long> promise = new CompletableFuture<Long>();
+        Compactor compactor = pulsar.getCompactor();
+        doReturn(promise).when(compactor).compact(topicName);
+        admin.persistentTopics().triggerCompaction(topicName);
+
+        // verify compact called once
+        verify(compactor).compact(topicName);
+        try {
+            admin.persistentTopics().triggerCompaction(topicName);
+
+            fail("Shouldn't be able to run while already running");
+        } catch (ConflictException e) {
+            // expected
+        }
+        // compact shouldn't have been called again
+        verify(compactor).compact(topicName);
+
+        // complete first compaction, and trigger again
+        promise.complete(1L);
+        admin.persistentTopics().triggerCompaction(topicName);
+
+        // verify compact was called again
+        verify(compactor, times(2)).compact(topicName);
+    }
+
 }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index ab450460d..d3c2ea14a 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -45,6 +45,7 @@
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
 import org.apache.zookeeper.CreateMode;
@@ -190,6 +191,10 @@ protected PulsarService startBroker(ServiceConfiguration 
conf) throws Exception
         conf.setAuthorizationEnabled(true);
         pulsar.start();
         conf.setAuthorizationEnabled(isAuthorizationEnabled);
+
+        Compactor spiedCompactor = spy(pulsar.getCompactor());
+        doReturn(spiedCompactor).when(pulsar).getCompactor();
+
         return pulsar;
     }
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 36b91a40b..f32c70b67 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -67,8 +67,6 @@ protected void setup() throws Exception {
         superUserRoles.add("superUser");
         conf.setSuperUserRoles(superUserRoles);
 
-        
conf.setBrokerClientAuthenticationPlugin(TestAuthenticationProvider.class.getName());
-
         Set<String> providers = new HashSet<>();
         providers.add(TestAuthenticationProvider.class.getName());
         conf.setAuthenticationProviders(providers);
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
index 95daa0aed..93df29038 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PersistentTopics.java
@@ -900,4 +900,13 @@ void createSubscription(String topic, String 
subscriptionName, MessageId message
      *            reset subscription to messageId (or previous nearest 
messageId if given messageId is not valid)
      */
     CompletableFuture<Void> resetCursorAsync(String topic, String subName, 
MessageId messageId);
+
+    /**
+     * Trigger compaction to run for a topic. A single topic can only have one 
instance of compaction
+     * running at any time. Any attempt to trigger another will be met with a 
ConflictException.
+     *
+     * @param topic
+     *            The topic on which to trigger compaction
+     */
+    void triggerCompaction(String topic) throws PulsarAdminException;
 }
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
index c7e35acfd..6193d0889 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PersistentTopicsImpl.java
@@ -711,6 +711,18 @@ public void failed(Throwable throwable) {
         return future;
     }
 
+    @Override
+    public void triggerCompaction(String topic)
+            throws PulsarAdminException {
+        try {
+            TopicName tn = validateTopic(topic);
+            request(topicPath(tn, "compaction"))
+                .put(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
     private WebTarget namespacePath(NamespaceName namespace, String... parts) {
         final WebTarget base = namespace.isV2() ? adminV2PersistentTopics : 
adminPersistentTopics;
         WebTarget namespacePath = base.path(namespace.toString());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to