This is an automated email from the ASF dual-hosted git repository. rhauch pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push: new af6f29c KAFKA-12226: Commit source task offsets without blocking on batch delivery (#11323) af6f29c is described below commit af6f29c8a0632ecb5d1089dbf1e55de4dbc5b2b2 Author: Chris Egerton <chr...@confluent.io> AuthorDate: Sun Nov 7 12:39:04 2021 -0500 KAFKA-12226: Commit source task offsets without blocking on batch delivery (#11323) Replaces the current logic for committing source offsets, which is batch-based and blocks until the entirety of the current batch is fully written to and acknowledged by the broker, with a new non-blocking approach that commits source offsets for source records that have been "fully written" by the producer. The new logic considers a record fully written only if that source record and all records before it with the same source partition have all been written to Kafka and acknowledged. This new logic uses a deque for every source partition that a source task produces records for. Each element in that deque is a SubmittedRecord with a flag to track whether the producer has ack'd the delivery of that source record to Kafka. Periodically, the worker (on the same thread that polls the source task for records and transforms, converts, and dispatches them to the producer) polls acknowledged elements from the beginning of each of these deques and collects the latest offset [...] The behavior of the `offset.flush.timeout.ms` property is retained, but essentially now only applies to the actual writing of offset data to the internal offsets topic (if running in distributed mode) or the offsets file (if running in standalone mode). No time is spent during `WorkerSourceTask::commitOffsets` waiting on the acknowledgment of records by the producer. This behavior also does not change how the records are dispatched to the producer nor how the producer sends or batches those records. 
It's possible that memory exhaustion may occur if, for example, a single Kafka partition is offline for an extended period. In cases like this, the collection of deques in the SubmittedRecords class may continue to grow indefinitely until the partition comes back online and the SubmittedRecords in those deques that targeted the formerly-offline Kafka partition are acknowledged and can be removed. Although this may be suboptimal, it is no worse than the existing behavior of the framewo [...] Author: Chris Egerton <chr...@confluent.io> Reviewed: Randall Hauch <rha...@gmail.com> --- .../kafka/connect/runtime/SubmittedRecords.java | 297 +++++++++++++++++++++ .../kafka/connect/runtime/WorkerSourceTask.java | 185 +++++-------- .../kafka/connect/storage/OffsetStorageWriter.java | 5 +- .../connect/runtime/ErrorHandlingTaskTest.java | 4 - .../connect/runtime/SubmittedRecordsTest.java | 289 ++++++++++++++++++++ .../connect/runtime/WorkerSourceTaskTest.java | 116 ++++---- .../connect/storage/OffsetStorageWriterTest.java | 8 +- 7 files changed, 712 insertions(+), 192 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java new file mode 100644 index 0000000..472a266 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime; + +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; + +/** + * Used to track source records that have been (or are about to be) dispatched to a producer and their accompanying + * source offsets. Records are tracked in the order in which they are submitted, which should match the order they were + * returned from {@link SourceTask#poll()}. The latest-eligible offsets for each source partition can be retrieved via + * {@link #committableOffsets()}, where every record up to and including the record for each returned offset has been + * either {@link SubmittedRecord#ack() acknowledged} or {@link #removeLastOccurrence(SubmittedRecord) removed}. + * Note that this class is not thread-safe, though a {@link SubmittedRecord} can be + * {@link SubmittedRecord#ack() acknowledged} from a different thread. + */ +class SubmittedRecords { + + private static final Logger log = LoggerFactory.getLogger(SubmittedRecords.class); + + // Visible for testing + final Map<Map<String, Object>, Deque<SubmittedRecord>> records; + + public SubmittedRecords() { + this.records = new HashMap<>(); + } + + /** + * Enqueue a new source record before dispatching it to a producer. 
+ * The returned {@link SubmittedRecord} should either be {@link SubmittedRecord#ack() acknowledged} in the + * producer callback, or {@link #removeLastOccurrence(SubmittedRecord) removed} if the record could not be successfully + * sent to the producer. + * + * @param record the record about to be dispatched; may not be null but may have a null + * {@link SourceRecord#sourcePartition()} and/or {@link SourceRecord#sourceOffset()} + * @return a {@link SubmittedRecord} that can be either {@link SubmittedRecord#ack() acknowledged} once ack'd by + * the producer, or {@link #removeLastOccurrence removed} if synchronously rejected by the producer + */ + @SuppressWarnings("unchecked") + public SubmittedRecord submit(SourceRecord record) { + return submit((Map<String, Object>) record.sourcePartition(), (Map<String, Object>) record.sourceOffset()); + } + + // Convenience method for testing + SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) { + SubmittedRecord result = new SubmittedRecord(partition, offset); + records.computeIfAbsent(result.partition(), p -> new LinkedList<>()) + .add(result); + return result; + } + + /** + * Remove a source record and do not take it into account any longer when tracking offsets. + * Useful if the record has been synchronously rejected by the producer. + * If multiple instances of the same {@link SubmittedRecord} have been submitted already, only the first one found + * (traversing from the end of the deque backward) will be removed. 
+ * @param record the {@link #submit previously-submitted} record to stop tracking; may not be null + * @return whether an instance of the record was removed + */ + public boolean removeLastOccurrence(SubmittedRecord record) { + Deque<SubmittedRecord> deque = records.get(record.partition()); + if (deque == null) { + log.warn("Attempted to remove record from submitted queue for partition {}, but no records with that partition appear to have been submitted", record.partition()); + return false; + } + boolean result = deque.removeLastOccurrence(record); + if (deque.isEmpty()) { + records.remove(record.partition()); + } + if (!result) { + log.warn("Attempted to remove record from submitted queue for partition {}, but the record has not been submitted or has already been removed", record.partition()); + } + return result; + } + + /** + * Clear out any acknowledged records at the head of the deques and return a {@link CommittableOffsets snapshot} of the offsets and offset metadata + * accrued between the last time this method was invoked and now. This snapshot can be {@link CommittableOffsets#updatedWith(CommittableOffsets) combined} + * with an existing snapshot if desired. 
+ * Note that this may take some time to complete if a large number of records has built up, which may occur if a + * Kafka partition is offline and all records targeting that partition go unacknowledged while records targeting + * other partitions continue to be dispatched to the producer and sent successfully + * @return a fresh offset snapshot; never null + */ + public CommittableOffsets committableOffsets() { + Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>(); + int totalCommittableMessages = 0; + int totalUncommittableMessages = 0; + int largestDequeSize = 0; + Map<String, Object> largestDequePartition = null; + for (Map.Entry<Map<String, Object>, Deque<SubmittedRecord>> entry : records.entrySet()) { + Map<String, Object> partition = entry.getKey(); + Deque<SubmittedRecord> queuedRecords = entry.getValue(); + int initialDequeSize = queuedRecords.size(); + if (canCommitHead(queuedRecords)) { + Map<String, Object> offset = committableOffset(queuedRecords); + offsets.put(partition, offset); + } + int uncommittableMessages = queuedRecords.size(); + int committableMessages = initialDequeSize - uncommittableMessages; + totalCommittableMessages += committableMessages; + totalUncommittableMessages += uncommittableMessages; + if (uncommittableMessages > largestDequeSize) { + largestDequeSize = uncommittableMessages; + largestDequePartition = partition; + } + } + // Clear out all empty deques from the map to keep it from growing indefinitely + records.values().removeIf(Deque::isEmpty); + return new CommittableOffsets(offsets, totalCommittableMessages, totalUncommittableMessages, records.size(), largestDequeSize, largestDequePartition); + } + + // Note that this will return null if either there are no committable offsets for the given deque, or the latest + // committable offset is itself null. The caller is responsible for distinguishing between the two cases. 
+ private Map<String, Object> committableOffset(Deque<SubmittedRecord> queuedRecords) { + Map<String, Object> result = null; + while (canCommitHead(queuedRecords)) { + result = queuedRecords.poll().offset(); + } + return result; + } + + private boolean canCommitHead(Deque<SubmittedRecord> queuedRecords) { + return queuedRecords.peek() != null && queuedRecords.peek().acked(); + } + + static class SubmittedRecord { + private final Map<String, Object> partition; + private final Map<String, Object> offset; + private volatile boolean acked; + + public SubmittedRecord(Map<String, Object> partition, Map<String, Object> offset) { + this.partition = partition; + this.offset = offset; + this.acked = false; + } + + /** + * Acknowledge this record; signals that its offset may be safely committed. + * This is safe to be called from a different thread than what called {@link SubmittedRecords#submit(SourceRecord)}. + */ + public void ack() { + this.acked = true; + } + + private boolean acked() { + return acked; + } + + private Map<String, Object> partition() { + return partition; + } + + private Map<String, Object> offset() { + return offset; + } + } + + /** + * Contains a snapshot of offsets that can be committed for a source task and metadata for that offset commit + * (such as the number of messages for which offsets can and cannot be committed). + */ + static class CommittableOffsets { + + /** + * An "empty" snapshot that contains no offsets to commit and whose metadata contains no committable or uncommitable messages. 
+ */ + public static final CommittableOffsets EMPTY = new CommittableOffsets(Collections.emptyMap(), 0, 0, 0, 0, null); + + private final Map<Map<String, Object>, Map<String, Object>> offsets; + private final int numCommittableMessages; + private final int numUncommittableMessages; + private final int numDeques; + private final int largestDequeSize; + private final Map<String, Object> largestDequePartition; + + CommittableOffsets( + Map<Map<String, Object>, Map<String, Object>> offsets, + int numCommittableMessages, + int numUncommittableMessages, + int numDeques, + int largestDequeSize, + Map<String, Object> largestDequePartition + ) { + this.offsets = offsets != null ? new HashMap<>(offsets) : Collections.emptyMap(); + this.numCommittableMessages = numCommittableMessages; + this.numUncommittableMessages = numUncommittableMessages; + this.numDeques = numDeques; + this.largestDequeSize = largestDequeSize; + this.largestDequePartition = largestDequePartition; + } + + /** + * @return the offsets that can be committed at the time of the snapshot + */ + public Map<Map<String, Object>, Map<String, Object>> offsets() { + return Collections.unmodifiableMap(offsets); + } + + /** + * @return the number of committable messages at the time of the snapshot, where a committable message is both + * acknowledged and not preceded by any unacknowledged messages in the deque for its source partition + */ + public int numCommittableMessages() { + return numCommittableMessages; + } + + /** + * @return the number of uncommittable messages at the time of the snapshot, where an uncommittable message + * is either unacknowledged, or preceded in the deque for its source partition by an unacknowledged message + */ + public int numUncommittableMessages() { + return numUncommittableMessages; + } + + /** + * @return the number of non-empty deques tracking uncommittable messages at the time of the snapshot + */ + public int numDeques() { + return numDeques; + } + + /** + * @return the size of 
the largest deque at the time of the snapshot + */ + public int largestDequeSize() { + return largestDequeSize; + } + + /** + * Get the partition for the deque with the most uncommitted messages at the time of the snapshot. + * @return the applicable partition, which may be null, or null if there are no uncommitted messages; + * it is the caller's responsibility to distinguish between these two cases via {@link #hasPending()} + */ + public Map<String, Object> largestDequePartition() { + return largestDequePartition; + } + + /** + * @return whether there were any uncommittable messages at the time of the snapshot + */ + public boolean hasPending() { + return numUncommittableMessages > 0; + } + + /** + * @return whether there were any committable or uncommittable messages at the time of the snapshot + */ + public boolean isEmpty() { + return numCommittableMessages == 0 && numUncommittableMessages == 0 && offsets.isEmpty(); + } + + /** + * Create a new snapshot by combining the data for this snapshot with newer data in a more recent snapshot. + * Offsets are combined (giving precedence to the newer snapshot in case of conflict), the total number of + * committable messages is summed across the two snapshots, and the newer snapshot's information on pending + * messages (num deques, largest deque size, etc.) is used. 
+ * @param newerOffsets the newer snapshot to combine with this snapshot + * @return the new offset snapshot containing information from this snapshot and the newer snapshot; never null + */ + public CommittableOffsets updatedWith(CommittableOffsets newerOffsets) { + Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>(this.offsets); + offsets.putAll(newerOffsets.offsets); + + return new CommittableOffsets( + offsets, + this.numCommittableMessages + newerOffsets.numCommittableMessages, + newerOffsets.numUncommittableMessages, + newerOffsets.numDeques, + newerOffsets.largestDequeSize, + newerOffsets.largestDequePartition + ); + } + } +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index 7307ec6..a33821a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -55,7 +55,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; -import java.util.IdentityHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -66,6 +65,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord; +import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets; import static org.apache.kafka.connect.runtime.WorkerConfig.TOPIC_TRACKING_ENABLE_CONFIG; /** @@ -94,14 +95,9 @@ class WorkerSourceTask extends WorkerTask { private final TopicCreation topicCreation; private List<SourceRecord> toSend; - private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. 
never made it into the producer's RecordAccumulator - // Use IdentityHashMap to ensure correctness with duplicate records. This is a HashMap because - // there is no IdentityHashSet. - private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages; - // A second buffer is used while an offset flush is running - private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog; - private boolean flushing; - private CountDownLatch stopRequestedLatch; + private volatile CommittableOffsets committableOffsets; + private final SubmittedRecords submittedRecords; + private final CountDownLatch stopRequestedLatch; private Map<String, String> taskConfig; private boolean started = false; @@ -145,10 +141,8 @@ class WorkerSourceTask extends WorkerTask { this.closeExecutor = closeExecutor; this.toSend = null; - this.lastSendFailed = false; - this.outstandingMessages = new IdentityHashMap<>(); - this.outstandingMessagesBacklog = new IdentityHashMap<>(); - this.flushing = false; + this.committableOffsets = CommittableOffsets.EMPTY; + this.submittedRecords = new SubmittedRecords(); this.stopRequestedLatch = new CountDownLatch(1); this.sourceTaskMetricsGroup = new SourceTaskMetricsGroup(id, connectMetrics); this.producerSendException = new AtomicReference<>(); @@ -237,6 +231,8 @@ class WorkerSourceTask extends WorkerTask { try { log.info("{} Executing source task", this); while (!isStopping()) { + updateCommittableOffsets(); + if (shouldPause()) { onPause(); if (awaitUnpause()) { @@ -246,7 +242,6 @@ class WorkerSourceTask extends WorkerTask { } maybeThrowProducerSendException(); - if (toSend == null) { log.trace("{} Nothing to send to Kafka. 
Polling source for additional records", this); long start = time.milliseconds(); @@ -255,6 +250,7 @@ class WorkerSourceTask extends WorkerTask { recordPollReturned(toSend.size(), time.milliseconds() - start); } } + if (toSend == null) continue; log.trace("{} About to send {} records to Kafka", this, toSend.size()); @@ -268,6 +264,7 @@ class WorkerSourceTask extends WorkerTask { // simply resulted in not getting more records but all the existing records should be ok to flush // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit // to fail. + updateCommittableOffsets(); commitOffsets(); } } @@ -291,6 +288,13 @@ class WorkerSourceTask extends WorkerTask { } } + private void updateCommittableOffsets() { + CommittableOffsets newOffsets = submittedRecords.committableOffsets(); + synchronized (this) { + this.committableOffsets = this.committableOffsets.updatedWith(newOffsets); + } + } + protected List<SourceRecord> poll() throws InterruptedException { try { return task.poll(); @@ -352,21 +356,7 @@ class WorkerSourceTask extends WorkerTask { } log.trace("{} Appending record to the topic {} with key {}, value {}", this, record.topic(), record.key(), record.value()); - // We need this queued first since the callback could happen immediately (even synchronously in some cases). - // Because of this we need to be careful about handling retries -- we always save the previously attempted - // record as part of toSend and need to use a flag to track whether we should actually add it to the outstanding - // messages and update the offsets. 
- synchronized (this) { - if (!lastSendFailed) { - if (!flushing) { - outstandingMessages.put(producerRecord, producerRecord); - } else { - outstandingMessagesBacklog.put(producerRecord, producerRecord); - } - // Offsets are converted & serialized in the OffsetWriter - offsetWriter.offset(record.sourcePartition(), record.sourceOffset()); - } - } + SubmittedRecord submittedRecord = submittedRecords.submit(record); try { maybeCreateTopic(record.topic()); final String topic = producerRecord.topic(); @@ -378,7 +368,7 @@ class WorkerSourceTask extends WorkerTask { log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord); producerSendException.compareAndSet(null, e); } else { - recordSent(producerRecord); + submittedRecord.ack(); counter.completeRecord(); log.trace("{} Wrote record successfully: topic {} partition {} offset {}", WorkerSourceTask.this, @@ -390,12 +380,11 @@ class WorkerSourceTask extends WorkerTask { } } }); - lastSendFailed = false; } catch (RetriableException | org.apache.kafka.common.errors.RetriableException e) { log.warn("{} Failed to send record to topic '{}' and partition '{}'. 
Backing off before retrying: ", this, producerRecord.topic(), producerRecord.partition(), e); toSend = toSend.subList(processed, toSend.size()); - lastSendFailed = true; + submittedRecords.removeLastOccurrence(submittedRecord); counter.retryRemaining(); return false; } catch (ConnectException e) { @@ -473,20 +462,6 @@ class WorkerSourceTask extends WorkerTask { } } - private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record) { - ProducerRecord<byte[], byte[]> removed = outstandingMessages.remove(record); - // While flushing, we may also see callbacks for items in the backlog - if (removed == null && flushing) - removed = outstandingMessagesBacklog.remove(record); - // But if neither one had it, something is very wrong - if (removed == null) { - log.error("{} CRITICAL Saw callback for record from topic {} partition {} that was not present in the outstanding message set", this, record.topic(), record.partition()); - } else if (flushing && outstandingMessages.isEmpty()) { - // flush thread may be waiting on the outstanding messages to clear - this.notifyAll(); - } - } - public boolean commitOffsets() { long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG); @@ -495,58 +470,56 @@ class WorkerSourceTask extends WorkerTask { long started = time.milliseconds(); long timeout = started + commitTimeoutMs; + CommittableOffsets offsetsToCommit; synchronized (this) { - // First we need to make sure we snapshot everything in exactly the current state. This - // means both the current set of messages we're still waiting to finish, stored in this - // class, which setting flushing = true will handle by storing any new values into a new - // buffer; and the current set of user-specified offsets, stored in the - // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot. 
- flushing = true; - boolean flushStarted = offsetWriter.beginFlush(); - // Still wait for any producer records to flush, even if there aren't any offsets to write - // to persistent storage - - // Next we need to wait for all outstanding messages to finish sending - log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size()); - while (!outstandingMessages.isEmpty()) { - try { - long timeoutMs = timeout - time.milliseconds(); - // If the task has been cancelled, no more records will be sent from the producer; in that case, if any outstanding messages remain, - // we can stop flushing immediately - if (isCancelled() || timeoutMs <= 0) { - log.error("{} Failed to flush, timed out while waiting for producer to flush outstanding {} messages", this, outstandingMessages.size()); - finishFailedFlush(); - recordCommitFailure(time.milliseconds() - started, null); - return false; - } - this.wait(timeoutMs); - } catch (InterruptedException e) { - // We can get interrupted if we take too long committing when the work thread shutdown is requested, - // requiring a forcible shutdown. Give up since we can't safely commit any offsets, but also need - // to stop immediately - log.error("{} Interrupted while flushing messages, offsets will not be committed", this); - finishFailedFlush(); - recordCommitFailure(time.milliseconds() - started, null); - return false; - } - } + offsetsToCommit = this.committableOffsets; + this.committableOffsets = CommittableOffsets.EMPTY; + } - if (!flushStarted) { - // There was nothing in the offsets to process, but we still waited for the data in the - // buffer to flush. This is useful since this can feed into metrics to monitor, e.g. - // flush time, which can be used for monitoring even if the connector doesn't record any - // offsets. 
- finishSuccessfulFlush(); - long durationMillis = time.milliseconds() - started; - recordCommitSuccess(durationMillis); - log.debug("{} Finished offset commitOffsets successfully in {} ms", - this, durationMillis); - - commitSourceTask(); - return true; + if (committableOffsets.isEmpty()) { + log.info("{} Either no records were produced by the task since the last offset commit, " + + "or every record has been filtered out by a transformation " + + "or dropped due to transformation or conversion errors.", + this + ); + // We continue with the offset commit process here instead of simply returning immediately + // in order to invoke SourceTask::commit and record metrics for a successful offset commit + } else { + log.info("{} Committing offsets for {} acknowledged messages", this, committableOffsets.numCommittableMessages()); + if (committableOffsets.hasPending()) { + log.debug("{} There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. " + + "The source partition with the most pending messages is {}, with {} pending messages", + this, + committableOffsets.numUncommittableMessages(), + committableOffsets.numDeques(), + committableOffsets.largestDequePartition(), + committableOffsets.largestDequeSize() + ); + } else { + log.debug("{} There are currently no pending messages for this offset commit; " + + "all messages dispatched to the task's producer since the last commit have been acknowledged", + this + ); } } + // Update the offset writer with any new offsets for records that have been acked. + // The offset writer will continue to track all offsets until they are able to be successfully flushed. + // IOW, if the offset writer fails to flush, it keeps those offset for the next attempt, + // though we may update them here with newer offsets for acked records. 
+ offsetsToCommit.offsets().forEach(offsetWriter::offset); + + if (!offsetWriter.beginFlush()) { + // There was nothing in the offsets to process, but we still mark a successful offset commit. + long durationMillis = time.milliseconds() - started; + recordCommitSuccess(durationMillis); + log.debug("{} Finished offset commitOffsets successfully in {} ms", + this, durationMillis); + + commitSourceTask(); + return true; + } + // Now we can actually flush the offsets to user storage. Future<Void> flushFuture = offsetWriter.doFlush((error, result) -> { if (error != null) { @@ -558,7 +531,7 @@ class WorkerSourceTask extends WorkerTask { // Very rare case: offsets were unserializable and we finished immediately, unable to store // any data if (flushFuture == null) { - finishFailedFlush(); + offsetWriter.cancelFlush(); recordCommitFailure(time.milliseconds() - started, null); return false; } @@ -570,22 +543,21 @@ class WorkerSourceTask extends WorkerTask { // could look a little confusing. } catch (InterruptedException e) { log.warn("{} Flush of offsets interrupted, cancelling", this); - finishFailedFlush(); + offsetWriter.cancelFlush(); recordCommitFailure(time.milliseconds() - started, e); return false; } catch (ExecutionException e) { log.error("{} Flush of offsets threw an unexpected exception: ", this, e); - finishFailedFlush(); + offsetWriter.cancelFlush(); recordCommitFailure(time.milliseconds() - started, e); return false; } catch (TimeoutException e) { - log.error("{} Timed out waiting to flush offsets to storage", this); - finishFailedFlush(); + log.error("{} Timed out waiting to flush offsets to storage; will try again on next flush interval with latest offsets", this); + offsetWriter.cancelFlush(); recordCommitFailure(time.milliseconds() - started, null); return false; } - finishSuccessfulFlush(); long durationMillis = time.milliseconds() - started; recordCommitSuccess(durationMillis); log.debug("{} Finished commitOffsets successfully in {} ms", @@ -604,21 
+576,6 @@ class WorkerSourceTask extends WorkerTask { } } - private synchronized void finishFailedFlush() { - offsetWriter.cancelFlush(); - outstandingMessages.putAll(outstandingMessagesBacklog); - outstandingMessagesBacklog.clear(); - flushing = false; - } - - private synchronized void finishSuccessfulFlush() { - // If we were successful, we can just swap instead of replacing items back into the original map - IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> temp = outstandingMessages; - outstandingMessages = outstandingMessagesBacklog; - outstandingMessagesBacklog = temp; - flushing = false; - } - @Override public String toString() { return "WorkerSourceTask{" + diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java index f8ab6a7..7766e2c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java @@ -89,9 +89,8 @@ public class OffsetStorageWriter { * @param partition the partition to store an offset for * @param offset the offset */ - @SuppressWarnings("unchecked") - public synchronized void offset(Map<String, ?> partition, Map<String, ?> offset) { - data.put((Map<String, Object>) partition, (Map<String, Object>) offset); + public synchronized void offset(Map<String, Object> partition, Map<String, Object> offset) { + data.put(partition, offset); } private boolean flushing() { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index b0cd0b4..4743dce 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -378,8 +378,6 @@ public class ErrorHandlingTaskTest { EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true); - offsetWriter.offset(EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().times(2); sourceTask.initialize(EasyMock.anyObject()); EasyMock.expectLastCall(); @@ -444,8 +442,6 @@ public class ErrorHandlingTaskTest { EasyMock.expect(workerSourceTask.commitOffsets()).andReturn(true); - offsetWriter.offset(EasyMock.anyObject(), EasyMock.anyObject()); - EasyMock.expectLastCall().times(2); sourceTask.initialize(EasyMock.anyObject()); EasyMock.expectLastCall(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java new file mode 100644 index 0000000..fd339fc --- /dev/null +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SubmittedRecordsTest.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.runtime; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.kafka.connect.runtime.SubmittedRecords.SubmittedRecord; +import static org.apache.kafka.connect.runtime.SubmittedRecords.CommittableOffsets; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SubmittedRecordsTest { + + private static final Map<String, Object> PARTITION1 = Collections.singletonMap("subreddit", "apachekafka"); + private static final Map<String, Object> PARTITION2 = Collections.singletonMap("subreddit", "adifferentvalue"); + private static final Map<String, Object> PARTITION3 = Collections.singletonMap("subreddit", "asdfqweoicus"); + + private AtomicInteger offset; + + SubmittedRecords submittedRecords; + + @Before + public void setup() { + submittedRecords = new SubmittedRecords(); + offset = new AtomicInteger(); + } + + @Test + public void testNoRecords() { + CommittableOffsets committableOffsets = submittedRecords.committableOffsets(); + assertTrue(committableOffsets.isEmpty()); + + committableOffsets = submittedRecords.committableOffsets(); + assertTrue(committableOffsets.isEmpty()); + + committableOffsets = submittedRecords.committableOffsets(); + assertTrue(committableOffsets.isEmpty()); + + assertNoRemainingDeques(); + } + + @Test + public void testNoCommittedRecords() { + for (int i = 0; i < 3; i++) { + for (Map<String, Object> partition : Arrays.asList(PARTITION1, PARTITION2, PARTITION3)) { + submittedRecords.submit(partition, newOffset()); + } + } + + CommittableOffsets committableOffsets = submittedRecords.committableOffsets(); + assertMetadata(committableOffsets, 0, 9, 3, 3, PARTITION1, PARTITION2, PARTITION3); + assertEquals(Collections.emptyMap(), 
committableOffsets.offsets()); + + committableOffsets = submittedRecords.committableOffsets(); + assertMetadata(committableOffsets, 0, 9, 3, 3, PARTITION1, PARTITION2, PARTITION3); + assertEquals(Collections.emptyMap(), committableOffsets.offsets()); + + committableOffsets = submittedRecords.committableOffsets(); + assertMetadata(committableOffsets, 0, 9, 3, 3, PARTITION1, PARTITION2, PARTITION3); + assertEquals(Collections.emptyMap(), committableOffsets.offsets()); + } + + @Test + public void testSingleAck() { + Map<String, Object> offset = newOffset(); + + SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, offset); + CommittableOffsets committableOffsets = submittedRecords.committableOffsets(); + // Record has been submitted but not yet acked; cannot commit offsets for it yet + assertFalse(committableOffsets.isEmpty()); + assertEquals(Collections.emptyMap(), committableOffsets.offsets()); + assertMetadata(committableOffsets, 0, 1, 1, 1, PARTITION1); + assertNoEmptyDeques(); + + submittedRecord.ack(); + committableOffsets = submittedRecords.committableOffsets(); + // Record has been acked; can commit offsets for it + assertFalse(committableOffsets.isEmpty()); + assertEquals(Collections.singletonMap(PARTITION1, offset), committableOffsets.offsets()); + assertMetadataNoPending(committableOffsets, 1); + + // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks + assertNoRemainingDeques(); + + committableOffsets = submittedRecords.committableOffsets(); + // Old offsets should be wiped + assertEquals(Collections.emptyMap(), committableOffsets.offsets()); + assertTrue(committableOffsets.isEmpty()); + } + + @Test + public void testMultipleAcksAcrossMultiplePartitions() { + Map<String, Object> partition1Offset1 = newOffset(); + Map<String, Object> partition1Offset2 = newOffset(); + Map<String, Object> partition2Offset1 = newOffset(); + Map<String, Object> partition2Offset2 = newOffset(); + + SubmittedRecord 
partition1Record1 = submittedRecords.submit(PARTITION1, partition1Offset1); + SubmittedRecord partition1Record2 = submittedRecords.submit(PARTITION1, partition1Offset2); + SubmittedRecord partition2Record1 = submittedRecords.submit(PARTITION2, partition2Offset1); + SubmittedRecord partition2Record2 = submittedRecords.submit(PARTITION2, partition2Offset2); + + CommittableOffsets committableOffsets = submittedRecords.committableOffsets(); + // No records ack'd yet; can't commit any offsets + assertEquals(Collections.emptyMap(), committableOffsets.offsets()); + assertMetadata(committableOffsets, 0, 4, 2, 2, PARTITION1, PARTITION2); + assertNoEmptyDeques(); + + partition1Record2.ack(); + committableOffsets = submittedRecords.committableOffsets(); + // One record has been ack'd, but a record that comes before it and corresponds to the same source partition hasn't been + assertEquals(Collections.emptyMap(), committableOffsets.offsets()); + assertMetadata(committableOffsets, 0, 4, 2, 2, PARTITION1, PARTITION2); + assertNoEmptyDeques(); + + partition2Record1.ack(); + committableOffsets = submittedRecords.committableOffsets(); + // We can commit the first offset for the second partition + assertEquals(Collections.singletonMap(PARTITION2, partition2Offset1), committableOffsets.offsets()); + assertMetadata(committableOffsets, 1, 3, 2, 2, PARTITION1); + assertNoEmptyDeques(); + + committableOffsets = submittedRecords.committableOffsets(); + // No new offsets to commit + assertEquals(Collections.emptyMap(), committableOffsets.offsets()); + assertMetadata(committableOffsets, 0, 3, 2, 2, PARTITION1); + assertNoEmptyDeques(); + + partition1Record1.ack(); + partition2Record2.ack(); + + committableOffsets = submittedRecords.committableOffsets(); + // We can commit new offsets for both partitions now + Map<Map<String, Object>, Map<String, Object>> expectedOffsets = new HashMap<>(); + expectedOffsets.put(PARTITION1, partition1Offset2); + expectedOffsets.put(PARTITION2, 
partition2Offset2); + assertEquals(expectedOffsets, committableOffsets.offsets()); + assertMetadataNoPending(committableOffsets, 3); + + // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks + assertNoRemainingDeques(); + + committableOffsets = submittedRecords.committableOffsets(); + // No new offsets to commit + assertTrue(committableOffsets.isEmpty()); + } + + @Test + public void testRemoveLastSubmittedRecord() { + SubmittedRecord submittedRecord = submittedRecords.submit(PARTITION1, newOffset()); + + CommittableOffsets committableOffsets = submittedRecords.committableOffsets(); + assertEquals(Collections.emptyMap(), committableOffsets.offsets()); + assertMetadata(committableOffsets, 0, 1, 1, 1, PARTITION1); + + assertTrue("First attempt to remove record from submitted queue should succeed", submittedRecords.removeLastOccurrence(submittedRecord)); + assertFalse("Attempt to remove already-removed record from submitted queue should fail", submittedRecords.removeLastOccurrence(submittedRecord)); + + committableOffsets = submittedRecords.committableOffsets(); + // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets + assertTrue(committableOffsets.isEmpty()); + + submittedRecord.ack(); + committableOffsets = submittedRecords.committableOffsets(); + // Even though the record has somehow been acknowledged, it should not be counted when collecting committable offsets + assertTrue(committableOffsets.isEmpty()); + } + + @Test + public void testRemoveNotLastSubmittedRecord() { + Map<String, Object> partition1Offset = newOffset(); + Map<String, Object> partition2Offset = newOffset(); + + SubmittedRecord recordToRemove = submittedRecords.submit(PARTITION1, partition1Offset); + SubmittedRecord lastSubmittedRecord = submittedRecords.submit(PARTITION2, partition2Offset); + + CommittableOffsets committableOffsets = submittedRecords.committableOffsets(); + 
assertMetadata(committableOffsets, 0, 2, 2, 1, PARTITION1, PARTITION2); + assertNoEmptyDeques(); + + assertTrue("First attempt to remove record from submitted queue should succeed", submittedRecords.removeLastOccurrence(recordToRemove)); + + committableOffsets = submittedRecords.committableOffsets(); + // Even if SubmittedRecords::remove is broken, we haven't ack'd anything yet, so there should be no committable offsets + assertEquals(Collections.emptyMap(), committableOffsets.offsets()); + assertMetadata(committableOffsets, 0, 1, 1, 1, PARTITION2); + assertNoEmptyDeques(); + // The only record for this partition has been removed; we shouldn't be tracking a deque for it anymore + assertRemovedDeques(PARTITION1); + + recordToRemove.ack(); + committableOffsets = submittedRecords.committableOffsets(); + // Even though the record has somehow been acknowledged, it should not be counted when collecting committable offsets + assertEquals(Collections.emptyMap(), committableOffsets.offsets()); + assertMetadata(committableOffsets, 0, 1, 1, 1, PARTITION2); + assertNoEmptyDeques(); + + lastSubmittedRecord.ack(); + committableOffsets = submittedRecords.committableOffsets(); + // Now that the last-submitted record has been ack'd, we should be able to commit its offset + assertEquals(Collections.singletonMap(PARTITION2, partition2Offset), committableOffsets.offsets()); + assertMetadata(committableOffsets, 1, 0, 0, 0, (Map<String, Object>) null); + assertFalse(committableOffsets.hasPending()); + + // Everything has been ack'd and consumed; make sure that it's been cleaned up to avoid memory leaks + assertNoRemainingDeques(); + committableOffsets = submittedRecords.committableOffsets(); + assertTrue(committableOffsets.isEmpty()); + } + + @Test + public void testNullPartitionAndOffset() { + SubmittedRecord submittedRecord = submittedRecords.submit(null, null); + CommittableOffsets committableOffsets = submittedRecords.committableOffsets(); + assertMetadata(committableOffsets, 0, 1, 
1, 1, (Map<String, Object>) null); + + submittedRecord.ack(); + committableOffsets = submittedRecords.committableOffsets(); + assertEquals(Collections.singletonMap(null, null), committableOffsets.offsets()); + assertMetadataNoPending(committableOffsets, 1); + + assertNoEmptyDeques(); + } + + private void assertNoRemainingDeques() { + assertEquals("Internal records map should be completely empty", Collections.emptyMap(), submittedRecords.records); + } + + @SafeVarargs + private final void assertRemovedDeques(Map<String, ?>... partitions) { + for (Map<String, ?> partition : partitions) { + assertFalse("Deque for partition " + partition + " should have been cleaned up from internal records map", submittedRecords.records.containsKey(partition)); + } + } + + private void assertNoEmptyDeques() { + submittedRecords.records.forEach((partition, deque) -> + assertFalse("Empty deque for partition " + partition + " should have been cleaned up from internal records map", deque.isEmpty()) + ); + } + + private Map<String, Object> newOffset() { + return Collections.singletonMap("timestamp", offset.getAndIncrement()); + } + + private void assertMetadataNoPending(CommittableOffsets committableOffsets, int committableMessages) { + assertEquals(committableMessages, committableOffsets.numCommittableMessages()); + assertFalse(committableOffsets.hasPending()); + } + + @SafeVarargs + @SuppressWarnings("varargs") + private final void assertMetadata( + CommittableOffsets committableOffsets, + int committableMessages, + int uncommittableMessages, + int numDeques, + int largestDequeSize, + Map<String, Object>... 
largestDequePartitions + ) { + assertEquals(committableMessages, committableOffsets.numCommittableMessages()); + assertEquals(uncommittableMessages, committableOffsets.numUncommittableMessages()); + assertEquals(numDeques, committableOffsets.numDeques()); + assertEquals(largestDequeSize, committableOffsets.largestDequeSize()); + assertTrue(Arrays.asList(largestDequePartitions).contains(committableOffsets.largestDequePartition())); + } +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 640c1d8..fcd657f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -117,8 +117,8 @@ import static org.junit.Assert.assertTrue; public class WorkerSourceTaskTest extends ThreadedTest { private static final String TOPIC = "topic"; private static final String OTHER_TOPIC = "other-topic"; - private static final Map<String, byte[]> PARTITION = Collections.singletonMap("key", "partition".getBytes()); - private static final Map<String, Integer> OFFSET = Collections.singletonMap("key", 12); + private static final Map<String, Object> PARTITION = Collections.singletonMap("key", "partition".getBytes()); + private static final Map<String, Object> OFFSET = Collections.singletonMap("key", 12); // Connect-format data private static final Schema KEY_SCHEMA = Schema.INT32_SCHEMA; @@ -286,6 +286,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { sourceTask.stop(); EasyMock.expectLastCall(); + + offsetWriter.offset(PARTITION, OFFSET); + PowerMock.expectLastCall(); expectOffsetFlush(true); statusListener.onShutdown(taskId); @@ -333,6 +336,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { sourceTask.stop(); EasyMock.expectLastCall(); + + offsetWriter.offset(PARTITION, OFFSET); + 
PowerMock.expectLastCall(); expectOffsetFlush(true); statusListener.onShutdown(taskId); @@ -547,6 +553,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { final CountDownLatch pollLatch = expectPolls(1); expectOffsetFlush(true); + offsetWriter.offset(PARTITION, OFFSET); + PowerMock.expectLastCall().atLeastOnce(); + expectTopicCreation(TOPIC); sourceTask.stop(); @@ -590,6 +599,9 @@ public class WorkerSourceTaskTest extends ThreadedTest { final CountDownLatch pollLatch = expectPolls(1); expectOffsetFlush(true); + offsetWriter.offset(PARTITION, OFFSET); + PowerMock.expectLastCall().atLeastOnce(); + expectTopicCreation(TOPIC); sourceTask.stop(); @@ -716,25 +728,23 @@ public class WorkerSourceTaskTest extends ThreadedTest { expectTopicCreation(TOPIC); // First round - expectSendRecordOnce(false); + expectSendRecordOnce(); // Any Producer retriable exception should work here expectSendRecordSyncFailure(new org.apache.kafka.common.errors.TimeoutException("retriable sync failure")); // Second round - expectSendRecordOnce(true); - expectSendRecordOnce(false); + expectSendRecordOnce(); + expectSendRecordOnce(); PowerMock.replayAll(); // Try to send 3, make first pass, second fail. 
Should save last two Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3)); Whitebox.invokeMethod(workerTask, "sendRecords"); - assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed")); assertEquals(Arrays.asList(record2, record3), Whitebox.getInternalState(workerTask, "toSend")); // Next they all succeed Whitebox.invokeMethod(workerTask, "sendRecords"); - assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); assertNull(Whitebox.getInternalState(workerTask, "toSend")); PowerMock.verifyAll(); @@ -792,15 +802,14 @@ public class WorkerSourceTaskTest extends ThreadedTest { expectTopicCreation(TOPIC); // Source task commit record failure will not cause the task to abort - expectSendRecordOnce(false); - expectSendRecordTaskCommitRecordFail(false, false); - expectSendRecordOnce(false); + expectSendRecordOnce(); + expectSendRecordTaskCommitRecordFail(false); + expectSendRecordOnce(); PowerMock.replayAll(); Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3)); Whitebox.invokeMethod(workerTask, "sendRecords"); - assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); assertNull(Whitebox.getInternalState(workerTask, "toSend")); PowerMock.verifyAll(); @@ -930,7 +939,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { expectTopicCreation(TOPIC); - Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(TOPIC, true, false, true, true, true, headers); + Capture<ProducerRecord<byte[], byte[]>> sent = expectSendRecord(TOPIC, true, true, true, true, headers); PowerMock.replayAll(); @@ -968,8 +977,8 @@ public class WorkerSourceTaskTest extends ThreadedTest { expectTopicCreation(TOPIC); - Capture<ProducerRecord<byte[], byte[]>> sentRecordA = expectSendRecord(TOPIC, false, false, true, true, false, null); - Capture<ProducerRecord<byte[], byte[]>> sentRecordB = expectSendRecord(TOPIC, false, false, true, true, false, null); + 
Capture<ProducerRecord<byte[], byte[]>> sentRecordA = expectSendRecord(TOPIC, false, true, true, false, null); + Capture<ProducerRecord<byte[], byte[]>> sentRecordB = expectSendRecord(TOPIC, false, true, true, false, null); PowerMock.replayAll(); @@ -1009,8 +1018,8 @@ public class WorkerSourceTaskTest extends ThreadedTest { TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo)); EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.singletonMap(TOPIC, topicDesc)); - expectSendRecordTaskCommitRecordSucceed(false, false); - expectSendRecordTaskCommitRecordSucceed(false, false); + expectSendRecordTaskCommitRecordSucceed(false); + expectSendRecordTaskCommitRecordSucceed(false); PowerMock.replayAll(); @@ -1037,19 +1046,17 @@ public class WorkerSourceTaskTest extends ThreadedTest { // Second round - calls to describe and create succeed expectTopicCreation(TOPIC); // Exactly two records are sent - expectSendRecordTaskCommitRecordSucceed(false, false); - expectSendRecordTaskCommitRecordSucceed(false, false); + expectSendRecordTaskCommitRecordSucceed(false); + expectSendRecordTaskCommitRecordSucceed(false); PowerMock.replayAll(); Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); Whitebox.invokeMethod(workerTask, "sendRecords"); - assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed")); assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend")); // Next they all succeed Whitebox.invokeMethod(workerTask, "sendRecords"); - assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); assertNull(Whitebox.getInternalState(workerTask, "toSend")); } @@ -1073,19 +1080,17 @@ public class WorkerSourceTaskTest extends ThreadedTest { // Second round expectTopicCreation(TOPIC); - expectSendRecordTaskCommitRecordSucceed(false, false); - expectSendRecordTaskCommitRecordSucceed(false, false); + 
expectSendRecordTaskCommitRecordSucceed(false); + expectSendRecordTaskCommitRecordSucceed(false); PowerMock.replayAll(); Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2)); Whitebox.invokeMethod(workerTask, "sendRecords"); - assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed")); assertEquals(Arrays.asList(record1, record2), Whitebox.getInternalState(workerTask, "toSend")); // Next they all succeed Whitebox.invokeMethod(workerTask, "sendRecords"); - assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); assertNull(Whitebox.getInternalState(workerTask, "toSend")); } @@ -1105,8 +1110,8 @@ public class WorkerSourceTaskTest extends ThreadedTest { // First round expectPreliminaryCalls(OTHER_TOPIC); expectTopicCreation(TOPIC); - expectSendRecordTaskCommitRecordSucceed(false, false); - expectSendRecordTaskCommitRecordSucceed(false, false); + expectSendRecordTaskCommitRecordSucceed(false); + expectSendRecordTaskCommitRecordSucceed(false); // First call to describe the topic times out EasyMock.expect(admin.describeTopics(OTHER_TOPIC)) @@ -1114,19 +1119,17 @@ public class WorkerSourceTaskTest extends ThreadedTest { // Second round expectTopicCreation(OTHER_TOPIC); - expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders()); + expectSendRecord(OTHER_TOPIC, false, true, true, true, emptyHeaders()); PowerMock.replayAll(); // Try to send 3, make first pass, second fail. 
Should save last two Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3)); Whitebox.invokeMethod(workerTask, "sendRecords"); - assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed")); assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend")); // Next they all succeed Whitebox.invokeMethod(workerTask, "sendRecords"); - assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); assertNull(Whitebox.getInternalState(workerTask, "toSend")); PowerMock.verifyAll(); @@ -1148,8 +1151,8 @@ public class WorkerSourceTaskTest extends ThreadedTest { // First round expectPreliminaryCalls(OTHER_TOPIC); expectTopicCreation(TOPIC); - expectSendRecordTaskCommitRecordSucceed(false, false); - expectSendRecordTaskCommitRecordSucceed(false, false); + expectSendRecordTaskCommitRecordSucceed(false); + expectSendRecordTaskCommitRecordSucceed(false); EasyMock.expect(admin.describeTopics(OTHER_TOPIC)).andReturn(Collections.emptyMap()); // First call to create the topic times out @@ -1159,19 +1162,17 @@ public class WorkerSourceTaskTest extends ThreadedTest { // Second round expectTopicCreation(OTHER_TOPIC); - expectSendRecord(OTHER_TOPIC, false, true, true, true, true, emptyHeaders()); + expectSendRecord(OTHER_TOPIC, false, true, true, true, emptyHeaders()); PowerMock.replayAll(); // Try to send 3, make first pass, second fail. 
Should save last two Whitebox.setInternalState(workerTask, "toSend", Arrays.asList(record1, record2, record3)); Whitebox.invokeMethod(workerTask, "sendRecords"); - assertEquals(true, Whitebox.getInternalState(workerTask, "lastSendFailed")); assertEquals(Arrays.asList(record3), Whitebox.getInternalState(workerTask, "toSend")); // Next they all succeed Whitebox.invokeMethod(workerTask, "sendRecords"); - assertEquals(false, Whitebox.getInternalState(workerTask, "lastSendFailed")); assertNull(Whitebox.getInternalState(workerTask, "toSend")); PowerMock.verifyAll(); @@ -1264,8 +1265,8 @@ public class WorkerSourceTaskTest extends ThreadedTest { Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(foundTopic(TOPIC)); - expectSendRecordTaskCommitRecordSucceed(false, false); - expectSendRecordTaskCommitRecordSucceed(false, false); + expectSendRecordTaskCommitRecordSucceed(false); + expectSendRecordTaskCommitRecordSucceed(false); PowerMock.replayAll(); @@ -1290,8 +1291,8 @@ public class WorkerSourceTaskTest extends ThreadedTest { Capture<NewTopic> newTopicCapture = EasyMock.newCapture(); EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC)); - expectSendRecordTaskCommitRecordSucceed(false, false); - expectSendRecordTaskCommitRecordSucceed(false, false); + expectSendRecordTaskCommitRecordSucceed(false); + expectSendRecordTaskCommitRecordSucceed(false); PowerMock.replayAll(); @@ -1318,8 +1319,6 @@ public class WorkerSourceTaskTest extends ThreadedTest { private void expectPreliminaryCalls(String topic) { expectConvertHeadersAndKeyValue(topic, true, emptyHeaders()); expectApplyTransformationChain(false); - offsetWriter.offset(PARTITION, OFFSET); - PowerMock.expectLastCall(); } private CountDownLatch expectEmptyPolls(int minimum, final AtomicInteger count) throws InterruptedException { @@ -1363,9 +1362,6 @@ public class 
WorkerSourceTaskTest extends ThreadedTest { expectConvertHeadersAndKeyValue(false); expectApplyTransformationChain(false); - offsetWriter.offset(PARTITION, OFFSET); - PowerMock.expectLastCall(); - EasyMock.expect( producer.send(EasyMock.anyObject(ProducerRecord.class), EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class))) @@ -1373,33 +1369,28 @@ public class WorkerSourceTaskTest extends ThreadedTest { } private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() throws InterruptedException { - return expectSendRecordTaskCommitRecordSucceed(true, false); + return expectSendRecordTaskCommitRecordSucceed(true); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce(boolean isRetry) throws InterruptedException { - return expectSendRecordTaskCommitRecordSucceed(false, isRetry); + private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce() throws InterruptedException { + return expectSendRecordTaskCommitRecordSucceed(false); } private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordProducerCallbackFail() throws InterruptedException { - return expectSendRecord(TOPIC, false, false, false, false, true, emptyHeaders()); - } - - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes, boolean isRetry) throws InterruptedException { - return expectSendRecord(TOPIC, anyTimes, isRetry, true, true, true, emptyHeaders()); + return expectSendRecord(TOPIC, false, false, false, true, emptyHeaders()); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes, boolean isRetry) throws InterruptedException { - return expectSendRecord(TOPIC, anyTimes, isRetry, true, false, true, emptyHeaders()); + private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes) throws InterruptedException { + return expectSendRecord(TOPIC, anyTimes, true, true, true, emptyHeaders()); } - private 
Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException { - return expectSendRecord(TOPIC, anyTimes, isRetry, succeed, true, true, emptyHeaders()); + private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes) throws InterruptedException { + return expectSendRecord(TOPIC, anyTimes, true, false, true, emptyHeaders()); } private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord( String topic, boolean anyTimes, - boolean isRetry, boolean sendSuccess, boolean commitSuccess, boolean isMockedConverters, @@ -1413,16 +1404,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture(); - // 1. Offset data is passed to the offset storage. - if (!isRetry) { - offsetWriter.offset(PARTITION, OFFSET); - if (anyTimes) - PowerMock.expectLastCall().anyTimes(); - else - PowerMock.expectLastCall(); - } - - // 2. Converted data passed to the producer, which will need callbacks invoked for flush to work + // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect( producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks))); @@ -1446,7 +1428,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { expect.andAnswer(expectResponse); if (sendSuccess) { - // 3. As a result of a successful producer send callback, we'll notify the source task of the record commit + // 2. 
As a result of a successful producer send callback, we'll notify the source task of the record commit expectTaskCommitRecordWithOffset(anyTimes, commitSuccess); expectTaskGetTopic(anyTimes); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java index 38d7a37..b442bca 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java @@ -47,8 +47,8 @@ import static org.junit.Assert.assertTrue; public class OffsetStorageWriterTest { private static final String NAMESPACE = "namespace"; // Connect format - any types should be accepted here - private static final Map<String, String> OFFSET_KEY = Collections.singletonMap("key", "key"); - private static final Map<String, Integer> OFFSET_VALUE = Collections.singletonMap("key", 12); + private static final Map<String, Object> OFFSET_KEY = Collections.singletonMap("key", "key"); + private static final Map<String, Object> OFFSET_VALUE = Collections.singletonMap("key", 12); // Serialized private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(); @@ -229,8 +229,8 @@ public class OffsetStorageWriterTest { * ensure tests complete. * @return the captured set of ByteBuffer key-value pairs passed to the storage layer */ - private void expectStore(Map<String, String> key, byte[] keySerialized, - Map<String, Integer> value, byte[] valueSerialized, + private void expectStore(Map<String, Object> key, byte[] keySerialized, + Map<String, Object> value, byte[] valueSerialized, final Callback<Void> callback, final boolean fail, final CountDownLatch waitForCompletion) {