Ning Zhang created KAFKA-10370: ---------------------------------- Summary: WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext Key: KAFKA-10370 URL: https://issues.apache.org/jira/browse/KAFKA-10370 Project: Kafka Issue Type: New Feature Components: KafkaConnect Affects Versions: 2.5.0 Reporter: Ning Zhang Assignee: Ning Zhang Fix For: 2.6.0
In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from external world (e.g. implementation of SinkTask). In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, (2) consumer.seek(tp, offset) to rewind the consumer. when running (2), we saw the following IllegalStateException: ``` java.lang.IllegalStateException: No current assignment for partition mytopic-1 at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251) at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135) ``` As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution is to use *consumer.assign* with *consumer.seek* , instead of *consumer.subscribe* -- This message was sent by Atlassian Jira (v8.3.4#803005)