This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 65bf4ae05 SAMZA-2791: Introduce callback timeout specific to watermark
messages (#1681)
65bf4ae05 is described below
commit 65bf4ae058d4d674cc63c3f172f3f348a4513640
Author: Bharath Kumarasubramanian <[email protected]>
AuthorDate: Tue Aug 29 14:49:50 2023 -0700
SAMZA-2791: Introduce callback timeout specific to watermark messages
(#1681)
Description:
Currently, watermark is implemented as a special message within Samza.
However, in terms of processing semantics, it behaves like the normal
messages processed by the task: task.callback.timeout.ms, the configuration
that bounds how long the run loop waits for a message to be processed,
applies to both watermark and normal messages.
This coupling constrains watermark processing to the same time bound as
normal message processing. For Beam on Samza, we use watermark as a
trigger to execute event timers, which can take a long time depending on the
number of timers accumulated. In particular, when the application has been
down, the number of accumulated timers can be very large, and users have to
raise this configuration, which also affects fault-tolerance behavior in case
of failures/delays while processing normal messages.
Changes:
- Introduce callback timeout configuration specific to watermark
- Update configuration documentation
- Consolidate overloaded methods for TaskCallbackManager
- Always use watermark specific timeout even when run loop is in draining
mode
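The timeout selection described in the changes above can be sketched roughly
as follows. This is a minimal standalone illustration, not the RunLoop code
itself: the class name CallbackTimeoutSketch and the method selectTimeout are
hypothetical, and the boolean parameters stand in for envelope.isWatermark(),
envelope.isDrain() and the run loop's draining flag.

```java
// Hypothetical sketch of the timeout selection; names are illustrative only.
final class CallbackTimeoutSketch {
  private CallbackTimeoutSketch() { }

  /**
   * Picks the callback timeout for one envelope.
   * 1. Watermark envelopes always use the watermark timeout, draining or not.
   * 2. Drain envelopes use the drain timeout while the run loop is draining.
   * 3. Everything else uses the regular message timeout.
   */
  static long selectTimeout(boolean isWatermark, boolean isDrain, boolean isDraining,
      long messageTimeoutMs, long watermarkTimeoutMs, long drainTimeoutMs) {
    if (isWatermark) {
      return watermarkTimeoutMs;
    }
    if (isDraining && isDrain) {
      return drainTimeoutMs;
    }
    return messageTimeoutMs;
  }
}
```

Checking the watermark case first is what makes the watermark timeout apply
even in draining mode, which is why the documentation recommends keeping the
drain and watermark timeouts aligned.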
API Changes:
- Internal change to constructor
Upgrade Instructions: None
Usage Instructions:
- Users can configure the timeout for watermark messages using
task.callback.watermark.timeout.ms
- Refer to the configuration documentation for more details and defaults.
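As a rough illustration of the default behavior, the sketch below resolves the
watermark timeout from a plain map of properties. It is an assumption-laden
stand-in for TaskConfig: the class WatermarkTimeoutConfigSketch and its methods
are hypothetical, and only the two property names and the -1 (no timeout)
default for task.callback.timeout.ms come from the configuration documentation.

```java
import java.util.Map;

// Hypothetical stand-in for the config lookup; not Samza's TaskConfig.
final class WatermarkTimeoutConfigSketch {
  static final String CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms";
  static final String WATERMARK_CALLBACK_TIMEOUT_MS = "task.callback.watermark.timeout.ms";
  // Documented default for task.callback.timeout.ms: -1 means no timeout.
  static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;

  private WatermarkTimeoutConfigSketch() { }

  /** Returns the regular callback timeout, or -1 when it is not configured. */
  static long callbackTimeoutMs(Map<String, String> config) {
    String value = config.get(CALLBACK_TIMEOUT_MS);
    return value == null ? DEFAULT_CALLBACK_TIMEOUT_MS : Long.parseLong(value);
  }

  /** Returns the watermark timeout, falling back to the regular callback timeout. */
  static long watermarkCallbackTimeoutMs(Map<String, String> config) {
    String value = config.get(WATERMARK_CALLBACK_TIMEOUT_MS);
    return value == null ? callbackTimeoutMs(config) : Long.parseLong(value);
  }
}
```

With only task.callback.timeout.ms=10 set, the watermark timeout resolves to
10; adding task.callback.watermark.timeout.ms=20 changes it to 20 without
affecting the timeout for normal messages.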
---
.../versioned/jobs/configuration-table.html | 12 +++
.../versioned/jobs/samza-configurations.md | 1 +
.../org/apache/samza/config/ApplicationConfig.java | 1 +
.../org/apache/samza/config/RunLoopConfig.java | 4 +
.../java/org/apache/samza/config/TaskConfig.java | 6 ++
.../java/org/apache/samza/container/RunLoop.java | 36 +++++++--
.../org/apache/samza/task/TaskCallbackManager.java | 13 +---
.../org/apache/samza/config/TestRunLoopConfig.java | 50 ++++++++++++
.../org/apache/samza/container/TestRunLoop.java | 90 +++++++++++++++++++++-
.../apache/samza/task/TestTaskCallbackManager.java | 4 +-
10 files changed, 196 insertions(+), 21 deletions(-)
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 18a782f49..f4c8d4d7b 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -863,6 +863,18 @@
</td>
</tr>
+ <tr>
+ <td class="property"
id="task-callback-watermark-timeout-ms">task.callback.watermark.timeout.ms</td>
+ <td class="default">task.callback.watermark.timeout.ms</td>
+ <td class="description">
+ It defines the upper bound on the time taken by the
task to process a watermark message.
+ When the timeout happens, it will throw a
TaskCallbackTimeoutException and shut down the container.
+ Default is <i>task.callback.timeout.ms</i>.
<b>Note:</b> In event of draining state, it is recommended
+ to keep the <i>task.callback.drain.timeout.ms</i> to
be same as <i>task.callback.watermark.timeout.ms</i>
+ in order to not terminate drain prematurely due to
higher latency for watermark processing.
+ </td>
+ </tr>
+
<tr>
<td class="property"
id="task-consumer-batch-size">task.consumer.batch.size</td>
<td class="default">1</td>
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md
b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 60d8ae8b5..6188e90b4 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -85,6 +85,7 @@ These are the basic properties for setting up a Samza
application.
|job.systemstreampartition.<br>input.expansion.enabled|true|When enabled, this
allows stateful jobs to expand or contract their partition count by a multiple
of the previous count so that events from an input stream partition are
processed on the same task as before. This will prevent erroneous results. This
feature is disabled if the configuration is set to false or if the job is
stateless. See
[SEP-5](https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion
[...]
|job.security.manager.<br>factory|(none)|This is the factory class used to
create the proper SecurityManager to handle security for Samza containers when
running in a secure environment, such as Yarn with Kerberos eanbled. Samza
ships with one security manager by
default:<br><br>`org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory`<br>Supports
Samza containers to run properly in a Kerberos enabled Yarn cluster. Each
Samza container, once started, will create a SamzaContainerSecurit [...]
|task.callback.timeout.ms|-1(no timeout)|For an AsyncStreamTask, this defines
the max allowed time for a processAsync callback to complete. For a StreamTask,
this is the max allowed time for a process call to complete. When the timeout
happens,the container is shutdown. Default is no timeout.|
+|task.callback.watermark.timeout.ms|task.callback.timeout.ms|It defines the
upper bound on the time taken by the task to process a watermark message. When
the timeout happens, it will throw a TaskCallbackTimeoutException and shut down
the container. In event of draining, it is recommended to keep
`task.callback.drain.timeout.ms` to be same as
`task.callback.watermark.timeout.ms` in order to prevent drain from terminating
prematurely due to higher latency for watermark processing.|
|task.chooser.class|`org.apache.samza.`<br>`system.chooser.`<br>`RoundRobinChooserFactory`|This
property can be optionally set to override the default [message
chooser](../container/streams.html#messagechooser), which determines the order
in which messages from multiple input streams are processed. The value of this
property is the fully-qualified name of a Java class that implements
[MessageChooserFactory](../api/javadocs/org/apache/samza/system/chooser/MessageChooserFactory.html).|
|task.command.class|`org.apache.samza.job.`<br>`ShellCommandBuilder`|The
fully-qualified name of the Java class which determines the command line and
environment variables for a [container](../container/samza-container.html). It
must be a subclass of
[CommandBuilder](../api/javadocs/org/apache/samza/job/CommandBuilder.html).
This defaults to task.command.class=`org.apache.samza.job.ShellCommandBuilder`.|
|task.drop.deserialization.errors|false|This property is to define how the
system deals with deserialization failure situation. If set to true, the system
will skip the error messages and keep running. If set to false, the system with
throw exceptions and fail the container. |
diff --git
a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
index 2363bc753..da5402de9 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -114,6 +114,7 @@ public class ApplicationConfig extends MapConfig {
public ApplicationApiType getAppApiType() {
return ApplicationApiType.valueOf(get(APP_API_TYPE,
ApplicationApiType.HIGH_LEVEL.name()).toUpperCase());
}
+
public boolean isHighLevelApiJob() {
return getAppApiType() == ApplicationApiType.HIGH_LEVEL;
}
diff --git
a/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java
b/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java
index ee1d7f71f..2c95c5141 100644
--- a/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java
@@ -50,6 +50,10 @@ public class RunLoopConfig extends MapConfig {
return taskConfig.getDrainCallbackTimeoutMs();
}
+ public long getWatermarkCallbackTimeoutMs() {
+ return taskConfig.getWatermarkCallbackTimeoutMs();
+ }
+
public boolean asyncCommitEnabled() {
return taskConfig.getAsyncCommit();
}
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index 4d7847a91..b1a51e522 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -97,6 +97,8 @@ public class TaskConfig extends MapConfig {
// default timeout for triggering a callback during drain
static final long DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS = -1L;
+ public static final String WATERMARK_CALLBACK_TIMEOUT_MS =
"task.callback.watermark.timeout.ms";
+
// enable async commit
public static final String ASYNC_COMMIT = "task.async.commit";
// maximum time to wait for a task worker to complete when there are no new
messages to handle
@@ -235,6 +237,10 @@ public class TaskConfig extends MapConfig {
return getLong(DRAIN_CALLBACK_TIMEOUT_MS,
DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS);
}
+ public long getWatermarkCallbackTimeoutMs() {
+ return getLong(WATERMARK_CALLBACK_TIMEOUT_MS, getCallbackTimeoutMs());
+ }
+
public boolean getAsyncCommit() {
return getBoolean(ASYNC_COMMIT, false);
}
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
index 25c924f00..e96f986dd 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -78,8 +78,10 @@ public class RunLoop implements Runnable, Throttleable {
private final int maxConcurrency;
private final long windowMs;
private final long commitMs;
- private final long callbackTimeoutMs;
+ private final long messageCallbackTimeoutMs;
private final long drainCallbackTimeoutMs;
+
+ private final long watermarkCallbackTimeoutMs;
private final long maxIdleMs;
private final SamzaContainerMetrics containerMetrics;
private final ScheduledExecutorService workerTimer;
@@ -121,12 +123,15 @@ public class RunLoop implements Runnable, Throttleable {
this.maxConcurrency = config.getMaxConcurrency();
log.info("Got task concurrency: {}.", maxConcurrency);
- this.callbackTimeoutMs = config.getTaskCallbackTimeoutMs();
- log.info("Got callback timeout for task in milliseconds: {}.",
callbackTimeoutMs);
+ this.messageCallbackTimeoutMs = config.getTaskCallbackTimeoutMs();
+ log.info("Got default callback timeout for task in milliseconds: {}.",
messageCallbackTimeoutMs);
this.drainCallbackTimeoutMs = config.getDrainCallbackTimeoutMs();
log.info("Got callback timeout for drain in milliseconds: {}.",
drainCallbackTimeoutMs);
+ this.watermarkCallbackTimeoutMs = config.getWatermarkCallbackTimeoutMs();
+ log.info("Got callback timeout for watermark in milliseconds: {}.",
watermarkCallbackTimeoutMs);
+
this.maxIdleMs = config.getMaxIdleMs();
log.info("Got max idle in milliseconds: {}.", maxIdleMs);
@@ -152,7 +157,7 @@ public class RunLoop implements Runnable, Throttleable {
this.latch = new Object();
this.workerTimer = Executors.newSingleThreadScheduledExecutor();
- this.callbackTimer = (callbackTimeoutMs > 0) ?
Executors.newSingleThreadScheduledExecutor() : null;
+ this.callbackTimer = (messageCallbackTimeoutMs > 0) ?
Executors.newSingleThreadScheduledExecutor() : null;
this.callbackExecutor = new
ThrottlingScheduler(config.getMaxThrottlingDelayMs());
this.coordinatorRequests = new CoordinatorRequests(runLoopTasks.keySet());
@@ -492,7 +497,7 @@ public class RunLoop implements Runnable, Throttleable {
AsyncTaskWorker(RunLoopTask task) {
this.task = task;
- this.callbackManager = new TaskCallbackManager(this, callbackTimer,
callbackTimeoutMs, maxConcurrency, clock);
+ this.callbackManager = new TaskCallbackManager(this, callbackTimer,
messageCallbackTimeoutMs, maxConcurrency, clock);
Set<SystemStreamPartition> sspSet = getWorkingSSPSet(task);
this.state = new AsyncTaskState(task.taskName(), task.metrics(), sspSet,
!task.intermediateStreams().isEmpty(), isHighLevelApiJob,
runId);
@@ -635,9 +640,24 @@ public class RunLoop implements Runnable, Throttleable {
containerMetrics.processes().inc();
// report 1 whenever the contaienr is running. Can be used to
calculate the number of containers not running
containerMetrics.containerRunning().set(1L);
- return isDraining && (envelope.isDrain() || envelope.isWatermark())
- ? callbackManager.createCallbackForDrain(task.taskName(),
envelope, coordinator, drainCallbackTimeoutMs)
- : callbackManager.createCallback(task.taskName(), envelope,
coordinator);
+
+ /*
+ * Timeout used in the task callback. The value is determined based
on the following logic
+ * 1. If run loop is in draining mode and the envelope is drain, use
drainCallbackTimeoutMs
+ * 2. If the envelope is watermark, use watermarkCallbackTimeoutMs
regardless of the modes. Setting a lower
+ * watermark callback timeout during draining mode can cause
drain to be unsuccessful prematurely and
+ * vice-versa.
+ * 3. Use callbackTimeoutMs otherwise
+ */
+ long timeout = messageCallbackTimeoutMs;
+
+ if (envelope.isWatermark()) {
+ timeout = watermarkCallbackTimeoutMs;
+ } else if (isDraining && envelope.isDrain()) {
+ timeout = drainCallbackTimeoutMs;
+ }
+
+ return callbackManager.createCallback(task.taskName(), envelope,
coordinator, timeout);
}
};
diff --git
a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
index d435615d2..8576c79e3 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
@@ -109,16 +109,9 @@ public class TaskCallbackManager {
* @param taskName task name
* @param envelope incoming envelope
* @param coordinator coordinator
- * @param drainTimeout timeout for processing drain messages.
+ * @param callbackTimeout timeout to expire the callback
* */
- public TaskCallbackImpl createCallbackForDrain(TaskName taskName,
- IncomingMessageEnvelope envelope,
- ReadableCoordinator coordinator,
- long drainTimeout) {
- return createCallback(taskName, envelope, coordinator, drainTimeout);
- }
-
- private TaskCallbackImpl createCallback(TaskName taskName,
+ public TaskCallbackImpl createCallback(TaskName taskName,
IncomingMessageEnvelope envelope,
ReadableCoordinator coordinator,
long callbackTimeout) {
@@ -129,7 +122,7 @@ public class TaskCallbackManager {
@Override
public void run() {
ThreadUtil.logThreadDump("Thread dump at task callback timeout");
- String msg = "Callback for task {} " + callback.taskName + " timed
out after " + timeout + " ms.";
+ String msg = "Callback for task {} " + callback.taskName + " timed
out after " + callbackTimeout + " ms.";
callback.failure(new SamzaException(msg));
}
};
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestRunLoopConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestRunLoopConfig.java
new file mode 100644
index 000000000..e0e737366
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestRunLoopConfig.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestRunLoopConfig {
+
+ @Test
+ public void testWatermarkCallbackTimeoutDefaultsToTaskCallbackTimeout() {
+ long taskCallbackTimeout = 10L;
+ Config config = new
MapConfig(ImmutableMap.of(TaskConfig.CALLBACK_TIMEOUT_MS,
Long.toString(taskCallbackTimeout)));
+ RunLoopConfig runLoopConfig = new RunLoopConfig(config);
+ assertEquals("Watermark callback timeout should default to task callback
timeout",
+ taskCallbackTimeout, runLoopConfig.getWatermarkCallbackTimeoutMs());
+ }
+
+ @Test
+ public void testWatermarkCallbackTimeout() {
+ long taskCallbackTimeout = 10L;
+ long watermarkCallbackTimeout = 20L;
+ Config config = new MapConfig(ImmutableMap.of(
+ TaskConfig.CALLBACK_TIMEOUT_MS, Long.toString(taskCallbackTimeout),
+ TaskConfig.WATERMARK_CALLBACK_TIMEOUT_MS,
Long.toString(watermarkCallbackTimeout)));
+
+ RunLoopConfig runLoopConfig = new RunLoopConfig(config);
+ assertEquals("Mismatch in watermark callback timeout",
+ watermarkCallbackTimeout,
runLoopConfig.getWatermarkCallbackTimeoutMs());
+ }
+}
diff --git
a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index c3a030a9d..5afb82abc 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
@@ -48,7 +49,7 @@ import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@@ -90,6 +91,8 @@ public class TestRunLoop {
private final IncomingMessageEnvelope sspB0Drain =
IncomingMessageEnvelope.buildDrainMessage(sspB0, runId);
private final IncomingMessageEnvelope sspB1Drain =
IncomingMessageEnvelope.buildDrainMessage(sspB1, runId);
+ private final IncomingMessageEnvelope watermarkA0 =
IncomingMessageEnvelope.buildWatermarkEnvelope(sspA0, 1L);
+
@Rule
public Timeout maxTestDurationInSeconds = Timeout.seconds(120);
@@ -736,6 +739,91 @@ public class TestRunLoop {
runLoop.run();
}
+ @Test
+ public void testWatermarkCallbackTimeout() throws InterruptedException {
+ final CountDownLatch watermarkProcessLatch = new CountDownLatch(1);
+
+ when(mockRunLoopConfig.getTaskCallbackTimeoutMs()).thenReturn(5L);
+ when(mockRunLoopConfig.getWatermarkCallbackTimeoutMs()).thenReturn(15L);
+
+ SystemConsumers consumers = mock(SystemConsumers.class);
+
+ RunLoopTask task0 = getMockRunLoopTask(taskName0, sspA0);
+ doAnswer(invocation -> {
+ TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2,
TaskCallbackFactory.class);
+ TaskCallback callback = callbackFactory.createCallback();
+ Thread.sleep(10);
+ callback.complete();
+ return null;
+ }).when(task0).process(eq(watermarkA0), any(), any());
+
+ doAnswer(invocation -> {
+ TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2,
TaskCallbackFactory.class);
+ callbackFactory.createCallback().complete();
+ return null;
+ }).when(task0).process(eq(envelopeA00), any(), any());
+
+ doAnswer(invocation -> {
+ TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2,
TaskCallbackFactory.class);
+ watermarkProcessLatch.countDown();
+ callbackFactory.createCallback().complete();
+ return null;
+ }).when(task0).process(eq(envelopeA01), any(), any());
+
+ Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0);
+
+ RunLoop runLoop = new RunLoop(tasks, executor, consumers,
containerMetrics, () -> 0L, mockRunLoopConfig);
+
+ when(consumers.choose(false))
+ .thenReturn(envelopeA00)
+ .thenReturn(watermarkA0)
+ .thenReturn(envelopeA01)
+ .thenReturn(sspA0EndOfStream)
+ .thenReturn(null);
+
+ runLoop.run();
+ assertTrue(watermarkProcessLatch.await(15L, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testWatermarkCallbackTimeoutThrowsException() {
+ when(mockRunLoopConfig.getTaskCallbackTimeoutMs()).thenReturn(10L);
+ when(mockRunLoopConfig.getWatermarkCallbackTimeoutMs()).thenReturn(1L);
+
+ SystemConsumers consumers = mock(SystemConsumers.class);
+
+ RunLoopTask task0 = getMockRunLoopTask(taskName0, sspA0);
+ doAnswer(invocation -> {
+ TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2,
TaskCallbackFactory.class);
+ TaskCallback callback = callbackFactory.createCallback();
+ Thread.sleep(5);
+ callback.complete();
+ return null;
+ }).when(task0).process(eq(watermarkA0), any(), any());
+
+ doAnswer(invocation -> {
+ TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2,
TaskCallbackFactory.class);
+ callbackFactory.createCallback().complete();
+ return null;
+ }).when(task0).process(eq(envelopeA00), any(), any());
+
+ Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0);
+
+ RunLoop runLoop = new RunLoop(tasks, executor, consumers,
containerMetrics, () -> 0L, mockRunLoopConfig);
+
+ when(consumers.choose(false))
+ .thenReturn(envelopeA00)
+ .thenReturn(watermarkA0)
+ .thenReturn(null);
+
+ try {
+ runLoop.run();
+ fail("Watermark callback should have timed out and failed run loop");
+ } catch (SamzaException e) {
+
+ }
+ }
+
private RunLoopTask getMockRunLoopTask(TaskName taskName,
SystemStreamPartition ... ssps) {
RunLoopTask task0 = mock(RunLoopTask.class);
when(task0.systemStreamPartitions()).thenReturn(new
HashSet<>(Arrays.asList(ssps)));
diff --git
a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
index de418c014..972f8d3b8 100644
---
a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
+++
b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
@@ -67,10 +67,10 @@ public class TestTaskCallbackManager {
@Test
public void testCreateDrainCallback() {
- TaskCallbackImpl callback = callbackManager.createCallbackForDrain(new
TaskName("Partition 0"), mock(IncomingMessageEnvelope.class), null, -1);
+ TaskCallbackImpl callback = callbackManager.createCallback(new
TaskName("Partition 0"), mock(IncomingMessageEnvelope.class), null, -1);
assertTrue(callback.matchSeqNum(0));
- callback = callbackManager.createCallbackForDrain(new TaskName("Partition
0"), mock(IncomingMessageEnvelope.class), null, -1);
+ callback = callbackManager.createCallback(new TaskName("Partition 0"),
mock(IncomingMessageEnvelope.class), null, -1);
assertTrue(callback.matchSeqNum(1));
}