[ https://issues.apache.org/jira/browse/KAFKA-6661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16401219#comment-16401219 ]
ASF GitHub Bot commented on KAFKA-6661: --------------------------------------- ewencp closed pull request #4716: KAFKA-6661: Ensure sink connectors don’t resume consumer when task is paused URL: https://github.com/apache/kafka/pull/4716 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index 2995a4e813a..2ba785c4668 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -130,7 +130,7 @@ public void initialize(TaskConfig taskConfig) { try { this.taskConfig = taskConfig.originalsStrings(); this.consumer = createConsumer(); - this.context = new WorkerSinkTaskContext(consumer); + this.context = new WorkerSinkTaskContext(consumer, this); } catch (Throwable t) { log.error("{} Task failed initialization and will not be started.", this, t); onFailure(t); @@ -601,7 +601,7 @@ SinkTaskMetricsGroup sinkTaskMetricsGroup() { private class HandleRebalance implements ConsumerRebalanceListener { @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { - log.debug("{} Partitions assigned", WorkerSinkTask.this); + log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions); lastCommittedOffsets = new HashMap<>(); currentOffsets = new HashMap<>(); for (TopicPartition tp : partitions) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java index 08789497645..386f992e82a 100644 --- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java @@ -20,6 +20,8 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.errors.IllegalWorkerStateException; import org.apache.kafka.connect.sink.SinkTaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.Collections; @@ -29,26 +31,32 @@ import java.util.Set; public class WorkerSinkTaskContext implements SinkTaskContext { + + private final Logger log = LoggerFactory.getLogger(getClass()); private Map<TopicPartition, Long> offsets; private long timeoutMs; private KafkaConsumer<byte[], byte[]> consumer; + private final WorkerSinkTask sinkTask; private final Set<TopicPartition> pausedPartitions; private boolean commitRequested; - public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer) { + public WorkerSinkTaskContext(KafkaConsumer<byte[], byte[]> consumer, WorkerSinkTask sinkTask) { this.offsets = new HashMap<>(); this.timeoutMs = -1L; this.consumer = consumer; + this.sinkTask = sinkTask; this.pausedPartitions = new HashSet<>(); } @Override public void offset(Map<TopicPartition, Long> offsets) { + log.debug("{} Setting offsets for topic partitions {}", this, offsets); this.offsets.putAll(offsets); } @Override public void offset(TopicPartition tp, long offset) { + log.debug("{} Setting offset for topic partition {} to {}", this, tp, offset); offsets.put(tp, offset); } @@ -66,6 +74,7 @@ public void clearOffsets() { @Override public void timeout(long timeoutMs) { + log.debug("{} Setting timeout to {} ms", this, timeoutMs); this.timeoutMs = timeoutMs; } @@ -92,7 +101,12 @@ public void pause(TopicPartition... 
partitions) { } try { Collections.addAll(pausedPartitions, partitions); - consumer.pause(Arrays.asList(partitions)); + if (sinkTask.shouldPause()) { + log.debug("{} Connector is paused, so not pausing consumer's partitions {}", this, partitions); + } else { + consumer.pause(Arrays.asList(partitions)); + log.debug("{} Pausing partitions {}. Connector is not paused.", this, partitions); + } } catch (IllegalStateException e) { throw new IllegalWorkerStateException("SinkTasks may not pause partitions that are not currently assigned to them.", e); } @@ -105,7 +119,12 @@ public void resume(TopicPartition... partitions) { } try { pausedPartitions.removeAll(Arrays.asList(partitions)); - consumer.resume(Arrays.asList(partitions)); + if (sinkTask.shouldPause()) { + log.debug("{} Connector is paused, so not resuming consumer's partitions {}", this, partitions); + } else { + consumer.resume(Arrays.asList(partitions)); + log.debug("{} Resuming partitions: {}", this, partitions); + } } catch (IllegalStateException e) { throw new IllegalWorkerStateException("SinkTasks may not resume partitions that are not currently assigned to them.", e); } @@ -117,6 +136,7 @@ public void resume(TopicPartition... partitions) { @Override public void requestCommit() { + log.debug("{} Requesting commit", this); commitRequested = true; } @@ -128,4 +148,10 @@ public void clearCommitRequest() { commitRequested = false; } + @Override + public String toString() { + return "WorkerSinkTaskContext{" + + "id=" + sinkTask.id + + '}'; + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Sink connectors that explicitly 'resume' topic partitions can resume a paused > task > ---------------------------------------------------------------------------------- > > Key: KAFKA-6661 > URL: https://issues.apache.org/jira/browse/KAFKA-6661 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 0.9.0.0, 0.10.0.0, 0.11.0.0, 1.0.0 > Reporter: Randall Hauch > Assignee: Randall Hauch > Priority: Critical > > Sink connectors are allowed to use the {{SinkTaskContext}}'s methods to > explicitly pause and resume topic partitions. This is useful when connectors > need additional time processing the records for specific topic partitions > (e.g., the external system has an outage). > However, when the sink connector has been paused via the REST API, the worker > for the sink tasks pauses the consumer. When the connector is polled, the poll > request might time out and return no records. Connect then calls the task's > {{put(...)}} method (with no records), and this allows the task to optionally > call any of the {{SinkTaskContext}}'s pause or resume methods. If it calls > resume, this will unexpectedly resume the paused consumer, causing the > consumer to return messages and the connector to process those messages -- > despite the connector still being paused. > This is reported against 1.0, but the affected code has not been changed > since at least 0.9.0.0. > A workaround is to remove rather than pause a connector. It's inconvenient, > but it works. -- This message was sent by Atlassian JIRA (v7.6.3#76005)