sajjad-moradi commented on code in PR #8786:
URL: https://github.com/apache/pinot/pull/8786#discussion_r883966738


##########
pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java:
##########
@@ -59,6 +59,28 @@ public LLCSegmentName(String tableName, int 
partitionGroupId, int sequenceNumber
     _segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + 
sequenceNumber + SEPARATOR + _creationTime;
   }
 
+  private LLCSegmentName(String tableName, int partitionGroupId, int 
sequenceNumber, String creationTime,
+      String segmentName) {
+    _tableName = tableName;
+    _partitionGroupId = partitionGroupId;
+    _sequenceNumber = sequenceNumber;
+    _creationTime = creationTime;
+    _segmentName = segmentName;
+  }
+
+  /**
+   * Returns the {@link LLCSegmentName} for the given segment name, or {@code 
null} if the given segment name does not
+   * represent an LLC segment.
+   */
+  @Nullable
+  public static LLCSegmentName getLLCSegmentName(String segmentName) {
+    String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
+    if (parts.length != 4) {
+      return null;
+    }
+    return new LLCSegmentName(parts[0], Integer.parseInt(parts[1]), 
Integer.parseInt(parts[2]), parts[3], segmentName);

Review Comment:
   Why not just `return new LLCSegmentName(segmentName)`?
   This way there's no need to create the new private constructor.
   Also could you rename the static method? 
`LLCSegmentName.getLLCSegmentName()` is a bit redundant. How about 
`LLCSegmentName.of()`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java:
##########
@@ -90,86 +88,83 @@ protected Context preprocess(Properties 
periodicTaskProperties) {
 
   @Override
   protected void processTable(String tableNameWithType, Context context) {
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-    if (tableType == TableType.REALTIME) {
+    if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+      return;
+    }
 
-      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig == null) {
-        LOGGER.warn("Failed to find table config for table: {}, skipping 
validation", tableNameWithType);
-        return;
-      }
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      LOGGER.warn("Failed to find table config for table: {}, skipping 
validation", tableNameWithType);
+      return;
+    }
+    PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
 
-      if (context._runSegmentLevelValidation) {
-        runSegmentLevelValidation(tableConfig);
-      }
+    if (context._runSegmentLevelValidation) {
+      runSegmentLevelValidation(tableConfig, streamConfig);
+    }
 
-      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
-          IngestionConfigUtils.getStreamConfigMap(tableConfig));
-      if (streamConfig.hasLowLevelConsumerType()) {
-        _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig,
-            streamConfig, context._recreateDeletedConsumingSegment);
-      }
+    if (streamConfig.hasLowLevelConsumerType()) {
+      _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, 
streamConfig,
+          context._recreateDeletedConsumingSegment);
     }
   }
 
-  private void runSegmentLevelValidation(TableConfig tableConfig) {
+  private void runSegmentLevelValidation(TableConfig tableConfig, 
PartitionLevelStreamConfig streamConfig) {
     String realtimeTableName = tableConfig.getTableName();
     List<SegmentZKMetadata> segmentsZKMetadata = 
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
-    boolean countHLCSegments = true;  // false if this table has ONLY LLC 
segments (i.e. fully migrated)
-    StreamConfig streamConfig =
-        new StreamConfig(realtimeTableName, 
IngestionConfigUtils.getStreamConfigMap(tableConfig));
-    if (streamConfig.hasLowLevelConsumerType() && 
!streamConfig.hasHighLevelConsumerType()) {
-      countHLCSegments = false;
-    }
-    // Update the gauge to contain the total document count in the segments
-    
_validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(),
-        computeRealtimeTotalDocumentInSegments(segmentsZKMetadata, 
countHLCSegments));
 
-    if (streamConfig.hasLowLevelConsumerType()
-        && 
_llcRealtimeSegmentManager.isDeepStoreLLCSegmentUploadRetryEnabled()) {
+    // Update the total document count gauge
+    // Count HLC segments if there is no LLC consumer configured
+    boolean hasLLC = streamConfig.hasLowLevelConsumerType();
+    _validationMetrics.updateTotalDocumentCountGauge(realtimeTableName,
+        computeTotalDocumentCount(segmentsZKMetadata, hasLLC));

Review Comment:
   So the assumption here is that there's no table with both HLCs and 
LLCs, right? If there is such a scenario, this change is backward incompatible.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java:
##########
@@ -51,8 +51,8 @@ public abstract class RealtimeIndexOffHeapMemoryManager 
implements PinotDataBuff
   protected RealtimeIndexOffHeapMemoryManager(ServerMetrics serverMetrics, 
String segmentName) {
     _serverMetrics = serverMetrics;
     _segmentName = segmentName;
-    if (SegmentName.isLowLevelConsumerSegmentName(segmentName)) {
-      LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    LLCSegmentName llcSegmentName = 
LLCSegmentName.getLLCSegmentName(segmentName);
+    if (llcSegmentName != null) {
       _tableName = llcSegmentName.getTableName();

Review Comment:
   ```suggestion
       if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
         _tableName = LLCSegmentName.of(segmentName).getTableName();
   ```



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java:
##########
@@ -243,41 +243,42 @@ public List<PinotTaskConfig> 
generateTasks(List<TableConfig> tableConfigs) {
   }
 
   /**
-   * Fetch completed (non-consuming) segment and partition information
+   * Fetch completed (DONE/UPLOADED) segment and partition information
+   *
    * @param realtimeTableName the realtime table name
-   * @param completedSegmentsZKMetadata list for collecting the completed 
segments ZK metadata
-   * @param partitionToLatestCompletedSegmentName map for collecting the 
partitionId to the latest completed segment
-   *                                              name
+   * @param completedSegmentsZKMetadata list for collecting the completed 
(DONE/UPLOADED) segments ZK metadata
+   * @param partitionToLatestLLCSegmentName map for collecting the partitionId 
to the latest LLC segment name
    * @param allPartitions set for collecting all partition ids
    */
   private void getCompletedSegmentsInfo(String realtimeTableName, 
List<SegmentZKMetadata> completedSegmentsZKMetadata,
-      Map<Integer, String> partitionToLatestCompletedSegmentName, Set<Integer> 
allPartitions) {
+      Map<Integer, String> partitionToLatestLLCSegmentName, Set<Integer> 
allPartitions) {
     List<SegmentZKMetadata> segmentsZKMetadata = 
_clusterInfoAccessor.getSegmentsZKMetadata(realtimeTableName);
 
     Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
     for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
-      LLCSegmentName llcSegmentName = new 
LLCSegmentName(segmentZKMetadata.getSegmentName());
-      allPartitions.add(llcSegmentName.getPartitionGroupId());
-
-      if (segmentZKMetadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
+      Segment.Realtime.Status status = segmentZKMetadata.getStatus();
+      if (status.isCompleted()) {
         completedSegmentsZKMetadata.add(segmentZKMetadata);
-        latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(),
-            (partitionGroupId, latestLLCSegmentName) -> {
-              if (latestLLCSegmentName == null) {
-                return llcSegmentName;
-              } else {
-                if (llcSegmentName.getSequenceNumber() > 
latestLLCSegmentName.getSequenceNumber()) {
-                  return llcSegmentName;
-                } else {
-                  return latestLLCSegmentName;
-                }
-              }
-            });
+      }
+      LLCSegmentName llcSegmentName = 
LLCSegmentName.getLLCSegmentName(segmentZKMetadata.getSegmentName());
+      if (llcSegmentName != null) {

Review Comment:
   Can you add a comment after the if statement, like: `// UPLOADED segments that 
don't conform to llc segment names`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to