This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3b45dd6ed2 Modify consumingSegmentsInfo endpoint to indicate how many
servers failed (#12523)
3b45dd6ed2 is described below
commit 3b45dd6ed205c55fc58eefff91d592aeb35882b4
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Fri Mar 15 15:55:42 2024 -0700
Modify consumingSegmentsInfo endpoint to indicate how many servers failed
(#12523)
* Modify consumingSegmentsInfo endpoint to indicate how many servers failed
* Add unparsable response count (serversUnparsableRespond)
---
.../util/ConsumingSegmentInfoReader.java | 34 ++++++++++++++++++----
.../helix/RealtimeConsumerMonitorTest.java | 2 +-
2 files changed, 30 insertions(+), 6 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
index 26dab01955..9218b50330 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
@@ -75,8 +75,9 @@ public class ConsumingSegmentInfoReader {
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
// Gets info for segments with LLRealtimeSegmentDataManager found in the
table data manager
- Map<String, List<SegmentConsumerInfo>> serverToSegmentConsumerInfoMap =
+ ConsumingSegmentsInfoFromServersResponse response =
getConsumingSegmentsInfoFromServers(tableNameWithType,
serverToEndpoints, timeoutMs);
+ Map<String, List<SegmentConsumerInfo>> serverToSegmentConsumerInfoMap =
response._serverToSegmentConsumerInfoMap;
TreeMap<String, List<ConsumingSegmentInfo>> consumingSegmentInfoMap = new
TreeMap<>();
for (Map.Entry<String, List<SegmentConsumerInfo>> entry :
serverToSegmentConsumerInfoMap.entrySet()) {
String serverName = entry.getKey();
@@ -93,14 +94,14 @@ public class ConsumingSegmentInfoReader {
// Segments which are in CONSUMING state but found no consumer on the
server
Set<String> consumingSegments =
_pinotHelixResourceManager.getConsumingSegments(tableNameWithType);
consumingSegments.forEach(c -> consumingSegmentInfoMap.putIfAbsent(c,
Collections.emptyList()));
- return new ConsumingSegmentsInfoMap(consumingSegmentInfoMap);
+ return new ConsumingSegmentsInfoMap(consumingSegmentInfoMap,
response._failedResponseCount, response._failedParses);
}
/**
* This method makes a MultiGet call to all servers to get the consuming
segments info.
* @return servers queried and a list of consumer status information for
consuming segments on that server
*/
- private Map<String, List<SegmentConsumerInfo>>
getConsumingSegmentsInfoFromServers(String tableNameWithType,
+ private ConsumingSegmentsInfoFromServersResponse
getConsumingSegmentsInfoFromServers(String tableNameWithType,
BiMap<String, String> serverToEndpoints, int timeoutMs) {
LOGGER.info("Reading consuming segment info from servers: {} for table:
{}", serverToEndpoints.keySet(),
tableNameWithType);
@@ -132,7 +133,8 @@ public class ConsumingSegmentInfoReader {
if (failedParses != 0) {
LOGGER.warn("Failed to parse {} / {} segment size info responses from
servers.", failedParses, serverUrls.size());
}
- return serverToConsumingSegmentInfoList;
+ return new ConsumingSegmentsInfoFromServersResponse(
+ serverToConsumingSegmentInfoList,
serviceResponse._failedResponseCount, failedParses);
}
private String generateServerURL(String tableNameWithType, String endpoint) {
@@ -189,10 +191,18 @@ public class ConsumingSegmentInfoReader {
@JsonIgnoreProperties(ignoreUnknown = true)
static public class ConsumingSegmentsInfoMap {
public TreeMap<String, List<ConsumingSegmentInfo>>
_segmentToConsumingInfoMap;
+ @JsonProperty("serversFailingToRespond")
+ public int _serversFailingToRespond;
+ @JsonProperty("serversUnparsableRespond")
+ public int _serversUnparsableRespond;
public ConsumingSegmentsInfoMap(@JsonProperty("segmentToConsumingInfoMap")
- TreeMap<String, List<ConsumingSegmentInfo>> segmentToConsumingInfoMap)
{
+ TreeMap<String, List<ConsumingSegmentInfo>> segmentToConsumingInfoMap,
+ @JsonProperty("serversFailingToRespond") int serversFailingToRespond,
+ @JsonProperty("serversUnparsableRespond") int
serversUnparsableRespond) {
_segmentToConsumingInfoMap = segmentToConsumingInfoMap;
+ _serversFailingToRespond = serversFailingToRespond;
+ _serversUnparsableRespond = serversUnparsableRespond;
}
}
@@ -254,4 +264,18 @@ public class ConsumingSegmentInfoReader {
_availabilityLagMap = availabilityLagMsMap;
}
}
+
+ public static class ConsumingSegmentsInfoFromServersResponse {
+ private final Map<String, List<SegmentConsumerInfo>>
_serverToSegmentConsumerInfoMap;
+ private final int _failedResponseCount;
+ private final int _failedParses;
+
+ public ConsumingSegmentsInfoFromServersResponse(
+ Map<String, List<SegmentConsumerInfo>> serverToSegmentConsumerInfoMap,
int failedResponseCount,
+ int failedParses) {
+ _serverToSegmentConsumerInfoMap = serverToSegmentConsumerInfoMap;
+ _failedResponseCount = failedResponseCount;
+ _failedParses = failedParses;
+ }
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
index 28fd0ec795..d4298a65a3 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/RealtimeConsumerMonitorTest.java
@@ -137,7 +137,7 @@ public class RealtimeConsumerMonitorTest {
ConsumingSegmentInfoReader consumingSegmentReader =
mock(ConsumingSegmentInfoReader.class);
when(consumingSegmentReader.getConsumingSegmentsInfo(tableName, 10000))
- .thenReturn(new
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response));
+ .thenReturn(new
ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap(response, 0, 0));
RealtimeConsumerMonitor realtimeConsumerMonitor =
new RealtimeConsumerMonitor(config, helixResourceManager,
leadControllerManager,
controllerMetrics, consumingSegmentReader);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]