This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 65bf4ae05 SAMZA-2791: Introduce callback timeout specific to watermark 
messages (#1681)
65bf4ae05 is described below

commit 65bf4ae058d4d674cc63c3f172f3f348a4513640
Author: Bharath Kumarasubramanian <[email protected]>
AuthorDate: Tue Aug 29 14:49:50 2023 -0700

    SAMZA-2791: Introduce callback timeout specific to watermark messages 
(#1681)
    
    Description:
    Currently, watermark is implemented as a special message within Samza. 
However, in terms of processing semantics, it shares similar behavior to normal 
messages processed by the task. i.e., task.callback.timeout.ms, a configuration 
to tune the time until which runloop waits for a message to be processed 
applies to both watermark and normal messages.
    
    However, this tie up constrains watermark processing logic to be bounded by 
the processing messages time bound. For Beam on Samza, we use watermark as a 
trigger to execute event timers which can take a long time depending on the 
number of timers accumulated. Especially, when the application is down, the 
timers accumulated could be too large and users will have to tune this 
configuration which will also impact fault tolerance behavior in case of 
failures/delays during processing messages.
    
    Changes:
    - Introduce callback timeout configuration specific to watermark
    - Update configuration documentation
    - Consolidate overload methods for TaskCallbackManager
    - Always use watermark specific timeout even when run loop is in draining 
mode
    
    API Changes:
    - Internal change to constructor
    
    Upgrade Instructions: None
    
    Usage Instructions:
    - Users can configure the timeout for watermark messages using 
task.callback.watermark.timeout.ms
    - Refer to the configuration documentation for more details and defaults.
---
 .../versioned/jobs/configuration-table.html        | 12 +++
 .../versioned/jobs/samza-configurations.md         |  1 +
 .../org/apache/samza/config/ApplicationConfig.java |  1 +
 .../org/apache/samza/config/RunLoopConfig.java     |  4 +
 .../java/org/apache/samza/config/TaskConfig.java   |  6 ++
 .../java/org/apache/samza/container/RunLoop.java   | 36 +++++++--
 .../org/apache/samza/task/TaskCallbackManager.java | 13 +---
 .../org/apache/samza/config/TestRunLoopConfig.java | 50 ++++++++++++
 .../org/apache/samza/container/TestRunLoop.java    | 90 +++++++++++++++++++++-
 .../apache/samza/task/TestTaskCallbackManager.java |  4 +-
 10 files changed, 196 insertions(+), 21 deletions(-)

diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 18a782f49..f4c8d4d7b 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -863,6 +863,18 @@
                     </td>
                 </tr>
 
+                <tr>
+                    <td class="property" 
id="task-callback-watermark-timeout-ms">task.callback.watermark.timeout.ms</td>
+                    <td class="default">task.callback.watermark.timeout.ms</td>
+                    <td class="description">
+                        It defines the upper bound on the time taken by the 
task to process a watermark message.
+                        When the timeout happens, it will throw a 
TaskCallbackTimeoutException and shut down the container.
+                        Default is <i>task.callback.timeout.ms</i>. 
<b>Note:</b> In event of draining state, it is recommended
+                        to keep the <i>task.callback.drain.timeout.ms</i> to 
be same as <i>task.callback.watermark.timeout.ms</i>
+                        in order to not terminate drain prematurely due to 
higher latency for watermark processing.
+                    </td>
+                </tr>
+
                 <tr>
                     <td class="property" 
id="task-consumer-batch-size">task.consumer.batch.size</td>
                     <td class="default">1</td>
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md 
b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 60d8ae8b5..6188e90b4 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -85,6 +85,7 @@ These are the basic properties for setting up a Samza 
application.
 |job.systemstreampartition.<br>input.expansion.enabled|true|When enabled, this 
allows stateful jobs to expand or contract their partition count by a multiple 
of the previous count so that events from an input stream partition are 
processed on the same task as before. This will prevent erroneous results. This 
feature is disabled if the configuration is set to false or if the job is 
stateless. See 
[SEP-5](https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion
 [...]
 |job.security.manager.<br>factory|(none)|This is the factory class used to 
create the proper SecurityManager to handle security for Samza containers when 
running in a secure environment, such as Yarn with Kerberos eanbled. Samza 
ships with one security manager by 
default:<br><br>`org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory`<br>Supports
 Samza containers to run properly in a Kerberos enabled Yarn cluster. Each 
Samza container, once started, will create a SamzaContainerSecurit [...]
 |task.callback.timeout.ms|-1(no timeout)|For an AsyncStreamTask, this defines 
the max allowed time for a processAsync callback to complete. For a StreamTask, 
this is the max allowed time for a process call to complete. When the timeout 
happens,the container is shutdown. Default is no timeout.|
+|task.callback.watermark.timeout.ms|task.callback.timeout.ms|It defines the 
upper bound on the time taken by the task to process a watermark message. When 
the timeout happens, it will throw a TaskCallbackTimeoutException and shut down 
the container. In event of draining, it is recommended to keep 
`task.callback.drain.timeout.ms` to be same as 
`task.callback.watermark.timeout.ms` in order to prevent drain from terminating 
prematurely due to higher latency for watermark processing.|
 
|task.chooser.class|`org.apache.samza.`<br>`system.chooser.`<br>`RoundRobinChooserFactory`|This
 property can be optionally set to override the default [message 
chooser](../container/streams.html#messagechooser), which determines the order 
in which messages from multiple input streams are processed. The value of this 
property is the fully-qualified name of a Java class that implements 
[MessageChooserFactory](../api/javadocs/org/apache/samza/system/chooser/MessageChooserFactory.html).|
 |task.command.class|`org.apache.samza.job.`<br>`ShellCommandBuilder`|The 
fully-qualified name of the Java class which determines the command line and 
environment variables for a [container](../container/samza-container.html). It 
must be a subclass of 
[CommandBuilder](../api/javadocs/org/apache/samza/job/CommandBuilder.html). 
This defaults to task.command.class=`org.apache.samza.job.ShellCommandBuilder`.|
 |task.drop.deserialization.errors|false|This property is to define how the 
system deals with deserialization failure situation. If set to true, the system 
will skip the error messages and keep running. If set to false, the system with 
throw exceptions and fail the container. |
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
index 2363bc753..da5402de9 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -114,6 +114,7 @@ public class ApplicationConfig extends MapConfig {
   public ApplicationApiType getAppApiType() {
     return ApplicationApiType.valueOf(get(APP_API_TYPE, 
ApplicationApiType.HIGH_LEVEL.name()).toUpperCase());
   }
+
   public boolean isHighLevelApiJob() {
     return getAppApiType() == ApplicationApiType.HIGH_LEVEL;
   }
diff --git 
a/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java
index ee1d7f71f..2c95c5141 100644
--- a/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java
@@ -50,6 +50,10 @@ public class RunLoopConfig extends MapConfig {
     return taskConfig.getDrainCallbackTimeoutMs();
   }
 
+  public long getWatermarkCallbackTimeoutMs() {
+    return taskConfig.getWatermarkCallbackTimeoutMs();
+  }
+
   public boolean asyncCommitEnabled() {
     return taskConfig.getAsyncCommit();
   }
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java 
b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index 4d7847a91..b1a51e522 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -97,6 +97,8 @@ public class TaskConfig extends MapConfig {
   // default timeout for triggering a callback during drain
   static final long DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS = -1L;
 
+  public static final String WATERMARK_CALLBACK_TIMEOUT_MS = 
"task.callback.watermark.timeout.ms";
+
   // enable async commit
   public static final String ASYNC_COMMIT = "task.async.commit";
   // maximum time to wait for a task worker to complete when there are no new 
messages to handle
@@ -235,6 +237,10 @@ public class TaskConfig extends MapConfig {
     return getLong(DRAIN_CALLBACK_TIMEOUT_MS, 
DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS);
   }
 
+  public long getWatermarkCallbackTimeoutMs() {
+    return getLong(WATERMARK_CALLBACK_TIMEOUT_MS, getCallbackTimeoutMs());
+  }
+
   public boolean getAsyncCommit() {
     return getBoolean(ASYNC_COMMIT, false);
   }
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java 
b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
index 25c924f00..e96f986dd 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -78,8 +78,10 @@ public class RunLoop implements Runnable, Throttleable {
   private final int maxConcurrency;
   private final long windowMs;
   private final long commitMs;
-  private final long callbackTimeoutMs;
+  private final long messageCallbackTimeoutMs;
   private final long drainCallbackTimeoutMs;
+
+  private final long watermarkCallbackTimeoutMs;
   private final long maxIdleMs;
   private final SamzaContainerMetrics containerMetrics;
   private final ScheduledExecutorService workerTimer;
@@ -121,12 +123,15 @@ public class RunLoop implements Runnable, Throttleable {
     this.maxConcurrency = config.getMaxConcurrency();
     log.info("Got task concurrency: {}.", maxConcurrency);
 
-    this.callbackTimeoutMs = config.getTaskCallbackTimeoutMs();
-    log.info("Got callback timeout for task in milliseconds: {}.", 
callbackTimeoutMs);
+    this.messageCallbackTimeoutMs = config.getTaskCallbackTimeoutMs();
+    log.info("Got default callback timeout for task in milliseconds: {}.", 
messageCallbackTimeoutMs);
 
     this.drainCallbackTimeoutMs = config.getDrainCallbackTimeoutMs();
     log.info("Got callback timeout for drain in milliseconds: {}.", 
drainCallbackTimeoutMs);
 
+    this.watermarkCallbackTimeoutMs = config.getWatermarkCallbackTimeoutMs();
+    log.info("Got callback timeout for watermark in milliseconds: {}.", 
watermarkCallbackTimeoutMs);
+
     this.maxIdleMs = config.getMaxIdleMs();
     log.info("Got max idle in milliseconds: {}.", maxIdleMs);
 
@@ -152,7 +157,7 @@ public class RunLoop implements Runnable, Throttleable {
     this.latch = new Object();
     this.workerTimer = Executors.newSingleThreadScheduledExecutor();
 
-    this.callbackTimer = (callbackTimeoutMs > 0) ? 
Executors.newSingleThreadScheduledExecutor() : null;
+    this.callbackTimer = (messageCallbackTimeoutMs > 0) ? 
Executors.newSingleThreadScheduledExecutor() : null;
     this.callbackExecutor = new 
ThrottlingScheduler(config.getMaxThrottlingDelayMs());
     this.coordinatorRequests = new CoordinatorRequests(runLoopTasks.keySet());
 
@@ -492,7 +497,7 @@ public class RunLoop implements Runnable, Throttleable {
 
     AsyncTaskWorker(RunLoopTask task) {
       this.task = task;
-      this.callbackManager = new TaskCallbackManager(this, callbackTimer, 
callbackTimeoutMs, maxConcurrency, clock);
+      this.callbackManager = new TaskCallbackManager(this, callbackTimer, 
messageCallbackTimeoutMs, maxConcurrency, clock);
       Set<SystemStreamPartition> sspSet = getWorkingSSPSet(task);
       this.state = new AsyncTaskState(task.taskName(), task.metrics(), sspSet, 
!task.intermediateStreams().isEmpty(), isHighLevelApiJob,
           runId);
@@ -635,9 +640,24 @@ public class RunLoop implements Runnable, Throttleable {
           containerMetrics.processes().inc();
           // report 1 whenever the contaienr is running. Can be used to 
calculate the number of containers not running
           containerMetrics.containerRunning().set(1L);
-          return isDraining && (envelope.isDrain() || envelope.isWatermark())
-              ? callbackManager.createCallbackForDrain(task.taskName(), 
envelope, coordinator, drainCallbackTimeoutMs)
-              : callbackManager.createCallback(task.taskName(), envelope, 
coordinator);
+
+          /*
+           * Timeout used in the task callback. The value is determined based 
on the following logic
+           * 1. If run loop is in draining mode and the envelope is drain, use 
drainCallbackTimeoutMs
+           * 2. If the envelope is watermark, use watermarkCallbackTimeoutMs 
regardless of the modes. Setting a lower
+           *    watermark callback timeout during draining mode can cause 
drain to be unsuccessful prematurely and
+           *    vice-versa.
+           * 3. Use callbackTimeoutMs otherwise
+           */
+          long timeout = messageCallbackTimeoutMs;
+
+          if (envelope.isWatermark()) {
+            timeout = watermarkCallbackTimeoutMs;
+          } else if (isDraining && envelope.isDrain()) {
+            timeout = drainCallbackTimeoutMs;
+          }
+
+          return callbackManager.createCallback(task.taskName(), envelope, 
coordinator, timeout);
         }
       };
 
diff --git 
a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
index d435615d2..8576c79e3 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
@@ -109,16 +109,9 @@ public class TaskCallbackManager {
    * @param taskName task name
    * @param envelope incoming envelope
    * @param coordinator coordinator
-   * @param drainTimeout timeout for processing drain messages.
+   * @param callbackTimeout timeout to expire the callback
    * */
-  public TaskCallbackImpl createCallbackForDrain(TaskName taskName,
-      IncomingMessageEnvelope envelope,
-      ReadableCoordinator coordinator,
-      long drainTimeout) {
-    return createCallback(taskName, envelope, coordinator, drainTimeout);
-  }
-
-  private TaskCallbackImpl createCallback(TaskName taskName,
+  public TaskCallbackImpl createCallback(TaskName taskName,
       IncomingMessageEnvelope envelope,
       ReadableCoordinator coordinator,
       long callbackTimeout) {
@@ -129,7 +122,7 @@ public class TaskCallbackManager {
         @Override
         public void run() {
           ThreadUtil.logThreadDump("Thread dump at task callback timeout");
-          String msg = "Callback for task {} " + callback.taskName + " timed 
out after " + timeout + " ms.";
+          String msg = "Callback for task {} " + callback.taskName + " timed 
out after " + callbackTimeout + " ms.";
           callback.failure(new SamzaException(msg));
         }
       };
diff --git 
a/samza-core/src/test/java/org/apache/samza/config/TestRunLoopConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestRunLoopConfig.java
new file mode 100644
index 000000000..e0e737366
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/config/TestRunLoopConfig.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.config;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+public class TestRunLoopConfig {
+
+  @Test
+  public void testWatermarkCallbackTimeoutDefaultsToTaskCallbackTimeout() {
+    long taskCallbackTimeout = 10L;
+    Config config = new 
MapConfig(ImmutableMap.of(TaskConfig.CALLBACK_TIMEOUT_MS, 
Long.toString(taskCallbackTimeout)));
+    RunLoopConfig runLoopConfig = new RunLoopConfig(config);
+    assertEquals("Watermark callback timeout should default to task callback 
timeout",
+        taskCallbackTimeout, runLoopConfig.getWatermarkCallbackTimeoutMs());
+  }
+
+  @Test
+  public void testWatermarkCallbackTimeout() {
+    long taskCallbackTimeout = 10L;
+    long watermarkCallbackTimeout = 20L;
+    Config config = new MapConfig(ImmutableMap.of(
+        TaskConfig.CALLBACK_TIMEOUT_MS, Long.toString(taskCallbackTimeout),
+        TaskConfig.WATERMARK_CALLBACK_TIMEOUT_MS, 
Long.toString(watermarkCallbackTimeout)));
+
+    RunLoopConfig runLoopConfig = new RunLoopConfig(config);
+    assertEquals("Mismatch in watermark callback timeout",
+        watermarkCallbackTimeout, 
runLoopConfig.getWatermarkCallbackTimeoutMs());
+  }
+}
diff --git 
a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java 
b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index c3a030a9d..5afb82abc 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.samza.Partition;
 import org.apache.samza.SamzaException;
@@ -48,7 +49,7 @@ import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 
@@ -90,6 +91,8 @@ public class TestRunLoop {
   private final IncomingMessageEnvelope sspB0Drain = 
IncomingMessageEnvelope.buildDrainMessage(sspB0, runId);
   private final IncomingMessageEnvelope sspB1Drain = 
IncomingMessageEnvelope.buildDrainMessage(sspB1, runId);
 
+  private final IncomingMessageEnvelope watermarkA0 = 
IncomingMessageEnvelope.buildWatermarkEnvelope(sspA0, 1L);
+
   @Rule
   public Timeout maxTestDurationInSeconds = Timeout.seconds(120);
 
@@ -736,6 +739,91 @@ public class TestRunLoop {
     runLoop.run();
   }
 
+  @Test
+  public void testWatermarkCallbackTimeout() throws InterruptedException {
+    final CountDownLatch watermarkProcessLatch = new CountDownLatch(1);
+
+    when(mockRunLoopConfig.getTaskCallbackTimeoutMs()).thenReturn(5L);
+    when(mockRunLoopConfig.getWatermarkCallbackTimeoutMs()).thenReturn(15L);
+
+    SystemConsumers consumers = mock(SystemConsumers.class);
+
+    RunLoopTask task0 = getMockRunLoopTask(taskName0, sspA0);
+    doAnswer(invocation -> {
+      TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, 
TaskCallbackFactory.class);
+      TaskCallback callback = callbackFactory.createCallback();
+      Thread.sleep(10);
+      callback.complete();
+      return null;
+    }).when(task0).process(eq(watermarkA0), any(), any());
+
+    doAnswer(invocation -> {
+      TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, 
TaskCallbackFactory.class);
+      callbackFactory.createCallback().complete();
+      return null;
+    }).when(task0).process(eq(envelopeA00), any(), any());
+
+    doAnswer(invocation -> {
+      TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, 
TaskCallbackFactory.class);
+      watermarkProcessLatch.countDown();
+      callbackFactory.createCallback().complete();
+      return null;
+    }).when(task0).process(eq(envelopeA01), any(), any());
+
+    Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0);
+
+    RunLoop runLoop = new RunLoop(tasks, executor, consumers, 
containerMetrics, () -> 0L, mockRunLoopConfig);
+
+    when(consumers.choose(false))
+        .thenReturn(envelopeA00)
+        .thenReturn(watermarkA0)
+        .thenReturn(envelopeA01)
+        .thenReturn(sspA0EndOfStream)
+        .thenReturn(null);
+
+    runLoop.run();
+    assertTrue(watermarkProcessLatch.await(15L, TimeUnit.MILLISECONDS));
+  }
+
+  @Test
+  public void testWatermarkCallbackTimeoutThrowsException() {
+    when(mockRunLoopConfig.getTaskCallbackTimeoutMs()).thenReturn(10L);
+    when(mockRunLoopConfig.getWatermarkCallbackTimeoutMs()).thenReturn(1L);
+
+    SystemConsumers consumers = mock(SystemConsumers.class);
+
+    RunLoopTask task0 = getMockRunLoopTask(taskName0, sspA0);
+    doAnswer(invocation -> {
+      TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, 
TaskCallbackFactory.class);
+      TaskCallback callback = callbackFactory.createCallback();
+      Thread.sleep(5);
+      callback.complete();
+      return null;
+    }).when(task0).process(eq(watermarkA0), any(), any());
+
+    doAnswer(invocation -> {
+      TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, 
TaskCallbackFactory.class);
+      callbackFactory.createCallback().complete();
+      return null;
+    }).when(task0).process(eq(envelopeA00), any(), any());
+
+    Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0);
+
+    RunLoop runLoop = new RunLoop(tasks, executor, consumers, 
containerMetrics, () -> 0L, mockRunLoopConfig);
+
+    when(consumers.choose(false))
+        .thenReturn(envelopeA00)
+        .thenReturn(watermarkA0)
+        .thenReturn(null);
+
+    try {
+      runLoop.run();
+      fail("Watermark callback should have timed out and failed run loop");
+    } catch (SamzaException e) {
+
+    }
+  }
+
   private RunLoopTask getMockRunLoopTask(TaskName taskName, 
SystemStreamPartition ... ssps) {
     RunLoopTask task0 = mock(RunLoopTask.class);
     when(task0.systemStreamPartitions()).thenReturn(new 
HashSet<>(Arrays.asList(ssps)));
diff --git 
a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java 
b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
index de418c014..972f8d3b8 100644
--- 
a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java
@@ -67,10 +67,10 @@ public class TestTaskCallbackManager {
 
   @Test
   public void testCreateDrainCallback() {
-    TaskCallbackImpl callback = callbackManager.createCallbackForDrain(new 
TaskName("Partition 0"), mock(IncomingMessageEnvelope.class), null, -1);
+    TaskCallbackImpl callback = callbackManager.createCallback(new 
TaskName("Partition 0"), mock(IncomingMessageEnvelope.class), null, -1);
     assertTrue(callback.matchSeqNum(0));
 
-    callback = callbackManager.createCallbackForDrain(new TaskName("Partition 
0"), mock(IncomingMessageEnvelope.class), null, -1);
+    callback = callbackManager.createCallback(new TaskName("Partition 0"), 
mock(IncomingMessageEnvelope.class), null, -1);
     assertTrue(callback.matchSeqNum(1));
   }
 

Reply via email to