sajjad-moradi commented on code in PR #8786:
URL: https://github.com/apache/pinot/pull/8786#discussion_r883966738
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -59,6 +59,28 @@ public LLCSegmentName(String tableName, int
partitionGroupId, int sequenceNumber
_segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR +
sequenceNumber + SEPARATOR + _creationTime;
}
+ private LLCSegmentName(String tableName, int partitionGroupId, int
sequenceNumber, String creationTime,
+ String segmentName) {
+ _tableName = tableName;
+ _partitionGroupId = partitionGroupId;
+ _sequenceNumber = sequenceNumber;
+ _creationTime = creationTime;
+ _segmentName = segmentName;
+ }
+
+ /**
+ * Returns the {@link LLCSegmentName} for the given segment name, or {@code
null} if the given segment name does not
+ * represent an LLC segment.
+ */
+ @Nullable
+ public static LLCSegmentName getLLCSegmentName(String segmentName) {
+ String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
+ if (parts.length != 4) {
+ return null;
+ }
+ return new LLCSegmentName(parts[0], Integer.parseInt(parts[1]),
Integer.parseInt(parts[2]), parts[3], segmentName);
Review Comment:
Why not just `return new LLCSegmentName(segmentName)`?
This way there's no need to create the new private constructor.
Also could you rename the static method?
`LLCSegmentName.getLLCSegmentName()` is a bit redundant. How about
`LLCSegmentName.of()`?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java:
##########
@@ -90,86 +88,83 @@ protected Context preprocess(Properties
periodicTaskProperties) {
@Override
protected void processTable(String tableNameWithType, Context context) {
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == TableType.REALTIME) {
+ if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+ return;
+ }
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping
validation", tableNameWithType);
- return;
- }
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping
validation", tableNameWithType);
+ return;
+ }
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
- if (context._runSegmentLevelValidation) {
- runSegmentLevelValidation(tableConfig);
- }
+ if (context._runSegmentLevelValidation) {
+ runSegmentLevelValidation(tableConfig, streamConfig);
+ }
- PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
- IngestionConfigUtils.getStreamConfigMap(tableConfig));
- if (streamConfig.hasLowLevelConsumerType()) {
- _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig,
- streamConfig, context._recreateDeletedConsumingSegment);
- }
+ if (streamConfig.hasLowLevelConsumerType()) {
+ _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig,
streamConfig,
+ context._recreateDeletedConsumingSegment);
}
}
- private void runSegmentLevelValidation(TableConfig tableConfig) {
+ private void runSegmentLevelValidation(TableConfig tableConfig,
PartitionLevelStreamConfig streamConfig) {
String realtimeTableName = tableConfig.getTableName();
List<SegmentZKMetadata> segmentsZKMetadata =
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
- boolean countHLCSegments = true; // false if this table has ONLY LLC
segments (i.e. fully migrated)
- StreamConfig streamConfig =
- new StreamConfig(realtimeTableName,
IngestionConfigUtils.getStreamConfigMap(tableConfig));
- if (streamConfig.hasLowLevelConsumerType() &&
!streamConfig.hasHighLevelConsumerType()) {
- countHLCSegments = false;
- }
- // Update the gauge to contain the total document count in the segments
-
_validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(),
- computeRealtimeTotalDocumentInSegments(segmentsZKMetadata,
countHLCSegments));
- if (streamConfig.hasLowLevelConsumerType()
- &&
_llcRealtimeSegmentManager.isDeepStoreLLCSegmentUploadRetryEnabled()) {
+ // Update the total document count gauge
+ // Count HLC segments if there is no LLC consumer configured
+ boolean hasLLC = streamConfig.hasLowLevelConsumerType();
+ _validationMetrics.updateTotalDocumentCountGauge(realtimeTableName,
+ computeTotalDocumentCount(segmentsZKMetadata, hasLLC));
Review Comment:
So the assumption here is that there's no table with both HLC and
LLC segments, right? If such a scenario exists, this change is backward incompatible.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java:
##########
@@ -51,8 +51,8 @@ public abstract class RealtimeIndexOffHeapMemoryManager
implements PinotDataBuff
protected RealtimeIndexOffHeapMemoryManager(ServerMetrics serverMetrics,
String segmentName) {
_serverMetrics = serverMetrics;
_segmentName = segmentName;
- if (SegmentName.isLowLevelConsumerSegmentName(segmentName)) {
- LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ LLCSegmentName llcSegmentName =
LLCSegmentName.getLLCSegmentName(segmentName);
+ if (llcSegmentName != null) {
_tableName = llcSegmentName.getTableName();
Review Comment:
```suggestion
if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
_tableName = LLCSegmentName.of(segmentName).getTableName();
```
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -243,41 +243,42 @@ public List<PinotTaskConfig>
generateTasks(List<TableConfig> tableConfigs) {
}
/**
- * Fetch completed (non-consuming) segment and partition information
+ * Fetch completed (DONE/UPLOADED) segment and partition information
+ *
* @param realtimeTableName the realtime table name
- * @param completedSegmentsZKMetadata list for collecting the completed
segments ZK metadata
- * @param partitionToLatestCompletedSegmentName map for collecting the
partitionId to the latest completed segment
- * name
+ * @param completedSegmentsZKMetadata list for collecting the completed
(DONE/UPLOADED) segments ZK metadata
+ * @param partitionToLatestLLCSegmentName map for collecting the partitionId
to the latest LLC segment name
* @param allPartitions set for collecting all partition ids
*/
private void getCompletedSegmentsInfo(String realtimeTableName,
List<SegmentZKMetadata> completedSegmentsZKMetadata,
- Map<Integer, String> partitionToLatestCompletedSegmentName, Set<Integer>
allPartitions) {
+ Map<Integer, String> partitionToLatestLLCSegmentName, Set<Integer>
allPartitions) {
List<SegmentZKMetadata> segmentsZKMetadata =
_clusterInfoAccessor.getSegmentsZKMetadata(realtimeTableName);
Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
- LLCSegmentName llcSegmentName = new
LLCSegmentName(segmentZKMetadata.getSegmentName());
- allPartitions.add(llcSegmentName.getPartitionGroupId());
-
- if (segmentZKMetadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
+ Segment.Realtime.Status status = segmentZKMetadata.getStatus();
+ if (status.isCompleted()) {
completedSegmentsZKMetadata.add(segmentZKMetadata);
- latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(),
- (partitionGroupId, latestLLCSegmentName) -> {
- if (latestLLCSegmentName == null) {
- return llcSegmentName;
- } else {
- if (llcSegmentName.getSequenceNumber() >
latestLLCSegmentName.getSequenceNumber()) {
- return llcSegmentName;
- } else {
- return latestLLCSegmentName;
- }
- }
- });
+ }
+ LLCSegmentName llcSegmentName =
LLCSegmentName.getLLCSegmentName(segmentZKMetadata.getSegmentName());
+ if (llcSegmentName != null) {
Review Comment:
Can you add a comment after the if statement like: `// UPLOADED segments that
don't conform to LLC segment names`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]