Jackie-Jiang commented on code in PR #8663:
URL: https://github.com/apache/pinot/pull/8663#discussion_r876345203
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1385,4 +1386,89 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe
       }
     }
   }
+
+  /**
+   * Creates PROPERTYSTORE and IDEALSTATE entries for each partitionGroup to resume realtime table consumption
+   * @param tableName
+   */
+  public void resumeRealtimeTableConsumption(String tableName) {
+    TableConfig tableConfig = _helixResourceManager.getTableConfig(tableName, TableType.REALTIME);
+    String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);
+
+    InstancePartitions instancePartitions =
+        InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixResourceManager.getHelixZkManager(), tableConfig,
+            InstancePartitionsType.CONSUMING);
+    Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+        Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions);
+    IdealState idealState =
+        HelixHelper.getTableIdealState(_helixResourceManager.getHelixZkManager(), realtimeTableName);
+    int numReplicas = getNumReplicas(tableConfig, instancePartitions);
+    Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields();
+
+    PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
+    List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
+        getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+    // Read the smallest offset when a new partition is detected
+    OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+    streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+    List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+        getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
+    streamConfig.setOffsetCriteria(originalOffsetCriteria);
+
+    int numPartitionGroups = newPartitionGroupMetadataList.size();
+    Map<Integer, SegmentZKMetadata> partitionToLatestSegment = getLatestSegmentZKMetadataMap(realtimeTableName);
+    for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
Review Comment:
IMO, a single `enforcedByAdmin` flag can be confusing and hard to use,
because this module (the segment manager) should not need to know the
admin's intention. I'd suggest making the flag specific and
self-explanatory, such as `recreateDeletedConsumingSegment`. If we want to
add other checks in the future, we can introduce more flags or wrap them in
an options class, but each flag should be specific. @sajjad-moradi @npawar
What's your opinion on this suggestion?
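
To make the options-class idea concrete, here is a minimal sketch. The class name `ResumeConsumptionOptions`, its builder, and the accessor are hypothetical and do not exist in Pinot; only the flag name `recreateDeletedConsumingSegment` comes from the suggestion above. It assumes the flag defaults to `false`.

```java
/**
 * Hypothetical options holder for resuming realtime consumption.
 * Not part of the Pinot code base; an illustration of "one specific flag per check".
 */
final class ResumeConsumptionOptions {
  // Whether a CONSUMING segment that was deleted should be recreated.
  private final boolean _recreateDeletedConsumingSegment;

  private ResumeConsumptionOptions(Builder builder) {
    _recreateDeletedConsumingSegment = builder._recreateDeletedConsumingSegment;
  }

  public boolean isRecreateDeletedConsumingSegment() {
    return _recreateDeletedConsumingSegment;
  }

  public static Builder builder() {
    return new Builder();
  }

  static final class Builder {
    // Assumed default: do not recreate anything unless asked explicitly.
    private boolean _recreateDeletedConsumingSegment = false;

    public Builder setRecreateDeletedConsumingSegment(boolean recreateDeletedConsumingSegment) {
      _recreateDeletedConsumingSegment = recreateDeletedConsumingSegment;
      return this;
    }

    public ResumeConsumptionOptions build() {
      return new ResumeConsumptionOptions(this);
    }
  }
}
```

With something like this, a future check would become one more named field on the builder, and the segment manager would only read specific booleans rather than interpreting a general "admin" intent.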