mcvsubbu commented on code in PR #9289:
URL: https://github.com/apache/pinot/pull/9289#discussion_r957635296
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1074,6 +1077,15 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
// Get the latest segment ZK metadata for each partition
Map<Integer, SegmentZKMetadata> latestSegmentZKMetadataMap = getLatestSegmentZKMetadataMap(realtimeTableName);
+ Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToStartOffset = new HashMap<>();
Review Comment:
Can you add a comment before this line describing what this map is supposed to
contain? The logic in this class is getting quite hard to read. Could we even
move some methods into a base class and subclass the partitionGroup vs.
partitionId handling for the two different types of streams we support?
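A minimal sketch of how these two suggestions might combine: the map gets a comment stating what it holds, and the stream-type-specific lookup moves behind a small base class. Everything here is hypothetical: the class names are illustrative and not Pinot classes, offsets are plain `long`s rather than `StreamPartitionMsgOffset`, the map is assumed to cache one start offset per partition group id (the thread does not say what it holds), and the subclasses assume that partition-id streams are queried one partition at a time while partition-group streams return metadata for all groups in a single call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntToLongFunction;
import java.util.function.Supplier;

/** Hypothetical base class; offsets are plain longs to keep the sketch self-contained. */
abstract class StartOffsetResolver {
  // partitionGroupId -> start offset already selected for that group during this run.
  // Filled lazily, so each group is resolved at most once.
  private final Map<Integer, Long> partitionGroupIdToStartOffset = new HashMap<>();

  final long startOffsetFor(int partitionGroupId) {
    return partitionGroupIdToStartOffset.computeIfAbsent(partitionGroupId, id -> lookUpStartOffset(id));
  }

  // Stream-type-specific lookup, implemented by the subclasses below.
  protected abstract long lookUpStartOffset(int partitionGroupId);
}

/** Assumption: streams addressed by partition id can be queried one partition at a time. */
final class PartitionIdStartOffsetResolver extends StartOffsetResolver {
  private final IntToLongFunction fetchSmallestOffset;

  PartitionIdStartOffsetResolver(IntToLongFunction fetchSmallestOffset) {
    this.fetchSmallestOffset = fetchSmallestOffset;
  }

  @Override
  protected long lookUpStartOffset(int partitionId) {
    return fetchSmallestOffset.applyAsLong(partitionId);
  }
}

/** Assumption: streams addressed by partition group return metadata for all groups in one call. */
final class PartitionGroupStartOffsetResolver extends StartOffsetResolver {
  private final Supplier<Map<Integer, Long>> fetchAllGroupOffsets;
  private Map<Integer, Long> allGroupOffsets; // fetched on first use, then reused

  PartitionGroupStartOffsetResolver(Supplier<Map<Integer, Long>> fetchAllGroupOffsets) {
    this.fetchAllGroupOffsets = fetchAllGroupOffsets;
  }

  @Override
  protected long lookUpStartOffset(int partitionGroupId) {
    if (allGroupOffsets == null) {
      allGroupOffsets = fetchAllGroupOffsets.get();
    }
    Long offset = allGroupOffsets.get(partitionGroupId);
    if (offset == null) {
      throw new IllegalStateException("No metadata for partition group " + partitionGroupId);
    }
    return offset;
  }
}
```

The base class owns the caching and the documented map, so the repair loop would only call `startOffsetFor`, regardless of which stream type it is handling.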
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1144,21 +1156,33 @@ IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, PartitionLevelS
// 3. we should never end up with some replicas ONLINE and some OFFLINE.
if (isAllInstancesInState(instanceStateMap, SegmentStateModel.OFFLINE)) {
LOGGER.info("Repairing segment: {} which is OFFLINE for all instances in IdealState", latestSegmentName);
- StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getStartOffset());
+ if (partitionGroupIdToSmallestStreamOffset == null) {
+ partitionGroupIdToSmallestStreamOffset = fetchPartitionGroupIdToSmallestOffset(streamConfig);
+ }
+ StreamPartitionMsgOffset startOffset =
+ selectStartOffset(offsetCriteria, partitionGroupId, partitionGroupIdToStartOffset,
+ partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory,
+ latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning
createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs,
- partitionGroupId, newPartitionGroupMetadataList, instancePartitions, instanceStatesMap,
- segmentAssignment, instancePartitionsMap, startOffset);
+ newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment,
+ instancePartitionsMap, startOffset);
} else {
if (newPartitionGroupSet.contains(partitionGroupId)) {
if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted()
&& isAllInstancesInState(instanceStateMap, SegmentStateModel.ONLINE)) {
// If we get here, that means in IdealState, the latest segment has all replicas ONLINE.
// Create a new IN_PROGRESS segment in PROPERTYSTORE,
// add it as CONSUMING segment to IDEALSTATE.
- StreamPartitionMsgOffset startOffset = offsetFactory.create(latestSegmentZKMetadata.getEndOffset());
+ if (partitionGroupIdToSmallestStreamOffset == null) {
Review Comment:
It may make things more readable if we just fetch the smallest offset every
time. Does fetching it involve multiple calls to the stream, and is that what
we are optimizing here? If so, it would be good to add a comment. Otherwise,
fetching it once, unconditionally, may make things a bit more readable.
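If the lazy fetch is kept because each fetch goes to the stream (an assumption; the thread has not confirmed it), one option is to fold the repeated `partitionGroupIdToSmallestStreamOffset == null` checks into a memoizing supplier, so both branches simply call `get()` and the fetch still happens at most once, and only when a repair actually needs it. A minimal, non-thread-safe sketch; `Memoized` is a hypothetical name, not an existing Pinot class:

```java
import java.util.function.Supplier;

/**
 * Hypothetical helper: runs the wrapped fetch at most once, and only on the first get().
 * Not thread-safe; intended for use inside a single repair pass.
 */
final class Memoized<T> implements Supplier<T> {
  private final Supplier<T> delegate;
  private T value;
  private boolean fetched;

  Memoized(Supplier<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public T get() {
    if (!fetched) {
      value = delegate.get(); // e.g. the call that fetches smallest offsets for all partition groups
      fetched = true;
    }
    return value;
  }
}
```

Guava's `Suppliers.memoize` provides a thread-safe equivalent. If the fetch turns out to be cheap, fetching once before the loop, as suggested above, is simpler still.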
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1254,21 +1261,39 @@ private void createNewConsumingSegment(TableConfig tableConfig, PartitionLevelSt
instancePartitionsMap);
}
- @Nullable
- private StreamPartitionMsgOffset getPartitionGroupSmallestOffset(StreamConfig streamConfig, int partitionGroupId) {
+ private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) {
OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
- List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata =
+ List<PartitionGroupMetadata> partitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList());
streamConfig.setOffsetCriteria(originalOffsetCriteria);
- StreamPartitionMsgOffset partitionStartOffset = null;
- for (PartitionGroupMetadata info : smallestOffsetCriteriaPartitionGroupMetadata) {
- if (info.getPartitionGroupId() == partitionGroupId) {
- partitionStartOffset = info.getStartOffset();
- break;
+ Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToSmallestOffset = new HashMap<>();
+ for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
+ partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), metadata.getStartOffset());
+ }
+ return partitionGroupIdToSmallestOffset;
+ }
+
+ private StreamPartitionMsgOffset selectStartOffset(OffsetCriteria offsetCriteria, int partitionGroupId,
Review Comment:
Can you consider removing `offsetCriteria` from the arguments here and
handling the non-null `offsetCriteria` case outside this method? I'm not sure
it will make the logic more readable, but I think it's worth a try.
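A minimal sketch of this suggestion: the caller handles the `offsetCriteria` case, and the narrower helper only reconciles the ZK metadata offset with the stream's smallest available offset. It assumes, without confirmation from this thread, that the non-null-criteria branch simply looks up a precomputed offset and that the other branch moves to the smallest stream offset when the ZK offset is no longer retained by the stream. All names are hypothetical, `hasOffsetCriteria` stands in for the `offsetCriteria != null` check, and offsets are plain `long`s rather than `StreamPartitionMsgOffset`.

```java
import java.util.Map;

/** Hypothetical sketch of moving the offsetCriteria handling out of selectStartOffset. */
final class StartOffsetSelection {
  private StartOffsetSelection() {
  }

  /** Caller side: deals with the offset-criteria case before calling the narrower helper. */
  static long startOffsetFor(boolean hasOffsetCriteria, int partitionGroupId,
      Map<Integer, Long> partitionGroupIdToStartOffset,
      Map<Integer, Long> partitionGroupIdToSmallestStreamOffset, long zkStartOffset) {
    if (hasOffsetCriteria) {
      Long offset = partitionGroupIdToStartOffset.get(partitionGroupId);
      if (offset == null) {
        throw new IllegalStateException("No start offset for partition group " + partitionGroupId);
      }
      return offset;
    }
    return selectStartOffset(partitionGroupIdToSmallestStreamOffset.get(partitionGroupId), zkStartOffset);
  }

  /** Narrower helper with no offsetCriteria parameter. */
  static long selectStartOffset(Long smallestStreamOffset, long zkStartOffset) {
    // Assumed rule: if the stream no longer holds data at the ZK offset, skip to the smallest available offset.
    if (smallestStreamOffset != null && smallestStreamOffset > zkStartOffset) {
      return smallestStreamOffset;
    }
    return zkStartOffset;
  }
}
```

With this split, `selectStartOffset` no longer needs `partitionGroupIdToStartOffset` either, which shortens its parameter list as a side effect.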
--
This is an automated message from the Apache Git Service.