This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fd75576 Fix the client id to be topic and partition (#4955)
fd75576 is described below
commit fd75576104df90c7d1ef3b2ec584bea7dab64118
Author: Subbu Subramaniam <[email protected]>
AuthorDate: Thu Jan 2 15:01:46 2020 -0800
Fix the client id to be topic and partition (#4955)
* Fix the client id to be topic and partition
This is a possible backward-incompatible change with metrics and monitoring
systems.
Earlier client id was set to partition ID and host name, causing collision
if there are different topics being consumed on any host running pinot
realtime
consumer.
If a pinot installation uses the metrics generated by Kafka, then the
alerting
and monitoring system should move to use the new client ID in metric names,
if any.
Metrics generated by Pinot do not use client ID, so no change is required
for
for any pinot metrics being monitored.
* Remove extra space
---
.../manager/realtime/LLRealtimeSegmentDataManager.java | 15 +++++++--------
1 file changed, 7 insertions(+), 8 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index a569d45..4615b9d 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -39,9 +39,7 @@ import org.apache.pinot.common.config.CompletionConfig;
import org.apache.pinot.common.config.IndexingConfig;
import org.apache.pinot.common.config.SegmentPartitionConfig;
import org.apache.pinot.common.config.TableConfig;
-import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.common.data.StarTreeIndexSpec;
-import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.common.metadata.instance.InstanceZKMetadata;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
@@ -51,9 +49,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import
org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.CompletionMode;
import org.apache.pinot.common.utils.LLCSegmentName;
-import org.apache.pinot.common.utils.NetUtil;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
-import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
import org.apache.pinot.core.data.recordtransformer.RecordTransformer;
import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
@@ -62,19 +58,22 @@ import
org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.core.io.readerwriter.PinotDataBufferMemoryManager;
import org.apache.pinot.core.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.core.segment.creator.impl.V1Constants;
+import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.PermanentConsumerException;
+import org.apache.pinot.spi.stream.RowMetadata;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamDecoderProvider;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.TransientConsumerException;
-import org.apache.pinot.core.segment.creator.impl.V1Constants;
-import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
-import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
@@ -1103,7 +1102,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// Create message decoder
_messageDecoder =
StreamDecoderProvider.create(_partitionLevelStreamConfig, _schema);
- _clientId = _streamPartitionId + "-" + NetUtil.getHostnameOrAddress();
+ _clientId = _streamTopic + "-" + _streamPartitionId;
// Create record transformer
_recordTransformer = CompositeTransformer.getDefaultTransformer(schema);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]