This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new af6f29c  KAFKA-12226: Commit source task offsets without blocking on batch delivery (#11323)
af6f29c is described below

commit af6f29c8a0632ecb5d1089dbf1e55de4dbc5b2b2
Author: Chris Egerton <chr...@confluent.io>
AuthorDate: Sun Nov 7 12:39:04 2021 -0500

    KAFKA-12226: Commit source task offsets without blocking on batch delivery (#11323)
    
    Replaces the current logic for committing source offsets, which is
    batch-based and blocks until the entire current batch has been written to
    and acknowledged by the broker, with a new non-blocking approach that
    commits source offsets for source records that have been "fully written" by
    the producer. The new logic considers a record fully written only if that
    source record and all records before it with the same source partition have
    been written to Kafka and acknowledged.
    
    This new logic uses a deque for every source partition that a source task
    produces records for. Each element in that deque is a SubmittedRecord with a
    flag to track whether the producer has ack'd the delivery of that source
    record to Kafka. Periodically, the worker (on the same thread that polls the
    source task for records and transforms, converts, and dispatches them to the
    producer) polls acknowledged elements from the beginning of each of these
    deques and collects the latest offset [...]
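    
    As a rough illustration of this tracking scheme, the following is a
    simplified, hypothetical sketch with illustrative names (not the actual
    SubmittedRecords code): an offset becomes committable only once every
    earlier record for the same source partition has been acknowledged.
    
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.Map;
    
    // Simplified, hypothetical sketch of per-partition ack tracking.
    class AckTrackerSketch {
        static class Tracked {
            final Map<String, Object> offset;
            volatile boolean acked;           // set from the producer callback thread
            Tracked(Map<String, Object> offset) { this.offset = offset; }
            void ack() { this.acked = true; }
        }
    
        private final Map<Map<String, Object>, Deque<Tracked>> deques = new HashMap<>();
    
        // Called on the task thread before handing the record to the producer.
        Tracked submit(Map<String, Object> partition, Map<String, Object> offset) {
            Tracked tracked = new Tracked(offset);
            deques.computeIfAbsent(partition, p -> new LinkedList<>()).add(tracked);
            return tracked;
        }
    
        // Called periodically on the task thread: drains acknowledged records from the
        // head of each deque and returns, per partition, the newest offset safe to commit.
        Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
            Map<Map<String, Object>, Map<String, Object>> committable = new HashMap<>();
            for (Map.Entry<Map<String, Object>, Deque<Tracked>> entry : deques.entrySet()) {
                Deque<Tracked> deque = entry.getValue();
                Map<String, Object> latest = null;
                while (!deque.isEmpty() && deque.peek().acked) {
                    latest = deque.poll().offset;
                }
                if (latest != null) {
                    committable.put(entry.getKey(), latest);
                }
            }
            deques.values().removeIf(Deque::isEmpty); // avoid keeping empty deques around
            return committable;
        }
    }
    
    (The real class also records metadata, such as how many messages remain
    unacknowledged per partition, as shown in the SubmittedRecords.java file
    added below.)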
    
    The behavior of the `offset.flush.timeout.ms` property is retained, but it
    now applies only to the actual writing of offset data to the internal
    offsets topic (if running in distributed mode) or the offsets file (if
    running in standalone mode). No time is spent during
    `WorkerSourceTask::commitOffsets` waiting on the acknowledgment of records
    by the producer.
    
    This change also does not alter how records are dispatched to the producer
    or how the producer sends and batches those records.
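    
    To tie this to the dispatch path, a hypothetical piece of glue code (again a
    sketch, not the actual WorkerSourceTask implementation; AckTrackerSketch and
    Tracked are the illustrative types from the sketch above) could acknowledge
    a record from its producer callback:
    
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Map;
    
    // Hypothetical glue code: submit before dispatch, ack from the producer callback.
    class DispatchSketch {
        private final AckTrackerSketch tracker = new AckTrackerSketch();
    
        void dispatch(Producer<byte[], byte[]> producer,
                      ProducerRecord<byte[], byte[]> producerRecord,
                      Map<String, Object> sourcePartition,
                      Map<String, Object> sourceOffset) {
            AckTrackerSketch.Tracked tracked = tracker.submit(sourcePartition, sourceOffset);
            producer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    tracked.ack(); // runs on the producer's callback thread
                }
                // on failure the offset simply remains uncommittable; error handling lives elsewhere
            });
        }
    }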
    
    Memory exhaustion may occur if, for example, a single Kafka partition is
    offline for an extended period. In cases like this, the collection of
    deques in the SubmittedRecords class may continue to grow without bound
    until the partition comes back online and the SubmittedRecords in those
    deques that targeted the formerly-offline Kafka partition are acknowledged
    and can be removed. Although this may be suboptimal, it is no worse than the
    existing behavior of the framewo [...]
    
    Author: Chris Egerton <chr...@confluent.io>
    Reviewed: Randall Hauch <rha...@gmail.com>
---
 .../kafka/connect/runtime/SubmittedRecords.java    | 297 +++++++++++++++++++++
 .../kafka/connect/runtime/WorkerSourceTask.java    | 185 +++++--------
 .../kafka/connect/storage/OffsetStorageWriter.java |   5 +-
 .../connect/runtime/ErrorHandlingTaskTest.java     |   4 -
 .../connect/runtime/SubmittedRecordsTest.java      | 289 ++++++++++++++++++++
 .../connect/runtime/WorkerSourceTaskTest.java      | 116 ++++----
 .../connect/storage/OffsetStorageWriterTest.java   |   8 +-
 7 files changed, 712 insertions(+), 192 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
new file mode 100644
index 0000000..472a266
--- /dev/null
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.source.SourceTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * Used to track source records that have been (or are about to be) dispatched 
to a producer and their accompanying
+ * source offsets. Records are tracked in the order in which they are 
submitted, which should match the order they were
+ * returned from {@link SourceTask#poll()}. The latest-eligible offsets for 
each source partition can be retrieved via
+ * {@link #committableOffsets()}, where every record up to and including the 
record for each returned offset has been
+ * either {@link SubmittedRecord#ack() acknowledged} or {@link 
#removeLastOccurrence(SubmittedRecord) removed}.
+ * Note that this class is not thread-safe, though a {@link SubmittedRecord} 
can be
+ * {@link SubmittedRecord#ack() acknowledged} from a different thread.
+ */
+class SubmittedRecords {
+
+    private static final Logger log = 
LoggerFactory.getLogger(SubmittedRecords.class);
+
+    // Visible for testing
+    final Map<Map<String, Object>, Deque<SubmittedRecord>> records;
+
+    public SubmittedRecords() {
+        this.records = new HashMap<>();
+    }
+
+    /**
+     * Enqueue a new source record before dispatching it to a producer.
+     * The returned {@link SubmittedRecord} should either be {@link 
SubmittedRecord#ack() acknowledged} in the
+     * producer callback, or {@link #removeLastOccurrence(SubmittedRecord) 
removed} if the record could not be successfully
+     * sent to the producer.
+     * 
+     * @param record the record about to be dispatched; may not be null but 
may have a null
+     *               {@link SourceRecord#sourcePartition()} and/or {@link 
SourceRecord#sourceOffset()}
+     * @return a {@link SubmittedRecord} that can be either {@link 
SubmittedRecord#ack() acknowledged} once ack'd by
+     *         the producer, or {@link #removeLastOccurrence removed} if 
synchronously rejected by the producer
+     */
+    @SuppressWarnings("unchecked")
+    public SubmittedRecord submit(SourceRecord record) {
+        return submit((Map<String, Object>) record.sourcePartition(), 
(Map<String, Object>) record.sourceOffset());
+    }
+
+    // Convenience method for testing
+    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> 
offset) {
+        SubmittedRecord result = new SubmittedRecord(partition, offset);
+        records.computeIfAbsent(result.partition(), p -> new LinkedList<>())
+                .add(result);
+        return result;
+    }
+
+    /**
+     * Remove a source record and do not take it into account any longer when 
tracking offsets.
+     * Useful if the record has been synchronously rejected by the producer.
+     * If multiple instances of the same {@link SubmittedRecord} have been 
submitted already, only the first one found
+     * (traversing from the end of the deque backward) will be removed.
+     * @param record the {@link #submit previously-submitted} record to stop 
tracking; may not be null
+     * @return whether an instance of the record was removed
+     */
+    public boolean removeLastOccurrence(SubmittedRecord record) {
+        Deque<SubmittedRecord> deque = records.get(record.partition());
+        if (deque == null) {
+            log.warn("Attempted to remove record from submitted queue for 
partition {}, but no records with that partition appear to have been 
submitted", record.partition());
+            return false;
+        }
+        boolean result = deque.removeLastOccurrence(record);
+        if (deque.isEmpty()) {
+            records.remove(record.partition());
+        }
+        if (!result) {
+            log.warn("Attempted to remove record from submitted queue for 
partition {}, but the record has not been submitted or has already been 
removed", record.partition());
+        }
+        return result;
+    }
+
+    /**
+     * Clear out any acknowledged records at the head of the deques and return 
a {@link CommittableOffsets snapshot} of the offsets and offset metadata
+     * accrued between the last time this method was invoked and now. This 
snapshot can be {@link CommittableOffsets#updatedWith(CommittableOffsets) 
combined}
+     * with an existing snapshot if desired.
+     * Note that this may take some time to complete if a large number of 
records has built up, which may occur if a
+     * Kafka partition is offline and all records targeting that partition go 
unacknowledged while records targeting
+     * other partitions continue to be dispatched to the producer and sent 
successfully
+     * @return a fresh offset snapshot; never null
+     */
+    public CommittableOffsets committableOffsets() {
+        Map<Map<String, Object>, Map<String, Object>> offsets = new 
HashMap<>();
+        int totalCommittableMessages = 0;
+        int totalUncommittableMessages = 0;
+        int largestDequeSize = 0;
+        Map<String, Object> largestDequePartition = null;
+        for (Map.Entry<Map<String, Object>, Deque<SubmittedRecord>> entry : 
records.entrySet()) {
+            Map<String, Object> partition = entry.getKey();
+            Deque<SubmittedRecord> queuedRecords = entry.getValue();
+            int initialDequeSize = queuedRecords.size();
+            if (canCommitHead(queuedRecords)) {
+                Map<String, Object> offset = committableOffset(queuedRecords);
+                offsets.put(partition, offset);
+            }
+            int uncommittableMessages = queuedRecords.size();
+            int committableMessages = initialDequeSize - uncommittableMessages;
+            totalCommittableMessages += committableMessages;
+            totalUncommittableMessages += uncommittableMessages;
+            if (uncommittableMessages > largestDequeSize) {
+                largestDequeSize = uncommittableMessages;
+                largestDequePartition = partition;
+            }
+        }
+        // Clear out all empty deques from the map to keep it from growing 
indefinitely
+        records.values().removeIf(Deque::isEmpty);
+        return new CommittableOffsets(offsets, totalCommittableMessages, 
totalUncommittableMessages, records.size(), largestDequeSize, 
largestDequePartition);
+    }
+
+    // Note that this will return null if either there are no committable 
offsets for the given deque, or the latest
+    // committable offset is itself null. The caller is responsible for 
distinguishing between the two cases.
+    private Map<String, Object> committableOffset(Deque<SubmittedRecord> 
queuedRecords) {
+        Map<String, Object> result = null;
+        while (canCommitHead(queuedRecords)) {
+            result = queuedRecords.poll().offset();
+        }
+        return result;
+    }
+
+    private boolean canCommitHead(Deque<SubmittedRecord> queuedRecords) {
+        return queuedRecords.peek() != null && queuedRecords.peek().acked();
+    }
+
+    static class SubmittedRecord {
+        private final Map<String, Object> partition;
+        private final Map<String, Object> offset;
+        private volatile boolean acked;
+
+        public SubmittedRecord(Map<String, Object> partition, Map<String, 
Object> offset) {
+            this.partition = partition;
+            this.offset = offset;
+            this.acked = false;
+        }
+
+        /**
+         * Acknowledge this record; signals that its offset may be safely 
committed.
+         * This is safe to be called from a different thread than what called 
{@link SubmittedRecords#submit(SourceRecord)}.
+         */
+        public void ack() {
+            this.acked = true;
+        }
+
+        private boolean acked() {
+            return acked;
+        }
+
+        private Map<String, Object> partition() {
+            return partition;
+        }
+
+        private Map<String, Object> offset() {
+            return offset;
+        }
+    }
+
+    /**
+     * Contains a snapshot of offsets that can be committed for a source task 
and metadata for that offset commit
+     * (such as the number of messages for which offsets can and cannot be 
committed).
+     */
+    static class CommittableOffsets {
+
+        /**
+         * An "empty" snapshot that contains no offsets to commit and whose 
metadata contains no committable or uncommitable messages.
+         */
+        public static final CommittableOffsets EMPTY = new 
CommittableOffsets(Collections.emptyMap(), 0, 0, 0, 0, null);
+
+        private final Map<Map<String, Object>, Map<String, Object>> offsets;
+        private final int numCommittableMessages;
+        private final int numUncommittableMessages;
+        private final int numDeques;
+        private final int largestDequeSize;
+        private final Map<String, Object> largestDequePartition;
+
+        CommittableOffsets(
+                Map<Map<String, Object>, Map<String, Object>> offsets,
+                int numCommittableMessages,
+                int numUncommittableMessages,
+                int numDeques,
+                int largestDequeSize,
+                Map<String, Object> largestDequePartition
+        ) {
+            this.offsets = offsets != null ? new HashMap<>(offsets) : 
Collections.emptyMap();
+            this.numCommittableMessages = numCommittableMessages;
+            this.numUncommittableMessages = numUncommittableMessages;
+            this.numDeques = numDeques;
+            this.largestDequeSize = largestDequeSize;
+            this.largestDequePartition = largestDequePartition;
+        }
+
+        /**
+         * @return the offsets that can be committed at the time of the 
snapshot
+         */
+        public Map<Map<String, Object>, Map<String, Object>> offsets() {
+            return Collections.unmodifiableMap(offsets);
+        }
+
+        /**
+         * @return the number of committable messages at the time of the 
snapshot, where a committable message is both
+         * acknowledged and not preceded by any unacknowledged messages in the 
deque for its source partition
+         */
+        public int numCommittableMessages() {
+            return numCommittableMessages;
+        }
+
+        /**
+         * @return the number of uncommittable messages at the time of the 
snapshot, where an uncommittable message
+         * is either unacknowledged, or preceded in the deque for its source 
partition by an unacknowledged message
+         */
+        public int numUncommittableMessages() {
+            return numUncommittableMessages;
+        }
+
+        /**
+         * @return the number of non-empty deques tracking uncommittable 
messages at the time of the snapshot
+         */
+        public int numDeques() {
+            return numDeques;
+        }
+
+        /**
+         * @return the size of the largest deque at the time of the snapshot
+         */
+        public int largestDequeSize() {
+            return largestDequeSize;
+        }
+
+        /**
+         * Get the partition for the deque with the most uncommitted messages 
at the time of the snapshot.
+         * @return the applicable partition, which may be null, or null if 
there are no uncommitted messages;
+         * it is the caller's responsibility to distinguish between these two 
cases via {@link #hasPending()}
+         */
+        public Map<String, Object> largestDequePartition() {
+            return largestDequePartition;
+        }
+
+        /**
+         * @return whether there were any uncommittable messages at the time 
of the snapshot
+         */
+        public boolean hasPending() {
+            return numUncommittableMessages > 0;
+        }
+
+        /**
+         * @return whether there were any committable or uncommittable 
messages at the time of the snapshot
+         */
+        public boolean isEmpty() {
+            return numCommittableMessages == 0 && numUncommittableMessages == 
0 && offsets.isEmpty();
+        }
+
+        /**
+         * Create a new snapshot by combining the data for this snapshot with 
newer data in a more recent snapshot.
+         * Offsets are combined (giving precedence to the newer snapshot in 
case of conflict), the total number of
+         * committable messages is summed across the two snapshots, and the 
newer snapshot's information on pending
+         * messages (num deques, largest deque size, etc.) is used.
+         * @param newerOffsets the newer snapshot to combine with this snapshot
+         * @return the new offset snapshot containing information from this 
snapshot and the newer snapshot; never null
+         */
+        public CommittableOffsets updatedWith(CommittableOffsets newerOffsets) 
{
+            Map<Map<String, Object>, Map<String, Object>> offsets = new 
HashMap<>(this.offsets);
+            offsets.putAll(newerOffsets.offsets);
+
+            return new CommittableOffsets(
+                    offsets,
+                    this.numCommittableMessages + 
newerOffsets.numCommittableMessages,
+                    newerOffsets.numUncommittableMessages,
+                    newerOffsets.numDeques,
+                    newerOffsets.largestDequeSize,
+                    newerOffsets.largestDequePartition
+            );
+        }
+    }
+}
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 7307ec6..a33821a 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -55,7 +55,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
-import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -66,6 +65,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static 
org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
+import static 
org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
 import static 
org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG;
 
 /**
@@ -94,14 +95,9 @@ class WorkerSourceTask extends WorkerTask {
     private final TopicCreation topicCreation;
 
     private List<SourceRecord> toSend;
-    private boolean lastSendFailed; // Whether the last send failed 
*synchronously*, i.e. never made it into the producer's RecordAccumulator
-    // Use IdentityHashMap to ensure correctness with duplicate records. This 
is a HashMap because
-    // there is no IdentityHashSet.
-    private IdentityHashMap<ProducerRecord<byte[], byte[]>, 
ProducerRecord<byte[], byte[]>> outstandingMessages;
-    // A second buffer is used while an offset flush is running
-    private IdentityHashMap<ProducerRecord<byte[], byte[]>, 
ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
-    private boolean flushing;
-    private CountDownLatch stopRequestedLatch;
+    private volatile CommittableOffsets committableOffsets;
+    private final SubmittedRecords submittedRecords;
+    private final CountDownLatch stopRequestedLatch;
 
     private Map<String, String> taskConfig;
     private boolean started = false;
@@ -145,10 +141,8 @@ class WorkerSourceTask extends WorkerTask {
         this.closeExecutor = closeExecutor;
 
         this.toSend = null;
-        this.lastSendFailed = false;
-        this.outstandingMessages = new IdentityHashMap<>();
-        this.outstandingMessagesBacklog = new IdentityHashMap<>();
-        this.flushing = false;
+        this.committableOffsets = CommittableOffsets.EMPTY;
+        this.submittedRecords = new SubmittedRecords();
         this.stopRequestedLatch = new CountDownLatch(1);
         this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, 
connectMetrics);
         this.producerSendException = new AtomicReference<>();
@@ -237,6 +231,8 @@ class WorkerSourceTask extends WorkerTask {
         try {
             log.info("{} Executing source task", this);
             while (!isStopping()) {
+                updateCommittableOffsets();
+
                 if (shouldPause()) {
                     onPause();
                     if (awaitUnpause()) {
@@ -246,7 +242,6 @@ class WorkerSourceTask extends WorkerTask {
                 }
 
                 maybeThrowProducerSendException();
-
                 if (toSend == null) {
                     log.trace("{} Nothing to send to Kafka. Polling source for 
additional records", this);
                     long start = time.milliseconds();
@@ -255,6 +250,7 @@ class WorkerSourceTask extends WorkerTask {
                         recordPollReturned(toSend.size(), time.milliseconds() 
- start);
                     }
                 }
+
                 if (toSend == null)
                     continue;
                 log.trace("{} About to send {} records to Kafka", this, 
toSend.size());
@@ -268,6 +264,7 @@ class WorkerSourceTask extends WorkerTask {
             // simply resulted in not getting more records but all the 
existing records should be ok to flush
             // and commit offsets. Worst case, task.flush() will also throw an 
exception causing the offset commit
             // to fail.
+            updateCommittableOffsets();
             commitOffsets();
         }
     }
@@ -291,6 +288,13 @@ class WorkerSourceTask extends WorkerTask {
         }
     }
 
+    private void updateCommittableOffsets() {
+        CommittableOffsets newOffsets = submittedRecords.committableOffsets();
+        synchronized (this) {
+            this.committableOffsets = 
this.committableOffsets.updatedWith(newOffsets);
+        }
+    }
+
     protected List<SourceRecord> poll() throws InterruptedException {
         try {
             return task.poll();
@@ -352,21 +356,7 @@ class WorkerSourceTask extends WorkerTask {
             }
 
             log.trace("{} Appending record to the topic {} with key {}, value 
{}", this, record.topic(), record.key(), record.value());
-            // We need this queued first since the callback could happen 
immediately (even synchronously in some cases).
-            // Because of this we need to be careful about handling retries -- 
we always save the previously attempted
-            // record as part of toSend and need to use a flag to track 
whether we should actually add it to the outstanding
-            // messages and update the offsets.
-            synchronized (this) {
-                if (!lastSendFailed) {
-                    if (!flushing) {
-                        outstandingMessages.put(producerRecord, 
producerRecord);
-                    } else {
-                        outstandingMessagesBacklog.put(producerRecord, 
producerRecord);
-                    }
-                    // Offsets are converted & serialized in the OffsetWriter
-                    offsetWriter.offset(record.sourcePartition(), 
record.sourceOffset());
-                }
-            }
+            SubmittedRecord submittedRecord = submittedRecords.submit(record);
             try {
                 maybeCreateTopic(record.topic());
                 final String topic = producerRecord.topic();
@@ -378,7 +368,7 @@ class WorkerSourceTask extends WorkerTask {
                             log.trace("{} Failed record: {}", 
WorkerSourceTask.this, preTransformRecord);
                             producerSendException.compareAndSet(null, e);
                         } else {
-                            recordSent(producerRecord);
+                            submittedRecord.ack();
                             counter.completeRecord();
                             log.trace("{} Wrote record successfully: topic {} 
partition {} offset {}",
                                     WorkerSourceTask.this,
@@ -390,12 +380,11 @@ class WorkerSourceTask extends WorkerTask {
                             }
                         }
                     });
-                lastSendFailed = false;
             } catch (RetriableException | 
org.apache.kafka.common.errors.RetriableException e) {
                 log.warn("{} Failed to send record to topic '{}' and partition 
'{}'. Backing off before retrying: ",
                         this, producerRecord.topic(), 
producerRecord.partition(), e);
                 toSend = toSend.subList(processed, toSend.size());
-                lastSendFailed = true;
+                submittedRecords.removeLastOccurrence(submittedRecord);
                 counter.retryRemaining();
                 return false;
             } catch (ConnectException e) {
@@ -473,20 +462,6 @@ class WorkerSourceTask extends WorkerTask {
         }
     }
 
-    private synchronized void recordSent(final ProducerRecord<byte[], byte[]> 
record) {
-        ProducerRecord<byte[], byte[]> removed = 
outstandingMessages.remove(record);
-        // While flushing, we may also see callbacks for items in the backlog
-        if (removed == null && flushing)
-            removed = outstandingMessagesBacklog.remove(record);
-        // But if neither one had it, something is very wrong
-        if (removed == null) {
-            log.error("{} CRITICAL Saw callback for record from topic {} 
partition {} that was not present in the outstanding message set", this, 
record.topic(), record.partition());
-        } else if (flushing && outstandingMessages.isEmpty()) {
-            // flush thread may be waiting on the outstanding messages to clear
-            this.notifyAll();
-        }
-    }
-
     public boolean commitOffsets() {
         long commitTimeoutMs = 
workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
 
@@ -495,58 +470,56 @@ class WorkerSourceTask extends WorkerTask {
         long started = time.milliseconds();
         long timeout = started + commitTimeoutMs;
 
+        CommittableOffsets offsetsToCommit;
         synchronized (this) {
-            // First we need to make sure we snapshot everything in exactly 
the current state. This
-            // means both the current set of messages we're still waiting to 
finish, stored in this
-            // class, which setting flushing = true will handle by storing any 
new values into a new
-            // buffer; and the current set of user-specified offsets, stored 
in the
-            // OffsetStorageWriter, for which we can use beginFlush() to 
initiate the snapshot.
-            flushing = true;
-            boolean flushStarted = offsetWriter.beginFlush();
-            // Still wait for any producer records to flush, even if there 
aren't any offsets to write
-            // to persistent storage
-
-            // Next we need to wait for all outstanding messages to finish 
sending
-            log.info("{} flushing {} outstanding messages for offset commit", 
this, outstandingMessages.size());
-            while (!outstandingMessages.isEmpty()) {
-                try {
-                    long timeoutMs = timeout - time.milliseconds();
-                    // If the task has been cancelled, no more records will be 
sent from the producer; in that case, if any outstanding messages remain,
-                    // we can stop flushing immediately
-                    if (isCancelled() || timeoutMs <= 0) {
-                        log.error("{} Failed to flush, timed out while waiting 
for producer to flush outstanding {} messages", this, 
outstandingMessages.size());
-                        finishFailedFlush();
-                        recordCommitFailure(time.milliseconds() - started, 
null);
-                        return false;
-                    }
-                    this.wait(timeoutMs);
-                } catch (InterruptedException e) {
-                    // We can get interrupted if we take too long committing 
when the work thread shutdown is requested,
-                    // requiring a forcible shutdown. Give up since we can't 
safely commit any offsets, but also need
-                    // to stop immediately
-                    log.error("{} Interrupted while flushing messages, offsets 
will not be committed", this);
-                    finishFailedFlush();
-                    recordCommitFailure(time.milliseconds() - started, null);
-                    return false;
-                }
-            }
+            offsetsToCommit = this.committableOffsets;
+            this.committableOffsets = CommittableOffsets.EMPTY;
+        }
 
-            if (!flushStarted) {
-                // There was nothing in the offsets to process, but we still 
waited for the data in the
-                // buffer to flush. This is useful since this can feed into 
metrics to monitor, e.g.
-                // flush time, which can be used for monitoring even if the 
connector doesn't record any
-                // offsets.
-                finishSuccessfulFlush();
-                long durationMillis = time.milliseconds() - started;
-                recordCommitSuccess(durationMillis);
-                log.debug("{} Finished offset commitOffsets successfully in {} 
ms",
-                        this, durationMillis);
-
-                commitSourceTask();
-                return true;
+        if (committableOffsets.isEmpty()) {
+            log.info("{} Either no records were produced by the task since the 
last offset commit, " 
+                    + "or every record has been filtered out by a 
transformation " 
+                    + "or dropped due to transformation or conversion errors.",
+                    this
+            );
+            // We continue with the offset commit process here instead of 
simply returning immediately
+            // in order to invoke SourceTask::commit and record metrics for a 
successful offset commit
+        } else {
+            log.info("{} Committing offsets for {} acknowledged messages", 
this, committableOffsets.numCommittableMessages());
+            if (committableOffsets.hasPending()) {
+                log.debug("{} There are currently {} pending messages spread 
across {} source partitions whose offsets will not be committed. "
+                                + "The source partition with the most pending 
messages is {}, with {} pending messages",
+                        this,
+                        committableOffsets.numUncommittableMessages(),
+                        committableOffsets.numDeques(),
+                        committableOffsets.largestDequePartition(),
+                        committableOffsets.largestDequeSize()
+                );
+            } else {
+                log.debug("{} There are currently no pending messages for this 
offset commit; " 
+                        + "all messages dispatched to the task's producer 
since the last commit have been acknowledged",
+                        this
+                );
             }
         }
 
+        // Update the offset writer with any new offsets for records that have 
been acked.
+        // The offset writer will continue to track all offsets until they are 
able to be successfully flushed.
+        // IOW, if the offset writer fails to flush, it keeps those offsets for the next attempt,
+        // though we may update them here with newer offsets for acked records.
+        offsetsToCommit.offsets().forEach(offsetWriter::offset);
+
+        if (!offsetWriter.beginFlush()) {
+            // There was nothing in the offsets to process, but we still mark 
a successful offset commit.
+            long durationMillis = time.milliseconds() - started;
+            recordCommitSuccess(durationMillis);
+            log.debug("{} Finished offset commitOffsets successfully in {} ms",
+                    this, durationMillis);
+
+            commitSourceTask();
+            return true;
+        }
+
         // Now we can actually flush the offsets to user storage.
         Future<Void> flushFuture = offsetWriter.doFlush((error, result) -> {
             if (error != null) {
@@ -558,7 +531,7 @@ class WorkerSourceTask extends WorkerTask {
         // Very rare case: offsets were unserializable and we finished 
immediately, unable to store
         // any data
         if (flushFuture == null) {
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, null);
             return false;
         }
@@ -570,22 +543,21 @@ class WorkerSourceTask extends WorkerTask {
             // could look a little confusing.
         } catch (InterruptedException e) {
             log.warn("{} Flush of offsets interrupted, cancelling", this);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (ExecutionException e) {
             log.error("{} Flush of offsets threw an unexpected exception: ", 
this, e);
-            finishFailedFlush();
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, e);
             return false;
         } catch (TimeoutException e) {
-            log.error("{} Timed out waiting to flush offsets to storage", 
this);
-            finishFailedFlush();
+            log.error("{} Timed out waiting to flush offsets to storage; will 
try again on next flush interval with latest offsets", this);
+            offsetWriter.cancelFlush();
             recordCommitFailure(time.milliseconds() - started, null);
             return false;
         }
 
-        finishSuccessfulFlush();
         long durationMillis = time.milliseconds() - started;
         recordCommitSuccess(durationMillis);
         log.debug("{} Finished commitOffsets successfully in {} ms",
@@ -604,21 +576,6 @@ class WorkerSourceTask extends WorkerTask {
         }
     }
 
-    private synchronized void finishFailedFlush() {
-        offsetWriter.cancelFlush();
-        outstandingMessages.putAll(outstandingMessagesBacklog);
-        outstandingMessagesBacklog.clear();
-        flushing = false;
-    }
-
-    private synchronized void finishSuccessfulFlush() {
-        // If we were successful, we can just swap instead of replacing items 
back into the original map
-        IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], 
byte[]>> temp = outstandingMessages;
-        outstandingMessages = outstandingMessagesBacklog;
-        outstandingMessagesBacklog = temp;
-        flushing = false;
-    }
-
     @Override
     public String toString() {
         return "WorkerSourceTask{" +
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index f8ab6a7..7766e2c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -89,9 +89,8 @@ public class OffsetStorageWriter {
      * @param partition the partition to store an offset for
      * @param offset the offset
      */
-    @SuppressWarnings("unchecked")
-    public synchronized void offset(Map<String, ?> partition, Map<String, ?> 
offset) {
-        data.put((Map<String, Object>) partition, (Map<String, Object>) 
offset);
+    public synchronized void offset(Map<String, Object> partition, Map<String, 
Object> offset) {
+        data.put(partition, offset);
     }
 
     private boolean flushing() {
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index b0cd0b4..4743dce 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -378,8 +378,6 @@ public class ErrorHandlingTaskTest {
 
         EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
 
-        offsetWriter.offset(EasyMock.anyObject(), EasyMock.anyObject());
-        EasyMock.expectLastCall().times(2);
         sourceTask.initialize(EasyMock.anyObject());
         EasyMock.expectLastCall();
 
@@ -444,8 +442,6 @@ public class ErrorHandlingTaskTest {
 
         EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true);
 
-        offsetWriter.offset(EasyMock.anyObject(), EasyMock.anyObject());
-        EasyMock.expectLastCall().times(2);
         sourceTask.initialize(EasyMock.anyObject());
         EasyMock.expectLastCall();
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
new file mode 100644
index 0000000..fd339fc
--- /dev/null
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static 
org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord;
+import static 
org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SubmittedRecordsTest {
+
+    private static final Map<String, Object> PARTITION1 = 
Collections.singletonMap("subreddit", "apachekafka");
+    private static final Map<String, Object> PARTITION2 = 
Collections.singletonMap("subreddit", "adifferentvalue");
+    private static final Map<String, Object> PARTITION3 = 
Collections.singletonMap("subreddit", "asdfqweoicus");
+
+    private AtomicInteger offset;
+
+    SubmittedRecords submittedRecords;
+
+    @Before
+    public void setup() {
+        submittedRecords = new SubmittedRecords();
+        offset = new AtomicInteger();
+    }
+
+    @Test
+    public void testNoRecords() {
+        CommittableOffsets committableOffsets = 
submittedRecords.committableOffsets();
+        assertTrue(committableOffsets.isEmpty());
+
+        committableOffsets = submittedRecords.committableOffsets();
+        assertTrue(committableOffsets.isEmpty());
+
+        committableOffsets = submittedRecords.committableOffsets();
+        assertTrue(committableOffsets.isEmpty());
+
+        assertNoRemainingDeques();
+    }
+
+    @Test
+    public void testNoCommittedRecords() {
+        for (int i = 0; i < 3; i++) {
+            for (Map<String, Object> partition : Arrays.asList(PARTITION1, 
PARTITION2, PARTITION3)) {
+                submittedRecords.submit(partition, newOffset());
+            }
+        }
+
+        CommittableOffsets committableOffsets = 
submittedRecords.committableOffsets();
+        assertMetadata(committableOffsets, 0, 9, 3, 3, PARTITION1, PARTITION2, 
PARTITION3);
+        assertEquals(Collections.emptyMap(), committableOffsets.offsets());
+
+        committableOffsets = submittedRecords.committableOffsets();
+        assertMetadata(committableOffsets, 0, 9, 3, 3, PARTITION1, PARTITION2, 
PARTITION3);
+        assertEquals(Collections.emptyMap(), committableOffsets.offsets());
+
+        committableOffsets = submittedRecords.committableOffsets();
+        assertMetadata(committableOffsets, 0, 9, 3, 3, PARTITION1, PARTITION2, 
PARTITION3);
+        assertEquals(Collections.emptyMap(), committableOffsets.offsets());
+    }
+
+    @Test
+    public void testSingleAck() {
+        Map<String, Object> offset = newOffset();
+
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, 
offset);
+        CommittableOffsets committableOffsets = 
submittedRecords.committableOffsets();
+        // Record has been submitted but not yet acked; cannot commit offsets 
for it yet
+        assertFalse(committableOffsets.isEmpty());
+        assertEquals(Collections.emptyMap(), committableOffsets.offsets());
+        assertMetadata(committableOffsets, 0, 1, 1, 1, PARTITION1);
+        assertNoEmptyDeques();
+
+        submittedRecord.ack();
+        committableOffsets = submittedRecords.committableOffsets();
+        // Record has been acked; can commit offsets for it
+        assertFalse(committableOffsets.isEmpty());
+        assertEquals(Collections.singletonMap(PARTITION1, offset), 
committableOffsets.offsets());
+        assertMetadataNoPending(committableOffsets, 1);
+
+        // Everything has been ack'd and consumed; make sure that it's been 
cleaned up to avoid memory leaks
+        assertNoRemainingDeques();
+
+        committableOffsets = submittedRecords.committableOffsets();
+        // Old offsets should be wiped
+        assertEquals(Collections.emptyMap(), committableOffsets.offsets());
+        assertTrue(committableOffsets.isEmpty());
+    }
+
+    @Test
+    public void testMultipleAcksAcrossMultiplePartitions() {
+        Map<String, Object> partition1Offset1 = newOffset();
+        Map<String, Object> partition1Offset2 = newOffset();
+        Map<String, Object> partition2Offset1 = newOffset();
+        Map<String, Object> partition2Offset2 = newOffset();
+
+        SubmittedRecord partition1Record1 = 
submittedRecords.submit(PARTITION1, partition1Offset1);
+        SubmittedRecord partition1Record2 = 
submittedRecords.submit(PARTITION1, partition1Offset2);
+        SubmittedRecord partition2Record1 = 
submittedRecords.submit(PARTITION2, partition2Offset1);
+        SubmittedRecord partition2Record2 = 
submittedRecords.submit(PARTITION2, partition2Offset2);
+
+        CommittableOffsets committableOffsets = 
submittedRecords.committableOffsets();
+        // No records ack'd yet; can't commit any offsets
+        assertEquals(Collections.emptyMap(), committableOffsets.offsets());
+        assertMetadata(committableOffsets, 0, 4, 2, 2, PARTITION1, PARTITION2);
+        assertNoEmptyDeques();
+
+        partition1Record2.ack();
+        committableOffsets = submittedRecords.committableOffsets();
+        // One record has been ack'd, but a record that comes before it and 
corresponds to the same source partition hasn't been
+        assertEquals(Collections.emptyMap(), committableOffsets.offsets());
+        assertMetadata(committableOffsets, 0, 4, 2, 2, PARTITION1, PARTITION2);
+        assertNoEmptyDeques();
+
+        partition2Record1.ack();
+        committableOffsets = submittedRecords.committableOffsets();
+        // We can commit the first offset for the second partition
+        assertEquals(Collections.singletonMap(PARTITION2, partition2Offset1), 
committableOffsets.offsets());
+        assertMetadata(committableOffsets, 1, 3, 2, 2, PARTITION1);
+        assertNoEmptyDeques();
+
+        committableOffsets = submittedRecords.committableOffsets();
+        // No new offsets to commit
+        assertEquals(Collections.emptyMap(), committableOffsets.offsets());
+        assertMetadata(committableOffsets, 0, 3, 2, 2, PARTITION1);
+        assertNoEmptyDeques();
+
+        partition1Record1.ack();
+        partition2Record2.ack();
+
+        committableOffsets = submittedRecords.committableOffsets();
+        // We can commit new offsets for both partitions now
+        Map<Map<String, Object>, Map<String, Object>> expectedOffsets = new 
HashMap<>();
+        expectedOffsets.put(PARTITION1, partition1Offset2);
+        expectedOffsets.put(PARTITION2, partition2Offset2);
+        assertEquals(expectedOffsets, committableOffsets.offsets());
+        assertMetadataNoPending(committableOffsets, 3);
+
+        // Everything has been ack'd and consumed; make sure that it's been 
cleaned up to avoid memory leaks
+        assertNoRemainingDeques();
+
+        committableOffsets = submittedRecords.committableOffsets();
+        // No new offsets to commit
+        assertTrue(committableOffsets.isEmpty());
+    }
+
+    @Test
+    public void testRemoveLastSubmittedRecord() {
+        SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, 
newOffset());
+
+        CommittableOffsets committableOffsets = 
submittedRecords.committableOffsets();
+        assertEquals(Collections.emptyMap(), committableOffsets.offsets());
+        assertMetadata(committableOffsets, 0, 1, 1, 1, PARTITION1);
+
+        assertTrue("First attempt to remove record from submitted queue should 
succeed", submittedRecords.removeLastOccurrence(submittedRecord));
+        assertFalse("Attempt to remove already-removed record from submitted 
queue should fail", submittedRecords.removeLastOccurrence(submittedRecord));
+
+        committableOffsets = submittedRecords.committableOffsets();
+        // Even if SubmittedRecords::remove is broken, we haven't ack'd 
anything yet, so there should be no committable offsets
+        assertTrue(committableOffsets.isEmpty());
+
+        submittedRecord.ack();
+        committableOffsets = submittedRecords.committableOffsets();
+        // Even though the record has somehow been acknowledged, it should not 
be counted when collecting committable offsets
+        assertTrue(committableOffsets.isEmpty());
+    }
+
+    @Test
+    public void testRemoveNotLastSubmittedRecord() {
+        Map<String, Object> partition1Offset = newOffset();
+        Map<String, Object> partition2Offset = newOffset();
+
+        SubmittedRecord recordToRemove = submittedRecords.submit(PARTITION1, 
partition1Offset);
+        SubmittedRecord lastSubmittedRecord = 
submittedRecords.submit(PARTITION2, partition2Offset);
+
+        CommittableOffsets committableOffsets = 
submittedRecords.committableOffsets();
+        assertMetadata(committableOffsets, 0, 2, 2, 1, PARTITION1, PARTITION2);
+        assertNoEmptyDeques();
+
+        assertTrue("First attempt to remove record from submitted queue should 
succeed", submittedRecords.removeLastOccurrence(recordToRemove));
+
+        committableOffsets = submittedRecords.committableOffsets();
+        // Even if SubmittedRecords::remove is broken, we haven't ack'd 
anything yet, so there should be no committable offsets
+        assertEquals(Collections.emptyMap(), committableOffsets.offsets());
+        assertMetadata(committableOffsets, 0, 1, 1, 1, PARTITION2);
+        assertNoEmptyDeques();
+        // The only record for this partition has been removed; we shouldn't 
be tracking a deque for it anymore
+        assertRemovedDeques(PARTITION1);
+
+        recordToRemove.ack();
+        committableOffsets = submittedRecords.committableOffsets();
+        // Even though the record has somehow been acknowledged, it should not 
be counted when collecting committable offsets
+        assertEquals(Collections.emptyMap(), committableOffsets.offsets());
+        assertMetadata(committableOffsets, 0, 1, 1, 1, PARTITION2);
+        assertNoEmptyDeques();
+
+        lastSubmittedRecord.ack();
+        committableOffsets = submittedRecords.committableOffsets();
+        // Now that the last-submitted record has been ack'd, we should be 
able to commit its offset
+        assertEquals(Collections.singletonMap(PARTITION2, partition2Offset), 
committableOffsets.offsets());
+        assertMetadata(committableOffsets, 1, 0, 0, 0, (Map<String, Object>) 
null);
+        assertFalse(committableOffsets.hasPending());
+
+        // Everything has been ack'd and consumed; make sure that it's been 
cleaned up to avoid memory leaks
+        assertNoRemainingDeques();
+        committableOffsets = submittedRecords.committableOffsets();
+        assertTrue(committableOffsets.isEmpty());
+    }
+
+    @Test
+    public void testNullPartitionAndOffset() {
+        SubmittedRecord submittedRecord = submittedRecords.submit(null, null);
+        CommittableOffsets committableOffsets = 
submittedRecords.committableOffsets();
+        assertMetadata(committableOffsets, 0, 1, 1, 1, (Map<String, Object>) 
null);
+
+        submittedRecord.ack();
+        committableOffsets = submittedRecords.committableOffsets();
+        assertEquals(Collections.singletonMap(null, null), 
committableOffsets.offsets());
+        assertMetadataNoPending(committableOffsets, 1);
+
+        assertNoEmptyDeques();
+    }
+
+    private void assertNoRemainingDeques() {
+        assertEquals("Internal records map should be completely empty", 
Collections.emptyMap(), submittedRecords.records);
+    }
+
+    @SafeVarargs
+    private final void assertRemovedDeques(Map<String, ?>... partitions) {
+        for (Map<String, ?> partition : partitions) {
+            assertFalse("Deque for partition " + partition + " should have 
been cleaned up from internal records map", 
submittedRecords.records.containsKey(partition));
+        }
+    }
+
+    private void assertNoEmptyDeques() {
+        submittedRecords.records.forEach((partition, deque) ->
+            assertFalse("Empty deque for partition " + partition + " should 
have been cleaned up from internal records map", deque.isEmpty())
+        );
+    }
+
+    private Map<String, Object> newOffset() {
+        return Collections.singletonMap("timestamp", offset.getAndIncrement());
+    }
+
+    private void assertMetadataNoPending(CommittableOffsets 
committableOffsets, int committableMessages) {
+        assertEquals(committableMessages, 
committableOffsets.numCommittableMessages());
+        assertFalse(committableOffsets.hasPending());
+    }
+
+    @SafeVarargs
+    @SuppressWarnings("varargs")
+    private final void assertMetadata(
+            CommittableOffsets committableOffsets,
+            int committableMessages,
+            int uncommittableMessages,
+            int numDeques,
+            int largestDequeSize,
+            Map<String, Object>... largestDequePartitions
+    ) {
+        assertEquals(committableMessages, 
committableOffsets.numCommittableMessages());
+        assertEquals(uncommittableMessages, 
committableOffsets.numUncommittableMessages());
+        assertEquals(numDeques, committableOffsets.numDeques());
+        assertEquals(largestDequeSize, committableOffsets.largestDequeSize());
+        
assertTrue(Arrays.asList(largestDequePartitions).contains(committableOffsets.largestDequePartition()));
+    }
+}
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 640c1d8..fcd657f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -117,8 +117,8 @@ import static org.junit.Assert.assertTrue;
 public class WorkerSourceTaskTest extends ThreadedTest {
     private static final String TOPIC = "topic";
     private static final String OTHER_TOPIC = "other-topic";
-    private static final Map<String, byte[]> PARTITION = 
Collections.singletonMap("key", "partition".getBytes());
-    private static final Map<String, Integer> OFFSET = 
Collections.singletonMap("key", 12);
+    private static final Map<String, Object> PARTITION = 
Collections.singletonMap("key", "partition".getBytes());
+    private static final Map<String, Object> OFFSET = 
Collections.singletonMap("key", 12);
 
     // Connect-format data
     private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA;
@@ -286,6 +286,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         sourceTask.stop();
         EasyMock.expectLastCall();
+
+        offsetWriter.offset(PARTITION, OFFSET);
+        PowerMock.expectLastCall();
         expectOffsetFlush(true);
 
         statusListener.onShutdown(taskId);
@@ -333,6 +336,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         sourceTask.stop();
         EasyMock.expectLastCall();
+
+        offsetWriter.offset(PARTITION, OFFSET);
+        PowerMock.expectLastCall();
         expectOffsetFlush(true);
 
         statusListener.onShutdown(taskId);
@@ -547,6 +553,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         final CountDownLatch pollLatch = expectPolls(1);
         expectOffsetFlush(true);
 
+        offsetWriter.offset(PARTITION, OFFSET);
+        PowerMock.expectLastCall().atLeastOnce();
+
         expectTopicCreation(TOPIC);
 
         sourceTask.stop();
@@ -590,6 +599,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         final CountDownLatch pollLatch = expectPolls(1);
         expectOffsetFlush(true);
 
+        offsetWriter.offset(PARTITION, OFFSET);
+        PowerMock.expectLastCall().atLeastOnce();
+
         expectTopicCreation(TOPIC);
 
         sourceTask.stop();
@@ -716,25 +728,23 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         expectTopicCreation(TOPIC);
 
         // First round
-        expectSendRecordOnce(false);
+        expectSendRecordOnce();
         // Any Producer retriable exception should work here
         expectSendRecordSyncFailure(new 
org.apache.kafka.common.errors.TimeoutException("retriable sync failure"));
 
         // Second round
-        expectSendRecordOnce(true);
-        expectSendRecordOnce(false);
+        expectSendRecordOnce();
+        expectSendRecordOnce();
 
         PowerMock.replayAll();
 
         // Try to send 3, make first pass, second fail. Should save last two
         Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2, record3));
         Whitebox.invokeMethod(workerTask, "sendRecords");
-        assertEquals(true, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
         assertEquals(Arrays.asList(record2, record3), 
Whitebox.getInternalState(workerTask, "toSend"));
 
         // Next they all succeed
         Whitebox.invokeMethod(workerTask, "sendRecords");
-        assertEquals(false, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
         assertNull(Whitebox.getInternalState(workerTask, "toSend"));
 
         PowerMock.verifyAll();
@@ -792,15 +802,14 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         expectTopicCreation(TOPIC);
 
         // Source task commit record failure will not cause the task to abort
-        expectSendRecordOnce(false);
-        expectSendRecordTaskCommitRecordFail(false, false);
-        expectSendRecordOnce(false);
+        expectSendRecordOnce();
+        expectSendRecordTaskCommitRecordFail(false);
+        expectSendRecordOnce();
 
         PowerMock.replayAll();
 
         Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, 
record2, record3));
         Whitebox.invokeMethod(workerTask, "sendRecords");
-        assertEquals(false, Whitebox.getInternalState(workerTask, 
"lastSendFailed"));
         assertNull(Whitebox.getInternalState(workerTask, "toSend"));
 
         PowerMock.verifyAll();
@@ -930,7 +939,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         expectTopicCreation(TOPIC);
 
-        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(TOPIC, true, false, true, true, true, headers);
+        Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(TOPIC, true, true, true, true, headers);
 
         PowerMock.replayAll();
 
@@ -968,8 +977,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         expectTopicCreation(TOPIC);
 
-        Capture<ProducerRecord<byte[], byte[]>> sentRecordA = expectSendRecord(TOPIC, false, false, true, true, false, null);
-        Capture<ProducerRecord<byte[], byte[]>> sentRecordB = expectSendRecord(TOPIC, false, false, true, true, false, null);
+        Capture<ProducerRecord<byte[], byte[]>> sentRecordA = expectSendRecord(TOPIC, false, true, true, false, null);
+        Capture<ProducerRecord<byte[], byte[]>> sentRecordB = expectSendRecord(TOPIC, false, true, true, false, null);
 
         PowerMock.replayAll();
 
@@ -1009,8 +1018,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
         EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.singletonMap(TOPIC, topicDesc));
 
-        expectSendRecordTaskCommitRecordSucceed(false, false);
-        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false);
+        expectSendRecordTaskCommitRecordSucceed(false);
 
         PowerMock.replayAll();
 
@@ -1037,19 +1046,17 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         // Second round - calls to describe and create succeed
         expectTopicCreation(TOPIC);
         // Exactly two records are sent
-        expectSendRecordTaskCommitRecordSucceed(false, false);
-        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false);
+        expectSendRecordTaskCommitRecordSucceed(false);
 
         PowerMock.replayAll();
 
         Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
         Whitebox.invokeMethod(workerTask, "sendRecords");
-        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
         assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend"));
 
         // Next they all succeed
         Whitebox.invokeMethod(workerTask, "sendRecords");
-        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
         assertNull(Whitebox.getInternalState(workerTask, "toSend"));
     }
 
@@ -1073,19 +1080,17 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         // Second round
         expectTopicCreation(TOPIC);
-        expectSendRecordTaskCommitRecordSucceed(false, false);
-        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false);
+        expectSendRecordTaskCommitRecordSucceed(false);
 
         PowerMock.replayAll();
 
         Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2));
         Whitebox.invokeMethod(workerTask, "sendRecords");
-        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
         assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend"));
 
         // Next they all succeed
         Whitebox.invokeMethod(workerTask, "sendRecords");
-        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
         assertNull(Whitebox.getInternalState(workerTask, "toSend"));
     }
 
@@ -1105,8 +1110,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         // First round
         expectPreliminaryCalls(OTHER_TOPIC);
         expectTopicCreation(TOPIC);
-        expectSendRecordTaskCommitRecordSucceed(false, false);
-        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false);
+        expectSendRecordTaskCommitRecordSucceed(false);
 
         // First call to describe the topic times out
         EasyMock.expect(admin.describeTopics(OTHER_TOPIC))
@@ -1114,19 +1119,17 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         // Second round
         expectTopicCreation(OTHER_TOPIC);
-        expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders());
+        expectSendRecord(OTHER_TOPIC, false, true, true, true, emptyHeaders());
 
         PowerMock.replayAll();
 
         // Try to send 3, make first pass, second fail. Should save last two
         Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
         Whitebox.invokeMethod(workerTask, "sendRecords");
-        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
         assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend"));
 
         // Next they all succeed
         Whitebox.invokeMethod(workerTask, "sendRecords");
-        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
         assertNull(Whitebox.getInternalState(workerTask, "toSend"));
 
         PowerMock.verifyAll();
@@ -1148,8 +1151,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         // First round
         expectPreliminaryCalls(OTHER_TOPIC);
         expectTopicCreation(TOPIC);
-        expectSendRecordTaskCommitRecordSucceed(false, false);
-        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false);
+        expectSendRecordTaskCommitRecordSucceed(false);
 
         EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap());
         // First call to create the topic times out
@@ -1159,19 +1162,17 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         // Second round
         expectTopicCreation(OTHER_TOPIC);
-        expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders());
+        expectSendRecord(OTHER_TOPIC, false, true, true, true, emptyHeaders());
 
         PowerMock.replayAll();
 
         // Try to send 3, make first pass, second fail. Should save last two
         Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3));
         Whitebox.invokeMethod(workerTask, "sendRecords");
-        assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed"));
         assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend"));
 
         // Next they all succeed
         Whitebox.invokeMethod(workerTask, "sendRecords");
-        assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed"));
         assertNull(Whitebox.getInternalState(workerTask, "toSend"));
 
         PowerMock.verifyAll();
@@ -1264,8 +1265,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
         EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(foundTopic(TOPIC));
 
-        expectSendRecordTaskCommitRecordSucceed(false, false);
-        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false);
+        expectSendRecordTaskCommitRecordSucceed(false);
 
         PowerMock.replayAll();
 
@@ -1290,8 +1291,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
         EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
 
-        expectSendRecordTaskCommitRecordSucceed(false, false);
-        expectSendRecordTaskCommitRecordSucceed(false, false);
+        expectSendRecordTaskCommitRecordSucceed(false);
+        expectSendRecordTaskCommitRecordSucceed(false);
 
         PowerMock.replayAll();
 
@@ -1318,8 +1319,6 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     private void expectPreliminaryCalls(String topic) {
         expectConvertHeadersAndKeyValue(topic, true, emptyHeaders());
         expectApplyTransformationChain(false);
-        offsetWriter.offset(PARTITION, OFFSET);
-        PowerMock.expectLastCall();
     }
 
     private CountDownLatch expectEmptyPolls(int minimum, final AtomicInteger count) throws InterruptedException {
@@ -1363,9 +1362,6 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         expectConvertHeadersAndKeyValue(false);
         expectApplyTransformationChain(false);
 
-        offsetWriter.offset(PARTITION, OFFSET);
-        PowerMock.expectLastCall();
-
         EasyMock.expect(
             producer.send(EasyMock.anyObject(ProducerRecord.class),
                 EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class)))
@@ -1373,33 +1369,28 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
 
     private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() throws InterruptedException {
-        return expectSendRecordTaskCommitRecordSucceed(true, false);
+        return expectSendRecordTaskCommitRecordSucceed(true);
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce(boolean isRetry) throws InterruptedException {
-        return expectSendRecordTaskCommitRecordSucceed(false, isRetry);
+    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce() throws InterruptedException {
+        return expectSendRecordTaskCommitRecordSucceed(false);
     }
 
     private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordProducerCallbackFail() throws InterruptedException {
-        return expectSendRecord(TOPIC, false, false, false, false, true, emptyHeaders());
-    }
-
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes, boolean isRetry) throws InterruptedException {
-        return expectSendRecord(TOPIC, anyTimes, isRetry, true, true, true, emptyHeaders());
+        return expectSendRecord(TOPIC, false, false, false, true, emptyHeaders());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes, boolean isRetry) throws InterruptedException {
-        return expectSendRecord(TOPIC, anyTimes, isRetry, true, false, true, emptyHeaders());
+    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes) throws InterruptedException {
+        return expectSendRecord(TOPIC, anyTimes, true, true, true, emptyHeaders());
     }
 
-    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException {
-        return expectSendRecord(TOPIC, anyTimes, isRetry, succeed, true, true, emptyHeaders());
+    private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes) throws InterruptedException {
+        return expectSendRecord(TOPIC, anyTimes, true, false, true, emptyHeaders());
     }
 
     private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(
         String topic,
         boolean anyTimes,
-        boolean isRetry,
         boolean sendSuccess,
         boolean commitSuccess,
         boolean isMockedConverters,
@@ -1413,16 +1404,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
 
-        // 1. Offset data is passed to the offset storage.
-        if (!isRetry) {
-            offsetWriter.offset(PARTITION, OFFSET);
-            if (anyTimes)
-                PowerMock.expectLastCall().anyTimes();
-            else
-                PowerMock.expectLastCall();
-        }
-
-        // 2. Converted data passed to the producer, which will need callbacks invoked for flush to work
+        // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
         IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect(
             producer.send(EasyMock.capture(sent),
                 EasyMock.capture(producerCallbacks)));
@@ -1446,7 +1428,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
             expect.andAnswer(expectResponse);
 
         if (sendSuccess) {
-            // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit
+            // 2. As a result of a successful producer send callback, we'll notify the source task of the record commit
             expectTaskCommitRecordWithOffset(anyTimes, commitSuccess);
             expectTaskGetTopic(anyTimes);
         }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
index 38d7a37..b442bca 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
@@ -47,8 +47,8 @@ import static org.junit.Assert.assertTrue;
 public class OffsetStorageWriterTest {
     private static final String NAMESPACE = "namespace";
     // Connect format - any types should be accepted here
-    private static final Map<String, String> OFFSET_KEY = Collections.singletonMap("key", "key");
-    private static final Map<String, Integer> OFFSET_VALUE = Collections.singletonMap("key", 12);
+    private static final Map<String, Object> OFFSET_KEY = Collections.singletonMap("key", "key");
+    private static final Map<String, Object> OFFSET_VALUE = Collections.singletonMap("key", 12);
 
     // Serialized
     private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes();
@@ -229,8 +229,8 @@ public class OffsetStorageWriterTest {
      *                          ensure tests complete.
      * @return the captured set of ByteBuffer key-value pairs passed to the storage layer
      */
-    private void expectStore(Map<String, String> key, byte[] keySerialized,
-                             Map<String, Integer> value, byte[] valueSerialized,
+    private void expectStore(Map<String, Object> key, byte[] keySerialized,
+                             Map<String, Object> value, byte[] valueSerialized,
                              final Callback<Void> callback,
                              final boolean fail,
                              final CountDownLatch waitForCompletion) {
