This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d580bbcaec Exposing consumer's record lag in /consumingSegmentsInfo (#9515)
d580bbcaec is described below

commit d580bbcaec42281c7bc0667c4db228354b56a889
Author: Navina Ramesh <[email protected]>
AuthorDate: Tue Oct 18 23:07:29 2022 +0530

    Exposing consumer's record lag in /consumingSegmentsInfo (#9515)
    
    Re-work of PR #8280
    
    Changes in this PR:
    
    - The /consumingSegmentsInfo API was moved into PinotRealtimeTableResource.
      It now returns an additional per-partition map with the latest upstream record
      offset and the partition lag (when calculated).
    - Introduces a new API, StreamMetadataProvider#getCurrentPartitionLagState.
      This PR only adds support for Kafka; any stream connector that hasn't implemented
      this method returns NOT_CALCULATED for the new lag fields.
    - Additionally, connectors can extend the PartitionLagState class to return custom
      lag definitions. An illustrative sketch follows below.
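    
    As an illustrative, hypothetical sketch (not part of this commit), a custom connector
    could plug in its own lag definition roughly as follows; the MyStream* names and the
    computeRecordsLag helper are assumptions (java.util and org.apache.pinot.spi.stream
    imports assumed):
    
        // Hypothetical connector-side sketch; class and helper names are illustrative only.
        public class MyStreamPartitionLag extends PartitionLagState {
          private final String _recordsLag;
    
          public MyStreamPartitionLag(String recordsLag) {
            _recordsLag = recordsLag;
          }
    
          @Override
          public String getRecordsLag() {
            return _recordsLag;
          }
        }
    
        // In the connector's StreamMetadataProvider implementation:
        @Override
        public Map<String, PartitionLagState> getCurrentPartitionLagState(
            Map<String, ConsumerPartitionState> currentPartitionStateMap) {
          Map<String, PartitionLagState> lag = new HashMap<>();
          currentPartitionStateMap.forEach((partitionId, state) ->
              lag.put(partitionId, new MyStreamPartitionLag(computeRecordsLag(state))));  // assumed helper
          return lag;
        }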
---
 .../restlet/resources/SegmentConsumerInfo.java     | 42 +++++++++++++++-
 .../api/resources/PinotRealtimeTableResource.java  | 49 ++++++++++++++++++-
 .../api/resources/PinotSegmentRestletResource.java | 31 ------------
 .../util/ConsumingSegmentInfoReader.java           | 34 ++++++++++++-
 .../ConsumingSegmentInfoReaderStatelessTest.java   | 40 ++++++++++++----
 .../realtime/HLRealtimeSegmentDataManager.java     | 13 +++++
 .../realtime/LLRealtimeSegmentDataManager.java     | 19 ++++++++
 .../realtime/RealtimeSegmentDataManager.java       | 14 ++++++
 .../stream/kafka20/KafkaConsumerPartitionLag.java  | 34 +++++++++++++
 .../kafka20/KafkaStreamMetadataProvider.java       | 21 ++++++++
 .../pinot/server/api/resources/DebugResource.java  | 21 ++++++--
 .../pinot/server/api/resources/TablesResource.java | 29 +++++++++--
 .../pinot/spi/stream/ConsumerPartitionState.java   | 56 ++++++++++++++++++++++
 .../apache/pinot/spi/stream/PartitionLagState.java | 38 +++++++++++++++
 .../pinot/spi/stream/StreamMetadataProvider.java   | 12 +++++
 15 files changed, 400 insertions(+), 53 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
index 83e043311c..e77bb97d82 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/SegmentConsumerInfo.java
@@ -32,15 +32,18 @@ public class SegmentConsumerInfo {
   private final String _consumerState;
   private final long _lastConsumedTimestamp;
   private final Map<String, String> _partitionToOffsetMap;
+  private final PartitionOffsetInfo _partitionOffsetInfo;
 
   public SegmentConsumerInfo(@JsonProperty("segmentName") String segmentName,
       @JsonProperty("consumerState") String consumerState,
       @JsonProperty("lastConsumedTimestamp") long lastConsumedTimestamp,
-      @JsonProperty("partitionToOffsetMap") Map<String, String> 
partitionToOffsetMap) {
+      @JsonProperty("partitionToOffsetMap") Map<String, String> 
partitionToOffsetMap,
+      @JsonProperty("partitionOffsetInfo") PartitionOffsetInfo 
partitionOffsetInfo) {
     _segmentName = segmentName;
     _consumerState = consumerState;
     _lastConsumedTimestamp = lastConsumedTimestamp;
     _partitionToOffsetMap = partitionToOffsetMap;
+    _partitionOffsetInfo = partitionOffsetInfo;
   }
 
   public String getSegmentName() {
@@ -58,4 +61,41 @@ public class SegmentConsumerInfo {
   public Map<String, String> getPartitionToOffsetMap() {
     return _partitionToOffsetMap;
   }
+
+  public PartitionOffsetInfo getPartitionOffsetInfo() {
+    return _partitionOffsetInfo;
+  }
+
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static class PartitionOffsetInfo {
+    @JsonProperty("currentOffsets")
+    public Map<String, String> _currentOffsets;
+
+    @JsonProperty("recordsLag")
+    public Map<String, String> _recordsLag;
+
+    @JsonProperty("latestUpstreamOffsets")
+    public Map<String, String> _latestUpstreamOffsets;
+
+    public PartitionOffsetInfo(
+        @JsonProperty("currentOffsets") Map<String, String> currentOffsets,
+        @JsonProperty("latestUpstreamOffsets") Map<String, String> latestUpstreamOffsets,
+        @JsonProperty("recordsLag") Map<String, String> recordsLag) {
+      _currentOffsets = currentOffsets;
+      _latestUpstreamOffsets = latestUpstreamOffsets;
+      _recordsLag = recordsLag;
+    }
+
+    public Map<String, String> getCurrentOffsets() {
+      return _currentOffsets;
+    }
+
+    public Map<String, String> getRecordsLag() {
+      return _recordsLag;
+    }
+
+    public Map<String, String> getLatestUpstreamOffsets() {
+      return _latestUpstreamOffsets;
+    }
+  }
 }
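
For illustration only (not part of this diff), a minimal sketch of how a server might populate the new payload; the segment name and offset values below are invented (java.util and SegmentConsumerInfo imports assumed):

    // Illustrative sketch; all literal values are made up.
    Map<String, String> currentOffsets = Collections.singletonMap("0", "150");
    Map<String, String> upstreamOffsets = Collections.singletonMap("0", "200");
    Map<String, String> recordsLag = Collections.singletonMap("0", "50");
    SegmentConsumerInfo info = new SegmentConsumerInfo(
        "myTable__0__0__20221018T0000Z",  // hypothetical segment name
        "CONSUMING",
        System.currentTimeMillis(),
        currentOffsets,                   // deprecated partitionToOffsetMap, kept for compatibility
        new SegmentConsumerInfo.PartitionOffsetInfo(currentOffsets, upstreamOffsets, recordsLag));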
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index 715d65bede..f33433e164 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -22,9 +22,12 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiKeyAuthDefinition;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
+import java.util.concurrent.Executor;
 import javax.inject.Inject;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -35,10 +38,14 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.helix.model.IdealState;
+import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +60,15 @@ import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_K
 public class PinotRealtimeTableResource {
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeTableResource.class);
 
+  @Inject
+  ControllerConf _controllerConf;
+
+  @Inject
+  Executor _executor;
+
+  @Inject
+  HttpConnectionManager _connectionManager;
+
   @Inject
   PinotHelixResourceManager _pinotHelixResourceManager;
 
@@ -123,7 +139,7 @@ public class PinotRealtimeTableResource {
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation(value = "Return pause status of a realtime table",
       notes = "Return pause status of a realtime table along with list of 
consuming segments.")
-  public Response getConsumptionStatus(
+  public Response getPauseStatus(
       @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName) {
     String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
     validate(tableNameWithType);
@@ -134,6 +150,37 @@ public class PinotRealtimeTableResource {
     }
   }
 
+  @GET
+  @Path("/tables/{tableName}/consumingSegmentsInfo")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Returns state of consuming segments", notes = "Gets 
the status of consumers from all servers."
+      + "Note that the partitionToOffsetMap has been deprecated and will be 
removed in the next release. The info is "
+      + "now embedded within each partition's state as currentOffsetsMap.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Success"),
+      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap 
getConsumingSegmentsInfo(
+      @ApiParam(value = "Realtime table name with or without type", required = 
true,
+          example = "myTable | myTable_REALTIME") @PathParam("tableName") 
String realtimeTableName) {
+    try {
+      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
+      if (TableType.OFFLINE == tableType) {
+        throw new IllegalStateException("Cannot get consuming segments info 
for OFFLINE table: " + realtimeTableName);
+      }
+      String tableNameWithType = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
+      ConsumingSegmentInfoReader consumingSegmentInfoReader =
+          new ConsumingSegmentInfoReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
+      return consumingSegmentInfoReader
+          .getConsumingSegmentsInfo(tableNameWithType, 
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to get consuming segments info for table %s. 
%s", realtimeTableName, e.getMessage()),
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
   private void validate(String tableNameWithType) {
     IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
     if (idealState == null) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 3badc1f988..ea77b9caec 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -77,7 +77,6 @@ import org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
 import org.apache.pinot.controller.util.CompletionServiceHelper;
-import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
 import org.apache.pinot.controller.util.TableMetadataReader;
 import org.apache.pinot.controller.util.TableTierReader;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -1010,36 +1009,6 @@ public class PinotSegmentRestletResource {
         .getSegmentsMetadata(tableNameWithType, columns, _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
   }
 
-  // TODO: Move this API into PinotTableRestletResource
-  @GET
-  @Path("/tables/{realtimeTableName}/consumingSegmentsInfo")
-  @Produces(MediaType.APPLICATION_JSON)
-  @ApiOperation(value = "Returns state of consuming segments", notes = "Gets 
the status of consumers from all servers")
-  @ApiResponses(value = {
-      @ApiResponse(code = 200, message = "Success"),
-      @ApiResponse(code = 404, message = "Table not found"),
-      @ApiResponse(code = 500, message = "Internal server error")
-  })
-  public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap 
getConsumingSegmentsInfo(
-      @ApiParam(value = "Realtime table name with or without type", required = 
true,
-          example = "myTable | myTable_REALTIME") 
@PathParam("realtimeTableName") String realtimeTableName) {
-    try {
-      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
-      if (TableType.OFFLINE == tableType) {
-        throw new IllegalStateException("Cannot get consuming segments info 
for OFFLINE table: " + realtimeTableName);
-      }
-      String tableNameWithType = 
TableNameBuilder.forType(TableType.REALTIME).tableNameWithType(realtimeTableName);
-      ConsumingSegmentInfoReader consumingSegmentInfoReader =
-          new ConsumingSegmentInfoReader(_executor, _connectionManager, 
_pinotHelixResourceManager);
-      return consumingSegmentInfoReader
-          .getConsumingSegmentsInfo(tableNameWithType, 
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
-    } catch (Exception e) {
-      throw new ControllerApplicationException(LOGGER,
-          String.format("Failed to get consuming segments info for table %s. 
%s", realtimeTableName, e.getMessage()),
-          Response.Status.INTERNAL_SERVER_ERROR, e);
-    }
-  }
-
   @POST
   @Path("/segments/{tableNameWithType}/updateZKTimeInterval")
   @Authenticate(AccessType.UPDATE)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
index 5c7ba349c6..8539eb89f8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ConsumingSegmentInfoReader.java
@@ -81,9 +81,12 @@ public class ConsumingSegmentInfoReader {
     for (Map.Entry<String, List<SegmentConsumerInfo>> entry : serverToSegmentConsumerInfoMap.entrySet()) {
       String serverName = entry.getKey();
       for (SegmentConsumerInfo info : entry.getValue()) {
+        SegmentConsumerInfo.PartitionOffsetInfo partitionOffsetInfo = info.getPartitionOffsetInfo();
+        PartitionOffsetInfo offsetInfo = new PartitionOffsetInfo(partitionOffsetInfo.getCurrentOffsets(),
+            partitionOffsetInfo.getLatestUpstreamOffsets(), partitionOffsetInfo.getRecordsLag());
         consumingSegmentInfoMap.computeIfAbsent(info.getSegmentName(), k -> new ArrayList<>()).add(
             new ConsumingSegmentInfo(serverName, info.getConsumerState(), info.getLastConsumedTimestamp(),
-                info.getPartitionToOffsetMap()));
+                partitionOffsetInfo.getCurrentOffsets(), offsetInfo));
       }
     }
     // Segments which are in CONSUMING state but found no consumer on the server
@@ -203,17 +206,44 @@ public class ConsumingSegmentInfoReader {
     public String _consumerState;
     @JsonProperty("lastConsumedTimestamp")
     public long _lastConsumedTimestamp;
+    @Deprecated
     @JsonProperty("partitionToOffsetMap")
     public Map<String, String> _partitionToOffsetMap;
+    @JsonProperty("partitionOffsetInfo")
+    public PartitionOffsetInfo _partitionOffsetInfo;
+
 
     public ConsumingSegmentInfo(@JsonProperty("serverName") String serverName,
         @JsonProperty("consumerState") String consumerState,
         @JsonProperty("lastConsumedTimestamp") long lastConsumedTimestamp,
-        @JsonProperty("partitionToOffsetMap") Map<String, String> 
partitionToOffsetMap) {
+        @JsonProperty("partitionToOffsetMap") Map<String, String> 
partitionToOffsetMap,
+        @JsonProperty("partitionOffsetInfo") PartitionOffsetInfo 
partitionOffsetInfo) {
       _serverName = serverName;
       _consumerState = consumerState;
       _lastConsumedTimestamp = lastConsumedTimestamp;
       _partitionToOffsetMap = partitionToOffsetMap;
+      _partitionOffsetInfo = partitionOffsetInfo;
+    }
+  }
+
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static class PartitionOffsetInfo {
+    @JsonProperty("currentOffsetsMap")
+    public Map<String, String> _currentOffsetsMap;
+
+    @JsonProperty("recordsLagMap")
+    public Map<String, String> _recordsLagMap;
+
+    @JsonProperty("latestUpstreamOffsetMap")
+    public Map<String, String> _latestUpstreamOffsetMap;
+
+    public PartitionOffsetInfo(
+        @JsonProperty("currentOffsetsMap") Map<String, String> 
currentOffsetsMap,
+        @JsonProperty("latestUpstreamOffsetMap") Map<String, String> 
latestUpstreamOffsetMap,
+        @JsonProperty("recordsLagMap") Map<String, String> recordsLagMap) {
+      _currentOffsetsMap = currentOffsetsMap;
+      _latestUpstreamOffsetMap = latestUpstreamOffsetMap;
+      _recordsLagMap = recordsLagMap;
     }
   }
 }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
index 4877390713..7565c62420 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/ConsumingSegmentInfoReaderStatelessTest.java
@@ -88,35 +88,55 @@ public class ConsumingSegmentInfoReaderStatelessTest {
     Map<String, String> partitionToOffset1 = new HashMap<>();
     partitionToOffset1.put("1", "150");
     FakeConsumingInfoServer s0 = new FakeConsumingInfoServer(Lists
-        .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0),
-            new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+        .newArrayList(
+            new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0,
+                partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(
+                    partitionToOffset0, Collections.emptyMap(), Collections.emptyMap())),
+            new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
+                partitionToOffset1, new SegmentConsumerInfo.PartitionOffsetInfo(
+                    partitionToOffset1, Collections.emptyMap(), Collections.emptyMap()))));
     s0.start(uriPath, createHandler(200, s0._consumerInfos, 0));
     _serverMap.put("server0", s0);
 
     // server1 - 1 consumer each for p0 and p1. CONSUMING.
     FakeConsumingInfoServer s1 = new FakeConsumingInfoServer(Lists
-        .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0),
-            new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+        .newArrayList(
+            new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0,
+                partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(
+                    partitionToOffset0, Collections.emptyMap(), Collections.emptyMap())),
+            new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
+                partitionToOffset1, new SegmentConsumerInfo.PartitionOffsetInfo(
+                    partitionToOffset1, Collections.emptyMap(), Collections.emptyMap()))));
     s1.start(uriPath, createHandler(200, s1._consumerInfos, 0));
     _serverMap.put("server1", s1);
 
     // server2 - p1 consumer CONSUMING. p0 consumer NOT_CONSUMING
     FakeConsumingInfoServer s2 = new FakeConsumingInfoServer(Lists
-        .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "NOT_CONSUMING", 0, partitionToOffset0),
-            new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+        .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "NOT_CONSUMING", 0,
+                partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(
+                    partitionToOffset0, Collections.emptyMap(), Collections.emptyMap())),
+            new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1,
+                new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
+                    Collections.emptyMap()))));
     s2.start(uriPath, createHandler(200, s2._consumerInfos, 0));
     _serverMap.put("server2", s2);
 
     // server3 - 1 consumer for p1. No consumer for p0
     FakeConsumingInfoServer s3 = new FakeConsumingInfoServer(
-        Lists.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+        Lists.newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0,
+            partitionToOffset1, new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
+                Collections.emptyMap()))));
+                Collections.emptyMap()))));
     s3.start(uriPath, createHandler(200, s3._consumerInfos, 0));
     _serverMap.put("server3", s3);
 
     // server4 - unreachable/error/timeout
     FakeConsumingInfoServer s4 = new FakeConsumingInfoServer(Lists
-        .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0, partitionToOffset0),
-            new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1)));
+        .newArrayList(new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_0, "CONSUMING", 0,
+                partitionToOffset0, new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset0,
+                Collections.emptyMap(), Collections.emptyMap())),
+            new SegmentConsumerInfo(SEGMENT_NAME_PARTITION_1, "CONSUMING", 0, partitionToOffset1,
+                new SegmentConsumerInfo.PartitionOffsetInfo(partitionToOffset1, Collections.emptyMap(),
+                    Collections.emptyMap()))));
     s4.start(uriPath, createHandler(200, s4._consumerInfos, TIMEOUT_MSEC * EXTENDED_TIMEOUT_FACTOR));
     _serverMap.put("server4", s4);
   }
@@ -349,6 +369,6 @@ public class ConsumingSegmentInfoReaderStatelessTest {
       String consumerState, String partition, String offset) {
     Assert.assertTrue(serverNames.contains(info._serverName));
     Assert.assertEquals(info._consumerState, consumerState);
-    Assert.assertEquals(info._partitionToOffsetMap.get(partition), offset);
+    Assert.assertEquals(info._partitionOffsetInfo._currentOffsetsMap.get(partition), offset);
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 45b4d0291a..aa16a32147 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -51,6 +51,8 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.PartitionLagState;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -424,6 +426,17 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public Map<String, ConsumerPartitionState> getConsumerPartitionState() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<String, PartitionLagState> getPartitionToLagState(
+      Map<String, ConsumerPartitionState> consumerPartitionStateMap) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public String getSegmentName() {
     return _segmentName;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 5f2d733fe6..f395d71e56 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -71,11 +71,13 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
+import org.apache.pinot.spi.stream.PartitionLagState;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
 import org.apache.pinot.spi.stream.PermanentConsumerException;
 import org.apache.pinot.spi.stream.RowMetadata;
@@ -819,6 +821,23 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     return _lastLogTime;
   }
 
+  @Override
+  public Map<String, ConsumerPartitionState> getConsumerPartitionState() {
+    String partitionGroupId = String.valueOf(_partitionGroupId);
+    return Collections.singletonMap(partitionGroupId, new ConsumerPartitionState(partitionGroupId, getCurrentOffset(),
+        getLastConsumedTimestamp(), fetchLatestStreamOffset(5_000)));
+  }
+
+  @Override
+  public Map<String, PartitionLagState> getPartitionToLagState(
+      Map<String, ConsumerPartitionState> consumerPartitionStateMap) {
+    if (_partitionMetadataProvider == null) {
+      createPartitionMetadataProvider("Get Partition Lag State");
+    }
+    return _partitionMetadataProvider.getCurrentPartitionLagState(consumerPartitionStateMap);
+  }
+
   public StreamPartitionMsgOffset getCurrentOffset() {
     return _currentOffset;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index e40376a9e0..e1e06c9081 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -25,6 +25,8 @@ import org.apache.pinot.segment.local.io.writer.impl.DirectMemoryManager;
 import org.apache.pinot.segment.local.io.writer.impl.MmapMemoryManager;
 import org.apache.pinot.segment.spi.MutableSegment;
 import org.apache.pinot.segment.spi.memory.PinotDataBufferMemoryManager;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.PartitionLagState;
 import org.apache.pinot.spi.utils.CommonConstants.ConsumerState;
 
 
@@ -54,5 +56,17 @@ public abstract class RealtimeSegmentDataManager extends SegmentDataManager {
    */
   public abstract ConsumerState getConsumerState();
 
+  /**
+   * @return Timestamp at which the last record was indexed
+   */
   public abstract long getLastConsumedTimestamp();
+
+  /**
+   * @return Per-partition consumer status, which typically includes the last consumed message timestamp,
+   * the latest available upstream offset, etc.
+   */
+  public abstract Map<String, ConsumerPartitionState> getConsumerPartitionState();
+
+  public abstract Map<String, PartitionLagState> getPartitionToLagState(
+      Map<String, ConsumerPartitionState> consumerPartitionStateMap);
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java
new file mode 100644
index 0000000000..709c2a7d79
--- /dev/null
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerPartitionLag.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka20;
+
+import org.apache.pinot.spi.stream.PartitionLagState;
+
+
+public class KafkaConsumerPartitionLag extends PartitionLagState {
+  private final String _recordsLag;
+
+  public KafkaConsumerPartitionLag(String recordsLag) {
+    _recordsLag = recordsLag;
+  }
+
+  public String getRecordsLag() {
+    return _recordsLag;
+  }
+}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 282a862dd6..c6fa7ff218 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -23,10 +23,14 @@ import java.io.IOException;
 import java.time.Clock;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionLagState;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -104,6 +108,23 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
     }
   }
 
+  @Override
+  public Map<String, PartitionLagState> getCurrentPartitionLagState(
+      Map<String, ConsumerPartitionState> currentPartitionStateMap) {
+    Map<String, PartitionLagState> perPartitionLag = new HashMap<>();
+    for (Map.Entry<String, ConsumerPartitionState> entry: currentPartitionStateMap.entrySet()) {
+      StreamPartitionMsgOffset currentOffset = entry.getValue().getCurrentOffset();
+      StreamPartitionMsgOffset upstreamLatest = entry.getValue().getUpstreamLatestOffset();
+      if (currentOffset instanceof LongMsgOffset && upstreamLatest instanceof LongMsgOffset) {
+        long offsetLag = ((LongMsgOffset) upstreamLatest).getOffset() - ((LongMsgOffset) currentOffset).getOffset();
+        perPartitionLag.put(entry.getKey(), new KafkaConsumerPartitionLag(String.valueOf(offsetLag)));
+      } else {
+        perPartitionLag.put(entry.getKey(), new KafkaConsumerPartitionLag("UNKNOWN"));
+      }
+      }
+    }
+    return perPartitionLag;
+  }
+
   @Override
   public void close()
       throws IOException {
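
A rough, test-style sketch (not part of this diff) of what the Kafka implementation above computes; `provider` and the offset values are assumed (java.util and org.apache.pinot.spi.stream imports assumed):

    // Sketch only: assumes `provider` is a KafkaStreamMetadataProvider for partition 0.
    Map<String, ConsumerPartitionState> state = Collections.singletonMap("0",
        new ConsumerPartitionState("0", new LongMsgOffset(150L), System.currentTimeMillis(),
            new LongMsgOffset(200L)));
    Map<String, PartitionLagState> lag = provider.getCurrentPartitionLagState(state);
    // A LongMsgOffset pair yields the plain difference, i.e. "50"; any other offset type falls back to "UNKNOWN".
    System.out.println(lag.get("0").getRecordsLag());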
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
index e53b10c864..9d0a1976fc 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java
@@ -30,6 +30,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 import javax.inject.Inject;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -49,6 +50,8 @@ import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.server.starter.ServerInstance;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.PartitionLagState;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
@@ -134,11 +137,23 @@ public class DebugResource {
     SegmentConsumerInfo segmentConsumerInfo = null;
     if (tableType == TableType.REALTIME) {
       RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
-      String segmentName = segmentDataManager.getSegmentName();
+      Map<String, ConsumerPartitionState> partitionStateMap = realtimeSegmentDataManager.getConsumerPartitionState();
+      Map<String, PartitionLagState> partitionLagStateMap =
+          realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap);
+      Map<String, String> partitionToCurrentOffsetMap = realtimeSegmentDataManager.getPartitionToCurrentOffset();
       segmentConsumerInfo =
-          new SegmentConsumerInfo(segmentName, realtimeSegmentDataManager.getConsumerState().toString(),
+          new SegmentConsumerInfo(
+              segmentDataManager.getSegmentName(),
+              realtimeSegmentDataManager.getConsumerState().toString(),
               realtimeSegmentDataManager.getLastConsumedTimestamp(),
-              realtimeSegmentDataManager.getPartitionToCurrentOffset());
+              partitionToCurrentOffsetMap,
+              new SegmentConsumerInfo.PartitionOffsetInfo(partitionToCurrentOffsetMap,
+                  partitionStateMap.entrySet().stream().collect(
+                      Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString())
+                  ),
+                  partitionLagStateMap.entrySet().stream().collect(
+                      Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getRecordsLag())
+                  )));
     }
     return segmentConsumerInfo;
   }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index 6d1e8b847d..8d67bf5ce7 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 import javax.inject.Inject;
 import javax.inject.Named;
 import javax.ws.rs.DefaultValue;
@@ -88,6 +89,8 @@ import org.apache.pinot.server.starter.helix.AdminApiApplication;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.stream.ConsumerPartitionState;
+import org.apache.pinot.spi.stream.PartitionLagState;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -492,11 +495,12 @@ public class TablesResource {
   @Path("tables/{realtimeTableName}/consumingSegmentsInfo")
   @Produces(MediaType.APPLICATION_JSON)
   @ApiOperation(value = "Get the info for consumers of this REALTIME table",
-      notes = "Get consumers info from the table data manager")
+      notes = "Get consumers info from the table data manager. Note that the 
partitionToOffsetMap has been deprecated "
+          + "and will be removed in the next release. The info is now embedded 
within each partition's state as "
+          + "currentOffsetsMap")
   public List<SegmentConsumerInfo> getConsumingSegmentsInfo(
       @ApiParam(value = "Name of the REALTIME table", required = true) 
@PathParam("realtimeTableName")
           String realtimeTableName) {
-
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(realtimeTableName);
     if (TableType.OFFLINE == tableType) {
       throw new WebApplicationException("Cannot get consuming segment info for 
OFFLINE table: " + realtimeTableName);
@@ -511,11 +515,26 @@ public class TablesResource {
       for (SegmentDataManager segmentDataManager : segmentDataManagers) {
         if (segmentDataManager instanceof RealtimeSegmentDataManager) {
           RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
-          String segmentName = segmentDataManager.getSegmentName();
+          Map<String, ConsumerPartitionState> partitionStateMap =
+              realtimeSegmentDataManager.getConsumerPartitionState();
+          Map<String, PartitionLagState> partitionLagStateMap =
+              realtimeSegmentDataManager.getPartitionToLagState(partitionStateMap);
+          @Deprecated Map<String, String> partitionToOffsetMap =
+              realtimeSegmentDataManager.getPartitionToCurrentOffset();
           segmentConsumerInfoList.add(
-              new SegmentConsumerInfo(segmentName, realtimeSegmentDataManager.getConsumerState().toString(),
+              new SegmentConsumerInfo(segmentDataManager.getSegmentName(),
+                  realtimeSegmentDataManager.getConsumerState().toString(),
                   realtimeSegmentDataManager.getLastConsumedTimestamp(),
-                  realtimeSegmentDataManager.getPartitionToCurrentOffset()));
+                  partitionToOffsetMap,
+                  new SegmentConsumerInfo.PartitionOffsetInfo(
+                      partitionToOffsetMap,
+                      partitionStateMap.entrySet().stream().collect(
+                          Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getUpstreamLatestOffset().toString())
+                      ),
+                      partitionLagStateMap.entrySet().stream().collect(
+                          Collectors.toMap(Map.Entry::getKey, e -> e.getValue().getRecordsLag())
+                      )))
+          );
         }
       }
     } catch (Exception e) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/ConsumerPartitionState.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/ConsumerPartitionState.java
new file mode 100644
index 0000000000..9da90d5bff
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/ConsumerPartitionState.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * Container for holding the current consumption state at a per-partition level
+ */
+public class ConsumerPartitionState {
+  private final String _partitionId;
+  private final StreamPartitionMsgOffset _currentOffset;
+  private final long _lastProcessedTimeMs;
+  private final StreamPartitionMsgOffset _upstreamLatestOffset;
+
+  public ConsumerPartitionState(String partitionId, StreamPartitionMsgOffset currentOffset, long lastProcessedTimeMs,
+      @Nullable StreamPartitionMsgOffset upstreamLatestOffset) {
+    _partitionId = partitionId;
+    _currentOffset = currentOffset;
+    _lastProcessedTimeMs = lastProcessedTimeMs;
+    _upstreamLatestOffset = upstreamLatestOffset;
+  }
+
+  public StreamPartitionMsgOffset getCurrentOffset() {
+    return _currentOffset;
+  }
+
+  public long getLastProcessedTimeMs() {
+    return _lastProcessedTimeMs;
+  }
+
+  public String getPartitionId() {
+    return _partitionId;
+  }
+
+  public StreamPartitionMsgOffset getUpstreamLatestOffset() {
+    return _upstreamLatestOffset;
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
new file mode 100644
index 0000000000..21e2222403
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionLagState.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.spi.stream;
+
+/**
+ * Container that can be used for holding per-partition consumer lag calculated along standard dimensions such as
+ * record offset, ingestion time, etc.
+ */
+public class PartitionLagState {
+  protected static final String NOT_CALCULATED = "NOT_CALCULATED";
+
+  /**
+   * Defines how far the current record's offset / pointer is from the latest upstream record.
+   * The distance is based on actual record count.
+   */
+  public String getRecordsLag() {
+    return NOT_CALCULATED;
+  }
+
+  // TODO: Define record availability lag ($latest_record_consumption_time - $latest_record_ingestion_time)
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 6f79df6fe8..94abc3ca6d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -21,7 +21,9 @@ package org.apache.pinot.spi.stream;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeoutException;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;
@@ -87,4 +89,14 @@ public interface StreamMetadataProvider extends Closeable {
     }
     return newPartitionGroupMetadataList;
   }
+
+  default Map<String, PartitionLagState> getCurrentPartitionLagState(
+      Map<String, ConsumerPartitionState> currentPartitionStateMap) {
+    Map<String, PartitionLagState> result = new HashMap<>();
+    PartitionLagState unknownLagState = new UnknownLagState();
+    currentPartitionStateMap.forEach((k, v) -> result.put(k, unknownLagState));
+    return result;
+  }
+
+  class UnknownLagState extends PartitionLagState { }
 }
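
For connectors that keep the default implementation above, a quick sketch (again, not part of this diff) of the observable fallback; `someProvider` stands in for any non-Kafka StreamMetadataProvider and the offsets are invented:

    Map<String, ConsumerPartitionState> state = Collections.singletonMap("3",
        new ConsumerPartitionState("3", new LongMsgOffset(10L), System.currentTimeMillis(), null));
    // UnknownLagState does not override getRecordsLag(), so every entry reports "NOT_CALCULATED".
    System.out.println(someProvider.getCurrentPartitionLagState(state).get("3").getRecordsLag());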


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

