Jackie-Jiang commented on code in PR #9621:
URL: https://github.com/apache/pinot/pull/9621#discussion_r1005002974
##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -517,8 +516,12 @@ public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
RealtimeSegmentDataManager realtimeSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
Map<String, ConsumerPartitionState> partitionStateMap =
realtimeSegmentDataManager.getConsumerPartitionState();
- Map<String, PartitionLagState> partitionLagStateMap =
-
realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap);
+ Map<String, String> recordsLagMap = new HashMap<>();
+ Map<String, String> availabilityLagMap = new HashMap<>();
Review Comment:
(minor) Use the full name: `recordAvailabilityLagMsMap`
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java:
##########
@@ -23,12 +23,19 @@
public class KafkaConsumerPartitionLag extends PartitionLagState {
private final String _recordsLag;
+ private final String _recordAvailabilityLag;
- public KafkaConsumerPartitionLag(String recordsLag) {
+ public KafkaConsumerPartitionLag(String recordsLag, String
recordAvailabilityLag) {
Review Comment:
```suggestion
public KafkaConsumerPartitionLag(String recordsLag, String
recordAvailabilityLagMs) {
```
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java:
##########
@@ -113,14 +114,27 @@ public Map<String, PartitionLagState>
getCurrentPartitionLagState(
Map<String, ConsumerPartitionState> currentPartitionStateMap) {
Map<String, PartitionLagState> perPartitionLag = new HashMap<>();
for (Map.Entry<String, ConsumerPartitionState> entry:
currentPartitionStateMap.entrySet()) {
- StreamPartitionMsgOffset currentOffset =
entry.getValue().getCurrentOffset();
- StreamPartitionMsgOffset upstreamLatest =
entry.getValue().getUpstreamLatestOffset();
+ ConsumerPartitionState partitionState = entry.getValue();
+ // Compute records-lag
+ StreamPartitionMsgOffset currentOffset =
partitionState.getCurrentOffset();
+ StreamPartitionMsgOffset upstreamLatest =
partitionState.getUpstreamLatestOffset();
+ String offsetLagString = "UNKNOWN";
+
if (currentOffset instanceof LongMsgOffset && upstreamLatest instanceof
LongMsgOffset) {
long offsetLag = ((LongMsgOffset) upstreamLatest).getOffset() -
((LongMsgOffset) currentOffset).getOffset();
- perPartitionLag.put(entry.getKey(), new
KafkaConsumerPartitionLag(String.valueOf(offsetLag)));
- } else {
- perPartitionLag.put(entry.getKey(), new
KafkaConsumerPartitionLag("UNKNOWN"));
+ offsetLagString = String.valueOf(offsetLag);
}
+
+ // Compute record-availability
+ String recordAvailabilityLag = "UNKNOWN";
Review Comment:
(minor)
```suggestion
String recordAvailabilityLagMs = "UNKNOWN";
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java:
##########
@@ -237,13 +238,18 @@ static public class PartitionOffsetInfo {
@JsonProperty("latestUpstreamOffsetMap")
public Map<String, String> _latestUpstreamOffsetMap;
+ @JsonProperty("recordsAvailabilityLagMap")
+ public Map<String, String> _availabilityLagMap;
+
public PartitionOffsetInfo(
@JsonProperty("currentOffsetsMap") Map<String, String>
currentOffsetsMap,
@JsonProperty("latestUpstreamOffsetMap") Map<String, String>
latestUpstreamOffsetMap,
- @JsonProperty("recordsLagMap") Map<String, String> recordsLagMap) {
+ @JsonProperty("recordsLagMap") Map<String, String> recordsLagMap,
+ @JsonProperty("recordsAvailabilityLagMap") Map<String, String>
availabilityLagMap) {
Review Comment:
```suggestion
@JsonProperty("recordsAvailabilityLagMsMap") Map<String, String>
recordAvailabilityLagMsMap) {
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java:
##########
@@ -237,13 +238,18 @@ static public class PartitionOffsetInfo {
@JsonProperty("latestUpstreamOffsetMap")
public Map<String, String> _latestUpstreamOffsetMap;
+ @JsonProperty("recordsAvailabilityLagMap")
+ public Map<String, String> _availabilityLagMap;
Review Comment:
```suggestion
@JsonProperty("recordsAvailabilityLagMsMap")
public Map<String, String> _recordAvailabilityLagMsMap;
```
##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java:
##########
@@ -23,12 +23,19 @@
public class KafkaConsumerPartitionLag extends PartitionLagState {
private final String _recordsLag;
+ private final String _recordAvailabilityLag;
Review Comment:
```suggestion
private final String _recordAvailabilityLagMs;
```
##########
pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java:
##########
@@ -97,5 +102,9 @@ public Map<String, String> getRecordsLag() {
public Map<String, String> getLatestUpstreamOffsets() {
return _latestUpstreamOffsets;
}
+
+ public Map<String, String> getAvailabilityLag() {
+ return _availabilityLag;
Review Comment:
```suggestion
public Map<String, String> getRecordAvailabilityLagMs() {
return _recordAvailabilityLagMs;
```
##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java:
##########
@@ -168,22 +168,24 @@ private SegmentConsumerInfo
getSegmentConsumerInfo(SegmentDataManager segmentDat
if (tableType == TableType.REALTIME) {
RealtimeSegmentDataManager realtimeSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
Map<String, ConsumerPartitionState> partitionStateMap =
realtimeSegmentDataManager.getConsumerPartitionState();
- Map<String, PartitionLagState> partitionLagStateMap =
- realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap);
- Map<String, String> partitionToCurrentOffsetMap =
realtimeSegmentDataManager.getPartitionToCurrentOffset();
+ Map<String, String> currentOffsets =
realtimeSegmentDataManager.getPartitionToCurrentOffset();
+ Map<String, String> upstreamLatest =
partitionStateMap.entrySet().stream().collect(
+ Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().getUpstreamLatestOffset().toString()));
+ Map<String, String> recordsLagMap = new HashMap<>();
+ Map<String, String> recordsAvailabilityMap = new HashMap<>();
Review Comment:
```suggestion
Map<String, String> recordAvailabilityLagMsMap = new HashMap<>();
```
##########
pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java:
##########
@@ -68,23 +68,28 @@ public PartitionOffsetInfo getPartitionOffsetInfo() {
@JsonIgnoreProperties(ignoreUnknown = true)
static public class PartitionOffsetInfo {
- @JsonProperty("currentOffsets")
- public Map<String, String> _currentOffsets;
-
- @JsonProperty("recordsLag")
- public Map<String, String> _recordsLag;
-
- @JsonProperty("latestUpstreamOffsets")
- public Map<String, String> _latestUpstreamOffsets;
-
- public PartitionOffsetInfo(
- @JsonProperty("currentOffsets") Map<String, String> currentOffsets,
- @JsonProperty("latestUpstreamOffsets") Map<String, String>
latestUpstreamOffsets,
- @JsonProperty("recordsLag") Map<String, String> recordsLag) {
- _currentOffsets = currentOffsets;
- _latestUpstreamOffsets = latestUpstreamOffsets;
- _recordsLag = recordsLag;
- }
+ @JsonProperty("currentOffsets")
+ public Map<String, String> _currentOffsets;
+
+ @JsonProperty("recordsLag")
+ public Map<String, String> _recordsLag;
+
+ @JsonProperty("latestUpstreamOffsets")
+ public Map<String, String> _latestUpstreamOffsets;
+
+ @JsonProperty("recordsAvailabilityLag")
+ public Map<String, String> _availabilityLag;
+
+ public PartitionOffsetInfo(
+ @JsonProperty("currentOffsets") Map<String, String> currentOffsets,
+ @JsonProperty("latestUpstreamOffsets") Map<String, String>
latestUpstreamOffsets,
+ @JsonProperty("recordsLag") Map<String, String> recordsLag,
+ @JsonProperty("recordsAvailabilityLag") Map<String, String>
availabilityLag) {
Review Comment:
```suggestion
@JsonProperty("recordAvailabilityLagMs") Map<String, String>
recordAvailabilityLagMs) {
```
##########
pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java:
##########
@@ -68,23 +68,28 @@ public PartitionOffsetInfo getPartitionOffsetInfo() {
@JsonIgnoreProperties(ignoreUnknown = true)
static public class PartitionOffsetInfo {
- @JsonProperty("currentOffsets")
- public Map<String, String> _currentOffsets;
-
- @JsonProperty("recordsLag")
- public Map<String, String> _recordsLag;
-
- @JsonProperty("latestUpstreamOffsets")
- public Map<String, String> _latestUpstreamOffsets;
-
- public PartitionOffsetInfo(
- @JsonProperty("currentOffsets") Map<String, String> currentOffsets,
- @JsonProperty("latestUpstreamOffsets") Map<String, String>
latestUpstreamOffsets,
- @JsonProperty("recordsLag") Map<String, String> recordsLag) {
- _currentOffsets = currentOffsets;
- _latestUpstreamOffsets = latestUpstreamOffsets;
- _recordsLag = recordsLag;
- }
+ @JsonProperty("currentOffsets")
+ public Map<String, String> _currentOffsets;
+
+ @JsonProperty("recordsLag")
+ public Map<String, String> _recordsLag;
+
+ @JsonProperty("latestUpstreamOffsets")
+ public Map<String, String> _latestUpstreamOffsets;
+
+ @JsonProperty("recordsAvailabilityLag")
+ public Map<String, String> _availabilityLag;
Review Comment:
```suggestion
@JsonProperty("recordAvailabilityLagMs")
public Map<String, String> _recordAvailabilityLagMs;
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]