[ https://issues.apache.org/jira/browse/KAFKA-3092?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ewen Cheslack-Postava resolved KAFKA-3092.
------------------------------------------
Resolution: Fixed
Fix Version/s: 0.9.1.0

Issue resolved by pull request 815
[https://github.com/apache/kafka/pull/815]

> Rename SinkTask.onPartitionsAssigned/onPartitionsRevoked and Clarify Contract
> -----------------------------------------------------------------------------
>
> Key: KAFKA-3092
> URL: https://issues.apache.org/jira/browse/KAFKA-3092
> Project: Kafka
> Issue Type: Improvement
> Components: copycat
> Reporter: Jason Gustafson
> Assignee: Jason Gustafson
> Fix For: 0.9.1.0
>
>
> The purpose of the onPartitionsRevoked() and onPartitionsAssigned() methods
> exposed in Kafka Connect's SinkTask interface seems a little unclear and too
> closely tied to consumer semantics. From the javadoc, these APIs are used to
> open/close per-partition resources, but that would suggest that we should
> always get one call to onPartitionsAssigned() before writing any records for
> the corresponding partitions and one call to onPartitionsRevoked() when we
> have finished with them. However, the same methods on the consumer are used
> to indicate phases of the rebalance operation: onPartitionsRevoked() is
> called before the rebalance begins and onPartitionsAssigned() is called after
> it completes. In particular, the consumer does not guarantee a final call to
> onPartitionsRevoked().
>
> This mismatch makes the contract of these methods unclear. In fact, the
> WorkerSinkTask currently does not guarantee the initial call to
> onPartitionsAssigned(), nor the final call to onPartitionsRevoked(). Instead,
> the task implementation must pull the initial assignment from the
> SinkTaskContext. To make it more confusing, the call to commit offsets
> following onPartitionsRevoked() causes a flush() on a partition which had
> already been revoked. All of this makes it difficult to use this API as
> suggested in the javadocs.
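The open/close contract the javadoc implies, and the three ways the issue says WorkerSinkTask currently breaks it, can be made concrete with a small trace checker. This is a minimal, self-contained Java sketch, not Kafka Connect code: `ContractChecker`, `Event`, `Kind`, `ContractDemo`, the `FLUSH` event, and the local `TopicPartition` record are all hypothetical stand-ins, and the trace is an illustrative sequence modeled on the behavior described in the issue rather than a captured log. It assumes Java 17 (records and arrow-style switch).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for Kafka's TopicPartition so the sketch has no dependencies.
record TopicPartition(String topic, int partition) {
    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}

// Calls a sink task can observe for a partition (hypothetical model).
enum Kind { ASSIGNED, WRITE, FLUSH, REVOKED }

record Event(Kind kind, TopicPartition tp) {}

// Hypothetical checker for the contract the javadoc suggests: a partition is
// assigned (opened) before it is written or flushed, and every assigned
// partition is eventually revoked (closed).
final class ContractChecker {
    static List<String> check(List<Event> events) {
        Set<TopicPartition> open = new HashSet<>();
        List<String> violations = new ArrayList<>();
        for (Event e : events) {
            TopicPartition tp = e.tp();
            switch (e.kind()) {
                case ASSIGNED -> open.add(tp);
                case WRITE -> {
                    if (!open.contains(tp)) violations.add("write before open: " + tp);
                }
                case FLUSH -> {
                    if (!open.contains(tp)) violations.add("flush of closed partition: " + tp);
                }
                case REVOKED -> {
                    if (!open.remove(tp)) violations.add("close without open: " + tp);
                }
            }
        }
        // Anything still open when the trace ends never received its final close.
        for (TopicPartition tp : open) violations.add("never closed: " + tp);
        return violations;
    }
}

class ContractDemo {
    public static void main(String[] args) {
        TopicPartition p0 = new TopicPartition("orders", 0);
        TopicPartition p1 = new TopicPartition("orders", 1);
        TopicPartition p2 = new TopicPartition("orders", 2);
        List<Event> trace = List.of(
            new Event(Kind.WRITE, p0),    // initial assignment never announced
            new Event(Kind.ASSIGNED, p1),
            new Event(Kind.WRITE, p1),
            new Event(Kind.REVOKED, p1),  // rebalance begins
            new Event(Kind.FLUSH, p1),    // offset commit flushes the revoked partition
            new Event(Kind.ASSIGNED, p2),
            new Event(Kind.WRITE, p2));   // task stops without a final revoke
        ContractChecker.check(trace).forEach(System.out::println);
    }
}
```

Running `ContractDemo.main` prints one violation per gap named in the issue: `write before open: orders-0`, `flush of closed partition: orders-1`, and `never closed: orders-2`. A trace that follows a strict open/close sequence (assign, write, flush, revoke) produces no violations, which is the kind of guarantee a renamed open()/close() API would need the worker to enforce.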
>
> To fix this, we should clarify the behavior of these methods and consider
> renaming them to avoid confusion with the same methods in the consumer API.
> If onPartitionsAssigned() is meant for opening resources, maybe we can rename
> it to open(). Similarly, onPartitionsRevoked() can be renamed to close(). We
> can then fix the code to ensure that a typical open/close contract is
> enforced. This would also mean removing the need to pass the initial
> assignment in the SinkTaskContext. This would give the following API:
> {code}
> void open(Collection<TopicPartition> partitions);
> void close(Collection<TopicPartition> partitions);
> {code}
>
> We could also consider going a little further. Instead of depending on
> onPartitionsAssigned() to open resources, tasks could open partition
> resources on demand as records are received. In general, connectors will need
> some way to close partition-specific resources, but there might not be any
> need to pass the full list of partitions to close since the only open
> resources should be those that have received writes since the last rebalance.
> In this case, we just have a single method:
> {code}
> void close();
> {code}
> The downside to this is that the difference between close() and stop() then
> becomes a little unclear.
>
> Obviously these are not compatible changes and connectors would have to be
> updated.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)