Jackie-Jiang commented on code in PR #13112:
URL: https://github.com/apache/pinot/pull/13112#discussion_r1605909440
##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java:
##########
@@ -323,6 +324,15 @@ public void setEndOffset(String endOffset) {
setValue(Segment.Realtime.END_OFFSET, endOffset);
}
+ public StreamContinuationMode getContinuationMode() {
+ return _znRecord.getEnumField(Segment.Realtime.CONTINUATION_MODE, StreamContinuationMode.class,
+ StreamContinuationMode.RESUME);
Review Comment:
Do you foresee adding another mode in the future? I feel a `boolean` field
indicating whether this is the first segment of a stream partition is good enough.
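A minimal sketch of the boolean alternative, assuming a simplified metadata class whose simple fields live in a String-to-String map (the way ZNRecord stores them). `SimpleSegmentMetadata` and the key `segment.realtime.firstSegmentOfPartition` are hypothetical, not existing Pinot names. The point it shows is that an absent field parses as `false`, which matches the current `RESUME` default, so segments written before the flag existed need no migration.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified stand-in for SegmentZKMetadata: simple fields are kept in a
 * String-to-String map, like ZNRecord simple fields. The key is an assumption.
 */
class SimpleSegmentMetadata {
  static final String FIRST_SEGMENT_OF_PARTITION = "segment.realtime.firstSegmentOfPartition";

  private final Map<String, String> _simpleFields = new HashMap<>();

  /** A missing field parses as false, so older segments keep the resume behavior. */
  boolean isFirstSegmentOfPartition() {
    return Boolean.parseBoolean(_simpleFields.get(FIRST_SEGMENT_OF_PARTITION));
  }

  void setFirstSegmentOfPartition(boolean firstSegment) {
    _simpleFields.put(FIRST_SEGMENT_OF_PARTITION, Boolean.toString(firstSegment));
  }
}

class FirstSegmentFlagDemo {
  public static void main(String[] args) {
    SimpleSegmentMetadata metadata = new SimpleSegmentMetadata();
    System.out.println(metadata.isFirstSegmentOfPartition()); // false: field absent
    metadata.setFirstSegmentOfPartition(true);
    System.out.println(metadata.isFirstSegmentOfPartition()); // true
  }
}
```

Compared with an enum, a boolean has exactly two states, so there is no "unknown mode" case for readers to handle.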
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java:
##########
@@ -55,15 +56,15 @@ default void start(StreamPartitionMsgOffset startOffset) {
* @throws TimeoutException If the operation could not be completed within timeout
* @return A batch of messages from the stream partition group
*/
- default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs)
+ default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, CommonConstants.Segment.Realtime.StreamContinuationMode continuationMode, int timeoutMs)
Review Comment:
I feel a boolean is easier to understand, and I don't see any other possible modes.
##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -73,11 +74,12 @@ public KinesisConsumer(KinesisConfig config, KinesisClient kinesisClient) {
* Fetch records from the Kinesis stream between the start and end KinesisCheckpoint
*/
@Override
- public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) {
+ public KinesisMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset,
Review Comment:
Can we get #12806 in first if it works well?
##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupOffset.java:
##########
@@ -41,18 +41,25 @@ public class KinesisPartitionGroupOffset implements StreamPartitionMsgOffset {
private final String _shardId;
private final String _sequenceNumber;
+ public static final String STATUS_SEPARATOR = "::";
Review Comment:
Revert the changes in this file?
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -430,7 +431,8 @@ protected boolean consumeLoop()
// Update _currentOffset upon return from this method
MessageBatch messageBatch;
try {
- messageBatch = _partitionGroupConsumer.fetchMessages(_currentOffset, _streamConfig.getFetchTimeoutMillis());
+ messageBatch = _partitionGroupConsumer.fetchMessages(_currentOffset, _segmentZKMetadata.getContinuationMode(),
Review Comment:
(MAJOR) Only the first batch should be counted as ingested from the new stream.
In the `fetchMessages()` API, we should just add a boolean `inclusive` to
mark whether the current offset itself should be consumed.
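A minimal sketch of the `inclusive` idea, assuming an in-memory partition where an offset is simply a message index. `InMemoryPartition`, `consumeNewPartition`, and this three-argument `fetchMessages` are hypothetical illustrations, not Pinot's API. Only the first fetch from a new partition is inclusive (the message at the start offset is returned); every later fetch starts right after the last message already consumed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical in-memory partition where an offset is the index of a message. */
class InMemoryPartition {
  private final List<String> _messages;

  InMemoryPartition(List<String> messages) {
    _messages = messages;
  }

  /** Returns up to maxMessages messages, starting at startOffset if inclusive, else right after it. */
  List<String> fetchMessages(long startOffset, boolean inclusive, int maxMessages) {
    long from = inclusive ? startOffset : startOffset + 1;
    long to = Math.min(_messages.size(), from + maxMessages);
    if (from >= to) {
      return new ArrayList<>();
    }
    return new ArrayList<>(_messages.subList((int) from, (int) to));
  }
}

class InclusiveFetchDemo {
  /** Consumes a new partition: only the first fetch is inclusive, later fetches are exclusive. */
  static List<String> consumeNewPartition(InMemoryPartition partition, long startOffset, int batchSize) {
    List<String> consumed = new ArrayList<>();
    long offset = startOffset; // start offset before the first fetch, then the last consumed offset
    boolean firstBatch = true;
    while (true) {
      List<String> batch = partition.fetchMessages(offset, firstBatch, batchSize);
      if (batch.isEmpty()) {
        break;
      }
      consumed.addAll(batch);
      long from = firstBatch ? offset : offset + 1;
      offset = from + batch.size() - 1; // offset of the last message in this batch
      firstBatch = false;
    }
    return consumed;
  }

  public static void main(String[] args) {
    InMemoryPartition partition = new InMemoryPartition(Arrays.asList("a", "b", "c", "d", "e"));
    System.out.println(consumeNewPartition(partition, 0, 2)); // [a, b, c, d, e]
  }
}
```

Because the flag is a per-call argument rather than segment-level state, the "new stream" behavior naturally applies to the first batch only.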
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java:
##########
@@ -55,15 +56,15 @@ default void start(StreamPartitionMsgOffset startOffset) {
* @throws TimeoutException If the operation could not be completed within timeout
* @return A batch of messages from the stream partition group
*/
- default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs)
+ default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, CommonConstants.Segment.Realtime.StreamContinuationMode continuationMode, int timeoutMs)
Review Comment:
Please also update the javadoc
##########
pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java:
##########
@@ -187,15 +190,27 @@ private KinesisMessageBatch buildKinesisMessageBatch(KinesisPartitionGroupOffset
return new KinesisMessageBatch(messages, offsetOfNextBatch, endOfShard);
}
- private String getShardIterator(String shardId, String sequenceNumber) {
+ private String getShardIterator(String shardId, String sequenceNumber,
+ CommonConstants.Segment.Realtime.StreamContinuationMode continuationMode) {
GetShardIteratorRequest.Builder requestBuilder = GetShardIteratorRequest.builder().streamName(_config.getStreamTopicName()).shardId(shardId);
- if (sequenceNumber != null) {
- requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber)
- .shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
- } else {
- requestBuilder = requestBuilder.shardIteratorType(_config.getShardIteratorType());
+
+ switch (continuationMode) {
+ case RESUME: {
+ if (sequenceNumber != null) {
+ requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber).shardIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+ } else {
+ requestBuilder = requestBuilder.shardIteratorType(_config.getShardIteratorType());
+ }
+ break;
+ }
+ case INITIALIZE: {
+ requestBuilder = requestBuilder.shardIteratorType(_config.getShardIteratorType());
+ break;
+ }
+ default: //
Review Comment:
It is bad practice to leave an unexpected mode unhandled. Throw an exception
instead.
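A minimal sketch of the suggested fix, assuming local stand-ins for the PR's `StreamContinuationMode` (`RESUME`, `INITIALIZE`) and for Kinesis `ShardIteratorType` (only the values used here) so it compiles without the AWS SDK. `IteratorChoice` and `IteratorSelector.choose` are hypothetical names; the point is that the `default` branch fails fast instead of silently leaving the builder half-configured.

```java
/** Local copy of the two modes introduced in the PR. */
enum ContinuationMode { RESUME, INITIALIZE }

/** Stand-in for Kinesis ShardIteratorType, limited to the values used here. */
enum IteratorType { AFTER_SEQUENCE_NUMBER, TRIM_HORIZON, LATEST }

final class IteratorChoice {
  final IteratorType type;
  final String startingSequenceNumber; // only set for AFTER_SEQUENCE_NUMBER

  IteratorChoice(IteratorType type, String startingSequenceNumber) {
    this.type = type;
    this.startingSequenceNumber = startingSequenceNumber;
  }
}

class IteratorSelector {
  static IteratorChoice choose(ContinuationMode mode, String sequenceNumber, IteratorType configuredType) {
    switch (mode) {
      case RESUME:
        // Resume after the checkpointed record if there is one, else fall back to the configured type.
        if (sequenceNumber != null) {
          return new IteratorChoice(IteratorType.AFTER_SEQUENCE_NUMBER, sequenceNumber);
        }
        return new IteratorChoice(configuredType, null);
      case INITIALIZE:
        // New partition: ignore any sequence number and use the configured iterator type.
        return new IteratorChoice(configuredType, null);
      default:
        // Fails loudly if a new mode is added without updating this method.
        throw new IllegalStateException("Unsupported continuation mode: " + mode);
    }
  }

  public static void main(String[] args) {
    System.out.println(choose(ContinuationMode.RESUME, "42", IteratorType.LATEST).type); // AFTER_SEQUENCE_NUMBER
    System.out.println(choose(ContinuationMode.INITIALIZE, "42", IteratorType.LATEST).type); // LATEST
  }
}
```

The `default` branch is unreachable with today's two constants, but it turns a forgotten case into an immediate error rather than an iterator request with no type set.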
##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumer.java:
##########
@@ -55,15 +56,15 @@ default void start(StreamPartitionMsgOffset startOffset) {
* @throws TimeoutException If the operation could not be completed within timeout
* @return A batch of messages from the stream partition group
*/
- default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs)
+ default MessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, CommonConstants.Segment.Realtime.StreamContinuationMode continuationMode, int timeoutMs)
Review Comment:
(MAJOR) This is backward incompatible because this is a public-facing interface.
We need to add a default implementation for the newly added API instead of
changing the existing signature.
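A minimal sketch of the backward-compatible approach, assuming a simplified interface with `long` offsets and `List<String>` batches in place of `StreamPartitionMsgOffset` and `MessageBatch`. `SimplePartitionConsumer`, `LegacyConsumer`, and the `inclusive` overload are hypothetical. The existing method keeps its signature, and the new overload gets a `default` body that delegates to it, so plugins compiled against the old interface still work. Whether ignoring the flag is the right default depends on each plugin's offset semantics; that choice is an assumption here, and plugins that need it (e.g. Kinesis) would override the overload.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified stand-in for PartitionGroupConsumer. */
interface SimplePartitionConsumer {
  /** Existing method; its signature is unchanged so existing plugins keep compiling. */
  List<String> fetchMessages(long startOffset, int timeoutMs);

  /**
   * New overload added alongside the existing method. The default implementation ignores
   * {@code inclusive} and delegates to {@link #fetchMessages(long, int)}.
   *
   * @param startOffset offset to start fetching from
   * @param inclusive whether the message at startOffset itself should be returned
   * @param timeoutMs fetch timeout in milliseconds
   * @return a batch of messages
   */
  default List<String> fetchMessages(long startOffset, boolean inclusive, int timeoutMs) {
    return fetchMessages(startOffset, timeoutMs);
  }
}

/** A plugin written before the overload existed: it implements only the original method. */
class LegacyConsumer implements SimplePartitionConsumer {
  @Override
  public List<String> fetchMessages(long startOffset, int timeoutMs) {
    return Collections.singletonList("msg-" + startOffset);
  }
}

class BackwardCompatDemo {
  public static void main(String[] args) {
    SimplePartitionConsumer consumer = new LegacyConsumer();
    // Callers can use the new overload even though the plugin never implemented it.
    System.out.println(consumer.fetchMessages(7L, true, 100)); // [msg-7]
  }
}
```

The same Javadoc on the overload also covers the earlier request to document the new parameter.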