Jackie-Jiang commented on code in PR #9621:
URL: https://github.com/apache/pinot/pull/9621#discussion_r1005002974


##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -517,8 +516,12 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
           RealtimeSegmentDataManager realtimeSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
           Map<String, ConsumerPartitionState> partitionStateMap =
               realtimeSegmentDataManager.getConsumerPartitionState();
-          Map<String, PartitionLagState> partitionLagStateMap =
-              
realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap);
+          Map<String, String> recordsLagMap = new HashMap<>();
+          Map<String, String> availabilityLagMap = new HashMap<>();

Review Comment:
   (minor) Use the full name: `recordAvailabilityLagMsMap`



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java:
##########
@@ -23,12 +23,19 @@
 
 public class KafkaConsumerPartitionLag extends PartitionLagState {
   private final String _recordsLag;
+  private final String _recordAvailabilityLag;
 
-  public KafkaConsumerPartitionLag(String recordsLag) {
+  public KafkaConsumerPartitionLag(String recordsLag, String 
recordAvailabilityLag) {

Review Comment:
   ```suggestion
     public KafkaConsumerPartitionLag(String recordsLag, String 
recordAvailabilityLagMs) {
   ```



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java:
##########
@@ -113,14 +114,27 @@ public Map<String, PartitionLagState> 
getCurrentPartitionLagState(
       Map<String, ConsumerPartitionState> currentPartitionStateMap) {
     Map<String, PartitionLagState> perPartitionLag = new HashMap<>();
     for (Map.Entry<String, ConsumerPartitionState> entry: 
currentPartitionStateMap.entrySet()) {
-      StreamPartitionMsgOffset currentOffset = 
entry.getValue().getCurrentOffset();
-      StreamPartitionMsgOffset upstreamLatest = 
entry.getValue().getUpstreamLatestOffset();
+      ConsumerPartitionState partitionState = entry.getValue();
+      // Compute records-lag
+      StreamPartitionMsgOffset currentOffset = 
partitionState.getCurrentOffset();
+      StreamPartitionMsgOffset upstreamLatest = 
partitionState.getUpstreamLatestOffset();
+      String offsetLagString = "UNKNOWN";
+
       if (currentOffset instanceof LongMsgOffset && upstreamLatest instanceof 
LongMsgOffset) {
         long offsetLag = ((LongMsgOffset) upstreamLatest).getOffset() - 
((LongMsgOffset) currentOffset).getOffset();
-        perPartitionLag.put(entry.getKey(), new 
KafkaConsumerPartitionLag(String.valueOf(offsetLag)));
-      } else {
-        perPartitionLag.put(entry.getKey(), new 
KafkaConsumerPartitionLag("UNKNOWN"));
+        offsetLagString = String.valueOf(offsetLag);
       }
+
+      // Compute record-availability
+      String recordAvailabilityLag = "UNKNOWN";

Review Comment:
   (minor)
   ```suggestion
         String recordAvailabilityLagMs = "UNKNOWN";
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java:
##########
@@ -237,13 +238,18 @@ static public class PartitionOffsetInfo {
     @JsonProperty("latestUpstreamOffsetMap")
     public Map<String, String> _latestUpstreamOffsetMap;
 
+    @JsonProperty("recordsAvailabilityLagMap")
+    public Map<String, String> _availabilityLagMap;
+
     public PartitionOffsetInfo(
         @JsonProperty("currentOffsetsMap") Map<String, String> 
currentOffsetsMap,
         @JsonProperty("latestUpstreamOffsetMap") Map<String, String> 
latestUpstreamOffsetMap,
-        @JsonProperty("recordsLagMap") Map<String, String> recordsLagMap) {
+        @JsonProperty("recordsLagMap") Map<String, String> recordsLagMap,
+        @JsonProperty("recordsAvailabilityLagMap") Map<String, String> 
availabilityLagMap) {

Review Comment:
   ```suggestion
           @JsonProperty("recordsAvailabilityLagMsMap") Map<String, String> 
recordAvailabilityLagMsMap) {
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java:
##########
@@ -237,13 +238,18 @@ static public class PartitionOffsetInfo {
     @JsonProperty("latestUpstreamOffsetMap")
     public Map<String, String> _latestUpstreamOffsetMap;
 
+    @JsonProperty("recordsAvailabilityLagMap")
+    public Map<String, String> _availabilityLagMap;

Review Comment:
   ```suggestion
       @JsonProperty("recordsAvailabilityLagMsMap")
       public Map<String, String> _recordAvailabilityLagMsMap;
   ```



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java:
##########
@@ -23,12 +23,19 @@
 
 public class KafkaConsumerPartitionLag extends PartitionLagState {
   private final String _recordsLag;
+  private final String _recordAvailabilityLag;

Review Comment:
   ```suggestion
     private final String _recordAvailabilityMsLag;
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java:
##########
@@ -97,5 +102,9 @@ public Map<String, String> getRecordsLag() {
     public Map<String, String> getLatestUpstreamOffsets() {
       return _latestUpstreamOffsets;
     }
+
+    public Map<String, String> getAvailabilityLag() {
+      return _availabilityLag;

Review Comment:
   ```suggestion
       public Map<String, String> getRecordAvailabilityLagMs() {
         return _recordAvailabilityLagMs;
   ```



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java:
##########
@@ -168,22 +168,24 @@ private SegmentConsumerInfo 
getSegmentConsumerInfo(SegmentDataManager segmentDat
     if (tableType == TableType.REALTIME) {
       RealtimeSegmentDataManager realtimeSegmentDataManager = 
(RealtimeSegmentDataManager) segmentDataManager;
       Map<String, ConsumerPartitionState> partitionStateMap = 
realtimeSegmentDataManager.getConsumerPartitionState();
-      Map<String, PartitionLagState> partitionLagStateMap =
-          realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap);
-      Map<String, String> partitionToCurrentOffsetMap = 
realtimeSegmentDataManager.getPartitionToCurrentOffset();
+      Map<String, String> currentOffsets = 
realtimeSegmentDataManager.getPartitionToCurrentOffset();
+      Map<String, String> upstreamLatest = 
partitionStateMap.entrySet().stream().collect(
+          Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().getUpstreamLatestOffset().toString()));
+      Map<String, String> recordsLagMap = new HashMap<>();
+      Map<String, String> recordsAvailabilityMap = new HashMap<>();

Review Comment:
   ```suggestion
         Map<String, String> recordAvailabilityLagMsMap = new HashMap<>();
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java:
##########
@@ -68,23 +68,28 @@ public PartitionOffsetInfo getPartitionOffsetInfo() {
 
   @JsonIgnoreProperties(ignoreUnknown = true)
   static public class PartitionOffsetInfo {
-      @JsonProperty("currentOffsets")
-      public Map<String, String> _currentOffsets;
-
-      @JsonProperty("recordsLag")
-      public Map<String, String> _recordsLag;
-
-      @JsonProperty("latestUpstreamOffsets")
-      public Map<String, String> _latestUpstreamOffsets;
-
-      public PartitionOffsetInfo(
-          @JsonProperty("currentOffsets") Map<String, String> currentOffsets,
-          @JsonProperty("latestUpstreamOffsets") Map<String, String> 
latestUpstreamOffsets,
-          @JsonProperty("recordsLag") Map<String, String> recordsLag) {
-        _currentOffsets = currentOffsets;
-        _latestUpstreamOffsets = latestUpstreamOffsets;
-        _recordsLag = recordsLag;
-      }
+    @JsonProperty("currentOffsets")
+    public Map<String, String> _currentOffsets;
+
+    @JsonProperty("recordsLag")
+    public Map<String, String> _recordsLag;
+
+    @JsonProperty("latestUpstreamOffsets")
+    public Map<String, String> _latestUpstreamOffsets;
+
+    @JsonProperty("recordsAvailabilityLag")
+    public Map<String, String> _availabilityLag;
+
+    public PartitionOffsetInfo(
+        @JsonProperty("currentOffsets") Map<String, String> currentOffsets,
+        @JsonProperty("latestUpstreamOffsets") Map<String, String> 
latestUpstreamOffsets,
+        @JsonProperty("recordsLag") Map<String, String> recordsLag,
+        @JsonProperty("recordsAvailabilityLag") Map<String, String> 
availabilityLag) {

Review Comment:
   ```suggestion
           @JsonProperty("recordAvailabilityLagMs") Map<String, String> 
recordAvailabilityLagMs) {
   ```



##########
pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java:
##########
@@ -68,23 +68,28 @@ public PartitionOffsetInfo getPartitionOffsetInfo() {
 
   @JsonIgnoreProperties(ignoreUnknown = true)
   static public class PartitionOffsetInfo {
-      @JsonProperty("currentOffsets")
-      public Map<String, String> _currentOffsets;
-
-      @JsonProperty("recordsLag")
-      public Map<String, String> _recordsLag;
-
-      @JsonProperty("latestUpstreamOffsets")
-      public Map<String, String> _latestUpstreamOffsets;
-
-      public PartitionOffsetInfo(
-          @JsonProperty("currentOffsets") Map<String, String> currentOffsets,
-          @JsonProperty("latestUpstreamOffsets") Map<String, String> 
latestUpstreamOffsets,
-          @JsonProperty("recordsLag") Map<String, String> recordsLag) {
-        _currentOffsets = currentOffsets;
-        _latestUpstreamOffsets = latestUpstreamOffsets;
-        _recordsLag = recordsLag;
-      }
+    @JsonProperty("currentOffsets")
+    public Map<String, String> _currentOffsets;
+
+    @JsonProperty("recordsLag")
+    public Map<String, String> _recordsLag;
+
+    @JsonProperty("latestUpstreamOffsets")
+    public Map<String, String> _latestUpstreamOffsets;
+
+    @JsonProperty("recordsAvailabilityLag")
+    public Map<String, String> _availabilityLag;

Review Comment:
   ```suggestion
       @JsonProperty("recordAvailabilityLagMs")
       public Map<String, String> _recordAvailabilityLagMs;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to