vrajat commented on code in PR #12157:
URL: https://github.com/apache/pinot/pull/12157#discussion_r1473761868
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -889,6 +892,39 @@ public Map<String, PartitionLagState> getPartitionToLagState(
return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
}
+ /**
+ * Checks if the stream partition is in a valid state.
+ *
+ * The type of checks is dependent on the stream type. An example is if the startOffset has expired due to
+ * retention configuration of the stream which may lead to missed data.
+ *
+ * @param startOffset The offset of the first message desired, inclusive
+ */
+ private void validateStartOffset(StreamPartitionMsgOffset startOffset) {
+ if (_partitionMetadataProvider == null) {
+ createPartitionMetadataProvider("validateStartOffset");
+ }
+
+ try {
+ StreamPartitionMsgOffset streamSmallestOffset = _partitionMetadataProvider.fetchStreamPartitionOffset(
+ OffsetCriteria.SMALLEST_OFFSET_CRITERIA,
+ /*maxWaitTimeMs=*/5000
+ );
+ if (streamSmallestOffset.compareTo(startOffset) > 0) {
Review Comment:
If ingestion is paused and the stream fast-forwards the begin offset, then
the data loss error is raised. In fact, this is how I simulate this scenario in
the integration test.
For my knowledge, is it possible to specify an offset when resuming
ingestion? If yes, and the given offset < the beginOffset, the error will be
raised. I think that is OK, though: the user expected to get messages from a
specific offset and didn't.
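
To make the two cases concrete, here is a minimal sketch of the comparison the new check performs. It is only an illustration, not the PR's code: offsets are modeled as plain `long` values (an assumption that fits Kafka-style streams), and the class and method names (`StartOffsetCheckSketch`, `startOffsetExpired`, `messagesSkipped`) are hypothetical.

```java
public class StartOffsetCheckSketch {

  /**
   * Mirrors the condition streamSmallestOffset.compareTo(startOffset) > 0:
   * true when the stream no longer retains the requested start offset.
   */
  static boolean startOffsetExpired(long streamSmallestOffset, long startOffset) {
    return Long.compare(streamSmallestOffset, startOffset) > 0;
  }

  /** Number of messages the consumer can no longer read (0 if nothing expired). */
  static long messagesSkipped(long streamSmallestOffset, long startOffset) {
    return Math.max(0L, streamSmallestOffset - startOffset);
  }

  public static void main(String[] args) {
    // Case 1: ingestion paused at offset 100; retention then moves the
    // stream's begin offset forward to 250 before ingestion resumes.
    System.out.println("paused, begin offset advanced: expired="
        + startOffsetExpired(250L, 100L)
        + ", skipped=" + messagesSkipped(250L, 100L));

    // Case 2: ingestion resumed from a user-supplied offset (50) that is
    // smaller than the stream's begin offset (250).
    System.out.println("resume below begin offset: expired="
        + startOffsetExpired(250L, 50L)
        + ", skipped=" + messagesSkipped(250L, 50L));

    // Case 3: the requested offset is still retained, so the check stays quiet.
    System.out.println("offset still retained: expired="
        + startOffsetExpired(250L, 300L)
        + ", skipped=" + messagesSkipped(250L, 300L));
  }
}
```

In both of the first two cases the requested offset is below the smallest retained offset, so the check fires for the same reason: messages the consumer was expected to read are gone.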