npawar commented on a change in pull request #8017:
URL: https://github.com/apache/pinot/pull/8017#discussion_r825154864
##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -124,19 +127,28 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) {
- Reader reader =
- _pulsarClient.newReader().topic(getPartitionedTopicName(p)).startMessageId(_config.getInitialMessageId())
- .create();
+ Consumer consumer = _pulsarClient.newConsumer().topic(getPartitionedTopicName(p))
Review comment:
`getPartitionedTopicName` calls `_pulsarClient.getPartitionsForTopic(_topic).get()` for every partition. Can we call it just once before the loop?
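A minimal sketch of the suggestion, assuming a hypothetical `PartitionLookup` class that stands in for `_pulsarClient.getPartitionsForTopic(_topic).get()` (in the Pulsar client that call returns a `CompletableFuture<List<String>>`). The partition list is fetched once before the loop and indexed inside it, so the broker is asked once rather than once per new partition. `HoistedPartitionLookup` and `newPartitionTopics` are illustrative names, not code from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for `_pulsarClient.getPartitionsForTopic(_topic).get()`.
// It counts calls so the effect of hoisting the lookup out of the loop is visible.
class PartitionLookup {
  private final List<String> partitions;
  private int calls = 0;

  PartitionLookup(List<String> partitions) {
    this.partitions = partitions;
  }

  List<String> getPartitionsForTopic(String topic) {
    calls++;
    return partitions;
  }

  int calls() {
    return calls;
  }
}

class HoistedPartitionLookup {
  // Returns the topic names of the new partitions [newPartitionStartIndex, size),
  // fetching the partition list exactly once, before the loop.
  static List<String> newPartitionTopics(PartitionLookup lookup, String topic, int newPartitionStartIndex) {
    List<String> partitionedTopicNames = lookup.getPartitionsForTopic(topic);
    List<String> result = new ArrayList<>();
    for (int p = newPartitionStartIndex; p < partitionedTopicNames.size(); p++) {
      // The PR would create the consumer for partitionedTopicNames.get(p) here.
      result.add(partitionedTopicNames.get(p));
    }
    return result;
  }

  public static void main(String[] args) {
    PartitionLookup lookup =
        new PartitionLookup(List.of("my-topic-partition-0", "my-topic-partition-1", "my-topic-partition-2"));
    System.out.println(newPartitionTopics(lookup, "my-topic", 1));
    System.out.println("lookups: " + lookup.calls());
  }
}
```

Running `main` prints the two new partition names followed by `lookups: 1`; the count stays at 1 no matter how many new partitions there are.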
##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -124,19 +127,28 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) {
- Reader reader =
- _pulsarClient.newReader().topic(getPartitionedTopicName(p)).startMessageId(_config.getInitialMessageId())
- .create();
+ Consumer consumer = _pulsarClient.newConsumer().topic(getPartitionedTopicName(p))
Review comment:
Same as the other comment: should we close every new consumer?
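A minimal sketch of closing each per-partition consumer, using try-with-resources inside the loop. Pulsar's `Consumer` is `Closeable`, so the same shape should apply to the real code; here a hypothetical `FakeConsumer` replaces it so the example runs without a broker, and its `close()` only decrements a counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Pulsar Consumer; close() just updates a counter
// so we can check that nothing is left open.
class FakeConsumer implements AutoCloseable {
  static int openCount = 0;
  private final String topic;

  FakeConsumer(String topic) {
    this.topic = topic;
    openCount++;
  }

  String receive() {
    return topic + "/first-message";
  }

  @Override
  public void close() {
    openCount--;
  }
}

class ClosePerPartition {
  // Creates one consumer per new partition and closes it at the end of each
  // iteration; try-with-resources also closes it if receive() throws.
  static List<String> firstMessages(List<String> partitionTopics, int newPartitionStartIndex) {
    List<String> result = new ArrayList<>();
    for (int p = newPartitionStartIndex; p < partitionTopics.size(); p++) {
      try (FakeConsumer consumer = new FakeConsumer(partitionTopics.get(p))) {
        result.add(consumer.receive());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> first = firstMessages(List.of("t-partition-0", "t-partition-1", "t-partition-2"), 1);
    System.out.println(first + ", still open: " + FakeConsumer.openCount);
  }
}
```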
##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -80,16 +84,15 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
Preconditions.checkNotNull(offsetCriteria);
try {
MessageId offset = null;
+ Consumer consumer =
Review comment:
We're now translating between smallest/largest and earliest/latest in so many places.
In `PulsarConfig`:
- based on `OffsetCriteria` smallest/largest/custom, we get `InitialMessageID` earliest/latest/custom;
- based on `InitialMessageID`, we get `SubscriberInitialPosition` Earliest/Latest/Latest.

Is it okay to have `SubscriberInitialPosition`=Latest for a custom `OffsetCriteria`? Should we even keep custom offset criteria in Pulsar? I see that `fetchStreamPartitionOffset` doesn't support it.
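The two-step mapping can be written out as a small sketch to make the collapse visible: three offset criteria end up on only two subscription positions. The enum and class names below are hypothetical mirrors of the values listed in this comment, not `PulsarConfig`'s actual code; Pulsar's own enum for the last step is `SubscriptionInitialPosition` with `Latest` and `Earliest`.

```java
// Hypothetical mirrors of the values described in the comment, for illustration only.
enum OffsetCriteriaKind { SMALLEST, LARGEST, CUSTOM }

enum InitialMessageIdKind { EARLIEST, LATEST, CUSTOM }

enum InitialSubscriberPosition { EARLIEST, LATEST }

class PulsarConfigMappingSketch {
  // Step 1: smallest/largest/custom -> earliest/latest/custom.
  static InitialMessageIdKind initialMessageId(OffsetCriteriaKind criteria) {
    switch (criteria) {
      case SMALLEST:
        return InitialMessageIdKind.EARLIEST;
      case LARGEST:
        return InitialMessageIdKind.LATEST;
      default:
        return InitialMessageIdKind.CUSTOM;
    }
  }

  // Step 2: earliest/latest/custom -> Earliest/Latest/Latest. A custom message ID
  // silently becomes LATEST, which is the case being questioned.
  static InitialSubscriberPosition subscriberPosition(InitialMessageIdKind id) {
    return id == InitialMessageIdKind.EARLIEST
        ? InitialSubscriberPosition.EARLIEST
        : InitialSubscriberPosition.LATEST;
  }

  public static void main(String[] args) {
    for (OffsetCriteriaKind c : OffsetCriteriaKind.values()) {
      System.out.println(c + " -> " + initialMessageId(c) + " -> " + subscriberPosition(initialMessageId(c)));
    }
  }
}
```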
Further, in `PulsarStreamMetadataProvider#fetchStreamPartitionOffset`, the Consumer is created based on `SubscriberInitialPosition`, but it is the `offsetCriteria` param of that method that decides whether we call `getLastMessageId` or `receive` on it. What if `PulsarConfig` has `largest` `OffsetCriteria`, but the param passed to this method is `smallest`?
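A hedged sketch of why the mismatch matters for the `smallest` branch. A new Pulsar subscription at `Latest` only receives messages published after it was created, so `receive()` on it will not return the oldest existing message. The in-memory model below is hypothetical and only illustrates that behavior; `positionFor` is one possible fix (deriving the start position from the method's `offsetCriteria` argument instead of `PulsarConfig`), not something the PR does.

```java
import java.util.List;

// Hypothetical model, for illustration only; it is not how Pulsar is implemented.
enum StartPosition { EARLIEST, LATEST }

class OffsetMismatchSketch {
  // What the first receive() on a brand-new subscription would return on a topic
  // that already holds `existingIds`: the oldest message for EARLIEST. For LATEST,
  // null stands in for "receive() blocks until something new is published".
  static String firstReceived(List<String> existingIds, StartPosition position) {
    if (position == StartPosition.EARLIEST && !existingIds.isEmpty()) {
      return existingIds.get(0);
    }
    return null;
  }

  // Possible fix: pick the position from the offsetCriteria argument of
  // fetchStreamPartitionOffset rather than from PulsarConfig.
  static StartPosition positionFor(boolean isSmallest) {
    return isSmallest ? StartPosition.EARLIEST : StartPosition.LATEST;
  }

  public static void main(String[] args) {
    List<String> ids = List.of("m0", "m1", "m2");
    // Config says largest (LATEST), but the caller asked for smallest.
    System.out.println("from config: " + firstReceived(ids, StartPosition.LATEST));
    System.out.println("from param:  " + firstReceived(ids, positionFor(true)));
  }
}
```

The `largest` branch is unaffected in this model, since `getLastMessageId` does not depend on where the subscription starts.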
##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -124,19 +127,28 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) {
- Reader reader =
- _pulsarClient.newReader().topic(getPartitionedTopicName(p)).startMessageId(_config.getInitialMessageId())
- .create();
+ Consumer consumer = _pulsarClient.newConsumer().topic(getPartitionedTopicName(p))
+ .subscriptionInitialPosition(_config.getInitialSubscriberPosition())
+ .subscriptionName(ConsumerName.generateRandomName()).subscribe();
- if (reader.hasMessageAvailable()) {
- Message message = reader.readNext();
+ Message message = consumer.receive(timeoutMillis, TimeUnit.MILLISECONDS);
Review comment:
Maybe add a comment here noting that the `offsetCriteria` from the `StreamConfig` param supplied to this method is ignored on purpose when deciding the offset, and that "earliest" is deliberately used for all new partitions.
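A minimal sketch of what such a comment could look like, on a hypothetical simplification of the new-partition start decision (`NewPartitionStartSketch` and `startFor` are illustrative names, not code from the PR):

```java
// Hypothetical simplification, showing where the suggested comment could go.
enum NewPartitionStart { EARLIEST, LATEST }

class NewPartitionStartSketch {
  static NewPartitionStart startFor(String streamConfigOffsetCriteria) {
    // NOTE: The offset criteria from the StreamConfig passed to this method is
    // intentionally ignored when deciding the offset. Every newly discovered
    // partition is consumed from "earliest", whatever the table is configured with.
    return NewPartitionStart.EARLIEST;
  }

  public static void main(String[] args) {
    System.out.println(startFor("largest"));
  }
}
```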
##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -80,16 +84,15 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
Preconditions.checkNotNull(offsetCriteria);
try {
MessageId offset = null;
+ Consumer consumer =
+ _pulsarClient.newConsumer().topic(_topic)
+ .subscriptionInitialPosition(_config.getInitialSubscriberPosition())
+ .subscriptionName("Pinot_" + UUID.randomUUID()).subscribe();
+
if (offsetCriteria.isLargest()) {
- _reader.seek(MessageId.latest);
- if (_reader.hasMessageAvailable()) {
- offset = _reader.readNext().getMessageId();
- }
+ offset = consumer.getLastMessageId();
} else if (offsetCriteria.isSmallest()) {
- _reader.seek(MessageId.earliest);
- if (_reader.hasMessageAvailable()) {
- offset = _reader.readNext().getMessageId();
- }
+ offset = consumer.receive().getMessageId();
Review comment:
You probably need to close the `Consumer` at the end?
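A minimal sketch of closing the single consumer in `fetchStreamPartitionOffset` on every path. Since Pulsar's `Consumer` is `Closeable`, try-with-resources should work; in the real method `PulsarClientException` from `getLastMessageId()`, `receive()` and `close()` would still be handled by the surrounding `try`. The hypothetical `SketchConsumer` below replaces the Pulsar type so the example runs, and rejecting unsupported criteria with an exception is an assumption, not necessarily what the PR does.

```java
// Hypothetical stand-in for the Pulsar Consumer; close() only records that it ran.
class SketchConsumer implements AutoCloseable {
  boolean closed = false;

  String getLastMessageId() {
    return "last-id";
  }

  String receive() {
    return "first-id";
  }

  @Override
  public void close() {
    closed = true;
  }
}

class FetchOffsetSketch {
  // The consumer is closed after either branch returns and also when the
  // unsupported-criteria branch throws. In the real method the consumer would be
  // created in the try-with-resources header instead of being passed in.
  static String fetchOffset(SketchConsumer consumer, String criteria) {
    try (SketchConsumer c = consumer) {
      if (criteria.equals("largest")) {
        return c.getLastMessageId();
      }
      if (criteria.equals("smallest")) {
        return c.receive();
      }
      // Assumption: anything else is rejected here.
      throw new IllegalArgumentException("Unsupported offset criteria: " + criteria);
    }
  }

  public static void main(String[] args) {
    SketchConsumer consumer = new SketchConsumer();
    System.out.println(fetchOffset(consumer, "smallest") + ", closed: " + consumer.closed);
  }
}
```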
--
This is an automated message from the Apache Git Service.