shekhar-rajak commented on code in PR #12735:
URL: https://github.com/apache/kafka/pull/12735#discussion_r993520740
##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java:
##########
@@ -494,72 +502,29 @@ private void assertErrorHandlingMetricValue(String name,
double expected) {
assertEquals(expected, measured, 0.001d);
}
- private void expectInitializeTask() {
- consumer.subscribe(EasyMock.eq(singletonList(TOPIC)),
EasyMock.capture(rebalanceListener));
- PowerMock.expectLastCall();
-
- sinkTask.initialize(EasyMock.capture(sinkTaskContext));
- PowerMock.expectLastCall();
- sinkTask.start(TASK_PROPS);
- PowerMock.expectLastCall();
- }
-
- private void expectTaskGetTopic(boolean anyTimes) {
- final Capture<String> connectorCapture = EasyMock.newCapture();
- final Capture<String> topicCapture = EasyMock.newCapture();
- IExpectationSetters<TopicStatus> expect =
EasyMock.expect(statusBackingStore.getTopic(
- EasyMock.capture(connectorCapture),
- EasyMock.capture(topicCapture)));
- if (anyTimes) {
- expect.andStubAnswer(() -> new TopicStatus(
- topicCapture.getValue(),
- new ConnectorTaskId(connectorCapture.getValue(), 0),
- Time.SYSTEM.milliseconds()));
- } else {
- expect.andAnswer(() -> new TopicStatus(
- topicCapture.getValue(),
- new ConnectorTaskId(connectorCapture.getValue(), 0),
- Time.SYSTEM.milliseconds()));
- }
- if (connectorCapture.hasCaptured() && topicCapture.hasCaptured()) {
- assertEquals("job", connectorCapture.getValue());
- assertEquals(TOPIC, topicCapture.getValue());
- }
- }
-
- private void expectClose() {
- producer.close(EasyMock.anyObject(Duration.class));
- EasyMock.expectLastCall();
-
- admin.close(EasyMock.anyObject(Duration.class));
- EasyMock.expectLastCall();
+ private void expectClose() throws IOException {
- offsetReader.close();
- EasyMock.expectLastCall();
-
- offsetStore.stop();
- EasyMock.expectLastCall();
-
- try {
- headerConverter.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- EasyMock.expectLastCall();
+ verify(producer).close(any(Duration.class));
+ verify(admin).close(any(Duration.class));
+ verify(offsetReader).close();
+ verify(offsetStore).stop();
+ // headerConverter.close() can throw IOException
+ verify(headerConverter).close();
}
private void expectTopicCreation(String topic) {
if (workerConfig.topicCreationEnable()) {
-
EasyMock.expect(admin.describeTopics(topic)).andReturn(Collections.emptyMap());
- Capture<NewTopic> newTopicCapture = EasyMock.newCapture();
+
when(admin.describeTopics(topic)).thenReturn(Collections.emptyMap());
+ ArgumentCaptor<NewTopic> newTopicCapture =
+ ArgumentCaptor.forClass(NewTopic.class);
if (enableTopicCreation) {
Set<String> created = Collections.singleton(topic);
Set<String> existing = Collections.emptySet();
TopicAdmin.TopicCreationResponse response = new
TopicAdmin.TopicCreationResponse(created, existing);
-
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(response);
+
when(admin.createOrFindTopics(newTopicCapture.capture())).thenReturn(response);
Review Comment:
Note: these capture not checked. Not sure why we have these.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]