This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6a5739f076 Enhance PulsarConsumerTest (#12948)
6a5739f076 is described below
commit 6a5739f076107df16f303c1e0ea0e082a94f8004
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Apr 18 13:22:21 2024 -0700
Enhance PulsarConsumerTest (#12948)
---
.../plugin/stream/pulsar/PulsarConsumerTest.java | 187 +++++++++------------
1 file changed, 80 insertions(+), 107 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
index 01cd5cd26e..1baf212f17 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.stream.BytesStreamMessage;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
@@ -31,7 +32,7 @@ import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
@@ -39,7 +40,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
-import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;
import org.testng.annotations.AfterClass;
@@ -49,6 +50,7 @@ import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
public class PulsarConsumerTest {
@@ -62,103 +64,85 @@ public class PulsarConsumerTest {
public static final int NUM_PARTITIONS = 2;
public static final int NUM_RECORDS_PER_PARTITION = 1000;
public static final int BATCH_SIZE = 10;
- public static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int)
TimeUnit.MINUTES.toMillis(1);
+ public static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int)
TimeUnit.SECONDS.toMillis(1);
private final List<List<MessageId>> _partitionToMessageIdMapping = new
ArrayList<>(NUM_PARTITIONS);
private final List<List<MessageId>> _partitionToMessageIdMappingBatch = new
ArrayList<>(NUM_PARTITIONS);
private PulsarContainer _pulsar;
- private PulsarClient _pulsarClient;
@BeforeClass
public void setUp()
throws Exception {
_pulsar = new
PulsarContainer(PULSAR_IMAGE).withStartupTimeout(Duration.ofMinutes(5));
- try {
- _pulsar.start();
- _pulsarClient =
PulsarClient.builder().serviceUrl(_pulsar.getPulsarBrokerUrl()).build();
-
- try (PulsarAdmin admin =
PulsarAdmin.builder().serviceHttpUrl(_pulsar.getHttpServiceUrl()).build()) {
- createTopics(admin);
- publishRecords();
- publishRecordsBatch();
- waitForMessagesToPublish(admin, TEST_TOPIC);
- waitForMessagesToPublish(admin, TEST_TOPIC_BATCH);
- }
- } catch (Exception e) {
- _pulsar.stop();
- throw new RuntimeException("Failed to setUp test environment", e);
+ _pulsar.start();
+ try (PulsarAdmin admin =
PulsarAdmin.builder().serviceHttpUrl(_pulsar.getHttpServiceUrl()).build()) {
+ Topics topics = admin.topics();
+ topics.createPartitionedTopic(TEST_TOPIC, NUM_PARTITIONS);
+ topics.createPartitionedTopic(TEST_TOPIC_BATCH, NUM_PARTITIONS);
}
- }
-
- private void createTopics(PulsarAdmin admin)
- throws PulsarAdminException {
- InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
- inactiveTopicPolicies.setDeleteWhileInactive(false);
- admin.namespaces().setInactiveTopicPolicies("public/default",
inactiveTopicPolicies);
-
- admin.topics().createPartitionedTopic(TEST_TOPIC, NUM_PARTITIONS);
- admin.topics().createPartitionedTopic(TEST_TOPIC_BATCH, NUM_PARTITIONS);
- }
-
- private void waitForMessagesToPublish(PulsarAdmin admin, String topicName)
- throws Exception {
- long endTimeMs = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5);
- while (System.currentTimeMillis() < endTimeMs) {
- if (admin.topics().getPartitionedStats(topicName,
false).getMsgInCounter()
- == NUM_RECORDS_PER_PARTITION * NUM_PARTITIONS) {
- return;
- }
- Thread.sleep(1000);
+ try (PulsarClient client =
PulsarClient.builder().serviceUrl(_pulsar.getPulsarBrokerUrl()).build()) {
+ publishRecords(client);
+ publishRecordsBatch(client);
}
- throw new RuntimeException("Failed to publish messages to topic: " +
topicName);
}
@AfterClass
public void tearDown()
throws Exception {
- _pulsarClient.close();
_pulsar.stop();
}
- public void publishRecords()
+ public void publishRecords(PulsarClient client)
throws Exception {
for (int p = 0; p < NUM_PARTITIONS; p++) {
List<MessageId> messageIds = new ArrayList<>(NUM_RECORDS_PER_PARTITION);
_partitionToMessageIdMapping.add(messageIds);
int partition = p;
- try (Producer<String> producer =
_pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC)
+ try (Producer<String> producer =
client.newProducer(Schema.STRING).topic(TEST_TOPIC)
.messageRouter(new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata)
{
return partition;
}
- }).create()) {
+ }).enableBatching(false).create()) {
+ List<Future<MessageId>> futures = new
ArrayList<>(NUM_RECORDS_PER_PARTITION);
for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
- messageIds.add(producer.send(MESSAGE_PREFIX + i));
+ futures.add(producer.sendAsync(MESSAGE_PREFIX + i));
}
producer.flush();
+ for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
+ MessageId messageId = futures.get(i).get();
+ assertFalse(messageId instanceof BatchMessageIdImpl);
+ messageIds.add(messageId);
+ }
}
}
}
- public void publishRecordsBatch()
+ public void publishRecordsBatch(PulsarClient client)
throws Exception {
for (int p = 0; p < NUM_PARTITIONS; p++) {
List<MessageId> messageIds = new ArrayList<>(NUM_RECORDS_PER_PARTITION);
_partitionToMessageIdMappingBatch.add(messageIds);
int partition = p;
- try (Producer<String> producer =
_pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC_BATCH)
+ try (Producer<String> producer =
client.newProducer(Schema.STRING).topic(TEST_TOPIC_BATCH)
.messageRouter(new MessageRouter() {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata)
{
return partition;
}
}).batchingMaxMessages(BATCH_SIZE).batchingMaxPublishDelay(1,
TimeUnit.SECONDS).create()) {
+ List<Future<MessageId>> futures = new
ArrayList<>(NUM_RECORDS_PER_PARTITION);
for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
- messageIds.add(producer.send(MESSAGE_PREFIX + i));
+ futures.add(producer.sendAsync(MESSAGE_PREFIX + i));
}
producer.flush();
+ for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
+ MessageId messageId = futures.get(i).get();
+ assertTrue(messageId instanceof BatchMessageIdImpl);
+ messageIds.add(messageId);
+ }
}
}
}
@@ -179,92 +163,81 @@ public class PulsarConsumerTest {
public void testPartitionLevelConsumer()
throws Exception {
StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(getStreamConfig(TEST_TOPIC));
- int numPartitions;
try (PulsarStreamMetadataProvider metadataProvider = new
PulsarStreamMetadataProvider(CLIENT_ID,
getStreamConfig(TEST_TOPIC))) {
- numPartitions =
metadataProvider.fetchPartitionCount(CONSUMER_FETCH_TIMEOUT_MILLIS);
+
assertEquals(metadataProvider.fetchPartitionCount(CONSUMER_FETCH_TIMEOUT_MILLIS),
NUM_PARTITIONS);
}
-
- for (int partition = 0; partition < numPartitions; partition++) {
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
+ List<MessageId> messageIds = _partitionToMessageIdMapping.get(i);
PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
- new PartitionGroupConsumptionStatus(partition, 0, new
MessageIdStreamOffset(MessageId.earliest), null,
- "CONSUMING");
+ new PartitionGroupConsumptionStatus(i, 0, new
MessageIdStreamOffset(MessageId.earliest), null, "CONSUMING");
try (
PulsarPartitionLevelConsumer consumer =
(PulsarPartitionLevelConsumer)
streamConsumerFactory.createPartitionGroupConsumer(
CLIENT_ID, partitionGroupConsumptionStatus)) {
- PulsarMessageBatch messageBatch =
- consumer.fetchMessages(new
MessageIdStreamOffset(MessageId.earliest), CONSUMER_FETCH_TIMEOUT_MILLIS);
- assertEquals(messageBatch.getMessageCount(), 1000);
- assertFalse(messageBatch.isEndOfPartitionGroup());
- for (int i = 0; i < 1000; i++) {
- verifyMessage(messageBatch.getStreamMessage(i), partition, i, false);
- }
-
- messageBatch =
- consumer.fetchMessages(new
MessageIdStreamOffset(_partitionToMessageIdMapping.get(partition).get(500)),
- CONSUMER_FETCH_TIMEOUT_MILLIS);
- assertEquals(messageBatch.getMessageCount(), 500);
- assertFalse(messageBatch.isEndOfPartitionGroup());
- for (int i = 0; i < 500; i++) {
- verifyMessage(messageBatch.getStreamMessage(i), partition, 500 + i,
false);
- }
+ // Start from earliest
+ testConsumer(consumer, 0, messageIds);
+ // Start from middle
+ testConsumer(consumer, 500, messageIds);
}
}
}
- private void verifyMessage(BytesStreamMessage streamMessage, int partition,
int index, boolean batch) {
- assertEquals(new String(streamMessage.getValue()), MESSAGE_PREFIX + index);
- StreamMessageMetadata messageMetadata = streamMessage.getMetadata();
- assertNotNull(messageMetadata);
- MessageIdStreamOffset offset = (MessageIdStreamOffset)
messageMetadata.getOffset();
- assertNotNull(offset);
- MessageIdStreamOffset nextOffset = (MessageIdStreamOffset)
messageMetadata.getNextOffset();
- assertNotNull(nextOffset);
- List<MessageId> messageIds =
- batch ? _partitionToMessageIdMappingBatch.get(partition) :
_partitionToMessageIdMapping.get(partition);
- assertEquals(offset.getMessageId(), messageIds.get(index));
- if (index < NUM_RECORDS_PER_PARTITION - 1) {
- assertEquals(nextOffset.getMessageId(), messageIds.get(index + 1));
- }
- }
-
@Test
public void testPartitionLevelConsumerBatchMessages()
throws Exception {
StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(getStreamConfig(TEST_TOPIC_BATCH));
- int numPartitions;
try (PulsarStreamMetadataProvider metadataProvider = new
PulsarStreamMetadataProvider(CLIENT_ID,
getStreamConfig(TEST_TOPIC_BATCH))) {
- numPartitions =
metadataProvider.fetchPartitionCount(CONSUMER_FETCH_TIMEOUT_MILLIS);
+
assertEquals(metadataProvider.fetchPartitionCount(CONSUMER_FETCH_TIMEOUT_MILLIS),
NUM_PARTITIONS);
}
-
- for (int partition = 0; partition < numPartitions; partition++) {
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
+ List<MessageId> messageIds = _partitionToMessageIdMappingBatch.get(i);
PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
- new PartitionGroupConsumptionStatus(partition, 0, new
MessageIdStreamOffset(MessageId.earliest), null,
- "CONSUMING");
+ new PartitionGroupConsumptionStatus(i, 0, new
MessageIdStreamOffset(MessageId.earliest), null, "CONSUMING");
try (
PulsarPartitionLevelConsumer consumer =
(PulsarPartitionLevelConsumer)
streamConsumerFactory.createPartitionGroupConsumer(
CLIENT_ID, partitionGroupConsumptionStatus)) {
- PulsarMessageBatch messageBatch =
- consumer.fetchMessages(new
MessageIdStreamOffset(MessageId.earliest), CONSUMER_FETCH_TIMEOUT_MILLIS);
- assertEquals(messageBatch.getMessageCount(), 1000);
- assertFalse(messageBatch.isEndOfPartitionGroup());
- for (int i = 0; i < 1000; i++) {
- verifyMessage(messageBatch.getStreamMessage(i), partition, i, true);
- }
+ // Start from earliest
+ testConsumer(consumer, 0, messageIds);
+ // Start from middle
+ testConsumer(consumer, 500, messageIds);
+ }
+ }
+ }
- messageBatch =
- consumer.fetchMessages(new
MessageIdStreamOffset(_partitionToMessageIdMappingBatch.get(partition).get(500)),
- CONSUMER_FETCH_TIMEOUT_MILLIS);
- assertEquals(messageBatch.getMessageCount(), 500);
- assertFalse(messageBatch.isEndOfPartitionGroup());
- for (int i = 0; i < 500; i++) {
- verifyMessage(messageBatch.getStreamMessage(i), partition, 500 + i,
true);
- }
+ private void testConsumer(PulsarPartitionLevelConsumer consumer, int
startIndex, List<MessageId> messageIds) {
+ MessageId startMessageId = startIndex == 0 ? MessageId.earliest :
messageIds.get(startIndex);
+ int numMessagesFetched = startIndex;
+ while (numMessagesFetched < NUM_RECORDS_PER_PARTITION) {
+ PulsarMessageBatch messageBatch =
+ consumer.fetchMessages(new MessageIdStreamOffset(startMessageId),
CONSUMER_FETCH_TIMEOUT_MILLIS);
+ int messageCount = messageBatch.getMessageCount();
+ assertFalse(messageBatch.isEndOfPartitionGroup());
+ for (int i = 0; i < messageCount; i++) {
+ verifyMessage(messageBatch.getStreamMessage(i), numMessagesFetched +
i, messageIds);
+ }
+ numMessagesFetched += messageCount;
+ if (numMessagesFetched < NUM_RECORDS_PER_PARTITION) {
+ startMessageId = messageIds.get(numMessagesFetched);
}
}
+ assertEquals(numMessagesFetched, NUM_RECORDS_PER_PARTITION);
+ }
+
+ private void verifyMessage(BytesStreamMessage streamMessage, int index,
List<MessageId> messageIds) {
+ assertEquals(new String(streamMessage.getValue()), MESSAGE_PREFIX + index);
+ StreamMessageMetadata messageMetadata = streamMessage.getMetadata();
+ assertNotNull(messageMetadata);
+ MessageIdStreamOffset offset = (MessageIdStreamOffset)
messageMetadata.getOffset();
+ assertNotNull(offset);
+ MessageIdStreamOffset nextOffset = (MessageIdStreamOffset)
messageMetadata.getNextOffset();
+ assertNotNull(nextOffset);
+ assertEquals(offset.getMessageId(), messageIds.get(index));
+ if (index < NUM_RECORDS_PER_PARTITION - 1) {
+ assertEquals(nextOffset.getMessageId(), messageIds.get(index + 1));
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]