k-pisey opened a new issue #14086:
URL: https://github.com/apache/pulsar/issues/14086
**Describe the bug**
I use an internal consumer received at consumer event listener
`becameActive(Consumer<?> consumer, int i) ` to consume message from the
partition i.
I got number of received messages that is very unidentical to number of
produced messages.
**To Reproduce**
Steps to reproduce the behavior:
I leave my test program below.
Test steps:
1. Start pulsar container
2. Create partitioned topic with number of partitions equal to 1
3. Subscribe to partitioned topic with failover subscription type and
consumer event listener
4. Wait sufficient time until pulsar broker notified consumer event listener
about an internal consumer became active on partition 0
5. Start receiving message via the given consumer on another thread, it will
be forever running
6. Create a partitioned producer then produces messages to topic
**Expected behavior**
After received the last input message, number of received messages should
identical to number of produced messages
**Desktop (please complete the following information):**
- OS: Mac OS
- Java testcontainers
- Intellij
**Additional context**
- pulsar image version: 2.9.1
- pulsar-client version: 2.9.1
- pulsar-client-admin version: 2.9.1
- testcontainers version: 1.16.2
```java
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
public class InternalConsumerTest {
final static String TOPIC = "my-topic";
static void setup(String restUrl) throws PulsarClientException,
PulsarAdminException {
log.info("Start setup");
try (PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(restUrl)
.build();
) {
admin.topics().createPartitionedTopic(TOPIC, 1);
}
log.info("Setup finished");
}
static void subscribeToTopic(PulsarClient client, EventListener
eventListener, String topic, String subscription) {
try {
client.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subscription)
.consumerEventListener(eventListener)
.subscriptionType(SubscriptionType.Failover)
.subscribe();
} catch (PulsarClientException e) {
log.error("Failed to instantiate consumer on topic: {}", TOPIC,
e);
}
}
@Test
void test() throws InterruptedException, PulsarAdminException,
PulsarClientException {
DockerImageName PULSAR_IMAGE =
DockerImageName.parse("apachepulsar/pulsar:2.9.1");
try (PulsarContainer pulsar = new PulsarContainer(PULSAR_IMAGE)
) {
pulsar.start();
Thread.sleep(TimeUnit.SECONDS.toMillis(2));
setup(pulsar.getHttpServiceUrl());
try (PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getPulsarBrokerUrl())
.build()) {
Manager manager = new Manager();
EventListener eventListener = new EventListener(manager);
subscribeToTopic(client, eventListener, TOPIC, "sub-1");
while (!manager.consumingStarted.get()) {
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
}
// sleep another 1 sec after internal consumer started
consuming
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
// create partitioned producer then produce messages to topic
Runnable producerTask = new Thread(producerTask(client));
producerTask.run();
// forever sleep
while (true) {
Thread.sleep(TimeUnit.MINUTES.toMillis(1));
}
}
}
}
static Runnable producerTask(PulsarClient client) {
return () -> {
try (Producer<String> producer =
client.newProducer(Schema.STRING)
.topic(TOPIC)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create()) {
log.info("Start producing");
String[] keys = {"1111", "2222", "3333"};
int k = 0;
for (int i = 0; i < 90; i++) {
if (k == 3) {
k = 0;
}
producer.newMessage()
.key(keys[k])
.value(String.valueOf(i))
.send();
k++;
}
producer.newMessage()
.key(keys[0])
.value(String.valueOf(-1))
.send();
log.info("Producing finished");
} catch (PulsarClientException e) {
log.info("Error", e);
}
};
}
static Runnable consumerTask(Consumer<?> consumer, AtomicBoolean
consumingStarted) {
return () -> {
String s = "";
int n = 0;
log.info("Start receiving message from topic {}",
consumer.getTopic());
consumingStarted.set(true);
while (!s.equals("-1")) { // if message not equal to last input
message
try {
Message<?> msg = consumer.receive(1, TimeUnit.MINUTES);
if (msg != null) {
s = new String(msg.getData());
n++;
log.info("Received message: {}", s);
consumer.acknowledge(msg);
}
} catch (PulsarClientException e) {
log.error("Error", e);
}
}
log.info("Number of received messages: {}", n);
};
}
static class EventListener implements ConsumerEventListener {
private final Manager manager;
EventListener(Manager manager) {
this.manager = manager;
}
@Override
public void becameActive(Consumer<?> consumer, int i) {
log.info("Consumer {name: {}, hashcode: {}} became active on
partition {}", consumer.getConsumerName(), consumer.hashCode(), i);
manager.run(consumer, i);
}
@Override
public void becameInactive(Consumer<?> consumer, int i) {
log.info("Consumer {name: {}, hashcode: {}} became inactive on
partition {}", consumer.getConsumerName(), consumer.hashCode(), i);
}
}
static class Manager {
Map<Integer, Thread> runningMap = new HashMap<>();
AtomicBoolean consumingStarted = new AtomicBoolean(false);
void run(Consumer<?> consumer, int i) {
if (runningMap.containsKey(i)) {
return;
}
Thread thread = new Thread(consumerTask(consumer,
consumingStarted));
thread.start();
runningMap.put(i, thread);
}
}
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]