This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9634eb7  [Issue 5903] Support `compact` all partitions of a 
partitioned topic (#6537)
9634eb7 is described below

commit 9634eb79773adc399397aa3a73e23118281a1737
Author: Fangbin Sun <[email protected]>
AuthorDate: Thu Mar 19 13:45:47 2020 +0800

    [Issue 5903] Support `compact` all partitions of a partitioned topic (#6537)
    
    * Resolve conflict.
    
    * Add a unit test to cover the compact logic.
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 78 +++++++++++++++++++++-
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 13 +++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 11 ++-
 .../apache/pulsar/broker/admin/AdminApiTest.java   | 43 ++++++++++++
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 29 ++++++++
 .../org/apache/pulsar/client/admin/Topics.java     |  8 +++
 .../pulsar/client/admin/internal/TopicsImpl.java   | 23 +++++--
 7 files changed, 192 insertions(+), 13 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c5dabb4..bb5e579 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1976,7 +1976,82 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    protected void internalTriggerCompaction(boolean authoritative) {
+    protected void internalTriggerCompaction(AsyncResponse asyncResponse, 
boolean authoritative) {
+        log.info("[{}] Trigger compaction on topic {}", clientAppId(), 
topicName);
+        try {
+            if (topicName.isGlobal()) {
+                validateGlobalNamespaceOwnership(namespaceName);
+            }
+        } catch (Exception e) {
+            log.error("[{}] Failed to trigger compaction on topic {}", 
clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+
+        // If the topic name is a partition name, no need to get partition 
topic metadata again
+        if (topicName.isPartitioned()) {
+            try {
+                internalTriggerCompactionNonPartitionedTopic(authoritative);
+            } catch (Exception e) {
+                log.error("[{}] Failed to trigger compaction on topic {}", 
clientAppId(), topicName, e);
+                resumeAsyncResponseExceptionally(asyncResponse, e);
+                return;
+            }
+            asyncResponse.resume(Response.noContent().build());
+        } else {
+            getPartitionedTopicMetadataAsync(topicName, authoritative, 
false).thenAccept(partitionMetadata -> {
+                final int numPartitions = partitionMetadata.partitions;
+                if (numPartitions > 0) {
+                    final List<CompletableFuture<Void>> futures = 
Lists.newArrayList();
+
+                    for (int i = 0; i < numPartitions; i++) {
+                        TopicName topicNamePartition = 
topicName.getPartition(i);
+                        try {
+                            
futures.add(pulsar().getAdminClient().topics().triggerCompactionAsync(topicNamePartition.toString()));
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to trigger compaction on 
topic {}", clientAppId(), topicNamePartition, e);
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
+                    }
+
+                    FutureUtil.waitForAll(futures).handle((result, exception) 
-> {
+                        if (exception != null) {
+                            Throwable th = exception.getCause();
+                            if (th instanceof NotFoundException) {
+                                asyncResponse.resume(new 
RestException(Status.NOT_FOUND, th.getMessage()));
+                                return null;
+                            } else if (th instanceof WebApplicationException) {
+                                asyncResponse.resume(th);
+                                return null;
+                            } else {
+                                log.error("[{}] Failed to trigger compaction 
on topic {}", clientAppId(), topicName, exception);
+                                asyncResponse.resume(new 
RestException(exception));
+                                return null;
+                            }
+                        }
+                        asyncResponse.resume(Response.noContent().build());
+                        return null;
+                    });
+                } else {
+                    try {
+                        
internalTriggerCompactionNonPartitionedTopic(authoritative);
+                    } catch (Exception e) {
+                        log.error("[{}] Failed to trigger compaction on topic 
{}", clientAppId(), topicName, e);
+                        resumeAsyncResponseExceptionally(asyncResponse, e);
+                        return;
+                    }
+                    asyncResponse.resume(Response.noContent().build());
+                }
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to trigger compaction on topic {}", 
clientAppId(), topicName, ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
+        }
+    }
+
+    protected void internalTriggerCompactionNonPartitionedTopic(boolean 
authoritative) {
         validateWriteOperationOnTopic(authoritative);
 
         PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
@@ -1985,6 +2060,7 @@ public class PersistentTopicsBase extends AdminResource {
         } catch (AlreadyRunningException e) {
             throw new RestException(Status.CONFLICT, e.getMessage());
         } catch (Exception e) {
+            log.error("[{}] Failed to trigger compaction on topic {}", 
clientAppId(), topicName, e);
             throw new RestException(e);
         }
     }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 7fca879..7cf93b7 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -582,11 +582,18 @@ public class PersistentTopics extends 
PersistentTopicsBase {
             @ApiResponse(code = 405, message = "Operation not allowed on 
persistent topic"),
             @ApiResponse(code = 404, message = "Topic does not exist"),
             @ApiResponse(code = 409, message = "Compaction already running")})
-    public void compact(@PathParam("property") String property, 
@PathParam("cluster") String cluster,
+    public void compact(@Suspended final AsyncResponse asyncResponse,
+                        @PathParam("property") String property, 
@PathParam("cluster") String cluster,
                         @PathParam("namespace") String namespace, 
@PathParam("topic") @Encoded String encodedTopic,
                         @QueryParam("authoritative") @DefaultValue("false") 
boolean authoritative) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalTriggerCompaction(authoritative);
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalTriggerCompaction(asyncResponse, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @GET
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 471a2af..8a6efdc 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -973,6 +973,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 500, message = "Internal server error"),
             @ApiResponse(code = 503, message = "Failed to validate global 
cluster configuration") })
     public void compact(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -981,8 +982,14 @@ public class PersistentTopics extends PersistentTopicsBase 
{
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this 
operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean 
authoritative) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        internalTriggerCompaction(authoritative);
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalTriggerCompaction(asyncResponse, authoritative);
+        } catch (WebApplicationException wae) {
+            asyncResponse.resume(wae);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
     }
 
     @GET
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index c5bb62a..15e23fd 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2235,6 +2235,49 @@ public class AdminApiTest extends 
MockedPulsarServiceBaseTest {
     }
 
     @Test
+    public void testTriggerCompactionPartitionedTopic() throws Exception {
+        String topicName = "persistent://prop-xyz/ns1/test-part";
+        int numPartitions = 2;
+        admin.topics().createPartitionedTopic(topicName, numPartitions);
+
+        // create a partitioned topic by creating a producer
+        
pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
+        assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
+
+        // mock actual compaction, we don't need to really run it
+        CompletableFuture<Long> promise = new CompletableFuture<>();
+        Compactor compactor = pulsar.getCompactor();
+        doReturn(promise).when(compactor).compact(topicName + "-partition-0");
+
+        CompletableFuture<Long> promise1 = new CompletableFuture<>();
+        doReturn(promise1).when(compactor).compact(topicName + "-partition-1");
+        admin.topics().triggerCompaction(topicName);
+
+        // verify compact called once by each partition topic
+        verify(compactor).compact(topicName + "-partition-0");
+        verify(compactor).compact(topicName + "-partition-1");
+        try {
+            admin.topics().triggerCompaction(topicName);
+
+            fail("Shouldn't be able to run while already running");
+        } catch (PulsarAdminException e) {
+            // expected
+        }
+        // compact shouldn't have been called again
+        verify(compactor).compact(topicName + "-partition-0");
+        verify(compactor).compact(topicName + "-partition-1");
+
+        // complete first compaction, and trigger again
+        promise.complete(1L);
+        promise1.complete(1L);
+        admin.topics().triggerCompaction(topicName);
+
+        // verify compact was called again
+        verify(compactor, times(2)).compact(topicName + "-partition-0");
+        verify(compactor, times(2)).compact(topicName + "-partition-1");
+    }
+
+    @Test
     public void testCompactionStatus() throws Exception {
         String topicName = "persistent://prop-xyz/ns1/topic1";
 
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index fe1ad87..2d2d0cf 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -443,6 +443,35 @@ public class PersistentTopicsTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testTriggerCompactionTopic() {
+        final String partitionTopicName = "test-part";
+        final String nonPartitionTopicName = "test-non-part";
+
+        // trigger compaction on non-existing topic
+        AsyncResponse response = mock(AsyncResponse.class);
+        persistentTopics.compact(response, testTenant, testNamespace, 
"non-existing-topic", true);
+        ArgumentCaptor<RestException> errCaptor = 
ArgumentCaptor.forClass(RestException.class);
+        verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
+        Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), 
Response.Status.NOT_FOUND.getStatusCode());
+
+        // create non partitioned topic and compaction on it
+        response = mock(AsyncResponse.class);
+        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, 
nonPartitionTopicName, true);
+        persistentTopics.compact(response, testTenant, testNamespace, 
nonPartitionTopicName, true);
+        ArgumentCaptor<Response> responseCaptor = 
ArgumentCaptor.forClass(Response.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
+
+        // create partitioned topic and compaction on it
+        response = mock(AsyncResponse.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, 
testNamespace, partitionTopicName, 2);
+        persistentTopics.compact(response, testTenant, testNamespace, 
partitionTopicName, true);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        verify(response, 
timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), 
Response.Status.NO_CONTENT.getStatusCode());
+    }
+
     @Test()
     public void testGetLastMessageId() throws Exception {
         TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", 
"role2"), Sets.newHashSet("test"));
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 0a00462..1b2f3fc 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1070,6 +1070,14 @@ public interface Topics {
     void triggerCompaction(String topic) throws PulsarAdminException;
 
     /**
+     * Trigger compaction to run for a topic asynchronously.
+     *
+     * @param topic
+     *            The topic on which to trigger compaction
+     */
+    CompletableFuture<Void> triggerCompactionAsync(String topic);
+
+    /**
      * Check the status of an ongoing compaction for a topic.
      *
      * @param topic The topic whose compaction status we wish to check
diff --git 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 3a1e685..aebdbb2 100644
--- 
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ 
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -904,18 +904,27 @@ public class TopicsImpl extends BaseResource implements 
Topics {
     }
 
     @Override
-    public void triggerCompaction(String topic)
-            throws PulsarAdminException {
+    public void triggerCompaction(String topic) throws PulsarAdminException {
         try {
-            TopicName tn = validateTopic(topic);
-            request(topicPath(tn, "compaction"))
-                .put(Entity.entity("", MediaType.APPLICATION_JSON), 
ErrorData.class);
-        } catch (Exception e) {
-            throw getApiException(e);
+            triggerCompactionAsync(topic).get(this.readTimeoutMs, 
TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
         }
     }
 
     @Override
+    public CompletableFuture<Void> triggerCompactionAsync(String topic) {
+        TopicName tn = validateTopic(topic);
+        WebTarget path = topicPath(tn, "compaction");
+        return asyncPutRequest(path, Entity.entity("", 
MediaType.APPLICATION_JSON));
+    }
+
+    @Override
     public LongRunningProcessStatus compactionStatus(String topic)
             throws PulsarAdminException {
         try {

Reply via email to