This is an automated email from the ASF dual-hosted git repository.
richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c3d17b9ed7 Fix PulsarConsumerTest (#8691)
c3d17b9ed7 is described below
commit c3d17b9ed7616e88352b1813f301949777d8c4d7
Author: Xiang Fu <[email protected]>
AuthorDate: Thu May 12 23:58:54 2022 -0700
Fix PulsarConsumerTest (#8691)
---
.../plugin/stream/pulsar/PulsarConsumerTest.java | 77 ++++++++++++++--------
1 file changed, 50 insertions(+), 27 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
index 12c0cf45ad..2b6a16e33d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
@@ -164,22 +165,24 @@ public class PulsarConsumerTest {
       throws Exception {
     for (int p = 0; p < NUM_PARTITION; p++) {
       final int partition = p;
-      Producer<String> producer =
-          _pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC).messageRouter(new MessageRouter() {
+      try (Producer<String> producer = _pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC)
+          .messageRouter(new MessageRouter() {
             @Override
             public int choosePartition(Message<?> msg, TopicMetadata metadata) {
               return partition;
             }
-          }).create();
-
-      for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
-        MessageId messageId = producer.send(MESSAGE_PREFIX + "_" + i);
-        if (!_partitionToFirstMessageIdMap.containsKey(partition)) {
-          _partitionToFirstMessageIdMap.put(partition, messageId);
+          }).create()) {
+        for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
+          MessageId messageId = producer.send(MESSAGE_PREFIX + "_" + i);
+          if (!_partitionToFirstMessageIdMap.containsKey(partition)) {
+            _partitionToFirstMessageIdMap.put(partition, messageId);
+          }
         }
+        producer.flush();
       }
-
-      producer.flush();
+      waitForCondition(input -> validatePartitionMessageCount(partition, NUM_RECORDS_PER_PARTITION), 15 * 1000L,
+          5 * 60 * 1000L, "Failed to consume " + NUM_RECORDS_PER_PARTITION + " messages from partition " + partition,
+          true);
     }
   }
@@ -187,29 +190,49 @@ public class PulsarConsumerTest {
       throws Exception {
     for (int p = 0; p < NUM_PARTITION; p++) {
       final int partition = p;
-      Producer<String> producer =
+      try (Producer<String> producer =
           _pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC_BATCH).messageRouter(new MessageRouter() {
             @Override
             public int choosePartition(Message<?> msg, TopicMetadata metadata) {
               return partition;
             }
-          }).batchingMaxMessages(BATCH_SIZE).batchingMaxPublishDelay(1, TimeUnit.SECONDS).create();
-
-      for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
-        CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(MESSAGE_PREFIX + "_" + i);
-        messageIdCompletableFuture.thenAccept(messageId -> {
-
-          List<BatchMessageIdImpl> batchMessageIdList = _partitionToMessageIdMapping
-              .getOrDefault(partition, new ArrayList<>());
-          batchMessageIdList.add((BatchMessageIdImpl) messageId);
-          _partitionToMessageIdMapping.put(partition, batchMessageIdList);
-
-          if (!_partitionToFirstMessageIdMapBatch.containsKey(partition)) {
-            _partitionToFirstMessageIdMapBatch.put(partition, messageId);
-          }
-        });
+          }).batchingMaxMessages(BATCH_SIZE).batchingMaxPublishDelay(1, TimeUnit.SECONDS).create()) {
+        for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
+          CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(MESSAGE_PREFIX + "_" + i);
+          messageIdCompletableFuture.thenAccept(messageId -> {
+
+            List<BatchMessageIdImpl> batchMessageIdList = _partitionToMessageIdMapping
+                .getOrDefault(partition, new ArrayList<>());
+            batchMessageIdList.add((BatchMessageIdImpl) messageId);
+            _partitionToMessageIdMapping.put(partition, batchMessageIdList);
+
+            if (!_partitionToFirstMessageIdMapBatch.containsKey(partition)) {
+              _partitionToFirstMessageIdMapBatch.put(partition, messageId);
+            }
+          });
+        }
+        producer.flush();
       }
-      producer.flush();
+      waitForCondition(input -> validatePartitionMessageCount(partition, NUM_RECORDS_PER_PARTITION), 15 * 1000L,
+          5 * 60 * 1000L, "Failed to consume " + NUM_RECORDS_PER_PARTITION + " messages from partition " + partition,
+          true);
+    }
+  }
+
+  private boolean validatePartitionMessageCount(int partition, int expectedMsgCount) {
+    final PartitionGroupConsumer consumer = StreamConsumerFactoryProvider.create(getStreamConfig(TEST_TOPIC))
+        .createPartitionGroupConsumer(CLIENT_ID,
+            new PartitionGroupConsumptionStatus(partition, 1, new MessageIdStreamOffset(MessageId.earliest), null,
+                "CONSUMING"));
+    try {
+      final MessageBatch messageBatch = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
+          new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, expectedMsgCount)),
+          CONSUMER_FETCH_TIMEOUT_MILLIS);
+      System.out.println(
+          "Partition: " + partition + ", Consumed messageBatch count = " + messageBatch.getMessageCount());
+      return messageBatch.getMessageCount() == expectedMsgCount;
+    } catch (TimeoutException e) {
+      return false;
+    }
     }
   }
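
For reference, waitForCondition(...) as used above is the test's own polling helper; its definition is not part of this diff. Below is a minimal sketch of that pattern, assuming the parameter order seen in the call (condition, check interval in ms, overall timeout in ms, failure message, raise-on-timeout flag); the class name and exact signature are hypothetical, not the actual Pinot helper.

import java.util.function.Function;

public final class ConditionWaiterSketch {
  private ConditionWaiterSketch() {
  }

  // Polls the condition every checkIntervalMs until it returns true or timeoutMs elapses;
  // if raiseError is set and the deadline passes, fails with the supplied message.
  public static void waitForCondition(Function<Void, Boolean> condition, long checkIntervalMs, long timeoutMs,
      String failureMessage, boolean raiseError)
      throws InterruptedException {
    long deadlineMs = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadlineMs) {
      if (Boolean.TRUE.equals(condition.apply(null))) {
        return;
      }
      Thread.sleep(checkIntervalMs);
    }
    if (raiseError) {
      throw new IllegalStateException(failureMessage);
    }
  }
}

In the patched test this check runs once per partition after the producer is closed, so records published to a partition must be fetchable by validatePartitionMessageCount before the next partition is produced.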
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]