npawar commented on a change in pull request #8017:
URL: https://github.com/apache/pinot/pull/8017#discussion_r825154864



##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -124,19 +127,28 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
 
         for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) {
 
-          Reader reader =
-              _pulsarClient.newReader().topic(getPartitionedTopicName(p)).startMessageId(_config.getInitialMessageId())
-                  .create();
+          Consumer consumer = _pulsarClient.newConsumer().topic(getPartitionedTopicName(p))

Review comment:
       getPartitionedTopicName is making a call to 
`_pulsarClient.getPartitionsForTopic(_topic).get()` for every partition. Can we 
call it just once before the loop?
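
A minimal sketch of the suggestion, assuming the partition list can be resolved once and then indexed inside the loop. The `Supplier` here is a hypothetical stand-in for the `_pulsarClient.getPartitionsForTopic(_topic).get()` call, and `PartitionLookupSketch` is an illustrative name, not code from this PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Illustrative only: the Supplier stands in for the remote partition lookup.
final class PartitionLookupSketch {
  private PartitionLookupSketch() {
  }

  // Resolves the partition list once, then reads topic names from the cached list
  // instead of repeating the lookup for every partition index.
  static List<String> newPartitionTopics(Supplier<List<String>> lookup, int newPartitionStartIndex) {
    List<String> partitionedTopicNameList = lookup.get(); // single lookup, before the loop
    List<String> result = new ArrayList<>();
    for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) {
      result.add(partitionedTopicNameList.get(p)); // no further lookups here
    }
    return result;
  }
}
```

With this shape the number of lookups stays at one regardless of how many new partitions the loop visits.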

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -124,19 +127,28 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
 
         for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) {
 
-          Reader reader =
-              _pulsarClient.newReader().topic(getPartitionedTopicName(p)).startMessageId(_config.getInitialMessageId())
-                  .create();
+          Consumer consumer = _pulsarClient.newConsumer().topic(getPartitionedTopicName(p))

Review comment:
       Same as for the consumer in `fetchStreamPartitionOffset`: should every new consumer created here be closed?

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -80,16 +84,15 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
     Preconditions.checkNotNull(offsetCriteria);
     try {
       MessageId offset = null;
+      Consumer consumer =

Review comment:
       We're reading earliest/largest in so many places now.
   In `PulsarConfig`:
   Based on `OffsetCriteria` smallest/largest/custom, we get 
   `InitialMessageID` earliest/latest/custom
   Based on `InitialMessageID`, we get 
   `SubscriberInitialPosition` Earliest/Latest/Latest.
   Is it okay to have `SubscriberInitialPosition`=Latest for a custom 
OffsetCriteria? Should we even keep custom offset criteria in Pulsar? I see in 
`fetchStreamPartitionOffset` we don't support it.
   
   Further, in `PulsarStreamMetadataProvider#fetchStreamPartitionOffset`, the 
Consumer is created based on `SubscriberInitialPosition`. But based on the 
`offsetCriteria` param in that method, we make the consumer `getLastMessage` or 
`receive` message. What if `PulsarConfig` had `largest` `OffsetCriteria`, but 
the param in this method has `smallest`?
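
To make the chain above concrete, here is a small sketch of the three mappings as described in this comment. All enum and method names are hypothetical stand-ins, not the actual `PulsarConfig` or Pulsar client types; the point is only to show where the config-derived position and the method parameter can disagree:

```java
// Illustrative only: models the mapping chain described in the comment.
final class OffsetMappingSketch {
  enum Criteria { SMALLEST, LARGEST, CUSTOM }

  enum InitialMessageId { EARLIEST, LATEST, CUSTOM }

  enum InitialPosition { EARLIEST, LATEST }

  private OffsetMappingSketch() {
  }

  // OffsetCriteria smallest/largest/custom -> InitialMessageID earliest/latest/custom
  static InitialMessageId toInitialMessageId(Criteria criteria) {
    switch (criteria) {
      case SMALLEST:
        return InitialMessageId.EARLIEST;
      case LARGEST:
        return InitialMessageId.LATEST;
      default:
        return InitialMessageId.CUSTOM;
    }
  }

  // InitialMessageID earliest/latest/custom -> position Earliest/Latest/Latest
  static InitialPosition toInitialPosition(InitialMessageId id) {
    return id == InitialMessageId.EARLIEST ? InitialPosition.EARLIEST : InitialPosition.LATEST;
  }

  // True when the position derived from the config differs from the one
  // the method parameter would imply.
  static boolean disagrees(Criteria configured, Criteria requested) {
    return toInitialPosition(toInitialMessageId(configured))
        != toInitialPosition(toInitialMessageId(requested));
  }
}
```

For example, `disagrees(Criteria.LARGEST, Criteria.SMALLEST)` is true: the consumer would be positioned at Latest while the method is asked for the smallest offset. One possible way out, offered only as a suggestion, is to derive the position from the method parameter rather than from the config.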

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -124,19 +127,28 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
 
         for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) {
 
-          Reader reader =
-              _pulsarClient.newReader().topic(getPartitionedTopicName(p)).startMessageId(_config.getInitialMessageId())
-                  .create();
+          Consumer consumer = _pulsarClient.newConsumer().topic(getPartitionedTopicName(p))
+              .subscriptionInitialPosition(_config.getInitialSubscriberPosition())
+              .subscriptionName(ConsumerName.generateRandomName()).subscribe();
 
-          if (reader.hasMessageAvailable()) {
-            Message message = reader.readNext();
+          Message message = consumer.receive(timeoutMillis, TimeUnit.MILLISECONDS);

Review comment:
       Maybe add a comment here noting that the offsetCriteria from the StreamConfig 
param supplied to this method is ignored on purpose when deciding the offset, 
and that "earliest" is deliberately used for all new partitions.

##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
##########
@@ -80,16 +84,15 @@ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offset
     Preconditions.checkNotNull(offsetCriteria);
     try {
       MessageId offset = null;
+      Consumer consumer =
+          _pulsarClient.newConsumer().topic(_topic)
+              .subscriptionInitialPosition(_config.getInitialSubscriberPosition())
+              .subscriptionName("Pinot_" + UUID.randomUUID()).subscribe();
+
       if (offsetCriteria.isLargest()) {
-        _reader.seek(MessageId.latest);
-        if (_reader.hasMessageAvailable()) {
-          offset = _reader.readNext().getMessageId();
-        }
+        offset = consumer.getLastMessageId();
       } else if (offsetCriteria.isSmallest()) {
-        _reader.seek(MessageId.earliest);
-        if (_reader.hasMessageAvailable()) {
-          offset = _reader.readNext().getMessageId();
-        }
+        offset = consumer.receive().getMessageId();

Review comment:
       You probably need to close the Consumer at the end?
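
One way this could look, sketched with try-with-resources. `OffsetSource` is a hypothetical stand-in for the consumer handle (it is not the Pulsar `Consumer` interface), and the method names on it are assumptions made for illustration:

```java
import java.util.function.Supplier;

// Illustrative only: shows a short-lived consumer being closed on every path.
final class ConsumerCloseSketch {
  // Hypothetical stand-in for a consumer that can report first/last message ids.
  interface OffsetSource extends AutoCloseable {
    String lastMessageId();

    String firstMessageId();

    @Override
    void close();
  }

  private ConsumerCloseSketch() {
  }

  // Creates the consumer, reads the requested offset, and always closes the
  // consumer, including when reading the offset throws.
  static String fetchOffset(Supplier<OffsetSource> subscribe, boolean largest) {
    try (OffsetSource consumer = subscribe.get()) {
      return largest ? consumer.lastMessageId() : consumer.firstMessageId();
    }
  }
}
```

The same pattern would apply to the per-partition consumers created inside the loop, so that each randomly named subscription does not leave an open consumer behind.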



