[ 
https://issues.apache.org/jira/browse/FLINK-24497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17429034#comment-17429034
 ] 

Preston Price edited comment on FLINK-24497 at 10/14/21, 10:55 PM:
-------------------------------------------------------------------

It looks like this new metric "records-lag" was introduced recently here: 
[https://github.com/apache/flink/pull/16838]

But I have not fully grokked that change, so I don't yet understand its purpose 
or exactly why this error is surfacing.

Some quick debugging on my side shows that the
{code:java}
Map<MetricName, ? extends Metric> metrics{code}
map contains metrics with the expected group _CONSUMER_FETCH_MANAGER_GROUP_, 
but no metrics with the expected name _records-lag_. This causes the call to
{code:java}
MetricUtil.getKafkaMetric(metrics, filter){code}
to throw the exception because it expects to get at least one match via the
{code:java}
.findFirst()
.orElseThrow(){code}
statement.
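To make the failure mode concrete, here is a minimal, self-contained sketch of the lookup pattern described above. It is not the actual MetricUtil code. MetricKey and getMetric are hypothetical stand-ins. The filter is simplified to group and name only, although the real "records-lag" metric is also tagged per topic/partition. The group string "consumer-fetch-manager-metrics" is assumed to be the value behind _CONSUMER_FETCH_MANAGER_GROUP_.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public class MetricLookupSketch {

    // Hypothetical stand-in for Kafka's MetricName: only the fields this filter uses.
    static final class MetricKey {
        final String group;
        final String name;

        MetricKey(String group, String name) {
            this.group = group;
            this.name = name;
        }
    }

    // Same shape as the lookup described above: stream over the metrics map,
    // take the first entry matching the filter, and throw if nothing matches.
    static <V> V getMetric(Map<MetricKey, V> metrics, Predicate<MetricKey> filter) {
        return metrics.entrySet().stream()
                .filter(e -> filter.test(e.getKey()))
                .map(Map.Entry::getValue)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "Cannot find Kafka metric matching current filter."));
    }

    public static void main(String[] args) {
        Map<MetricKey, Double> metrics = new HashMap<>();
        // The fetch-manager group is present, but "records-lag" is not registered yet.
        metrics.put(new MetricKey("consumer-fetch-manager-metrics", "records-consumed-total"), 0.0);

        Predicate<MetricKey> recordsLag = k ->
                k.group.equals("consumer-fetch-manager-metrics") && k.name.equals("records-lag");
        try {
            getMetric(metrics, recordsLag);
        } catch (IllegalStateException e) {
            // Prints: lookup failed: Cannot find Kafka metric matching current filter.
            System.out.println("lookup failed: " + e.getMessage());
        }
    }
}
```

With the group present but the name missing, every call ends in the same IllegalStateException, which matches the warning repeating on each fetch.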

This leads me to believe this is a cold-start problem: the case where the metric 
is absent, or has not been calculated yet, is not handled gracefully.
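One way a cold start could be tolerated is to look the metric up without throwing and register the per-partition gauge only once the metric exists. This is sketched under assumptions and is not taken from Flink's code or from any actual fix. The stack trace shows registration going through ConcurrentHashMap.computeIfAbsent. That method records no mapping when its function returns null, so a missing metric would simply be retried on the next fetch. LazyLagGaugeSketch, maybeRegister, and lookupLag are hypothetical names. Plain Strings and Suppliers stand in for Kafka metrics and Flink gauges.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class LazyLagGaugeSketch {

    // Partition -> supplier of the current lag; hypothetical stand-in for a Flink gauge.
    private final Map<String, Supplier<Double>> registered = new ConcurrentHashMap<>();

    // Called on every fetch. Returning null from the mapping function means
    // computeIfAbsent records nothing, so the lookup is retried on the next call.
    public void maybeRegister(String partition, Map<String, Double> kafkaLagByPartition) {
        registered.computeIfAbsent(partition, p -> {
            if (!lookupLag(kafkaLagByPartition, p).isPresent()) {
                return null; // metric not available yet: no exception, no mapping
            }
            return () -> kafkaLagByPartition.get(p);
        });
    }

    // Returns empty instead of throwing when the metric is not (yet) present.
    static Optional<Double> lookupLag(Map<String, Double> kafkaLagByPartition, String partition) {
        return Optional.ofNullable(kafkaLagByPartition.get(partition));
    }

    public boolean isRegistered(String partition) {
        return registered.containsKey(partition);
    }

    public static void main(String[] args) {
        Map<String, Double> kafkaMetrics = new ConcurrentHashMap<>();
        LazyLagGaugeSketch gauges = new LazyLagGaugeSketch();

        gauges.maybeRegister("cpu.kafka-1", kafkaMetrics);       // cold start: nothing yet
        System.out.println(gauges.isRegistered("cpu.kafka-1"));  // false

        kafkaMetrics.put("cpu.kafka-1", 42.0);                    // lag has now been computed
        gauges.maybeRegister("cpu.kafka-1", kafkaMetrics);
        System.out.println(gauges.isRegistered("cpu.kafka-1"));  // true
    }
}
```

The first fetch finds nothing and registers nothing. The second fetch registers the gauge once the value appears, so the cold-start window produces no exceptions at all.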

I am hoping there is a graceful way to mute these exceptions, as they are 
prolific and are clogging up my output.



> Kafka metrics fetching throws IllegalStateException 
> ----------------------------------------------------
>
>                 Key: FLINK-24497
>                 URL: https://issues.apache.org/jira/browse/FLINK-24497
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka
>            Reporter: Juha
>            Priority: Minor
>             Fix For: 1.14.0
>
>
> I have a simple job that just consumes from a single Kafka topic, performs 
> some filtering and produces to another topic.
> The TaskManager log shows these periodically. This is a new problem in 1.14.0; 
> the same setup didn't have the issue when using 1.13.0 or 1.13.2.
>  {code}
> 2021-10-05T15:22:31.928316  [2021-10-05 15:22:31,927] WARN Error when getting Kafka consumer metric "records-lag" for partition "cpu.kafka-1". Metric "pendingBytes" may not be reported correctly.  (org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics:306)
> 2021-10-05T15:22:31.928316  java.lang.IllegalStateException: Cannot find Kafka metric matching current filter.
> 2021-10-05T15:22:31.928316    at org.apache.flink.connector.kafka.MetricUtil.lambda$getKafkaMetric$1(MetricUtil.java:63) ~[flink-connector-kafka_2.12-1.14.0.jar:1.14.0]
> 2021-10-05T15:22:31.928316    at java.util.Optional.orElseThrow(Optional.java:408) ~[?:?]
> 2021-10-05T15:22:31.928316    at org.apache.flink.connector.kafka.MetricUtil.getKafkaMetric(MetricUtil.java:61) ~[flink-connector-kafka_2.12-1.14.0.jar:1.14.0]
> 2021-10-05T15:22:31.928316    at org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.getRecordsLagMetric(KafkaSourceReaderMetrics.java:304) ~[flink-connector-kafka_2.12-1.14.0.jar:1.14.0]
> 2021-10-05T15:22:31.928316    at org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.lambda$maybeAddRecordsLagMetric$4(KafkaSourceReaderMetrics.java:229) ~[flink-connector-kafka_2.12-1.14.0.jar:1.14.0]
> 2021-10-05T15:22:31.928316    at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705) [?:?]
> 2021-10-05T15:22:31.928316    at org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.maybeAddRecordsLagMetric(KafkaSourceReaderMetrics.java:228) [flink-connector-kafka_2.12-1.14.0.jar:1.14.0]
> 2021-10-05T15:22:31.928316    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:187) [flink-connector-kafka_2.12-1.14.0.jar:1.14.0]
> 2021-10-05T15:22:31.928316    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) [flink-table_2.12-1.14.0.jar:1.14.0]
> 2021-10-05T15:22:31.928316    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) [flink-table_2.12-1.14.0.jar:1.14.0]
> 2021-10-05T15:22:31.928316    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) [flink-table_2.12-1.14.0.jar:1.14.0]
> 2021-10-05T15:22:31.928316    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
> 2021-10-05T15:22:31.928316    at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
> 2021-10-05T15:22:31.928316    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
> 2021-10-05T15:22:31.928316    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
> 2021-10-05T15:22:31.928316    at java.lang.Thread.run(Thread.java:829) [?:?]
> {code}
> Regards,
> Juha



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
