This is an automated email from the ASF dual-hosted git repository.
rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new c145ac853b Remove the wrong warning log in KafkaPartitionLevelConsumer (#9536)
c145ac853b is described below
commit c145ac853b10d1a5b3d7c9038bac4d7d4d0bf3a5
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Oct 5 10:11:54 2022 -0700
Remove the wrong warning log in KafkaPartitionLevelConsumer (#9536)
---
.../pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java | 5 +----
1 file changed, 1 insertion(+), 4 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
index b6b116164f..59ee1c0eab 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java
@@ -58,8 +58,6 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
       LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset,
           endOffset, timeoutMillis);
     }
-    LOGGER.warn("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset,
-        endOffset, timeoutMillis);
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
     List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
@@ -73,8 +71,7 @@ public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHa
       if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
         if (message != null) {
           StreamMessageMetadata rowMetadata = (StreamMessageMetadata) _kafkaMetadataExtractor.extract(messageAndOffset);
-          filtered.add(
-              new KafkaStreamMessage(keyBytes, message.get(), rowMetadata));
+          filtered.add(new KafkaStreamMessage(keyBytes, message.get(), rowMetadata));
         } else if (LOGGER.isDebugEnabled()) {
           LOGGER.debug("tombstone message at offset {}", offset);
         }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]