yashmayya commented on code in PR #12615:
URL: https://github.com/apache/kafka/pull/12615#discussion_r969848670


##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java:
##########
@@ -78,155 +74,80 @@ public void tearDown() {
     public void standardStartup() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("initializeAndStart")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
-
-        workerTask.initialize(TASK_CONFIG);
-        expectLastCall();
-
-        workerTask.initializeAndStart();
-        expectLastCall();
-
-        workerTask.execute();
-        expectLastCall();
-
-        statusListener.onStartup(taskId);
-        expectLastCall();
-
-        workerTask.close();
-        expectLastCall();
-
-        statusListener.onShutdown(taskId);
-        expectLastCall();
-
-        replay(workerTask);
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore);
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.run();
         workerTask.stop();
         workerTask.awaitStop(1000L);
 
-        verify(workerTask);
+        verify(statusListener).onStartup(eq(taskId));
+        verify(statusListener).onShutdown(eq(taskId));
     }
 
     @Test
     public void stopBeforeStarting() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) {
 
-        workerTask.initialize(TASK_CONFIG);
-        EasyMock.expectLastCall();
-
-        workerTask.close();
-        EasyMock.expectLastCall();
+            @Override
+            public void initializeAndStart() {
+                fail("This method is expected to not be invoked");
+            }
 
-        replay(workerTask);
+            @Override
+            public void execute() {
+                fail("This method is expected to not be invoked");
+            }
+        };
 
         workerTask.initialize(TASK_CONFIG);
         workerTask.stop();
         workerTask.awaitStop(1000L);
 
         // now run should not do anything
         workerTask.run();
-
-        verify(workerTask);
     }
 
     @Test
     public void cancelBeforeStopping() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
-
-        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(
-                        ConnectorTaskId.class,
-                        TaskStatus.Listener.class,
-                        TargetState.class,
-                        ClassLoader.class,
-                        ConnectMetrics.class,
-                        RetryWithToleranceOperator.class,
-                        Time.class,
-                        StatusBackingStore.class
-                )
-                .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics,
-                        retryWithToleranceOperator, Time.SYSTEM, statusBackingStore)
-                .addMockedMethod("initialize")
-                .addMockedMethod("initializeAndStart")
-                .addMockedMethod("execute")
-                .addMockedMethod("close")
-                .createStrictMock();
-
         final CountDownLatch stopped = new CountDownLatch(1);
-        final Thread thread = new Thread(() -> {
-            try {
-                stopped.await();
-            } catch (Exception e) {
-            }
-        });
-
-        workerTask.initialize(TASK_CONFIG);
-        EasyMock.expectLastCall();
-
-        workerTask.initializeAndStart();
-        EasyMock.expectLastCall();
-
-        workerTask.execute();
-        expectLastCall().andAnswer(() -> {
-            thread.start();
-            return null;
-        });
 
-        statusListener.onStartup(taskId);
-        expectLastCall();
+        WorkerTask workerTask = new TestWorkerTask(taskId, statusListener, TargetState.STARTED, loader, metrics,
+                retryWithToleranceOperator, Time.SYSTEM, statusBackingStore) {
 
-        workerTask.close();
-        expectLastCall();
-
-        // there should be no call to onShutdown()
+            @Override
+            public void execute() {
+                try {
+                    stopped.await();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
+            }
 
-        replay(workerTask);
+            // Trigger task shutdown immediately after start. The task will block in it's execute() method

Review Comment:
   Yikes, this is an embarrassing one, thanks 🙈



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to