kishoreg commented on a change in pull request #5542:
URL: https://github.com/apache/incubator-pinot/pull/5542#discussion_r439062405



##########
File path: pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
##########
@@ -77,7 +79,7 @@
 
   private static final long RANDOM_SEED = System.currentTimeMillis();
   private static final Random RANDOM = new Random(RANDOM_SEED);
-  static final long PARTITION_OFFSET = RANDOM.nextInt(Integer.MAX_VALUE);
+  static final LongMsgOffset PARTITION_OFFSET = new LongMsgOffset(RANDOM.nextInt(Integer.MAX_VALUE));

Review comment:
        Suggest naming this LongOffset.

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLevelConsumer.java
##########
@@ -31,19 +31,35 @@
 public interface PartitionLevelConsumer extends Closeable {
 
   /**
+   * Is here for backward compatibility for a short time.
+   * TODO Issue 5359 remove this API once external kafka consumers implements return of StreamPartitionMsgOffset
    * Fetch messages from the stream between the specified offsets
    * @param startOffset
    * @param endOffset
    * @param timeoutMillis
    * @return
    * @throws java.util.concurrent.TimeoutException
    */
+  @Deprecated
   MessageBatch fetchMessages(long startOffset, long endOffset, int timeoutMillis)
       throws java.util.concurrent.TimeoutException;
 
+  /**
+   * Fetch messages and the per-partition high watermark from Kafka between the specified offsets.
+   *
+   * @param startOffset The offset of the first message desired, inclusive
+   * @param endOffset The offset of the last message desired, exclusive, or null
+   * @param timeoutMillis Timeout in milliseconds
+   * @throws java.util.concurrent.TimeoutException If the operation could not be completed within {@code timeoutMillis}
+   * milliseconds
+   * @return An iterable containing messages fetched from the stream partition and their offsets, as well as the
+   * high watermark for this partition.
+   */
   default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset endOffset, int timeoutMillis)

Review comment:
       Nicely done!
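
       For readers following along: the body of the new default method is not shown in this hunk, so the following is only a minimal sketch of the backward-compatibility pattern the Javadoc describes, assuming the default overload unwraps a long-backed offset and delegates to the deprecated long-based method. All types here (StreamOffset, LongOffset, OffsetConsumer, LegacyConsumer) are hypothetical stand-ins, not Pinot's classes, and a list of offsets stands in for MessageBatch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the offset abstraction; not Pinot's actual type.
interface StreamOffset {}

// Hypothetical long-backed offset, suitable for Kafka-style streams.
final class LongOffset implements StreamOffset {
  private final long value;

  LongOffset(long value) {
    this.value = value;
  }

  long getValue() {
    return value;
  }
}

// Hypothetical consumer interface mirroring the shape of the diff above.
interface OffsetConsumer {
  // Old API: kept so existing implementations continue to compile.
  @Deprecated
  List<Long> fetchMessages(long startOffset, long endOffset, int timeoutMillis)
      throws TimeoutException;

  // New API: the default body is an assumption. It unwraps the offsets and
  // delegates to the old method, so legacy implementations work unchanged.
  default List<Long> fetchMessages(StreamOffset startOffset, StreamOffset endOffset, int timeoutMillis)
      throws TimeoutException {
    long start = ((LongOffset) startOffset).getValue();
    // A null end offset is treated here as "no upper bound".
    long end = endOffset == null ? Long.MAX_VALUE : ((LongOffset) endOffset).getValue();
    return fetchMessages(start, end, timeoutMillis);
  }
}

// A legacy implementation that only knows the long-based method.
final class LegacyConsumer implements OffsetConsumer {
  @Override
  public List<Long> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
    // Pretend each fetch returns at most three messages, identified by offset.
    List<Long> offsets = new ArrayList<>();
    long limit = Math.min(endOffset, startOffset + 3);
    for (long offset = startOffset; offset < limit; offset++) {
      offsets.add(offset);
    }
    return offsets;
  }
}

class OffsetMigrationDemo {
  public static void main(String[] args) throws TimeoutException {
    OffsetConsumer consumer = new LegacyConsumer();
    // The caller uses the new typed API; the legacy class never had to change.
    System.out.println(consumer.fetchMessages(new LongOffset(5), null, 100)); // prints [5, 6, 7]
  }
}
```

       Under these assumptions, callers can move to the typed overload right away, while external consumers migrate at their own pace and the deprecated method is removed afterwards.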

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -893,20 +891,19 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
 
             // Create a new segment to re-consume from the previous start offset
             LLCSegmentName newLLCSegmentName = getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
-            long startOffset = latestSegmentZKMetadata.getStartOffset();
+            StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset());

Review comment:
       We can drop Msg from the name and call it StreamPartitionOffset.







---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to