This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d580bbcaec Exposing consumer's record lag in /consumingSegmentsInfo
(#9515)
d580bbcaec is described below
commit d580bbcaec42281c7bc0667c4db228354b56a889
Author: Navina Ramesh <[email protected]>
AuthorDate: Tue Oct 18 23:07:29 2022 +0530
Exposing consumer's record lag in /consumingSegmentsInfo (#9515)
Re-work of PR #8280.
Changes in this PR:
The /consumingSegmentsInfo API was moved into PinotRealtimeTableResource.
It now returns additional per-partition maps for the latest upstream record
offset and the partition lag (when calculated).
Introduces a new API, StreamMetadataProvider#getCurrentPartitionLagState.
This PR only adds support for Kafka.
Any stream connector that hasn't implemented this method will return
NOT_CALCULATED for the new lag status.
Additionally, connectors can extend the PartitionLagState class to return
more custom lag definitions.
---
.../restlet/resources/SegmentConsumerInfo.java | 42 +++++++++++++++-
.../api/resources/PinotRealtimeTableResource.java | 49 ++++++++++++++++++-
.../api/resources/PinotSegmentRestletResource.java | 31 ------------
.../util/ConsumingSegmentInfoReader.java | 34 ++++++++++++-
.../ConsumingSegmentInfoReaderStatelessTest.java | 40 ++++++++++++----
.../realtime/HLRealtimeSegmentDataManager.java | 13 +++++
.../realtime/LLRealtimeSegmentDataManager.java | 19 ++++++++
.../realtime/RealtimeSegmentDataManager.java | 14 ++++++
.../stream/kafka20/KafkaConsumerPartitionLag.java | 34 +++++++++++++
.../kafka20/KafkaStreamMetadataProvider.java | 21 ++++++++
.../pinot/server/api/resources/DebugResource.java | 21 ++++++--
.../pinot/server/api/resources/TablesResource.java | 29 +++++++++--
.../pinot/spi/stream/ConsumerPartitionState.java | 56 ++++++++++++++++++++++
.../apache/pinot/spi/stream/PartitionLagState.java | 38 +++++++++++++++
.../pinot/spi/stream/StreamMetadataProvider.java | 12 +++++
15 files changed, 400 insertions(+), 53 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
index 83e043311c..e77bb97d82 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
@@ -32,15 +32,18 @@ public class SegmentConsumerInfo {
private final String _consumerState;
private final long _lastConsumedTimestamp;
private final Map<String, String> _partitionToOffsetMap;
+ private final PartitionOffsetInfo _partitionOffsetInfo;
public SegmentConsumerInfo(@JsonProperty("segmentName") String segmentName,
@JsonProperty("consumerState") String consumerState,
@JsonProperty("lastConsumedTimestamp") long lastConsumedTimestamp,
- @JsonProperty("partitionToOffsetMap") Map<String, String>
partitionToOffsetMap) {
+ @JsonProperty("partitionToOffsetMap") Map<String, String>
partitionToOffsetMap,
+ @JsonProperty("partitionOffsetInfo") PartitionOffsetInfo
partitionOffsetInfo) {
_segmentName = segmentName;
_consumerState = consumerState;
_lastConsumedTimestamp = lastConsumedTimestamp;
_partitionToOffsetMap = partitionToOffsetMap;
+ _partitionOffsetInfo = partitionOffsetInfo;
}
public String getSegmentName() {
@@ -58,4 +61,41 @@ public class SegmentConsumerInfo {
public Map<String, String> getPartitionToOffsetMap() {
return _partitionToOffsetMap;
}
+
+ public PartitionOffsetInfo getPartitionOffsetInfo() {
+ return _partitionOffsetInfo;
+ }
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ static public class PartitionOffsetInfo {
+ @JsonProperty("currentOffsets")
+ public Map<String, String> _currentOffsets;
+
+ @JsonProperty("recordsLag")
+ public Map<String, String> _recordsLag;
+
+ @JsonProperty("latestUpstreamOffsets")
+ public Map<String, String> _latestUpstreamOffsets;
+
+ public PartitionOffsetInfo(
+ @JsonProperty("currentOffsets") Map<String, String> currentOffsets,
+ @JsonProperty("latestUpstreamOffsets") Map<String, String>
latestUpstreamOffsets,
+ @JsonProperty("recordsLag") Map<String, String> recordsLag) {
+ _currentOffsets = currentOffsets;
+ _latestUpstreamOffsets = latestUpstreamOffsets;
+ _recordsLag = recordsLag;
+ }
+
+ public Map<String, String> getCurrentOffsets() {
+ return _currentOffsets;
+ }
+
+ public Map<String, String> getRecordsLag() {
+ return _recordsLag;
+ }
+
+ public Map<String, String> getLatestUpstreamOffsets() {
+ return _latestUpstreamOffsets;
+ }
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index 715d65bede..f33433e164 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -22,9 +22,12 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Authorization;
import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
+import java.util.concurrent.Executor;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
@@ -35,10 +38,14 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.helix.model.IdealState;
+import org.apache.pinot.controller.ControllerConf;
import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +60,15 @@ import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
public class PinotRealtimeTableResource {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotRealtimeTableResource.class);
+ @Inject
+ ControllerConf _controllerConf;
+
+ @Inject
+ Executor _executor;
+
+ @Inject
+ HttpConnectionManager _connectionManager;
+
@Inject
PinotHelixResourceManager _pinotHelixResourceManager;
@@ -123,7 +139,7 @@ public class PinotRealtimeTableResource {
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Return pause status of a realtime table",
notes = "Return pause status of a realtime table along with list of
consuming segments.")
- public Response getConsumptionStatus(
+ public Response getPauseStatus(
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName) {
String tableNameWithType =
TableNameBuilder.REALTIME.tableNameWithType(tableName);
validate(tableNameWithType);
@@ -134,6 +150,37 @@ public class PinotRealtimeTableResource {
}
}
+ @GET
+ @Path("/tables/{tableName}/consumingSegmentsInfo")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Returns state of consuming segments", notes = "Gets
the status of consumers from all servers."
+ + "Note that the partitionToOffsetMap has been deprecated and will be
removed in the next release. The info is "
+ + "now embedded within each partition's state as currentOffsetsMap.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"),
+ @ApiResponse(code = 404, message = "Table not found"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap
getConsumingSegmentsInfo(
+ @ApiParam(value = "Realtime table name with or without type", required =
true,
+ example = "myTable | myTable_REALTIME") @PathParam("tableName")
String realtimeTableName) {
+ try {
+ TableType tableType =
TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+ if (TableType.OFFLINE == tableType) {
+ throw new IllegalStateException("Cannot get consuming segments info
for OFFLINE table: " + realtimeTableName);
+ }
+ String tableNameWithType =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
+ ConsumingSegmentInfoReader consumingSegmentInfoReader =
+ new ConsumingSegmentInfoReader(_executor, _connectionManager,
_pinotHelixResourceManager);
+ return consumingSegmentInfoReader
+ .getConsumingSegmentsInfo(tableNameWithType,
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+ } catch (Exception e) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to get consuming segments info for table %s.
%s", realtimeTableName, e.getMessage()),
+ Response.Status.INTERNAL_SERVER_ERROR, e);
+ }
+ }
+
private void validate(String tableNameWithType) {
IdealState idealState =
_pinotHelixResourceManager.getTableIdealState(tableNameWithType);
if (idealState == null) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 3badc1f988..ea77b9caec 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -77,7 +77,6 @@ import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import org.apache.pinot.controller.util.CompletionServiceHelper;
-import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
import org.apache.pinot.controller.util.TableMetadataReader;
import org.apache.pinot.controller.util.TableTierReader;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -1010,36 +1009,6 @@ public class PinotSegmentRestletResource {
.getSegmentsMetadata(tableNameWithType, columns,
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
}
- // TODO: Move this API into PinotTableRestletResource
- @GET
- @Path("/tables/{realtimeTableName}/consumingSegmentsInfo")
- @Produces(MediaType.APPLICATION_JSON)
- @ApiOperation(value = "Returns state of consuming segments", notes = "Gets
the status of consumers from all servers")
- @ApiResponses(value = {
- @ApiResponse(code = 200, message = "Success"),
- @ApiResponse(code = 404, message = "Table not found"),
- @ApiResponse(code = 500, message = "Internal server error")
- })
- public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap
getConsumingSegmentsInfo(
- @ApiParam(value = "Realtime table name with or without type", required =
true,
- example = "myTable | myTable_REALTIME")
@PathParam("realtimeTableName") String realtimeTableName) {
- try {
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
- if (TableType.OFFLINE == tableType) {
- throw new IllegalStateException("Cannot get consuming segments info
for OFFLINE table: " + realtimeTableName);
- }
- String tableNameWithType =
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
- ConsumingSegmentInfoReader consumingSegmentInfoReader =
- new ConsumingSegmentInfoReader(_executor, _connectionManager,
_pinotHelixResourceManager);
- return consumingSegmentInfoReader
- .getConsumingSegmentsInfo(tableNameWithType,
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
- } catch (Exception e) {
- throw new ControllerApplicationException(LOGGER,
- String.format("Failed to get consuming segments info for table %s.
%s", realtimeTableName, e.getMessage()),
- Response.Status.INTERNAL_SERVER_ERROR, e);
- }
- }
-
@POST
@Path("/segments/{tableNameWithType}/updateZKTimeInterval")
@Authenticate(AccessType.UPDATE)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
index 5c7ba349c6..8539eb89f8 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
@@ -81,9 +81,12 @@ public class ConsumingSegmentInfoReader {
for (Map.Entry<String, List<SegmentConsumerInfo>> entry :
serverToSegmentConsumerInfoMap.entrySet()) {
String serverName = entry.getKey();
for (SegmentConsumerInfo info : entry.getValue()) {
+ SegmentConsumerInfo.PartitionOffsetInfo partitionOffsetInfo =
info.getPartitionOffsetInfo();
+ PartitionOffsetInfo offsetInfo = new
PartitionOffsetInfo(partitionOffsetInfo.getCurrentOffsets(),
+ partitionOffsetInfo.getLatestUpstreamOffsets(),
partitionOffsetInfo.getRecordsLag());
consumingSegmentInfoMap.computeIfAbsent(info.getSegmentName(), k ->
new ArrayList<>()).add(
new ConsumingSegmentInfo(serverName, info.getConsumerState(),
info.getLastConsumedTimestamp(),
- info.getPartitionToOffsetMap()));
+ partitionOffsetInfo.getCurrentOffsets(), offsetInfo));
}
}
// Segments which are in CONSUMING state but found no consumer on the
server
@@ -203,17 +206,44 @@ public class ConsumingSegmentInfoReader {
public String _consumerState;
@JsonProperty("lastConsumedTimestamp")
public long _lastConsumedTimestamp;
+ @Deprecated
@JsonProperty("partitionToOffsetMap")
public Map<String, String> _partitionToOffsetMap;
+ @JsonProperty("partitionOffsetInfo")
+ public PartitionOffsetInfo _partitionOffsetInfo;
+
public ConsumingSegmentInfo(@JsonProperty("serverName") String serverName,
@JsonProperty("consumerState") String consumerState,
@JsonProperty("lastConsumedTimestamp") long lastConsumedTimestamp,
- @JsonProperty("partitionToOffsetMap") Map<String, String>
partitionToOffsetMap) {
+ @JsonProperty("partitionToOffsetMap") Map<String, String>
partitionToOffsetMap,
+ @JsonProperty("partitionOffsetInfo") PartitionOffsetInfo
partitionOffsetInfo) {
_serverName = serverName;
_consumerState = consumerState;
_lastConsumedTimestamp = lastConsumedTimestamp;
_partitionToOffsetMap = partitionToOffsetMap;
+ _partitionOffsetInfo = partitionOffsetInfo;
+ }
+ }
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ static public class PartitionOffsetInfo {
+ @JsonProperty("currentOffsetsMap")
+ public Map<String, String> _currentOffsetsMap;
+
+ @JsonProperty("recordsLagMap")
+ public Map<String, String> _recordsLagMap;
+
+ @JsonProperty("latestUpstreamOffsetMap")
+ public Map<String, String> _latestUpstreamOffsetMap;
+
+ public PartitionOffsetInfo(
+ @JsonProperty("currentOffsetsMap") Map<String, String>
currentOffsetsMap,
+ @JsonProperty("latestUpstreamOffsetMap") Map<String, String>
latestUpstreamOffsetMap,
+ @JsonProperty("recordsLagMap") Map<String, String> recordsLagMap) {
+ _currentOffsetsMap = currentOffsetsMap;
+ _latestUpstreamOffsetMap = latestUpstreamOffsetMap;
+ _recordsLagMap = recordsLagMap;
}
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
index 4877390713..7565c62420 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
@@ -88,35 +88,55 @@ public class ConsumingSegmentInfoReaderStatelessTest {
Map<String, String> partitionToOffset1 = new HashMap<>();
partitionToOffset1.put("1", "150");
FakeConsumingInfoServer s0 = new FakeConsumingInfoServer(Lists
- .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0,
"CONSUMING", 0, partitionToOffset0),
- new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
partitionToOffset1)));
+ .newArrayList(
+ new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0,
+ partitionToOffset0, new
SegmentConsumerInfo.PartitionOffsetInfo(
+ partitionToOffset0, Collections.emptyMap(),
Collections.emptyMap())),
+ new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
+ partitionToOffset1, new
SegmentConsumerInfo.PartitionOffsetInfo(
+ partitionToOffset1, Collections.emptyMap(),
Collections.emptyMap()))));
s0.start(uriPath, createHandler(200, s0._consumerInfos, 0));
_serverMap.put("server0", s0);
// server1 - 1 consumer each for p0 and p1. CONSUMING.
FakeConsumingInfoServer s1 = new FakeConsumingInfoServer(Lists
- .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0,
"CONSUMING", 0, partitionToOffset0),
- new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
partitionToOffset1)));
+ .newArrayList(
+ new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0,
+ partitionToOffset0, new
SegmentConsumerInfo.PartitionOffsetInfo(
+ partitionToOffset0, Collections.emptyMap(),
Collections.emptyMap())),
+ new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
+ partitionToOffset1, new
SegmentConsumerInfo.PartitionOffsetInfo(
+ partitionToOffset1, Collections.emptyMap(),
Collections.emptyMap()))));
s1.start(uriPath, createHandler(200, s1._consumerInfos, 0));
_serverMap.put("server1", s1);
// server2 - p1 consumer CONSUMING. p0 consumer NOT_CONSUMING
FakeConsumingInfoServer s2 = new FakeConsumingInfoServer(Lists
- .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0,
"NOT_CONSUMING", 0, partitionToOffset0),
- new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
partitionToOffset1)));
+ .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0,
"NOT_CONSUMING", 0,
+ partitionToOffset0, new
SegmentConsumerInfo.PartitionOffsetInfo(
+ partitionToOffset0, Collections.emptyMap(),
Collections.emptyMap())),
+ new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
partitionToOffset1,
+ new
SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1,
Collections.emptyMap(),
+ Collections.emptyMap()))));
s2.start(uriPath, createHandler(200, s2._consumerInfos, 0));
_serverMap.put("server2", s2);
// server3 - 1 consumer for p1. No consumer for p0
FakeConsumingInfoServer s3 = new FakeConsumingInfoServer(
- Lists.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1,
"CONSUMING", 0, partitionToOffset1)));
+ Lists.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1,
"CONSUMING", 0,
+ partitionToOffset1, new
SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1,
Collections.emptyMap(),
+ Collections.emptyMap()))));
s3.start(uriPath, createHandler(200, s3._consumerInfos, 0));
_serverMap.put("server3", s3);
// server4 - unreachable/error/timeout
FakeConsumingInfoServer s4 = new FakeConsumingInfoServer(Lists
- .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0,
"CONSUMING", 0, partitionToOffset0),
- new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
partitionToOffset1)));
+ .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0,
"CONSUMING", 0,
+ partitionToOffset0, new
SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset0,
+ Collections.emptyMap(), Collections.emptyMap())),
+ new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
partitionToOffset1,
+ new
SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1,
Collections.emptyMap(),
+ Collections.emptyMap()))));
s4.start(uriPath, createHandler(200, s4._consumerInfos, TIMEOUT_MSEC *
EXTENDED_TIMEOUT_FACTOR));
_serverMap.put("server4", s4);
}
@@ -349,6 +369,6 @@ public class ConsumingSegmentInfoReaderStatelessTest {
String consumerState, String partition, String offset) {
Assert.assertTrue(serverNames.contains(info._serverName));
Assert.assertEquals(info._consumerState, consumerState);
- Assert.assertEquals(info._partitionToOffsetMap.get(partition), offset);
+
Assert.assertEquals(info._partitionOffsetInfo._currentOffsetsMap.get(partition),
offset);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 45b4d0291a..aa16a32147 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -51,6 +51,8 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.PartitionLagState;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -424,6 +426,17 @@ public class HLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
throw new UnsupportedOperationException();
}
+ @Override
+ public Map<String, ConsumerPartitionState> getConsumerPartitionState() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<String, PartitionLagState> getPartitionToLagState(
+ Map<String, ConsumerPartitionState> consumerPartitionStateMap) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public String getSegmentName() {
return _segmentName;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 5f2d733fe6..f395d71e56 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -71,11 +71,13 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumer;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionLagState;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.PermanentConsumerException;
import org.apache.pinot.spi.stream.RowMetadata;
@@ -819,6 +821,23 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
return _lastLogTime;
}
+ @Override
+ public Map<String, ConsumerPartitionState> getConsumerPartitionState() {
+ String partitionGroupId = String.valueOf(_partitionGroupId);
+ return Collections.singletonMap(partitionGroupId, new
ConsumerPartitionState(partitionGroupId, getCurrentOffset(),
+ getLastConsumedTimestamp(), fetchLatestStreamOffset(5_000)));
+ }
+
+ @Override
+ public Map<String, PartitionLagState> getPartitionToLagState(
+ Map<String, ConsumerPartitionState> consumerPartitionStateMap) {
+ if (_partitionMetadataProvider == null) {
+ createPartitionMetadataProvider("Get Partition Lag State");
+ }
+ ;
+ return
_partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
+ }
+
public StreamPartitionMsgOffset getCurrentOffset() {
return _currentOffset;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index e40376a9e0..e1e06c9081 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -25,6 +25,8 @@ import
org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
import org.apache.pinot.segment.spi.MutableSegment;
import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.PartitionLagState;
import org.apache.pinot.spi.utils.CommonConstants.ConsumerState;
@@ -54,5 +56,17 @@ public abstract class RealtimeSegmentDataManager extends
SegmentDataManager {
*/
public abstract ConsumerState getConsumerState();
+ /**
+ * @return Timestamp at which the last record was indexed
+ */
public abstract long getLastConsumedTimestamp();
+
+ /**
+ * @return Per-partition consumer's status, which typically includes last
consumed message timestamp,
+ * latest available upstream offset etc
+ */
+ public abstract Map<String, ConsumerPartitionState>
getConsumerPartitionState();
+
+ public abstract Map<String, PartitionLagState> getPartitionToLagState(
+ Map<String, ConsumerPartitionState> consumerPartitionStateMap);
}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java
new file mode 100644
index 0000000000..709c2a7d79
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka20;
+
+import org.apache.pinot.spi.stream.PartitionLagState;
+
+
+public class KafkaConsumerPartitionLag extends PartitionLagState {
+ private final String _recordsLag;
+
+ public KafkaConsumerPartitionLag(String recordsLag) {
+ _recordsLag = recordsLag;
+ }
+
+ public String getRecordsLag() {
+ return _recordsLag;
+ }
+}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 282a862dd6..c6fa7ff218 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -23,10 +23,14 @@ import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionLagState;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -104,6 +108,23 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
}
}
+ @Override
+ public Map<String, PartitionLagState> getCurrentPartitionLagState(
+ Map<String, ConsumerPartitionState> currentPartitionStateMap) {
+ Map<String, PartitionLagState> perPartitionLag = new HashMap<>();
+ for (Map.Entry<String, ConsumerPartitionState> entry:
currentPartitionStateMap.entrySet()) {
+ StreamPartitionMsgOffset currentOffset =
entry.getValue().getCurrentOffset();
+ StreamPartitionMsgOffset upstreamLatest =
entry.getValue().getUpstreamLatestOffset();
+ if (currentOffset instanceof LongMsgOffset && upstreamLatest instanceof
LongMsgOffset) {
+ long offsetLag = ((LongMsgOffset) upstreamLatest).getOffset() -
((LongMsgOffset) currentOffset).getOffset();
+ perPartitionLag.put(entry.getKey(), new
KafkaConsumerPartitionLag(String.valueOf(offsetLag)));
+ } else {
+ perPartitionLag.put(entry.getKey(), new
KafkaConsumerPartitionLag("UNKNOWN"));
+ }
+ }
+ return perPartitionLag;
+ }
+
@Override
public void close()
throws IOException {
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
index e53b10c864..9d0a1976fc 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
@@ -30,6 +30,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
@@ -49,6 +50,8 @@ import
org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.server.starter.ServerInstance;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.PartitionLagState;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
@@ -134,11 +137,23 @@ public class DebugResource {
SegmentConsumerInfo segmentConsumerInfo = null;
if (tableType == TableType.REALTIME) {
RealtimeSegmentDataManager realtimeSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
- String segmentName = segmentDataManager.getSegmentName();
+ Map<String, ConsumerPartitionState> partitionStateMap =
realtimeSegmentDataManager.getConsumerPartitionState();
+ Map<String, PartitionLagState> partitionLagStateMap =
+ realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap);
+ Map<String, String> partitionToCurrentOffsetMap =
realtimeSegmentDataManager.getPartitionToCurrentOffset();
segmentConsumerInfo =
- new SegmentConsumerInfo(segmentName,
realtimeSegmentDataManager.getConsumerState().toString(),
+ new SegmentConsumerInfo(
+ segmentDataManager.getSegmentName(),
+ realtimeSegmentDataManager.getConsumerState().toString(),
realtimeSegmentDataManager.getLastConsumedTimestamp(),
- realtimeSegmentDataManager.getPartitionToCurrentOffset());
+ partitionToCurrentOffsetMap,
+ new
SegmentConsumerInfo.PartitionOffsetInfo(partitionToCurrentOffsetMap,
+ partitionStateMap.entrySet().stream().collect(
+ Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().getUpstreamLatestOffset().toString())
+ ),
+ partitionLagStateMap.entrySet().stream().collect(
+ Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().getRecordsLag())
+ )));
}
return segmentConsumerInfo;
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 6d1e8b847d..8d67bf5ce7 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -41,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Named;
import javax.ws.rs.DefaultValue;
@@ -88,6 +89,8 @@ import
org.apache.pinot.server.starter.helix.AdminApiApplication;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.PartitionLagState;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -492,11 +495,12 @@ public class TablesResource {
@Path("tables/{realtimeTableName}/consumingSegmentsInfo")
@Produces(MediaType.APPLICATION_JSON)
@ApiOperation(value = "Get the info for consumers of this REALTIME table",
- notes = "Get consumers info from the table data manager")
+ notes = "Get consumers info from the table data manager. Note that the
partitionToOffsetMap has been deprecated "
+ + "and will be removed in the next release. The info is now embedded
within each partition's state as "
+ + "currentOffsetsMap")
public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
@ApiParam(value = "Name of the REALTIME table", required = true)
@PathParam("realtimeTableName")
String realtimeTableName) {
-
TableType tableType =
TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
if (TableType.OFFLINE == tableType) {
throw new WebApplicationException("Cannot get consuming segment info for
OFFLINE table: " + realtimeTableName);
@@ -511,11 +515,26 @@ public class TablesResource {
for (SegmentDataManager segmentDataManager : segmentDataManagers) {
if (segmentDataManager instanceof RealtimeSegmentDataManager) {
RealtimeSegmentDataManager realtimeSegmentDataManager =
(RealtimeSegmentDataManager) segmentDataManager;
- String segmentName = segmentDataManager.getSegmentName();
+ Map<String, ConsumerPartitionState> partitionStateMap =
+ realtimeSegmentDataManager.getConsumerPartitionState();
+ Map<String, PartitionLagState> partitionLagStateMap =
+
realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap);
+ @Deprecated Map<String, String> partitiionToOffsetMap =
+ realtimeSegmentDataManager.getPartitionToCurrentOffset();
segmentConsumerInfoList.add(
- new SegmentConsumerInfo(segmentName,
realtimeSegmentDataManager.getConsumerState().toString(),
+ new SegmentConsumerInfo(segmentDataManager.getSegmentName(),
+ realtimeSegmentDataManager.getConsumerState().toString(),
realtimeSegmentDataManager.getLastConsumedTimestamp(),
- realtimeSegmentDataManager.getPartitionToCurrentOffset()));
+ partitiionToOffsetMap,
+ new SegmentConsumerInfo.PartitionOffsetInfo(
+ partitiionToOffsetMap,
+ partitionStateMap.entrySet().stream().collect(
+ Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().getUpstreamLatestOffset().toString())
+ ),
+ partitionLagStateMap.entrySet().stream().collect(
+ Collectors.toMap(Map.Entry::getKey, e ->
e.getValue().getRecordsLag())
+ )))
+ );
}
}
} catch (Exception e) {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/ConsumerPartitionState.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/ConsumerPartitionState.java
new file mode 100644
index 0000000000..9da90d5bff
--- /dev/null
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/ConsumerPartitionState.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * Container for holding the current consumption state at a per-partition level.
+ *
+ * <p>Carries four values: partition id, current offset, last processed time (ms) and upstream latest offset.
+ * Every field is final and assigned only in the constructor; the class exposes getters and no setters.
+ *
+ * <p>The constructor accepts the upstream latest offset as {@code @Nullable} and stores it without a check,
+ * so {@code getUpstreamLatestOffset()} can return {@code null}. Callers must null-check the result before
+ * dereferencing it (for example before calling {@code toString()} on it).
+ */
+public class ConsumerPartitionState {
+ private final String _partitionId;
+ private final StreamPartitionMsgOffset _currentOffset;
+ private final long _lastProcessedTimeMs;
+ private final StreamPartitionMsgOffset _upstreamLatestOffset; // may be null, see constructor
+
+ public ConsumerPartitionState(String partitionId, StreamPartitionMsgOffset
currentOffset, long lastProcessedTimeMs,
+ @Nullable StreamPartitionMsgOffset upstreamLatestOffset) {
+ _partitionId = partitionId;
+ _currentOffset = currentOffset;
+ _lastProcessedTimeMs = lastProcessedTimeMs;
+ _upstreamLatestOffset = upstreamLatestOffset; // stored as-is; null is allowed
+ }
+
+ public StreamPartitionMsgOffset getCurrentOffset() {
+ return _currentOffset;
+ }
+
+ public long getLastProcessedTimeMs() {
+ return _lastProcessedTimeMs;
+ }
+
+ public String getPartitionId() {
+ return _partitionId;
+ }
+
+ public StreamPartitionMsgOffset getUpstreamLatestOffset() {
+ return _upstreamLatestOffset; // null when the constructor was given null
+ }
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
new file mode 100644
index 0000000000..21e2222403
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.spi.stream;
+
+/**
+ * Container that can be used for holding per-partition consumer lag calculated along standard dimensions
+ * such as record offset, ingestion time etc.
+ *
+ * <p>This base class computes nothing: {@code getRecordsLag()} returns the {@code NOT_CALCULATED} marker.
+ * The class is non-final and the marker is {@code protected}, so a subclass can override the getter to
+ * report a real value and can reuse the marker when it has none.
+ */
+public class PartitionLagState {
+ protected final static String NOT_CALCULATED = "NOT_CALCULATED";
+
+ /**
+ * Defines how far away the current record's offset / pointer is from the upstream latest record.
+ * The distance is based on actual record count.
+ *
+ * @return the lag as a String; this base implementation always returns {@code NOT_CALCULATED}
+ */
+ public String getRecordsLag() {
+ return NOT_CALCULATED;
+ }
+
+ // TODO: Define record availability lag ($latest_record_consumption_time -
$latest_record_ingestion_time)
+}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 6f79df6fe8..94abc3ca6d 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -21,7 +21,9 @@ package org.apache.pinot.spi.stream;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -87,4 +89,14 @@ public interface StreamMetadataProvider extends Closeable {
}
return newPartitionGroupMetadataList;
}
+
+ default Map<String, PartitionLagState> getCurrentPartitionLagState( // default: no lag is computed for any partition
+ Map<String, ConsumerPartitionState> currentPartitionStateMap) {
+ Map<String, PartitionLagState> result = new HashMap<>(); // fresh map, one entry per input key
+ PartitionLagState unknownLagState = new UnknownLagState(); // single instance shared by every entry
+ currentPartitionStateMap.forEach((k, v) -> result.put(k, unknownLagState)); // only keys are used; state values are ignored
+ return result;
+ }
+
+ class UnknownLagState extends PartitionLagState { } // empty body: adds nothing to PartitionLagState, used as the default lag value
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]