This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 958bc0601c7 KAFKA-5756: Wait for concurrent source task offset flush to complete before starting next flush (#13208)
958bc0601c7 is described below

commit 958bc0601c7852b8aaf28960280b1f45efd7d850
Author: Greg Harris <[email protected]>
AuthorDate: Wed Feb 15 18:29:20 2023 -0800

    KAFKA-5756: Wait for concurrent source task offset flush to complete before starting next flush (#13208)
    
    Reviewers: Mickael Maison <[email protected]>, Chris Egerton <[email protected]>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../connect/runtime/AbstractWorkerSourceTask.java  |  4 ++
 .../runtime/ExactlyOnceWorkerSourceTask.java       | 14 ++++--
 .../kafka/connect/runtime/WorkerSinkTask.java      |  4 ++
 .../kafka/connect/runtime/WorkerSourceTask.java    | 14 +++++-
 .../kafka/connect/storage/OffsetStorageWriter.java | 53 ++++++++++++++++++----
 .../runtime/ExactlyOnceWorkerSourceTaskTest.java   |  5 +-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  | 47 +++++++++++++++++++
 .../connect/runtime/WorkerSourceTaskTest.java      |  5 +-
 .../connect/storage/OffsetStorageWriterTest.java   | 36 ++++++++-------
 10 files changed, 147 insertions(+), 37 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 6402c315b6a..41e29453381 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -155,7 +155,7 @@
               
files="(KafkaConfigBackingStore|Values|ConnectMetricsRegistry).java"/>
 
     <suppress checks="NPathComplexity"
-              
files="(DistributedHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|TopicAdmin).java"/>
+              
files="(DistributedHerder|RestClient|RestServer|JsonConverter|KafkaConfigBackingStore|FileStreamSourceTask|WorkerSourceTask|TopicAdmin).java"/>
 
     <!-- connect tests-->
     <suppress checks="ClassDataAbstractionCoupling"
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
index 9f6104b26a0..ff15f631a73 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java
@@ -365,6 +365,10 @@ public abstract class AbstractWorkerSourceTask extends 
WorkerTask {
         } catch (InterruptedException e) {
             // Ignore and allow to exit.
         } catch (RuntimeException e) {
+            if (isCancelled()) {
+                log.debug("Skipping final offset commit as task has been 
cancelled");
+                throw e;
+            }
             try {
                 finalOffsetCommit(true);
             } catch (Exception offsetException) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
index 9bb5433f36b..d4ef5ba8106 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java
@@ -219,9 +219,6 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
         if (failed) {
             log.debug("Skipping final offset commit as task has failed");
             return;
-        } else if (isCancelled()) {
-            log.debug("Skipping final offset commit as task has been 
cancelled");
-            return;
         }
 
         // It should be safe to commit here even if we were in the middle of 
retrying on RetriableExceptions in the
@@ -263,7 +260,16 @@ class ExactlyOnceWorkerSourceTask extends 
AbstractWorkerSourceTask {
         maybeBeginTransaction();
 
         AtomicReference<Throwable> flushError = new AtomicReference<>();
-        if (offsetWriter.beginFlush()) {
+        boolean shouldFlush = false;
+        try {
+            // Begin the flush without waiting, as there should not be any 
concurrent flushes.
+            // This is because commitTransaction is always called on the same 
thread, and should always block until
+            // the flush is complete, or cancel the flush if an error occurs.
+            shouldFlush = offsetWriter.beginFlush();
+        } catch (Throwable e) {
+            flushError.compareAndSet(null, e);
+        }
+        if (shouldFlush) {
             // Now we can actually write the offsets to the internal topic.
             // No need to track the flush future here since it's guaranteed to 
complete by the time
             // Producer::commitTransaction completes
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index b1481728e30..85454a60670 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -366,6 +366,10 @@ class WorkerSinkTask extends WorkerTask {
      * the write commit.
      **/
     private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, 
boolean closing, int seqno) {
+        if (isCancelled()) {
+            log.debug("Skipping final offset commit as task has been 
cancelled");
+            return;
+        }
         if (closing) {
             doCommitSync(offsets, seqno);
         } else {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 8c588f0a3c3..f136b2a5f4f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -249,7 +249,19 @@ class WorkerSourceTask extends AbstractWorkerSourceTask {
         // though we may update them here with newer offsets for acked records.
         offsetsToCommit.offsets().forEach(offsetWriter::offset);
 
-        if (!offsetWriter.beginFlush()) {
+        boolean shouldFlush;
+        try {
+            shouldFlush = offsetWriter.beginFlush(timeout - 
time.milliseconds(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            log.warn("{} Interrupted while waiting for previous offset flush 
to complete, cancelling", this);
+            recordCommitFailure(time.milliseconds() - started, e);
+            return false;
+        } catch (TimeoutException e) {
+            log.warn("{} Timed out while waiting for previous offset flush to 
complete, cancelling", this);
+            recordCommitFailure(time.milliseconds() - started, e);
+            return false;
+        }
+        if (!shouldFlush) {
             // There was nothing in the offsets to process, but we still mark 
a successful offset commit.
             long durationMillis = time.milliseconds() - started;
             recordCommitSuccess(durationMillis);
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index e1da123ee1f..d3141d4758e 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -26,6 +26,9 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 /**
  * <p>
@@ -73,6 +76,7 @@ public class OffsetStorageWriter {
     private Map<Map<String, Object>, Map<String, Object>> data = new 
HashMap<>();
 
     private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
+    private final Semaphore flushInProgress = new Semaphore(1);
     // Unique ID for each flush request to handle callbacks after timeouts
     private long currentFlushId = 0;
 
@@ -100,23 +104,50 @@ public class OffsetStorageWriter {
 
     /**
      * Performs the first step of a flush operation, snapshotting the current 
state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that 
any previous flush operations
+     * have finished before beginning a new flush.
      *
      * @return true if a flush was initiated, false if no data was available
+     * @throws ConnectException if the previous flush is not complete before 
this method is called
      */
-    public synchronized boolean beginFlush() {
-        if (flushing()) {
-            log.error("Invalid call to OffsetStorageWriter flush() while 
already flushing, the "
+    public boolean beginFlush() {
+        try {
+            return beginFlush(0, TimeUnit.NANOSECONDS);
+        } catch (InterruptedException | TimeoutException e) {
+            log.error("Invalid call to OffsetStorageWriter beginFlush() while 
already flushing, the "
                     + "framework should not allow this");
             throw new ConnectException("OffsetStorageWriter is already 
flushing");
         }
+    }
 
-        if (data.isEmpty())
-            return false;
-
-        toFlush = data;
-        data = new HashMap<>();
-        return true;
+    /**
+     * Performs the first step of a flush operation, snapshotting the current 
state. This does not
+     * actually initiate the flush with the underlying storage. Ensures that 
any previous flush operations
+     * have finished before beginning a new flush.
+     * <p>If and only if this method returns true, the caller must call {@link 
#doFlush(Callback)}
+     * or {@link #cancelFlush()} to finish the flush operation and allow later 
calls to complete.
+     *
+     * @param timeout A maximum duration to wait for previous flushes to 
finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
+     * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while 
waiting for the previous flush to complete
+     * @throws TimeoutException if the {@code timeout} elapses before previous 
flushes are complete.
+     */
+    public boolean beginFlush(long timeout, TimeUnit timeUnit) throws 
InterruptedException, TimeoutException {
+        if (flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) {
+            synchronized (this) {
+                if (data.isEmpty()) {
+                    flushInProgress.release();
+                    return false;
+                } else {
+                    toFlush = data;
+                    data = new HashMap<>();
+                    return true;
+                }
+            }
+        } else {
+            throw new TimeoutException("Timed out waiting for previous flush 
to finish");
+        }
     }
 
     /**
@@ -193,6 +224,7 @@ public class OffsetStorageWriter {
             toFlush.putAll(data);
             data = toFlush;
             currentFlushId++;
+            flushInProgress.release();
             toFlush = null;
         }
     }
@@ -211,6 +243,7 @@ public class OffsetStorageWriter {
             cancelFlush();
         } else {
             currentFlushId++;
+            flushInProgress.release();
             toFlush = null;
         }
         return true;
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
index e077cff912b..632f2d8f15f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTaskTest.java
@@ -79,6 +79,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
@@ -904,7 +905,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
         workerTaskFuture = executor.submit(workerTask);
     }
 
-    private void expectSuccessfulFlushes() {
+    private void expectSuccessfulFlushes() throws InterruptedException, 
TimeoutException {
         when(offsetWriter.beginFlush()).thenReturn(true);
         when(offsetWriter.doFlush(any())).thenAnswer(invocation -> {
             Callback<Void> cb = invocation.getArgument(0);
@@ -1036,7 +1037,7 @@ public class ExactlyOnceWorkerSourceTaskTest {
         verify(producer, times(count)).send(any(), any());
     }
 
-    private void verifyTransactions(int numBatches) throws 
InterruptedException {
+    private void verifyTransactions(int numBatches) throws 
InterruptedException, TimeoutException {
         VerificationMode mode = times(numBatches);
         verify(producer, mode).beginTransaction();
         verify(producer, mode).commitTransaction();
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index d8c79ff23ed..044b4a4e3a2 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -1364,6 +1364,53 @@ public class WorkerSinkTaskTest {
         }
     }
 
+    @Test
+    public void testTaskCancelPreventsFinalOffsetCommit() throws Exception {
+        createTask(initialState);
+        expectInitializeTask();
+        expectTaskGetTopic(true);
+
+        expectPollInitialAssignment();
+        
EasyMock.expect(consumer.assignment()).andReturn(INITIAL_ASSIGNMENT).times(2);
+
+        // Put one message through the task to get some offsets to commit
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall();
+
+        // the second put will return after the task is stopped and cancelled 
(asynchronously)
+        expectConsumerPoll(1);
+        expectConversionAndTransformation(1);
+        sinkTask.put(EasyMock.anyObject());
+        PowerMock.expectLastCall().andAnswer(() -> {
+            workerTask.stop();
+            workerTask.cancel();
+            return null;
+        });
+
+        // stop wakes up the consumer
+        consumer.wakeup();
+        EasyMock.expectLastCall();
+
+        // task performs normal steps in advance of committing offsets
+        final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(TOPIC_PARTITION, new OffsetAndMetadata(FIRST_OFFSET + 2));
+        offsets.put(TOPIC_PARTITION2, new OffsetAndMetadata(FIRST_OFFSET));
+        sinkTask.preCommit(offsets);
+        EasyMock.expectLastCall().andReturn(offsets);
+        sinkTask.close(EasyMock.anyObject());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.execute();
+
+        PowerMock.verifyAll();
+    }
+
     // Verify that when commitAsync is called but the supplied callback is not 
called by the consumer before a
     // rebalance occurs, the async callback does not reset the last committed 
offset from the rebalance.
     // See KAFKA-5731 for more information.
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index f2530681cf9..928f5d89430 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -417,7 +417,6 @@ public class WorkerSourceTaskTest {
 
         sourceTask.stop();
         EasyMock.expectLastCall();
-        expectOffsetFlush(true);
 
         expectClose();
 
@@ -1006,7 +1005,7 @@ public class WorkerSourceTaskTest {
 
     @SuppressWarnings("unchecked")
     private void expectOffsetFlush(boolean succeed) throws Exception {
-        EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
+        EasyMock.expect(offsetWriter.beginFlush(EasyMock.anyLong(), 
EasyMock.anyObject())).andReturn(true);
         Future<Void> flushFuture = PowerMock.createMock(Future.class);
         
EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
         // Should throw for failure
@@ -1024,7 +1023,7 @@ public class WorkerSourceTaskTest {
     }
 
     private void expectEmptyOffsetFlush() throws Exception {
-        EasyMock.expect(offsetWriter.beginFlush()).andReturn(false);
+        EasyMock.expect(offsetWriter.beginFlush(EasyMock.anyLong(), 
EasyMock.anyObject())).andReturn(false);
         sourceTask.commit();
         EasyMock.expectLastCall();
     }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
index b6eb0f6a487..8fa34ac95a3 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.connect.storage;
 
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.util.Callback;
 import org.junit.After;
 import org.junit.Before;
@@ -33,6 +32,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
@@ -82,7 +82,7 @@ public class OffsetStorageWriterTest {
 
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
 
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
     }
@@ -96,7 +96,7 @@ public class OffsetStorageWriterTest {
 
         writer.offset(OFFSET_KEY, null);
 
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
     }
@@ -111,14 +111,15 @@ public class OffsetStorageWriterTest {
 
         writer.offset(null, OFFSET_VALUE);
 
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
     }
 
     @Test
-    public void testNoOffsetsToFlush() {
-        assertFalse(writer.beginFlush());
+    public void testNoOffsetsToFlush() throws InterruptedException, 
TimeoutException {
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
 
         // If no offsets are flushed, we should finish immediately and not 
have made any calls to the
         // underlying storage layer
@@ -135,22 +136,22 @@ public class OffsetStorageWriterTest {
         // First time the write fails
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, true, null);
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(eq(EXCEPTION), isNull());
 
         // Second time it succeeds
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, false, null);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
         verify(callback).onCompletion(isNull(), isNull());
 
         // Third time it has no data to flush so we won't get past beginFlush()
-        assertFalse(writer.beginFlush());
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }
 
     @Test
-    public void testAlreadyFlushing() {
+    public void testAlreadyFlushing() throws InterruptedException, 
TimeoutException {
         @SuppressWarnings("unchecked")
         final Callback<Void> callback = mock(Callback.class);
         // Trigger the send, but don't invoke the callback so we'll still be 
mid-flush
@@ -158,15 +159,18 @@ public class OffsetStorageWriterTest {
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
 
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
+        assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, 
TimeUnit.MILLISECONDS));
         writer.doFlush(callback);
-        assertThrows(ConnectException.class, writer::beginFlush);
+        assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, 
TimeUnit.MILLISECONDS));
+        allowStoreCompleteCountdown.countDown();
+        assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
     }
 
     @Test
-    public void testCancelBeforeAwaitFlush() {
+    public void testCancelBeforeAwaitFlush() throws InterruptedException, 
TimeoutException {
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         writer.cancelFlush();
     }
 
@@ -180,7 +184,7 @@ public class OffsetStorageWriterTest {
         expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
 
         writer.offset(OFFSET_KEY, OFFSET_VALUE);
-        assertTrue(writer.beginFlush());
+        assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
         // Start the flush, then immediately cancel before allowing the mocked 
store request to finish
         Future<Void> flushFuture = writer.doFlush(callback);
         writer.cancelFlush();
@@ -214,7 +218,7 @@ public class OffsetStorageWriterTest {
                 keySerialized == null ? null : ByteBuffer.wrap(keySerialized),
                 valueSerialized == null ? null : 
ByteBuffer.wrap(valueSerialized));
         when(store.set(eq(offsetsSerialized), 
storeCallback.capture())).thenAnswer(invocation -> {
-            final Callback<Void> cb = storeCallback.getValue();
+            final Callback<Void> cb = invocation.getArgument(1);
             return service.submit(() -> {
                 if (waitForCompletion != null)
                     assertTrue(waitForCompletion.await(10000, 
TimeUnit.MILLISECONDS));

Reply via email to