[ 
https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309815#comment-16309815
 ] 

Randall Hauch edited comment on KAFKA-6252 at 1/3/18 4:05 PM:
--------------------------------------------------------------

As mentioned above, connectors run into this because the Task doesn't 
properly implement {{stop()}}. For example, a source task has a {{poll()}} method 
that is expected to block while there are no new records to return. However, 
that blocking must be interrupted when {{stop()}} is called. If the source 
connector uses a thread, then {{stop()}} should interrupt that thread and set 
the state such that {{poll()}} will return an empty list. 
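
For illustration, here's a minimal sketch of that thread-interrupt pattern. The 
class name and {{fetchFromSource()}} method are hypothetical stand-ins, not part 
of the Connect API: {{stop()}} clears a flag and interrupts the thread blocked 
inside {{poll()}}, which then returns an empty list.

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class ThreadedSourceTask extends SourceTask {
    private volatile boolean running = false;
    private volatile Thread pollingThread;

    @Override
    public String version() {
        return "0.0.1";
    }

    @Override
    public void start(Map<String, String> props) {
        running = true;
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        pollingThread = Thread.currentThread();
        try {
            while (running) {
                // Hypothetical blocking read from the external system ...
                List<SourceRecord> records = fetchFromSource();
                if (!records.isEmpty()) {
                    return records;
                }
            }
        } catch (InterruptedException e) {
            // stop() interrupted us; fall through and return an empty list
        } finally {
            pollingThread = null;
        }
        // Task has been stopped
        return Collections.emptyList();
    }

    @Override
    public void stop() {
        running = false;
        Thread thread = pollingThread;
        if (thread != null) {
            // Wake up poll() if it is blocked inside fetchFromSource()
            thread.interrupt();
        }
    }

    // Hypothetical stand-in for a blocking call to the source system
    private List<SourceRecord> fetchFromSource() throws InterruptedException {
        Thread.sleep(500);
        return Collections.emptyList();
    }
}
{code}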

Other source connector implementations may use a {{BlockingQueue}} and call the 
queue's {{take()}} method to grab items out of a queue. In this case, 
{{take()}} blocks indefinitely until there is an item in the queue, but if the 
task's {{stop()}} method is called while {{poll()}} is blocked on the queue's 
{{take()}} method, the task will likely never add another item to the queue and 
{{take()}} -- and thus {{poll()}} -- will never return.
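
In code, the hazardous pattern looks roughly like this (a hypothetical fragment):

{code:java}
// Anti-pattern: if stop() is called while poll() is blocked here, nothing
// will ever add another item to the queue, so take(), and therefore poll(),
// will never return.
public List<SourceRecord> poll() throws InterruptedException {
    List<SourceRecord> batch = new LinkedList<>();
    batch.add(queue.take()); // blocks indefinitely while the queue is empty
    queue.drainTo(batch);
    return batch;
}
{code}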

The proper way to use a blocking queue is to call the {{BlockingQueue.poll(timeout, 
unit)}} method, which blocks for at most the given amount of time, and to then 
handle the case where no item is retrieved from the queue; the task's {{poll()}} 
can simply return an empty list.


{code:java}
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class MySourceTask extends SourceTask {
    private final BlockingQueue<SourceRecord> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(false);
    ...
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        List<SourceRecord> batch = new LinkedList<>();
        while (running.get()) {
            // Poll for new records, but only for a max amount of time!
            SourceRecord record = queue.poll(1L, TimeUnit.SECONDS);
            if (record == null) {
                // The queue was empty, so continue looping ...
                continue;
            }
            // We found at least one record, so add it and drain any others ...
            batch.add(record);
            queue.drainTo(batch);
            return batch;
        }
        // Task has been stopped, so return the (empty) batch
        return batch;
    }

    @Override
    public void stop() {
        running.set(false);
    }
}
{code}

Of course, there are lots of variations of this basic idea. For example, if 
needed the task could limit the size of the batch by using 
{{queue.drainTo(batch, maxBatchSize - batch.size())}}, obviously ensuring 
that the second argument is non-negative. 
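
Concretely, the loop body in the example above could be bounded like this 
(assuming a hypothetical {{maxBatchSize}} field populated from the task 
configuration):

{code:java}
// Inside poll(), once the first record has been taken from the queue:
batch.add(record);
int remaining = maxBatchSize - batch.size();
if (remaining > 0) {
    // Drain at most 'remaining' more records so the batch never
    // exceeds maxBatchSize.
    queue.drainTo(batch, remaining);
}
return batch;
{code}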




> A metric named 'XX' already exists, can't register another one.
> ---------------------------------------------------------------
>
>                 Key: KAFKA-6252
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6252
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0
>         Environment: Linux
>            Reporter: Alexis Sellier
>            Priority: Critical
>
> When a connector crashes (or is not implemented correctly by not 
> stopping/interrupting {{poll()}}), it cannot be restarted, and an exception 
> like this is thrown: 
> {code:java}
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=offset-commit-max-time-ms, group=connector-task-metrics, 
> description=The maximum time in milliseconds taken by this task to commit 
> offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already 
> exists, can't register another one.
>       at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532)
>       at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256)
>       at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241)
>       at 
> org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.<init>(WorkerTask.java:328)
>       at 
> org.apache.kafka.connect.runtime.WorkerTask.<init>(WorkerTask.java:69)
>       at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.<init>(WorkerSinkTask.java:98)
>       at 
> org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449)
>       at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404)
>       at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852)
>       at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108)
>       at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866)
>       at 
> org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {code}
> I guess it's because the function {{taskMetricsGroup.close}} is not called 
> in all cases.


