This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new bbec7eb fix add partition failed when bundle unloaded (#6856)
bbec7eb is described below
commit bbec7eb6ca30bef10b83d46839ef152e3267c656
Author: hangc0276 <[email protected]>
AuthorDate: Wed May 27 13:35:45 2020 +0800
fix add partition failed when bundle unloaded (#6856)
### Motivation
When a topic with high input/output load, we will get add partition failed,
the failed log as follows:
```
Failed to perform http post request:
org.asynchttpclient.handler.MaxRedirectException: Maximum redirect reached: 5
null
Reason: org.asynchttpclient.handler.MaxRedirectException: Maximum redirect
reached: 5
```
### Bug description
The reason is when the topic with high load, the topic's bundle will be
unload. In the same time, we call pulsar admin to add partition for the topic,
the request will post to one broker A, broker A can't find the topic-bundle's
owner, it will redirect the request to the leader broker B, broker B find a
candidate broker C to own the bundle, and redirect request to broker C with
authoritative flag.
However, broker C can't receive the authoritative flag (that's the bug),
and can't find the topic-bundle's owner and has no authoritative flag, so it
redirect the request to the leader broker B and goes on in cycle. In the end it
reaches the max redirect limit and failed.
---
.../org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 4 ++--
.../java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java | 8 +++++---
.../java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java | 4 +++-
.../java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java | 2 +-
4 files changed, 11 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ece5371..a052191 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -468,8 +468,8 @@ public class PersistentTopicsBase extends AdminResource {
*
* @param numPartitions
*/
- protected void internalUpdatePartitionedTopic(int numPartitions, boolean
updateLocalTopicOnly) {
- validateWriteOperationOnTopic(false);
+ protected void internalUpdatePartitionedTopic(int numPartitions, boolean
updateLocalTopicOnly, boolean authoritative) {
+ validateWriteOperationOnTopic(authoritative);
// Only do the validation if it's the first hop.
if (!updateLocalTopicOnly) {
validatePartitionTopicUpdate(topicName.getLocalName(),
numPartitions);
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 3c56f1c..c3003e5 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -46,8 +46,6 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
-import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
-import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TopicStats;
@@ -57,6 +55,8 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.ApiParam;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -186,9 +186,11 @@ public class PersistentTopics extends PersistentTopicsBase
{
public void updatePartitionedTopic(@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("topic")
@Encoded String encodedTopic,
@QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean
updateLocalTopicOnly,
+ @ApiParam(value = "Is authentication required to perform this
operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
int numPartitions) {
validateTopicName(property, cluster, namespace, encodedTopic);
- internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly);
+ internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly,
authoritative);
}
@GET
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index dcd2ed8..4721c05 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -278,10 +278,12 @@ public class PersistentTopics extends
PersistentTopicsBase {
@ApiParam(value = "Specify topic name", required = true)
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("updateLocalTopicOnly") @DefaultValue("false") boolean
updateLocalTopicOnly,
+ @ApiParam(value = "Is authentication required to perform this
operation")
+ @QueryParam("authoritative") @DefaultValue("false") boolean
authoritative,
@ApiParam(value = "The number of partitions for the topic",
required = true, type = "int", defaultValue = "0")
int numPartitions) {
validatePartitionedTopicName(tenant, namespace, encodedTopic);
- internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly);
+ internalUpdatePartitionedTopic(numPartitions, updateLocalTopicOnly,
authoritative);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 96b4aa3..452e2e4 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -288,7 +288,7 @@ public class PersistentTopicsTest extends
MockedPulsarServiceBaseTest {
persistentTopics.createPartitionedTopic(response, testTenant,
testNamespace, partitionedTopicName, 5);
verify(response,
timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getStatus(),
Response.Status.NO_CONTENT.getStatusCode());
- persistentTopics.updatePartitionedTopic(testTenant, testNamespace,
partitionedTopicName, true, 10);
+ persistentTopics.updatePartitionedTopic(testTenant, testNamespace,
partitionedTopicName, true, false, 10);
}
@Test