This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 022e0a0893 Add ability to track filtered messages offset (#12602)
022e0a0893 is described below

commit 022e0a08937d8f406bd41cfb6893d0a9b7423b08
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Fri Apr 19 01:44:50 2024 +0530

    Add ability to track filtered messages offset (#12602)
---
 .../manager/realtime/RealtimeSegmentDataManager.java    | 17 +++++++++++++++++
 .../config/table/ingestion/StreamIngestionConfig.java   | 11 +++++++++++
 2 files changed, 28 insertions(+)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index d15e791160..6771e038d1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -307,6 +307,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
 
   private final StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime;
   private final CompletionMode _segmentCompletionMode;
+  private final List<String> _filteredMessageOffsets = new ArrayList<>();
+  private boolean _trackFilteredMessageOffsets = false;
 
   // TODO each time this method is called, we print reason for stop. Good to print only once.
   private boolean endCriteriaReached() {
@@ -609,6 +611,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
         if (reusedResult.getSkippedRowCount() > 0) {
           realtimeRowsDroppedMeter = _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_FILTERED,
               reusedResult.getSkippedRowCount(), realtimeRowsDroppedMeter);
+          if (_trackFilteredMessageOffsets) {
+            _filteredMessageOffsets.add(offset.toString());
+          }
         }
         if (reusedResult.getIncompleteRowCount() > 0) {
           realtimeIncompleteRowsConsumedMeter =
@@ -1421,6 +1426,12 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
         .createRateLimiter(_streamConfig, _tableNameWithType, _serverMetrics, _clientId);
     _serverRateLimiter = RealtimeConsumptionRateManager.getInstance().getServerRateLimiter();
 
+    if (tableConfig.getIngestionConfig() != null
+        && tableConfig.getIngestionConfig().getStreamIngestionConfig() != null) {
+      _trackFilteredMessageOffsets =
+          tableConfig.getIngestionConfig().getStreamIngestionConfig().isTrackFilteredMessageOffsets();
+    }
+
     List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
     String sortedColumn;
     if (sortedColumns.isEmpty()) {
@@ -1758,6 +1769,12 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
       _segmentLogger.info(
           "Consumed {} events from (rate:{}/s), currentOffset={}, numRowsConsumedSoFar={}, numRowsIndexedSoFar={}",
           rowsConsumed, consumedRate, _currentOffset, _numRowsConsumed, _numRowsIndexed);
+      if (_filteredMessageOffsets.size() > 0) {
+        if (_trackFilteredMessageOffsets) {
+          _segmentLogger.info("Filtered events with offsets: {}", _filteredMessageOffsets);
+        }
+        _filteredMessageOffsets.clear();
+      }
       _lastConsumedCount = _numRowsConsumed;
       _lastLogTime = now;
     }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
index 2d832dd4b2..5b216ca9d2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
@@ -37,6 +37,9 @@ public class StreamIngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Whether to use column major mode when creating the segment.")
   private boolean _columnMajorSegmentBuilderEnabled = true;
 
+  @JsonPropertyDescription("Whether to track offsets of the filtered stream messages during consumption.")
+  private boolean _trackFilteredMessageOffsets = false;
+
   @JsonCreator
   public StreamIngestionConfig(@JsonProperty("streamConfigMaps") List<Map<String, String>> streamConfigMaps) {
     _streamConfigMaps = streamConfigMaps;
@@ -53,4 +56,12 @@ public class StreamIngestionConfig extends BaseJsonConfig {
   public boolean getColumnMajorSegmentBuilderEnabled() {
     return _columnMajorSegmentBuilderEnabled;
   }
+
+  public void setTrackFilteredMessageOffsets(boolean trackFilteredMessageOffsets) {
+    _trackFilteredMessageOffsets = trackFilteredMessageOffsets;
+  }
+
+  public boolean isTrackFilteredMessageOffsets() {
+    return _trackFilteredMessageOffsets;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to