Jackie-Jiang commented on code in PR #8663:
URL: https://github.com/apache/pinot/pull/8663#discussion_r877541423
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1385,4 +1386,89 @@ public void uploadToDeepStoreIfMissing(TableConfig
tableConfig, List<SegmentZKMe
}
}
}
+
+ /**
+ * Creates PROPERTYSTORE and IDEALSTATE entries for each partitionGroup to
resume realtime table consumption
+ * @param tableName
+ */
+ public void resumeRealtimeTableConsumption(String tableName) {
+ TableConfig tableConfig = _helixResourceManager.getTableConfig(tableName,
TableType.REALTIME);
+ String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+
+ InstancePartitions instancePartitions =
+
InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixResourceManager.getHelixZkManager(),
tableConfig,
+ InstancePartitionsType.CONSUMING);
+ Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap =
+ Collections.singletonMap(InstancePartitionsType.CONSUMING,
instancePartitions);
+ IdealState idealState =
+
HelixHelper.getTableIdealState(_helixResourceManager.getHelixZkManager(),
realtimeTableName);
+ int numReplicas = getNumReplicas(tableConfig, instancePartitions);
+ Map<String, Map<String, String>> instanceStatesMap =
idealState.getRecord().getMapFields();
+
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
+ List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
+ getPartitionGroupConsumptionStatusList(idealState, streamConfig);
+ // Read the smallest offset when a new partition is detected
+ OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
+ streamConfig.setOffsetCriteria(OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
+ List<PartitionGroupMetadata> newPartitionGroupMetadataList =
+ getNewPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
+ streamConfig.setOffsetCriteria(originalOffsetCriteria);
+
+ int numPartitionGroups = newPartitionGroupMetadataList.size();
+ Map<Integer, SegmentZKMetadata> partitionToLatestSegment =
getLatestSegmentZKMetadataMap(realtimeTableName);
+ for (PartitionGroupMetadata partitionGroupMetadata :
newPartitionGroupMetadataList) {
+ int partitionId = partitionGroupMetadata.getPartitionGroupId();
+ if (partitionToLatestSegment.containsKey(partitionId)
+ &&
idealState.getInstanceStateMap(partitionToLatestSegment.get(partitionId).getSegmentName())
+ .containsValue(SegmentStateModel.CONSUMING)) {
+ // IN_PROGRESS segment already exists, NO-OP
+ LOGGER.info("Skipping partitionGroupId {}, IN_PROGRESS segment already
exists", partitionId);
+ } else {
+ // Step 1: Create PROPERTYSTORE nodes for each partitionId
(IN_PROGRESS)
+ String offset = partitionGroupMetadata.getStartOffset().toString();
+ long newSegmentCreationTimeMs = System.currentTimeMillis();
+
+ SegmentZKMetadata latestSegmentMeta =
partitionToLatestSegment.get(partitionId);
+ int seqNumber = latestSegmentMeta == null ? STARTING_SEQUENCE_NUMBER
+ : new
LLCSegmentName(latestSegmentMeta.getSegmentName()).getSequenceNumber() + 1;
+
+ LLCSegmentName newLLCSegment = new LLCSegmentName(tableName,
partitionId, seqNumber, newSegmentCreationTimeMs);
+ SegmentZKMetadata newSegmentZKMetadata = new
SegmentZKMetadata(newLLCSegment.getSegmentName());
+
newSegmentZKMetadata.setCreationTime(newLLCSegment.getCreationTimeMs());
+ newSegmentZKMetadata.setStartOffset(offset);
+ newSegmentZKMetadata.setNumReplicas(numReplicas);
+
newSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+ SegmentPartitionMetadata segmentPartitionMetadata =
+ getPartitionMetadataFromTableConfig(tableConfig, partitionId);
+ if (segmentPartitionMetadata != null) {
+ newSegmentZKMetadata.setPartitionMetadata(segmentPartitionMetadata);
+ }
+
+ FlushThresholdUpdater flushThresholdUpdater =
_flushThresholdUpdateManager
+ .getFlushThresholdUpdater(streamConfig);
+ CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(null, offset, 0);
+ flushThresholdUpdater.updateFlushThreshold(streamConfig,
+ newSegmentZKMetadata, committingSegmentDescriptor, null,
+ getMaxNumPartitionsPerInstance(instancePartitions,
numPartitionGroups, numReplicas),
+ newPartitionGroupMetadataList);
+
+ _helixResourceManager.getPropertyStore().set(
Review Comment:
@sajjad-moradi It should be okay because we always follow the same sequence
of operations as the segment commit protocol:
```
Step 1: Update PROPERTYSTORE to change the old segment metadata status to
DONE
Step 2: Update PROPERTYSTORE to create the new segment metadata with status
IN_PROGRESS
Step 3: Update IDEALSTATES to include new segment in CONSUMING state, and
change old segment to ONLINE state.
```
The operation itself is idempotent. Given any intermediate state, it will
return the same result.
If the operation were not idempotent, we would also need to handle
synchronization across multiple controllers.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]