This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 697adabc2e Adding table name to create unique Kafka consumer client
Ids (#10324)
697adabc2e is described below
commit 697adabc2e80793107fc40907d0ffefad6d907cf
Author: swaminathanmanish <[email protected]>
AuthorDate: Wed Mar 1 15:12:14 2023 -0800
Adding table name to create unique Kafka consumer client Ids (#10324)
---
.../realtime/LLRealtimeSegmentDataManager.java | 47 ++++++++++------------
.../spi/stream/PartitionGroupMetadataFetcher.java | 3 +-
2 files changed, 24 insertions(+), 26 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 4d82f6953e..28a24ad937 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -231,7 +231,6 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// modify the permit. This boolean makes sure the semaphore gets released
only once when the partition group stops
// consuming.
private final AtomicBoolean _acquiredConsumerSemaphore;
- private final String _metricKeyName;
private final ServerMetrics _serverMetrics;
private final BooleanSupplier _isReadyToConsumeData;
private final MutableSegmentImpl _realtimeSegment;
@@ -455,10 +454,10 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// TODO Issue 5359 Need to find a way to bump metrics without getting
actual offset value.
if (_currentOffset instanceof LongMsgOffset) {
// TODO: only LongMsgOffset supplies long offset value.
- _serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.HIGHEST_STREAM_OFFSET_CONSUMED,
((LongMsgOffset) _currentOffset).getOffset());
}
- _serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.LLC_PARTITION_CONSUMING, 1);
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 1);
lastUpdatedOffset =
_streamPartitionMsgOffsetFactory.create(_currentOffset);
} else if (endCriteriaReached) {
// At this point current offset has not moved because
processStreamEvents() has exited before processing a
@@ -489,7 +488,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
if (totalIdleTimeMillis > idleTimeoutMillis) {
// Update the partition-consuming metric only if we have been
idling beyond idle timeout.
// Create a new stream consumer wrapper, in case we are stuck on
something.
- _serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.LLC_PARTITION_CONSUMING, 1);
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 1);
recreateStreamConsumer(
String.format("Total idle time: %d ms exceeded idle timeout:
%d ms", totalIdleTimeMillis,
idleTimeoutMillis));
@@ -505,7 +504,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
}
if (_numRowsErrored > 0) {
- _serverMetrics.addMeteredTableValue(_metricKeyName,
ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
+ _serverMetrics.addMeteredTableValue(_clientId,
ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
_serverMetrics.addMeteredTableValue(_tableStreamName,
ServerMeter.ROWS_WITH_ERRORS, _numRowsErrored);
}
return true;
@@ -568,7 +567,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// TODO: based on a config, decide whether the record should be
silently dropped or stop further consumption on
// decode error
realtimeRowsDroppedMeter =
- _serverMetrics.addMeteredTableValue(_metricKeyName,
ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
+ _serverMetrics.addMeteredTableValue(_clientId,
ServerMeter.INVALID_REALTIME_ROWS_DROPPED, 1,
realtimeRowsDroppedMeter);
_numRowsErrored++;
} else {
@@ -584,12 +583,12 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
}
if (reusedResult.getSkippedRowCount() > 0) {
realtimeRowsDroppedMeter =
- _serverMetrics.addMeteredTableValue(_metricKeyName,
ServerMeter.REALTIME_ROWS_FILTERED,
+ _serverMetrics.addMeteredTableValue(_clientId,
ServerMeter.REALTIME_ROWS_FILTERED,
reusedResult.getSkippedRowCount(), realtimeRowsDroppedMeter);
}
if (reusedResult.getIncompleteRowCount() > 0) {
realtimeIncompleteRowsConsumedMeter =
- _serverMetrics.addMeteredTableValue(_metricKeyName,
ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED,
+ _serverMetrics.addMeteredTableValue(_clientId,
ServerMeter.INCOMPLETE_REALTIME_ROWS_CONSUMED,
reusedResult.getIncompleteRowCount(),
realtimeIncompleteRowsConsumedMeter);
}
for (GenericRow transformedRow : reusedResult.getTransformedRows()) {
@@ -599,7 +598,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
_lastRowMetadata = msgMetadata;
_lastConsumedTimestampMs = System.currentTimeMillis();
realtimeRowsConsumedMeter =
- _serverMetrics.addMeteredTableValue(_metricKeyName,
ServerMeter.REALTIME_ROWS_CONSUMED, 1,
+ _serverMetrics.addMeteredTableValue(_clientId,
ServerMeter.REALTIME_ROWS_CONSUMED, 1,
realtimeRowsConsumedMeter);
} catch (Exception e) {
_numRowsErrored++;
@@ -647,20 +646,20 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
if (_state.shouldConsume()) {
consumeLoop(); // Consume until we reached the end criteria, or
we are stopped.
}
- _serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.LLC_PARTITION_CONSUMING, 0);
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 0);
if (_shouldStop) {
break;
}
if (_state == State.INITIAL_CONSUMING) {
initialConsumptionEnd = now();
- _serverMetrics.setValueOfTableGauge(_metricKeyName,
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LAST_REALTIME_SEGMENT_INITIAL_CONSUMPTION_DURATION_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(initialConsumptionEnd -
_startTimeMs));
} else if (_state == State.CATCHING_UP) {
catchUpTimeMillis += now() - lastCatchUpStart;
_serverMetrics
- .setValueOfTableGauge(_metricKeyName,
ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
+ .setValueOfTableGauge(_clientId,
ServerGauge.LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(catchUpTimeMillis));
}
@@ -752,7 +751,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
_state = State.ERROR;
_realtimeTableDataManager
.addSegmentError(_segmentNameStr, new SegmentErrorInfo(now(),
errorMessage, e));
- _serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.LLC_PARTITION_CONSUMING, 0);
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 0);
return;
}
@@ -760,7 +759,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
if (initialConsumptionEnd != 0L) {
_serverMetrics
- .setValueOfTableGauge(_metricKeyName,
ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
+ .setValueOfTableGauge(_clientId,
ServerGauge.LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(now() -
initialConsumptionEnd));
}
// There is a race condition where the destroy() method can be called,
which ends up calling stop on the consumer.
@@ -770,7 +769,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// so it is ok not to mark it non-consuming, as the main thread will
clean up this metric in destroy() method
// as the final step.
if (!_shouldStop) {
- _serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.LLC_PARTITION_CONSUMING, 0);
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 0);
}
}
}
@@ -936,9 +935,9 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
}
long segmentSizeBytes = FileUtils.sizeOfDirectory(indexDir);
- _serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS,
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LAST_REALTIME_SEGMENT_CREATION_DURATION_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(buildTimeMillis));
- _serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS,
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LAST_REALTIME_SEGMENT_CREATION_WAIT_TIME_SECONDS,
TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis));
if (forCommit) {
@@ -1082,7 +1081,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
* which no longer resides on this host anymore, thus causing false positive
information to the metric system.
*/
private void cleanupMetrics() {
- _serverMetrics.removeTableGauge(_metricKeyName,
ServerGauge.LLC_PARTITION_CONSUMING);
+ _serverMetrics.removeTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING);
}
protected void hold() {
@@ -1131,7 +1130,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
public void goOnlineFromConsuming(SegmentZKMetadata segmentZKMetadata)
throws InterruptedException {
- _serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.LLC_PARTITION_CONSUMING, 0);
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 0);
try {
// Remove the segment file before we do anything else.
removeSegmentFile();
@@ -1202,7 +1201,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
} catch (Exception e) {
Utils.rethrowException(e);
} finally {
- _serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.LLC_PARTITION_CONSUMING, 0);
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 0);
}
}
@@ -1228,7 +1227,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
_segmentLogger.warn("Exception when catching up to final offset", e);
return false;
} finally {
- _serverMetrics.setValueOfTableGauge(_metricKeyName,
ServerGauge.LLC_PARTITION_CONSUMING, 0);
+ _serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 0);
}
if (_currentOffset.compareTo(endOffset) != 0) {
// Timeout?
@@ -1318,7 +1317,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
_segmentZKMetadata.getStatus().toString());
_partitionGroupConsumerSemaphore = partitionGroupConsumerSemaphore;
_acquiredConsumerSemaphore = new AtomicBoolean(false);
- _metricKeyName = _tableNameWithType + "-" + streamTopic + "-" +
_partitionGroupId;
+ _clientId = _tableNameWithType + "-" + streamTopic + "-" +
_partitionGroupId;
_segmentLogger =
LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" +
_segmentNameStr);
_tableStreamName = _tableNameWithType + "_" + streamTopic;
_memoryManager =
getMemoryManager(realtimeTableDataManager.getConsumerDir(), _segmentNameStr,
@@ -1326,7 +1325,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
serverMetrics);
_rateLimiter = RealtimeConsumptionRateManager.getInstance()
- .createRateLimiter(_partitionLevelStreamConfig, _tableNameWithType,
_serverMetrics, _metricKeyName);
+ .createRateLimiter(_partitionLevelStreamConfig, _tableNameWithType,
_serverMetrics, _clientId);
List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
String sortedColumn;
@@ -1402,8 +1401,6 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
Set<String> fieldsToRead =
IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(),
_schema);
StreamMessageDecoder streamMessageDecoder =
StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
_streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
- _clientId = streamTopic + "-" + _partitionGroupId;
-
_transformPipeline = new TransformPipeline(tableConfig, schema);
// Acquire semaphore to create stream consumers
try {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 6ffdbe03cc..69ad7c9ac1 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -63,7 +63,8 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
@Override
public Boolean call()
throws Exception {
- String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() +
"-" + _topicName;
+ String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-"
+ + _streamConfig.getTableNameWithType() + "-" + _topicName;
try (
StreamMetadataProvider streamMetadataProvider =
_streamConsumerFactory.createStreamMetadataProvider(clientId)) {
_newPartitionGroupMetadataList =
streamMetadataProvider.computePartitionGroupMetadata(clientId, _streamConfig,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]