This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ed68ec106fe [fix][broker] Fix closing of
AbstractDispatcherSingleActiveConsumer and reduce flakiness of test (#21736)
ed68ec106fe is described below
commit ed68ec106fed34ea39fc52810e270094b35e2ca3
Author: Lari Hotari <[email protected]>
AuthorDate: Tue Dec 19 16:13:14 2023 +0200
[fix][broker] Fix closing of AbstractDispatcherSingleActiveConsumer and
reduce flakiness of test (#21736)
---
.../AbstractDispatcherSingleActiveConsumer.java | 2 ++
.../apache/pulsar/broker/service/Dispatcher.java | 4 +++-
.../NonPersistentDispatcherMultipleConsumers.java | 2 ++
.../PersistentDispatcherMultipleConsumers.java | 7 ------
.../PersistentDispatcherSingleActiveConsumer.java | 15 ------------
.../SubscriptionMessageDispatchThrottlingTest.java | 28 ++++++++++++----------
6 files changed, 23 insertions(+), 35 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 5e74158c9c2..4a8a805f16b 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import
org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import
org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.client.impl.Murmur3Hash32;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
@@ -261,6 +262,7 @@ public abstract class
AbstractDispatcherSingleActiveConsumer extends AbstractBas
public CompletableFuture<Void> close(boolean disconnectConsumers,
Optional<BrokerLookupData>
assignedBrokerLookupData) {
IS_CLOSED_UPDATER.set(this, TRUE);
+ getRateLimiter().ifPresent(DispatchRateLimiter::close);
return disconnectConsumers
? disconnectAllConsumers(false, assignedBrokerLookupData) :
CompletableFuture.completedFuture(null);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
index bdea106171b..08e5caaa2dd 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Dispatcher.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.pulsar.broker.service;
import java.util.List;
@@ -101,7 +102,8 @@ public interface Dispatcher {
}
default void updateRateLimiter() {
- //No-op
+ initializeDispatchRateLimiterIfNeeded();
+ getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate);
}
default boolean initializeDispatchRateLimiterIfNeeded() {
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index 0a9ec9fbb57..29bca715741 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
@@ -131,6 +132,7 @@ public class NonPersistentDispatcherMultipleConsumers
extends AbstractDispatcher
public CompletableFuture<Void> close(boolean disconnectConsumers,
Optional<BrokerLookupData>
assignedBrokerLookupData) {
IS_CLOSED_UPDATER.set(this, TRUE);
+ getRateLimiter().ifPresent(DispatchRateLimiter::close);
return disconnectConsumers
? disconnectAllConsumers(false, assignedBrokerLookupData) :
CompletableFuture.completedFuture(null);
}
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 6001561d72a..cafc398a3c8 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -1026,13 +1026,6 @@ public class PersistentDispatcherMultipleConsumers
extends AbstractDispatcherMul
return dispatchRateLimiter;
}
- @Override
- public void updateRateLimiter() {
- if (!initializeDispatchRateLimiterIfNeeded()) {
-
this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
- }
- }
-
@Override
public boolean initializeDispatchRateLimiterIfNeeded() {
if (!dispatchRateLimiter.isPresent() &&
DispatchRateLimiter.isDispatchRateEnabled(
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 54c039d632f..806773af451 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -25,7 +25,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -560,13 +559,6 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
return dispatchRateLimiter;
}
- @Override
- public void updateRateLimiter() {
- if (!initializeDispatchRateLimiterIfNeeded()) {
-
this.dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate);
- }
- }
-
@Override
public boolean initializeDispatchRateLimiterIfNeeded() {
if (!dispatchRateLimiter.isPresent() &&
DispatchRateLimiter.isDispatchRateEnabled(
@@ -578,13 +570,6 @@ public class PersistentDispatcherSingleActiveConsumer
extends AbstractDispatcher
return false;
}
- @Override
- public CompletableFuture<Void> close() {
- IS_CLOSED_UPDATER.set(this, TRUE);
- dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
- return disconnectAllConsumers();
- }
-
@Override
public boolean checkAndUnblockIfStuck() {
Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index 6304ed82d4f..02de11a2bcc 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.api;
import static org.awaitility.Awaitility.await;
import com.google.common.collect.Sets;
-import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -48,7 +47,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
* @param subscription
* @throws Exception
*/
- @Test(dataProvider = "subscriptionAndDispatchRateType", timeOut = 5000)
+ @Test(dataProvider = "subscriptionAndDispatchRateType", timeOut = 30000)
public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType
subscription,
DispatchRateType
dispatchRateType) throws Exception {
log.info("-- Starting {} test --", methodName);
@@ -144,7 +143,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
* @param subscription
* @throws Exception
*/
- @Test(dataProvider = "subscriptions", timeOut = 5000)
+ @Test(dataProvider = "subscriptions", timeOut = 30000)
public void
testMessageRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType
subscription)
throws Exception {
log.info("-- Starting {} test --", methodName);
@@ -218,7 +217,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
log.info("-- Exiting {} test --", methodName);
}
- @Test(dataProvider = "subscriptions", timeOut = 30000, invocationCount =
15)
+ @Test(dataProvider = "subscriptions", timeOut = 30000)
private void testMessageNotDuplicated(SubscriptionType subscription)
throws Exception {
int brokerRate = 1000;
int topicRate = 5000;
@@ -273,7 +272,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
Assert.fail("Should only have PersistentDispatcher in this test");
}
final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
- Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+ Awaitility.await().untilAsserted(() -> {
DispatchRateLimiter brokerDispatchRateLimiter =
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
Assert.assertTrue(brokerDispatchRateLimiter != null
&& brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
@@ -320,7 +319,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
* @param subscription
* @throws Exception
*/
- @Test(dataProvider = "subscriptions", timeOut = 5000)
+ @Test(dataProvider = "subscriptions", timeOut = 30000)
public void
testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType
subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
@@ -451,7 +450,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
Assert.fail("Should only have PersistentDispatcher in this test");
}
final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
- Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+ Awaitility.await().untilAsserted(() -> {
DispatchRateLimiter brokerDispatchRateLimiter =
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
Assert.assertTrue(brokerDispatchRateLimiter != null
&& brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
@@ -526,7 +525,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
* @param subscription
* @throws Exception
*/
- @Test(dataProvider = "subscriptions", timeOut = 8000)
+ @Test(dataProvider = "subscriptions", timeOut = 30000)
public void
testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType
subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
@@ -539,6 +538,11 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
long initBytes =
pulsar.getConfiguration().getDispatchThrottlingRatePerTopicInByte();
final int byteRate = 1000;
admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" +
byteRate);
+
+ Awaitility.await().untilAsserted(() -> {
+
Assert.assertEquals(pulsar.getConfiguration().getDispatchThrottlingRateInByte(),
byteRate);
+ });
+
admin.namespaces().createNamespace(namespace1,
Sets.newHashSet("test"));
admin.namespaces().createNamespace(namespace2,
Sets.newHashSet("test"));
@@ -572,7 +576,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
Producer<byte[]> producer1 =
pulsarClient.newProducer().topic(topicName1).create();
Producer<byte[]> producer2 =
pulsarClient.newProducer().topic(topicName2).create();
- Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+ Awaitility.await().untilAsserted(() -> {
DispatchRateLimiter rateLimiter =
pulsar.getBrokerService().getBrokerDispatchRateLimiter();
Assert.assertTrue(rateLimiter != null
&& rateLimiter.getDispatchRateOnByte() > 0);
@@ -605,7 +609,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
*
* @throws Exception
*/
- @Test(timeOut = 5000)
+ @Test(timeOut = 30000)
public void testRateLimitingMultipleConsumers() throws Exception {
log.info("-- Starting {} test --", methodName);
@@ -691,7 +695,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
}
- @Test(dataProvider = "subscriptions", timeOut = 5000)
+ @Test(dataProvider = "subscriptions", timeOut = 30000)
public void testClusterRateLimitingConfiguration(SubscriptionType
subscription) throws Exception {
log.info("-- Starting {} test --", methodName);
@@ -868,7 +872,7 @@ public class SubscriptionMessageDispatchThrottlingTest
extends MessageDispatchTh
log.info("-- Exiting {} test --", methodName);
}
- @Test(dataProvider = "subscriptions", timeOut = 11000)
+ @Test(dataProvider = "subscriptions", timeOut = 30000)
public void testClosingRateLimiter(SubscriptionType subscription) throws
Exception {
log.info("-- Starting {} test --", methodName);