This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 022e0a0893 Add ability to track filtered messages offset (#12602)
022e0a0893 is described below
commit 022e0a08937d8f406bd41cfb6893d0a9b7423b08
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Fri Apr 19 01:44:50 2024 +0530
Add ability to track filtered messages offset (#12602)
---
.../manager/realtime/RealtimeSegmentDataManager.java | 17 +++++++++++++++++
.../config/table/ingestion/StreamIngestionConfig.java | 11 +++++++++++
2 files changed, 28 insertions(+)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index d15e791160..6771e038d1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -307,6 +307,8 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
private final StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime;
private final CompletionMode _segmentCompletionMode;
+ private final List<String> _filteredMessageOffsets = new ArrayList<>();
+ private boolean _trackFilteredMessageOffsets = false;
// TODO each time this method is called, we print reason for stop. Good to
print only once.
private boolean endCriteriaReached() {
@@ -609,6 +611,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
if (reusedResult.getSkippedRowCount() > 0) {
realtimeRowsDroppedMeter =
_serverMetrics.addMeteredTableValue(_clientId,
ServerMeter.REALTIME_ROWS_FILTERED,
reusedResult.getSkippedRowCount(), realtimeRowsDroppedMeter);
+ if (_trackFilteredMessageOffsets) {
+ _filteredMessageOffsets.add(offset.toString());
+ }
}
if (reusedResult.getIncompleteRowCount() > 0) {
realtimeIncompleteRowsConsumedMeter =
@@ -1421,6 +1426,12 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
.createRateLimiter(_streamConfig, _tableNameWithType, _serverMetrics,
_clientId);
_serverRateLimiter =
RealtimeConsumptionRateManager.getInstance().getServerRateLimiter();
+ if (tableConfig.getIngestionConfig() != null
+ && tableConfig.getIngestionConfig().getStreamIngestionConfig() !=
null) {
+ _trackFilteredMessageOffsets =
+
tableConfig.getIngestionConfig().getStreamIngestionConfig().isTrackFilteredMessageOffsets();
+ }
+
List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
String sortedColumn;
if (sortedColumns.isEmpty()) {
@@ -1758,6 +1769,12 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
_segmentLogger.info(
"Consumed {} events from (rate:{}/s), currentOffset={},
numRowsConsumedSoFar={}, numRowsIndexedSoFar={}",
rowsConsumed, consumedRate, _currentOffset, _numRowsConsumed,
_numRowsIndexed);
+ if (_filteredMessageOffsets.size() > 0) {
+ if (_trackFilteredMessageOffsets) {
+ _segmentLogger.info("Filtered events with offsets: {}",
_filteredMessageOffsets);
+ }
+ _filteredMessageOffsets.clear();
+ }
_lastConsumedCount = _numRowsConsumed;
_lastLogTime = now;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
index 2d832dd4b2..5b216ca9d2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
@@ -37,6 +37,9 @@ public class StreamIngestionConfig extends BaseJsonConfig {
@JsonPropertyDescription("Whether to use column major mode when creating the
segment.")
private boolean _columnMajorSegmentBuilderEnabled = true;
+ @JsonPropertyDescription("Whether to track offsets of the filtered stream
messages during consumption.")
+ private boolean _trackFilteredMessageOffsets = false;
+
@JsonCreator
public StreamIngestionConfig(@JsonProperty("streamConfigMaps")
List<Map<String, String>> streamConfigMaps) {
_streamConfigMaps = streamConfigMaps;
@@ -53,4 +56,12 @@ public class StreamIngestionConfig extends BaseJsonConfig {
public boolean getColumnMajorSegmentBuilderEnabled() {
return _columnMajorSegmentBuilderEnabled;
}
+
+ public void setTrackFilteredMessageOffsets(boolean
trackFilteredMessageOffsets) { // Enables/disables recording offsets of consumed messages whose rows were filtered out.
+ _trackFilteredMessageOffsets = trackFilteredMessageOffsets;
+ }
+
+ public boolean isTrackFilteredMessageOffsets() { // Whether filtered-message offsets are tracked; false unless set.
+ return _trackFilteredMessageOffsets;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]