npawar commented on code in PR #8663:
URL: https://github.com/apache/pinot/pull/8663#discussion_r877758941
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1385,4 +1386,89 @@ public void uploadToDeepStoreIfMissing(TableConfig
tableConfig, List<SegmentZKMe
}
}
}
+
+ /**
+ * Creates PROPERTYSTORE and IDEALSTATE entries for each partitionGroup to
resume realtime table consumption
+ * @param tableName
+ */
+ public void resumeRealtimeTableConsumption(String tableName) {
+ TableConfig tableConfig = _helixResourceManager.getTableConfig(tableName,
TableType.REALTIME);
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+
+ InstancePartitions instancePartitions =
+
InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixResourceManager.getHelixZkManager(),
tableConfig,
+ InstancePartitionsType.CONSUMING);
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+ Collections.singletonMap(InstancePartitionsType.CONSUMING,
instancePartitions);
+ IdealState idealState =
+
HelixHelper.getTableIdealState(_helixResourceManager.getHelixZkManager(),
realtimeTableName);
+ int numReplicas = getNumReplicas(tableConfig, instancePartitions);
+ Map<String, Map<String, String>> instanceStatesMap =
idealState.getRecord().getMapFields();
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
+ getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+ // Read the smallest offset when a new partition is detected
+ OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+ streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+ getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
+ streamConfig.setOffsetCriteria(originalOffsetCriteria);
+
+ int numPartitionGroups = newPartitionGroupMetadataList.size();
+ Map<Integer, SegmentZKMetadata> partitionToLatestSegment =
getLatestSegmentZKMetadataMap(realtimeTableName);
+ for (PartitionGroupMetadata partitionGroupMetadata :
newPartitionGroupMetadataList) {
+ int partitionId = partitionGroupMetadata.getPartitionGroupId();
+ if (partitionToLatestSegment.containsKey(partitionId)
+ &&
idealState.getInstanceStateMap(partitionToLatestSegment.get(partitionId).getSegmentName())
+ .containsValue(SegmentStateModel.CONSUMING)) {
+ // IN_PROGRESS segment already exists, NO-OP
+ LOGGER.info("Skipping partitionGroupId {}, IN_PROGRESS segment already
exists", partitionId);
+ } else {
+ // Step 1: Create PROPERTYSTORE nodes for each partitionId
(IN_PROGRESS)
+ String offset = partitionGroupMetadata.getStartOffset().toString();
+ long newSegmentCreationTimeMs = System.currentTimeMillis();
+
+ SegmentZKMetadata latestSegmentMeta =
partitionToLatestSegment.get(partitionId);
+ int seqNumber = latestSegmentMeta == null ? STARTING_SEQUENCE_NUMBER
+ : new
LLCSegmentName(latestSegmentMeta.getSegmentName()).getSequenceNumber() + 1;
+
+ LLCSegmentName newLLCSegment = new LLCSegmentName(tableName,
partitionId, seqNumber, newSegmentCreationTimeMs);
+ SegmentZKMetadata newSegmentZKMetadata = new
SegmentZKMetadata(newLLCSegment.getSegmentName());
+
newSegmentZKMetadata.setCreationTime(newLLCSegment.getCreationTimeMs());
+ newSegmentZKMetadata.setStartOffset(offset);
+ newSegmentZKMetadata.setNumReplicas(numReplicas);
+
newSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+ SegmentPartitionMetadata segmentPartitionMetadata =
+ getPartitionMetadataFromTableConfig(tableConfig, partitionId);
+ if (segmentPartitionMetadata != null) {
+ newSegmentZKMetadata.setPartitionMetadata(segmentPartitionMetadata);
+ }
+
+ FlushThresholdUpdater flushThresholdUpdater =
_flushThresholdUpdateManager
+ .getFlushThresholdUpdater(streamConfig);
+ CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(null, offset, 0);
+ flushThresholdUpdater.updateFlushThreshold(streamConfig,
+ newSegmentZKMetadata, committingSegmentDescriptor, null,
+ getMaxNumPartitionsPerInstance(instancePartitions,
numPartitionGroups, numReplicas),
+ newPartitionGroupMetadataList);
+
+ _helixResourceManager.getPropertyStore().set(
Review Comment:
@saurabhd336 just think of the periodic task. There's an API to invoke
periodic task manually (look for Periodic Task tab in swagger). Whether it runs
by schedule, or by manual trigger, periodic task can only run on lead
controller of a table, so that eliminates across controller clashes. As for
same controller, if you look at BasePeriodicTask, you'll see it doesn't let
more than one task to run at once.
So the suggestion is, rely on this mechanism as we already know this works
and prevents clashes. One way to rely on this mechanism is by introducing a
query param to the runPeriodicTask API with our extra config and pass it to the
periodic task execution. Another way could be to keep your new API, but again
use same mechanism happening in runPeriodicTask API to trigger the task.
does this help clarify? We can chat on the slack channel if need be.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]