This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b45dd6ed2 Modify consumingSegmentsInfo endpoint to indicate how many servers failed (#12523)
3b45dd6ed2 is described below

commit 3b45dd6ed205c55fc58eefff91d592aeb35882b4
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Fri Mar 15 15:55:42 2024 -0700

    Modify consumingSegmentsInfo endpoint to indicate how many servers failed (#12523)
    
    * Modify consumingSegmentsInfo endpoint to indicate how many servers failed
    
    * Add unparsable respond
---
 .../util/ConsumingSegmentInfoReader.java           | 34 ++++++++++++++++++----
 .../helix/RealtimeConsumerMonitorTest.java         |  2 +-
 2 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
index 26dab01955..9218b50330 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
@@ -75,8 +75,9 @@ public class ConsumingSegmentInfoReader {
         
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
 
     // Gets info for segments with LLRealtimeSegmentDataManager found in the 
table data manager
-    Map<String, List<SegmentConsumerInfo>> serverToSegmentConsumerInfoMap =
+    ConsumingSegmentsInfoFromServersResponse response =
         getConsumingSegmentsInfoFromServers(tableNameWithType, 
serverToEndpoints, timeoutMs);
+    Map<String, List<SegmentConsumerInfo>> serverToSegmentConsumerInfoMap = 
response._serverToSegmentConsumerInfoMap;
     TreeMap<String, List<ConsumingSegmentInfo>> consumingSegmentInfoMap = new 
TreeMap<>();
     for (Map.Entry<String, List<SegmentConsumerInfo>> entry : 
serverToSegmentConsumerInfoMap.entrySet()) {
       String serverName = entry.getKey();
@@ -93,14 +94,14 @@ public class ConsumingSegmentInfoReader {
     // Segments which are in CONSUMING state but found no consumer on the 
server
     Set<String> consumingSegments = 
_pinotHelixResourceManager.getConsumingSegments(tableNameWithType);
     consumingSegments.forEach(c -> consumingSegmentInfoMap.putIfAbsent(c, 
Collections.emptyList()));
-    return new ConsumingSegmentsInfoMap(consumingSegmentInfoMap);
+    return new ConsumingSegmentsInfoMap(consumingSegmentInfoMap, 
response._failedResponseCount, response._failedParses);
   }
 
   /**
    * This method makes a MultiGet call to all servers to get the consuming 
segments info.
    * @return servers queried and a list of consumer status information for 
consuming segments on that server
    */
-  private Map<String, List<SegmentConsumerInfo>> 
getConsumingSegmentsInfoFromServers(String tableNameWithType,
+  private ConsumingSegmentsInfoFromServersResponse 
getConsumingSegmentsInfoFromServers(String tableNameWithType,
       BiMap<String, String> serverToEndpoints, int timeoutMs) {
     LOGGER.info("Reading consuming segment info from servers: {} for table: 
{}", serverToEndpoints.keySet(),
         tableNameWithType);
@@ -132,7 +133,8 @@ public class ConsumingSegmentInfoReader {
     if (failedParses != 0) {
       LOGGER.warn("Failed to parse {} / {} segment size info responses from 
servers.", failedParses, serverUrls.size());
     }
-    return serverToConsumingSegmentInfoList;
+    return new ConsumingSegmentsInfoFromServersResponse(
+        serverToConsumingSegmentInfoList, 
serviceResponse._failedResponseCount, failedParses);
   }
 
   private String generateServerURL(String tableNameWithType, String endpoint) {
@@ -189,10 +191,18 @@ public class ConsumingSegmentInfoReader {
   @JsonIgnoreProperties(ignoreUnknown = true)
   static public class ConsumingSegmentsInfoMap {
     public TreeMap<String, List<ConsumingSegmentInfo>> 
_segmentToConsumingInfoMap;
+    @JsonProperty("serversFailingToRespond")
+    public int _serversFailingToRespond;
+    @JsonProperty("serversUnparsableRespond")
+    public int _serversUnparsableRespond;
 
     public ConsumingSegmentsInfoMap(@JsonProperty("segmentToConsumingInfoMap")
-        TreeMap<String, List<ConsumingSegmentInfo>> segmentToConsumingInfoMap) 
{
+        TreeMap<String, List<ConsumingSegmentInfo>> segmentToConsumingInfoMap,
+        @JsonProperty("serversFailingToRespond") int serversFailingToRespond,
+        @JsonProperty("serversUnparsableRespond") int 
serversUnparsableRespond) {
       _segmentToConsumingInfoMap = segmentToConsumingInfoMap;
+      _serversFailingToRespond = serversFailingToRespond;
+      _serversUnparsableRespond = serversUnparsableRespond;
     }
   }
 
@@ -254,4 +264,18 @@ public class ConsumingSegmentInfoReader {
       _availabilityLagMap = availabilityLagMsMap;
     }
   }
+
+  public static class ConsumingSegmentsInfoFromServersResponse {
+    private final Map<String, List<SegmentConsumerInfo>> 
_serverToSegmentConsumerInfoMap;
+    private final int _failedResponseCount;
+    private final int _failedParses;
+
+    public ConsumingSegmentsInfoFromServersResponse(
+        Map<String, List<SegmentConsumerInfo>> serverToSegmentConsumerInfoMap, 
int failedResponseCount,
+        int failedParses) {
+      _serverToSegmentConsumerInfoMap = serverToSegmentConsumerInfoMap;
+      _failedResponseCount = failedResponseCount;
+      _failedParses = failedParses;
+    }
+  }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
index 28fd0ec795..d4298a65a3 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
@@ -137,7 +137,7 @@ public class RealtimeConsumerMonitorTest {
 
     ConsumingSegmentInfoReader consumingSegmentReader = 
mock(ConsumingSegmentInfoReader.class);
     when(consumingSegmentReader.getConsumingSegmentsInfo(tableName, 10000))
-        .thenReturn(new 
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response));
+        .thenReturn(new 
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response, 0, 0));
     RealtimeConsumerMonitor realtimeConsumerMonitor =
         new RealtimeConsumerMonitor(config, helixResourceManager, 
leadControllerManager,
             controllerMetrics, consumingSegmentReader);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to