[ https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348345#comment-16348345 ]
Gunnar Morling edited comment on KAFKA-6252 at 2/1/18 10:27 AM:
----------------------------------------------------------------
I also see this issue and can confirm it's not caused by a connector still
stuck in its polling loop.
That being said, I think interrupting {{poll()}} some time after a task fails
to stop seems reasonable (i.e. option #1 from [~rhauch]'s comment above). In
fact, I was surprised that this doesn't happen already, given that {{poll()}}
is declared to throw {{InterruptedException}}.
Apart from the metrics problem, it may help prevent threads of incorrectly
implemented connectors from hanging around after the connector is stopped. It
won't work in all cases, but it should help in many ({{poll()}} will usually
invoke and block on a method that handles interruption correctly).
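The stop-then-interrupt behavior proposed above (option #1) can be sketched with plain java.util.concurrent primitives. This is a minimal illustration, not Kafka Connect's actual WorkerTask code: InterruptingTaskRunner, PollingTask, and the graceMs parameter are hypothetical names. As noted above, it only helps when {{poll()}} blocks on something that honors interruption; a {{poll()}} that swallows {{InterruptedException}} or blocks in non-interruptible I/O would still hang.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "interrupt poll() if the task does not stop in time".
// InterruptingTaskRunner and PollingTask are illustrative names, not Kafka APIs.
public class InterruptingTaskRunner {

    // Stand-in for a task's poll(), which may throw InterruptedException.
    public interface PollingTask {
        void poll() throws InterruptedException;
    }

    private final PollingTask task;
    private final Thread thread;
    private final CountDownLatch stopped = new CountDownLatch(1);
    private volatile boolean stopping = false;

    public InterruptingTaskRunner(PollingTask task) {
        this.task = task;
        this.thread = new Thread(this::runLoop, "task-thread");
    }

    private void runLoop() {
        try {
            while (!stopping) {
                task.poll();
            }
        } catch (InterruptedException e) {
            // poll() honored the interrupt; restore the flag and let the thread exit.
            Thread.currentThread().interrupt();
        } finally {
            stopped.countDown();
        }
    }

    public void start() {
        thread.start();
    }

    // Requests a stop; returns true if the task exited within graceMs on its own,
    // false if the thread had to be interrupted (option #1).
    public boolean stop(long graceMs) {
        stopping = true;
        if (awaitStopped(graceMs)) {
            return true;
        }
        thread.interrupt();
        awaitStopped(graceMs);
        return false;
    }

    public boolean isStopped() {
        return stopped.getCount() == 0;
    }

    private boolean awaitStopped(long ms) {
        try {
            return stopped.await(ms, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // A poll() that blocks far longer than the grace period but does respond
        // to interruption, because Thread.sleep throws InterruptedException.
        InterruptingTaskRunner runner = new InterruptingTaskRunner(() -> Thread.sleep(60_000));
        runner.start();
        Thread.sleep(100); // let the task thread block inside poll()
        boolean clean = runner.stop(200);
        System.out.println("stopped on its own: " + clean + ", stopped after interrupt: " + runner.isStopped());
    }
}
```

With a stuck {{poll()}}, stop(200) gives up waiting, interrupts the thread and returns false; a task whose {{poll()}} returns promptly notices the stop flag within the grace period and stop() returns true without any interrupt.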
> A metric named 'XX' already exists, can't register another one.
> ---------------------------------------------------------------
>
> Key: KAFKA-6252
> URL: https://issues.apache.org/jira/browse/KAFKA-6252
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 1.0.0
> Environment: Linux
> Reporter: Alexis Sellier
> Assignee: Arjun Satish
> Priority: Critical
> Fix For: 1.1.0, 1.0.1
>
>
> When a connector crashes (or is implemented incorrectly and never returns
> from or gets interrupted in {{poll()}}), it cannot be restarted, and an
> exception like the following is thrown:
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName [name=offset-commit-max-time-ms, group=connector-task-metrics, description=The maximum time in milliseconds taken by this task to commit offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already exists, can't register another one.
>     at org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
>     at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
>     at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
>     at org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.<init>(WorkerTask.java:328)
>     at org.apache.kafka.connect.runtime.WorkerTask.<init>(WorkerTask.java:69)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.<init>(WorkerSinkTask.java:98)
>     at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
>     at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because {{taskMetricsGroup.close()}} is not called in all
> cases.
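The reporter's hypothesis can be illustrated with a small stdlib-only model. MetricRegistry, TaskMetricsGroup and runTask are hypothetical stand-ins, not Kafka's org.apache.kafka.common.metrics.Metrics or WorkerTask classes. The sketch only shows that a metric registered on construction must be released on every exit path (here via try-with-resources), otherwise a restarted task collides with the stale registration; it is not necessarily how the fix shipped in 1.0.1/1.1.0.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: MetricRegistry and TaskMetricsGroup are simplified
// stand-ins for Kafka's Metrics and WorkerTask.TaskMetricsGroup, not their real APIs.
public class MetricsCleanupSketch {

    // Rejects a second registration under the same name, like the error in the report.
    public static class MetricRegistry {
        private final Set<String> names = new HashSet<>();

        public void register(String name) {
            if (!names.add(name)) {
                throw new IllegalArgumentException(
                    "A metric named '" + name + "' already exists, can't register another one.");
            }
        }

        public void remove(String name) {
            names.remove(name);
        }

        public int size() {
            return names.size();
        }
    }

    // Registers its metric on construction and removes it on close().
    public static class TaskMetricsGroup implements AutoCloseable {
        private final MetricRegistry registry;
        private final String name;

        public TaskMetricsGroup(MetricRegistry registry, String connector, int task) {
            this.registry = registry;
            this.name = "offset-commit-max-time-ms{connector=" + connector + ", task=" + task + "}";
            registry.register(name);
        }

        @Override
        public void close() {
            registry.remove(name);
        }
    }

    // try-with-resources closes the group on every exit path, including exceptions.
    public static void runTask(MetricRegistry registry, String connector, int task, Runnable body) {
        try (TaskMetricsGroup group = new TaskMetricsGroup(registry, connector, task)) {
            body.run();
        }
    }

    public static void main(String[] args) {
        String connector = "hdfs-sink-connector-recover";

        // Buggy path: the group is never closed, so re-creating it fails.
        MetricRegistry leaky = new MetricRegistry();
        new TaskMetricsGroup(leaky, connector, 0);
        try {
            new TaskMetricsGroup(leaky, connector, 0);
        } catch (IllegalArgumentException e) {
            System.out.println("restart without cleanup: " + e.getMessage());
        }

        // Guarded path: a crashing task still releases its metrics, so a restart works.
        MetricRegistry registry = new MetricRegistry();
        try {
            runTask(registry, connector, 0, () -> { throw new RuntimeException("connector crashed"); });
        } catch (RuntimeException e) {
            System.out.println("first run failed: " + e.getMessage());
        }
        runTask(registry, connector, 0, () -> System.out.println("restarted task ran"));
        System.out.println("metrics still registered: " + registry.size());
    }
}
```

The design choice here is to tie the metric's lifetime to a scope rather than to a particular shutdown path, so a crash, a failed stop, or a normal stop all release the registration before the next start.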