This is an automated email from the ASF dual-hosted git repository.
sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8feec414cb Fix issue with realtime partition mismatch metric (#11871)
8feec414cb is described below
commit 8feec414cbe10d97573740b73cd61715931f05da
Author: Sajjad Moradi <[email protected]>
AuthorDate: Thu Oct 26 18:21:43 2023 -0700
Fix issue with realtime partition mismatch metric (#11871)
---
.../org/apache/pinot/common/metrics/ServerMeter.java | 1 +
.../manager/realtime/RealtimeSegmentDataManager.java | 3 +--
.../local/indexsegment/mutable/MutableSegmentImpl.java | 16 +++++++++++-----
3 files changed, 13 insertions(+), 7 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index f77194d37b..5b995d6e6a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -41,6 +41,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
REALTIME_OFFSET_COMMITS("commits", true),
REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
+ // number of times partition of a record did not match the partition of the stream
REALTIME_PARTITION_MISMATCH("mismatch", false),
REALTIME_DEDUP_DROPPED("rows", false),
UPSERT_KEYS_IN_WRONG_SEGMENT("rows", false),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index f954eff0e1..087f8c2cd2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1574,10 +1574,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
Collections.emptyList(), /*maxWaitTimeMs=*/5000).size();
if (numPartitionGroups != numPartitions) {
- _segmentLogger.warn(
+ _segmentLogger.info(
"Number of stream partitions: {} does not match number of partitions in the partition config: {}, "
+ "using number of stream " + "partitions",
numPartitionGroups, numPartitions);
- _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
numPartitions = numPartitionGroups;
}
} catch (Exception e) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index a99b3c75f4..7adeae3d7b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -138,6 +138,7 @@ public class MutableSegmentImpl implements MutableSegment {
private final RealtimeSegmentStatsHistory _statsHistory;
private final String _partitionColumn;
private final PartitionFunction _partitionFunction;
+ private final int _mainPartitionId; // partition id designated for this consuming segment
private final boolean _nullHandlingEnabled;
private final Map<String, IndexContainer> _indexContainerMap = new
HashMap<>();
@@ -211,6 +212,7 @@ public class MutableSegmentImpl implements MutableSegment {
_statsHistory = config.getStatsHistory();
_partitionColumn = config.getPartitionColumn();
_partitionFunction = config.getPartitionFunction();
+ _mainPartitionId = config.getPartitionId();
_nullHandlingEnabled = config.isNullHandlingEnabled();
Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
@@ -290,10 +292,10 @@ public class MutableSegmentImpl implements MutableSegment
{
// NOTE: Use a concurrent set because the partitions can be updated when the partition of the ingested record
// does not match the stream partition. This could happen when stream partition changes, or the records
- // are not properly partitioned from the stream. Log an warning and emit a metric if it happens, then add
+ // are not properly partitioned from the stream. Log a warning and emit a metric if it happens, then add
// the new partition into this set.
partitions = ConcurrentHashMap.newKeySet();
- partitions.add(config.getPartitionId());
+ partitions.add(_mainPartitionId);
}
// TODO (mutable-index-spi): The comment above was here, but no check
was done.
@@ -666,9 +668,13 @@ public class MutableSegmentImpl implements MutableSegment {
if (column.equals(_partitionColumn)) {
Object valueToPartition = (dataType == BYTES) ? new
ByteArray((byte[]) value) : value;
int partition = _partitionFunction.getPartition(valueToPartition);
- if (indexContainer._partitions.add(partition)) {
- _logger.warn("Found new partition: {} from partition column: {}, value: {}", partition, column,
- valueToPartition);
+ if (partition != _mainPartitionId) {
+ if (indexContainer._partitions.add(partition)) {
+ // for every partition other than mainPartitionId, log a warning once
+ _logger.warn("Found new partition: {} from partition column: {}, value: {}", partition, column,
+ valueToPartition);
+ }
+ // always emit a metric when a partition other than mainPartitionId is detected
if (_serverMetrics != null) {
_serverMetrics.addMeteredTableValue(_realtimeTableName,
ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]