This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9634eb7 [Issue 5903] Support `compact` all partitions of a
partitioned topic (#6537)
9634eb7 is described below
commit 9634eb79773adc399397aa3a73e23118281a1737
Author: Fangbin Sun <[email protected]>
AuthorDate: Thu Mar 19 13:45:47 2020 +0800
[Issue 5903] Support `compact` all partitions of a partitioned topic (#6537)
* Resolve conflict.
* Add a unit test to cover the compact logic.
---
.../broker/admin/impl/PersistentTopicsBase.java | 78 +++++++++++++++++++++-
.../pulsar/broker/admin/v1/PersistentTopics.java | 13 +++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 11 ++-
.../apache/pulsar/broker/admin/AdminApiTest.java | 43 ++++++++++++
.../pulsar/broker/admin/PersistentTopicsTest.java | 29 ++++++++
.../org/apache/pulsar/client/admin/Topics.java | 8 +++
.../pulsar/client/admin/internal/TopicsImpl.java | 23 +++++--
7 files changed, 192 insertions(+), 13 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c5dabb4..bb5e579 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1976,7 +1976,82 @@ public class PersistentTopicsBase extends AdminResource {
}
}
- protected void internalTriggerCompaction(boolean authoritative) {
+ protected void internalTriggerCompaction(AsyncResponse asyncResponse,
boolean authoritative) {
+ log.info("[{}] Trigger compaction on topic {}", clientAppId(),
topicName);
+ try {
+ if (topicName.isGlobal()) {
+ validateGlobalNamespaceOwnership(namespaceName);
+ }
+ } catch (Exception e) {
+ log.error("[{}] Failed to trigger compaction on topic {}",
clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+
+ // If the topic name is a partition name, no need to get partition
topic metadata again
+ if (topicName.isPartitioned()) {
+ try {
+ internalTriggerCompactionNonPartitionedTopic(authoritative);
+ } catch (Exception e) {
+ log.error("[{}] Failed to trigger compaction on topic {}",
clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+ asyncResponse.resume(Response.noContent().build());
+ } else {
+ getPartitionedTopicMetadataAsync(topicName, authoritative,
false).thenAccept(partitionMetadata -> {
+ final int numPartitions = partitionMetadata.partitions;
+ if (numPartitions > 0) {
+ final List<CompletableFuture<Void>> futures =
Lists.newArrayList();
+
+ for (int i = 0; i < numPartitions; i++) {
+ TopicName topicNamePartition =
topicName.getPartition(i);
+ try {
+
futures.add(pulsar().getAdminClient().topics().triggerCompactionAsync(topicNamePartition.toString()));
+ } catch (Exception e) {
+ log.error("[{}] Failed to trigger compaction on
topic {}", clientAppId(), topicNamePartition, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
+
+ FutureUtil.waitForAll(futures).handle((result, exception)
-> {
+ if (exception != null) {
+ Throwable th = exception.getCause();
+ if (th instanceof NotFoundException) {
+ asyncResponse.resume(new
RestException(Status.NOT_FOUND, th.getMessage()));
+ return null;
+ } else if (th instanceof WebApplicationException) {
+ asyncResponse.resume(th);
+ return null;
+ } else {
+ log.error("[{}] Failed to trigger compaction
on topic {}", clientAppId(), topicName, exception);
+ asyncResponse.resume(new
RestException(exception));
+ return null;
+ }
+ }
+ asyncResponse.resume(Response.noContent().build());
+ return null;
+ });
+ } else {
+ try {
+
internalTriggerCompactionNonPartitionedTopic(authoritative);
+ } catch (Exception e) {
+ log.error("[{}] Failed to trigger compaction on topic
{}", clientAppId(), topicName, e);
+ resumeAsyncResponseExceptionally(asyncResponse, e);
+ return;
+ }
+ asyncResponse.resume(Response.noContent().build());
+ }
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to trigger compaction on topic {}",
clientAppId(), topicName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+ }
+
+ protected void internalTriggerCompactionNonPartitionedTopic(boolean
authoritative) {
validateWriteOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
@@ -1985,6 +2060,7 @@ public class PersistentTopicsBase extends AdminResource {
} catch (AlreadyRunningException e) {
throw new RestException(Status.CONFLICT, e.getMessage());
} catch (Exception e) {
+ log.error("[{}] Failed to trigger compaction on topic {}",
clientAppId(), topicName, e);
throw new RestException(e);
}
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 7fca879..7cf93b7 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -582,11 +582,18 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiResponse(code = 405, message = "Operation not allowed on
persistent topic"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 409, message = "Compaction already running")})
- public void compact(@PathParam("property") String property,
@PathParam("cluster") String cluster,
+ public void compact(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false")
boolean authoritative) {
- validateTopicName(property, cluster, namespace, encodedTopic);
- internalTriggerCompaction(authoritative);
+ try {
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ internalTriggerCompaction(asyncResponse, authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@GET
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 471a2af..8a6efdc 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -973,6 +973,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global
cluster configuration") })
public void compact(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -981,8 +982,14 @@ public class PersistentTopics extends PersistentTopicsBase
{
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this
operation")
@QueryParam("authoritative") @DefaultValue("false") boolean
authoritative) {
- validateTopicName(tenant, namespace, encodedTopic);
- internalTriggerCompaction(authoritative);
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalTriggerCompaction(asyncResponse, authoritative);
+ } catch (WebApplicationException wae) {
+ asyncResponse.resume(wae);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
}
@GET
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index c5bb62a..15e23fd 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2235,6 +2235,49 @@ public class AdminApiTest extends
MockedPulsarServiceBaseTest {
}
@Test
+ public void testTriggerCompactionPartitionedTopic() throws Exception {
+ String topicName = "persistent://prop-xyz/ns1/test-part";
+ int numPartitions = 2;
+ admin.topics().createPartitionedTopic(topicName, numPartitions);
+
+ // create a partitioned topic by creating a producer
+
pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
+ assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
+
+ // mock actual compaction, we don't need to really run it
+ CompletableFuture<Long> promise = new CompletableFuture<>();
+ Compactor compactor = pulsar.getCompactor();
+ doReturn(promise).when(compactor).compact(topicName + "-partition-0");
+
+ CompletableFuture<Long> promise1 = new CompletableFuture<>();
+ doReturn(promise1).when(compactor).compact(topicName + "-partition-1");
+ admin.topics().triggerCompaction(topicName);
+
+ // verify compact called once by each partition topic
+ verify(compactor).compact(topicName + "-partition-0");
+ verify(compactor).compact(topicName + "-partition-1");
+ try {
+ admin.topics().triggerCompaction(topicName);
+
+ fail("Shouldn't be able to run while already running");
+ } catch (PulsarAdminException e) {
+ // expected
+ }
+ // compact shouldn't have been called again
+ verify(compactor).compact(topicName + "-partition-0");
+ verify(compactor).compact(topicName + "-partition-1");
+
+ // complete first compaction, and trigger again
+ promise.complete(1L);
+ promise1.complete(1L);
+ admin.topics().triggerCompaction(topicName);
+
+ // verify compact was called again
+ verify(compactor, times(2)).compact(topicName + "-partition-0");
+ verify(compactor, times(2)).compact(topicName + "-partition-1");
+ }
+
+ @Test
public void testCompactionStatus() throws Exception {
String topicName = "persistent://prop-xyz/ns1/topic1";
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index fe1ad87..2d2d0cf 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -443,6 +443,35 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testTriggerCompactionTopic() {
+ final String partitionTopicName = "test-part";
+ final String nonPartitionTopicName = "test-non-part";
+
+ // trigger compaction on non-existing topic
+ AsyncResponse response = mock(AsyncResponse.class);
+ persistentTopics.compact(response, testTenant, testNamespace,
"non-existing-topic", true);
+ ArgumentCaptor<RestException> errCaptor =
ArgumentCaptor.forClass(RestException.class);
+ verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
+ Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
+
+ // create non partitioned topic and compaction on it
+ response = mock(AsyncResponse.class);
+ persistentTopics.createNonPartitionedTopic(testTenant, testNamespace,
nonPartitionTopicName, true);
+ persistentTopics.compact(response, testTenant, testNamespace,
nonPartitionTopicName, true);
+ ArgumentCaptor<Response> responseCaptor =
ArgumentCaptor.forClass(Response.class);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
+
+ // create partitioned topic and compaction on it
+ response = mock(AsyncResponse.class);
+ persistentTopics.createPartitionedTopic(response, testTenant,
testNamespace, partitionTopicName, 2);
+ persistentTopics.compact(response, testTenant, testNamespace,
partitionTopicName, true);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
+ }
+
@Test()
public void testGetLastMessageId() throws Exception {
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1",
"role2"), Sets.newHashSet("test"));
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 0a00462..1b2f3fc 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -1070,6 +1070,14 @@ public interface Topics {
void triggerCompaction(String topic) throws PulsarAdminException;
/**
+ * Trigger compaction to run for a topic asynchronously.
+ *
+ * @param topic
+ * The topic on which to trigger compaction
+ */
+ CompletableFuture<Void> triggerCompactionAsync(String topic);
+
+ /**
* Check the status of an ongoing compaction for a topic.
*
* @param topic The topic whose compaction status we wish to check
diff --git
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 3a1e685..aebdbb2 100644
---
a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++
b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -904,18 +904,27 @@ public class TopicsImpl extends BaseResource implements
Topics {
}
@Override
- public void triggerCompaction(String topic)
- throws PulsarAdminException {
+ public void triggerCompaction(String topic) throws PulsarAdminException {
try {
- TopicName tn = validateTopic(topic);
- request(topicPath(tn, "compaction"))
- .put(Entity.entity("", MediaType.APPLICATION_JSON),
ErrorData.class);
- } catch (Exception e) {
- throw getApiException(e);
+ triggerCompactionAsync(topic).get(this.readTimeoutMs,
TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
}
}
@Override
+ public CompletableFuture<Void> triggerCompactionAsync(String topic) {
+ TopicName tn = validateTopic(topic);
+ WebTarget path = topicPath(tn, "compaction");
+ return asyncPutRequest(path, Entity.entity("",
MediaType.APPLICATION_JSON));
+ }
+
+ @Override
public LongRunningProcessStatus compactionStatus(String topic)
throws PulsarAdminException {
try {