C0urante commented on code in PR #13383: URL: https://github.com/apache/kafka/pull/13383#discussion_r1143902556
########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ########## @@ -315,33 +305,28 @@ public void testPause() throws Exception { taskFuture.get(); - PowerMock.verifyAll(); + verifyCleanStartup(); + verifyTaskGetTopic(10); Review Comment: Should this use `count.get()` instead of `10`, since it's possible that the task may have been polled extra times between when line 293 (`assertTrue(awaitLatch(pollLatch));`) completed and when the task finished pausing? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ########## @@ -832,166 +746,117 @@ private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throw // Note that we stub these to allow any number of calls because the thread will continue to // run. The count passed in + latch returned just makes sure we get *at least* that number of // calls - EasyMock.expect(sourceTask.poll()) - .andStubAnswer(() -> { - count.incrementAndGet(); - latch.countDown(); - Thread.sleep(10); - return RECORDS; - }); + doAnswer((Answer<List<SourceRecord>>) invocation -> { + count.incrementAndGet(); + latch.countDown(); + Thread.sleep(10); + return RECORDS; + }).when(sourceTask).poll(); + // Fallout of the poll() call - expectSendRecordAnyTimes(); + expectSendRecord(); return latch; } private CountDownLatch expectPolls(int count) throws InterruptedException { return expectPolls(count, new AtomicInteger()); } - @SuppressWarnings("unchecked") - private void expectSendRecordSyncFailure(Throwable error) { - expectConvertHeadersAndKeyValue(false); - expectApplyTransformationChain(false); - - EasyMock.expect( - producer.send(EasyMock.anyObject(ProducerRecord.class), - EasyMock.anyObject(org.apache.kafka.clients.producer.Callback.class))) - .andThrow(error); + private void expectSendRecord() throws InterruptedException { + expectSendRecordTaskCommitRecordSucceed(); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordAnyTimes() throws InterruptedException { - return expectSendRecordTaskCommitRecordSucceed(true); + // + private void expectSendRecordProducerCallbackFail() throws InterruptedException { + expectSendRecord(TOPIC, false, false, true, emptyHeaders()); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordOnce() throws InterruptedException { - return expectSendRecordTaskCommitRecordSucceed(false); + private void expectSendRecordTaskCommitRecordSucceed() throws InterruptedException { + expectSendRecord(TOPIC, true, true, true, emptyHeaders()); } - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordProducerCallbackFail() throws InterruptedException { - return expectSendRecord(TOPIC, false, false, false, true, emptyHeaders()); - } - - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordSucceed(boolean anyTimes) throws InterruptedException { - return expectSendRecord(TOPIC, anyTimes, true, true, true, emptyHeaders()); - } - - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecordTaskCommitRecordFail(boolean anyTimes) throws InterruptedException { - return expectSendRecord(TOPIC, anyTimes, true, false, true, emptyHeaders()); - } - - private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord( - String topic, - boolean anyTimes, - boolean sendSuccess, - boolean commitSuccess, - boolean isMockedConverters, - Headers headers + private void expectSendRecord( + String topic, + boolean sendSuccess, + boolean commitSuccess, + boolean isMockedConverters, + Headers headers ) throws InterruptedException { if (isMockedConverters) { - expectConvertHeadersAndKeyValue(topic, anyTimes, headers); + expectConvertHeadersAndKeyValue(topic, headers); } - expectApplyTransformationChain(anyTimes); - - Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture(); - - // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work - IExpectationSetters<Future<RecordMetadata>> expect = EasyMock.expect( - producer.send(EasyMock.capture(sent), - EasyMock.capture(producerCallbacks))); - IAnswer<Future<RecordMetadata>> expectResponse = () -> { - synchronized (producerCallbacks) { - for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) { - if (sendSuccess) { - cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, - 0L, 0, 0), null); - } else { - cb.onCompletion(null, new TopicAuthorizationException("foo")); - } - } - producerCallbacks.reset(); - } - return sendFuture; - }; - if (anyTimes) - expect.andStubAnswer(expectResponse); - else - expect.andAnswer(expectResponse); + expectApplyTransformationChain(); if (sendSuccess) { // 2. As a result of a successful producer send callback, we'll notify the source task of the record commit - expectTaskCommitRecordWithOffset(anyTimes, commitSuccess); - expectTaskGetTopic(anyTimes); + expectTaskCommitRecordWithOffset(commitSuccess); + expectTaskGetTopic(); } - return sent; + doAnswer(producerSendAnswer(sendSuccess)) + .when(producer).send(any(ProducerRecord.class), any(org.apache.kafka.clients.producer.Callback.class)); } - private void expectConvertHeadersAndKeyValue(boolean anyTimes) { - expectConvertHeadersAndKeyValue(TOPIC, anyTimes, emptyHeaders()); + private Answer<Future<RecordMetadata>> producerSendAnswer(boolean sendSuccess) { + return invocation -> { + org.apache.kafka.clients.producer.Callback cb = invocation.getArgument(1); + if (sendSuccess) { + cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), + null); + } else { + cb.onCompletion(null, new TopicAuthorizationException("foo")); + } + + return null; + }; } - private void expectConvertHeadersAndKeyValue(String topic, boolean anyTimes, Headers headers) { - for (Header header : headers) { - IExpectationSetters<byte[]> convertHeaderExpect = EasyMock.expect(headerConverter.fromConnectHeader(topic, header.key(), Schema.STRING_SCHEMA, new String(header.value()))); - if (anyTimes) - convertHeaderExpect.andStubReturn(header.value()); - else - convertHeaderExpect.andReturn(header.value()); + private void expectConvertHeadersAndKeyValue(String topic, Headers headers) { + if (headers.iterator().hasNext()) { + when(headerConverter.fromConnectHeader(anyString(), anyString(), eq(Schema.STRING_SCHEMA), + anyString())) + .thenAnswer((Answer<byte[]>) invocation -> { + String headerValue = invocation.getArgument(3, String.class); + return headerValue.getBytes(StandardCharsets.UTF_8); + }); } - IExpectationSetters<byte[]> convertKeyExpect = EasyMock.expect(keyConverter.fromConnectData(topic, headers, KEY_SCHEMA, KEY)); - if (anyTimes) - convertKeyExpect.andStubReturn(SERIALIZED_KEY); - else - convertKeyExpect.andReturn(SERIALIZED_KEY); - IExpectationSetters<byte[]> convertValueExpect = EasyMock.expect(valueConverter.fromConnectData(topic, headers, RECORD_SCHEMA, RECORD)); - if (anyTimes) - convertValueExpect.andStubReturn(SERIALIZED_RECORD); - else - convertValueExpect.andReturn(SERIALIZED_RECORD); + + when(keyConverter.fromConnectData(eq(topic), any(Headers.class), eq(KEY_SCHEMA), eq(KEY))) + .thenReturn(SERIALIZED_KEY); + when(valueConverter.fromConnectData(eq(topic), any(Headers.class), eq(RECORD_SCHEMA), + eq(RECORD))) + .thenReturn(SERIALIZED_RECORD); } - private void expectApplyTransformationChain(boolean anyTimes) { - final Capture<SourceRecord> recordCapture = EasyMock.newCapture(); - IExpectationSetters<SourceRecord> convertKeyExpect = EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))); - if (anyTimes) - convertKeyExpect.andStubAnswer(recordCapture::getValue); - else - convertKeyExpect.andAnswer(recordCapture::getValue); + private void expectApplyTransformationChain() { + when(transformationChain.apply(any(SourceRecord.class))) + .thenAnswer(AdditionalAnswers.returnsFirstArg()); } - private void expectTaskCommitRecordWithOffset(boolean anyTimes, boolean succeed) throws InterruptedException { - sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class), EasyMock.anyObject(RecordMetadata.class)); - IExpectationSetters<Void> expect = EasyMock.expectLastCall(); + // + private void expectTaskCommitRecordWithOffset(boolean succeed) throws InterruptedException { if (!succeed) { - expect = expect.andThrow(new RuntimeException("Error committing record in source task")); - } - if (anyTimes) { - expect.anyTimes(); + doThrow(new RuntimeException("Error committing record in source task")) + .when(sourceTask).commitRecord(any(SourceRecord.class), any(RecordMetadata.class)); } } - private void expectTaskGetTopic(boolean anyTimes) { - final Capture<String> connectorCapture = EasyMock.newCapture(); - final Capture<String> topicCapture = EasyMock.newCapture(); - IExpectationSetters<TopicStatus> expect = EasyMock.expect(statusBackingStore.getTopic( - EasyMock.capture(connectorCapture), - EasyMock.capture(topicCapture))); - if (anyTimes) { - expect.andStubAnswer(() -> new TopicStatus( - topicCapture.getValue(), - new ConnectorTaskId(connectorCapture.getValue(), 0), - Time.SYSTEM.milliseconds())); - } else { - expect.andAnswer(() -> new TopicStatus( - topicCapture.getValue(), - new ConnectorTaskId(connectorCapture.getValue(), 0), - Time.SYSTEM.milliseconds())); - } - if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) { - assertEquals("job", connectorCapture.getValue()); - assertEquals(TOPIC, topicCapture.getValue()); - } + private void expectTaskGetTopic() { + when(statusBackingStore.getTopic(anyString(), anyString())).thenAnswer((Answer<TopicStatus>) invocation -> { + String connector = invocation.getArgument(0, String.class); + String topic = invocation.getArgument(1, String.class); + return new TopicStatus(topic, new ConnectorTaskId(connector, 0), Time.SYSTEM.milliseconds()); + }); + } + + private void verifyTaskGetTopic(int times) { + ArgumentCaptor<String> connectorCapture = ArgumentCaptor.forClass(String.class); + ArgumentCaptor<String> topicCapture = ArgumentCaptor.forClass(String.class); + verify(statusBackingStore, times(times)).getTopic(connectorCapture.capture(), topicCapture.capture()); + + assertEquals("job", connectorCapture.getValue()); + assertEquals(TOPIC, topicCapture.getValue()); } private boolean awaitLatch(CountDownLatch latch) { Review Comment: Out of scope, so implement (or don't) at your discretion, but this method (and invocations of it) can be replaced with [ConcurrencyUtils::awaitLatch](https://github.com/apache/kafka/blob/1d8f79964e9578f3769fd13c2535988bfc803696/connect/runtime/src/test/java/org/apache/kafka/connect/test/util/ConcurrencyUtils.java#L42-L50). ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ########## @@ -315,33 +305,28 @@ public void testPause() throws Exception { taskFuture.get(); - PowerMock.verifyAll(); + verifyCleanStartup(); + verifyTaskGetTopic(10); + verifyOffsetFlush(true); + verifyTopicCreation(TOPIC); + verify(statusListener).onPause(taskId); + verify(statusListener).onShutdown(taskId); + verify(sourceTask).stop(); + verify(offsetWriter).offset(PARTITION, OFFSET); + verifyClose(); } @Test public void testPollsInBackground() throws Exception { createWorkerTask(); - expectCleanStartup(); - final CountDownLatch pollLatch = expectPolls(10); // In this test, we don't flush, so nothing goes any further than the offset writer expectTopicCreation(TOPIC); + expectOffsetFlush(true, true); Review Comment: Nit: this can be simplified, right? ```suggestion expectOffsetFlush(true); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ########## @@ -315,33 +305,28 @@ public void testPause() throws Exception { taskFuture.get(); - PowerMock.verifyAll(); + verifyCleanStartup(); + verifyTaskGetTopic(10); + verifyOffsetFlush(true); + verifyTopicCreation(TOPIC); + verify(statusListener).onPause(taskId); + verify(statusListener).onShutdown(taskId); + verify(sourceTask).stop(); + verify(offsetWriter).offset(PARTITION, OFFSET); + verifyClose(); } @Test public void testPollsInBackground() throws Exception { createWorkerTask(); - expectCleanStartup(); - final CountDownLatch pollLatch = expectPolls(10); // In this test, we don't flush, so nothing goes any further than the offset writer Review Comment: Is this still accurate? Looks like we do end up performing an offset flush in this test. ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ########## @@ -391,37 +369,26 @@ public void testFailureInPoll() throws Exception { taskFuture.get(); assertPollMetrics(0); - PowerMock.verifyAll(); + verifyCleanStartup(); + verifyOffsetFlush(true); + verify(statusListener).onFailure(taskId, exception); + verify(sourceTask).stop(); Review Comment: Looks like we might be missing coverage for this call (originally verified in `expectEmptyOffsetFlush`): ```suggestion verify(sourceTask).commit(); verify(sourceTask).stop(); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ########## @@ -1003,31 +868,35 @@ private boolean awaitLatch(CountDownLatch latch) { return false; } - @SuppressWarnings("unchecked") private void expectOffsetFlush(boolean succeed) throws Exception { - EasyMock.expect(offsetWriter.beginFlush(EasyMock.anyLong(), EasyMock.anyObject())).andReturn(true); - Future<Void> flushFuture = PowerMock.createMock(Future.class); - EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture); + expectOffsetFlush(succeed, true); + } + + @SuppressWarnings("unchecked") + private void expectOffsetFlush(boolean succeed, boolean offsetFlush, Boolean... remainingVals) throws Exception { + when(offsetWriter.beginFlush(anyLong(), any(TimeUnit.class))).thenReturn(offsetFlush, remainingVals); + + if (offsetFlush) { Review Comment: Isn't this technically incorrect? Shouldn't we set up expectations for `offsetWriter::doFlush` if any of `remainingVals` is `true` as well? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ########## @@ -474,129 +442,107 @@ public void testFailureInPollAfterStop() throws Exception { taskFuture.get(); assertPollMetrics(0); - PowerMock.verifyAll(); + verifyCleanStartup(); + verify(statusListener).onShutdown(taskId); + verify(sourceTask).stop(); + verifyOffsetFlush(true); + verifyClose(); } @Test public void testPollReturnsNoRecords() throws Exception { // Test that the task handles an empty list of records createWorkerTask(); - expectCleanStartup(); - // We'll wait for some data, then trigger a flush - final CountDownLatch pollLatch = expectEmptyPolls(1, new AtomicInteger()); - expectEmptyOffsetFlush(); - - sourceTask.stop(); - EasyMock.expectLastCall(); - expectEmptyOffsetFlush(); - - statusListener.onShutdown(taskId); - EasyMock.expectLastCall(); - - expectClose(); - - PowerMock.replayAll(); + final CountDownLatch pollLatch = expectEmptyPolls(new AtomicInteger()); + expectOffsetFlush(true, false); workerTask.initialize(TASK_CONFIG); Future<?> taskFuture = executor.submit(workerTask); assertTrue(awaitLatch(pollLatch)); assertTrue(workerTask.commitOffsets()); + verify(offsetWriter).beginFlush(anyLong(), any(TimeUnit.class)); + reset(offsetWriter); + workerTask.stop(); assertTrue(workerTask.awaitStop(1000)); + verify(offsetWriter).beginFlush(anyLong(), any(TimeUnit.class)); + verifyNoMoreInteractions(offsetWriter); taskFuture.get(); assertPollMetrics(0); - PowerMock.verifyAll(); + verifyCleanStartup(); + verify(sourceTask).stop(); + verify(statusListener).onShutdown(taskId); + verifyClose(); } @Test public void testCommit() throws Exception { // Test that the task commits properly when prompted createWorkerTask(); - expectCleanStartup(); - // We'll wait for some data, then trigger a flush final CountDownLatch pollLatch = expectPolls(1); - expectOffsetFlush(true); - - offsetWriter.offset(PARTITION, OFFSET); Review Comment: Are we losing coverage for this invocation in this test? ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ########## @@ -433,35 +400,36 @@ public void testFailureInPollAfterCancel() throws Exception { taskFuture.get(); assertPollMetrics(0); - PowerMock.verifyAll(); + verifyCleanStartup(); + verify(producer).close(Duration.ZERO); + verify(producer).close(Duration.ofSeconds(30)); + verify(sourceTask).stop(); + verify(admin).close(any(Duration.class)); + verify(transformationChain).close(); + verify(offsetReader, times(2)).close(); Review Comment: Since `OffsetStorageReaderImpl::close` is idempotent, it doesn't hurt that we invoke it twice, but it's definitely not necessary. We just have to make sure it's invoked at least once: ```suggestion verify(offsetReader, atLeast(1)).close(); ``` ########## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java: ########## @@ -474,129 +442,107 @@ public void testFailureInPollAfterStop() throws Exception { taskFuture.get(); assertPollMetrics(0); - PowerMock.verifyAll(); + verifyCleanStartup(); + verify(statusListener).onShutdown(taskId); + verify(sourceTask).stop(); + verifyOffsetFlush(true); + verifyClose(); } @Test public void testPollReturnsNoRecords() throws Exception { // Test that the task handles an empty list of records createWorkerTask(); - expectCleanStartup(); - // We'll wait for some data, then trigger a flush - final CountDownLatch pollLatch = expectEmptyPolls(1, new AtomicInteger()); - expectEmptyOffsetFlush(); - - sourceTask.stop(); - EasyMock.expectLastCall(); - expectEmptyOffsetFlush(); - - statusListener.onShutdown(taskId); - EasyMock.expectLastCall(); - - expectClose(); - - PowerMock.replayAll(); + final CountDownLatch pollLatch = expectEmptyPolls(new AtomicInteger()); + expectOffsetFlush(true, false); workerTask.initialize(TASK_CONFIG); Future<?> taskFuture = executor.submit(workerTask); assertTrue(awaitLatch(pollLatch)); assertTrue(workerTask.commitOffsets()); + verify(offsetWriter).beginFlush(anyLong(), any(TimeUnit.class)); + reset(offsetWriter); Review Comment: I try to avoid `reset` when I can. Can we remove this line and change the next assertion for `offsetWriter::beginFlush` to use `times(2)`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org