eolivelli commented on a change in pull request #12287:
URL: https://github.com/apache/pulsar/pull/12287#discussion_r723716176
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java
##########
@@ -934,6 +934,106 @@ public void testCustomPartitionedProducer() throws
Exception {
}
}
+ /**
+ * Test producer and consumer interceptor to validate onPartitions change
api invocation when partitions of the
+ * topic changes.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testPartitionedTopicInterceptor() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(),
0);// Creates new client connection
+
+ int numPartitions = 4;
+ TopicName topicName = TopicName
+
.get("persistent://my-property/my-ns/interceptor-partitionedtopic1-" +
System.currentTimeMillis());
+
+ admin.topics().createPartitionedTopic(topicName.toString(),
numPartitions);
+
+ AtomicInteger newProducerPartitions = new AtomicInteger(0);
+ AtomicInteger newConsumerPartitions = new AtomicInteger(0);
+
+ Producer<byte[]> producer =
pulsarClient.newProducer().topic(topicName.toString()).enableBatching(false)
+ .autoUpdatePartitions(true).autoUpdatePartitionsInterval(1,
TimeUnit.SECONDS)
+ .intercept(new ProducerInterceptor<byte[]>() {
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public Message<byte[]> beforeSend(Producer<byte[]>
producer, Message<byte[]> message) {
+ return message;
+ }
+
+ @Override
+ public void onSendAcknowledgement(Producer<byte[]>
producer, Message<byte[]> message,
+ MessageId msgId, Throwable exception) {
+ }
+
+ @Override
+ public void onPartitionsChange(String topicName, int
partitions) {
+ newProducerPartitions.set(partitions);
+ }
+
}).messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
+
+ Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topicName.toString())
+
.subscriptionName("my-partitioned-subscriber").autoUpdatePartitionsInterval(1,
TimeUnit.SECONDS)
+ .intercept(new ConsumerInterceptor<byte[]>() {
+
+ @Override
+ public void close() {
+
+ }
+
+ @Override
+ public Message<byte[]> beforeConsume(Consumer<byte[]>
consumer, Message<byte[]> message) {
+ return message;
+ }
+
+ @Override
+ public void onAcknowledge(Consumer<byte[]> consumer,
MessageId messageId, Throwable exception) {
+
+ }
+
+ @Override
+ public void onAcknowledgeCumulative(Consumer<byte[]>
consumer, MessageId messageId,
+ Throwable exception) {
+ }
+
+ @Override
+ public void onNegativeAcksSend(Consumer<byte[]> c,
Set<MessageId> messageIds) {
+ }
+
+ @Override
+ public void onAckTimeoutSend(Consumer<byte[]> c2,
Set<MessageId> messageIds) {
+ }
+
+ @Override
+ public void onPartitionsChange(String topic, int
partitions) {
+ newConsumerPartitions.set(partitions);
+ }
+ }).subscribe();
+
+ int newPartitions = numPartitions + 5;
+ admin.topics().updatePartitionedTopic(topicName.toString(),
newPartitions);
+
+ retryStrategically(
Review comment:
What about using Awaitility?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]